diff --git a/netbox/secrets/api/views.py b/netbox/secrets/api/views.py index b4e75d82d..0ac087c63 100644 --- a/netbox/secrets/api/views.py +++ b/netbox/secrets/api/views.py @@ -1,7 +1,5 @@ import base64 -from Crypto.Cipher import XOR from Crypto.PublicKey import RSA -import os from django.http import HttpResponseBadRequest @@ -14,7 +12,7 @@ from rest_framework.viewsets import ModelViewSet from extras.api.renderers import FormlessBrowsableAPIRenderer, FreeRADIUSClientsRenderer from secrets.filters import SecretFilter -from secrets.models import Secret, SecretRole, UserKey +from secrets.models import Secret, SecretRole, SessionKey, UserKey from utilities.api import WritableSerializerMixin from . import serializers @@ -57,16 +55,25 @@ class SecretViewSet(WritableSerializerMixin, ModelViewSet): permission_classes = [IsAuthenticated] def _get_master_key(self, request): - cached_key = request.session.get('cached_key', None) - session_key = request.COOKIES.get('session_key', None) - if cached_key is None or session_key is None: + + # Check for a session key provided as a cookie or header + if 'session_key' in request.COOKIES: + session_key = base64.b64decode(request.COOKIES['session_key']) + elif 'HTTP_X_SESSION_KEY' in request.META: + session_key = base64.b64decode(request.META['HTTP_X_SESSION_KEY']) + else: return None - cached_key = base64.b64decode(cached_key) - session_key = base64.b64decode(session_key) + # Retrieve session key cipher (if any) for the current user + try: + sk = SessionKey.objects.get(user=request.user) + except SessionKey.DoesNotExist: + return None + + # Recover master key + # TODO: Exception handling + master_key = sk.get_master_key(session_key) - xor = XOR.new(session_key) - master_key = xor.encrypt(cached_key) return master_key def retrieve(self, request, *args, **kwargs): @@ -100,30 +107,6 @@ class SecretViewSet(WritableSerializerMixin, ModelViewSet): return Response(serializer.data) -class RSAKeyGeneratorView(APIView): - """ - Generate a new RSA key pair for a user. Authenticated because it's a ripe avenue for DoS. - """ - permission_classes = [IsAuthenticated] - - def get(self, request): - - # Determine what size key to generate - key_size = request.GET.get('key_size', 2048) - if key_size not in range(2048, 4097, 256): - key_size = 2048 - - # Export RSA private and public keys in PEM format - key = RSA.generate(key_size) - private_key = key.exportKey('PEM') - public_key = key.publickey().exportKey('PEM') - - return Response({ - 'private_key': private_key, - 'public_key': public_key, - }) - - class GetSessionKey(APIView): """ Cache an encrypted copy of the master key derived from the submitted private key. @@ -150,16 +133,42 @@ class GetSessionKey(APIView): if master_key is None: return HttpResponseBadRequest(ERR_PRIVKEY_INVALID) - # Generate a random 256-bit encryption key - session_key = os.urandom(32) - xor = XOR.new(session_key) - cached_key = xor.encrypt(master_key) + # Delete the existing SessionKey for this user if one exists + SessionKey.objects.filter(user=request.user).delete() - # Save XORed copy of the master key - request.session['cached_key'] = base64.b64encode(cached_key) + # Create a new SessionKey + sk = SessionKey(user=request.user) + sk.save(master_key=master_key) + # Return the session key both as JSON and as a cookie response = Response({ - 'session_key': base64.b64encode(session_key), + 'session_key': base64.b64encode(sk.key), + 'expiration_time': sk.expiration_time, }) - response.set_cookie('session_key', base64.b64encode(session_key)) + # TODO: Limit cookie path to secrets API URLs + response.set_cookie('session_key', base64.b64encode(sk.key), expires=sk.expiration_time) return response + + +class RSAKeyGeneratorView(APIView): + """ + Generate a new RSA key pair for a user. Authenticated because it's a ripe avenue for DoS. + """ + permission_classes = [IsAuthenticated] + + def get(self, request): + + # Determine what size key to generate + key_size = request.GET.get('key_size', 2048) + if key_size not in range(2048, 4097, 256): + key_size = 2048 + + # Export RSA private and public keys in PEM format + key = RSA.generate(key_size) + private_key = key.exportKey('PEM') + public_key = key.publickey().exportKey('PEM') + + return Response({ + 'private_key': private_key, + 'public_key': public_key, + }) diff --git a/netbox/secrets/migrations/0002_add_sessionkeys.py b/netbox/secrets/migrations/0002_add_sessionkeys.py new file mode 100644 index 000000000..c4b848b35 --- /dev/null +++ b/netbox/secrets/migrations/0002_add_sessionkeys.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.10.4 on 2017-02-03 17:10 +from __future__ import unicode_literals + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ('secrets', '0001_initial'), + ] + + operations = [ + migrations.CreateModel( + name='SessionKey', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('cipher', models.BinaryField(max_length=512)), + ('hash', models.CharField(editable=False, max_length=128)), + ('created', models.DateTimeField(auto_now_add=True)), + ('expiration_time', models.DateTimeField(blank=True, editable=False, null=True)), + ('user', models.OneToOneField(editable=False, on_delete=django.db.models.deletion.CASCADE, related_name='session_key', to=settings.AUTH_USER_MODEL)), + ], + options={ + 'ordering': ['user__username'], + }, + ), + migrations.AlterField( + model_name='userkey', + name='user', + field=models.OneToOneField(editable=False, on_delete=django.db.models.deletion.CASCADE, related_name='user_key', to=settings.AUTH_USER_MODEL), + ), + ] diff --git a/netbox/secrets/models.py b/netbox/secrets/models.py index a0c3e6f8b..8542c9bf3 100644 --- a/netbox/secrets/models.py +++ b/netbox/secrets/models.py @@ -1,5 +1,6 @@ +import datetime import os -from Crypto.Cipher import AES, PKCS1_OAEP +from Crypto.Cipher import AES, PKCS1_OAEP, XOR from Crypto.PublicKey import RSA from django.conf import settings @@ -8,6 +9,7 @@ from django.contrib.auth.models import Group, User from django.core.exceptions import ValidationError from django.core.urlresolvers import reverse from django.db import models +from django.utils import timezone from django.utils.encoding import force_bytes, python_2_unicode_compatible from dcim.models import Device @@ -16,11 +18,13 @@ from utilities.models import CreatedUpdatedModel from .hashers import SecretValidationHasher -def generate_master_key(): +def generate_random_key(bits=256): """ - Generate a new 256-bit (32 bytes) AES key to be used for symmetric encryption of secrets. + Generate a random encryption key. Sizes is given in bits and must be in increments of 32. """ - return os.urandom(32) + if bits % 32: + raise Exception("Invalid key size ({}). Key sizes must be in increments of 32 bits.".format(bits)) + return os.urandom(bits / 8) def encrypt_master_key(master_key, public_key): @@ -41,6 +45,14 @@ def decrypt_master_key(master_key_cipher, private_key): return cipher.decrypt(master_key_cipher) +def xor_keys(key_a, key_b): + """ + Return the binary XOR of two given keys. + """ + xor = XOR.new(key_a) + return xor.encrypt(key_b) + + class UserKeyQuerySet(models.QuerySet): def active(self): @@ -58,7 +70,7 @@ class UserKey(CreatedUpdatedModel): copy of the master encryption key. The encrypted instance of the master key can be decrypted only with the user's matching (private) decryption key. """ - user = models.OneToOneField(User, related_name='user_key', verbose_name='User') + user = models.OneToOneField(User, related_name='user_key', editable=False) public_key = models.TextField(verbose_name='RSA public key') master_key_cipher = models.BinaryField(max_length=512, blank=True, null=True, editable=False) @@ -121,7 +133,7 @@ class UserKey(CreatedUpdatedModel): # If no other active UserKeys exist, generate a new master key and use it to activate this UserKey. if self.is_filled() and not self.is_active() and not UserKey.objects.active().count(): - master_key = generate_master_key() + master_key = generate_random_key() self.master_key_cipher = encrypt_master_key(master_key, self.public_key) super(UserKey, self).save(*args, **kwargs) @@ -171,6 +183,58 @@ class UserKey(CreatedUpdatedModel): self.save() +@python_2_unicode_compatible +class SessionKey(models.Model): + """ + A SessionKey stores a User's temporary key to be used for the encryption and decryption of secrets. + """ + user = models.OneToOneField(User, related_name='session_key', editable=False) + cipher = models.BinaryField(max_length=512, editable=False) + hash = models.CharField(max_length=128, editable=False) + created = models.DateTimeField(auto_now_add=True) + expiration_time = models.DateTimeField(blank=True, null=True, editable=False) + + key = None + + class Meta: + ordering = ['user__username'] + + def __str__(self): + return self.user.username + + def save(self, master_key=None, *args, **kwargs): + + if master_key is None: + raise Exception("The master key must be provided to save a session key.") + + # Generate a random 256-bit session key if one is not already defined + if self.key is None: + self.key = generate_random_key() + + # Generate SHA256 hash using Django's built-in password hashing mechanism + self.hash = make_password(self.key) + + # Encrypt master key using the session key + self.cipher = xor_keys(self.key, master_key) + + # Calculate expiration time + # TODO: Define a SESSION_KEY_MAX_AGE configuration setting + self.expiration_time = timezone.now() + datetime.timedelta(hours=12) + + super(SessionKey, self).save(*args, **kwargs) + + def get_master_key(self, session_key): + + # Validate the provided session key + if not check_password(session_key, self.hash): + raise Exception("Invalid session key") + + # Decrypt master key using provided session key + master_key = xor_keys(session_key, self.cipher) + + return master_key + + @python_2_unicode_compatible class SecretRole(models.Model): """ diff --git a/netbox/secrets/tests/test_models.py b/netbox/secrets/tests/test_models.py index 132487765..e668f1185 100644 --- a/netbox/secrets/tests/test_models.py +++ b/netbox/secrets/tests/test_models.py @@ -5,7 +5,7 @@ from django.contrib.auth.models import User from django.core.exceptions import ValidationError from django.test import TestCase -from secrets.models import UserKey, Secret, generate_master_key, encrypt_master_key, decrypt_master_key +from secrets.models import UserKey, Secret, encrypt_master_key, decrypt_master_key, generate_random_key from secrets.hashers import SecretValidationHasher @@ -33,7 +33,7 @@ class UserKeyTestCase(TestCase): """ Validate the activation of a UserKey. """ - master_key = generate_master_key() + master_key = generate_random_key() alice_uk = UserKey(user=User.objects.get(username='alice'), public_key=self.TEST_KEYS['alice_public']) self.assertFalse(alice_uk.is_active(), "Inactive UserKey is_active() did not return False") alice_uk.activate(master_key) @@ -62,7 +62,7 @@ class UserKeyTestCase(TestCase): """ Test the decryption of a master key using the user's private key. """ - master_key = generate_master_key() + master_key = generate_random_key() alice_uk = UserKey(user=User.objects.get(username='alice'), public_key=self.TEST_KEYS['alice_public']) alice_uk.activate(master_key) retrieved_master_key = alice_uk.get_master_key(self.TEST_KEYS['alice_private']) @@ -72,7 +72,7 @@ class UserKeyTestCase(TestCase): """ Ensure that an exception is raised when attempting to retrieve a secret key using an invalid private key. """ - secret_key = generate_master_key() + secret_key = generate_random_key() secret_key_cipher = encrypt_master_key(secret_key, self.TEST_KEYS['alice_public']) try: decrypted_secret_key = decrypt_master_key(secret_key_cipher, self.TEST_KEYS['bob_private']) @@ -88,7 +88,7 @@ class SecretTestCase(TestCase): Test basic encryption and decryption functionality using a random master key. """ plaintext = "FooBar123" - secret_key = generate_master_key() + secret_key = generate_random_key() s = Secret(plaintext=plaintext) s.encrypt(secret_key) @@ -118,7 +118,7 @@ class SecretTestCase(TestCase): Generate 50 Secrets using the same plaintext and check for duplicate IVs or payloads. """ plaintext = "1234567890abcdef" - secret_key = generate_master_key() + secret_key = generate_random_key() ivs = [] ciphertexts = [] for i in range(1, 51):