import datetime import json import time import os import pytz import redis from elasticsearch import Elasticsearch from tzlocal import get_localzone # Configuration via environment variables # ELASTICSEARCH_URL: full URL to Elasticsearch (e.g. http://tpot-host:64298) # REDIS_HOST: Redis hostname (default: localhost) es_url = os.getenv("ELASTICSEARCH_URL", "http://localhost:9200") redis_ip = os.getenv("REDIS_HOST", "localhost") redis_channel = 'attack-map-production' version = 'Data Server 3.0.0' local_tz = get_localzone() output_text = os.getenv("TPOT_ATTACKMAP_TEXT", "ENABLED").upper() es = Elasticsearch(es_url) # Track disconnection state for reconnection messages was_disconnected_es = False was_disconnected_redis = False # Global Redis client for persistent connection redis_client = None event_count = 1 # Color Codes for Attack Map service_rgb = { 'CHARGEN': '#4CAF50', 'FTP-DATA': '#F44336', 'FTP': '#FF5722', 'SSH': '#FF9800', 'TELNET': '#FFC107', 'SMTP': '#8BC34A', 'WINS': '#009688', 'DNS': '#00BCD4', 'DHCP': '#03A9F4', 'TFTP': '#2196F3', 'HTTP': '#3F51B5', 'DICOM': '#9C27B0', 'POP3': '#E91E63', 'NTP': '#795548', 'RPC': '#607D8B', 'IMAP': '#9E9E9E', 'SNMP': '#FF6B35', 'LDAP': '#FF8E53', 'HTTPS': '#0080FF', 'SMB': '#BF00FF', 'SMTPS': '#80FF00', 'EMAIL': '#00FF80', 'IPMI': '#00FFFF', 'IPP': '#8000FF', 'IMAPS': '#FF0080', 'POP3S': '#80FF80', 'NFS': '#FF8080', 'SOCKS': '#8080FF', 'SQL': '#00FF00', 'ORACLE': '#FFFF00', 'PPTP': '#FF00FF', 'MQTT': '#00FF40', 'SSDP': '#40FF00', 'IEC104': '#FF4000', 'HL7': '#4000FF', 'MYSQL': '#00FF00', 'RDP': '#FF0060', 'IPSEC': '#60FF00', 'SIP': '#FFCCFF', 'POSTGRESQL': '#00CCFF', 'ADB': '#FFCCCC', 'VNC': '#0000FF', 'REDIS': '#CC00FF', 'IRC': '#FFCC00', 'JETDIRECT': '#8000FF', 'ELASTICSEARCH': '#FF8000', 'INDUSTRIAL': '#80FF40', 'MEMCACHED': '#40FF80', 'MONGODB': '#FF4080', 'SCADA': '#8040FF', 'OTHER': '#78909C' } # Port to Protocol Mapping PORT_MAP = { 19: "CHARGEN", 20: "FTP-DATA", 21: "FTP", 22: "SSH", 2222: "SSH", 23: "TELNET", 2223: "TELNET", 25: "SMTP", 42: "WINS", 53: "DNS", 67: "DHCP", 69: "TFTP", 80: "HTTP", 81: "HTTP", 104: "DICOM", 110: "POP3", 123: "NTP", 135: "RPC", 143: "IMAP", 161: "SNMP", 389: "LDAP", 443: "HTTPS", 445: "SMB", 465: "SMTPS", 587: "EMAIL", 623: "IPMI", 631: "IPP", 993: "IMAPS", 995: "POP3S", 1025: "NFS", 1080: "SOCKS", 1433: "SQL", 1521: "ORACLE", 1723: "PPTP", 1883: "MQTT", 1900: "SSDP", 2404: "IEC104", 2575: "HL7", 3306: "MYSQL", 3389: "RDP", 5000: "IPSEC", 5060: "SIP", 5061: "SIP", 5432: "POSTGRESQL", 5555: "ADB", 5900: "VNC", 6379: "REDIS", 6667: "IRC", 8080: "HTTP", 8888: "HTTP", 8443: "HTTPS", 9100: "JETDIRECT", 9200: "ELASTICSEARCH", 10001: "INDUSTRIAL", 11112: "DICOM", 11211: "MEMCACHED", 27017: "MONGODB", 50100: "SCADA" } def connect_redis(redis_ip): global redis_client try: if redis_client: redis_client.ping() return redis_client except Exception: pass redis_client = redis.StrictRedis(host=redis_ip, port=6379, db=0) return redis_client def push_honeypot_stats(honeypot_stats): redis_instance = connect_redis(redis_ip) tmp = json.dumps(honeypot_stats) redis_instance.publish(redis_channel, tmp) def get_honeypot_stats(timedelta): ES_query_stats = { "bool": { "must": [], "filter": [ { "terms": { "type.keyword": [ "Adbhoney", "Beelzebub", "Ciscoasa", "CitrixHoneypot", "ConPot", "Cowrie", "Ddospot", "Dicompot", "Dionaea", "ElasticPot", "Endlessh", "Galah", "Glutton", "Go-pot", "H0neytr4p", "Hellpot", "Heralding", "Honeyaml", "Honeytrap", "Honeypots", "Log4pot", "Ipphoney", "Mailoney", "Medpot", "Miniprint", "Redishoneypot", "Sentrypeer", "Tanner", "Wordpot" ] } }, { "range": { "@timestamp": { "format": "strict_date_optional_time", "gte": "now-" + timedelta, "lte": "now" } } }, { "exists": { "field": "geoip.ip" } } ] } } return ES_query_stats def update_honeypot_data(): global was_disconnected_es, was_disconnected_redis processed_data = [] last = {"1m", "1h", "24h"} mydelta = 10 time_last_request = datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=mydelta) last_stats_time = datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=10) while True: now = datetime.datetime.now(datetime.UTC) if (now - last_stats_time).total_seconds() >= 10: last_stats_time = now honeypot_stats = {} for i in last: try: es_honeypot_stats = es.search(index="logstash-*", aggs={}, size=0, track_total_hits=True, query=get_honeypot_stats(i)) honeypot_stats.update({"last_"+i: es_honeypot_stats['hits']['total']['value']}) except Exception: pass honeypot_stats.update({"type": "Stats"}) push_honeypot_stats(honeypot_stats) mylast_dt = time_last_request.replace(tzinfo=None) mynow_dt = (datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=mydelta)).replace(tzinfo=None) mylast = str(mylast_dt).split(" ") mynow = str(mynow_dt).split(" ") ES_query = { "bool": { "must": [ { "query_string": { "query": ( "type:(Adbhoney OR Beelzebub OR Ciscoasa OR CitrixHoneypot OR ConPot OR Cowrie " "OR Ddospot OR Dicompot OR Dionaea OR ElasticPot OR Endlessh OR Galah OR Glutton OR Go-pot OR H0neytr4p " "OR Hellpot OR Heralding OR Honeyaml OR Honeypots OR Honeytrap OR Ipphoney OR Log4pot OR Mailoney " "OR Medpot OR Miniprint OR Redishoneypot OR Sentrypeer OR Tanner OR Wordpot)" ) } } ], "filter": [ { "range": { "@timestamp": { "gte": mylast[0] + "T" + mylast[1], "lte": mynow[0] + "T" + mynow[1] } } } ] } } res = es.search(index="logstash-*", size=100, query=ES_query) hits = res['hits'] if len(hits['hits']) != 0: time_last_request = datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=mydelta) for hit in hits['hits']: try: process_datas = process_data(hit) if process_datas is not None: processed_data.append(process_datas) except Exception: pass if len(processed_data) != 0: push(processed_data) processed_data = [] time.sleep(0.5) def process_data(hit): alert = {} alert["honeypot"] = hit["_source"]["type"] alert["country"] = hit["_source"]["geoip"].get("country_name", "") alert["country_code"] = hit["_source"]["geoip"].get("country_code2", "") alert["continent_code"] = hit["_source"]["geoip"].get("continent_code", "") alert["dst_lat"] = hit["_source"]["geoip_ext"]["latitude"] alert["dst_long"] = hit["_source"]["geoip_ext"]["longitude"] alert["dst_ip"] = hit["_source"]["geoip_ext"]["ip"] alert["dst_iso_code"] = hit["_source"]["geoip_ext"].get("country_code2", "") alert["dst_country_name"] = hit["_source"]["geoip_ext"].get("country_name", "") alert["tpot_hostname"] = hit["_source"]["t-pot_hostname"] try: dt = datetime.datetime.fromisoformat(hit["_source"]["@timestamp"]) alert["event_time"] = dt.strftime("%Y-%m-%d %H:%M:%S") except Exception: alert["event_time"] = str(hit["_source"]["@timestamp"][0:10]) + " " + str(hit["_source"]["@timestamp"][11:19]) alert["iso_code"] = hit["_source"]["geoip"]["country_code2"] alert["latitude"] = hit["_source"]["geoip"]["latitude"] alert["longitude"] = hit["_source"]["geoip"]["longitude"] alert["dst_port"] = hit["_source"]["dest_port"] alert["protocol"] = port_to_type(hit["_source"]["dest_port"]) alert["src_ip"] = hit["_source"]["src_ip"] try: alert["src_port"] = hit["_source"]["src_port"] except Exception: alert["src_port"] = 0 try: alert["ip_rep"] = hit["_source"]["ip_rep"] except Exception: alert["ip_rep"] = "reputation unknown" if not alert["src_ip"] == "": try: alert["color"] = service_rgb[alert["protocol"].upper()] except Exception: alert["color"] = service_rgb["OTHER"] return alert else: print("SRC IP EMPTY") return None def port_to_type(port): try: return PORT_MAP.get(int(port), "OTHER") except Exception: return "OTHER" def push(alerts): global event_count redis_instance = connect_redis(redis_ip) for alert in alerts: if output_text == "ENABLED": my_time = datetime.datetime.strptime(alert["event_time"], "%Y-%m-%d %H:%M:%S") my_time = my_time.replace(tzinfo=pytz.UTC) local_event_time = my_time.astimezone(local_tz) local_event_time = local_event_time.strftime("%Y-%m-%d %H:%M:%S") table_data = [ [local_event_time, alert["country"], alert["src_ip"], alert["ip_rep"].title(), alert["protocol"], alert["honeypot"], alert["tpot_hostname"]] ] min_widths = [19, 20, 15, 18, 10, 14, 14] for row in table_data: formatted_line = " | ".join( "{:<{width}}".format(str(value), width=min_widths[i]) for i, value in enumerate(row)) print(formatted_line) json_data = { "protocol": alert["protocol"], "color": alert["color"], "iso_code": alert["iso_code"], "honeypot": alert["honeypot"], "src_port": alert["src_port"], "event_time": alert["event_time"], "src_lat": alert["latitude"], "src_ip": alert["src_ip"], "ip_rep": alert["ip_rep"].title(), "type": "Traffic", "dst_long": alert["dst_long"], "continent_code": alert["continent_code"], "dst_lat": alert["dst_lat"], "event_count": event_count, "country": alert["country"], "src_long": alert["longitude"], "dst_port": alert["dst_port"], "dst_ip": alert["dst_ip"], "dst_iso_code": alert["dst_iso_code"], "dst_country_name": alert["dst_country_name"], "tpot_hostname": alert["tpot_hostname"] } event_count += 1 tmp = json.dumps(json_data) redis_instance.publish(redis_channel, tmp) def check_connections(): """Check both Elasticsearch and Redis connections on startup.""" print(f"[*] Connecting to Elasticsearch at {es_url}") print(f"[*] Connecting to Redis at {redis_ip}:6379") es_ready = False redis_ready = False es_waiting_printed = False redis_waiting_printed = False while not (es_ready and redis_ready): if not es_ready: try: es.info() print("[*] Elasticsearch connection established") es_ready = True except Exception as e: if not es_waiting_printed: print(f"[...] Waiting for Elasticsearch... (Error: {type(e).__name__})") es_waiting_printed = True if not redis_ready: try: r = redis.StrictRedis(host=redis_ip, port=6379, db=0) r.ping() print("[*] Redis connection established") redis_ready = True except Exception as e: if not redis_waiting_printed: print(f"[...] Waiting for Redis... (Error: {type(e).__name__})") redis_waiting_printed = True if not (es_ready and redis_ready): time.sleep(5) return True if __name__ == '__main__': print(version) check_connections() print("[*] Starting data server...\n") try: while True: try: update_honeypot_data() except Exception as e: error_type = type(e).__name__ error_msg = str(e) if "6379" in error_msg or "Redis" in error_msg or "redis" in error_msg.lower(): if not was_disconnected_redis: print(f"[ ] Connection lost to Redis ({error_type}), retrying...") was_disconnected_redis = True elif "Connection" in error_type or "urllib3" in error_msg or "elastic" in error_msg.lower(): if not was_disconnected_es: print(f"[ ] Connection lost to Elasticsearch ({error_type}), retrying...") was_disconnected_es = True else: print(f"[ ] Error: {error_type}: {error_msg}") if not was_disconnected_redis: try: r = connect_redis(redis_ip) r.ping() except Exception: print("[ ] Connection lost to Redis (Check), retrying...") was_disconnected_redis = True if not was_disconnected_es: try: es.info() except Exception: print("[ ] Connection lost to Elasticsearch (Check), retrying...") was_disconnected_es = True time.sleep(5) if was_disconnected_es: try: es.info() print("[*] Elasticsearch connection re-established") was_disconnected_es = False except Exception: pass if was_disconnected_redis: try: r = connect_redis(redis_ip) r.ping() print("[*] Redis connection re-established") was_disconnected_redis = False except Exception: pass except KeyboardInterrupt: print('\nSHUTTING DOWN') exit()