feat: implement IP tracking functionality with database integration

This commit is contained in:
Lorenzo Venerandi
2026-03-09 14:57:19 +01:00
parent 45b4f81467
commit e6eed2f647
2 changed files with 125 additions and 0 deletions

View File

@@ -34,6 +34,7 @@ from models import (
AttackDetection,
IpStats,
CategoryHistory,
TrackedIp,
)
from sanitizer import (
sanitize_ip,
@@ -2328,6 +2329,104 @@ class DatabaseManager:
finally:
self.close_session()
# ── IP Tracking ──────────────────────────────────────────────────
def track_ip(self, ip: str) -> bool:
"""Add an IP to the tracked list with a snapshot of its current stats."""
session = self.session
sanitized_ip = sanitize_ip(ip)
existing = session.query(TrackedIp).filter(TrackedIp.ip == sanitized_ip).first()
if existing:
return True # already tracked
# Snapshot essential data from ip_stats
stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
tracked = TrackedIp(
ip=sanitized_ip,
tracked_since=datetime.now(),
category=stats.category if stats else None,
total_requests=stats.total_requests if stats else 0,
country_code=stats.country_code if stats else None,
city=stats.city if stats else None,
last_seen=stats.last_seen if stats else None,
)
session.add(tracked)
try:
session.commit()
return True
except Exception as e:
session.rollback()
applogger.error(f"Error tracking IP {sanitized_ip}: {e}")
return False
def untrack_ip(self, ip: str) -> bool:
"""Remove an IP from the tracked list."""
session = self.session
sanitized_ip = sanitize_ip(ip)
tracked = session.query(TrackedIp).filter(TrackedIp.ip == sanitized_ip).first()
if not tracked:
return False
session.delete(tracked)
try:
session.commit()
return True
except Exception as e:
session.rollback()
applogger.error(f"Error untracking IP {sanitized_ip}: {e}")
return False
def is_ip_tracked(self, ip: str) -> bool:
"""Check if an IP is currently tracked."""
session = self.session
sanitized_ip = sanitize_ip(ip)
try:
return session.query(TrackedIp).filter(TrackedIp.ip == sanitized_ip).first() is not None
finally:
self.close_session()
def get_tracked_ips_paginated(
self,
page: int = 1,
page_size: int = 25,
) -> Dict[str, Any]:
"""Get all tracked IPs, paginated. Reads only from tracked_ips table."""
session = self.session
try:
total = session.query(TrackedIp).count()
total_pages = max(1, (total + page_size - 1) // page_size)
tracked_rows = (
session.query(TrackedIp)
.order_by(TrackedIp.tracked_since.desc())
.offset((page - 1) * page_size)
.limit(page_size)
.all()
)
items = []
for t in tracked_rows:
items.append({
"ip": t.ip,
"tracked_since": t.tracked_since.isoformat() if t.tracked_since else None,
"category": t.category,
"total_requests": t.total_requests or 0,
"country_code": t.country_code,
"city": t.city,
"last_seen": t.last_seen.isoformat() if t.last_seen else None,
})
return {
"tracked_ips": items,
"pagination": {
"page": page,
"page_size": page_size,
"total": total,
"total_pages": total_pages,
},
}
finally:
self.close_session()
# Module-level singleton instance
_db_manager = DatabaseManager()

View File

@@ -244,6 +244,32 @@ class CategoryHistory(Base):
return f"<CategoryHistory(ip='{self.ip}', {self.old_category} -> {self.new_category})>"
class TrackedIp(Base):
"""
Manually tracked IP addresses for monitoring.
Stores a snapshot of essential ip_stats data at tracking time
so the tracked IPs panel never needs to query the large ip_stats table.
"""
__tablename__ = "tracked_ips"
ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), primary_key=True)
tracked_since: Mapped[datetime] = mapped_column(
DateTime, nullable=False, default=datetime.utcnow
)
# Snapshot from ip_stats at tracking time
category: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
total_requests: Mapped[int] = mapped_column(Integer, default=0, nullable=True)
country_code: Mapped[Optional[str]] = mapped_column(String(2), nullable=True)
city: Mapped[Optional[str]] = mapped_column(String(MAX_CITY_LENGTH), nullable=True)
last_seen: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
def __repr__(self) -> str:
return f"<TrackedIp(ip='{self.ip}')>"
# class IpLog(Base):
# """
# Records all IPs that have accessed the honeypot, along with aggregated stats and inferred user category.