Initial work on #20210

This commit is contained in:
Jeremy Stretch 2025-10-02 15:04:29 -04:00
parent 23d7515b41
commit 1ee23ba6fa
26 changed files with 787 additions and 172 deletions

View File

@ -166111,6 +166111,91 @@
"type": "string"
}
},
{
"in": "query",
"name": "pepper",
"schema": {
"type": "array",
"items": {
"type": "integer",
"format": "int32"
}
},
"explode": true,
"style": "form"
},
{
"in": "query",
"name": "pepper__empty",
"schema": {
"type": "boolean"
}
},
{
"in": "query",
"name": "pepper__gt",
"schema": {
"type": "array",
"items": {
"type": "integer",
"format": "int32"
}
},
"explode": true,
"style": "form"
},
{
"in": "query",
"name": "pepper__gte",
"schema": {
"type": "array",
"items": {
"type": "integer",
"format": "int32"
}
},
"explode": true,
"style": "form"
},
{
"in": "query",
"name": "pepper__lt",
"schema": {
"type": "array",
"items": {
"type": "integer",
"format": "int32"
}
},
"explode": true,
"style": "form"
},
{
"in": "query",
"name": "pepper__lte",
"schema": {
"type": "array",
"items": {
"type": "integer",
"format": "int32"
}
},
"explode": true,
"style": "form"
},
{
"in": "query",
"name": "pepper__n",
"schema": {
"type": "array",
"items": {
"type": "integer",
"format": "int32"
}
},
"explode": true,
"style": "form"
},
{
"in": "query",
"name": "q",
@ -166171,6 +166256,19 @@
"explode": true,
"style": "form"
},
{
"in": "query",
"name": "version",
"schema": {
"type": "integer",
"x-spec-enum-id": "b5df70f0bffd12cb",
"enum": [
1,
2
]
},
"description": "* `1` - v1\n* `2` - v2"
},
{
"in": "query",
"name": "write_enabled",
@ -228068,6 +228166,17 @@
"type": "object",
"description": "Extends the built-in ModelSerializer to enforce calling full_clean() on a copy of the associated instance during\nvalidation. (DRF does not do this by default; see https://github.com/encode/django-rest-framework/issues/3144)",
"properties": {
"version": {
"enum": [
1,
2
],
"type": "integer",
"description": "* `1` - v1\n* `2` - v2",
"x-spec-enum-id": "b5df70f0bffd12cb",
"minimum": 0,
"maximum": 32767
},
"user": {
"oneOf": [
{
@ -228078,6 +228187,10 @@
}
]
},
"description": {
"type": "string",
"maxLength": 200
},
"expires": {
"type": "string",
"format": "date-time",
@ -228088,19 +228201,20 @@
"format": "date-time",
"nullable": true
},
"key": {
"type": "string",
"writeOnly": true,
"maxLength": 40,
"minLength": 40
},
"write_enabled": {
"type": "boolean",
"description": "Permit create/update/delete operations using this key"
},
"description": {
"pepper": {
"type": "integer",
"maximum": 32767,
"minimum": 0,
"nullable": true,
"description": "ID of the cryptographic pepper used to hash the token (v2 only)"
},
"token": {
"type": "string",
"maxLength": 200
"minLength": 1
}
}
},
@ -244302,9 +244416,30 @@
"type": "string",
"readOnly": true
},
"version": {
"enum": [
1,
2
],
"type": "integer",
"description": "* `1` - v1\n* `2` - v2",
"x-spec-enum-id": "b5df70f0bffd12cb",
"minimum": 0,
"maximum": 32767
},
"key": {
"type": "string",
"readOnly": true,
"nullable": true,
"description": "v2 token identification key"
},
"user": {
"$ref": "#/components/schemas/BriefUser"
},
"description": {
"type": "string",
"maxLength": 200
},
"created": {
"type": "string",
"format": "date-time",
@ -244324,9 +244459,15 @@
"type": "boolean",
"description": "Permit create/update/delete operations using this key"
},
"description": {
"type": "string",
"maxLength": 200
"pepper": {
"type": "integer",
"maximum": 32767,
"minimum": 0,
"nullable": true,
"description": "ID of the cryptographic pepper used to hash the token (v2 only)"
},
"token": {
"type": "string"
}
},
"required": [
@ -244334,6 +244475,7 @@
"display",
"display_url",
"id",
"key",
"url",
"user"
]
@ -244360,6 +244502,17 @@
"type": "string",
"readOnly": true
},
"version": {
"enum": [
1,
2
],
"type": "integer",
"description": "* `1` - v1\n* `2` - v2",
"x-spec-enum-id": "b5df70f0bffd12cb",
"minimum": 0,
"maximum": 32767
},
"user": {
"allOf": [
{
@ -244368,6 +244521,10 @@
],
"readOnly": true
},
"key": {
"type": "string",
"readOnly": true
},
"created": {
"type": "string",
"format": "date-time",
@ -244383,10 +244540,6 @@
"format": "date-time",
"readOnly": true
},
"key": {
"type": "string",
"readOnly": true
},
"write_enabled": {
"type": "boolean",
"description": "Permit create/update/delete operations using this key"
@ -244394,6 +244547,9 @@
"description": {
"type": "string",
"maxLength": 200
},
"token": {
"type": "string"
}
},
"required": [
@ -244411,6 +244567,17 @@
"type": "object",
"description": "Extends the built-in ModelSerializer to enforce calling full_clean() on a copy of the associated instance during\nvalidation. (DRF does not do this by default; see https://github.com/encode/django-rest-framework/issues/3144)",
"properties": {
"version": {
"enum": [
1,
2
],
"type": "integer",
"description": "* `1` - v1\n* `2` - v2",
"x-spec-enum-id": "b5df70f0bffd12cb",
"minimum": 0,
"maximum": 32767
},
"expires": {
"type": "string",
"format": "date-time",
@ -244433,6 +244600,10 @@
"type": "string",
"writeOnly": true,
"minLength": 1
},
"token": {
"type": "string",
"minLength": 1
}
},
"required": [
@ -244444,6 +244615,17 @@
"type": "object",
"description": "Extends the built-in ModelSerializer to enforce calling full_clean() on a copy of the associated instance during\nvalidation. (DRF does not do this by default; see https://github.com/encode/django-rest-framework/issues/3144)",
"properties": {
"version": {
"enum": [
1,
2
],
"type": "integer",
"description": "* `1` - v1\n* `2` - v2",
"x-spec-enum-id": "b5df70f0bffd12cb",
"minimum": 0,
"maximum": 32767
},
"user": {
"oneOf": [
{
@ -244454,6 +244636,10 @@
}
]
},
"description": {
"type": "string",
"maxLength": 200
},
"expires": {
"type": "string",
"format": "date-time",
@ -244464,19 +244650,20 @@
"format": "date-time",
"nullable": true
},
"key": {
"type": "string",
"writeOnly": true,
"maxLength": 40,
"minLength": 40
},
"write_enabled": {
"type": "boolean",
"description": "Permit create/update/delete operations using this key"
},
"description": {
"pepper": {
"type": "integer",
"maximum": 32767,
"minimum": 0,
"nullable": true,
"description": "ID of the cryptographic pepper used to hash the token (v2 only)"
},
"token": {
"type": "string",
"maxLength": 200
"minLength": 1
}
},
"required": [
@ -256709,7 +256896,7 @@
"type": "apiKey",
"in": "header",
"name": "Authorization",
"description": "Token-based authentication with required prefix \"Token\""
"description": "Set `Token <token>` (v1) or `Bearer <token>` (v2) in the Authorization header"
}
}
},

View File

@ -657,14 +657,17 @@ A token is a unique identifier mapped to a NetBox user account. Each user may ha
By default, all users can create and manage their own REST API tokens under the user control panel in the UI or via the REST API. This ability can be disabled by overriding the [`DEFAULT_PERMISSIONS`](../configuration/security.md#default_permissions) configuration parameter.
Each token contains a 160-bit key represented as 40 hexadecimal characters. When creating a token, you'll typically leave the key field blank so that a random key will be automatically generated. However, NetBox allows you to specify a key in case you need to restore a previously deleted token to operation.
!!! info "Token Versions"
Beginning with NetBox v4.5, two types of API token are supported, denoted as v1 and v2. Users are strongly encouraged to create only v2 tokens, as these provide much stronger security than v1 tokens. Support for v1 tokens will be removed in a future NetBox release.
When creating a token, you'll typically leave the key field blank so that a random key will be automatically generated. However, NetBox allows you to specify a key in case you need to restore a previously deleted token to operation.
Additionally, a token can be set to expire at a specific time. This can be useful if an external client needs to be granted temporary access to NetBox.
!!! info "Restricting Token Retrieval"
The ability to retrieve the key value of a previously-created API token can be restricted by disabling the [`ALLOW_TOKEN_RETRIEVAL`](../configuration/security.md#allow_token_retrieval) configuration parameter.
### Restricting Write Operations
#### Restricting Write Operations
By default, a token can be used to perform all actions via the API that a user would be permitted to do via the web UI. Deselecting the "write enabled" option will restrict API requests made with the token to read operations (e.g. GET) only.
@ -681,10 +684,22 @@ It is possible to provision authentication tokens for other users via the REST A
### Authenticating to the API
An authentication token is attached to a request by setting the `Authorization` header to the string `Token` followed by a space and the user's token:
An authentication token is included with a request in its `Authorization` header. The format of the header value depends on the version of token in use. v2 tokens use the following form, concatenating the token's key and plaintext value with a period:
```
$ curl -H "Authorization: Token $TOKEN" \
Authorization: Bearer <key>.<token>
```
v1 tokens use the prefix `Token` rather than `Bearer`, and include only the token plaintext. (v1 tokens do not have a key.)
```
Authorization: Token <token>
```
Below is an example REST API request utilizing a v2 token.
```
$ curl -H "Authorization: Bearer <key>.<token>" \
-H "Accept: application/json; indent=4" \
https://netbox/api/dcim/sites/
{

View File

@ -53,5 +53,6 @@ class UserTokenTable(NetBoxTable):
class Meta(NetBoxTable.Meta):
model = UserToken
fields = (
'pk', 'id', 'key', 'description', 'write_enabled', 'created', 'expires', 'last_used', 'allowed_ips',
'pk', 'id', 'version', 'key', 'pepper', 'description', 'write_enabled', 'created', 'expires', 'last_used',
'allowed_ips',
)

View File

@ -343,7 +343,7 @@ class UserTokenView(LoginRequiredMixin, View):
def get(self, request, pk):
token = get_object_or_404(UserToken.objects.filter(user=request.user), pk=pk)
key = token.key if settings.ALLOW_TOKEN_RETRIEVAL else None
key = token.key if token.v2 or settings.ALLOW_TOKEN_RETRIEVAL else None
return render(request, 'account/token.html', {
'object': token,

View File

@ -135,8 +135,8 @@ class BackgroundTaskTestCase(TestCase):
"""
# Create the test user and assign permissions
self.user = User.objects.create_user(username='testuser', is_active=True)
self.token = Token.objects.create(user=self.user)
self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.key}'}
self.token = Token.objects.create(version=1, user=self.user)
self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.token}'}
# Clear all queues prior to running each test
get_queue('default').connection.flushall()

View File

@ -2,47 +2,86 @@ import logging
from django.conf import settings
from django.utils import timezone
from rest_framework import authentication, exceptions
from drf_spectacular.extensions import OpenApiAuthenticationExtension
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication, get_authorization_header
from rest_framework.permissions import BasePermission, DjangoObjectPermissions, SAFE_METHODS
from netbox.config import get_config
from users.models import Token
from utilities.request import get_client_ip
V1_KEYWORD = 'token'
V2_KEYWORD = 'bearer'
class TokenAuthentication(authentication.TokenAuthentication):
class TokenAuthentication(BaseAuthentication):
"""
A custom authentication scheme which enforces Token expiration times and source IP restrictions.
"""
model = Token
def authenticate(self, request):
result = super().authenticate(request)
if not (auth := get_authorization_header(request).split()):
return
if result:
token = result[1]
# Check for Token/Bearer keyword in HTTP header value & infer token version
if auth[0].lower() == V1_KEYWORD.lower().encode():
version = 1
elif auth[0].lower() == V2_KEYWORD.lower().encode():
version = 2
else:
return
# Enforce source IP restrictions (if any) set on the token
if token.allowed_ips:
client_ip = get_client_ip(request)
if client_ip is None:
raise exceptions.AuthenticationFailed(
"Client IP address could not be determined for validation. Check that the HTTP server is "
"correctly configured to pass the required header(s)."
)
if not token.validate_client_ip(client_ip):
raise exceptions.AuthenticationFailed(
f"Source IP {client_ip} is not permitted to authenticate using this token."
)
return result
def authenticate_credentials(self, key):
model = self.get_model()
# Extract token key from authorization header
if len(auth) != 2:
raise exceptions.AuthenticationFailed("Invalid authorization header: Error parsing token")
try:
token = model.objects.prefetch_related('user').get(key=key)
except model.DoesNotExist:
raise exceptions.AuthenticationFailed("Invalid token")
auth_value = auth[1].decode()
except UnicodeError:
raise exceptions.AuthenticationFailed("Invalid authorization header: Token contains invalid characters")
# Look for a matching token in the database
if version == 1:
key, plaintext = None, auth_value
else:
try:
key, plaintext = auth_value.split('.', 1)
except ValueError:
raise exceptions.AuthenticationFailed(
"Invalid authorization header: Could not parse key from v2 token. Did you mean to use 'Token' "
"instead of 'Bearer'?"
)
try:
qs = Token.objects.prefetch_related('user')
if version == 1:
# Fetch v1 token by querying plaintext value directly
token = qs.get(version=version, plaintext=plaintext)
else:
# Fetch v2 token by key, then validate the plaintext
token = qs.get(version=version, key=key)
if not token.validate(plaintext):
# TODO: Consider security implications of enabling validation of token key without valid plaintext
raise exceptions.AuthenticationFailed(f"Validation failed for v2 token {key}")
except Token.DoesNotExist:
raise exceptions.AuthenticationFailed(f"Invalid v{version} token")
# Enforce source IP restrictions (if any) set on the token
if token.allowed_ips:
client_ip = get_client_ip(request)
if client_ip is None:
raise exceptions.AuthenticationFailed(
"Client IP address could not be determined for validation. Check that the HTTP server is "
"correctly configured to pass the required header(s)."
)
if not token.validate_client_ip(client_ip):
raise exceptions.AuthenticationFailed(
f"Source IP {client_ip} is not permitted to authenticate using this token."
)
# Enforce the Token's expiration time, if one has been set.
if token.is_expired:
raise exceptions.AuthenticationFailed("Token expired")
# Update last used, but only once per minute at most. This reduces write load on the database
if not token.last_used or (timezone.now() - token.last_used).total_seconds() > 60:
@ -54,11 +93,8 @@ class TokenAuthentication(authentication.TokenAuthentication):
else:
Token.objects.filter(pk=token.pk).update(last_used=timezone.now())
# Enforce the Token's expiration time, if one has been set.
if token.is_expired:
raise exceptions.AuthenticationFailed("Token expired")
user = token.user
# When LDAP authentication is active try to load user data from LDAP directory
if 'netbox.authentication.LDAPBackend' in settings.REMOTE_AUTH_BACKEND:
from netbox.authentication import LDAPBackend
@ -132,3 +168,17 @@ class IsAuthenticatedOrLoginNotRequired(BasePermission):
if not settings.LOGIN_REQUIRED:
return True
return request.user.is_authenticated
class TokenScheme(OpenApiAuthenticationExtension):
target_class = 'netbox.api.authentication.TokenAuthentication'
name = 'tokenAuth'
match_subclasses = True
def get_security_definition(self, auto_schema):
return {
'type': 'apiKey',
'in': 'header',
'name': 'Authorization',
'description': 'Set `Token <token>` (v1) or `Bearer <token>` (v2) in the Authorization header',
}

View File

@ -45,6 +45,10 @@ DEFAULT_PERMISSIONS = {}
ALLOW_TOKEN_RETRIEVAL = True
API_TOKEN_PEPPERS = {
0: 'TEST-VALUE-DO-NOT-USE',
}
LOGGING = {
'version': 1,
'disable_existing_loggers': True

View File

@ -65,6 +65,7 @@ elif hasattr(configuration, 'DATABASE') and hasattr(configuration, 'DATABASES'):
ADMINS = getattr(configuration, 'ADMINS', [])
ALLOW_TOKEN_RETRIEVAL = getattr(configuration, 'ALLOW_TOKEN_RETRIEVAL', False)
ALLOWED_HOSTS = getattr(configuration, 'ALLOWED_HOSTS') # Required
API_TOKEN_PEPPERS = getattr(configuration, 'API_TOKEN_PEPPERS', {})
AUTH_PASSWORD_VALIDATORS = getattr(configuration, 'AUTH_PASSWORD_VALIDATORS', [
{
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
@ -215,6 +216,13 @@ if len(SECRET_KEY) < 50:
f" python {BASE_DIR}/generate_secret_key.py"
)
# Validate API token peppers
for key in API_TOKEN_PEPPERS:
if type(key) is not int:
raise ImproperlyConfigured(f"Invalid API_TOKEN_PEPPERS key: {key}. All keys must be integers.")
if not API_TOKEN_PEPPERS:
warnings.warn("API_TOKEN_PEPPERS is not defined. v2 API tokens cannot be used.")
# Validate update repo URL and timeout
if RELEASE_CHECK_URL:
try:

View File

@ -16,35 +16,79 @@ from utilities.testing.api import APITestCase
class TokenAuthenticationTestCase(APITestCase):
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_token_authentication(self):
url = reverse('dcim-api:site-list')
def test_no_token(self):
# Request without a token should return a 403
response = self.client.get(url)
response = self.client.get(reverse('dcim-api:site-list'))
self.assertEqual(response.status_code, 403)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_v1_token_valid(self):
# Create a v1 token
token = Token.objects.create(version=1, user=self.user)
# Valid token should return a 200
token = Token.objects.create(user=self.user)
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}')
self.assertEqual(response.status_code, 200)
header = f'Token {token.token}'
response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
self.assertEqual(response.status_code, 200, response.data)
# Check that the token's last_used time has been updated
token.refresh_from_db()
self.assertIsNotNone(token.last_used)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_v1_token_invalid(self):
# Invalid token should return a 403
header = 'Token XXXXXXXXXX'
response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
self.assertEqual(response.status_code, 403)
self.assertEqual(response.data['detail'], "Invalid v1 token")
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_v2_token_valid(self):
# Create a v2 token
token = Token.objects.create(version=2, user=self.user)
# Valid token should return a 200
header = f'Bearer {token.key}.{token.token}'
response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
self.assertEqual(response.status_code, 200, response.data)
# Check that the token's last_used time has been updated
token.refresh_from_db()
self.assertIsNotNone(token.last_used)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_v2_token_invalid(self):
# Invalid token should return a 403
header = 'Bearer XXXXXXXXXX.XXXXXXXXXX'
response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
self.assertEqual(response.status_code, 403)
self.assertEqual(response.data['detail'], "Invalid v2 token")
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_token_expiration(self):
url = reverse('dcim-api:site-list')
# Request without a non-expired token should succeed
token = Token.objects.create(user=self.user)
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}')
# Create v1 & v2 tokens
future = datetime.datetime(2100, 1, 1, tzinfo=datetime.timezone.utc)
token1 = Token.objects.create(version=1, user=self.user, expires=future)
token2 = Token.objects.create(version=2, user=self.user, expires=future)
# Request with a non-expired token should succeed
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token1.token}')
self.assertEqual(response.status_code, 200)
response = self.client.get(url, HTTP_AUTHORIZATION=f'Bearer {token2.key}.{token2.token}')
self.assertEqual(response.status_code, 200)
# Request with an expired token should fail
token.expires = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
token.save()
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}')
past = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
token1.expires = past
token1.save()
token2.expires = past
token2.save()
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token1.key}')
self.assertEqual(response.status_code, 403)
response = self.client.get(url, HTTP_AUTHORIZATION=f'Bearer {token2.key}')
self.assertEqual(response.status_code, 403)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
@ -55,28 +99,60 @@ class TokenAuthenticationTestCase(APITestCase):
'slug': 'site-1',
}
# Create v1 & v2 tokens
token1 = Token.objects.create(version=1, user=self.user, write_enabled=False)
token2 = Token.objects.create(version=2, user=self.user, write_enabled=False)
# Request with a write-disabled token should fail
token = Token.objects.create(user=self.user, write_enabled=False)
response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token.key}')
response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token1.token}')
self.assertEqual(response.status_code, 403)
response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Bearer {token2.key}.{token2.token}')
self.assertEqual(response.status_code, 403)
# Request with a write-enabled token should succeed
token.write_enabled = True
token.save()
response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token.key}')
token1.write_enabled = True
token1.save()
token2.write_enabled = True
token2.save()
response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token1.token}')
self.assertEqual(response.status_code, 403)
response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Bearer {token2.key}.{token2.token}')
self.assertEqual(response.status_code, 403)
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
def test_token_allowed_ips(self):
url = reverse('dcim-api:site-list')
# Create v1 & v2 tokens
token1 = Token.objects.create(version=1, user=self.user, allowed_ips=['192.0.2.0/24'])
token2 = Token.objects.create(version=2, user=self.user, allowed_ips=['192.0.2.0/24'])
# Request from a non-allowed client IP should fail
token = Token.objects.create(user=self.user, allowed_ips=['192.0.2.0/24'])
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}', REMOTE_ADDR='127.0.0.1')
response = self.client.get(
url,
HTTP_AUTHORIZATION=f'Token {token1.token}',
REMOTE_ADDR='127.0.0.1'
)
self.assertEqual(response.status_code, 403)
response = self.client.get(
url,
HTTP_AUTHORIZATION=f'Bearer {token2.key}.{token2.token}',
REMOTE_ADDR='127.0.0.1'
)
self.assertEqual(response.status_code, 403)
# Request with an expired token should fail
response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}', REMOTE_ADDR='192.0.2.1')
# Request from an allowed client IP should succeed
response = self.client.get(
url,
HTTP_AUTHORIZATION=f'Token {token1.token}',
REMOTE_ADDR='192.0.2.1'
)
self.assertEqual(response.status_code, 200)
response = self.client.get(
url,
HTTP_AUTHORIZATION=f'Bearer {token2.key}.{token2.token}',
REMOTE_ADDR='192.0.2.1'
)
self.assertEqual(response.status_code, 200)
@ -426,8 +502,8 @@ class ObjectPermissionAPIViewTestCase(TestCase):
Create a test user and token for API calls.
"""
self.user = User.objects.create(username='testuser')
self.token = Token.objects.create(user=self.user)
self.header = {'HTTP_AUTHORIZATION': 'Token {}'.format(self.token.key)}
self.token = Token.objects.create(version=1, user=self.user)
self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.token}'}
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_get_object(self):

View File

@ -14,9 +14,24 @@
<h2 class="card-header">{% trans "Token" %}</h2>
<table class="table table-hover attr-table">
<tr>
<th scope="row">{% trans "Key" %}</th>
<td>{% if settings.ALLOW_TOKEN_RETRIEVAL %}{{ object.key }}{% else %}{{ object.partial }}{% endif %}</td>
<th scope="row">{% trans "Version" %}</th>
<td>{{ object.version }}</td>
</tr>
{% if object.version == 1 %}
<tr>
<th scope="row">{% trans "Token" %}</th>
<td>{{ object.partial }}</td>
</tr>
{% else %}
<tr>
<th scope="row">{% trans "Key" %}</th>
<td>{{ object }}</td>
</tr>
<tr>
<th scope="row">{% trans "Pepper" %}</th>
<td>{{ object.pepper }}</td>
</tr>
{% endif %}
<tr>
<th scope="row">{% trans "User" %}</th>
<td>

View File

@ -1,4 +1,3 @@
from django.conf import settings
from django.contrib.auth import authenticate
from rest_framework import serializers
from rest_framework.exceptions import AuthenticationFailed, PermissionDenied
@ -15,14 +14,13 @@ __all__ = (
class TokenSerializer(ValidatedModelSerializer):
key = serializers.CharField(
min_length=40,
max_length=40,
allow_blank=True,
token = serializers.CharField(
required=False,
write_only=not settings.ALLOW_TOKEN_RETRIEVAL
default=Token.generate,
)
user = UserSerializer(
nested=True
)
user = UserSerializer(nested=True)
allowed_ips = serializers.ListField(
child=IPNetworkSerializer(),
required=False,
@ -33,15 +31,11 @@ class TokenSerializer(ValidatedModelSerializer):
class Meta:
model = Token
fields = (
'id', 'url', 'display_url', 'display', 'user', 'created', 'expires', 'last_used', 'key', 'write_enabled',
'description', 'allowed_ips',
'id', 'url', 'display_url', 'display', 'version', 'key', 'user', 'description', 'created', 'expires',
'last_used', 'write_enabled', 'pepper', 'allowed_ips', 'token',
)
brief_fields = ('id', 'url', 'display', 'key', 'write_enabled', 'description')
def to_internal_value(self, data):
if not getattr(self.instance, 'key', None) and 'key' not in data:
data['key'] = Token.generate_key()
return super().to_internal_value(data)
read_only_fields = ('key',)
brief_fields = ('id', 'url', 'display', 'version', 'key', 'write_enabled', 'description')
def validate(self, data):
@ -75,8 +69,8 @@ class TokenProvisionSerializer(TokenSerializer):
class Meta:
model = Token
fields = (
'id', 'url', 'display_url', 'display', 'user', 'created', 'expires', 'last_used', 'key', 'write_enabled',
'description', 'allowed_ips', 'username', 'password',
'id', 'url', 'display_url', 'display', 'version', 'user', 'key', 'created', 'expires', 'last_used', 'key',
'write_enabled', 'description', 'allowed_ips', 'username', 'password', 'token',
)
def validate(self, data):

17
netbox/users/choices.py Normal file
View File

@ -0,0 +1,17 @@
from django.utils.translation import gettext_lazy as _
from utilities.choices import ChoiceSet
__all__ = (
'TokenVersionChoices',
)
class TokenVersionChoices(ChoiceSet):
V1 = 1
V2 = 2
CHOICES = [
(V1, _('v1')),
(V2, _('v2')),
]

View File

@ -1,3 +1,5 @@
import string
from django.db.models import Q
@ -7,3 +9,5 @@ OBJECTPERMISSION_OBJECT_TYPES = Q(
)
CONSTRAINT_TOKEN_USER = '$user'
TOKEN_CHARSET = string.ascii_letters + string.digits

View File

@ -133,7 +133,7 @@ class TokenFilterSet(BaseFilterSet):
class Meta:
model = Token
fields = ('id', 'key', 'write_enabled', 'description', 'last_used')
fields = ('id', 'version', 'key', 'pepper', 'write_enabled', 'description', 'last_used')
def search(self, queryset, name, value):
if not value.strip():

View File

@ -1,6 +1,7 @@
from django import forms
from django.utils.translation import gettext as _
from users.models import *
from users.choices import TokenVersionChoices
from utilities.forms import CSVModelForm
@ -34,12 +35,18 @@ class UserImportForm(CSVModelForm):
class TokenImportForm(CSVModelForm):
key = forms.CharField(
label=_('Key'),
version = forms.ChoiceField(
choices=TokenVersionChoices,
initial=TokenVersionChoices.V2,
required=False,
help_text=_("If no key is provided, one will be generated automatically.")
help_text=_("Specify version 1 or 2 (v2 will be used by default)")
)
token = forms.CharField(
label=_('Token'),
required=False,
help_text=_("If no token is provided, one will be generated automatically.")
)
class Meta:
model = Token
fields = ('user', 'key', 'write_enabled', 'expires', 'description',)
fields = ('user', 'version', 'token', 'write_enabled', 'expires', 'description',)

View File

@ -3,6 +3,7 @@ from django.utils.translation import gettext_lazy as _
from netbox.forms import NetBoxModelFilterSetForm
from netbox.forms.mixins import SavedFiltersMixin
from users.choices import TokenVersionChoices
from users.models import Group, ObjectPermission, Token, User
from utilities.forms import BOOLEAN_WITH_BLANK_CHOICES, FilterForm
from utilities.forms.fields import DynamicModelMultipleChoiceField
@ -110,7 +111,11 @@ class TokenFilterForm(SavedFiltersMixin, FilterForm):
model = Token
fieldsets = (
FieldSet('q', 'filter_id',),
FieldSet('user_id', 'write_enabled', 'expires', 'last_used', name=_('Token')),
FieldSet('version', 'user_id', 'write_enabled', 'expires', 'last_used', name=_('Token')),
)
version = forms.ChoiceField(
choices=TokenVersionChoices,
required=False,
)
user_id = DynamicModelMultipleChoiceField(
queryset=User.objects.all(),

View File

@ -12,6 +12,7 @@ from core.models import ObjectType
from ipam.formfields import IPNetworkFormField
from ipam.validators import prefix_validator
from netbox.preferences import PREFERENCES
from users.choices import TokenVersionChoices
from users.constants import *
from users.models import *
from utilities.data import flatten_dict
@ -115,10 +116,10 @@ class UserConfigForm(forms.ModelForm, metaclass=UserConfigFormMetaclass):
class UserTokenForm(forms.ModelForm):
key = forms.CharField(
label=_('Key'),
token = forms.CharField(
label=_('Token'),
help_text=_(
'Keys must be at least 40 characters in length. <strong>Be sure to record your key</strong> prior to '
'Tokens must be at least 40 characters in length. <strong>Be sure to record your key</strong> prior to '
'submitting this form, as it may no longer be accessible once the token has been created.'
),
widget=forms.TextInput(
@ -138,7 +139,7 @@ class UserTokenForm(forms.ModelForm):
class Meta:
model = Token
fields = [
'key', 'write_enabled', 'expires', 'description', 'allowed_ips',
'version', 'token', 'write_enabled', 'expires', 'description', 'allowed_ips',
]
widgets = {
'expires': DateTimePicker(),
@ -147,13 +148,27 @@ class UserTokenForm(forms.ModelForm):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Omit the key field if token retrieval is not permitted
if self.instance.pk and not settings.ALLOW_TOKEN_RETRIEVAL:
del self.fields['key']
if self.instance.pk:
# Disable the version & user fields for existing Tokens
self.fields['version'].disabled = True
self.fields['user'].disabled = True
# Omit the key field when editing an existing token if token retrieval is not permitted
if self.instance.v1 and settings.ALLOW_TOKEN_RETRIEVAL:
self.fields['token'].initial = self.instance.key
else:
del self.fields['token']
# Generate an initial random key if none has been specified
if not self.instance.pk and not self.initial.get('key'):
self.initial['key'] = Token.generate_key()
if self.instance._state.adding and not self.initial.get('token'):
self.initial['version'] = TokenVersionChoices.V2
self.initial['token'] = Token.generate()
def save(self, commit=True):
if self.cleaned_data.get('token'):
self.instance.token = self.cleaned_data['token']
return super().save(commit=commit)
class TokenForm(UserTokenForm):
@ -165,7 +180,7 @@ class TokenForm(UserTokenForm):
class Meta:
model = Token
fields = [
'user', 'key', 'write_enabled', 'expires', 'description', 'allowed_ips',
'version', 'token', 'user', 'write_enabled', 'expires', 'description', 'allowed_ips',
]
widgets = {
'expires': DateTimePicker(),

View File

@ -0,0 +1,65 @@
import django.core.validators
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('users', '0013_user_remove_is_staff'),
]
operations = [
# Rename the original key field to "plaintext"
migrations.RenameField(
model_name='token',
old_name='key',
new_name='plaintext',
),
migrations.RunSQL(
sql="ALTER INDEX IF EXISTS users_token_key_820deccd_like RENAME TO users_token_plaintext_46c6f315_like",
),
migrations.RunSQL(
sql="ALTER INDEX IF EXISTS users_token_key_key RENAME TO users_token_plaintext_key",
),
# Make plaintext (formerly key) nullable for v2 tokens
migrations.AlterField(
model_name='token',
name='plaintext',
field=models.CharField(
max_length=40,
unique=True,
blank=True,
null=True,
validators=[django.core.validators.MinLengthValidator(40)]
),
),
# Add version field to distinguish v1 and v2 tokens
migrations.AddField(
model_name='token',
name='version',
field=models.PositiveSmallIntegerField(default=1), # Mark all existing Tokens as v1
preserve_default=False,
),
# Change the default version for new tokens to v2
migrations.AlterField(
model_name='token',
name='version',
field=models.PositiveSmallIntegerField(default=2),
),
# Add new key, pepper, and hmac_digest fields for v2 tokens
migrations.AddField(
model_name='token',
name='key',
field=models.CharField(blank=True, max_length=16, null=True, unique=True),
),
migrations.AddField(
model_name='token',
name='pepper',
field=models.PositiveSmallIntegerField(blank=True, null=True),
),
migrations.AddField(
model_name='token',
name='hmac_digest',
field=models.CharField(blank=True, max_length=64, null=True),
),
]

View File

@ -1,8 +1,12 @@
import binascii
import hashlib
import hmac
import random
import os
from django.conf import settings
from django.contrib.postgres.fields import ArrayField
from django.core.exceptions import ValidationError
from django.core.validators import MinLengthValidator
from django.db import models
from django.urls import reverse
@ -11,6 +15,9 @@ from django.utils.translation import gettext_lazy as _
from netaddr import IPNetwork
from ipam.fields import IPNetworkField
from users.choices import TokenVersionChoices
from users.constants import TOKEN_CHARSET
from users.utils import get_current_pepper
from utilities.querysets import RestrictedQuerySet
__all__ = (
@ -23,11 +30,21 @@ class Token(models.Model):
An API token used for user authentication. This extends the stock model to allow each user to have multiple tokens.
It also supports setting an expiration time and toggling write ability.
"""
version = models.PositiveSmallIntegerField(
verbose_name=_('version'),
choices=TokenVersionChoices,
default=TokenVersionChoices.V2,
)
user = models.ForeignKey(
to='users.User',
on_delete=models.CASCADE,
related_name='tokens'
)
description = models.CharField(
verbose_name=_('description'),
max_length=200,
blank=True
)
created = models.DateTimeField(
verbose_name=_('created'),
auto_now_add=True
@ -42,21 +59,40 @@ class Token(models.Model):
blank=True,
null=True
)
key = models.CharField(
verbose_name=_('key'),
max_length=40,
unique=True,
validators=[MinLengthValidator(40)]
)
write_enabled = models.BooleanField(
verbose_name=_('write enabled'),
default=True,
help_text=_('Permit create/update/delete operations using this key')
)
description = models.CharField(
verbose_name=_('description'),
max_length=200,
blank=True
# For legacy v1 tokens, this field stores the plaintext 40-char token value. Not used for v2.
plaintext = models.CharField(
verbose_name=_('plaintext'),
max_length=40,
unique=True,
blank=True,
null=True,
validators=[MinLengthValidator(40)],
)
key = models.CharField(
verbose_name=_('key'),
max_length=16,
unique=True,
blank=True,
null=True,
help_text=_('v2 token identification key'),
)
pepper = models.PositiveSmallIntegerField(
verbose_name=_('pepper'),
blank=True,
null=True,
help_text=_('ID of the cryptographic pepper used to hash the token (v2 only)'),
)
hmac_digest = models.CharField(
verbose_name=_('digest'),
max_length=64,
blank=True,
null=True,
help_text=_('SHA256 hash of the token and pepper (v2 only)'),
)
allowed_ips = ArrayField(
base_field=IPNetworkField(),
@ -72,36 +108,108 @@ class Token(models.Model):
objects = RestrictedQuerySet.as_manager()
class Meta:
ordering = ('-created',)
verbose_name = _('token')
verbose_name_plural = _('tokens')
ordering = ('-created',)
def __init__(self, *args, token=None, **kwargs):
super().__init__(*args, **kwargs)
self.token = token
def __str__(self):
return self.key if settings.ALLOW_TOKEN_RETRIEVAL else self.partial
if self.v1:
return self.partial
return self.key
def get_absolute_url(self):
return reverse('users:token', args=[self.pk])
@property
def v1(self):
return self.version == 1
@property
def v2(self):
return self.version == 2
@property
def partial(self):
return f'**********************************{self.key[-6:]}' if self.key else ''
return f'**********************************{self.plaintext[-6:]}' if self.plaintext else ''
@property
def token(self):
return getattr(self, '_token', None)
@token.setter
def token(self, value):
self._token = value
if value is not None:
if self.v1:
self.plaintext = value
elif self.v2:
self.key = self.key or self.generate(16)
self.update_digest()
def clean(self):
if self._state.adding and self.v2 and not settings.API_TOKEN_PEPPERS:
raise ValidationError(_("Cannot create v2 tokens: API_TOKEN_PEPPERS is not defined."))
def save(self, *args, **kwargs):
if not self.key:
self.key = self.generate_key()
# If creating a new Token and no token value has been specified, generate one
if self._state.adding and self.token is None:
self.token = self.generate()
return super().save(*args, **kwargs)
@staticmethod
def generate_key():
# Generate a random 160-bit key expressed in hexadecimal.
"""
DEPRECATED: Generate and return a random 160-bit key expressed in hexadecimal.
"""
return binascii.hexlify(os.urandom(20)).decode()
@staticmethod
def generate(length=40):
"""
Generate and return a random token value of the given length.
"""
return ''.join(random.choice(TOKEN_CHARSET) for _ in range(length))
def update_digest(self):
"""
Recalculate and save the HMAC digest using the currently defined pepper and token values.
"""
self.pepper, pepper_value = get_current_pepper()
self.hmac_digest = hmac.new(
pepper_value.encode('utf-8'),
self.token.encode('utf-8'),
hashlib.sha256
).hexdigest()
@property
def is_expired(self):
if self.expires is None or timezone.now() < self.expires:
return False
return True
def validate(self, token):
"""
Returns true if the given token value validates.
"""
if self.is_expired:
return False
if self.v1:
return token == self.key
if self.v2:
try:
pepper = settings.API_TOKEN_PEPPERS[self.pepper]
except KeyError:
# Invalid pepper ID
return False
digest = hmac.new(pepper.encode('utf-8'), token.encode('utf-8'), hashlib.sha256).hexdigest()
return digest == self.hmac_digest
def validate_client_ip(self, client_ip):
"""
Validate the API client IP address against the source IP restrictions (if any) set on the token.

View File

@ -22,7 +22,8 @@ class TokenTable(UserTokenTable):
class Meta(NetBoxTable.Meta):
model = Token
fields = (
'pk', 'id', 'key', 'user', 'description', 'write_enabled', 'created', 'expires', 'last_used', 'allowed_ips',
'pk', 'id', 'version', 'key', 'pepper', 'user', 'description', 'write_enabled', 'created', 'expires',
'last_used', 'allowed_ips',
)

View File

@ -197,7 +197,7 @@ class TokenTest(
APIViewTestCases.DeleteObjectViewTestCase
):
model = Token
brief_fields = ['description', 'display', 'id', 'key', 'url', 'write_enabled']
brief_fields = ['description', 'display', 'id', 'key', 'url', 'version', 'write_enabled']
bulk_update_data = {
'description': 'New description',
}
@ -256,8 +256,8 @@ class TokenTest(
response = self.client.post(url, data, format='json', **self.header)
self.assertEqual(response.status_code, 201)
self.assertIn('key', response.data)
self.assertEqual(len(response.data['key']), 40)
self.assertIn('token', response.data)
self.assertEqual(len(response.data['token']), 40)
self.assertEqual(response.data['description'], data['description'])
self.assertEqual(response.data['expires'], data['expires'])
token = Token.objects.get(user=user)

View File

@ -266,7 +266,7 @@ class ObjectPermissionTestCase(TestCase, BaseFilterSetTests):
class TokenTestCase(TestCase, BaseFilterSetTests):
queryset = Token.objects.all()
filterset = filtersets.TokenFilterSet
ignore_fields = ('allowed_ips',)
ignore_fields = ('plaintext', 'hmac_digest', 'allowed_ips')
@classmethod
def setUpTestData(cls):
@ -282,21 +282,39 @@ class TokenTestCase(TestCase, BaseFilterSetTests):
past_date = make_aware(datetime.datetime(2000, 1, 1))
tokens = (
Token(
user=users[0], key=Token.generate_key(), expires=future_date, write_enabled=True, description='foobar1'
version=1,
user=users[0],
expires=future_date,
write_enabled=True,
description='foobar1',
),
Token(
user=users[1], key=Token.generate_key(), expires=future_date, write_enabled=True, description='foobar2'
version=2,
user=users[1],
expires=future_date,
write_enabled=True,
description='foobar2',
),
Token(
user=users[2], key=Token.generate_key(), expires=past_date, write_enabled=False
version=2,
user=users[2],
expires=past_date,
write_enabled=False,
),
)
Token.objects.bulk_create(tokens)
for token in tokens:
token.save()
def test_q(self):
params = {'q': 'foobar1'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_version(self):
params = {'version': 1}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
params = {'version': 2}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_user(self):
users = User.objects.order_by('id')[:2]
params = {'user_id': [users[0].pk, users[1].pk]}
@ -313,7 +331,7 @@ class TokenTestCase(TestCase, BaseFilterSetTests):
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_key(self):
tokens = Token.objects.all()[:2]
tokens = Token.objects.filter(version=2)
params = {'key': [tokens[0].key, tokens[1].key]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)

View File

@ -215,6 +215,7 @@ class TokenTestCase(
):
model = Token
maxDiff = None
validation_excluded_fields = ['token', 'user']
@classmethod
def setUpTestData(cls):
@ -223,32 +224,34 @@ class TokenTestCase(
create_test_user('User 2'),
)
tokens = (
Token(key='123456789012345678901234567890123456789A', user=users[0]),
Token(key='123456789012345678901234567890123456789B', user=users[0]),
Token(key='123456789012345678901234567890123456789C', user=users[1]),
Token(user=users[0]),
Token(user=users[0]),
Token(user=users[1]),
)
Token.objects.bulk_create(tokens)
for token in tokens:
token.save()
cls.form_data = {
'version': 2,
'token': '4F9DAouzURLbicyoG55htImgqQ0b4UZHP5LUYgl5',
'user': users[0].pk,
'key': '1234567890123456789012345678901234567890',
'description': 'testdescription',
'description': 'Test token',
}
cls.csv_data = (
"key,user,description",
f"123456789012345678901234567890123456789D,{users[0].pk},testdescriptionD",
f"123456789012345678901234567890123456789E,{users[1].pk},testdescriptionE",
f"123456789012345678901234567890123456789F,{users[1].pk},testdescriptionF",
"token,user,description",
f"123456789012345678901234567890123456789A,{users[0].pk},Test token",
f"123456789012345678901234567890123456789B,{users[1].pk},Test token",
f"123456789012345678901234567890123456789C,{users[1].pk},Test token",
)
cls.csv_update_data = (
"id,description",
f"{tokens[0].pk},testdescriptionH",
f"{tokens[1].pk},testdescriptionI",
f"{tokens[2].pk},testdescriptionJ",
f"{tokens[0].pk},New description",
f"{tokens[1].pk},New description",
f"{tokens[2].pk},New description",
)
cls.bulk_edit_data = {
'description': 'newdescription',
'description': 'New description',
}

View File

@ -1,5 +1,12 @@
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from social_core.storage import NO_ASCII_REGEX, NO_SPECIAL_REGEX
__all__ = (
'clean_username',
'get_current_pepper',
)
def clean_username(value):
"""Clean username removing any unsupported character"""
@ -7,3 +14,13 @@ def clean_username(value):
value = NO_SPECIAL_REGEX.sub('', value)
value = value.replace(':', '')
return value
def get_current_pepper():
"""
Return the ID and value of the newest (highest ID) cryptographic pepper.
"""
if len(settings.API_TOKEN_PEPPERS) < 1:
raise ImproperlyConfigured("Must define API_TOKEN_PEPPERS to use v2 API tokens")
newest_id = sorted(settings.API_TOKEN_PEPPERS)[-1]
return newest_id, settings.API_TOKEN_PEPPERS[newest_id]

View File

@ -49,8 +49,8 @@ class APITestCase(ModelTestCase):
# Create the test user and assign permissions
self.user = User.objects.create_user(username='testuser')
self.add_permissions(*self.user_permissions)
self.token = Token.objects.create(user=self.user)
self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.key}'}
self.token = Token.objects.create(version=1, user=self.user)
self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.plaintext}'}
def _get_view_namespace(self):
return f'{self.view_namespace or self.model._meta.app_label}-api'
@ -153,6 +153,7 @@ class APIViewTestCases:
url = f'{self._get_list_url()}?brief=1'
response = self.client.get(url, **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(len(response.data['results']), self._get_queryset().count())
self.assertEqual(sorted(response.data['results'][0]), self.brief_fields)

View File

@ -240,10 +240,12 @@ class ViewTestCases:
:form_data: Data to be used when updating the first existing object.
"""
form_data = {}
form_edit_data = {}
validation_excluded_fields = []
def test_edit_object_without_permission(self):
instance = self._get_queryset().first()
form_data = self.form_edit_data or self.form_data
# Try GET without permission
with disable_warnings('django.request'):
@ -252,7 +254,7 @@ class ViewTestCases:
# Try POST without permission
request = {
'path': self._get_url('edit', instance),
'data': post_data(self.form_data),
'data': post_data(form_data),
}
with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(**request), 403)
@ -260,6 +262,7 @@ class ViewTestCases:
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
def test_edit_object_with_permission(self):
instance = self._get_queryset().first()
form_data = self.form_edit_data or self.form_data
# Assign model-level permission
obj_perm = ObjectPermission(
@ -275,21 +278,21 @@ class ViewTestCases:
# Add custom field data if the model supports it
if issubclass(self.model, CustomFieldsMixin):
add_custom_field_data(self.form_data, self.model)
add_custom_field_data(form_data, self.model)
# If supported, add a changelog message
if issubclass(self.model, ChangeLoggingMixin):
if 'changelog_message' not in self.form_data:
self.form_data['changelog_message'] = get_random_string(10)
if 'changelog_message' not in form_data:
form_data['changelog_message'] = get_random_string(10)
# Try POST with model-level permission
request = {
'path': self._get_url('edit', instance),
'data': post_data(self.form_data),
'data': post_data(form_data),
}
self.assertHttpStatus(self.client.post(**request), 302)
instance = self._get_queryset().get(pk=instance.pk)
self.assertInstanceEqual(instance, self.form_data, exclude=self.validation_excluded_fields)
self.assertInstanceEqual(instance, form_data, exclude=self.validation_excluded_fields)
# Verify ObjectChange creation
if issubclass(self.model, ChangeLoggingMixin):
@ -299,11 +302,12 @@ class ViewTestCases:
)
self.assertEqual(len(objectchanges), 1)
self.assertEqual(objectchanges[0].action, ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchanges[0].message, self.form_data['changelog_message'])
self.assertEqual(objectchanges[0].message, form_data['changelog_message'])
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
def test_edit_object_with_constrained_permission(self):
instance1, instance2 = self._get_queryset().all()[:2]
form_data = self.form_edit_data or self.form_data
# Assign constrained permission
obj_perm = ObjectPermission(
@ -324,16 +328,16 @@ class ViewTestCases:
# Try to edit a permitted object
request = {
'path': self._get_url('edit', instance1),
'data': post_data(self.form_data),
'data': post_data(form_data),
}
self.assertHttpStatus(self.client.post(**request), 302)
instance = self._get_queryset().get(pk=instance1.pk)
self.assertInstanceEqual(instance, self.form_data, exclude=self.validation_excluded_fields)
self.assertInstanceEqual(instance, form_data, exclude=self.validation_excluded_fields)
# Try to edit a non-permitted object
request = {
'path': self._get_url('edit', instance2),
'data': post_data(self.form_data),
'data': post_data(form_data),
}
self.assertHttpStatus(self.client.post(**request), 404)