Files
honeypot.es/AttackMapServer.py
Malin 1aa164263f feat: initial T-Pot attack map with Spanish UI and Docker support
- Full Spanish interface (all UI text, popups, charts, tables)
- Dark and light mode support
- Disclaimer banner: no data logged, public European service
- Footer: Servicio ofrecido por Cloud Host (cloudhost.es)
- Docker: single container (Redis + DataServer + AttackMapServer)
- Remote T-Pot support via ELASTICSEARCH_URL env var (direct or SSH tunnel)
- Based on telekom-security/t-pot-attack-map (Apache 2.0)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-25 21:23:28 +01:00

111 lines
3.8 KiB
Python

#!/usr/bin/python3
"""
Original code (tornado based) by Matthew May - mcmay.web@gmail.com
Adjusted code for asyncio, aiohttp and redis (asynchronous support) by t3chn0m4g3
Remote T-Pot support by honeypot.es
"""
import asyncio
import os
import redis.asyncio as redis
from aiohttp import web
# Runtime settings; REDIS_HOST and WEB_PORT may be overridden through the
# process environment, everything else is fixed.
redis_host = os.environ.get("REDIS_HOST", "localhost")
# The Redis port is not configurable: the URL always targets 6379.
redis_url = f'redis://{redis_host}:6379'
# Port the aiohttp web server listens on (must parse as an integer).
web_port = int(os.environ.get("WEB_PORT", "8080"))
# Banner printed once at startup.
version = 'Attack Map Server 3.0.0'
async def redis_subscriber(websockets):
    """Relay messages from a Redis pub/sub channel to all WebSocket clients.

    Subscribes to the ``attack-map-production`` channel and forwards every
    message payload, decoded as UTF-8, to each socket currently in
    *websockets*. On a Redis error the client and pubsub objects are closed,
    the function waits five seconds and then reconnects. It never returns
    normally; it ends only when the surrounding task is cancelled or an
    unexpected (non-Redis) exception escapes.

    Args:
        websockets: Collection of connected WebSocket responses. It is
            re-read for every message, so sockets added or removed elsewhere
            are picked up on the next broadcast.
    """
    channel = "attack-map-production"
    was_disconnected = False
    while True:
        client = None
        pubsub = None
        try:
            client = redis.Redis.from_url(redis_url)
            pubsub = client.pubsub()
            await pubsub.subscribe(channel)
            if was_disconnected:
                print("[*] Redis connection re-established")
                was_disconnected = False
            while True:
                message = await pubsub.get_message(ignore_subscribe_messages=True)
                if not message:
                    # Nothing pending: back off briefly instead of busy-polling.
                    await asyncio.sleep(0.1)
                    continue
                try:
                    json_data = message['data'].decode('utf-8')
                    # return_exceptions=True keeps one failing client from
                    # aborting delivery to the others.
                    await asyncio.gather(
                        *[ws.send_str(json_data) for ws in websockets],
                        return_exceptions=True,
                    )
                except Exception:
                    print("Something went wrong while sending JSON data.")
        except redis.RedisError as e:
            print(f"[ ] Connection lost to Redis ({type(e).__name__}), retrying...")
            was_disconnected = True
        finally:
            # Release the previous connection objects before reconnecting (or
            # when the task is cancelled) so repeated retries do not pile up
            # open clients.
            for resource in (pubsub, client):
                if resource is None:
                    continue
                try:
                    await resource.aclose()
                except (redis.RedisError, OSError):
                    # Best effort: the connection is typically already broken.
                    pass
        # Only reached after a RedisError; wait before the next attempt.
        await asyncio.sleep(5)
async def my_websocket_handler(request):
    """Handle one WebSocket client for its whole lifetime.

    Upgrades the request to a WebSocket, registers the socket in
    ``request.app['websockets']`` so the Redis subscriber can broadcast to
    it, echoes back any text frame the client sends, and unregisters the
    socket when the connection ends.

    Args:
        request: The incoming aiohttp request for the WebSocket route.

    Returns:
        The ``web.WebSocketResponse`` that served the connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    clients = request.app['websockets']
    clients.append(ws)
    print(f"[*] New WebSocket connection opened. Clients active: {len(clients)}")
    try:
        async for msg in ws:
            if msg.type == web.WSMsgType.TEXT:
                # Echo text frames straight back to the sender.
                await ws.send_str(msg.data)
            elif msg.type == web.WSMsgType.ERROR:
                print(f'WebSocket connection closed with exception {ws.exception()}')
    finally:
        # Always unregister, even if the loop is left through an exception or
        # task cancellation; otherwise dead sockets accumulate in the list.
        clients.remove(ws)
        print(f"[-] WebSocket connection closed. Clients active: {len(clients)}")
    return ws
async def my_index_handler(request):
    """Answer the request with the file ``static/index.html``.

    Args:
        request: The incoming aiohttp request (not inspected).

    Returns:
        A ``web.FileResponse`` for the index page.
    """
    index_page = web.FileResponse('static/index.html')
    return index_page
async def start_background_tasks(app):
    """Create the shared client list and launch the Redis subscriber task.

    Stores an empty list under ``app['websockets']`` and a task running
    ``redis_subscriber`` on that same list under ``app['redis_subscriber']``.

    Args:
        app: The application mapping that receives both entries.
    """
    clients = []
    app['websockets'] = clients
    # The subscriber receives the very same list object, so it sees every
    # socket later added to or removed from app['websockets'].
    subscriber_task = asyncio.create_task(redis_subscriber(clients))
    app['redis_subscriber'] = subscriber_task
async def cleanup_background_tasks(app):
    """Cancel the Redis subscriber task and wait until it has finished.

    Args:
        app: The application mapping holding the task under
            ``'redis_subscriber'``.

    Raises:
        Exception: Any non-cancellation error the task ended with is
            re-raised when the task is awaited.
    """
    task = app['redis_subscriber']
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected outcome of the cancel() above; letting it propagate would
        # turn a normal shutdown into a failing cleanup hook.
        pass
async def check_redis_connection():
    """Block until Redis at ``redis_url`` answers a ping.

    Tries to connect and ping; on any failure waits five seconds and tries
    again, indefinitely. The "waiting" notice is printed only on the first
    failure to avoid flooding the output. The client used for each attempt is
    always closed, whether the attempt succeeded or not.

    Returns:
        ``True`` once a ping has succeeded.
    """
    print(f"[*] Checking Redis connection at {redis_url}...")
    waiting_printed = False
    while True:
        client = None
        connected = False
        try:
            client = redis.Redis.from_url(redis_url)
            await client.ping()
            connected = True
        except Exception as e:
            if not waiting_printed:
                print(f"[...] Waiting for Redis... (Error: {type(e).__name__})")
                waiting_printed = True
        finally:
            # Close the per-attempt client on failure as well, so retries do
            # not leave unclosed clients behind.
            if client is not None:
                try:
                    await client.aclose()
                except Exception:
                    # Best effort: the ping result already decided the outcome.
                    pass
        if connected:
            print("[*] Redis connection established")
            return True
        await asyncio.sleep(5)
async def make_webapp():
    """Build the aiohttp application with its routes and lifecycle hooks.

    Returns:
        A ``web.Application`` that serves the index page, the WebSocket
        endpoint and three static directories, with the background-task
        start and cleanup callbacks registered.
    """
    routes = [
        web.get('/', my_index_handler),
        web.get('/websocket', my_websocket_handler),
        web.static('/static/', 'static'),
        web.static('/images/', 'static/images'),
        web.static('/flags/', 'static/flags'),
    ]
    webapp = web.Application()
    webapp.add_routes(routes)
    # Start the Redis subscriber with the app and stop it on shutdown.
    webapp.on_startup.append(start_background_tasks)
    webapp.on_cleanup.append(cleanup_background_tasks)
    return webapp
def _main():
    """Print the banner, wait for Redis, then run the web server."""
    print(version)
    # Do not start serving until Redis has answered a ping.
    asyncio.run(check_redis_connection())
    print(f"[*] Starting web server on port {web_port}...\n")
    # run_app receives the coroutine from make_webapp() and blocks until the
    # server is stopped.
    web.run_app(make_webapp(), port=web_port)


if __name__ == '__main__':
    _main()