diff --git a/octodns/record.py b/octodns/record.py index cacb147..827ad0a 100644 --- a/octodns/record.py +++ b/octodns/record.py @@ -54,7 +54,14 @@ class Delete(Change): return 'Delete {}'.format(self.existing) -_unescaped_semicolon_re = re.compile(r'\w;') +class ValidationError(Exception): + + def __init__(self, fqdn, reasons): + message = 'Invalid record {}\n - {}' \ + .format(fqdn, '\n - '.join(reasons)) + super(Exception, self).__init__(message) + self.fqdn = fqdn + self.reasons = reasons class Record(object): @@ -62,13 +69,13 @@ class Record(object): @classmethod def new(cls, zone, name, data, source=None): + fqdn = '{}.{}'.format(name, zone.name) if name else zone.name try: _type = data['type'] except KeyError: - fqdn = '{}.{}'.format(name, zone.name) if name else zone.name raise Exception('Invalid record {}, missing type'.format(fqdn)) try: - _type = { + _class = { 'A': ARecord, 'AAAA': AaaaRecord, 'ALIAS': AliasRecord, @@ -98,7 +105,21 @@ class Record(object): }[_type] except KeyError: raise Exception('Unknown record type: "{}"'.format(_type)) - return _type(zone, name, data, source=source) + reasons = _class.validate(name, data) + if reasons: + raise ValidationError(fqdn, reasons) + return _class(zone, name, data, source=source) + + @classmethod + def validate(cls, name, data): + reasons = [] + try: + ttl = int(data['ttl']) + if ttl < 0: + reasons.append('invalid ttl') + except KeyError: + reasons.append('missing ttl') + return reasons def __init__(self, zone, name, data, source=None): self.log.debug('__init__: zone.name=%s, type=%11s, name=%s', zone.name, @@ -106,11 +127,8 @@ class Record(object): self.zone = zone # force everything lower-case just to be safe self.name = str(name).lower() if name else name - try: - self.ttl = int(data['ttl']) - except KeyError: - raise Exception('Invalid record {}, missing ttl'.format(self.fqdn)) self.source = source + self.ttl = int(data['ttl']) octodns = data.get('octodns', {}) self.ignored = octodns.get('ignored', False) @@ -154,11 +172,17 @@ class GeoValue(object): geo_re = re.compile(r'^(?P\w\w)(-(?P\w\w)' r'(-(?P\w\w))?)?$') - def __init__(self, geo, values): - match = self.geo_re.match(geo) + @classmethod + def _validate_geo(cls, code): + reasons = [] + match = cls.geo_re.match(code) if not match: - raise Exception('Invalid geo "{}"'.format(geo)) + reasons.append('invalid geo "{}"'.format(code)) + return reasons + + def __init__(self, geo, values): self.code = geo + match = self.geo_re.match(geo) self.continent_code = match.group('continent_code') self.country_code = match.group('country_code') self.subdivision_code = match.group('subdivision_code') @@ -185,16 +209,29 @@ class GeoValue(object): class _ValuesMixin(object): - def __init__(self, zone, name, data, source=None): - super(_ValuesMixin, self).__init__(zone, name, data, source=source) + @classmethod + def validate(cls, name, data): + reasons = super(_ValuesMixin, cls).validate(name, data) + values = [] try: values = data['values'] except KeyError: try: values = [data['value']] except KeyError: - raise Exception('Invalid record {}, missing value(s)' - .format(self.fqdn)) + reasons.append('missing value(s)') + + for value in values: + reasons.extend(cls._validate_value(value)) + + return reasons + + def __init__(self, zone, name, data, source=None): + super(_ValuesMixin, self).__init__(zone, name, data, source=source) + try: + values = data['values'] + except KeyError: + values = [data['value']] self.values = sorted(self._process_values(values)) def changes(self, other, target): @@ -224,6 +261,21 @@ class _GeoMixin(_ValuesMixin): Must be included before `Record`. ''' + @classmethod + def validate(cls, name, data): + reasons = super(_GeoMixin, cls).validate(name, data) + try: + geo = dict(data['geo']) + # TODO: validate legal codes + for code, values in geo.items(): + reasons.extend(GeoValue._validate_geo(code)) + for value in values: + reasons.extend(cls._validate_value(value)) + except KeyError: + pass + return reasons + + # TODO: support 'value' as well # TODO: move away from "data" hash to strict params, it's kind of leaking # the yaml implementation into here and then forcing it back out into # non-yaml providers during input @@ -233,9 +285,8 @@ class _GeoMixin(_ValuesMixin): self.geo = dict(data['geo']) except KeyError: self.geo = {} - for k, vs in self.geo.items(): - vs = sorted(self._process_values(vs)) - self.geo[k] = GeoValue(k, vs) + for code, values in self.geo.items(): + self.geo[code] = GeoValue(code, values) def _data(self): ret = super(_GeoMixin, self)._data() @@ -264,41 +315,52 @@ class _GeoMixin(_ValuesMixin): class ARecord(_GeoMixin, Record): _type = 'A' + @classmethod + def _validate_value(self, value): + reasons = [] + try: + IPv4Address(unicode(value)) + except Exception: + reasons.append('invalid ip address "{}"'.format(value)) + return reasons + def _process_values(self, values): - for ip in values: - try: - IPv4Address(unicode(ip)) - except Exception: - raise Exception('Invalid record {}, value {} not a valid ip' - .format(self.fqdn, ip)) return values class AaaaRecord(_GeoMixin, Record): _type = 'AAAA' + @classmethod + def _validate_value(self, value): + reasons = [] + try: + IPv6Address(unicode(value)) + except Exception: + reasons.append('invalid ip address "{}"'.format(value)) + return reasons + def _process_values(self, values): - ret = [] - for ip in values: - try: - IPv6Address(unicode(ip)) - ret.append(ip.lower()) - except Exception: - raise Exception('Invalid record {}, value {} not a valid ip' - .format(self.fqdn, ip)) - return ret + return values class _ValueMixin(object): - def __init__(self, zone, name, data, source=None): - super(_ValueMixin, self).__init__(zone, name, data, source=source) + @classmethod + def validate(cls, name, data): + reasons = super(_ValueMixin, cls).validate(name, data) + value = None try: value = data['value'] except KeyError: - raise Exception('Invalid record {}, missing value' - .format(self.fqdn)) - self.value = self._process_value(value) + reasons.append('missing value') + if value: + reasons.extend(cls._validate_value(value)) + return reasons + + def __init__(self, zone, name, data, source=None): + super(_ValueMixin, self).__init__(zone, name, data, source=source) + self.value = self._process_value(data['value']) def changes(self, other, target): if self.value != other.value: @@ -319,25 +381,42 @@ class _ValueMixin(object): class AliasRecord(_ValueMixin, Record): _type = 'ALIAS' - def _process_value(self, value): + @classmethod + def _validate_value(self, value): + reasons = [] if not value.endswith('.'): - raise Exception('Invalid record {}, value ({}) missing trailing .' - .format(self.fqdn, value)) + reasons.append('missing trailing .') + return reasons + + def _process_value(self, value): return value class CnameRecord(_ValueMixin, Record): _type = 'CNAME' - def _process_value(self, value): + @classmethod + def _validate_value(cls, value): + reasons = [] if not value.endswith('.'): - raise Exception('Invalid record {}, value ({}) missing trailing .' - .format(self.fqdn, value)) - return value.lower() + reasons.append('missing trailing .') + return reasons + + def _process_value(self, value): + return value class MxValue(object): + @classmethod + def _validate_value(cls, value): + reasons = [] + if 'priority' not in value: + reasons.append('missing priority') + if 'value' not in value: + reasons.append('missing value') + return reasons + def __init__(self, value): # TODO: rename preference self.priority = int(value['priority']) @@ -363,19 +442,38 @@ class MxValue(object): class MxRecord(_ValuesMixin, Record): _type = 'MX' + @classmethod + def _validate_value(cls, value): + return MxValue._validate_value(value) + def _process_values(self, values): - ret = [] - for value in values: - try: - ret.append(MxValue(value)) - except KeyError as e: - raise Exception('Invalid value in record {}, missing {}' - .format(self.fqdn, e.args[0])) - return ret + return [MxValue(v) for v in values] class NaptrValue(object): + @classmethod + def _validate_value(cls, data): + reasons = [] + try: + int(data['order']) + except KeyError: + reasons.append('missing order') + except ValueError: + reasons.append('invalid order "{}"'.format(data['order'])) + try: + int(data['preference']) + except KeyError: + reasons.append('missing preference') + except ValueError: + reasons.append('invalid preference "{}"' + .format(data['preference'])) + # TODO: validate field data + for k in ('flags', 'service', 'regexp', 'replacement'): + if k not in data: + reasons.append('missing {}'.format(k)) + return reasons + def __init__(self, value): self.order = int(value['order']) self.preference = int(value['preference']) @@ -420,42 +518,65 @@ class NaptrValue(object): class NaptrRecord(_ValuesMixin, Record): _type = 'NAPTR' + @classmethod + def _validate_value(cls, value): + return NaptrValue._validate_value(value) + def _process_values(self, values): - ret = [] - for value in values: - try: - ret.append(NaptrValue(value)) - except KeyError as e: - raise Exception('Invalid value in record {}, missing {}' - .format(self.fqdn, e.args[0])) - return ret + return [NaptrValue(v) for v in values] class NsRecord(_ValuesMixin, Record): _type = 'NS' + @classmethod + def _validate_value(cls, value): + reasons = [] + if not value.endswith('.'): + reasons.append('missing trailing .') + return reasons + def _process_values(self, values): - ret = [] - for ns in values: - if not ns.endswith('.'): - raise Exception('Invalid record {}, value {} missing ' - 'trailing .'.format(self.fqdn, ns)) - ret.append(ns.lower()) - return ret + return values class PtrRecord(_ValueMixin, Record): _type = 'PTR' - def _process_value(self, value): + @classmethod + def _validate_value(cls, value): + reasons = [] if not value.endswith('.'): - raise Exception('Invalid record {}, value ({}) missing trailing .' - .format(self.fqdn, value)) - return value.lower() + reasons.append('missing trailing .') + return reasons + + def _process_value(self, value): + return value class SshfpValue(object): + @classmethod + def _validate_value(cls, value): + reasons = [] + # TODO: validate algorithm and fingerprint_type values + try: + int(value['algorithm']) + except KeyError: + reasons.append('missing algorithm') + except ValueError: + reasons.append('invalid algorithm "{}"'.format(value['algorithm'])) + try: + int(value['fingerprint_type']) + except KeyError: + reasons.append('missing fingerprint_type') + except ValueError: + reasons.append('invalid fingerprint_type "{}"' + .format(value['fingerprint_type'])) + if 'fingerprint' not in value: + reasons.append('missing fingerprint') + return reasons + def __init__(self, value): self.algorithm = int(value['algorithm']) self.fingerprint_type = int(value['fingerprint_type']) @@ -484,26 +605,61 @@ class SshfpValue(object): class SshfpRecord(_ValuesMixin, Record): _type = 'SSHFP' + @classmethod + def _validate_value(cls, value): + return SshfpValue._validate_value(value) + def _process_values(self, values): - ret = [] - for value in values: - try: - ret.append(SshfpValue(value)) - except KeyError as e: - raise Exception('Invalid value in record {}, missing {}' - .format(self.fqdn, e.args[0])) - return ret + return [SshfpValue(v) for v in values] + + +_unescaped_semicolon_re = re.compile(r'\w;') class SpfRecord(_ValuesMixin, Record): _type = 'SPF' + @classmethod + def _validate_value(cls, value): + if _unescaped_semicolon_re.search(value): + return ['unescaped ;'] + return [] + def _process_values(self, values): return values class SrvValue(object): + @classmethod + def _validate_value(self, value): + reasons = [] + # TODO: validate algorithm and fingerprint_type values + try: + int(value['priority']) + except KeyError: + reasons.append('missing priority') + except ValueError: + reasons.append('invalid priority "{}"'.format(value['priority'])) + try: + int(value['weight']) + except KeyError: + reasons.append('missing weight') + except ValueError: + reasons.append('invalid weight "{}"'.format(value['weight'])) + try: + int(value['port']) + except KeyError: + reasons.append('missing port') + except ValueError: + reasons.append('invalid port "{}"'.format(value['port'])) + try: + if not value['target'].endswith('.'): + reasons.append('missing trailing .') + except KeyError: + reasons.append('missing target') + return reasons + def __init__(self, value): self.priority = int(value['priority']) self.weight = int(value['weight']) @@ -537,28 +693,30 @@ class SrvRecord(_ValuesMixin, Record): _type = 'SRV' _name_re = re.compile(r'^_[^\.]+\.[^\.]+') - def __init__(self, zone, name, data, source=None): - if not self._name_re.match(name): - raise Exception('Invalid name {}.{}'.format(name, zone.name)) - super(SrvRecord, self).__init__(zone, name, data, source) + @classmethod + def validate(cls, name, data): + reasons = [] + if not cls._name_re.match(name): + reasons.append('invalid name') + reasons.extend(super(SrvRecord, cls).validate(name, data)) + return reasons + + @classmethod + def _validate_value(cls, value): + return SrvValue._validate_value(value) def _process_values(self, values): - ret = [] - for value in values: - try: - ret.append(SrvValue(value)) - except KeyError as e: - raise Exception('Invalid value in record {}, missing {}' - .format(self.fqdn, e.args[0])) - return ret + return [SrvValue(v) for v in values] class TxtRecord(_ValuesMixin, Record): _type = 'TXT' + @classmethod + def _validate_value(cls, value): + if _unescaped_semicolon_re.search(value): + return ['unescaped ;'] + return [] + def _process_values(self, values): - for value in values: - if _unescaped_semicolon_re.search(value): - raise Exception('Invalid record {}, unescaped ;' - .format(self.fqdn)) return values diff --git a/tests/test_octodns_record.py b/tests/test_octodns_record.py index 52505cb..99a502e 100644 --- a/tests/test_octodns_record.py +++ b/tests/test_octodns_record.py @@ -9,7 +9,8 @@ from unittest import TestCase from octodns.record import ARecord, AaaaRecord, AliasRecord, CnameRecord, \ Create, Delete, GeoValue, MxRecord, NaptrRecord, NaptrValue, NsRecord, \ - PtrRecord, Record, SshfpRecord, SpfRecord, SrvRecord, TxtRecord, Update + Record, SshfpRecord, SpfRecord, SrvRecord, TxtRecord, Update, \ + ValidationError from octodns.zone import Zone from helpers import GeoProvider, SimpleProvider @@ -42,15 +43,6 @@ class TestRecord(TestCase): self.assertEquals([b_value], b.values) self.assertEquals(b_data, b.data) - # missing ttl - with self.assertRaises(Exception) as ctx: - ARecord(self.zone, None, {'value': '1.1.1.1'}) - self.assertTrue('missing ttl' in ctx.exception.message) - # missing values & value - with self.assertRaises(Exception) as ctx: - ARecord(self.zone, None, {'ttl': 42}) - self.assertTrue('missing value(s)' in ctx.exception.message) - # top-level data = {'ttl': 30, 'value': '4.2.3.4'} self.assertEquals(self.zone.name, ARecord(self.zone, '', data).fqdn) @@ -104,20 +96,6 @@ class TestRecord(TestCase): DummyRecord().__repr__() - def test_invalid_a(self): - with self.assertRaises(Exception) as ctx: - ARecord(self.zone, 'a', { - 'ttl': 30, - 'value': 'foo', - }) - self.assertTrue('Invalid record' in ctx.exception.message) - with self.assertRaises(Exception) as ctx: - ARecord(self.zone, 'a', { - 'ttl': 30, - 'values': ['1.2.3.4', 'bar'], - }) - self.assertTrue('Invalid record' in ctx.exception.message) - def test_geo(self): geo_data = {'ttl': 42, 'values': ['5.2.3.4', '6.2.3.4'], 'geo': {'AF': ['1.1.1.1'], @@ -157,19 +135,6 @@ class TestRecord(TestCase): # Geo provider does consider lack of geo diffs to be changes self.assertTrue(geo.changes(other, geo_target)) - # invalid geo code - with self.assertRaises(Exception) as ctx: - ARecord(self.zone, 'geo', {'ttl': 42, - 'values': ['5.2.3.4', '6.2.3.4'], - 'geo': {'abc': ['1.1.1.1']}}) - self.assertEquals('Invalid geo "abc"', ctx.exception.message) - - with self.assertRaises(Exception) as ctx: - ARecord(self.zone, 'geo', {'ttl': 42, - 'values': ['5.2.3.4', '6.2.3.4'], - 'geo': {'NA-US': ['1.1.1']}}) - self.assertTrue('not a valid ip' in ctx.exception.message) - # __repr__ doesn't blow up geo.__repr__() @@ -187,30 +152,12 @@ class TestRecord(TestCase): self.assertEquals([b_value], b.values) self.assertEquals(b_data, b.data) - # missing values & value - with self.assertRaises(Exception) as ctx: - _type(self.zone, None, {'ttl': 42}) - self.assertTrue('missing value(s)' in ctx.exception.message) - def test_aaaa(self): a_values = ['2001:0db8:3c4d:0015:0000:0000:1a2f:1a2b', '2001:0db8:3c4d:0015:0000:0000:1a2f:1a3b'] b_value = '2001:0db8:3c4d:0015:0000:0000:1a2f:1a4b' self.assertMultipleValues(AaaaRecord, a_values, b_value) - with self.assertRaises(Exception) as ctx: - AaaaRecord(self.zone, 'a', { - 'ttl': 30, - 'value': 'foo', - }) - self.assertTrue('Invalid record' in ctx.exception.message) - with self.assertRaises(Exception) as ctx: - AaaaRecord(self.zone, 'a', { - 'ttl': 30, - 'values': [b_value, 'bar'], - }) - self.assertTrue('Invalid record' in ctx.exception.message) - def assertSingleValue(self, _type, a_value, b_value): a_data = {'ttl': 30, 'value': a_value} a = _type(self.zone, 'a', a_data) @@ -225,11 +172,6 @@ class TestRecord(TestCase): self.assertEquals(b_value, b.value) self.assertEquals(b_data, b.data) - # missing value - with self.assertRaises(Exception) as ctx: - _type(self.zone, None, {'ttl': 42}) - self.assertTrue('missing value' in ctx.exception.message) - target = SimpleProvider() # No changes with self self.assertFalse(a.changes(a, target)) @@ -251,15 +193,6 @@ class TestRecord(TestCase): self.assertEquals(a_data['value'], a.value) self.assertEquals(a_data, a.data) - # missing value - with self.assertRaises(Exception) as ctx: - AliasRecord(self.zone, None, {'ttl': 0}) - self.assertTrue('missing value' in ctx.exception.message) - # bad name - with self.assertRaises(Exception) as ctx: - AliasRecord(self.zone, None, {'ttl': 0, 'value': 'www.unit.tests'}) - self.assertTrue('missing trailing .' in ctx.exception.message) - target = SimpleProvider() # No changes with self self.assertFalse(a.changes(a, target)) @@ -277,19 +210,6 @@ class TestRecord(TestCase): self.assertSingleValue(CnameRecord, 'target.foo.com.', 'other.foo.com.') - with self.assertRaises(Exception) as ctx: - CnameRecord(self.zone, 'a', { - 'ttl': 30, - 'value': 'foo', - }) - self.assertTrue('Invalid record' in ctx.exception.message) - with self.assertRaises(Exception) as ctx: - CnameRecord(self.zone, 'a', { - 'ttl': 30, - 'values': ['foo.com.', 'bar.com'], - }) - self.assertTrue('Invalid record' in ctx.exception.message) - def test_mx(self): a_values = [{ 'priority': 10, @@ -319,15 +239,6 @@ class TestRecord(TestCase): self.assertEquals(b_value['value'], b.values[0].value) self.assertEquals(b_data, b.data) - # missing value - with self.assertRaises(Exception) as ctx: - MxRecord(self.zone, None, {'ttl': 42}) - self.assertTrue('missing value(s)' in ctx.exception.message) - # invalid value - with self.assertRaises(Exception) as ctx: - MxRecord(self.zone, None, {'ttl': 42, 'value': {}}) - self.assertTrue('Invalid value' in ctx.exception.message) - target = SimpleProvider() # No changes with self self.assertFalse(a.changes(a, target)) @@ -387,15 +298,6 @@ class TestRecord(TestCase): self.assertEquals(b_value[k], getattr(b.values[0], k)) self.assertEquals(b_data, b.data) - # missing value - with self.assertRaises(Exception) as ctx: - NaptrRecord(self.zone, None, {'ttl': 42}) - self.assertTrue('missing value' in ctx.exception.message) - # invalid value - with self.assertRaises(Exception) as ctx: - NaptrRecord(self.zone, None, {'ttl': 42, 'value': {}}) - self.assertTrue('Invalid value' in ctx.exception.message) - target = SimpleProvider() # No changes with self self.assertFalse(a.changes(a, target)) @@ -538,33 +440,6 @@ class TestRecord(TestCase): self.assertEquals([b_value], b.values) self.assertEquals(b_data, b.data) - # missing values & value - with self.assertRaises(Exception) as ctx: - NsRecord(self.zone, None, {'ttl': 42}) - self.assertTrue('missing value(s)' in ctx.exception.message) - - with self.assertRaises(Exception) as ctx: - NsRecord(self.zone, 'a', { - 'ttl': 30, - 'value': 'foo', - }) - self.assertTrue('Invalid record' in ctx.exception.message) - with self.assertRaises(Exception) as ctx: - NsRecord(self.zone, 'a', { - 'ttl': 30, - 'values': ['foo.com.', 'bar.com'], - }) - self.assertTrue('Invalid record' in ctx.exception.message) - - def test_ptr(self): - self.assertSingleValue(PtrRecord, 'foo.bar.com.', 'other.bar.com.') - with self.assertRaises(Exception) as ctx: - PtrRecord(self.zone, 'a', { - 'ttl': 30, - 'value': 'foo', - }) - self.assertTrue('Invalid record' in ctx.exception.message) - def test_sshfp(self): a_values = [{ 'algorithm': 10, @@ -599,15 +474,6 @@ class TestRecord(TestCase): self.assertEquals(b_value['fingerprint'], b.values[0].fingerprint) self.assertEquals(b_data, b.data) - # missing value - with self.assertRaises(Exception) as ctx: - SshfpRecord(self.zone, None, {'ttl': 42}) - self.assertTrue('missing value(s)' in ctx.exception.message) - # invalid value - with self.assertRaises(Exception) as ctx: - SshfpRecord(self.zone, None, {'ttl': 42, 'value': {}}) - self.assertTrue('Invalid value' in ctx.exception.message) - target = SimpleProvider() # No changes with self self.assertFalse(a.changes(a, target)) @@ -677,21 +543,6 @@ class TestRecord(TestCase): self.assertEquals(b_value['target'], b.values[0].target) self.assertEquals(b_data, b.data) - # invalid name - with self.assertRaises(Exception) as ctx: - SrvRecord(self.zone, 'bad', {'ttl': 42}) - self.assertEquals('Invalid name bad.unit.tests.', - ctx.exception.message) - - # missing value - with self.assertRaises(Exception) as ctx: - SrvRecord(self.zone, '_missing._tcp', {'ttl': 42}) - self.assertTrue('missing value(s)' in ctx.exception.message) - # invalid value - with self.assertRaises(Exception) as ctx: - SrvRecord(self.zone, '_missing._udp', {'ttl': 42, 'value': {}}) - self.assertTrue('Invalid value' in ctx.exception.message) - target = SimpleProvider() # No changes with self self.assertFalse(a.changes(a, target)) @@ -729,21 +580,6 @@ class TestRecord(TestCase): b_value = 'b other' self.assertMultipleValues(TxtRecord, a_values, b_value) - Record.new(self.zone, 'txt', { - 'ttl': 44, - 'type': 'TXT', - 'value': 'escaped\; foo', - }) - - with self.assertRaises(Exception) as ctx: - Record.new(self.zone, 'txt', { - 'ttl': 44, - 'type': 'TXT', - 'value': 'un-escaped; foo', - }) - self.assertEquals('Invalid record txt.unit.tests., unescaped ;', - ctx.exception.message) - def test_record_new(self): txt = Record.new(self.zone, 'txt', { 'ttl': 44, @@ -794,3 +630,642 @@ class TestRecord(TestCase): self.assertEquals('CA', geo.subdivision_code) self.assertEquals(values, geo.values) self.assertEquals(['NA-US', 'NA'], list(geo.parents)) + + +class TestRecordValidation(TestCase): + zone = Zone('unit.tests.', []) + + def test_base(self): + # no ttl + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'A', + 'value': '1.2.3.4', + }) + self.assertEquals(['missing ttl'], ctx.exception.reasons) + # invalid ttl + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, 'www', { + 'type': 'A', + 'ttl': -1, + 'value': '1.2.3.4', + }) + self.assertEquals('www.unit.tests.', ctx.exception.fqdn) + self.assertEquals(['invalid ttl'], ctx.exception.reasons) + + def test_A_and_values_mixin(self): + # doesn't blow up + Record.new(self.zone, '', { + 'type': 'A', + 'ttl': 600, + 'value': '1.2.3.4', + }) + Record.new(self.zone, '', { + 'type': 'A', + 'ttl': 600, + 'values': [ + '1.2.3.4', + '1.2.3.5', + ] + }) + + # missing value(s) + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'A', + 'ttl': 600, + }) + self.assertEquals(['missing value(s)'], ctx.exception.reasons) + # missing value(s) & ttl + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'A', + }) + self.assertEquals(['missing ttl', 'missing value(s)'], + ctx.exception.reasons) + + # invalid ip address + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'A', + 'ttl': 600, + 'value': 'hello' + }) + self.assertEquals(['invalid ip address "hello"'], + ctx.exception.reasons) + + # invalid ip addresses + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'A', + 'ttl': 600, + 'values': ['hello', 'goodbye'] + }) + self.assertEquals([ + 'invalid ip address "hello"', + 'invalid ip address "goodbye"' + ], ctx.exception.reasons) + + # invalid & valid ip addresses, no ttl + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'A', + 'values': ['1.2.3.4', 'hello', '5.6.7.8'] + }) + self.assertEquals([ + 'missing ttl', + 'invalid ip address "hello"', + ], ctx.exception.reasons) + + def test_geo(self): + Record.new(self.zone, '', { + 'geo': { + 'NA': ['1.2.3.5'], + 'NA-US': ['1.2.3.5', '1.2.3.6'] + }, + 'type': 'A', + 'ttl': 600, + 'value': '1.2.3.4', + }) + + # invalid ip address + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'geo': { + 'NA': ['hello'], + 'NA-US': ['1.2.3.5', '1.2.3.6'] + }, + 'type': 'A', + 'ttl': 600, + 'value': '1.2.3.4', + }) + self.assertEquals(['invalid ip address "hello"'], + ctx.exception.reasons) + + # invalid geo code + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'geo': { + 'XYZ': ['1.2.3.4'], + }, + 'type': 'A', + 'ttl': 600, + 'value': '1.2.3.4', + }) + self.assertEquals(['invalid geo "XYZ"'], ctx.exception.reasons) + + # invalid ip address + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'geo': { + 'NA': ['hello'], + 'NA-US': ['1.2.3.5', 'goodbye'] + }, + 'type': 'A', + 'ttl': 600, + 'value': '1.2.3.4', + }) + self.assertEquals([ + 'invalid ip address "hello"', + 'invalid ip address "goodbye"' + ], ctx.exception.reasons) + + def test_AAAA(self): + # doesn't blow up + Record.new(self.zone, '', { + 'type': 'AAAA', + 'ttl': 600, + 'value': '2601:644:500:e210:62f8:1dff:feb8:947a', + }) + Record.new(self.zone, '', { + 'type': 'AAAA', + 'ttl': 600, + 'values': [ + '2601:644:500:e210:62f8:1dff:feb8:947a', + '2601:644:500:e210:62f8:1dff:feb8:947b', + ] + }) + + # invalid ip address + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'AAAA', + 'ttl': 600, + 'value': 'hello' + }) + self.assertEquals(['invalid ip address "hello"'], + ctx.exception.reasons) + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'AAAA', + 'ttl': 600, + 'value': '1.2.3.4' + }) + self.assertEquals(['invalid ip address "1.2.3.4"'], + ctx.exception.reasons) + + # invalid ip addresses + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'AAAA', + 'ttl': 600, + 'values': ['hello', 'goodbye'] + }) + self.assertEquals([ + 'invalid ip address "hello"', + 'invalid ip address "goodbye"' + ], ctx.exception.reasons) + + def test_ALIAS_and_value_mixin(self): + # doesn't blow up + Record.new(self.zone, '', { + 'type': 'ALIAS', + 'ttl': 600, + 'value': 'foo.bar.com.', + }) + + # missing value + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'ALIAS', + 'ttl': 600, + }) + self.assertEquals(['missing value'], ctx.exception.reasons) + + # missing trailing . + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'ALIAS', + 'ttl': 600, + 'value': 'foo.bar.com', + }) + self.assertEquals(['missing trailing .'], ctx.exception.reasons) + + def test_CNAME(self): + # doesn't blow up + Record.new(self.zone, '', { + 'type': 'CNAME', + 'ttl': 600, + 'value': 'foo.bar.com.', + }) + + # missing trailing . + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'CNAME', + 'ttl': 600, + 'value': 'foo.bar.com', + }) + self.assertEquals(['missing trailing .'], ctx.exception.reasons) + + def test_MX(self): + # doesn't blow up + Record.new(self.zone, '', { + 'type': 'MX', + 'ttl': 600, + 'value': { + 'priority': 10, + 'value': 'foo.bar.com.' + } + }) + + # missing priority + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'MX', + 'ttl': 600, + 'value': { + 'value': 'foo.bar.com.' + } + }) + self.assertEquals(['missing priority'], ctx.exception.reasons) + + # missing value + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'MX', + 'ttl': 600, + 'value': { + 'priority': 10, + } + }) + self.assertEquals(['missing value'], ctx.exception.reasons) + + def test_NXPTR(self): + # doesn't blow up + Record.new(self.zone, '', { + 'type': 'NAPTR', + 'ttl': 600, + 'value': { + 'order': 10, + 'preference': 20, + 'flags': 'f', + 'service': 'srv', + 'regexp': '.*', + 'replacement': '.' + } + }) + + # missing X priority + value = { + 'order': 10, + 'preference': 20, + 'flags': 'f', + 'service': 'srv', + 'regexp': '.*', + 'replacement': '.' + } + for k in ('order', 'preference', 'flags', 'service', 'regexp', + 'replacement'): + v = dict(value) + del v[k] + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'NAPTR', + 'ttl': 600, + 'value': v + }) + self.assertEquals(['missing {}'.format(k)], ctx.exception.reasons) + + # non-int order + v = dict(value) + v['order'] = 'boo' + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'NAPTR', + 'ttl': 600, + 'value': v + }) + self.assertEquals(['invalid order "boo"'], ctx.exception.reasons) + + # non-int preference + v = dict(value) + v['preference'] = 'who' + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'NAPTR', + 'ttl': 600, + 'value': v + }) + self.assertEquals(['invalid preference "who"'], ctx.exception.reasons) + + def test_NS(self): + # doesn't blow up + Record.new(self.zone, '', { + 'type': 'NS', + 'ttl': 600, + 'values': [ + 'foo.bar.com.', + '1.2.3.4.' + ] + }) + + # missing value + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'NS', + 'ttl': 600, + }) + self.assertEquals(['missing value(s)'], ctx.exception.reasons) + + # no trailing . + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'NS', + 'ttl': 600, + 'value': 'foo.bar', + }) + self.assertEquals(['missing trailing .'], ctx.exception.reasons) + + def test_PTR(self): + # doesn't blow up (name & zone here don't make any sense, but not + # important) + Record.new(self.zone, '', { + 'type': 'PTR', + 'ttl': 600, + 'value': 'foo.bar.com.', + }) + + # missing value + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'PTR', + 'ttl': 600, + }) + self.assertEquals(['missing value'], ctx.exception.reasons) + + # no trailing . + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'PTR', + 'ttl': 600, + 'value': 'foo.bar', + }) + self.assertEquals(['missing trailing .'], ctx.exception.reasons) + + def test_SSHFP(self): + # doesn't blow up + Record.new(self.zone, '', { + 'type': 'SSHFP', + 'ttl': 600, + 'value': { + 'algorithm': 1, + 'fingerprint_type': 1, + 'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73' + } + }) + + # missing algorithm + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'SSHFP', + 'ttl': 600, + 'value': { + 'fingerprint_type': 1, + 'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73' + } + }) + self.assertEquals(['missing algorithm'], ctx.exception.reasons) + + # invalid algorithm + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'SSHFP', + 'ttl': 600, + 'value': { + 'algorithm': 'nope', + 'fingerprint_type': 1, + 'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73' + } + }) + self.assertEquals(['invalid algorithm "nope"'], ctx.exception.reasons) + + # missing fingerprint_type + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'SSHFP', + 'ttl': 600, + 'value': { + 'algorithm': 1, + 'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73' + } + }) + self.assertEquals(['missing fingerprint_type'], ctx.exception.reasons) + + # invalid fingerprint_type + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'SSHFP', + 'ttl': 600, + 'value': { + 'algorithm': 1, + 'fingerprint_type': 'yeeah', + 'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73' + } + }) + self.assertEquals(['invalid fingerprint_type "yeeah"'], + ctx.exception.reasons) + + # missing fingerprint + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'SSHFP', + 'ttl': 600, + 'value': { + 'algorithm': 1, + 'fingerprint_type': 1, + } + }) + self.assertEquals(['missing fingerprint'], ctx.exception.reasons) + + def test_SPF(self): + # doesn't blow up (name & zone here don't make any sense, but not + # important) + Record.new(self.zone, '', { + 'type': 'SPF', + 'ttl': 600, + 'values': [ + 'v=spf1 ip4:192.168.0.1/16-all', + 'v=spf1 ip4:10.1.2.1/24-all', + 'this has some\; semi-colons\; in it', + ] + }) + + # missing value + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'SPF', + 'ttl': 600, + }) + self.assertEquals(['missing value(s)'], ctx.exception.reasons) + + # missing escapes + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'SPF', + 'ttl': 600, + 'value': 'this has some; semi-colons\; in it', + }) + self.assertEquals(['unescaped ;'], ctx.exception.reasons) + + def test_SRV(self): + # doesn't blow up + Record.new(self.zone, '_srv._tcp', { + 'type': 'SRV', + 'ttl': 600, + 'value': { + 'priority': 1, + 'weight': 2, + 'port': 3, + 'target': 'foo.bar.baz.' + } + }) + + # invalid name + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, 'neup', { + 'type': 'SRV', + 'ttl': 600, + 'value': { + 'priority': 1, + 'weight': 2, + 'port': 3, + 'target': 'foo.bar.baz.' + } + }) + self.assertEquals(['invalid name'], ctx.exception.reasons) + + # missing priority + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '_srv._tcp', { + 'type': 'SRV', + 'ttl': 600, + 'value': { + 'weight': 2, + 'port': 3, + 'target': 'foo.bar.baz.' + } + }) + self.assertEquals(['missing priority'], ctx.exception.reasons) + + # invalid priority + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '_srv._tcp', { + 'type': 'SRV', + 'ttl': 600, + 'value': { + 'priority': 'foo', + 'weight': 2, + 'port': 3, + 'target': 'foo.bar.baz.' + } + }) + self.assertEquals(['invalid priority "foo"'], ctx.exception.reasons) + + # missing weight + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '_srv._tcp', { + 'type': 'SRV', + 'ttl': 600, + 'value': { + 'priority': 1, + 'port': 3, + 'target': 'foo.bar.baz.' + } + }) + self.assertEquals(['missing weight'], ctx.exception.reasons) + # invalid weight + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '_srv._tcp', { + 'type': 'SRV', + 'ttl': 600, + 'value': { + 'priority': 1, + 'weight': 'foo', + 'port': 3, + 'target': 'foo.bar.baz.' + } + }) + self.assertEquals(['invalid weight "foo"'], ctx.exception.reasons) + + # missing port + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '_srv._tcp', { + 'type': 'SRV', + 'ttl': 600, + 'value': { + 'priority': 1, + 'weight': 2, + 'target': 'foo.bar.baz.' + } + }) + self.assertEquals(['missing port'], ctx.exception.reasons) + # invalid port + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '_srv._tcp', { + 'type': 'SRV', + 'ttl': 600, + 'value': { + 'priority': 1, + 'weight': 2, + 'port': 'foo', + 'target': 'foo.bar.baz.' + } + }) + self.assertEquals(['invalid port "foo"'], ctx.exception.reasons) + + # missing target + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '_srv._tcp', { + 'type': 'SRV', + 'ttl': 600, + 'value': { + 'priority': 1, + 'weight': 2, + 'port': 3, + } + }) + self.assertEquals(['missing target'], ctx.exception.reasons) + # invalid target + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '_srv._tcp', { + 'type': 'SRV', + 'ttl': 600, + 'value': { + 'priority': 1, + 'weight': 2, + 'port': 3, + 'target': 'foo.bar.baz' + } + }) + self.assertEquals(['missing trailing .'], + ctx.exception.reasons) + + def test_TXT(self): + # doesn't blow up (name & zone here don't make any sense, but not + # important) + Record.new(self.zone, '', { + 'type': 'TXT', + 'ttl': 600, + 'values': [ + 'hello world', + 'this has some\; semi-colons\; in it', + ] + }) + + # missing value + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TXT', + 'ttl': 600, + }) + self.assertEquals(['missing value(s)'], ctx.exception.reasons) + + # missing escapes + with self.assertRaises(ValidationError) as ctx: + Record.new(self.zone, '', { + 'type': 'TXT', + 'ttl': 600, + 'value': 'this has some; semi-colons\; in it', + }) + self.assertEquals(['unescaped ;'], ctx.exception.reasons)