def update_ip_rep_infos(self, ip: str, timeout: float = 5.0) -> None:
    """
    Fetch reputation data for an IP address and store it.

    Queries the iprep API with the IP as the ``cidr`` parameter, takes the
    first entry of ``results`` and hands its sanitized country code, ASN,
    ASN organisation and list membership to
    ``self._db_manager.update_ip_rep_infos``.

    The lookup is best-effort: a network error, a non-2xx response, a body
    that is not JSON, or an unexpected payload shape is logged (or ignored)
    and the method returns without touching the database, so a failing
    reputation service cannot break the caller.

    Args:
        ip: IP address to look up
        timeout: seconds to wait for the API before giving up

    Returns:
        None
    """
    # Function-scope import so this block does not depend on the module's import list.
    import logging

    api_url = "https://iprep.lcrawl.com/api/iprep/"
    params = {
        "cidr": ip
    }
    headers = {
        "Content-Type": "application/json"
    }

    try:
        # Without a timeout requests waits indefinitely for the remote service.
        response = requests.get(api_url, headers=headers, params=params, timeout=timeout)
        response.raise_for_status()
        payload = response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a response body that is not valid JSON.
        logging.getLogger(__name__).warning("IP reputation lookup failed for %s: %s", ip, exc)
        return

    results = payload.get("results") if isinstance(payload, dict) else None
    if not isinstance(results, list) or not results or not isinstance(results[0], dict):
        # No reputation entry for this IP (or an unexpected payload shape).
        return

    data = results[0]
    geoip_data = data.get("geoip_data")
    if not isinstance(geoip_data, dict):
        geoip_data = {}

    def _clean(value, max_length):
        # Leave absent fields as None instead of passing None to the sanitizer.
        return None if value is None else sanitize_for_storage(value, max_length)

    sanitized_country_iso_code = _clean(geoip_data.get("country_iso_code"), 3)
    # NOTE(review): the ASN is sanitized as text here; confirm this matches the column type it is stored in.
    sanitized_asn = _clean(geoip_data.get("asn_autonomous_system_number"), 100)
    sanitized_asn_org = _clean(geoip_data.get("asn_autonomous_system_organization"), 100)

    list_on = data.get("list_on")
    sanitized_list_on = sanitize_dict(list_on, 100000) if isinstance(list_on, dict) else {}

    self._db_manager.update_ip_rep_infos(
        ip,
        sanitized_country_iso_code,
        sanitized_asn,
        sanitized_asn_org,
        sanitized_list_on,
    )
def update_ip_rep_infos(self, ip: str, country_code: str, asn: str, asn_org: str, list_on: Dict[str,str]) -> None:
    """
    Update IP rep stats

    Looks up the IpStats row for the sanitized IP and overwrites its
    reputation fields. If no row exists for the IP, nothing is changed.

    Args:
        ip: IP address
        country_code: IP address country code
        asn: IP address ASN
        asn_org: IP address ASN ORG
        list_on: public lists containing the IP address

    """
    session = self.session

    sanitized_ip = sanitize_ip(ip)
    ip_stats = session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()

    # first() yields None when no row matches; assigning attributes on it
    # would raise AttributeError, so there is nothing to update in that case.
    if ip_stats is None:
        return

    ip_stats.country_code = country_code
    # NOTE(review): asn is annotated as str here; confirm it matches the type of the IpStats.asn column.
    ip_stats.asn = asn
    ip_stats.asn_org = asn_org
    ip_stats.list_on = list_on
    # NOTE(review): no commit is issued here; presumably the session owner commits - confirm.
mapped_column(Integer, nullable=True) asn_org: Mapped[Optional[str]] = mapped_column(String(MAX_ASN_ORG_LENGTH), nullable=True) + list_on: Mapped[Optional[Dict[str,str]]] = mapped_column(JSON, nullable=True) # Reputation fields (populated by future enrichment) reputation_score: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) @@ -149,34 +150,4 @@ class IpStats(Base): def __repr__(self) -> str: - return f"" - -# class IpLog(Base): -# """ -# Records all IPs that have accessed the honeypot, along with aggregated stats and inferred user category. -# """ -# __tablename__ = 'ip_logs' - -# id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) -# ip: Mapped[str] = mapped_column(String(MAX_IP_LENGTH), nullable=False, index=True) -# stats: Mapped[List[str]] = mapped_column(String(MAX_PATH_LENGTH)) -# category: Mapped[str] = mapped_column(String(15)) -# manual_category: Mapped[bool] = mapped_column(Boolean, default=False) -# last_analysis: Mapped[datetime] = mapped_column(DateTime, index=True), - -# # Relationship to attack detections -# access_logs: Mapped[List["AccessLog"]] = relationship( -# "AccessLog", -# back_populates="ip", -# cascade="all, delete-orphan" -# ) - -# # Indexes for common queries -# __table_args__ = ( -# Index('ix_access_logs_ip_timestamp', 'ip', 'timestamp'), -# Index('ix_access_logs_is_suspicious', 'is_suspicious'), -# Index('ix_access_logs_is_honeypot_trigger', 'is_honeypot_trigger'), -# ) - -# def __repr__(self) -> str: -# return f"" \ No newline at end of file + return f"" \ No newline at end of file diff --git a/src/sanitizer.py b/src/sanitizer.py index f783129..a04d0c0 100644 --- a/src/sanitizer.py +++ b/src/sanitizer.py @@ -7,7 +7,7 @@ Protects against SQL injection payloads, XSS, and storage exhaustion attacks. 
def sanitize_dict(value: Optional[Dict[str, str]], max_display_length: int) -> Dict[str, str]:
    """
    Sanitize every value of a dictionary for storage.

    Each value is passed through sanitize_for_storage with
    max_display_length as its length limit. Keys are kept unchanged.

    Args:
        value: Mapping whose values should be sanitized; None or an empty
            mapping yields an empty dict
        max_display_length: Length limit forwarded to sanitize_for_storage
            for each value

    Returns:
        A new dict with the same keys and sanitized values
    """
    # The parameter is Optional: treat a missing mapping like an empty one
    # instead of failing on None.items().
    if not value:
        return {}

    # NOTE(review): keys are stored as-is; confirm they never come from untrusted input.
    return {k: sanitize_for_storage(v, max_display_length) for k, v in value.items()}