Release 202506241430
This commit is contained in:
@@ -95,6 +95,12 @@ def parse_args(args: List[str]) -> argparse.Namespace:
|
||||
help="Recalculate scores for all services (ignores --service-id)",
|
||||
)
|
||||
|
||||
# Service Score Recalculation task for all services
|
||||
subparsers.add_parser(
|
||||
"service-score-recalc-all",
|
||||
help="Recalculate service scores for all services",
|
||||
)
|
||||
|
||||
return parser.parse_args(args)
|
||||
|
||||
|
||||
@@ -358,6 +364,13 @@ def run_service_score_recalc_task(
|
||||
close_db_pool()
|
||||
|
||||
|
||||
def run_service_score_recalc_all_task() -> int:
|
||||
"""
|
||||
Run the service score recalculation task for all services.
|
||||
"""
|
||||
return run_service_score_recalc_task(all_services=True)
|
||||
|
||||
|
||||
def run_worker_mode() -> int:
|
||||
"""
|
||||
Run in worker mode, scheduling tasks to run periodically.
|
||||
@@ -396,6 +409,12 @@ def run_worker_mode() -> int:
|
||||
scheduler.register_task(
|
||||
task_name, cron_expression, run_service_score_recalc_task
|
||||
)
|
||||
elif task_name.lower() == "service_score_recalc_all":
|
||||
scheduler.register_task(
|
||||
task_name,
|
||||
cron_expression,
|
||||
run_service_score_recalc_all_task,
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Unknown task '{task_name}', skipping")
|
||||
|
||||
@@ -405,6 +424,12 @@ def run_worker_mode() -> int:
|
||||
"*/5 * * * *",
|
||||
run_service_score_recalc_task,
|
||||
)
|
||||
# Register daily service score recalculation for all services
|
||||
scheduler.register_task(
|
||||
"service_score_recalc_all",
|
||||
"0 0 * * *",
|
||||
run_service_score_recalc_all_task,
|
||||
)
|
||||
|
||||
# Start the scheduler if tasks were registered
|
||||
if scheduler.tasks:
|
||||
@@ -457,6 +482,8 @@ def main() -> int:
|
||||
return run_service_score_recalc_task(
|
||||
args.service_id, getattr(args, "all", False)
|
||||
)
|
||||
elif args.task == "service-score-recalc-all":
|
||||
return run_service_score_recalc_all_task()
|
||||
elif args.task:
|
||||
logger.error(f"Unknown task: {args.task}")
|
||||
return 1
|
||||
|
||||
@@ -62,25 +62,27 @@ class TaskScheduler:
|
||||
cron_expression: Cron expression defining the schedule.
|
||||
task_func: Function to execute.
|
||||
*args: Arguments to pass to the task function.
|
||||
**kwargs: Keyword arguments to pass to the task function.
|
||||
**kwargs: Keyword arguments to pass to the task function. `instantiate` is a special kwarg.
|
||||
"""
|
||||
instantiate = kwargs.pop("instantiate", True)
|
||||
# Declare task_instance variable with type annotation upfront
|
||||
task_instance: Any = None
|
||||
|
||||
# Initialize the appropriate task class based on the task name
|
||||
if task_name.lower() == "tosreview":
|
||||
task_instance = TosReviewTask()
|
||||
elif task_name.lower() == "user_sentiment":
|
||||
task_instance = UserSentimentTask()
|
||||
elif task_name.lower() == "comment_moderation":
|
||||
task_instance = CommentModerationTask()
|
||||
elif task_name.lower() == "force_triggers":
|
||||
task_instance = ForceTriggersTask()
|
||||
elif task_name.lower() == "service_score_recalc":
|
||||
task_instance = ServiceScoreRecalculationTask()
|
||||
else:
|
||||
self.logger.warning(f"Unknown task '{task_name}', skipping")
|
||||
return
|
||||
if instantiate:
|
||||
# Initialize the appropriate task class based on the task name
|
||||
if task_name.lower() == "tosreview":
|
||||
task_instance = TosReviewTask()
|
||||
elif task_name.lower() == "user_sentiment":
|
||||
task_instance = UserSentimentTask()
|
||||
elif task_name.lower() == "comment_moderation":
|
||||
task_instance = CommentModerationTask()
|
||||
elif task_name.lower() == "force_triggers":
|
||||
task_instance = ForceTriggersTask()
|
||||
elif task_name.lower() == "service_score_recalc":
|
||||
task_instance = ServiceScoreRecalculationTask()
|
||||
else:
|
||||
self.logger.warning(f"Unknown task '{task_name}', skipping")
|
||||
return
|
||||
|
||||
self.tasks[task_name] = {
|
||||
"cron": cron_expression,
|
||||
@@ -126,8 +128,12 @@ class TaskScheduler:
|
||||
self.logger.info(f"Running task '{task_name}'")
|
||||
# Use task instance as a context manager to ensure
|
||||
# a single database connection is used for the entire task
|
||||
with task_info["instance"]:
|
||||
# Execute the registered task function with its arguments
|
||||
if task_info["instance"]:
|
||||
with task_info["instance"]:
|
||||
# Execute the registered task function with its arguments
|
||||
task_info["func"](*task_info["args"], **task_info["kwargs"])
|
||||
else:
|
||||
# Execute the registered task function without a context manager
|
||||
task_info["func"](*task_info["args"], **task_info["kwargs"])
|
||||
self.logger.info(f"Task '{task_name}' completed")
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user