Release 202507030838

This commit is contained in:
pluja
2025-07-03 08:38:11 +00:00
parent 01488b8b3b
commit a545726abf
28 changed files with 1044 additions and 282 deletions

View File

@@ -14,8 +14,11 @@ OPENAI_BASE_URL="https://xxxxxx/api/v1"
OPENAI_MODEL="xxxxxxxxx"
OPENAI_RETRY=3
CRON_TOSREVIEW_TASK=0 0 1 * * # Every month
CRON_USER_SENTIMENT_TASK=0 0 * * * # Every day
CRON_COMMENT_MODERATION_TASK=0 0 * * * # Every hour
CRON_FORCE_TRIGGERS_TASK=0 2 * * * # Every day
CRON_SERVICE_SCORE_RECALC_TASK=*/5 * * * * # Every 10 minutes
# Task schedules ---------------------------------------------------
CRON_TOSREVIEW_TASK="0 0 1 * *" # every month
CRON_USER_SENTIMENT_TASK="0 0 * * *" # daily
CRON_COMMENT_MODERATION_TASK="0 * * * *" # hourly
CRON_FORCE_TRIGGERS_TASK="0 2 * * *" # daily 02:00
CRON_INACTIVE_USERS_TASK="0 6 * * *" # daily 06:00
CRON_SERVICE_SCORE_RECALC_TASK="*0 0 * * *" # dayly
CRON_SERVICE_SCORE_RECALC_ALL_TASK="0 0 * * *" # daily

View File

@@ -38,6 +38,7 @@ Required environment variables:
- `CRON_MODERATION_TASK`: Cron expression for comment moderation task
- `CRON_FORCE_TRIGGERS_TASK`: Cron expression for force triggers task
- `CRON_SERVICE_SCORE_RECALC_TASK`: Cron expression for service score recalculation task
- `CRON_INACTIVE_USERS_TASK`: Cron expression for inactive users cleanup task
## Usage
@@ -60,6 +61,9 @@ uv run -m pyworker force-triggers
# Run service score recalculation task
uv run -m pyworker service-score-recalc [--service-id ID]
# Run inactive users cleanup task
uv run -m pyworker inactive-users
```
### Worker Mode
@@ -106,6 +110,15 @@ Tasks will run according to their configured cron schedules.
- Calculates privacy, trust, and overall scores
- Scheduled via `CRON_SERVICE-SCORE-RECALC_TASK`
### Inactive Users Task
- Handles cleanup of inactive user accounts
- Identifies users who have been inactive for 1 year (no comments, votes, suggestions, and 0 karma)
- Sends deletion warning notifications at 30, 15, 5, and 1 day intervals
- Deletes accounts that remain inactive after the warning period
- Cancels deletion for users who become active again
- Scheduled via `CRON_INACTIVE_USERS_TASK`
## Development
### Project Structure
@@ -124,6 +137,7 @@ pyworker/
│ │ ├── base.py
│ │ ├── comment_moderation.py
│ │ ├── force_triggers.py
│ │ ├── inactive_users.py
│ │ ├── service_score_recalc.py
│ │ ├── tos_review.py
│ │ └── user_sentiment.py

View File

@@ -17,6 +17,7 @@ from pyworker.scheduler import TaskScheduler
from .tasks import (
CommentModerationTask,
ForceTriggersTask,
InactiveUsersTask,
ServiceScoreRecalculationTask,
TosReviewTask,
UserSentimentTask,
@@ -101,6 +102,12 @@ def parse_args(args: List[str]) -> argparse.Namespace:
help="Recalculate service scores for all services",
)
# Inactive users task
subparsers.add_parser(
"inactive-users",
help="Handle inactive users - send deletion warnings and clean up accounts",
)
return parser.parse_args(args)
@@ -371,6 +378,30 @@ def run_service_score_recalc_all_task() -> int:
return run_service_score_recalc_task(all_services=True)
def run_inactive_users_task() -> int:
"""
Run the inactive users task.
Returns:
Exit code.
"""
logger.info("Starting inactive users task")
try:
# Initialize task and use as context manager
with InactiveUsersTask() as task: # type: ignore
result = task.run() # type: ignore
logger.info(f"Inactive users task completed. Results: {result}")
return 0
except Exception as e:
logger.exception(f"Error running inactive users task: {e}")
return 1
finally:
# Ensure connection pool is closed even if an error occurs
close_db_pool()
def run_worker_mode() -> int:
"""
Run in worker mode, scheduling tasks to run periodically.
@@ -382,54 +413,37 @@ def run_worker_mode() -> int:
# Get task schedules from config
task_schedules = config.task_schedules
if not task_schedules:
logger.info(
"Found %s cron schedule%s from environment variables: %s",
len(task_schedules),
"s" if len(task_schedules) != 1 else "",
", ".join(task_schedules.keys()) if task_schedules else "<none>",
)
required_tasks: dict[str, Any] = {
"tosreview": run_tos_task,
"user_sentiment": run_sentiment_task,
"comment_moderation": run_moderation_task,
"force_triggers": run_force_triggers_task,
"inactive_users": run_inactive_users_task,
"service_score_recalc": run_service_score_recalc_task,
"service_score_recalc_all": run_service_score_recalc_all_task,
}
missing_tasks = [t for t in required_tasks if t not in task_schedules]
if missing_tasks:
logger.error(
"No task schedules defined. Set CRON_TASKNAME_TASK environment variables."
"Missing cron schedule for task%s: %s. Set the corresponding CRON_<TASKNAME>_TASK environment variable%s.",
"s" if len(missing_tasks) != 1 else "",
", ".join(missing_tasks),
"s" if len(missing_tasks) != 1 else "",
)
return 1
logger.info(
f"Found {len(task_schedules)} scheduled tasks: {', '.join(task_schedules.keys())}"
)
# Initialize the scheduler
scheduler = TaskScheduler()
# Register tasks with their schedules
for task_name, cron_expression in task_schedules.items():
if task_name.lower() == "tosreview":
scheduler.register_task(task_name, cron_expression, run_tos_task)
elif task_name.lower() == "user_sentiment":
scheduler.register_task(task_name, cron_expression, run_sentiment_task)
elif task_name.lower() == "comment_moderation":
scheduler.register_task(task_name, cron_expression, run_moderation_task)
elif task_name.lower() == "force_triggers":
scheduler.register_task(task_name, cron_expression, run_force_triggers_task)
elif task_name.lower() == "service_score_recalc":
scheduler.register_task(
task_name, cron_expression, run_service_score_recalc_task
)
elif task_name.lower() == "service_score_recalc_all":
scheduler.register_task(
task_name,
cron_expression,
run_service_score_recalc_all_task,
)
else:
logger.warning(f"Unknown task '{task_name}', skipping")
# Register service score recalculation task (every 5 minutes)
scheduler.register_task(
"service_score_recalc",
"*/5 * * * *",
run_service_score_recalc_task,
)
# Register daily service score recalculation for all services
scheduler.register_task(
"service_score_recalc_all",
"0 0 * * *",
run_service_score_recalc_all_task,
)
for task_name, task_callable in required_tasks.items():
scheduler.register_task(task_name, task_schedules[task_name], task_callable)
# Start the scheduler if tasks were registered
if scheduler.tasks:
@@ -484,6 +498,8 @@ def main() -> int:
)
elif args.task == "service-score-recalc-all":
return run_service_score_recalc_all_task()
elif args.task == "inactive-users":
return run_inactive_users_task()
elif args.task:
logger.error(f"Unknown task: {args.task}")
return 1

View File

@@ -14,6 +14,7 @@ from pyworker.database import close_db_pool
from .tasks import (
CommentModerationTask,
ForceTriggersTask,
InactiveUsersTask,
ServiceScoreRecalculationTask,
TosReviewTask,
UserSentimentTask,
@@ -80,6 +81,8 @@ class TaskScheduler:
task_instance = ForceTriggersTask()
elif task_name.lower() == "service_score_recalc":
task_instance = ServiceScoreRecalculationTask()
elif task_name.lower() == "inactive_users":
task_instance = InactiveUsersTask()
else:
self.logger.warning(f"Unknown task '{task_name}', skipping")
return

View File

@@ -3,6 +3,7 @@
from .base import Task
from .comment_moderation import CommentModerationTask
from .force_triggers import ForceTriggersTask
from .inactive_users import InactiveUsersTask
from .service_score_recalc import ServiceScoreRecalculationTask
from .tos_review import TosReviewTask
from .user_sentiment import UserSentimentTask
@@ -11,6 +12,7 @@ __all__ = [
"Task",
"CommentModerationTask",
"ForceTriggersTask",
"InactiveUsersTask",
"ServiceScoreRecalculationTask",
"TosReviewTask",
"UserSentimentTask",

View File

@@ -0,0 +1,258 @@
"""
Task for handling inactive users - sending deletion warnings and cleaning up accounts.
"""
from datetime import datetime, timedelta, date
from typing import Any, Dict, List, Optional
from pyworker.database import execute_db_command, run_db_query
from pyworker.tasks.base import Task
class InactiveUsersTask(Task):
"""Task for handling inactive users"""
def __init__(self):
"""Initialize the inactive users task."""
super().__init__("inactive_users")
def run(self) -> Dict[str, Any]:
"""
Run the inactive users task.
This task:
1. Identifies users who have been inactive for 1 year
2. Schedules them for deletion
3. Sends warning notifications at 30, 15, 5, and 1 day intervals
4. Deletes accounts that have reached their deletion date
"""
results = {
"users_scheduled_for_deletion": 0,
"notifications_sent": 0,
"accounts_deleted": 0,
"deletions_cancelled": 0,
}
# Step 1: Cancel deletion for users who became active again
results["deletions_cancelled"] = self._cancel_deletion_for_active_users()
# Step 2: Schedule new inactive users for deletion
results["users_scheduled_for_deletion"] = self._schedule_inactive_users_for_deletion()
# Step 3: Send warning notifications
results["notifications_sent"] = self._send_deletion_warnings()
# Step 4: Delete accounts that have reached their deletion date
results["accounts_deleted"] = self._delete_scheduled_accounts()
self.logger.info(
f"Inactive users task completed. "
f"Deletions cancelled: {results['deletions_cancelled']}, "
f"Scheduled: {results['users_scheduled_for_deletion']}, "
f"Notifications sent: {results['notifications_sent']}, "
f"Accounts deleted: {results['accounts_deleted']}"
)
return results
def _schedule_inactive_users_for_deletion(self) -> int:
"""
Schedule inactive users for deletion.
A user is considered inactive if:
- Account was created more than 1 year ago
- Has 0 karma
- Has no comments, comment votes, or service suggestions
- Is not scheduled for deletion already
- Is not an admin or moderator
"""
one_year_ago = datetime.now() - timedelta(days=365)
deletion_date = date.today() + timedelta(days=30) # 30 days from today
# Find inactive users
query = """
UPDATE "User"
SET "scheduledDeletionAt" = %s, "updatedAt" = NOW()
WHERE "id" IN (
SELECT u."id"
FROM "User" u
WHERE u."createdAt" < %s
AND u."scheduledDeletionAt" IS NULL
AND u."admin" = false
AND u."moderator" = false
AND u."totalKarma" = 0
AND NOT EXISTS (
SELECT 1 FROM "Comment" c WHERE c."authorId" = u."id"
)
AND NOT EXISTS (
SELECT 1 FROM "CommentVote" cv WHERE cv."userId" = u."id"
)
AND NOT EXISTS (
SELECT 1 FROM "ServiceSuggestion" ss WHERE ss."userId" = u."id"
)
)
"""
count = execute_db_command(query, (deletion_date, one_year_ago))
self.logger.info(f"Scheduled {count} inactive users for deletion on {deletion_date}")
return count
def _send_deletion_warnings(self) -> int:
"""
Send deletion warning notifications to users at appropriate intervals.
"""
today = date.today()
notifications_sent = 0
# Define warning intervals and their corresponding notification types
warning_intervals = [
(30, 'ACCOUNT_DELETION_WARNING_30_DAYS'),
(15, 'ACCOUNT_DELETION_WARNING_15_DAYS'),
(5, 'ACCOUNT_DELETION_WARNING_5_DAYS'),
(1, 'ACCOUNT_DELETION_WARNING_1_DAY'),
]
for days_before, notification_type in warning_intervals:
# Find users who should receive this warning (exact date match)
target_date = today + timedelta(days=days_before)
# Check if user is still inactive (no recent activity)
users_query = """
SELECT u."id", u."name", u."scheduledDeletionAt"
FROM "User" u
WHERE u."scheduledDeletionAt" = %s
AND u."admin" = false
AND u."moderator" = false
AND u."totalKarma" = 0
AND NOT EXISTS (
SELECT 1 FROM "Notification" n
WHERE n."userId" = u."id"
AND n."type" = %s
AND n."createdAt" > (u."scheduledDeletionAt" - INTERVAL '30 days')
)
-- Still check if user is inactive (no activity since being scheduled)
AND NOT EXISTS (
SELECT 1 FROM "Comment" c
WHERE c."authorId" = u."id"
AND c."createdAt" > (u."scheduledDeletionAt" - INTERVAL '30 days')
)
AND NOT EXISTS (
SELECT 1 FROM "CommentVote" cv
WHERE cv."userId" = u."id"
AND cv."createdAt" > (u."scheduledDeletionAt" - INTERVAL '30 days')
)
AND NOT EXISTS (
SELECT 1 FROM "ServiceSuggestion" ss
WHERE ss."userId" = u."id"
AND ss."createdAt" > (u."scheduledDeletionAt" - INTERVAL '30 days')
)
"""
users = run_db_query(users_query, (target_date, notification_type))
# Create notifications for these users
for user in users:
insert_notification_query = """
INSERT INTO "Notification" ("userId", "type", "createdAt", "updatedAt")
VALUES (%s, %s, NOW(), NOW())
ON CONFLICT DO NOTHING
"""
execute_db_command(insert_notification_query, (user['id'], notification_type))
notifications_sent += 1
self.logger.info(
f"Sent {notification_type} notification to user {user['name']} "
f"(ID: {user['id']}) scheduled for deletion on {user['scheduledDeletionAt']}"
)
return notifications_sent
def _delete_scheduled_accounts(self) -> int:
"""
Delete accounts that have reached their scheduled deletion date and are still inactive.
"""
today = date.today()
# Find users scheduled for deletion who are still inactive
users_to_delete_query = """
SELECT u."id", u."name", u."scheduledDeletionAt"
FROM "User" u
WHERE u."scheduledDeletionAt" <= %s
AND u."admin" = false
AND u."moderator" = false
AND u."totalKarma" = 0
-- Double-check they're still inactive
AND NOT EXISTS (
SELECT 1 FROM "Comment" c
WHERE c."authorId" = u."id"
AND c."createdAt" > (u."scheduledDeletionAt" - INTERVAL '30 days')
)
AND NOT EXISTS (
SELECT 1 FROM "CommentVote" cv
WHERE cv."userId" = u."id"
AND cv."createdAt" > (u."scheduledDeletionAt" - INTERVAL '30 days')
)
AND NOT EXISTS (
SELECT 1 FROM "ServiceSuggestion" ss
WHERE ss."userId" = u."id"
AND ss."createdAt" > (u."scheduledDeletionAt" - INTERVAL '30 days')
)
"""
users_to_delete = run_db_query(users_to_delete_query, (today,))
deleted_count = 0
for user in users_to_delete:
try:
# Delete the user (this will cascade and delete related records)
delete_query = 'DELETE FROM "User" WHERE "id" = %s'
execute_db_command(delete_query, (user['id'],))
deleted_count += 1
self.logger.info(
f"Deleted inactive user {user['name']} (ID: {user['id']}) "
f"scheduled for deletion on {user['scheduledDeletionAt']}"
)
except Exception as e:
self.logger.error(
f"Failed to delete user {user['name']} (ID: {user['id']}): {e}"
)
return deleted_count
def _cancel_deletion_for_active_users(self) -> int:
"""
Cancel scheduled deletion for users who have become active again.
"""
# Find users scheduled for deletion who have recent activity or gained karma
query = """
UPDATE "User"
SET "scheduledDeletionAt" = NULL, "updatedAt" = NOW()
WHERE "scheduledDeletionAt" IS NOT NULL
AND (
"totalKarma" > 0
OR EXISTS (
SELECT 1 FROM "Comment" c
WHERE c."authorId" = "User"."id"
AND c."createdAt" > ("User"."scheduledDeletionAt" - INTERVAL '30 days')
)
OR EXISTS (
SELECT 1 FROM "CommentVote" cv
WHERE cv."userId" = "User"."id"
AND cv."createdAt" > ("User"."scheduledDeletionAt" - INTERVAL '30 days')
)
OR EXISTS (
SELECT 1 FROM "ServiceSuggestion" ss
WHERE ss."userId" = "User"."id"
AND ss."createdAt" > ("User"."scheduledDeletionAt" - INTERVAL '30 days')
)
)
"""
count = execute_db_command(query)
if count > 0:
self.logger.info(f"Cancelled deletion for {count} users who became active again or gained karma")
return count