mirror of
https://github.com/github/octodns.git
synced 2024-05-11 05:55:00 +00:00
Add CAA Record class and tests
This commit is contained in:
@@ -81,29 +81,16 @@ class Record(object):
|
||||
'A': ARecord,
|
||||
'AAAA': AaaaRecord,
|
||||
'ALIAS': AliasRecord,
|
||||
# cert
|
||||
'CAA': CaaRecord,
|
||||
'CNAME': CnameRecord,
|
||||
# dhcid
|
||||
# dname
|
||||
# dnskey
|
||||
# ds
|
||||
# ipseckey
|
||||
# key
|
||||
# kx
|
||||
# loc
|
||||
'MX': MxRecord,
|
||||
'NAPTR': NaptrRecord,
|
||||
'NS': NsRecord,
|
||||
# nsap
|
||||
'PTR': PtrRecord,
|
||||
# px
|
||||
# rp
|
||||
# soa - would it even make sense?
|
||||
'SPF': SpfRecord,
|
||||
'SRV': SrvRecord,
|
||||
'SSHFP': SshfpRecord,
|
||||
'TXT': TxtRecord,
|
||||
# url
|
||||
}[_type]
|
||||
except KeyError:
|
||||
raise Exception('Unknown record type: "{}"'.format(_type))
|
||||
@@ -398,6 +385,66 @@ class AliasRecord(_ValueMixin, Record):
|
||||
return value
|
||||
|
||||
|
||||
class CaaValue(object):
|
||||
# https://tools.ietf.org/html/rfc6844#page-5
|
||||
|
||||
@classmethod
|
||||
def _validate_value(cls, value):
|
||||
reasons = []
|
||||
try:
|
||||
flags = int(value.get('flags', 0))
|
||||
if flags not in (0, 1):
|
||||
reasons.append('invalid flags "{}"'.format(flags))
|
||||
except ValueError:
|
||||
reasons.append('invalid flags "{}"'.format(value['flags']))
|
||||
|
||||
try:
|
||||
tag = value['tag']
|
||||
if tag not in ('issue', 'issuewild', 'iodef'):
|
||||
reasons.append('invalid tag "{}"'.format(tag))
|
||||
except KeyError:
|
||||
reasons.append('missing tag')
|
||||
|
||||
if 'value' not in value:
|
||||
reasons.append('missing value')
|
||||
|
||||
return reasons
|
||||
|
||||
def __init__(self, value):
|
||||
self.flags = int(value.get('flags', 0))
|
||||
self.tag = value['tag']
|
||||
self.value = value['value']
|
||||
|
||||
@property
|
||||
def data(self):
|
||||
return {
|
||||
'flags': self.flags,
|
||||
'tag': self.tag,
|
||||
'value': self.value,
|
||||
}
|
||||
|
||||
def __cmp__(self, other):
|
||||
if self.flags == other.flags:
|
||||
if self.tag == other.tag:
|
||||
return cmp(self.value, other.value)
|
||||
return cmp(self.tag, other.tag)
|
||||
return cmp(self.flags, other.flags)
|
||||
|
||||
def __repr__(self):
|
||||
return "'{} {} {}'".format(self.flags, self.tag, self.value)
|
||||
|
||||
|
||||
class CaaRecord(_ValuesMixin, Record):
|
||||
_type = 'CAA'
|
||||
|
||||
@classmethod
|
||||
def _validate_value(cls, value):
|
||||
return CaaValue._validate_value(value)
|
||||
|
||||
def _process_values(self, values):
|
||||
return [CaaValue(v) for v in values]
|
||||
|
||||
|
||||
class CnameRecord(_ValueMixin, Record):
|
||||
_type = 'CNAME'
|
||||
|
||||
|
@@ -7,10 +7,10 @@ from __future__ import absolute_import, division, print_function, \
|
||||
|
||||
from unittest import TestCase
|
||||
|
||||
from octodns.record import ARecord, AaaaRecord, AliasRecord, CnameRecord, \
|
||||
Create, Delete, GeoValue, MxRecord, NaptrRecord, NaptrValue, NsRecord, \
|
||||
Record, SshfpRecord, SpfRecord, SrvRecord, TxtRecord, Update, \
|
||||
ValidationError
|
||||
from octodns.record import ARecord, AaaaRecord, AliasRecord, CaaRecord, \
|
||||
CnameRecord, Create, Delete, GeoValue, MxRecord, NaptrRecord, \
|
||||
NaptrValue, NsRecord, Record, SshfpRecord, SpfRecord, SrvRecord, \
|
||||
TxtRecord, Update, ValidationError
|
||||
from octodns.zone import Zone
|
||||
|
||||
from helpers import GeoProvider, SimpleProvider
|
||||
@@ -206,6 +206,66 @@ class TestRecord(TestCase):
|
||||
# __repr__ doesn't blow up
|
||||
a.__repr__()
|
||||
|
||||
def test_caa(self):
|
||||
a_values = [{
|
||||
'flags': 0,
|
||||
'tag': 'issue',
|
||||
'value': 'ca.example.net',
|
||||
}, {
|
||||
'flags': 1,
|
||||
'tag': 'iodef',
|
||||
'value': 'mailto:security@example.com',
|
||||
}]
|
||||
a_data = {'ttl': 30, 'values': a_values}
|
||||
a = CaaRecord(self.zone, 'a', a_data)
|
||||
self.assertEquals('a', a.name)
|
||||
self.assertEquals('a.unit.tests.', a.fqdn)
|
||||
self.assertEquals(30, a.ttl)
|
||||
self.assertEquals(a_values[0]['flags'], a.values[0].flags)
|
||||
self.assertEquals(a_values[0]['tag'], a.values[0].tag)
|
||||
self.assertEquals(a_values[0]['value'], a.values[0].value)
|
||||
self.assertEquals(a_values[1]['flags'], a.values[1].flags)
|
||||
self.assertEquals(a_values[1]['tag'], a.values[1].tag)
|
||||
self.assertEquals(a_values[1]['value'], a.values[1].value)
|
||||
self.assertEquals(a_data, a.data)
|
||||
|
||||
b_value = {
|
||||
'tag': 'iodef',
|
||||
'value': 'http://iodef.example.com/',
|
||||
}
|
||||
b_data = {'ttl': 30, 'value': b_value}
|
||||
b = CaaRecord(self.zone, 'b', b_data)
|
||||
self.assertEquals(0, b.values[0].flags)
|
||||
self.assertEquals(b_value['tag'], b.values[0].tag)
|
||||
self.assertEquals(b_value['value'], b.values[0].value)
|
||||
b_data['value']['flags'] = 0
|
||||
self.assertEquals(b_data, b.data)
|
||||
|
||||
target = SimpleProvider()
|
||||
# No changes with self
|
||||
self.assertFalse(a.changes(a, target))
|
||||
# Diff in flags causes change
|
||||
other = CaaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
|
||||
other.values[0].flags = 1
|
||||
change = a.changes(other, target)
|
||||
self.assertEqual(change.existing, a)
|
||||
self.assertEqual(change.new, other)
|
||||
# Diff in tag causes change
|
||||
other.values[0].flags = a.values[0].flags
|
||||
other.values[0].tag = 'foo'
|
||||
change = a.changes(other, target)
|
||||
self.assertEqual(change.existing, a)
|
||||
self.assertEqual(change.new, other)
|
||||
# Diff in value causes change
|
||||
other.values[0].tag = a.values[0].tag
|
||||
other.values[0].value = 'bar'
|
||||
change = a.changes(other, target)
|
||||
self.assertEqual(change.existing, a)
|
||||
self.assertEqual(change.new, other)
|
||||
|
||||
# __repr__ doesn't blow up
|
||||
a.__repr__()
|
||||
|
||||
def test_cname(self):
|
||||
self.assertSingleValue(CnameRecord, 'target.foo.com.',
|
||||
'other.foo.com.')
|
||||
@@ -861,6 +921,76 @@ class TestRecordValidation(TestCase):
|
||||
})
|
||||
self.assertEquals(['missing trailing .'], ctx.exception.reasons)
|
||||
|
||||
def test_CAA(self):
|
||||
# doesn't blow up
|
||||
Record.new(self.zone, '', {
|
||||
'type': 'CAA',
|
||||
'ttl': 600,
|
||||
'value': {
|
||||
'flags': 1,
|
||||
'tag': 'iodef',
|
||||
'value': 'http://foo.bar.com/'
|
||||
}
|
||||
})
|
||||
|
||||
# invalid flags
|
||||
with self.assertRaises(ValidationError) as ctx:
|
||||
Record.new(self.zone, '', {
|
||||
'type': 'CAA',
|
||||
'ttl': 600,
|
||||
'value': {
|
||||
'flags': 42,
|
||||
'tag': 'iodef',
|
||||
'value': 'http://foo.bar.com/',
|
||||
}
|
||||
})
|
||||
self.assertEquals(['invalid flags "42"'], ctx.exception.reasons)
|
||||
with self.assertRaises(ValidationError) as ctx:
|
||||
Record.new(self.zone, '', {
|
||||
'type': 'CAA',
|
||||
'ttl': 600,
|
||||
'value': {
|
||||
'flags': 'nope',
|
||||
'tag': 'iodef',
|
||||
'value': 'http://foo.bar.com/',
|
||||
}
|
||||
})
|
||||
self.assertEquals(['invalid flags "nope"'], ctx.exception.reasons)
|
||||
|
||||
# missing tag
|
||||
with self.assertRaises(ValidationError) as ctx:
|
||||
Record.new(self.zone, '', {
|
||||
'type': 'CAA',
|
||||
'ttl': 600,
|
||||
'value': {
|
||||
'value': 'http://foo.bar.com/',
|
||||
}
|
||||
})
|
||||
self.assertEquals(['missing tag'], ctx.exception.reasons)
|
||||
|
||||
# invalid tag
|
||||
with self.assertRaises(ValidationError) as ctx:
|
||||
Record.new(self.zone, '', {
|
||||
'type': 'CAA',
|
||||
'ttl': 600,
|
||||
'value': {
|
||||
'tag': 'xyz',
|
||||
'value': 'http://foo.bar.com/',
|
||||
}
|
||||
})
|
||||
self.assertEquals(['invalid tag "xyz"'], ctx.exception.reasons)
|
||||
|
||||
# missing value
|
||||
with self.assertRaises(ValidationError) as ctx:
|
||||
Record.new(self.zone, '', {
|
||||
'type': 'CAA',
|
||||
'ttl': 600,
|
||||
'value': {
|
||||
'tag': 'iodef',
|
||||
}
|
||||
})
|
||||
self.assertEquals(['missing value'], ctx.exception.reasons)
|
||||
|
||||
def test_CNAME(self):
|
||||
# doesn't blow up
|
||||
Record.new(self.zone, 'www', {
|
||||
|
Reference in New Issue
Block a user