Added POST log and dashboard for used credentials
This commit is contained in:
@@ -4,6 +4,7 @@ from typing import Dict, List, Tuple
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
import re
|
||||
import urllib.parse
|
||||
|
||||
|
||||
class AccessTracker:
|
||||
@@ -13,6 +14,7 @@ class AccessTracker:
|
||||
self.path_counts: Dict[str, int] = defaultdict(int)
|
||||
self.user_agent_counts: Dict[str, int] = defaultdict(int)
|
||||
self.access_log: List[Dict] = []
|
||||
self.credential_attempts: List[Dict] = []
|
||||
self.suspicious_patterns = [
|
||||
'bot', 'crawler', 'spider', 'scraper', 'curl', 'wget', 'python-requests',
|
||||
'scanner', 'nikto', 'sqlmap', 'nmap', 'masscan', 'nessus', 'acunetix',
|
||||
@@ -31,6 +33,57 @@ class AccessTracker:
|
||||
# Track IPs that accessed honeypot paths from robots.txt
|
||||
self.honeypot_triggered: Dict[str, List[str]] = defaultdict(list)
|
||||
|
||||
def parse_credentials(self, post_data: str) -> Tuple[str, str]:
|
||||
"""
|
||||
Parse username and password from POST data.
|
||||
Returns tuple (username, password) or (None, None) if not found.
|
||||
"""
|
||||
if not post_data:
|
||||
return None, None
|
||||
|
||||
username = None
|
||||
password = None
|
||||
|
||||
try:
|
||||
# Parse URL-encoded form data
|
||||
parsed = urllib.parse.parse_qs(post_data)
|
||||
|
||||
# Common username field names
|
||||
username_fields = ['username', 'user', 'login', 'email', 'log', 'userid', 'account']
|
||||
for field in username_fields:
|
||||
if field in parsed and parsed[field]:
|
||||
username = parsed[field][0]
|
||||
break
|
||||
|
||||
# Common password field names
|
||||
password_fields = ['password', 'pass', 'passwd', 'pwd', 'passphrase']
|
||||
for field in password_fields:
|
||||
if field in parsed and parsed[field]:
|
||||
password = parsed[field][0]
|
||||
break
|
||||
|
||||
except Exception:
|
||||
# If parsing fails, try simple regex patterns
|
||||
username_match = re.search(r'(?:username|user|login|email|log)=([^&\s]+)', post_data, re.IGNORECASE)
|
||||
password_match = re.search(r'(?:password|pass|passwd|pwd)=([^&\s]+)', post_data, re.IGNORECASE)
|
||||
|
||||
if username_match:
|
||||
username = urllib.parse.unquote_plus(username_match.group(1))
|
||||
if password_match:
|
||||
password = urllib.parse.unquote_plus(password_match.group(1))
|
||||
|
||||
return username, password
|
||||
|
||||
def record_credential_attempt(self, ip: str, path: str, username: str, password: str):
|
||||
"""Record a credential login attempt"""
|
||||
self.credential_attempts.append({
|
||||
'ip': ip,
|
||||
'path': path,
|
||||
'username': username,
|
||||
'password': password,
|
||||
'timestamp': datetime.now().isoformat()
|
||||
})
|
||||
|
||||
def record_access(self, ip: str, path: str, user_agent: str = '', body: str = ''):
|
||||
"""Record an access attempt"""
|
||||
self.ip_counts[ip] += 1
|
||||
@@ -146,5 +199,6 @@ class AccessTracker:
|
||||
'top_user_agents': self.get_top_user_agents(10),
|
||||
'recent_suspicious': self.get_suspicious_accesses(20),
|
||||
'honeypot_triggered_ips': self.get_honeypot_triggered_ips(),
|
||||
'attack_types': self.get_attack_type_accesses(20)
|
||||
'attack_types': self.get_attack_type_accesses(20),
|
||||
'credential_attempts': self.credential_attempts[-50:] # Last 50 attempts
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user