Merge branch 'dev' into feat/background-tasks
This commit is contained in:
@@ -6,6 +6,7 @@ from zoneinfo import ZoneInfo
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timedelta
|
||||
import re
|
||||
import urllib.parse
|
||||
from wordlists import get_wordlists
|
||||
from config import get_config
|
||||
from logger import get_app_logger
|
||||
@@ -104,6 +105,15 @@ class Analyzer:
|
||||
total_accesses_count = len(accesses)
|
||||
if total_accesses_count <= 0:
|
||||
return
|
||||
|
||||
# Set category as "unknown" while fewer than 3 requests have been recorded
|
||||
if total_accesses_count < 3:
|
||||
category = "unknown"
|
||||
analyzed_metrics = {}
|
||||
category_scores = {"attacker": 0, "good_crawler": 0, "bad_crawler": 0, "regular_user": 0, "unknown": 0}
|
||||
last_analysis = datetime.now(tz=ZoneInfo('UTC'))
|
||||
self._db_manager.update_ip_stats_analysis(ip, analyzed_metrics, category, category_scores, last_analysis)
|
||||
return 0
|
||||
|
||||
#--------------------- HTTP Methods ---------------------
|
||||
|
||||
@@ -150,7 +160,7 @@ class Analyzer:
|
||||
robots_disallows.append(parts[1].strip())
|
||||
|
||||
# If there are 0 violations it is 100% sure a good crawler; if >10% of requests violate robots.txt it is a bad crawler or attacker
|
||||
violated_robots_count = len([item for item in accesses if item["path"].rstrip("/") in tuple(robots_disallows)])
|
||||
violated_robots_count = len([item for item in accesses if any(item["path"].rstrip("/").startswith(disallow) for disallow in robots_disallows)])
|
||||
#print(f"Violated robots count: {violated_robots_count}")
|
||||
if total_accesses_count > 0:
|
||||
violated_robots_ratio = violated_robots_count / total_accesses_count
|
||||
@@ -171,7 +181,8 @@ class Analyzer:
|
||||
#--------------------- Requests Timing ---------------------
|
||||
#Request rate and timing: steady, throttled, polite vs attackers' bursty, aggressive, or oddly rhythmic behavior
|
||||
timestamps = [datetime.fromisoformat(item["timestamp"]) for item in accesses]
|
||||
timestamps = [ts for ts in timestamps if datetime.utcnow() - ts <= timedelta(seconds=uneven_request_timing_time_window_seconds)]
|
||||
now_utc = datetime.now(tz=ZoneInfo('UTC'))
|
||||
timestamps = [ts for ts in timestamps if now_utc - ts <= timedelta(seconds=uneven_request_timing_time_window_seconds)]
|
||||
timestamps = sorted(timestamps, reverse=True)
|
||||
|
||||
time_diffs = []
|
||||
@@ -224,13 +235,25 @@ class Analyzer:
|
||||
attack_urls_found_list = []
|
||||
|
||||
wl = get_wordlists()
|
||||
if wl.attack_urls:
|
||||
if wl.attack_patterns:
|
||||
queried_paths = [item["path"] for item in accesses]
|
||||
|
||||
for queried_path in queried_paths:
|
||||
for name, pattern in wl.attack_urls.items():
|
||||
if re.search(pattern, queried_path, re.IGNORECASE):
|
||||
attack_urls_found_list.append(pattern)
|
||||
# URL decode the path to catch encoded attacks
|
||||
try:
|
||||
decoded_path = urllib.parse.unquote(queried_path)
|
||||
# Double decode to catch double-encoded attacks
|
||||
decoded_path_twice = urllib.parse.unquote(decoded_path)
|
||||
except Exception:
|
||||
decoded_path = queried_path
|
||||
decoded_path_twice = queried_path
|
||||
|
||||
for name, pattern in wl.attack_patterns.items():
|
||||
# Check original, decoded, and double-decoded paths
|
||||
if (re.search(pattern, queried_path, re.IGNORECASE) or
|
||||
re.search(pattern, decoded_path, re.IGNORECASE) or
|
||||
re.search(pattern, decoded_path_twice, re.IGNORECASE)):
|
||||
attack_urls_found_list.append(f"{name}: {pattern}")
|
||||
|
||||
if len(attack_urls_found_list) > attack_urls_threshold:
|
||||
score["attacker"]["attack_url"] = True
|
||||
@@ -282,7 +305,7 @@ class Analyzer:
|
||||
analyzed_metrics = {"risky_http_methods": http_method_attacker_score, "robots_violations": violated_robots_ratio, "uneven_request_timing": mean, "different_user_agents": user_agents_used, "attack_url": attack_urls_found_list}
|
||||
category_scores = {"attacker": attacker_score, "good_crawler": good_crawler_score, "bad_crawler": bad_crawler_score, "regular_user": regular_user_score}
|
||||
category = max(category_scores, key=category_scores.get)
|
||||
last_analysis = datetime.utcnow()
|
||||
last_analysis = datetime.now(tz=ZoneInfo('UTC'))
|
||||
|
||||
self._db_manager.update_ip_stats_analysis(ip, analyzed_metrics, category, category_scores, last_analysis)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user