1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00
Files
github-octodns/tests/test_octodns_record.py
2019-04-17 23:43:01 +08:00

3341 lines
108 KiB
Python

#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
from unittest import TestCase
from octodns.record import ARecord, AaaaRecord, AliasRecord, CaaRecord, \
CnameRecord, Create, Delete, GeoValue, MxRecord, NaptrRecord, NaptrValue, \
NsRecord, PtrRecord, Record, SshfpRecord, SpfRecord, SrvRecord, \
TxtRecord, Update, ValidationError, _Dynamic, _DynamicPool, _DynamicRule
from octodns.zone import Zone
from helpers import DynamicProvider, GeoProvider, SimpleProvider
class TestRecord(TestCase):
zone = Zone('unit.tests.', [])
def test_lowering(self):
    # Verifies that a mixed-case record name comes back lower-cased.
    record = ARecord(self.zone, 'MiXeDcAsE', {
        'ttl': 30,
        'type': 'A',
        'value': '1.2.3.4',
    })
    # assertEqual replaces the deprecated assertEquals alias
    self.assertEqual('mixedcase', record.name)
def test_alias_lowering_value(self):
    # Two ALIAS records whose values differ only by case must expose the
    # same value.
    upper_record = AliasRecord(self.zone, 'aliasUppwerValue', {
        'ttl': 30,
        'type': 'ALIAS',
        'value': 'GITHUB.COM',
    })
    lower_record = AliasRecord(self.zone, 'aliasLowerValue', {
        'ttl': 30,
        'type': 'ALIAS',
        'value': 'github.com',
    })
    # assertEqual replaces the deprecated assertEquals alias
    self.assertEqual(upper_record.value, lower_record.value)
def test_cname_lowering_value(self):
    # Two CNAME records whose values differ only by case must expose the
    # same value.
    upper_record = CnameRecord(self.zone, 'CnameUppwerValue', {
        'ttl': 30,
        'type': 'CNAME',
        'value': 'GITHUB.COM',
    })
    lower_record = CnameRecord(self.zone, 'CnameLowerValue', {
        'ttl': 30,
        'type': 'CNAME',
        'value': 'github.com',
    })
    # assertEqual replaces the deprecated assertEquals alias
    self.assertEqual(upper_record.value, lower_record.value)
def test_ptr_lowering_value(self):
    # Two PTR records whose values differ only by case must expose the
    # same value.
    upper_record = PtrRecord(self.zone, 'PtrUppwerValue', {
        'ttl': 30,
        'type': 'PTR',
        'value': 'GITHUB.COM',
    })
    lower_record = PtrRecord(self.zone, 'PtrLowerValue', {
        'ttl': 30,
        'type': 'PTR',
        'value': 'github.com',
    })
    # assertEqual replaces the deprecated assertEquals alias
    self.assertEqual(upper_record.value, lower_record.value)
def test_a_and_record(self):
    # Exercises ARecord together with the behaviour checked for records in
    # general: basic attributes, equality, change detection, hashing and
    # __repr__. Uses assertEqual/assertIn rather than the deprecated
    # assertEquals alias and bare `in` checks.
    a_values = ['1.2.3.4', '2.2.3.4']
    a_data = {'ttl': 30, 'values': a_values}
    a = ARecord(self.zone, 'a', a_data)
    self.assertEqual('a', a.name)
    self.assertEqual('a.unit.tests.', a.fqdn)
    self.assertEqual(30, a.ttl)
    self.assertEqual(a_values, a.values)
    self.assertEqual(a_data, a.data)

    b_value = '3.2.3.4'
    b_data = {'ttl': 30, 'value': b_value}
    b = ARecord(self.zone, 'b', b_data)
    self.assertEqual([b_value], b.values)
    self.assertEqual(b_data, b.data)

    # top-level
    data = {'ttl': 30, 'value': '4.2.3.4'}
    self.assertEqual(self.zone.name, ARecord(self.zone, '', data).fqdn)
    self.assertEqual(self.zone.name, ARecord(self.zone, None, data).fqdn)

    # ARecord equate with itself
    self.assertTrue(a == a)
    # Records with differing names and same type don't equate
    self.assertFalse(a == b)
    # Records with same name & type equate even if ttl is different
    self.assertTrue(a == ARecord(self.zone, 'a',
                                 {'ttl': 31, 'values': a_values}))
    # Records with same name & type equate even if values are different
    self.assertTrue(a == ARecord(self.zone, 'a',
                                 {'ttl': 30, 'value': b_value}))

    target = SimpleProvider()
    # no changes if self
    self.assertFalse(a.changes(a, target))
    # no changes if clone
    other = ARecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
    self.assertFalse(a.changes(other, target))
    # changes if ttl modified
    other.ttl = 31
    update = a.changes(other, target)
    self.assertEqual(a, update.existing)
    self.assertEqual(other, update.new)
    # changes if values modified
    other.ttl = a.ttl
    other.values = ['4.4.4.4']
    update = a.changes(other, target)
    self.assertEqual(a, update.existing)
    self.assertEqual(other, update.new)

    # Hashing
    records = set()
    records.add(a)
    self.assertIn(a, records)
    self.assertNotIn(b, records)
    records.add(b)
    self.assertIn(b, records)

    # __repr__ doesn't blow up
    a.__repr__()
    # Record.__repr__ does
    with self.assertRaises(NotImplementedError):
        class DummyRecord(Record):

            def __init__(self):
                # skip Record.__init__ so only __repr__ is exercised
                pass

        DummyRecord().__repr__()
def test_values_mixin_data(self):
    # Checks which of the 'value'/'values' keys show up in an A record's
    # .data when the configured values are empty, blank or None.
    def data_for(values):
        # build a root A record from `values` and return its .data
        record = ARecord(self.zone, '', {
            'type': 'A',
            'ttl': 600,
            'values': values,
        })
        return record.data

    # no values, no value or values in data
    self.assertNotIn('values', data_for([]))

    # empty value, no value or values in data
    self.assertNotIn('value', data_for(['']))

    # empty/None values, no value or values in data
    self.assertNotIn('values', data_for(['', None]))

    # empty/None values and valid, value in data
    with_valid = data_for(['', None, '10.10.10.10'])
    self.assertNotIn('values', with_valid)
    self.assertEqual('10.10.10.10', with_valid['value'])
def test_value_mixin_data(self):
    # An ALIAS record built with an unspecified value (None, then '')
    # must not carry a 'value' key in its .data.
    for unspecified in (None, ''):
        record = AliasRecord(self.zone, '', {
            'type': 'ALIAS',
            'ttl': 600,
            'value': unspecified,
        })
        self.assertNotIn('value', record.data)
def test_geo(self):
    # Covers geo-enabled A records: data round-tripping, equality, and how
    # change detection differs between SimpleProvider and GeoProvider
    # targets. Uses assertEqual rather than the deprecated assertEquals.
    geo_data = {'ttl': 42, 'values': ['5.2.3.4', '6.2.3.4'],
                'geo': {'AF': ['1.1.1.1'],
                        'AS-JP': ['2.2.2.2', '3.3.3.3'],
                        'NA-US': ['4.4.4.4', '5.5.5.5'],
                        'NA-US-CA': ['6.6.6.6', '7.7.7.7']}}
    geo = ARecord(self.zone, 'geo', geo_data)
    self.assertEqual(geo_data, geo.data)

    other_data = {'ttl': 42, 'values': ['5.2.3.4', '6.2.3.4'],
                  'geo': {'AF': ['1.1.1.1'],
                          'AS-JP': ['2.2.2.2', '3.3.3.3'],
                          'NA-US': ['4.4.4.4', '5.5.5.5'],
                          'NA-US-CA': ['6.6.6.6', '7.7.7.7']}}
    other = ARecord(self.zone, 'geo', other_data)
    self.assertEqual(other_data, other.data)

    simple_target = SimpleProvider()
    geo_target = GeoProvider()

    # Geo provider doesn't consider identical geo to be changes
    self.assertFalse(geo.changes(geo, geo_target))

    # geo values don't impact equality
    other.geo['AF'].values = ['9.9.9.9']
    self.assertTrue(geo == other)
    # Non-geo supporting provider doesn't consider geo diffs to be changes
    self.assertFalse(geo.changes(other, simple_target))
    # Geo provider does consider geo diffs to be changes
    self.assertTrue(geo.changes(other, geo_target))

    # Object without geo doesn't impact equality
    other.geo = {}
    self.assertTrue(geo == other)
    # Non-geo supporting provider doesn't consider lack of geo a diff
    self.assertFalse(geo.changes(other, simple_target))
    # Geo provider does consider lack of geo diffs to be changes
    self.assertTrue(geo.changes(other, geo_target))

    # __repr__ doesn't blow up
    geo.__repr__()
def assertMultipleValues(self, _type, a_values, b_value):
    # Shared checks for multi-value record types.
    #
    # _type: record class to instantiate as _type(zone, name, data)
    # a_values: list of values used to build record 'a' via 'values'
    # b_value: single value used to build record 'b' via 'value'
    #
    # Uses assertEqual rather than the deprecated assertEquals alias.
    a_data = {'ttl': 30, 'values': a_values}
    a = _type(self.zone, 'a', a_data)
    self.assertEqual('a', a.name)
    self.assertEqual('a.unit.tests.', a.fqdn)
    self.assertEqual(30, a.ttl)
    self.assertEqual(a_values, a.values)
    self.assertEqual(a_data, a.data)

    # a single 'value' is expected to surface as a one-element .values
    b_data = {'ttl': 30, 'value': b_value}
    b = _type(self.zone, 'b', b_data)
    self.assertEqual([b_value], b.values)
    self.assertEqual(b_data, b.data)
def test_aaaa(self):
    # Runs the shared multi-value checks against AaaaRecord using three
    # IPv6 addresses (two for record 'a', one for record 'b').
    self.assertMultipleValues(
        AaaaRecord,
        [
            '2001:0db8:3c4d:0015:0000:0000:1a2f:1a2b',
            '2001:0db8:3c4d:0015:0000:0000:1a2f:1a3b',
        ],
        '2001:0db8:3c4d:0015:0000:0000:1a2f:1a4b',
    )
def assertSingleValue(self, _type, a_value, b_value):
    # Shared checks for single-value record types.
    #
    # _type: record class to instantiate as _type(zone, name, data)
    # a_value: value used to build record 'a'
    # b_value: a different value, used for record 'b' and for the change
    #          detection check against 'a'
    #
    # Uses assertEqual rather than the deprecated assertEquals alias.
    a_data = {'ttl': 30, 'value': a_value}
    a = _type(self.zone, 'a', a_data)
    self.assertEqual('a', a.name)
    self.assertEqual('a.unit.tests.', a.fqdn)
    self.assertEqual(30, a.ttl)
    self.assertEqual(a_value, a.value)
    self.assertEqual(a_data, a.data)

    b_data = {'ttl': 30, 'value': b_value}
    b = _type(self.zone, 'b', b_data)
    self.assertEqual(b_value, b.value)
    self.assertEqual(b_data, b.data)

    target = SimpleProvider()
    # No changes with self
    self.assertFalse(a.changes(a, target))
    # Diff in value causes change
    other = _type(self.zone, 'a', {'ttl': 30, 'value': b_value})
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)

    # __repr__ doesn't blow up
    a.__repr__()
def test_alias(self):
    # Covers a root-level ALIAS record: attributes, data round-trip and
    # change detection when the value differs. Uses assertEqual rather
    # than the deprecated assertEquals alias.
    a_data = {'ttl': 0, 'value': 'www.unit.tests.'}
    a = AliasRecord(self.zone, '', a_data)
    self.assertEqual('', a.name)
    self.assertEqual('unit.tests.', a.fqdn)
    self.assertEqual(0, a.ttl)
    self.assertEqual(a_data['value'], a.value)
    self.assertEqual(a_data, a.data)

    target = SimpleProvider()
    # No changes with self
    self.assertFalse(a.changes(a, target))
    # Diff in value causes change
    other = AliasRecord(self.zone, 'a', a_data)
    other.value = 'foo.unit.tests.'
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)

    # __repr__ doesn't blow up
    a.__repr__()
def test_caa(self):
    # Covers CAA records: per-value attributes (flags/tag/value), the
    # flags default, data round-trip and change detection on each field.
    # Uses assertEqual rather than the deprecated assertEquals alias.
    a_values = [{
        'flags': 0,
        'tag': 'issue',
        'value': 'ca.example.net',
    }, {
        'flags': 128,
        'tag': 'iodef',
        'value': 'mailto:security@example.com',
    }]
    a_data = {'ttl': 30, 'values': a_values}
    a = CaaRecord(self.zone, 'a', a_data)
    self.assertEqual('a', a.name)
    self.assertEqual('a.unit.tests.', a.fqdn)
    self.assertEqual(30, a.ttl)
    self.assertEqual(a_values[0]['flags'], a.values[0].flags)
    self.assertEqual(a_values[0]['tag'], a.values[0].tag)
    self.assertEqual(a_values[0]['value'], a.values[0].value)
    self.assertEqual(a_values[1]['flags'], a.values[1].flags)
    self.assertEqual(a_values[1]['tag'], a.values[1].tag)
    self.assertEqual(a_values[1]['value'], a.values[1].value)
    self.assertEqual(a_data, a.data)

    # 'flags' is deliberately omitted here; it is expected to read as 0
    b_value = {
        'tag': 'iodef',
        'value': 'http://iodef.example.com/',
    }
    b_data = {'ttl': 30, 'value': b_value}
    b = CaaRecord(self.zone, 'b', b_data)
    self.assertEqual(0, b.values[0].flags)
    self.assertEqual(b_value['tag'], b.values[0].tag)
    self.assertEqual(b_value['value'], b.values[0].value)
    # add the defaulted flags so the expected data matches b.data
    b_data['value']['flags'] = 0
    self.assertEqual(b_data, b.data)

    target = SimpleProvider()
    # No changes with self
    self.assertFalse(a.changes(a, target))
    # Diff in flags causes change
    other = CaaRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
    other.values[0].flags = 128
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)
    # Diff in tag causes change
    other.values[0].flags = a.values[0].flags
    other.values[0].tag = 'foo'
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)
    # Diff in value causes change
    other.values[0].tag = a.values[0].tag
    other.values[0].value = 'bar'
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)

    # __repr__ doesn't blow up
    a.__repr__()
def test_cname(self):
    # Runs the shared single-value checks against CnameRecord.
    first_target = 'target.foo.com.'
    second_target = 'other.foo.com.'
    self.assertSingleValue(CnameRecord, first_target, second_target)
def test_mx(self):
    # Covers MX records: both key spellings accepted on input
    # (preference/exchange and priority/value), data normalisation,
    # case-insensitive exchanges and change detection. Uses assertEqual
    # rather than the deprecated assertEquals alias.
    a_values = [{
        'preference': 10,
        'exchange': 'smtp1.'
    }, {
        'priority': 20,
        'value': 'smtp2.'
    }]
    a_data = {'ttl': 30, 'values': a_values}
    a = MxRecord(self.zone, 'a', a_data)
    self.assertEqual('a', a.name)
    self.assertEqual('a.unit.tests.', a.fqdn)
    self.assertEqual(30, a.ttl)
    self.assertEqual(a_values[0]['preference'], a.values[0].preference)
    self.assertEqual(a_values[0]['exchange'], a.values[0].exchange)
    self.assertEqual(a_values[1]['priority'], a.values[1].preference)
    self.assertEqual(a_values[1]['value'], a.values[1].exchange)
    # .data is expected in preference/exchange form, so rewrite the second
    # input value accordingly before comparing (this also updates a_values,
    # which is the same list object)
    a_data['values'][1] = {
        'preference': 20,
        'exchange': 'smtp2.',
    }
    self.assertEqual(a_data, a.data)

    b_value = {
        'preference': 0,
        'exchange': 'smtp3.',
    }
    b_data = {'ttl': 30, 'value': b_value}
    b = MxRecord(self.zone, 'b', b_data)
    self.assertEqual(b_value['preference'], b.values[0].preference)
    self.assertEqual(b_value['exchange'], b.values[0].exchange)
    self.assertEqual(b_data, b.data)

    # upper-case exchanges must yield the same data as lower-case ones
    a_upper_values = [{
        'preference': 10,
        'exchange': 'SMTP1.'
    }, {
        'priority': 20,
        'value': 'SMTP2.'
    }]
    a_upper_data = {'ttl': 30, 'values': a_upper_values}
    a_upper = MxRecord(self.zone, 'a', a_upper_data)
    self.assertEqual(a_upper.data, a.data)

    target = SimpleProvider()
    # No changes with self
    self.assertFalse(a.changes(a, target))
    # Diff in preference causes change
    other = MxRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
    other.values[0].preference = 22
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)
    # Diff in value causes change
    other.values[0].preference = a.values[0].preference
    other.values[0].exchange = 'smtpX'
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)

    # __repr__ doesn't blow up
    a.__repr__()
def test_naptr(self):
    # Covers NAPTR records: per-value attributes, data round-trip, change
    # detection and NaptrValue.__cmp__ ordering across every field.
    # Uses assertEqual rather than the deprecated assertEquals alias, and
    # drives the ordering checks from a table instead of twelve
    # hand-written NaptrValue literals.
    a_values = [{
        'order': 10,
        'preference': 11,
        'flags': 'X',
        'service': 'Y',
        'regexp': 'Z',
        'replacement': '.',
    }, {
        'order': 20,
        'preference': 21,
        'flags': 'A',
        'service': 'B',
        'regexp': 'C',
        'replacement': 'foo.com',
    }]
    a_data = {'ttl': 30, 'values': a_values}
    a = NaptrRecord(self.zone, 'a', a_data)
    self.assertEqual('a', a.name)
    self.assertEqual('a.unit.tests.', a.fqdn)
    self.assertEqual(30, a.ttl)
    for i in (0, 1):
        for k in a_values[0]:
            self.assertEqual(a_values[i][k], getattr(a.values[i], k))
    self.assertEqual(a_data, a.data)

    b_value = {
        'order': 30,
        'preference': 31,
        'flags': 'M',
        'service': 'N',
        'regexp': 'O',
        'replacement': 'x',
    }
    b_data = {'ttl': 30, 'value': b_value}
    b = NaptrRecord(self.zone, 'b', b_data)
    for k in a_values[0]:
        self.assertEqual(b_value[k], getattr(b.values[0], k))
    self.assertEqual(b_data, b.data)

    target = SimpleProvider()
    # No changes with self
    self.assertFalse(a.changes(a, target))
    # Diff in order causes change
    other = NaptrRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
    other.values[0].order = 22
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)
    # Diff in replacement causes change
    other.values[0].order = a.values[0].order
    other.values[0].replacement = 'smtpX'
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)

    # full sorting
    # equivalent
    b_naptr_value = b.values[0]
    self.assertEqual(0, b_naptr_value.__cmp__(b_naptr_value))

    def variant(field, value):
        # NaptrValue equal to b's value except for one overridden field
        data = {
            'order': 30,
            'preference': 31,
            'flags': 'M',
            'service': 'N',
            'regexp': 'O',
            'replacement': 'x',
        }
        data[field] = value
        return NaptrValue(data)

    # (field, value sorting before b, value sorting after b); fields are
    # listed in the order the original checks were written
    ordering_cases = (
        ('order', 10, 40),
        ('preference', 10, 40),
        ('flags', 'A', 'Z'),
        ('service', 'A', 'Z'),
        ('regexp', 'A', 'Z'),
        ('replacement', 'a', 'z'),
    )
    for field, smaller, larger in ordering_cases:
        # b compares greater than the variant with the smaller field...
        self.assertEqual(
            1, b_naptr_value.__cmp__(variant(field, smaller)), field)
        # ...and less than the variant with the larger one
        self.assertEqual(
            -1, b_naptr_value.__cmp__(variant(field, larger)), field)

    # __repr__ doesn't blow up
    a.__repr__()
def test_ns(self):
    # Covers NS records built from multiple 'values' and a single 'value'.
    # Uses assertEqual rather than the deprecated assertEquals alias.
    a_values = ['5.6.7.8.', '6.7.8.9.', '7.8.9.0.']
    a_data = {'ttl': 30, 'values': a_values}
    a = NsRecord(self.zone, 'a', a_data)
    self.assertEqual('a', a.name)
    self.assertEqual('a.unit.tests.', a.fqdn)
    self.assertEqual(30, a.ttl)
    self.assertEqual(a_values, a.values)
    self.assertEqual(a_data, a.data)

    b_value = '9.8.7.6.'
    b_data = {'ttl': 30, 'value': b_value}
    b = NsRecord(self.zone, 'b', b_data)
    self.assertEqual([b_value], b.values)
    self.assertEqual(b_data, b.data)
def test_sshfp(self):
    # Covers SSHFP records: per-value attributes, data round-trip and
    # change detection on algorithm, fingerprint_type and fingerprint.
    # Uses assertEqual rather than the deprecated assertEquals alias.
    a_values = [{
        'algorithm': 10,
        'fingerprint_type': 11,
        'fingerprint': 'abc123',
    }, {
        'algorithm': 20,
        'fingerprint_type': 21,
        'fingerprint': 'def456',
    }]
    a_data = {'ttl': 30, 'values': a_values}
    a = SshfpRecord(self.zone, 'a', a_data)
    self.assertEqual('a', a.name)
    self.assertEqual('a.unit.tests.', a.fqdn)
    self.assertEqual(30, a.ttl)
    self.assertEqual(a_values[0]['algorithm'], a.values[0].algorithm)
    self.assertEqual(a_values[0]['fingerprint_type'],
                     a.values[0].fingerprint_type)
    self.assertEqual(a_values[0]['fingerprint'], a.values[0].fingerprint)
    self.assertEqual(a_data, a.data)

    b_value = {
        'algorithm': 30,
        'fingerprint_type': 31,
        'fingerprint': 'ghi789',
    }
    b_data = {'ttl': 30, 'value': b_value}
    b = SshfpRecord(self.zone, 'b', b_data)
    self.assertEqual(b_value['algorithm'], b.values[0].algorithm)
    self.assertEqual(b_value['fingerprint_type'],
                     b.values[0].fingerprint_type)
    self.assertEqual(b_value['fingerprint'], b.values[0].fingerprint)
    self.assertEqual(b_data, b.data)

    target = SimpleProvider()
    # No changes with self
    self.assertFalse(a.changes(a, target))
    # Diff in algorithm causes change
    other = SshfpRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
    other.values[0].algorithm = 22
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)
    # Diff in fingerprint_type causes change
    other = SshfpRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
    other.values[0].algorithm = a.values[0].algorithm
    other.values[0].fingerprint_type = 22
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)
    # Diff in fingerprint causes change
    other = SshfpRecord(self.zone, 'a', {'ttl': 30, 'values': a_values})
    other.values[0].fingerprint_type = a.values[0].fingerprint_type
    other.values[0].fingerprint = 22
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)

    # __repr__ doesn't blow up
    a.__repr__()
def test_spf(self):
    # Runs the shared multi-value checks against SpfRecord.
    self.assertMultipleValues(
        SpfRecord,
        a_values=['spf1 -all', 'spf1 -hrm'],
        b_value='spf1 -other',
    )
def test_srv(self):
    # Covers SRV records: per-value attributes, data round-trip and change
    # detection on priority, weight, port and target. Uses assertEqual
    # rather than the deprecated assertEquals alias.
    a_values = [{
        'priority': 10,
        'weight': 11,
        'port': 12,
        'target': 'server1',
    }, {
        'priority': 20,
        'weight': 21,
        'port': 22,
        'target': 'server2',
    }]
    a_data = {'ttl': 30, 'values': a_values}
    a = SrvRecord(self.zone, '_a._tcp', a_data)
    self.assertEqual('_a._tcp', a.name)
    self.assertEqual('_a._tcp.unit.tests.', a.fqdn)
    self.assertEqual(30, a.ttl)
    self.assertEqual(a_values[0]['priority'], a.values[0].priority)
    self.assertEqual(a_values[0]['weight'], a.values[0].weight)
    self.assertEqual(a_values[0]['port'], a.values[0].port)
    self.assertEqual(a_values[0]['target'], a.values[0].target)
    self.assertEqual(a_data, a.data)

    b_value = {
        'priority': 30,
        'weight': 31,
        'port': 32,
        'target': 'server3',
    }
    b_data = {'ttl': 30, 'value': b_value}
    b = SrvRecord(self.zone, '_b._tcp', b_data)
    self.assertEqual(b_value['priority'], b.values[0].priority)
    self.assertEqual(b_value['weight'], b.values[0].weight)
    self.assertEqual(b_value['port'], b.values[0].port)
    self.assertEqual(b_value['target'], b.values[0].target)
    self.assertEqual(b_data, b.data)

    target = SimpleProvider()
    # No changes with self
    self.assertFalse(a.changes(a, target))
    # Diff in priority causes change
    other = SrvRecord(self.zone, '_a._icmp',
                      {'ttl': 30, 'values': a_values})
    other.values[0].priority = 22
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)
    # Diff in weight causes change
    other.values[0].priority = a.values[0].priority
    other.values[0].weight = 33
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)
    # Diff in port causes change
    other.values[0].weight = a.values[0].weight
    other.values[0].port = 44
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)
    # Diff in target causes change
    other.values[0].port = a.values[0].port
    other.values[0].target = 'serverX'
    change = a.changes(other, target)
    self.assertEqual(change.existing, a)
    self.assertEqual(change.new, other)

    # __repr__ doesn't blow up
    a.__repr__()
def test_txt(self):
    # Runs the shared multi-value checks against TxtRecord.
    self.assertMultipleValues(
        TxtRecord,
        a_values=['a one', 'a two'],
        b_value='b other',
    )
def test_record_new(self):
    # Covers the Record.new factory: it must build the class matching
    # 'type' and raise when the type is missing or unknown.
    txt = Record.new(self.zone, 'txt', {
        'ttl': 44,
        'type': 'TXT',
        'value': 'some text',
    })
    self.assertIsInstance(txt, TxtRecord)
    self.assertEqual('TXT', txt._type)
    self.assertEqual(['some text'], txt.values)

    # Missing type
    with self.assertRaises(Exception) as ctx:
        Record.new(self.zone, 'unknown', {})
    # str() is used because exceptions have no .message on Python 3
    self.assertIn('missing type', str(ctx.exception))

    # Unknown type
    with self.assertRaises(Exception) as ctx:
        Record.new(self.zone, 'unknown', {
            'type': 'XXX',
        })
    self.assertIn('Unknown record type', str(ctx.exception))
def test_change(self):
    # Checks which record each change type exposes via .record: the new
    # one for Create and Update, the existing one for Delete. Uses
    # assertEqual rather than the deprecated assertEquals alias.
    existing = Record.new(self.zone, 'txt', {
        'ttl': 44,
        'type': 'TXT',
        'value': 'some text',
    })
    new = Record.new(self.zone, 'txt', {
        'ttl': 44,
        'type': 'TXT',
        'value': 'some change',
    })
    create = Create(new)
    self.assertEqual(new.values, create.record.values)
    update = Update(existing, new)
    self.assertEqual(new.values, update.record.values)
    delete = Delete(existing)
    self.assertEqual(existing.values, delete.record.values)
def test_geo_value(self):
    # Checks that GeoValue splits a 'continent-country-subdivision' code
    # into its parts and lists its parents from most to least specific.
    # Uses assertEqual rather than the deprecated assertEquals alias.
    code = 'NA-US-CA'
    values = ['1.2.3.4']
    geo = GeoValue(code, values)
    self.assertEqual(code, geo.code)
    self.assertEqual('NA', geo.continent_code)
    self.assertEqual('US', geo.country_code)
    self.assertEqual('CA', geo.subdivision_code)
    self.assertEqual(values, geo.values)
    self.assertEqual(['NA-US', 'NA'], list(geo.parents))
def test_healthcheck(self):
    # Checks the healthcheck_* accessors, first with explicit
    # octodns.healthcheck settings and then with none configured.
    # Uses assertEqual rather than the deprecated assertEquals alias.
    new = Record.new(self.zone, 'a', {
        'ttl': 44,
        'type': 'A',
        'value': '1.2.3.4',
        'octodns': {
            'healthcheck': {
                'path': '/_ready',
                'host': 'bleep.bloop',
                'protocol': 'HTTP',
                'port': 8080,
            }
        }
    })
    self.assertEqual('/_ready', new.healthcheck_path)
    self.assertEqual('bleep.bloop', new.healthcheck_host)
    self.assertEqual('HTTP', new.healthcheck_protocol)
    self.assertEqual(8080, new.healthcheck_port)

    # without any healthcheck config the values below are expected
    new = Record.new(self.zone, 'a', {
        'ttl': 44,
        'type': 'A',
        'value': '1.2.3.4',
    })
    self.assertEqual('/_dns', new.healthcheck_path)
    self.assertEqual('a.unit.tests', new.healthcheck_host)
    self.assertEqual('HTTPS', new.healthcheck_protocol)
    self.assertEqual(443, new.healthcheck_port)
def test_inored(self):
    # Checks Record.ignored for octodns.ignored set to True, set to False
    # and left out entirely.
    # NOTE(review): the method name looks like a typo for "test_ignored";
    # it is kept because it is the test's public identifier.
    def txt_record(octodns=None):
        # TXT record whose data optionally carries an 'octodns' section
        data = {
            'ttl': 44,
            'type': 'TXT',
            'value': 'some change',
        }
        if octodns is not None:
            data['octodns'] = octodns
        return Record.new(self.zone, 'txt', data)

    explicitly_ignored = txt_record({'ignored': True})
    self.assertTrue(explicitly_ignored.ignored)

    explicitly_kept = txt_record({'ignored': False})
    self.assertFalse(explicitly_kept.ignored)

    unconfigured = txt_record()
    self.assertFalse(unconfigured.ignored)
class TestRecordValidation(TestCase):
zone = Zone('unit.tests.', [])
def test_base(self):
    # Covers the ttl validation done by Record.new and the lenient switch
    # that turns validation failures into non-fatal ones. Uses
    # assertEqual rather than the deprecated assertEquals alias.

    # no ttl
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'A',
            'value': '1.2.3.4',
        })
    self.assertEqual(['missing ttl'], ctx.exception.reasons)

    # invalid ttl
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'A',
            'ttl': -1,
            'value': '1.2.3.4',
        })
    self.assertEqual('www.unit.tests.', ctx.exception.fqdn)
    self.assertEqual(['invalid ttl'], ctx.exception.reasons)

    # no exception if we're in lenient mode
    Record.new(self.zone, 'www', {
        'type': 'A',
        'ttl': -1,
        'value': '1.2.3.4',
    }, lenient=True)

    # __init__ may still blow up, even if validation is lenient
    with self.assertRaises(KeyError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'A',
            'ttl': -1,
        }, lenient=True)
    self.assertEqual(('value',), ctx.exception.args)

    # no exception if we're in lenient mode from config
    # NOTE(review): lenient=True is also passed here, so this call would
    # succeed even if the octodns.lenient config were ignored - confirm
    # whether the keyword argument should be dropped.
    Record.new(self.zone, 'www', {
        'octodns': {
            'lenient': True
        },
        'type': 'A',
        'ttl': -1,
        'value': '1.2.3.4',
    }, lenient=True)
def test_A_and_values_mixin(self):
    # Covers validation of A records and of the shared value/values
    # handling: missing, empty and None values plus invalid IPv4
    # addresses. Uses assertEqual rather than the deprecated assertEquals.

    # doesn't blow up
    Record.new(self.zone, '', {
        'type': 'A',
        'ttl': 600,
        'value': '1.2.3.4',
    })
    Record.new(self.zone, '', {
        'type': 'A',
        'ttl': 600,
        'values': [
            '1.2.3.4',
        ]
    })
    Record.new(self.zone, '', {
        'type': 'A',
        'ttl': 600,
        'values': [
            '1.2.3.4',
            '1.2.3.5',
        ]
    })

    # missing value(s), no value or value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'A',
            'ttl': 600,
        })
    self.assertEqual(['missing value(s)'], ctx.exception.reasons)

    # missing value(s), empty values
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'A',
            'ttl': 600,
            'values': []
        })
    self.assertEqual(['missing value(s)'], ctx.exception.reasons)

    # missing value(s), None values
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'A',
            'ttl': 600,
            'values': None
        })
    self.assertEqual(['missing value(s)'], ctx.exception.reasons)

    # missing value(s) and empty value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'A',
            'ttl': 600,
            'values': [None, '']
        })
    self.assertEqual(['missing value(s)',
                      'empty value'], ctx.exception.reasons)

    # missing value(s), None value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'A',
            'ttl': 600,
            'value': None
        })
    self.assertEqual(['missing value(s)'], ctx.exception.reasons)

    # empty value, empty string value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'A',
            'ttl': 600,
            'value': ''
        })
    self.assertEqual(['empty value'], ctx.exception.reasons)

    # missing value(s) & ttl
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'A',
        })
    self.assertEqual(['missing ttl', 'missing value(s)'],
                     ctx.exception.reasons)

    # invalid ipv4 address
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'A',
            'ttl': 600,
            'value': 'hello'
        })
    self.assertEqual(['invalid IPv4 address "hello"'],
                     ctx.exception.reasons)

    # invalid ipv4 addresses
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'A',
            'ttl': 600,
            'values': ['hello', 'goodbye']
        })
    self.assertEqual([
        'invalid IPv4 address "hello"',
        'invalid IPv4 address "goodbye"'
    ], ctx.exception.reasons)

    # invalid & valid ipv4 addresses, no ttl
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'A',
            'values': ['1.2.3.4', 'hello', '5.6.7.8']
        })
    self.assertEqual([
        'missing ttl',
        'invalid IPv4 address "hello"',
    ], ctx.exception.reasons)
def test_AAAA_validation(self):
    # Covers validation of AAAA records: missing, empty and None values
    # plus invalid IPv6 addresses. Uses assertEqual rather than the
    # deprecated assertEquals alias.

    # doesn't blow up
    Record.new(self.zone, '', {
        'type': 'AAAA',
        'ttl': 600,
        'value': '2601:644:500:e210:62f8:1dff:feb8:947a',
    })
    Record.new(self.zone, '', {
        'type': 'AAAA',
        'ttl': 600,
        'values': [
            '2601:644:500:e210:62f8:1dff:feb8:947a',
        ]
    })
    Record.new(self.zone, '', {
        'type': 'AAAA',
        'ttl': 600,
        'values': [
            '2601:644:500:e210:62f8:1dff:feb8:947a',
            '2601:642:500:e210:62f8:1dff:feb8:947a',
        ]
    })

    # missing value(s), no value or value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'AAAA',
            'ttl': 600,
        })
    self.assertEqual(['missing value(s)'], ctx.exception.reasons)

    # missing value(s), empty values
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'AAAA',
            'ttl': 600,
            'values': []
        })
    self.assertEqual(['missing value(s)'], ctx.exception.reasons)

    # missing value(s), None values
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'AAAA',
            'ttl': 600,
            'values': None
        })
    self.assertEqual(['missing value(s)'], ctx.exception.reasons)

    # missing value(s) and empty value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'AAAA',
            'ttl': 600,
            'values': [None, '']
        })
    self.assertEqual(['missing value(s)',
                      'empty value'], ctx.exception.reasons)

    # missing value(s), None value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'AAAA',
            'ttl': 600,
            'value': None
        })
    self.assertEqual(['missing value(s)'], ctx.exception.reasons)

    # empty value, empty string value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'AAAA',
            'ttl': 600,
            'value': ''
        })
    self.assertEqual(['empty value'], ctx.exception.reasons)

    # missing value(s) & ttl
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'AAAA',
        })
    self.assertEqual(['missing ttl', 'missing value(s)'],
                     ctx.exception.reasons)

    # invalid IPv6 address
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'AAAA',
            'ttl': 600,
            'value': 'hello'
        })
    self.assertEqual(['invalid IPv6 address "hello"'],
                     ctx.exception.reasons)

    # invalid IPv6 addresses
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'AAAA',
            'ttl': 600,
            'values': ['hello', 'goodbye']
        })
    self.assertEqual([
        'invalid IPv6 address "hello"',
        'invalid IPv6 address "goodbye"'
    ], ctx.exception.reasons)

    # invalid & valid IPv6 addresses, no ttl
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'AAAA',
            'values': [
                '2601:644:500:e210:62f8:1dff:feb8:947a',
                'hello',
                '2601:642:500:e210:62f8:1dff:feb8:947a'
            ]
        })
    self.assertEqual([
        'missing ttl',
        'invalid IPv6 address "hello"',
    ], ctx.exception.reasons)
def test_geo(self):
    # Covers validation of the 'geo' section of an A record (addresses and
    # geo codes) and of the healthcheck protocol. Uses assertEqual rather
    # than the deprecated assertEquals alias.

    # doesn't blow up
    Record.new(self.zone, '', {
        'geo': {
            'NA': ['1.2.3.5'],
            'NA-US': ['1.2.3.5', '1.2.3.6']
        },
        'type': 'A',
        'ttl': 600,
        'value': '1.2.3.4',
    })

    # invalid ip address
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'geo': {
                'NA': ['hello'],
                'NA-US': ['1.2.3.5', '1.2.3.6']
            },
            'type': 'A',
            'ttl': 600,
            'value': '1.2.3.4',
        })
    self.assertEqual(['invalid IPv4 address "hello"'],
                     ctx.exception.reasons)

    # invalid geo code
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'geo': {
                'XYZ': ['1.2.3.4'],
            },
            'type': 'A',
            'ttl': 600,
            'value': '1.2.3.4',
        })
    self.assertEqual(['invalid geo "XYZ"'], ctx.exception.reasons)

    # invalid ip addresses spread over several geo codes
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'geo': {
                'NA': ['hello'],
                'NA-US': ['1.2.3.5', 'goodbye']
            },
            'type': 'A',
            'ttl': 600,
            'value': '1.2.3.4',
        })
    self.assertEqual([
        'invalid IPv4 address "hello"',
        'invalid IPv4 address "goodbye"'
    ], ctx.exception.reasons)

    # invalid healthcheck protocol
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'a', {
            'geo': {
                'NA': ['1.2.3.5'],
                'NA-US': ['1.2.3.5', '1.2.3.6']
            },
            'type': 'A',
            'ttl': 600,
            'value': '1.2.3.4',
            'octodns': {
                'healthcheck': {
                    'protocol': 'FTP',
                }
            }
        })
    self.assertEqual(['invalid healthcheck protocol'],
                     ctx.exception.reasons)
def test_AAAA(self):
    # Covers AAAA address validation: valid IPv6 addresses are accepted,
    # while arbitrary strings and IPv4 addresses are rejected. Uses
    # assertEqual rather than the deprecated assertEquals alias.

    # doesn't blow up
    Record.new(self.zone, '', {
        'type': 'AAAA',
        'ttl': 600,
        'value': '2601:644:500:e210:62f8:1dff:feb8:947a',
    })
    Record.new(self.zone, '', {
        'type': 'AAAA',
        'ttl': 600,
        'values': [
            '2601:644:500:e210:62f8:1dff:feb8:947a',
            '2601:644:500:e210:62f8:1dff:feb8:947b',
        ]
    })

    # invalid ip address
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'AAAA',
            'ttl': 600,
            'value': 'hello'
        })
    self.assertEqual(['invalid IPv6 address "hello"'],
                     ctx.exception.reasons)

    # IPv4 addresses are not valid AAAA values
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'AAAA',
            'ttl': 600,
            'values': [
                '1.2.3.4',
                '2.3.4.5',
            ],
        })
    self.assertEqual([
        'invalid IPv6 address "1.2.3.4"',
        'invalid IPv6 address "2.3.4.5"',
    ], ctx.exception.reasons)

    # invalid ip addresses
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'AAAA',
            'ttl': 600,
            'values': ['hello', 'goodbye']
        })
    self.assertEqual([
        'invalid IPv6 address "hello"',
        'invalid IPv6 address "goodbye"'
    ], ctx.exception.reasons)
def test_ALIAS_and_value_mixin(self):
    # ALIAS validation, which also exercises the single-value handling:
    # an absent, None or empty value and a target without a trailing dot
    # are each rejected with a dedicated reason.

    # doesn't blow up
    Record.new(self.zone, '', {
        'type': 'ALIAS',
        'ttl': 600,
        'value': 'foo.bar.com.',
    })

    # missing value (key absent)
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'ALIAS',
            'ttl': 600,
        })
    self.assertEqual(['missing value'], ctx.exception.reasons)

    # missing value (explicit None)
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'ALIAS',
            'ttl': 600,
            'value': None
        })
    self.assertEqual(['missing value'], ctx.exception.reasons)

    # empty value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'ALIAS',
            'ttl': 600,
            'value': ''
        })
    self.assertEqual(['empty value'], ctx.exception.reasons)

    # missing trailing .
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'ALIAS',
            'ttl': 600,
            'value': 'foo.bar.com',
        })
    self.assertEqual(['ALIAS value "foo.bar.com" missing trailing .'],
                     ctx.exception.reasons)
def test_CAA(self):
    # CAA validation: flags that are negative, too large or non-numeric
    # are rejected, and both tag and value are required.

    # doesn't blow up
    Record.new(self.zone, '', {
        'type': 'CAA',
        'ttl': 600,
        'value': {
            'flags': 128,
            'tag': 'iodef',
            'value': 'http://foo.bar.com/'
        }
    })

    # invalid flags, negative
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'CAA',
            'ttl': 600,
            'value': {
                'flags': -42,
                'tag': 'iodef',
                'value': 'http://foo.bar.com/',
            }
        })
    self.assertEqual(['invalid flags "-42"'], ctx.exception.reasons)

    # invalid flags, too large
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'CAA',
            'ttl': 600,
            'value': {
                'flags': 442,
                'tag': 'iodef',
                'value': 'http://foo.bar.com/',
            }
        })
    self.assertEqual(['invalid flags "442"'], ctx.exception.reasons)

    # invalid flags, not a number
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'CAA',
            'ttl': 600,
            'value': {
                'flags': 'nope',
                'tag': 'iodef',
                'value': 'http://foo.bar.com/',
            }
        })
    self.assertEqual(['invalid flags "nope"'], ctx.exception.reasons)

    # missing tag
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'CAA',
            'ttl': 600,
            'value': {
                'value': 'http://foo.bar.com/',
            }
        })
    self.assertEqual(['missing tag'], ctx.exception.reasons)

    # missing value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'CAA',
            'ttl': 600,
            'value': {
                'tag': 'iodef',
            }
        })
    self.assertEqual(['missing value'], ctx.exception.reasons)
def test_CNAME(self):
    # CNAME validation: not allowed at the zone root and the target must
    # end with a trailing dot.

    # doesn't blow up
    Record.new(self.zone, 'www', {
        'type': 'CNAME',
        'ttl': 600,
        'value': 'foo.bar.com.',
    })

    # root cname is a no-no
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'CNAME',
            'ttl': 600,
            'value': 'foo.bar.com.',
        })
    self.assertEqual(['root CNAME not allowed'], ctx.exception.reasons)

    # missing trailing .
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'www', {
            'type': 'CNAME',
            'ttl': 600,
            'value': 'foo.bar.com',
        })
    self.assertEqual(['CNAME value "foo.bar.com" missing trailing .'],
                     ctx.exception.reasons)
def test_MX(self):
    # MX validation: preference must be present and numeric, exchange
    # must be present and end with a trailing dot.

    # doesn't blow up
    Record.new(self.zone, '', {
        'type': 'MX',
        'ttl': 600,
        'value': {
            'preference': 10,
            'exchange': 'foo.bar.com.'
        }
    })

    # missing preference
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'MX',
            'ttl': 600,
            'value': {
                'exchange': 'foo.bar.com.'
            }
        })
    self.assertEqual(['missing preference'], ctx.exception.reasons)

    # invalid preference
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'MX',
            'ttl': 600,
            'value': {
                'preference': 'nope',
                'exchange': 'foo.bar.com.'
            }
        })
    self.assertEqual(['invalid preference "nope"'], ctx.exception.reasons)

    # missing exchange
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'MX',
            'ttl': 600,
            'value': {
                'preference': 10,
            }
        })
    self.assertEqual(['missing exchange'], ctx.exception.reasons)

    # missing trailing .
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'MX',
            'ttl': 600,
            'value': {
                'preference': 10,
                'exchange': 'foo.bar.com'
            }
        })
    self.assertEqual(['MX value "foo.bar.com" missing trailing .'],
                     ctx.exception.reasons)
def test_NXPTR(self):
    # NAPTR validation (the records built here are all of type NAPTR):
    # every field of the value is required, order and preference must be
    # numeric and flags must be recognized.

    # doesn't blow up
    Record.new(self.zone, '', {
        'type': 'NAPTR',
        'ttl': 600,
        'value': {
            'order': 10,
            'preference': 20,
            'flags': 'S',
            'service': 'srv',
            'regexp': '.*',
            'replacement': '.'
        }
    })

    # complete, valid value used as the starting point for the bad ones
    value = {
        'order': 10,
        'preference': 20,
        'flags': 'S',
        'service': 'srv',
        'regexp': '.*',
        'replacement': '.'
    }

    # each field missing in turn
    for k in ('order', 'preference', 'flags', 'service', 'regexp',
              'replacement'):
        v = dict(value)
        del v[k]
        with self.assertRaises(ValidationError) as ctx:
            Record.new(self.zone, '', {
                'type': 'NAPTR',
                'ttl': 600,
                'value': v
            })
        self.assertEqual(['missing {}'.format(k)], ctx.exception.reasons)

    # non-int order
    v = dict(value)
    v['order'] = 'boo'
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'NAPTR',
            'ttl': 600,
            'value': v
        })
    self.assertEqual(['invalid order "boo"'], ctx.exception.reasons)

    # non-int preference
    v = dict(value)
    v['preference'] = 'who'
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'NAPTR',
            'ttl': 600,
            'value': v
        })
    self.assertEqual(['invalid preference "who"'], ctx.exception.reasons)

    # unrecognized flags
    v = dict(value)
    v['flags'] = 'X'
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'NAPTR',
            'ttl': 600,
            'value': v
        })
    self.assertEqual(['unrecognized flags "X"'], ctx.exception.reasons)
def test_NS(self):
    # NS validation: at least one value is required and each value must
    # end with a trailing dot.

    # doesn't blow up
    Record.new(self.zone, '', {
        'type': 'NS',
        'ttl': 600,
        'values': [
            'foo.bar.com.',
            '1.2.3.4.'
        ]
    })

    # missing value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'NS',
            'ttl': 600,
        })
    self.assertEqual(['missing value(s)'], ctx.exception.reasons)

    # no trailing .
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'NS',
            'ttl': 600,
            'value': 'foo.bar',
        })
    self.assertEqual(['NS value "foo.bar" missing trailing .'],
                     ctx.exception.reasons)
def test_PTR(self):
    # PTR validation: a value is required and it must end with a trailing
    # dot.

    # doesn't blow up (name & zone here don't make any sense, but not
    # important)
    Record.new(self.zone, '', {
        'type': 'PTR',
        'ttl': 600,
        'value': 'foo.bar.com.',
    })

    # missing value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'PTR',
            'ttl': 600,
        })
    self.assertEqual(['missing value'], ctx.exception.reasons)

    # no trailing .
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'PTR',
            'ttl': 600,
            'value': 'foo.bar',
        })
    self.assertEqual(['PTR value "foo.bar" missing trailing .'],
                     ctx.exception.reasons)
def test_SSHFP(self):
    # SSHFP validation: algorithm and fingerprint_type must be present,
    # numeric and recognized; fingerprint must be present.

    # the same fingerprint is used by every case that supplies one
    fingerprint = 'bf6b6825d2977c511a475bbefb88aad54a92ac73'

    # doesn't blow up
    Record.new(self.zone, '', {
        'type': 'SSHFP',
        'ttl': 600,
        'value': {
            'algorithm': 1,
            'fingerprint_type': 1,
            'fingerprint': fingerprint
        }
    })

    # missing algorithm
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'SSHFP',
            'ttl': 600,
            'value': {
                'fingerprint_type': 1,
                'fingerprint': fingerprint
            }
        })
    self.assertEqual(['missing algorithm'], ctx.exception.reasons)

    # invalid algorithm
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'SSHFP',
            'ttl': 600,
            'value': {
                'algorithm': 'nope',
                'fingerprint_type': 2,
                'fingerprint': fingerprint
            }
        })
    self.assertEqual(['invalid algorithm "nope"'], ctx.exception.reasons)

    # unrecognized algorithm
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'SSHFP',
            'ttl': 600,
            'value': {
                'algorithm': 42,
                'fingerprint_type': 1,
                'fingerprint': fingerprint
            }
        })
    self.assertEqual(['unrecognized algorithm "42"'],
                     ctx.exception.reasons)

    # missing fingerprint_type
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'SSHFP',
            'ttl': 600,
            'value': {
                'algorithm': 2,
                'fingerprint': fingerprint
            }
        })
    self.assertEqual(['missing fingerprint_type'], ctx.exception.reasons)

    # invalid fingerprint_type
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'SSHFP',
            'ttl': 600,
            'value': {
                'algorithm': 3,
                'fingerprint_type': 'yeeah',
                'fingerprint': fingerprint
            }
        })
    self.assertEqual(['invalid fingerprint_type "yeeah"'],
                     ctx.exception.reasons)

    # unrecognized fingerprint_type
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'SSHFP',
            'ttl': 600,
            'value': {
                'algorithm': 1,
                'fingerprint_type': 42,
                'fingerprint': fingerprint
            }
        })
    self.assertEqual(['unrecognized fingerprint_type "42"'],
                     ctx.exception.reasons)

    # missing fingerprint
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'SSHFP',
            'ttl': 600,
            'value': {
                'algorithm': 1,
                'fingerprint_type': 1,
            }
        })
    self.assertEqual(['missing fingerprint'], ctx.exception.reasons)
def test_SPF(self):
    # SPF validation: at least one value is required and every semi-colon
    # in a value has to be escaped.

    # doesn't blow up (name & zone here don't make any sense, but not
    # important)
    Record.new(self.zone, '', {
        'type': 'SPF',
        'ttl': 600,
        'values': [
            'v=spf1 ip4:192.168.0.1/16-all',
            'v=spf1 ip4:10.1.2.1/24-all',
            'this has some\\; semi-colons\\; in it',
        ]
    })

    # missing value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'SPF',
            'ttl': 600,
        })
    self.assertEqual(['missing value(s)'], ctx.exception.reasons)

    # missing escapes (only the first semi-colon is unescaped)
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'SPF',
            'ttl': 600,
            'value': 'this has some; semi-colons\\; in it',
        })
    self.assertEqual(['unescaped ; in "this has some; '
                      'semi-colons\\; in it"'], ctx.exception.reasons)
def test_SRV(self):
    # SRV validation: the record name must be acceptable for SRV
    # ('_srv._tcp' passes, 'neup' does not), priority/weight/port must be
    # present and numeric, and target must be present with a trailing
    # dot.

    # doesn't blow up
    Record.new(self.zone, '_srv._tcp', {
        'type': 'SRV',
        'ttl': 600,
        'value': {
            'priority': 1,
            'weight': 2,
            'port': 3,
            'target': 'foo.bar.baz.'
        }
    })

    # invalid name
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'neup', {
            'type': 'SRV',
            'ttl': 600,
            'value': {
                'priority': 1,
                'weight': 2,
                'port': 3,
                'target': 'foo.bar.baz.'
            }
        })
    self.assertEqual(['invalid name'], ctx.exception.reasons)

    # missing priority
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '_srv._tcp', {
            'type': 'SRV',
            'ttl': 600,
            'value': {
                'weight': 2,
                'port': 3,
                'target': 'foo.bar.baz.'
            }
        })
    self.assertEqual(['missing priority'], ctx.exception.reasons)

    # invalid priority
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '_srv._tcp', {
            'type': 'SRV',
            'ttl': 600,
            'value': {
                'priority': 'foo',
                'weight': 2,
                'port': 3,
                'target': 'foo.bar.baz.'
            }
        })
    self.assertEqual(['invalid priority "foo"'], ctx.exception.reasons)

    # missing weight
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '_srv._tcp', {
            'type': 'SRV',
            'ttl': 600,
            'value': {
                'priority': 1,
                'port': 3,
                'target': 'foo.bar.baz.'
            }
        })
    self.assertEqual(['missing weight'], ctx.exception.reasons)

    # invalid weight
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '_srv._tcp', {
            'type': 'SRV',
            'ttl': 600,
            'value': {
                'priority': 1,
                'weight': 'foo',
                'port': 3,
                'target': 'foo.bar.baz.'
            }
        })
    self.assertEqual(['invalid weight "foo"'], ctx.exception.reasons)

    # missing port
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '_srv._tcp', {
            'type': 'SRV',
            'ttl': 600,
            'value': {
                'priority': 1,
                'weight': 2,
                'target': 'foo.bar.baz.'
            }
        })
    self.assertEqual(['missing port'], ctx.exception.reasons)

    # invalid port
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '_srv._tcp', {
            'type': 'SRV',
            'ttl': 600,
            'value': {
                'priority': 1,
                'weight': 2,
                'port': 'foo',
                'target': 'foo.bar.baz.'
            }
        })
    self.assertEqual(['invalid port "foo"'], ctx.exception.reasons)

    # missing target
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '_srv._tcp', {
            'type': 'SRV',
            'ttl': 600,
            'value': {
                'priority': 1,
                'weight': 2,
                'port': 3,
            }
        })
    self.assertEqual(['missing target'], ctx.exception.reasons)

    # invalid target (no trailing .)
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '_srv._tcp', {
            'type': 'SRV',
            'ttl': 600,
            'value': {
                'priority': 1,
                'weight': 2,
                'port': 3,
                'target': 'foo.bar.baz'
            }
        })
    self.assertEqual(['SRV value "foo.bar.baz" missing trailing .'],
                     ctx.exception.reasons)
def test_TXT(self):
    # TXT validation: at least one value is required and every semi-colon
    # in a value has to be escaped.

    # doesn't blow up (name & zone here don't make any sense, but not
    # important)
    Record.new(self.zone, '', {
        'type': 'TXT',
        'ttl': 600,
        'values': [
            'hello world',
            'this has some\\; semi-colons\\; in it',
        ]
    })

    # missing value
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'TXT',
            'ttl': 600,
        })
    self.assertEqual(['missing value(s)'], ctx.exception.reasons)

    # missing escapes (only the first semi-colon is unescaped)
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, '', {
            'type': 'TXT',
            'ttl': 600,
            'value': 'this has some; semi-colons\\; in it',
        })
    self.assertEqual(['unescaped ; in "this has some; semi-colons\\; '
                      'in it"'], ctx.exception.reasons)
def test_TXT_long_value_chunking(self):
    # A long TXT value must come out of `chunked_values` in one normalized
    # quoted/chunked form, whether it was supplied as a single unquoted
    # string or already quoted and split at a different position.

    # the normalized chunked form both records are expected to produce
    expected = (
        '"Lorem ipsum dolor sit amet, consectetur adipiscing '
        'elit, sed do eiusmod tempor incididunt ut labore et dolore '
        'magna aliqua. Ut enim ad minim veniam, quis nostrud '
        'exercitation ullamco laboris nisi ut aliquip ex ea commodo '
        'consequat. Duis aute irure dolor i" "n reprehenderit in '
        'voluptate velit esse cillum dolore eu fugiat nulla pariatur. '
        'Excepteur sint occaecat cupidatat non proident, sunt in culpa '
        'qui officia deserunt mollit anim id est laborum."'
    )

    long_value = (
        'Lorem ipsum dolor sit amet, consectetur adipiscing '
        'elit, sed do eiusmod tempor incididunt ut labore et dolore '
        'magna aliqua. Ut enim ad minim veniam, quis nostrud '
        'exercitation ullamco laboris nisi ut aliquip ex ea commodo '
        'consequat. Duis aute irure dolor in reprehenderit in '
        'voluptate velit esse cillum dolore eu fugiat nulla '
        'pariatur. Excepteur sint occaecat cupidatat non proident, '
        'sunt in culpa qui officia deserunt mollit anim id est '
        'laborum.'
    )
    # Unquoted values, the long one given as a single string
    single = Record.new(self.zone, '', {
        'type': 'TXT',
        'ttl': 600,
        'values': [
            'hello world',
            long_value,
            'this has some\\; semi-colons\\; in it',
        ]
    })
    self.assertEqual(3, len(single.values))
    self.assertEqual(3, len(single.chunked_values))
    # Note we are checking that this normalizes the chunking, not that we
    # get out what we put in.
    self.assertEqual(expected, single.chunked_values[0])

    # same text, but quoted and split at a different spot than `expected`
    long_split_value = (
        '"Lorem ipsum dolor sit amet, consectetur '
        'adipiscing elit, sed do eiusmod tempor incididunt ut '
        'labore et dolore magna aliqua. Ut enim ad minim veniam, '
        'quis nostrud exercitation ullamco laboris nisi ut aliquip '
        'ex" " ea commodo consequat. Duis aute irure dolor in '
        'reprehenderit in voluptate velit esse cillum dolore eu '
        'fugiat nulla pariatur. Excepteur sint occaecat cupidatat '
        'non proident, sunt in culpa qui officia deserunt mollit '
        'anim id est laborum."'
    )
    # Chunked
    chunked = Record.new(self.zone, '', {
        'type': 'TXT',
        'ttl': 600,
        'values': [
            '"hello world"',
            long_split_value,
            '"this has some\\; semi-colons\\; in it"',
        ]
    })
    self.assertEqual(expected, chunked.chunked_values[0])
    # should be single values, no quoting
    self.assertEqual(single.values, chunked.values)
    # should be chunked values, with quoting
    self.assertEqual(single.chunked_values, chunked.chunked_values)
class TestDynamicRecords(TestCase):
zone = Zone('unit.tests.', [])
def test_simple_a_weighted(self):
    # Builds a dynamic A record and checks the parsed pools and rules:
    # values without a weight come back with weight 1, pool values come
    # back sorted, and the first rule is kept as given.
    a_data = {
        'dynamic': {
            'pools': {
                'one': {
                    'values': [{
                        'value': '3.3.3.3',
                    }],
                },
                'two': {
                    # Testing out of order value sorting here
                    'values': [{
                        'value': '5.5.5.5',
                    }, {
                        'value': '4.4.4.4',
                    }],
                },
                'three': {
                    'values': [{
                        'weight': 10,
                        'value': '4.4.4.4',
                    }, {
                        'weight': 12,
                        'value': '5.5.5.5',
                    }],
                },
            },
            'rules': [{
                'geos': ['AF', 'EU'],
                'pool': 'three',
            }, {
                'geos': ['NA-US-CA'],
                'pool': 'two',
            }, {
                'pool': 'one',
            }],
        },
        'ttl': 60,
        'values': [
            '1.1.1.1',
            '2.2.2.2',
        ],
    }
    a = ARecord(self.zone, 'weighted', a_data)
    self.assertEqual('A', a._type)
    self.assertEqual(a_data['ttl'], a.ttl)
    self.assertEqual(a_data['values'], a.values)

    dynamic = a.dynamic
    self.assertTrue(dynamic)

    pools = dynamic.pools
    self.assertTrue(pools)
    # weight is filled in as 1 when not given
    self.assertEqual({
        'value': '3.3.3.3',
        'weight': 1,
    }, pools['one'].data['values'][0])
    # values come back in sorted order
    self.assertEqual([{
        'value': '4.4.4.4',
        'weight': 1,
    }, {
        'value': '5.5.5.5',
        'weight': 1,
    }], pools['two'].data['values'])
    # explicit weights are preserved
    self.assertEqual([{
        'weight': 10,
        'value': '4.4.4.4',
    }, {
        'weight': 12,
        'value': '5.5.5.5',
    }], pools['three'].data['values'])

    rules = dynamic.rules
    self.assertTrue(rules)
    self.assertEqual(a_data['dynamic']['rules'][0], rules[0].data)
def test_simple_aaaa_weighted(self):
    # Builds a dynamic AAAA record and checks the parsed pools and rules:
    # values without a weight come back with weight 1, pool values come
    # back sorted, and the first rule is kept as given.
    #
    # An earlier `aaaa_data` assignment that was overwritten before ever
    # being used has been dropped from this test.
    aaaa_data = {
        'dynamic': {
            'pools': {
                'one': {
                    'values': [{
                        'value': '2601:642:500:e210:62f8:1dff:feb8:9473',
                    }],
                },
                'two': {
                    # Testing out of order value sorting here
                    'values': [{
                        'value': '2601:642:500:e210:62f8:1dff:feb8:9475',
                    }, {
                        'value': '2601:642:500:e210:62f8:1dff:feb8:9474',
                    }],
                },
                'three': {
                    'values': [{
                        'weight': 10,
                        'value': '2601:642:500:e210:62f8:1dff:feb8:9476',
                    }, {
                        'weight': 12,
                        'value': '2601:642:500:e210:62f8:1dff:feb8:9477',
                    }],
                },
            },
            'rules': [{
                'geos': ['AF', 'EU'],
                'pool': 'three',
            }, {
                'geos': ['NA-US-CA'],
                'pool': 'two',
            }, {
                'pool': 'one',
            }],
        },
        'ttl': 60,
        'values': [
            '2601:642:500:e210:62f8:1dff:feb8:9471',
            '2601:642:500:e210:62f8:1dff:feb8:9472',
        ],
    }
    aaaa = AaaaRecord(self.zone, 'weighted', aaaa_data)
    self.assertEqual('AAAA', aaaa._type)
    self.assertEqual(aaaa_data['ttl'], aaaa.ttl)
    self.assertEqual(aaaa_data['values'], aaaa.values)

    dynamic = aaaa.dynamic
    self.assertTrue(dynamic)

    pools = dynamic.pools
    self.assertTrue(pools)
    # weight is filled in as 1 when not given
    self.assertEqual({
        'value': '2601:642:500:e210:62f8:1dff:feb8:9473',
        'weight': 1,
    }, pools['one'].data['values'][0])
    # values come back in sorted order
    self.assertEqual([{
        'value': '2601:642:500:e210:62f8:1dff:feb8:9474',
        'weight': 1,
    }, {
        'value': '2601:642:500:e210:62f8:1dff:feb8:9475',
        'weight': 1,
    }], pools['two'].data['values'])
    # explicit weights are preserved
    self.assertEqual([{
        'weight': 10,
        'value': '2601:642:500:e210:62f8:1dff:feb8:9476',
    }, {
        'weight': 12,
        'value': '2601:642:500:e210:62f8:1dff:feb8:9477',
    }], pools['three'].data['values'])

    rules = dynamic.rules
    self.assertTrue(rules)
    self.assertEqual(aaaa_data['dynamic']['rules'][0], rules[0].data)
def test_simple_cname_weighted(self):
    # Builds a dynamic CNAME record and checks the parsed pools and
    # rules: values without a weight come back with weight 1, explicit
    # weights are preserved and the first rule is kept as given.
    cname_data = {
        'dynamic': {
            'pools': {
                'one': {
                    'values': [{
                        'value': 'one.cname.target.',
                    }],
                },
                'two': {
                    'values': [{
                        'value': 'two.cname.target.',
                    }],
                },
                'three': {
                    'values': [{
                        'weight': 12,
                        'value': 'three-1.cname.target.',
                    }, {
                        'weight': 32,
                        'value': 'three-2.cname.target.',
                    }]
                },
            },
            'rules': [{
                'geos': ['AF', 'EU'],
                'pool': 'three',
            }, {
                'geos': ['NA-US-CA'],
                'pool': 'two',
            }, {
                'pool': 'one',
            }],
        },
        'ttl': 60,
        'value': 'cname.target.',
    }
    cname = CnameRecord(self.zone, 'weighted', cname_data)
    self.assertEqual('CNAME', cname._type)
    self.assertEqual(cname_data['ttl'], cname.ttl)
    self.assertEqual(cname_data['value'], cname.value)

    dynamic = cname.dynamic
    self.assertTrue(dynamic)

    pools = dynamic.pools
    self.assertTrue(pools)
    # weight is filled in as 1 when not given
    self.assertEqual({
        'value': 'one.cname.target.',
        'weight': 1,
    }, pools['one'].data['values'][0])
    self.assertEqual({
        'value': 'two.cname.target.',
        'weight': 1,
    }, pools['two'].data['values'][0])
    # explicit weights are preserved
    self.assertEqual([{
        'value': 'three-1.cname.target.',
        'weight': 12,
    }, {
        'value': 'three-2.cname.target.',
        'weight': 32,
    }], pools['three'].data['values'])

    rules = dynamic.rules
    self.assertTrue(rules)
    self.assertEqual(cname_data['dynamic']['rules'][0], rules[0].data)
def test_dynamic_validation(self):
    # Feeds Record.new a series of malformed `dynamic` sections for an A
    # record named 'bad' and checks that each one raises ValidationError
    # with exactly the expected reasons, in the expected order.
    #
    # Every builder below returns brand new objects on each call so that
    # no case can be affected by another case's data.

    def plain(*addresses):
        # pool values without an explicit weight
        return [{'value': address} for address in addresses]

    def weighted(*pairs):
        # pool values built from (weight, address) pairs
        return [{'weight': weight, 'value': address}
                for weight, address in pairs]

    def pool(values, fallback=None):
        # a pool definition; the fallback key is only present when given
        built = {}
        if fallback is not None:
            built['fallback'] = fallback
        built['values'] = values
        return built

    def one():
        # the valid single-value pool most cases use as 'one'
        return pool(plain('3.3.3.3'))

    def two(fallback=None):
        # the valid two-value pool most cases use as 'two'
        return pool(plain('4.4.4.4', '5.5.5.5'), fallback)

    def three(second_weight, fallback=None):
        # the weighted pool used as 'three', only the weight of its
        # second value (and optionally its fallback) varies
        return pool(weighted((1, '6.6.6.6'), (second_weight, '7.7.7.7')),
                    fallback)

    def geo_rules():
        # rules referencing all three pools, ending with the default
        return [{
            'geos': ['AF', 'EU'],
            'pool': 'three',
        }, {
            'geos': ['NA-US-CA'],
            'pool': 'two',
        }, {
            'pool': 'one',
        }]

    def then_default(rule):
        # the given rule followed by the default rule for pool 'one'
        return [rule, {'pool': 'one'}]

    def check(dynamic, expected):
        # the record around `dynamic` is identical for every case
        with self.assertRaises(ValidationError) as ctx:
            Record.new(self.zone, 'bad', {
                'dynamic': dynamic,
                'ttl': 60,
                'type': 'A',
                'values': [
                    '1.1.1.1',
                    '2.2.2.2',
                ],
            })
        self.assertEqual(expected, ctx.exception.reasons)

    # Missing pools
    check({
        'rules': [{'pool': 'one'}],
    }, ['missing pools', 'rule 1 undefined pool "one"'])

    # Empty pools
    check({
        'pools': {},
        'rules': [{'pool': 'one'}],
    }, ['missing pools', 'rule 1 undefined pool "one"'])

    # pools not a dict
    check({
        'pools': [],
        'rules': [{'pool': 'one'}],
    }, ['pools must be a dict', 'rule 1 undefined pool "one"'])

    # Invalid addresses
    check({
        'pools': {
            'one': pool(plain('this-aint-right')),
            'two': pool(plain('4.4.4.4', 'nor-is-this'), 'one'),
            'three': pool(weighted((1, '5.5.5.5'),
                                   (2, 'yet-another-bad-one')), 'two'),
        },
        'rules': geo_rules(),
    }, [
        'invalid IPv4 address "this-aint-right"',
        'invalid IPv4 address "yet-another-bad-one"',
        'invalid IPv4 address "nor-is-this"',
    ])

    # missing value(s)
    check({
        'pools': {
            'one': {},
            'two': two(),
            'three': three(2),
        },
        'rules': geo_rules(),
    }, ['pool "one" is missing values'])

    # pool value not a dict
    check({
        'pools': {
            'one': '',
            'two': two(),
            'three': three(2),
        },
        'rules': geo_rules(),
    }, ['pool "one" must be a dict'])

    # empty pool value
    check({
        'pools': {
            'one': {},
            'two': two(),
            'three': three(2),
        },
        'rules': geo_rules(),
    }, ['pool "one" is missing values'])

    # invalid int weight
    check({
        'pools': {
            'one': one(),
            'two': two(),
            'three': three(256),
        },
        'rules': geo_rules(),
    }, ['invalid weight "256" in pool "three" value 2'])

    # invalid non-int weight
    check({
        'pools': {
            'one': one(),
            'two': two(),
            'three': three('foo'),
        },
        'rules': geo_rules(),
    }, ['invalid weight "foo" in pool "three" value 2'])

    # invalid fallback
    check({
        'pools': {
            'one': one(),
            'two': two('invalid'),
            'three': three(5, 'two'),
        },
        'rules': geo_rules(),
    }, ['undefined fallback "invalid" for pool "two"'])

    # fallback loop
    check({
        'pools': {
            'one': pool(plain('3.3.3.3'), 'three'),
            'two': two('one'),
            'three': three(5, 'two'),
        },
        'rules': geo_rules(),
    }, [
        'loop in pool fallbacks: one -> three -> two',
        'loop in pool fallbacks: three -> two -> one',
        'loop in pool fallbacks: two -> one -> three'
    ])

    # multiple pool problems
    check({
        'pools': {
            'one': '',
            'two': pool(plain('4.4.4.4', 'blip')),
            'three': {
                'values': [{
                    'weight': 1,
                }, {
                    'weight': 5000,
                    'value': '7.7.7.7',
                }],
            },
        },
        'rules': geo_rules(),
    }, [
        'pool "one" must be a dict',
        'missing value in pool "three" value 1',
        'invalid weight "5000" in pool "three" value 2',
        'invalid IPv4 address "blip"',
    ])

    # missing rules
    check({
        'pools': {
            'one': one(),
            'two': two(),
        },
    }, ['missing rules'])

    # empty rules
    check({
        'pools': {
            'one': one(),
            'two': two(),
        },
        'rules': [],
    }, ['missing rules'])

    # rules not a list/tuple
    check({
        'pools': {
            'one': one(),
            'two': two(),
        },
        'rules': {},
    }, ['rules must be a list'])

    # rule without pool
    check({
        'pools': {
            'one': one(),
            'two': two(),
        },
        'rules': then_default({
            'geos': ['NA-US-CA'],
        }),
    }, ['rule 1 missing pool'])

    # rule with non-string pools
    check({
        'pools': {
            'one': one(),
            'two': two(),
        },
        'rules': then_default({
            'geos': ['NA-US-CA'],
            'pool': [],
        }),
    }, ['rule 1 invalid pool "[]"'])

    # rule references non-existent pool
    check({
        'pools': {
            'one': one(),
            'two': two(),
        },
        'rules': then_default({
            'geos': ['NA-US-CA'],
            'pool': 'non-existant',
        }),
    }, ["rule 1 undefined pool \"non-existant\""])

    # rule with invalid geos (a string rather than a list)
    check({
        'pools': {
            'one': one(),
            'two': two(),
        },
        'rules': then_default({
            'geos': 'NA-US-CA',
            'pool': 'two',
        }),
    }, ['rule 1 geos must be a list'])

    # rule with invalid geo
    check({
        'pools': {
            'one': one(),
            'two': two(),
        },
        'rules': then_default({
            'geos': ['invalid'],
            'pool': 'two',
        }),
    }, ['rule 1 unknown continent code "invalid"'])

    # multiple default rules
    check({
        'pools': {
            'one': one(),
            'two': two(),
        },
        'rules': then_default({
            'pool': 'two',
        }),
    }, ['rule 2 duplicate default'])
def test_dynamic_lenient(self):
    # With lenient=True, Record.new returns a record for dynamic data
    # that has no pools, no rules, or a rule without a pool; `_data()`
    # then reports what was parsed from it.

    # Missing pools
    a_data = {
        'dynamic': {
            'rules': [{
                'geos': ['EU'],
                'pool': 'two',
            }, {
                'pool': 'one',
            }],
        },
        'ttl': 60,
        'type': 'A',
        'values': [
            '1.1.1.1',
            '2.2.2.2',
        ],
    }
    a = Record.new(self.zone, 'bad', a_data, lenient=True)
    # pools come back empty, rules are kept as given
    self.assertEqual({
        'pools': {},
        'rules': a_data['dynamic']['rules'],
    }, a._data()['dynamic'])

    # Missing rule
    a_data = {
        'dynamic': {
            'pools': {
                'one': {
                    'values': [{
                        'value': '3.3.3.3',
                    }]
                },
                'two': {
                    'values': [{
                        'value': '4.4.4.4',
                    }, {
                        'value': '5.5.5.5',
                        'weight': 2,
                    }]
                },
            },
        },
        'ttl': 60,
        'type': 'A',
        'values': [
            '1.1.1.1',
            '2.2.2.2',
        ],
    }
    a = Record.new(self.zone, 'bad', a_data, lenient=True)
    # pools gain fallback None and weight 1 where absent, rules come
    # back as an empty list
    self.assertEqual({
        'pools': {
            'one': {
                'fallback': None,
                'values': [{
                    'value': '3.3.3.3',
                    'weight': 1,
                }]
            },
            'two': {
                'fallback': None,
                'values': [{
                    'value': '4.4.4.4',
                    'weight': 1,
                }, {
                    'value': '5.5.5.5',
                    'weight': 2,
                }]
            },
        },
        'rules': [],
    }, a._data()['dynamic'])

    # rule without pool
    a_data = {
        'dynamic': {
            'pools': {
                'one': {
                    'values': [{
                        'value': '3.3.3.3',
                    }]
                },
                'two': {
                    'values': [{
                        'value': '4.4.4.4',
                    }, {
                        'value': '5.5.5.5',
                        'weight': 2,
                    }]
                },
            },
            'rules': [{
                'geos': ['EU'],
                'pool': 'two',
            }, {
            }],
        },
        'ttl': 60,
        'type': 'A',
        'values': [
            '1.1.1.1',
            '2.2.2.2',
        ],
    }
    a = Record.new(self.zone, 'bad', a_data, lenient=True)
    # the empty rule is kept as given
    self.assertEqual({
        'pools': {
            'one': {
                'fallback': None,
                'values': [{
                    'value': '3.3.3.3',
                    'weight': 1,
                }]
            },
            'two': {
                'fallback': None,
                'values': [{
                    'value': '4.4.4.4',
                    'weight': 1,
                }, {
                    'value': '5.5.5.5',
                    'weight': 2,
                }]
            },
        },
        'rules': a_data['dynamic']['rules'],
    }, a._data()['dynamic'])
def test_dynamic_changes(self):
    """Differences confined to ``dynamic`` are only reported to dynamic targets.

    Builds four A records sharing name, ttl and values: ``a`` and an
    identical ``dup``, ``b`` (one pool value weight differs) and ``c``
    (one rule's geos differ). ``changes`` is expected to be falsy against
    a ``SimpleProvider`` target in every case, and to return an update
    against a ``DynamicProvider`` target whenever the dynamic data differs.
    """
    simple = SimpleProvider()
    dynamic = DynamicProvider()

    a_data = {
        'dynamic': {
            'pools': {
                'one': {
                    'values': [{
                        'value': '3.3.3.3',
                    }]
                },
                'two': {
                    'values': [{
                        'value': '4.4.4.4',
                    }, {
                        'value': '5.5.5.5',
                    }]
                },
            },
            'rules': [{
                'geos': ['EU'],
                'pool': 'two',
            }, {
                'pool': 'one',
            }],
        },
        'ttl': 60,
        'values': [
            '1.1.1.1',
            '2.2.2.2',
        ],
    }
    a = ARecord(self.zone, 'weighted', a_data)
    dup = ARecord(self.zone, 'weighted', a_data)

    # Same as a_data except for the weight on 4.4.4.4 in pool "two"
    b_data = {
        'dynamic': {
            'pools': {
                'one': {
                    'values': [{
                        'value': '3.3.3.3',
                    }]
                },
                'two': {
                    'values': [{
                        'value': '4.4.4.4',
                        'weight': 2,
                    }, {
                        'value': '5.5.5.5',
                    }]
                },
            },
            'rules': [{
                'geos': ['EU'],
                'pool': 'two',
            }, {
                'pool': 'one',
            }],
        },
        'ttl': 60,
        'values': [
            '1.1.1.1',
            '2.2.2.2',
        ],
    }
    b = ARecord(self.zone, 'weighted', b_data)

    # Same as a_data except the first rule targets NA rather than EU
    c_data = {
        'dynamic': {
            'pools': {
                'one': {
                    'values': [{
                        'value': '3.3.3.3',
                    }]
                },
                'two': {
                    'values': [{
                        'value': '4.4.4.4',
                    }, {
                        'value': '5.5.5.5',
                    }]
                },
            },
            'rules': [{
                'geos': ['NA'],
                'pool': 'two',
            }, {
                'pool': 'one',
            }],
        },
        'ttl': 60,
        'values': [
            '1.1.1.1',
            '2.2.2.2',
        ],
    }
    c = ARecord(self.zone, 'weighted', c_data)

    # a changes a (identical dup) is never true
    self.assertFalse(a.changes(dup, simple))
    self.assertFalse(a.changes(dup, dynamic))

    # a changes b is not true for simple
    self.assertFalse(a.changes(b, simple))
    # but is true for dynamic
    update = a.changes(b, dynamic)
    self.assertEqual(a, update.existing)
    self.assertEqual(b, update.new)

    # and the same holds in the reverse direction, where b is the existing
    # record and a is the new one
    # NOTE(review): these expectations were previously copied from the
    # forward case (a as existing, b as new) and presumably still passed,
    # which suggests record equality does not tell a and b apart here --
    # confirm, and consider an identity check if a stricter test is wanted
    self.assertFalse(b.changes(a, simple))
    update = b.changes(a, dynamic)
    self.assertEqual(b, update.existing)
    self.assertEqual(a, update.new)

    # same for a change c
    self.assertFalse(a.changes(c, simple))
    self.assertTrue(a.changes(c, dynamic))
    self.assertFalse(c.changes(a, simple))
    self.assertTrue(c.changes(a, dynamic))

    # smoke test some of the equality bits
    self.assertEqual(a.dynamic.pools, a.dynamic.pools)
    self.assertEqual(a.dynamic.pools['one'], a.dynamic.pools['one'])
    self.assertNotEqual(a.dynamic.pools['one'], a.dynamic.pools['two'])
    self.assertEqual(a.dynamic.rules, a.dynamic.rules)
    self.assertEqual(a.dynamic.rules[0], a.dynamic.rules[0])
    self.assertNotEqual(a.dynamic.rules[0], c.dynamic.rules[0])
def test_dynamic_and_geo_validation(self):
    """A record carrying both ``dynamic`` and ``geo`` is rejected.

    The ``dynamic`` section here is otherwise complete (pools plus rules
    ending in a geo-less rule); the only expected validation reason is
    the presence of ``geo`` alongside it.
    """
    a_data = {
        'dynamic': {
            'pools': {
                'one': {
                    'values': [{
                        'value': '3.3.3.3',
                    }],
                },
                'two': {
                    # Testing out of order value sorting here
                    'values': [{
                        'value': '5.5.5.5',
                    }, {
                        'value': '4.4.4.4',
                    }],
                },
                'three': {
                    'values': [{
                        'weight': 10,
                        'value': '4.4.4.4',
                    }, {
                        'weight': 12,
                        'value': '5.5.5.5',
                    }],
                },
            },
            'rules': [{
                'geos': ['AF', 'EU'],
                'pool': 'three',
            }, {
                'geos': ['NA-US-CA'],
                'pool': 'two',
            }, {
                'pool': 'one',
            }],
        },
        # The conflicting section: geo data on a dynamic record
        'geo': {
            'NA': ['1.2.3.5'],
            'NA-US': ['1.2.3.5', '1.2.3.6']
        },
        'type': 'A',
        'ttl': 60,
        'values': [
            '1.1.1.1',
            '2.2.2.2',
        ],
    }
    with self.assertRaises(ValidationError) as ctx:
        Record.new(self.zone, 'bad', a_data)
    self.assertEqual(['"dynamic" record with "geo" content'],
                     ctx.exception.reasons)
def test_dynamic_eqs(self):
    """Exercise equality on ``_DynamicPool``, ``_DynamicRule`` and ``_Dynamic``.

    Each type is checked to equal itself, to differ from a sibling built
    from different data, and to differ from an unrelated object (``42``).
    """
    pool_one = _DynamicPool('one', {
        'values': [{
            'value': '1.2.3.4',
        }],
    })
    pool_two = _DynamicPool('two', {
        'values': [{
            'value': '1.2.3.5',
        }],
    })
    self.assertEqual(pool_one, pool_one)
    self.assertNotEqual(pool_one, pool_two)
    # comparing against a non-pool must not report equality
    self.assertNotEqual(pool_one, 42)

    pools = {
        'one': pool_one,
        'two': pool_two,
    }

    rule_one = _DynamicRule(0, {
        'pool': 'one',
    })
    rule_two = _DynamicRule(1, {
        'pool': 'two',
    })
    self.assertEqual(rule_one, rule_one)
    self.assertNotEqual(rule_one, rule_two)
    # comparing against a non-rule must not report equality
    self.assertNotEqual(rule_one, 42)

    rules = [
        rule_one,
        rule_two,
    ]

    dynamic = _Dynamic(pools, rules)
    other = _Dynamic({}, [])
    self.assertEqual(dynamic, dynamic)
    self.assertNotEqual(dynamic, other)
    # comparing against a non-_Dynamic must not report equality
    self.assertNotEqual(dynamic, 42)