Made IP analysis and IP reputation info fetch a scheduled task

This commit is contained in:
Leonardo Bambini
2026-01-10 14:53:31 +01:00
22 changed files with 1833 additions and 288 deletions


@@ -6,8 +6,10 @@ from zoneinfo import ZoneInfo
from pathlib import Path
from datetime import datetime, timedelta
import re
import urllib.parse
from wordlists import get_wordlists
from config import get_config
from logger import get_app_logger
import requests
from sanitizer import sanitize_for_storage, sanitize_dict
@@ -15,6 +17,8 @@ from sanitizer import sanitize_for_storage, sanitize_dict
Functions for user activity analysis
"""
app_logger = get_app_logger()
class Analyzer:
"""
Analyzes users activity and produces aggregated insights
@@ -48,272 +52,299 @@ class Analyzer:
pass
return self._db_manager
def infer_user_category(self, ip: str) -> str:
# def infer_user_category(self, ip: str) -> str:
config = get_config()
# config = get_config()
http_risky_methods_threshold = config.http_risky_methods_threshold
violated_robots_threshold = config.violated_robots_threshold
uneven_request_timing_threshold = config.uneven_request_timing_threshold
user_agents_used_threshold = config.user_agents_used_threshold
attack_urls_threshold = config.attack_urls_threshold
uneven_request_timing_time_window_seconds = config.uneven_request_timing_time_window_seconds
# http_risky_methods_threshold = config.http_risky_methods_threshold
# violated_robots_threshold = config.violated_robots_threshold
# uneven_request_timing_threshold = config.uneven_request_timing_threshold
# user_agents_used_threshold = config.user_agents_used_threshold
# attack_urls_threshold = config.attack_urls_threshold
# uneven_request_timing_time_window_seconds = config.uneven_request_timing_time_window_seconds
print(f"http_risky_methods_threshold: {http_risky_methods_threshold}")
# app_logger.debug(f"http_risky_methods_threshold: {http_risky_methods_threshold}")
score = {}
score["attacker"] = {"risky_http_methods": False, "robots_violations": False, "uneven_request_timing": False, "different_user_agents": False, "attack_url": False}
score["good_crawler"] = {"risky_http_methods": False, "robots_violations": False, "uneven_request_timing": False, "different_user_agents": False, "attack_url": False}
score["bad_crawler"] = {"risky_http_methods": False, "robots_violations": False, "uneven_request_timing": False, "different_user_agents": False, "attack_url": False}
score["regular_user"] = {"risky_http_methods": False, "robots_violations": False, "uneven_request_timing": False, "different_user_agents": False, "attack_url": False}
# score = {}
# score["attacker"] = {"risky_http_methods": False, "robots_violations": False, "uneven_request_timing": False, "different_user_agents": False, "attack_url": False}
# score["good_crawler"] = {"risky_http_methods": False, "robots_violations": False, "uneven_request_timing": False, "different_user_agents": False, "attack_url": False}
# score["bad_crawler"] = {"risky_http_methods": False, "robots_violations": False, "uneven_request_timing": False, "different_user_agents": False, "attack_url": False}
# score["regular_user"] = {"risky_http_methods": False, "robots_violations": False, "uneven_request_timing": False, "different_user_agents": False, "attack_url": False}
#1-3 low, 4-6 mid, 7-9 high, 10-20 extreme
weights = {
"attacker": {
"risky_http_methods": 6,
"robots_violations": 4,
"uneven_request_timing": 3,
"different_user_agents": 8,
"attack_url": 15
},
"good_crawler": {
"risky_http_methods": 1,
"robots_violations": 0,
"uneven_request_timing": 0,
"different_user_agents": 0,
"attack_url": 0
},
"bad_crawler": {
"risky_http_methods": 2,
"robots_violations": 7,
"uneven_request_timing": 0,
"different_user_agents": 5,
"attack_url": 5
},
"regular_user": {
"risky_http_methods": 0,
"robots_violations": 0,
"uneven_request_timing": 8,
"different_user_agents": 3,
"attack_url": 0
}
}
# #1-3 low, 4-6 mid, 7-9 high, 10-20 extreme
# weights = {
# "attacker": {
# "risky_http_methods": 6,
# "robots_violations": 4,
# "uneven_request_timing": 3,
# "different_user_agents": 8,
# "attack_url": 15
# },
# "good_crawler": {
# "risky_http_methods": 1,
# "robots_violations": 0,
# "uneven_request_timing": 0,
# "different_user_agents": 0,
# "attack_url": 0
# },
# "bad_crawler": {
# "risky_http_methods": 2,
# "robots_violations": 7,
# "uneven_request_timing": 0,
# "different_user_agents": 5,
# "attack_url": 5
# },
# "regular_user": {
# "risky_http_methods": 0,
# "robots_violations": 0,
# "uneven_request_timing": 8,
# "different_user_agents": 3,
# "attack_url": 0
# }
# }
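For reference, a minimal standalone sketch of how these weight bands turn boolean signals into per-category scores and a final category, using hypothetical flag values (only two signals shown):
weights = {"attacker": {"risky_http_methods": 6, "attack_url": 15},
           "regular_user": {"risky_http_methods": 0, "attack_url": 0}}
flags = {"risky_http_methods": True, "attack_url": False}   # booleans behave as 0/1
scores = {cat: sum(flags[sig] * w for sig, w in sigs.items())
          for cat, sigs in weights.items()}
print(scores)                        # {'attacker': 6, 'regular_user': 0}
print(max(scores, key=scores.get))   # attacker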
accesses = self.db.get_access_logs(ip_filter = ip, limit=1000)
total_accesses_count = len(accesses)
if total_accesses_count <= 0:
return
# accesses = self.db.get_access_logs(ip_filter = ip, limit=1000)
# total_accesses_count = len(accesses)
# if total_accesses_count <= 0:
# return
# # Set category as "unknown" for the first few requests (fewer than 3 seen so far)
# if total_accesses_count < 3:
# category = "unknown"
# analyzed_metrics = {}
# category_scores = {"attacker": 0, "good_crawler": 0, "bad_crawler": 0, "regular_user": 0, "unknown": 0}
# last_analysis = datetime.now(tz=ZoneInfo('UTC'))
# self._db_manager.update_ip_stats_analysis(ip, analyzed_metrics, category, category_scores, last_analysis)
# return 0
#--------------------- HTTP Methods ---------------------
# #--------------------- HTTP Methods ---------------------
get_accesses_count = len([item for item in accesses if item["method"] == "GET"])
post_accesses_count = len([item for item in accesses if item["method"] == "POST"])
put_accesses_count = len([item for item in accesses if item["method"] == "PUT"])
delete_accesses_count = len([item for item in accesses if item["method"] == "DELETE"])
head_accesses_count = len([item for item in accesses if item["method"] == "HEAD"])
options_accesses_count = len([item for item in accesses if item["method"] == "OPTIONS"])
patch_accesses_count = len([item for item in accesses if item["method"] == "PATCH"])
# get_accesses_count = len([item for item in accesses if item["method"] == "GET"])
# post_accesses_count = len([item for item in accesses if item["method"] == "POST"])
# put_accesses_count = len([item for item in accesses if item["method"] == "PUT"])
# delete_accesses_count = len([item for item in accesses if item["method"] == "DELETE"])
# head_accesses_count = len([item for item in accesses if item["method"] == "HEAD"])
# options_accesses_count = len([item for item in accesses if item["method"] == "OPTIONS"])
# patch_accesses_count = len([item for item in accesses if item["method"] == "PATCH"])
if total_accesses_count > http_risky_methods_threshold:
http_method_attacker_score = (post_accesses_count + put_accesses_count + delete_accesses_count + options_accesses_count + patch_accesses_count) / total_accesses_count
else:
http_method_attacker_score = 0
# if total_accesses_count > http_risky_methods_threshold:
# http_method_attacker_score = (post_accesses_count + put_accesses_count + delete_accesses_count + options_accesses_count + patch_accesses_count) / total_accesses_count
# else:
# http_method_attacker_score = 0
#print(f"HTTP Method attacker score: {http_method_attacker_score}")
if http_method_attacker_score >= http_risky_methods_threshold:
score["attacker"]["risky_http_methods"] = True
score["good_crawler"]["risky_http_methods"] = False
score["bad_crawler"]["risky_http_methods"] = True
score["regular_user"]["risky_http_methods"] = False
else:
score["attacker"]["risky_http_methods"] = False
score["good_crawler"]["risky_http_methods"] = True
score["bad_crawler"]["risky_http_methods"] = False
score["regular_user"]["risky_http_methods"] = False
# #print(f"HTTP Method attacker score: {http_method_attacker_score}")
# if http_method_attacker_score >= http_risky_methods_threshold:
# score["attacker"]["risky_http_methods"] = True
# score["good_crawler"]["risky_http_methods"] = False
# score["bad_crawler"]["risky_http_methods"] = True
# score["regular_user"]["risky_http_methods"] = False
# else:
# score["attacker"]["risky_http_methods"] = False
# score["good_crawler"]["risky_http_methods"] = True
# score["bad_crawler"]["risky_http_methods"] = False
# score["regular_user"]["risky_http_methods"] = False
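A small sketch of the risky-method ratio computed above, with a hypothetical list of request methods (the real code derives the counts from the access log rows):
methods = ["GET", "GET", "POST", "DELETE", "GET"]            # hypothetical sample
risky = {"POST", "PUT", "DELETE", "OPTIONS", "PATCH"}
risky_ratio = sum(m in risky for m in methods) / len(methods) if methods else 0
print(risky_ratio)  # 0.4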
#--------------------- Robots Violations ---------------------
#checks respect for robots.txt and how often login/config pages are accessed
robots_disallows = []
robots_path = Path(__file__).parent / "templates" / "html" / "robots.txt"
with open(robots_path, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
parts = line.split(":")
# #--------------------- Robots Violations ---------------------
# #checks respect for robots.txt and how often login/config pages are accessed
# robots_disallows = []
# robots_path = Path(__file__).parent / "templates" / "html" / "robots.txt"
# with open(robots_path, "r") as f:
# for line in f:
# line = line.strip()
# if not line:
# continue
# parts = line.split(":")
if parts[0] == "Disallow":
parts[1] = parts[1].rstrip("/")
#print(f"DISALLOW {parts[1]}")
robots_disallows.append(parts[1].strip())
# if parts[0] == "Disallow":
# parts[1] = parts[1].rstrip("/")
# #print(f"DISALLOW {parts[1]}")
# robots_disallows.append(parts[1].strip())
#if 0 violations, almost certainly a good crawler; if >10% of requests violate robots.txt, a bad crawler or attacker
violated_robots_count = len([item for item in accesses if item["path"].rstrip("/") in tuple(robots_disallows)])
#print(f"Violated robots count: {violated_robots_count}")
if total_accesses_count > 0:
violated_robots_ratio = violated_robots_count / total_accesses_count
else:
violated_robots_ratio = 0
# #if 0 violations, almost certainly a good crawler; if >10% of requests violate robots.txt, a bad crawler or attacker
# violated_robots_count = len([item for item in accesses if any(item["path"].rstrip("/").startswith(disallow) for disallow in robots_disallows)])
# #print(f"Violated robots count: {violated_robots_count}")
# if total_accesses_count > 0:
# violated_robots_ratio = violated_robots_count / total_accesses_count
# else:
# violated_robots_ratio = 0
if violated_robots_ratio >= violated_robots_threshold:
score["attacker"]["robots_violations"] = True
score["good_crawler"]["robots_violations"] = False
score["bad_crawler"]["robots_violations"] = True
score["regular_user"]["robots_violations"] = False
else:
score["attacker"]["robots_violations"] = False
score["good_crawler"]["robots_violations"] = False
score["bad_crawler"]["robots_violations"] = False
score["regular_user"]["robots_violations"] = False
# if violated_robots_ratio >= violated_robots_threshold:
# score["attacker"]["robots_violations"] = True
# score["good_crawler"]["robots_violations"] = False
# score["bad_crawler"]["robots_violations"] = True
# score["regular_user"]["robots_violations"] = False
# else:
# score["attacker"]["robots_violations"] = False
# score["good_crawler"]["robots_violations"] = False
# score["bad_crawler"]["robots_violations"] = False
# score["regular_user"]["robots_violations"] = False
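A condensed sketch of the robots.txt handling above, assuming hypothetical Disallow lines and request paths; as in the new prefix-based check, a request counts as a violation when its path starts with a disallowed prefix:
robots_lines = ["User-agent: *", "Disallow: /admin/", "Disallow: /login"]
disallows = [line.split(":", 1)[1].strip().rstrip("/")
             for line in robots_lines if line.startswith("Disallow")]
paths = ["/admin/panel", "/index.html", "/login"]
violations = sum(any(p.rstrip("/").startswith(d) for d in disallows) for p in paths)
print(disallows, violations)  # ['/admin', '/login'] 2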
#--------------------- Requests Timing ---------------------
#Request rate and timing: crawlers are steady, throttled, and polite, while attackers are bursty, aggressive, or oddly rhythmic
timestamps = [datetime.fromisoformat(item["timestamp"]) for item in accesses]
timestamps = [ts for ts in timestamps if datetime.utcnow() - ts <= timedelta(seconds=uneven_request_timing_time_window_seconds)]
timestamps = sorted(timestamps, reverse=True)
# #--------------------- Requests Timing ---------------------
# #Request rate and timing: crawlers are steady, throttled, and polite, while attackers are bursty, aggressive, or oddly rhythmic
# timestamps = [datetime.fromisoformat(item["timestamp"]) for item in accesses]
# now_utc = datetime.now(tz=ZoneInfo('UTC'))
# timestamps = [ts for ts in timestamps if now_utc - ts <= timedelta(seconds=uneven_request_timing_time_window_seconds)]
# timestamps = sorted(timestamps, reverse=True)
time_diffs = []
for i in range(0, len(timestamps)-1):
diff = (timestamps[i] - timestamps[i+1]).total_seconds()
time_diffs.append(diff)
# time_diffs = []
# for i in range(0, len(timestamps)-1):
# diff = (timestamps[i] - timestamps[i+1]).total_seconds()
# time_diffs.append(diff)
mean = 0
variance = 0
std = 0
cv = 0
if time_diffs:
mean = sum(time_diffs) / len(time_diffs)
variance = sum((x - mean) ** 2 for x in time_diffs) / len(time_diffs)
std = variance ** 0.5
cv = std/mean
print(f"Mean: {mean} - Variance {variance} - Standard Deviation {std} - Coefficient of Variation: {cv}")
# mean = 0
# variance = 0
# std = 0
# cv = 0
# if time_diffs:
# mean = sum(time_diffs) / len(time_diffs)
# variance = sum((x - mean) ** 2 for x in time_diffs) / len(time_diffs)
# std = variance ** 0.5
# cv = std / mean if mean else 0
# app_logger.debug(f"Mean: {mean} - Variance {variance} - Standard Deviation {std} - Coefficient of Variation: {cv}")
if cv >= uneven_request_timing_threshold:
score["attacker"]["uneven_request_timing"] = True
score["good_crawler"]["uneven_request_timing"] = False
score["bad_crawler"]["uneven_request_timing"] = False
score["regular_user"]["uneven_request_timing"] = True
else:
score["attacker"]["uneven_request_timing"] = False
score["good_crawler"]["uneven_request_timing"] = False
score["bad_crawler"]["uneven_request_timing"] = False
score["regular_user"]["uneven_request_timing"] = False
# if cv >= uneven_request_timing_threshold:
# score["attacker"]["uneven_request_timing"] = True
# score["good_crawler"]["uneven_request_timing"] = False
# score["bad_crawler"]["uneven_request_timing"] = False
# score["regular_user"]["uneven_request_timing"] = True
# else:
# score["attacker"]["uneven_request_timing"] = False
# score["good_crawler"]["uneven_request_timing"] = False
# score["bad_crawler"]["uneven_request_timing"] = False
# score["regular_user"]["uneven_request_timing"] = False
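The timing signal is the coefficient of variation (std / mean) of the gaps between consecutive requests: evenly paced, bot-like traffic gives a CV near 0, while bursty, human-like traffic gives a higher CV. A worked sketch with hypothetical gaps in seconds:
gaps = [1.0, 30.0, 2.0, 120.0, 5.0]                  # hypothetical inter-request gaps
mean = sum(gaps) / len(gaps)
variance = sum((g - mean) ** 2 for g in gaps) / len(gaps)
cv = (variance ** 0.5) / mean if mean else 0.0
print(round(cv, 2))  # 1.44 -> above a typical threshold, so the uneven-timing flag fires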
#--------------------- Different User Agents ---------------------
#Header quality and consistency: crawlers tend to use complete and consistent headers, while attackers may omit, fake, or rotate them
user_agents_used = [item["user_agent"] for item in accesses]
user_agents_used = list(dict.fromkeys(user_agents_used))
#print(f"User agents used: {user_agents_used}")
# #--------------------- Different User Agents ---------------------
# #Header quality and consistency: crawlers tend to use complete and consistent headers, while attackers may omit, fake, or rotate them
# user_agents_used = [item["user_agent"] for item in accesses]
# user_agents_used = list(dict.fromkeys(user_agents_used))
# #print(f"User agents used: {user_agents_used}")
if len(user_agents_used) >= user_agents_used_threshold:
score["attacker"]["different_user_agents"] = True
score["good_crawler"]["different_user_agents"] = False
score["bad_crawler"]["different_user_agents"] = True
score["regular_user"]["different_user_agents"] = False
else:
score["attacker"]["different_user_agents"] = False
score["good_crawler"]["different_user_agents"] = False
score["bad_crawler"]["different_user_agents"] = False
score["regular_user"]["different_user_agents"] = False
# if len(user_agents_used) >= user_agents_used_threshold:
# score["attacker"]["different_user_agents"] = True
# score["good_crawler"]["different_user_agents"] = False
# score["bad_crawler"]["different_user_agents"] = True
# score["regular_user"]["different_user_agents"] = False
# else:
# score["attacker"]["different_user_agents"] = False
# score["good_crawler"]["different_user_agents"] = False
# score["bad_crawler"]["different_user_agents"] = False
# score["regular_user"]["different_user_agents"] = False
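The user-agent signal only needs the number of distinct User-Agent strings; dict.fromkeys is an order-preserving de-duplication. A tiny sketch with hypothetical values:
uas = ["curl/8.0", "Mozilla/5.0 (X11; Linux x86_64)", "curl/8.0"]
distinct = list(dict.fromkeys(uas))
print(len(distinct), distinct)  # 2 ['curl/8.0', 'Mozilla/5.0 (X11; Linux x86_64)']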
#--------------------- Attack URLs ---------------------
# #--------------------- Attack URLs ---------------------
attack_urls_found_list = []
# attack_urls_found_list = []
wl = get_wordlists()
if wl.attack_urls:
queried_paths = [item["path"] for item in accesses]
# wl = get_wordlists()
# if wl.attack_patterns:
# queried_paths = [item["path"] for item in accesses]
for queried_path in queried_paths:
for name, pattern in wl.attack_urls.items():
if re.search(pattern, queried_path, re.IGNORECASE):
attack_urls_found_list.append(pattern)
# for queried_path in queried_paths:
# # URL decode the path to catch encoded attacks
# try:
# decoded_path = urllib.parse.unquote(queried_path)
# # Double decode to catch double-encoded attacks
# decoded_path_twice = urllib.parse.unquote(decoded_path)
# except Exception:
# decoded_path = queried_path
# decoded_path_twice = queried_path
# for name, pattern in wl.attack_patterns.items():
# # Check original, decoded, and double-decoded paths
# if (re.search(pattern, queried_path, re.IGNORECASE) or
# re.search(pattern, decoded_path, re.IGNORECASE) or
# re.search(pattern, decoded_path_twice, re.IGNORECASE)):
# attack_urls_found_list.append(f"{name}: {pattern}")
#remove duplicates
attack_urls_found_list = set(attack_urls_found_list)
attack_urls_found_list = list(attack_urls_found_list)
# #remove duplicates
# attack_urls_found_list = set(attack_urls_found_list)
# attack_urls_found_list = list(attack_urls_found_list)
if len(attack_urls_found_list) > attack_urls_threshold:
score["attacker"]["attack_url"] = True
score["good_crawler"]["attack_url"] = False
score["bad_crawler"]["attack_url"] = False
score["regular_user"]["attack_url"] = False
else:
score["attacker"]["attack_url"] = False
score["good_crawler"]["attack_url"] = False
score["bad_crawler"]["attack_url"] = False
score["regular_user"]["attack_url"] = False
# if len(attack_urls_found_list) > attack_urls_threshold:
# score["attacker"]["attack_url"] = True
# score["good_crawler"]["attack_url"] = False
# score["bad_crawler"]["attack_url"] = False
# score["regular_user"]["attack_url"] = False
# else:
# score["attacker"]["attack_url"] = False
# score["good_crawler"]["attack_url"] = False
# score["bad_crawler"]["attack_url"] = False
# score["regular_user"]["attack_url"] = False
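A standalone sketch of the decoded-path matching introduced above, with a hypothetical pattern map standing in for the wordlist from get_wordlists(); decoding once and twice catches single- and double-URL-encoded attacks:
import re
import urllib.parse

patterns = {"path_traversal": r"\.\./", "sqli": r"union\s+select"}   # hypothetical wordlist
path = "/download?file=%252e%252e%252fetc%252fpasswd"               # double-encoded ../
once = urllib.parse.unquote(path)
twice = urllib.parse.unquote(once)
hits = [name for name, pattern in patterns.items()
        if any(re.search(pattern, p, re.IGNORECASE) for p in (path, once, twice))]
print(hits)  # ['path_traversal']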
#--------------------- Calculate score ---------------------
# #--------------------- Calculate score ---------------------
attacker_score = good_crawler_score = bad_crawler_score = regular_user_score = 0
# attacker_score = good_crawler_score = bad_crawler_score = regular_user_score = 0
attacker_score = score["attacker"]["risky_http_methods"] * weights["attacker"]["risky_http_methods"]
attacker_score = attacker_score + score["attacker"]["robots_violations"] * weights["attacker"]["robots_violations"]
attacker_score = attacker_score + score["attacker"]["uneven_request_timing"] * weights["attacker"]["uneven_request_timing"]
attacker_score = attacker_score + score["attacker"]["different_user_agents"] * weights["attacker"]["different_user_agents"]
attacker_score = attacker_score + score["attacker"]["attack_url"] * weights["attacker"]["attack_url"]
# attacker_score = score["attacker"]["risky_http_methods"] * weights["attacker"]["risky_http_methods"]
# attacker_score = attacker_score + score["attacker"]["robots_violations"] * weights["attacker"]["robots_violations"]
# attacker_score = attacker_score + score["attacker"]["uneven_request_timing"] * weights["attacker"]["uneven_request_timing"]
# attacker_score = attacker_score + score["attacker"]["different_user_agents"] * weights["attacker"]["different_user_agents"]
# attacker_score = attacker_score + score["attacker"]["attack_url"] * weights["attacker"]["attack_url"]
good_crawler_score = score["good_crawler"]["risky_http_methods"] * weights["good_crawler"]["risky_http_methods"]
good_crawler_score = good_crawler_score + score["good_crawler"]["robots_violations"] * weights["good_crawler"]["robots_violations"]
good_crawler_score = good_crawler_score + score["good_crawler"]["uneven_request_timing"] * weights["good_crawler"]["uneven_request_timing"]
good_crawler_score = good_crawler_score + score["good_crawler"]["different_user_agents"] * weights["good_crawler"]["different_user_agents"]
good_crawler_score = good_crawler_score + score["good_crawler"]["attack_url"] * weights["good_crawler"]["attack_url"]
# good_crawler_score = score["good_crawler"]["risky_http_methods"] * weights["good_crawler"]["risky_http_methods"]
# good_crawler_score = good_crawler_score + score["good_crawler"]["robots_violations"] * weights["good_crawler"]["robots_violations"]
# good_crawler_score = good_crawler_score + score["good_crawler"]["uneven_request_timing"] * weights["good_crawler"]["uneven_request_timing"]
# good_crawler_score = good_crawler_score + score["good_crawler"]["different_user_agents"] * weights["good_crawler"]["different_user_agents"]
# good_crawler_score = good_crawler_score + score["good_crawler"]["attack_url"] * weights["good_crawler"]["attack_url"]
bad_crawler_score = score["bad_crawler"]["risky_http_methods"] * weights["bad_crawler"]["risky_http_methods"]
bad_crawler_score = bad_crawler_score + score["bad_crawler"]["robots_violations"] * weights["bad_crawler"]["robots_violations"]
bad_crawler_score = bad_crawler_score + score["bad_crawler"]["uneven_request_timing"] * weights["bad_crawler"]["uneven_request_timing"]
bad_crawler_score = bad_crawler_score + score["bad_crawler"]["different_user_agents"] * weights["bad_crawler"]["different_user_agents"]
bad_crawler_score = bad_crawler_score + score["bad_crawler"]["attack_url"] * weights["bad_crawler"]["attack_url"]
# bad_crawler_score = score["bad_crawler"]["risky_http_methods"] * weights["bad_crawler"]["risky_http_methods"]
# bad_crawler_score = bad_crawler_score + score["bad_crawler"]["robots_violations"] * weights["bad_crawler"]["robots_violations"]
# bad_crawler_score = bad_crawler_score + score["bad_crawler"]["uneven_request_timing"] * weights["bad_crawler"]["uneven_request_timing"]
# bad_crawler_score = bad_crawler_score + score["bad_crawler"]["different_user_agents"] * weights["bad_crawler"]["different_user_agents"]
# bad_crawler_score = bad_crawler_score + score["bad_crawler"]["attack_url"] * weights["bad_crawler"]["attack_url"]
regular_user_score = score["regular_user"]["risky_http_methods"] * weights["regular_user"]["risky_http_methods"]
regular_user_score = regular_user_score + score["regular_user"]["robots_violations"] * weights["regular_user"]["robots_violations"]
regular_user_score = regular_user_score + score["regular_user"]["uneven_request_timing"] * weights["regular_user"]["uneven_request_timing"]
regular_user_score = regular_user_score + score["regular_user"]["different_user_agents"] * weights["regular_user"]["different_user_agents"]
regular_user_score = regular_user_score + score["regular_user"]["attack_url"] * weights["regular_user"]["attack_url"]
# regular_user_score = score["regular_user"]["risky_http_methods"] * weights["regular_user"]["risky_http_methods"]
# regular_user_score = regular_user_score + score["regular_user"]["robots_violations"] * weights["regular_user"]["robots_violations"]
# regular_user_score = regular_user_score + score["regular_user"]["uneven_request_timing"] * weights["regular_user"]["uneven_request_timing"]
# regular_user_score = regular_user_score + score["regular_user"]["different_user_agents"] * weights["regular_user"]["different_user_agents"]
# regular_user_score = regular_user_score + score["regular_user"]["attack_url"] * weights["regular_user"]["attack_url"]
print(f"Attacker score: {attacker_score}")
print(f"Good Crawler score: {good_crawler_score}")
print(f"Bad Crawler score: {bad_crawler_score}")
print(f"Regular User score: {regular_user_score}")
# score_details = f"""
# Attacker score: {attacker_score}
# Good Crawler score: {good_crawler_score}
# Bad Crawler score: {bad_crawler_score}
# Regular User score: {regular_user_score}
# """
# app_logger.debug(score_details)
analyzed_metrics = {"risky_http_methods": http_method_attacker_score, "robots_violations": violated_robots_ratio, "uneven_request_timing": mean, "different_user_agents": user_agents_used, "attack_url": attack_urls_found_list}
category_scores = {"attacker": attacker_score, "good_crawler": good_crawler_score, "bad_crawler": bad_crawler_score, "regular_user": regular_user_score}
category = max(category_scores, key=category_scores.get)
last_analysis = datetime.utcnow()
# analyzed_metrics = {"risky_http_methods": http_method_attacker_score, "robots_violations": violated_robots_ratio, "uneven_request_timing": mean, "different_user_agents": user_agents_used, "attack_url": attack_urls_found_list}
# category_scores = {"attacker": attacker_score, "good_crawler": good_crawler_score, "bad_crawler": bad_crawler_score, "regular_user": regular_user_score}
# category = max(category_scores, key=category_scores.get)
# last_analysis = datetime.now(tz=ZoneInfo('UTC'))
self._db_manager.update_ip_stats_analysis(ip, analyzed_metrics, category, category_scores, last_analysis)
# self._db_manager.update_ip_stats_analysis(ip, analyzed_metrics, category, category_scores, last_analysis)
return 0
# return 0
def update_ip_rep_infos(self, ip: str) -> list[str]:
api_url = "https://iprep.lcrawl.com/api/iprep/"
params = {
"cidr": ip
}
headers = {
"Content-Type": "application/json"
}
response = requests.get(api_url, headers=headers, params=params)
payload = response.json()
if payload["results"]:
data = payload["results"][0]
# def update_ip_rep_infos(self, ip: str) -> list[str]:
# api_url = "https://iprep.lcrawl.com/api/iprep/"
# params = {
# "cidr": ip
# }
# headers = {
# "Content-Type": "application/json"
# }
country_iso_code = data["geoip_data"]["country_iso_code"]
asn = data["geoip_data"]["asn_autonomous_system_number"]
asn_org = data["geoip_data"]["asn_autonomous_system_organization"]
list_on = data["list_on"]
# response = requests.get(api_url, headers=headers, params=params)
# payload = response.json()
sanitized_country_iso_code = sanitize_for_storage(country_iso_code, 3)
sanitized_asn = sanitize_for_storage(asn, 100)
sanitized_asn_org = sanitize_for_storage(asn_org, 100)
sanitized_list_on = sanitize_dict(list_on, 100000)
# if payload["results"]:
# data = payload["results"][0]
# country_iso_code = data["geoip_data"]["country_iso_code"]
# asn = data["geoip_data"]["asn_autonomous_system_number"]
# asn_org = data["geoip_data"]["asn_autonomous_system_organization"]
# list_on = data["list_on"]
# sanitized_country_iso_code = sanitize_for_storage(country_iso_code, 3)
# sanitized_asn = sanitize_for_storage(asn, 100)
# sanitized_asn_org = sanitize_for_storage(asn_org, 100)
# sanitized_list_on = sanitize_dict(list_on, 100000)
self._db_manager.update_ip_rep_infos(ip, sanitized_country_iso_code, sanitized_asn, sanitized_asn_org, sanitized_list_on)
# self._db_manager.update_ip_rep_infos(ip, sanitized_country_iso_code, sanitized_asn, sanitized_asn_org, sanitized_list_on)
return
# return
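The scheduler wiring itself lives in the other files changed by this commit; as a rough illustration only, a minimal sketch of how the two methods could be invoked periodically, assuming a hypothetical get_known_ips() helper and a threading.Timer-based loop:
import threading

ANALYSIS_INTERVAL_SECONDS = 300  # hypothetical interval

def run_scheduled_analysis(analyzer, get_known_ips):
    for ip in get_known_ips():                 # hypothetical helper returning IPs to refresh
        analyzer.infer_user_category(ip)
        analyzer.update_ip_rep_infos(ip)
    threading.Timer(ANALYSIS_INTERVAL_SECONDS, run_scheduled_analysis,
                    args=(analyzer, get_known_ips)).start()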