netbox/netbox/extras/events.py

201 lines
6.8 KiB
Python
Raw Normal View History

import logging
15692: Introduce background jobs (#16927) * Introduce reusable BackgroundJob framework A new abstract class can be used to implement job function classes. It handles the necessary logic for starting and stopping jobs, including exception handling and rescheduling of recurring jobs. This commit also includes the migration of data source jobs to the new framework. * Restore using import_string for jobs Using the 'import_string()' utility from Django allows the job script class to be simplified, as module imports no longer need to avoid loops. This should make it easier to queue and maintain jobs. * Use SyncDataSourceJob for management command Instead of maintaining two separate job execution logics, the same job is now used for both background and interactive execution. * Implement BackgroundJob for running scripts The independent implementations of interactive and background script execution have been merged into a single BackgroundJob implementation. * Fix documentation of model features * Ensure consitent code style * Introduce reusable ScheduledJob A new abstract class can be used to implement job function classes that specialize in scheduling. These use the same logic as regular BackgroundJobs, but ensure that they are only scheduled once at any given time. * Introduce reusable SystemJob A new abstract class can be used to implement job function classes that specialize in system background tasks (e.g. synchronization or housekeeping). In addition to the features of the BackgroundJob and ScheduledJob classes, these implement additional logic to not need to be bound to an existing NetBox object and to setup job schedules on plugin load instead of an interactive request. * Add documentation for jobs framework * Revert "Use SyncDataSourceJob for management" This partially reverts commit db591d4. The 'run_now' parameter of 'enqueue()' remains, as its being used by following commits. * Merge enqueued status into JobStatusChoices * Fix logger for ScriptJob * Remove job name for scripts Because scripts are already linked through the Job Instance field, the name is displayed twice. Removing this reduces redundancy and opens up the possibility of simplifying the BackgroundJob framework in future commits. * Merge ScheduledJob into BackgroundJob Instead of using separate classes, the logic of ScheduledJob is now merged into the generic BackgroundJob class. This allows reusing the same logic, but dynamically deciding whether to enqueue the same job once or multiple times. * Add name attribute for BackgroundJob Instead of defining individual names on enqueue, BackgroundJob classes can now set a job name in their meta class. This is equivalent to other Django classes and NetBox scripts. * Drop enqueue_sync_job() method from DataSource * Import ScriptJob directly * Relax requirement for Jobs to reference a specific object * Rename 'run_now' arg on Job.enqueue() to 'immediate' * Fix queue lookup in Job enqueue * Collapse SystemJob into BackgroundJob * Remove legacy JobResultStatusChoices ChoiceSet was moved to core in 40572b5. * Use queue 'low' for system jobs by default System jobs usually perform low-priority background tasks and therefore can use a different queue than 'default', which is used for regular jobs related to specific objects. * Add test cases for BackgroundJob handling * Fix enqueue interval jobs As the job's name is set by enqueue(), it must not be passed in handle() to avoid duplicate kwargs with the same name. * Honor schedule_at for job's enqueue_once Not only can a job's interval change, but so can the time at which it is scheduled to run. If a specific scheduled time is set, it will also be checked against the current job schedule. If there are any changes, the job is rescheduled with the new time. * Switch BackgroundJob to regular methods Instead of using a class method for run(), a regular method is used for this purpose. This gives the possibility to add more convenience methods in the future, e.g. for interacting with the job object or for logging, as implemented for scripts. * Fix background tasks documentation * Test enqueue in combination with enqueue_once * Rename background jobs to tasks (to differentiate from RQ) * Touch up docs * Revert "Use queue 'low' for system jobs by default" This reverts commit b17b2050df953427769ab6a337c6995ac2a60615. * Remove system background job This commit reverts commits 4880d81 and 0b15ecf. Using the database 'connection_created' signal for job registration feels a little wrong at this point, as it would trigger registration very often. However, the background job framework is prepared for this use case and can be used by plugins once the auto-registration of jobs is solved. * Fix runscript management command Defining names for background jobs was disabled with fb75389. The preceeding changes in 257976d did forget the management command. * Use regular imports for ScriptJob * Rename BackgroundJob to JobRunner --------- Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>
2024-07-30 19:31:21 +02:00
from collections import defaultdict
from django.conf import settings
from django.utils import timezone
from django.utils.module_loading import import_string
from django.utils.translation import gettext as _
from django_rq import get_queue
from core.events import *
from core.models import ObjectType
from netbox.config import get_config
from netbox.constants import RQ_QUEUE_DEFAULT
from netbox.models.features import has_feature
from users.models import User
from utilities.api import get_serializer_for_model
from utilities.request import copy_safe_request
from utilities.rqworker import get_rq_retry
from utilities.serialization import serialize_object
from .choices import EventRuleActionChoices
from .models import EventRule
logger = logging.getLogger('netbox.events_processor')
def serialize_for_event(instance):
"""
Return a serialized representation of the given instance suitable for use in a queued event.
"""
serializer_class = get_serializer_for_model(instance.__class__)
serializer_context = {
'request': None,
}
serializer = serializer_class(instance, context=serializer_context)
return serializer.data
def get_snapshots(instance, event_type):
snapshots = {
'prechange': getattr(instance, '_prechange_snapshot', None),
'postchange': None,
}
if event_type != OBJECT_DELETED:
# Use model's serialize_object() method if defined; fall back to serialize_object() utility function
if hasattr(instance, 'serialize_object'):
snapshots['postchange'] = instance.serialize_object()
else:
snapshots['postchange'] = serialize_object(instance)
return snapshots
def enqueue_event(queue, instance, request, event_type):
"""
Enqueue a serialized representation of a created/updated/deleted object for the processing of
events once the request has completed.
"""
# Bail if this type of object does not support event rules
if not has_feature(instance, 'event_rules'):
return
app_label = instance._meta.app_label
model_name = instance._meta.model_name
assert instance.pk is not None
key = f'{app_label}.{model_name}:{instance.pk}'
if key in queue:
queue[key]['data'] = serialize_for_event(instance)
queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
# If the object is being deleted, update any prior "update" event to "delete"
if event_type == OBJECT_DELETED:
queue[key]['event_type'] = event_type
else:
queue[key] = {
'object_type': ObjectType.objects.get_for_model(instance),
'object_id': instance.pk,
'event_type': event_type,
'data': serialize_for_event(instance),
'snapshots': get_snapshots(instance, event_type),
'request': request,
# Legacy request attributes for backward compatibility
'username': request.user.username,
'request_id': request.id,
}
def process_event_rules(event_rules, object_type, event_type, data, username=None, snapshots=None, request=None):
user = User.objects.get(username=username) if username else None
for event_rule in event_rules:
# Evaluate event rule conditions (if any)
if not event_rule.eval_conditions(data):
continue
# Compile event data
event_data = event_rule.action_data or {}
event_data.update(data)
# Webhooks
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
# Select the appropriate RQ queue
queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
rq_queue = get_queue(queue_name)
# Compile the task parameters
params = {
"event_rule": event_rule,
"object_type": object_type,
"event_type": event_type,
"data": event_data,
"snapshots": snapshots,
"timestamp": timezone.now().isoformat(),
"username": username,
"retry": get_rq_retry()
}
if snapshots:
params["snapshots"] = snapshots
if request:
params["request"] = copy_safe_request(request)
# Enqueue the task
rq_queue.enqueue(
"extras.webhooks.send_webhook",
**params
)
# Scripts
elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
# Resolve the script from action parameters
script = event_rule.action_object.python_class()
# Enqueue a Job to record the script's execution
15692: Introduce background jobs (#16927) * Introduce reusable BackgroundJob framework A new abstract class can be used to implement job function classes. It handles the necessary logic for starting and stopping jobs, including exception handling and rescheduling of recurring jobs. This commit also includes the migration of data source jobs to the new framework. * Restore using import_string for jobs Using the 'import_string()' utility from Django allows the job script class to be simplified, as module imports no longer need to avoid loops. This should make it easier to queue and maintain jobs. * Use SyncDataSourceJob for management command Instead of maintaining two separate job execution logics, the same job is now used for both background and interactive execution. * Implement BackgroundJob for running scripts The independent implementations of interactive and background script execution have been merged into a single BackgroundJob implementation. * Fix documentation of model features * Ensure consitent code style * Introduce reusable ScheduledJob A new abstract class can be used to implement job function classes that specialize in scheduling. These use the same logic as regular BackgroundJobs, but ensure that they are only scheduled once at any given time. * Introduce reusable SystemJob A new abstract class can be used to implement job function classes that specialize in system background tasks (e.g. synchronization or housekeeping). In addition to the features of the BackgroundJob and ScheduledJob classes, these implement additional logic to not need to be bound to an existing NetBox object and to setup job schedules on plugin load instead of an interactive request. * Add documentation for jobs framework * Revert "Use SyncDataSourceJob for management" This partially reverts commit db591d4. The 'run_now' parameter of 'enqueue()' remains, as its being used by following commits. * Merge enqueued status into JobStatusChoices * Fix logger for ScriptJob * Remove job name for scripts Because scripts are already linked through the Job Instance field, the name is displayed twice. Removing this reduces redundancy and opens up the possibility of simplifying the BackgroundJob framework in future commits. * Merge ScheduledJob into BackgroundJob Instead of using separate classes, the logic of ScheduledJob is now merged into the generic BackgroundJob class. This allows reusing the same logic, but dynamically deciding whether to enqueue the same job once or multiple times. * Add name attribute for BackgroundJob Instead of defining individual names on enqueue, BackgroundJob classes can now set a job name in their meta class. This is equivalent to other Django classes and NetBox scripts. * Drop enqueue_sync_job() method from DataSource * Import ScriptJob directly * Relax requirement for Jobs to reference a specific object * Rename 'run_now' arg on Job.enqueue() to 'immediate' * Fix queue lookup in Job enqueue * Collapse SystemJob into BackgroundJob * Remove legacy JobResultStatusChoices ChoiceSet was moved to core in 40572b5. * Use queue 'low' for system jobs by default System jobs usually perform low-priority background tasks and therefore can use a different queue than 'default', which is used for regular jobs related to specific objects. * Add test cases for BackgroundJob handling * Fix enqueue interval jobs As the job's name is set by enqueue(), it must not be passed in handle() to avoid duplicate kwargs with the same name. * Honor schedule_at for job's enqueue_once Not only can a job's interval change, but so can the time at which it is scheduled to run. If a specific scheduled time is set, it will also be checked against the current job schedule. If there are any changes, the job is rescheduled with the new time. * Switch BackgroundJob to regular methods Instead of using a class method for run(), a regular method is used for this purpose. This gives the possibility to add more convenience methods in the future, e.g. for interacting with the job object or for logging, as implemented for scripts. * Fix background tasks documentation * Test enqueue in combination with enqueue_once * Rename background jobs to tasks (to differentiate from RQ) * Touch up docs * Revert "Use queue 'low' for system jobs by default" This reverts commit b17b2050df953427769ab6a337c6995ac2a60615. * Remove system background job This commit reverts commits 4880d81 and 0b15ecf. Using the database 'connection_created' signal for job registration feels a little wrong at this point, as it would trigger registration very often. However, the background job framework is prepared for this use case and can be used by plugins once the auto-registration of jobs is solved. * Fix runscript management command Defining names for background jobs was disabled with fb75389. The preceeding changes in 257976d did forget the management command. * Use regular imports for ScriptJob * Rename BackgroundJob to JobRunner --------- Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>
2024-07-30 19:31:21 +02:00
from extras.jobs import ScriptJob
ScriptJob.enqueue(
instance=event_rule.action_object,
name=script.name,
user=user,
data=event_data
)
# Notification groups
elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
# Bulk-create notifications for all members of the notification group
event_rule.action_object.notify(
object_type=object_type,
object_id=event_data['id'],
object_repr=event_data.get('display'),
event_type=event_type
)
else:
raise ValueError(_("Unknown action type for an event rule: {action_type}").format(
action_type=event_rule.action_type
))
def process_event_queue(events):
"""
Flush a list of object representation to RQ for EventRule processing.
"""
events_cache = defaultdict(dict)
for event in events:
event_type = event['event_type']
object_type = event['object_type']
# Cache applicable Event Rules
if object_type not in events_cache[event_type]:
events_cache[event_type][object_type] = EventRule.objects.filter(
event_types__contains=[event['event_type']],
object_types=object_type,
enabled=True
)
event_rules = events_cache[event_type][object_type]
process_event_rules(
event_rules=event_rules,
object_type=object_type,
event_type=event['event_type'],
data=event['data'],
username=event['username'],
snapshots=event['snapshots'],
request=event['request'],
)
def flush_events(events):
"""
Flush a list of object representations to RQ for event processing.
"""
if events:
for name in settings.EVENTS_PIPELINE:
try:
func = import_string(name)
func(events)
except ImportError as e:
logger.error(_("Cannot import events pipeline {name} error: {error}").format(name=name, error=e))