#!/usr/bin/env python3 """ SQLAlchemy ORM models for the Krawl honeypot database. Stores access logs, credential attempts, attack detections, and IP statistics. """ from datetime import datetime from typing import Optional, List from sqlalchemy import String, Integer, Boolean, DateTime, ForeignKey, Index from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship from sanitizer import ( MAX_IP_LENGTH, MAX_PATH_LENGTH, MAX_USER_AGENT_LENGTH, MAX_CREDENTIAL_LENGTH, MAX_ATTACK_PATTERN_LENGTH, MAX_CITY_LENGTH, MAX_ASN_ORG_LENGTH, MAX_REPUTATION_SOURCE_LENGTH, ) class Base(DeclarativeBase): """Base class for all ORM models.""" pass class AccessLog(Base): """ Records all HTTP requests to the honeypot. Stores request metadata, suspicious activity flags, and timestamps for analysis and dashboard display. """ __tablename__ = 'access_logs' id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True) path: Mapped[str] = mapped_column(String(MAX_PATH_LENGTH), nullable=False) user_agent: Mapped[Optional[str]] = mapped_column(String(MAX_USER_AGENT_LENGTH), nullable=True) method: Mapped[str] = mapped_column(String(10), nullable=False, default='GET') is_suspicious: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) is_honeypot_trigger: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) timestamp: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=datetime.utcnow, index=True) # Relationship to attack detections attack_detections: Mapped[List["AttackDetection"]] = relationship( "AttackDetection", back_populates="access_log", cascade="all, delete-orphan" ) # Composite index for common queries __table_args__ = ( Index('ix_access_logs_ip_timestamp', 'ip', 'timestamp'), ) def __repr__(self) -> str: return f"" class CredentialAttempt(Base): """ Records captured login attempts from honeypot login forms. Stores the submitted username and password along with request metadata. """ __tablename__ = 'credential_attempts' id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True) path: Mapped[str] = mapped_column(String(MAX_PATH_LENGTH), nullable=False) username: Mapped[Optional[str]] = mapped_column(String(MAX_CREDENTIAL_LENGTH), nullable=True) password: Mapped[Optional[str]] = mapped_column(String(MAX_CREDENTIAL_LENGTH), nullable=True) timestamp: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=datetime.utcnow, index=True) # Composite index for common queries __table_args__ = ( Index('ix_credential_attempts_ip_timestamp', 'ip', 'timestamp'), ) def __repr__(self) -> str: return f"" class AttackDetection(Base): """ Records detected attack patterns in requests. Linked to the parent AccessLog record. Multiple attack types can be detected in a single request. """ __tablename__ = 'attack_detections' id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) access_log_id: Mapped[int] = mapped_column( Integer, ForeignKey('access_logs.id', ondelete='CASCADE'), nullable=False, index=True ) attack_type: Mapped[str] = mapped_column(String(50), nullable=False) matched_pattern: Mapped[Optional[str]] = mapped_column(String(MAX_ATTACK_PATTERN_LENGTH), nullable=True) # Relationship back to access log access_log: Mapped["AccessLog"] = relationship("AccessLog", back_populates="attack_detections") def __repr__(self) -> str: return f"" class IpStats(Base): """ Aggregated statistics per IP address. Includes fields for future GeoIP and reputation enrichment. Updated on each request from an IP. """ __tablename__ = 'ip_stats' ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), primary_key=True) total_requests: Mapped[int] = mapped_column(Integer, nullable=False, default=0) first_seen: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=datetime.utcnow) last_seen: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=datetime.utcnow) # GeoIP fields (populated by future enrichment) country_code: Mapped[Optional[str]] = mapped_column(String(2), nullable=True) city: Mapped[Optional[str]] = mapped_column(String(MAX_CITY_LENGTH), nullable=True) asn: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) asn_org: Mapped[Optional[str]] = mapped_column(String(MAX_ASN_ORG_LENGTH), nullable=True) # Reputation fields (populated by future enrichment) reputation_score: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) reputation_source: Mapped[Optional[str]] = mapped_column(String(MAX_REPUTATION_SOURCE_LENGTH), nullable=True) reputation_updated: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) def __repr__(self) -> str: return f""