diff --git a/netbox/dcim/tests/test_views.py b/netbox/dcim/tests/test_views.py index b23f7e16d7..2e6d5ebc58 100644 --- a/netbox/dcim/tests/test_views.py +++ b/netbox/dcim/tests/test_views.py @@ -1078,14 +1078,14 @@ class ModuleTypeTestCase(ViewTestCases.PrimaryObjectViewTestCase): 'dcim.add_modulebaytemplate', ) + def verify_module_type_profile(scenario_name): + # TODO: remove extra regression asserts once parent test supports testing all import fields + fan_module_type = ModuleType.objects.get(part_number='generic-fan') + fan_module_type_profile = ModuleTypeProfile.objects.get(name='Fan') + assert fan_module_type.profile == fan_module_type_profile + # run base test - super().test_bulk_import_objects_with_permission() - - # TODO: remove extra regression asserts once parent test supports testing all import fields - fan_module_type = ModuleType.objects.get(part_number='generic-fan') - fan_module_type_profile = ModuleTypeProfile.objects.get(name='Fan') - - assert fan_module_type.profile == fan_module_type_profile + super().test_bulk_import_objects_with_permission(post_import_callback=verify_module_type_profile) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[]) def test_bulk_import_objects_with_constrained_permission(self): @@ -3290,8 +3290,10 @@ class CableTestCase( Device(name='Device 1', site=sites[0], device_type=devicetype, role=role), Device(name='Device 2', site=sites[0], device_type=devicetype, role=role), Device(name='Device 3', site=sites[0], device_type=devicetype, role=role), + Device(name='Device 4', site=sites[0], device_type=devicetype, role=role), # Create 'Device 1' assigned to 'Site 2' (allowed since the site is different) Device(name='Device 1', site=sites[1], device_type=devicetype, role=role), + Device(name='Device 5', site=sites[1], device_type=devicetype, role=role), ) Device.objects.bulk_create(devices) @@ -3300,22 +3302,36 @@ class CableTestCase( vc.save() interfaces = ( + # Device 1, Site 1 Interface(device=devices[0], name='Interface 1', type=InterfaceTypeChoices.TYPE_1GE_FIXED), Interface(device=devices[0], name='Interface 2', type=InterfaceTypeChoices.TYPE_1GE_FIXED), Interface(device=devices[0], name='Interface 3', type=InterfaceTypeChoices.TYPE_1GE_FIXED), + # Device 2, Site 1 Interface(device=devices[1], name='Interface 1', type=InterfaceTypeChoices.TYPE_1GE_FIXED), Interface(device=devices[1], name='Interface 2', type=InterfaceTypeChoices.TYPE_1GE_FIXED), Interface(device=devices[1], name='Interface 3', type=InterfaceTypeChoices.TYPE_1GE_FIXED), + # Device 3, Site 1 Interface(device=devices[2], name='Interface 1', type=InterfaceTypeChoices.TYPE_1GE_FIXED), Interface(device=devices[2], name='Interface 2', type=InterfaceTypeChoices.TYPE_1GE_FIXED), Interface(device=devices[2], name='Interface 3', type=InterfaceTypeChoices.TYPE_1GE_FIXED), + # Device 3, Site 1 Interface(device=devices[3], name='Interface 1', type=InterfaceTypeChoices.TYPE_1GE_FIXED), Interface(device=devices[3], name='Interface 2', type=InterfaceTypeChoices.TYPE_1GE_FIXED), Interface(device=devices[3], name='Interface 3', type=InterfaceTypeChoices.TYPE_1GE_FIXED), + # Device 1, Site 2 + Interface(device=devices[4], name='Interface 1', type=InterfaceTypeChoices.TYPE_1GE_FIXED), + Interface(device=devices[4], name='Interface 2', type=InterfaceTypeChoices.TYPE_1GE_FIXED), + Interface(device=devices[4], name='Interface 3', type=InterfaceTypeChoices.TYPE_1GE_FIXED), + + # Device 1, Site 2 + Interface(device=devices[5], name='Interface 1', type=InterfaceTypeChoices.TYPE_1GE_FIXED), + Interface(device=devices[5], name='Interface 2', type=InterfaceTypeChoices.TYPE_1GE_FIXED), + Interface(device=devices[5], name='Interface 3', type=InterfaceTypeChoices.TYPE_1GE_FIXED), + Interface(device=devices[1], name='Device 2 Interface', type=InterfaceTypeChoices.TYPE_1GE_FIXED), Interface(device=devices[2], name='Device 3 Interface', type=InterfaceTypeChoices.TYPE_1GE_FIXED), - Interface(device=devices[3], name='Interface 4', type=InterfaceTypeChoices.TYPE_1GE_FIXED), - Interface(device=devices[3], name='Interface 5', type=InterfaceTypeChoices.TYPE_1GE_FIXED), + Interface(device=devices[4], name='Interface 4', type=InterfaceTypeChoices.TYPE_1GE_FIXED), + Interface(device=devices[4], name='Interface 5', type=InterfaceTypeChoices.TYPE_1GE_FIXED), ) Interface.objects.bulk_create(interfaces) @@ -3342,16 +3358,29 @@ class CableTestCase( 'tags': [t.pk for t in tags], } - # Ensure that CSV bulk import supports assigning terminations from parent devices that share - # the same device name, provided those devices belong to different sites. - cls.csv_data = ( - "side_a_site,side_a_device,side_a_type,side_a_name,side_b_site,side_b_device,side_b_type,side_b_name", - "Site 1,Device 3,dcim.interface,Interface 1,Site 2,Device 1,dcim.interface,Interface 1", - "Site 1,Device 3,dcim.interface,Interface 2,Site 2,Device 1,dcim.interface,Interface 2", - "Site 1,Device 3,dcim.interface,Interface 3,Site 2,Device 1,dcim.interface,Interface 3", - "Site 1,Device 1,dcim.interface,Device 2 Interface,Site 2,Device 1,dcim.interface,Interface 4", - "Site 1,Device 1,dcim.interface,Device 3 Interface,Site 2,Device 1,dcim.interface,Interface 5", - ) + cls.csv_data = { + 'default': ( + "side_a_device,side_a_type,side_a_name,side_b_device,side_b_type,side_b_name", + "Device 4,dcim.interface,Interface 1,Device 5,dcim.interface,Interface 1", + "Device 3,dcim.interface,Interface 2,Device 4,dcim.interface,Interface 2", + "Device 3,dcim.interface,Interface 3,Device 4,dcim.interface,Interface 3", + + # The following is no longer possible in this scenario, because there are multiple + # devices named "Device 1" across multiple sites. See the "site-filtering" scenario + # below for how to specify a site for non-unique device names. + # "Device 1,dcim.interface,Device 3 Interface,Device 4,dcim.interface,Interface 5", + ), + 'site-filtering': ( + # Ensure that CSV bulk import supports assigning terminations from parent devices + # that share the same device name, provided those devices belong to different sites. + "side_a_site,side_a_device,side_a_type,side_a_name,side_b_site,side_b_device,side_b_type,side_b_name", + "Site 1,Device 3,dcim.interface,Interface 1,Site 2,Device 1,dcim.interface,Interface 1", + "Site 1,Device 3,dcim.interface,Interface 2,Site 2,Device 1,dcim.interface,Interface 2", + "Site 1,Device 3,dcim.interface,Interface 3,Site 2,Device 1,dcim.interface,Interface 3", + "Site 1,Device 1,dcim.interface,Device 2 Interface,Site 2,Device 1,dcim.interface,Interface 4", + "Site 1,Device 1,dcim.interface,Device 3 Interface,Site 2,Device 1,dcim.interface,Interface 5", + ) + } cls.csv_update_data = ( "id,label,color", diff --git a/netbox/utilities/testing/base.py b/netbox/utilities/testing/base.py index 6d17fa1ecd..1a0c3f46ba 100644 --- a/netbox/utilities/testing/base.py +++ b/netbox/utilities/testing/base.py @@ -1,9 +1,11 @@ import json +from contextlib import contextmanager from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.contrib.postgres.fields import ArrayField, RangeField from django.core.exceptions import FieldDoesNotExist +from django.db import transaction from django.db.models import ManyToManyField, ManyToManyRel, JSONField from django.forms.models import model_to_dict from django.test import Client, TestCase as _TestCase @@ -36,6 +38,20 @@ class TestCase(_TestCase): self.client = Client() self.client.force_login(self.user) + @contextmanager + def cleanupSubTest(self, **params): + """ + Context manager that wraps subTest with automatic cleanup. + All database changes within the context will be rolled back. + """ + sid = transaction.savepoint() + + try: + with self.subTest(**params): + yield + finally: + transaction.savepoint_rollback(sid) + # # Permissions management # diff --git a/netbox/utilities/testing/views.py b/netbox/utilities/testing/views.py index 99a6dd43ae..f00b21d083 100644 --- a/netbox/utilities/testing/views.py +++ b/netbox/utilities/testing/views.py @@ -152,7 +152,6 @@ class ViewTestCases: @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[]) def test_create_object_with_permission(self): - # Assign unconstrained permission obj_perm = ObjectPermission( name='Test permission', @@ -586,19 +585,59 @@ class ViewTestCases: response = self.client.post(**request) self.assertHttpStatus(response, 302) self.assertEqual(initial_count + self.bulk_create_count, self._get_queryset().count()) - for instance in self._get_queryset().order_by('-pk')[:self.bulk_create_count]: + for instance in self._get_queryset().order_by('-pk')[: self.bulk_create_count]: self.assertInstanceEqual(instance, self.bulk_create_data, exclude=self.validation_excluded_fields) class BulkImportObjectsViewTestCase(ModelViewTestCase): """ Create multiple instances from imported data. - :csv_data: A list of CSV-formatted lines (starting with the headers) to be used for bulk object import. + :csv_data: CSV data for bulk import testing. Supports two formats: + + 1. Tuple/list format (backwards compatible): + csv_data = ( + "name,slug,description", + "Object 1,object-1,First object", + "Object 2,object-2,Second object", + ) + + 2. Dictionary format for multiple scenarios: + csv_data = { + 'default': ( + "name,slug,description", + "Object 1,object-1,First object", + ), + 'with_optional_fields': ( + "name,slug,description,comments", + "Object 2,object-2,Second object,With comments", + ) + } + + When using dictionary format, test_bulk_import_objects_with_permission() + runs each scenario as a separate subtest with clear output: + + test_bulk_import_objects_with_permission (scenario=default) ... ok + test_bulk_import_objects_with_permission (scenario=with_optional_fields) ... ok """ + csv_data = () - def _get_csv_data(self): - return '\n'.join(self.csv_data) + def get_scenarios(self): + return self.csv_data.keys() if isinstance(self.csv_data, dict) else ['default'] + + def _get_csv_data(self, scenario_name='default'): + """ + Get CSV data for testing. Supports both tuple/list and dictionary formats. + """ + if isinstance(self.csv_data, dict): + if scenario_name not in self.csv_data: + available = ', '.join(self.csv_data.keys()) + raise ValueError(f"Scenario '{scenario_name}' not found in csv_data. Available: {available}") + return '\n'.join(self.csv_data[scenario_name]) + elif isinstance(self.csv_data, (tuple, list)): + return '\n'.join(self.csv_data) + else: + raise TypeError(f'csv_data must be a tuple, list, or dictionary, got {type(self.csv_data)}') def _get_update_csv_data(self): return self.csv_update_data, '\n'.join(self.csv_update_data) @@ -620,10 +659,36 @@ class ViewTestCases: self.assertHttpStatus(response, 403) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[]) - def test_bulk_import_objects_with_permission(self): + def test_bulk_import_objects_with_permission(self, post_import_callback=None): + # Assign model-level permission once for all scenarios + obj_perm = ObjectPermission(name='Test permission', actions=['add']) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model)) + + # Try GET with model-level permission (only once) + self.assertHttpStatus(self.client.get(self._get_url('bulk_import')), 200) + + # Test each scenario + for scenario_name in self.get_scenarios(): + with self.cleanupSubTest(scenario=scenario_name): + self._test_bulk_import_with_permission_scenario(scenario_name) + + if post_import_callback: + post_import_callback(scenario_name) + + def _test_bulk_import_with_permission_scenario(self, scenario_name): + """ + Helper method to test a single bulk import scenario. + """ initial_count = self._get_queryset().count() + + # Get CSV data for this scenario + scenario_data = self._get_csv_data(scenario_name) + expected_new_objects = len(scenario_data.splitlines()) - 1 + data = { - 'data': self._get_csv_data(), + 'data': scenario_data, 'format': ImportFormatChoices.CSV, 'csv_delimiter': CSVDelimiterChoices.AUTO, } @@ -632,33 +697,24 @@ class ViewTestCases: if issubclass(self.model, ChangeLoggingMixin): data['changelog_message'] = get_random_string(10) - # Assign model-level permission - obj_perm = ObjectPermission( - name='Test permission', - actions=['add'] - ) - obj_perm.save() - obj_perm.users.add(self.user) - obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model)) - - # Try GET with model-level permission - self.assertHttpStatus(self.client.get(self._get_url('bulk_import')), 200) - # Test POST with permission response = self.client.post(self._get_url('bulk_import'), data) self.assertHttpStatus(response, 302) - self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) + + # Verify object count increase + self.assertEqual(self._get_queryset().count(), initial_count + expected_new_objects) # Verify ObjectChange creation if issubclass(self.model, ChangeLoggingMixin): request_id = response.headers.get('X-Request-ID') - self.assertIsNotNone(request_id, "Unable to determine request ID from response") + self.assertIsNotNone(request_id, 'Unable to determine request ID from response') objectchanges = ObjectChange.objects.filter( changed_object_type=ContentType.objects.get_for_model(self.model), request_id=request_id, action=ObjectChangeActionChoices.ACTION_CREATE, ) - self.assertEqual(len(objectchanges), len(self.csv_data) - 1) + self.assertEqual(len(objectchanges), expected_new_objects) + for oc in objectchanges: self.assertEqual(oc.message, data['changelog_message']) @@ -701,35 +757,52 @@ class ViewTestCases: self.assertEqual(value, value) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[]) - def test_bulk_import_objects_with_constrained_permission(self): - initial_count = self._get_queryset().count() - data = { - 'data': self._get_csv_data(), - 'format': ImportFormatChoices.CSV, - 'csv_delimiter': CSVDelimiterChoices.AUTO, - } - - # Assign constrained permission + def test_bulk_import_objects_with_constrained_permission(self, post_import_callback=None): + # Assign constrained permission (deny all initially) obj_perm = ObjectPermission( name='Test permission', constraints={'pk': 0}, # Dummy permission to deny all - actions=['add'] + actions=['add'], ) obj_perm.save() obj_perm.users.add(self.user) obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model)) - # Attempt to import non-permitted objects + # Test each scenario with constrained permissions + for scenario_name in self.get_scenarios(): + with self.cleanupSubTest(scenario=scenario_name): + self._test_bulk_import_constrained_scenario(scenario_name, obj_perm) + + if post_import_callback: + post_import_callback(scenario_name) + + def _test_bulk_import_constrained_scenario(self, scenario_name, obj_perm): + """ + Helper method to test a single bulk import scenario with constrained permissions. + """ + initial_count = self._get_queryset().count() + + # Get CSV data for this scenario + scenario_data = self._get_csv_data(scenario_name) + expected_new_objects = len(scenario_data.splitlines()) - 1 + + data = { + 'data': scenario_data, + 'format': ImportFormatChoices.CSV, + 'csv_delimiter': CSVDelimiterChoices.AUTO, + } + + # Attempt to import non-permitted objects (should fail) self.assertHttpStatus(self.client.post(self._get_url('bulk_import'), data), 200) self.assertEqual(self._get_queryset().count(), initial_count) - # Update permission constraints + # Update permission constraints to allow all obj_perm.constraints = {'pk__gt': 0} # Dummy permission to allow all obj_perm.save() - # Import permitted objects + # Import permitted objects (should succeed) self.assertHttpStatus(self.client.post(self._get_url('bulk_import'), data), 302) - self.assertEqual(self._get_queryset().count(), initial_count + len(self.csv_data) - 1) + self.assertEqual(self._get_queryset().count(), initial_count + expected_new_objects) class BulkEditObjectsViewTestCase(ModelViewTestCase): """