diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a59c14..25c23ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ * octodns-report access --lenient flag to allow running reports with records sourced from providers with non-compliant record data. * Correctly handle FQDNs in TinyDNS config files that end with trailing .'s +* Complete rewrite of TinyDnsBaseSource to fully implement the spec and the ipv6 + extensions ## v1.0.0.rc0 - 2023-05-16 - First of the ones diff --git a/octodns/record/base.py b/octodns/record/base.py index b6e3ed3..23b9a5c 100644 --- a/octodns/record/base.py +++ b/octodns/record/base.py @@ -123,6 +123,10 @@ class Record(EqualityTupleMixin): return records + @classmethod + def parse_rdata_texts(cls, rdatas): + return [cls._value_type.parse_rdata_text(r) for r in rdatas] + def __init__(self, zone, name, data, source=None): self.zone = zone if name: diff --git a/octodns/source/tinydns.py b/octodns/source/tinydns.py index 4d32e52..1faf2f7 100755 --- a/octodns/source/tinydns.py +++ b/octodns/source/tinydns.py @@ -3,7 +3,6 @@ # import logging -import re import textwrap from collections import defaultdict from ipaddress import ip_address @@ -11,96 +10,399 @@ from os import listdir from os.path import join from ..record import Record -from ..zone import DuplicateRecordException, SubzoneRecordException from .base import BaseSource +def _unique(values): + try: + # this will work if they're simple strings + return list(set(values)) + except TypeError: + pass + # if they're dictionaries it's a bit more involved since dict's aren't + # hashable, based on https://stackoverflow.com/a/38521207 + return [dict(s) for s in set(frozenset(v.items()) for v in values)] + + class TinyDnsBaseSource(BaseSource): SUPPORTS_GEO = False SUPPORTS_DYNAMIC = False - SUPPORTS = set(('A', 'CNAME', 'MX', 'NS', 'TXT', 'AAAA')) - - split_re = re.compile(r':+') def __init__(self, id, default_ttl=3600): super().__init__(id) self.default_ttl = default_ttl - def _data_for_A(self, _type, records): - values = [] - for record in records: - if record[0] != '0.0.0.0': - values.append(record[0]) - if len(values) == 0: - return - try: - ttl = records[0][1] - except IndexError: - ttl = self.default_ttl - return {'ttl': ttl, 'type': _type, 'values': values} + @property + def SUPPORTS(self): + # All record types, including those registered by 3rd party modules + return set(Record.registered_types().keys()) + + def _ttl_for(self, lines, index): + # see if we can find a ttl on any of the lines, first one wins + for line in lines: + try: + return int(line[index]) + except IndexError: + pass + # and if we don't use the default + return self.default_ttl + + def _records_for_at(self, zone, name, lines, arpa=False): + # @fqdn:ip:x:dist:ttl:timestamp:lo + # MX (and optional A) + + if arpa: + # no arpa + return [] + + if not zone.owns('MX', name): + # if name doesn't live under our zone there's nothing for us to do + return + + ttl = self._ttl_for(lines, 4) - def _data_for_AAAA(self, _type, records): values = [] - for record in records: + for line in lines: + mx = line[2] + # if there's a . in the mx we hit a special case and use it as-is + if '.' not in mx: + # otherwise we treat it as the MX hostnam and construct the rest + mx = f'{mx}.mx.{zone.name}' + elif mx[-1] != '.': + mx = f'{mx}.' + + # default distance is 0 + try: + dist = line[3] or 0 + except IndexError: + dist = 0 + + # if we have an IP then we need to create an A for the MX + ip = line[1] + if ip and zone.owns('A', mx): + yield 'A', mx, ttl, [ip] + + values.append({'preference': dist, 'exchange': mx}) + + yield 'MX', name, ttl, values + + def _records_for_C(self, zone, name, lines, arpa=False): + # Cfqdn:p:ttl:timestamp:lo + # CNAME + + if arpa: + # no arpa + return [] + + if not zone.owns('CNAME', name): + # if name doesn't live under our zone there's nothing for us to do + return + + value = lines[0][1] + if value[-1] != '.': + value = f'{value}.' + + ttl = self._ttl_for(lines, 2) + + yield 'CNAME', name, ttl, [value] + + def _records_for_caret(self, zone, name, lines, arpa=False): + # ^fqdn:p:ttl:timestamp:lo + # PTR, line may be a A/AAAA or straight PTR + + if not arpa: + # we only operate on arpa + return [] + + names = defaultdict(list) + for line in lines: + if line[0].endswith('in-addr.arpa') or line[0].endswith( + 'ip6.arpa.' + ): + # it's a straight PTR record, already in in-addr.arpa format, + # 2nd item is the name it points to + name = line[0] + value = line[1] + else: + # it's not a PTR we need to build up the PTR data from what + # we're given + value = line[0] + addr = line[1] + if '.' not in addr: + addr = u':'.join(textwrap.wrap(line[1], 4)) + addr = ip_address(addr) + name = addr.reverse_pointer + + if value[-1] != '.': + value = f'{value}.' + names[name].append(value) + + ttl = self._ttl_for(lines, 2) + + for name, values in names.items(): + if zone.owns('PTR', name): + yield 'PTR', name, ttl, values + + def _records_for_equal(self, zone, name, lines, arpa=False): + # =fqdn:ip:ttl:timestamp:lo + # A (arpa False) & PTR (arpa True) + if arpa: + yield from self._records_for_caret(zone, name, lines, arpa) + else: + yield from self._records_for_plus(zone, name, lines, arpa) + + def _records_for_dot(self, zone, name, lines, arpa=False): + # .fqdn:ip:x:ttl:timestamp:lo + # NS (and optional A) + + if arpa: + # no arpa + return [] + + if not zone.owns('NS', name): + # if name doesn't live under our zone there's nothing for us to do + return + + ttl = self._ttl_for(lines, 3) + + values = [] + for line in lines: + ns = line[2] + # if there's a . in the ns we hit a special case and use it as-is + if '.' not in ns: + # otherwise we treat it as the NS hostnam and construct the rest + ns = f'{ns}.ns.{zone.name}' + elif ns[-1] != '.': + ns = f'{ns}.' + + # if we have an IP then we need to create an A for the MX + ip = line[1] + if ip and zone.owns('A', ns): + yield 'A', ns, ttl, [ip] + + values.append(ns) + + yield 'NS', name, ttl, values + + _records_for_amp = _records_for_dot + + def _records_for_plus(self, zone, name, lines, arpa=False): + # +fqdn:ip:ttl:timestamp:lo + # A + + if arpa: + # no arpa + return [] + + if not zone.owns('A', name): + # if name doesn't live under our zone there's nothing for us to do + return + + # collect our ip(s) + ips = [l[1] for l in lines if l[1] != '0.0.0.0'] + + if not ips: + # we didn't find any value ips so nothing to do + return [] + + ttl = self._ttl_for(lines, 2) + + yield 'A', name, ttl, ips + + def _records_for_quote(self, zone, name, lines, arpa=False): + # 'fqdn:s:ttl:timestamp:lo + # TXT + + if arpa: + # no arpa + return [] + + if not zone.owns('TXT', name): + # if name doesn't live under our zone there's nothing for us to do + return + + # collect our ip(s) + values = [ + l[1].encode('latin1').decode('unicode-escape').replace(";", "\\;") + for l in lines + ] + + ttl = self._ttl_for(lines, 2) + + yield 'TXT', name, ttl, values + + def _records_for_three(self, zone, name, lines, arpa=False): + # 3fqdn:ip:ttl:timestamp:lo + # AAAA + + if arpa: + # no arpa + return [] + + if not zone.owns('AAAA', name): + # if name doesn't live under our zone there's nothing for us to do + return + + # collect our ip(s) + ips = [] + for line in lines: # TinyDNS files have the ipv6 address written in full, but with the # colons removed. This inserts a colon every 4th character to make # the address correct. - values.append(u":".join(textwrap.wrap(record[0], 4))) - try: - ttl = records[0][1] - except IndexError: - ttl = self.default_ttl - return {'ttl': ttl, 'type': _type, 'values': values} + ips.append(u':'.join(textwrap.wrap(line[1], 4))) + + ttl = self._ttl_for(lines, 2) + + yield 'AAAA', name, ttl, ips + + def _records_for_S(self, zone, name, lines, arpa=False): + # Sfqdn:ip:x:port:priority:weight:ttl:timestamp:lo + # SRV + + if arpa: + # no arpa + return [] + + if not zone.owns('SRV', name): + # if name doesn't live under our zone there's nothing for us to do + return + + ttl = self._ttl_for(lines, 6) - def _data_for_TXT(self, _type, records): values = [] + for line in lines: + target = line[2] + # if there's a . in the mx we hit a special case and use it as-is + if '.' not in target: + # otherwise we treat it as the MX hostnam and construct the rest + target = f'{target}.srv.{zone.name}' + elif target[-1] != '.': + target = f'{target}.' - for record in records: - new_value = ( - record[0] - .encode('latin1') - .decode('unicode-escape') - .replace(";", "\\;") + # if we have an IP then we need to create an A for the SRV + # has to be present, but can be empty + ip = line[1] + if ip and zone.owns('A', target): + yield 'A', target, ttl, [ip] + + # required + port = int(line[3]) + + # optional, default 0 + try: + priority = int(line[4] or 0) + except IndexError: + priority = 0 + + # optional, default 0 + try: + weight = int(line[5] or 0) + except IndexError: + weight = 0 + + values.append( + { + 'priority': priority, + 'weight': weight, + 'port': port, + 'target': target, + } ) - values.append(new_value) - try: - ttl = records[0][1] - except IndexError: - ttl = self.default_ttl - return {'ttl': ttl, 'type': _type, 'values': values} + yield 'SRV', name, ttl, values - def _data_for_CNAME(self, _type, records): - first = records[0] - try: - ttl = first[1] - except IndexError: - ttl = self.default_ttl - return {'ttl': ttl, 'type': _type, 'value': f'{first[0]}.'} + def _records_for_colon(self, zone, name, lines, arpa=False): + # :fqdn:n:rdata:ttl:timestamp:lo + # ANY - def _data_for_MX(self, _type, records): - try: - ttl = records[0][2] - except IndexError: - ttl = self.default_ttl - return { - 'ttl': ttl, - 'type': _type, - 'values': [ - {'preference': r[1], 'exchange': f'{r[0]}.'} for r in records - ], - } + if arpa: + # no arpa + return [] - def _data_for_NS(self, _type, records): - try: - ttl = records[0][1] - except IndexError: - ttl = self.default_ttl - return { - 'ttl': ttl, - 'type': _type, - 'values': [f'{r[0]}.' for r in records], - } + if not zone.owns('SRV', name): + # if name doesn't live under our zone there's nothing for us to do + return + + # group by lines by the record type + types = defaultdict(list) + for line in lines: + types[line[1].upper()].append(line) + + classes = Record.registered_types() + for _type, lines in types.items(): + _class = classes.get(_type, None) + if not _class: + self.log.info( + '_records_for_colon: unrecognized type %s, %s', _type, line + ) + continue + + ttl = self._ttl_for(lines, 3) + + rdatas = [l[2] for l in lines] + yield _type, name, ttl, _class.parse_rdata_texts(rdatas) + + def _records_for_six(self, zone, name, lines, arpa=False): + # 6fqdn:ip:ttl:timestamp:lo + # AAAA (arpa False) & PTR (arpa True) + if arpa: + yield from self._records_for_caret(zone, name, lines, arpa) + else: + yield from self._records_for_three(zone, name, lines, arpa) + + SYMBOL_MAP = { + '=': _records_for_equal, # A + '^': _records_for_caret, # PTR + '.': _records_for_dot, # NS + 'C': _records_for_C, # CNAME + '+': _records_for_plus, # A + '@': _records_for_at, # MX + '&': _records_for_amp, # NS + '\'': _records_for_quote, # TXT + '3': _records_for_three, # AAAA + 'S': _records_for_S, # SRV + ':': _records_for_colon, # arbitrary + '6': _records_for_six, # AAAA + } + + def _process_lines(self, zone, lines): + data = defaultdict(lambda: defaultdict(list)) + for line in lines: + symbol = line[0] + + # Skip type, remove trailing comments, and omit newline + line = line[1:].split('#', 1)[0] + # Split on :'s including :: and strip leading/trailing ws + line = [p.strip() for p in line.split(':')] + data[symbol][line[0]].append(line) + + return data + + def _process_symbols(self, zone, symbols, arpa): + types = defaultdict(lambda: defaultdict(list)) + ttls = defaultdict(dict) + for symbol, names in symbols.items(): + records_for = self.SYMBOL_MAP.get(symbol, None) + if not records_for: + # Something we don't care about + self.log.info( + 'skipping type %s, not supported/interested', symbol + ) + continue + + for name, lines in names.items(): + for _type, name, ttl, values in records_for( + self, zone, name, lines, arpa=arpa + ): + # remove the zone name + name = zone.hostname_from_fqdn(name) + types[_type][name].extend(values) + # first non-default wins, if we never see anything we'll + # just use the default below + if ttl != self.default_ttl: + ttls[_type][name] = ttl + + return types, ttls def populate(self, zone, target=False, lenient=False): self.log.debug( @@ -112,112 +414,42 @@ class TinyDnsBaseSource(BaseSource): before = len(zone.records) - if zone.name.endswith('in-addr.arpa.'): - self._populate_in_addr_arpa(zone, lenient) - else: - self._populate_normal(zone, lenient) + # This is complicate b/c the mapping between tinydns line types (called + # symbols here) is not one to one with (octoDNS) records. Some lines + # create multiple types of records and multiple lines are often combined + # to make a single record (with multiple values.) Sometimes both happen. + # To deal with this we'll do things in 3 stages: + + # first group lines by their symbol and name + symbols = self._process_lines(zone, self._lines()) + + # then work through those to group values by their _type and name + zone_name = zone.name + arpa = zone_name.endswith('in-addr.arpa.') or zone_name.endswith( + 'ip6.arpa.' + ) + types, ttls = self._process_symbols(zone, symbols, arpa) + + # now we finally have all the values for each (soon to be) record + # collected together, turn them into their coresponding record and add + # it to the zone + for _type, names in types.items(): + for name, values in names.items(): + data = { + 'ttl': ttls[_type].get(name, self.default_ttl), + 'type': _type, + } + if len(values) > 1: + data['values'] = _unique(values) + else: + data['value'] = values[0] + record = Record.new(zone, name, data, lenient=lenient) + zone.add_record(record, lenient=lenient) self.log.info( 'populate: found %s records', len(zone.records) - before ) - def _populate_normal(self, zone, lenient): - type_map = { - '=': 'A', - '^': None, - '.': 'NS', - 'C': 'CNAME', - '+': 'A', - '@': 'MX', - '\'': 'TXT', - '3': 'AAAA', - '6': 'AAAA', - } - name_re = re.compile(fr'((?P.+)\.)?{zone.name[:-1]}\.?$') - - data = defaultdict(lambda: defaultdict(list)) - for line in self._lines(): - _type = line[0] - if _type not in type_map: - # Something we don't care about - continue - _type = type_map[_type] - if not _type: - continue - - # Skip type, remove trailing comments, and omit newline - line = line[1:].split('#', 1)[0] - # Split on :'s including :: and strip leading/trailing ws - line = [p.strip() for p in self.split_re.split(line)] - match = name_re.match(line[0]) - if not match: - continue - name = zone.hostname_from_fqdn(line[0]) - data[name][_type].append(line[1:]) - - for name, types in data.items(): - for _type, d in types.items(): - data_for = getattr(self, f'_data_for_{_type}') - data = data_for(_type, d) - if data: - record = Record.new( - zone, name, data, source=self, lenient=lenient - ) - try: - zone.add_record(record, lenient=lenient) - except SubzoneRecordException: - self.log.debug( - '_populate_normal: skipping subzone record=%s', - record, - ) - - def _populate_in_addr_arpa(self, zone, lenient): - name_re = re.compile(fr'(?P.+)\.{zone.name[:-1]}\.?$') - - for line in self._lines(): - _type = line[0] - # We're only interested in = (A+PTR), and ^ (PTR) records - if _type not in ('=', '^'): - continue - - # Skip type, remove trailing comments, and omit newline - line = line[1:].split('#', 1)[0] - # Split on :'s including :: and strip leading/trailing ws - line = [p.strip() for p in self.split_re.split(line)] - - if line[0].endswith('in-addr.arpa'): - # since it's already in in-addr.arpa format - match = name_re.match(line[0]) - value = line[1] - else: - addr = ip_address(line[1]) - match = name_re.match(addr.reverse_pointer) - value = line[0] - - if match: - try: - ttl = line[2] - except IndexError: - ttl = self.default_ttl - - if value[-1] != '.': - value = f'{value}.' - - name = match.group('name') - record = Record.new( - zone, - name, - {'ttl': ttl, 'type': 'PTR', 'value': value}, - source=self, - lenient=lenient, - ) - try: - zone.add_record(record, lenient=lenient) - except DuplicateRecordException: - self.log.warning( - f'Duplicate PTR record for {addr}, skipping' - ) - class TinyDnsFileSource(TinyDnsBaseSource): ''' @@ -232,6 +464,11 @@ class TinyDnsFileSource(TinyDnsBaseSource): default_ttl: 3600 NOTE: timestamps & lo fields are ignored if present. + + The source intends to conform to and fully support the official spec, + https://cr.yp.to/djbdns/tinydns-data.html and the common patch/extensions to + support IPv6 and a few other record types, + https://docs.bytemark.co.uk/article/tinydns-format/. ''' def __init__(self, id, directory, default_ttl=3600): diff --git a/octodns/zone.py b/octodns/zone.py index c35e94c..9cbc712 100644 --- a/octodns/zone.py +++ b/octodns/zone.py @@ -75,6 +75,32 @@ class Zone(object): # it has utf8 chars return self._utf8_name_re.sub('', fqdn) + def owns(self, _type, fqdn): + if fqdn[-1] != '.': + fqdn = f'{fqdn}.' + + # if we exactly match the zone name we own it + if fqdn == self.name: + return True + + # if we don't end with the zone's name on a boundary we aren't owned + if not fqdn.endswith(f'.{self.name}'): + return False + + hostname = self.hostname_from_fqdn(fqdn) + if hostname in self.sub_zones: + # if our hostname matches a sub-zone exactly we have to be a NS + # record + return _type == 'NS' + + for sub_zone in self.sub_zones: + if hostname.endswith(f'.{sub_zone}'): + # this belongs under a sub-zone + return False + + # otherwise we own it + return True + def add_record(self, record, replace=False, lenient=False): if self._origin: self.hydrate() diff --git a/tests/test_octodns_record.py b/tests/test_octodns_record.py index 88a7dfc..becc802 100644 --- a/tests/test_octodns_record.py +++ b/tests/test_octodns_record.py @@ -8,6 +8,7 @@ from octodns.idna import idna_encode from octodns.record import ( AliasRecord, ARecord, + CnameRecord, Create, Delete, MxValue, @@ -176,6 +177,20 @@ class TestRecord(TestCase): # make sure there's nothing extra self.assertEqual(5, len(records)) + def test_parse_rdata_texts(self): + self.assertEqual(['2.3.4.5'], ARecord.parse_rdata_texts(['2.3.4.5'])) + self.assertEqual( + ['2.3.4.6', '3.4.5.7'], + ARecord.parse_rdata_texts(['2.3.4.6', '3.4.5.7']), + ) + self.assertEqual( + ['some.target.'], CnameRecord.parse_rdata_texts(['some.target.']) + ) + self.assertEqual( + ['some.target.', 'other.target.'], + CnameRecord.parse_rdata_texts(['some.target.', 'other.target.']), + ) + def test_values_mixin_data(self): # no values, no value or values in data a = ARecord(self.zone, '', {'type': 'A', 'ttl': 600, 'values': []}) diff --git a/tests/test_octodns_source_tinydns.py b/tests/test_octodns_source_tinydns.py index 40ce9f5..338c4d6 100644 --- a/tests/test_octodns_source_tinydns.py +++ b/tests/test_octodns_source_tinydns.py @@ -17,7 +17,7 @@ class TestTinyDnsFileSource(TestCase): def test_populate_normal(self): got = Zone('example.com.', []) self.source.populate(got) - self.assertEqual(17, len(got.records)) + self.assertEqual(30, len(got.records)) expected = Zone('example.com.', []) for name, data in ( @@ -26,8 +26,13 @@ class TestTinyDnsFileSource(TestCase): '', { 'type': 'NS', - 'ttl': 3600, - 'values': ['ns1.ns.com.', 'ns2.ns.com.'], + 'ttl': 31, + 'values': [ + 'a.ns.example.com.', + 'b.ns.example.com.', + 'ns1.ns.com.', + 'ns2.ns.com.', + ], }, ), ( @@ -43,6 +48,10 @@ class TestTinyDnsFileSource(TestCase): 'cname', {'type': 'CNAME', 'ttl': 3600, 'value': 'www.example.com.'}, ), + ( + 'cname2', + {'type': 'CNAME', 'ttl': 48, 'value': 'www2.example.com.'}, + ), ( 'some-host-abc123', {'type': 'A', 'ttl': 1800, 'value': '10.2.3.7'}, @@ -61,7 +70,7 @@ class TestTinyDnsFileSource(TestCase): 'exchange': 'smtp-1-host.example.com.', }, { - 'preference': 20, + 'preference': 0, 'exchange': 'smtp-2-host.example.com.', }, ], @@ -75,11 +84,11 @@ class TestTinyDnsFileSource(TestCase): 'values': [ { 'preference': 30, - 'exchange': 'smtp-1-host.example.com.', + 'exchange': 'smtp-3-host.mx.example.com.', }, { 'preference': 40, - 'exchange': 'smtp-2-host.example.com.', + 'exchange': 'smtp-4-host.mx.example.com.', }, ], }, @@ -111,6 +120,83 @@ class TestTinyDnsFileSource(TestCase): 'value': 'v=DKIM1\\; k=rsa\\; p=blah', }, ), + ('b.ns', {'type': 'A', 'ttl': 31, 'value': '43.44.45.46'}), + ('a.ns', {'type': 'A', 'ttl': 3600, 'value': '42.43.44.45'}), + ( + 'smtp-3-host.mx', + {'type': 'A', 'ttl': 1800, 'value': '21.22.23.24'}, + ), + ( + 'smtp-4-host.mx', + {'type': 'A', 'ttl': 1800, 'value': '22.23.24.25'}, + ), + ('ns5.ns', {'type': 'A', 'ttl': 30, 'value': '14.15.16.17'}), + ('ns6.ns', {'type': 'A', 'ttl': 30, 'value': '15.16.17.18'}), + ( + 'other', + { + 'type': 'NS', + 'ttl': 30, + 'values': ['ns5.ns.example.com.', 'ns6.ns.example.com.'], + }, + ), + ( + '_a._tcp', + { + 'type': 'SRV', + 'ttl': 43, + 'values': [ + { + 'priority': 0, + 'weight': 0, + 'port': 8888, + 'target': 'target.srv.example.com.', + }, + { + 'priority': 10, + 'weight': 50, + 'port': 8080, + 'target': 'target.somewhere.else.', + }, + ], + }, + ), + ('target.srv', {'type': 'A', 'ttl': 43, 'value': '56.57.58.59'}), + ( + '_b._tcp', + { + 'type': 'SRV', + 'ttl': 3600, + 'values': [ + { + 'priority': 0, + 'weight': 0, + 'port': 9999, + 'target': 'target.srv.example.com.', + } + ], + }, + ), + ( + 'arbitrary-sshfp', + { + 'type': 'SSHFP', + 'ttl': 45, + 'values': [ + { + 'algorithm': 1, + 'fingerprint_type': 2, + 'fingerprint': '00479b27', + }, + { + 'algorithm': 2, + 'fingerprint_type': 2, + 'fingerprint': '00479a28', + }, + ], + }, + ), + ('arbitrary-a', {'type': 'A', 'ttl': 3600, 'value': '80.81.82.83'}), ): record = Record.new(expected, name, data) expected.add_record(record) @@ -162,7 +248,10 @@ class TestTinyDnsFileSource(TestCase): { 'type': 'PTR', 'ttl': 3600, - 'value': 'has-dup-def123.example.com.', + 'values': [ + 'has-dup-def123.example.com.', + 'has-dup-def456.example.com.', + ], }, ), ( @@ -183,4 +272,5 @@ class TestTinyDnsFileSource(TestCase): def test_ignores_subs(self): got = Zone('example.com.', ['sub']) self.source.populate(got) - self.assertEqual(16, len(got.records)) + # we don't see one www.sub.example.com. record b/c it's in a sub + self.assertEqual(29, len(got.records)) diff --git a/tests/test_octodns_zone.py b/tests/test_octodns_zone.py index 0ee01b0..93d8a87 100644 --- a/tests/test_octodns_zone.py +++ b/tests/test_octodns_zone.py @@ -191,6 +191,31 @@ class TestZone(TestCase): Zone('space not allowed.', []) self.assertTrue('whitespace not allowed' in str(ctx.exception)) + def test_owns(self): + zone = Zone('unit.tests.', set(['sub'])) + + self.assertTrue(zone.owns('A', 'unit.tests')) + self.assertTrue(zone.owns('A', 'unit.tests.')) + self.assertTrue(zone.owns('A', 'www.unit.tests.')) + self.assertTrue(zone.owns('A', 'www.unit.tests.')) + # we do own our direct sub's delegation NS records + self.assertTrue(zone.owns('NS', 'sub.unit.tests.')) + + # we don't own the root of our sub + self.assertFalse(zone.owns('A', 'sub.unit.tests.')) + + # of anything under it + self.assertFalse(zone.owns('A', 'www.sub.unit.tests.')) + + # including subsequent delegatoin NS records + self.assertFalse(zone.owns('NS', 'below.sub.unit.tests.')) + + # edge cases + # we don't own something that ends with our name, but isn't a boundary + self.assertFalse(zone.owns('A', 'foo-unit.tests.')) + # we do something that ends with the sub-zone, but isn't at a boundary + self.assertTrue(zone.owns('A', 'foo-sub.unit.tests.')) + def test_sub_zones(self): # NS for exactly the sub is allowed zone = Zone('unit.tests.', set(['sub', 'barred'])) diff --git a/tests/zones/tinydns/example.com b/tests/zones/tinydns/example.com index a1f983f..3369a28 100755 --- a/tests/zones/tinydns/example.com +++ b/tests/zones/tinydns/example.com @@ -5,6 +5,8 @@ # Multi-value A +example.com:10.2.3.4:30 +example.com.:10.2.3.5:30 +# duplicate value should be ignored ++example.com:10.2.3.4 Ccname.other.foo:www.other.foo @@ -26,21 +28,27 @@ Ccname.other.foo:www.other.foo # MX @example.com::smtp-1-host.example.com:10 -@example.com.::smtp-2-host.example.com:20 -# MX with ttl -@smtp.example.com::smtp-1-host.example.com:30:1800 -@smtp.example.com.::smtp-2-host.example.com:40:1800 +@example.com.::smtp-2-host.example.com. +# MX with ttl and ip +@smtp.example.com:21.22.23.24:smtp-3-host:30:1800 +@smtp.example.com.:22.23.24.25:smtp-4-host:40:1800 -# NS +# NS for sub .sub.example.com::ns3.ns.com:30 -.sub.example.com.::ns4.ns.com:30 +.sub.example.com.::ns4.ns.com.:30 +# NS with ip +.other.example.com:14.15.16.17:ns5:30 +.other.example.com.:15.16.17.18:ns6:30 # A, under sub -+www.sub.example.com::1.2.3.4 ++www.sub.example.com:1.2.3.4 # Top-level NS .example.com::ns1.ns.com .example.com.::ns2.ns.com +# Top-level NS with automatic A +&example.com:42.43.44.45:a +&example.com.:43.44.45.46:b:31 # sub special cases +a1.blah-asdf.subtest.com:10.2.3.5 @@ -55,3 +63,20 @@ Ccname.other.foo:www.other.foo 6ipv6-6.example.com:2a021348017cd5d0002419fffef35743 'semicolon.example.com:v=DKIM1; k=rsa; p=blah:300 + +# SRV +S_a._tcp.example.com:56.57.58.59:target:8888 +S_a._tcp.example.com::target.somewhere.else:8080:10:50:43 +# will try and re-create an already existing A with the same IP, should be a +# noop +S_b._tcp.example.com:56.57.58.59:target.srv.example.com.:9999 +# complete duplicate should be ignored +S_b._tcp.example.com:56.57.58.59:target.srv.example.com.:9999 + +# arbitrary multi-value non-spec record +:arbitrary-sshfp.example.com:SSHFP:2 2 00479a28 +:arbitrary-sshfp.example.com:SSHFP:1 2 00479b27:45 +# does not make sense to do an A this way, but it'll work +:arbitrary-a.example.com:a:80.81.82.83 +# this should just be inored b/c the type is unknown +:arbitrary-invalid.example.com:invalid:does not matter:99 diff --git a/tests/zones/tinydns/other.foo b/tests/zones/tinydns/other.foo index 82e010d..a96dee2 100644 --- a/tests/zones/tinydns/other.foo +++ b/tests/zones/tinydns/other.foo @@ -3,5 +3,6 @@ # CNAME with trailing comment Ccname.example.com:www.example.com # this is a comment +Ccname2.example.com:www2.example.com.:48 +www.other.foo:14.2.3.6