fixing merge issues with main

This commit is contained in:
Phillip Tarrant
2026-01-03 14:40:08 -06:00
26 changed files with 1527 additions and 82 deletions

View File

@@ -4,6 +4,9 @@ LABEL org.opencontainers.image.source=https://github.com/BlessedRebuS/Krawl
WORKDIR /app
COPY requirements.txt /app/
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ /app/src/
COPY wordlists.json /app/

View File

@@ -187,6 +187,7 @@ To customize the deception server installation several **environment variables**
| `DASHBOARD_SECRET_PATH` | Custom dashboard path | Auto-generated |
| `PROBABILITY_ERROR_CODES` | Error response probability (0-100%) | `0` |
| `SERVER_HEADER` | HTTP Server header for deception | `Apache/2.2.22 (Ubuntu)` |
| `TIMEZONE` | IANA timezone for logs and dashboard (e.g., `America/New_York`, `Europe/Rome`) | System timezone |
## robots.txt
The actual (juicy) robots.txt configuration is the following

View File

@@ -25,6 +25,8 @@ services:
# - CANARY_TOKEN_URL=http://canarytokens.com/api/users/YOUR_TOKEN/passwords.txt
# Optional: Set custom dashboard path (auto-generated if not set)
# - DASHBOARD_SECRET_PATH=/my-secret-dashboard
# Optional: Set timezone for logs and dashboard (e.g., America/New_York, Europe/Rome)
# - TIMEZONE=UTC
restart: unless-stopped
healthcheck:
test: ["CMD", "python3", "-c", "import requests; requests.get('http://localhost:5000')"]

90
docs/coding-guidelines.md Normal file
View File

@@ -0,0 +1,90 @@
### Coding Standards
**Style & Structure**
- Prefer longer, explicit code over compact one-liners
- Always include docstrings for functions/classes + inline comments
- Strongly prefer OOP-style code (classes over functional/nested functions)
- Strong typing throughout (dataclasses, TypedDict, Enums, type hints)
- Value future-proofing and expanded usage insights
**Data Design**
- Use dataclasses for internal data modeling
- Typed JSON structures
- Functions return fully typed objects (no loose dicts)
- Snapshot files in JSON or YAML
- Human-readable fields (e.g., `sql_injection`, `xss_attempt`)
**Templates & UI**
- Don't mix large HTML/CSS blocks in Python code
- Prefer Jinja templates for HTML rendering
- Clean CSS, minimal inline clutter, readable template logic
**Writing & Documentation**
- Markdown documentation
- Clear section headers
- Roadmap/Phase/Feature-Session style documents
**Logging**
- Use the logging singleton defined in `src/logger.py`
- Setup logging at app start:
```
initialize_logging()
app_logger = get_app_logger()
access_logger = get_access_logger()
credential_logger = get_credential_logger()
```
**Preferred Pip Packages**
- API/Web Server: plain Python (standard-library `http.server`, no web framework)
- HTTP: Requests
- SQLite: SQLAlchemy
- Database Migrations: Alembic
### Error Handling
- Custom exception classes for domain-specific errors
- Consistent error response formats (JSON structure)
- Logging severity levels (ERROR vs WARNING)
### Configuration
- `.env` for secrets (never committed)
- Maintain `.env.example` in each component for documentation
- Typed config loaders using dataclasses
- Validation on startup
### Containerization & Deployment
- Explicit Dockerfiles
- Production-friendly hardening (distroless/slim when meaningful)
- Use git branch as tag
### Dependency Management
- Use `requirements.txt` and virtual environments (`python3 -m venv venv`)
- Use path `venv` for all virtual environments
- Constrain dependencies with version ranges (use an exact pin only when one specific version is required)
- Activate venv before running code (unless in Docker)
### Testing Standards
- Manual testing preferred for applications
- **tests:** Use shell scripts with curl/httpie for simulation and attack scripts.
- tests should be located in `tests` directory
### Git Standards
**Branch Strategy:**
- `master` - Production-ready code only
- `beta` - Public pre-release testing
- `dev` - Main development branch, integration point
**Workflow:**
- Feature work branches off `dev` (e.g., `feature/add-scheduler`)
- Merge features back to `dev` for testing
- Promote `dev` → `beta` for public testing (when applicable)
- Promote `beta` (or `dev`) → `master` for production
**Commit Messages:**
- Use conventional commit format: `feat:`, `fix:`, `docs:`, `refactor:`, etc.
- Keep commits atomic and focused
- Write clear, descriptive messages
**Tagging:**
- Tag releases on `master` with semantic versioning (e.g., `v1.2.3`)
- Optionally tag beta releases (e.g., `v1.2.3-beta.1`)

View File

@@ -16,3 +16,15 @@ data:
PROBABILITY_ERROR_CODES: {{ .Values.config.probabilityErrorCodes | quote }}
CANARY_TOKEN_URL: {{ .Values.config.canaryTokenUrl | quote }}
# Optional settings below are only rendered when a value is provided.
{{- if .Values.config.dashboardSecretPath }}
DASHBOARD_SECRET_PATH: {{ .Values.config.dashboardSecretPath | quote }}
{{- end }}
# SERVER_HEADER must be rendered exactly once: a second, unconditional entry
# would create a duplicate mapping key in the ConfigMap data.
{{- if .Values.config.serverHeader }}
SERVER_HEADER: {{ .Values.config.serverHeader | quote }}
{{- end }}
{{- if .Values.config.timezone }}
TIMEZONE: {{ .Values.config.timezone | quote }}
{{- end }}
# Database configuration
DATABASE_PATH: {{ .Values.database.path | quote }}
DATABASE_RETENTION_DAYS: {{ .Values.database.retentionDays | quote }}

View File

@@ -54,6 +54,10 @@ spec:
mountPath: /app/wordlists.json
subPath: wordlists.json
readOnly: true
{{- if .Values.database.persistence.enabled }}
- name: database
mountPath: /app/data
{{- end }}
{{- with .Values.resources }}
resources:
{{- toYaml . | nindent 12 }}
@@ -62,6 +66,16 @@ spec:
- name: wordlists
configMap:
name: {{ include "krawl.fullname" . }}-wordlists
{{- if .Values.database.persistence.enabled }}
- name: database
{{- if .Values.database.persistence.existingClaim }}
persistentVolumeClaim:
claimName: {{ .Values.database.persistence.existingClaim }}
{{- else }}
persistentVolumeClaim:
claimName: {{ include "krawl.fullname" . }}-db
{{- end }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

17
helm/templates/pvc.yaml Normal file
View File

@@ -0,0 +1,17 @@
{{- if and .Values.database.persistence.enabled (not .Values.database.persistence.existingClaim) }}
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ include "krawl.fullname" . }}-db
labels:
{{- include "krawl.labels" . | nindent 4 }}
spec:
accessModes:
- {{ .Values.database.persistence.accessMode }}
{{- if .Values.database.persistence.storageClassName }}
storageClassName: {{ .Values.database.persistence.storageClassName }}
{{- end }}
resources:
requests:
storage: {{ .Values.database.persistence.size }}
{{- end }}

View File

@@ -74,7 +74,29 @@ config:
canaryTokenTries: 10
probabilityErrorCodes: 0
serverHeader: "Apache/2.2.22 (Ubuntu)"
# timezone: "UTC"
# serverHeader: "Apache/2.2.22 (Ubuntu)"
# dashboardSecretPath: "/my-secret-dashboard"
# canaryTokenUrl: set-your-canary-token-url-here
# timezone: "UTC" # IANA timezone (e.g., "America/New_York", "Europe/Rome"). If not set, system timezone is used.
# Database configuration
database:
# Path to the SQLite database file
path: "data/krawl.db"
# Number of days to retain access logs and attack data
retentionDays: 30
# Persistence configuration
persistence:
enabled: true
# Storage class name (use default if not specified)
# storageClassName: ""
# Access mode for the persistent volume
accessMode: ReadWriteOnce
# Size of the persistent volume
size: 1Gi
# Optional: Use existing PVC
# existingClaim: ""
networkPolicy:
enabled: true
@@ -268,6 +290,17 @@ wordlists:
- .git/
- keys/
- credentials/
server_headers:
- Apache/2.2.22 (Ubuntu)
- nginx/1.18.0
- Microsoft-IIS/10.0
- LiteSpeed
- Caddy
- Gunicorn/20.0.4
- uvicorn/0.13.4
- Express
- Flask/1.1.2
- Django/3.1
error_codes:
- 400
- 401

View File

@@ -20,6 +20,9 @@ data:
CANARY_TOKEN_TRIES: "10"
PROBABILITY_ERROR_CODES: "0"
# CANARY_TOKEN_URL: set-your-canary-token-url-here
# Database configuration
DATABASE_PATH: "data/krawl.db"
DATABASE_RETENTION_DAYS: "30"
---
apiVersion: v1
kind: ConfigMap
@@ -227,6 +230,20 @@ data:
]
}
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: krawl-db
namespace: krawl-system
labels:
app: krawl-server
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
---
apiVersion: apps/v1
kind: Deployment
metadata:
@@ -260,6 +277,8 @@ spec:
mountPath: /app/wordlists.json
subPath: wordlists.json
readOnly: true
- name: database
mountPath: /app/data
resources:
requests:
memory: "64Mi"
@@ -271,6 +290,9 @@ spec:
- name: wordlists
configMap:
name: krawl-wordlists
- name: database
persistentVolumeClaim:
claimName: krawl-db
---
apiVersion: v1
kind: Service

View File

@@ -15,3 +15,7 @@ data:
PROBABILITY_ERROR_CODES: "0"
SERVER_HEADER: "Apache/2.2.22 (Ubuntu)"
# CANARY_TOKEN_URL: set-your-canary-token-url-here
# TIMEZONE: "UTC" # IANA timezone (e.g., "America/New_York", "Europe/Rome")
# Database configuration
DATABASE_PATH: "data/krawl.db"
DATABASE_RETENTION_DAYS: "30"

View File

@@ -31,6 +31,8 @@ spec:
mountPath: /app/wordlists.json
subPath: wordlists.json
readOnly: true
- name: database
mountPath: /app/data
resources:
requests:
memory: "64Mi"
@@ -42,3 +44,6 @@ spec:
- name: wordlists
configMap:
name: krawl-wordlists
- name: database
persistentVolumeClaim:
claimName: krawl-db

View File

@@ -5,6 +5,7 @@ resources:
- namespace.yaml
- configmap.yaml
- wordlists-configmap.yaml
- pvc.yaml
- deployment.yaml
- service.yaml
- network-policy.yaml

View File

@@ -0,0 +1,13 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: krawl-db
namespace: krawl-system
labels:
app: krawl-server
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi

8
requirements.txt Normal file
View File

@@ -0,0 +1,8 @@
# Krawl Honeypot Dependencies
# Install with: pip install -r requirements.txt
# Configuration
PyYAML>=6.0
# Database ORM
SQLAlchemy>=2.0.0,<3.0.0

View File

@@ -1,8 +1,14 @@
#!/usr/bin/env python3
import os
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple
from zoneinfo import ZoneInfo
import time
import yaml
@dataclass
@@ -10,6 +16,7 @@ class Config:
"""Configuration class for the deception server"""
port: int = 5000
delay: int = 100 # milliseconds
server_header: str = ""
links_length_range: Tuple[int, int] = (5, 15)
links_per_page_range: Tuple[int, int] = (10, 15)
char_space: str = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
@@ -21,30 +28,116 @@ class Config:
api_server_port: int = 8080
api_server_path: str = "/api/v2/users"
probability_error_codes: int = 0 # Percentage (0-100)
server_header: str = "Apache/2.2.22 (Ubuntu)"
# Database settings
database_path: str = "data/krawl.db"
database_retention_days: int = 30
timezone: str = None # IANA timezone (e.g., 'America/New_York', 'Europe/Rome')
@staticmethod
# Try to fetch timezone before if not set
def get_system_timezone() -> str:
"""Get the system's default timezone"""
try:
if os.path.islink('/etc/localtime'):
tz_path = os.readlink('/etc/localtime')
if 'zoneinfo/' in tz_path:
return tz_path.split('zoneinfo/')[-1]
local_tz = time.tzname[time.daylight]
if local_tz and local_tz != 'UTC':
return local_tz
except Exception:
pass
# Default fallback to UTC
return 'UTC'
def get_timezone(self) -> ZoneInfo:
"""Get configured timezone as ZoneInfo object"""
if self.timezone:
try:
return ZoneInfo(self.timezone)
except Exception:
pass
system_tz = self.get_system_timezone()
try:
return ZoneInfo(system_tz)
except Exception:
return ZoneInfo('UTC')
@classmethod
def from_yaml(cls) -> 'Config':
    """
    Create configuration from a YAML file.

    The file name is taken from the CONFIG_LOCATION environment variable
    (default: 'config.yaml') and resolved against the parent of this
    module's directory. Any key missing from the file falls back to the
    built-in default.

    Returns:
        A populated Config instance.

    Exits the process with status 1 when the file is missing, is not valid
    YAML, or does not contain a mapping at the top level.
    """
    config_location = os.getenv('CONFIG_LOCATION', 'config.yaml')
    config_path = Path(__file__).parent.parent / config_location

    try:
        with open(config_path, 'r') as f:
            data = yaml.safe_load(f)
    except FileNotFoundError:
        print(f"Error: Configuration file '{config_path}' not found.", file=sys.stderr)
        print("Please create a config.yaml file or set CONFIG_LOCATION environment variable.", file=sys.stderr)
        sys.exit(1)
    except yaml.YAMLError as e:
        print(f"Error: Invalid YAML in configuration file '{config_path}': {e}", file=sys.stderr)
        sys.exit(1)

    # An empty file parses to None
    if data is None:
        data = {}
    if not isinstance(data, dict):
        print(f"Error: Configuration file '{config_path}' must contain a mapping at the top level.", file=sys.stderr)
        sys.exit(1)

    # Extract nested sections. A section that is present but empty
    # (e.g. "canary:" with every child commented out) parses to None,
    # so "or {}" is required in addition to the missing-key case.
    server = data.get('server') or {}
    links = data.get('links') or {}
    canary = data.get('canary') or {}
    dashboard = data.get('dashboard') or {}
    api = data.get('api') or {}
    database = data.get('database') or {}
    behavior = data.get('behavior') or {}

    # Handle dashboard_secret_path - auto-generate if null/empty/not set.
    # An empty string must not be accepted: it would normalise to "/" and
    # expose the dashboard at the site root.
    dashboard_path = dashboard.get('secret_path')
    if not dashboard_path:
        dashboard_path = f'/{os.urandom(16).hex()}'
    else:
        dashboard_path = str(dashboard_path)
        # ensure the dashboard path starts with a /
        if not dashboard_path.startswith('/'):
            dashboard_path = f"/{dashboard_path}"

    return cls(
        port=server.get('port', 5000),
        delay=server.get('delay', 100),
        server_header=server.get('server_header', ""),
        timezone=server.get('timezone'),
        links_length_range=(
            links.get('min_length', 5),
            links.get('max_length', 15)
        ),
        links_per_page_range=(
            links.get('min_per_page', 10),
            links.get('max_per_page', 15)
        ),
        char_space=links.get('char_space', 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'),
        max_counter=links.get('max_counter', 10),
        canary_token_url=canary.get('token_url'),
        canary_token_tries=canary.get('token_tries', 10),
        dashboard_secret_path=dashboard_path,
        api_server_url=api.get('server_url'),
        api_server_port=api.get('server_port', 8080),
        api_server_path=api.get('server_path', '/api/v2/users'),
        probability_error_codes=behavior.get('probability_error_codes', 0),
        database_path=database.get('path', 'data/krawl.db'),
        database_retention_days=database.get('retention_days', 30),
    )
# Lazily created process-wide configuration object
_config_instance = None


def get_config() -> Config:
    """
    Return the shared Config instance.

    The configuration is loaded with Config.from_yaml() on the first call
    and the same object is handed back on every later call.
    """
    global _config_instance
    if _config_instance is not None:
        return _config_instance
    _config_instance = Config.from_yaml()
    return _config_instance

555
src/database.py Normal file
View File

@@ -0,0 +1,555 @@
#!/usr/bin/env python3
"""
Database singleton module for the Krawl honeypot.
Provides SQLAlchemy session management and database initialization.
"""
import os
import stat
from datetime import datetime
from typing import Optional, List, Dict, Any
from sqlalchemy import create_engine, func, distinct, case
from sqlalchemy.orm import sessionmaker, scoped_session, Session
from models import Base, AccessLog, CredentialAttempt, AttackDetection, IpStats
from sanitizer import (
sanitize_ip,
sanitize_path,
sanitize_user_agent,
sanitize_credential,
sanitize_attack_pattern,
)
class DatabaseManager:
"""
Singleton database manager for the Krawl honeypot.
Handles database initialization, session management, and provides
methods for persisting access logs, credentials, and attack detections.
"""
_instance: Optional["DatabaseManager"] = None
def __new__(cls) -> "DatabaseManager":
    """Return the single shared instance, creating it (uninitialized) on first use."""
    existing = cls._instance
    if existing is not None:
        return existing
    created = super().__new__(cls)
    cls._instance = created
    created._initialized = False
    return created
def initialize(self, database_path: str = "data/krawl.db") -> None:
    """
    Open the SQLite database, build the session factory and create tables.

    Calling this again after a successful initialization does nothing.

    Args:
        database_path: Path to the SQLite database file
    """
    if self._initialized:
        return

    # Make sure the directory that will hold the database file exists
    parent_dir = os.path.dirname(database_path)
    if parent_dir and not os.path.exists(parent_dir):
        os.makedirs(parent_dir, exist_ok=True)

    # check_same_thread=False lets the connection be used from several threads
    self._engine = create_engine(
        f"sqlite:///{database_path}",
        connect_args={"check_same_thread": False},
        echo=False  # Set to True for SQL debugging
    )

    # scoped_session hands each thread its own session
    self._Session = scoped_session(sessionmaker(bind=self._engine))

    # Create every table declared on the ORM base
    Base.metadata.create_all(self._engine)

    # Restrict the database file to owner read/write (mode 600)
    owner_read_write = stat.S_IRUSR | stat.S_IWUSR
    if os.path.exists(database_path):
        try:
            os.chmod(database_path, owner_read_write)
        except OSError:
            # May fail on some systems, not critical
            pass

    self._initialized = True
@property
def session(self) -> Session:
    """
    Thread-local database session.

    Raises:
        RuntimeError: if initialize() has not been called yet
    """
    if self._initialized:
        return self._Session()
    raise RuntimeError("DatabaseManager not initialized. Call initialize() first.")
def close_session(self) -> None:
    """Discard the current thread-local session; does nothing before initialization."""
    if not self._initialized:
        return
    self._Session.remove()
def persist_access(
    self,
    ip: str,
    path: str,
    user_agent: str = "",
    method: str = "GET",
    is_suspicious: bool = False,
    is_honeypot_trigger: bool = False,
    attack_types: Optional[List[str]] = None,
    matched_patterns: Optional[Dict[str, str]] = None
) -> Optional[int]:
    """
    Store one HTTP access, its attack detections, and the per-IP counters.

    Args:
        ip: Client IP address
        path: Requested path
        user_agent: Client user agent string
        method: HTTP method (GET, POST, HEAD); truncated to 10 characters
        is_suspicious: Whether the request was flagged as suspicious
        is_honeypot_trigger: Whether a honeypot path was accessed
        attack_types: List of detected attack types
        matched_patterns: Dict mapping attack_type to matched pattern

    Returns:
        The ID of the created AccessLog record, or None on error
    """
    db = self.session
    try:
        # All free-text fields are sanitized before being stored
        record = AccessLog(
            ip=sanitize_ip(ip),
            path=sanitize_path(path),
            user_agent=sanitize_user_agent(user_agent),
            method=method[:10],
            is_suspicious=is_suspicious,
            is_honeypot_trigger=is_honeypot_trigger,
            timestamp=datetime.utcnow()
        )
        db.add(record)
        # Flush so the primary key is assigned before child rows reference it
        db.flush()

        if attack_types:
            pattern_lookup = matched_patterns or {}
            for detected_type in attack_types:
                raw_pattern = pattern_lookup.get(detected_type, "")
                db.add(
                    AttackDetection(
                        access_log_id=record.id,
                        attack_type=detected_type[:50],
                        matched_pattern=sanitize_attack_pattern(raw_pattern)
                    )
                )

        # Keep the per-IP aggregate row in step with the new access
        self._update_ip_stats(db, ip)

        db.commit()
        return record.id
    except Exception as exc:
        db.rollback()
        # Persistence is secondary to the honeypot itself: report and carry on
        print(f"Database error persisting access: {exc}")
        return None
    finally:
        self.close_session()
def persist_credential(
    self,
    ip: str,
    path: str,
    username: Optional[str] = None,
    password: Optional[str] = None
) -> Optional[int]:
    """
    Store one captured credential attempt.

    Args:
        ip: Client IP address
        path: Login form path
        username: Submitted username
        password: Submitted password

    Returns:
        The ID of the created CredentialAttempt record, or None on error
    """
    db = self.session
    try:
        attempt = CredentialAttempt(
            ip=sanitize_ip(ip),
            path=sanitize_path(path),
            username=sanitize_credential(username),
            password=sanitize_credential(password),
            timestamp=datetime.utcnow()
        )
        db.add(attempt)
        db.commit()
        return attempt.id
    except Exception as exc:
        db.rollback()
        print(f"Database error persisting credential: {exc}")
        return None
    finally:
        self.close_session()
def _update_ip_stats(self, session: Session, ip: str) -> None:
    """
    Create or update the statistics row for an IP address.

    The caller owns the transaction; nothing is committed here.

    Args:
        session: Active database session
        ip: IP address to update
    """
    clean_ip = sanitize_ip(ip)
    seen_at = datetime.utcnow()

    existing_row = session.query(IpStats).filter(IpStats.ip == clean_ip).first()
    if not existing_row:
        # First request from this address
        session.add(
            IpStats(
                ip=clean_ip,
                total_requests=1,
                first_seen=seen_at,
                last_seen=seen_at
            )
        )
        return

    existing_row.total_requests += 1
    existing_row.last_seen = seen_at
def get_access_logs(
    self,
    limit: int = 100,
    offset: int = 0,
    ip_filter: Optional[str] = None,
    suspicious_only: bool = False
) -> List[Dict[str, Any]]:
    """
    Fetch access logs, newest first, with optional filters.

    Args:
        limit: Maximum number of records to return
        offset: Number of records to skip
        ip_filter: Filter by IP address
        suspicious_only: Only return suspicious requests

    Returns:
        List of access log dictionaries
    """
    db = self.session
    try:
        query = db.query(AccessLog).order_by(AccessLog.timestamp.desc())
        if ip_filter:
            query = query.filter(AccessLog.ip == sanitize_ip(ip_filter))
        if suspicious_only:
            query = query.filter(AccessLog.is_suspicious == True)

        page = query.offset(offset).limit(limit).all()

        serialized: List[Dict[str, Any]] = []
        for entry in page:
            serialized.append({
                'id': entry.id,
                'ip': entry.ip,
                'path': entry.path,
                'user_agent': entry.user_agent,
                'method': entry.method,
                'is_suspicious': entry.is_suspicious,
                'is_honeypot_trigger': entry.is_honeypot_trigger,
                'timestamp': entry.timestamp.isoformat(),
                'attack_types': [det.attack_type for det in entry.attack_detections]
            })
        return serialized
    finally:
        self.close_session()
def get_credential_attempts(
    self,
    limit: int = 100,
    offset: int = 0,
    ip_filter: Optional[str] = None
) -> List[Dict[str, Any]]:
    """
    Fetch captured credential attempts, newest first.

    Args:
        limit: Maximum number of records to return
        offset: Number of records to skip
        ip_filter: Filter by IP address

    Returns:
        List of credential attempt dictionaries
    """
    db = self.session
    try:
        newest_first = CredentialAttempt.timestamp.desc()
        query = db.query(CredentialAttempt).order_by(newest_first)
        if ip_filter:
            query = query.filter(CredentialAttempt.ip == sanitize_ip(ip_filter))

        serialized: List[Dict[str, Any]] = []
        for row in query.offset(offset).limit(limit).all():
            serialized.append({
                'id': row.id,
                'ip': row.ip,
                'path': row.path,
                'username': row.username,
                'password': row.password,
                'timestamp': row.timestamp.isoformat()
            })
        return serialized
    finally:
        self.close_session()
def get_ip_stats(self, limit: int = 100) -> List[Dict[str, Any]]:
    """
    Fetch per-IP statistics, busiest addresses first.

    Args:
        limit: Maximum number of records to return

    Returns:
        List of IP stats dictionaries
    """
    db = self.session
    try:
        busiest_first = IpStats.total_requests.desc()
        rows = db.query(IpStats).order_by(busiest_first).limit(limit).all()

        serialized: List[Dict[str, Any]] = []
        for row in rows:
            serialized.append({
                'ip': row.ip,
                'total_requests': row.total_requests,
                'first_seen': row.first_seen.isoformat(),
                'last_seen': row.last_seen.isoformat(),
                'country_code': row.country_code,
                'city': row.city,
                'asn': row.asn,
                'asn_org': row.asn_org,
                'reputation_score': row.reputation_score,
                'reputation_source': row.reputation_source
            })
        return serialized
    finally:
        self.close_session()
def get_dashboard_counts(self) -> Dict[str, int]:
    """
    Compute the aggregate counters shown on the dashboard.

    Returns:
        Dictionary with total_accesses, unique_ips, unique_paths,
        suspicious_accesses, honeypot_triggered, honeypot_ips
    """
    db = self.session
    try:
        # 1/0 flags so that SUM() counts matching rows
        suspicious_flag = case((AccessLog.is_suspicious == True, 1), else_=0)
        honeypot_flag = case((AccessLog.is_honeypot_trigger == True, 1), else_=0)

        # All table-wide aggregates in a single query
        totals = db.query(
            func.count(AccessLog.id).label('total_accesses'),
            func.count(distinct(AccessLog.ip)).label('unique_ips'),
            func.count(distinct(AccessLog.path)).label('unique_paths'),
            func.sum(suspicious_flag).label('suspicious_accesses'),
            func.sum(honeypot_flag).label('honeypot_triggered')
        ).first()

        # Distinct addresses that hit at least one honeypot path
        trigger_ip_query = db.query(func.count(distinct(AccessLog.ip))).filter(
            AccessLog.is_honeypot_trigger == True
        )
        trigger_ip_count = trigger_ip_query.scalar() or 0

        counts = {
            'total_accesses': totals.total_accesses or 0,
            'unique_ips': totals.unique_ips or 0,
            'unique_paths': totals.unique_paths or 0,
            'suspicious_accesses': int(totals.suspicious_accesses or 0),
            'honeypot_triggered': int(totals.honeypot_triggered or 0),
            'honeypot_ips': trigger_ip_count
        }
        return counts
    finally:
        self.close_session()
def get_top_ips(self, limit: int = 10) -> List[tuple]:
    """
    Rank IP addresses by number of recorded accesses.

    Args:
        limit: Maximum number of results

    Returns:
        List of (ip, count) tuples ordered by count descending
    """
    db = self.session
    try:
        ranked = (
            db.query(AccessLog.ip, func.count(AccessLog.id).label('count'))
            .group_by(AccessLog.ip)
            .order_by(func.count(AccessLog.id).desc())
            .limit(limit)
            .all()
        )
        # Each result row is the selected (ip, count) pair
        return [(address, hits) for address, hits in ranked]
    finally:
        self.close_session()
def get_top_paths(self, limit: int = 10) -> List[tuple]:
    """
    Rank requested paths by number of recorded accesses.

    Args:
        limit: Maximum number of results

    Returns:
        List of (path, count) tuples ordered by count descending
    """
    db = self.session
    try:
        ranked = (
            db.query(AccessLog.path, func.count(AccessLog.id).label('count'))
            .group_by(AccessLog.path)
            .order_by(func.count(AccessLog.id).desc())
            .limit(limit)
            .all()
        )
        # Each result row is the selected (path, count) pair
        return [(requested_path, hits) for requested_path, hits in ranked]
    finally:
        self.close_session()
def get_top_user_agents(self, limit: int = 10) -> List[tuple]:
    """
    Rank user agents by number of recorded accesses.

    Rows with a NULL or empty user agent are excluded.

    Args:
        limit: Maximum number of results

    Returns:
        List of (user_agent, count) tuples ordered by count descending
    """
    db = self.session
    try:
        ranked = (
            db.query(AccessLog.user_agent, func.count(AccessLog.id).label('count'))
            .filter(
                AccessLog.user_agent.isnot(None),
                AccessLog.user_agent != ''
            )
            .group_by(AccessLog.user_agent)
            .order_by(func.count(AccessLog.id).desc())
            .limit(limit)
            .all()
        )
        # Each result row is the selected (user_agent, count) pair
        return [(agent, hits) for agent, hits in ranked]
    finally:
        self.close_session()
def get_recent_suspicious(self, limit: int = 20) -> List[Dict[str, Any]]:
    """
    Fetch the most recent accesses flagged as suspicious.

    Args:
        limit: Maximum number of results

    Returns:
        List of access log dictionaries with is_suspicious=True
    """
    db = self.session
    try:
        flagged = db.query(AccessLog).filter(AccessLog.is_suspicious == True)
        newest = flagged.order_by(AccessLog.timestamp.desc()).limit(limit).all()

        serialized: List[Dict[str, Any]] = []
        for entry in newest:
            serialized.append({
                'ip': entry.ip,
                'path': entry.path,
                'user_agent': entry.user_agent,
                'timestamp': entry.timestamp.isoformat()
            })
        return serialized
    finally:
        self.close_session()
def get_honeypot_triggered_ips(self) -> List[tuple]:
    """
    Get IPs that triggered honeypot paths with the paths they accessed.

    Each path is listed at most once per IP.

    Returns:
        List of (ip, [paths]) tuples
    """
    session = self.session
    try:
        # De-duplicate in SQL: a scanner hitting the same trap thousands of
        # times must not cause thousands of rows to be loaded and compared
        # in Python.
        results = session.query(
            AccessLog.ip,
            AccessLog.path
        ).filter(
            AccessLog.is_honeypot_trigger == True
        ).distinct().all()

        # Group the (already unique) paths by IP
        ip_paths: Dict[str, List[str]] = {}
        for ip, path in results:
            ip_paths.setdefault(ip, []).append(path)

        return list(ip_paths.items())
    finally:
        self.close_session()
def get_recent_attacks(self, limit: int = 20) -> List[Dict[str, Any]]:
    """
    Get recent access logs that have attack detections.

    Args:
        limit: Maximum number of access logs to return

    Returns:
        List of access log dicts with attack_types included
    """
    session = self.session
    try:
        # Use an EXISTS filter rather than a JOIN: a JOIN produces one row
        # per detection, so LIMIT would count detections instead of access
        # logs and a request with several attack types would crowd out
        # other entries.
        logs = session.query(AccessLog).filter(
            AccessLog.attack_detections.any()
        ).order_by(AccessLog.timestamp.desc()).limit(limit).all()

        return [
            {
                'ip': log.ip,
                'path': log.path,
                'user_agent': log.user_agent,
                'timestamp': log.timestamp.isoformat(),
                'attack_types': [d.attack_type for d in log.attack_detections]
            }
            for log in logs
        ]
    finally:
        self.close_session()
# Shared manager created at import time; it stays unusable until
# initialize() has been called on it.
_db_manager = DatabaseManager()


def get_database() -> DatabaseManager:
    """Return the module-wide DatabaseManager instance."""
    manager = _db_manager
    return manager
def initialize_database(database_path: str = "data/krawl.db") -> None:
    """
    Initialize the shared database manager.

    Args:
        database_path: Path to the SQLite database file
    """
    manager = _db_manager
    manager.initialize(database_path)

View File

@@ -9,7 +9,8 @@ import string
import json
from templates import html_templates
from wordlists import get_wordlists
from config import Config
from logger import get_app_logger
def random_username() -> str:
"""Generate random username"""
@@ -36,6 +37,16 @@ def random_email(username: str = None) -> str:
username = random_username()
return f"{username}@{random.choice(wl.email_domains)}"
def random_server_header() -> str:
    """
    Return the value to send in the HTTP Server header.

    A configured, non-empty server_header always wins; otherwise one of the
    entries in the wordlists' server_headers list is picked at random.
    """
    # Imported locally so this function does not depend on which names the
    # module-level "from config import ..." line provides. The shared
    # instance is used because configuration is loaded from YAML via
    # get_config(); the environment-based Config.from_env() loader no
    # longer exists, and rebuilding the config on every response would
    # re-read the file and regenerate the random dashboard path.
    from config import get_config

    configured_header = get_config().server_header
    if configured_header:
        return configured_header

    wl = get_wordlists()
    return random.choice(wl.server_headers)
def random_api_key() -> str:
"""Generate random API key"""

View File

@@ -14,7 +14,7 @@ from templates import html_templates
from templates.dashboard_template import generate_dashboard
from generators import (
credentials_txt, passwords_txt, users_json, api_keys_json,
api_response, directory_listing
api_response, directory_listing, random_server_header
)
from wordlists import get_wordlists
from sql_errors import generate_sql_error_response, get_sql_response_with_data
@@ -56,7 +56,7 @@ class Handler(BaseHTTPRequestHandler):
def version_string(self) -> str:
"""Return custom server version for deception."""
return self.config.server_header
return random_server_header()
def _should_return_error(self) -> bool:
"""Check if we should return an error based on probability"""
@@ -342,17 +342,21 @@ class Handler(BaseHTTPRequestHandler):
self.access_logger.warning(f"[POST DATA] {post_data[:200]}")
# Parse and log credentials
username, password = self.tracker.parse_credentials(post_data)
if username or password:
# Log to dedicated credentials.log file
timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
credential_line = f"{timestamp}|{client_ip}|{username or 'N/A'}|{password or 'N/A'}|{self.path}"
self.credential_logger.info(credential_line)
# Also record in tracker for dashboard
self.tracker.record_credential_attempt(client_ip, self.path, username or 'N/A', password or 'N/A')
self.access_logger.warning(f"[CREDENTIALS CAPTURED] {client_ip} - Username: {username or 'N/A'} - Path: {self.path}")
self.tracker.record_access(client_ip, self.path, user_agent, post_data)
# send the post data (body) to the record_access function so the post data can be used to detect suspicious things.
self.tracker.record_access(client_ip, self.path, user_agent, post_data, method='POST')
time.sleep(1)
@@ -495,7 +499,7 @@ class Handler(BaseHTTPRequestHandler):
self.app_logger.error(f"Error generating dashboard: {e}")
return
self.tracker.record_access(client_ip, self.path, user_agent)
self.tracker.record_access(client_ip, self.path, user_agent, method='GET')
if self.tracker.is_suspicious_user_agent(user_agent):
self.access_logger.warning(f"[SUSPICIOUS] {client_ip} - {user_agent[:50]} - {self.path}")

View File

@@ -8,6 +8,23 @@ Provides two loggers: app (application) and access (HTTP access logs).
import logging
import os
from logging.handlers import RotatingFileHandler
from typing import Optional
from zoneinfo import ZoneInfo
from datetime import datetime
class TimezoneFormatter(logging.Formatter):
    """Log formatter that renders record timestamps in a chosen timezone."""

    def __init__(self, fmt=None, datefmt=None, timezone: Optional[ZoneInfo] = None):
        """
        Args:
            fmt: Log message format string
            datefmt: strftime pattern for the timestamp
            timezone: Zone used for timestamps (UTC when omitted)
        """
        super().__init__(fmt, datefmt)
        if timezone:
            self.timezone = timezone
        else:
            self.timezone = ZoneInfo('UTC')

    def formatTime(self, record, datefmt=None):
        """Render the record's creation time in the formatter's timezone."""
        localized = datetime.fromtimestamp(record.created, tz=self.timezone)
        # Without an explicit pattern, fall back to ISO 8601
        if not datefmt:
            return localized.isoformat()
        return localized.strftime(datefmt)
class LoggerManager:
@@ -20,23 +37,27 @@ class LoggerManager:
cls._instance._initialized = False
return cls._instance
def initialize(self, log_dir: str = "logs") -> None:
def initialize(self, log_dir: str = "logs", timezone: Optional[ZoneInfo] = None) -> None:
"""
Initialize the logging system with rotating file handlers.
Args:
log_dir: Directory for log files (created if not exists)
timezone: ZoneInfo timezone for log timestamps (defaults to UTC)
"""
if self._initialized:
return
self.timezone = timezone or ZoneInfo('UTC')
# Create log directory if it doesn't exist
os.makedirs(log_dir, exist_ok=True)
# Common format for all loggers
log_format = logging.Formatter(
log_format = TimezoneFormatter(
"[%(asctime)s] %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
datefmt="%Y-%m-%d %H:%M:%S",
timezone=self.timezone
)
# Rotation settings: 1MB max, 5 backups
@@ -83,7 +104,7 @@ class LoggerManager:
self._credential_logger.handlers.clear()
# Credential logger uses a simple format: timestamp|ip|username|password|path
credential_format = logging.Formatter("%(message)s")
credential_format = TimezoneFormatter("%(message)s", timezone=self.timezone)
credential_file_handler = RotatingFileHandler(
os.path.join(log_dir, "credentials.log"),
@@ -136,6 +157,6 @@ def get_credential_logger() -> logging.Logger:
return _logger_manager.credentials
def initialize_logging(log_dir: str = "logs", timezone: Optional[ZoneInfo] = None) -> None:
    """
    Initialize the logging system.

    Args:
        log_dir: Directory handed to the logger manager for its log files.
        timezone: Timezone handed to the logger manager for log timestamps.
    """
    # Thin module-level wrapper: all the work happens in the shared manager.
    _logger_manager.initialize(log_dir=log_dir, timezone=timezone)

143
src/models.py Normal file
View File

@@ -0,0 +1,143 @@
#!/usr/bin/env python3
"""
SQLAlchemy ORM models for the Krawl honeypot database.
Stores access logs, credential attempts, attack detections, and IP statistics.
"""
from datetime import datetime, timezone
from typing import List, Optional

from sqlalchemy import Boolean, DateTime, ForeignKey, Index, Integer, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

from sanitizer import (
    MAX_IP_LENGTH,
    MAX_PATH_LENGTH,
    MAX_USER_AGENT_LENGTH,
    MAX_CREDENTIAL_LENGTH,
    MAX_ATTACK_PATTERN_LENGTH,
    MAX_CITY_LENGTH,
    MAX_ASN_ORG_LENGTH,
    MAX_REPUTATION_SOURCE_LENGTH,
)
class Base(DeclarativeBase):
    """Declarative base shared by every ORM model in this module."""
class AccessLog(Base):
    """
    One row per HTTP request received by the honeypot.

    Holds the request metadata (IP, path, user agent, method), the
    suspicious / honeypot-trigger flags and the time the row was created.
    Attack detections found in the request hang off ``attack_detections``.
    """

    __tablename__ = 'access_logs'

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True)
    path: Mapped[str] = mapped_column(String(MAX_PATH_LENGTH), nullable=False)
    user_agent: Mapped[Optional[str]] = mapped_column(String(MAX_USER_AGENT_LENGTH), nullable=True)
    method: Mapped[str] = mapped_column(String(10), nullable=False, default='GET')
    is_suspicious: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    is_honeypot_trigger: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    timestamp: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        # Naive UTC, i.e. the same value datetime.utcnow() produced, but
        # without calling the API deprecated since Python 3.12.
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
        index=True,
    )

    # Relationship to attack detections; child rows are removed with their parent.
    attack_detections: Mapped[List["AttackDetection"]] = relationship(
        "AttackDetection",
        back_populates="access_log",
        cascade="all, delete-orphan"
    )

    # Indexes for common queries
    __table_args__ = (
        Index('ix_access_logs_ip_timestamp', 'ip', 'timestamp'),
        Index('ix_access_logs_is_suspicious', 'is_suspicious'),
        Index('ix_access_logs_is_honeypot_trigger', 'is_honeypot_trigger'),
    )

    def __repr__(self) -> str:
        """Short debug representation; the path is cut to 50 characters."""
        return f"<AccessLog(id={self.id}, ip='{self.ip}', path='{self.path[:50]}')>"
class CredentialAttempt(Base):
    """
    One row per login attempt captured by a honeypot login form.

    Stores the submitted username and password together with the client IP,
    the requested path and the time the row was created.
    """

    __tablename__ = 'credential_attempts'

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True)
    path: Mapped[str] = mapped_column(String(MAX_PATH_LENGTH), nullable=False)
    username: Mapped[Optional[str]] = mapped_column(String(MAX_CREDENTIAL_LENGTH), nullable=True)
    password: Mapped[Optional[str]] = mapped_column(String(MAX_CREDENTIAL_LENGTH), nullable=True)
    timestamp: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        # Naive UTC, i.e. the same value datetime.utcnow() produced, but
        # without calling the API deprecated since Python 3.12.
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
        index=True,
    )

    # Composite index for common queries
    __table_args__ = (
        Index('ix_credential_attempts_ip_timestamp', 'ip', 'timestamp'),
    )

    def __repr__(self) -> str:
        """Short debug representation (the password is deliberately left out)."""
        return f"<CredentialAttempt(id={self.id}, ip='{self.ip}', username='{self.username}')>"
class AttackDetection(Base):
    """
    A single attack pattern detected in a request.

    Every row points at its parent AccessLog through ``access_log_id``;
    one request may produce several rows, one per detected attack type.
    """

    __tablename__ = 'attack_detections'

    id: Mapped[int] = mapped_column(
        Integer,
        primary_key=True,
        autoincrement=True,
    )
    access_log_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey('access_logs.id', ondelete='CASCADE'),
        nullable=False,
        index=True,
    )
    attack_type: Mapped[str] = mapped_column(
        String(50),
        nullable=False,
    )
    matched_pattern: Mapped[Optional[str]] = mapped_column(
        String(MAX_ATTACK_PATTERN_LENGTH),
        nullable=True,
    )

    # Back-reference to the owning access log row
    access_log: Mapped["AccessLog"] = relationship(
        "AccessLog",
        back_populates="attack_detections",
    )

    def __repr__(self) -> str:
        """Short debug representation with the row id and attack type."""
        return f"<AttackDetection(id={self.id}, type='{self.attack_type}')>"
class IpStats(Base):
    """
    Aggregated statistics per IP address (the IP itself is the primary key).

    Besides the request counter and first/last-seen times, the table carries
    nullable GeoIP and reputation columns reserved for future enrichment.
    """

    __tablename__ = 'ip_stats'

    ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), primary_key=True)
    total_requests: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    # Both defaults yield naive UTC, i.e. the same value datetime.utcnow()
    # produced, but without calling the API deprecated since Python 3.12.
    first_seen: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
    )
    last_seen: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
    )

    # GeoIP fields (populated by future enrichment)
    country_code: Mapped[Optional[str]] = mapped_column(String(2), nullable=True)
    city: Mapped[Optional[str]] = mapped_column(String(MAX_CITY_LENGTH), nullable=True)
    asn: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    asn_org: Mapped[Optional[str]] = mapped_column(String(MAX_ASN_ORG_LENGTH), nullable=True)

    # Reputation fields (populated by future enrichment)
    reputation_score: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    reputation_source: Mapped[Optional[str]] = mapped_column(String(MAX_REPUTATION_SOURCE_LENGTH), nullable=True)
    reputation_updated: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)

    def __repr__(self) -> str:
        """Short debug representation with the IP and its request count."""
        return f"<IpStats(ip='{self.ip}', total_requests={self.total_requests})>"

113
src/sanitizer.py Normal file
View File

@@ -0,0 +1,113 @@
#!/usr/bin/env python3
"""
Sanitization utilities for safe database storage and HTML output.
Protects against SQL injection payloads, XSS, and storage exhaustion attacks.
"""
import html
import re
from typing import Optional
# Field length limits for database storage
MAX_IP_LENGTH = 45 # IPv6 max length
MAX_PATH_LENGTH = 2048 # URL max practical length
MAX_USER_AGENT_LENGTH = 512
MAX_CREDENTIAL_LENGTH = 256
MAX_ATTACK_PATTERN_LENGTH = 256
MAX_CITY_LENGTH = 128
MAX_ASN_ORG_LENGTH = 256
MAX_REPUTATION_SOURCE_LENGTH = 64
def sanitize_for_storage(value: Optional[str], max_length: int) -> str:
    """
    Strip unwanted control characters from a value and cap its length.

    The value is converted with ``str()``; the characters 0x00-0x08, 0x0B,
    0x0C, 0x0E-0x1F and 0x7F are deleted, while tab, newline and carriage
    return are kept. The cleaned text is then cut to ``max_length``.

    Args:
        value: The value to sanitize
        max_length: Maximum number of characters to keep after cleaning

    Returns:
        The cleaned, truncated string; "" when the input is None or otherwise falsy
    """
    if not value:
        return ""

    text = str(value)

    # Code points to delete: every C0 control character except
    # tab (0x09), newline (0x0A) and carriage return (0x0D), plus DEL (0x7F).
    unwanted_code_points = [*range(0x00, 0x09), 0x0B, 0x0C, *range(0x0E, 0x20), 0x7F]
    deletion_table = dict.fromkeys(unwanted_code_points)
    cleaned = text.translate(deletion_table)

    # Truncation happens after cleaning, so removed characters do not count.
    return cleaned[:max_length]
def sanitize_ip(value: Optional[str]) -> str:
    """Clean an IP address string and cap it at MAX_IP_LENGTH characters."""
    return sanitize_for_storage(value, max_length=MAX_IP_LENGTH)
def sanitize_path(value: Optional[str]) -> str:
    """Clean a URL path and cap it at MAX_PATH_LENGTH characters."""
    return sanitize_for_storage(value, max_length=MAX_PATH_LENGTH)
def sanitize_user_agent(value: Optional[str]) -> str:
    """Clean a user agent string and cap it at MAX_USER_AGENT_LENGTH characters."""
    return sanitize_for_storage(value, max_length=MAX_USER_AGENT_LENGTH)
def sanitize_credential(value: Optional[str]) -> str:
    """Clean a username or password and cap it at MAX_CREDENTIAL_LENGTH characters."""
    return sanitize_for_storage(value, max_length=MAX_CREDENTIAL_LENGTH)
def sanitize_attack_pattern(value: Optional[str]) -> str:
    """Clean a matched attack pattern and cap it at MAX_ATTACK_PATTERN_LENGTH characters."""
    return sanitize_for_storage(value, max_length=MAX_ATTACK_PATTERN_LENGTH)
def escape_html(value: Optional[str]) -> str:
    """
    HTML-escape a value so it can be embedded safely in a web page.

    Intended for user-controlled data shown in the dashboard, where
    unescaped markup would allow stored XSS.

    Args:
        value: The value to escape (converted with ``str()``)

    Returns:
        The escaped string; "" when the input is None or otherwise falsy
    """
    if value:
        return html.escape(str(value))
    return ""
def escape_html_truncated(value: Optional[str], max_display_length: int) -> str:
    """
    Shorten a value for display and HTML-escape the result.

    The raw text is cut first and "..." is appended when it was longer than
    ``max_display_length``; escaping is applied afterwards, so an escaped
    entity is never split in the middle.

    Args:
        value: The value to shorten and escape (converted with ``str()``)
        max_display_length: Number of raw characters kept before the "..."

    Returns:
        The escaped, possibly shortened string; "" when the input is None or otherwise falsy
    """
    if not value:
        return ""

    text = str(value)
    fits = len(text) <= max_display_length
    shown = text if fits else text[:max_display_length] + "..."
    return html.escape(shown)

View File

@@ -12,6 +12,7 @@ from config import Config
from tracker import AccessTracker
from handler import Handler
from logger import initialize_logging, get_app_logger, get_access_logger, get_credential_logger
from database import initialize_database
def print_usage():
@@ -33,6 +34,10 @@ def print_usage():
print(' PROBABILITY_ERROR_CODES - Probability (0-100) to return HTTP error codes (default: 0)')
print(' CHAR_SPACE - Characters for random links')
print(' SERVER_HEADER - HTTP Server header for deception (default: Apache/2.2.22 (Ubuntu))')
print(' DATABASE_PATH - Path to SQLite database (default: data/krawl.db)')
print(' DATABASE_RETENTION_DAYS - Days to retain database records (default: 30)')
print(' TIMEZONE - IANA timezone for logs/dashboard (e.g., America/New_York, Europe/Rome)')
print(' If not set, system timezone will be used')
def main():
@@ -41,15 +46,27 @@ def main():
print_usage()
exit(0)
# Initialize logging
initialize_logging()
config = Config.from_env()
# Get timezone configuration
tz = config.get_timezone()
# Initialize logging with timezone
initialize_logging(timezone=tz)
app_logger = get_app_logger()
access_logger = get_access_logger()
credential_logger = get_credential_logger()
config = Config.from_env()
tracker = AccessTracker()
# Initialize database for persistent storage
try:
initialize_database(config.database_path)
app_logger.info(f'Database initialized at: {config.database_path}')
except Exception as e:
app_logger.warning(f'Database initialization failed: {e}. Continuing with in-memory only.')
tracker = AccessTracker(timezone=tz)
Handler.config = config
Handler.tracker = tracker
@@ -71,6 +88,7 @@ def main():
try:
app_logger.info(f'Starting deception server on port {config.port}...')
app_logger.info(f'Timezone configured: {tz.key}')
app_logger.info(f'Dashboard available at: {config.dashboard_secret_path}')
if config.canary_token_url:
app_logger.info(f'Canary token will appear after {config.canary_token_tries} tries')

View File

@@ -5,49 +5,67 @@ Dashboard template for viewing honeypot statistics.
Customize this template to change the dashboard appearance.
"""
import html
from datetime import datetime
def _escape(value) -> str:
"""Escape HTML special characters to prevent XSS attacks."""
if value is None:
return ""
return html.escape(str(value))
def format_timestamp(iso_timestamp: str) -> str:
    """
    Format a timestamp for display (YYYY-MM-DD HH:MM:SS).

    Args:
        iso_timestamp: ISO 8601 timestamp string. A ``datetime`` instance
            is also accepted and formatted directly, and ``None`` is
            tolerated.

    Returns:
        The formatted timestamp. When the text is not valid ISO 8601, the
        first 8 characters after the first "T" are returned if a "T" is
        present, otherwise the text is returned unchanged. ``None`` yields
        an empty string.
    """
    display_format = "%Y-%m-%d %H:%M:%S"

    # Previously None (or a datetime object) made the fallback branch itself
    # raise, because it assumed a string; handle both explicitly.
    if iso_timestamp is None:
        return ""
    if isinstance(iso_timestamp, datetime):
        return iso_timestamp.strftime(display_format)

    text = str(iso_timestamp)
    try:
        return datetime.fromisoformat(text).strftime(display_format)
    except ValueError:
        # Fallback for old format
        return text.split("T")[1][:8] if "T" in text else text
def generate_dashboard(stats: dict) -> str:
"""Generate dashboard HTML with access statistics"""
# Generate IP rows
# Generate IP rows (IPs are generally safe but escape for consistency)
top_ips_rows = '\n'.join([
f'<tr><td class="rank">{i+1}</td><td>{ip}</td><td>{count}</td></tr>'
f'<tr><td class="rank">{i+1}</td><td>{_escape(ip)}</td><td>{count}</td></tr>'
for i, (ip, count) in enumerate(stats['top_ips'])
]) or '<tr><td colspan="3" style="text-align:center;">No data</td></tr>'
# Generate paths rows
# Generate paths rows (CRITICAL: paths can contain XSS payloads)
top_paths_rows = '\n'.join([
f'<tr><td class="rank">{i+1}</td><td>{path}</td><td>{count}</td></tr>'
f'<tr><td class="rank">{i+1}</td><td>{_escape(path)}</td><td>{count}</td></tr>'
for i, (path, count) in enumerate(stats['top_paths'])
]) or '<tr><td colspan="3" style="text-align:center;">No data</td></tr>'
# Generate User-Agent rows
# Generate User-Agent rows (CRITICAL: user agents can contain XSS payloads)
top_ua_rows = '\n'.join([
f'<tr><td class="rank">{i+1}</td><td style="word-break: break-all;">{ua[:80]}</td><td>{count}</td></tr>'
f'<tr><td class="rank">{i+1}</td><td style="word-break: break-all;">{_escape(ua[:80])}</td><td>{count}</td></tr>'
for i, (ua, count) in enumerate(stats['top_user_agents'])
]) or '<tr><td colspan="3" style="text-align:center;">No data</td></tr>'
# Generate suspicious accesses rows
# Generate suspicious accesses rows (CRITICAL: multiple user-controlled fields)
suspicious_rows = '\n'.join([
f'<tr><td>{log["ip"]}</td><td>{log["path"]}</td><td style="word-break: break-all;">{log["user_agent"][:60]}</td><td>{log["timestamp"].split("T")[1][:8]}</td></tr>'
f'<tr><td>{_escape(log["ip"])}</td><td>{_escape(log["path"])}</td><td style="word-break: break-all;">{_escape(log["user_agent"][:60])}</td><td>{_escape(log["timestamp"].split("T")[1][:8])}</td></tr>'
for log in stats['recent_suspicious'][-10:]
]) or '<tr><td colspan="4" style="text-align:center;">No suspicious activity detected</td></tr>'
# Generate honeypot triggered IPs rows
honeypot_rows = '\n'.join([
f'<tr><td>{ip}</td><td style="word-break: break-all;">{", ".join(paths)}</td><td>{len(paths)}</td></tr>'
f'<tr><td>{_escape(ip)}</td><td style="word-break: break-all;">{_escape(", ".join(paths))}</td><td>{len(paths)}</td></tr>'
for ip, paths in stats.get('honeypot_triggered_ips', [])
]) or '<tr><td colspan="3" style="text-align:center;">No honeypot triggers yet</td></tr>'
# Generate attack types rows
# Generate attack types rows (CRITICAL: paths and user agents are user-controlled)
attack_type_rows = '\n'.join([
f'<tr><td>{log["ip"]}</td><td>{log["path"]}</td><td>{", ".join(log["attack_types"])}</td><td style="word-break: break-all;">{log["user_agent"][:60]}</td><td>{log["timestamp"].split("T")[1][:8]}</td></tr>'
f'<tr><td>{_escape(log["ip"])}</td><td>{_escape(log["path"])}</td><td>{_escape(", ".join(log["attack_types"]))}</td><td style="word-break: break-all;">{_escape(log["user_agent"][:60])}</td><td>{_escape(log["timestamp"].split("T")[1][:8])}</td></tr>'
for log in stats.get('attack_types', [])[-10:]
]) or '<tr><td colspan="4" style="text-align:center;">No attacks detected</td></tr>'
# Generate credential attempts rows
# Generate credential attempts rows (CRITICAL: usernames and passwords are user-controlled)
credential_rows = '\n'.join([
f'<tr><td>{log["ip"]}</td><td>{log["username"]}</td><td>{log["password"]}</td><td>{log["path"]}</td><td>{log["timestamp"].split("T")[1][:8]}</td></tr>'
f'<tr><td>{_escape(log["ip"])}</td><td>{_escape(log["username"])}</td><td>{_escape(log["password"])}</td><td>{_escape(log["path"])}</td><td>{_escape(log["timestamp"].split("T")[1][:8])}</td></tr>'
for log in stats.get('credential_attempts', [])[-20:]
]) or '<tr><td colspan="5" style="text-align:center;">No credentials captured yet</td></tr>'
@@ -172,7 +190,7 @@ def generate_dashboard(stats: dict) -> str:
</div>
<div class="table-container alert-section">
<h2>🍯 Honeypot Triggers</h2>
<h2>🍯 Honeypot Triggers by IP</h2>
<table>
<thead>
<tr>

View File

@@ -1,21 +1,36 @@
#!/usr/bin/env python3
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, Optional
from collections import defaultdict
from datetime import datetime
from zoneinfo import ZoneInfo
import re
import urllib.parse
from wordlists import get_wordlists
from database import get_database, DatabaseManager
class AccessTracker:
"""Track IP addresses and paths accessed"""
def __init__(self):
"""
Track IP addresses and paths accessed.
Maintains in-memory structures for fast dashboard access and
persists data to SQLite for long-term storage and analysis.
"""
def __init__(self, db_manager: Optional[DatabaseManager] = None, timezone: Optional[ZoneInfo] = None):
"""
Initialize the access tracker.
Args:
db_manager: Optional DatabaseManager for persistence.
If None, will use the global singleton.
"""
self.ip_counts: Dict[str, int] = defaultdict(int)
self.path_counts: Dict[str, int] = defaultdict(int)
self.user_agent_counts: Dict[str, int] = defaultdict(int)
self.access_log: List[Dict] = []
self.credential_attempts: List[Dict] = []
self.timezone = timezone or ZoneInfo('UTC')
self.suspicious_patterns = [
'bot', 'crawler', 'spider', 'scraper', 'curl', 'wget', 'python-requests',
'scanner', 'nikto', 'sqlmap', 'nmap', 'masscan', 'nessus', 'acunetix',
@@ -39,6 +54,25 @@ class AccessTracker:
# Track IPs that accessed honeypot paths from robots.txt
self.honeypot_triggered: Dict[str, List[str]] = defaultdict(list)
# Database manager for persistence (lazily initialized)
self._db_manager = db_manager
@property
def db(self) -> Optional[DatabaseManager]:
    """
    Return the database manager, resolving it on first use.

    When no manager is cached yet, ``get_database()`` is tried; if that
    call raises, the error is swallowed and persistence stays disabled
    for this access (the lookup is attempted again on the next one).

    Returns:
        DatabaseManager instance or None if not available
    """
    if self._db_manager is not None:
        return self._db_manager

    try:
        self._db_manager = get_database()
    except Exception:
        # Database not initialized, persistence disabled
        pass
    return self._db_manager
def parse_credentials(self, post_data: str) -> Tuple[str, str]:
"""
Parse username and password from POST data.
@@ -81,36 +115,77 @@ class AccessTracker:
return username, password
def record_credential_attempt(self, ip: str, path: str, username: str, password: str):
    """
    Record a credential login attempt.

    The attempt is appended to the in-memory ``credential_attempts`` list
    (timestamped in the tracker's timezone) and, when a database manager
    is available, also handed to ``persist_credential``.

    Args:
        ip: Client IP address
        path: Requested path
        username: Submitted username
        password: Submitted password
    """
    # In-memory storage for dashboard
    attempt = {
        'ip': ip,
        'path': path,
        'username': username,
        'password': password,
        'timestamp': datetime.now(self.timezone).isoformat(),
    }
    self.credential_attempts.append(attempt)

    # Persist to database
    database = self.db
    if not database:
        return

    try:
        database.persist_credential(
            ip=ip,
            path=path,
            username=username,
            password=password,
        )
    except Exception:
        # Don't crash if database persistence fails
        pass
def record_access(
self,
ip: str,
path: str,
user_agent: str = '',
body: str = '',
method: str = 'GET'
):
"""
Record an access attempt.
Stores in both in-memory structures and SQLite database.
Args:
ip: Client IP address
path: Requested path
user_agent: Client user agent string
body: Request body (for POST/PUT)
method: HTTP method
"""
self.ip_counts[ip] += 1
self.path_counts[path] += 1
if user_agent:
self.user_agent_counts[user_agent] += 1
# path attack type detection
# Path attack type detection
attack_findings = self.detect_attack_type(path)
# post / put data
# POST/PUT body attack detection
if len(body) > 0:
attack_findings.extend(self.detect_attack_type(body))
is_suspicious = self.is_suspicious_user_agent(user_agent) or self.is_honeypot_path(path) or len(attack_findings) > 0
is_suspicious = (
self.is_suspicious_user_agent(user_agent) or
self.is_honeypot_path(path) or
len(attack_findings) > 0
)
is_honeypot = self.is_honeypot_path(path)
# Track if this IP accessed a honeypot path
if self.is_honeypot_path(path):
if is_honeypot:
self.honeypot_triggered[ip].append(path)
# In-memory storage for dashboard
self.access_log.append({
'ip': ip,
'path': path,
@@ -118,9 +193,25 @@ class AccessTracker:
'suspicious': is_suspicious,
'honeypot_triggered': self.is_honeypot_path(path),
'attack_types':attack_findings,
'timestamp': datetime.now().isoformat()
'timestamp': datetime.now(self.timezone).isoformat()
})
# Persist to database
if self.db:
try:
self.db.persist_access(
ip=ip,
path=path,
user_agent=user_agent,
method=method,
is_suspicious=is_suspicious,
is_honeypot_trigger=is_honeypot,
attack_types=attack_findings if attack_findings else None
)
except Exception:
# Don't crash if database persistence fails
pass
def detect_attack_type(self, data:str) -> list[str]:
"""
Returns a list of all attack types found in path data
@@ -190,21 +281,20 @@ class AccessTracker:
return [(ip, paths) for ip, paths in self.honeypot_triggered.items()]
def get_stats(self) -> Dict:
    """
    Get the statistics summary used by the dashboard.

    When a database manager is available the aggregate counts and the
    detailed lists are read from it. Otherwise the summary is computed
    from the in-memory structures, so the dashboard keeps working when
    the server runs without a database (startup logs "Continuing with
    in-memory only" in that case) instead of raising.

    Returns:
        Dict with the aggregate counters plus the keys 'top_ips',
        'top_paths', 'top_user_agents', 'recent_suspicious',
        'honeypot_triggered_ips', 'attack_types' and 'credential_attempts'.
    """
    database = self.db
    if database:
        # Get aggregate counts from database
        stats = database.get_dashboard_counts()

        # Add detailed lists from database
        stats['top_ips'] = database.get_top_ips(10)
        stats['top_paths'] = database.get_top_paths(10)
        stats['top_user_agents'] = database.get_top_user_agents(10)
        stats['recent_suspicious'] = database.get_recent_suspicious(20)
        stats['honeypot_triggered_ips'] = database.get_honeypot_triggered_ips()
        stats['attack_types'] = database.get_recent_attacks(20)
        stats['credential_attempts'] = database.get_credential_attempts(limit=50)
        return stats

    # In-memory fallback (the pre-database implementation).
    # NOTE(review): relies on the tracker's in-memory helper methods
    # (get_top_ips, get_suspicious_accesses, ...) still being defined on
    # this class - confirm they were not removed elsewhere.
    suspicious_count = sum(1 for log in self.access_log if log.get('suspicious', False))
    honeypot_count = sum(1 for log in self.access_log if log.get('honeypot_triggered', False))
    return {
        'total_accesses': len(self.access_log),
        'unique_ips': len(self.ip_counts),
        'unique_paths': len(self.path_counts),
        'suspicious_accesses': suspicious_count,
        'honeypot_triggered': honeypot_count,
        'honeypot_ips': len(self.honeypot_triggered),
        'top_ips': self.get_top_ips(10),
        'top_paths': self.get_top_paths(10),
        'top_user_agents': self.get_top_user_agents(10),
        'recent_suspicious': self.get_suspicious_accesses(20),
        'honeypot_triggered_ips': self.get_honeypot_triggered_ips(),
        'attack_types': self.get_attack_type_accesses(20),
        'credential_attempts': self.credential_attempts[-50:]  # Last 50 attempts
    }

View File

@@ -57,7 +57,8 @@ class Wordlists:
},
"users": {
"roles": ["Administrator", "User"]
}
},
"server_headers": ["Apache/2.4.41 (Ubuntu)", "nginx/1.18.0"]
}
@property
@@ -124,6 +125,9 @@ class Wordlists:
def server_errors(self):
return self._data.get("server_errors", {})
def server_headers(self):
    """Return the "server_headers" entry of the loaded data, or an empty list when it is absent."""
    # NOTE(review): an @property decorator is visible on another accessor of
    # this class but not on this method - confirm whether callers use
    # attribute access or call syntax here.
    headers = self._data.get("server_headers", [])
    return headers
_wordlists_instance = None

150
tests/test_credentials.sh Executable file
View File

@@ -0,0 +1,150 @@
#!/bin/bash
# This script sends various POST requests with credentials to the honeypot.
#
# The target defaults to localhost:5000; override it with the KRAWL_HOST and
# KRAWL_PORT environment variables, e.g.:
#   KRAWL_HOST=10.0.0.5 KRAWL_PORT=8080 tests/test_credentials.sh

GREEN='\033[0;32m'
BLUE='\033[0;34m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
NC='\033[0m'

# Configuration (environment overrides, same defaults as before)
HOST="${KRAWL_HOST:-localhost}"
PORT="${KRAWL_PORT:-5000}"
BASE_URL="http://${HOST}:${PORT}"

FORM_CONTENT_TYPE="application/x-www-form-urlencoded"
JSON_CONTENT_TYPE="application/json"

# send_post PATH CONTENT_TYPE CURL_DATA_ARGS...
# POSTs to ${BASE_URL}PATH with the given Content-Type and discards the response.
# The remaining arguments are passed to curl unchanged (e.g. -d "a=b" or
# --data-urlencode "a=b").
send_post() {
    local path="$1"
    local content_type="$2"
    shift 2
    curl -s -X POST "${BASE_URL}${path}" \
        -H "Content-Type: ${content_type}" \
        "$@" \
        > /dev/null
}

echo -e "${BLUE}========================================${NC}"
echo -e "${BLUE}Krawl Credential Logging Test Script${NC}"
echo -e "${BLUE}========================================${NC}\n"

# Check if server is running
echo -e "${YELLOW}Checking if server is running on ${BASE_URL}...${NC}"
if ! curl -s -f "${BASE_URL}/health" > /dev/null 2>&1; then
    echo -e "${RED}❌ Server is not running. Please start the Krawl server first.${NC}"
    echo -e "${YELLOW}Run: python3 src/server.py${NC}"
    exit 1
fi
echo -e "${GREEN}✓ Server is running${NC}\n"

# Test 1: Simple login form POST
echo -e "${YELLOW}Test 1: POST to /login with form data${NC}"
send_post "/login" "${FORM_CONTENT_TYPE}" -d "username=admin&password=admin123"
echo -e "${GREEN}✓ Sent: admin / admin123${NC}\n"
sleep 1

# Test 2: Admin panel login
echo -e "${YELLOW}Test 2: POST to /admin with credentials${NC}"
send_post "/admin" "${FORM_CONTENT_TYPE}" -d "user=root&pass=toor&submit=Login"
echo -e "${GREEN}✓ Sent: root / toor${NC}\n"
sleep 1

# Test 3: WordPress login attempt
echo -e "${YELLOW}Test 3: POST to /wp-login.php${NC}"
send_post "/wp-login.php" "${FORM_CONTENT_TYPE}" -d "log=wpuser&pwd=Password1&wp-submit=Log+In"
echo -e "${GREEN}✓ Sent: wpuser / Password1${NC}\n"
sleep 1

# Test 4: JSON formatted credentials
echo -e "${YELLOW}Test 4: POST to /api/login with JSON${NC}"
send_post "/api/login" "${JSON_CONTENT_TYPE}" -d '{"username":"apiuser","password":"apipass123","remember":true}'
echo -e "${GREEN}✓ Sent: apiuser / apipass123${NC}\n"
sleep 1

# Test 5: SSH-style login
echo -e "${YELLOW}Test 5: POST to /ssh with credentials${NC}"
send_post "/ssh" "${FORM_CONTENT_TYPE}" -d "username=sshuser&password=P@ssw0rd!"
echo -e "${GREEN}✓ Sent: sshuser / P@ssw0rd!${NC}\n"
sleep 1

# Test 6: Database admin
echo -e "${YELLOW}Test 6: POST to /phpmyadmin with credentials${NC}"
send_post "/phpmyadmin" "${FORM_CONTENT_TYPE}" -d "pma_username=dbadmin&pma_password=dbpass123&server=1"
echo -e "${GREEN}✓ Sent: dbadmin / dbpass123${NC}\n"
sleep 1

# Test 7: Multiple fields with email
echo -e "${YELLOW}Test 7: POST to /register with email${NC}"
send_post "/register" "${FORM_CONTENT_TYPE}" -d "email=test@example.com&username=newuser&password=NewPass123&confirm_password=NewPass123"
echo -e "${GREEN}✓ Sent: newuser / NewPass123 (email: test@example.com)${NC}\n"
sleep 1

# Test 8: FTP credentials
echo -e "${YELLOW}Test 8: POST to /ftp/login${NC}"
send_post "/ftp/login" "${FORM_CONTENT_TYPE}" -d "ftpuser=ftpadmin&ftppass=ftp123456"
echo -e "${GREEN}✓ Sent: ftpadmin / ftp123456${NC}\n"
sleep 1

# Test 9: Common brute force attempt
echo -e "${YELLOW}Test 9: Multiple attempts (simulating brute force)${NC}"
for i in {1..3}; do
    send_post "/login" "${FORM_CONTENT_TYPE}" -d "username=admin&password=pass${i}"
    echo -e "${GREEN}✓ Attempt $i: admin / pass${i}${NC}"
    sleep 0.5
done
echo ""
sleep 1

# Test 10: Special characters in credentials (URL-encoded by curl)
echo -e "${YELLOW}Test 10: POST with special characters${NC}"
send_post "/login" "${FORM_CONTENT_TYPE}" \
    --data-urlencode "username=user@domain.com" \
    --data-urlencode "password=P@\$\$w0rd!#%"
echo -e "${GREEN}✓ Sent: user@domain.com / P@\$\$w0rd!#%${NC}\n"

echo -e "${BLUE}========================================${NC}"
echo -e "${GREEN}✓ All credential tests completed!${NC}"
echo -e "${BLUE}========================================${NC}\n"

echo -e "${YELLOW}Check the results:${NC}"
echo -e " 1. View the log file: ${GREEN}tail -20 logs/credentials.log${NC}"
echo -e " 2. View the dashboard: ${GREEN}${BASE_URL}/dashboard${NC}"
echo -e " 3. Check recent logs: ${GREEN}tail -20 logs/access.log ${NC}\n"

# Display last 10 credential entries if a log file exists. Both the path
# advertised above and the one under src/ are checked, because the location
# depends on the directory the server was started from.
for log_file in "logs/credentials.log" "src/logs/credentials.log"; do
    if [ -f "${log_file}" ]; then
        echo -e "${BLUE}========================================${NC}"
        echo -e "${BLUE}Last 10 Captured Credentials:${NC}"
        echo -e "${BLUE}========================================${NC}"
        tail -10 "${log_file}"
        echo ""
        break
    fi
done

echo -e "${YELLOW}💡 Tip: Open ${BASE_URL}/dashboard in your browser to see the credentials in real-time!${NC}"