added categorization visualization and itmeline

This commit is contained in:
Patrick Di Fazio
2026-01-07 18:24:43 +01:00
parent 02aed9e65a
commit 7690841029
4 changed files with 240 additions and 8 deletions

View File

@@ -13,7 +13,7 @@ from typing import Optional, List, Dict, Any
from sqlalchemy import create_engine, func, distinct, case
from sqlalchemy.orm import sessionmaker, scoped_session, Session
from models import Base, AccessLog, CredentialAttempt, AttackDetection, IpStats
from models import Base, AccessLog, CredentialAttempt, AttackDetection, IpStats, CategoryHistory
from sanitizer import (
sanitize_ip,
sanitize_path,
@@ -226,6 +226,7 @@ class DatabaseManager:
def update_ip_stats_analysis(self, ip: str, analyzed_metrics: Dict[str, object], category: str, category_scores: Dict[str, int], last_analysis: datetime) -> None:
"""
Update IP statistics (ip is already persisted).
Records category change in history if category has changed.
Args:
ip: IP address to update
@@ -241,6 +242,11 @@ class DatabaseManager:
sanitized_ip = sanitize_ip(ip)
ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
# Check if category has changed and record it
old_category = ip_stats.category
if old_category != category:
self._record_category_change(sanitized_ip, old_category, category, last_analysis)
ip_stats.analyzed_metrics = analyzed_metrics
ip_stats.category = category
ip_stats.category_scores = category_scores
@@ -259,9 +265,66 @@ class DatabaseManager:
sanitized_ip = sanitize_ip(ip)
ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
# Record the manual category change
old_category = ip_stats.category
if old_category != category:
self._record_category_change(sanitized_ip, old_category, category, datetime.utcnow())
ip_stats.category = category
ip_stats.manual_category = True
def _record_category_change(self, ip: str, old_category: Optional[str], new_category: str, timestamp: datetime) -> None:
"""
Internal method to record category changes in history.
Args:
ip: IP address
old_category: Previous category (None if first categorization)
new_category: New category
timestamp: When the change occurred
"""
session = self.session
try:
history_entry = CategoryHistory(
ip=ip,
old_category=old_category,
new_category=new_category,
timestamp=timestamp
)
session.add(history_entry)
session.commit()
except Exception as e:
session.rollback()
print(f"Error recording category change: {e}")
def get_category_history(self, ip: str) -> List[Dict[str, Any]]:
"""
Retrieve category change history for a specific IP.
Args:
ip: IP address to get history for
Returns:
List of category change records ordered by timestamp
"""
session = self.session
try:
sanitized_ip = sanitize_ip(ip)
history = session.query(CategoryHistory).filter(
CategoryHistory.ip == sanitized_ip
).order_by(CategoryHistory.timestamp.asc()).all()
return [
{
'old_category': h.old_category,
'new_category': h.new_category,
'timestamp': h.timestamp.isoformat()
}
for h in history
]
finally:
self.close_session()
def get_access_logs(
self,
limit: int = 100,
@@ -456,6 +519,9 @@ class DatabaseManager:
if not stat:
return None
# Get category history for this IP
category_history = self.get_category_history(ip)
return {
'ip': stat.ip,
'total_requests': stat.total_requests,
@@ -471,7 +537,8 @@ class DatabaseManager:
'category': stat.category,
'category_scores': stat.category_scores or {},
'manual_category': stat.manual_category,
'last_analysis': stat.last_analysis.isoformat() if stat.last_analysis else None
'last_analysis': stat.last_analysis.isoformat() if stat.last_analysis else None,
'category_history': category_history
}
finally:
self.close_session()