lint code

This commit is contained in:
Lorenzo Venerandi
2026-03-01 15:57:54 +01:00
parent 7401783847
commit 95ab55c428

View File

@@ -1391,11 +1391,7 @@ class DatabaseManager:
query = session.query(IpStats.ip, IpStats.total_requests) query = session.query(IpStats.ip, IpStats.total_requests)
query = self._public_ip_filter(query, IpStats.ip, server_ip) query = self._public_ip_filter(query, IpStats.ip, server_ip)
results = ( results = query.order_by(IpStats.total_requests.desc()).limit(limit).all()
query.order_by(IpStats.total_requests.desc())
.limit(limit)
.all()
)
return [(row.ip, row.total_requests) for row in results] return [(row.ip, row.total_requests) for row in results]
finally: finally:
@@ -1591,9 +1587,8 @@ class DatabaseManager:
# Count distinct paths per IP using SQL GROUP BY # Count distinct paths per IP using SQL GROUP BY
count_col = func.count(distinct(AccessLog.path)).label("path_count") count_col = func.count(distinct(AccessLog.path)).label("path_count")
base_query = ( base_query = session.query(AccessLog.ip, count_col).filter(
session.query(AccessLog.ip, count_col) AccessLog.is_honeypot_trigger == True
.filter(AccessLog.is_honeypot_trigger == True)
) )
base_query = self._public_ip_filter(base_query, AccessLog.ip, server_ip) base_query = self._public_ip_filter(base_query, AccessLog.ip, server_ip)
base_query = base_query.group_by(AccessLog.ip) base_query = base_query.group_by(AccessLog.ip)
@@ -1603,11 +1598,17 @@ class DatabaseManager:
# Apply sorting # Apply sorting
if sort_by == "count": if sort_by == "count":
order_expr = count_col.desc() if sort_order == "desc" else count_col.asc() order_expr = (
count_col.desc() if sort_order == "desc" else count_col.asc()
)
else: else:
order_expr = AccessLog.ip.desc() if sort_order == "desc" else AccessLog.ip.asc() order_expr = (
AccessLog.ip.desc() if sort_order == "desc" else AccessLog.ip.asc()
)
ip_rows = base_query.order_by(order_expr).offset(offset).limit(page_size).all() ip_rows = (
base_query.order_by(order_expr).offset(offset).limit(page_size).all()
)
# Fetch distinct paths only for the paginated IPs # Fetch distinct paths only for the paginated IPs
paginated_ips = [row.ip for row in ip_rows] paginated_ips = [row.ip for row in ip_rows]
@@ -1820,18 +1821,23 @@ class DatabaseManager:
count_col = func.count(AccessLog.id).label("count") count_col = func.count(AccessLog.id).label("count")
# Get total number of distinct paths # Get total number of distinct paths
total_paths = session.query(func.count(distinct(AccessLog.path))).scalar() or 0 total_paths = (
session.query(func.count(distinct(AccessLog.path))).scalar() or 0
# Build query with SQL-level sorting and pagination
query = (
session.query(AccessLog.path, count_col)
.group_by(AccessLog.path)
) )
# Build query with SQL-level sorting and pagination
query = session.query(AccessLog.path, count_col).group_by(AccessLog.path)
if sort_by == "count": if sort_by == "count":
order_expr = count_col.desc() if sort_order == "desc" else count_col.asc() order_expr = (
count_col.desc() if sort_order == "desc" else count_col.asc()
)
else: else:
order_expr = AccessLog.path.desc() if sort_order == "desc" else AccessLog.path.asc() order_expr = (
AccessLog.path.desc()
if sort_order == "desc"
else AccessLog.path.asc()
)
results = query.order_by(order_expr).offset(offset).limit(page_size).all() results = query.order_by(order_expr).offset(offset).limit(page_size).all()
total_pages = max(1, (total_paths + page_size - 1) // page_size) total_pages = max(1, (total_paths + page_size - 1) // page_size)
@@ -1879,7 +1885,8 @@ class DatabaseManager:
total_uas = ( total_uas = (
session.query(func.count(distinct(AccessLog.user_agent))) session.query(func.count(distinct(AccessLog.user_agent)))
.filter(*base_filter) .filter(*base_filter)
.scalar() or 0 .scalar()
or 0
) )
# Build query with SQL-level sorting and pagination # Build query with SQL-level sorting and pagination
@@ -1890,10 +1897,13 @@ class DatabaseManager:
) )
if sort_by == "count": if sort_by == "count":
order_expr = count_col.desc() if sort_order == "desc" else count_col.asc() order_expr = (
count_col.desc() if sort_order == "desc" else count_col.asc()
)
else: else:
order_expr = ( order_expr = (
AccessLog.user_agent.desc() if sort_order == "desc" AccessLog.user_agent.desc()
if sort_order == "desc"
else AccessLog.user_agent.asc() else AccessLog.user_agent.asc()
) )