1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

Implemented SessionKeys for secrets

This commit is contained in:
Jeremy Stretch
2017-02-03 12:49:32 -05:00
parent cf66f67fb6
commit a42eeb12d2
4 changed files with 164 additions and 54 deletions

View File

@ -1,7 +1,5 @@
import base64
from Crypto.Cipher import XOR
from Crypto.PublicKey import RSA
import os
from django.http import HttpResponseBadRequest
@ -14,7 +12,7 @@ from rest_framework.viewsets import ModelViewSet
from extras.api.renderers import FormlessBrowsableAPIRenderer, FreeRADIUSClientsRenderer
from secrets.filters import SecretFilter
from secrets.models import Secret, SecretRole, UserKey
from secrets.models import Secret, SecretRole, SessionKey, UserKey
from utilities.api import WritableSerializerMixin
from . import serializers
@ -57,16 +55,25 @@ class SecretViewSet(WritableSerializerMixin, ModelViewSet):
permission_classes = [IsAuthenticated]
def _get_master_key(self, request):
cached_key = request.session.get('cached_key', None)
session_key = request.COOKIES.get('session_key', None)
if cached_key is None or session_key is None:
# Check for a session key provided as a cookie or header
if 'session_key' in request.COOKIES:
session_key = base64.b64decode(request.COOKIES['session_key'])
elif 'HTTP_X_SESSION_KEY' in request.META:
session_key = base64.b64decode(request.META['HTTP_X_SESSION_KEY'])
else:
return None
cached_key = base64.b64decode(cached_key)
session_key = base64.b64decode(session_key)
# Retrieve session key cipher (if any) for the current user
try:
sk = SessionKey.objects.get(user=request.user)
except SessionKey.DoesNotExist:
return None
# Recover master key
# TODO: Exception handling
master_key = sk.get_master_key(session_key)
xor = XOR.new(session_key)
master_key = xor.encrypt(cached_key)
return master_key
def retrieve(self, request, *args, **kwargs):
@ -100,30 +107,6 @@ class SecretViewSet(WritableSerializerMixin, ModelViewSet):
return Response(serializer.data)
class RSAKeyGeneratorView(APIView):
"""
Generate a new RSA key pair for a user. Authenticated because it's a ripe avenue for DoS.
"""
permission_classes = [IsAuthenticated]
def get(self, request):
# Determine what size key to generate
key_size = request.GET.get('key_size', 2048)
if key_size not in range(2048, 4097, 256):
key_size = 2048
# Export RSA private and public keys in PEM format
key = RSA.generate(key_size)
private_key = key.exportKey('PEM')
public_key = key.publickey().exportKey('PEM')
return Response({
'private_key': private_key,
'public_key': public_key,
})
class GetSessionKey(APIView):
"""
Cache an encrypted copy of the master key derived from the submitted private key.
@ -150,16 +133,42 @@ class GetSessionKey(APIView):
if master_key is None:
return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
# Generate a random 256-bit encryption key
session_key = os.urandom(32)
xor = XOR.new(session_key)
cached_key = xor.encrypt(master_key)
# Delete the existing SessionKey for this user if one exists
SessionKey.objects.filter(user=request.user).delete()
# Save XORed copy of the master key
request.session['cached_key'] = base64.b64encode(cached_key)
# Create a new SessionKey
sk = SessionKey(user=request.user)
sk.save(master_key=master_key)
# Return the session key both as JSON and as a cookie
response = Response({
'session_key': base64.b64encode(session_key),
'session_key': base64.b64encode(sk.key),
'expiration_time': sk.expiration_time,
})
response.set_cookie('session_key', base64.b64encode(session_key))
# TODO: Limit cookie path to secrets API URLs
response.set_cookie('session_key', base64.b64encode(sk.key), expires=sk.expiration_time)
return response
class RSAKeyGeneratorView(APIView):
"""
Generate a new RSA key pair for a user. Authenticated because it's a ripe avenue for DoS.
"""
permission_classes = [IsAuthenticated]
def get(self, request):
# Determine what size key to generate
key_size = request.GET.get('key_size', 2048)
if key_size not in range(2048, 4097, 256):
key_size = 2048
# Export RSA private and public keys in PEM format
key = RSA.generate(key_size)
private_key = key.exportKey('PEM')
public_key = key.publickey().exportKey('PEM')
return Response({
'private_key': private_key,
'public_key': public_key,
})

View File

@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.4 on 2017-02-03 17:10
from __future__ import unicode_literals
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('secrets', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='SessionKey',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('cipher', models.BinaryField(max_length=512)),
('hash', models.CharField(editable=False, max_length=128)),
('created', models.DateTimeField(auto_now_add=True)),
('expiration_time', models.DateTimeField(blank=True, editable=False, null=True)),
('user', models.OneToOneField(editable=False, on_delete=django.db.models.deletion.CASCADE, related_name='session_key', to=settings.AUTH_USER_MODEL)),
],
options={
'ordering': ['user__username'],
},
),
migrations.AlterField(
model_name='userkey',
name='user',
field=models.OneToOneField(editable=False, on_delete=django.db.models.deletion.CASCADE, related_name='user_key', to=settings.AUTH_USER_MODEL),
),
]

View File

@ -1,5 +1,6 @@
import datetime
import os
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Cipher import AES, PKCS1_OAEP, XOR
from Crypto.PublicKey import RSA
from django.conf import settings
@ -8,6 +9,7 @@ from django.contrib.auth.models import Group, User
from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
from django.db import models
from django.utils import timezone
from django.utils.encoding import force_bytes, python_2_unicode_compatible
from dcim.models import Device
@ -16,11 +18,13 @@ from utilities.models import CreatedUpdatedModel
from .hashers import SecretValidationHasher
def generate_master_key():
def generate_random_key(bits=256):
"""
Generate a new 256-bit (32 bytes) AES key to be used for symmetric encryption of secrets.
Generate a random encryption key. Sizes is given in bits and must be in increments of 32.
"""
return os.urandom(32)
if bits % 32:
raise Exception("Invalid key size ({}). Key sizes must be in increments of 32 bits.".format(bits))
return os.urandom(bits / 8)
def encrypt_master_key(master_key, public_key):
@ -41,6 +45,14 @@ def decrypt_master_key(master_key_cipher, private_key):
return cipher.decrypt(master_key_cipher)
def xor_keys(key_a, key_b):
"""
Return the binary XOR of two given keys.
"""
xor = XOR.new(key_a)
return xor.encrypt(key_b)
class UserKeyQuerySet(models.QuerySet):
def active(self):
@ -58,7 +70,7 @@ class UserKey(CreatedUpdatedModel):
copy of the master encryption key. The encrypted instance of the master key can be decrypted only with the user's
matching (private) decryption key.
"""
user = models.OneToOneField(User, related_name='user_key', verbose_name='User')
user = models.OneToOneField(User, related_name='user_key', editable=False)
public_key = models.TextField(verbose_name='RSA public key')
master_key_cipher = models.BinaryField(max_length=512, blank=True, null=True, editable=False)
@ -121,7 +133,7 @@ class UserKey(CreatedUpdatedModel):
# If no other active UserKeys exist, generate a new master key and use it to activate this UserKey.
if self.is_filled() and not self.is_active() and not UserKey.objects.active().count():
master_key = generate_master_key()
master_key = generate_random_key()
self.master_key_cipher = encrypt_master_key(master_key, self.public_key)
super(UserKey, self).save(*args, **kwargs)
@ -171,6 +183,58 @@ class UserKey(CreatedUpdatedModel):
self.save()
@python_2_unicode_compatible
class SessionKey(models.Model):
"""
A SessionKey stores a User's temporary key to be used for the encryption and decryption of secrets.
"""
user = models.OneToOneField(User, related_name='session_key', editable=False)
cipher = models.BinaryField(max_length=512, editable=False)
hash = models.CharField(max_length=128, editable=False)
created = models.DateTimeField(auto_now_add=True)
expiration_time = models.DateTimeField(blank=True, null=True, editable=False)
key = None
class Meta:
ordering = ['user__username']
def __str__(self):
return self.user.username
def save(self, master_key=None, *args, **kwargs):
if master_key is None:
raise Exception("The master key must be provided to save a session key.")
# Generate a random 256-bit session key if one is not already defined
if self.key is None:
self.key = generate_random_key()
# Generate SHA256 hash using Django's built-in password hashing mechanism
self.hash = make_password(self.key)
# Encrypt master key using the session key
self.cipher = xor_keys(self.key, master_key)
# Calculate expiration time
# TODO: Define a SESSION_KEY_MAX_AGE configuration setting
self.expiration_time = timezone.now() + datetime.timedelta(hours=12)
super(SessionKey, self).save(*args, **kwargs)
def get_master_key(self, session_key):
# Validate the provided session key
if not check_password(session_key, self.hash):
raise Exception("Invalid session key")
# Decrypt master key using provided session key
master_key = xor_keys(session_key, self.cipher)
return master_key
@python_2_unicode_compatible
class SecretRole(models.Model):
"""

View File

@ -5,7 +5,7 @@ from django.contrib.auth.models import User
from django.core.exceptions import ValidationError
from django.test import TestCase
from secrets.models import UserKey, Secret, generate_master_key, encrypt_master_key, decrypt_master_key
from secrets.models import UserKey, Secret, encrypt_master_key, decrypt_master_key, generate_random_key
from secrets.hashers import SecretValidationHasher
@ -33,7 +33,7 @@ class UserKeyTestCase(TestCase):
"""
Validate the activation of a UserKey.
"""
master_key = generate_master_key()
master_key = generate_random_key()
alice_uk = UserKey(user=User.objects.get(username='alice'), public_key=self.TEST_KEYS['alice_public'])
self.assertFalse(alice_uk.is_active(), "Inactive UserKey is_active() did not return False")
alice_uk.activate(master_key)
@ -62,7 +62,7 @@ class UserKeyTestCase(TestCase):
"""
Test the decryption of a master key using the user's private key.
"""
master_key = generate_master_key()
master_key = generate_random_key()
alice_uk = UserKey(user=User.objects.get(username='alice'), public_key=self.TEST_KEYS['alice_public'])
alice_uk.activate(master_key)
retrieved_master_key = alice_uk.get_master_key(self.TEST_KEYS['alice_private'])
@ -72,7 +72,7 @@ class UserKeyTestCase(TestCase):
"""
Ensure that an exception is raised when attempting to retrieve a secret key using an invalid private key.
"""
secret_key = generate_master_key()
secret_key = generate_random_key()
secret_key_cipher = encrypt_master_key(secret_key, self.TEST_KEYS['alice_public'])
try:
decrypted_secret_key = decrypt_master_key(secret_key_cipher, self.TEST_KEYS['bob_private'])
@ -88,7 +88,7 @@ class SecretTestCase(TestCase):
Test basic encryption and decryption functionality using a random master key.
"""
plaintext = "FooBar123"
secret_key = generate_master_key()
secret_key = generate_random_key()
s = Secret(plaintext=plaintext)
s.encrypt(secret_key)
@ -118,7 +118,7 @@ class SecretTestCase(TestCase):
Generate 50 Secrets using the same plaintext and check for duplicate IVs or payloads.
"""
plaintext = "1234567890abcdef"
secret_key = generate_master_key()
secret_key = generate_random_key()
ivs = []
ciphertexts = []
for i in range(1, 51):