Merge pull request #30 from BlessedRebuS/feat/background-tasks

Feat/background tasks
This commit is contained in:
Patrick Di Fazio
2026-01-09 20:39:38 +01:00
committed by GitHub
13 changed files with 443 additions and 15 deletions

3
.gitignore vendored
View File

@@ -76,3 +76,6 @@ data/
# Personal canary tokens or sensitive configs
*canary*token*.yaml
personal-values.yaml
#exports dir (keeping .gitkeep so we have the dir)
/exports/*

View File

@@ -17,7 +17,7 @@ COPY entrypoint.sh /app/
COPY config.yaml /app/
RUN useradd -m -u 1000 krawl && \
mkdir -p /app/logs /app/data && \
mkdir -p /app/logs /app/data /app/exports && \
chown -R krawl:krawl /app && \
chmod +x /app/entrypoint.sh

View File

@@ -12,6 +12,7 @@ services:
- ./wordlists.json:/app/wordlists.json:ro
- ./config.yaml:/app/config.yaml:ro
- ./logs:/app/logs
- ./exports:/app/exports
environment:
- CONFIG_LOCATION=config.yaml
restart: unless-stopped

View File

@@ -2,7 +2,7 @@
set -e
# Fix ownership of mounted directories
chown -R krawl:krawl /app/logs /app/data 2>/dev/null || true
chown -R krawl:krawl /app/logs /app/data /app/exports 2>/dev/null || true
# Drop to krawl user and run the application
exec gosu krawl "$@"

0
exports/.gitkeep Normal file
View File

View File

@@ -6,3 +6,6 @@ PyYAML>=6.0
# Database ORM
SQLAlchemy>=2.0.0,<3.0.0
# Scheduling
APScheduler>=3.11.2

View File

@@ -9,10 +9,13 @@ import re
import urllib.parse
from wordlists import get_wordlists
from config import get_config
from logger import get_app_logger
"""
Functions for user activity analysis
"""
app_logger = get_app_logger()
class Analyzer:
"""
Analyzes users activity and produces aggregated insights
@@ -57,7 +60,7 @@ class Analyzer:
attack_urls_threshold = config.attack_urls_threshold
uneven_request_timing_time_window_seconds = config.uneven_request_timing_time_window_seconds
print(f"http_risky_methods_threshold: {http_risky_methods_threshold}")
app_logger.debug(f"http_risky_methods_threshold: {http_risky_methods_threshold}")
score = {}
score["attacker"] = {"risky_http_methods": False, "robots_violations": False, "uneven_request_timing": False, "different_user_agents": False, "attack_url": False}
@@ -196,7 +199,7 @@ class Analyzer:
variance = sum((x - mean) ** 2 for x in time_diffs) / len(time_diffs)
std = variance ** 0.5
cv = std/mean
print(f"Mean: {mean} - Variance {variance} - Standard Deviation {std} - Coefficient of Variation: {cv}")
app_logger.debug(f"Mean: {mean} - Variance {variance} - Standard Deviation {std} - Coefficient of Variation: {cv}")
if cv >= uneven_request_timing_threshold:
score["attacker"]["uneven_request_timing"] = True
@@ -291,10 +294,13 @@ class Analyzer:
regular_user_score = regular_user_score + score["regular_user"]["different_user_agents"] * weights["regular_user"]["different_user_agents"]
regular_user_score = regular_user_score + score["regular_user"]["attack_url"] * weights["regular_user"]["attack_url"]
print(f"Attacker score: {attacker_score}")
print(f"Good Crawler score: {good_crawler_score}")
print(f"Bad Crawler score: {bad_crawler_score}")
print(f"Regular User score: {regular_user_score}")
score_details = f"""
Attacker score: {attacker_score}
Good Crawler score: {good_crawler_score}
Bad Crawler score: {bad_crawler_score}
Regular User score: {regular_user_score}
"""
app_logger.debug(score_details)
analyzed_metrics = {"risky_http_methods": http_method_attacker_score, "robots_violations": violated_robots_ratio, "uneven_request_timing": mean, "different_user_agents": user_agents_used, "attack_url": attack_urls_found_list}
category_scores = {"attacker": attacker_score, "good_crawler": good_crawler_score, "bad_crawler": bad_crawler_score, "regular_user": regular_user_score}

View File

@@ -23,6 +23,9 @@ from sanitizer import (
sanitize_attack_pattern,
)
from logger import get_app_logger
applogger = get_app_logger()
class DatabaseManager:
"""
@@ -155,7 +158,7 @@ class DatabaseManager:
except Exception as e:
session.rollback()
# Log error but don't crash - database persistence is secondary to honeypot function
print(f"Database error persisting access: {e}")
applogger.critical(f"Database error persisting access: {e}")
return None
finally:
self.close_session()
@@ -194,7 +197,7 @@ class DatabaseManager:
except Exception as e:
session.rollback()
print(f"Database error persisting credential: {e}")
applogger.critical(f"Database error persisting credential: {e}")
return None
finally:
self.close_session()
@@ -237,7 +240,8 @@ class DatabaseManager:
last_analysis: timestamp of last analysis
"""
print(f"Analyzed metrics {analyzed_metrics}, category {category}, category scores {category_scores}, last analysis {last_analysis}")
applogger.debug(f"Analyzed metrics {analyzed_metrics}, category {category}, category scores {category_scores}, last analysis {last_analysis}")
applogger.info(f"IP: {ip} category has been updated to {category}")
session = self.session
sanitized_ip = sanitize_ip(ip)
@@ -314,7 +318,7 @@ class DatabaseManager:
session.commit()
except Exception as e:
session.rollback()
print(f"Error recording category change: {e}")
applogger.error(f"Error recording category change: {e}")
def get_category_history(self, ip: str) -> List[Dict[str, Any]]:
"""

View File

@@ -408,7 +408,8 @@ class Handler(BaseHTTPRequestHandler):
try:
stats = self.tracker.get_stats()
timezone = str(self.config.timezone) if self.config.timezone else 'UTC'
self.wfile.write(generate_dashboard(stats, timezone).encode())
dashboard_path = self.config.dashboard_secret_path
self.wfile.write(generate_dashboard(stats, timezone, dashboard_path).encode())
except BrokenPipeError:
pass
except Exception as e:
@@ -442,6 +443,35 @@ class Handler(BaseHTTPRequestHandler):
self.wfile.write(json.dumps({'error': str(e)}).encode())
return
# API endpoint for downloading malicious IPs file
if self.config.dashboard_secret_path and self.path == f"{self.config.dashboard_secret_path}/api/download/malicious_ips.txt":
import os
file_path = os.path.join(os.path.dirname(__file__), 'exports', 'malicious_ips.txt')
try:
if os.path.exists(file_path):
with open(file_path, 'rb') as f:
content = f.read()
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.send_header('Content-Disposition', 'attachment; filename="malicious_ips.txt"')
self.send_header('Content-Length', str(len(content)))
self.end_headers()
self.wfile.write(content)
else:
self.send_response(404)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'File not found')
except BrokenPipeError:
pass
except Exception as e:
self.app_logger.error(f"Error serving malicious IPs file: {e}")
self.send_response(500)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'Internal server error')
return
self.tracker.record_access(client_ip, self.path, user_agent, method='GET')
self.analyzer.infer_user_category(client_ip)

View File

@@ -14,6 +14,7 @@ from analyzer import Analyzer
from handler import Handler
from logger import initialize_logging, get_app_logger, get_access_logger, get_credential_logger
from database import initialize_database
from tasks_master import get_tasksmaster
def print_usage():
@@ -92,6 +93,10 @@ def main():
except IOError:
app_logger.warning("Can't read input file. Using randomly generated links.")
# tasks master init
tasks_master = get_tasksmaster()
tasks_master.run_scheduled_tasks()
try:
app_logger.info(f'Starting deception server on port {config.port}...')
app_logger.info(f'Timezone configured: {tz.key}')

View File

@@ -0,0 +1,57 @@
# tasks/export_malicious_ips.py
import os
from logger import get_app_logger
from database import get_database
from models import AccessLog
from sqlalchemy import distinct
app_logger = get_app_logger()
# ----------------------
# TASK CONFIG
# ----------------------
TASK_CONFIG = {
"name": "export-malicious-ips",
"cron": "*/5 * * * *",
"enabled": True,
"run_when_loaded": True
}
EXPORTS_DIR = "exports"
OUTPUT_FILE = os.path.join(EXPORTS_DIR, "malicious_ips.txt")
# ----------------------
# TASK LOGIC
# ----------------------
def main():
"""
Export all IPs flagged as suspicious to a text file.
TasksMaster will call this function based on the cron schedule.
"""
task_name = TASK_CONFIG.get("name")
app_logger.info(f"[Background Task] {task_name} starting...")
try:
db = get_database()
session = db.session
# Query distinct suspicious IPs
results = session.query(distinct(AccessLog.ip)).filter(
AccessLog.is_suspicious == True
).all()
# Ensure exports directory exists
os.makedirs(EXPORTS_DIR, exist_ok=True)
# Write IPs to file (one per line)
with open(OUTPUT_FILE, 'w') as f:
for (ip,) in results:
f.write(f"{ip}\n")
app_logger.info(f"[Background Task] {task_name} exported {len(results)} IPs to {OUTPUT_FILE}")
except Exception as e:
app_logger.error(f"[Background Task] {task_name} failed: {e}")
finally:
db.close_session()

288
src/tasks_master.py Normal file
View File

@@ -0,0 +1,288 @@
import os
import sys
import datetime
import functools
import threading
import importlib
import importlib.util
from logger import initialize_logging, get_app_logger, get_access_logger, get_credential_logger
app_logger = get_app_logger()
try:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
except ModuleNotFoundError:
msg = (
"Required modules are not installed. "
"Can not continue with module / application loading.\n"
"Install it with: pip install -r requirements"
)
print(msg, file=sys.stderr)
app_logger.error(msg)
exit()
# ---------- TASKSMASTER CLASS ----------
class TasksMaster:
TASK_DEFAULT_CRON = '*/15 * * * *'
TASK_JITTER = 240
TASKS_FOLDER = os.path.join(os.path.dirname(__file__), "tasks")
def __init__(self, scheduler: BackgroundScheduler):
self.tasks = self._config_tasks()
self.scheduler = scheduler
self.last_run_times = {}
self.scheduler.add_listener(self.job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
def _config_tasks(self):
"""
Loads tasks from the TASKS_FOLDER and logs how many were found.
"""
tasks_defined = self._load_tasks_from_folder(self.TASKS_FOLDER)
app_logger.info(f"Scheduled Tasks Loaded from folder: {self.TASKS_FOLDER}")
return tasks_defined
def _load_tasks_from_folder(self, folder_path):
"""
Loads and registers task modules from a specified folder.
This function scans the given folder for Python (.py) files, dynamically
imports each as a module, and looks for two attributes:
- TASK_CONFIG: A dictionary containing task metadata, specifically the
'name' and 'cron' (cron schedule string).
- main: A callable function that represents the task's execution logic.
Tasks with both attributes are added to a list with their configuration and
execution function.
Args:
folder_path (str): Path to the folder containing task scripts.
Returns:
list[dict]: A list of task definitions with keys:
- 'name' (str): The name of the task.
- 'filename' (str): The file the task was loaded from.
- 'cron' (str): The crontab string for scheduling.
- 'enabled' (bool): Whether the task is enabled.
- 'run_when_loaded' (bool): Whether to run the task immediately.
"""
tasks = []
if not os.path.exists(folder_path):
app_logger.error(f"{folder_path} does not exist! Unable to load tasks!")
return tasks
# we sort the files so that we have a set order, which helps with debugging
for filename in sorted(os.listdir(folder_path)):
# skip any non python files, as well as any __pycache__ or .pyc files that might creep in there
if not filename.endswith('.py') or filename.startswith("__"):
continue
path = os.path.join(folder_path, filename)
module_name = filename[:-3]
spec = importlib.util.spec_from_file_location(f"tasks.{module_name}", path)
module = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(module)
sys.modules[f"tasks.{module_name}"] = module
except Exception as e:
app_logger.error(f"Failed to import {filename}: {e}")
continue
# if we have a tasks config and a main function, we attempt to schedule it
if hasattr(module, 'TASK_CONFIG') and hasattr(module, 'main'):
# ensure task_config is a dict
if not isinstance(module.TASK_CONFIG, dict):
app_logger.error(f"TASK_CONFIG is not a dict in {filename}. Skipping task.")
continue
task_cron = module.TASK_CONFIG.get("cron") or self.TASK_DEFAULT_CRON
task_name = module.TASK_CONFIG.get("name", module_name)
# ensure the task_cron is a valid cron value
try:
CronTrigger.from_crontab(task_cron)
except ValueError as ve:
app_logger.error(f"Invalid cron format for task {task_name}: {ve} - Skipping this task")
continue
task = {
'name': module.TASK_CONFIG.get('name', module_name),
'filename': filename,
'cron': task_cron,
"enabled": module.TASK_CONFIG.get("enabled", False),
"run_when_loaded": module.TASK_CONFIG.get("run_when_loaded", False)
}
tasks.append(task)
# we are missing things, and we log what's missing
else:
if not hasattr(module, 'TASK_CONFIG'):
app_logger.warning(f"Missing TASK_CONFIG in {filename}")
elif not hasattr(module, 'main'):
app_logger.warning(f"Missing main() in {filename}")
return tasks
def _add_jobs(self):
# for each task in the tasks config file...
for task_to_run in self.tasks:
# remember, these tasks, are built from the "load_tasks_from_folder" function,
# if you want to pass data from the TASKS_CONFIG dict, you need to pass it there to get it here.
task_name = task_to_run.get("name")
run_when_loaded = task_to_run.get("run_when_loaded")
module_name = os.path.splitext(task_to_run.get("filename"))[0]
task_enabled = task_to_run.get("enabled", False)
# if no crontab set for this task, we use 15 as the default.
task_cron = task_to_run.get("cron") or self.TASK_DEFAULT_CRON
# if task is disabled, skip this one
if not task_enabled:
app_logger.info(f"{task_name} is disabled in client config. Skipping task")
continue
try:
if os.path.isfile(os.path.join(self.TASKS_FOLDER, task_to_run.get("filename"))):
# schedule the task now that everything has checked out above...
self._schedule_task(task_name, module_name, task_cron, run_when_loaded)
app_logger.info(f"Scheduled {module_name} cron is set to {task_cron}.", extra={"task": task_to_run})
else:
app_logger.info(f"Skipping invalid or unsafe file: {task_to_run.get('filename')}", extra={"task": task_to_run})
except Exception as e:
app_logger.error(f"Error scheduling task: {e}", extra={"tasks": task_to_run})
def _schedule_task(self, task_name, module_name, task_cron, run_when_loaded):
try:
# Dynamically import the module
module = importlib.import_module(f"tasks.{module_name}")
# Check if the module has a 'main' function
if hasattr(module, 'main'):
app_logger.info(f"Scheduling {task_name} - {module_name} Main Function")
# unique_job_id
job_identifier = f"{module_name}__{task_name}"
# little insurance to make sure the cron is set to something and not none
if task_cron is None:
task_cron = self.TASK_DEFAULT_CRON
trigger = CronTrigger.from_crontab(task_cron)
# schedule the task / job
if run_when_loaded:
app_logger.info(f"Task: {task_name} is set to run instantly. Scheduling to run on scheduler start")
self.scheduler.add_job(
module.main,
trigger,
id=job_identifier,
jitter=self.TASK_JITTER,
name=task_name,
next_run_time=datetime.datetime.now(),
max_instances=1
)
else:
self.scheduler.add_job(
module.main,
trigger,
id=job_identifier,
jitter=self.TASK_JITTER,
name=task_name,
max_instances=1
)
else:
app_logger.error(f"{module_name} does not define a 'main' function.")
except Exception as e:
app_logger.error(f"Failed to load {module_name}: {e}")
def job_listener(self, event):
job_id = event.job_id
self.last_run_times[job_id] = datetime.datetime.now()
if event.exception:
app_logger.error(f"Job {event.job_id} failed: {event.exception}")
else:
app_logger.info(f"Job {event.job_id} completed successfully.")
def list_jobs(self):
scheduled_jobs = self.scheduler.get_jobs()
jobs_list = []
for job in scheduled_jobs:
jobs_list.append({
"id": job.id,
"name": job.name,
"next_run": job.next_run_time,
})
return jobs_list
def run_scheduled_tasks(self):
"""
Runs and schedules enabled tasks using the background scheduler.
This method performs the following:
1. Retrieves the current task configurations and updates internal state.
2. Adds new jobs to the scheduler based on the latest configuration.
3. Starts the scheduler to begin executing tasks at their defined intervals.
This ensures the scheduler is always running with the most up-to-date
task definitions and enabled status.
"""
# Add enabled tasks to the scheduler
self._add_jobs()
# Start the scheduler to begin executing the scheduled tasks (if not already running)
if not self.scheduler.running:
self.scheduler.start()
# ---------- SINGLETON WRAPPER ----------
T = type
def singleton_loader(func):
"""Decorator to ensure only one instance exists."""
cache: dict[str, T] = {}
lock = threading.Lock()
@functools.wraps(func)
def wrapper(*args, **kwargs) -> T:
with lock:
if func.__name__ not in cache:
cache[func.__name__] = func(*args, **kwargs)
return cache[func.__name__]
return wrapper
@singleton_loader
def get_tasksmaster(scheduler: BackgroundScheduler | None = None) -> TasksMaster:
"""
Returns the singleton TasksMaster instance.
- Automatically creates a BackgroundScheduler if none is provided.
- Automatically starts the scheduler when the singleton is created.
:param scheduler: Optional APScheduler instance. If None, a new BackgroundScheduler will be created.
"""
if scheduler is None:
scheduler = BackgroundScheduler()
tm_instance = TasksMaster(scheduler)
# Auto-start scheduler if not already running
if not scheduler.running:
scheduler.start()
app_logger.info("TasksMaster scheduler started automatically with singleton creation.")
return tm_instance

View File

@@ -38,12 +38,13 @@ def format_timestamp(iso_timestamp: str, timezone: str = 'UTC', time_only: bool
return iso_timestamp.split("T")[1][:8] if "T" in iso_timestamp else iso_timestamp
def generate_dashboard(stats: dict, timezone: str = 'UTC') -> str:
def generate_dashboard(stats: dict, timezone: str = 'UTC', dashboard_path: str = '') -> str:
"""Generate dashboard HTML with access statistics
Args:
stats: Statistics dictionary
timezone: IANA timezone string (e.g., 'Europe/Paris', 'America/New_York')
dashboard_path: The secret dashboard path for generating API URLs
"""
# Generate IP rows with clickable functionality for dropdown stats
@@ -164,12 +165,36 @@ def generate_dashboard(stats: dict, timezone: str = 'UTC') -> str:
.container {{
max-width: 1400px;
margin: 0 auto;
position: relative;
}}
h1 {{
color: #58a6ff;
text-align: center;
margin-bottom: 40px;
}}
.download-section {{
position: absolute;
top: 0;
right: 0;
}}
.download-btn {{
display: inline-block;
padding: 8px 14px;
background: #238636;
color: #ffffff;
text-decoration: none;
border-radius: 6px;
font-weight: 500;
font-size: 13px;
transition: background 0.2s;
border: 1px solid #2ea043;
}}
.download-btn:hover {{
background: #2ea043;
}}
.download-btn:active {{
background: #1f7a2f;
}}
.stats-grid {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
@@ -450,6 +475,11 @@ def generate_dashboard(stats: dict, timezone: str = 'UTC') -> str:
</head>
<body>
<div class="container">
<div class="download-section">
<a href="{dashboard_path}/api/download/malicious_ips.txt" class="download-btn" download>
Export Malicious IPs
</a>
</div>
<h1>Krawl Dashboard</h1>
<div class="stats-grid">
@@ -599,6 +629,7 @@ def generate_dashboard(stats: dict, timezone: str = 'UTC') -> str:
<script>
// Server timezone configuration
const SERVER_TIMEZONE = '{timezone}';
const DASHBOARD_PATH = '{dashboard_path}';
// Convert UTC timestamp to configured timezone
function formatTimestamp(isoTimestamp) {{
@@ -704,7 +735,7 @@ def generate_dashboard(stats: dict, timezone: str = 'UTC') -> str:
if (dropdown) {{
dropdown.innerHTML = '<div class="loading">Loading stats...</div>';
try {{
const response = await fetch(`${{window.location.pathname}}/api/ip-stats/${{ip}}`, {{
const response = await fetch(`${{DASHBOARD_PATH}}/api/ip-stats/${{ip}}`, {{
cache: 'no-store',
headers: {{
'Cache-Control': 'no-cache',