Optimize scheduled tasks to reduce unnecessary processing
- Add conditional execution to export-malicious-ips task: only runs
when honeypot was accessed in last 5 minutes
- Add since_minutes parameter to get_access_logs() for time filtering
- Optimize analyze-ips task to only process IPs with activity in the
last minute, fetching full history per-IP instead of all logs
- Exclude RFC1918 private addresses and non-routable IPs from IP
reputation enrichment (10.x, 172.16-31.x, 192.168.x, 127.x, 169.254.x)
This commit is contained in:
@@ -7,7 +7,7 @@ Provides SQLAlchemy session management and database initialization.
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import stat
|
import stat
|
||||||
from datetime import datetime
|
from datetime import datetime, timedelta
|
||||||
from typing import Optional, List, Dict, Any
|
from typing import Optional, List, Dict, Any
|
||||||
from zoneinfo import ZoneInfo
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
@@ -390,6 +390,7 @@ class DatabaseManager:
|
|||||||
def get_unenriched_ips(self, limit: int = 100) -> List[str]:
|
def get_unenriched_ips(self, limit: int = 100) -> List[str]:
|
||||||
"""
|
"""
|
||||||
Get IPs that don't have reputation data yet.
|
Get IPs that don't have reputation data yet.
|
||||||
|
Excludes RFC1918 private addresses and other non-routable IPs.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
limit: Maximum number of IPs to return
|
limit: Maximum number of IPs to return
|
||||||
@@ -400,7 +401,18 @@ class DatabaseManager:
|
|||||||
session = self.session
|
session = self.session
|
||||||
try:
|
try:
|
||||||
ips = session.query(IpStats.ip).filter(
|
ips = session.query(IpStats.ip).filter(
|
||||||
IpStats.country_code.is_(None)
|
IpStats.country_code.is_(None),
|
||||||
|
~IpStats.ip.like('10.%'),
|
||||||
|
~IpStats.ip.like('172.16.%'),
|
||||||
|
~IpStats.ip.like('172.17.%'),
|
||||||
|
~IpStats.ip.like('172.18.%'),
|
||||||
|
~IpStats.ip.like('172.19.%'),
|
||||||
|
~IpStats.ip.like('172.2_.%'),
|
||||||
|
~IpStats.ip.like('172.30.%'),
|
||||||
|
~IpStats.ip.like('172.31.%'),
|
||||||
|
~IpStats.ip.like('192.168.%'),
|
||||||
|
~IpStats.ip.like('127.%'),
|
||||||
|
~IpStats.ip.like('169.254.%')
|
||||||
).limit(limit).all()
|
).limit(limit).all()
|
||||||
return [ip[0] for ip in ips]
|
return [ip[0] for ip in ips]
|
||||||
finally:
|
finally:
|
||||||
@@ -411,7 +423,8 @@ class DatabaseManager:
|
|||||||
limit: int = 100,
|
limit: int = 100,
|
||||||
offset: int = 0,
|
offset: int = 0,
|
||||||
ip_filter: Optional[str] = None,
|
ip_filter: Optional[str] = None,
|
||||||
suspicious_only: bool = False
|
suspicious_only: bool = False,
|
||||||
|
since_minutes: Optional[int] = None
|
||||||
) -> List[Dict[str, Any]]:
|
) -> List[Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
Retrieve access logs with optional filtering.
|
Retrieve access logs with optional filtering.
|
||||||
@@ -421,6 +434,7 @@ class DatabaseManager:
|
|||||||
offset: Number of records to skip
|
offset: Number of records to skip
|
||||||
ip_filter: Filter by IP address
|
ip_filter: Filter by IP address
|
||||||
suspicious_only: Only return suspicious requests
|
suspicious_only: Only return suspicious requests
|
||||||
|
since_minutes: Only return logs from the last N minutes
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of access log dictionaries
|
List of access log dictionaries
|
||||||
@@ -433,6 +447,9 @@ class DatabaseManager:
|
|||||||
query = query.filter(AccessLog.ip == sanitize_ip(ip_filter))
|
query = query.filter(AccessLog.ip == sanitize_ip(ip_filter))
|
||||||
if suspicious_only:
|
if suspicious_only:
|
||||||
query = query.filter(AccessLog.is_suspicious == True)
|
query = query.filter(AccessLog.is_suspicious == True)
|
||||||
|
if since_minutes is not None:
|
||||||
|
cutoff_time = datetime.now(tz=ZoneInfo('UTC')) - timedelta(minutes=since_minutes)
|
||||||
|
query = query.filter(AccessLog.timestamp >= cutoff_time)
|
||||||
|
|
||||||
logs = query.offset(offset).limit(limit).all()
|
logs = query.offset(offset).limit(limit).all()
|
||||||
|
|
||||||
|
|||||||
@@ -73,12 +73,18 @@ def main():
|
|||||||
"attack_url": 0
|
"attack_url": 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
accesses = db_manager.get_access_logs(limit=999999999)
|
# Get IPs with recent activity (last minute to match cron schedule)
|
||||||
ips = {item['ip'] for item in accesses}
|
recent_accesses = db_manager.get_access_logs(limit=999999999, since_minutes=1)
|
||||||
|
ips_to_analyze = {item['ip'] for item in recent_accesses}
|
||||||
|
|
||||||
for ip in ips:
|
if not ips_to_analyze:
|
||||||
ip_accesses = [item for item in accesses if item["ip"] == ip]
|
app_logger.debug("[Background Task] analyze-ips: No recent activity, skipping")
|
||||||
total_accesses_count = len(accesses)
|
return
|
||||||
|
|
||||||
|
for ip in ips_to_analyze:
|
||||||
|
# Get full history for this IP to perform accurate analysis
|
||||||
|
ip_accesses = db_manager.get_access_logs(limit=999999999, ip_filter=ip)
|
||||||
|
total_accesses_count = len(ip_accesses)
|
||||||
if total_accesses_count <= 0:
|
if total_accesses_count <= 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ def main():
|
|||||||
|
|
||||||
# Only get IPs that haven't been enriched yet
|
# Only get IPs that haven't been enriched yet
|
||||||
unenriched_ips = db_manager.get_unenriched_ips(limit=50)
|
unenriched_ips = db_manager.get_unenriched_ips(limit=50)
|
||||||
|
app_logger.info(f"{len(unenriched_ips)} IPs need reputation enrichment.")
|
||||||
for ip in unenriched_ips:
|
for ip in unenriched_ips:
|
||||||
try:
|
try:
|
||||||
api_url = "https://iprep.lcrawl.com/api/iprep/"
|
api_url = "https://iprep.lcrawl.com/api/iprep/"
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
# tasks/export_malicious_ips.py
|
# tasks/export_malicious_ips.py
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
from logger import get_app_logger
|
from logger import get_app_logger
|
||||||
from database import get_database
|
from database import get_database
|
||||||
from models import AccessLog
|
from models import AccessLog
|
||||||
@@ -24,6 +26,15 @@ OUTPUT_FILE = os.path.join(EXPORTS_DIR, "malicious_ips.txt")
|
|||||||
# ----------------------
|
# ----------------------
|
||||||
# TASK LOGIC
|
# TASK LOGIC
|
||||||
# ----------------------
|
# ----------------------
|
||||||
|
def has_recent_honeypot_access(session, minutes: int = 5) -> bool:
    """Check whether the honeypot was triggered within the last N minutes.

    Used to gate the export-malicious-ips task so it only runs when there
    has been recent honeypot activity.

    Args:
        session: SQLAlchemy session to query with.
        minutes: Look-back window in minutes (default 5, matching the
            task's cron cadence).

    Returns:
        True if at least one AccessLog row has is_honeypot_trigger set and
        a timestamp at or after the cutoff; False otherwise.
    """
    cutoff_time = datetime.now(tz=ZoneInfo('UTC')) - timedelta(minutes=minutes)
    # Existence probe (LIMIT 1) instead of .count(): we only need to know
    # whether any matching row exists, not how many there are.
    match = session.query(AccessLog).filter(
        AccessLog.is_honeypot_trigger == True,
        AccessLog.timestamp >= cutoff_time
    ).first()
    return match is not None
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
"""
|
"""
|
||||||
Export all IPs flagged as suspicious to a text file.
|
Export all IPs flagged as suspicious to a text file.
|
||||||
@@ -36,6 +47,11 @@ def main():
|
|||||||
db = get_database()
|
db = get_database()
|
||||||
session = db.session
|
session = db.session
|
||||||
|
|
||||||
|
# Check for recent honeypot activity
|
||||||
|
if not has_recent_honeypot_access(session):
|
||||||
|
app_logger.info(f"[Background Task] {task_name} skipped - no honeypot access in last 5 minutes")
|
||||||
|
return
|
||||||
|
|
||||||
# Query distinct suspicious IPs
|
# Query distinct suspicious IPs
|
||||||
results = session.query(distinct(AccessLog.ip)).filter(
|
results = session.query(distinct(AccessLog.ip)).filter(
|
||||||
AccessLog.is_suspicious == True
|
AccessLog.is_suspicious == True
|
||||||
|
|||||||
Reference in New Issue
Block a user