Merge pull request #31 from BlessedRebuS/fix/scoring-algorithm

fixed categorization visualization, fixed date in the dashboard, fixe…
This commit is contained in:
Phillip Tarrant
2026-01-08 13:41:54 -06:00
committed by GitHub
8 changed files with 154 additions and 58 deletions

View File

@@ -14,6 +14,7 @@ RUN pip install --no-cache-dir -r requirements.txt
COPY src/ /app/src/
COPY wordlists.json /app/
COPY entrypoint.sh /app/
COPY config.yaml /app/
RUN useradd -m -u 1000 krawl && \
mkdir -p /app/logs /app/data && \

View File

@@ -3,7 +3,7 @@
server:
port: 5000
delay: 100 # Response delay in milliseconds
timezone: null # e.g., "America/New_York" or null for system default
timezone: null # e.g., "America/New_York", "Europe/Paris" or null for system default
# manually set the server header, if null a random one will be used.
server_header: null
@@ -11,8 +11,8 @@ server:
links:
min_length: 5
max_length: 15
min_per_page: 10
max_per_page: 15
min_per_page: 5
max_per_page: 10
char_space: "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
max_counter: 10
@@ -38,9 +38,9 @@ behavior:
probability_error_codes: 0 # 0-100 percentage
analyzer:
# http_risky_methods_threshold: 0.1
# violated_robots_threshold: 0.1
# uneven_request_timing_threshold: 5
# uneven_request_timing_time_window_seconds: 300
# user_agents_used_threshold: 2
# attack_urls_threshold: 1
http_risky_methods_threshold: 0.1
violated_robots_threshold: 0.1
uneven_request_timing_threshold: 2
uneven_request_timing_time_window_seconds: 300
user_agents_used_threshold: 2
attack_urls_threshold: 1

View File

@@ -6,6 +6,7 @@ from zoneinfo import ZoneInfo
from pathlib import Path
from datetime import datetime, timedelta
import re
import urllib.parse
from wordlists import get_wordlists
from config import get_config
"""
@@ -101,6 +102,15 @@ class Analyzer:
total_accesses_count = len(accesses)
if total_accesses_count <= 0:
return
# Set category as "unknown" while fewer than 3 requests have been recorded for this IP
if total_accesses_count < 3:
category = "unknown"
analyzed_metrics = {}
category_scores = {"attacker": 0, "good_crawler": 0, "bad_crawler": 0, "regular_user": 0, "unknown": 0}
last_analysis = datetime.now(tz=ZoneInfo('UTC'))
self._db_manager.update_ip_stats_analysis(ip, analyzed_metrics, category, category_scores, last_analysis)
return 0
#--------------------- HTTP Methods ---------------------
@@ -147,7 +157,7 @@ class Analyzer:
robots_disallows.append(parts[1].strip())
# If no robots.txt rule is violated, the client is certainly a good crawler; if more than 10% of its requests violate robots.txt, it is a bad crawler or an attacker
violated_robots_count = len([item for item in accesses if item["path"].rstrip("/") in tuple(robots_disallows)])
violated_robots_count = len([item for item in accesses if any(item["path"].rstrip("/").startswith(disallow) for disallow in robots_disallows)])
#print(f"Violated robots count: {violated_robots_count}")
if total_accesses_count > 0:
violated_robots_ratio = violated_robots_count / total_accesses_count
@@ -168,7 +178,8 @@ class Analyzer:
#--------------------- Requests Timing ---------------------
#Request rate and timing: steady, throttled, polite vs attackers' bursty, aggressive, or oddly rhythmic behavior
timestamps = [datetime.fromisoformat(item["timestamp"]) for item in accesses]
timestamps = [ts for ts in timestamps if datetime.utcnow() - ts <= timedelta(seconds=uneven_request_timing_time_window_seconds)]
now_utc = datetime.now(tz=ZoneInfo('UTC'))
timestamps = [ts for ts in timestamps if now_utc - ts <= timedelta(seconds=uneven_request_timing_time_window_seconds)]
timestamps = sorted(timestamps, reverse=True)
time_diffs = []
@@ -221,13 +232,25 @@ class Analyzer:
attack_urls_found_list = []
wl = get_wordlists()
if wl.attack_urls:
if wl.attack_patterns:
queried_paths = [item["path"] for item in accesses]
for queried_path in queried_paths:
for name, pattern in wl.attack_urls.items():
if re.search(pattern, queried_path, re.IGNORECASE):
attack_urls_found_list.append(pattern)
# URL decode the path to catch encoded attacks
try:
decoded_path = urllib.parse.unquote(queried_path)
# Double decode to catch double-encoded attacks
decoded_path_twice = urllib.parse.unquote(decoded_path)
except Exception:
decoded_path = queried_path
decoded_path_twice = queried_path
for name, pattern in wl.attack_patterns.items():
# Check original, decoded, and double-decoded paths
if (re.search(pattern, queried_path, re.IGNORECASE) or
re.search(pattern, decoded_path, re.IGNORECASE) or
re.search(pattern, decoded_path_twice, re.IGNORECASE)):
attack_urls_found_list.append(f"{name}: {pattern}")
if len(attack_urls_found_list) > attack_urls_threshold:
score["attacker"]["attack_url"] = True
@@ -276,7 +299,7 @@ class Analyzer:
analyzed_metrics = {"risky_http_methods": http_method_attacker_score, "robots_violations": violated_robots_ratio, "uneven_request_timing": mean, "different_user_agents": user_agents_used, "attack_url": attack_urls_found_list}
category_scores = {"attacker": attacker_score, "good_crawler": good_crawler_score, "bad_crawler": bad_crawler_score, "regular_user": regular_user_score}
category = max(category_scores, key=category_scores.get)
last_analysis = datetime.utcnow()
last_analysis = datetime.now(tz=ZoneInfo('UTC'))
self._db_manager.update_ip_stats_analysis(ip, analyzed_metrics, category, category_scores, last_analysis)

View File

@@ -9,6 +9,7 @@ import os
import stat
from datetime import datetime
from typing import Optional, List, Dict, Any
from zoneinfo import ZoneInfo
from sqlalchemy import create_engine, func, distinct, case
from sqlalchemy.orm import sessionmaker, scoped_session, Session
@@ -127,7 +128,7 @@ class DatabaseManager:
method=method[:10],
is_suspicious=is_suspicious,
is_honeypot_trigger=is_honeypot_trigger,
timestamp=datetime.utcnow()
timestamp=datetime.now(tz=ZoneInfo('UTC'))
)
session.add(access_log)
session.flush() # Get the ID before committing
@@ -185,7 +186,7 @@ class DatabaseManager:
path=sanitize_path(path),
username=sanitize_credential(username),
password=sanitize_credential(password),
timestamp=datetime.utcnow()
timestamp=datetime.now(tz=ZoneInfo('UTC'))
)
session.add(credential)
session.commit()
@@ -207,7 +208,7 @@ class DatabaseManager:
ip: IP address to update
"""
sanitized_ip = sanitize_ip(ip)
now = datetime.utcnow()
now = datetime.now(tz=ZoneInfo('UTC'))
ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
@@ -251,6 +252,12 @@ class DatabaseManager:
ip_stats.category = category
ip_stats.category_scores = category_scores
ip_stats.last_analysis = last_analysis
try:
session.commit()
except Exception as e:
session.rollback()
print(f"Error updating IP stats analysis: {e}")
def manual_update_category(self, ip: str, category: str) -> None:
"""
@@ -268,14 +275,21 @@ class DatabaseManager:
# Record the manual category change
old_category = ip_stats.category
if old_category != category:
self._record_category_change(sanitized_ip, old_category, category, datetime.utcnow())
self._record_category_change(sanitized_ip, old_category, category, datetime.now(tz=ZoneInfo('UTC')))
ip_stats.category = category
ip_stats.manual_category = True
try:
session.commit()
except Exception as e:
session.rollback()
print(f"Error updating manual category: {e}")
def _record_category_change(self, ip: str, old_category: Optional[str], new_category: str, timestamp: datetime) -> None:
"""
Internal method to record category changes in history.
Only records if there's an actual change from a previous category.
Args:
ip: IP address
@@ -283,6 +297,11 @@ class DatabaseManager:
new_category: New category
timestamp: When the change occurred
"""
# Don't record initial categorization (when old_category is None)
# Only record actual category changes
if old_category is None:
return
session = self.session
try:
history_entry = CategoryHistory(
@@ -318,7 +337,7 @@ class DatabaseManager:
{
'old_category': h.old_category,
'new_category': h.new_category,
'timestamp': h.timestamp.isoformat()
'timestamp': h.timestamp.isoformat() + '+00:00'
}
for h in history
]
@@ -364,7 +383,7 @@ class DatabaseManager:
'method': log.method,
'is_suspicious': log.is_suspicious,
'is_honeypot_trigger': log.is_honeypot_trigger,
'timestamp': log.timestamp.isoformat(),
'timestamp': log.timestamp.isoformat() + '+00:00',
'attack_types': [d.attack_type for d in log.attack_detections]
}
for log in logs
@@ -457,7 +476,7 @@ class DatabaseManager:
'path': attempt.path,
'username': attempt.username,
'password': attempt.password,
'timestamp': attempt.timestamp.isoformat()
'timestamp': attempt.timestamp.isoformat() + '+00:00'
}
for attempt in attempts
]
@@ -484,8 +503,8 @@ class DatabaseManager:
{
'ip': s.ip,
'total_requests': s.total_requests,
'first_seen': s.first_seen.isoformat(),
'last_seen': s.last_seen.isoformat(),
'first_seen': s.first_seen.isoformat() + '+00:00',
'last_seen': s.last_seen.isoformat() + '+00:00',
'country_code': s.country_code,
'city': s.city,
'asn': s.asn,
@@ -525,8 +544,8 @@ class DatabaseManager:
return {
'ip': stat.ip,
'total_requests': stat.total_requests,
'first_seen': stat.first_seen.isoformat() if stat.first_seen else None,
'last_seen': stat.last_seen.isoformat() if stat.last_seen else None,
'first_seen': stat.first_seen.isoformat() + '+00:00' if stat.first_seen else None,
'last_seen': stat.last_seen.isoformat() + '+00:00' if stat.last_seen else None,
'country_code': stat.country_code,
'city': stat.city,
'asn': stat.asn,
@@ -537,7 +556,7 @@ class DatabaseManager:
'category': stat.category,
'category_scores': stat.category_scores or {},
'manual_category': stat.manual_category,
'last_analysis': stat.last_analysis.isoformat() if stat.last_analysis else None,
'last_analysis': stat.last_analysis.isoformat() + '+00:00' if stat.last_analysis else None,
'category_history': category_history
}
finally:
@@ -671,7 +690,7 @@ class DatabaseManager:
'ip': log.ip,
'path': log.path,
'user_agent': log.user_agent,
'timestamp': log.timestamp.isoformat()
'timestamp': log.timestamp.isoformat() + '+00:00'
}
for log in logs
]
@@ -729,7 +748,7 @@ class DatabaseManager:
'ip': log.ip,
'path': log.path,
'user_agent': log.user_agent,
'timestamp': log.timestamp.isoformat(),
'timestamp': log.timestamp.isoformat() + '+00:00',
'attack_types': [d.attack_type for d in log.attack_detections]
}
for log in logs

View File

@@ -407,7 +407,8 @@ class Handler(BaseHTTPRequestHandler):
self.end_headers()
try:
stats = self.tracker.get_stats()
self.wfile.write(generate_dashboard(stats).encode())
timezone = str(self.config.timezone) if self.config.timezone else 'UTC'
self.wfile.write(generate_dashboard(stats, timezone).encode())
except BrokenPipeError:
pass
except Exception as e:

View File

@@ -7,6 +7,7 @@ Customize this template to change the dashboard appearance.
import html
from datetime import datetime
from zoneinfo import ZoneInfo
def _escape(value) -> str:
"""Escape HTML special characters to prevent XSS attacks."""
@@ -14,18 +15,36 @@ def _escape(value) -> str:
return ""
return html.escape(str(value))
def format_timestamp(iso_timestamp: str) -> str:
"""Format ISO timestamp for display (YYYY-MM-DD HH:MM:SS)"""
def format_timestamp(iso_timestamp: str, timezone: str = 'UTC', time_only: bool = False) -> str:
"""Format ISO timestamp for display with timezone conversion
Args:
iso_timestamp: ISO format timestamp string (UTC)
timezone: IANA timezone string to convert to
time_only: If True, return only HH:MM:SS, otherwise full datetime
"""
try:
# Parse UTC timestamp
dt = datetime.fromisoformat(iso_timestamp)
# Convert to target timezone
if dt.tzinfo is not None:
dt = dt.astimezone(ZoneInfo(timezone))
if time_only:
return dt.strftime("%H:%M:%S")
return dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
# Fallback for old format
return iso_timestamp.split("T")[1][:8] if "T" in iso_timestamp else iso_timestamp
def generate_dashboard(stats: dict) -> str:
"""Generate dashboard HTML with access statistics"""
def generate_dashboard(stats: dict, timezone: str = 'UTC') -> str:
"""Generate dashboard HTML with access statistics
Args:
stats: Statistics dictionary
timezone: IANA timezone string (e.g., 'Europe/Paris', 'America/New_York')
"""
# Generate IP rows with clickable functionality for dropdown stats
top_ips_rows = '\n'.join([
@@ -62,7 +81,7 @@ def generate_dashboard(stats: dict) -> str:
<td class="ip-clickable">{_escape(log["ip"])}</td>
<td>{_escape(log["path"])}</td>
<td style="word-break: break-all;">{_escape(log["user_agent"][:60])}</td>
<td>{_escape(log["timestamp"].split("T")[1][:8])}</td>
<td>{format_timestamp(log["timestamp"], timezone, time_only=True)}</td>
</tr>
<tr class="ip-stats-row" id="stats-row-suspicious-{_escape(log["ip"]).replace(".", "-")}" style="display: none;">
<td colspan="4" class="ip-stats-cell">
@@ -98,7 +117,7 @@ def generate_dashboard(stats: dict) -> str:
<td>{_escape(log["path"])}</td>
<td>{_escape(", ".join(log["attack_types"]))}</td>
<td style="word-break: break-all;">{_escape(log["user_agent"][:60])}</td>
<td>{_escape(log["timestamp"].split("T")[1][:8])}</td>
<td>{format_timestamp(log["timestamp"], timezone, time_only=True)}</td>
</tr>
<tr class="ip-stats-row" id="stats-row-attack-{_escape(log["ip"]).replace(".", "-")}" style="display: none;">
<td colspan="5" class="ip-stats-cell">
@@ -117,7 +136,7 @@ def generate_dashboard(stats: dict) -> str:
<td>{_escape(log["username"])}</td>
<td>{_escape(log["password"])}</td>
<td>{_escape(log["path"])}</td>
<td>{_escape(log["timestamp"].split("T")[1][:8])}</td>
<td>{format_timestamp(log["timestamp"], timezone, time_only=True)}</td>
</tr>
<tr class="ip-stats-row" id="stats-row-cred-{_escape(log["ip"]).replace(".", "-")}" style="display: none;">
<td colspan="5" class="ip-stats-cell">
@@ -352,6 +371,11 @@ def generate_dashboard(stats: dict) -> str:
color: #58a6ff;
border: 1px solid #58a6ff;
}}
.category-unknown {{
background: #8b949e1a;
color: #8b949e;
border: 1px solid #8b949e;
}}
.timeline-container {{
margin-top: 15px;
padding-top: 15px;
@@ -403,6 +427,9 @@ def generate_dashboard(stats: dict) -> str:
.timeline-marker.regular-user {{
background: #58a6ff;
}}
.timeline-marker.unknown {{
background: #8b949e;
}}
.timeline-content {{
font-size: 12px;
}}
@@ -570,6 +597,30 @@ def generate_dashboard(stats: dict) -> str:
</div>
</div>
<script>
// Server timezone configuration
const SERVER_TIMEZONE = '{timezone}';
// Convert UTC timestamp to configured timezone
function formatTimestamp(isoTimestamp) {{
if (!isoTimestamp) return 'N/A';
try {{
const date = new Date(isoTimestamp);
return date.toLocaleString('en-US', {{
timeZone: SERVER_TIMEZONE,
year: 'numeric',
month: '2-digit',
day: '2-digit',
hour: '2-digit',
minute: '2-digit',
second: '2-digit',
hour12: false
}});
}} catch (err) {{
console.error('Error formatting timestamp:', err);
return new Date(isoTimestamp).toLocaleString();
}}
}}
// Add sorting functionality to tables
document.querySelectorAll('th.sortable').forEach(header => {{
header.addEventListener('click', function() {{
@@ -684,12 +735,12 @@ def generate_dashboard(stats: dict) -> str:
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">First Seen:</span>';
html += `<span class="stat-value-sm">${{stats.first_seen ? new Date(stats.first_seen).toLocaleString() : 'N/A'}}</span>`;
html += `<span class="stat-value-sm">${{formatTimestamp(stats.first_seen)}}</span>`;
html += '</div>';
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">Last Seen:</span>';
html += `<span class="stat-value-sm">${{stats.last_seen ? new Date(stats.last_seen).toLocaleString() : 'N/A'}}</span>`;
html += `<span class="stat-value-sm">${{formatTimestamp(stats.last_seen)}}</span>`;
html += '</div>';
// Category
@@ -732,7 +783,7 @@ def generate_dashboard(stats: dict) -> str:
stats.category_history.forEach((change, index) => {{
const categoryClass = change.new_category.toLowerCase().replace('_', '-');
const timestamp = new Date(change.timestamp).toLocaleString();
const timestamp = formatTimestamp(change.timestamp);
html += '<div class="timeline-item">';
html += `<div class="timeline-marker ${{categoryClass}}"></div>`;
@@ -769,7 +820,8 @@ def generate_dashboard(stats: dict) -> str:
attacker: stats.category_scores.attacker || 0,
good_crawler: stats.category_scores.good_crawler || 0,
bad_crawler: stats.category_scores.bad_crawler || 0,
regular_user: stats.category_scores.regular_user || 0
regular_user: stats.category_scores.regular_user || 0,
unknown: stats.category_scores.unknown || 0
}};
// Normalize scores for better visualization
@@ -786,14 +838,16 @@ def generate_dashboard(stats: dict) -> str:
attacker: '#f85149',
good_crawler: '#3fb950',
bad_crawler: '#f0883e',
regular_user: '#58a6ff'
regular_user: '#58a6ff',
unknown: '#8b949e'
}};
const labels = {{
attacker: 'Attacker',
good_crawler: 'Good Bot',
bad_crawler: 'Bad Bot',
regular_user: 'User'
regular_user: 'User',
unknown: 'Unknown'
}};
// Draw radar background grid
@@ -803,9 +857,9 @@ def generate_dashboard(stats: dict) -> str:
html += `<circle cx="${{cx}}" cy="${{cy}}" r="${{r}}" fill="none" stroke="#30363d" stroke-width="0.5"/>`;
}}
// Draw axes
const angles = [0, 90, 180, 270];
const keys = ['good_crawler', 'regular_user', 'bad_crawler', 'attacker'];
// Draw axes (now with 5 points for pentagon)
const angles = [0, 72, 144, 216, 288];
const keys = ['good_crawler', 'regular_user', 'unknown', 'bad_crawler', 'attacker'];
angles.forEach((angle, i) => {{
const rad = (angle - 90) * Math.PI / 180;

View File

@@ -131,7 +131,8 @@ class Wordlists:
@property
def attack_urls(self):
return self._data.get("attack_urls", [])
"""Deprecated: use attack_patterns instead. Returns attack_patterns for backward compatibility."""
return self._data.get("attack_patterns", {})
_wordlists_instance = None

View File

@@ -353,11 +353,14 @@
}
},
"attack_patterns": {
"path_traversal": "\\.\\.",
"path_traversal": "(\\.\\.|%2e%2e|%252e%252e|\\.{2,}|%c0%ae|%c1%9c)",
"sql_injection": "('|\"|`|--|#|/\\*|\\*/|\\bunion\\b|\\bunion\\s+select\\b|\\bor\\b.*=.*|\\band\\b.*=.*|'.*or.*'.*=.*'|\\bsleep\\b|\\bwaitfor\\b|\\bdelay\\b|\\bbenchmark\\b|;.*select|;.*drop|;.*insert|;.*update|;.*delete|\\bexec\\b|\\bexecute\\b|\\bxp_cmdshell\\b|information_schema|table_schema|table_name)",
"xss_attempt": "(<script|</script|javascript:|onerror=|onload=|onclick=|onmouseover=|onfocus=|onblur=|<iframe|<img|<svg|<embed|<object|<body|<input|eval\\(|alert\\(|prompt\\(|confirm\\(|document\\.|window\\.|<style|expression\\(|vbscript:|data:text/html)",
"common_probes": "(wp-admin|phpmyadmin|\\.env|\\.git|/admin|/config)",
"shell_injection": "(\\||;|`|\\$\\(|&&)"
"shell_injection": "(\\||;|`|\\$\\(|&&|\\bnc\\b|\\bnetcat\\b|\\bwget\\b|\\bcurl\\b|/bin/bash|/bin/sh|cmd\\.exe)",
"lfi_rfi": "(file://|php://|expect://|data://|zip://|phar://|/etc/passwd|/etc/shadow|/proc/self|c:\\\\windows)",
"xxe_injection": "(<!ENTITY|<!DOCTYPE|SYSTEM|PUBLIC)",
"ldap_injection": "(\\*\\)|\\(\\||\\(&)",
"command_injection": "(&&|\\|\\||;|\\$\\{|\\$\\(|`)"
},
"server_headers": [
"Apache/2.4.41 (Ubuntu)",
@@ -366,11 +369,5 @@
"cloudflare",
"AmazonS3",
"gunicorn/20.1.0"
],
"attack_urls": {
"path_traversal": "\\.\\.",
"sql_injection": "('|--|;|\bOR\b|\bUNION\b|\bSELECT\b|\bDROP\b)",
"xss_attempt": "(<script|javascript:|onerror=|onload=)",
"shell_injection": "(\\||;|`|\\$\\(|&&)"
}
]
}