diff --git a/CHANGELOG.md b/CHANGELOG.md index 8db2550..a2b8052 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ * [GcoreProvider](https://github.com/octodns/octodns-gcore/) * [GoogleCloudProvider](https://github.com/octodns/octodns-googlecloud/) * [HetznerProvider](https://github.com/octodns/octodns-hetzner/) + * [MythicBeastsProvider](https://github.com/octodns/octodns-mythicbeasts/) * [Ns1Provider](https://github.com/octodns/octodns-ns1/) * [PowerDnsProvider](https://github.com/octodns/octodns-powerdns/) * [Route53Provider](https://github.com/octodns/octodns-route53/) also diff --git a/README.md b/README.md index d8b3060..476d705 100644 --- a/README.md +++ b/README.md @@ -207,7 +207,7 @@ The table below lists the providers octoDNS supports. We're currently in the pro | [GCoreProvider](https://github.com/octodns/octodns-gcore/) | [octodns_gcore](https://github.com/octodns/octodns-gcore/) | | | | | | [GoogleCloudProvider](https://github.com/octodns/octodns-googlecloud/) | [octodns_googlecloud](https://github.com/octodns/octodns-googlecloud/) | | | | | | [HetznerProvider](https://github.com/octodns/octodns-hetzner/) | [octodns_hetzner](https://github.com/octodns/octodns-hetzner/) | | | | | -| [MythicBeastsProvider](/octodns/provider/mythicbeasts.py) | | Mythic Beasts | A, AAAA, ALIAS, CNAME, MX, NS, SRV, SSHFP, CAA, TXT | No | | +| [MythicBeastsProvider](https://github.com/octodns/octodns-mythicbeasts/) | [octodns_mythicbeasts](https://github.com/octodns/octodns-mythicbeasts/) | | | | | | [Ns1Provider](https://github.com/octodns/octodns-ns1/) | [octodns_ns1](https://github.com/octodns/octodns-ns1/) | | | | | | [OVH](/octodns/provider/ovh.py) | | ovh | A, AAAA, CAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, SSHFP, TXT, DKIM | No | | | [PowerDnsProvider](https://github.com/octodns/octodns-powerdns/) | [octodns_powerdns](https://github.com/octodns/octodns-powerdns/) | | | | | diff --git a/octodns/provider/mythicbeasts.py b/octodns/provider/mythicbeasts.py index 7cc820e..09e73a4 100644 --- a/octodns/provider/mythicbeasts.py +++ b/octodns/provider/mythicbeasts.py @@ -5,465 +5,18 @@ from __future__ import absolute_import, division, print_function, \ unicode_literals -import re - -from requests import Session from logging import getLogger -from ..record import Record -from . import ProviderException -from .base import BaseProvider - -from collections import defaultdict - - -def add_trailing_dot(value): - ''' - Add trailing dots to values - ''' - assert value, 'Missing value' - assert value[-1] != '.', 'Value already has trailing dot' - return value + '.' - - -def remove_trailing_dot(value): - ''' - Remove trailing dots from values - ''' - assert value, 'Missing value' - assert value[-1] == '.', 'Value already missing trailing dot' - return value[:-1] - - -class MythicBeastsUnauthorizedException(ProviderException): - def __init__(self, zone, *args): - self.zone = zone - self.message = f'Mythic Beasts unauthorized for zone: {self.zone}' - - super(MythicBeastsUnauthorizedException, self).__init__( - self.message, self.zone, *args) - - -class MythicBeastsRecordException(ProviderException): - def __init__(self, zone, command, *args): - self.zone = zone - self.command = command - self.message = 'Mythic Beasts could not action command: ' \ - f'{self.zone} {self.command}' - - super(MythicBeastsRecordException, self).__init__( - self.message, self.zone, self.command, *args) - - -class MythicBeastsProvider(BaseProvider): - ''' - Mythic Beasts DNS API Provider - - Config settings: - - --- - providers: - config: - ... - mythicbeasts: - class: octodns.provider.mythicbeasts.MythicBeastsProvider - passwords: - my.domain.: 'DNS API v1 password' - - zones: - my.domain.: - targets: - - mythicbeasts - ''' - - RE_MX = re.compile(r'^(?P[0-9]+)\s+(?P\S+)$', - re.IGNORECASE) - - RE_SRV = re.compile(r'^(?P[0-9]+)\s+(?P[0-9]+)\s+' - r'(?P[0-9]+)\s+(?P\S+)$', - re.IGNORECASE) - - RE_SSHFP = re.compile(r'^(?P[0-9]+)\s+' - r'(?P[0-9]+)\s+' - r'(?P\S+)$', - re.IGNORECASE) - - RE_CAA = re.compile(r'^(?P[0-9]+)\s+' - r'(?Pissue|issuewild|iodef)\s+' - r'(?P\S+)$', - re.IGNORECASE) - - RE_POPLINE = re.compile(r'^(?P\S+)\s+(?P\d+)\s+' - r'(?P\S+)\s+(?P.*)$', - re.IGNORECASE) - - SUPPORTS_GEO = False - SUPPORTS_DYNAMIC = False - SUPPORTS = set(('A', 'AAAA', 'ALIAS', 'CNAME', 'MX', 'NS', - 'SRV', 'SSHFP', 'CAA', 'TXT')) - BASE = 'https://dnsapi.mythic-beasts.com/' - - def __init__(self, identifier, passwords, *args, **kwargs): - self.log = getLogger(f'MythicBeastsProvider[{identifier}]') - - assert isinstance(passwords, dict), 'Passwords must be a dictionary' - - self.log.debug( - '__init__: id=%s, registered zones; %s', - identifier, - passwords.keys()) - super(MythicBeastsProvider, self).__init__(identifier, *args, **kwargs) - - self._passwords = passwords - sess = Session() - self._sess = sess - - def _request(self, method, path, data=None): - self.log.debug('_request: method=%s, path=%s data=%s', - method, path, data) - - resp = self._sess.request(method, path, data=data) - self.log.debug( - '_request: status=%d data=%s', - resp.status_code, - resp.text[:20]) - - if resp.status_code == 401: - raise MythicBeastsUnauthorizedException(data['domain']) - - if resp.status_code == 400: - raise MythicBeastsRecordException( - data['domain'], - data['command'] - ) - resp.raise_for_status() - return resp - - def _post(self, data=None): - return self._request('POST', self.BASE, data=data) - - def records(self, zone): - domain = remove_trailing_dot(zone) - assert zone in self._passwords, \ - f'Missing password for domain: {domain}' - - return self._post({ - 'domain': domain, - 'password': self._passwords[zone], - 'showall': 0, - 'command': 'LIST', - }) - - @staticmethod - def _data_for_single(_type, data): - return { - 'type': _type, - 'value': data['raw_values'][0]['value'], - 'ttl': data['raw_values'][0]['ttl'] - } - - @staticmethod - def _data_for_multiple(_type, data): - return { - 'type': _type, - 'values': - [raw_values['value'] for raw_values in data['raw_values']], - 'ttl': - max([raw_values['ttl'] for raw_values in data['raw_values']]), - } - - @staticmethod - def _data_for_TXT(_type, data): - return { - 'type': _type, - 'values': - [ - str(raw_values['value']).replace(';', '\\;') - for raw_values in data['raw_values'] - ], - 'ttl': - max([raw_values['ttl'] for raw_values in data['raw_values']]), - } - - @staticmethod - def _data_for_MX(_type, data): - ttl = max([raw_values['ttl'] for raw_values in data['raw_values']]) - values = [] - - for raw_value in \ - [raw_values['value'] for raw_values in data['raw_values']]: - match = MythicBeastsProvider.RE_MX.match(raw_value) - - assert match is not None, 'Unable to parse MX data' - - exchange = match.group('exchange') - - if not exchange.endswith('.'): - exchange = f'{exchange}.{data["zone"]}' - - values.append({ - 'preference': match.group('preference'), - 'exchange': exchange, - }) - - return { - 'type': _type, - 'values': values, - 'ttl': ttl, - } - - @staticmethod - def _data_for_CNAME(_type, data): - ttl = data['raw_values'][0]['ttl'] - value = data['raw_values'][0]['value'] - if not value.endswith('.'): - value = f'{value}.{data["zone"]}' - - return MythicBeastsProvider._data_for_single( - _type, - {'raw_values': [ - {'value': value, 'ttl': ttl} - ]}) - - @staticmethod - def _data_for_ANAME(_type, data): - ttl = data['raw_values'][0]['ttl'] - value = data['raw_values'][0]['value'] - return MythicBeastsProvider._data_for_single( - 'ALIAS', - {'raw_values': [ - {'value': value, 'ttl': ttl} - ]}) - - @staticmethod - def _data_for_SRV(_type, data): - ttl = max([raw_values['ttl'] for raw_values in data['raw_values']]) - values = [] - - for raw_value in \ - [raw_values['value'] for raw_values in data['raw_values']]: - - match = MythicBeastsProvider.RE_SRV.match(raw_value) - - assert match is not None, 'Unable to parse SRV data' - - target = match.group('target') - if not target.endswith('.'): - target = f'{target}.{data["zone"]}' - - values.append({ - 'priority': match.group('priority'), - 'weight': match.group('weight'), - 'port': match.group('port'), - 'target': target, - }) - - return { - 'type': _type, - 'values': values, - 'ttl': ttl, - } - - @staticmethod - def _data_for_SSHFP(_type, data): - ttl = max([raw_values['ttl'] for raw_values in data['raw_values']]) - values = [] - - for raw_value in \ - [raw_values['value'] for raw_values in data['raw_values']]: - match = MythicBeastsProvider.RE_SSHFP.match(raw_value) - - assert match is not None, 'Unable to parse SSHFP data' - - values.append({ - 'algorithm': match.group('algorithm'), - 'fingerprint_type': match.group('fingerprint_type'), - 'fingerprint': match.group('fingerprint'), - }) - - return { - 'type': _type, - 'values': values, - 'ttl': ttl, - } - - @staticmethod - def _data_for_CAA(_type, data): - ttl = data['raw_values'][0]['ttl'] - raw_value = data['raw_values'][0]['value'] - - match = MythicBeastsProvider.RE_CAA.match(raw_value) - - assert match is not None, 'Unable to parse CAA data' - - value = { - 'flags': match.group('flags'), - 'tag': match.group('tag'), - 'value': match.group('value'), - } - - return MythicBeastsProvider._data_for_single( - 'CAA', - {'raw_values': [{'value': value, 'ttl': ttl}]}) - - _data_for_NS = _data_for_multiple - _data_for_A = _data_for_multiple - _data_for_AAAA = _data_for_multiple - - def populate(self, zone, target=False, lenient=False): - self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name, - target, lenient) - - resp = self.records(zone.name) - - before = len(zone.records) - exists = False - data = defaultdict(lambda: defaultdict(lambda: { - 'raw_values': [], - 'name': None, - 'zone': None, - })) - - exists = True - for line in resp.content.splitlines(): - match = MythicBeastsProvider.RE_POPLINE.match(line.decode("utf-8")) - - if match is None: - self.log.debug('failed to match line: %s', line) - continue - - if match.group(1) == '@': - _name = '' - else: - _name = match.group('name') - - _type = match.group('type') - _ttl = int(match.group('ttl')) - _value = match.group('value').strip() - - if hasattr(self, f'_data_for_{_type}'): - if _name not in data[_type]: - data[_type][_name] = { - 'raw_values': [{'value': _value, 'ttl': _ttl}], - 'name': _name, - 'zone': zone.name, - } - - else: - data[_type][_name].get('raw_values').append( - {'value': _value, 'ttl': _ttl} - ) - else: - self.log.debug('skipping %s as not supported', _type) - - for _type in data: - for _name in data[_type]: - data_for = getattr(self, f'_data_for_{_type}') - - record = Record.new( - zone, - _name, - data_for(_type, data[_type][_name]), - source=self - ) - zone.add_record(record, lenient=lenient) - - self.log.debug('populate: found %s records, exists=%s', - len(zone.records) - before, exists) - - return exists - - def _compile_commands(self, action, record): - commands = [] - - hostname = remove_trailing_dot(record.fqdn) - ttl = record.ttl - _type = record._type - - if _type == 'ALIAS': - _type = 'ANAME' - - if hasattr(record, 'values'): - values = record.values - else: - values = [record.value] - - base = f'{action} {hostname} {ttl} {_type}' - - # Unescape TXT records - if _type == 'TXT': - values = [value.replace('\\;', ';') for value in values] - - # Handle specific types or default - if _type == 'SSHFP': - data = values[0].data - algorithm = data['algorithm'] - fingerprint_type = data['fingerprint_type'] - fingerprint = data['fingerprint'] - commands.append(f'{base} {algorithm} {fingerprint_type} ' - f'{fingerprint}') - - elif _type == 'SRV': - for value in values: - data = value.data - priority = data['priority'] - weight = data['weight'] - port = data['port'] - target = data['target'] - commands.append(f'{base} {priority} {weight} {port} {target}') - - elif _type == 'MX': - for value in values: - data = value.data - preference = data['preference'] - exchange = data['exchange'] - commands.append(f'{base} {preference} {exchange}') - - else: - if hasattr(self, f'_data_for_{_type}'): - for value in values: - commands.append(f'{base} {value}') - else: - self.log.debug('skipping %s as not supported', _type) - - return commands - - def _apply_Create(self, change): - zone = change.new.zone - commands = self._compile_commands('ADD', change.new) - - for command in commands: - self._post({ - 'domain': remove_trailing_dot(zone.name), - 'origin': '.', - 'password': self._passwords[zone.name], - 'command': command, - }) - return True - - def _apply_Update(self, change): - self._apply_Delete(change) - self._apply_Create(change) - - def _apply_Delete(self, change): - zone = change.existing.zone - commands = self._compile_commands('DELETE', change.existing) - - for command in commands: - self._post({ - 'domain': remove_trailing_dot(zone.name), - 'origin': '.', - 'password': self._passwords[zone.name], - 'command': command, - }) - return True - - def _apply(self, plan): - desired = plan.desired - changes = plan.changes - self.log.debug('_apply: zone=%s, len(changes)=%d', desired.name, - len(changes)) - - for change in changes: - class_name = change.__class__.__name__ - getattr(self, f'_apply_{class_name}')(change) +logger = getLogger('MythicBeasts') +try: + logger.warn('octodns_mythicbeasts shimmed. Update your provider class to ' + 'octodns_mythicbeasts.MythicBeastsProvider. ' + 'Shim will be removed in 1.0') + from octodns_mythicbeasts import MythicBeastsProvider + MythicBeastsProvider # pragma: no cover +except ModuleNotFoundError: + logger.exception('MythicBeastsProvider has been moved into a seperate ' + 'module, octodns_mythicbeasts is now required. Provider ' + 'class should be updated to ' + 'octodns_mythicbeasts.MythicBeastsProvider') + raise diff --git a/tests/fixtures/mythicbeasts-list.txt b/tests/fixtures/mythicbeasts-list.txt deleted file mode 100644 index 006a8ff..0000000 --- a/tests/fixtures/mythicbeasts-list.txt +++ /dev/null @@ -1,27 +0,0 @@ -@ 3600 NS 6.2.3.4. -@ 3600 NS 7.2.3.4. -@ 300 A 1.2.3.4 -@ 300 A 1.2.3.5 -@ 3600 SSHFP 1 1 bf6b6825d2977c511a475bbefb88aad54a92ac73 -@ 3600 SSHFP 1 1 7491973e5f8b39d5327cd4e08bc81b05f7710b49 -@ 3600 CAA 0 issue ca.unit.tests -_imap._tcp 600 SRV 0 0 0 . -_pop3._tcp 600 SRV 0 0 0 . -_srv._tcp 600 SRV 10 20 30 foo-1.unit.tests. -_srv._tcp 600 SRV 12 20 30 foo-2.unit.tests. -aaaa 600 AAAA 2601:644:500:e210:62f8:1dff:feb8:947a -cname 300 CNAME unit.tests. -excluded 300 CNAME unit.tests. -ignored 300 A 9.9.9.9 -included 3600 CNAME unit.tests. -mx 300 MX 10 smtp-4.unit.tests. -mx 300 MX 20 smtp-2.unit.tests. -mx 300 MX 30 smtp-3.unit.tests. -mx 300 MX 40 smtp-1.unit.tests. -sub 3600 NS 6.2.3.4. -sub 3600 NS 7.2.3.4. -txt 600 TXT "Bah bah black sheep" -txt 600 TXT "have you any wool." -txt 600 TXT "v=DKIM1;k=rsa;s=email;h=sha256;p=A/kinda+of/long/string+with+numb3rs" -www 300 A 2.2.3.6 -www.sub 300 A 2.2.3.6 diff --git a/tests/test_octodns_provider_mythicbeasts.py b/tests/test_octodns_provider_mythicbeasts.py index 886c828..f8c316c 100644 --- a/tests/test_octodns_provider_mythicbeasts.py +++ b/tests/test_octodns_provider_mythicbeasts.py @@ -5,440 +5,12 @@ from __future__ import absolute_import, division, print_function, \ unicode_literals -from os.path import dirname, join - -from requests_mock import ANY, mock as requests_mock from unittest import TestCase -from octodns.provider.mythicbeasts import MythicBeastsProvider, \ - add_trailing_dot, remove_trailing_dot -from octodns.provider.yaml import YamlProvider -from octodns.zone import Zone -from octodns.record import Create, Update, Delete, Record +class TestMythicBeastsShim(TestCase): -class TestMythicBeastsProvider(TestCase): - expected = Zone('unit.tests.', []) - source = YamlProvider('test_expected', join(dirname(__file__), 'config')) - source.populate(expected) - - # Dump anything we don't support from expected - for record in list(expected.records): - if record._type not in MythicBeastsProvider.SUPPORTS: - expected._remove_record(record) - - def test_trailing_dot(self): - with self.assertRaises(AssertionError) as err: - add_trailing_dot('unit.tests.') - self.assertEquals('Value already has trailing dot', - str(err.exception)) - - with self.assertRaises(AssertionError) as err: - remove_trailing_dot('unit.tests') - self.assertEquals('Value already missing trailing dot', - str(err.exception)) - - self.assertEquals(add_trailing_dot('unit.tests'), 'unit.tests.') - self.assertEquals(remove_trailing_dot('unit.tests.'), 'unit.tests') - - def test_data_for_single(self): - test_data = { - 'raw_values': [{'value': 'a:a::c', 'ttl': 0}], - 'zone': 'unit.tests.', - } - test_single = MythicBeastsProvider._data_for_single('', test_data) - self.assertTrue(isinstance(test_single, dict)) - self.assertEquals('a:a::c', test_single['value']) - - def test_data_for_multiple(self): - test_data = { - 'raw_values': [ - {'value': 'b:b::d', 'ttl': 60}, - {'value': 'a:a::c', 'ttl': 60}], - 'zone': 'unit.tests.', - } - test_multiple = MythicBeastsProvider._data_for_multiple('', test_data) - self.assertTrue(isinstance(test_multiple, dict)) - self.assertEquals(2, len(test_multiple['values'])) - - def test_data_for_txt(self): - test_data = { - 'raw_values': [ - {'value': 'v=DKIM1; k=rsa; p=prawf', 'ttl': 60}, - {'value': 'prawf prawf dyma prawf', 'ttl': 300}], - 'zone': 'unit.tests.', - } - test_txt = MythicBeastsProvider._data_for_TXT('', test_data) - self.assertTrue(isinstance(test_txt, dict)) - self.assertEquals(2, len(test_txt['values'])) - self.assertEquals('v=DKIM1\\; k=rsa\\; p=prawf', test_txt['values'][0]) - - def test_data_for_MX(self): - test_data = { - 'raw_values': [ - {'value': '10 un.unit', 'ttl': 60}, - {'value': '20 dau.unit', 'ttl': 60}, - {'value': '30 tri.unit', 'ttl': 60}], - 'zone': 'unit.tests.', - } - test_MX = MythicBeastsProvider._data_for_MX('', test_data) - self.assertTrue(isinstance(test_MX, dict)) - self.assertEquals(3, len(test_MX['values'])) - - with self.assertRaises(AssertionError) as err: - test_MX = MythicBeastsProvider._data_for_MX( - '', - {'raw_values': [{'value': '', 'ttl': 0}]} - ) - self.assertEquals('Unable to parse MX data', str(err.exception)) - - def test_data_for_CNAME(self): - test_data = { - 'raw_values': [{'value': 'cname', 'ttl': 60}], - 'zone': 'unit.tests.', - } - test_cname = MythicBeastsProvider._data_for_CNAME('', test_data) - self.assertTrue(isinstance(test_cname, dict)) - self.assertEquals('cname.unit.tests.', test_cname['value']) - - def test_data_for_ANAME(self): - test_data = { - 'raw_values': [{'value': 'aname', 'ttl': 60}], - 'zone': 'unit.tests.', - } - test_aname = MythicBeastsProvider._data_for_ANAME('', test_data) - self.assertTrue(isinstance(test_aname, dict)) - self.assertEquals('aname', test_aname['value']) - - def test_data_for_SRV(self): - test_data = { - 'raw_values': [ - {'value': '10 20 30 un.srv.unit', 'ttl': 60}, - {'value': '20 30 40 dau.srv.unit', 'ttl': 60}, - {'value': '30 30 50 tri.srv.unit', 'ttl': 60}], - 'zone': 'unit.tests.', - } - test_SRV = MythicBeastsProvider._data_for_SRV('', test_data) - self.assertTrue(isinstance(test_SRV, dict)) - self.assertEquals(3, len(test_SRV['values'])) - - with self.assertRaises(AssertionError) as err: - test_SRV = MythicBeastsProvider._data_for_SRV( - '', - {'raw_values': [{'value': '', 'ttl': 0}]} - ) - self.assertEquals('Unable to parse SRV data', str(err.exception)) - - def test_data_for_SSHFP(self): - test_data = { - 'raw_values': [ - {'value': '1 1 0123456789abcdef', 'ttl': 60}, - {'value': '1 2 0123456789abcdef', 'ttl': 60}, - {'value': '2 3 0123456789abcdef', 'ttl': 60}], - 'zone': 'unit.tests.', - } - test_SSHFP = MythicBeastsProvider._data_for_SSHFP('', test_data) - self.assertTrue(isinstance(test_SSHFP, dict)) - self.assertEquals(3, len(test_SSHFP['values'])) - - with self.assertRaises(AssertionError) as err: - test_SSHFP = MythicBeastsProvider._data_for_SSHFP( - '', - {'raw_values': [{'value': '', 'ttl': 0}]} - ) - self.assertEquals('Unable to parse SSHFP data', str(err.exception)) - - def test_data_for_CAA(self): - test_data = { - 'raw_values': [{'value': '1 issue letsencrypt.org', 'ttl': 60}], - 'zone': 'unit.tests.', - } - test_CAA = MythicBeastsProvider._data_for_CAA('', test_data) - self.assertTrue(isinstance(test_CAA, dict)) - self.assertEquals(3, len(test_CAA['value'])) - - with self.assertRaises(AssertionError) as err: - test_CAA = MythicBeastsProvider._data_for_CAA( - '', - {'raw_values': [{'value': '', 'ttl': 0}]} - ) - self.assertEquals('Unable to parse CAA data', str(err.exception)) - - def test_command_generation(self): - zone = Zone('unit.tests.', []) - zone.add_record(Record.new(zone, '', { - 'ttl': 60, - 'type': 'ALIAS', - 'value': 'alias.unit.tests.', - })) - zone.add_record(Record.new(zone, 'prawf-ns', { - 'ttl': 300, - 'type': 'NS', - 'values': [ - 'alias.unit.tests.', - 'alias2.unit.tests.', - ], - })) - zone.add_record(Record.new(zone, 'prawf-a', { - 'ttl': 60, - 'type': 'A', - 'values': [ - '1.2.3.4', - '5.6.7.8', - ], - })) - zone.add_record(Record.new(zone, 'prawf-aaaa', { - 'ttl': 60, - 'type': 'AAAA', - 'values': [ - 'a:a::a', - 'b:b::b', - 'c:c::c:c', - ], - })) - zone.add_record(Record.new(zone, 'prawf-txt', { - 'ttl': 60, - 'type': 'TXT', - 'value': 'prawf prawf dyma prawf', - })) - zone.add_record(Record.new(zone, 'prawf-txt2', { - 'ttl': 60, - 'type': 'TXT', - 'value': 'v=DKIM1\\; k=rsa\\; p=prawf', - })) - with requests_mock() as mock: - mock.post(ANY, status_code=200, text='') - - provider = MythicBeastsProvider('test', { - 'unit.tests.': 'mypassword' - }) - - plan = provider.plan(zone) - changes = plan.changes - generated_commands = [] - - for change in changes: - generated_commands.extend( - provider._compile_commands('ADD', change.new) - ) - - expected_commands = [ - 'ADD unit.tests 60 ANAME alias.unit.tests.', - 'ADD prawf-ns.unit.tests 300 NS alias.unit.tests.', - 'ADD prawf-ns.unit.tests 300 NS alias2.unit.tests.', - 'ADD prawf-a.unit.tests 60 A 1.2.3.4', - 'ADD prawf-a.unit.tests 60 A 5.6.7.8', - 'ADD prawf-aaaa.unit.tests 60 AAAA a:a::a', - 'ADD prawf-aaaa.unit.tests 60 AAAA b:b::b', - 'ADD prawf-aaaa.unit.tests 60 AAAA c:c::c:c', - 'ADD prawf-txt.unit.tests 60 TXT prawf prawf dyma prawf', - 'ADD prawf-txt2.unit.tests 60 TXT v=DKIM1; k=rsa; p=prawf', - ] - - generated_commands.sort() - expected_commands.sort() - - self.assertEquals( - generated_commands, - expected_commands - ) - - # Now test deletion - existing = 'prawf-txt 300 TXT prawf prawf dyma prawf\n' \ - 'prawf-txt2 300 TXT v=DKIM1; k=rsa; p=prawf\n' \ - 'prawf-a 60 A 1.2.3.4' - - with requests_mock() as mock: - mock.post(ANY, status_code=200, text=existing) - wanted = Zone('unit.tests.', []) - - plan = provider.plan(wanted) - changes = plan.changes - generated_commands = [] - - for change in changes: - generated_commands.extend( - provider._compile_commands('DELETE', change.existing) - ) - - expected_commands = [ - 'DELETE prawf-a.unit.tests 60 A 1.2.3.4', - 'DELETE prawf-txt.unit.tests 300 TXT prawf prawf dyma prawf', - 'DELETE prawf-txt2.unit.tests 300 TXT v=DKIM1; k=rsa; p=prawf', - ] - - generated_commands.sort() - expected_commands.sort() - - self.assertEquals( - generated_commands, - expected_commands - ) - - def test_fake_command_generation(self): - class FakeChangeRecord(object): - def __init__(self): - self.__fqdn = 'prawf.unit.tests.' - self._type = 'NOOP' - self.value = 'prawf' - self.ttl = 60 - - @property - def record(self): - return self - - @property - def fqdn(self): - return self.__fqdn - - with requests_mock() as mock: - mock.post(ANY, status_code=200, text='') - - provider = MythicBeastsProvider('test', { - 'unit.tests.': 'mypassword' - }) - record = FakeChangeRecord() - command = provider._compile_commands('ADD', record) - self.assertEquals([], command) - - def test_populate(self): - provider = None - - # Null passwords dict - with self.assertRaises(AssertionError) as err: - provider = MythicBeastsProvider('test', None) - self.assertEquals('Passwords must be a dictionary', str(err.exception)) - - # Missing password - with requests_mock() as mock: - mock.post(ANY, status_code=401, text='ERR Not authenticated') - - with self.assertRaises(AssertionError) as err: - provider = MythicBeastsProvider('test', dict()) - zone = Zone('unit.tests.', []) - provider.populate(zone) - self.assertEquals('Missing password for domain: unit.tests', - str(err.exception)) - - # Failed authentication - with requests_mock() as mock: - mock.post(ANY, status_code=401, text='ERR Not authenticated') - - with self.assertRaises(Exception) as err: - provider = MythicBeastsProvider('test', { - 'unit.tests.': 'mypassword' - }) - zone = Zone('unit.tests.', []) - provider.populate(zone) - self.assertEquals( - 'Mythic Beasts unauthorized for zone: unit.tests', - err.exception.message) - - # Check unmatched lines are ignored - test_data = 'This should not match' - with requests_mock() as mock: - mock.post(ANY, status_code=200, text=test_data) - - provider = MythicBeastsProvider('test', { - 'unit.tests.': 'mypassword' - }) - zone = Zone('unit.tests.', []) - provider.populate(zone) - self.assertEquals(0, len(zone.records)) - - # Check unsupported records are skipped - test_data = '@ 60 NOOP prawf\n@ 60 SPF prawf prawf prawf' - with requests_mock() as mock: - mock.post(ANY, status_code=200, text=test_data) - - provider = MythicBeastsProvider('test', { - 'unit.tests.': 'mypassword' - }) - zone = Zone('unit.tests.', []) - provider.populate(zone) - self.assertEquals(0, len(zone.records)) - - # Check no changes between what we support and what's parsed - # from the unit.tests. config YAML. Also make sure we see the same - # for both after we've thrown away records we don't support - with requests_mock() as mock: - with open('tests/fixtures/mythicbeasts-list.txt') as file_handle: - mock.post(ANY, status_code=200, text=file_handle.read()) - - provider = MythicBeastsProvider('test', { - 'unit.tests.': 'mypassword' - }) - zone = Zone('unit.tests.', []) - provider.populate(zone) - - self.assertEquals(17, len(zone.records)) - self.assertEquals(17, len(self.expected.records)) - changes = self.expected.changes(zone, provider) - self.assertEquals(0, len(changes)) - - def test_apply(self): - provider = MythicBeastsProvider('test', { - 'unit.tests.': 'mypassword' - }) - zone = Zone('unit.tests.', []) - - # Create blank zone - with requests_mock() as mock: - mock.post(ANY, status_code=200, text='') - provider.populate(zone) - - self.assertEquals(0, len(zone.records)) - - # Record change failed - with requests_mock() as mock: - mock.post(ANY, status_code=200, text='') - provider.populate(zone) - zone.add_record(Record.new(zone, 'prawf', { - 'ttl': 300, - 'type': 'TXT', - 'value': 'prawf', - })) - plan = provider.plan(zone) - - with requests_mock() as mock: - mock.post(ANY, status_code=400, text='NADD 300 TXT prawf') - - with self.assertRaises(Exception) as err: - provider.apply(plan) - self.assertEquals( - 'Mythic Beasts could not action command: unit.tests ' - 'ADD prawf.unit.tests 300 TXT prawf', err.exception.message) - - # Check deleting and adding/changing test record - existing = 'prawf 300 TXT prawf prawf prawf\ndileu 300 TXT dileu' - - with requests_mock() as mock: - mock.post(ANY, status_code=200, text=existing) - - # Mash up a new zone with records so a plan - # is generated with changes and applied. For some reason - # passing self.expected, or just changing each record's zone - # doesn't work. Nor does this without a single add_record after - wanted = Zone('unit.tests.', []) - for record in list(self.expected.records): - data = {'type': record._type} - data.update(record.data) - wanted.add_record(Record.new(wanted, record.name, data)) - - wanted.add_record(Record.new(wanted, 'prawf', { - 'ttl': 60, - 'type': 'TXT', - 'value': 'prawf yw e', - })) - - plan = provider.plan(wanted) - - # Octo ignores NS records (15-1) - self.assertEquals(1, len([c for c in plan.changes - if isinstance(c, Update)])) - self.assertEquals(1, len([c for c in plan.changes - if isinstance(c, Delete)])) - self.assertEquals(16, len([c for c in plan.changes - if isinstance(c, Create)])) - self.assertEquals(18, provider.apply(plan)) - self.assertTrue(plan.exists) + def test_missing(self): + with self.assertRaises(ModuleNotFoundError): + from octodns.provider.mythicbeasts import MythicBeastsProvider + MythicBeastsProvider