Added POST log and dashboard for used credentials

This commit is contained in:
Patrick Di Fazio
2025-12-27 19:17:27 +01:00
parent d13ceb4888
commit 828f04261f
5 changed files with 129 additions and 2 deletions

View File

@@ -3,6 +3,7 @@
import logging
import random
import time
from datetime import datetime
from http.server import BaseHTTPRequestHandler
from typing import Optional, List
@@ -25,6 +26,7 @@ class Handler(BaseHTTPRequestHandler):
counter: int = 0
app_logger: logging.Logger = None
access_logger: logging.Logger = None
credential_logger: logging.Logger = None
def _get_client_ip(self) -> str:
"""Extract client IP address from request, checking proxy headers first"""
@@ -213,6 +215,19 @@ class Handler(BaseHTTPRequestHandler):
self.access_logger.warning(f"[POST DATA] {post_data[:200]}")
# Parse and log credentials
username, password = self.tracker.parse_credentials(post_data)
if username or password:
# Log to dedicated credentials.log file
timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
credential_line = f"{timestamp}|{client_ip}|{username or 'N/A'}|{password or 'N/A'}|{self.path}"
self.credential_logger.info(credential_line)
# Also record in tracker for dashboard
self.tracker.record_credential_attempt(client_ip, self.path, username or 'N/A', password or 'N/A')
self.access_logger.warning(f"[CREDENTIALS CAPTURED] {client_ip} - Username: {username or 'N/A'} - Path: {self.path}")
# send the post data (body) to the record_access function so the post data can be used to detect suspicious things.
self.tracker.record_access(client_ip, self.path, user_agent, post_data)

View File

@@ -77,6 +77,22 @@ class LoggerManager:
access_stream_handler.setFormatter(log_format)
self._access_logger.addHandler(access_stream_handler)
# Setup credential logger (special format, no stream handler)
self._credential_logger = logging.getLogger("krawl.credentials")
self._credential_logger.setLevel(logging.INFO)
self._credential_logger.handlers.clear()
# Credential logger uses a simple format: timestamp|ip|username|password|path
credential_format = logging.Formatter("%(message)s")
credential_file_handler = RotatingFileHandler(
os.path.join(log_dir, "credentials.log"),
maxBytes=max_bytes,
backupCount=backup_count
)
credential_file_handler.setFormatter(credential_format)
self._credential_logger.addHandler(credential_file_handler)
self._initialized = True
@property
@@ -93,6 +109,13 @@ class LoggerManager:
self.initialize()
return self._access_logger
@property
def credentials(self) -> logging.Logger:
"""Get the credentials logger."""
if not self._initialized:
self.initialize()
return self._credential_logger
# Module-level singleton instance
_logger_manager = LoggerManager()
@@ -108,6 +131,11 @@ def get_access_logger() -> logging.Logger:
return _logger_manager.access
def get_credential_logger() -> logging.Logger:
"""Get the credential logger instance."""
return _logger_manager.credentials
def initialize_logging(log_dir: str = "logs") -> None:
"""Initialize the logging system."""
_logger_manager.initialize(log_dir)

View File

@@ -11,7 +11,7 @@ from http.server import HTTPServer
from config import Config
from tracker import AccessTracker
from handler import Handler
from logger import initialize_logging, get_app_logger, get_access_logger
from logger import initialize_logging, get_app_logger, get_access_logger, get_credential_logger
def print_usage():
@@ -45,6 +45,7 @@ def main():
initialize_logging()
app_logger = get_app_logger()
access_logger = get_access_logger()
credential_logger = get_credential_logger()
config = Config.from_env()
@@ -55,6 +56,7 @@ def main():
Handler.counter = config.canary_token_tries
Handler.app_logger = app_logger
Handler.access_logger = access_logger
Handler.credential_logger = credential_logger
if len(sys.argv) == 2:
try:

View File

@@ -45,6 +45,12 @@ def generate_dashboard(stats: dict) -> str:
for log in stats.get('attack_types', [])[-10:]
]) or '<tr><td colspan="4" style="text-align:center;">No attacks detected</td></tr>'
# Generate credential attempts rows
credential_rows = '\n'.join([
f'<tr><td>{log["ip"]}</td><td>{log["username"]}</td><td>{log["password"]}</td><td>{log["path"]}</td><td>{log["timestamp"].split("T")[1][:8]}</td></tr>'
for log in stats.get('credential_attempts', [])[-20:]
]) or '<tr><td colspan="5" style="text-align:center;">No credentials captured yet</td></tr>'
return f"""<!DOCTYPE html>
<html>
<head>
@@ -159,6 +165,10 @@ def generate_dashboard(stats: dict) -> str:
<div class="stat-value alert">{stats.get('honeypot_ips', 0)}</div>
<div class="stat-label">Honeypot Caught</div>
</div>
<div class="stat-card alert">
<div class="stat-value alert">{len(stats.get('credential_attempts', []))}</div>
<div class="stat-label">Credentials Captured</div>
</div>
</div>
<div class="table-container alert-section">
@@ -194,6 +204,24 @@ def generate_dashboard(stats: dict) -> str:
</table>
</div>
<div class="table-container alert-section">
<h2>🔑 Captured Credentials</h2>
<table>
<thead>
<tr>
<th>IP Address</th>
<th>Username</th>
<th>Password</th>
<th>Path</th>
<th>Time</th>
</tr>
</thead>
<tbody>
{credential_rows}
</tbody>
</table>
</div>
<div class="table-container alert-section">
<h2>&#128520; Detected Attack Types</h2>
<table>

View File

@@ -4,6 +4,7 @@ from typing import Dict, List, Tuple
from collections import defaultdict
from datetime import datetime
import re
import urllib.parse
class AccessTracker:
@@ -13,6 +14,7 @@ class AccessTracker:
self.path_counts: Dict[str, int] = defaultdict(int)
self.user_agent_counts: Dict[str, int] = defaultdict(int)
self.access_log: List[Dict] = []
self.credential_attempts: List[Dict] = []
self.suspicious_patterns = [
'bot', 'crawler', 'spider', 'scraper', 'curl', 'wget', 'python-requests',
'scanner', 'nikto', 'sqlmap', 'nmap', 'masscan', 'nessus', 'acunetix',
@@ -31,6 +33,57 @@ class AccessTracker:
# Track IPs that accessed honeypot paths from robots.txt
self.honeypot_triggered: Dict[str, List[str]] = defaultdict(list)
def parse_credentials(self, post_data: str) -> Tuple[str, str]:
"""
Parse username and password from POST data.
Returns tuple (username, password) or (None, None) if not found.
"""
if not post_data:
return None, None
username = None
password = None
try:
# Parse URL-encoded form data
parsed = urllib.parse.parse_qs(post_data)
# Common username field names
username_fields = ['username', 'user', 'login', 'email', 'log', 'userid', 'account']
for field in username_fields:
if field in parsed and parsed[field]:
username = parsed[field][0]
break
# Common password field names
password_fields = ['password', 'pass', 'passwd', 'pwd', 'passphrase']
for field in password_fields:
if field in parsed and parsed[field]:
password = parsed[field][0]
break
except Exception:
# If parsing fails, try simple regex patterns
username_match = re.search(r'(?:username|user|login|email|log)=([^&\s]+)', post_data, re.IGNORECASE)
password_match = re.search(r'(?:password|pass|passwd|pwd)=([^&\s]+)', post_data, re.IGNORECASE)
if username_match:
username = urllib.parse.unquote_plus(username_match.group(1))
if password_match:
password = urllib.parse.unquote_plus(password_match.group(1))
return username, password
def record_credential_attempt(self, ip: str, path: str, username: str, password: str):
"""Record a credential login attempt"""
self.credential_attempts.append({
'ip': ip,
'path': path,
'username': username,
'password': password,
'timestamp': datetime.now().isoformat()
})
def record_access(self, ip: str, path: str, user_agent: str = '', body: str = ''):
"""Record an access attempt"""
self.ip_counts[ip] += 1
@@ -146,5 +199,6 @@ class AccessTracker:
'top_user_agents': self.get_top_user_agents(10),
'recent_suspicious': self.get_suspicious_accesses(20),
'honeypot_triggered_ips': self.get_honeypot_triggered_ips(),
'attack_types': self.get_attack_type_accesses(20)
'attack_types': self.get_attack_type_accesses(20),
'credential_attempts': self.credential_attempts[-50:] # Last 50 attempts
}