From f1c142c53d7f40dc8eec68d886928542ac44e9b6 Mon Sep 17 00:00:00 2001 From: Phillip Tarrant Date: Sun, 28 Dec 2025 10:43:32 -0600 Subject: [PATCH] feat: add SQLite persistent storage for request logging - Add SQLAlchemy-based database layer for persistent storage - Create models for access_logs, credential_attempts, attack_detections, ip_stats - Include fields for future GeoIP and reputation enrichment - Implement sanitization utilities to protect against malicious payloads - Fix XSS vulnerability in dashboard template (HTML escape all user data) - Add DATABASE_PATH and DATABASE_RETENTION_DAYS config options - Dual storage: in-memory for dashboard performance + SQLite for persistence New files: - src/models.py - SQLAlchemy ORM models - src/database.py - DatabaseManager singleton - src/sanitizer.py - Input sanitization and HTML escaping - requirements.txt - SQLAlchemy dependency Security protections: - Parameterized queries via SQLAlchemy ORM - Field length limits to prevent storage exhaustion - Null byte and control character stripping - HTML escaping on dashboard output --- .gitignore | 4 + docs/coding-guidelines.md | 90 +++++++ requirements.txt | 5 + src/config.py | 7 +- src/database.py | 361 ++++++++++++++++++++++++++++ src/handler.py | 4 +- src/models.py | 141 +++++++++++ src/sanitizer.py | 113 +++++++++ src/server.py | 10 + src/templates/dashboard_template.py | 35 ++- src/tracker.py | 122 ++++++++-- 11 files changed, 860 insertions(+), 32 deletions(-) create mode 100644 docs/coding-guidelines.md create mode 100644 requirements.txt create mode 100644 src/database.py create mode 100644 src/models.py create mode 100644 src/sanitizer.py diff --git a/.gitignore b/.gitignore index 5d758cb..a36748e 100644 --- a/.gitignore +++ b/.gitignore @@ -61,6 +61,10 @@ secrets/ *.log logs/ +# Database +data/ +*.db + # Temporary files *.tmp *.temp diff --git a/docs/coding-guidelines.md b/docs/coding-guidelines.md new file mode 100644 index 0000000..1e13575 --- /dev/null +++ 
b/docs/coding-guidelines.md @@ -0,0 +1,90 @@ +### Coding Standards + +**Style & Structure** +- Prefer longer, explicit code over compact one-liners +- Always include docstrings for functions/classes + inline comments +- Strongly prefer OOP-style code (classes over functional/nested functions) +- Strong typing throughout (dataclasses, TypedDict, Enums, type hints) +- Value future-proofing and expanded usage insights + +**Data Design** +- Use dataclasses for internal data modeling +- Typed JSON structures +- Functions return fully typed objects (no loose dicts) +- Snapshot files in JSON or YAML +- Human-readable fields (e.g., `sql_injection`, `xss_attempt`) + +**Templates & UI** +- Don't mix large HTML/CSS blocks in Python code +- Prefer Jinja templates for HTML rendering +- Clean CSS, minimal inline clutter, readable template logic + +**Writing & Documentation** +- Markdown documentation +- Clear section headers +- Roadmap/Phase/Feature-Session style documents + +**Logging** +- Use the logging singleton found in `src/logger.py` +- Set up logging at app start: + ``` + initialize_logging() + app_logger = get_app_logger() + access_logger = get_access_logger() + credential_logger = get_credential_logger() + ``` + +**Preferred Pip Packages** +- API/Web Server: Simple Python +- HTTP: Requests +- SQLite: SQLAlchemy +- Database Migrations: Alembic + +### Error Handling +- Custom exception classes for domain-specific errors +- Consistent error response formats (JSON structure) +- Logging severity levels (ERROR vs WARNING) + +### Configuration +- `.env` for secrets (never committed) +- Maintain `.env.example` in each component for documentation +- Typed config loaders using dataclasses +- Validation on startup + +### Containerization & Deployment +- Explicit Dockerfiles +- Production-friendly hardening (distroless/slim when meaningful) +- Use git branch as tag + +### Dependency Management +- Use `requirements.txt` and virtual environments (`python3 -m venv venv`) +- Use path 
`venv` for all virtual environments +- Pin versions to version ranges (or exact versions if pinning a particular version) +- Activate venv before running code (unless in Docker) + +### Testing Standards +- Manual testing preferred for applications +- **tests:** Use shell scripts with curl/httpie for simulation and attack scripts. +- tests should be located in `tests` directory + +### Git Standards + +**Branch Strategy:** +- `master` - Production-ready code only +- `beta` - Public pre-release testing +- `dev` - Main development branch, integration point + +**Workflow:** +- Feature work branches off `dev` (e.g., `feature/add-scheduler`) +- Merge features back to `dev` for testing +- Promote `dev` → `beta` for public testing (when applicable) +- Promote `beta` (or `dev`) → `master` for production + +**Commit Messages:** +- Use conventional commit format: `feat:`, `fix:`, `docs:`, `refactor:`, etc. +- Keep commits atomic and focused +- Write clear, descriptive messages + +**Tagging:** +- Tag releases on `master` with semantic versioning (e.g., `v1.2.3`) +- Optionally tag beta releases (e.g., `v1.2.3-beta.1`) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..94f74f2 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +# Krawl Honeypot Dependencies +# Install with: pip install -r requirements.txt + +# Database ORM +SQLAlchemy>=2.0.0,<3.0.0 diff --git a/src/config.py b/src/config.py index 7c6714c..76f1aed 100644 --- a/src/config.py +++ b/src/config.py @@ -22,6 +22,9 @@ class Config: api_server_path: str = "/api/v2/users" probability_error_codes: int = 0 # Percentage (0-100) server_header: str = "Apache/2.2.22 (Ubuntu)" + # Database settings + database_path: str = "data/krawl.db" + database_retention_days: int = 30 @classmethod def from_env(cls) -> 'Config': @@ -46,5 +49,7 @@ class Config: api_server_port=int(os.getenv('API_SERVER_PORT', 8080)), api_server_path=os.getenv('API_SERVER_PATH', '/api/v2/users'), 
probability_error_codes=int(os.getenv('PROBABILITY_ERROR_CODES', 5)), - server_header=os.getenv('SERVER_HEADER', 'Apache/2.2.22 (Ubuntu)') + server_header=os.getenv('SERVER_HEADER', 'Apache/2.2.22 (Ubuntu)'), + database_path=os.getenv('DATABASE_PATH', 'data/krawl.db'), + database_retention_days=int(os.getenv('DATABASE_RETENTION_DAYS', 30)) ) diff --git a/src/database.py b/src/database.py new file mode 100644 index 0000000..58a4505 --- /dev/null +++ b/src/database.py @@ -0,0 +1,361 @@ +#!/usr/bin/env python3 + +""" +Database singleton module for the Krawl honeypot. +Provides SQLAlchemy session management and database initialization. +""" + +import os +import stat +from datetime import datetime +from typing import Optional, List, Dict, Any + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker, scoped_session, Session + +from models import Base, AccessLog, CredentialAttempt, AttackDetection, IpStats +from sanitizer import ( + sanitize_ip, + sanitize_path, + sanitize_user_agent, + sanitize_credential, + sanitize_attack_pattern, +) + + +class DatabaseManager: + """ + Singleton database manager for the Krawl honeypot. + + Handles database initialization, session management, and provides + methods for persisting access logs, credentials, and attack detections. + """ + _instance: Optional["DatabaseManager"] = None + + def __new__(cls) -> "DatabaseManager": + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def initialize(self, database_path: str = "data/krawl.db") -> None: + """ + Initialize the database connection and create tables. 
+ + Args: + database_path: Path to the SQLite database file + """ + if self._initialized: + return + + # Create data directory if it doesn't exist + data_dir = os.path.dirname(database_path) + if data_dir and not os.path.exists(data_dir): + os.makedirs(data_dir, exist_ok=True) + + # Create SQLite database with check_same_thread=False for multi-threaded access + database_url = f"sqlite:///{database_path}" + self._engine = create_engine( + database_url, + connect_args={"check_same_thread": False}, + echo=False # Set to True for SQL debugging + ) + + # Create session factory with scoped_session for thread safety + session_factory = sessionmaker(bind=self._engine) + self._Session = scoped_session(session_factory) + + # Create all tables + Base.metadata.create_all(self._engine) + + # Set restrictive file permissions (owner read/write only) + if os.path.exists(database_path): + try: + os.chmod(database_path, stat.S_IRUSR | stat.S_IWUSR) # 600 + except OSError: + # May fail on some systems, not critical + pass + + self._initialized = True + + @property + def session(self) -> Session: + """Get a thread-local database session.""" + if not self._initialized: + raise RuntimeError("DatabaseManager not initialized. Call initialize() first.") + return self._Session() + + def close_session(self) -> None: + """Close the current thread-local session.""" + if self._initialized: + self._Session.remove() + + def persist_access( + self, + ip: str, + path: str, + user_agent: str = "", + method: str = "GET", + is_suspicious: bool = False, + is_honeypot_trigger: bool = False, + attack_types: Optional[List[str]] = None, + matched_patterns: Optional[Dict[str, str]] = None + ) -> Optional[int]: + """ + Persist an access log entry to the database. 
+ + Args: + ip: Client IP address + path: Requested path + user_agent: Client user agent string + method: HTTP method (GET, POST, HEAD) + is_suspicious: Whether the request was flagged as suspicious + is_honeypot_trigger: Whether a honeypot path was accessed + attack_types: List of detected attack types + matched_patterns: Dict mapping attack_type to matched pattern + + Returns: + The ID of the created AccessLog record, or None on error + """ + session = self.session + try: + # Create access log with sanitized fields + access_log = AccessLog( + ip=sanitize_ip(ip), + path=sanitize_path(path), + user_agent=sanitize_user_agent(user_agent), + method=method[:10], + is_suspicious=is_suspicious, + is_honeypot_trigger=is_honeypot_trigger, + timestamp=datetime.utcnow() + ) + session.add(access_log) + session.flush() # Get the ID before committing + + # Add attack detections if any + if attack_types: + matched_patterns = matched_patterns or {} + for attack_type in attack_types: + detection = AttackDetection( + access_log_id=access_log.id, + attack_type=attack_type[:50], + matched_pattern=sanitize_attack_pattern( + matched_patterns.get(attack_type, "") + ) + ) + session.add(detection) + + # Update IP stats + self._update_ip_stats(session, ip) + + session.commit() + return access_log.id + + except Exception as e: + session.rollback() + # Log error but don't crash - database persistence is secondary to honeypot function + print(f"Database error persisting access: {e}") + return None + finally: + self.close_session() + + def persist_credential( + self, + ip: str, + path: str, + username: Optional[str] = None, + password: Optional[str] = None + ) -> Optional[int]: + """ + Persist a credential attempt to the database. 
+ + Args: + ip: Client IP address + path: Login form path + username: Submitted username + password: Submitted password + + Returns: + The ID of the created CredentialAttempt record, or None on error + """ + session = self.session + try: + credential = CredentialAttempt( + ip=sanitize_ip(ip), + path=sanitize_path(path), + username=sanitize_credential(username), + password=sanitize_credential(password), + timestamp=datetime.utcnow() + ) + session.add(credential) + session.commit() + return credential.id + + except Exception as e: + session.rollback() + print(f"Database error persisting credential: {e}") + return None + finally: + self.close_session() + + def _update_ip_stats(self, session: Session, ip: str) -> None: + """ + Update IP statistics (upsert pattern). + + Args: + session: Active database session + ip: IP address to update + """ + sanitized_ip = sanitize_ip(ip) + now = datetime.utcnow() + + ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first() + + if ip_stats: + ip_stats.total_requests += 1 + ip_stats.last_seen = now + else: + ip_stats = IpStats( + ip=sanitized_ip, + total_requests=1, + first_seen=now, + last_seen=now + ) + session.add(ip_stats) + + def get_access_logs( + self, + limit: int = 100, + offset: int = 0, + ip_filter: Optional[str] = None, + suspicious_only: bool = False + ) -> List[Dict[str, Any]]: + """ + Retrieve access logs with optional filtering. 
+ + Args: + limit: Maximum number of records to return + offset: Number of records to skip + ip_filter: Filter by IP address + suspicious_only: Only return suspicious requests + + Returns: + List of access log dictionaries + """ + session = self.session + try: + query = session.query(AccessLog).order_by(AccessLog.timestamp.desc()) + + if ip_filter: + query = query.filter(AccessLog.ip == sanitize_ip(ip_filter)) + if suspicious_only: + query = query.filter(AccessLog.is_suspicious == True) + + logs = query.offset(offset).limit(limit).all() + + return [ + { + 'id': log.id, + 'ip': log.ip, + 'path': log.path, + 'user_agent': log.user_agent, + 'method': log.method, + 'is_suspicious': log.is_suspicious, + 'is_honeypot_trigger': log.is_honeypot_trigger, + 'timestamp': log.timestamp.isoformat(), + 'attack_types': [d.attack_type for d in log.attack_detections] + } + for log in logs + ] + finally: + self.close_session() + + def get_credential_attempts( + self, + limit: int = 100, + offset: int = 0, + ip_filter: Optional[str] = None + ) -> List[Dict[str, Any]]: + """ + Retrieve credential attempts with optional filtering. + + Args: + limit: Maximum number of records to return + offset: Number of records to skip + ip_filter: Filter by IP address + + Returns: + List of credential attempt dictionaries + """ + session = self.session + try: + query = session.query(CredentialAttempt).order_by( + CredentialAttempt.timestamp.desc() + ) + + if ip_filter: + query = query.filter(CredentialAttempt.ip == sanitize_ip(ip_filter)) + + attempts = query.offset(offset).limit(limit).all() + + return [ + { + 'id': attempt.id, + 'ip': attempt.ip, + 'path': attempt.path, + 'username': attempt.username, + 'password': attempt.password, + 'timestamp': attempt.timestamp.isoformat() + } + for attempt in attempts + ] + finally: + self.close_session() + + def get_ip_stats(self, limit: int = 100) -> List[Dict[str, Any]]: + """ + Retrieve IP statistics ordered by total requests. 
+ + Args: + limit: Maximum number of records to return + + Returns: + List of IP stats dictionaries + """ + session = self.session + try: + stats = session.query(IpStats).order_by( + IpStats.total_requests.desc() + ).limit(limit).all() + + return [ + { + 'ip': s.ip, + 'total_requests': s.total_requests, + 'first_seen': s.first_seen.isoformat(), + 'last_seen': s.last_seen.isoformat(), + 'country_code': s.country_code, + 'city': s.city, + 'asn': s.asn, + 'asn_org': s.asn_org, + 'reputation_score': s.reputation_score, + 'reputation_source': s.reputation_source + } + for s in stats + ] + finally: + self.close_session() + + +# Module-level singleton instance +_db_manager = DatabaseManager() + + +def get_database() -> DatabaseManager: + """Get the database manager singleton instance.""" + return _db_manager + + +def initialize_database(database_path: str = "data/krawl.db") -> None: + """Initialize the database system.""" + _db_manager.initialize(database_path) diff --git a/src/handler.py b/src/handler.py index ac7ca22..90214ac 100644 --- a/src/handler.py +++ b/src/handler.py @@ -229,7 +229,7 @@ class Handler(BaseHTTPRequestHandler): self.access_logger.warning(f"[CREDENTIALS CAPTURED] {client_ip} - Username: {username or 'N/A'} - Path: {self.path}") # send the post data (body) to the record_access function so the post data can be used to detect suspicious things. 
- self.tracker.record_access(client_ip, self.path, user_agent, post_data) + self.tracker.record_access(client_ip, self.path, user_agent, post_data, method='POST') time.sleep(1) @@ -347,7 +347,7 @@ class Handler(BaseHTTPRequestHandler): self.app_logger.error(f"Error generating dashboard: {e}") return - self.tracker.record_access(client_ip, self.path, user_agent) + self.tracker.record_access(client_ip, self.path, user_agent, method='GET') if self.tracker.is_suspicious_user_agent(user_agent): self.access_logger.warning(f"[SUSPICIOUS] {client_ip} - {user_agent[:50]} - {self.path}") diff --git a/src/models.py b/src/models.py new file mode 100644 index 0000000..f6e7d30 --- /dev/null +++ b/src/models.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 + +""" +SQLAlchemy ORM models for the Krawl honeypot database. +Stores access logs, credential attempts, attack detections, and IP statistics. +""" + +from datetime import datetime +from typing import Optional, List + +from sqlalchemy import String, Integer, Boolean, DateTime, ForeignKey, Index +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship + +from sanitizer import ( + MAX_IP_LENGTH, + MAX_PATH_LENGTH, + MAX_USER_AGENT_LENGTH, + MAX_CREDENTIAL_LENGTH, + MAX_ATTACK_PATTERN_LENGTH, + MAX_CITY_LENGTH, + MAX_ASN_ORG_LENGTH, + MAX_REPUTATION_SOURCE_LENGTH, +) + + +class Base(DeclarativeBase): + """Base class for all ORM models.""" + pass + + +class AccessLog(Base): + """ + Records all HTTP requests to the honeypot. + + Stores request metadata, suspicious activity flags, and timestamps + for analysis and dashboard display. 
+ """ + __tablename__ = 'access_logs' + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True) + path: Mapped[str] = mapped_column(String(MAX_PATH_LENGTH), nullable=False) + user_agent: Mapped[Optional[str]] = mapped_column(String(MAX_USER_AGENT_LENGTH), nullable=True) + method: Mapped[str] = mapped_column(String(10), nullable=False, default='GET') + is_suspicious: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + is_honeypot_trigger: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + timestamp: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=datetime.utcnow, index=True) + + # Relationship to attack detections + attack_detections: Mapped[List["AttackDetection"]] = relationship( + "AttackDetection", + back_populates="access_log", + cascade="all, delete-orphan" + ) + + # Composite index for common queries + __table_args__ = ( + Index('ix_access_logs_ip_timestamp', 'ip', 'timestamp'), + ) + + def __repr__(self) -> str: + return f"" + + +class CredentialAttempt(Base): + """ + Records captured login attempts from honeypot login forms. + + Stores the submitted username and password along with request metadata. 
+ """ + __tablename__ = 'credential_attempts' + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True) + path: Mapped[str] = mapped_column(String(MAX_PATH_LENGTH), nullable=False) + username: Mapped[Optional[str]] = mapped_column(String(MAX_CREDENTIAL_LENGTH), nullable=True) + password: Mapped[Optional[str]] = mapped_column(String(MAX_CREDENTIAL_LENGTH), nullable=True) + timestamp: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=datetime.utcnow, index=True) + + # Composite index for common queries + __table_args__ = ( + Index('ix_credential_attempts_ip_timestamp', 'ip', 'timestamp'), + ) + + def __repr__(self) -> str: + return f"" + + +class AttackDetection(Base): + """ + Records detected attack patterns in requests. + + Linked to the parent AccessLog record. Multiple attack types can be + detected in a single request. + """ + __tablename__ = 'attack_detections' + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + access_log_id: Mapped[int] = mapped_column( + Integer, + ForeignKey('access_logs.id', ondelete='CASCADE'), + nullable=False, + index=True + ) + attack_type: Mapped[str] = mapped_column(String(50), nullable=False) + matched_pattern: Mapped[Optional[str]] = mapped_column(String(MAX_ATTACK_PATTERN_LENGTH), nullable=True) + + # Relationship back to access log + access_log: Mapped["AccessLog"] = relationship("AccessLog", back_populates="attack_detections") + + def __repr__(self) -> str: + return f"" + + +class IpStats(Base): + """ + Aggregated statistics per IP address. + + Includes fields for future GeoIP and reputation enrichment. + Updated on each request from an IP. 
+ """ + __tablename__ = 'ip_stats' + + ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), primary_key=True) + total_requests: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + first_seen: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=datetime.utcnow) + last_seen: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=datetime.utcnow) + + # GeoIP fields (populated by future enrichment) + country_code: Mapped[Optional[str]] = mapped_column(String(2), nullable=True) + city: Mapped[Optional[str]] = mapped_column(String(MAX_CITY_LENGTH), nullable=True) + asn: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + asn_org: Mapped[Optional[str]] = mapped_column(String(MAX_ASN_ORG_LENGTH), nullable=True) + + # Reputation fields (populated by future enrichment) + reputation_score: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + reputation_source: Mapped[Optional[str]] = mapped_column(String(MAX_REPUTATION_SOURCE_LENGTH), nullable=True) + reputation_updated: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) + + def __repr__(self) -> str: + return f"" diff --git a/src/sanitizer.py b/src/sanitizer.py new file mode 100644 index 0000000..f783129 --- /dev/null +++ b/src/sanitizer.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 + +""" +Sanitization utilities for safe database storage and HTML output. +Protects against SQL injection payloads, XSS, and storage exhaustion attacks. +""" + +import html +import re +from typing import Optional + + +# Field length limits for database storage +MAX_IP_LENGTH = 45 # IPv6 max length +MAX_PATH_LENGTH = 2048 # URL max practical length +MAX_USER_AGENT_LENGTH = 512 +MAX_CREDENTIAL_LENGTH = 256 +MAX_ATTACK_PATTERN_LENGTH = 256 +MAX_CITY_LENGTH = 128 +MAX_ASN_ORG_LENGTH = 256 +MAX_REPUTATION_SOURCE_LENGTH = 64 + + +def sanitize_for_storage(value: Optional[str], max_length: int) -> str: + """ + Sanitize and truncate string for safe database storage. 
+ + Removes null bytes and control characters that could cause issues + with database storage or log processing. + + Args: + value: The string to sanitize + max_length: Maximum length to truncate to + + Returns: + Sanitized and truncated string, empty string if input is None/empty + """ + if not value: + return "" + + # Convert to string if not already + value = str(value) + + # Remove null bytes and control characters (except newline \n, tab \t, carriage return \r) + # Control chars are 0x00-0x1F and 0x7F, we keep 0x09 (tab), 0x0A (newline), 0x0D (carriage return) + cleaned = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', value) + + # Truncate to max length + return cleaned[:max_length] + + +def sanitize_ip(value: Optional[str]) -> str: + """Sanitize IP address for storage.""" + return sanitize_for_storage(value, MAX_IP_LENGTH) + + +def sanitize_path(value: Optional[str]) -> str: + """Sanitize URL path for storage.""" + return sanitize_for_storage(value, MAX_PATH_LENGTH) + + +def sanitize_user_agent(value: Optional[str]) -> str: + """Sanitize user agent string for storage.""" + return sanitize_for_storage(value, MAX_USER_AGENT_LENGTH) + + +def sanitize_credential(value: Optional[str]) -> str: + """Sanitize username or password for storage.""" + return sanitize_for_storage(value, MAX_CREDENTIAL_LENGTH) + + +def sanitize_attack_pattern(value: Optional[str]) -> str: + """Sanitize matched attack pattern for storage.""" + return sanitize_for_storage(value, MAX_ATTACK_PATTERN_LENGTH) + + +def escape_html(value: Optional[str]) -> str: + """ + Escape HTML special characters for safe display in web pages. + + Prevents stored XSS attacks when displaying user-controlled data + in the dashboard. 
+ + Args: + value: The string to escape + + Returns: + HTML-escaped string, empty string if input is None/empty + """ + if not value: + return "" + return html.escape(str(value)) + + +def escape_html_truncated(value: Optional[str], max_display_length: int) -> str: + """ + Escape HTML and truncate for display. + + Args: + value: The string to escape and truncate + max_display_length: Maximum display length (truncation happens before escaping) + + Returns: + HTML-escaped and truncated string + """ + if not value: + return "" + + value_str = str(value) + if len(value_str) > max_display_length: + value_str = value_str[:max_display_length] + "..." + + return html.escape(value_str) diff --git a/src/server.py b/src/server.py index fd8f7d2..a0b5ec3 100644 --- a/src/server.py +++ b/src/server.py @@ -12,6 +12,7 @@ from config import Config from tracker import AccessTracker from handler import Handler from logger import initialize_logging, get_app_logger, get_access_logger, get_credential_logger +from database import initialize_database def print_usage(): @@ -33,6 +34,8 @@ def print_usage(): print(' PROBABILITY_ERROR_CODES - Probability (0-100) to return HTTP error codes (default: 0)') print(' CHAR_SPACE - Characters for random links') print(' SERVER_HEADER - HTTP Server header for deception (default: Apache/2.2.22 (Ubuntu))') + print(' DATABASE_PATH - Path to SQLite database (default: data/krawl.db)') + print(' DATABASE_RETENTION_DAYS - Days to retain database records (default: 30)') def main(): @@ -49,6 +52,13 @@ def main(): config = Config.from_env() + # Initialize database for persistent storage + try: + initialize_database(config.database_path) + app_logger.info(f'Database initialized at: {config.database_path}') + except Exception as e: + app_logger.warning(f'Database initialization failed: {e}. 
Continuing with in-memory only.') + tracker = AccessTracker() Handler.config = config diff --git a/src/templates/dashboard_template.py b/src/templates/dashboard_template.py index a267278..92e950d 100644 --- a/src/templates/dashboard_template.py +++ b/src/templates/dashboard_template.py @@ -5,49 +5,58 @@ Dashboard template for viewing honeypot statistics. Customize this template to change the dashboard appearance. """ +import html + + +def _escape(value) -> str: + """Escape HTML special characters to prevent XSS attacks.""" + if value is None: + return "" + return html.escape(str(value)) + def generate_dashboard(stats: dict) -> str: """Generate dashboard HTML with access statistics""" - # Generate IP rows + # Generate IP rows (IPs are generally safe but escape for consistency) top_ips_rows = '\n'.join([ - f'{i+1}{ip}{count}' + f'{i+1}{_escape(ip)}{count}' for i, (ip, count) in enumerate(stats['top_ips']) ]) or 'No data' - # Generate paths rows + # Generate paths rows (CRITICAL: paths can contain XSS payloads) top_paths_rows = '\n'.join([ - f'{i+1}{path}{count}' + f'{i+1}{_escape(path)}{count}' for i, (path, count) in enumerate(stats['top_paths']) ]) or 'No data' - # Generate User-Agent rows + # Generate User-Agent rows (CRITICAL: user agents can contain XSS payloads) top_ua_rows = '\n'.join([ - f'{i+1}{ua[:80]}{count}' + f'{i+1}{_escape(ua[:80])}{count}' for i, (ua, count) in enumerate(stats['top_user_agents']) ]) or 'No data' - # Generate suspicious accesses rows + # Generate suspicious accesses rows (CRITICAL: multiple user-controlled fields) suspicious_rows = '\n'.join([ - f'{log["ip"]}{log["path"]}{log["user_agent"][:60]}{log["timestamp"].split("T")[1][:8]}' + f'{_escape(log["ip"])}{_escape(log["path"])}{_escape(log["user_agent"][:60])}{_escape(log["timestamp"].split("T")[1][:8])}' for log in stats['recent_suspicious'][-10:] ]) or 'No suspicious activity detected' # Generate honeypot triggered IPs rows honeypot_rows = '\n'.join([ - f'{ip}{", 
".join(paths)}{len(paths)}' + f'{_escape(ip)}{_escape(", ".join(paths))}{len(paths)}' for ip, paths in stats.get('honeypot_triggered_ips', []) ]) or 'No honeypot triggers yet' - # Generate attack types rows + # Generate attack types rows (CRITICAL: paths and user agents are user-controlled) attack_type_rows = '\n'.join([ - f'{log["ip"]}{log["path"]}{", ".join(log["attack_types"])}{log["user_agent"][:60]}{log["timestamp"].split("T")[1][:8]}' + f'{_escape(log["ip"])}{_escape(log["path"])}{_escape(", ".join(log["attack_types"]))}{_escape(log["user_agent"][:60])}{_escape(log["timestamp"].split("T")[1][:8])}' for log in stats.get('attack_types', [])[-10:] ]) or 'No attacks detected' - # Generate credential attempts rows + # Generate credential attempts rows (CRITICAL: usernames and passwords are user-controlled) credential_rows = '\n'.join([ - f'{log["ip"]}{log["username"]}{log["password"]}{log["path"]}{log["timestamp"].split("T")[1][:8]}' + f'{_escape(log["ip"])}{_escape(log["username"])}{_escape(log["password"])}{_escape(log["path"])}{_escape(log["timestamp"].split("T")[1][:8])}' for log in stats.get('credential_attempts', [])[-20:] ]) or 'No credentials captured yet' diff --git a/src/tracker.py b/src/tracker.py index 717a4c3..04ded3b 100644 --- a/src/tracker.py +++ b/src/tracker.py @@ -1,15 +1,29 @@ #!/usr/bin/env python3 -from typing import Dict, List, Tuple +from typing import Dict, List, Tuple, Optional from collections import defaultdict from datetime import datetime import re import urllib.parse +from database import get_database, DatabaseManager + class AccessTracker: - """Track IP addresses and paths accessed""" - def __init__(self): + """ + Track IP addresses and paths accessed. + + Maintains in-memory structures for fast dashboard access and + persists data to SQLite for long-term storage and analysis. + """ + def __init__(self, db_manager: Optional[DatabaseManager] = None): + """ + Initialize the access tracker. 
+ + Args: + db_manager: Optional DatabaseManager for persistence. + If None, will use the global singleton. + """ self.ip_counts: Dict[str, int] = defaultdict(int) self.path_counts: Dict[str, int] = defaultdict(int) self.user_agent_counts: Dict[str, int] = defaultdict(int) @@ -21,7 +35,7 @@ class AccessTracker: 'burp', 'zap', 'w3af', 'metasploit', 'nuclei', 'gobuster', 'dirbuster' ] - # common attack types such as xss, shell injection, probes + # Common attack types such as xss, shell injection, probes self.attack_types = { 'path_traversal': r'\.\.', 'sql_injection': r"('|--|;|\bOR\b|\bUNION\b|\bSELECT\b|\bDROP\b)", @@ -33,6 +47,25 @@ class AccessTracker: # Track IPs that accessed honeypot paths from robots.txt self.honeypot_triggered: Dict[str, List[str]] = defaultdict(list) + # Database manager for persistence (lazily initialized) + self._db_manager = db_manager + + @property + def db(self) -> Optional[DatabaseManager]: + """ + Get the database manager, lazily initializing if needed. + + Returns: + DatabaseManager instance or None if not available + """ + if self._db_manager is None: + try: + self._db_manager = get_database() + except Exception: + # Database not initialized, persistence disabled + pass + return self._db_manager + def parse_credentials(self, post_data: str) -> Tuple[str, str]: """ Parse username and password from POST data. @@ -75,7 +108,12 @@ class AccessTracker: return username, password def record_credential_attempt(self, ip: str, path: str, username: str, password: str): - """Record a credential login attempt""" + """ + Record a credential login attempt. + + Stores in both in-memory list and SQLite database. 
+ """ + # In-memory storage for dashboard self.credential_attempts.append({ 'ip': ip, 'path': path, @@ -84,37 +122,89 @@ class AccessTracker: 'timestamp': datetime.now().isoformat() }) - def record_access(self, ip: str, path: str, user_agent: str = '', body: str = ''): - """Record an access attempt""" + # Persist to database + if self.db: + try: + self.db.persist_credential( + ip=ip, + path=path, + username=username, + password=password + ) + except Exception: + # Don't crash if database persistence fails + pass + + def record_access( + self, + ip: str, + path: str, + user_agent: str = '', + body: str = '', + method: str = 'GET' + ): + """ + Record an access attempt. + + Stores in both in-memory structures and SQLite database. + + Args: + ip: Client IP address + path: Requested path + user_agent: Client user agent string + body: Request body (for POST/PUT) + method: HTTP method + """ self.ip_counts[ip] += 1 self.path_counts[path] += 1 if user_agent: self.user_agent_counts[user_agent] += 1 - - # path attack type detection + + # Path attack type detection attack_findings = self.detect_attack_type(path) - # post / put data + # POST/PUT body attack detection if len(body) > 0: attack_findings.extend(self.detect_attack_type(body)) - is_suspicious = self.is_suspicious_user_agent(user_agent) or self.is_honeypot_path(path) or len(attack_findings) > 0 + is_suspicious = ( + self.is_suspicious_user_agent(user_agent) or + self.is_honeypot_path(path) or + len(attack_findings) > 0 + ) + is_honeypot = self.is_honeypot_path(path) - # Track if this IP accessed a honeypot path - if self.is_honeypot_path(path): + if is_honeypot: self.honeypot_triggered[ip].append(path) - + + # In-memory storage for dashboard self.access_log.append({ 'ip': ip, 'path': path, 'user_agent': user_agent, 'suspicious': is_suspicious, - 'honeypot_triggered': self.is_honeypot_path(path), - 'attack_types':attack_findings, + 'honeypot_triggered': is_honeypot, + 'attack_types': attack_findings, 'timestamp': 
datetime.now().isoformat() }) + # Persist to database + if self.db: + try: + self.db.persist_access( + ip=ip, + path=path, + user_agent=user_agent, + method=method, + is_suspicious=is_suspicious, + is_honeypot_trigger=is_honeypot, + attack_types=attack_findings if attack_findings else None + ) + except Exception: + # Don't crash if database persistence fails + pass + def detect_attack_type(self, data:str) -> list[str]: """ Returns a list of all attack types found in path data