Files
honeypot.es/DataServer.py
Malin 1aa164263f feat: initial T-Pot attack map with Spanish UI and Docker support
- Full Spanish interface (all UI text, popups, charts, tables)
- Dark and light mode support
- Disclaimer banner: no data logged, public European service
- Footer: Servicio ofrecido por Cloud Host (cloudhost.es)
- Docker: single container (Redis + DataServer + AttackMapServer)
- Remote T-Pot support via ELASTICSEARCH_URL env var (direct or SSH tunnel)
- Based on telekom-security/t-pot-attack-map (Apache 2.0)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-25 21:23:28 +01:00

472 lines
15 KiB
Python

import datetime
import json
import time
import os
import pytz
import redis
from elasticsearch import Elasticsearch
from tzlocal import get_localzone
# Configuration via environment variables
# ELASTICSEARCH_URL: full URL to Elasticsearch (e.g. http://tpot-host:64298)
# REDIS_HOST: Redis hostname (default: localhost)
es_url = os.getenv("ELASTICSEARCH_URL", "http://localhost:9200")
redis_ip = os.getenv("REDIS_HOST", "localhost")
redis_channel = 'attack-map-production'
version = 'Data Server 3.0.0'
local_tz = get_localzone()
output_text = os.getenv("TPOT_ATTACKMAP_TEXT", "ENABLED").upper()
es = Elasticsearch(es_url)
# Track disconnection state for reconnection messages
was_disconnected_es = False
was_disconnected_redis = False
# Global Redis client for persistent connection
redis_client = None
event_count = 1
# Color Codes for Attack Map
service_rgb = {
'CHARGEN': '#4CAF50',
'FTP-DATA': '#F44336',
'FTP': '#FF5722',
'SSH': '#FF9800',
'TELNET': '#FFC107',
'SMTP': '#8BC34A',
'WINS': '#009688',
'DNS': '#00BCD4',
'DHCP': '#03A9F4',
'TFTP': '#2196F3',
'HTTP': '#3F51B5',
'DICOM': '#9C27B0',
'POP3': '#E91E63',
'NTP': '#795548',
'RPC': '#607D8B',
'IMAP': '#9E9E9E',
'SNMP': '#FF6B35',
'LDAP': '#FF8E53',
'HTTPS': '#0080FF',
'SMB': '#BF00FF',
'SMTPS': '#80FF00',
'EMAIL': '#00FF80',
'IPMI': '#00FFFF',
'IPP': '#8000FF',
'IMAPS': '#FF0080',
'POP3S': '#80FF80',
'NFS': '#FF8080',
'SOCKS': '#8080FF',
'SQL': '#00FF00',
'ORACLE': '#FFFF00',
'PPTP': '#FF00FF',
'MQTT': '#00FF40',
'SSDP': '#40FF00',
'IEC104': '#FF4000',
'HL7': '#4000FF',
'MYSQL': '#00FF00',
'RDP': '#FF0060',
'IPSEC': '#60FF00',
'SIP': '#FFCCFF',
'POSTGRESQL': '#00CCFF',
'ADB': '#FFCCCC',
'VNC': '#0000FF',
'REDIS': '#CC00FF',
'IRC': '#FFCC00',
'JETDIRECT': '#8000FF',
'ELASTICSEARCH': '#FF8000',
'INDUSTRIAL': '#80FF40',
'MEMCACHED': '#40FF80',
'MONGODB': '#FF4080',
'SCADA': '#8040FF',
'OTHER': '#78909C'
}
# Port to Protocol Mapping
PORT_MAP = {
19: "CHARGEN",
20: "FTP-DATA",
21: "FTP",
22: "SSH",
2222: "SSH",
23: "TELNET",
2223: "TELNET",
25: "SMTP",
42: "WINS",
53: "DNS",
67: "DHCP",
69: "TFTP",
80: "HTTP",
81: "HTTP",
104: "DICOM",
110: "POP3",
123: "NTP",
135: "RPC",
143: "IMAP",
161: "SNMP",
389: "LDAP",
443: "HTTPS",
445: "SMB",
465: "SMTPS",
587: "EMAIL",
623: "IPMI",
631: "IPP",
993: "IMAPS",
995: "POP3S",
1025: "NFS",
1080: "SOCKS",
1433: "SQL",
1521: "ORACLE",
1723: "PPTP",
1883: "MQTT",
1900: "SSDP",
2404: "IEC104",
2575: "HL7",
3306: "MYSQL",
3389: "RDP",
5000: "IPSEC",
5060: "SIP",
5061: "SIP",
5432: "POSTGRESQL",
5555: "ADB",
5900: "VNC",
6379: "REDIS",
6667: "IRC",
8080: "HTTP",
8888: "HTTP",
8443: "HTTPS",
9100: "JETDIRECT",
9200: "ELASTICSEARCH",
10001: "INDUSTRIAL",
11112: "DICOM",
11211: "MEMCACHED",
27017: "MONGODB",
50100: "SCADA"
}
def connect_redis(redis_ip):
global redis_client
try:
if redis_client:
redis_client.ping()
return redis_client
except Exception:
pass
redis_client = redis.StrictRedis(host=redis_ip, port=6379, db=0)
return redis_client
def push_honeypot_stats(honeypot_stats):
redis_instance = connect_redis(redis_ip)
tmp = json.dumps(honeypot_stats)
redis_instance.publish(redis_channel, tmp)
def get_honeypot_stats(timedelta):
ES_query_stats = {
"bool": {
"must": [],
"filter": [
{
"terms": {
"type.keyword": [
"Adbhoney", "Beelzebub", "Ciscoasa", "CitrixHoneypot", "ConPot",
"Cowrie", "Ddospot", "Dicompot", "Dionaea", "ElasticPot",
"Endlessh", "Galah", "Glutton", "Go-pot", "H0neytr4p", "Hellpot", "Heralding",
"Honeyaml", "Honeytrap", "Honeypots", "Log4pot", "Ipphoney", "Mailoney",
"Medpot", "Miniprint", "Redishoneypot", "Sentrypeer", "Tanner", "Wordpot"
]
}
},
{
"range": {
"@timestamp": {
"format": "strict_date_optional_time",
"gte": "now-" + timedelta,
"lte": "now"
}
}
},
{
"exists": {
"field": "geoip.ip"
}
}
]
}
}
return ES_query_stats
def update_honeypot_data():
global was_disconnected_es, was_disconnected_redis
processed_data = []
last = {"1m", "1h", "24h"}
mydelta = 10
time_last_request = datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=mydelta)
last_stats_time = datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=10)
while True:
now = datetime.datetime.now(datetime.UTC)
if (now - last_stats_time).total_seconds() >= 10:
last_stats_time = now
honeypot_stats = {}
for i in last:
try:
es_honeypot_stats = es.search(index="logstash-*", aggs={}, size=0, track_total_hits=True, query=get_honeypot_stats(i))
honeypot_stats.update({"last_"+i: es_honeypot_stats['hits']['total']['value']})
except Exception:
pass
honeypot_stats.update({"type": "Stats"})
push_honeypot_stats(honeypot_stats)
mylast_dt = time_last_request.replace(tzinfo=None)
mynow_dt = (datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=mydelta)).replace(tzinfo=None)
mylast = str(mylast_dt).split(" ")
mynow = str(mynow_dt).split(" ")
ES_query = {
"bool": {
"must": [
{
"query_string": {
"query": (
"type:(Adbhoney OR Beelzebub OR Ciscoasa OR CitrixHoneypot OR ConPot OR Cowrie "
"OR Ddospot OR Dicompot OR Dionaea OR ElasticPot OR Endlessh OR Galah OR Glutton OR Go-pot OR H0neytr4p "
"OR Hellpot OR Heralding OR Honeyaml OR Honeypots OR Honeytrap OR Ipphoney OR Log4pot OR Mailoney "
"OR Medpot OR Miniprint OR Redishoneypot OR Sentrypeer OR Tanner OR Wordpot)"
)
}
}
],
"filter": [
{
"range": {
"@timestamp": {
"gte": mylast[0] + "T" + mylast[1],
"lte": mynow[0] + "T" + mynow[1]
}
}
}
]
}
}
res = es.search(index="logstash-*", size=100, query=ES_query)
hits = res['hits']
if len(hits['hits']) != 0:
time_last_request = datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=mydelta)
for hit in hits['hits']:
try:
process_datas = process_data(hit)
if process_datas is not None:
processed_data.append(process_datas)
except Exception:
pass
if len(processed_data) != 0:
push(processed_data)
processed_data = []
time.sleep(0.5)
def process_data(hit):
alert = {}
alert["honeypot"] = hit["_source"]["type"]
alert["country"] = hit["_source"]["geoip"].get("country_name", "")
alert["country_code"] = hit["_source"]["geoip"].get("country_code2", "")
alert["continent_code"] = hit["_source"]["geoip"].get("continent_code", "")
alert["dst_lat"] = hit["_source"]["geoip_ext"]["latitude"]
alert["dst_long"] = hit["_source"]["geoip_ext"]["longitude"]
alert["dst_ip"] = hit["_source"]["geoip_ext"]["ip"]
alert["dst_iso_code"] = hit["_source"]["geoip_ext"].get("country_code2", "")
alert["dst_country_name"] = hit["_source"]["geoip_ext"].get("country_name", "")
alert["tpot_hostname"] = hit["_source"]["t-pot_hostname"]
try:
dt = datetime.datetime.fromisoformat(hit["_source"]["@timestamp"])
alert["event_time"] = dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
alert["event_time"] = str(hit["_source"]["@timestamp"][0:10]) + " " + str(hit["_source"]["@timestamp"][11:19])
alert["iso_code"] = hit["_source"]["geoip"]["country_code2"]
alert["latitude"] = hit["_source"]["geoip"]["latitude"]
alert["longitude"] = hit["_source"]["geoip"]["longitude"]
alert["dst_port"] = hit["_source"]["dest_port"]
alert["protocol"] = port_to_type(hit["_source"]["dest_port"])
alert["src_ip"] = hit["_source"]["src_ip"]
try:
alert["src_port"] = hit["_source"]["src_port"]
except Exception:
alert["src_port"] = 0
try:
alert["ip_rep"] = hit["_source"]["ip_rep"]
except Exception:
alert["ip_rep"] = "reputation unknown"
if not alert["src_ip"] == "":
try:
alert["color"] = service_rgb[alert["protocol"].upper()]
except Exception:
alert["color"] = service_rgb["OTHER"]
return alert
else:
print("SRC IP EMPTY")
return None
def port_to_type(port):
try:
return PORT_MAP.get(int(port), "OTHER")
except Exception:
return "OTHER"
def push(alerts):
global event_count
redis_instance = connect_redis(redis_ip)
for alert in alerts:
if output_text == "ENABLED":
my_time = datetime.datetime.strptime(alert["event_time"], "%Y-%m-%d %H:%M:%S")
my_time = my_time.replace(tzinfo=pytz.UTC)
local_event_time = my_time.astimezone(local_tz)
local_event_time = local_event_time.strftime("%Y-%m-%d %H:%M:%S")
table_data = [
[local_event_time, alert["country"], alert["src_ip"], alert["ip_rep"].title(),
alert["protocol"], alert["honeypot"], alert["tpot_hostname"]]
]
min_widths = [19, 20, 15, 18, 10, 14, 14]
for row in table_data:
formatted_line = " | ".join(
"{:<{width}}".format(str(value), width=min_widths[i]) for i, value in enumerate(row))
print(formatted_line)
json_data = {
"protocol": alert["protocol"],
"color": alert["color"],
"iso_code": alert["iso_code"],
"honeypot": alert["honeypot"],
"src_port": alert["src_port"],
"event_time": alert["event_time"],
"src_lat": alert["latitude"],
"src_ip": alert["src_ip"],
"ip_rep": alert["ip_rep"].title(),
"type": "Traffic",
"dst_long": alert["dst_long"],
"continent_code": alert["continent_code"],
"dst_lat": alert["dst_lat"],
"event_count": event_count,
"country": alert["country"],
"src_long": alert["longitude"],
"dst_port": alert["dst_port"],
"dst_ip": alert["dst_ip"],
"dst_iso_code": alert["dst_iso_code"],
"dst_country_name": alert["dst_country_name"],
"tpot_hostname": alert["tpot_hostname"]
}
event_count += 1
tmp = json.dumps(json_data)
redis_instance.publish(redis_channel, tmp)
def check_connections():
"""Check both Elasticsearch and Redis connections on startup."""
print(f"[*] Connecting to Elasticsearch at {es_url}")
print(f"[*] Connecting to Redis at {redis_ip}:6379")
es_ready = False
redis_ready = False
es_waiting_printed = False
redis_waiting_printed = False
while not (es_ready and redis_ready):
if not es_ready:
try:
es.info()
print("[*] Elasticsearch connection established")
es_ready = True
except Exception as e:
if not es_waiting_printed:
print(f"[...] Waiting for Elasticsearch... (Error: {type(e).__name__})")
es_waiting_printed = True
if not redis_ready:
try:
r = redis.StrictRedis(host=redis_ip, port=6379, db=0)
r.ping()
print("[*] Redis connection established")
redis_ready = True
except Exception as e:
if not redis_waiting_printed:
print(f"[...] Waiting for Redis... (Error: {type(e).__name__})")
redis_waiting_printed = True
if not (es_ready and redis_ready):
time.sleep(5)
return True
if __name__ == '__main__':
print(version)
check_connections()
print("[*] Starting data server...\n")
try:
while True:
try:
update_honeypot_data()
except Exception as e:
error_type = type(e).__name__
error_msg = str(e)
if "6379" in error_msg or "Redis" in error_msg or "redis" in error_msg.lower():
if not was_disconnected_redis:
print(f"[ ] Connection lost to Redis ({error_type}), retrying...")
was_disconnected_redis = True
elif "Connection" in error_type or "urllib3" in error_msg or "elastic" in error_msg.lower():
if not was_disconnected_es:
print(f"[ ] Connection lost to Elasticsearch ({error_type}), retrying...")
was_disconnected_es = True
else:
print(f"[ ] Error: {error_type}: {error_msg}")
if not was_disconnected_redis:
try:
r = connect_redis(redis_ip)
r.ping()
except Exception:
print("[ ] Connection lost to Redis (Check), retrying...")
was_disconnected_redis = True
if not was_disconnected_es:
try:
es.info()
except Exception:
print("[ ] Connection lost to Elasticsearch (Check), retrying...")
was_disconnected_es = True
time.sleep(5)
if was_disconnected_es:
try:
es.info()
print("[*] Elasticsearch connection re-established")
was_disconnected_es = False
except Exception:
pass
if was_disconnected_redis:
try:
r = connect_redis(redis_ip)
r.ping()
print("[*] Redis connection re-established")
was_disconnected_redis = False
except Exception:
pass
except KeyboardInterrupt:
print('\nSHUTTING DOWN')
exit()