1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00
2016-03-01 11:23:03 -05:00

283 lines
11 KiB
Python

import os
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA
from django.conf import settings
from django.contrib.auth.hashers import make_password, check_password
from django.contrib.auth.models import User
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
from django.db import models
from django.utils.encoding import force_bytes
def generate_master_key():
"""
Generate a new 256-bit (32 bytes) AES key to be used for symmetric encryption of secrets.
"""
return os.urandom(32)
def encrypt_master_key(master_key, public_key):
"""
Encrypt a secret key with the provided public RSA key.
"""
key = RSA.importKey(public_key)
cipher = PKCS1_OAEP.new(key)
return cipher.encrypt(master_key)
def decrypt_master_key(master_key_cipher, private_key):
"""
Decrypt a secret key with the provided private RSA key.
"""
key = RSA.importKey(private_key)
cipher = PKCS1_OAEP.new(key)
return cipher.decrypt(master_key_cipher)
class UserKeyQuerySet(models.QuerySet):
def active(self):
return self.filter(master_key_cipher__isnull=False)
def delete(self):
# Disable bulk deletion to avoid accidentally wiping out all copies of the master key.
raise Exception("Bulk deletion has been disabled.")
class UserKey(models.Model):
"""
A user's personal public RSA key.
"""
user = models.OneToOneField(User, related_name='user_key', verbose_name='User')
public_key = models.TextField(verbose_name='RSA public key')
master_key_cipher = models.BinaryField(max_length=512, blank=True, null=True, editable=False)
created = models.DateTimeField(auto_now_add=True, verbose_name='Time created')
last_modified = models.DateTimeField(auto_now=True, verbose_name='Last modified')
objects = UserKeyQuerySet.as_manager()
class Meta:
ordering = ['user__username']
permissions = (
('activate_userkey', "Can activate user keys for decryption"),
)
def __init__(self, *args, **kwargs):
super(UserKey, self).__init__(*args, **kwargs)
# Store the initial public_key and master_key_cipher to check for changes on save().
self.__initial_public_key = self.public_key
self.__initial_master_key_cipher = self.master_key_cipher
def __unicode__(self):
return self.user.username
def clean(self, *args, **kwargs):
# Validate the public key format and length.
if self.public_key:
try:
pubkey = RSA.importKey(self.public_key)
except ValueError:
raise ValidationError("Invalid RSA key format.")
except:
raise ValidationError("Something went wrong while trying to save your key. Please ensure that you're "
"uploading a valid RSA public key in PEM format (no SSH/PGP).")
# key.size() returns 1 less than the key modulus
pubkey_length = pubkey.size() + 1
if pubkey_length < settings.SECRETS_MIN_PUBKEY_SIZE:
raise ValidationError("Insufficient key length. Keys must be at least {} bits long."
.format(settings.SECRETS_MIN_PUBKEY_SIZE))
# We can't use keys bigger than our master_key_cipher field can hold
if pubkey_length > 4096:
raise ValidationError("Public key size ({}) is too large. Maximum key size is 4096 bits."
.format(pubkey_length))
super(UserKey, self).clean()
def save(self, *args, **kwargs):
# Check whether public_key has been modified. If so, nullify the initial master_key_cipher.
if self.__initial_master_key_cipher and self.public_key != self.__initial_public_key:
self.master_key_cipher = None
# If no other active UserKeys exist, generate a new master key and use it to activate this UserKey.
if self.is_filled() and not self.is_active() and not UserKey.objects.active().count():
master_key = generate_master_key()
self.master_key_cipher = encrypt_master_key(master_key, self.public_key)
super(UserKey, self).save(*args, **kwargs)
def delete(self, *args, **kwargs):
# If Secrets exist and this is the last active UserKey, prevent its deletion. Deleting the last UserKey will
# result in the master key being destroyed and rendering all Secrets inaccessible.
if Secret.objects.count() and [uk.pk for uk in UserKey.objects.active()] == [self.pk]:
raise Exception("Cannot delete the last active UserKey when Secrets exist! This would render all secrets "
"inaccessible.")
super(UserKey, self).delete(*args, **kwargs)
def is_filled(self):
"""
Returns True if the UserKey has been filled with a public RSA key.
"""
return bool(self.public_key)
is_filled.boolean = True
def is_active(self):
"""
Returns True if the UserKey has been populated with an encrypted copy of the master key.
"""
return self.master_key_cipher is not None
is_active.boolean = True
def get_master_key(self, private_key):
"""
Given the User's private key, return the encrypted master key.
"""
if not self.is_active:
raise ValueError("Unable to retrieve master key: UserKey is inactive.")
try:
return decrypt_master_key(force_bytes(self.master_key_cipher), private_key)
except ValueError:
return None
def activate(self, master_key):
"""
Activate the UserKey by saving an encrypted copy of the master key to the database.
"""
if not self.public_key:
raise Exception("Cannot activate UserKey: Its public key must be filled first.")
self.master_key_cipher = encrypt_master_key(master_key, self.public_key)
self.save()
class SecretRole(models.Model):
"""
A functional classification of secret type. For example: login credentials, SNMP communities, etc.
"""
name = models.CharField(max_length=50, unique=True)
slug = models.SlugField(unique=True)
class Meta:
ordering = ['name']
def __unicode__(self):
return self.name
class Secret(models.Model):
"""
A secret string of up to 255 bytes in length, stored as both an AES256-encrypted ciphertext and an irreversible
salted SHA256 hash (for plaintext validation).
"""
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
object_id = models.PositiveIntegerField()
parent = GenericForeignKey('content_type', 'object_id')
role = models.ForeignKey('SecretRole', related_name='secrets', on_delete=models.PROTECT)
name = models.CharField(max_length=100, blank=True)
ciphertext = models.BinaryField(editable=False, max_length=65568) # 16B IV + 2B pad length + {62-65550}B padded
hash = models.CharField(max_length=128, editable=False)
created = models.DateTimeField(auto_now_add=True, editable=False, verbose_name='Created')
last_modified = models.DateTimeField(auto_now=True, verbose_name='Last modified')
plaintext = None
class Meta:
ordering = ['role', 'name']
permissions = (
('view_secret', "Can view secrets"),
)
def __init__(self, *args, **kwargs):
self.plaintext = kwargs.pop('plaintext', None)
super(Secret, self).__init__(*args, **kwargs)
def __unicode__(self):
if self.role and self.parent:
return "{} for {}".format(self.role, self.parent)
return "Secret"
def get_absolute_url(self):
return reverse('secrets:secret', args=[self.pk])
def _pad(self, s):
"""
Prepend the length of the plaintext (2B) and pad with garbage to a multiple of 16B (minimum of 64B).
+--+--------+-------------------------------------------+
|LL|MySecret|xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx|
+--+--------+-------------------------------------------+
"""
if len(s) > 65535:
raise ValueError("Maximum plaintext size is 65535 bytes.")
# Minimum ciphertext size is 64 bytes to conceal the length of short secrets.
if len(s) <= 62:
pad_length = 62 - len(s)
elif (len(s) + 2) % 16:
pad_length = 16 - ((len(s) + 2) % 16)
else:
pad_length = 0
return chr(len(s) >> 8) + chr(len(s) % 256) + s + os.urandom(pad_length)
def _unpad(self, s):
"""
Consume the first two bytes of s as a plaintext length indicator and return only that many bytes as the
plaintext.
"""
plaintext_length = (ord(s[0]) << 8) + ord(s[1])
return s[2:plaintext_length + 2]
def encrypt(self, secret_key):
"""
Generate a random initialization vector (IV) for AES. Pad the plaintext to the AES block size (16 bytes) and
encrypt. Prepend the IV for use in decryption. Finally, record the SHA256 hash of the plaintext for validation
upon decryption.
"""
if self.plaintext is None:
raise Exception("Must unlock or set plaintext before locking.")
# Pad and encrypt plaintext
iv = os.urandom(16)
aes = AES.new(secret_key, AES.MODE_CFB, iv)
self.ciphertext = iv + aes.encrypt(self._pad(self.plaintext))
# Generate SHA256 using Django's built-in password hashing mechanism
self.hash = make_password(self.plaintext, hasher='pbkdf2_sha256')
self.plaintext = None
def decrypt(self, secret_key):
"""
Consume the first 16 bytes of self.ciphertext as the AES initialization vector (IV). The remainder is decrypted
using the IV and the provided secret key. Padding is then removed to reveal the plaintext. Finally, validate the
decrypted plaintext value against the stored hash.
"""
if self.plaintext is not None:
return
if not self.ciphertext:
raise Exception("Must define ciphertext before unlocking.")
# Decrypt ciphertext and remove padding
iv = self.ciphertext[0:16]
aes = AES.new(secret_key, AES.MODE_CFB, iv)
plaintext = self._unpad(aes.decrypt(self.ciphertext[16:]))
# Verify decrypted plaintext against hash
if not self.validate(plaintext):
raise ValueError("Invalid key or ciphertext!")
self.plaintext = plaintext
def validate(self, plaintext):
"""
Validate that a given plaintext matches the stored hash.
"""
if not self.hash:
raise Exception("Hash has not been generated for this secret.")
return check_password(plaintext, self.hash)