diff --git a/src/database.py b/src/database.py index 58a4505..e0de320 100644 --- a/src/database.py +++ b/src/database.py @@ -10,7 +10,7 @@ import stat from datetime import datetime from typing import Optional, List, Dict, Any -from sqlalchemy import create_engine +from sqlalchemy import create_engine, func, distinct, case from sqlalchemy.orm import sessionmaker, scoped_session, Session from models import Base, AccessLog, CredentialAttempt, AttackDetection, IpStats @@ -346,6 +346,200 @@ class DatabaseManager: finally: self.close_session() + def get_dashboard_counts(self) -> Dict[str, int]: + """ + Get aggregate statistics for the dashboard. + + Returns: + Dictionary with total_accesses, unique_ips, unique_paths, + suspicious_accesses, honeypot_triggered, honeypot_ips + """ + session = self.session + try: + # Get main aggregate counts in one query + result = session.query( + func.count(AccessLog.id).label('total_accesses'), + func.count(distinct(AccessLog.ip)).label('unique_ips'), + func.count(distinct(AccessLog.path)).label('unique_paths'), + func.sum(case((AccessLog.is_suspicious == True, 1), else_=0)).label('suspicious_accesses'), + func.sum(case((AccessLog.is_honeypot_trigger == True, 1), else_=0)).label('honeypot_triggered') + ).first() + + # Get unique IPs that triggered honeypots + honeypot_ips = session.query( + func.count(distinct(AccessLog.ip)) + ).filter(AccessLog.is_honeypot_trigger == True).scalar() or 0 + + return { + 'total_accesses': result.total_accesses or 0, + 'unique_ips': result.unique_ips or 0, + 'unique_paths': result.unique_paths or 0, + 'suspicious_accesses': int(result.suspicious_accesses or 0), + 'honeypot_triggered': int(result.honeypot_triggered or 0), + 'honeypot_ips': honeypot_ips + } + finally: + self.close_session() + + def get_top_ips(self, limit: int = 10) -> List[tuple]: + """ + Get top IP addresses by access count. + + Args: + limit: Maximum number of results + + Returns: + List of (ip, count) tuples ordered by count descending + """ + session = self.session + try: + results = session.query( + AccessLog.ip, + func.count(AccessLog.id).label('count') + ).group_by(AccessLog.ip).order_by( + func.count(AccessLog.id).desc() + ).limit(limit).all() + + return [(row.ip, row.count) for row in results] + finally: + self.close_session() + + def get_top_paths(self, limit: int = 10) -> List[tuple]: + """ + Get top paths by access count. + + Args: + limit: Maximum number of results + + Returns: + List of (path, count) tuples ordered by count descending + """ + session = self.session + try: + results = session.query( + AccessLog.path, + func.count(AccessLog.id).label('count') + ).group_by(AccessLog.path).order_by( + func.count(AccessLog.id).desc() + ).limit(limit).all() + + return [(row.path, row.count) for row in results] + finally: + self.close_session() + + def get_top_user_agents(self, limit: int = 10) -> List[tuple]: + """ + Get top user agents by access count. + + Args: + limit: Maximum number of results + + Returns: + List of (user_agent, count) tuples ordered by count descending + """ + session = self.session + try: + results = session.query( + AccessLog.user_agent, + func.count(AccessLog.id).label('count') + ).filter( + AccessLog.user_agent.isnot(None), + AccessLog.user_agent != '' + ).group_by(AccessLog.user_agent).order_by( + func.count(AccessLog.id).desc() + ).limit(limit).all() + + return [(row.user_agent, row.count) for row in results] + finally: + self.close_session() + + def get_recent_suspicious(self, limit: int = 20) -> List[Dict[str, Any]]: + """ + Get recent suspicious access attempts. + + Args: + limit: Maximum number of results + + Returns: + List of access log dictionaries with is_suspicious=True + """ + session = self.session + try: + logs = session.query(AccessLog).filter( + AccessLog.is_suspicious == True + ).order_by(AccessLog.timestamp.desc()).limit(limit).all() + + return [ + { + 'ip': log.ip, + 'path': log.path, + 'user_agent': log.user_agent, + 'timestamp': log.timestamp.isoformat() + } + for log in logs + ] + finally: + self.close_session() + + def get_honeypot_triggered_ips(self) -> List[tuple]: + """ + Get IPs that triggered honeypot paths with the paths they accessed. + + Returns: + List of (ip, [paths]) tuples + """ + session = self.session + try: + # Get all honeypot triggers grouped by IP + results = session.query( + AccessLog.ip, + AccessLog.path + ).filter( + AccessLog.is_honeypot_trigger == True + ).all() + + # Group paths by IP + ip_paths: Dict[str, List[str]] = {} + for row in results: + if row.ip not in ip_paths: + ip_paths[row.ip] = [] + if row.path not in ip_paths[row.ip]: + ip_paths[row.ip].append(row.path) + + return [(ip, paths) for ip, paths in ip_paths.items()] + finally: + self.close_session() + + def get_recent_attacks(self, limit: int = 20) -> List[Dict[str, Any]]: + """ + Get recent access logs that have attack detections. + + Args: + limit: Maximum number of results + + Returns: + List of access log dicts with attack_types included + """ + session = self.session + try: + # Get access logs that have attack detections + logs = session.query(AccessLog).join( + AttackDetection + ).order_by(AccessLog.timestamp.desc()).limit(limit).all() + + return [ + { + 'ip': log.ip, + 'path': log.path, + 'user_agent': log.user_agent, + 'timestamp': log.timestamp.isoformat(), + 'attack_types': [d.attack_type for d in log.attack_detections] + } + for log in logs + ] + finally: + self.close_session() + # Module-level singleton instance _db_manager = DatabaseManager() diff --git a/src/models.py b/src/models.py index f6e7d30..40dae0b 100644 --- a/src/models.py +++ b/src/models.py @@ -53,9 +53,11 @@ class AccessLog(Base): cascade="all, delete-orphan" ) - # Composite index for common queries + # Indexes for common queries __table_args__ = ( Index('ix_access_logs_ip_timestamp', 'ip', 'timestamp'), + Index('ix_access_logs_is_suspicious', 'is_suspicious'), + Index('ix_access_logs_is_honeypot_trigger', 'is_honeypot_trigger'), ) def __repr__(self) -> str: diff --git a/src/templates/dashboard_template.py b/src/templates/dashboard_template.py index 2323843..455833d 100644 --- a/src/templates/dashboard_template.py +++ b/src/templates/dashboard_template.py @@ -190,7 +190,7 @@ def generate_dashboard(stats: dict) -> str: