1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

Closes #13729: Censor sensitive data source parameters in change log (#15032)

This commit is contained in:
Jeremy Stretch
2024-02-05 11:35:12 -05:00
committed by GitHub
parent 1a9149d7d4
commit fde9c1664a
3 changed files with 149 additions and 0 deletions

View File

@ -14,6 +14,7 @@ from django.utils import timezone
from django.utils.module_loading import import_string
from django.utils.translation import gettext as _
from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED
from netbox.models import PrimaryModel
from netbox.models.features import JobsMixin
from netbox.registry import registry
@ -130,6 +131,28 @@ class DataSource(JobsMixin, PrimaryModel):
'source_url': f"URLs for local sources must start with file:// (or specify no scheme)"
})
def to_objectchange(self, action):
objectchange = super().to_objectchange(action)
# Censor any backend parameters marked as sensitive in the serialized data
pre_change_params = {}
post_change_params = {}
if objectchange.prechange_data:
pre_change_params = objectchange.prechange_data.get('parameters') or {} # parameters may be None
if objectchange.postchange_data:
post_change_params = objectchange.postchange_data.get('parameters') or {}
for param in self.backend_class.sensitive_parameters:
if post_change_params.get(param):
if post_change_params[param] != pre_change_params.get(param):
# Set the "changed" token if the parameter's value has been modified
post_change_params[param] = CENSOR_TOKEN_CHANGED
else:
post_change_params[param] = CENSOR_TOKEN
if pre_change_params.get(param):
pre_change_params[param] = CENSOR_TOKEN
return objectchange
def enqueue_sync_job(self, request):
"""
Enqueue a background job to synchronize the DataSource by calling sync().

View File

@ -0,0 +1,122 @@
from django.test import TestCase
from core.models import DataSource
from extras.choices import ObjectChangeActionChoices
from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED
class DataSourceChangeLoggingTestCase(TestCase):
def test_password_added_on_create(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/',
parameters={
'username': 'jeff',
'password': 'foobar123',
}
)
objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_CREATE)
self.assertIsNone(objectchange.prechange_data)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN_CHANGED)
def test_password_added_on_update(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/'
)
datasource.snapshot()
# Add a blank password
datasource.parameters = {
'username': 'jeff',
'password': '',
}
objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertIsNone(objectchange.prechange_data['parameters'])
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], '')
# Add a password
datasource.parameters = {
'username': 'jeff',
'password': 'foobar123',
}
objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN_CHANGED)
def test_password_changed(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/',
parameters={
'username': 'jeff',
'password': 'password1',
}
)
datasource.snapshot()
# Change the password
datasource.parameters['password'] = 'password2'
objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.prechange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN_CHANGED)
def test_password_removed_on_update(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/',
parameters={
'username': 'jeff',
'password': 'foobar123',
}
)
datasource.snapshot()
objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.prechange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN)
# Remove the password
datasource.parameters['password'] = ''
objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.prechange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], '')
def test_password_not_modified(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/',
parameters={
'username': 'username1',
'password': 'foobar123',
}
)
datasource.snapshot()
# Remove the password
datasource.parameters['username'] = 'username2'
objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.prechange_data['parameters']['username'], 'username1')
self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'username2')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN)

View File

@ -36,3 +36,7 @@ DEFAULT_ACTION_PERMISSIONS = {
'bulk_edit': {'change'},
'bulk_delete': {'delete'},
}
# General-purpose tokens
CENSOR_TOKEN = '********'
CENSOR_TOKEN_CHANGED = '***CHANGED***'