From 1ee23ba6fa206668186baf8a0e3d2c140f85b333 Mon Sep 17 00:00:00 2001 From: Jeremy Stretch Date: Thu, 2 Oct 2025 15:04:29 -0400 Subject: [PATCH] Initial work on #20210 --- contrib/openapi.json | 235 ++++++++++++++++-- docs/integrations/rest-api.md | 23 +- netbox/account/tables.py | 3 +- netbox/account/views.py | 2 +- netbox/core/tests/test_api.py | 4 +- netbox/netbox/api/authentication.py | 108 +++++--- netbox/netbox/configuration_testing.py | 4 + netbox/netbox/settings.py | 8 + netbox/netbox/tests/test_authentication.py | 124 +++++++-- netbox/templates/users/token.html | 19 +- netbox/users/api/serializers_/tokens.py | 28 +-- netbox/users/choices.py | 17 ++ netbox/users/constants.py | 4 + netbox/users/filtersets.py | 2 +- netbox/users/forms/bulk_import.py | 15 +- netbox/users/forms/filtersets.py | 7 +- netbox/users/forms/model_forms.py | 35 ++- .../users/migrations/0014_users_token_v2.py | 65 +++++ netbox/users/models/tokens.py | 140 +++++++++-- netbox/users/tables.py | 3 +- netbox/users/tests/test_api.py | 6 +- netbox/users/tests/test_filtersets.py | 30 ++- netbox/users/tests/test_views.py | 31 +-- netbox/users/utils.py | 17 ++ netbox/utilities/testing/api.py | 5 +- netbox/utilities/testing/views.py | 24 +- 26 files changed, 787 insertions(+), 172 deletions(-) create mode 100644 netbox/users/choices.py create mode 100644 netbox/users/migrations/0014_users_token_v2.py diff --git a/contrib/openapi.json b/contrib/openapi.json index 839aba0b4d..3618a36afb 100644 --- a/contrib/openapi.json +++ b/contrib/openapi.json @@ -166111,6 +166111,91 @@ "type": "string" } }, + { + "in": "query", + "name": "pepper", + "schema": { + "type": "array", + "items": { + "type": "integer", + "format": "int32" + } + }, + "explode": true, + "style": "form" + }, + { + "in": "query", + "name": "pepper__empty", + "schema": { + "type": "boolean" + } + }, + { + "in": "query", + "name": "pepper__gt", + "schema": { + "type": "array", + "items": { + "type": "integer", + "format": "int32" + } + }, + "explode": true, + "style": "form" + }, + { + "in": "query", + "name": "pepper__gte", + "schema": { + "type": "array", + "items": { + "type": "integer", + "format": "int32" + } + }, + "explode": true, + "style": "form" + }, + { + "in": "query", + "name": "pepper__lt", + "schema": { + "type": "array", + "items": { + "type": "integer", + "format": "int32" + } + }, + "explode": true, + "style": "form" + }, + { + "in": "query", + "name": "pepper__lte", + "schema": { + "type": "array", + "items": { + "type": "integer", + "format": "int32" + } + }, + "explode": true, + "style": "form" + }, + { + "in": "query", + "name": "pepper__n", + "schema": { + "type": "array", + "items": { + "type": "integer", + "format": "int32" + } + }, + "explode": true, + "style": "form" + }, { "in": "query", "name": "q", @@ -166171,6 +166256,19 @@ "explode": true, "style": "form" }, + { + "in": "query", + "name": "version", + "schema": { + "type": "integer", + "x-spec-enum-id": "b5df70f0bffd12cb", + "enum": [ + 1, + 2 + ] + }, + "description": "* `1` - v1\n* `2` - v2" + }, { "in": "query", "name": "write_enabled", @@ -228068,6 +228166,17 @@ "type": "object", "description": "Extends the built-in ModelSerializer to enforce calling full_clean() on a copy of the associated instance during\nvalidation. (DRF does not do this by default; see https://github.com/encode/django-rest-framework/issues/3144)", "properties": { + "version": { + "enum": [ + 1, + 2 + ], + "type": "integer", + "description": "* `1` - v1\n* `2` - v2", + "x-spec-enum-id": "b5df70f0bffd12cb", + "minimum": 0, + "maximum": 32767 + }, "user": { "oneOf": [ { @@ -228078,6 +228187,10 @@ } ] }, + "description": { + "type": "string", + "maxLength": 200 + }, "expires": { "type": "string", "format": "date-time", @@ -228088,19 +228201,20 @@ "format": "date-time", "nullable": true }, - "key": { - "type": "string", - "writeOnly": true, - "maxLength": 40, - "minLength": 40 - }, "write_enabled": { "type": "boolean", "description": "Permit create/update/delete operations using this key" }, - "description": { + "pepper": { + "type": "integer", + "maximum": 32767, + "minimum": 0, + "nullable": true, + "description": "ID of the cryptographic pepper used to hash the token (v2 only)" + }, + "token": { "type": "string", - "maxLength": 200 + "minLength": 1 } } }, @@ -244302,9 +244416,30 @@ "type": "string", "readOnly": true }, + "version": { + "enum": [ + 1, + 2 + ], + "type": "integer", + "description": "* `1` - v1\n* `2` - v2", + "x-spec-enum-id": "b5df70f0bffd12cb", + "minimum": 0, + "maximum": 32767 + }, + "key": { + "type": "string", + "readOnly": true, + "nullable": true, + "description": "v2 token identification key" + }, "user": { "$ref": "#/components/schemas/BriefUser" }, + "description": { + "type": "string", + "maxLength": 200 + }, "created": { "type": "string", "format": "date-time", @@ -244324,9 +244459,15 @@ "type": "boolean", "description": "Permit create/update/delete operations using this key" }, - "description": { - "type": "string", - "maxLength": 200 + "pepper": { + "type": "integer", + "maximum": 32767, + "minimum": 0, + "nullable": true, + "description": "ID of the cryptographic pepper used to hash the token (v2 only)" + }, + "token": { + "type": "string" } }, "required": [ @@ -244334,6 +244475,7 @@ "display", "display_url", "id", + "key", "url", "user" ] @@ -244360,6 +244502,17 @@ "type": "string", "readOnly": true }, + "version": { + "enum": [ + 1, + 2 + ], + "type": "integer", + "description": "* `1` - v1\n* `2` - v2", + "x-spec-enum-id": "b5df70f0bffd12cb", + "minimum": 0, + "maximum": 32767 + }, "user": { "allOf": [ { @@ -244368,6 +244521,10 @@ ], "readOnly": true }, + "key": { + "type": "string", + "readOnly": true + }, "created": { "type": "string", "format": "date-time", @@ -244383,10 +244540,6 @@ "format": "date-time", "readOnly": true }, - "key": { - "type": "string", - "readOnly": true - }, "write_enabled": { "type": "boolean", "description": "Permit create/update/delete operations using this key" @@ -244394,6 +244547,9 @@ "description": { "type": "string", "maxLength": 200 + }, + "token": { + "type": "string" } }, "required": [ @@ -244411,6 +244567,17 @@ "type": "object", "description": "Extends the built-in ModelSerializer to enforce calling full_clean() on a copy of the associated instance during\nvalidation. (DRF does not do this by default; see https://github.com/encode/django-rest-framework/issues/3144)", "properties": { + "version": { + "enum": [ + 1, + 2 + ], + "type": "integer", + "description": "* `1` - v1\n* `2` - v2", + "x-spec-enum-id": "b5df70f0bffd12cb", + "minimum": 0, + "maximum": 32767 + }, "expires": { "type": "string", "format": "date-time", @@ -244433,6 +244600,10 @@ "type": "string", "writeOnly": true, "minLength": 1 + }, + "token": { + "type": "string", + "minLength": 1 } }, "required": [ @@ -244444,6 +244615,17 @@ "type": "object", "description": "Extends the built-in ModelSerializer to enforce calling full_clean() on a copy of the associated instance during\nvalidation. (DRF does not do this by default; see https://github.com/encode/django-rest-framework/issues/3144)", "properties": { + "version": { + "enum": [ + 1, + 2 + ], + "type": "integer", + "description": "* `1` - v1\n* `2` - v2", + "x-spec-enum-id": "b5df70f0bffd12cb", + "minimum": 0, + "maximum": 32767 + }, "user": { "oneOf": [ { @@ -244454,6 +244636,10 @@ } ] }, + "description": { + "type": "string", + "maxLength": 200 + }, "expires": { "type": "string", "format": "date-time", @@ -244464,19 +244650,20 @@ "format": "date-time", "nullable": true }, - "key": { - "type": "string", - "writeOnly": true, - "maxLength": 40, - "minLength": 40 - }, "write_enabled": { "type": "boolean", "description": "Permit create/update/delete operations using this key" }, - "description": { + "pepper": { + "type": "integer", + "maximum": 32767, + "minimum": 0, + "nullable": true, + "description": "ID of the cryptographic pepper used to hash the token (v2 only)" + }, + "token": { "type": "string", - "maxLength": 200 + "minLength": 1 } }, "required": [ @@ -256709,7 +256896,7 @@ "type": "apiKey", "in": "header", "name": "Authorization", - "description": "Token-based authentication with required prefix \"Token\"" + "description": "Set `Token ` (v1) or `Bearer ` (v2) in the Authorization header" } } }, diff --git a/docs/integrations/rest-api.md b/docs/integrations/rest-api.md index 47fb65494b..9cecbca3d6 100644 --- a/docs/integrations/rest-api.md +++ b/docs/integrations/rest-api.md @@ -657,14 +657,17 @@ A token is a unique identifier mapped to a NetBox user account. Each user may ha By default, all users can create and manage their own REST API tokens under the user control panel in the UI or via the REST API. This ability can be disabled by overriding the [`DEFAULT_PERMISSIONS`](../configuration/security.md#default_permissions) configuration parameter. -Each token contains a 160-bit key represented as 40 hexadecimal characters. When creating a token, you'll typically leave the key field blank so that a random key will be automatically generated. However, NetBox allows you to specify a key in case you need to restore a previously deleted token to operation. +!!! info "Token Versions" + Beginning with NetBox v4.5, two types of API token are supported, denoted as v1 and v2. Users are strongly encouraged to create only v2 tokens, as these provide much stronger security than v1 tokens. Support for v1 tokens will be removed in a future NetBox release. + +When creating a token, you'll typically leave the key field blank so that a random key will be automatically generated. However, NetBox allows you to specify a key in case you need to restore a previously deleted token to operation. Additionally, a token can be set to expire at a specific time. This can be useful if an external client needs to be granted temporary access to NetBox. !!! info "Restricting Token Retrieval" The ability to retrieve the key value of a previously-created API token can be restricted by disabling the [`ALLOW_TOKEN_RETRIEVAL`](../configuration/security.md#allow_token_retrieval) configuration parameter. -### Restricting Write Operations +#### Restricting Write Operations By default, a token can be used to perform all actions via the API that a user would be permitted to do via the web UI. Deselecting the "write enabled" option will restrict API requests made with the token to read operations (e.g. GET) only. @@ -681,10 +684,22 @@ It is possible to provision authentication tokens for other users via the REST A ### Authenticating to the API -An authentication token is attached to a request by setting the `Authorization` header to the string `Token` followed by a space and the user's token: +An authentication token is included with a request in its `Authorization` header. The format of the header value depends on the version of token in use. v2 tokens use the following form, concatenating the token's key and plaintext value with a period: ``` -$ curl -H "Authorization: Token $TOKEN" \ +Authorization: Bearer . +``` + +v1 tokens use the prefix `Token` rather than `Bearer`, and include only the token plaintext. (v1 tokens do not have a key.) + +``` +Authorization: Token +``` + +Below is an example REST API request utilizing a v2 token. + +``` +$ curl -H "Authorization: Bearer ." \ -H "Accept: application/json; indent=4" \ https://netbox/api/dcim/sites/ { diff --git a/netbox/account/tables.py b/netbox/account/tables.py index bcc0a0ccd4..0b15a8a13a 100644 --- a/netbox/account/tables.py +++ b/netbox/account/tables.py @@ -53,5 +53,6 @@ class UserTokenTable(NetBoxTable): class Meta(NetBoxTable.Meta): model = UserToken fields = ( - 'pk', 'id', 'key', 'description', 'write_enabled', 'created', 'expires', 'last_used', 'allowed_ips', + 'pk', 'id', 'version', 'key', 'pepper', 'description', 'write_enabled', 'created', 'expires', 'last_used', + 'allowed_ips', ) diff --git a/netbox/account/views.py b/netbox/account/views.py index f5ef534ce4..b513f04e42 100644 --- a/netbox/account/views.py +++ b/netbox/account/views.py @@ -343,7 +343,7 @@ class UserTokenView(LoginRequiredMixin, View): def get(self, request, pk): token = get_object_or_404(UserToken.objects.filter(user=request.user), pk=pk) - key = token.key if settings.ALLOW_TOKEN_RETRIEVAL else None + key = token.key if token.v2 or settings.ALLOW_TOKEN_RETRIEVAL else None return render(request, 'account/token.html', { 'object': token, diff --git a/netbox/core/tests/test_api.py b/netbox/core/tests/test_api.py index 4d612e1578..46070c4b49 100644 --- a/netbox/core/tests/test_api.py +++ b/netbox/core/tests/test_api.py @@ -135,8 +135,8 @@ class BackgroundTaskTestCase(TestCase): """ # Create the test user and assign permissions self.user = User.objects.create_user(username='testuser', is_active=True) - self.token = Token.objects.create(user=self.user) - self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.key}'} + self.token = Token.objects.create(version=1, user=self.user) + self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.token}'} # Clear all queues prior to running each test get_queue('default').connection.flushall() diff --git a/netbox/netbox/api/authentication.py b/netbox/netbox/api/authentication.py index f0bd5fd27d..9c73259bf3 100644 --- a/netbox/netbox/api/authentication.py +++ b/netbox/netbox/api/authentication.py @@ -2,47 +2,86 @@ import logging from django.conf import settings from django.utils import timezone -from rest_framework import authentication, exceptions +from drf_spectacular.extensions import OpenApiAuthenticationExtension +from rest_framework import exceptions +from rest_framework.authentication import BaseAuthentication, get_authorization_header from rest_framework.permissions import BasePermission, DjangoObjectPermissions, SAFE_METHODS from netbox.config import get_config from users.models import Token from utilities.request import get_client_ip +V1_KEYWORD = 'token' +V2_KEYWORD = 'bearer' -class TokenAuthentication(authentication.TokenAuthentication): + +class TokenAuthentication(BaseAuthentication): """ A custom authentication scheme which enforces Token expiration times and source IP restrictions. """ model = Token def authenticate(self, request): - result = super().authenticate(request) + if not (auth := get_authorization_header(request).split()): + return - if result: - token = result[1] + # Check for Token/Bearer keyword in HTTP header value & infer token version + if auth[0].lower() == V1_KEYWORD.lower().encode(): + version = 1 + elif auth[0].lower() == V2_KEYWORD.lower().encode(): + version = 2 + else: + return - # Enforce source IP restrictions (if any) set on the token - if token.allowed_ips: - client_ip = get_client_ip(request) - if client_ip is None: - raise exceptions.AuthenticationFailed( - "Client IP address could not be determined for validation. Check that the HTTP server is " - "correctly configured to pass the required header(s)." - ) - if not token.validate_client_ip(client_ip): - raise exceptions.AuthenticationFailed( - f"Source IP {client_ip} is not permitted to authenticate using this token." - ) - - return result - - def authenticate_credentials(self, key): - model = self.get_model() + # Extract token key from authorization header + if len(auth) != 2: + raise exceptions.AuthenticationFailed("Invalid authorization header: Error parsing token") try: - token = model.objects.prefetch_related('user').get(key=key) - except model.DoesNotExist: - raise exceptions.AuthenticationFailed("Invalid token") + auth_value = auth[1].decode() + except UnicodeError: + raise exceptions.AuthenticationFailed("Invalid authorization header: Token contains invalid characters") + + # Look for a matching token in the database + if version == 1: + key, plaintext = None, auth_value + else: + try: + key, plaintext = auth_value.split('.', 1) + except ValueError: + raise exceptions.AuthenticationFailed( + "Invalid authorization header: Could not parse key from v2 token. Did you mean to use 'Token' " + "instead of 'Bearer'?" + ) + try: + qs = Token.objects.prefetch_related('user') + if version == 1: + # Fetch v1 token by querying plaintext value directly + token = qs.get(version=version, plaintext=plaintext) + else: + # Fetch v2 token by key, then validate the plaintext + token = qs.get(version=version, key=key) + if not token.validate(plaintext): + # TODO: Consider security implications of enabling validation of token key without valid plaintext + raise exceptions.AuthenticationFailed(f"Validation failed for v2 token {key}") + except Token.DoesNotExist: + raise exceptions.AuthenticationFailed(f"Invalid v{version} token") + + # Enforce source IP restrictions (if any) set on the token + if token.allowed_ips: + client_ip = get_client_ip(request) + if client_ip is None: + raise exceptions.AuthenticationFailed( + "Client IP address could not be determined for validation. Check that the HTTP server is " + "correctly configured to pass the required header(s)." + ) + if not token.validate_client_ip(client_ip): + raise exceptions.AuthenticationFailed( + f"Source IP {client_ip} is not permitted to authenticate using this token." + ) + + # Enforce the Token's expiration time, if one has been set. + if token.is_expired: + raise exceptions.AuthenticationFailed("Token expired") # Update last used, but only once per minute at most. This reduces write load on the database if not token.last_used or (timezone.now() - token.last_used).total_seconds() > 60: @@ -54,11 +93,8 @@ class TokenAuthentication(authentication.TokenAuthentication): else: Token.objects.filter(pk=token.pk).update(last_used=timezone.now()) - # Enforce the Token's expiration time, if one has been set. - if token.is_expired: - raise exceptions.AuthenticationFailed("Token expired") - user = token.user + # When LDAP authentication is active try to load user data from LDAP directory if 'netbox.authentication.LDAPBackend' in settings.REMOTE_AUTH_BACKEND: from netbox.authentication import LDAPBackend @@ -132,3 +168,17 @@ class IsAuthenticatedOrLoginNotRequired(BasePermission): if not settings.LOGIN_REQUIRED: return True return request.user.is_authenticated + + +class TokenScheme(OpenApiAuthenticationExtension): + target_class = 'netbox.api.authentication.TokenAuthentication' + name = 'tokenAuth' + match_subclasses = True + + def get_security_definition(self, auto_schema): + return { + 'type': 'apiKey', + 'in': 'header', + 'name': 'Authorization', + 'description': 'Set `Token ` (v1) or `Bearer ` (v2) in the Authorization header', + } diff --git a/netbox/netbox/configuration_testing.py b/netbox/netbox/configuration_testing.py index 52973e94de..bbe6dbada1 100644 --- a/netbox/netbox/configuration_testing.py +++ b/netbox/netbox/configuration_testing.py @@ -45,6 +45,10 @@ DEFAULT_PERMISSIONS = {} ALLOW_TOKEN_RETRIEVAL = True +API_TOKEN_PEPPERS = { + 0: 'TEST-VALUE-DO-NOT-USE', +} + LOGGING = { 'version': 1, 'disable_existing_loggers': True diff --git a/netbox/netbox/settings.py b/netbox/netbox/settings.py index c0d7f92308..a912c2d6e8 100644 --- a/netbox/netbox/settings.py +++ b/netbox/netbox/settings.py @@ -65,6 +65,7 @@ elif hasattr(configuration, 'DATABASE') and hasattr(configuration, 'DATABASES'): ADMINS = getattr(configuration, 'ADMINS', []) ALLOW_TOKEN_RETRIEVAL = getattr(configuration, 'ALLOW_TOKEN_RETRIEVAL', False) ALLOWED_HOSTS = getattr(configuration, 'ALLOWED_HOSTS') # Required +API_TOKEN_PEPPERS = getattr(configuration, 'API_TOKEN_PEPPERS', {}) AUTH_PASSWORD_VALIDATORS = getattr(configuration, 'AUTH_PASSWORD_VALIDATORS', [ { "NAME": "django.contrib.auth.password_validation.MinimumLengthValidator", @@ -215,6 +216,13 @@ if len(SECRET_KEY) < 50: f" python {BASE_DIR}/generate_secret_key.py" ) +# Validate API token peppers +for key in API_TOKEN_PEPPERS: + if type(key) is not int: + raise ImproperlyConfigured(f"Invalid API_TOKEN_PEPPERS key: {key}. All keys must be integers.") +if not API_TOKEN_PEPPERS: + warnings.warn("API_TOKEN_PEPPERS is not defined. v2 API tokens cannot be used.") + # Validate update repo URL and timeout if RELEASE_CHECK_URL: try: diff --git a/netbox/netbox/tests/test_authentication.py b/netbox/netbox/tests/test_authentication.py index 9eb21661dd..e30ae87000 100644 --- a/netbox/netbox/tests/test_authentication.py +++ b/netbox/netbox/tests/test_authentication.py @@ -16,35 +16,79 @@ from utilities.testing.api import APITestCase class TokenAuthenticationTestCase(APITestCase): @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*']) - def test_token_authentication(self): - url = reverse('dcim-api:site-list') - + def test_no_token(self): # Request without a token should return a 403 - response = self.client.get(url) + response = self.client.get(reverse('dcim-api:site-list')) self.assertEqual(response.status_code, 403) + @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*']) + def test_v1_token_valid(self): + # Create a v1 token + token = Token.objects.create(version=1, user=self.user) + # Valid token should return a 200 - token = Token.objects.create(user=self.user) - response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}') - self.assertEqual(response.status_code, 200) + header = f'Token {token.token}' + response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header) + self.assertEqual(response.status_code, 200, response.data) # Check that the token's last_used time has been updated token.refresh_from_db() self.assertIsNotNone(token.last_used) + @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*']) + def test_v1_token_invalid(self): + # Invalid token should return a 403 + header = 'Token XXXXXXXXXX' + response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header) + self.assertEqual(response.status_code, 403) + self.assertEqual(response.data['detail'], "Invalid v1 token") + + @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*']) + def test_v2_token_valid(self): + # Create a v2 token + token = Token.objects.create(version=2, user=self.user) + + # Valid token should return a 200 + header = f'Bearer {token.key}.{token.token}' + response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header) + self.assertEqual(response.status_code, 200, response.data) + + # Check that the token's last_used time has been updated + token.refresh_from_db() + self.assertIsNotNone(token.last_used) + + @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*']) + def test_v2_token_invalid(self): + # Invalid token should return a 403 + header = 'Bearer XXXXXXXXXX.XXXXXXXXXX' + response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header) + self.assertEqual(response.status_code, 403) + self.assertEqual(response.data['detail'], "Invalid v2 token") + @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*']) def test_token_expiration(self): url = reverse('dcim-api:site-list') - # Request without a non-expired token should succeed - token = Token.objects.create(user=self.user) - response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}') + # Create v1 & v2 tokens + future = datetime.datetime(2100, 1, 1, tzinfo=datetime.timezone.utc) + token1 = Token.objects.create(version=1, user=self.user, expires=future) + token2 = Token.objects.create(version=2, user=self.user, expires=future) + + # Request with a non-expired token should succeed + response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token1.token}') + self.assertEqual(response.status_code, 200) + response = self.client.get(url, HTTP_AUTHORIZATION=f'Bearer {token2.key}.{token2.token}') self.assertEqual(response.status_code, 200) # Request with an expired token should fail - token.expires = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc) - token.save() - response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}') + past = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc) + token1.expires = past + token1.save() + token2.expires = past + token2.save() + response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token1.key}') + self.assertEqual(response.status_code, 403) + response = self.client.get(url, HTTP_AUTHORIZATION=f'Bearer {token2.key}') self.assertEqual(response.status_code, 403) @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*']) @@ -55,28 +99,60 @@ class TokenAuthenticationTestCase(APITestCase): 'slug': 'site-1', } + # Create v1 & v2 tokens + token1 = Token.objects.create(version=1, user=self.user, write_enabled=False) + token2 = Token.objects.create(version=2, user=self.user, write_enabled=False) + # Request with a write-disabled token should fail - token = Token.objects.create(user=self.user, write_enabled=False) - response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token.key}') + response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token1.token}') + self.assertEqual(response.status_code, 403) + response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Bearer {token2.key}.{token2.token}') self.assertEqual(response.status_code, 403) # Request with a write-enabled token should succeed - token.write_enabled = True - token.save() - response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token.key}') + token1.write_enabled = True + token1.save() + token2.write_enabled = True + token2.save() + response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token1.token}') + self.assertEqual(response.status_code, 403) + response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Bearer {token2.key}.{token2.token}') self.assertEqual(response.status_code, 403) @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*']) def test_token_allowed_ips(self): url = reverse('dcim-api:site-list') + # Create v1 & v2 tokens + token1 = Token.objects.create(version=1, user=self.user, allowed_ips=['192.0.2.0/24']) + token2 = Token.objects.create(version=2, user=self.user, allowed_ips=['192.0.2.0/24']) + # Request from a non-allowed client IP should fail - token = Token.objects.create(user=self.user, allowed_ips=['192.0.2.0/24']) - response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}', REMOTE_ADDR='127.0.0.1') + response = self.client.get( + url, + HTTP_AUTHORIZATION=f'Token {token1.token}', + REMOTE_ADDR='127.0.0.1' + ) + self.assertEqual(response.status_code, 403) + response = self.client.get( + url, + HTTP_AUTHORIZATION=f'Bearer {token2.key}.{token2.token}', + REMOTE_ADDR='127.0.0.1' + ) self.assertEqual(response.status_code, 403) - # Request with an expired token should fail - response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}', REMOTE_ADDR='192.0.2.1') + # Request from an allowed client IP should succeed + response = self.client.get( + url, + HTTP_AUTHORIZATION=f'Token {token1.token}', + REMOTE_ADDR='192.0.2.1' + ) + self.assertEqual(response.status_code, 200) + response = self.client.get( + url, + HTTP_AUTHORIZATION=f'Bearer {token2.key}.{token2.token}', + REMOTE_ADDR='192.0.2.1' + ) self.assertEqual(response.status_code, 200) @@ -426,8 +502,8 @@ class ObjectPermissionAPIViewTestCase(TestCase): Create a test user and token for API calls. """ self.user = User.objects.create(username='testuser') - self.token = Token.objects.create(user=self.user) - self.header = {'HTTP_AUTHORIZATION': 'Token {}'.format(self.token.key)} + self.token = Token.objects.create(version=1, user=self.user) + self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.token}'} @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) def test_get_object(self): diff --git a/netbox/templates/users/token.html b/netbox/templates/users/token.html index 674476d51e..039d027592 100644 --- a/netbox/templates/users/token.html +++ b/netbox/templates/users/token.html @@ -14,9 +14,24 @@

{% trans "Token" %}

- - + + + {% if object.version == 1 %} + + + + + {% else %} + + + + + + + + + {% endif %}
{% trans "Key" %}{% if settings.ALLOW_TOKEN_RETRIEVAL %}{{ object.key }}{% else %}{{ object.partial }}{% endif %}{% trans "Version" %}{{ object.version }}
{% trans "Token" %}{{ object.partial }}
{% trans "Key" %}{{ object }}
{% trans "Pepper" %}{{ object.pepper }}
{% trans "User" %} diff --git a/netbox/users/api/serializers_/tokens.py b/netbox/users/api/serializers_/tokens.py index 150291ee6d..f7da4dd13f 100644 --- a/netbox/users/api/serializers_/tokens.py +++ b/netbox/users/api/serializers_/tokens.py @@ -1,4 +1,3 @@ -from django.conf import settings from django.contrib.auth import authenticate from rest_framework import serializers from rest_framework.exceptions import AuthenticationFailed, PermissionDenied @@ -15,14 +14,13 @@ __all__ = ( class TokenSerializer(ValidatedModelSerializer): - key = serializers.CharField( - min_length=40, - max_length=40, - allow_blank=True, + token = serializers.CharField( required=False, - write_only=not settings.ALLOW_TOKEN_RETRIEVAL + default=Token.generate, + ) + user = UserSerializer( + nested=True ) - user = UserSerializer(nested=True) allowed_ips = serializers.ListField( child=IPNetworkSerializer(), required=False, @@ -33,15 +31,11 @@ class TokenSerializer(ValidatedModelSerializer): class Meta: model = Token fields = ( - 'id', 'url', 'display_url', 'display', 'user', 'created', 'expires', 'last_used', 'key', 'write_enabled', - 'description', 'allowed_ips', + 'id', 'url', 'display_url', 'display', 'version', 'key', 'user', 'description', 'created', 'expires', + 'last_used', 'write_enabled', 'pepper', 'allowed_ips', 'token', ) - brief_fields = ('id', 'url', 'display', 'key', 'write_enabled', 'description') - - def to_internal_value(self, data): - if not getattr(self.instance, 'key', None) and 'key' not in data: - data['key'] = Token.generate_key() - return super().to_internal_value(data) + read_only_fields = ('key',) + brief_fields = ('id', 'url', 'display', 'version', 'key', 'write_enabled', 'description') def validate(self, data): @@ -75,8 +69,8 @@ class TokenProvisionSerializer(TokenSerializer): class Meta: model = Token fields = ( - 'id', 'url', 'display_url', 'display', 'user', 'created', 'expires', 'last_used', 'key', 'write_enabled', - 'description', 'allowed_ips', 'username', 'password', + 'id', 'url', 'display_url', 'display', 'version', 'user', 'key', 'created', 'expires', 'last_used', 'key', + 'write_enabled', 'description', 'allowed_ips', 'username', 'password', 'token', ) def validate(self, data): diff --git a/netbox/users/choices.py b/netbox/users/choices.py new file mode 100644 index 0000000000..547633c4e9 --- /dev/null +++ b/netbox/users/choices.py @@ -0,0 +1,17 @@ +from django.utils.translation import gettext_lazy as _ + +from utilities.choices import ChoiceSet + +__all__ = ( + 'TokenVersionChoices', +) + + +class TokenVersionChoices(ChoiceSet): + V1 = 1 + V2 = 2 + + CHOICES = [ + (V1, _('v1')), + (V2, _('v2')), + ] diff --git a/netbox/users/constants.py b/netbox/users/constants.py index e92623c820..b02c482e05 100644 --- a/netbox/users/constants.py +++ b/netbox/users/constants.py @@ -1,3 +1,5 @@ +import string + from django.db.models import Q @@ -7,3 +9,5 @@ OBJECTPERMISSION_OBJECT_TYPES = Q( ) CONSTRAINT_TOKEN_USER = '$user' + +TOKEN_CHARSET = string.ascii_letters + string.digits diff --git a/netbox/users/filtersets.py b/netbox/users/filtersets.py index 4e15104107..3cbef8b1ab 100644 --- a/netbox/users/filtersets.py +++ b/netbox/users/filtersets.py @@ -133,7 +133,7 @@ class TokenFilterSet(BaseFilterSet): class Meta: model = Token - fields = ('id', 'key', 'write_enabled', 'description', 'last_used') + fields = ('id', 'version', 'key', 'pepper', 'write_enabled', 'description', 'last_used') def search(self, queryset, name, value): if not value.strip(): diff --git a/netbox/users/forms/bulk_import.py b/netbox/users/forms/bulk_import.py index f478dedbff..bdda61a44d 100644 --- a/netbox/users/forms/bulk_import.py +++ b/netbox/users/forms/bulk_import.py @@ -1,6 +1,7 @@ from django import forms from django.utils.translation import gettext as _ from users.models import * +from users.choices import TokenVersionChoices from utilities.forms import CSVModelForm @@ -34,12 +35,18 @@ class UserImportForm(CSVModelForm): class TokenImportForm(CSVModelForm): - key = forms.CharField( - label=_('Key'), + version = forms.ChoiceField( + choices=TokenVersionChoices, + initial=TokenVersionChoices.V2, required=False, - help_text=_("If no key is provided, one will be generated automatically.") + help_text=_("Specify version 1 or 2 (v2 will be used by default)") + ) + token = forms.CharField( + label=_('Token'), + required=False, + help_text=_("If no token is provided, one will be generated automatically.") ) class Meta: model = Token - fields = ('user', 'key', 'write_enabled', 'expires', 'description',) + fields = ('user', 'version', 'token', 'write_enabled', 'expires', 'description',) diff --git a/netbox/users/forms/filtersets.py b/netbox/users/forms/filtersets.py index 61e55949c6..96f5a48d2e 100644 --- a/netbox/users/forms/filtersets.py +++ b/netbox/users/forms/filtersets.py @@ -3,6 +3,7 @@ from django.utils.translation import gettext_lazy as _ from netbox.forms import NetBoxModelFilterSetForm from netbox.forms.mixins import SavedFiltersMixin +from users.choices import TokenVersionChoices from users.models import Group, ObjectPermission, Token, User from utilities.forms import BOOLEAN_WITH_BLANK_CHOICES, FilterForm from utilities.forms.fields import DynamicModelMultipleChoiceField @@ -110,7 +111,11 @@ class TokenFilterForm(SavedFiltersMixin, FilterForm): model = Token fieldsets = ( FieldSet('q', 'filter_id',), - FieldSet('user_id', 'write_enabled', 'expires', 'last_used', name=_('Token')), + FieldSet('version', 'user_id', 'write_enabled', 'expires', 'last_used', name=_('Token')), + ) + version = forms.ChoiceField( + choices=TokenVersionChoices, + required=False, ) user_id = DynamicModelMultipleChoiceField( queryset=User.objects.all(), diff --git a/netbox/users/forms/model_forms.py b/netbox/users/forms/model_forms.py index 4f4e2fd439..9b6c8aaba3 100644 --- a/netbox/users/forms/model_forms.py +++ b/netbox/users/forms/model_forms.py @@ -12,6 +12,7 @@ from core.models import ObjectType from ipam.formfields import IPNetworkFormField from ipam.validators import prefix_validator from netbox.preferences import PREFERENCES +from users.choices import TokenVersionChoices from users.constants import * from users.models import * from utilities.data import flatten_dict @@ -115,10 +116,10 @@ class UserConfigForm(forms.ModelForm, metaclass=UserConfigFormMetaclass): class UserTokenForm(forms.ModelForm): - key = forms.CharField( - label=_('Key'), + token = forms.CharField( + label=_('Token'), help_text=_( - 'Keys must be at least 40 characters in length. Be sure to record your key prior to ' + 'Tokens must be at least 40 characters in length. Be sure to record your key prior to ' 'submitting this form, as it may no longer be accessible once the token has been created.' ), widget=forms.TextInput( @@ -138,7 +139,7 @@ class UserTokenForm(forms.ModelForm): class Meta: model = Token fields = [ - 'key', 'write_enabled', 'expires', 'description', 'allowed_ips', + 'version', 'token', 'write_enabled', 'expires', 'description', 'allowed_ips', ] widgets = { 'expires': DateTimePicker(), @@ -147,13 +148,27 @@ class UserTokenForm(forms.ModelForm): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - # Omit the key field if token retrieval is not permitted - if self.instance.pk and not settings.ALLOW_TOKEN_RETRIEVAL: - del self.fields['key'] + if self.instance.pk: + # Disable the version & user fields for existing Tokens + self.fields['version'].disabled = True + self.fields['user'].disabled = True + + # Omit the key field when editing an existing token if token retrieval is not permitted + if self.instance.v1 and settings.ALLOW_TOKEN_RETRIEVAL: + self.fields['token'].initial = self.instance.key + else: + del self.fields['token'] # Generate an initial random key if none has been specified - if not self.instance.pk and not self.initial.get('key'): - self.initial['key'] = Token.generate_key() + if self.instance._state.adding and not self.initial.get('token'): + self.initial['version'] = TokenVersionChoices.V2 + self.initial['token'] = Token.generate() + + def save(self, commit=True): + if self.cleaned_data.get('token'): + self.instance.token = self.cleaned_data['token'] + + return super().save(commit=commit) class TokenForm(UserTokenForm): @@ -165,7 +180,7 @@ class TokenForm(UserTokenForm): class Meta: model = Token fields = [ - 'user', 'key', 'write_enabled', 'expires', 'description', 'allowed_ips', + 'version', 'token', 'user', 'write_enabled', 'expires', 'description', 'allowed_ips', ] widgets = { 'expires': DateTimePicker(), diff --git a/netbox/users/migrations/0014_users_token_v2.py b/netbox/users/migrations/0014_users_token_v2.py new file mode 100644 index 0000000000..9e18e4a728 --- /dev/null +++ b/netbox/users/migrations/0014_users_token_v2.py @@ -0,0 +1,65 @@ +import django.core.validators +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('users', '0013_user_remove_is_staff'), + ] + + operations = [ + # Rename the original key field to "plaintext" + migrations.RenameField( + model_name='token', + old_name='key', + new_name='plaintext', + ), + migrations.RunSQL( + sql="ALTER INDEX IF EXISTS users_token_key_820deccd_like RENAME TO users_token_plaintext_46c6f315_like", + ), + migrations.RunSQL( + sql="ALTER INDEX IF EXISTS users_token_key_key RENAME TO users_token_plaintext_key", + ), + # Make plaintext (formerly key) nullable for v2 tokens + migrations.AlterField( + model_name='token', + name='plaintext', + field=models.CharField( + max_length=40, + unique=True, + blank=True, + null=True, + validators=[django.core.validators.MinLengthValidator(40)] + ), + ), + # Add version field to distinguish v1 and v2 tokens + migrations.AddField( + model_name='token', + name='version', + field=models.PositiveSmallIntegerField(default=1), # Mark all existing Tokens as v1 + preserve_default=False, + ), + # Change the default version for new tokens to v2 + migrations.AlterField( + model_name='token', + name='version', + field=models.PositiveSmallIntegerField(default=2), + ), + # Add new key, pepper, and hmac_digest fields for v2 tokens + migrations.AddField( + model_name='token', + name='key', + field=models.CharField(blank=True, max_length=16, null=True, unique=True), + ), + migrations.AddField( + model_name='token', + name='pepper', + field=models.PositiveSmallIntegerField(blank=True, null=True), + ), + migrations.AddField( + model_name='token', + name='hmac_digest', + field=models.CharField(blank=True, max_length=64, null=True), + ), + ] diff --git a/netbox/users/models/tokens.py b/netbox/users/models/tokens.py index 3c1284bc9c..cf35c4e6ab 100644 --- a/netbox/users/models/tokens.py +++ b/netbox/users/models/tokens.py @@ -1,8 +1,12 @@ import binascii +import hashlib +import hmac +import random import os from django.conf import settings from django.contrib.postgres.fields import ArrayField +from django.core.exceptions import ValidationError from django.core.validators import MinLengthValidator from django.db import models from django.urls import reverse @@ -11,6 +15,9 @@ from django.utils.translation import gettext_lazy as _ from netaddr import IPNetwork from ipam.fields import IPNetworkField +from users.choices import TokenVersionChoices +from users.constants import TOKEN_CHARSET +from users.utils import get_current_pepper from utilities.querysets import RestrictedQuerySet __all__ = ( @@ -23,11 +30,21 @@ class Token(models.Model): An API token used for user authentication. This extends the stock model to allow each user to have multiple tokens. It also supports setting an expiration time and toggling write ability. """ + version = models.PositiveSmallIntegerField( + verbose_name=_('version'), + choices=TokenVersionChoices, + default=TokenVersionChoices.V2, + ) user = models.ForeignKey( to='users.User', on_delete=models.CASCADE, related_name='tokens' ) + description = models.CharField( + verbose_name=_('description'), + max_length=200, + blank=True + ) created = models.DateTimeField( verbose_name=_('created'), auto_now_add=True @@ -42,21 +59,40 @@ class Token(models.Model): blank=True, null=True ) - key = models.CharField( - verbose_name=_('key'), - max_length=40, - unique=True, - validators=[MinLengthValidator(40)] - ) write_enabled = models.BooleanField( verbose_name=_('write enabled'), default=True, help_text=_('Permit create/update/delete operations using this key') ) - description = models.CharField( - verbose_name=_('description'), - max_length=200, - blank=True + # For legacy v1 tokens, this field stores the plaintext 40-char token value. Not used for v2. + plaintext = models.CharField( + verbose_name=_('plaintext'), + max_length=40, + unique=True, + blank=True, + null=True, + validators=[MinLengthValidator(40)], + ) + key = models.CharField( + verbose_name=_('key'), + max_length=16, + unique=True, + blank=True, + null=True, + help_text=_('v2 token identification key'), + ) + pepper = models.PositiveSmallIntegerField( + verbose_name=_('pepper'), + blank=True, + null=True, + help_text=_('ID of the cryptographic pepper used to hash the token (v2 only)'), + ) + hmac_digest = models.CharField( + verbose_name=_('digest'), + max_length=64, + blank=True, + null=True, + help_text=_('SHA256 hash of the token and pepper (v2 only)'), ) allowed_ips = ArrayField( base_field=IPNetworkField(), @@ -72,36 +108,108 @@ class Token(models.Model): objects = RestrictedQuerySet.as_manager() class Meta: + ordering = ('-created',) verbose_name = _('token') verbose_name_plural = _('tokens') - ordering = ('-created',) + + def __init__(self, *args, token=None, **kwargs): + super().__init__(*args, **kwargs) + + self.token = token def __str__(self): - return self.key if settings.ALLOW_TOKEN_RETRIEVAL else self.partial + if self.v1: + return self.partial + return self.key def get_absolute_url(self): return reverse('users:token', args=[self.pk]) + @property + def v1(self): + return self.version == 1 + + @property + def v2(self): + return self.version == 2 + @property def partial(self): - return f'**********************************{self.key[-6:]}' if self.key else '' + return f'**********************************{self.plaintext[-6:]}' if self.plaintext else '' + + @property + def token(self): + return getattr(self, '_token', None) + + @token.setter + def token(self, value): + self._token = value + if value is not None: + if self.v1: + self.plaintext = value + elif self.v2: + self.key = self.key or self.generate(16) + self.update_digest() + + def clean(self): + if self._state.adding and self.v2 and not settings.API_TOKEN_PEPPERS: + raise ValidationError(_("Cannot create v2 tokens: API_TOKEN_PEPPERS is not defined.")) def save(self, *args, **kwargs): - if not self.key: - self.key = self.generate_key() + # If creating a new Token and no token value has been specified, generate one + if self._state.adding and self.token is None: + self.token = self.generate() + return super().save(*args, **kwargs) @staticmethod def generate_key(): - # Generate a random 160-bit key expressed in hexadecimal. + """ + DEPRECATED: Generate and return a random 160-bit key expressed in hexadecimal. + """ return binascii.hexlify(os.urandom(20)).decode() + @staticmethod + def generate(length=40): + """ + Generate and return a random token value of the given length. + """ + return ''.join(random.choice(TOKEN_CHARSET) for _ in range(length)) + + def update_digest(self): + """ + Recalculate and save the HMAC digest using the currently defined pepper and token values. + """ + self.pepper, pepper_value = get_current_pepper() + self.hmac_digest = hmac.new( + pepper_value.encode('utf-8'), + self.token.encode('utf-8'), + hashlib.sha256 + ).hexdigest() + @property def is_expired(self): if self.expires is None or timezone.now() < self.expires: return False return True + def validate(self, token): + """ + Returns true if the given token value validates. + """ + if self.is_expired: + return False + if self.v1: + return token == self.key + if self.v2: + try: + pepper = settings.API_TOKEN_PEPPERS[self.pepper] + except KeyError: + # Invalid pepper ID + return False + digest = hmac.new(pepper.encode('utf-8'), token.encode('utf-8'), hashlib.sha256).hexdigest() + return digest == self.hmac_digest + def validate_client_ip(self, client_ip): """ Validate the API client IP address against the source IP restrictions (if any) set on the token. diff --git a/netbox/users/tables.py b/netbox/users/tables.py index 40cbeca47d..b8683cc872 100644 --- a/netbox/users/tables.py +++ b/netbox/users/tables.py @@ -22,7 +22,8 @@ class TokenTable(UserTokenTable): class Meta(NetBoxTable.Meta): model = Token fields = ( - 'pk', 'id', 'key', 'user', 'description', 'write_enabled', 'created', 'expires', 'last_used', 'allowed_ips', + 'pk', 'id', 'version', 'key', 'pepper', 'user', 'description', 'write_enabled', 'created', 'expires', + 'last_used', 'allowed_ips', ) diff --git a/netbox/users/tests/test_api.py b/netbox/users/tests/test_api.py index 71496f0077..f0218179ae 100644 --- a/netbox/users/tests/test_api.py +++ b/netbox/users/tests/test_api.py @@ -197,7 +197,7 @@ class TokenTest( APIViewTestCases.DeleteObjectViewTestCase ): model = Token - brief_fields = ['description', 'display', 'id', 'key', 'url', 'write_enabled'] + brief_fields = ['description', 'display', 'id', 'key', 'url', 'version', 'write_enabled'] bulk_update_data = { 'description': 'New description', } @@ -256,8 +256,8 @@ class TokenTest( response = self.client.post(url, data, format='json', **self.header) self.assertEqual(response.status_code, 201) - self.assertIn('key', response.data) - self.assertEqual(len(response.data['key']), 40) + self.assertIn('token', response.data) + self.assertEqual(len(response.data['token']), 40) self.assertEqual(response.data['description'], data['description']) self.assertEqual(response.data['expires'], data['expires']) token = Token.objects.get(user=user) diff --git a/netbox/users/tests/test_filtersets.py b/netbox/users/tests/test_filtersets.py index e15df0d189..f7404cedd4 100644 --- a/netbox/users/tests/test_filtersets.py +++ b/netbox/users/tests/test_filtersets.py @@ -266,7 +266,7 @@ class ObjectPermissionTestCase(TestCase, BaseFilterSetTests): class TokenTestCase(TestCase, BaseFilterSetTests): queryset = Token.objects.all() filterset = filtersets.TokenFilterSet - ignore_fields = ('allowed_ips',) + ignore_fields = ('plaintext', 'hmac_digest', 'allowed_ips') @classmethod def setUpTestData(cls): @@ -282,21 +282,39 @@ class TokenTestCase(TestCase, BaseFilterSetTests): past_date = make_aware(datetime.datetime(2000, 1, 1)) tokens = ( Token( - user=users[0], key=Token.generate_key(), expires=future_date, write_enabled=True, description='foobar1' + version=1, + user=users[0], + expires=future_date, + write_enabled=True, + description='foobar1', ), Token( - user=users[1], key=Token.generate_key(), expires=future_date, write_enabled=True, description='foobar2' + version=2, + user=users[1], + expires=future_date, + write_enabled=True, + description='foobar2', ), Token( - user=users[2], key=Token.generate_key(), expires=past_date, write_enabled=False + version=2, + user=users[2], + expires=past_date, + write_enabled=False, ), ) - Token.objects.bulk_create(tokens) + for token in tokens: + token.save() def test_q(self): params = {'q': 'foobar1'} self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) + def test_version(self): + params = {'version': 1} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) + params = {'version': 2} + self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2) + def test_user(self): users = User.objects.order_by('id')[:2] params = {'user_id': [users[0].pk, users[1].pk]} @@ -313,7 +331,7 @@ class TokenTestCase(TestCase, BaseFilterSetTests): self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1) def test_key(self): - tokens = Token.objects.all()[:2] + tokens = Token.objects.filter(version=2) params = {'key': [tokens[0].key, tokens[1].key]} self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2) diff --git a/netbox/users/tests/test_views.py b/netbox/users/tests/test_views.py index e66c00d0a1..0395c2209d 100644 --- a/netbox/users/tests/test_views.py +++ b/netbox/users/tests/test_views.py @@ -215,6 +215,7 @@ class TokenTestCase( ): model = Token maxDiff = None + validation_excluded_fields = ['token', 'user'] @classmethod def setUpTestData(cls): @@ -223,32 +224,34 @@ class TokenTestCase( create_test_user('User 2'), ) tokens = ( - Token(key='123456789012345678901234567890123456789A', user=users[0]), - Token(key='123456789012345678901234567890123456789B', user=users[0]), - Token(key='123456789012345678901234567890123456789C', user=users[1]), + Token(user=users[0]), + Token(user=users[0]), + Token(user=users[1]), ) - Token.objects.bulk_create(tokens) + for token in tokens: + token.save() cls.form_data = { + 'version': 2, + 'token': '4F9DAouzURLbicyoG55htImgqQ0b4UZHP5LUYgl5', 'user': users[0].pk, - 'key': '1234567890123456789012345678901234567890', - 'description': 'testdescription', + 'description': 'Test token', } cls.csv_data = ( - "key,user,description", - f"123456789012345678901234567890123456789D,{users[0].pk},testdescriptionD", - f"123456789012345678901234567890123456789E,{users[1].pk},testdescriptionE", - f"123456789012345678901234567890123456789F,{users[1].pk},testdescriptionF", + "token,user,description", + f"123456789012345678901234567890123456789A,{users[0].pk},Test token", + f"123456789012345678901234567890123456789B,{users[1].pk},Test token", + f"123456789012345678901234567890123456789C,{users[1].pk},Test token", ) cls.csv_update_data = ( "id,description", - f"{tokens[0].pk},testdescriptionH", - f"{tokens[1].pk},testdescriptionI", - f"{tokens[2].pk},testdescriptionJ", + f"{tokens[0].pk},New description", + f"{tokens[1].pk},New description", + f"{tokens[2].pk},New description", ) cls.bulk_edit_data = { - 'description': 'newdescription', + 'description': 'New description', } diff --git a/netbox/users/utils.py b/netbox/users/utils.py index 114d8ab6dd..045d192c71 100644 --- a/netbox/users/utils.py +++ b/netbox/users/utils.py @@ -1,5 +1,12 @@ +from django.conf import settings +from django.core.exceptions import ImproperlyConfigured from social_core.storage import NO_ASCII_REGEX, NO_SPECIAL_REGEX +__all__ = ( + 'clean_username', + 'get_current_pepper', +) + def clean_username(value): """Clean username removing any unsupported character""" @@ -7,3 +14,13 @@ def clean_username(value): value = NO_SPECIAL_REGEX.sub('', value) value = value.replace(':', '') return value + + +def get_current_pepper(): + """ + Return the ID and value of the newest (highest ID) cryptographic pepper. + """ + if len(settings.API_TOKEN_PEPPERS) < 1: + raise ImproperlyConfigured("Must define API_TOKEN_PEPPERS to use v2 API tokens") + newest_id = sorted(settings.API_TOKEN_PEPPERS)[-1] + return newest_id, settings.API_TOKEN_PEPPERS[newest_id] diff --git a/netbox/utilities/testing/api.py b/netbox/utilities/testing/api.py index 1fe8813679..32e5fe53f3 100644 --- a/netbox/utilities/testing/api.py +++ b/netbox/utilities/testing/api.py @@ -49,8 +49,8 @@ class APITestCase(ModelTestCase): # Create the test user and assign permissions self.user = User.objects.create_user(username='testuser') self.add_permissions(*self.user_permissions) - self.token = Token.objects.create(user=self.user) - self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.key}'} + self.token = Token.objects.create(version=1, user=self.user) + self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.plaintext}'} def _get_view_namespace(self): return f'{self.view_namespace or self.model._meta.app_label}-api' @@ -153,6 +153,7 @@ class APIViewTestCases: url = f'{self._get_list_url()}?brief=1' response = self.client.get(url, **self.header) + self.assertHttpStatus(response, status.HTTP_200_OK) self.assertEqual(len(response.data['results']), self._get_queryset().count()) self.assertEqual(sorted(response.data['results'][0]), self.brief_fields) diff --git a/netbox/utilities/testing/views.py b/netbox/utilities/testing/views.py index f00b21d083..c054dc5a2c 100644 --- a/netbox/utilities/testing/views.py +++ b/netbox/utilities/testing/views.py @@ -240,10 +240,12 @@ class ViewTestCases: :form_data: Data to be used when updating the first existing object. """ form_data = {} + form_edit_data = {} validation_excluded_fields = [] def test_edit_object_without_permission(self): instance = self._get_queryset().first() + form_data = self.form_edit_data or self.form_data # Try GET without permission with disable_warnings('django.request'): @@ -252,7 +254,7 @@ class ViewTestCases: # Try POST without permission request = { 'path': self._get_url('edit', instance), - 'data': post_data(self.form_data), + 'data': post_data(form_data), } with disable_warnings('django.request'): self.assertHttpStatus(self.client.post(**request), 403) @@ -260,6 +262,7 @@ class ViewTestCases: @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[]) def test_edit_object_with_permission(self): instance = self._get_queryset().first() + form_data = self.form_edit_data or self.form_data # Assign model-level permission obj_perm = ObjectPermission( @@ -275,21 +278,21 @@ class ViewTestCases: # Add custom field data if the model supports it if issubclass(self.model, CustomFieldsMixin): - add_custom_field_data(self.form_data, self.model) + add_custom_field_data(form_data, self.model) # If supported, add a changelog message if issubclass(self.model, ChangeLoggingMixin): - if 'changelog_message' not in self.form_data: - self.form_data['changelog_message'] = get_random_string(10) + if 'changelog_message' not in form_data: + form_data['changelog_message'] = get_random_string(10) # Try POST with model-level permission request = { 'path': self._get_url('edit', instance), - 'data': post_data(self.form_data), + 'data': post_data(form_data), } self.assertHttpStatus(self.client.post(**request), 302) instance = self._get_queryset().get(pk=instance.pk) - self.assertInstanceEqual(instance, self.form_data, exclude=self.validation_excluded_fields) + self.assertInstanceEqual(instance, form_data, exclude=self.validation_excluded_fields) # Verify ObjectChange creation if issubclass(self.model, ChangeLoggingMixin): @@ -299,11 +302,12 @@ class ViewTestCases: ) self.assertEqual(len(objectchanges), 1) self.assertEqual(objectchanges[0].action, ObjectChangeActionChoices.ACTION_UPDATE) - self.assertEqual(objectchanges[0].message, self.form_data['changelog_message']) + self.assertEqual(objectchanges[0].message, form_data['changelog_message']) @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[]) def test_edit_object_with_constrained_permission(self): instance1, instance2 = self._get_queryset().all()[:2] + form_data = self.form_edit_data or self.form_data # Assign constrained permission obj_perm = ObjectPermission( @@ -324,16 +328,16 @@ class ViewTestCases: # Try to edit a permitted object request = { 'path': self._get_url('edit', instance1), - 'data': post_data(self.form_data), + 'data': post_data(form_data), } self.assertHttpStatus(self.client.post(**request), 302) instance = self._get_queryset().get(pk=instance1.pk) - self.assertInstanceEqual(instance, self.form_data, exclude=self.validation_excluded_fields) + self.assertInstanceEqual(instance, form_data, exclude=self.validation_excluded_fields) # Try to edit a non-permitted object request = { 'path': self._get_url('edit', instance2), - 'data': post_data(self.form_data), + 'data': post_data(form_data), } self.assertHttpStatus(self.client.post(**request), 404)