""" Database operations for the pyworker package. """ import json from contextlib import contextmanager from datetime import datetime from typing import Any, Dict, Generator, List, Optional, TypedDict, Union from typing import Literal as TypeLiteral import psycopg from psycopg.rows import dict_row from psycopg.sql import SQL, Composed, Literal from psycopg_pool import ConnectionPool # Proper import for the connection pool from pyworker.config import config from pyworker.utils.app_logging import setup_logging logger = setup_logging(__name__) # --- Type Definitions --- # Moved from tasks/comment_moderation.py class CommentType(TypedDict): id: int upvotes: int status: str # Assuming CommentStatus Enum isn't used across modules yet suspicious: bool requiresAdminReview: bool communityNote: Optional[str] internalNote: Optional[str] privateContext: Optional[str] content: str rating: Optional[float] createdAt: datetime updatedAt: datetime authorId: int serviceId: int parentId: Optional[int] # Add author/service/reply fields if needed by update_comment # Moved from utils/ai.py RatingType = TypeLiteral["info", "warning", "alert"] class UserRightType(TypedDict): text: str rating: RatingType class DataSharingType(TypedDict): text: str rating: RatingType class DataCollectedType(TypedDict): text: str rating: RatingType class KycOrSourceOfFundsType(TypedDict): text: str rating: RatingType class TosReviewType(TypedDict, total=False): contentHash: str kycLevel: int summary: str complexity: TypeLiteral["low", "medium", "high"] highlights: List[Dict[str, Any]] class CommentSentimentSummaryType(TypedDict): summary: str sentiment: TypeLiteral["positive", "negative", "neutral"] whatUsersLike: List[str] whatUsersDislike: List[str] class CommentModerationType(TypedDict): isSpam: bool requiresAdminReview: bool contextNote: str internalNote: str commentQuality: int QueryType = Union[str, bytes, SQL, Composed, Literal] # --- Database Connection Pool --- _db_pool: Optional[ConnectionPool] = None def get_db_pool() -> ConnectionPool: """ Get or create the database connection pool. Returns: A connection pool object. """ global _db_pool if _db_pool is None: try: # Create a new connection pool with min connections of 2 and max of 10 _db_pool = ConnectionPool( conninfo=config.db_connection_string, min_size=2, max_size=10, # Configure how connections are initialized kwargs={ "autocommit": False, }, ) logger.info("Database connection pool initialized") except Exception as e: logger.error(f"Error creating database connection pool: {e}") raise return _db_pool def close_db_pool(): """ Close the database connection pool. This should be called when the application is shutting down. """ global _db_pool if _db_pool is not None: logger.info("Closing database connection pool") _db_pool.close() _db_pool = None @contextmanager def get_db_connection() -> Generator[psycopg.Connection, None, None]: """ Context manager for database connections. Yields: A database connection object from the pool. """ pool = get_db_pool() try: # Use the connection method which returns a connection as a context manager with pool.connection() as conn: # Set the schema explicitly after connection with conn.cursor() as cursor: cursor.execute("SET search_path TO public") yield conn # The connection will be automatically returned to the pool # when the with block exits except Exception as e: logger.error(f"Error connecting to the database: {e}") raise # --- Database Functions --- def fetch_all_services() -> List[Dict[str, Any]]: """ Fetch all public and verified services from the database. Returns: A list of service dictionaries. """ services = [] try: with get_db_connection() as conn: with conn.cursor(row_factory=dict_row) as cursor: cursor.execute(""" SELECT id, name, slug, description, "kycLevel", "overallScore", "privacyScore", "trustScore", "verificationStatus", "serviceVisibility", "tosUrls", "serviceUrls", "onionUrls", "i2pUrls", "tosReview", "tosReviewAt", "userSentiment", "userSentimentAt" FROM "Service" WHERE "serviceVisibility" = 'PUBLIC' AND ("verificationStatus" = 'VERIFICATION_SUCCESS' OR "verificationStatus" = 'COMMUNITY_CONTRIBUTED' OR "verificationStatus" = 'APPROVED') ORDER BY id """) services = cursor.fetchall() logger.info(f"Fetched {len(services)} services from the database") except Exception as e: logger.error(f"Error fetching services: {e}") return services def fetch_services_with_pending_comments() -> List[Dict[str, Any]]: """ Fetch all public and verified services that have at least one pending comment. Returns: A list of service dictionaries. """ services = [] try: with get_db_connection() as conn: with conn.cursor(row_factory=dict_row) as cursor: cursor.execute(""" SELECT DISTINCT s.id, s.name, s.slug, s.description, s."kycLevel", s."overallScore", s."privacyScore", s."trustScore", s."verificationStatus", s."serviceVisibility", s."tosUrls", s."serviceUrls", s."onionUrls", s."i2pUrls", s."tosReview", s."tosReviewAt", s."userSentiment", s."userSentimentAt" FROM "Service" s JOIN "Comment" c ON s.id = c."serviceId" WHERE c.status = 'PENDING' AND s."serviceVisibility" = 'PUBLIC' AND (s."verificationStatus" = 'VERIFICATION_SUCCESS' OR s."verificationStatus" = 'COMMUNITY_CONTRIBUTED' OR s."verificationStatus" = 'APPROVED') ORDER BY s.id """) services = cursor.fetchall() logger.info( f"Fetched {len(services)} services with pending comments from the database" ) except Exception as e: logger.error(f"Error fetching services with pending comments: {e}") return services def fetch_service_attributes(service_id: int) -> List[Dict[str, Any]]: """ Fetch attributes for a specific service. Args: service_id: The ID of the service. Returns: A list of attribute dictionaries. """ attributes = [] try: with get_db_connection() as conn: with conn.cursor(row_factory=dict_row) as cursor: cursor.execute( """ SELECT a.id, a.slug, a.title, a.description, a.category, a.type FROM "Attribute" a JOIN "ServiceAttribute" sa ON a.id = sa."attributeId" WHERE sa."serviceId" = %s """, (service_id,), ) attributes = cursor.fetchall() except Exception as e: logger.error(f"Error fetching attributes for service {service_id}: {e}") return attributes def get_attribute_id_by_slug(slug: str) -> Optional[int]: attribute_id = None try: with get_db_connection() as conn: with conn.cursor(row_factory=dict_row) as cursor: cursor.execute('SELECT id FROM "Attribute" WHERE slug = %s', (slug,)) row = cursor.fetchone() if row: attribute_id = row["id"] except Exception as e: logger.error(f"Error fetching attribute id for slug '{slug}': {e}") return attribute_id def add_service_attribute(service_id: int, attribute_id: int) -> bool: try: with get_db_connection() as conn: with conn.cursor(row_factory=dict_row) as cursor: cursor.execute( 'SELECT 1 FROM "ServiceAttribute" WHERE "serviceId" = %s AND "attributeId" = %s', (service_id, attribute_id), ) if cursor.fetchone(): return True cursor.execute( 'INSERT INTO "ServiceAttribute" ("serviceId", "attributeId", "createdAt") VALUES (%s, %s, NOW())', (service_id, attribute_id), ) conn.commit() logger.info( f"Added attribute id {attribute_id} to service {service_id}" ) return True except Exception as e: logger.error( f"Error adding attribute id {attribute_id} to service {service_id}: {e}" ) return False def remove_service_attribute(service_id: int, attribute_id: int) -> bool: try: with get_db_connection() as conn: with conn.cursor(row_factory=dict_row) as cursor: cursor.execute( 'DELETE FROM "ServiceAttribute" WHERE "serviceId" = %s AND "attributeId" = %s', (service_id, attribute_id), ) conn.commit() logger.info( f"Removed attribute id {attribute_id} from service {service_id}" ) return True except Exception as e: logger.error( f"Error removing attribute id {attribute_id} from service {service_id}: {e}" ) return False def add_service_attribute_by_slug(service_id: int, attribute_slug: str) -> bool: attribute_id = get_attribute_id_by_slug(attribute_slug) if attribute_id is None: logger.error(f"Attribute with slug '{attribute_slug}' not found.") return False return add_service_attribute(service_id, attribute_id) def remove_service_attribute_by_slug(service_id: int, attribute_slug: str) -> bool: attribute_id = get_attribute_id_by_slug(attribute_slug) if attribute_id is None: logger.error(f"Attribute with slug '{attribute_slug}' not found.") return False return remove_service_attribute(service_id, attribute_id) def save_tos_review(service_id: int, review: TosReviewType): """ Save a TOS review for a specific service. Args: service_id: The ID of the service. review: A TypedDict containing the review data. """ try: # Serialize the dictionary to a JSON string for the database review_json = json.dumps(review) with get_db_connection() as conn: with conn.cursor(row_factory=dict_row) as cursor: cursor.execute( """ UPDATE "Service" SET "tosReview" = %s, "tosReviewAt" = NOW() WHERE id = %s """, (review_json, service_id), ) conn.commit() logger.info(f"Successfully saved TOS review for service {service_id}") except Exception as e: logger.error(f"Error saving TOS review for service {service_id}: {e}") def update_kyc_level(service_id: int, kyc_level: int) -> bool: """ Update the KYC level for a specific service. Args: service_id: The ID of the service. kyc_level: The new KYC level (0-4). Returns: bool: True if the update was successful, False otherwise. """ try: # Ensure the kyc_level is within the valid range if not 0 <= kyc_level <= 4: logger.error( f"Invalid KYC level ({kyc_level}) for service {service_id}. Must be between 0 and 4." ) return False with get_db_connection() as conn: with conn.cursor(row_factory=dict_row) as cursor: cursor.execute( """ UPDATE "Service" SET "kycLevel" = %s, "updatedAt" = NOW() WHERE id = %s """, (kyc_level, service_id), ) conn.commit() logger.info( f"Successfully updated KYC level to {kyc_level} for service {service_id}" ) return True except Exception as e: logger.error(f"Error updating KYC level for service {service_id}: {e}") return False def get_comments(service_id: int, status: str = "APPROVED") -> List[Dict[str, Any]]: """ Get all comments for a specific service with the specified status. Args: service_id: The ID of the service. status: The status of comments to fetch (e.g. 'APPROVED', 'PENDING'). Defaults to 'APPROVED'. Returns: A list of comment dictionaries. NOTE: The structure returned by the SQL query might be different from CommentType. Adjust CommentType or parsing if needed elsewhere. """ comments = [] try: with get_db_connection() as conn: with conn.cursor(row_factory=dict_row) as cursor: cursor.execute( """ WITH RECURSIVE comment_tree AS ( -- Base case: get all root comments (no parent) SELECT c.id, c.content, c.rating, c.upvotes, c."createdAt", c."updatedAt", c."parentId", c.status, u.id as "authorId", u.name as "authorName", u."displayName" as "authorDisplayName", u.picture as "authorPicture", u.verified as "authorVerified", 0 as depth FROM "Comment" c JOIN "User" u ON c."authorId" = u.id WHERE c."serviceId" = %s AND c.status = %s AND c."parentId" IS NULL UNION ALL -- Recursive case: get all replies SELECT c.id, c.content, c.rating, c.upvotes, c."createdAt", c."updatedAt", c."parentId", c.status, u.id as "authorId", u.name as "authorName", u."displayName" as "authorDisplayName", u.picture as "authorPicture", u.verified as "authorVerified", ct.depth + 1 FROM "Comment" c JOIN "User" u ON c."authorId" = u.id JOIN comment_tree ct ON c."parentId" = ct.id WHERE c.status = %s ) SELECT * FROM comment_tree ORDER BY "createdAt" DESC, depth ASC """, (service_id, status, status), ) comments = cursor.fetchall() except Exception as e: logger.error( f"Error fetching comments for service {service_id} with status {status}: {e}" ) return comments def get_max_comment_updated_at( service_id: int, status: str = "APPROVED" ) -> Optional[datetime]: """ Get the maximum 'updatedAt' timestamp for comments of a specific service and status. Args: service_id: The ID of the service. status: The status of comments to consider. Returns: The maximum 'updatedAt' timestamp as a datetime object, or None if no matching comments. """ max_updated_at = None try: with get_db_connection() as conn: with ( conn.cursor() as cursor ): # dict_row not strictly needed for single value cursor.execute( """ SELECT MAX("updatedAt") FROM "Comment" WHERE "serviceId" = %s AND status = %s """, (service_id, status), ) result = cursor.fetchone() if result and result[0] is not None: max_updated_at = result[0] except Exception as e: logger.error( f"Error fetching max comment updatedAt for service {service_id} with status {status}: {e}" ) return max_updated_at def save_user_sentiment( service_id: int, sentiment: Optional[CommentSentimentSummaryType], last_processed_comment_timestamp: Optional[datetime], ): """ Save user sentiment for a specific service and the timestamp of the last comment processed. Args: service_id: The ID of the service. sentiment: A dictionary containing the sentiment data, or None to clear it. last_processed_comment_timestamp: The 'updatedAt' timestamp of the most recent comment considered in this sentiment analysis. Can be None. """ try: sentiment_json = json.dumps(sentiment) if sentiment is not None else None with get_db_connection() as conn: with conn.cursor() as cursor: # row_factory not needed for UPDATE cursor.execute( """ UPDATE "Service" SET "userSentiment" = %s, "userSentimentAt" = %s WHERE id = %s """, (sentiment_json, last_processed_comment_timestamp, service_id), ) conn.commit() if sentiment: logger.info( f"Successfully saved user sentiment for service {service_id} with last comment processed at {last_processed_comment_timestamp}" ) else: logger.info( f"Successfully cleared user sentiment for service {service_id}, last comment processed at set to {last_processed_comment_timestamp}" ) except Exception as e: logger.error(f"Error saving user sentiment for service {service_id}: {e}") def update_comment_moderation(comment_data: CommentType): """ Update an existing comment in the database based on moderation results. Args: comment_data: A TypedDict representing the comment data to update. Expected keys are defined in CommentType. """ comment_id = comment_data.get("id") if not comment_id: logger.error("Cannot update comment: 'id' is missing from comment_data.") return try: with get_db_connection() as conn: with conn.cursor() as cursor: cursor.execute( """ UPDATE "Comment" SET status = %(status)s, "requiresAdminReview" = %(requiresAdminReview)s, "communityNote" = %(communityNote)s, "internalNote" = %(internalNote)s, "updatedAt" = NOW() WHERE id = %(id)s """, comment_data, ) conn.commit() logger.info(f"Successfully updated comment {comment_id}") except Exception as e: logger.error(f"Error updating comment {comment_id}: {e}") def touch_service_updated_at(service_id: int) -> bool: """ Update the updatedAt field for a specific service to now. Args: service_id: The ID of the service. Returns: bool: True if the update was successful, False otherwise. """ try: with get_db_connection() as conn: with conn.cursor(row_factory=dict_row) as cursor: cursor.execute( """ UPDATE "Service" SET "updatedAt" = NOW() WHERE id = %s """, (service_id,), ) conn.commit() logger.info(f"Successfully touched updatedAt for service {service_id}") return True except Exception as e: logger.error(f"Error touching updatedAt for service {service_id}: {e}") return False def run_db_query(query: Any, params: Optional[Any] = None) -> List[Dict[str, Any]]: results = [] try: with get_db_connection() as conn: with conn.cursor(row_factory=dict_row) as cursor: if params is None: cursor.execute(query) else: cursor.execute(query, params) results = cursor.fetchall() except Exception as e: logger.error(f"Error running query: {e}") return results def execute_db_command(command: str, params: Optional[Any] = None) -> int: """ Execute a database command (INSERT, UPDATE, DELETE) and return affected rows. Args: command: The SQL command string. params: Optional parameters for the command. Returns: The number of rows affected by the command. """ affected_rows = 0 try: with get_db_connection() as conn: with conn.cursor() as cursor: # Cast the string to the expected type to satisfy the type checker # In runtime, this is equivalent to passing the command directly cursor.execute(command, params) # type: ignore affected_rows = cursor.rowcount conn.commit() logger.info(f"Executed command, {affected_rows} rows affected.") except Exception as e: logger.error(f"Error executing command: {e}") return affected_rows def create_attribute( slug: str, title: str, description: str, category: str, type: str, privacy_points: float = 0, trust_points: float = 0, overall_points: float = 0, ) -> Optional[int]: """ Create a new attribute in the database if it doesn't already exist. Args: slug: The unique slug for the attribute. title: The display title of the attribute. description: The description of the attribute. category: The category of the attribute (e.g., 'TRUST', 'PRIVACY'). type: The type of the attribute (e.g., 'WARNING', 'FEATURE'). privacy_points: Points affecting privacy score (default: 0). trust_points: Points affecting trust score (default: 0). overall_points: Points affecting overall score (default: 0). Returns: The ID of the created (or existing) attribute, or None if creation failed. """ try: with get_db_connection() as conn: with conn.cursor(row_factory=dict_row) as cursor: # First check if the attribute already exists cursor.execute('SELECT id FROM "Attribute" WHERE slug = %s', (slug,)) row = cursor.fetchone() if row: logger.info( f"Attribute with slug '{slug}' already exists, id: {row['id']}" ) return row["id"] # Create the attribute if it doesn't exist cursor.execute( """ INSERT INTO "Attribute" ( slug, title, description, "privacyPoints", "trustPoints", category, type, "createdAt", "updatedAt" ) VALUES ( %s, %s, %s, %s, %s, %s, %s, NOW(), NOW() ) RETURNING id """, ( slug, title, description, privacy_points, trust_points, category, type, ), ) conn.commit() result = cursor.fetchone() if result is None: logger.error( f"Failed to retrieve ID for newly created attribute with slug '{slug}'" ) return None attribute_id = result["id"] logger.info( f"Created new attribute with slug '{slug}', id: {attribute_id}" ) return attribute_id except Exception as e: logger.error(f"Error creating attribute with slug '{slug}': {e}") return None