Fix/export malicious ip file creation (#62)

* Fix: update EXPORTS_DIR path to ensure correct directory structure

* Fix: remove unused imports and honeypot access check from export task
This commit is contained in:
Lorenzo Venerandi
2026-01-29 13:54:07 +01:00
committed by GitHub
parent a778262701
commit d3caa99ecc

View File

@@ -1,14 +1,11 @@
# tasks/export_malicious_ips.py
import os
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from logger import get_app_logger
from database import get_database
from config import get_config
from models import AccessLog, IpStats
from ip_utils import is_local_or_private_ip, is_valid_public_ip
from sqlalchemy import distinct
from models import IpStats
from ip_utils import is_valid_public_ip
app_logger = get_app_logger()
@@ -29,19 +26,6 @@ OUTPUT_FILE = os.path.join(EXPORTS_DIR, "malicious_ips.txt")
# ----------------------
# TASK LOGIC
# ----------------------
def has_recent_honeypot_access(session, minutes: int = 5) -> bool:
"""Check if honeypot was accessed in the last N minutes."""
cutoff_time = datetime.now() - timedelta(minutes=minutes)
count = (
session.query(AccessLog)
.filter(
AccessLog.is_honeypot_trigger == True, AccessLog.timestamp >= cutoff_time
)
.count()
)
return count > 0
def main():
"""
Export all attacker IPs to a text file, matching the "Attackers by Total Requests" dashboard table.
@@ -55,13 +39,6 @@ def main():
db = get_database()
session = db.session
# Check for recent honeypot activity
if not has_recent_honeypot_access(session):
app_logger.info(
f"[Background Task] {task_name} skipped - no honeypot access in last 5 minutes"
)
return
# Query attacker IPs from IpStats (same as dashboard "Attackers by Total Requests")
attackers = (
session.query(IpStats)