From e6eed2f6472d8ca691fccfa805d658b56a8f1ec3 Mon Sep 17 00:00:00 2001 From: Lorenzo Venerandi Date: Mon, 9 Mar 2026 14:57:19 +0100 Subject: [PATCH] feat: implement IP tracking functionality with database integration --- src/database.py | 99 +++++++++++++++++++++++++++++++++++++++++++++++++ src/models.py | 26 +++++++++++++ 2 files changed, 125 insertions(+) diff --git a/src/database.py b/src/database.py index 84eff37..3998831 100644 --- a/src/database.py +++ b/src/database.py @@ -34,6 +34,7 @@ from models import ( AttackDetection, IpStats, CategoryHistory, + TrackedIp, ) from sanitizer import ( sanitize_ip, @@ -2328,6 +2329,104 @@ class DatabaseManager: finally: self.close_session() + # ── IP Tracking ────────────────────────────────────────────────── + + def track_ip(self, ip: str) -> bool: + """Add an IP to the tracked list with a snapshot of its current stats.""" + session = self.session + sanitized_ip = sanitize_ip(ip) + existing = session.query(TrackedIp).filter(TrackedIp.ip == sanitized_ip).first() + if existing: + return True # already tracked + + # Snapshot essential data from ip_stats + stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first() + tracked = TrackedIp( + ip=sanitized_ip, + tracked_since=datetime.now(), + category=stats.category if stats else None, + total_requests=stats.total_requests if stats else 0, + country_code=stats.country_code if stats else None, + city=stats.city if stats else None, + last_seen=stats.last_seen if stats else None, + ) + session.add(tracked) + try: + session.commit() + return True + except Exception as e: + session.rollback() + applogger.error(f"Error tracking IP {sanitized_ip}: {e}") + return False + + def untrack_ip(self, ip: str) -> bool: + """Remove an IP from the tracked list.""" + session = self.session + sanitized_ip = sanitize_ip(ip) + tracked = session.query(TrackedIp).filter(TrackedIp.ip == sanitized_ip).first() + if not tracked: + return False + session.delete(tracked) + try: + session.commit() + return True + except Exception as e: + session.rollback() + applogger.error(f"Error untracking IP {sanitized_ip}: {e}") + return False + + def is_ip_tracked(self, ip: str) -> bool: + """Check if an IP is currently tracked.""" + session = self.session + sanitized_ip = sanitize_ip(ip) + try: + return session.query(TrackedIp).filter(TrackedIp.ip == sanitized_ip).first() is not None + finally: + self.close_session() + + def get_tracked_ips_paginated( + self, + page: int = 1, + page_size: int = 25, + ) -> Dict[str, Any]: + """Get all tracked IPs, paginated. Reads only from tracked_ips table.""" + session = self.session + try: + total = session.query(TrackedIp).count() + total_pages = max(1, (total + page_size - 1) // page_size) + + tracked_rows = ( + session.query(TrackedIp) + .order_by(TrackedIp.tracked_since.desc()) + .offset((page - 1) * page_size) + .limit(page_size) + .all() + ) + + items = [] + for t in tracked_rows: + items.append({ + "ip": t.ip, + "tracked_since": t.tracked_since.isoformat() if t.tracked_since else None, + "category": t.category, + "total_requests": t.total_requests or 0, + "country_code": t.country_code, + "city": t.city, + "last_seen": t.last_seen.isoformat() if t.last_seen else None, + }) + + return { + "tracked_ips": items, + "pagination": { + "page": page, + "page_size": page_size, + "total": total, + "total_pages": total_pages, + }, + } + finally: + self.close_session() + # Module-level singleton instance _db_manager = DatabaseManager() diff --git a/src/models.py b/src/models.py index d759e52..727d70a 100644 --- a/src/models.py +++ b/src/models.py @@ -244,6 +244,32 @@ class CategoryHistory(Base): return f" {self.new_category})>" +class TrackedIp(Base): + """ + Manually tracked IP addresses for monitoring. + + Stores a snapshot of essential ip_stats data at tracking time + so the tracked IPs panel never needs to query the large ip_stats table. + """ + + __tablename__ = "tracked_ips" + + ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), primary_key=True) + tracked_since: Mapped[datetime] = mapped_column( + DateTime, nullable=False, default=datetime.utcnow + ) + + # Snapshot from ip_stats at tracking time + category: Mapped[Optional[str]] = mapped_column(String(50), nullable=True) + total_requests: Mapped[int] = mapped_column(Integer, default=0, nullable=True) + country_code: Mapped[Optional[str]] = mapped_column(String(2), nullable=True) + city: Mapped[Optional[str]] = mapped_column(String(MAX_CITY_LENGTH), nullable=True) + last_seen: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) + + def __repr__(self) -> str: + return f"" + + # class IpLog(Base): # """ # Records all IPs that have accessed the honeypot, along with aggregated stats and inferred user category.