1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

WIP: full tinydns spec-compliant source implementation

This commit is contained in:
Ross McFarland
2023-06-29 15:40:00 -07:00
parent 1032abf558
commit 1bad1e668b
3 changed files with 287 additions and 38 deletions

View File

@@ -9,19 +9,26 @@ from collections import defaultdict
from ipaddress import ip_address
from os import listdir
from os.path import join
from pprint import pprint
from ..record import Record
from ..zone import DuplicateRecordException, SubzoneRecordException
from .base import BaseSource
def _decode_octal(s):
return re.sub(r'\\(\d\d\d)', lambda m: chr(int(m.group(1), 8)), s).replace(
';', '\\;'
)
class TinyDnsBaseSource(BaseSource):
# spec https://cr.yp.to/djbdns/tinydns-data.html
# ipv6 addon spec https://docs.bytemark.co.uk/article/tinydns-format/
SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False
SUPPORTS = set(('A', 'CNAME', 'MX', 'NS', 'TXT', 'AAAA'))
split_re = re.compile(r':+')
def __init__(self, id, default_ttl=3600):
super().__init__(id)
self.default_ttl = default_ttl
@@ -121,48 +128,282 @@ class TinyDnsBaseSource(BaseSource):
'populate: found %s records', len(zone.records) - before
)
def _records_for_at(self, zone, name, lines, in_addr=False, lenient=False):
# @fqdn:ip:x:dist:ttl:timestamp:lo
# MX (and optional A)
if in_addr:
return []
# see if we can find a ttl on any of the lines, first one wins
ttl = self.default_ttl
for line in lines:
try:
ttl = int(lines[0][4])
break
except IndexError:
pass
values = []
for line in lines:
mx = line[2]
# if there's a . in the mx we hit a special case and use it as-is
if '.' not in mx:
# otherwise we treat it as the MX hostnam and construct the rest
mx = f'{mx}.ns.{zone.name}'
elif mx[-1] != '.':
mx = f'{mx}.'
# default distance is 0
try:
dist = line[3] or 0
except IndexError:
dist = 0
# if we have an IP then we need to create an A for the MX
ip = line[1]
if ip:
mx_name = zone.hostname_from_fqdn(mx)
yield Record.new(
zone, mx_name, {'type': 'A', 'ttl': ttl, 'value': ip}
)
values.append({'preference': dist, 'exchange': mx})
yield Record.new(
zone, name, {'ttl': ttl, 'type': 'MX', 'values': values}
)
def _records_for_C(self, zone, name, lines, in_addr=False, lenient=False):
# Cfqdn:p:ttl:timestamp:lo
# CNAME
if in_addr:
return []
value = lines[0][1]
if value[-1] != '.':
value = f'{value}.'
# see if we can find a ttl on any of the lines, first one wins
ttl = self.default_ttl
for line in lines:
try:
ttl = int(lines[0][2])
break
except IndexError:
pass
return [
Record.new(
zone, name, {'ttl': ttl, 'type': 'CNAME', 'value': value}
)
]
def _records_for_caret(
self, zone, name, lines, in_addr=False, lenient=False
):
# .fqdn:ip:x:ttl:timestamp:lo
# NS (and optional A)
if not in_addr:
return []
raise NotImplementedError()
def _records_for_equal(
self, zone, name, lines, in_addr=False, lenient=False
):
# =fqdn:ip:ttl:timestamp:lo
# A (in_addr False) & PTR (in_addr True)
return self._records_for_plus(
zone, name, lines, in_addr, lenient
) + self._records_for_caret(zone, name, lines, in_addr, lenient)
def _records_for_dot(self, zone, name, lines, in_addr=False, lenient=False):
# .fqdn:ip:x:ttl:timestamp:lo
# NS (and optional A)
if not in_addr:
return []
# see if we can find a ttl on any of the lines, first one wins
ttl = self.default_ttl
for line in lines:
try:
ttl = int(lines[0][3])
break
except IndexError:
pass
values = []
for line in lines:
ns = line[2]
# if there's a . in the ns we hit a special case and use it as-is
if '.' not in ns:
# otherwise we treat it as the NS hostnam and construct the rest
ns = f'{ns}.ns.{zone.name}'
elif ns[-1] != '.':
ns = f'{ns}.'
# if we have an IP then we need to create an A for the MX
ip = line[1]
if ip:
ns_name = zone.hostname_from_fqdn(ns)
yield Record.new(
zone, ns_name, {'type': 'A', 'ttl': ttl, 'value': ip}
)
values.append(ns)
yield Record.new(
zone, name, {'ttl': ttl, 'type': 'NS', 'values': values}
)
_records_for_amp = _records_for_dot
def _records_for_plus(
self, zone, name, lines, in_addr=False, lenient=False
):
# +fqdn:ip:ttl:timestamp:lo
# A
if in_addr:
return []
# collect our ip(s)
ips = [l[1] for l in lines if l[1] != '0.0.0.0']
if not ips:
# we didn't find any value ips so nothing to do
return []
# see if we can find a ttl on any of the lines, first one wins
ttl = self.default_ttl
for line in lines:
try:
ttl = int(lines[0][2])
break
except IndexError:
pass
return [
Record.new(zone, name, {'ttl': ttl, 'type': 'A', 'values': ips})
]
def _records_for_quote(
self, zone, name, lines, in_addr=False, lenient=False
):
# 'fqdn:s:ttl:timestamp:lo
# TXT
if in_addr:
return []
# collect our ip(s)
values = [_decode_octal(l[1]) for l in lines]
# see if we can find a ttl on any of the lines, first one wins
ttl = self.default_ttl
for line in lines:
try:
ttl = int(lines[0][2])
break
except IndexError:
pass
return [
Record.new(
zone, name, {'ttl': ttl, 'type': 'TXT', 'values': values}
)
]
def _records_for_three(
self, zone, name, lines, in_addr=False, lenient=False
):
# 3fqdn:ip:ttl:timestamp:lo
# AAAA
if in_addr:
return []
# collect our ip(s)
ips = []
for line in lines:
# TinyDNS files have the ipv6 address written in full, but with the
# colons removed. This inserts a colon every 4th character to make
# the address correct.
ips.append(u':'.join(textwrap.wrap(line[1], 4)))
# see if we can find a ttl on any of the lines, first one wins
ttl = self.default_ttl
for line in lines:
try:
ttl = int(lines[0][2])
break
except IndexError:
pass
return [
Record.new(zone, name, {'ttl': ttl, 'type': 'AAAA', 'values': ips})
]
def _records_for_six(self, zone, name, lines, in_addr=False, lenient=False):
# 6fqdn:ip:ttl:timestamp:lo
# AAAA (in_addr False) & PTR (in_addr True)
return self._records_for_three(
zone, name, lines, in_addr, lenient
) + self._records_for_caret(zone, name, lines, in_addr, lenient)
TYPE_MAP = {
'=': _records_for_equal, # A
'^': _records_for_caret, # PTR
'.': _records_for_dot, # NS
'C': _records_for_C, # CNAME
'+': _records_for_plus, # A
'@': _records_for_at, # MX
'&': _records_for_amp, # NS
'\'': _records_for_quote, # TXT
'3': _records_for_three, # AAAA
'6': _records_for_six, # AAAA
# TODO:
#'S': _records_for_S, # SRV
# Sfqdn:ip:x:port:priority:weight:ttl:timestamp:lo
#':': _record_for_semicolon # arbitrary
# :fqdn:n:rdata:ttl:timestamp:lo
}
def _populate_normal(self, zone, lenient):
type_map = {
'=': 'A',
'^': None,
'.': 'NS',
'C': 'CNAME',
'+': 'A',
'@': 'MX',
'\'': 'TXT',
'3': 'AAAA',
'6': 'AAAA',
}
name_re = re.compile(fr'((?P<name>.+)\.)?{zone.name[:-1]}\.?$')
data = defaultdict(lambda: defaultdict(list))
for line in self._lines():
_type = line[0]
if _type not in type_map:
# Something we don't care about
continue
_type = type_map[_type]
if not _type:
continue
# Skip type, remove trailing comments, and omit newline
line = line[1:].split('#', 1)[0]
# Split on :'s including :: and strip leading/trailing ws
line = [p.strip() for p in self.split_re.split(line)]
match = name_re.match(line[0])
if not match:
line = [p.strip() for p in line.split(':')]
# make sure the name portion matches the zone we're currently
# working on
name = line[0]
if not name_re.match(name):
self.log.info('skipping name %s, not a match, %s: ', name, line)
continue
name = zone.hostname_from_fqdn(line[0])
data[name][_type].append(line[1:])
# remove the zone name
name = zone.hostname_from_fqdn(name)
data[_type][name].append(line)
for name, types in data.items():
for _type, d in types.items():
data_for = getattr(self, f'_data_for_{_type}')
data = data_for(_type, d)
if data:
record = Record.new(
zone, name, data, source=self, lenient=lenient
)
pprint(data)
for _type, names in data.items():
records_for = self.TYPE_MAP.get(_type, None)
if _type not in self.TYPE_MAP:
# Something we don't care about
self.log.info(
'skipping type %s, not supported/interested', _type
)
continue
print(_type)
for name, lines in names.items():
for record in records_for(
self, zone, name, lines, lenient=lenient
):
pprint({'record': record})
try:
zone.add_record(record, lenient=lenient)
except SubzoneRecordException:
@@ -177,7 +418,7 @@ class TinyDnsBaseSource(BaseSource):
for line in self._lines():
_type = line[0]
# We're only interested in = (A+PTR), and ^ (PTR) records
if _type not in ('=', '^'):
if _type not in ('=', '^', '&'):
continue
# Skip type, remove trailing comments, and omit newline