Merge branch 'dev' into feat/randomized-server-header

This commit is contained in:
Patrick Di Fazio
2025-12-30 00:02:44 +01:00
committed by GitHub
12 changed files with 259 additions and 65 deletions

View File

@@ -3,6 +3,8 @@
import os
from dataclasses import dataclass
from typing import Optional, Tuple
from zoneinfo import ZoneInfo
import time
@dataclass
@@ -22,6 +24,40 @@ class Config:
api_server_path: str = "/api/v2/users"
probability_error_codes: int = 0 # Percentage (0-100)
server_header: Optional[str] = None
timezone: str = None # IANA timezone (e.g., 'America/New_York', 'Europe/Rome')
@staticmethod
# Try to fetch timezone before if not set
def get_system_timezone() -> str:
"""Get the system's default timezone"""
try:
if os.path.islink('/etc/localtime'):
tz_path = os.readlink('/etc/localtime')
if 'zoneinfo/' in tz_path:
return tz_path.split('zoneinfo/')[-1]
local_tz = time.tzname[time.daylight]
if local_tz and local_tz != 'UTC':
return local_tz
except Exception:
pass
# Default fallback to UTC
return 'UTC'
def get_timezone(self) -> ZoneInfo:
"""Get configured timezone as ZoneInfo object"""
if self.timezone:
try:
return ZoneInfo(self.timezone)
except Exception:
pass
system_tz = self.get_system_timezone()
try:
return ZoneInfo(system_tz)
except Exception:
return ZoneInfo('UTC')
@classmethod
def from_env(cls) -> 'Config':
@@ -45,6 +81,8 @@ class Config:
api_server_url=os.getenv('API_SERVER_URL'),
api_server_port=int(os.getenv('API_SERVER_PORT', 8080)),
api_server_path=os.getenv('API_SERVER_PATH', '/api/v2/users'),
probability_error_codes=int(os.getenv('PROBABILITY_ERROR_CODES', 5)),
probability_error_codes=int(os.getenv('PROBABILITY_ERROR_CODES', 0)),
server_header=os.getenv('SERVER_HEADER')
timezone=os.getenv('TIMEZONE') # If not set, will use system timezone
)

View File

@@ -8,6 +8,23 @@ Provides two loggers: app (application) and access (HTTP access logs).
import logging
import os
from logging.handlers import RotatingFileHandler
from typing import Optional
from zoneinfo import ZoneInfo
from datetime import datetime
class TimezoneFormatter(logging.Formatter):
"""Custom formatter that respects configured timezone"""
def __init__(self, fmt=None, datefmt=None, timezone: Optional[ZoneInfo] = None):
super().__init__(fmt, datefmt)
self.timezone = timezone or ZoneInfo('UTC')
def formatTime(self, record, datefmt=None):
"""Override formatTime to use configured timezone"""
dt = datetime.fromtimestamp(record.created, tz=self.timezone)
if datefmt:
return dt.strftime(datefmt)
return dt.isoformat()
class LoggerManager:
@@ -20,23 +37,27 @@ class LoggerManager:
cls._instance._initialized = False
return cls._instance
def initialize(self, log_dir: str = "logs") -> None:
def initialize(self, log_dir: str = "logs", timezone: Optional[ZoneInfo] = None) -> None:
"""
Initialize the logging system with rotating file handlers.
Args:
log_dir: Directory for log files (created if not exists)
timezone: ZoneInfo timezone for log timestamps (defaults to UTC)
"""
if self._initialized:
return
self.timezone = timezone or ZoneInfo('UTC')
# Create log directory if it doesn't exist
os.makedirs(log_dir, exist_ok=True)
# Common format for all loggers
log_format = logging.Formatter(
log_format = TimezoneFormatter(
"[%(asctime)s] %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
datefmt="%Y-%m-%d %H:%M:%S",
timezone=self.timezone
)
# Rotation settings: 1MB max, 5 backups
@@ -83,7 +104,7 @@ class LoggerManager:
self._credential_logger.handlers.clear()
# Credential logger uses a simple format: timestamp|ip|username|password|path
credential_format = logging.Formatter("%(message)s")
credential_format = TimezoneFormatter("%(message)s", timezone=self.timezone)
credential_file_handler = RotatingFileHandler(
os.path.join(log_dir, "credentials.log"),
@@ -136,6 +157,6 @@ def get_credential_logger() -> logging.Logger:
return _logger_manager.credentials
def initialize_logging(log_dir: str = "logs") -> None:
def initialize_logging(log_dir: str = "logs", timezone: Optional[ZoneInfo] = None) -> None:
"""Initialize the logging system."""
_logger_manager.initialize(log_dir)
_logger_manager.initialize(log_dir, timezone)

View File

@@ -33,6 +33,8 @@ def print_usage():
print(' PROBABILITY_ERROR_CODES - Probability (0-100) to return HTTP error codes (default: 0)')
print(' CHAR_SPACE - Characters for random links')
print(' SERVER_HEADER - HTTP Server header for deception (default: Apache/2.2.22 (Ubuntu))')
print(' TIMEZONE - IANA timezone for logs/dashboard (e.g., America/New_York, Europe/Rome)')
print(' If not set, system timezone will be used')
def main():
@@ -41,15 +43,19 @@ def main():
print_usage()
exit(0)
# Initialize logging
initialize_logging()
config = Config.from_env()
# Get timezone configuration
tz = config.get_timezone()
# Initialize logging with timezone
initialize_logging(timezone=tz)
app_logger = get_app_logger()
access_logger = get_access_logger()
credential_logger = get_credential_logger()
config = Config.from_env()
tracker = AccessTracker()
# Initialize tracker with timezone
tracker = AccessTracker(timezone=tz)
Handler.config = config
Handler.tracker = tracker
@@ -71,6 +77,7 @@ def main():
try:
app_logger.info(f'Starting deception server on port {config.port}...')
app_logger.info(f'Timezone configured: {tz.key}')
app_logger.info(f'Dashboard available at: {config.dashboard_secret_path}')
if config.canary_token_url:
app_logger.info(f'Canary token will appear after {config.canary_token_tries} tries')

View File

@@ -5,6 +5,18 @@ Dashboard template for viewing honeypot statistics.
Customize this template to change the dashboard appearance.
"""
from datetime import datetime
def format_timestamp(iso_timestamp: str) -> str:
"""Format ISO timestamp for display (YYYY-MM-DD HH:MM:SS)"""
try:
dt = datetime.fromisoformat(iso_timestamp)
return dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
# Fallback for old format
return iso_timestamp.split("T")[1][:8] if "T" in iso_timestamp else iso_timestamp
def generate_dashboard(stats: dict) -> str:
"""Generate dashboard HTML with access statistics"""
@@ -29,7 +41,7 @@ def generate_dashboard(stats: dict) -> str:
# Generate suspicious accesses rows
suspicious_rows = '\n'.join([
f'<tr><td>{log["ip"]}</td><td>{log["path"]}</td><td style="word-break: break-all;">{log["user_agent"][:60]}</td><td>{log["timestamp"].split("T")[1][:8]}</td></tr>'
f'<tr><td>{log["ip"]}</td><td>{log["path"]}</td><td style="word-break: break-all;">{log["user_agent"][:60]}</td><td>{format_timestamp(log["timestamp"])}</td></tr>'
for log in stats['recent_suspicious'][-10:]
]) or '<tr><td colspan="4" style="text-align:center;">No suspicious activity detected</td></tr>'
@@ -41,13 +53,13 @@ def generate_dashboard(stats: dict) -> str:
# Generate attack types rows
attack_type_rows = '\n'.join([
f'<tr><td>{log["ip"]}</td><td>{log["path"]}</td><td>{", ".join(log["attack_types"])}</td><td style="word-break: break-all;">{log["user_agent"][:60]}</td><td>{log["timestamp"].split("T")[1][:8]}</td></tr>'
f'<tr><td>{log["ip"]}</td><td>{log["path"]}</td><td>{", ".join(log["attack_types"])}</td><td style="word-break: break-all;">{log["user_agent"][:60]}</td><td>{format_timestamp(log["timestamp"])}</td></tr>'
for log in stats.get('attack_types', [])[-10:]
]) or '<tr><td colspan="4" style="text-align:center;">No attacks detected</td></tr>'
# Generate credential attempts rows
credential_rows = '\n'.join([
f'<tr><td>{log["ip"]}</td><td>{log["username"]}</td><td>{log["password"]}</td><td>{log["path"]}</td><td>{log["timestamp"].split("T")[1][:8]}</td></tr>'
f'<tr><td>{log["ip"]}</td><td>{log["username"]}</td><td>{log["password"]}</td><td>{log["path"]}</td><td>{format_timestamp(log["timestamp"])}</td></tr>'
for log in stats.get('credential_attempts', [])[-20:]
]) or '<tr><td colspan="5" style="text-align:center;">No credentials captured yet</td></tr>'

View File

@@ -1,20 +1,22 @@
#!/usr/bin/env python3
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, Optional
from collections import defaultdict
from datetime import datetime
from zoneinfo import ZoneInfo
import re
import urllib.parse
class AccessTracker:
"""Track IP addresses and paths accessed"""
def __init__(self):
def __init__(self, timezone: Optional[ZoneInfo] = None):
self.ip_counts: Dict[str, int] = defaultdict(int)
self.path_counts: Dict[str, int] = defaultdict(int)
self.user_agent_counts: Dict[str, int] = defaultdict(int)
self.access_log: List[Dict] = []
self.credential_attempts: List[Dict] = []
self.timezone = timezone or ZoneInfo('UTC')
self.suspicious_patterns = [
'bot', 'crawler', 'spider', 'scraper', 'curl', 'wget', 'python-requests',
'scanner', 'nikto', 'sqlmap', 'nmap', 'masscan', 'nessus', 'acunetix',
@@ -81,7 +83,7 @@ class AccessTracker:
'path': path,
'username': username,
'password': password,
'timestamp': datetime.now().isoformat()
'timestamp': datetime.now(self.timezone).isoformat()
})
def record_access(self, ip: str, path: str, user_agent: str = '', body: str = ''):
@@ -112,7 +114,7 @@ class AccessTracker:
'suspicious': is_suspicious,
'honeypot_triggered': self.is_honeypot_path(path),
'attack_types':attack_findings,
'timestamp': datetime.now().isoformat()
'timestamp': datetime.now(self.timezone).isoformat()
})
def detect_attack_type(self, data:str) -> list[str]: