added scoring system + db model modifications

This commit is contained in:
Leonardo Bambini
2026-01-04 19:12:23 +01:00
parent 5a00e374e6
commit 48f38cb28e
7 changed files with 484 additions and 4 deletions

290
src/analyzer.py Normal file
View File

@@ -0,0 +1,290 @@
#!/usr/bin/env python3
from sqlalchemy import select
from typing import Optional
from database import get_database, DatabaseManager
from zoneinfo import ZoneInfo
from pathlib import Path
from datetime import datetime, timedelta
import re
from wordlists import get_wordlists
"""
Functions for user activity analysis
"""
class Analyzer:
    """
    Analyzes a single IP's recent activity and infers a user category.

    Each IP is scored against four candidate categories (attacker,
    good_crawler, bad_crawler, regular_user): behavioral signals are
    evaluated as boolean flags, then combined with per-category weights,
    and the highest-scoring category wins.
    """

    # Behavioral signals evaluated for every category.
    FEATURES = (
        "risky_http_methods",
        "robots_violations",
        "uneven_request_timing",
        "different_user_agents",
        "attack_url",
    )

    CATEGORIES = ("attacker", "good_crawler", "bad_crawler", "regular_user")

    # Weight bands: 1-3 low, 4-6 mid, 7-9 high, 10-20 extreme.
    WEIGHTS = {
        "attacker": {
            "risky_http_methods": 6,
            "robots_violations": 4,
            "uneven_request_timing": 5,
            "different_user_agents": 8,
            "attack_url": 15,
        },
        "good_crawler": {
            "risky_http_methods": 0,
            "robots_violations": 0,
            "uneven_request_timing": 0,
            "different_user_agents": 0,
            "attack_url": 0,
        },
        "bad_crawler": {
            "risky_http_methods": 2,
            "robots_violations": 4,
            "uneven_request_timing": 0,
            "different_user_agents": 5,
            "attack_url": 5,
        },
        "regular_user": {
            "risky_http_methods": 0,
            "robots_violations": 0,
            "uneven_request_timing": 8,
            "different_user_agents": 3,
            "attack_url": 0,
        },
    }

    # State-changing / probing HTTP methods; GET and HEAD are considered benign.
    RISKY_METHODS = frozenset({"POST", "PUT", "DELETE", "OPTIONS", "PATCH"})

    def __init__(self, db_manager: Optional[DatabaseManager] = None, timezone: Optional[ZoneInfo] = None):
        """
        Initialize the analyzer.

        Args:
            db_manager: Optional DatabaseManager for persistence.
                If None, the global singleton is resolved lazily.
            timezone: Timezone used for reporting; defaults to UTC.
        """
        self.timezone = timezone or ZoneInfo('UTC')
        # Database manager for persistence (lazily initialized)
        self._db_manager = db_manager

    @property
    def db(self) -> Optional[DatabaseManager]:
        """
        Get the database manager, lazily initializing if needed.

        Returns:
            DatabaseManager instance or None if not available
        """
        if self._db_manager is None:
            try:
                self._db_manager = get_database()
            except Exception:
                # Database not initialized, persistence disabled
                pass
        return self._db_manager

    def infer_user_category(self, ip: str) -> Optional[str]:
        """
        Analyze recent accesses from *ip*, persist and return the inferred category.

        Args:
            ip: IP address to analyze.

        Returns:
            The inferred category name, or None when no database or no
            access records are available.
        """
        db = self.db
        if db is None:
            # No persistence layer: nothing to read from or write to.
            return None
        accesses = db.get_access_logs(ip_filter=ip, limit=1000)
        if not accesses:
            return None

        # One boolean flag per (category, feature); all start False.
        score = {cat: dict.fromkeys(self.FEATURES, False) for cat in self.CATEGORIES}
        metrics = {}
        self._score_http_methods(accesses, score, metrics)
        self._score_robots(accesses, score, metrics)
        self._score_timing(accesses, score, metrics)
        self._score_user_agents(accesses, score, metrics)
        self._score_attack_urls(accesses, score, metrics)

        # Sum the weights of every flagged feature for each category.
        category_scores = {
            cat: sum(self.WEIGHTS[cat][f] for f in self.FEATURES if score[cat][f])
            for cat in self.CATEGORIES
        }
        category = max(category_scores, key=category_scores.get)
        # Naive UTC timestamp, consistent with the rest of the persistence layer.
        last_analysis = datetime.utcnow()
        db.update_ip_stats_analysis(ip, metrics, category, category_scores, last_analysis)
        return category

    def _score_http_methods(self, accesses, score, metrics) -> None:
        """Flag clients whose share of state-changing HTTP methods exceeds 20%."""
        risky = sum(1 for item in accesses if item["method"] in self.RISKY_METHODS)
        ratio = risky / len(accesses)
        metrics["risky_http_methods"] = ratio
        if ratio > 0.2:
            score["attacker"]["risky_http_methods"] = True
            score["bad_crawler"]["risky_http_methods"] = True

    def _load_robots_disallows(self) -> list:
        """Parse the Disallow paths from the honeypot's served robots.txt."""
        robots_path = Path(__file__).parent / "templates" / "html" / "robots.txt"
        disallows = []
        try:
            with open(robots_path, "r") as f:
                for raw in f:
                    line = raw.strip()
                    if not line:
                        continue
                    # partition() is safe on lines without a colon (no IndexError).
                    key, _, value = line.partition(":")
                    if key == "Disallow":
                        # Normalize the same way the access paths are compared.
                        disallows.append(value.strip().rstrip("/"))
        except OSError:
            # A missing robots.txt simply means there are no disallow rules.
            pass
        return disallows

    def _score_robots(self, accesses, score, metrics) -> None:
        """Flag clients hitting robots.txt-disallowed paths on more than 10% of requests."""
        disallows = set(self._load_robots_disallows())
        violations = sum(1 for item in accesses if item["path"].rstrip("/") in disallows)
        ratio = violations / len(accesses)
        metrics["robots_violations"] = ratio
        if ratio > 0.10:
            score["attacker"]["robots_violations"] = True
            score["bad_crawler"]["robots_violations"] = True

    def _score_timing(self, accesses, score, metrics) -> None:
        """Score request pacing from inter-arrival gaps over the last 5 minutes."""
        now = datetime.utcnow()  # naive UTC to match the stored ISO timestamps
        recent = sorted(
            (
                ts
                for ts in (datetime.fromisoformat(item["timestamp"]) for item in accesses)
                if now - ts <= timedelta(minutes=5)
            ),
            reverse=True,
        )
        # Newest-first list: consecutive differences are the gaps between requests.
        gaps = [(a - b).total_seconds() for a, b in zip(recent, recent[1:])]
        mean = sum(gaps) / len(gaps) if gaps else 0
        metrics["uneven_request_timing"] = mean
        if mean > 4:
            # Slow / irregular pacing: humans, but also patient attackers.
            score["attacker"]["uneven_request_timing"] = True
            score["regular_user"]["uneven_request_timing"] = True
        else:
            # Rapid, machine-like pacing: automated clients.
            score["attacker"]["uneven_request_timing"] = True
            score["bad_crawler"]["uneven_request_timing"] = True

    def _score_user_agents(self, accesses, score, metrics) -> None:
        """Flag clients rotating through more than four distinct User-Agent strings."""
        # dict.fromkeys deduplicates while preserving first-seen order.
        agents = list(dict.fromkeys(item["user_agent"] for item in accesses))
        metrics["different_user_agents"] = agents
        if len(agents) > 4:
            score["attacker"]["different_user_agents"] = True
            score["bad_crawler"]["different_user_agents"] = True

    def _score_attack_urls(self, accesses, score, metrics) -> None:
        """Flag clients whose requested paths match known attack-signature regexes."""
        patterns = get_wordlists().attack_urls
        found = bool(patterns) and any(
            re.search(pattern, item["path"], re.IGNORECASE)
            for item in accesses
            for pattern in patterns.values()
        )
        metrics["attack_url"] = found
        if found:
            score["attacker"]["attack_url"] = True

View File

@@ -223,6 +223,45 @@ class DatabaseManager:
)
session.add(ip_stats)
def update_ip_stats_analysis(self, ip: str, analyzed_metrics: Dict[str, object], category: str, category_scores: Dict[str, int], last_analysis: datetime) -> None:
    """
    Persist the analyzer's results for an already-tracked IP.

    Args:
        ip: IP address to update
        analyzed_metrics: metric values analyzed by the analyzer
        category: inferred category
        category_scores: per-category scores backing the inference
        last_analysis: timestamp of last analysis
    """
    session = self.session
    sanitized_ip = sanitize_ip(ip)
    try:
        ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
        if ip_stats is None:
            # No stats row yet for this IP; nothing to update.
            return
        ip_stats.analyzed_metrics = analyzed_metrics
        ip_stats.category = category
        ip_stats.category_scores = category_scores
        ip_stats.last_analysis = last_analysis
        session.commit()
    except Exception as e:
        session.rollback()
        # Log but don't crash - persistence is secondary to honeypot function
        print(f"Database error updating IP analysis: {e}")
    finally:
        self.close_session()
def manual_update_category(self, ip: str, category: str) -> None:
    """
    Update IP category as a result of a manual intervention by an admin.

    Args:
        ip: IP address to update
        category: selected category
    """
    session = self.session
    # Fix: sanitized_ip was referenced below but never assigned (NameError).
    sanitized_ip = sanitize_ip(ip)
    try:
        ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
        if ip_stats is None:
            # No stats row yet for this IP; nothing to update.
            return
        ip_stats.category = category
        # Mark the category as pinned by an admin so the analyzer result
        # can be distinguished from a manual override.
        ip_stats.manual_category = True
        session.commit()
    except Exception as e:
        session.rollback()
        # Log but don't crash - persistence is secondary to honeypot function
        print(f"Database error updating IP category: {e}")
    finally:
        self.close_session()
def get_access_logs(
self,
limit: int = 100,
@@ -270,6 +309,56 @@ class DatabaseManager:
finally:
self.close_session()
# def persist_ip(
# self,
# ip: str
# ) -> Optional[int]:
# """
# Persist an ip entry to the database.
# Args:
# ip: Client IP address
# Returns:
# The ID of the created IpLog record, or None on error
# """
# session = self.session
# try:
# # Create access log with sanitized fields
# ip_log = AccessLog(
# ip=sanitize_ip(ip),
# manual_category = False
# )
# session.add(access_log)
# session.flush() # Get the ID before committing
# # Add attack detections if any
# if attack_types:
# matched_patterns = matched_patterns or {}
# for attack_type in attack_types:
# detection = AttackDetection(
# access_log_id=access_log.id,
# attack_type=attack_type[:50],
# matched_pattern=sanitize_attack_pattern(
# matched_patterns.get(attack_type, "")
# )
# )
# session.add(detection)
# # Update IP stats
# self._update_ip_stats(session, ip)
# session.commit()
# return access_log.id
# except Exception as e:
# session.rollback()
# # Log error but don't crash - database persistence is secondary to honeypot function
# print(f"Database error persisting access: {e}")
# return None
# finally:
# self.close_session()
def get_credential_attempts(
self,
limit: int = 100,
@@ -339,7 +428,11 @@ class DatabaseManager:
'asn': s.asn,
'asn_org': s.asn_org,
'reputation_score': s.reputation_score,
'reputation_source': s.reputation_source
'reputation_source': s.reputation_source,
'analyzed_metrics': s.analyzed_metrics,
'category': s.category,
'manual_category': s.manual_category,
'last_analysis': s.last_analysis
}
for s in stats
]
@@ -540,6 +633,47 @@ class DatabaseManager:
finally:
self.close_session()
# def get_ip_logs(
# self,
# limit: int = 100,
# offset: int = 0,
# ip_filter: Optional[str] = None
# ) -> List[Dict[str, Any]]:
# """
# Retrieve ip logs with optional filtering.
# Args:
# limit: Maximum number of records to return
# offset: Number of records to skip
# ip_filter: Filter by IP address
# Returns:
# List of ip log dictionaries
# """
# session = self.session
# try:
# query = session.query(IpLog).order_by(IpLog.last_access.desc())
# if ip_filter:
# query = query.filter(IpLog.ip == sanitize_ip(ip_filter))
# logs = query.offset(offset).limit(limit).all()
# return [
# {
# 'id': log.id,
# 'ip': log.ip,
# 'stats': log.stats,
# 'category': log.category,
# 'manual_category': log.manual_category,
# 'last_evaluation': log.last_evaluation,
# 'last_access': log.last_access
# }
# for log in logs
# ]
# finally:
# self.close_session()
# Module-level singleton instance
_db_manager = DatabaseManager()

View File

@@ -9,6 +9,7 @@ from typing import Optional, List
from config import Config
from tracker import AccessTracker
from analyzer import Analyzer
from templates import html_templates
from templates.dashboard_template import generate_dashboard
from generators import (
@@ -23,6 +24,7 @@ class Handler(BaseHTTPRequestHandler):
webpages: Optional[List[str]] = None
config: Config = None
tracker: AccessTracker = None
analyzer: Analyzer = None
counter: int = 0
app_logger: logging.Logger = None
access_logger: logging.Logger = None
@@ -348,6 +350,8 @@ class Handler(BaseHTTPRequestHandler):
return
self.tracker.record_access(client_ip, self.path, user_agent, method='GET')
self.analyzer.infer_user_category(client_ip)
if self.tracker.is_suspicious_user_agent(user_agent):
self.access_logger.warning(f"[SUSPICIOUS] {client_ip} - {user_agent[:50]} - {self.path}")

View File

@@ -6,9 +6,9 @@ Stores access logs, credential attempts, attack detections, and IP statistics.
"""
from datetime import datetime
from typing import Optional, List
from typing import Optional, List, Dict
from sqlalchemy import String, Integer, Boolean, DateTime, ForeignKey, Index
from sqlalchemy import String, Integer, Boolean, DateTime, ForeignKey, Index, JSON
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sanitizer import (
@@ -38,6 +38,7 @@ class AccessLog(Base):
__tablename__ = 'access_logs'
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
#ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True, ForeignKey('ip_logs.id', ondelete='CASCADE'))
ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True)
path: Mapped[str] = mapped_column(String(MAX_PATH_LENGTH), nullable=False)
user_agent: Mapped[Optional[str]] = mapped_column(String(MAX_USER_AGENT_LENGTH), nullable=True)
@@ -139,5 +140,43 @@ class IpStats(Base):
reputation_source: Mapped[Optional[str]] = mapped_column(String(MAX_REPUTATION_SOURCE_LENGTH), nullable=True)
reputation_updated: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
#Analyzed metrics, category and category scores
analyzed_metrics: Mapped[Dict[str,object]] = mapped_column(JSON, nullable=True)
category: Mapped[str] = mapped_column(String, nullable=True)
category_scores: Mapped[Dict[str,int]] = mapped_column(JSON, nullable=True)
manual_category: Mapped[bool] = mapped_column(Boolean, default=False, nullable=True)
last_analysis: Mapped[datetime] = mapped_column(DateTime, nullable=True)
def __repr__(self) -> str:
    """Concise, log-friendly summary of this IP's stats row."""
    return "<IpStats(ip='{}', total_requests={})>".format(self.ip, self.total_requests)
# class IpLog(Base):
# """
# Records all IPs that have accessed the honeypot, along with aggregated stats and inferred user category.
# """
# __tablename__ = 'ip_logs'
# id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
# ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True)
# stats: Mapped[List[str]] = mapped_column(String(MAX_PATH_LENGTH))
# category: Mapped[str] = mapped_column(String(15))
# manual_category: Mapped[bool] = mapped_column(Boolean, default=False)
# last_analysis: Mapped[datetime] = mapped_column(DateTime, index=True),
# # Relationship to attack detections
# access_logs: Mapped[List["AccessLog"]] = relationship(
# "AccessLog",
# back_populates="ip",
# cascade="all, delete-orphan"
# )
# # Indexes for common queries
# __table_args__ = (
# Index('ix_access_logs_ip_timestamp', 'ip', 'timestamp'),
# Index('ix_access_logs_is_suspicious', 'is_suspicious'),
# Index('ix_access_logs_is_honeypot_trigger', 'is_honeypot_trigger'),
# )
# def __repr__(self) -> str:
# return f"<AccessLog(id={self.id}, ip='{self.ip}', path='{self.path[:50]}')>"

View File

@@ -10,6 +10,7 @@ from http.server import HTTPServer
from config import Config
from tracker import AccessTracker
from analyzer import Analyzer
from handler import Handler
from logger import initialize_logging, get_app_logger, get_access_logger, get_credential_logger
from database import initialize_database
@@ -67,9 +68,11 @@ def main():
app_logger.warning(f'Database initialization failed: {e}. Continuing with in-memory only.')
tracker = AccessTracker(timezone=tz)
analyzer = Analyzer(timezone=tz)
Handler.config = config
Handler.tracker = tracker
Handler.analyzer = analyzer
Handler.counter = config.canary_token_tries
Handler.app_logger = app_logger
Handler.access_logger = access_logger

View File

@@ -116,6 +116,10 @@ class Wordlists:
@property
def server_headers(self):
return self._data.get("server_headers", [])
@property
def attack_urls(self):
    """
    Mapping of attack-signature name -> regex pattern string.

    Returns:
        The "attack_urls" dict from the wordlist data, or an empty dict
        when absent. The default must be a dict (not a list, as the other
        list-valued properties use) because callers iterate ``.items()``.
    """
    return self._data.get("attack_urls", {})
_wordlists_instance = None

View File

@@ -201,5 +201,11 @@
"cloudflare",
"AmazonS3",
"gunicorn/20.1.0"
]
],
"attack_urls": {
"path_traversal": "\\.\\.",
"sql_injection": "('|--|;|\\bOR\\b|\\bUNION\\b|\\bSELECT\\b|\\bDROP\\b)",
"xss_attempt": "(<script|javascript:|onerror=|onload=)",
"shell_injection": "(\\||;|`|\\$\\(|&&)"
}
}