mirror of
				https://github.com/github/octodns.git
				synced 2024-05-11 05:55:00 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			268 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			268 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#
 | 
						|
#
 | 
						|
#
 | 
						|
 | 
						|
from __future__ import (
 | 
						|
    absolute_import,
 | 
						|
    division,
 | 
						|
    print_function,
 | 
						|
    unicode_literals,
 | 
						|
)
 | 
						|
 | 
						|
from collections import defaultdict
 | 
						|
from ipaddress import ip_address
 | 
						|
from os import listdir
 | 
						|
from os.path import join
 | 
						|
import logging
 | 
						|
import re
 | 
						|
import textwrap
 | 
						|
 | 
						|
from ..record import Record
 | 
						|
from ..zone import DuplicateRecordException, SubzoneRecordException
 | 
						|
from .base import BaseSource
 | 
						|
 | 
						|
 | 
						|
class TinyDnsBaseSource(BaseSource):
 | 
						|
    SUPPORTS_GEO = False
 | 
						|
    SUPPORTS_DYNAMIC = False
 | 
						|
    SUPPORTS = set(('A', 'CNAME', 'MX', 'NS', 'TXT', 'AAAA'))
 | 
						|
 | 
						|
    split_re = re.compile(r':+')
 | 
						|
 | 
						|
    def __init__(self, id, default_ttl=3600):
 | 
						|
        super(TinyDnsBaseSource, self).__init__(id)
 | 
						|
        self.default_ttl = default_ttl
 | 
						|
 | 
						|
    def _data_for_A(self, _type, records):
 | 
						|
        values = []
 | 
						|
        for record in records:
 | 
						|
            if record[0] != '0.0.0.0':
 | 
						|
                values.append(record[0])
 | 
						|
        if len(values) == 0:
 | 
						|
            return
 | 
						|
        try:
 | 
						|
            ttl = records[0][1]
 | 
						|
        except IndexError:
 | 
						|
            ttl = self.default_ttl
 | 
						|
        return {'ttl': ttl, 'type': _type, 'values': values}
 | 
						|
 | 
						|
    def _data_for_AAAA(self, _type, records):
 | 
						|
        values = []
 | 
						|
        for record in records:
 | 
						|
            # TinyDNS files have the ipv6 address written in full, but with the
 | 
						|
            # colons removed. This inserts a colon every 4th character to make
 | 
						|
            # the address correct.
 | 
						|
            values.append(u":".join(textwrap.wrap(record[0], 4)))
 | 
						|
        try:
 | 
						|
            ttl = records[0][1]
 | 
						|
        except IndexError:
 | 
						|
            ttl = self.default_ttl
 | 
						|
        return {'ttl': ttl, 'type': _type, 'values': values}
 | 
						|
 | 
						|
    def _data_for_TXT(self, _type, records):
 | 
						|
        values = []
 | 
						|
 | 
						|
        for record in records:
 | 
						|
            new_value = (
 | 
						|
                record[0]
 | 
						|
                .encode('latin1')
 | 
						|
                .decode('unicode-escape')
 | 
						|
                .replace(";", "\\;")
 | 
						|
            )
 | 
						|
            values.append(new_value)
 | 
						|
 | 
						|
        try:
 | 
						|
            ttl = records[0][1]
 | 
						|
        except IndexError:
 | 
						|
            ttl = self.default_ttl
 | 
						|
        return {'ttl': ttl, 'type': _type, 'values': values}
 | 
						|
 | 
						|
    def _data_for_CNAME(self, _type, records):
 | 
						|
        first = records[0]
 | 
						|
        try:
 | 
						|
            ttl = first[1]
 | 
						|
        except IndexError:
 | 
						|
            ttl = self.default_ttl
 | 
						|
        return {'ttl': ttl, 'type': _type, 'value': f'{first[0]}.'}
 | 
						|
 | 
						|
    def _data_for_MX(self, _type, records):
 | 
						|
        try:
 | 
						|
            ttl = records[0][2]
 | 
						|
        except IndexError:
 | 
						|
            ttl = self.default_ttl
 | 
						|
        return {
 | 
						|
            'ttl': ttl,
 | 
						|
            'type': _type,
 | 
						|
            'values': [
 | 
						|
                {'preference': r[1], 'exchange': f'{r[0]}.'} for r in records
 | 
						|
            ],
 | 
						|
        }
 | 
						|
 | 
						|
    def _data_for_NS(self, _type, records):
 | 
						|
        try:
 | 
						|
            ttl = records[0][1]
 | 
						|
        except IndexError:
 | 
						|
            ttl = self.default_ttl
 | 
						|
        return {
 | 
						|
            'ttl': ttl,
 | 
						|
            'type': _type,
 | 
						|
            'values': [f'{r[0]}.' for r in records],
 | 
						|
        }
 | 
						|
 | 
						|
    def populate(self, zone, target=False, lenient=False):
 | 
						|
        self.log.debug(
 | 
						|
            'populate: name=%s, target=%s, lenient=%s',
 | 
						|
            zone.name,
 | 
						|
            target,
 | 
						|
            lenient,
 | 
						|
        )
 | 
						|
 | 
						|
        before = len(zone.records)
 | 
						|
 | 
						|
        if zone.name.endswith('in-addr.arpa.'):
 | 
						|
            self._populate_in_addr_arpa(zone, lenient)
 | 
						|
        else:
 | 
						|
            self._populate_normal(zone, lenient)
 | 
						|
 | 
						|
        self.log.info(
 | 
						|
            'populate:   found %s records', len(zone.records) - before
 | 
						|
        )
 | 
						|
 | 
						|
    def _populate_normal(self, zone, lenient):
 | 
						|
        type_map = {
 | 
						|
            '=': 'A',
 | 
						|
            '^': None,
 | 
						|
            '.': 'NS',
 | 
						|
            'C': 'CNAME',
 | 
						|
            '+': 'A',
 | 
						|
            '@': 'MX',
 | 
						|
            '\'': 'TXT',
 | 
						|
            '3': 'AAAA',
 | 
						|
            '6': 'AAAA',
 | 
						|
        }
 | 
						|
        name_re = re.compile(fr'((?P<name>.+)\.)?{zone.name[:-1]}$')
 | 
						|
 | 
						|
        data = defaultdict(lambda: defaultdict(list))
 | 
						|
        for line in self._lines():
 | 
						|
            _type = line[0]
 | 
						|
            if _type not in type_map:
 | 
						|
                # Something we don't care about
 | 
						|
                continue
 | 
						|
            _type = type_map[_type]
 | 
						|
            if not _type:
 | 
						|
                continue
 | 
						|
 | 
						|
            # Skip type, remove trailing comments, and omit newline
 | 
						|
            line = line[1:].split('#', 1)[0]
 | 
						|
            # Split on :'s including :: and strip leading/trailing ws
 | 
						|
            line = [p.strip() for p in self.split_re.split(line)]
 | 
						|
            match = name_re.match(line[0])
 | 
						|
            if not match:
 | 
						|
                continue
 | 
						|
            name = zone.hostname_from_fqdn(line[0])
 | 
						|
            data[name][_type].append(line[1:])
 | 
						|
 | 
						|
        for name, types in data.items():
 | 
						|
            for _type, d in types.items():
 | 
						|
                data_for = getattr(self, f'_data_for_{_type}')
 | 
						|
                data = data_for(_type, d)
 | 
						|
                if data:
 | 
						|
                    record = Record.new(
 | 
						|
                        zone, name, data, source=self, lenient=lenient
 | 
						|
                    )
 | 
						|
                    try:
 | 
						|
                        zone.add_record(record, lenient=lenient)
 | 
						|
                    except SubzoneRecordException:
 | 
						|
                        self.log.debug(
 | 
						|
                            '_populate_normal: skipping subzone record=%s',
 | 
						|
                            record,
 | 
						|
                        )
 | 
						|
 | 
						|
    def _populate_in_addr_arpa(self, zone, lenient):
 | 
						|
        name_re = re.compile(fr'(?P<name>.+)\.{zone.name[:-1]}$')
 | 
						|
 | 
						|
        for line in self._lines():
 | 
						|
            _type = line[0]
 | 
						|
            # We're only interested in = (A+PTR), and ^ (PTR) records
 | 
						|
            if _type not in ('=', '^'):
 | 
						|
                continue
 | 
						|
 | 
						|
            # Skip type, remove trailing comments, and omit newline
 | 
						|
            line = line[1:].split('#', 1)[0]
 | 
						|
            # Split on :'s including :: and strip leading/trailing ws
 | 
						|
            line = [p.strip() for p in self.split_re.split(line)]
 | 
						|
 | 
						|
            if line[0].endswith('in-addr.arpa'):
 | 
						|
                # since it's already in in-addr.arpa format
 | 
						|
                match = name_re.match(line[0])
 | 
						|
                value = f'{line[1]}.'
 | 
						|
            else:
 | 
						|
                addr = ip_address(line[1])
 | 
						|
                match = name_re.match(addr.reverse_pointer)
 | 
						|
                value = f'{line[0]}.'
 | 
						|
 | 
						|
            if match:
 | 
						|
                try:
 | 
						|
                    ttl = line[2]
 | 
						|
                except IndexError:
 | 
						|
                    ttl = self.default_ttl
 | 
						|
 | 
						|
                name = match.group('name')
 | 
						|
                record = Record.new(
 | 
						|
                    zone,
 | 
						|
                    name,
 | 
						|
                    {'ttl': ttl, 'type': 'PTR', 'value': value},
 | 
						|
                    source=self,
 | 
						|
                    lenient=lenient,
 | 
						|
                )
 | 
						|
                try:
 | 
						|
                    zone.add_record(record, lenient=lenient)
 | 
						|
                except DuplicateRecordException:
 | 
						|
                    self.log.warning(
 | 
						|
                        f'Duplicate PTR record for {addr}, skipping'
 | 
						|
                    )
 | 
						|
 | 
						|
 | 
						|
class TinyDnsFileSource(TinyDnsBaseSource):
 | 
						|
    '''
 | 
						|
    A basic TinyDNS zonefile importer created to import legacy data.
 | 
						|
 | 
						|
    tinydns:
 | 
						|
        class: octodns.source.tinydns.TinyDnsFileSource
 | 
						|
        # The location of the TinyDNS zone files
 | 
						|
        directory: ./zones
 | 
						|
        # The ttl to use for records when not specified in the data
 | 
						|
        # (optional, default 3600)
 | 
						|
        default_ttl: 3600
 | 
						|
 | 
						|
    NOTE: timestamps & lo fields are ignored if present.
 | 
						|
    '''
 | 
						|
 | 
						|
    def __init__(self, id, directory, default_ttl=3600):
 | 
						|
        self.log = logging.getLogger(f'TinyDnsFileSource[{id}]')
 | 
						|
        self.log.debug(
 | 
						|
            '__init__: id=%s, directory=%s, default_ttl=%d',
 | 
						|
            id,
 | 
						|
            directory,
 | 
						|
            default_ttl,
 | 
						|
        )
 | 
						|
        super(TinyDnsFileSource, self).__init__(id, default_ttl)
 | 
						|
        self.directory = directory
 | 
						|
        self._cache = None
 | 
						|
 | 
						|
    def _lines(self):
 | 
						|
        if self._cache is None:
 | 
						|
            # We unfortunately don't know where to look since tinydns stuff can
 | 
						|
            # be defined anywhere so we'll just read all files
 | 
						|
            lines = []
 | 
						|
            for filename in listdir(self.directory):
 | 
						|
                if filename[0] == '.':
 | 
						|
                    # Ignore hidden files
 | 
						|
                    continue
 | 
						|
                with open(join(self.directory, filename), 'r') as fh:
 | 
						|
                    lines += [l for l in fh.read().split('\n') if l]
 | 
						|
 | 
						|
            self._cache = lines
 | 
						|
 | 
						|
        return self._cache
 |