feat: implement IP ban and rate-limiting logic in database with migration scripts
This commit is contained in:
147
src/database.py
147
src/database.py
@@ -344,6 +344,153 @@ class DatabaseManager:
|
||||
)
|
||||
session.add(ip_stats)
|
||||
|
||||
def increment_page_visit(self, ip: str, max_pages_limit: int) -> int:
|
||||
"""
|
||||
Increment the page visit counter for an IP and apply ban if limit reached.
|
||||
|
||||
Args:
|
||||
ip: Client IP address
|
||||
max_pages_limit: Page visit threshold before banning
|
||||
|
||||
Returns:
|
||||
The updated page visit count
|
||||
"""
|
||||
session = self.session
|
||||
try:
|
||||
sanitized_ip = sanitize_ip(ip)
|
||||
ip_stats = (
|
||||
session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
|
||||
)
|
||||
|
||||
if not ip_stats:
|
||||
now = datetime.now()
|
||||
ip_stats = IpStats(
|
||||
ip=sanitized_ip,
|
||||
total_requests=0,
|
||||
first_seen=now,
|
||||
last_seen=now,
|
||||
page_visit_count=1,
|
||||
)
|
||||
session.add(ip_stats)
|
||||
session.commit()
|
||||
return 1
|
||||
|
||||
ip_stats.page_visit_count = (ip_stats.page_visit_count or 0) + 1
|
||||
|
||||
if ip_stats.page_visit_count >= max_pages_limit:
|
||||
ip_stats.total_violations = (ip_stats.total_violations or 0) + 1
|
||||
ip_stats.ban_multiplier = 2 ** (ip_stats.total_violations - 1)
|
||||
ip_stats.ban_timestamp = datetime.now()
|
||||
|
||||
session.commit()
|
||||
return ip_stats.page_visit_count
|
||||
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
applogger.error(f"Error incrementing page visit for {ip}: {e}")
|
||||
return 0
|
||||
finally:
|
||||
self.close_session()
|
||||
|
||||
def is_banned_ip(self, ip: str, ban_duration_seconds: int) -> bool:
|
||||
"""
|
||||
Check if an IP is currently banned.
|
||||
|
||||
Args:
|
||||
ip: Client IP address
|
||||
ban_duration_seconds: Base ban duration in seconds
|
||||
|
||||
Returns:
|
||||
True if the IP is currently banned
|
||||
"""
|
||||
session = self.session
|
||||
try:
|
||||
sanitized_ip = sanitize_ip(ip)
|
||||
ip_stats = (
|
||||
session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
|
||||
)
|
||||
|
||||
if not ip_stats or ip_stats.ban_timestamp is None:
|
||||
return False
|
||||
|
||||
effective_duration = ban_duration_seconds * (ip_stats.ban_multiplier or 1)
|
||||
elapsed = (datetime.now() - ip_stats.ban_timestamp).total_seconds()
|
||||
|
||||
if elapsed > effective_duration:
|
||||
# Ban expired — reset count for next cycle
|
||||
ip_stats.page_visit_count = 0
|
||||
ip_stats.ban_timestamp = None
|
||||
session.commit()
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
applogger.error(f"Error checking ban status for {ip}: {e}")
|
||||
return False
|
||||
finally:
|
||||
self.close_session()
|
||||
|
||||
def get_ban_info(self, ip: str, ban_duration_seconds: int) -> dict:
|
||||
"""
|
||||
Get detailed ban information for an IP.
|
||||
|
||||
Args:
|
||||
ip: Client IP address
|
||||
ban_duration_seconds: Base ban duration in seconds
|
||||
|
||||
Returns:
|
||||
Dictionary with ban status details
|
||||
"""
|
||||
session = self.session
|
||||
try:
|
||||
sanitized_ip = sanitize_ip(ip)
|
||||
ip_stats = (
|
||||
session.query(IpStats).filter(IpStats.ip == sanitized_ip).first()
|
||||
)
|
||||
|
||||
if not ip_stats:
|
||||
return {
|
||||
"is_banned": False,
|
||||
"violations": 0,
|
||||
"ban_multiplier": 1,
|
||||
"remaining_ban_seconds": 0,
|
||||
}
|
||||
|
||||
violations = ip_stats.total_violations or 0
|
||||
multiplier = ip_stats.ban_multiplier or 1
|
||||
|
||||
if ip_stats.ban_timestamp is None:
|
||||
return {
|
||||
"is_banned": False,
|
||||
"violations": violations,
|
||||
"ban_multiplier": multiplier,
|
||||
"remaining_ban_seconds": 0,
|
||||
}
|
||||
|
||||
effective_duration = ban_duration_seconds * multiplier
|
||||
elapsed = (datetime.now() - ip_stats.ban_timestamp).total_seconds()
|
||||
remaining = max(0, effective_duration - elapsed)
|
||||
|
||||
return {
|
||||
"is_banned": remaining > 0,
|
||||
"violations": violations,
|
||||
"ban_multiplier": multiplier,
|
||||
"effective_ban_duration_seconds": effective_duration,
|
||||
"remaining_ban_seconds": remaining,
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
applogger.error(f"Error getting ban info for {ip}: {e}")
|
||||
return {
|
||||
"is_banned": False,
|
||||
"violations": 0,
|
||||
"ban_multiplier": 1,
|
||||
"remaining_ban_seconds": 0,
|
||||
}
|
||||
finally:
|
||||
self.close_session()
|
||||
|
||||
def update_ip_stats_analysis(
|
||||
self,
|
||||
ip: str,
|
||||
|
||||
Reference in New Issue
Block a user