Merge branch 'dev' into feat/background-tasks

This commit is contained in:
Phillip Tarrant
2026-01-07 11:51:27 -06:00
13 changed files with 1405 additions and 120 deletions

View File

@@ -6,7 +6,7 @@ server:
timezone: null # e.g., "America/New_York" or null for system default
# manually set the server header, if null a random one will be used.
server_header: "Apache/2.2.22 (Ubuntu)"
server_header: null
links:
min_length: 5
@@ -35,4 +35,12 @@ database:
retention_days: 30
behavior:
probability_error_codes: 0 # 0-100 percentage
probability_error_codes: 0 # 0-100 percentage
analyzer:
# http_risky_methods_threshold: 0.1
# violated_robots_threshold: 0.1
# uneven_request_timing_threshold: 0.5 # coefficient of variation of the gaps between requests
# uneven_request_timing_time_window_seconds: 300
# user_agents_used_threshold: 2
# attack_urls_threshold: 1

283
src/analyzer.py Normal file
View File

@@ -0,0 +1,283 @@
#!/usr/bin/env python3
from sqlalchemy import select
from typing import Optional
from database import get_database, DatabaseManager
from zoneinfo import ZoneInfo
from pathlib import Path
from datetime import datetime, timedelta
import re
from wordlists import get_wordlists
from config import get_config
"""
Functions for user activity analysis
"""
class Analyzer:
    """
    Analyzes users activity and produces aggregated insights.

    For a given IP the analyzer evaluates five boolean signals over its most
    recent access logs, turns them into weighted scores for each category
    (attacker, good crawler, bad crawler, regular user) and persists the
    best-scoring category through the database manager.
    """

    # Category order matters: ``max`` keeps the first entry on a score tie.
    _CATEGORIES = ("attacker", "good_crawler", "bad_crawler", "regular_user")

    _SIGNALS = (
        "risky_http_methods",
        "robots_violations",
        "uneven_request_timing",
        "different_user_agents",
        "attack_url",
    )

    # HTTP methods counted as "risky" (GET and HEAD are not).
    _RISKY_HTTP_METHODS = ("POST", "PUT", "DELETE", "OPTIONS", "PATCH")

    # 1-3 low, 4-6 mid, 7-9 high, 10-20 extreme
    _WEIGHTS = {
        "attacker": {
            "risky_http_methods": 6,
            "robots_violations": 4,
            "uneven_request_timing": 3,
            "different_user_agents": 8,
            "attack_url": 15,
        },
        "good_crawler": {
            "risky_http_methods": 1,
            "robots_violations": 0,
            "uneven_request_timing": 0,
            "different_user_agents": 0,
            "attack_url": 0,
        },
        "bad_crawler": {
            "risky_http_methods": 2,
            "robots_violations": 7,
            "uneven_request_timing": 0,
            "different_user_agents": 5,
            "attack_url": 5,
        },
        "regular_user": {
            "risky_http_methods": 0,
            "robots_violations": 0,
            "uneven_request_timing": 8,
            "different_user_agents": 3,
            "attack_url": 0,
        },
    }

    # Categories credited when a signal fires.
    _ON_TRIGGER = {
        "risky_http_methods": ("attacker", "bad_crawler"),
        "robots_violations": ("attacker", "bad_crawler"),
        "uneven_request_timing": ("attacker", "regular_user"),
        "different_user_agents": ("attacker", "bad_crawler"),
        "attack_url": ("attacker",),
    }

    # Categories credited when a signal does NOT fire (only the absence of
    # risky HTTP methods counts in favour of a good crawler).
    _ON_CLEAR = {
        "risky_http_methods": ("good_crawler",),
    }

    def __init__(self, db_manager: Optional["DatabaseManager"] = None, timezone: Optional[ZoneInfo] = None):
        """
        Initialize the analyzer.

        Args:
            db_manager: Optional DatabaseManager for persistence.
                        If None, will use the global singleton.
            timezone: Optional timezone; defaults to UTC when not given.
        """
        self.timezone = timezone or ZoneInfo('UTC')

        # Database manager for persistence (lazily initialized)
        self._db_manager = db_manager

    @property
    def db(self) -> Optional["DatabaseManager"]:
        """
        Get the database manager, lazily initializing if needed.

        Returns:
            DatabaseManager instance or None if not available
        """
        if self._db_manager is None:
            try:
                self._db_manager = get_database()
            except Exception:
                # Deliberate best-effort: database not initialized,
                # persistence (and therefore analysis) is disabled.
                pass
        return self._db_manager

    @staticmethod
    def _parse_robots_disallows(lines) -> list:
        """
        Extract the ``Disallow`` paths from robots.txt lines.

        Trailing slashes are removed so paths compare equal to request paths
        that were normalised the same way. Lines without a colon and empty
        ``Disallow:`` directives (which allow everything) are skipped.
        """
        disallows = []
        for raw_line in lines:
            directive, separator, value = raw_line.strip().partition(":")
            if not separator or directive.strip() != "Disallow":
                continue
            value = value.strip()
            if not value:
                continue
            disallows.append(value.rstrip("/"))
        return disallows

    @staticmethod
    def _as_naive_utc(ts: datetime) -> datetime:
        """Return *ts* as a naive UTC datetime so it can be compared with ``datetime.utcnow()``."""
        if ts.tzinfo is None:
            return ts
        offset = ts.utcoffset()
        if offset is None:
            return ts.replace(tzinfo=None)
        return (ts - offset).replace(tzinfo=None)

    @staticmethod
    def _coefficient_of_variation(time_diffs) -> tuple:
        """
        Return ``(mean, variance, std, cv)`` of the gaps between requests.

        All values are 0 for an empty input; ``cv`` is 0 when the mean is 0
        (e.g. every request shares the same timestamp) instead of dividing by zero.
        """
        if not time_diffs:
            return 0, 0, 0, 0
        mean = sum(time_diffs) / len(time_diffs)
        variance = sum((x - mean) ** 2 for x in time_diffs) / len(time_diffs)
        std = variance ** 0.5
        cv = std / mean if mean else 0
        return mean, variance, std, cv

    @staticmethod
    def _build_flags(triggered: dict) -> dict:
        """Map each signal's triggered state to a per-category boolean flag table."""
        flags = {}
        for category in Analyzer._CATEGORIES:
            flags[category] = {}
            for signal in Analyzer._SIGNALS:
                if triggered[signal]:
                    credited = Analyzer._ON_TRIGGER[signal]
                else:
                    credited = Analyzer._ON_CLEAR.get(signal, ())
                flags[category][signal] = category in credited
        return flags

    @staticmethod
    def _weighted_scores(flags: dict) -> dict:
        """Sum the weights of every raised flag, per category (in category order)."""
        scores = {}
        for category in Analyzer._CATEGORIES:
            scores[category] = sum(
                Analyzer._WEIGHTS[category][signal]
                for signal in Analyzer._SIGNALS
                if flags[category][signal]
            )
        return scores

    def infer_user_category(self, ip: str) -> Optional[str]:
        """
        Analyze the recent accesses of *ip* and persist its inferred category.

        Args:
            ip: IP address whose access logs are analyzed (up to 1000 entries).

        Returns:
            The inferred category name, or None when the database is
            unavailable or the IP has no recorded accesses.
        """
        config = get_config()
        http_risky_methods_threshold = config.http_risky_methods_threshold
        violated_robots_threshold = config.violated_robots_threshold
        uneven_request_timing_threshold = config.uneven_request_timing_threshold
        user_agents_used_threshold = config.user_agents_used_threshold
        attack_urls_threshold = config.attack_urls_threshold
        uneven_request_timing_time_window_seconds = config.uneven_request_timing_time_window_seconds

        # NOTE(review): debug output on stdout; consider routing through the app logger.
        print(f"http_risky_methods_threshold: {http_risky_methods_threshold}")

        db = self.db
        if db is None:
            # Persistence disabled: nothing to read from and nowhere to store the result.
            return None

        accesses = db.get_access_logs(ip_filter=ip, limit=1000)
        total_accesses_count = len(accesses)
        if total_accesses_count <= 0:
            return None

        triggered = {}

        #--------------------- HTTP Methods ---------------------
        risky_accesses_count = sum(
            1 for item in accesses if item["method"] in self._RISKY_HTTP_METHODS
        )
        # total_accesses_count is > 0 here, so the ratio is always defined.
        http_method_attacker_score = risky_accesses_count / total_accesses_count
        triggered["risky_http_methods"] = http_method_attacker_score >= http_risky_methods_threshold

        #--------------------- Robots Violations ---------------------
        # Share of requests that hit a path listed under Disallow in robots.txt.
        robots_path = Path(__file__).parent / "templates" / "html" / "robots.txt"
        try:
            with open(robots_path, "r") as f:
                robots_disallows = self._parse_robots_disallows(f)
        except OSError:
            # Without a readable robots.txt no path can count as a violation.
            robots_disallows = []

        disallowed_paths = set(robots_disallows)
        violated_robots_count = sum(
            1 for item in accesses if item["path"].rstrip("/") in disallowed_paths
        )
        violated_robots_ratio = violated_robots_count / total_accesses_count
        triggered["robots_violations"] = violated_robots_ratio >= violated_robots_threshold

        #--------------------- Requests Timing ---------------------
        # Request rate and timing: steady, throttled, polite vs attackers'
        # bursty, aggressive, or oddly rhythmic behavior.
        now = datetime.utcnow()
        window = timedelta(seconds=uneven_request_timing_time_window_seconds)
        timestamps = [
            self._as_naive_utc(datetime.fromisoformat(item["timestamp"]))
            for item in accesses
        ]
        timestamps = sorted((ts for ts in timestamps if now - ts <= window), reverse=True)
        time_diffs = [
            (newer - older).total_seconds()
            for newer, older in zip(timestamps, timestamps[1:])
        ]
        mean, variance, std, cv = self._coefficient_of_variation(time_diffs)
        print(f"Mean: {mean} - Variance {variance} - Standard Deviation {std} - Coefficient of Variation: {cv}")
        triggered["uneven_request_timing"] = cv >= uneven_request_timing_threshold

        #--------------------- Different User Agents ---------------------
        # Header quality and consistency: crawlers tend to use complete and
        # consistent headers, attackers might miss, fake, or change headers.
        user_agents_used = list(dict.fromkeys(item["user_agent"] for item in accesses))
        triggered["different_user_agents"] = len(user_agents_used) >= user_agents_used_threshold

        #--------------------- Attack URLs ---------------------
        attack_urls_found_list = []
        wl = get_wordlists()
        if wl.attack_urls:
            for item in accesses:
                for pattern in wl.attack_urls.values():
                    if re.search(pattern, item["path"], re.IGNORECASE):
                        attack_urls_found_list.append(pattern)
        # NOTE(review): strictly greater-than, unlike the other thresholds - confirm intent.
        triggered["attack_url"] = len(attack_urls_found_list) > attack_urls_threshold

        #--------------------- Calculate score ---------------------
        score = self._build_flags(triggered)
        category_scores = self._weighted_scores(score)

        print(f"Attacker score: {category_scores['attacker']}")
        print(f"Good Crawler score: {category_scores['good_crawler']}")
        print(f"Bad Crawler score: {category_scores['bad_crawler']}")
        print(f"Regular User score: {category_scores['regular_user']}")

        # NOTE(review): "uneven_request_timing" stores the mean gap, while the
        # signal itself is decided on the coefficient of variation - confirm.
        analyzed_metrics = {
            "risky_http_methods": http_method_attacker_score,
            "robots_violations": violated_robots_ratio,
            "uneven_request_timing": mean,
            "different_user_agents": user_agents_used,
            "attack_url": attack_urls_found_list,
        }
        category = max(category_scores, key=category_scores.get)
        last_analysis = datetime.utcnow()

        db.update_ip_stats_analysis(ip, analyzed_metrics, category, category_scores, last_analysis)

        return category

View File

@@ -34,6 +34,14 @@ class Config:
database_retention_days: int = 30
timezone: str = None # IANA timezone (e.g., 'America/New_York', 'Europe/Rome')
# Analyzer settings
http_risky_methods_threshold: float = None
violated_robots_threshold: float = None
uneven_request_timing_threshold: float = None
uneven_request_timing_time_window_seconds: float = None
user_agents_used_threshold: float = None
attack_urls_threshold: float = None
@staticmethod
# Try to fetch timezone before if not set
def get_system_timezone() -> str:
@@ -95,6 +103,7 @@ class Config:
api = data.get('api', {})
database = data.get('database', {})
behavior = data.get('behavior', {})
analyzer = data.get('analyzer') or {}
# Handle dashboard_secret_path - auto-generate if null/not set
dashboard_path = dashboard.get('secret_path')
@@ -129,6 +138,12 @@ class Config:
probability_error_codes=behavior.get('probability_error_codes', 0),
database_path=database.get('path', 'data/krawl.db'),
database_retention_days=database.get('retention_days', 30),
http_risky_methods_threshold=analyzer.get('http_risky_methods_threshold', 0.1),
violated_robots_threshold=analyzer.get('violated_robots_threshold', 0.1),
uneven_request_timing_threshold=analyzer.get('uneven_request_timing_threshold', 0.5), # coefficient of variation
uneven_request_timing_time_window_seconds=analyzer.get('uneven_request_timing_time_window_seconds', 300),
user_agents_used_threshold=analyzer.get('user_agents_used_threshold', 2),
attack_urls_threshold=analyzer.get('attack_urls_threshold', 1)
)

View File

@@ -13,7 +13,7 @@ from typing import Optional, List, Dict, Any
from sqlalchemy import create_engine, func, distinct, case
from sqlalchemy.orm import sessionmaker, scoped_session, Session
from models import Base, AccessLog, CredentialAttempt, AttackDetection, IpStats
from models import Base, AccessLog, CredentialAttempt, AttackDetection, IpStats, CategoryHistory
from sanitizer import (
sanitize_ip,
sanitize_path,
@@ -223,6 +223,108 @@ class DatabaseManager:
)
session.add(ip_stats)
def update_ip_stats_analysis(self, ip: str, analyzed_metrics: Dict[str, object], category: str, category_scores: Dict[str, int], last_analysis: datetime) -> None:
    """
    Update the analysis fields of an IP's statistics row.

    Records the category change in history if the category has changed,
    then stores the new metrics, category, scores and analysis timestamp
    and commits them. If the IP has no statistics row yet, nothing is
    written.

    Args:
        ip: IP address to update
        analyzed_metrics: metric values analyzed by the analyzer
        category: inferred category
        category_scores: inferred category scores
        last_analysis: timestamp of last analysis
    """
    print(f"Analyzed metrics {analyzed_metrics}, category {category}, category scores {category_scores}, last analysis {last_analysis}")

    session = self.session
    sanitized_ip = sanitize_ip(ip)
    ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
    if ip_stats is None:
        # The IP has not been persisted yet; there is no row to update.
        print(f"Cannot update analysis: no IP stats found for {sanitized_ip}")
        return

    # Check if category has changed and record it
    old_category = ip_stats.category
    if old_category != category:
        self._record_category_change(sanitized_ip, old_category, category, last_analysis)

    # NOTE(review): this overwrites the category even when manual_category
    # is set on the row - confirm whether manual overrides should be kept.
    ip_stats.analyzed_metrics = analyzed_metrics
    ip_stats.category = category
    ip_stats.category_scores = category_scores
    ip_stats.last_analysis = last_analysis

    # The history helper commits before the fields above are assigned, so
    # the update itself needs its own commit to be persisted.
    try:
        session.commit()
    except Exception as e:
        session.rollback()
        print(f"Error updating IP stats analysis: {e}")
def manual_update_category(self, ip: str, category: str) -> None:
    """
    Update IP category as a result of a manual intervention by an admin.

    Records the change in the category history when the category differs
    from the stored one, flags the row as manually categorized and commits.
    If the IP has no statistics row, nothing is written.

    Args:
        ip: IP address to update
        category: selected category
    """
    session = self.session
    sanitized_ip = sanitize_ip(ip)
    ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
    if ip_stats is None:
        # Unknown IP: there is no row to categorize.
        print(f"Cannot update category: no IP stats found for {sanitized_ip}")
        return

    # Record the manual category change
    old_category = ip_stats.category
    if old_category != category:
        self._record_category_change(sanitized_ip, old_category, category, datetime.utcnow())

    ip_stats.category = category
    ip_stats.manual_category = True

    # The history helper commits before the fields above are assigned, so
    # the update itself needs its own commit to be persisted.
    try:
        session.commit()
    except Exception as e:
        session.rollback()
        print(f"Error updating manual category: {e}")
def _record_category_change(self, ip: str, old_category: Optional[str], new_category: str, timestamp: datetime) -> None:
    """
    Append one category transition to the history table and commit it.

    On any failure the session is rolled back and the error is printed;
    the exception is not propagated.

    Args:
        ip: IP address
        old_category: Previous category (None if first categorization)
        new_category: New category
        timestamp: When the change occurred
    """
    db_session = self.session
    fields = {
        'ip': ip,
        'old_category': old_category,
        'new_category': new_category,
        'timestamp': timestamp,
    }
    try:
        db_session.add(CategoryHistory(**fields))
        db_session.commit()
    except Exception as e:
        db_session.rollback()
        print(f"Error recording category change: {e}")
def get_category_history(self, ip: str) -> List[Dict[str, Any]]:
    """
    Return the category transitions recorded for one IP, oldest first.

    Args:
        ip: IP address to get history for

    Returns:
        List of dicts with 'old_category', 'new_category' and an
        ISO-formatted 'timestamp', ordered by ascending timestamp
    """
    db_session = self.session
    try:
        query = (
            db_session.query(CategoryHistory)
            .filter(CategoryHistory.ip == sanitize_ip(ip))
            .order_by(CategoryHistory.timestamp.asc())
        )
        records = []
        for entry in query.all():
            records.append({
                'old_category': entry.old_category,
                'new_category': entry.new_category,
                'timestamp': entry.timestamp.isoformat(),
            })
        return records
    finally:
        # The session is released whether or not the query succeeded.
        self.close_session()
def get_access_logs(
self,
limit: int = 100,
@@ -270,6 +372,56 @@ class DatabaseManager:
finally:
self.close_session()
# def persist_ip(
# self,
# ip: str
# ) -> Optional[int]:
# """
# Persist an ip entry to the database.
# Args:
# ip: Client IP address
# Returns:
# The ID of the created IpLog record, or None on error
# """
# session = self.session
# try:
# # Create access log with sanitized fields
# ip_log = AccessLog(
# ip=sanitize_ip(ip),
# manual_category = False
# )
# session.add(access_log)
# session.flush() # Get the ID before committing
# # Add attack detections if any
# if attack_types:
# matched_patterns = matched_patterns or {}
# for attack_type in attack_types:
# detection = AttackDetection(
# access_log_id=access_log.id,
# attack_type=attack_type[:50],
# matched_pattern=sanitize_attack_pattern(
# matched_patterns.get(attack_type, "")
# )
# )
# session.add(detection)
# # Update IP stats
# self._update_ip_stats(session, ip)
# session.commit()
# return access_log.id
# except Exception as e:
# session.rollback()
# # Log error but don't crash - database persistence is secondary to honeypot function
# print(f"Database error persisting access: {e}")
# return None
# finally:
# self.close_session()
def get_credential_attempts(
self,
limit: int = 100,
@@ -339,13 +491,58 @@ class DatabaseManager:
'asn': s.asn,
'asn_org': s.asn_org,
'reputation_score': s.reputation_score,
'reputation_source': s.reputation_source
'reputation_source': s.reputation_source,
'analyzed_metrics': s.analyzed_metrics,
'category': s.category,
'manual_category': s.manual_category,
'last_analysis': s.last_analysis
}
for s in stats
]
finally:
self.close_session()
def get_ip_stats_by_ip(self, ip: str) -> Optional[Dict[str, Any]]:
    """
    Retrieve IP statistics for a specific IP address.

    The IP is sanitized before the lookup, matching how the other
    IP-keyed queries of this class address rows.

    Args:
        ip: The IP address to look up

    Returns:
        Dictionary with IP stats (including 'category_history') or None
        if not found
    """
    session = self.session
    try:
        sanitized_ip = sanitize_ip(ip)
        stat = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()

        if not stat:
            return None

        # Read every column before fetching the history: get_category_history
        # closes the session in its own finally block, so the row must not be
        # touched afterwards.
        result = {
            'ip': stat.ip,
            'total_requests': stat.total_requests,
            'first_seen': stat.first_seen.isoformat() if stat.first_seen else None,
            'last_seen': stat.last_seen.isoformat() if stat.last_seen else None,
            'country_code': stat.country_code,
            'city': stat.city,
            'asn': stat.asn,
            'asn_org': stat.asn_org,
            'reputation_score': stat.reputation_score,
            'reputation_source': stat.reputation_source,
            'analyzed_metrics': stat.analyzed_metrics or {},
            'category': stat.category,
            'category_scores': stat.category_scores or {},
            'manual_category': stat.manual_category,
            'last_analysis': stat.last_analysis.isoformat() if stat.last_analysis else None,
        }

        # Get category history for this IP
        result['category_history'] = self.get_category_history(ip)
        return result
    finally:
        self.close_session()
def get_dashboard_counts(self) -> Dict[str, int]:
"""
Get aggregate statistics for the dashboard.
@@ -540,6 +737,47 @@ class DatabaseManager:
finally:
self.close_session()
# def get_ip_logs(
# self,
# limit: int = 100,
# offset: int = 0,
# ip_filter: Optional[str] = None
# ) -> List[Dict[str, Any]]:
# """
# Retrieve ip logs with optional filtering.
# Args:
# limit: Maximum number of records to return
# offset: Number of records to skip
# ip_filter: Filter by IP address
# Returns:
# List of ip log dictionaries
# """
# session = self.session
# try:
# query = session.query(IpLog).order_by(IpLog.last_access.desc())
# if ip_filter:
# query = query.filter(IpLog.ip == sanitize_ip(ip_filter))
# logs = query.offset(offset).limit(limit).all()
# return [
# {
# 'id': log.id,
# 'ip': log.ip,
# 'stats': log.stats,
# 'category': log.category,
# 'manual_category': log.manual_category,
# 'last_evaluation': log.last_evaluation,
# 'last_access': log.last_access
# }
# for log in logs
# ]
# finally:
# self.close_session()
# Module-level singleton instance
_db_manager = DatabaseManager()

View File

@@ -10,6 +10,7 @@ from urllib.parse import urlparse, parse_qs
from config import Config
from tracker import AccessTracker
from analyzer import Analyzer
from templates import html_templates
from templates.dashboard_template import generate_dashboard
from generators import (
@@ -27,6 +28,7 @@ class Handler(BaseHTTPRequestHandler):
webpages: Optional[List[str]] = None
config: Config = None
tracker: AccessTracker = None
analyzer: Analyzer = None
counter: int = 0
app_logger: logging.Logger = None
access_logger: logging.Logger = None
@@ -138,108 +140,25 @@ class Handler(BaseHTTPRequestHandler):
random.seed(seed)
num_pages = random.randint(*self.config.links_per_page_range)
html = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Krawl</title>
<style>
body {{
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background-color: #0d1117;
color: #c9d1d9;
margin: 0;
padding: 40px 20px;
min-height: 100vh;
display: flex;
flex-direction: column;
align-items: center;
}}
.container {{
max-width: 1200px;
width: 100%;
}}
h1 {{
color: #f85149;
text-align: center;
font-size: 48px;
margin: 60px 0 30px;
}}
.counter {{
color: #f85149;
text-align: center;
font-size: 56px;
font-weight: bold;
margin-bottom: 60px;
}}
.links-container {{
display: flex;
flex-direction: column;
gap: 20px;
align-items: center;
}}
.link-box {{
background: #161b22;
border: 1px solid #30363d;
border-radius: 6px;
padding: 15px 30px;
min-width: 300px;
text-align: center;
transition: all 0.3s ease;
}}
.link-box:hover {{
background: #1c2128;
border-color: #58a6ff;
transform: translateY(-2px);
box-shadow: 0 4px 12px rgba(88, 166, 255, 0.2);
}}
a {{
color: #58a6ff;
text-decoration: none;
font-size: 20px;
font-weight: 700;
}}
a:hover {{
color: #79c0ff;
}}
.canary-token {{
background: #1c1917;
border: 2px solid #f85149;
border-radius: 8px;
padding: 30px 50px;
margin: 40px auto;
max-width: 800px;
overflow-x: auto;
}}
.canary-token a {{
color: #f85149;
font-size: 18px;
white-space: nowrap;
}}
</style>
</head>
<body>
<div class="container">
<h1>Krawl me! &#128376;</h1>
<div class="counter">{Handler.counter}</div>
# Build the content HTML
content = ""
<div class="links-container">
"""
# Add canary token if needed
if Handler.counter <= 0 and self.config.canary_token_url:
html += f"""
content += f"""
<div class="link-box canary-token">
<a href="{self.config.canary_token_url}">{self.config.canary_token_url}</a>
</div>
"""
# Add links
if self.webpages is None:
for _ in range(num_pages):
address = ''.join([
random.choice(self.config.char_space)
for _ in range(random.randint(*self.config.links_length_range))
])
html += f"""
content += f"""
<div class="link-box">
<a href="{address}">{address}</a>
</div>
@@ -247,18 +166,14 @@ class Handler(BaseHTTPRequestHandler):
else:
for _ in range(num_pages):
address = random.choice(self.webpages)
html += f"""
content += f"""
<div class="link-box">
<a href="{address}">{address}</a>
</div>
"""
html += """
</div>
</div>
</body>
</html>"""
return html
# Return the complete page using the template
return html_templates.main_page(Handler.counter, content)
def do_HEAD(self):
"""Sends header information"""
@@ -498,8 +413,37 @@ class Handler(BaseHTTPRequestHandler):
except Exception as e:
self.app_logger.error(f"Error generating dashboard: {e}")
return
# API endpoint for fetching IP stats
if self.config.dashboard_secret_path and self.path.startswith(f"{self.config.dashboard_secret_path}/api/ip-stats/"):
ip_address = self.path.replace(f"{self.config.dashboard_secret_path}/api/ip-stats/", "")
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
# Prevent browser caching - force fresh data from database every time
self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0')
self.send_header('Pragma', 'no-cache')
self.send_header('Expires', '0')
self.end_headers()
try:
from database import get_database
import json
db = get_database()
ip_stats = db.get_ip_stats_by_ip(ip_address)
if ip_stats:
self.wfile.write(json.dumps(ip_stats).encode())
else:
self.wfile.write(json.dumps({'error': 'IP not found'}).encode())
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error fetching IP stats: {e}")
self.wfile.write(json.dumps({'error': str(e)}).encode())
return
self.tracker.record_access(client_ip, self.path, user_agent, method='GET')
self.analyzer.infer_user_category(client_ip)
if self.tracker.is_suspicious_user_agent(user_agent):
self.access_logger.warning(f"[SUSPICIOUS] {client_ip} - {user_agent[:50]} - {self.path}")

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Migration script to add CategoryHistory table to existing databases.
Run this once to upgrade your database schema.
"""
import sys
from pathlib import Path
# Add parent directory to path to import modules
sys.path.insert(0, str(Path(__file__).parent.parent))
from database import get_database, DatabaseManager
from models import Base, CategoryHistory
def migrate():
    """
    Create the CategoryHistory table when the database does not have it yet.

    Initializes the database manager first if needed. Any failure is
    printed and terminates the process with exit status 1.
    """
    print("Starting migration: Adding CategoryHistory table...")

    try:
        database = get_database()

        # Initialize database if not already done
        needs_init = not database._initialized
        if needs_init:
            database.initialize()

        # Create only the CategoryHistory table; checkfirst makes it a no-op
        # when the table is already present.
        history_table = CategoryHistory.__table__
        history_table.create(database._engine, checkfirst=True)

        for message in (
            "✓ Migration completed successfully!",
            " - CategoryHistory table created",
        ):
            print(message)

    except Exception as e:
        print(f"✗ Migration failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    # Run the migration only when executed as a script.
    migrate()

View File

@@ -6,9 +6,9 @@ Stores access logs, credential attempts, attack detections, and IP statistics.
"""
from datetime import datetime
from typing import Optional, List
from typing import Optional, List, Dict
from sqlalchemy import String, Integer, Boolean, DateTime, ForeignKey, Index
from sqlalchemy import String, Integer, Boolean, DateTime, ForeignKey, Index, JSON
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sanitizer import (
@@ -38,6 +38,7 @@ class AccessLog(Base):
__tablename__ = 'access_logs'
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
#ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True, ForeignKey('ip_logs.id', ondelete='CASCADE'))
ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True)
path: Mapped[str] = mapped_column(String(MAX_PATH_LENGTH), nullable=False)
user_agent: Mapped[Optional[str]] = mapped_column(String(MAX_USER_AGENT_LENGTH), nullable=True)
@@ -139,5 +140,68 @@ class IpStats(Base):
reputation_source: Mapped[Optional[str]] = mapped_column(String(MAX_REPUTATION_SOURCE_LENGTH), nullable=True)
reputation_updated: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
#Analyzed metrics, category and category scores
analyzed_metrics: Mapped[Dict[str,object]] = mapped_column(JSON, nullable=True)
category: Mapped[str] = mapped_column(String, nullable=True)
category_scores: Mapped[Dict[str,int]] = mapped_column(JSON, nullable=True)
manual_category: Mapped[bool] = mapped_column(Boolean, default=False, nullable=True)
last_analysis: Mapped[datetime] = mapped_column(DateTime, nullable=True)
def __repr__(self) -> str:
return f"<IpStats(ip='{self.ip}', total_requests={self.total_requests})>"
class CategoryHistory(Base):
    """
    One row per category transition of an IP address.

    Stores the previous category (nullable), the new category and the
    time of the change.
    """
    __tablename__ = 'category_history'

    # Composite index for efficient IP-based timeline queries
    __table_args__ = (
        Index('ix_category_history_ip_timestamp', 'ip', 'timestamp'),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True)
    # Nullable: old_category may be empty.
    old_category: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
    new_category: Mapped[str] = mapped_column(String(50), nullable=False)
    timestamp: Mapped[datetime] = mapped_column(DateTime, nullable=False, default=datetime.utcnow, index=True)

    def __repr__(self) -> str:
        transition = f"{self.old_category} -> {self.new_category}"
        return f"<CategoryHistory(ip='{self.ip}', {transition})>"
# class IpLog(Base):
# """
# Records all IPs that have accessed the honeypot, along with aggregated stats and inferred user category.
# """
# __tablename__ = 'ip_logs'
# id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
# ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True)
# stats: Mapped[List[str]] = mapped_column(String(MAX_PATH_LENGTH))
# category: Mapped[str] = mapped_column(String(15))
# manual_category: Mapped[bool] = mapped_column(Boolean, default=False)
# last_analysis: Mapped[datetime] = mapped_column(DateTime, index=True),
# # Relationship to attack detections
# access_logs: Mapped[List["AccessLog"]] = relationship(
# "AccessLog",
# back_populates="ip",
# cascade="all, delete-orphan"
# )
# # Indexes for common queries
# __table_args__ = (
# Index('ix_access_logs_ip_timestamp', 'ip', 'timestamp'),
# Index('ix_access_logs_is_suspicious', 'is_suspicious'),
# Index('ix_access_logs_is_honeypot_trigger', 'is_honeypot_trigger'),
# )
# def __repr__(self) -> str:
# return f"<AccessLog(id={self.id}, ip='{self.ip}', path='{self.path[:50]}')>"

View File

@@ -10,6 +10,7 @@ from http.server import HTTPServer
from config import get_config
from tracker import AccessTracker
from analyzer import Analyzer
from handler import Handler
from logger import initialize_logging, get_app_logger, get_access_logger, get_credential_logger
from database import initialize_database
@@ -71,9 +72,11 @@ def main():
app_logger.warning(f'Database initialization failed: {e}. Continuing with in-memory only.')
tracker = AccessTracker(timezone=tz)
analyzer = Analyzer(timezone=tz)
Handler.config = config
Handler.tracker = tracker
Handler.analyzer = analyzer
Handler.counter = config.canary_token_tries
Handler.app_logger = app_logger
Handler.access_logger = access_logger

View File

@@ -27,9 +27,20 @@ def format_timestamp(iso_timestamp: str) -> str:
def generate_dashboard(stats: dict) -> str:
"""Generate dashboard HTML with access statistics"""
# Generate IP rows (IPs are generally safe but escape for consistency)
# Generate IP rows with clickable functionality for dropdown stats
top_ips_rows = '\n'.join([
f'<tr><td class="rank">{i+1}</td><td>{_escape(ip)}</td><td>{count}</td></tr>'
f'''<tr class="ip-row" data-ip="{_escape(ip)}">
<td class="rank">{i+1}</td>
<td class="ip-clickable">{_escape(ip)}</td>
<td>{count}</td>
</tr>
<tr class="ip-stats-row" id="stats-row-{_escape(ip).replace(".", "-")}" style="display: none;">
<td colspan="3" class="ip-stats-cell">
<div class="ip-stats-dropdown">
<div class="loading">Loading stats...</div>
</div>
</td>
</tr>'''
for i, (ip, count) in enumerate(stats['top_ips'])
]) or '<tr><td colspan="3" style="text-align:center;">No data</td></tr>'
@@ -45,27 +56,76 @@ def generate_dashboard(stats: dict) -> str:
for i, (ua, count) in enumerate(stats['top_user_agents'])
]) or '<tr><td colspan="3" style="text-align:center;">No data</td></tr>'
# Generate suspicious accesses rows (CRITICAL: multiple user-controlled fields)
# Generate suspicious accesses rows with clickable IPs
suspicious_rows = '\n'.join([
f'<tr><td>{_escape(log["ip"])}</td><td>{_escape(log["path"])}</td><td style="word-break: break-all;">{_escape(log["user_agent"][:60])}</td><td>{_escape(log["timestamp"].split("T")[1][:8])}</td></tr>'
f'''<tr class="ip-row" data-ip="{_escape(log["ip"])}">
<td class="ip-clickable">{_escape(log["ip"])}</td>
<td>{_escape(log["path"])}</td>
<td style="word-break: break-all;">{_escape(log["user_agent"][:60])}</td>
<td>{_escape(log["timestamp"].split("T")[1][:8])}</td>
</tr>
<tr class="ip-stats-row" id="stats-row-suspicious-{_escape(log["ip"]).replace(".", "-")}" style="display: none;">
<td colspan="4" class="ip-stats-cell">
<div class="ip-stats-dropdown">
<div class="loading">Loading stats...</div>
</div>
</td>
</tr>'''
for log in stats['recent_suspicious'][-10:]
]) or '<tr><td colspan="4" style="text-align:center;">No suspicious activity detected</td></tr>'
# Generate honeypot triggered IPs rows
# Generate honeypot triggered IPs rows with clickable IPs
honeypot_rows = '\n'.join([
f'<tr><td>{_escape(ip)}</td><td style="word-break: break-all;">{_escape(", ".join(paths))}</td><td>{len(paths)}</td></tr>'
f'''<tr class="ip-row" data-ip="{_escape(ip)}">
<td class="ip-clickable">{_escape(ip)}</td>
<td style="word-break: break-all;">{_escape(", ".join(paths))}</td>
<td>{len(paths)}</td>
</tr>
<tr class="ip-stats-row" id="stats-row-honeypot-{_escape(ip).replace(".", "-")}" style="display: none;">
<td colspan="3" class="ip-stats-cell">
<div class="ip-stats-dropdown">
<div class="loading">Loading stats...</div>
</div>
</td>
</tr>'''
for ip, paths in stats.get('honeypot_triggered_ips', [])
]) or '<tr><td colspan="3" style="text-align:center;">No honeypot triggers yet</td></tr>'
# Generate attack types rows (CRITICAL: paths and user agents are user-controlled)
# Generate attack types rows with clickable IPs
# Build the "Detected Attack Types" table body from the last ten entries.
# Paths, attack types and user agents are attacker-controlled, so every
# interpolated field goes through _escape(). Each data row is followed by a
# hidden detail row that the dashboard script fills in when the IP is clicked.
attack_type_rows = '\n'.join([
    f'''<tr class="ip-row" data-ip="{_escape(log["ip"])}">
        <td class="ip-clickable">{_escape(log["ip"])}</td>
        <td>{_escape(log["path"])}</td>
        <td>{_escape(", ".join(log["attack_types"]))}</td>
        <td style="word-break: break-all;">{_escape(log["user_agent"][:60])}</td>
        <td>{_escape(log["timestamp"].split("T")[1][:8])}</td>
    </tr>
    <tr class="ip-stats-row" id="stats-row-attack-{_escape(log["ip"]).replace(".", "-")}" style="display: none;">
        <td colspan="5" class="ip-stats-cell">
            <div class="ip-stats-dropdown">
                <div class="loading">Loading stats...</div>
            </div>
        </td>
    </tr>'''
    for log in stats.get('attack_types', [])[-10:]
    # The table has five columns, so the placeholder row must span all five.
]) or '<tr><td colspan="5" style="text-align:center;">No attacks detected</td></tr>'
# Generate credential attempts rows (CRITICAL: usernames and passwords are user-controlled)
# Generate credential attempts rows with clickable IPs
credential_rows = '\n'.join([
f'<tr><td>{_escape(log["ip"])}</td><td>{_escape(log["username"])}</td><td>{_escape(log["password"])}</td><td>{_escape(log["path"])}</td><td>{_escape(log["timestamp"].split("T")[1][:8])}</td></tr>'
f'''<tr class="ip-row" data-ip="{_escape(log["ip"])}">
<td class="ip-clickable">{_escape(log["ip"])}</td>
<td>{_escape(log["username"])}</td>
<td>{_escape(log["password"])}</td>
<td>{_escape(log["path"])}</td>
<td>{_escape(log["timestamp"].split("T")[1][:8])}</td>
</tr>
<tr class="ip-stats-row" id="stats-row-cred-{_escape(log["ip"]).replace(".", "-")}" style="display: none;">
<td colspan="5" class="ip-stats-cell">
<div class="ip-stats-dropdown">
<div class="loading">Loading stats...</div>
</div>
</td>
</tr>'''
for log in stats.get('credential_attempts', [])[-20:]
]) or '<tr><td colspan="5" style="text-align:center;">No credentials captured yet</td></tr>'
@@ -156,11 +216,214 @@ def generate_dashboard(stats: dict) -> str:
background: #1c1917;
border-left: 4px solid #f85149;
}}
th.sortable {{
cursor: pointer;
user-select: none;
position: relative;
padding-right: 24px;
}}
th.sortable:hover {{
background: #1c2128;
}}
th.sortable::after {{
content: '';
position: absolute;
right: 8px;
opacity: 0.5;
font-size: 12px;
}}
th.sortable.asc::after {{
content: '';
opacity: 1;
}}
th.sortable.desc::after {{
content: '';
opacity: 1;
}}
.ip-row {{
transition: background-color 0.2s;
}}
.ip-clickable {{
cursor: pointer;
color: #58a6ff !important;
font-weight: 500;
text-decoration: underline;
text-decoration-style: dotted;
text-underline-offset: 3px;
}}
.ip-clickable:hover {{
color: #79c0ff !important;
text-decoration-style: solid;
background: #1c2128;
}}
.ip-stats-row {{
background: #0d1117;
}}
.ip-stats-cell {{
padding: 0 !important;
}}
.ip-stats-dropdown {{
margin-top: 10px;
padding: 15px;
background: #0d1117;
border: 1px solid #30363d;
border-radius: 6px;
font-size: 13px;
display: flex;
gap: 20px;
}}
.stats-left {{
flex: 1;
}}
.stats-right {{
flex: 0 0 200px;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
}}
.radar-chart {{
position: relative;
width: 220px;
height: 220px;
overflow: visible;
}}
.radar-legend {{
margin-top: 10px;
font-size: 11px;
}}
.radar-legend-item {{
display: flex;
align-items: center;
gap: 6px;
margin: 3px 0;
}}
.radar-legend-color {{
width: 12px;
height: 12px;
border-radius: 2px;
}}
.ip-stats-dropdown .loading {{
color: #8b949e;
font-style: italic;
}}
.stat-row {{
display: flex;
justify-content: space-between;
padding: 5px 0;
border-bottom: 1px solid #21262d;
}}
.stat-row:last-child {{
border-bottom: none;
}}
.stat-label-sm {{
color: #8b949e;
font-weight: 500;
}}
.stat-value-sm {{
color: #58a6ff;
font-weight: 600;
}}
.category-badge {{
display: inline-block;
padding: 4px 8px;
border-radius: 4px;
font-size: 12px;
font-weight: 600;
text-transform: uppercase;
}}
.category-attacker {{
background: #f851491a;
color: #f85149;
border: 1px solid #f85149;
}}
.category-good-crawler {{
background: #3fb9501a;
color: #3fb950;
border: 1px solid #3fb950;
}}
.category-bad-crawler {{
background: #f0883e1a;
color: #f0883e;
border: 1px solid #f0883e;
}}
.category-regular-user {{
background: #58a6ff1a;
color: #58a6ff;
border: 1px solid #58a6ff;
}}
.timeline-container {{
margin-top: 15px;
padding-top: 15px;
border-top: 1px solid #30363d;
}}
.timeline-title {{
color: #58a6ff;
font-size: 13px;
font-weight: 600;
margin-bottom: 10px;
}}
.timeline {{
position: relative;
padding-left: 30px;
}}
.timeline::before {{
content: '';
position: absolute;
left: 12px;
top: 5px;
bottom: 5px;
width: 3px;
background: #30363d;
}}
.timeline-item {{
position: relative;
padding-bottom: 15px;
}}
.timeline-item:last-child {{
padding-bottom: 0;
}}
.timeline-marker {{
position: absolute;
left: -26px;
width: 16px;
height: 16px;
border-radius: 50%;
border: 2px solid #0d1117;
}}
.timeline-marker.attacker {{
background: #f85149;
}}
.timeline-marker.good-crawler {{
background: #3fb950;
}}
.timeline-marker.bad-crawler {{
background: #f0883e;
}}
.timeline-marker.regular-user {{
background: #58a6ff;
}}
.timeline-content {{
font-size: 12px;
}}
.timeline-category {{
font-weight: 600;
}}
.timeline-timestamp {{
color: #8b949e;
font-size: 11px;
margin-top: 2px;
}}
.timeline-arrow {{
color: #8b949e;
margin: 0 7px;
}}
</style>
</head>
<body>
<div class="container">
<h1>&#128375;&#65039; Krawl Dashboard</h1>
<h1>Krawl Dashboard</h1>
<div class="stats-grid">
<div class="stat-card">
@@ -190,13 +453,13 @@ def generate_dashboard(stats: dict) -> str:
</div>
<div class="table-container alert-section">
<h2>🍯 Honeypot Triggers by IP</h2>
<table>
<h2>Honeypot Triggers by IP</h2>
<table id="honeypot-table">
<thead>
<tr>
<th>IP Address</th>
<th class="sortable" data-sort="ip">IP Address</th>
<th>Accessed Paths</th>
<th>Count</th>
<th class="sortable" data-sort="count">Count</th>
</tr>
</thead>
<tbody>
@@ -206,7 +469,7 @@ def generate_dashboard(stats: dict) -> str:
</div>
<div class="table-container alert-section">
<h2>&#9888;&#65039; Recent Suspicious Activity</h2>
<h2>Recent Suspicious Activity</h2>
<table>
<thead>
<tr>
@@ -223,7 +486,7 @@ def generate_dashboard(stats: dict) -> str:
</div>
<div class="table-container alert-section">
<h2>🔑 Captured Credentials</h2>
<h2>Captured Credentials</h2>
<table>
<thead>
<tr>
@@ -241,7 +504,7 @@ def generate_dashboard(stats: dict) -> str:
</div>
<div class="table-container alert-section">
<h2>&#128520; Detected Attack Types</h2>
<h2>Detected Attack Types</h2>
<table>
<thead>
<tr>
@@ -306,6 +569,303 @@ def generate_dashboard(stats: dict) -> str:
</table>
</div>
</div>
<script>
// Add sorting functionality to tables.
// Clicking a sortable header toggles between ascending and descending order
// for that column (the first click sorts ascending).
document.querySelectorAll('th.sortable').forEach(header => {{
    header.addEventListener('click', function() {{
        const table = this.closest('table');
        const tbody = table.querySelector('tbody');
        if (!tbody) return;
        const sortType = this.getAttribute('data-sort');
        const columnIndex = Array.from(this.parentElement.children).indexOf(this);

        // Only the primary rows are sorted. The hidden ".ip-stats-row" detail
        // rows contain a single colspan cell, so reading cells[columnIndex]
        // on them fails, and they must stay directly below their IP row
        // because the click handler finds them through nextElementSibling.
        const rows = Array.from(tbody.querySelectorAll('tr'))
            .filter(row => !row.classList.contains('ip-stats-row'));
        const detailRows = new Map();
        rows.forEach(row => {{
            const next = row.nextElementSibling;
            if (next && next.classList.contains('ip-stats-row')) {{
                detailRows.set(row, next);
            }}
        }});

        // Determine sort direction: a header already marked "asc" flips to "desc".
        const wasAscending = this.classList.contains('asc');
        const direction = wasAscending ? -1 : 1;

        // Remove sort classes from all headers in this table, then mark this one.
        table.querySelectorAll('th.sortable').forEach(th => {{
            th.classList.remove('asc', 'desc');
        }});
        this.classList.add(wasAscending ? 'desc' : 'asc');

        // Placeholder rows such as "No data" have fewer cells; treat a missing cell as empty.
        const cellText = row => {{
            const cell = row.cells[columnIndex];
            return cell ? cell.textContent.trim() : '';
        }};

        // Dotted-quad IPv4 to integer; anything else sorts as 0.
        const ipToNum = ip => {{
            const parts = ip.split('.');
            if (parts.length !== 4) return 0;
            return parts.reduce((acc, part, i) => acc + (parseInt(part, 10) || 0) * Math.pow(256, 3 - i), 0);
        }};

        rows.sort((a, b) => {{
            const aValue = cellText(a);
            const bValue = cellText(b);
            if (sortType === 'count') {{
                return direction * ((parseInt(aValue, 10) || 0) - (parseInt(bValue, 10) || 0));
            }}
            if (sortType === 'ip') {{
                return direction * (ipToNum(aValue) - ipToNum(bValue));
            }}
            // Default string sorting
            return direction * aValue.localeCompare(bValue);
        }});

        // Re-append sorted rows, each immediately followed by its detail row.
        rows.forEach(row => {{
            tbody.appendChild(row);
            const detail = detailRows.get(row);
            if (detail) tbody.appendChild(detail);
        }});
    }});
}});
// IP stats dropdown functionality
document.querySelectorAll('.ip-clickable').forEach(cell => {{
cell.addEventListener('click', async function(e) {{
const row = e.currentTarget.closest('.ip-row');
if (!row) return;
const ip = row.getAttribute('data-ip');
const statsRow = row.nextElementSibling;
if (!statsRow || !statsRow.classList.contains('ip-stats-row')) return;
const isVisible = getComputedStyle(statsRow).display !== 'none';
document.querySelectorAll('.ip-stats-row').forEach(r => {{
r.style.display = 'none';
}});
if (isVisible) return;
statsRow.style.display = 'table-row';
const dropdown = statsRow.querySelector('.ip-stats-dropdown');
// Always fetch fresh data from database
if (dropdown) {{
dropdown.innerHTML = '<div class="loading">Loading stats...</div>';
try {{
const response = await fetch(`${{window.location.pathname}}/api/ip-stats/${{ip}}`, {{
cache: 'no-store',
headers: {{
'Cache-Control': 'no-cache',
'Pragma': 'no-cache'
}}
}});
if (!response.ok) throw new Error(`HTTP ${{response.status}}`);
const data = await response.json();
dropdown.innerHTML = data.error
? `<div style="color:#f85149;">Error: ${{data.error}}</div>`
: formatIpStats(data);
}} catch (err) {{
dropdown.innerHTML = `<div style="color:#f85149;">Failed to load stats: ${{err.message}}</div>`;
}}
}}
}});
}});
function formatIpStats(stats) {{
let html = '<div class="stats-left">';
// Basic info
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">Total Requests:</span>';
html += `<span class="stat-value-sm">${{stats.total_requests || 0}}</span>`;
html += '</div>';
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">First Seen:</span>';
html += `<span class="stat-value-sm">${{stats.first_seen ? new Date(stats.first_seen).toLocaleString() : 'N/A'}}</span>`;
html += '</div>';
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">Last Seen:</span>';
html += `<span class="stat-value-sm">${{stats.last_seen ? new Date(stats.last_seen).toLocaleString() : 'N/A'}}</span>`;
html += '</div>';
// Category
if (stats.category) {{
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">Category:</span>';
const categoryClass = 'category-' + stats.category.toLowerCase().replace('_', '-');
html += `<span class="category-badge ${{categoryClass}}">${{stats.category}}</span>`;
html += '</div>';
}}
// GeoIP info if available
if (stats.country_code || stats.city) {{
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">Location:</span>';
html += `<span class="stat-value-sm">${{stats.city || ''}}${{stats.city && stats.country_code ? ', ' : ''}}${{stats.country_code || 'Unknown'}}</span>`;
html += '</div>';
}}
if (stats.asn_org) {{
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">ASN Org:</span>';
html += `<span class="stat-value-sm">${{stats.asn_org}}</span>`;
html += '</div>';
}}
// Reputation score if available
if (stats.reputation_score !== null && stats.reputation_score !== undefined) {{
html += '<div class="stat-row">';
html += '<span class="stat-label-sm">Reputation Score:</span>';
html += `<span class="stat-value-sm">${{stats.reputation_score}} ${{stats.reputation_source ? '(' + stats.reputation_source + ')' : ''}}</span>`;
html += '</div>';
}}
// Category History Timeline
if (stats.category_history && stats.category_history.length > 0) {{
html += '<div class="timeline-container">';
html += '<div class="timeline-title">Behavior Timeline</div>';
html += '<div class="timeline">';
stats.category_history.forEach((change, index) => {{
const categoryClass = change.new_category.toLowerCase().replace('_', '-');
const timestamp = new Date(change.timestamp).toLocaleString();
html += '<div class="timeline-item">';
html += `<div class="timeline-marker ${{categoryClass}}"></div>`;
html += '<div class="timeline-content">';
if (change.old_category) {{
const oldCategoryBadge = 'category-' + change.old_category.toLowerCase().replace('_', '-');
html += `<span class="category-badge ${{oldCategoryBadge}}">${{change.old_category}}</span>`;
html += '<span class="timeline-arrow">→</span>';
}} else {{
html += '<span style="color: #8b949e;">Initial:</span> ';
}}
const newCategoryBadge = 'category-' + change.new_category.toLowerCase().replace('_', '-');
html += `<span class="category-badge ${{newCategoryBadge}}">${{change.new_category}}</span>`;
html += `<div class="timeline-timestamp">${{timestamp}}</div>`;
html += '</div>';
html += '</div>';
}});
html += '</div>';
html += '</div>';
}}
html += '</div>';
// Radar chart on the right
if (stats.category_scores && Object.keys(stats.category_scores).length > 0) {{
html += '<div class="stats-right">';
html += '<div style="font-size: 13px; font-weight: 600; color: #58a6ff; margin-bottom: 10px;">Category Score</div>';
html += '<svg class="radar-chart" viewBox="-30 -30 260 260" preserveAspectRatio="xMidYMid meet">';
const scores = {{
attacker: stats.category_scores.attacker || 0,
good_crawler: stats.category_scores.good_crawler || 0,
bad_crawler: stats.category_scores.bad_crawler || 0,
regular_user: stats.category_scores.regular_user || 0
}};
// Normalize scores for better visualization
const maxScore = Math.max(...Object.values(scores), 1);
const minVisibleRadius = 0.15; // Minimum 15% visibility even for 0 values
const normalizedScores = {{}};
Object.keys(scores).forEach(key => {{
// Scale values: ensure minimum visibility + proportional to max
normalizedScores[key] = minVisibleRadius + (scores[key] / maxScore) * (1 - minVisibleRadius);
}});
const colors = {{
attacker: '#f85149',
good_crawler: '#3fb950',
bad_crawler: '#f0883e',
regular_user: '#58a6ff'
}};
const labels = {{
attacker: 'Attacker',
good_crawler: 'Good Bot',
bad_crawler: 'Bad Bot',
regular_user: 'User'
}};
// Draw radar background grid
const cx = 100, cy = 100, maxRadius = 75;
for (let i = 1; i <= 5; i++) {{
const r = (maxRadius / 5) * i;
html += `<circle cx="${{cx}}" cy="${{cy}}" r="${{r}}" fill="none" stroke="#30363d" stroke-width="0.5"/>`;
}}
// Draw axes
const angles = [0, 90, 180, 270];
const keys = ['good_crawler', 'regular_user', 'bad_crawler', 'attacker'];
angles.forEach((angle, i) => {{
const rad = (angle - 90) * Math.PI / 180;
const x2 = cx + maxRadius * Math.cos(rad);
const y2 = cy + maxRadius * Math.sin(rad);
html += `<line x1="${{cx}}" y1="${{cy}}" x2="${{x2}}" y2="${{y2}}" stroke="#30363d" stroke-width="0.5"/>`;
// Add labels at consistent distance
const labelDist = maxRadius + 35;
const lx = cx + labelDist * Math.cos(rad);
const ly = cy + labelDist * Math.sin(rad);
html += `<text x="${{lx}}" y="${{ly}}" fill="#8b949e" font-size="12" text-anchor="middle" dominant-baseline="middle">${{labels[keys[i]]}}</text>`;
}});
// Draw filled polygon for scores
let points = [];
angles.forEach((angle, i) => {{
const normalizedScore = normalizedScores[keys[i]];
const rad = (angle - 90) * Math.PI / 180;
const r = normalizedScore * maxRadius;
const x = cx + r * Math.cos(rad);
const y = cy + r * Math.sin(rad);
points.push(`${{x}},${{y}}`);
}});
// Determine dominant category color
const dominantKey = Object.keys(scores).reduce((a, b) => scores[a] > scores[b] ? a : b);
const dominantColor = colors[dominantKey];
// Draw single colored area
html += `<polygon points="${{points.join(' ')}}" fill="${{dominantColor}}" fill-opacity="0.4" stroke="${{dominantColor}}" stroke-width="2.5"/>`;
// Draw points
angles.forEach((angle, i) => {{
const normalizedScore = normalizedScores[keys[i]];
const rad = (angle - 90) * Math.PI / 180;
const r = normalizedScore * maxRadius;
const x = cx + r * Math.cos(rad);
const y = cy + r * Math.sin(rad);
html += `<circle cx="${{x}}" cy="${{y}}" r="4.5" fill="${{colors[keys[i]]}}" stroke="#0d1117" stroke-width="2"/>`;
}});
html += '</svg>';
// Legend
html += '<div class="radar-legend">';
keys.forEach(key => {{
html += '<div class="radar-legend-item">';
html += `<div class="radar-legend-color" style="background: ${{colors[key]}};"></div>`;
html += `<span style="color: #8b949e;">${{labels[key]}}: ${{scores[key]}} pt</span>`;
html += '</div>';
}});
html += '</div>';
html += '</div>';
}}
return html;
}}
</script>
</body>
</html>
"""

View File

@@ -0,0 +1,106 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Krawl</title>
<style>
body {{
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background-color: #0d1117;
color: #c9d1d9;
margin: 0;
padding: 0;
height: 100vh;
display: flex;
flex-direction: column;
align-items: center;
overflow: hidden;
}}
.container {{
max-width: 1200px;
width: 100%;
height: 100vh;
display: flex;
flex-direction: column;
padding: 20px;
box-sizing: border-box;
}}
h1 {{
color: #f85149;
text-align: center;
font-size: 36px;
margin: 40px 0 20px 0;
flex-shrink: 0;
}}
.counter {{
color: #f85149;
text-align: center;
font-size: 32px;
font-weight: bold;
margin: 0 0 30px 0;
flex-shrink: 0;
}}
.links-container {{
display: flex;
flex-direction: column;
gap: 10px;
align-items: center;
overflow-y: auto;
overflow-x: hidden;
flex: 1;
padding-top: 10px;
}}
.links-container::-webkit-scrollbar {{
width: 0px;
}}
.link-box {{
background: #161b22;
border: 1px solid #30363d;
border-radius: 6px;
padding: 10px 20px;
min-width: 300px;
text-align: center;
transition: all 0.3s ease;
}}
.link-box:hover {{
background: #1c2128;
border-color: #58a6ff;
transform: translateY(-2px);
box-shadow: 0 4px 12px rgba(88, 166, 255, 0.2);
}}
a {{
color: #58a6ff;
text-decoration: none;
font-size: 16px;
font-weight: 700;
}}
a:hover {{
color: #79c0ff;
}}
.canary-token {{
background: #1c1917;
border: 2px solid #f85149;
border-radius: 8px;
padding: 20px 30px;
margin: 20px auto;
max-width: 800px;
overflow-x: auto;
}}
.canary-token a {{
color: #f85149;
font-size: 14px;
white-space: nowrap;
}}
</style>
</head>
<body>
<div class="container">
<h1>Krawl me!</h1>
<div class="counter">{counter}</div>
<div class="links-container">
{content}
</div>
</div>
</body>
</html>

View File

@@ -60,3 +60,8 @@ def product_search() -> str:
def input_form() -> str:
    """Return the rendered "input_form" template (the XSS honeypot form page)."""
    template_name = "input_form"
    return load_template(template_name)
def main_page(counter: int, content: str) -> str:
    """Render the main Krawl page.

    Loads the "main_page" template and passes ``counter`` and ``content``
    to it as keyword arguments.

    Args:
        counter: Value forwarded to the template's ``counter`` field.
        content: Markup forwarded to the template's ``content`` field
            (presumably the generated links and canary token -- confirm
            against the caller).

    Returns:
        Whatever ``load_template`` returns for the filled template.
    """
    placeholders = {"counter": counter, "content": content}
    return load_template("main_page", **placeholders)

View File

@@ -125,8 +125,13 @@ class Wordlists:
def server_errors(self):
return self._data.get("server_errors", {})
@property
def server_headers(self):
    """Return the "server_headers" wordlist entry, or an empty list if absent."""
    headers = self._data.get("server_headers", [])
    return headers
@property
def attack_urls(self):
    """Return the "attack_urls" wordlist entry.

    The wordlist data stores this entry as a mapping of attack name to
    regex pattern, so a missing entry falls back to an empty dict. An
    empty list here would break callers that use mapping methods such
    as ``.items()``.
    """
    return self._data.get("attack_urls", {})
_wordlists_instance = None

View File

@@ -358,5 +358,19 @@
"xss_attempt": "(<script|</script|javascript:|onerror=|onload=|onclick=|onmouseover=|onfocus=|onblur=|<iframe|<img|<svg|<embed|<object|<body|<input|eval\\(|alert\\(|prompt\\(|confirm\\(|document\\.|window\\.|<style|expression\\(|vbscript:|data:text/html)",
"common_probes": "(wp-admin|phpmyadmin|\\.env|\\.git|/admin|/config)",
"shell_injection": "(\\||;|`|\\$\\(|&&)"
},
"server_headers": [
"Apache/2.4.41 (Ubuntu)",
"nginx/1.18.0",
"Microsoft-IIS/10.0",
"cloudflare",
"AmazonS3",
"gunicorn/20.1.0"
],
"attack_urls": {
"path_traversal": "\\.\\.",
"sql_injection": "('|--|;|\bOR\b|\bUNION\b|\bSELECT\b|\bDROP\b)",
"xss_attempt": "(<script|javascript:|onerror=|onload=)",
"shell_injection": "(\\||;|`|\\$\\(|&&)"
}
}