Merge branch 'dev' into feat/randomized-server-header

This commit is contained in:
Patrick Di Fazio
2025-12-30 00:14:14 +01:00
committed by GitHub
12 changed files with 1071 additions and 50 deletions

View File

@@ -24,6 +24,9 @@ class Config:
api_server_path: str = "/api/v2/users"
probability_error_codes: int = 0 # Percentage (0-100)
server_header: Optional[str] = None
# Database settings
database_path: str = "data/krawl.db"
database_retention_days: int = 30
timezone: str = None # IANA timezone (e.g., 'America/New_York', 'Europe/Rome')
@staticmethod
@@ -83,6 +86,8 @@ class Config:
api_server_path=os.getenv('API_SERVER_PATH', '/api/v2/users'),
probability_error_codes=int(os.getenv('PROBABILITY_ERROR_CODES', 0)),
server_header=os.getenv('SERVER_HEADER')
database_path=os.getenv('DATABASE_PATH', 'data/krawl.db'),
database_retention_days=int(os.getenv('DATABASE_RETENTION_DAYS', 30)),
timezone=os.getenv('TIMEZONE') # If not set, will use system timezone
)

555
src/database.py Normal file
View File

@@ -0,0 +1,555 @@
#!/usr/bin/env python3
"""
Database singleton module for the Krawl honeypot.
Provides SQLAlchemy session management and database initialization.
"""
import os
import stat
from datetime import datetime
from typing import Optional, List, Dict, Any
from sqlalchemy import create_engine, func, distinct, case
from sqlalchemy.orm import sessionmaker, scoped_session, Session
from models import Base, AccessLog, CredentialAttempt, AttackDetection, IpStats
from sanitizer import (
sanitize_ip,
sanitize_path,
sanitize_user_agent,
sanitize_credential,
sanitize_attack_pattern,
)
class DatabaseManager:
    """
    Singleton database manager for the Krawl honeypot.

    Handles database initialization, session management, and provides
    methods for persisting access logs, credentials, and attack detections.

    Every public ``persist_*`` / ``get_*`` method obtains the thread-local
    session from the scoped session registry and removes it again before
    returning, so callers never have to manage sessions themselves.
    """

    _instance: Optional["DatabaseManager"] = None

    def __new__(cls) -> "DatabaseManager":
        # Always hand back the same object; ``_initialized`` gates initialize().
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    @staticmethod
    def _utc_now() -> datetime:
        """
        Return the current UTC time as a naive datetime.

        ``datetime.utcnow()`` is deprecated since Python 3.12. This produces
        the same naive-UTC value, so stored timestamps keep their format.
        """
        # Local import: the module header only imports ``datetime`` itself.
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    def initialize(self, database_path: str = "data/krawl.db") -> None:
        """
        Initialize the database connection and create tables.

        Calling this again after a successful initialization is a no-op.

        Args:
            database_path: Path to the SQLite database file
        """
        if self._initialized:
            return

        # Create data directory if it doesn't exist (exist_ok makes a
        # separate existence check unnecessary).
        data_dir = os.path.dirname(database_path)
        if data_dir:
            os.makedirs(data_dir, exist_ok=True)

        # Create SQLite database with check_same_thread=False for multi-threaded access
        database_url = f"sqlite:///{database_path}"
        self._engine = create_engine(
            database_url,
            connect_args={"check_same_thread": False},
            echo=False,  # Set to True for SQL debugging
        )

        # Create session factory with scoped_session for thread safety
        session_factory = sessionmaker(bind=self._engine)
        self._Session = scoped_session(session_factory)

        # Create all tables
        Base.metadata.create_all(self._engine)

        # Set restrictive file permissions (owner read/write only)
        if os.path.exists(database_path):
            try:
                os.chmod(database_path, stat.S_IRUSR | stat.S_IWUSR)  # 600
            except OSError:
                # May fail on some systems, not critical
                pass

        self._initialized = True

    @property
    def session(self) -> Session:
        """
        Get a thread-local database session.

        Raises:
            RuntimeError: If initialize() has not completed yet
        """
        if not self._initialized:
            raise RuntimeError("DatabaseManager not initialized. Call initialize() first.")
        return self._Session()

    def close_session(self) -> None:
        """Close the current thread-local session."""
        if self._initialized:
            self._Session.remove()

    def persist_access(
        self,
        ip: str,
        path: str,
        user_agent: str = "",
        method: str = "GET",
        is_suspicious: bool = False,
        is_honeypot_trigger: bool = False,
        attack_types: Optional[List[str]] = None,
        matched_patterns: Optional[Dict[str, str]] = None
    ) -> Optional[int]:
        """
        Persist an access log entry to the database.

        Also stores one AttackDetection row per entry in ``attack_types`` and
        updates the per-IP request counter, all in a single transaction.

        Args:
            ip: Client IP address
            path: Requested path
            user_agent: Client user agent string
            method: HTTP method (GET, POST, HEAD); None falls back to "GET"
            is_suspicious: Whether the request was flagged as suspicious
            is_honeypot_trigger: Whether a honeypot path was accessed
            attack_types: List of detected attack types
            matched_patterns: Dict mapping attack_type to matched pattern

        Returns:
            The ID of the created AccessLog record, or None on error

        Raises:
            RuntimeError: If the manager has not been initialized
        """
        session = self.session
        try:
            # Create access log with sanitized fields
            access_log = AccessLog(
                ip=sanitize_ip(ip),
                path=sanitize_path(path),
                user_agent=sanitize_user_agent(user_agent),
                method=(method or "GET")[:10],
                is_suspicious=is_suspicious,
                is_honeypot_trigger=is_honeypot_trigger,
                timestamp=self._utc_now(),
            )
            session.add(access_log)
            session.flush()  # Assigns the primary key before committing

            # Capture the id now: commit() expires the instance, and reading
            # the attribute afterwards would cost an extra SELECT.
            access_log_id = access_log.id

            # Add attack detections if any
            if attack_types:
                patterns = matched_patterns or {}
                for attack_type in attack_types:
                    session.add(
                        AttackDetection(
                            access_log_id=access_log_id,
                            attack_type=attack_type[:50],
                            matched_pattern=sanitize_attack_pattern(
                                patterns.get(attack_type, "")
                            ),
                        )
                    )

            # Update IP stats
            self._update_ip_stats(session, ip)

            session.commit()
            return access_log_id
        except Exception as e:
            session.rollback()
            # Log error but don't crash - database persistence is secondary to honeypot function
            print(f"Database error persisting access: {e}")
            return None
        finally:
            self.close_session()

    def persist_credential(
        self,
        ip: str,
        path: str,
        username: Optional[str] = None,
        password: Optional[str] = None
    ) -> Optional[int]:
        """
        Persist a credential attempt to the database.

        Args:
            ip: Client IP address
            path: Login form path
            username: Submitted username
            password: Submitted password

        Returns:
            The ID of the created CredentialAttempt record, or None on error

        Raises:
            RuntimeError: If the manager has not been initialized
        """
        session = self.session
        try:
            credential = CredentialAttempt(
                ip=sanitize_ip(ip),
                path=sanitize_path(path),
                username=sanitize_credential(username),
                password=sanitize_credential(password),
                timestamp=self._utc_now(),
            )
            session.add(credential)
            # Flush first so the id can be read without a post-commit refresh.
            session.flush()
            credential_id = credential.id
            session.commit()
            return credential_id
        except Exception as e:
            session.rollback()
            print(f"Database error persisting credential: {e}")
            return None
        finally:
            self.close_session()

    def _update_ip_stats(self, session: Session, ip: str) -> None:
        """
        Update IP statistics (upsert pattern).

        Looks the IP up first, then either bumps the existing row or adds a
        new one. The caller is responsible for committing.

        Args:
            session: Active database session
            ip: IP address to update
        """
        sanitized_ip = sanitize_ip(ip)
        now = self._utc_now()

        ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
        if ip_stats:
            ip_stats.total_requests += 1
            ip_stats.last_seen = now
        else:
            session.add(
                IpStats(
                    ip=sanitized_ip,
                    total_requests=1,
                    first_seen=now,
                    last_seen=now,
                )
            )

    def get_access_logs(
        self,
        limit: int = 100,
        offset: int = 0,
        ip_filter: Optional[str] = None,
        suspicious_only: bool = False
    ) -> List[Dict[str, Any]]:
        """
        Retrieve access logs with optional filtering, newest first.

        Args:
            limit: Maximum number of records to return
            offset: Number of records to skip
            ip_filter: Filter by IP address
            suspicious_only: Only return suspicious requests

        Returns:
            List of access log dictionaries
        """
        session = self.session
        try:
            query = session.query(AccessLog).order_by(AccessLog.timestamp.desc())

            if ip_filter:
                query = query.filter(AccessLog.ip == sanitize_ip(ip_filter))
            if suspicious_only:
                query = query.filter(AccessLog.is_suspicious == True)  # noqa: E712

            logs = query.offset(offset).limit(limit).all()

            # Build plain dicts while the session is still open, because
            # attack_detections is loaded through the relationship.
            return [
                {
                    'id': log.id,
                    'ip': log.ip,
                    'path': log.path,
                    'user_agent': log.user_agent,
                    'method': log.method,
                    'is_suspicious': log.is_suspicious,
                    'is_honeypot_trigger': log.is_honeypot_trigger,
                    'timestamp': log.timestamp.isoformat(),
                    'attack_types': [d.attack_type for d in log.attack_detections]
                }
                for log in logs
            ]
        finally:
            self.close_session()

    def get_credential_attempts(
        self,
        limit: int = 100,
        offset: int = 0,
        ip_filter: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Retrieve credential attempts with optional filtering, newest first.

        Args:
            limit: Maximum number of records to return
            offset: Number of records to skip
            ip_filter: Filter by IP address

        Returns:
            List of credential attempt dictionaries
        """
        session = self.session
        try:
            query = session.query(CredentialAttempt).order_by(
                CredentialAttempt.timestamp.desc()
            )

            if ip_filter:
                query = query.filter(CredentialAttempt.ip == sanitize_ip(ip_filter))

            attempts = query.offset(offset).limit(limit).all()

            return [
                {
                    'id': attempt.id,
                    'ip': attempt.ip,
                    'path': attempt.path,
                    'username': attempt.username,
                    'password': attempt.password,
                    'timestamp': attempt.timestamp.isoformat()
                }
                for attempt in attempts
            ]
        finally:
            self.close_session()

    def get_ip_stats(self, limit: int = 100) -> List[Dict[str, Any]]:
        """
        Retrieve IP statistics ordered by total requests.

        Args:
            limit: Maximum number of records to return

        Returns:
            List of IP stats dictionaries
        """
        session = self.session
        try:
            stats = (
                session.query(IpStats)
                .order_by(IpStats.total_requests.desc())
                .limit(limit)
                .all()
            )

            return [
                {
                    'ip': s.ip,
                    'total_requests': s.total_requests,
                    'first_seen': s.first_seen.isoformat(),
                    'last_seen': s.last_seen.isoformat(),
                    'country_code': s.country_code,
                    'city': s.city,
                    'asn': s.asn,
                    'asn_org': s.asn_org,
                    'reputation_score': s.reputation_score,
                    'reputation_source': s.reputation_source
                }
                for s in stats
            ]
        finally:
            self.close_session()

    def get_dashboard_counts(self) -> Dict[str, int]:
        """
        Get aggregate statistics for the dashboard.

        Returns:
            Dictionary with total_accesses, unique_ips, unique_paths,
            suspicious_accesses, honeypot_triggered, honeypot_ips
        """
        session = self.session
        try:
            # Get main aggregate counts in one query
            result = session.query(
                func.count(AccessLog.id).label('total_accesses'),
                func.count(distinct(AccessLog.ip)).label('unique_ips'),
                func.count(distinct(AccessLog.path)).label('unique_paths'),
                func.sum(
                    case((AccessLog.is_suspicious == True, 1), else_=0)  # noqa: E712
                ).label('suspicious_accesses'),
                func.sum(
                    case((AccessLog.is_honeypot_trigger == True, 1), else_=0)  # noqa: E712
                ).label('honeypot_triggered'),
            ).first()

            # Get unique IPs that triggered honeypots
            honeypot_ips = session.query(
                func.count(distinct(AccessLog.ip))
            ).filter(AccessLog.is_honeypot_trigger == True).scalar() or 0  # noqa: E712

            # SUM() yields NULL on an empty table, hence the ``or 0`` fallbacks.
            return {
                'total_accesses': result.total_accesses or 0,
                'unique_ips': result.unique_ips or 0,
                'unique_paths': result.unique_paths or 0,
                'suspicious_accesses': int(result.suspicious_accesses or 0),
                'honeypot_triggered': int(result.honeypot_triggered or 0),
                'honeypot_ips': honeypot_ips
            }
        finally:
            self.close_session()

    def get_top_ips(self, limit: int = 10) -> List[tuple]:
        """
        Get top IP addresses by access count.

        Args:
            limit: Maximum number of results

        Returns:
            List of (ip, count) tuples ordered by count descending
        """
        session = self.session
        try:
            results = session.query(
                AccessLog.ip,
                func.count(AccessLog.id).label('count')
            ).group_by(AccessLog.ip).order_by(
                func.count(AccessLog.id).desc()
            ).limit(limit).all()

            return [(row.ip, row.count) for row in results]
        finally:
            self.close_session()

    def get_top_paths(self, limit: int = 10) -> List[tuple]:
        """
        Get top paths by access count.

        Args:
            limit: Maximum number of results

        Returns:
            List of (path, count) tuples ordered by count descending
        """
        session = self.session
        try:
            results = session.query(
                AccessLog.path,
                func.count(AccessLog.id).label('count')
            ).group_by(AccessLog.path).order_by(
                func.count(AccessLog.id).desc()
            ).limit(limit).all()

            return [(row.path, row.count) for row in results]
        finally:
            self.close_session()

    def get_top_user_agents(self, limit: int = 10) -> List[tuple]:
        """
        Get top user agents by access count.

        Rows with a NULL or empty user agent are excluded.

        Args:
            limit: Maximum number of results

        Returns:
            List of (user_agent, count) tuples ordered by count descending
        """
        session = self.session
        try:
            results = session.query(
                AccessLog.user_agent,
                func.count(AccessLog.id).label('count')
            ).filter(
                AccessLog.user_agent.isnot(None),
                AccessLog.user_agent != ''
            ).group_by(AccessLog.user_agent).order_by(
                func.count(AccessLog.id).desc()
            ).limit(limit).all()

            return [(row.user_agent, row.count) for row in results]
        finally:
            self.close_session()

    def get_recent_suspicious(self, limit: int = 20) -> List[Dict[str, Any]]:
        """
        Get recent suspicious access attempts.

        Args:
            limit: Maximum number of results

        Returns:
            List of access log dictionaries with is_suspicious=True
        """
        session = self.session
        try:
            logs = session.query(AccessLog).filter(
                AccessLog.is_suspicious == True  # noqa: E712
            ).order_by(AccessLog.timestamp.desc()).limit(limit).all()

            return [
                {
                    'ip': log.ip,
                    'path': log.path,
                    'user_agent': log.user_agent,
                    'timestamp': log.timestamp.isoformat()
                }
                for log in logs
            ]
        finally:
            self.close_session()

    def get_honeypot_triggered_ips(self) -> List[tuple]:
        """
        Get IPs that triggered honeypot paths with the paths they accessed.

        Returns:
            List of (ip, [paths]) tuples; each path appears once per IP, in
            the order it was first returned by the query
        """
        session = self.session
        try:
            # Get all honeypot triggers
            results = session.query(
                AccessLog.ip,
                AccessLog.path
            ).filter(
                AccessLog.is_honeypot_trigger == True  # noqa: E712
            ).all()

            # Group paths by IP. An insertion-ordered dict de-duplicates in
            # O(1) per row instead of scanning a list for every row.
            paths_by_ip: Dict[str, Dict[str, None]] = {}
            for row in results:
                paths_by_ip.setdefault(row.ip, {})[row.path] = None

            return [(ip, list(paths)) for ip, paths in paths_by_ip.items()]
        finally:
            self.close_session()

    def get_recent_attacks(self, limit: int = 20) -> List[Dict[str, Any]]:
        """
        Get recent access logs that have attack detections.

        Args:
            limit: Maximum number of results

        Returns:
            List of access log dicts with attack_types included
        """
        session = self.session
        try:
            # Use EXISTS rather than JOIN: a JOIN yields one row per detection,
            # so a request with several detections would take several of the
            # LIMITed rows and fewer than ``limit`` logs would be returned.
            logs = (
                session.query(AccessLog)
                .filter(AccessLog.attack_detections.any())
                .order_by(AccessLog.timestamp.desc())
                .limit(limit)
                .all()
            )

            return [
                {
                    'ip': log.ip,
                    'path': log.path,
                    'user_agent': log.user_agent,
                    'timestamp': log.timestamp.isoformat(),
                    'attack_types': [d.attack_type for d in log.attack_detections]
                }
                for log in logs
            ]
        finally:
            self.close_session()
# Shared manager instance, created once at import time
_db_manager = DatabaseManager()


def get_database() -> DatabaseManager:
    """Return the module-wide DatabaseManager singleton."""
    manager = _db_manager
    return manager
def initialize_database(database_path: str = "data/krawl.db") -> None:
    """
    Set up the database behind the module-wide manager.

    Args:
        database_path: Path to the SQLite database file
    """
    _db_manager.initialize(database_path=database_path)

View File

@@ -229,7 +229,7 @@ class Handler(BaseHTTPRequestHandler):
self.access_logger.warning(f"[CREDENTIALS CAPTURED] {client_ip} - Username: {username or 'N/A'} - Path: {self.path}")
# send the post data (body) to the record_access function so the post data can be used to detect suspicious things.
self.tracker.record_access(client_ip, self.path, user_agent, post_data)
self.tracker.record_access(client_ip, self.path, user_agent, post_data, method='POST')
time.sleep(1)
@@ -347,7 +347,7 @@ class Handler(BaseHTTPRequestHandler):
self.app_logger.error(f"Error generating dashboard: {e}")
return
self.tracker.record_access(client_ip, self.path, user_agent)
self.tracker.record_access(client_ip, self.path, user_agent, method='GET')
if self.tracker.is_suspicious_user_agent(user_agent):
self.access_logger.warning(f"[SUSPICIOUS] {client_ip} - {user_agent[:50]} - {self.path}")

143
src/models.py Normal file
View File

@@ -0,0 +1,143 @@
#!/usr/bin/env python3
"""
SQLAlchemy ORM models for the Krawl honeypot database.
Stores access logs, credential attempts, attack detections, and IP statistics.
"""
from datetime import datetime
from typing import Optional, List
from sqlalchemy import String, Integer, Boolean, DateTime, ForeignKey, Index
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sanitizer import (
MAX_IP_LENGTH,
MAX_PATH_LENGTH,
MAX_USER_AGENT_LENGTH,
MAX_CREDENTIAL_LENGTH,
MAX_ATTACK_PATTERN_LENGTH,
MAX_CITY_LENGTH,
MAX_ASN_ORG_LENGTH,
MAX_REPUTATION_SOURCE_LENGTH,
)
class Base(DeclarativeBase):
    """Declarative base that every Krawl ORM model derives from."""
class AccessLog(Base):
    """
    One row per HTTP request received by the honeypot.

    Keeps the request metadata, the suspicious / honeypot-trigger flags and
    the timestamp, for later analysis and for the dashboard.
    """

    __tablename__ = 'access_logs'

    id: Mapped[int] = mapped_column(
        Integer, primary_key=True, autoincrement=True
    )
    ip: Mapped[str] = mapped_column(
        String(MAX_IP_LENGTH), nullable=False, index=True
    )
    path: Mapped[str] = mapped_column(
        String(MAX_PATH_LENGTH), nullable=False
    )
    user_agent: Mapped[Optional[str]] = mapped_column(
        String(MAX_USER_AGENT_LENGTH), nullable=True
    )
    method: Mapped[str] = mapped_column(
        String(10), nullable=False, default='GET'
    )
    is_suspicious: Mapped[bool] = mapped_column(
        Boolean, nullable=False, default=False
    )
    is_honeypot_trigger: Mapped[bool] = mapped_column(
        Boolean, nullable=False, default=False
    )
    timestamp: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, default=datetime.utcnow, index=True
    )

    # Child detections; deleting a log removes its detections as well
    attack_detections: Mapped[List["AttackDetection"]] = relationship(
        "AttackDetection",
        back_populates="access_log",
        cascade="all, delete-orphan",
    )

    # Extra indexes backing the dashboard and filter queries
    __table_args__ = (
        Index('ix_access_logs_ip_timestamp', 'ip', 'timestamp'),
        Index('ix_access_logs_is_suspicious', 'is_suspicious'),
        Index('ix_access_logs_is_honeypot_trigger', 'is_honeypot_trigger'),
    )

    def __repr__(self) -> str:
        # Only the first 50 characters of the path are shown
        return f"<AccessLog(id={self.id}, ip='{self.ip}', path='{self.path[:50]}')>"
class CredentialAttempt(Base):
    """
    A login attempt captured by one of the honeypot login forms.

    Keeps the submitted username and password together with the client IP,
    the form path and the time of the attempt.
    """

    __tablename__ = 'credential_attempts'

    id: Mapped[int] = mapped_column(
        Integer, primary_key=True, autoincrement=True
    )
    ip: Mapped[str] = mapped_column(
        String(MAX_IP_LENGTH), nullable=False, index=True
    )
    path: Mapped[str] = mapped_column(
        String(MAX_PATH_LENGTH), nullable=False
    )
    username: Mapped[Optional[str]] = mapped_column(
        String(MAX_CREDENTIAL_LENGTH), nullable=True
    )
    password: Mapped[Optional[str]] = mapped_column(
        String(MAX_CREDENTIAL_LENGTH), nullable=True
    )
    timestamp: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, default=datetime.utcnow, index=True
    )

    # Composite index for per-IP, time-ordered lookups
    __table_args__ = (
        Index('ix_credential_attempts_ip_timestamp', 'ip', 'timestamp'),
    )

    def __repr__(self) -> str:
        return f"<CredentialAttempt(id={self.id}, ip='{self.ip}', username='{self.username}')>"
class AttackDetection(Base):
    """
    An attack pattern detected in a request.

    Each row belongs to one parent AccessLog; a single request may have
    several rows, one per detected attack type.
    """

    __tablename__ = 'attack_detections'

    id: Mapped[int] = mapped_column(
        Integer, primary_key=True, autoincrement=True
    )
    access_log_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey('access_logs.id', ondelete='CASCADE'),
        nullable=False,
        index=True,
    )
    attack_type: Mapped[str] = mapped_column(
        String(50), nullable=False
    )
    matched_pattern: Mapped[Optional[str]] = mapped_column(
        String(MAX_ATTACK_PATTERN_LENGTH), nullable=True
    )

    # Parent side of AccessLog.attack_detections
    access_log: Mapped["AccessLog"] = relationship(
        "AccessLog", back_populates="attack_detections"
    )

    def __repr__(self) -> str:
        return f"<AttackDetection(id={self.id}, type='{self.attack_type}')>"
class IpStats(Base):
    """
    Per-IP aggregate statistics, keyed by the IP address itself.

    Carries nullable GeoIP and reputation columns reserved for future
    enrichment. The row is updated on each request from the IP.
    """

    __tablename__ = 'ip_stats'

    ip: Mapped[str] = mapped_column(
        String(MAX_IP_LENGTH), primary_key=True
    )
    total_requests: Mapped[int] = mapped_column(
        Integer, nullable=False, default=0
    )
    first_seen: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, default=datetime.utcnow
    )
    last_seen: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, default=datetime.utcnow
    )

    # GeoIP fields (populated by future enrichment)
    country_code: Mapped[Optional[str]] = mapped_column(
        String(2), nullable=True
    )
    city: Mapped[Optional[str]] = mapped_column(
        String(MAX_CITY_LENGTH), nullable=True
    )
    asn: Mapped[Optional[int]] = mapped_column(
        Integer, nullable=True
    )
    asn_org: Mapped[Optional[str]] = mapped_column(
        String(MAX_ASN_ORG_LENGTH), nullable=True
    )

    # Reputation fields (populated by future enrichment)
    reputation_score: Mapped[Optional[int]] = mapped_column(
        Integer, nullable=True
    )
    reputation_source: Mapped[Optional[str]] = mapped_column(
        String(MAX_REPUTATION_SOURCE_LENGTH), nullable=True
    )
    reputation_updated: Mapped[Optional[datetime]] = mapped_column(
        DateTime, nullable=True
    )

    def __repr__(self) -> str:
        return f"<IpStats(ip='{self.ip}', total_requests={self.total_requests})>"

113
src/sanitizer.py Normal file
View File

@@ -0,0 +1,113 @@
#!/usr/bin/env python3
"""
Sanitization utilities for safe database storage and HTML output.
Protects against SQL injection payloads, XSS, and storage exhaustion attacks.
"""
import html
import re
from typing import Optional
# Field length limits for database storage.
# models.py sizes its String columns from these values, and the sanitize_*
# helpers in this module truncate their input to the first five of them.
MAX_IP_LENGTH = 45 # IPv6 max length
MAX_PATH_LENGTH = 2048 # URL max practical length
MAX_USER_AGENT_LENGTH = 512
MAX_CREDENTIAL_LENGTH = 256
MAX_ATTACK_PATTERN_LENGTH = 256
MAX_CITY_LENGTH = 128
MAX_ASN_ORG_LENGTH = 256
MAX_REPUTATION_SOURCE_LENGTH = 64
def sanitize_for_storage(value: Optional[str], max_length: int) -> str:
    """
    Clean a value and cut it to size before it is written to the database.

    Null bytes and control characters are stripped because they can cause
    trouble for database storage and log processing. Tab, newline and
    carriage return are kept.

    Args:
        value: The string to sanitize (non-strings are converted with str())
        max_length: Maximum length to truncate to

    Returns:
        Sanitized and truncated string, empty string if input is None/empty
    """
    if value:
        text = str(value)
        # 0x00-0x1F and 0x7F are control characters; 0x09 (tab),
        # 0x0A (newline) and 0x0D (carriage return) are deliberately
        # left out of the character class.
        control_chars = re.compile(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]')
        stripped = control_chars.sub('', text)
        return stripped[:max_length]
    return ""
def sanitize_ip(value: Optional[str]) -> str:
    """Clean an IP address and cap it at MAX_IP_LENGTH characters."""
    return sanitize_for_storage(value, max_length=MAX_IP_LENGTH)
def sanitize_path(value: Optional[str]) -> str:
    """Clean a URL path and cap it at MAX_PATH_LENGTH characters."""
    return sanitize_for_storage(value, max_length=MAX_PATH_LENGTH)
def sanitize_user_agent(value: Optional[str]) -> str:
    """Clean a user agent string and cap it at MAX_USER_AGENT_LENGTH characters."""
    return sanitize_for_storage(value, max_length=MAX_USER_AGENT_LENGTH)
def sanitize_credential(value: Optional[str]) -> str:
    """Clean a username or password and cap it at MAX_CREDENTIAL_LENGTH characters."""
    return sanitize_for_storage(value, max_length=MAX_CREDENTIAL_LENGTH)
def sanitize_attack_pattern(value: Optional[str]) -> str:
    """Clean a matched attack pattern and cap it at MAX_ATTACK_PATTERN_LENGTH characters."""
    return sanitize_for_storage(value, max_length=MAX_ATTACK_PATTERN_LENGTH)
def escape_html(value: Optional[str]) -> str:
    """
    Make a value safe to embed in an HTML page.

    Guards against stored XSS when user-controlled data is shown in the
    dashboard.

    Args:
        value: The string to escape (non-strings are converted with str())

    Returns:
        HTML-escaped string, empty string if input is None/empty
    """
    # Any falsy input (None, "", 0, ...) collapses to the empty string
    return html.escape(str(value)) if value else ""
def escape_html_truncated(value: Optional[str], max_display_length: int) -> str:
    """
    Shorten a value for display, then HTML-escape it.

    Args:
        value: The string to escape and truncate
        max_display_length: Maximum display length; the cut is made on the
            raw text before escaping, and "..." is appended when text was cut

    Returns:
        HTML-escaped and truncated string, empty string if input is None/empty
    """
    if not value:
        return ""

    text = str(value)
    fits = len(text) <= max_display_length
    shown = text if fits else text[:max_display_length] + "..."
    return html.escape(shown)

View File

@@ -12,6 +12,7 @@ from config import Config
from tracker import AccessTracker
from handler import Handler
from logger import initialize_logging, get_app_logger, get_access_logger, get_credential_logger
from database import initialize_database
def print_usage():
@@ -33,6 +34,8 @@ def print_usage():
print(' PROBABILITY_ERROR_CODES - Probability (0-100) to return HTTP error codes (default: 0)')
print(' CHAR_SPACE - Characters for random links')
print(' SERVER_HEADER - HTTP Server header for deception (default: Apache/2.2.22 (Ubuntu))')
print(' DATABASE_PATH - Path to SQLite database (default: data/krawl.db)')
print(' DATABASE_RETENTION_DAYS - Days to retain database records (default: 30)')
print(' TIMEZONE - IANA timezone for logs/dashboard (e.g., America/New_York, Europe/Rome)')
print(' If not set, system timezone will be used')
@@ -54,7 +57,15 @@ def main():
access_logger = get_access_logger()
credential_logger = get_credential_logger()
# Initialize tracker with timezone
config = Config.from_env()
# Initialize database for persistent storage
try:
initialize_database(config.database_path)
app_logger.info(f'Database initialized at: {config.database_path}')
except Exception as e:
app_logger.warning(f'Database initialization failed: {e}. Continuing with in-memory only.')
tracker = AccessTracker(timezone=tz)
Handler.config = config

View File

@@ -5,8 +5,14 @@ Dashboard template for viewing honeypot statistics.
Customize this template to change the dashboard appearance.
"""
import html
from datetime import datetime
def _escape(value) -> str:
    """
    HTML-escape a value for the dashboard to prevent XSS attacks.

    None becomes an empty string; every other value is converted with
    str() and then escaped.
    """
    return "" if value is None else html.escape(str(value))
def format_timestamp(iso_timestamp: str) -> str:
"""Format ISO timestamp for display (YYYY-MM-DD HH:MM:SS)"""
@@ -21,45 +27,45 @@ def format_timestamp(iso_timestamp: str) -> str:
def generate_dashboard(stats: dict) -> str:
"""Generate dashboard HTML with access statistics"""
# Generate IP rows
# Generate IP rows (IPs are generally safe but escape for consistency)
top_ips_rows = '\n'.join([
f'<tr><td class="rank">{i+1}</td><td>{ip}</td><td>{count}</td></tr>'
f'<tr><td class="rank">{i+1}</td><td>{_escape(ip)}</td><td>{count}</td></tr>'
for i, (ip, count) in enumerate(stats['top_ips'])
]) or '<tr><td colspan="3" style="text-align:center;">No data</td></tr>'
# Generate paths rows
# Generate paths rows (CRITICAL: paths can contain XSS payloads)
top_paths_rows = '\n'.join([
f'<tr><td class="rank">{i+1}</td><td>{path}</td><td>{count}</td></tr>'
f'<tr><td class="rank">{i+1}</td><td>{_escape(path)}</td><td>{count}</td></tr>'
for i, (path, count) in enumerate(stats['top_paths'])
]) or '<tr><td colspan="3" style="text-align:center;">No data</td></tr>'
# Generate User-Agent rows
# Generate User-Agent rows (CRITICAL: user agents can contain XSS payloads)
top_ua_rows = '\n'.join([
f'<tr><td class="rank">{i+1}</td><td style="word-break: break-all;">{ua[:80]}</td><td>{count}</td></tr>'
f'<tr><td class="rank">{i+1}</td><td style="word-break: break-all;">{_escape(ua[:80])}</td><td>{count}</td></tr>'
for i, (ua, count) in enumerate(stats['top_user_agents'])
]) or '<tr><td colspan="3" style="text-align:center;">No data</td></tr>'
# Generate suspicious accesses rows
# Generate suspicious accesses rows (CRITICAL: multiple user-controlled fields)
suspicious_rows = '\n'.join([
f'<tr><td>{log["ip"]}</td><td>{log["path"]}</td><td style="word-break: break-all;">{log["user_agent"][:60]}</td><td>{format_timestamp(log["timestamp"])}</td></tr>'
f'<tr><td>{_escape(log["ip"])}</td><td>{_escape(log["path"])}</td><td style="word-break: break-all;">{_escape(log["user_agent"][:60])}</td><td>{_escape(log["timestamp"].split("T")[1][:8])}</td></tr>'
for log in stats['recent_suspicious'][-10:]
]) or '<tr><td colspan="4" style="text-align:center;">No suspicious activity detected</td></tr>'
# Generate honeypot triggered IPs rows
honeypot_rows = '\n'.join([
f'<tr><td>{ip}</td><td style="word-break: break-all;">{", ".join(paths)}</td><td>{len(paths)}</td></tr>'
f'<tr><td>{_escape(ip)}</td><td style="word-break: break-all;">{_escape(", ".join(paths))}</td><td>{len(paths)}</td></tr>'
for ip, paths in stats.get('honeypot_triggered_ips', [])
]) or '<tr><td colspan="3" style="text-align:center;">No honeypot triggers yet</td></tr>'
# Generate attack types rows
# Generate attack types rows (CRITICAL: paths and user agents are user-controlled)
attack_type_rows = '\n'.join([
f'<tr><td>{log["ip"]}</td><td>{log["path"]}</td><td>{", ".join(log["attack_types"])}</td><td style="word-break: break-all;">{log["user_agent"][:60]}</td><td>{format_timestamp(log["timestamp"])}</td></tr>'
f'<tr><td>{_escape(log["ip"])}</td><td>{_escape(log["path"])}</td><td>{_escape(", ".join(log["attack_types"]))}</td><td style="word-break: break-all;">{_escape(log["user_agent"][:60])}</td><td>{_escape(log["timestamp"].split("T")[1][:8])}</td></tr>'
for log in stats.get('attack_types', [])[-10:]
]) or '<tr><td colspan="4" style="text-align:center;">No attacks detected</td></tr>'
# Generate credential attempts rows
# Generate credential attempts rows (CRITICAL: usernames and passwords are user-controlled)
credential_rows = '\n'.join([
f'<tr><td>{log["ip"]}</td><td>{log["username"]}</td><td>{log["password"]}</td><td>{log["path"]}</td><td>{format_timestamp(log["timestamp"])}</td></tr>'
f'<tr><td>{_escape(log["ip"])}</td><td>{_escape(log["username"])}</td><td>{_escape(log["password"])}</td><td>{_escape(log["path"])}</td><td>{_escape(log["timestamp"].split("T")[1][:8])}</td></tr>'
for log in stats.get('credential_attempts', [])[-20:]
]) or '<tr><td colspan="5" style="text-align:center;">No credentials captured yet</td></tr>'
@@ -184,7 +190,7 @@ def generate_dashboard(stats: dict) -> str:
</div>
<div class="table-container alert-section">
<h2>🍯 Honeypot Triggers</h2>
<h2>🍯 Honeypot Triggers by IP</h2>
<table>
<thead>
<tr>

View File

@@ -7,10 +7,24 @@ from zoneinfo import ZoneInfo
import re
import urllib.parse
from database import get_database, DatabaseManager
class AccessTracker:
"""Track IP addresses and paths accessed"""
def __init__(self, timezone: Optional[ZoneInfo] = None):
"""
Track IP addresses and paths accessed.
Maintains in-memory structures for fast dashboard access and
persists data to SQLite for long-term storage and analysis.
"""
def __init__(self, db_manager: Optional[DatabaseManager] = None, timezone: Optional[ZoneInfo] = None):
"""
Initialize the access tracker.
Args:
db_manager: Optional DatabaseManager for persistence.
If None, will use the global singleton.
"""
self.ip_counts: Dict[str, int] = defaultdict(int)
self.path_counts: Dict[str, int] = defaultdict(int)
self.user_agent_counts: Dict[str, int] = defaultdict(int)
@@ -23,7 +37,7 @@ class AccessTracker:
'burp', 'zap', 'w3af', 'metasploit', 'nuclei', 'gobuster', 'dirbuster'
]
# common attack types such as xss, shell injection, probes
# Common attack types such as xss, shell injection, probes
self.attack_types = {
'path_traversal': r'\.\.',
'sql_injection': r"('|--|;|\bOR\b|\bUNION\b|\bSELECT\b|\bDROP\b)",
@@ -35,6 +49,25 @@ class AccessTracker:
# Track IPs that accessed honeypot paths from robots.txt
self.honeypot_triggered: Dict[str, List[str]] = defaultdict(list)
# Database manager for persistence (lazily initialized)
self._db_manager = db_manager
@property
def db(self) -> Optional[DatabaseManager]:
    """
    Return the database manager, fetching the global one on first use.

    Returns:
        DatabaseManager instance or None if not available
    """
    if self._db_manager is not None:
        return self._db_manager

    try:
        self._db_manager = get_database()
    except Exception:
        # Lookup failed: leave the manager unset so persistence stays disabled
        pass
    return self._db_manager
def parse_credentials(self, post_data: str) -> Tuple[str, str]:
"""
Parse username and password from POST data.
@@ -77,7 +110,12 @@ class AccessTracker:
return username, password
def record_credential_attempt(self, ip: str, path: str, username: str, password: str):
"""Record a credential login attempt"""
"""
Record a credential login attempt.
Stores in both in-memory list and SQLite database.
"""
# In-memory storage for dashboard
self.credential_attempts.append({
'ip': ip,
'path': path,
@@ -86,27 +124,63 @@ class AccessTracker:
'timestamp': datetime.now(self.timezone).isoformat()
})
def record_access(self, ip: str, path: str, user_agent: str = '', body: str = ''):
"""Record an access attempt"""
# Persist to database
if self.db:
try:
self.db.persist_credential(
ip=ip,
path=path,
username=username,
password=password
)
except Exception:
# Don't crash if database persistence fails
pass
def record_access(
self,
ip: str,
path: str,
user_agent: str = '',
body: str = '',
method: str = 'GET'
):
"""
Record an access attempt.
Stores in both in-memory structures and SQLite database.
Args:
ip: Client IP address
path: Requested path
user_agent: Client user agent string
body: Request body (for POST/PUT)
method: HTTP method
"""
self.ip_counts[ip] += 1
self.path_counts[path] += 1
if user_agent:
self.user_agent_counts[user_agent] += 1
# path attack type detection
# Path attack type detection
attack_findings = self.detect_attack_type(path)
# post / put data
# POST/PUT body attack detection
if len(body) > 0:
attack_findings.extend(self.detect_attack_type(body))
is_suspicious = self.is_suspicious_user_agent(user_agent) or self.is_honeypot_path(path) or len(attack_findings) > 0
is_suspicious = (
self.is_suspicious_user_agent(user_agent) or
self.is_honeypot_path(path) or
len(attack_findings) > 0
)
is_honeypot = self.is_honeypot_path(path)
# Track if this IP accessed a honeypot path
if self.is_honeypot_path(path):
if is_honeypot:
self.honeypot_triggered[ip].append(path)
# In-memory storage for dashboard
self.access_log.append({
'ip': ip,
'path': path,
@@ -117,6 +191,22 @@ class AccessTracker:
'timestamp': datetime.now(self.timezone).isoformat()
})
# Persist to database
if self.db:
try:
self.db.persist_access(
ip=ip,
path=path,
user_agent=user_agent,
method=method,
is_suspicious=is_suspicious,
is_honeypot_trigger=is_honeypot,
attack_types=attack_findings if attack_findings else None
)
except Exception:
# Don't crash if database persistence fails
pass
def detect_attack_type(self, data:str) -> list[str]:
"""
Returns a list of all attack types found in path data
@@ -186,21 +276,20 @@ class AccessTracker:
return [(ip, paths) for ip, paths in self.honeypot_triggered.items()]
def get_stats(self) -> Dict:
"""Get statistics summary"""
suspicious_count = sum(1 for log in self.access_log if log.get('suspicious', False))
honeypot_count = sum(1 for log in self.access_log if log.get('honeypot_triggered', False))
return {
'total_accesses': len(self.access_log),
'unique_ips': len(self.ip_counts),
'unique_paths': len(self.path_counts),
'suspicious_accesses': suspicious_count,
'honeypot_triggered': honeypot_count,
'honeypot_ips': len(self.honeypot_triggered),
'top_ips': self.get_top_ips(10),
'top_paths': self.get_top_paths(10),
'top_user_agents': self.get_top_user_agents(10),
'recent_suspicious': self.get_suspicious_accesses(20),
'honeypot_triggered_ips': self.get_honeypot_triggered_ips(),
'attack_types': self.get_attack_type_accesses(20),
'credential_attempts': self.credential_attempts[-50:] # Last 50 attempts
}
"""Get statistics summary from database."""
if not self.db:
raise RuntimeError("Database not available for dashboard stats")
# Get aggregate counts from database
stats = self.db.get_dashboard_counts()
# Add detailed lists from database
stats['top_ips'] = self.db.get_top_ips(10)
stats['top_paths'] = self.db.get_top_paths(10)
stats['top_user_agents'] = self.db.get_top_user_agents(10)
stats['recent_suspicious'] = self.db.get_recent_suspicious(20)
stats['honeypot_triggered_ips'] = self.db.get_honeypot_triggered_ips()
stats['attack_types'] = self.db.get_recent_attacks(20)
stats['credential_attempts'] = self.db.get_credential_attempts(limit=50)
return stats