58 lines
1.5 KiB
Python
58 lines
1.5 KiB
Python
|
|
# tasks/export_malicious_ips.py
|
||
|
|
|
||
|
|
import os
|
||
|
|
from logger import get_app_logger
|
||
|
|
from database import get_database
|
||
|
|
from models import AccessLog
|
||
|
|
from sqlalchemy import distinct
|
||
|
|
|
||
|
|
app_logger = get_app_logger()
|
||
|
|
|
||
|
|
# ----------------------
|
||
|
|
# TASK CONFIG
|
||
|
|
# ----------------------
|
||
|
|
TASK_CONFIG = {
|
||
|
|
"name": "export-malicious-ips",
|
||
|
|
"cron": "*/5 * * * *",
|
||
|
|
"enabled": True,
|
||
|
|
"run_when_loaded": True
|
||
|
|
}
|
||
|
|
|
||
|
|
EXPORTS_DIR = "exports"
|
||
|
|
OUTPUT_FILE = os.path.join(EXPORTS_DIR, "malicious_ips.txt")
|
||
|
|
|
||
|
|
# ----------------------
|
||
|
|
# TASK LOGIC
|
||
|
|
# ----------------------
|
||
|
|
def main():
|
||
|
|
"""
|
||
|
|
Export all IPs flagged as suspicious to a text file.
|
||
|
|
TasksMaster will call this function based on the cron schedule.
|
||
|
|
"""
|
||
|
|
task_name = TASK_CONFIG.get("name")
|
||
|
|
app_logger.info(f"[Background Task] {task_name} starting...")
|
||
|
|
|
||
|
|
try:
|
||
|
|
db = get_database()
|
||
|
|
session = db.session
|
||
|
|
|
||
|
|
# Query distinct suspicious IPs
|
||
|
|
results = session.query(distinct(AccessLog.ip)).filter(
|
||
|
|
AccessLog.is_suspicious == True
|
||
|
|
).all()
|
||
|
|
|
||
|
|
# Ensure exports directory exists
|
||
|
|
os.makedirs(EXPORTS_DIR, exist_ok=True)
|
||
|
|
|
||
|
|
# Write IPs to file (one per line)
|
||
|
|
with open(OUTPUT_FILE, 'w') as f:
|
||
|
|
for (ip,) in results:
|
||
|
|
f.write(f"{ip}\n")
|
||
|
|
|
||
|
|
app_logger.info(f"[Background Task] {task_name} exported {len(results)} IPs to {OUTPUT_FILE}")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
app_logger.error(f"[Background Task] {task_name} failed: {e}")
|
||
|
|
finally:
|
||
|
|
db.close_session()
|