1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Complete refactor & rework of how validation is set up

This is with an eye toward expanding it in the future both in terms of what it
checks and to add the ability to ignore things. This commit does not intend to
change any validation. It only reworks the flow and improves the error
messaging.
This commit is contained in:
Ross McFarland
2017-06-23 07:14:01 -07:00
parent 0aa0878ae8
commit 8323b4c0ea
2 changed files with 895 additions and 262 deletions

View File

@@ -54,7 +54,14 @@ class Delete(Change):
return 'Delete {}'.format(self.existing)
_unescaped_semicolon_re = re.compile(r'\w;')
class ValidationError(Exception):
def __init__(self, fqdn, reasons):
message = 'Invalid record {}\n - {}' \
.format(fqdn, '\n - '.join(reasons))
super(Exception, self).__init__(message)
self.fqdn = fqdn
self.reasons = reasons
class Record(object):
@@ -62,13 +69,13 @@ class Record(object):
@classmethod
def new(cls, zone, name, data, source=None):
fqdn = '{}.{}'.format(name, zone.name) if name else zone.name
try:
_type = data['type']
except KeyError:
fqdn = '{}.{}'.format(name, zone.name) if name else zone.name
raise Exception('Invalid record {}, missing type'.format(fqdn))
try:
_type = {
_class = {
'A': ARecord,
'AAAA': AaaaRecord,
'ALIAS': AliasRecord,
@@ -98,7 +105,21 @@ class Record(object):
}[_type]
except KeyError:
raise Exception('Unknown record type: "{}"'.format(_type))
return _type(zone, name, data, source=source)
reasons = _class.validate(name, data)
if reasons:
raise ValidationError(fqdn, reasons)
return _class(zone, name, data, source=source)
@classmethod
def validate(cls, name, data):
reasons = []
try:
ttl = int(data['ttl'])
if ttl < 0:
reasons.append('invalid ttl')
except KeyError:
reasons.append('missing ttl')
return reasons
def __init__(self, zone, name, data, source=None):
self.log.debug('__init__: zone.name=%s, type=%11s, name=%s', zone.name,
@@ -106,11 +127,8 @@ class Record(object):
self.zone = zone
# force everything lower-case just to be safe
self.name = str(name).lower() if name else name
try:
self.ttl = int(data['ttl'])
except KeyError:
raise Exception('Invalid record {}, missing ttl'.format(self.fqdn))
self.source = source
self.ttl = int(data['ttl'])
octodns = data.get('octodns', {})
self.ignored = octodns.get('ignored', False)
@@ -154,11 +172,17 @@ class GeoValue(object):
geo_re = re.compile(r'^(?P<continent_code>\w\w)(-(?P<country_code>\w\w)'
r'(-(?P<subdivision_code>\w\w))?)?$')
def __init__(self, geo, values):
match = self.geo_re.match(geo)
@classmethod
def _validate_geo(cls, code):
reasons = []
match = cls.geo_re.match(code)
if not match:
raise Exception('Invalid geo "{}"'.format(geo))
reasons.append('invalid geo "{}"'.format(code))
return reasons
def __init__(self, geo, values):
self.code = geo
match = self.geo_re.match(geo)
self.continent_code = match.group('continent_code')
self.country_code = match.group('country_code')
self.subdivision_code = match.group('subdivision_code')
@@ -185,16 +209,29 @@ class GeoValue(object):
class _ValuesMixin(object):
def __init__(self, zone, name, data, source=None):
super(_ValuesMixin, self).__init__(zone, name, data, source=source)
@classmethod
def validate(cls, name, data):
reasons = super(_ValuesMixin, cls).validate(name, data)
values = []
try:
values = data['values']
except KeyError:
try:
values = [data['value']]
except KeyError:
raise Exception('Invalid record {}, missing value(s)'
.format(self.fqdn))
reasons.append('missing value(s)')
for value in values:
reasons.extend(cls._validate_value(value))
return reasons
def __init__(self, zone, name, data, source=None):
super(_ValuesMixin, self).__init__(zone, name, data, source=source)
try:
values = data['values']
except KeyError:
values = [data['value']]
self.values = sorted(self._process_values(values))
def changes(self, other, target):
@@ -224,6 +261,21 @@ class _GeoMixin(_ValuesMixin):
Must be included before `Record`.
'''
@classmethod
def validate(cls, name, data):
reasons = super(_GeoMixin, cls).validate(name, data)
try:
geo = dict(data['geo'])
# TODO: validate legal codes
for code, values in geo.items():
reasons.extend(GeoValue._validate_geo(code))
for value in values:
reasons.extend(cls._validate_value(value))
except KeyError:
pass
return reasons
# TODO: support 'value' as well
# TODO: move away from "data" hash to strict params, it's kind of leaking
# the yaml implementation into here and then forcing it back out into
# non-yaml providers during input
@@ -233,9 +285,8 @@ class _GeoMixin(_ValuesMixin):
self.geo = dict(data['geo'])
except KeyError:
self.geo = {}
for k, vs in self.geo.items():
vs = sorted(self._process_values(vs))
self.geo[k] = GeoValue(k, vs)
for code, values in self.geo.items():
self.geo[code] = GeoValue(code, values)
def _data(self):
ret = super(_GeoMixin, self)._data()
@@ -264,41 +315,52 @@ class _GeoMixin(_ValuesMixin):
class ARecord(_GeoMixin, Record):
_type = 'A'
@classmethod
def _validate_value(self, value):
reasons = []
try:
IPv4Address(unicode(value))
except Exception:
reasons.append('invalid ip address "{}"'.format(value))
return reasons
def _process_values(self, values):
for ip in values:
try:
IPv4Address(unicode(ip))
except Exception:
raise Exception('Invalid record {}, value {} not a valid ip'
.format(self.fqdn, ip))
return values
class AaaaRecord(_GeoMixin, Record):
_type = 'AAAA'
@classmethod
def _validate_value(self, value):
reasons = []
try:
IPv6Address(unicode(value))
except Exception:
reasons.append('invalid ip address "{}"'.format(value))
return reasons
def _process_values(self, values):
ret = []
for ip in values:
try:
IPv6Address(unicode(ip))
ret.append(ip.lower())
except Exception:
raise Exception('Invalid record {}, value {} not a valid ip'
.format(self.fqdn, ip))
return ret
return values
class _ValueMixin(object):
def __init__(self, zone, name, data, source=None):
super(_ValueMixin, self).__init__(zone, name, data, source=source)
@classmethod
def validate(cls, name, data):
reasons = super(_ValueMixin, cls).validate(name, data)
value = None
try:
value = data['value']
except KeyError:
raise Exception('Invalid record {}, missing value'
.format(self.fqdn))
self.value = self._process_value(value)
reasons.append('missing value')
if value:
reasons.extend(cls._validate_value(value))
return reasons
def __init__(self, zone, name, data, source=None):
super(_ValueMixin, self).__init__(zone, name, data, source=source)
self.value = self._process_value(data['value'])
def changes(self, other, target):
if self.value != other.value:
@@ -319,25 +381,42 @@ class _ValueMixin(object):
class AliasRecord(_ValueMixin, Record):
_type = 'ALIAS'
def _process_value(self, value):
@classmethod
def _validate_value(self, value):
reasons = []
if not value.endswith('.'):
raise Exception('Invalid record {}, value ({}) missing trailing .'
.format(self.fqdn, value))
reasons.append('missing trailing .')
return reasons
def _process_value(self, value):
return value
class CnameRecord(_ValueMixin, Record):
_type = 'CNAME'
def _process_value(self, value):
@classmethod
def _validate_value(cls, value):
reasons = []
if not value.endswith('.'):
raise Exception('Invalid record {}, value ({}) missing trailing .'
.format(self.fqdn, value))
return value.lower()
reasons.append('missing trailing .')
return reasons
def _process_value(self, value):
return value
class MxValue(object):
@classmethod
def _validate_value(cls, value):
reasons = []
if 'priority' not in value:
reasons.append('missing priority')
if 'value' not in value:
reasons.append('missing value')
return reasons
def __init__(self, value):
# TODO: rename preference
self.priority = int(value['priority'])
@@ -363,19 +442,38 @@ class MxValue(object):
class MxRecord(_ValuesMixin, Record):
_type = 'MX'
@classmethod
def _validate_value(cls, value):
return MxValue._validate_value(value)
def _process_values(self, values):
ret = []
for value in values:
try:
ret.append(MxValue(value))
except KeyError as e:
raise Exception('Invalid value in record {}, missing {}'
.format(self.fqdn, e.args[0]))
return ret
return [MxValue(v) for v in values]
class NaptrValue(object):
@classmethod
def _validate_value(cls, data):
reasons = []
try:
int(data['order'])
except KeyError:
reasons.append('missing order')
except ValueError:
reasons.append('invalid order "{}"'.format(data['order']))
try:
int(data['preference'])
except KeyError:
reasons.append('missing preference')
except ValueError:
reasons.append('invalid preference "{}"'
.format(data['preference']))
# TODO: validate field data
for k in ('flags', 'service', 'regexp', 'replacement'):
if k not in data:
reasons.append('missing {}'.format(k))
return reasons
def __init__(self, value):
self.order = int(value['order'])
self.preference = int(value['preference'])
@@ -420,42 +518,65 @@ class NaptrValue(object):
class NaptrRecord(_ValuesMixin, Record):
_type = 'NAPTR'
@classmethod
def _validate_value(cls, value):
return NaptrValue._validate_value(value)
def _process_values(self, values):
ret = []
for value in values:
try:
ret.append(NaptrValue(value))
except KeyError as e:
raise Exception('Invalid value in record {}, missing {}'
.format(self.fqdn, e.args[0]))
return ret
return [NaptrValue(v) for v in values]
class NsRecord(_ValuesMixin, Record):
_type = 'NS'
@classmethod
def _validate_value(cls, value):
reasons = []
if not value.endswith('.'):
reasons.append('missing trailing .')
return reasons
def _process_values(self, values):
ret = []
for ns in values:
if not ns.endswith('.'):
raise Exception('Invalid record {}, value {} missing '
'trailing .'.format(self.fqdn, ns))
ret.append(ns.lower())
return ret
return values
class PtrRecord(_ValueMixin, Record):
_type = 'PTR'
def _process_value(self, value):
@classmethod
def _validate_value(cls, value):
reasons = []
if not value.endswith('.'):
raise Exception('Invalid record {}, value ({}) missing trailing .'
.format(self.fqdn, value))
return value.lower()
reasons.append('missing trailing .')
return reasons
def _process_value(self, value):
return value
class SshfpValue(object):
@classmethod
def _validate_value(cls, value):
reasons = []
# TODO: validate algorithm and fingerprint_type values
try:
int(value['algorithm'])
except KeyError:
reasons.append('missing algorithm')
except ValueError:
reasons.append('invalid algorithm "{}"'.format(value['algorithm']))
try:
int(value['fingerprint_type'])
except KeyError:
reasons.append('missing fingerprint_type')
except ValueError:
reasons.append('invalid fingerprint_type "{}"'
.format(value['fingerprint_type']))
if 'fingerprint' not in value:
reasons.append('missing fingerprint')
return reasons
def __init__(self, value):
self.algorithm = int(value['algorithm'])
self.fingerprint_type = int(value['fingerprint_type'])
@@ -484,26 +605,61 @@ class SshfpValue(object):
class SshfpRecord(_ValuesMixin, Record):
_type = 'SSHFP'
@classmethod
def _validate_value(cls, value):
return SshfpValue._validate_value(value)
def _process_values(self, values):
ret = []
for value in values:
try:
ret.append(SshfpValue(value))
except KeyError as e:
raise Exception('Invalid value in record {}, missing {}'
.format(self.fqdn, e.args[0]))
return ret
return [SshfpValue(v) for v in values]
_unescaped_semicolon_re = re.compile(r'\w;')
class SpfRecord(_ValuesMixin, Record):
_type = 'SPF'
@classmethod
def _validate_value(cls, value):
if _unescaped_semicolon_re.search(value):
return ['unescaped ;']
return []
def _process_values(self, values):
return values
class SrvValue(object):
@classmethod
def _validate_value(self, value):
reasons = []
# TODO: validate algorithm and fingerprint_type values
try:
int(value['priority'])
except KeyError:
reasons.append('missing priority')
except ValueError:
reasons.append('invalid priority "{}"'.format(value['priority']))
try:
int(value['weight'])
except KeyError:
reasons.append('missing weight')
except ValueError:
reasons.append('invalid weight "{}"'.format(value['weight']))
try:
int(value['port'])
except KeyError:
reasons.append('missing port')
except ValueError:
reasons.append('invalid port "{}"'.format(value['port']))
try:
if not value['target'].endswith('.'):
reasons.append('missing trailing .')
except KeyError:
reasons.append('missing target')
return reasons
def __init__(self, value):
self.priority = int(value['priority'])
self.weight = int(value['weight'])
@@ -537,28 +693,30 @@ class SrvRecord(_ValuesMixin, Record):
_type = 'SRV'
_name_re = re.compile(r'^_[^\.]+\.[^\.]+')
def __init__(self, zone, name, data, source=None):
if not self._name_re.match(name):
raise Exception('Invalid name {}.{}'.format(name, zone.name))
super(SrvRecord, self).__init__(zone, name, data, source)
@classmethod
def validate(cls, name, data):
reasons = []
if not cls._name_re.match(name):
reasons.append('invalid name')
reasons.extend(super(SrvRecord, cls).validate(name, data))
return reasons
@classmethod
def _validate_value(cls, value):
return SrvValue._validate_value(value)
def _process_values(self, values):
ret = []
for value in values:
try:
ret.append(SrvValue(value))
except KeyError as e:
raise Exception('Invalid value in record {}, missing {}'
.format(self.fqdn, e.args[0]))
return ret
return [SrvValue(v) for v in values]
class TxtRecord(_ValuesMixin, Record):
_type = 'TXT'
@classmethod
def _validate_value(cls, value):
if _unescaped_semicolon_re.search(value):
return ['unescaped ;']
return []
def _process_values(self, values):
for value in values:
if _unescaped_semicolon_re.search(value):
raise Exception('Invalid record {}, unescaped ;'
.format(self.fqdn))
return values

View File

@@ -9,7 +9,8 @@ from unittest import TestCase
from octodns.record import ARecord, AaaaRecord, AliasRecord, CnameRecord, \
Create, Delete, GeoValue, MxRecord, NaptrRecord, NaptrValue, NsRecord, \
PtrRecord, Record, SshfpRecord, SpfRecord, SrvRecord, TxtRecord, Update
Record, SshfpRecord, SpfRecord, SrvRecord, TxtRecord, Update, \
ValidationError
from octodns.zone import Zone
from helpers import GeoProvider, SimpleProvider
@@ -42,15 +43,6 @@ class TestRecord(TestCase):
self.assertEquals([b_value], b.values)
self.assertEquals(b_data, b.data)
# missing ttl
with self.assertRaises(Exception) as ctx:
ARecord(self.zone, None, {'value': '1.1.1.1'})
self.assertTrue('missing ttl' in ctx.exception.message)
# missing values & value
with self.assertRaises(Exception) as ctx:
ARecord(self.zone, None, {'ttl': 42})
self.assertTrue('missing value(s)' in ctx.exception.message)
# top-level
data = {'ttl': 30, 'value': '4.2.3.4'}
self.assertEquals(self.zone.name, ARecord(self.zone, '', data).fqdn)
@@ -104,20 +96,6 @@ class TestRecord(TestCase):
DummyRecord().__repr__()
def test_invalid_a(self):
with self.assertRaises(Exception) as ctx:
ARecord(self.zone, 'a', {
'ttl': 30,
'value': 'foo',
})
self.assertTrue('Invalid record' in ctx.exception.message)
with self.assertRaises(Exception) as ctx:
ARecord(self.zone, 'a', {
'ttl': 30,
'values': ['1.2.3.4', 'bar'],
})
self.assertTrue('Invalid record' in ctx.exception.message)
def test_geo(self):
geo_data = {'ttl': 42, 'values': ['5.2.3.4', '6.2.3.4'],
'geo': {'AF': ['1.1.1.1'],
@@ -157,19 +135,6 @@ class TestRecord(TestCase):
# Geo provider does consider lack of geo diffs to be changes
self.assertTrue(geo.changes(other, geo_target))
# invalid geo code
with self.assertRaises(Exception) as ctx:
ARecord(self.zone, 'geo', {'ttl': 42,
'values': ['5.2.3.4', '6.2.3.4'],
'geo': {'abc': ['1.1.1.1']}})
self.assertEquals('Invalid geo "abc"', ctx.exception.message)
with self.assertRaises(Exception) as ctx:
ARecord(self.zone, 'geo', {'ttl': 42,
'values': ['5.2.3.4', '6.2.3.4'],
'geo': {'NA-US': ['1.1.1']}})
self.assertTrue('not a valid ip' in ctx.exception.message)
# __repr__ doesn't blow up
geo.__repr__()
@@ -187,30 +152,12 @@ class TestRecord(TestCase):
self.assertEquals([b_value], b.values)
self.assertEquals(b_data, b.data)
# missing values & value
with self.assertRaises(Exception) as ctx:
_type(self.zone, None, {'ttl': 42})
self.assertTrue('missing value(s)' in ctx.exception.message)
def test_aaaa(self):
a_values = ['2001:0db8:3c4d:0015:0000:0000:1a2f:1a2b',
'2001:0db8:3c4d:0015:0000:0000:1a2f:1a3b']
b_value = '2001:0db8:3c4d:0015:0000:0000:1a2f:1a4b'
self.assertMultipleValues(AaaaRecord, a_values, b_value)
with self.assertRaises(Exception) as ctx:
AaaaRecord(self.zone, 'a', {
'ttl': 30,
'value': 'foo',
})
self.assertTrue('Invalid record' in ctx.exception.message)
with self.assertRaises(Exception) as ctx:
AaaaRecord(self.zone, 'a', {
'ttl': 30,
'values': [b_value, 'bar'],
})
self.assertTrue('Invalid record' in ctx.exception.message)
def assertSingleValue(self, _type, a_value, b_value):
a_data = {'ttl': 30, 'value': a_value}
a = _type(self.zone, 'a', a_data)
@@ -225,11 +172,6 @@ class TestRecord(TestCase):
self.assertEquals(b_value, b.value)
self.assertEquals(b_data, b.data)
# missing value
with self.assertRaises(Exception) as ctx:
_type(self.zone, None, {'ttl': 42})
self.assertTrue('missing value' in ctx.exception.message)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
@@ -251,15 +193,6 @@ class TestRecord(TestCase):
self.assertEquals(a_data['value'], a.value)
self.assertEquals(a_data, a.data)
# missing value
with self.assertRaises(Exception) as ctx:
AliasRecord(self.zone, None, {'ttl': 0})
self.assertTrue('missing value' in ctx.exception.message)
# bad name
with self.assertRaises(Exception) as ctx:
AliasRecord(self.zone, None, {'ttl': 0, 'value': 'www.unit.tests'})
self.assertTrue('missing trailing .' in ctx.exception.message)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
@@ -277,19 +210,6 @@ class TestRecord(TestCase):
self.assertSingleValue(CnameRecord, 'target.foo.com.',
'other.foo.com.')
with self.assertRaises(Exception) as ctx:
CnameRecord(self.zone, 'a', {
'ttl': 30,
'value': 'foo',
})
self.assertTrue('Invalid record' in ctx.exception.message)
with self.assertRaises(Exception) as ctx:
CnameRecord(self.zone, 'a', {
'ttl': 30,
'values': ['foo.com.', 'bar.com'],
})
self.assertTrue('Invalid record' in ctx.exception.message)
def test_mx(self):
a_values = [{
'priority': 10,
@@ -319,15 +239,6 @@ class TestRecord(TestCase):
self.assertEquals(b_value['value'], b.values[0].value)
self.assertEquals(b_data, b.data)
# missing value
with self.assertRaises(Exception) as ctx:
MxRecord(self.zone, None, {'ttl': 42})
self.assertTrue('missing value(s)' in ctx.exception.message)
# invalid value
with self.assertRaises(Exception) as ctx:
MxRecord(self.zone, None, {'ttl': 42, 'value': {}})
self.assertTrue('Invalid value' in ctx.exception.message)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
@@ -387,15 +298,6 @@ class TestRecord(TestCase):
self.assertEquals(b_value[k], getattr(b.values[0], k))
self.assertEquals(b_data, b.data)
# missing value
with self.assertRaises(Exception) as ctx:
NaptrRecord(self.zone, None, {'ttl': 42})
self.assertTrue('missing value' in ctx.exception.message)
# invalid value
with self.assertRaises(Exception) as ctx:
NaptrRecord(self.zone, None, {'ttl': 42, 'value': {}})
self.assertTrue('Invalid value' in ctx.exception.message)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
@@ -538,33 +440,6 @@ class TestRecord(TestCase):
self.assertEquals([b_value], b.values)
self.assertEquals(b_data, b.data)
# missing values & value
with self.assertRaises(Exception) as ctx:
NsRecord(self.zone, None, {'ttl': 42})
self.assertTrue('missing value(s)' in ctx.exception.message)
with self.assertRaises(Exception) as ctx:
NsRecord(self.zone, 'a', {
'ttl': 30,
'value': 'foo',
})
self.assertTrue('Invalid record' in ctx.exception.message)
with self.assertRaises(Exception) as ctx:
NsRecord(self.zone, 'a', {
'ttl': 30,
'values': ['foo.com.', 'bar.com'],
})
self.assertTrue('Invalid record' in ctx.exception.message)
def test_ptr(self):
self.assertSingleValue(PtrRecord, 'foo.bar.com.', 'other.bar.com.')
with self.assertRaises(Exception) as ctx:
PtrRecord(self.zone, 'a', {
'ttl': 30,
'value': 'foo',
})
self.assertTrue('Invalid record' in ctx.exception.message)
def test_sshfp(self):
a_values = [{
'algorithm': 10,
@@ -599,15 +474,6 @@ class TestRecord(TestCase):
self.assertEquals(b_value['fingerprint'], b.values[0].fingerprint)
self.assertEquals(b_data, b.data)
# missing value
with self.assertRaises(Exception) as ctx:
SshfpRecord(self.zone, None, {'ttl': 42})
self.assertTrue('missing value(s)' in ctx.exception.message)
# invalid value
with self.assertRaises(Exception) as ctx:
SshfpRecord(self.zone, None, {'ttl': 42, 'value': {}})
self.assertTrue('Invalid value' in ctx.exception.message)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
@@ -677,21 +543,6 @@ class TestRecord(TestCase):
self.assertEquals(b_value['target'], b.values[0].target)
self.assertEquals(b_data, b.data)
# invalid name
with self.assertRaises(Exception) as ctx:
SrvRecord(self.zone, 'bad', {'ttl': 42})
self.assertEquals('Invalid name bad.unit.tests.',
ctx.exception.message)
# missing value
with self.assertRaises(Exception) as ctx:
SrvRecord(self.zone, '_missing._tcp', {'ttl': 42})
self.assertTrue('missing value(s)' in ctx.exception.message)
# invalid value
with self.assertRaises(Exception) as ctx:
SrvRecord(self.zone, '_missing._udp', {'ttl': 42, 'value': {}})
self.assertTrue('Invalid value' in ctx.exception.message)
target = SimpleProvider()
# No changes with self
self.assertFalse(a.changes(a, target))
@@ -729,21 +580,6 @@ class TestRecord(TestCase):
b_value = 'b other'
self.assertMultipleValues(TxtRecord, a_values, b_value)
Record.new(self.zone, 'txt', {
'ttl': 44,
'type': 'TXT',
'value': 'escaped\; foo',
})
with self.assertRaises(Exception) as ctx:
Record.new(self.zone, 'txt', {
'ttl': 44,
'type': 'TXT',
'value': 'un-escaped; foo',
})
self.assertEquals('Invalid record txt.unit.tests., unescaped ;',
ctx.exception.message)
def test_record_new(self):
txt = Record.new(self.zone, 'txt', {
'ttl': 44,
@@ -794,3 +630,642 @@ class TestRecord(TestCase):
self.assertEquals('CA', geo.subdivision_code)
self.assertEquals(values, geo.values)
self.assertEquals(['NA-US', 'NA'], list(geo.parents))
class TestRecordValidation(TestCase):
zone = Zone('unit.tests.', [])
def test_base(self):
# no ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
'value': '1.2.3.4',
})
self.assertEquals(['missing ttl'], ctx.exception.reasons)
# invalid ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'www', {
'type': 'A',
'ttl': -1,
'value': '1.2.3.4',
})
self.assertEquals('www.unit.tests.', ctx.exception.fqdn)
self.assertEquals(['invalid ttl'], ctx.exception.reasons)
def test_A_and_values_mixin(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
})
Record.new(self.zone, '', {
'type': 'A',
'ttl': 600,
'values': [
'1.2.3.4',
'1.2.3.5',
]
})
# missing value(s)
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
'ttl': 600,
})
self.assertEquals(['missing value(s)'], ctx.exception.reasons)
# missing value(s) & ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
})
self.assertEquals(['missing ttl', 'missing value(s)'],
ctx.exception.reasons)
# invalid ip address
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
'ttl': 600,
'value': 'hello'
})
self.assertEquals(['invalid ip address "hello"'],
ctx.exception.reasons)
# invalid ip addresses
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
'ttl': 600,
'values': ['hello', 'goodbye']
})
self.assertEquals([
'invalid ip address "hello"',
'invalid ip address "goodbye"'
], ctx.exception.reasons)
# invalid & valid ip addresses, no ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'A',
'values': ['1.2.3.4', 'hello', '5.6.7.8']
})
self.assertEquals([
'missing ttl',
'invalid ip address "hello"',
], ctx.exception.reasons)
def test_geo(self):
Record.new(self.zone, '', {
'geo': {
'NA': ['1.2.3.5'],
'NA-US': ['1.2.3.5', '1.2.3.6']
},
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
})
# invalid ip address
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'geo': {
'NA': ['hello'],
'NA-US': ['1.2.3.5', '1.2.3.6']
},
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
})
self.assertEquals(['invalid ip address "hello"'],
ctx.exception.reasons)
# invalid geo code
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'geo': {
'XYZ': ['1.2.3.4'],
},
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
})
self.assertEquals(['invalid geo "XYZ"'], ctx.exception.reasons)
# invalid ip address
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'geo': {
'NA': ['hello'],
'NA-US': ['1.2.3.5', 'goodbye']
},
'type': 'A',
'ttl': 600,
'value': '1.2.3.4',
})
self.assertEquals([
'invalid ip address "hello"',
'invalid ip address "goodbye"'
], ctx.exception.reasons)
def test_AAAA(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'AAAA',
'ttl': 600,
'value': '2601:644:500:e210:62f8:1dff:feb8:947a',
})
Record.new(self.zone, '', {
'type': 'AAAA',
'ttl': 600,
'values': [
'2601:644:500:e210:62f8:1dff:feb8:947a',
'2601:644:500:e210:62f8:1dff:feb8:947b',
]
})
# invalid ip address
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'AAAA',
'ttl': 600,
'value': 'hello'
})
self.assertEquals(['invalid ip address "hello"'],
ctx.exception.reasons)
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'AAAA',
'ttl': 600,
'value': '1.2.3.4'
})
self.assertEquals(['invalid ip address "1.2.3.4"'],
ctx.exception.reasons)
# invalid ip addresses
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'AAAA',
'ttl': 600,
'values': ['hello', 'goodbye']
})
self.assertEquals([
'invalid ip address "hello"',
'invalid ip address "goodbye"'
], ctx.exception.reasons)
def test_ALIAS_and_value_mixin(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'ALIAS',
'ttl': 600,
'value': 'foo.bar.com.',
})
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'ALIAS',
'ttl': 600,
})
self.assertEquals(['missing value'], ctx.exception.reasons)
# missing trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'ALIAS',
'ttl': 600,
'value': 'foo.bar.com',
})
self.assertEquals(['missing trailing .'], ctx.exception.reasons)
def test_CNAME(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'CNAME',
'ttl': 600,
'value': 'foo.bar.com.',
})
# missing trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'CNAME',
'ttl': 600,
'value': 'foo.bar.com',
})
self.assertEquals(['missing trailing .'], ctx.exception.reasons)
def test_MX(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'MX',
'ttl': 600,
'value': {
'priority': 10,
'value': 'foo.bar.com.'
}
})
# missing priority
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'MX',
'ttl': 600,
'value': {
'value': 'foo.bar.com.'
}
})
self.assertEquals(['missing priority'], ctx.exception.reasons)
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'MX',
'ttl': 600,
'value': {
'priority': 10,
}
})
self.assertEquals(['missing value'], ctx.exception.reasons)
def test_NXPTR(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'NAPTR',
'ttl': 600,
'value': {
'order': 10,
'preference': 20,
'flags': 'f',
'service': 'srv',
'regexp': '.*',
'replacement': '.'
}
})
# missing X priority
value = {
'order': 10,
'preference': 20,
'flags': 'f',
'service': 'srv',
'regexp': '.*',
'replacement': '.'
}
for k in ('order', 'preference', 'flags', 'service', 'regexp',
'replacement'):
v = dict(value)
del v[k]
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'NAPTR',
'ttl': 600,
'value': v
})
self.assertEquals(['missing {}'.format(k)], ctx.exception.reasons)
# non-int order
v = dict(value)
v['order'] = 'boo'
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'NAPTR',
'ttl': 600,
'value': v
})
self.assertEquals(['invalid order "boo"'], ctx.exception.reasons)
# non-int preference
v = dict(value)
v['preference'] = 'who'
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'NAPTR',
'ttl': 600,
'value': v
})
self.assertEquals(['invalid preference "who"'], ctx.exception.reasons)
def test_NS(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'NS',
'ttl': 600,
'values': [
'foo.bar.com.',
'1.2.3.4.'
]
})
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'NS',
'ttl': 600,
})
self.assertEquals(['missing value(s)'], ctx.exception.reasons)
# no trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'NS',
'ttl': 600,
'value': 'foo.bar',
})
self.assertEquals(['missing trailing .'], ctx.exception.reasons)
def test_PTR(self):
# doesn't blow up (name & zone here don't make any sense, but not
# important)
Record.new(self.zone, '', {
'type': 'PTR',
'ttl': 600,
'value': 'foo.bar.com.',
})
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'PTR',
'ttl': 600,
})
self.assertEquals(['missing value'], ctx.exception.reasons)
# no trailing .
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'PTR',
'ttl': 600,
'value': 'foo.bar',
})
self.assertEquals(['missing trailing .'], ctx.exception.reasons)
def test_SSHFP(self):
# doesn't blow up
Record.new(self.zone, '', {
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 1,
'fingerprint_type': 1,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73'
}
})
# missing algorithm
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SSHFP',
'ttl': 600,
'value': {
'fingerprint_type': 1,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73'
}
})
self.assertEquals(['missing algorithm'], ctx.exception.reasons)
# invalid algorithm
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 'nope',
'fingerprint_type': 1,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73'
}
})
self.assertEquals(['invalid algorithm "nope"'], ctx.exception.reasons)
# missing fingerprint_type
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 1,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73'
}
})
self.assertEquals(['missing fingerprint_type'], ctx.exception.reasons)
# invalid fingerprint_type
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 1,
'fingerprint_type': 'yeeah',
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73'
}
})
self.assertEquals(['invalid fingerprint_type "yeeah"'],
ctx.exception.reasons)
# missing fingerprint
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SSHFP',
'ttl': 600,
'value': {
'algorithm': 1,
'fingerprint_type': 1,
}
})
self.assertEquals(['missing fingerprint'], ctx.exception.reasons)
def test_SPF(self):
# doesn't blow up (name & zone here don't make any sense, but not
# important)
Record.new(self.zone, '', {
'type': 'SPF',
'ttl': 600,
'values': [
'v=spf1 ip4:192.168.0.1/16-all',
'v=spf1 ip4:10.1.2.1/24-all',
'this has some\; semi-colons\; in it',
]
})
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SPF',
'ttl': 600,
})
self.assertEquals(['missing value(s)'], ctx.exception.reasons)
# missing escapes
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'SPF',
'ttl': 600,
'value': 'this has some; semi-colons\; in it',
})
self.assertEquals(['unescaped ;'], ctx.exception.reasons)
def test_SRV(self):
# doesn't blow up
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': 'foo.bar.baz.'
}
})
# invalid name
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, 'neup', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['invalid name'], ctx.exception.reasons)
# missing priority
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'weight': 2,
'port': 3,
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['missing priority'], ctx.exception.reasons)
# invalid priority
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 'foo',
'weight': 2,
'port': 3,
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['invalid priority "foo"'], ctx.exception.reasons)
# missing weight
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'port': 3,
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['missing weight'], ctx.exception.reasons)
# invalid weight
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 'foo',
'port': 3,
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['invalid weight "foo"'], ctx.exception.reasons)
# missing port
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['missing port'], ctx.exception.reasons)
# invalid port
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 'foo',
'target': 'foo.bar.baz.'
}
})
self.assertEquals(['invalid port "foo"'], ctx.exception.reasons)
# missing target
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
}
})
self.assertEquals(['missing target'], ctx.exception.reasons)
# invalid target
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '_srv._tcp', {
'type': 'SRV',
'ttl': 600,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': 'foo.bar.baz'
}
})
self.assertEquals(['missing trailing .'],
ctx.exception.reasons)
def test_TXT(self):
# doesn't blow up (name & zone here don't make any sense, but not
# important)
Record.new(self.zone, '', {
'type': 'TXT',
'ttl': 600,
'values': [
'hello world',
'this has some\; semi-colons\; in it',
]
})
# missing value
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'TXT',
'ttl': 600,
})
self.assertEquals(['missing value(s)'], ctx.exception.reasons)
# missing escapes
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {
'type': 'TXT',
'ttl': 600,
'value': 'this has some; semi-colons\; in it',
})
self.assertEquals(['unescaped ;'], ctx.exception.reasons)