updated dashboard to pull data from db. This closes issue #10

This commit is contained in:
Phillip Tarrant
2025-12-28 13:52:46 -06:00
parent 6487cb493d
commit a4baedffd9
5 changed files with 218 additions and 23 deletions

View File

@@ -10,7 +10,7 @@ import stat
from datetime import datetime
from typing import Optional, List, Dict, Any
from sqlalchemy import create_engine
from sqlalchemy import create_engine, func, distinct, case
from sqlalchemy.orm import sessionmaker, scoped_session, Session
from models import Base, AccessLog, CredentialAttempt, AttackDetection, IpStats
@@ -346,6 +346,200 @@ class DatabaseManager:
finally:
self.close_session()
def get_dashboard_counts(self) -> Dict[str, int]:
"""
Get aggregate statistics for the dashboard.
Returns:
Dictionary with total_accesses, unique_ips, unique_paths,
suspicious_accesses, honeypot_triggered, honeypot_ips
"""
session = self.session
try:
# Get main aggregate counts in one query
result = session.query(
func.count(AccessLog.id).label('total_accesses'),
func.count(distinct(AccessLog.ip)).label('unique_ips'),
func.count(distinct(AccessLog.path)).label('unique_paths'),
func.sum(case((AccessLog.is_suspicious == True, 1), else_=0)).label('suspicious_accesses'),
func.sum(case((AccessLog.is_honeypot_trigger == True, 1), else_=0)).label('honeypot_triggered')
).first()
# Get unique IPs that triggered honeypots
honeypot_ips = session.query(
func.count(distinct(AccessLog.ip))
).filter(AccessLog.is_honeypot_trigger == True).scalar() or 0
return {
'total_accesses': result.total_accesses or 0,
'unique_ips': result.unique_ips or 0,
'unique_paths': result.unique_paths or 0,
'suspicious_accesses': int(result.suspicious_accesses or 0),
'honeypot_triggered': int(result.honeypot_triggered or 0),
'honeypot_ips': honeypot_ips
}
finally:
self.close_session()
def get_top_ips(self, limit: int = 10) -> List[tuple]:
"""
Get top IP addresses by access count.
Args:
limit: Maximum number of results
Returns:
List of (ip, count) tuples ordered by count descending
"""
session = self.session
try:
results = session.query(
AccessLog.ip,
func.count(AccessLog.id).label('count')
).group_by(AccessLog.ip).order_by(
func.count(AccessLog.id).desc()
).limit(limit).all()
return [(row.ip, row.count) for row in results]
finally:
self.close_session()
def get_top_paths(self, limit: int = 10) -> List[tuple]:
"""
Get top paths by access count.
Args:
limit: Maximum number of results
Returns:
List of (path, count) tuples ordered by count descending
"""
session = self.session
try:
results = session.query(
AccessLog.path,
func.count(AccessLog.id).label('count')
).group_by(AccessLog.path).order_by(
func.count(AccessLog.id).desc()
).limit(limit).all()
return [(row.path, row.count) for row in results]
finally:
self.close_session()
def get_top_user_agents(self, limit: int = 10) -> List[tuple]:
"""
Get top user agents by access count.
Args:
limit: Maximum number of results
Returns:
List of (user_agent, count) tuples ordered by count descending
"""
session = self.session
try:
results = session.query(
AccessLog.user_agent,
func.count(AccessLog.id).label('count')
).filter(
AccessLog.user_agent.isnot(None),
AccessLog.user_agent != ''
).group_by(AccessLog.user_agent).order_by(
func.count(AccessLog.id).desc()
).limit(limit).all()
return [(row.user_agent, row.count) for row in results]
finally:
self.close_session()
def get_recent_suspicious(self, limit: int = 20) -> List[Dict[str, Any]]:
"""
Get recent suspicious access attempts.
Args:
limit: Maximum number of results
Returns:
List of access log dictionaries with is_suspicious=True
"""
session = self.session
try:
logs = session.query(AccessLog).filter(
AccessLog.is_suspicious == True
).order_by(AccessLog.timestamp.desc()).limit(limit).all()
return [
{
'ip': log.ip,
'path': log.path,
'user_agent': log.user_agent,
'timestamp': log.timestamp.isoformat()
}
for log in logs
]
finally:
self.close_session()
def get_honeypot_triggered_ips(self) -> List[tuple]:
"""
Get IPs that triggered honeypot paths with the paths they accessed.
Returns:
List of (ip, [paths]) tuples
"""
session = self.session
try:
# Get all honeypot triggers grouped by IP
results = session.query(
AccessLog.ip,
AccessLog.path
).filter(
AccessLog.is_honeypot_trigger == True
).all()
# Group paths by IP
ip_paths: Dict[str, List[str]] = {}
for row in results:
if row.ip not in ip_paths:
ip_paths[row.ip] = []
if row.path not in ip_paths[row.ip]:
ip_paths[row.ip].append(row.path)
return [(ip, paths) for ip, paths in ip_paths.items()]
finally:
self.close_session()
def get_recent_attacks(self, limit: int = 20) -> List[Dict[str, Any]]:
"""
Get recent access logs that have attack detections.
Args:
limit: Maximum number of results
Returns:
List of access log dicts with attack_types included
"""
session = self.session
try:
# Get access logs that have attack detections
logs = session.query(AccessLog).join(
AttackDetection
).order_by(AccessLog.timestamp.desc()).limit(limit).all()
return [
{
'ip': log.ip,
'path': log.path,
'user_agent': log.user_agent,
'timestamp': log.timestamp.isoformat(),
'attack_types': [d.attack_type for d in log.attack_detections]
}
for log in logs
]
finally:
self.close_session()
# Module-level singleton instance
_db_manager = DatabaseManager()

View File

@@ -53,9 +53,11 @@ class AccessLog(Base):
cascade="all, delete-orphan"
)
# Composite index for common queries
# Indexes for common queries
__table_args__ = (
Index('ix_access_logs_ip_timestamp', 'ip', 'timestamp'),
Index('ix_access_logs_is_suspicious', 'is_suspicious'),
Index('ix_access_logs_is_honeypot_trigger', 'is_honeypot_trigger'),
)
def __repr__(self) -> str:

View File

@@ -190,7 +190,7 @@ def generate_dashboard(stats: dict) -> str:
</div>
<div class="table-container alert-section">
<h2>🍯 Honeypot Triggers</h2>
<h2>🍯 Honeypot Triggers by IP</h2>
<table>
<thead>
<tr>

View File

@@ -276,21 +276,20 @@ class AccessTracker:
return [(ip, paths) for ip, paths in self.honeypot_triggered.items()]
def get_stats(self) -> Dict:
"""Get statistics summary"""
suspicious_count = sum(1 for log in self.access_log if log.get('suspicious', False))
honeypot_count = sum(1 for log in self.access_log if log.get('honeypot_triggered', False))
return {
'total_accesses': len(self.access_log),
'unique_ips': len(self.ip_counts),
'unique_paths': len(self.path_counts),
'suspicious_accesses': suspicious_count,
'honeypot_triggered': honeypot_count,
'honeypot_ips': len(self.honeypot_triggered),
'top_ips': self.get_top_ips(10),
'top_paths': self.get_top_paths(10),
'top_user_agents': self.get_top_user_agents(10),
'recent_suspicious': self.get_suspicious_accesses(20),
'honeypot_triggered_ips': self.get_honeypot_triggered_ips(),
'attack_types': self.get_attack_type_accesses(20),
'credential_attempts': self.credential_attempts[-50:] # Last 50 attempts
}
"""Get statistics summary from database."""
if not self.db:
raise RuntimeError("Database not available for dashboard stats")
# Get aggregate counts from database
stats = self.db.get_dashboard_counts()
# Add detailed lists from database
stats['top_ips'] = self.db.get_top_ips(10)
stats['top_paths'] = self.db.get_top_paths(10)
stats['top_user_agents'] = self.db.get_top_user_agents(10)
stats['recent_suspicious'] = self.db.get_recent_suspicious(20)
stats['honeypot_triggered_ips'] = self.db.get_honeypot_triggered_ips()
stats['attack_types'] = self.db.get_recent_attacks(20)
stats['credential_attempts'] = self.db.get_credential_attempts(limit=50)
return stats

View File

@@ -134,9 +134,9 @@ echo -e "${GREEN}✓ All credential tests completed!${NC}"
echo -e "${BLUE}========================================${NC}\n"
echo -e "${YELLOW}Check the results:${NC}"
echo -e " 1. View the log file: ${GREEN}cat src/logs/credentials.log${NC}"
echo -e " 1. View the log file: ${GREEN}tail -20 logs/credentials.log${NC}"
echo -e " 2. View the dashboard: ${GREEN}${BASE_URL}/dashboard${NC}"
echo -e " 3. Check recent logs: ${GREEN}tail -20 src/logs/krawl.log${NC}\n"
echo -e " 3. Check recent logs: ${GREEN}tail -20 logs/access.log ${NC}\n"
# Display last 10 credential entries if log file exists
if [ -f "src/logs/credentials.log" ]; then