1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Merge branch 'main' into quiet-cmd-line

This commit is contained in:
Ross McFarland
2022-10-01 18:35:26 -07:00
committed by GitHub
3 changed files with 891 additions and 148 deletions

View File

@@ -2,6 +2,7 @@
#
#
from collections import defaultdict
from ipaddress import IPv4Address as _IPv4Address, IPv6Address as _IPv6Address
from logging import getLogger
import re
@@ -67,6 +68,11 @@ class RecordException(Exception):
pass
class RrParseError(RecordException):
def __init__(self, message='failed to parse string value as RR text'):
super().__init__(message)
class ValidationError(RecordException):
@classmethod
def build_message(cls, fqdn, reasons):
@@ -79,6 +85,23 @@ class ValidationError(RecordException):
self.reasons = reasons
class Rr(object):
'''
Simple object intended to be used with Record.from_rrs to allow providers
that work with RFC formatted rdata to share centralized parsing/encoding
code
'''
def __init__(self, name, _type, ttl, rdata):
self.name = name
self._type = _type
self.ttl = ttl
self.rdata = rdata
def __repr__(self):
return f'Rr<{self.name}, {self._type}, {self.ttl}, {self.rdata}'
class Record(EqualityTupleMixin):
log = getLogger('Record')
@@ -166,6 +189,27 @@ class Record(EqualityTupleMixin):
pass
return reasons
@classmethod
def from_rrs(cls, zone, rrs, lenient=False):
# group records by name & type so that multiple rdatas can be combined
# into a single record when needed
grouped = defaultdict(list)
for rr in rrs:
grouped[(rr.name, rr._type)].append(rr)
records = []
# walk the grouped rrs converting each one to data and then create a
# record with that data
for _, rrs in sorted(grouped.items()):
rr = rrs[0]
name = zone.hostname_from_fqdn(rr.name)
_class = cls._CLASSES[rr._type]
data = _class.data_from_rrs(rrs)
record = Record.new(zone, name, data, lenient=lenient)
records.append(record)
return records
def __init__(self, zone, name, data, source=None):
self.zone = zone
if name:
@@ -337,6 +381,14 @@ class ValuesMixin(object):
return reasons
@classmethod
def data_from_rrs(cls, rrs):
# type and TTL come from the first rr
rr = rrs[0]
# values come from parsing the rdata portion of all rrs
values = [cls._value_type.parse_rdata_text(rr.rdata) for rr in rrs]
return {'ttl': rr.ttl, 'type': rr._type, 'values': values}
def __init__(self, zone, name, data, source=None):
super().__init__(zone, name, data, source=source)
try:
@@ -365,6 +417,15 @@ class ValuesMixin(object):
return ret
@property
def rrs(self):
return (
self.fqdn,
self.ttl,
self._type,
[v.rdata_text for v in self.values],
)
def __repr__(self):
values = "', '".join([str(v) for v in self.values])
klass = self.__class__.__name__
@@ -433,6 +494,16 @@ class ValueMixin(object):
)
return reasons
@classmethod
def data_from_rrs(cls, rrs):
# single value, so single rr only...
rr = rrs[0]
return {
'ttl': rr.ttl,
'type': rr._type,
'value': cls._value_type.parse_rdata_text(rr.rdata),
}
def __init__(self, zone, name, data, source=None):
super().__init__(zone, name, data, source=source)
self.value = self._value_type.process(data['value'])
@@ -448,6 +519,10 @@ class ValueMixin(object):
ret['value'] = getattr(self.value, 'data', self.value)
return ret
@property
def rrs(self):
return self.fqdn, self.ttl, self._type, [self.value.rdata_text]
def __repr__(self):
klass = self.__class__.__name__
return f'<{klass} {self._type} {self.ttl}, {self.decoded_fqdn}, {self.value}>'
@@ -785,6 +860,10 @@ class _DynamicMixin(object):
class _TargetValue(str):
@classmethod
def parse_rdata_text(self, value):
return value
@classmethod
def validate(cls, data, _type):
reasons = []
@@ -810,6 +889,10 @@ class _TargetValue(str):
v = idna_encode(v)
return super().__new__(cls, v)
@property
def rdata_text(self):
return self
class CnameValue(_TargetValue):
pass
@@ -820,6 +903,10 @@ class DnameValue(_TargetValue):
class _IpAddress(str):
@classmethod
def parse_rdata_text(cls, value):
return value
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@@ -853,6 +940,10 @@ class _IpAddress(str):
v = str(cls._address_type(v))
return super().__new__(cls, v)
@property
def rdata_text(self):
return self
class Ipv4Address(_IpAddress):
_address_type = _IPv4Address
@@ -903,6 +994,18 @@ Record.register_type(AliasRecord)
class CaaValue(EqualityTupleMixin, dict):
# https://tools.ietf.org/html/rfc6844#page-5
@classmethod
def parse_rdata_text(cls, value):
try:
flags, tag, value = value.split(' ')
except ValueError:
raise RrParseError()
try:
flags = int(flags)
except ValueError:
pass
return {'flags': flags, 'tag': tag, 'value': value}
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@@ -963,6 +1066,10 @@ class CaaValue(EqualityTupleMixin, dict):
def data(self):
return self
@property
def rdata_text(self):
return f'{self.flags} {self.tag} {self.value}'
def _equality_tuple(self):
return (self.flags, self.tag, self.value)
@@ -1003,7 +1110,85 @@ Record.register_type(DnameRecord)
class LocValue(EqualityTupleMixin, dict):
# TODO: work out how to do defaults per RFC
# TODO: this does not really match the RFC, but it's stuck using the details
# of how the type was impelemented. Would be nice to rework things to match
# while maintaining backwards compatibility.
# https://www.rfc-editor.org/rfc/rfc1876.html
@classmethod
def parse_rdata_text(cls, value):
try:
value = value.replace('m', '')
(
lat_degrees,
lat_minutes,
lat_seconds,
lat_direction,
long_degrees,
long_minutes,
long_seconds,
long_direction,
altitude,
size,
precision_horz,
precision_vert,
) = value.split(' ')
except ValueError:
raise RrParseError()
try:
lat_degrees = int(lat_degrees)
except ValueError:
pass
try:
lat_minutes = int(lat_minutes)
except ValueError:
pass
try:
long_degrees = int(long_degrees)
except ValueError:
pass
try:
long_minutes = int(long_minutes)
except ValueError:
pass
try:
lat_seconds = float(lat_seconds)
except ValueError:
pass
try:
long_seconds = float(long_seconds)
except ValueError:
pass
try:
altitude = float(altitude)
except ValueError:
pass
try:
size = float(size)
except ValueError:
pass
try:
precision_horz = float(precision_horz)
except ValueError:
pass
try:
precision_vert = float(precision_vert)
except ValueError:
pass
return {
'lat_degrees': lat_degrees,
'lat_minutes': lat_minutes,
'lat_seconds': lat_seconds,
'lat_direction': lat_direction,
'long_degrees': long_degrees,
'long_minutes': long_minutes,
'long_seconds': long_seconds,
'long_direction': long_direction,
'altitude': altitude,
'size': size,
'precision_horz': precision_horz,
'precision_vert': precision_vert,
}
@classmethod
def validate(cls, data, _type):
@@ -1218,6 +1403,10 @@ class LocValue(EqualityTupleMixin, dict):
def data(self):
return self
@property
def rdata_text(self):
return f'{self.lat_degrees} {self.lat_minutes} {self.lat_seconds} {self.lat_direction} {self.long_degrees} {self.long_minutes} {self.long_seconds} {self.long_direction} {self.altitude}m {self.size}m {self.precision_horz}m {self.precision_vert}m'
def __hash__(self):
return hash(
(
@@ -1272,6 +1461,18 @@ Record.register_type(LocRecord)
class MxValue(EqualityTupleMixin, dict):
@classmethod
def parse_rdata_text(cls, value):
try:
preference, exchange = value.split(' ')
except ValueError:
raise RrParseError()
try:
preference = int(preference)
except ValueError:
pass
return {'preference': preference, 'exchange': exchange}
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@@ -1347,6 +1548,10 @@ class MxValue(EqualityTupleMixin, dict):
def data(self):
return self
@property
def rdata_text(self):
return f'{self.preference} {self.exchange}'
def __hash__(self):
return hash((self.preference, self.exchange))
@@ -1368,6 +1573,33 @@ Record.register_type(MxRecord)
class NaptrValue(EqualityTupleMixin, dict):
VALID_FLAGS = ('S', 'A', 'U', 'P')
@classmethod
def parse_rdata_text(cls, value):
try:
(
order,
preference,
flags,
service,
regexp,
replacement,
) = value.split(' ')
except ValueError:
raise RrParseError()
try:
order = int(order)
preference = int(preference)
except ValueError:
pass
return {
'order': order,
'preference': preference,
'flags': flags,
'service': service,
'regexp': regexp,
'replacement': replacement,
}
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@@ -1468,6 +1700,10 @@ class NaptrValue(EqualityTupleMixin, dict):
def data(self):
return self
@property
def rdata_text(self):
return f'{self.order} {self.preference} {self.flags} {self.service} {self.regexp} {self.replacement}'
def __hash__(self):
return hash(self.__repr__())
@@ -1500,6 +1736,10 @@ Record.register_type(NaptrRecord)
class _NsValue(str):
@classmethod
def parse_rdata_text(cls, value):
return value
@classmethod
def validate(cls, data, _type):
if not data:
@@ -1525,6 +1765,10 @@ class _NsValue(str):
v = idna_encode(v)
return super().__new__(cls, v)
@property
def rdata_text(self):
return self
class NsRecord(ValuesMixin, Record):
_type = 'NS'
@@ -1574,6 +1818,26 @@ class SshfpValue(EqualityTupleMixin, dict):
VALID_ALGORITHMS = (1, 2, 3, 4)
VALID_FINGERPRINT_TYPES = (1, 2)
@classmethod
def parse_rdata_text(self, value):
try:
algorithm, fingerprint_type, fingerprint = value.split(' ')
except ValueError:
raise RrParseError()
try:
algorithm = int(algorithm)
except ValueError:
pass
try:
fingerprint_type = int(fingerprint_type)
except ValueError:
pass
return {
'algorithm': algorithm,
'fingerprint_type': fingerprint_type,
'fingerprint': fingerprint,
}
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@@ -1645,6 +1909,10 @@ class SshfpValue(EqualityTupleMixin, dict):
def data(self):
return self
@property
def rdata_text(self):
return f'{self.algorithm} {self.fingerprint_type} {self.fingerprint}'
def __hash__(self):
return hash(self.__repr__())
@@ -1687,6 +1955,13 @@ class _ChunkedValuesMixin(ValuesMixin):
class _ChunkedValue(str):
_unescaped_semicolon_re = re.compile(r'\w;')
@classmethod
def parse_rdata_text(cls, value):
try:
return value.replace(';', '\\;')
except AttributeError:
return value
@classmethod
def validate(cls, data, _type):
if not data:
@@ -1708,6 +1983,10 @@ class _ChunkedValue(str):
ret.append(cls(v.replace('" "', '')))
return ret
@property
def rdata_text(self):
return self
class SpfRecord(_ChunkedValuesMixin, Record):
_type = 'SPF'
@@ -1718,6 +1997,31 @@ Record.register_type(SpfRecord)
class SrvValue(EqualityTupleMixin, dict):
@classmethod
def parse_rdata_text(self, value):
try:
priority, weight, port, target = value.split(' ')
except ValueError:
raise RrParseError()
try:
priority = int(priority)
except ValueError:
pass
try:
weight = int(weight)
except ValueError:
pass
try:
port = int(port)
except ValueError:
pass
return {
'priority': priority,
'weight': weight,
'port': port,
'target': target,
}
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@@ -1840,6 +2144,36 @@ Record.register_type(SrvRecord)
class TlsaValue(EqualityTupleMixin, dict):
@classmethod
def parse_rdata_text(self, value):
try:
(
certificate_usage,
selector,
matching_type,
certificate_association_data,
) = value.split(' ')
except ValueError:
raise RrParseError()
try:
certificate_usage = int(certificate_usage)
except ValueError:
pass
try:
selector = int(selector)
except ValueError:
pass
try:
matching_type = int(matching_type)
except ValueError:
pass
return {
'certificate_usage': certificate_usage,
'selector': selector,
'matching_type': matching_type,
'certificate_association_data': certificate_association_data,
}
@classmethod
def validate(cls, data, _type):
if not isinstance(data, (list, tuple)):
@@ -1932,6 +2266,10 @@ class TlsaValue(EqualityTupleMixin, dict):
def certificate_association_data(self, value):
self['certificate_association_data'] = value
@property
def rdata_text(self):
return f'{self.certificate_usage} {self.selector} {self.matching_type} {self.certificate_association_data}'
def _equality_tuple(self):
return (
self.certificate_usage,

View File

@@ -9,12 +9,11 @@ import dns.rdatatype
from dns.exception import DNSException
from collections import defaultdict
from os import listdir
from os.path import join
import logging
from ..record import Record
from ..record import Record, Rr
from .base import BaseSource
@@ -42,110 +41,6 @@ class AxfrBaseSource(BaseSource):
def __init__(self, id):
super().__init__(id)
def _data_for_multiple(self, _type, records):
return {
'ttl': records[0]['ttl'],
'type': _type,
'values': [r['value'] for r in records],
}
_data_for_A = _data_for_multiple
_data_for_AAAA = _data_for_multiple
_data_for_NS = _data_for_multiple
_data_for_PTR = _data_for_multiple
def _data_for_CAA(self, _type, records):
values = []
for record in records:
flags, tag, value = record['value'].split(' ', 2)
values.append(
{'flags': flags, 'tag': tag, 'value': value.replace('"', '')}
)
return {'ttl': records[0]['ttl'], 'type': _type, 'values': values}
def _data_for_LOC(self, _type, records):
values = []
for record in records:
(
lat_degrees,
lat_minutes,
lat_seconds,
lat_direction,
long_degrees,
long_minutes,
long_seconds,
long_direction,
altitude,
size,
precision_horz,
precision_vert,
) = (record['value'].replace('m', '').split(' ', 11))
values.append(
{
'lat_degrees': lat_degrees,
'lat_minutes': lat_minutes,
'lat_seconds': lat_seconds,
'lat_direction': lat_direction,
'long_degrees': long_degrees,
'long_minutes': long_minutes,
'long_seconds': long_seconds,
'long_direction': long_direction,
'altitude': altitude,
'size': size,
'precision_horz': precision_horz,
'precision_vert': precision_vert,
}
)
return {'ttl': records[0]['ttl'], 'type': _type, 'values': values}
def _data_for_MX(self, _type, records):
values = []
for record in records:
preference, exchange = record['value'].split(' ', 1)
values.append({'preference': preference, 'exchange': exchange})
return {'ttl': records[0]['ttl'], 'type': _type, 'values': values}
def _data_for_TXT(self, _type, records):
values = [value['value'].replace(';', '\\;') for value in records]
return {'ttl': records[0]['ttl'], 'type': _type, 'values': values}
_data_for_SPF = _data_for_TXT
def _data_for_single(self, _type, records):
record = records[0]
return {'ttl': record['ttl'], 'type': _type, 'value': record['value']}
_data_for_CNAME = _data_for_single
def _data_for_SRV(self, _type, records):
values = []
for record in records:
priority, weight, port, target = record['value'].split(' ', 3)
values.append(
{
'priority': priority,
'weight': weight,
'port': port,
'target': target,
}
)
return {'type': _type, 'ttl': records[0]['ttl'], 'values': values}
def _data_for_SSHFP(self, _type, records):
values = []
for record in records:
algorithm, fingerprint_type, fingerprint = record['value'].split(
' ', 2
)
values.append(
{
'algorithm': algorithm,
'fingerprint_type': fingerprint_type,
'fingerprint': fingerprint,
}
)
return {'type': _type, 'ttl': records[0]['ttl'], 'values': values}
def populate(self, zone, target=False, lenient=False):
self.log.debug(
'populate: name=%s, target=%s, lenient=%s',
@@ -154,26 +49,10 @@ class AxfrBaseSource(BaseSource):
lenient,
)
values = defaultdict(lambda: defaultdict(list))
for record in self.zone_records(zone):
_type = record['type']
if _type not in self.SUPPORTS:
continue
name = zone.hostname_from_fqdn(record['name'])
values[name][record['type']].append(record)
before = len(zone.records)
for name, types in values.items():
for _type, records in types.items():
data_for = getattr(self, f'_data_for_{_type}')
record = Record.new(
zone,
name,
data_for(_type, records),
source=self,
lenient=lenient,
)
zone.add_record(record, lenient=lenient)
rrs = self.zone_records(zone)
for record in Record.from_rrs(zone, rrs, lenient=lenient):
zone.add_record(record, lenient=lenient)
self.log.info(
'populate: found %s records', len(zone.records) - before
@@ -218,14 +97,8 @@ class AxfrSource(AxfrBaseSource):
for (name, ttl, rdata) in z.iterate_rdatas():
rdtype = dns.rdatatype.to_text(rdata.rdtype)
records.append(
{
"name": name.to_text(),
"ttl": ttl,
"type": rdtype,
"value": rdata.to_text(),
}
)
if rdtype in self.SUPPORTS:
records.append(Rr(name.to_text(), rdtype, ttl, rdata.to_text()))
return records
@@ -302,20 +175,17 @@ class ZoneFileSource(AxfrBaseSource):
if zone.name not in self._zone_records:
try:
z = self._load_zone_file(zone.name)
records = []
for (name, ttl, rdata) in z.iterate_rdatas():
rdtype = dns.rdatatype.to_text(rdata.rdtype)
records.append(
{
"name": name.to_text(),
"ttl": ttl,
"type": rdtype,
"value": rdata.to_text(),
}
)
self._zone_records[zone.name] = records
except ZoneFileSourceNotFound:
return []
records = []
for (name, ttl, rdata) in z.iterate_rdatas():
rdtype = dns.rdatatype.to_text(rdata.rdtype)
if rdtype in self.SUPPORTS:
records.append(
Rr(name.to_text(), rdtype, ttl, rdata.to_text())
)
self._zone_records[zone.name] = records
return self._zone_records[zone.name]

View File

@@ -27,6 +27,8 @@ from octodns.record import (
PtrRecord,
Record,
RecordException,
Rr,
RrParseError,
SshfpRecord,
SshfpValue,
SpfRecord,
@@ -39,11 +41,13 @@ from octodns.record import (
UrlfwdRecord,
UrlfwdValue,
ValidationError,
ValuesMixin,
_ChunkedValue,
_Dynamic,
_DynamicPool,
_DynamicRule,
_NsValue,
ValuesMixin,
_TargetValue,
)
from octodns.zone import Zone
@@ -293,6 +297,26 @@ class TestRecord(TestCase):
DummyRecord().__repr__()
def test_ip_address_rdata_text(self):
# anything goes, we're a noop
for s in (
None,
'',
'word',
42,
42.43,
'1.2.3',
'some.words.that.here',
'1.2.word.4',
'1.2.3.4',
):
self.assertEqual(s, Ipv4Address.parse_rdata_text(s))
zone = Zone('unit.tests.', [])
a = ARecord(zone, 'a', {'ttl': 42, 'value': '1.2.3.4'})
self.assertEqual('1.2.3.4', a.values[0].rdata_text)
def test_values_mixin_data(self):
# no values, no value or values in data
a = ARecord(self.zone, '', {'type': 'A', 'ttl': 600, 'values': []})
@@ -461,6 +485,26 @@ class TestRecord(TestCase):
# __repr__ doesn't blow up
a.__repr__()
def test_target_rdata_text(self):
# anything goes, we're a noop
for s in (
None,
'',
'word',
42,
42.43,
'1.2.3',
'some.words.that.here',
'1.2.word.4',
'1.2.3.4',
):
self.assertEqual(s, _TargetValue.parse_rdata_text(s))
zone = Zone('unit.tests.', [])
a = AliasRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'})
self.assertEqual('some.target.', a.value.rdata_text)
def test_caa(self):
a_values = [
CaaValue({'flags': 0, 'tag': 'issue', 'value': 'ca.example.net'}),
@@ -521,6 +565,56 @@ class TestRecord(TestCase):
# __repr__ doesn't blow up
a.__repr__()
def test_caa_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
CaaValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
CaaValue.parse_rdata_text('nope')
# 2nd word won't parse
with self.assertRaises(RrParseError):
CaaValue.parse_rdata_text('0 tag')
# 4th word won't parse
with self.assertRaises(RrParseError):
CaaValue.parse_rdata_text('1 tag value another')
# flags not an int, will parse
self.assertEqual(
{'flags': 'one', 'tag': 'tag', 'value': 'value'},
CaaValue.parse_rdata_text('one tag value'),
)
# valid
self.assertEqual(
{'flags': 0, 'tag': 'tag', 'value': '99148c81'},
CaaValue.parse_rdata_text('0 tag 99148c81'),
)
zone = Zone('unit.tests.', [])
a = CaaRecord(
zone,
'caa',
{
'ttl': 32,
'values': [
{'flags': 1, 'tag': 'tag1', 'value': '99148c81'},
{'flags': 2, 'tag': 'tag2', 'value': '99148c44'},
],
},
)
self.assertEqual(1, a.values[0].flags)
self.assertEqual('tag1', a.values[0].tag)
self.assertEqual('99148c81', a.values[0].value)
self.assertEqual('1 tag1 99148c81', a.values[0].rdata_text)
self.assertEqual(2, a.values[1].flags)
self.assertEqual('tag2', a.values[1].tag)
self.assertEqual('99148c44', a.values[1].value)
self.assertEqual('2 tag2 99148c44', a.values[1].rdata_text)
def test_cname(self):
self.assertSingleValue(CnameRecord, 'target.foo.com.', 'other.foo.com.')
@@ -623,6 +717,92 @@ class TestRecord(TestCase):
# __repr__ doesn't blow up
a.__repr__()
def test_loc_value_rdata_text(self):
# only the exact correct number of words is allowed
for i in tuple(range(0, 12)) + (13,):
s = ''.join(['word'] * i)
with self.assertRaises(RrParseError):
LocValue.parse_rdata_text(s)
# type conversions are best effort
self.assertEqual(
{
'altitude': 'six',
'lat_degrees': 'zero',
'lat_direction': 'S',
'lat_minutes': 'one',
'lat_seconds': 'two',
'long_degrees': 'three',
'long_direction': 'W',
'long_minutes': 'four',
'long_seconds': 'five',
'precision_horz': 'eight',
'precision_vert': 'nine',
'size': 'seven',
},
LocValue.parse_rdata_text(
'zero one two S three four five W six seven eight nine'
),
)
# valid
s = '0 1 2.2 N 3 4 5.5 E 6.6m 7.7m 8.8m 9.9m'
self.assertEqual(
{
'altitude': 6.6,
'lat_degrees': 0,
'lat_direction': 'N',
'lat_minutes': 1,
'lat_seconds': 2.2,
'long_degrees': 3,
'long_direction': 'E',
'long_minutes': 4,
'long_seconds': 5.5,
'precision_horz': 8.8,
'precision_vert': 9.9,
'size': 7.7,
},
LocValue.parse_rdata_text(s),
)
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = LocRecord(
zone,
'mx',
{
'type': 'LOC',
'ttl': 42,
'value': {
'altitude': 6.6,
'lat_degrees': 0,
'lat_direction': 'N',
'lat_minutes': 1,
'lat_seconds': 2.2,
'long_degrees': 3,
'long_direction': 'E',
'long_minutes': 4,
'long_seconds': 5.5,
'precision_horz': 8.8,
'precision_vert': 9.9,
'size': 7.7,
},
},
)
self.assertEqual(0, a.values[0].lat_degrees)
self.assertEqual(1, a.values[0].lat_minutes)
self.assertEqual(2.2, a.values[0].lat_seconds)
self.assertEqual('N', a.values[0].lat_direction)
self.assertEqual(3, a.values[0].long_degrees)
self.assertEqual(4, a.values[0].long_minutes)
self.assertEqual(5.5, a.values[0].long_seconds)
self.assertEqual('E', a.values[0].long_direction)
self.assertEqual(6.6, a.values[0].altitude)
self.assertEqual(7.7, a.values[0].size)
self.assertEqual(8.8, a.values[0].precision_horz)
self.assertEqual(9.9, a.values[0].precision_vert)
self.assertEqual(s, a.values[0].rdata_text)
def test_mx(self):
a_values = [
MxValue({'preference': 10, 'exchange': 'smtp1.'}),
@@ -674,6 +854,51 @@ class TestRecord(TestCase):
# __repr__ doesn't blow up
a.__repr__()
def test_mx_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
MxValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
MxValue.parse_rdata_text('nope')
# 3rd word won't parse
with self.assertRaises(RrParseError):
MxValue.parse_rdata_text('10 mx.unit.tests. another')
# preference not an int
self.assertEqual(
{'preference': 'abc', 'exchange': 'mx.unit.tests.'},
MxValue.parse_rdata_text('abc mx.unit.tests.'),
)
# valid
self.assertEqual(
{'preference': 10, 'exchange': 'mx.unit.tests.'},
MxValue.parse_rdata_text('10 mx.unit.tests.'),
)
zone = Zone('unit.tests.', [])
a = MxRecord(
zone,
'mx',
{
'ttl': 32,
'values': [
{'preference': 11, 'exchange': 'mail1.unit.tests.'},
{'preference': 12, 'exchange': 'mail2.unit.tests.'},
],
},
)
self.assertEqual(11, a.values[0].preference)
self.assertEqual('mail1.unit.tests.', a.values[0].exchange)
self.assertEqual('11 mail1.unit.tests.', a.values[0].rdata_text)
self.assertEqual(12, a.values[1].preference)
self.assertEqual('mail2.unit.tests.', a.values[1].exchange)
self.assertEqual('12 mail2.unit.tests.', a.values[1].rdata_text)
def test_naptr(self):
a_values = [
NaptrValue(
@@ -964,6 +1189,72 @@ class TestRecord(TestCase):
o.replacement = '1'
self.assertEqual('1', o.replacement)
def test_naptr_value_rdata_text(self):
# things with the wrong number of words won't parse
for v in (
'',
'one',
'one two',
'one two three',
'one two three four',
'one two three four five',
'one two three four five six seven',
):
with self.assertRaises(RrParseError):
NaptrValue.parse_rdata_text(v)
# we don't care if the types of things are correct when parsing rr text
self.assertEqual(
{
'order': 'one',
'preference': 'two',
'flags': 'three',
'service': 'four',
'regexp': 'five',
'replacement': 'six',
},
NaptrValue.parse_rdata_text('one two three four five six'),
)
# order and preference will be converted to int's when possible
self.assertEqual(
{
'order': 1,
'preference': 2,
'flags': 'three',
'service': 'four',
'regexp': 'five',
'replacement': 'six',
},
NaptrValue.parse_rdata_text('1 2 three four five six'),
)
# make sure that the cstor is using parse_rdata_text
zone = Zone('unit.tests.', [])
a = NaptrRecord(
zone,
'naptr',
{
'ttl': 32,
'value': {
'order': 1,
'preference': 2,
'flags': 'S',
'service': 'service',
'regexp': 'regexp',
'replacement': 'replacement',
},
},
)
self.assertEqual(1, a.values[0].order)
self.assertEqual(2, a.values[0].preference)
self.assertEqual('S', a.values[0].flags)
self.assertEqual('service', a.values[0].service)
self.assertEqual('regexp', a.values[0].regexp)
self.assertEqual('replacement', a.values[0].replacement)
s = '1 2 S service regexp replacement'
self.assertEqual(s, a.values[0].rdata_text)
def test_ns(self):
a_values = ['5.6.7.8.', '6.7.8.9.', '7.8.9.0.']
a_data = {'ttl': 30, 'values': a_values}
@@ -980,6 +1271,25 @@ class TestRecord(TestCase):
self.assertEqual([b_value], b.values)
self.assertEqual(b_data, b.data)
def test_ns_value_rdata_text(self):
# anything goes, we're a noop
for s in (
None,
'',
'word',
42,
42.43,
'1.2.3',
'some.words.that.here',
'1.2.word.4',
'1.2.3.4',
):
self.assertEqual(s, _NsValue.parse_rdata_text(s))
zone = Zone('unit.tests.', [])
a = NsRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'})
self.assertEqual('some.target.', a.values[0].rdata_text)
def test_sshfp(self):
a_values = [
SshfpValue(
@@ -1048,11 +1358,85 @@ class TestRecord(TestCase):
# __repr__ doesn't blow up
a.__repr__()
def test_sshfp_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
SshfpValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
SshfpValue.parse_rdata_text('nope')
# 3rd word won't parse
with self.assertRaises(RrParseError):
SshfpValue.parse_rdata_text('0 1 00479b27 another')
# algorithm and fingerprint_type not ints
self.assertEqual(
{
'algorithm': 'one',
'fingerprint_type': 'two',
'fingerprint': '00479b27',
},
SshfpValue.parse_rdata_text('one two 00479b27'),
)
# valid
self.assertEqual(
{'algorithm': 1, 'fingerprint_type': 2, 'fingerprint': '00479b27'},
SshfpValue.parse_rdata_text('1 2 00479b27'),
)
zone = Zone('unit.tests.', [])
a = SshfpRecord(
zone,
'sshfp',
{
'ttl': 32,
'value': {
'algorithm': 1,
'fingerprint_type': 2,
'fingerprint': '00479b27',
},
},
)
self.assertEqual(1, a.values[0].algorithm)
self.assertEqual(2, a.values[0].fingerprint_type)
self.assertEqual('00479b27', a.values[0].fingerprint)
self.assertEqual('1 2 00479b27', a.values[0].rdata_text)
def test_spf(self):
a_values = ['spf1 -all', 'spf1 -hrm']
b_value = 'spf1 -other'
self.assertMultipleValues(SpfRecord, a_values, b_value)
def test_chunked_value_rdata_text(self):
for s in (
None,
'',
'word',
42,
42.43,
'1.2.3',
'some.words.that.here',
'1.2.word.4',
'1.2.3.4',
):
self.assertEqual(s, _ChunkedValue.parse_rdata_text(s))
# semi-colons are escaped
self.assertEqual(
'Hello\\; World!', _ChunkedValue.parse_rdata_text('Hello; World!')
)
# since we're always a string validate and __init__ don't
# parse_rdata_text
zone = Zone('unit.tests.', [])
a = SpfRecord(zone, 'a', {'ttl': 42, 'value': 'some.target.'})
self.assertEqual('some.target.', a.values[0].rdata_text)
def test_srv(self):
a_values = [
SrvValue(
@@ -1117,6 +1501,69 @@ class TestRecord(TestCase):
# __repr__ doesn't blow up
a.__repr__()
def test_srv_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rdata_text('nope')
# 2nd word won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rdata_text('1 2')
# 3rd word won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rdata_text('1 2 3')
# 5th word won't parse
with self.assertRaises(RrParseError):
SrvValue.parse_rdata_text('1 2 3 4 5')
# priority weight and port not ints
self.assertEqual(
{
'priority': 'one',
'weight': 'two',
'port': 'three',
'target': 'srv.unit.tests.',
},
SrvValue.parse_rdata_text('one two three srv.unit.tests.'),
)
# valid
self.assertEqual(
{
'priority': 1,
'weight': 2,
'port': 3,
'target': 'srv.unit.tests.',
},
SrvValue.parse_rdata_text('1 2 3 srv.unit.tests.'),
)
zone = Zone('unit.tests.', [])
a = SrvRecord(
zone,
'_srv._tcp',
{
'ttl': 32,
'value': {
'priority': 1,
'weight': 2,
'port': 3,
'target': 'srv.unit.tests.',
},
},
)
self.assertEqual(1, a.values[0].priority)
self.assertEqual(2, a.values[0].weight)
self.assertEqual(3, a.values[0].port)
self.assertEqual('srv.unit.tests.', a.values[0].target)
def test_tlsa(self):
a_values = [
TlsaValue(
@@ -1218,6 +1665,70 @@ class TestRecord(TestCase):
# __repr__ doesn't blow up
a.__repr__()
def test_tsla_value_rdata_text(self):
# empty string won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rdata_text('')
# single word won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rdata_text('nope')
# 2nd word won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rdata_text('1 2')
# 3rd word won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rdata_text('1 2 3')
# 5th word won't parse
with self.assertRaises(RrParseError):
TlsaValue.parse_rdata_text('1 2 3 abcd another')
# non-ints
self.assertEqual(
{
'certificate_usage': 'one',
'selector': 'two',
'matching_type': 'three',
'certificate_association_data': 'abcd',
},
TlsaValue.parse_rdata_text('one two three abcd'),
)
# valid
self.assertEqual(
{
'certificate_usage': 1,
'selector': 2,
'matching_type': 3,
'certificate_association_data': 'abcd',
},
TlsaValue.parse_rdata_text('1 2 3 abcd'),
)
zone = Zone('unit.tests.', [])
a = TlsaRecord(
zone,
'tlsa',
{
'ttl': 32,
'value': {
'certificate_usage': 2,
'selector': 1,
'matching_type': 0,
'certificate_association_data': 'abcd',
},
},
)
self.assertEqual(2, a.values[0].certificate_usage)
self.assertEqual(1, a.values[0].selector)
self.assertEqual(0, a.values[0].matching_type)
self.assertEqual('abcd', a.values[0].certificate_association_data)
self.assertEqual('2 1 0 abcd', a.values[0].rdata_text)
def test_txt(self):
a_values = ['a one', 'a two']
b_value = 'b other'
@@ -2026,6 +2537,30 @@ class TestRecord(TestCase):
values.add(b)
self.assertTrue(b in values)
def test_rr(self):
# nothing much to test, just make sure that things don't blow up
Rr('name', 'type', 42, 'Hello World!').__repr__()
zone = Zone('unit.tests.', [])
record = Record.new(
zone,
'a',
{'ttl': 42, 'type': 'A', 'values': ['1.2.3.4', '2.3.4.5']},
)
self.assertEqual(
('a.unit.tests.', 42, 'A', ['1.2.3.4', '2.3.4.5']), record.rrs
)
record = Record.new(
zone,
'cname',
{'ttl': 43, 'type': 'CNAME', 'value': 'target.unit.tests.'},
)
self.assertEqual(
('cname.unit.tests.', 43, 'CNAME', ['target.unit.tests.']),
record.rrs,
)
class TestRecordValidation(TestCase):
zone = Zone('unit.tests.', [])