Files
kycnotme/pyworker/pyworker/database.py
2025-06-11 10:39:20 +00:00

738 lines
26 KiB
Python

"""
Database operations for the pyworker package.
"""
import json
from contextlib import contextmanager
from datetime import datetime
from typing import Any, Dict, Generator, List, Optional, TypedDict, Union
from typing import Literal as TypeLiteral
import psycopg
from psycopg.rows import dict_row
from psycopg.sql import SQL, Composed, Literal
from psycopg_pool import ConnectionPool # Proper import for the connection pool
from pyworker.config import config
from pyworker.utils.app_logging import setup_logging
# Module-level logger obtained from the project's setup_logging helper; shared
# by every function in this module.
logger = setup_logging(__name__)
# --- Type Definitions ---
# Moved from tasks/comment_moderation.py
class CommentType(TypedDict):
    """Typed dict describing a ``Comment`` record, keyed by its column names.

    ``update_comment_moderation`` passes this dict directly as named query
    parameters and reads ``id``, ``status``, ``requiresAdminReview``,
    ``communityNote`` and ``internalNote`` from it.
    """

    id: int
    upvotes: int
    status: str  # Assuming CommentStatus Enum isn't used across modules yet; queries here use e.g. 'APPROVED', 'PENDING'
    suspicious: bool
    requiresAdminReview: bool
    communityNote: Optional[str]
    internalNote: Optional[str]
    privateContext: Optional[str]
    content: str
    rating: Optional[float]
    createdAt: datetime
    updatedAt: datetime
    authorId: int
    serviceId: int
    parentId: Optional[int]  # None for top-level comments (get_comments treats parentId IS NULL as a root)
    # Add author/service/reply fields if update_comment_moderation ever needs them
# Moved from utils/ai.py
# Allowed values for the ``rating`` field of the text/rating TypedDicts below.
# (TypeLiteral is typing.Literal; the bare name Literal is psycopg.sql.Literal.)
RatingType = TypeLiteral["info", "warning", "alert"]
class UserRightType(TypedDict):
    """A ``text`` entry paired with a ``RatingType`` level ('info', 'warning' or 'alert').

    NOTE(review): not referenced elsewhere in this module; presumably part of
    the AI review output moved from utils/ai.py - confirm against callers.
    """

    text: str
    rating: RatingType
class DataSharingType(TypedDict):
    """A ``text`` entry paired with a ``RatingType`` level ('info', 'warning' or 'alert').

    NOTE(review): not referenced elsewhere in this module; presumably part of
    the AI review output moved from utils/ai.py - confirm against callers.
    """

    text: str
    rating: RatingType
class DataCollectedType(TypedDict):
    """A ``text`` entry paired with a ``RatingType`` level ('info', 'warning' or 'alert').

    NOTE(review): not referenced elsewhere in this module; presumably part of
    the AI review output moved from utils/ai.py - confirm against callers.
    """

    text: str
    rating: RatingType
class KycOrSourceOfFundsType(TypedDict):
    """A ``text`` entry paired with a ``RatingType`` level ('info', 'warning' or 'alert').

    NOTE(review): not referenced elsewhere in this module; presumably part of
    the AI review output moved from utils/ai.py - confirm against callers.
    """

    text: str
    rating: RatingType
class TosReviewType(TypedDict, total=False):
    """Terms-of-service review payload; every key is optional (``total=False``).

    ``save_tos_review`` serializes it with ``json.dumps`` and stores it in the
    ``"tosReview"`` column of ``"Service"``.
    """

    contentHash: str
    kycLevel: int  # presumably the same 0-4 scale that update_kyc_level validates - TODO confirm
    summary: str
    complexity: TypeLiteral["low", "medium", "high"]
    highlights: List[Dict[str, Any]]  # free-form dicts; schema not defined in this module
class CommentSentimentSummaryType(TypedDict):
    """Aggregated user-sentiment summary for a service.

    ``save_user_sentiment`` serializes it with ``json.dumps`` and stores it in
    the ``"userSentiment"`` column of ``"Service"``.
    """

    summary: str
    sentiment: TypeLiteral["positive", "negative", "neutral"]
    whatUsersLike: List[str]
    whatUsersDislike: List[str]
class CommentModerationType(TypedDict):
    """Moderation verdict for a single comment.

    NOTE(review): not referenced elsewhere in this module; presumably the
    structured AI moderation result whose fields feed the ``Comment``
    moderation columns (e.g. requiresAdminReview, internalNote) - confirm.
    """

    isSpam: bool
    requiresAdminReview: bool
    contextNote: str
    internalNote: str
    commentQuality: int  # scale not defined in this module - TODO confirm expected range
# Union of query representations: raw str/bytes or psycopg.sql composables.
# ``Literal`` here is psycopg.sql.Literal (typing.Literal is imported as
# TypeLiteral). Not referenced by the functions visible in this module.
QueryType = Union[str, bytes, SQL, Composed, Literal]
# --- Database Connection Pool ---
# Process-wide pool: created lazily by get_db_pool() and reset to None by
# close_db_pool().
_db_pool: Optional[ConnectionPool] = None
def get_db_pool() -> ConnectionPool:
    """
    Return the shared connection pool, building it on first use.

    The pool is created from ``config.db_connection_string`` and cached in the
    module-level ``_db_pool``, so subsequent calls return the same instance.

    Returns:
        The process-wide ``ConnectionPool``.

    Raises:
        Any error raised while constructing the pool; it is logged first.
    """
    global _db_pool
    if _db_pool is not None:
        return _db_pool

    try:
        # Between 2 and 10 pooled connections, each opened with autocommit off;
        # the write helpers in this module call conn.commit() explicitly.
        _db_pool = ConnectionPool(
            conninfo=config.db_connection_string,
            min_size=2,
            max_size=10,
            kwargs={"autocommit": False},
        )
        logger.info("Database connection pool initialized")
    except Exception as exc:
        logger.error(f"Error creating database connection pool: {exc}")
        raise
    return _db_pool
def close_db_pool() -> None:
    """
    Close the shared database connection pool, if one was created.

    This should be called when the application is shutting down.

    The module-level reference is cleared *before* closing. If ``close()``
    raises, a later ``get_db_pool()`` call therefore builds a fresh pool rather
    than handing out the half-closed one. The exception itself still
    propagates to the caller.
    """
    global _db_pool
    pool, _db_pool = _db_pool, None
    if pool is None:
        return
    logger.info("Closing database connection pool")
    pool.close()
@contextmanager
def get_db_connection() -> Generator[psycopg.Connection, None, None]:
    """
    Context manager that lends a pooled connection with ``search_path`` set.

    The connection is taken from ``get_db_pool()`` via ``pool.connection()``
    and is returned to the pool when the ``with`` block exits. Before it is
    yielded, ``SET search_path TO public`` is executed on it.

    Only failures while acquiring or preparing the connection are logged as
    connection errors. ``@contextmanager`` re-raises exceptions from the
    caller's ``with`` body inside this generator at the ``yield``. A blanket
    ``except`` around the ``yield`` would therefore mislabel every query error
    as "Error connecting to the database". Those body exceptions are now
    re-raised without that log line.

    Yields:
        A database connection object from the pool.

    Raises:
        Whatever the pool, the ``SET search_path`` statement, or the caller's
        body raises; nothing is swallowed.
    """
    pool = get_db_pool()
    connected = False
    try:
        with pool.connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute("SET search_path TO public")
            # From here on, any exception comes from the caller's body (or
            # from returning the connection), not from connecting.
            connected = True
            yield conn
    except Exception as e:
        if not connected:
            logger.error(f"Error connecting to the database: {e}")
        raise
# --- Database Functions ---
def fetch_all_services() -> List[Dict[str, Any]]:
    """
    Load every public service whose verification status is accepted.

    A service qualifies when ``serviceVisibility`` is ``'PUBLIC'`` and
    ``verificationStatus`` is ``'VERIFICATION_SUCCESS'``,
    ``'COMMUNITY_CONTRIBUTED'`` or ``'APPROVED'``. Rows are ordered by id.

    Returns:
        One dict per service, keyed by column name. On any error the failure
        is logged and an empty list is returned instead of raising.
    """
    services_sql = """
SELECT id, name, slug, description, "kycLevel", "overallScore",
"privacyScore", "trustScore", "verificationStatus",
"serviceVisibility", "tosUrls", "serviceUrls", "onionUrls", "i2pUrls",
"tosReview", "tosReviewAt", "userSentiment", "userSentimentAt"
FROM "Service"
WHERE "serviceVisibility" = 'PUBLIC'
AND ("verificationStatus" = 'VERIFICATION_SUCCESS'
OR "verificationStatus" = 'COMMUNITY_CONTRIBUTED'
OR "verificationStatus" = 'APPROVED')
ORDER BY id
"""
    rows: List[Dict[str, Any]] = []
    try:
        with get_db_connection() as conn, conn.cursor(row_factory=dict_row) as cursor:
            cursor.execute(services_sql)
            rows = cursor.fetchall()
            logger.info(f"Fetched {len(rows)} services from the database")
    except Exception as exc:
        logger.error(f"Error fetching services: {exc}")
    return rows
def fetch_services_with_pending_comments() -> List[Dict[str, Any]]:
    """
    Load public, accepted services that have at least one ``PENDING`` comment.

    Uses the same visibility/verification filter as ``fetch_all_services``,
    joined against ``"Comment"``. ``DISTINCT`` collapses services that have
    several pending comments into one row. Rows are ordered by service id.

    Returns:
        One dict per service, keyed by column name. On any error the failure
        is logged and an empty list is returned instead of raising.
    """
    pending_sql = """
SELECT DISTINCT s.id, s.name, s.slug, s.description, s."kycLevel", s."overallScore",
s."privacyScore", s."trustScore", s."verificationStatus",
s."serviceVisibility", s."tosUrls", s."serviceUrls", s."onionUrls", s."i2pUrls",
s."tosReview", s."tosReviewAt", s."userSentiment", s."userSentimentAt"
FROM "Service" s
JOIN "Comment" c ON s.id = c."serviceId"
WHERE c.status = 'PENDING'
AND s."serviceVisibility" = 'PUBLIC'
AND (s."verificationStatus" = 'VERIFICATION_SUCCESS'
OR s."verificationStatus" = 'COMMUNITY_CONTRIBUTED'
OR s."verificationStatus" = 'APPROVED')
ORDER BY s.id
"""
    rows: List[Dict[str, Any]] = []
    try:
        with get_db_connection() as conn, conn.cursor(row_factory=dict_row) as cursor:
            cursor.execute(pending_sql)
            rows = cursor.fetchall()
            logger.info(
                f"Fetched {len(rows)} services with pending comments from the database"
            )
    except Exception as exc:
        logger.error(f"Error fetching services with pending comments: {exc}")
    return rows
def fetch_service_attributes(service_id: int) -> List[Dict[str, Any]]:
    """
    List the attributes linked to one service through ``"ServiceAttribute"``.

    Args:
        service_id: The ID of the service.

    Returns:
        Dicts with ``id``, ``slug``, ``title``, ``description``, ``category``
        and ``type`` for each linked attribute. On any error the failure is
        logged and an empty list is returned.
    """
    attributes_sql = """
SELECT a.id, a.slug, a.title, a.description, a.category, a.type
FROM "Attribute" a
JOIN "ServiceAttribute" sa ON a.id = sa."attributeId"
WHERE sa."serviceId" = %s
"""
    linked: List[Dict[str, Any]] = []
    try:
        with get_db_connection() as conn, conn.cursor(row_factory=dict_row) as cursor:
            cursor.execute(attributes_sql, (service_id,))
            linked = cursor.fetchall()
    except Exception as exc:
        logger.error(f"Error fetching attributes for service {service_id}: {exc}")
    return linked
def get_attribute_id_by_slug(slug: str) -> Optional[int]:
    """
    Look up the id of the ``"Attribute"`` row with the given slug.

    Args:
        slug: The attribute slug to search for.

    Returns:
        The attribute id, or ``None`` when no row matches or the query fails.
        Failures are logged, not raised.
    """
    found_id: Optional[int] = None
    try:
        with get_db_connection() as conn, conn.cursor(row_factory=dict_row) as cursor:
            cursor.execute('SELECT id FROM "Attribute" WHERE slug = %s', (slug,))
            match = cursor.fetchone()
            if match:
                found_id = match["id"]
    except Exception as exc:
        logger.error(f"Error fetching attribute id for slug '{slug}': {exc}")
    return found_id
def add_service_attribute(service_id: int, attribute_id: int) -> bool:
    """
    Link an attribute to a service unless the link already exists.

    Args:
        service_id: The ID of the service.
        attribute_id: The ID of the attribute to attach.

    Returns:
        True when the link is present afterwards, whether it already existed
        or was inserted and committed. False if a database error occurred; the
        error is logged.
    """
    pair = (service_id, attribute_id)
    try:
        with get_db_connection() as conn, conn.cursor(row_factory=dict_row) as cursor:
            cursor.execute(
                'SELECT 1 FROM "ServiceAttribute" WHERE "serviceId" = %s AND "attributeId" = %s',
                pair,
            )
            already_linked = cursor.fetchone()
            if not already_linked:
                cursor.execute(
                    'INSERT INTO "ServiceAttribute" ("serviceId", "attributeId", "createdAt") VALUES (%s, %s, NOW())',
                    pair,
                )
                conn.commit()
                logger.info(
                    f"Added attribute id {attribute_id} to service {service_id}"
                )
        return True
    except Exception as exc:
        logger.error(
            f"Error adding attribute id {attribute_id} to service {service_id}: {exc}"
        )
        return False
def remove_service_attribute(service_id: int, attribute_id: int) -> bool:
    """
    Delete the link between a service and an attribute.

    Args:
        service_id: The ID of the service.
        attribute_id: The ID of the attribute to detach.

    Returns:
        True once the DELETE has been committed, even if no link existed.
        False if a database error occurred; the error is logged.
    """
    delete_sql = 'DELETE FROM "ServiceAttribute" WHERE "serviceId" = %s AND "attributeId" = %s'
    try:
        with get_db_connection() as conn, conn.cursor(row_factory=dict_row) as cursor:
            cursor.execute(delete_sql, (service_id, attribute_id))
            conn.commit()
            logger.info(
                f"Removed attribute id {attribute_id} from service {service_id}"
            )
        return True
    except Exception as exc:
        logger.error(
            f"Error removing attribute id {attribute_id} from service {service_id}: {exc}"
        )
        return False
def add_service_attribute_by_slug(service_id: int, attribute_slug: str) -> bool:
    """
    Attach the attribute identified by *attribute_slug* to a service.

    Returns:
        The result of ``add_service_attribute``, or False (after logging) when
        no attribute with that slug is found.
    """
    attribute_id = get_attribute_id_by_slug(attribute_slug)
    if attribute_id is not None:
        return add_service_attribute(service_id, attribute_id)
    logger.error(f"Attribute with slug '{attribute_slug}' not found.")
    return False
def remove_service_attribute_by_slug(service_id: int, attribute_slug: str) -> bool:
    """
    Detach the attribute identified by *attribute_slug* from a service.

    Returns:
        The result of ``remove_service_attribute``, or False (after logging)
        when no attribute with that slug is found.
    """
    attribute_id = get_attribute_id_by_slug(attribute_slug)
    if attribute_id is not None:
        return remove_service_attribute(service_id, attribute_id)
    logger.error(f"Attribute with slug '{attribute_slug}' not found.")
    return False
def save_tos_review(service_id: int, review: Optional[TosReviewType]) -> bool:
    """Persist a TOS review and/or update the ``tosReviewAt`` timestamp for a service.

    When *review* is provided, it is serialized with ``json.dumps`` and stored
    in ``"tosReview"``, and ``"tosReviewAt"`` is set to ``NOW()``. Any previous
    review is replaced.

    When *review* is ``None``, only ``"tosReviewAt"`` is touched, and only for
    a service that has no stored review yet (``"tosReview" IS NULL``). A
    service that already has a review keeps both that review and its existing
    ``tosReviewAt`` unchanged.

    NOTE(review): because of the ``IS NULL`` guard, the timestamp is *not*
    refreshed for services that already have a review when generation fails
    or produces no changes. Confirm this is what the TOS review task expects.

    Args:
        service_id: The ID of the service.
        review: The review to store, or ``None`` to only record the run.

    Returns:
        True if the statement executed and was committed; a ``None`` review
        that matches no row still counts as success. False if a database error
        occurred; the error is logged. (Previously the function returned
        ``None``; callers ignoring the result are unaffected.)
    """
    try:
        with get_db_connection() as conn:
            with conn.cursor(row_factory=dict_row) as cursor:
                if review is None:
                    cursor.execute(
                        'UPDATE "Service" SET "tosReviewAt" = NOW() WHERE id = %s AND "tosReview" IS NULL',
                        (service_id,),
                    )
                else:
                    review_json = json.dumps(review)
                    cursor.execute(
                        'UPDATE "Service" SET "tosReview" = %s, "tosReviewAt" = NOW() WHERE id = %s',
                        (review_json, service_id),
                    )
                conn.commit()
                logger.info(
                    f"Successfully saved TOS review (updated={review is not None}) for service {service_id}"
                )
        return True
    except Exception as e:
        logger.error(f"Error saving TOS review for service {service_id}: {e}")
        return False
def update_kyc_level(service_id: int, kyc_level: int) -> bool:
    """
    Set ``kycLevel`` for one service and bump its ``updatedAt``.

    Args:
        service_id: The ID of the service.
        kyc_level: The new KYC level; only values from 0 to 4 are accepted.

    Returns:
        bool: True once the UPDATE has been committed. False if the level is
        out of range or a database error occurred; both cases are logged.
        A ``service_id`` that matches no row still yields True.
    """
    update_sql = """
UPDATE "Service"
SET "kycLevel" = %s, "updatedAt" = NOW()
WHERE id = %s
"""
    try:
        # Reject out-of-range levels before opening a connection.
        in_range = 0 <= kyc_level <= 4
        if not in_range:
            logger.error(
                f"Invalid KYC level ({kyc_level}) for service {service_id}. Must be between 0 and 4."
            )
            return False
        with get_db_connection() as conn, conn.cursor(row_factory=dict_row) as cursor:
            cursor.execute(update_sql, (kyc_level, service_id))
            conn.commit()
            logger.info(
                f"Successfully updated KYC level to {kyc_level} for service {service_id}"
            )
        return True
    except Exception as exc:
        logger.error(f"Error updating KYC level for service {service_id}: {exc}")
        return False
def get_comments(service_id: int, status: str = "APPROVED") -> List[Dict[str, Any]]:
    """
    Return a service's comment threads, filtered by status.

    The query works in two stages:
    - It first collects the service's root comments (``parentId`` IS NULL)
      that have the requested status.
    - It then adds replies recursively, keeping only replies whose own status
      also matches. A non-matching reply therefore hides its whole sub-thread.

    Each row carries the comment columns, author fields (``authorId``,
    ``authorName``, ``authorDisplayName``, ``authorPicture``,
    ``authorVerified``) and a ``depth`` (0 for roots). Rows are ordered by
    ``createdAt`` descending, then ``depth`` ascending.

    Args:
        service_id: The ID of the service.
        status: The comment status to keep (e.g. 'APPROVED', 'PENDING').
            Defaults to 'APPROVED'.

    Returns:
        A list of row dicts, or an empty list if the query fails (the error is
        logged).

    NOTE: The row shape differs from CommentType. It has author and depth
    fields and lacks the moderation fields. Adjust CommentType or parsing if
    needed elsewhere.
    """
    tree_sql = """
WITH RECURSIVE comment_tree AS (
-- Base case: get all root comments (no parent)
SELECT
c.id,
c.content,
c.rating,
c.upvotes,
c."createdAt",
c."updatedAt",
c."parentId",
c.status,
u.id as "authorId",
u.name as "authorName",
u."displayName" as "authorDisplayName",
u.picture as "authorPicture",
u.verified as "authorVerified",
0 as depth
FROM "Comment" c
JOIN "User" u ON c."authorId" = u.id
WHERE c."serviceId" = %s
AND c.status = %s
AND c."parentId" IS NULL
UNION ALL
-- Recursive case: get all replies
SELECT
c.id,
c.content,
c.rating,
c.upvotes,
c."createdAt",
c."updatedAt",
c."parentId",
c.status,
u.id as "authorId",
u.name as "authorName",
u."displayName" as "authorDisplayName",
u.picture as "authorPicture",
u.verified as "authorVerified",
ct.depth + 1
FROM "Comment" c
JOIN "User" u ON c."authorId" = u.id
JOIN comment_tree ct ON c."parentId" = ct.id
WHERE c.status = %s
)
SELECT * FROM comment_tree
ORDER BY "createdAt" DESC, depth ASC
"""
    # The status is bound twice: once for root comments, once for replies.
    bind = (service_id, status, status)
    thread_rows: List[Dict[str, Any]] = []
    try:
        with get_db_connection() as conn, conn.cursor(row_factory=dict_row) as cursor:
            cursor.execute(tree_sql, bind)
            thread_rows = cursor.fetchall()
    except Exception as exc:
        logger.error(
            f"Error fetching comments for service {service_id} with status {status}: {exc}"
        )
    return thread_rows
def get_max_comment_updated_at(
    service_id: int, status: str = "APPROVED"
) -> Optional[datetime]:
    """
    Find the newest ``updatedAt`` among a service's comments with a given status.

    Args:
        service_id: The ID of the service.
        status: The comment status to consider.

    Returns:
        The latest ``updatedAt`` value. ``None`` if no comment matches or the
        query fails (the error is logged).
    """
    max_sql = """
SELECT MAX("updatedAt")
FROM "Comment"
WHERE "serviceId" = %s AND status = %s
"""
    latest: Optional[datetime] = None
    try:
        # A plain tuple cursor suffices: the query yields a single value.
        with get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(max_sql, (service_id, status))
            record = cursor.fetchone()
            if record and record[0] is not None:
                latest = record[0]
    except Exception as exc:
        logger.error(
            f"Error fetching max comment updatedAt for service {service_id} with status {status}: {exc}"
        )
    return latest
def save_user_sentiment(
    service_id: int,
    sentiment: Optional[CommentSentimentSummaryType],
    last_processed_comment_timestamp: Optional[datetime],
):
    """
    Store (or clear) a service's user-sentiment summary and its watermark.

    The summary is JSON-encoded into ``"userSentiment"``, or written as NULL
    when *sentiment* is ``None``. *last_processed_comment_timestamp* is
    written to ``"userSentimentAt"``. Errors are logged, not raised.

    Args:
        service_id: The ID of the service.
        sentiment: The sentiment summary, or None to clear it.
        last_processed_comment_timestamp: The ``updatedAt`` of the most recent
            comment included in this analysis; may be None.
    """
    update_sql = """
UPDATE "Service"
SET "userSentiment" = %s, "userSentimentAt" = %s
WHERE id = %s
"""
    try:
        payload = None if sentiment is None else json.dumps(sentiment)
        with get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(
                update_sql, (payload, last_processed_comment_timestamp, service_id)
            )
            conn.commit()
            # Truthiness (not an `is None` check) picks the message, so an
            # empty summary dict is reported as "cleared".
            if not sentiment:
                logger.info(
                    f"Successfully cleared user sentiment for service {service_id}, last comment processed at set to {last_processed_comment_timestamp}"
                )
            else:
                logger.info(
                    f"Successfully saved user sentiment for service {service_id} with last comment processed at {last_processed_comment_timestamp}"
                )
    except Exception as exc:
        logger.error(f"Error saving user sentiment for service {service_id}: {exc}")
def update_comment_moderation(comment_data: CommentType):
    """
    Write moderation results for one comment back to ``"Comment"``.

    Updates ``status``, ``requiresAdminReview``, ``communityNote`` and
    ``internalNote``, and sets ``updatedAt`` to now. The whole dict is bound as
    named parameters, so those keys plus ``id`` must be present. A missing
    key surfaces as a logged error.

    Args:
        comment_data: The comment record carrying the new moderation values.
    """
    comment_id = comment_data.get("id")
    if not comment_id:
        logger.error("Cannot update comment: 'id' is missing from comment_data.")
        return

    moderation_sql = """
UPDATE "Comment"
SET
status = %(status)s,
"requiresAdminReview" = %(requiresAdminReview)s,
"communityNote" = %(communityNote)s,
"internalNote" = %(internalNote)s,
"updatedAt" = NOW()
WHERE id = %(id)s
"""
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            cursor.execute(moderation_sql, comment_data)
            conn.commit()
            logger.info(f"Successfully updated comment {comment_id}")
    except Exception as exc:
        logger.error(f"Error updating comment {comment_id}: {exc}")
def touch_service_updated_at(service_id: int) -> bool:
    """
    Set a service's ``updatedAt`` to the current database time.

    Args:
        service_id: The ID of the service.

    Returns:
        bool: True once the UPDATE has been committed, False if a database
        error occurred (the error is logged).
    """
    touch_sql = """
UPDATE "Service"
SET "updatedAt" = NOW()
WHERE id = %s
"""
    try:
        with get_db_connection() as conn, conn.cursor(row_factory=dict_row) as cursor:
            cursor.execute(touch_sql, (service_id,))
            conn.commit()
            logger.info(f"Successfully touched updatedAt for service {service_id}")
        return True
    except Exception as exc:
        logger.error(f"Error touching updatedAt for service {service_id}: {exc}")
        return False
def run_db_query(query: Any, params: Optional[Any] = None) -> List[Dict[str, Any]]:
    """
    Run an arbitrary query and return every resulting row as a dict.

    No commit is issued. ``fetchall()`` is always called, so the query is
    expected to produce a result set. Any error is logged and an empty list
    is returned.

    Args:
        query: The SQL to execute.
        params: Optional bind parameters. When None, the query is executed
            without a parameters argument.

    Returns:
        A list of row dicts, or an empty list on failure.
    """
    # Pass params through only when supplied, mirroring a bare execute(query).
    execute_args = () if params is None else (params,)
    rows: List[Dict[str, Any]] = []
    try:
        with get_db_connection() as conn, conn.cursor(row_factory=dict_row) as cursor:
            cursor.execute(query, *execute_args)
            rows = cursor.fetchall()
    except Exception as exc:
        logger.error(f"Error running query: {exc}")
    return rows
def execute_db_command(command: str, params: Optional[Any] = None) -> int:
    """
    Execute a data-modifying statement (INSERT, UPDATE, DELETE), commit it, and
    report how many rows it affected.

    Args:
        command: The SQL command string.
        params: Optional bind parameters for the command.

    Returns:
        ``cursor.rowcount`` for the statement, or 0 if execution failed (the
        error is logged).
    """
    row_count = 0
    try:
        with get_db_connection() as conn, conn.cursor() as cursor:
            # The checker's expected query type is narrower than the `str`
            # accepted here; the ignore silences that mismatch only.
            cursor.execute(command, params)  # type: ignore
            row_count = cursor.rowcount
            conn.commit()
            logger.info(f"Executed command, {row_count} rows affected.")
    except Exception as exc:
        logger.error(f"Error executing command: {exc}")
    return row_count
def create_attribute(
    slug: str,
    title: str,
    description: str,
    category: str,
    type: str,
    privacy_points: float = 0,
    trust_points: float = 0,
    overall_points: float = 0,
) -> Optional[int]:
    """
    Create a new attribute in the database if it doesn't already exist.

    When a row with *slug* already exists, its id is returned and nothing is
    written. Otherwise a row is inserted with ``createdAt``/``updatedAt`` set
    to ``NOW()``, and its id is read from ``RETURNING id``.

    Args:
        slug: The unique slug for the attribute.
        title: The display title of the attribute.
        description: The description of the attribute.
        category: The category of the attribute (e.g., 'TRUST', 'PRIVACY').
        type: The type of the attribute (e.g., 'WARNING', 'FEATURE').
        privacy_points: Stored in ``"privacyPoints"`` (default: 0).
        trust_points: Stored in ``"trustPoints"`` (default: 0).
        overall_points: Accepted for API compatibility but currently NOT
            persisted: the INSERT has no column for it.
            NOTE(review): confirm whether "Attribute" has an overall-points
            column that should receive this value.

    Returns:
        The ID of the created (or existing) attribute. None if creation failed;
        failures are logged, not raised.
    """
    try:
        with get_db_connection() as conn:
            with conn.cursor(row_factory=dict_row) as cursor:
                # First check if the attribute already exists
                cursor.execute('SELECT id FROM "Attribute" WHERE slug = %s', (slug,))
                row = cursor.fetchone()
                if row:
                    logger.info(
                        f"Attribute with slug '{slug}' already exists, id: {row['id']}"
                    )
                    return row["id"]
                # Create the attribute if it doesn't exist
                cursor.execute(
                    """
INSERT INTO "Attribute" (
slug, title, description, "privacyPoints", "trustPoints",
category, type, "createdAt", "updatedAt"
) VALUES (
%s, %s, %s, %s, %s, %s, %s, NOW(), NOW()
) RETURNING id
""",
                    (
                        slug,
                        title,
                        description,
                        privacy_points,
                        trust_points,
                        category,
                        type,
                    ),
                )
                # Read the RETURNING row while the INSERT's result is
                # unambiguously current, then commit.
                result = cursor.fetchone()
                conn.commit()
                if result is None:
                    logger.error(
                        f"Failed to retrieve ID for newly created attribute with slug '{slug}'"
                    )
                    return None
                attribute_id = result["id"]
                logger.info(
                    f"Created new attribute with slug '{slug}', id: {attribute_id}"
                )
                return attribute_id
    except Exception as e:
        logger.error(f"Error creating attribute with slug '{slug}': {e}")
        return None