added ip rep fetch + bug fix

This commit is contained in:
Leonardo Bambini
2026-01-07 22:56:01 +01:00
parent 190d74e1a7
commit 4f42b946f3
5 changed files with 69 additions and 33 deletions

View File

@@ -8,6 +8,9 @@ from datetime import datetime, timedelta
import re
from wordlists import get_wordlists
from config import get_config
import requests
from sanitizer import sanitize_for_storage, sanitize_dict
"""
Functions for user activity analysis
"""
@@ -228,6 +231,10 @@ class Analyzer:
for name, pattern in wl.attack_urls.items():
if re.search(pattern, queried_path, re.IGNORECASE):
attack_urls_found_list.append(pattern)
#remove duplicates
attack_urls_found_list = set(attack_urls_found_list)
attack_urls_found_list = list(attack_urls_found_list)
if len(attack_urls_found_list) > attack_urls_threshold:
score["attacker"]["attack_url"] = True
@@ -281,3 +288,32 @@ class Analyzer:
self._db_manager.update_ip_stats_analysis(ip, analyzed_metrics, category, category_scores, last_analysis)
return 0
def update_ip_rep_infos(self, ip: str) -> list[str]:
api_url = "https://iprep.lcrawl.com/api/iprep/"
params = {
"cidr": ip
}
headers = {
"Content-Type": "application/json"
}
response = requests.get(api_url, headers=headers, params=params)
payload = response.json()
if payload["results"]:
data = payload["results"][0]
country_iso_code = data["geoip_data"]["country_iso_code"]
asn = data["geoip_data"]["asn_autonomous_system_number"]
asn_org = data["geoip_data"]["asn_autonomous_system_organization"]
list_on = data["list_on"]
sanitized_country_iso_code = sanitize_for_storage(country_iso_code, 3)
sanitized_asn = sanitize_for_storage(asn, 100)
sanitized_asn_org = sanitize_for_storage(asn_org, 100)
sanitized_list_on = sanitize_dict(list_on, 100000)
self._db_manager.update_ip_rep_infos(ip, sanitized_country_iso_code, sanitized_asn, sanitized_asn_org, sanitized_list_on)
return

View File

@@ -246,7 +246,7 @@ class DatabaseManager:
ip_stats.category_scores = category_scores
ip_stats.last_analysis = last_analysis
def manual_update_category(self, ip: str, category: str) -> None:
def manual_update_category(self, ip: str, category: str) -> None:
"""
Update IP category as a result of a manual intervention by an admin
@@ -257,11 +257,36 @@ class DatabaseManager:
"""
session = self.session
sanitized_ip = sanitize_ip(ip)
ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
ip_stats.category = category
ip_stats.manual_category = True
def update_ip_rep_infos(self, ip: str, country_code: str, asn: str, asn_org: str, list_on: Dict[str,str]) -> None:
"""
Update IP rep stats
Args:
ip: IP address
country_code: IP address country code
asn: IP address ASN
asn_org: IP address ASN ORG
list_on: public lists containing the IP address
"""
session = self.session
sanitized_ip = sanitize_ip(ip)
ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
ip_stats.country_code = country_code
ip_stats.asn = asn
ip_stats.asn_org = asn_org
ip_stats.list_on = list_on
def get_access_logs(
self,
limit: int = 100,

View File

@@ -417,6 +417,7 @@ class Handler(BaseHTTPRequestHandler):
self.tracker.record_access(client_ip, self.path, user_agent, method='GET')
self.analyzer.infer_user_category(client_ip)
self.analyzer.update_ip_rep_infos(client_ip)
if self.tracker.is_suspicious_user_agent(user_agent):
self.access_logger.warning(f"[SUSPICIOUS] {client_ip} - {user_agent[:50]} - {self.path}")

View File

@@ -134,6 +134,7 @@ class IpStats(Base):
city: Mapped[Optional[str]] = mapped_column(String(MAX_CITY_LENGTH), nullable=True)
asn: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
asn_org: Mapped[Optional[str]] = mapped_column(String(MAX_ASN_ORG_LENGTH), nullable=True)
list_on: Mapped[Optional[Dict[str,str]]] = mapped_column(JSON, nullable=True)
# Reputation fields (populated by future enrichment)
reputation_score: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
@@ -149,34 +150,4 @@ class IpStats(Base):
def __repr__(self) -> str:
return f"<IpStats(ip='{self.ip}', total_requests={self.total_requests})>"
# class IpLog(Base):
# """
# Records all IPs that have accessed the honeypot, along with aggregated stats and inferred user category.
# """
# __tablename__ = 'ip_logs'
# id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
# ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True)
# stats: Mapped[List[str]] = mapped_column(String(MAX_PATH_LENGTH))
# category: Mapped[str] = mapped_column(String(15))
# manual_category: Mapped[bool] = mapped_column(Boolean, default=False)
# last_analysis: Mapped[datetime] = mapped_column(DateTime, index=True),
# # Relationship to attack detections
# access_logs: Mapped[List["AccessLog"]] = relationship(
# "AccessLog",
# back_populates="ip",
# cascade="all, delete-orphan"
# )
# # Indexes for common queries
# __table_args__ = (
# Index('ix_access_logs_ip_timestamp', 'ip', 'timestamp'),
# Index('ix_access_logs_is_suspicious', 'is_suspicious'),
# Index('ix_access_logs_is_honeypot_trigger', 'is_honeypot_trigger'),
# )
# def __repr__(self) -> str:
# return f"<AccessLog(id={self.id}, ip='{self.ip}', path='{self.path[:50]}')>"
return f"<IpStats(ip='{self.ip}', total_requests={self.total_requests})>"

View File

@@ -7,7 +7,7 @@ Protects against SQL injection payloads, XSS, and storage exhaustion attacks.
import html
import re
from typing import Optional
from typing import Optional, Dict
# Field length limits for database storage
@@ -111,3 +111,6 @@ def escape_html_truncated(value: Optional[str], max_display_length: int) -> str:
value_str = value_str[:max_display_length] + "..."
return html.escape(value_str)
def sanitize_dict(value: Optional[Dict[str,str]], max_display_length):
return {k: sanitize_for_storage(v, max_display_length) for k, v in value.items()}