From 16f9acd8701b2fe848619a0afb1b79388b290943 Mon Sep 17 00:00:00 2001 From: Maikel Poot Date: Tue, 30 Nov 2021 12:12:59 +0100 Subject: [PATCH] Implement cleaned code from `https://github.com/solarmonkey` --- octodns/provider/transip.py | 384 ++++++---------- requirements.txt | 4 +- tests/test_octodns_provider_transip.py | 592 +++++++++++++++---------- 3 files changed, 506 insertions(+), 474 deletions(-) diff --git a/octodns/provider/transip.py b/octodns/provider/transip.py index 176da88..f4c0fe2 100644 --- a/octodns/provider/transip.py +++ b/octodns/provider/transip.py @@ -1,19 +1,18 @@ -# -# -# +from __future__ import (absolute_import, division, print_function, + unicode_literals) -from __future__ import absolute_import, division, print_function, \ - unicode_literals - -from suds import WebFault - -from collections import defaultdict -from . import ProviderException -from .base import BaseProvider +from collections import defaultdict, namedtuple from logging import getLogger + +from transip import TransIP +from transip.exceptions import TransIPHTTPError +from transip.v6.objects import DnsEntry + +from . import ProviderException from ..record import Record -from transip.service.domain import DomainService -from transip.service.objects import DnsEntry +from .base import BaseProvider + +DNSEntry = namedtuple('DNSEntry', ('name', 'expire', 'type', 'content')) class TransipException(ProviderException): @@ -48,6 +47,7 @@ class TransipProvider(BaseProvider): # if both `key_file` and `key` are presented `key_file` is used ''' + SUPPORTS_GEO = False SUPPORTS_DYNAMIC = False SUPPORTS = set(('A', 'AAAA', 'CNAME', 'MX', 'NS', 'SRV', 'SPF', 'TXT', @@ -64,71 +64,74 @@ class TransipProvider(BaseProvider): super(TransipProvider, self).__init__(id, *args, **kwargs) if key_file is not None: - self._client = self._domain_service(account, - private_key_file=key_file) + self._client = TransIP(login=account, private_key_file=key_file) elif key is not None: - self._client = self._domain_service(account, private_key=key) + self._client = TransIP(login=account, private_key=key) else: raise TransipConfigException( 'Missing `key` or `key_file` parameter in config' ) - self._currentZone = {} - - def _domain_service(self, *args, **kwargs): - 'This exists only for mocking purposes' - return DomainService(*args, **kwargs) - def populate(self, zone, target=False, lenient=False): - - exists = False - self._currentZone = zone + ''' + Populate the zone with records in-place. + ''' self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name, target, lenient) before = len(zone.records) + try: - zoneInfo = self._client.get_info(zone.name[:-1]) - except WebFault as e: - if e.fault.faultcode == '102' and target is False: + domain = self._client.domains.get(zone.name.strip('.')) + records = domain.dns.list() + except TransIPHTTPError as e: + if e.response_code == 404 and target is False: # Zone not found in account, and not a target so just # leave an empty zone. - return exists - elif e.fault.faultcode == '102' and target is True: + return False + elif e.response_code == 404 and target is True: self.log.warning('populate: Transip can\'t create new zones') raise TransipNewZoneException( ('populate: ({}) Transip used ' + 'as target for non-existing zone: {}').format( - e.fault.faultcode, zone.name)) + e.response_code, zone.name)) else: - self.log.error('populate: (%s) %s ', e.fault.faultcode, - e.fault.faultstring) - raise e + self.log.error( + 'populate: (%s) %s ', e.response_code, e.message + ) + raise TransipException( + 'Unhandled error: ({}) {}'.format( + e.response_code, e.message + ) + ) - self.log.debug('populate: found %s records for zone %s', - len(zoneInfo.dnsEntries), zone.name) - exists = True - if zoneInfo.dnsEntries: + self.log.debug( + 'populate: found %s records for zone %s', len(records), zone.name + ) + if records: values = defaultdict(lambda: defaultdict(list)) - for record in zoneInfo.dnsEntries: - name = zone.hostname_from_fqdn(record['name']) + for record in records: + name = zone.hostname_from_fqdn(record.name) if name == self.ROOT_RECORD: name = '' - if record['type'] in self.SUPPORTS: - values[name][record['type']].append(record) + if record.type in self.SUPPORTS: + values[name][record.type].append(record) for name, types in values.items(): for _type, records in types.items(): - data_for = getattr(self, '_data_for_{}'.format(_type)) - record = Record.new(zone, name, data_for(_type, records), - source=self, lenient=lenient) + record = Record.new( + zone, + name, + _data_for(_type, records, zone), + source=self, + lenient=lenient, + ) zone.add_record(record, lenient=lenient) - self.log.info('populate: found %s records, exists = %s', - len(zone.records) - before, exists) + self.log.info('populate: found %s records', + len(zone.records) - before) - self._currentZone = {} - return exists + return True def _apply(self, plan): desired = plan.desired @@ -136,219 +139,120 @@ class TransipProvider(BaseProvider): self.log.debug('apply: zone=%s, changes=%d', desired.name, len(changes)) - self._currentZone = plan.desired try: - self._client.get_info(plan.desired.name[:-1]) - except WebFault as e: - self.log.exception('_apply: get_info failed') - raise e + domain = self._client.domains.get(plan.desired.name[:-1]) + except TransIPHTTPError as e: + self.log.exception('_apply: getting the domain failed') + raise TransipException( + 'Unhandled error: ({}) {}'.format(e.response_code, e.message) + ) - _dns_entries = [] + records = [] for record in plan.desired.records: - entries_for = getattr(self, '_entries_for_{}'.format(record._type)) + if record._type in self.SUPPORTS: + # Root records have '@' as name + name = record.name + if name == '': + name = self.ROOT_RECORD - # Root records have '@' as name - name = record.name - if name == '': - name = self.ROOT_RECORD - - _dns_entries.extend(entries_for(name, record)) + records.extend(_entries_for(name, record)) + # Transform DNSEntry namedtuples into transip.v6.objects.DnsEntry + # objects, which is a bit ugly because it's quite a magical object. + api_records = [DnsEntry(domain.dns, r._asdict()) for r in records] try: - self._client.set_dns_entries(plan.desired.name[:-1], _dns_entries) - except WebFault as e: - self.log.warning(('_apply: Set DNS returned ' + - 'one or more errors: {}').format( - e.fault.faultstring)) - raise TransipException(200, e.fault.faultstring) + domain.dns.replace(api_records) + except TransIPHTTPError as e: + self.log.warning( + '_apply: Set DNS returned one or more errors: {}'.format(e) + ) + raise TransipException( + 'Unhandled error: ({}) {}'.format(e.response_code, e.message) + ) - self._currentZone = {} - - def _entries_for_multiple(self, name, record): - _entries = [] - - for value in record.values: - _entries.append(DnsEntry(name, record.ttl, record._type, value)) - - return _entries - - def _entries_for_single(self, name, record): - - return [DnsEntry(name, record.ttl, record._type, record.value)] - - _entries_for_A = _entries_for_multiple - _entries_for_AAAA = _entries_for_multiple - _entries_for_NS = _entries_for_multiple - _entries_for_SPF = _entries_for_multiple - _entries_for_CNAME = _entries_for_single - - def _entries_for_MX(self, name, record): - _entries = [] - - for value in record.values: - content = "{} {}".format(value.preference, value.exchange) - _entries.append(DnsEntry(name, record.ttl, record._type, content)) - - return _entries - - def _entries_for_SRV(self, name, record): - _entries = [] - - for value in record.values: - content = "{} {} {} {}".format(value.priority, value.weight, - value.port, value.target) - _entries.append(DnsEntry(name, record.ttl, record._type, content)) - - return _entries - - def _entries_for_SSHFP(self, name, record): - _entries = [] - - for value in record.values: - content = "{} {} {}".format(value.algorithm, - value.fingerprint_type, - value.fingerprint) - _entries.append(DnsEntry(name, record.ttl, record._type, content)) - - return _entries - - def _entries_for_CAA(self, name, record): - _entries = [] - - for value in record.values: - content = "{} {} {}".format(value.flags, value.tag, - value.value) - _entries.append(DnsEntry(name, record.ttl, record._type, content)) - - return _entries - - def _entries_for_TXT(self, name, record): - _entries = [] - - for value in record.values: - value = value.replace('\\;', ';') - _entries.append(DnsEntry(name, record.ttl, record._type, value)) - - return _entries - - def _parse_to_fqdn(self, value): - - # Enforce switch from suds.sax.text.Text to string - value = str(value) - - # TransIP allows '@' as value to alias the root record. - # this provider won't set an '@' value, but can be an existing record - if value == self.ROOT_RECORD: - value = self._currentZone.name - - if value[-1] != '.': - self.log.debug('parseToFQDN: changed %s to %s', value, - '{}.{}'.format(value, self._currentZone.name)) - value = '{}.{}'.format(value, self._currentZone.name) - - return value - - def _get_lowest_ttl(self, records): - _ttl = 100000 - for record in records: - _ttl = min(_ttl, record['expire']) - return _ttl - - def _data_for_multiple(self, _type, records): - - _values = [] - for record in records: - # Enforce switch from suds.sax.text.Text to string - _values.append(str(record['content'])) +def _data_for(type_, records, current_zone): + if type_ == 'CNAME': return { - 'ttl': self._get_lowest_ttl(records), - 'type': _type, - 'values': _values + 'type': type_, + 'ttl': records[0].expire, + 'value': _parse_to_fqdn(records[0].content, current_zone), } - _data_for_A = _data_for_multiple - _data_for_AAAA = _data_for_multiple - _data_for_NS = _data_for_multiple - _data_for_SPF = _data_for_multiple - - def _data_for_CNAME(self, _type, records): + def format_mx(record): + preference, exchange = record.content.split(' ', 1) return { - 'ttl': records[0]['expire'], - 'type': _type, - 'value': self._parse_to_fqdn(records[0]['content']) + 'preference': preference, + 'exchange': _parse_to_fqdn(exchange, current_zone), } - def _data_for_MX(self, _type, records): - _values = [] - for record in records: - preference, exchange = record['content'].split(" ", 1) - _values.append({ - 'preference': preference, - 'exchange': self._parse_to_fqdn(exchange) - }) + def format_srv(record): + priority, weight, port, target = record.content.split(' ', 3) return { - 'ttl': self._get_lowest_ttl(records), - 'type': _type, - 'values': _values + 'port': port, + 'priority': priority, + 'target': _parse_to_fqdn(target, current_zone), + 'weight': weight, } - def _data_for_SRV(self, _type, records): - _values = [] - for record in records: - priority, weight, port, target = record['content'].split(' ', 3) - _values.append({ - 'port': port, - 'priority': priority, - 'target': self._parse_to_fqdn(target), - 'weight': weight - }) - + def format_sshfp(record): + algorithm, fp_type, fingerprint = record.content.split(' ', 2) return { - 'type': _type, - 'ttl': self._get_lowest_ttl(records), - 'values': _values + 'algorithm': algorithm, + 'fingerprint': fingerprint.lower(), + 'fingerprint_type': fp_type, } - def _data_for_SSHFP(self, _type, records): - _values = [] - for record in records: - algorithm, fp_type, fingerprint = record['content'].split(' ', 2) - _values.append({ - 'algorithm': algorithm, - 'fingerprint': fingerprint.lower(), - 'fingerprint_type': fp_type - }) + def format_caa(record): + flags, tag, value = record.content.split(' ', 2) + return {'flags': flags, 'tag': tag, 'value': value} - return { - 'type': _type, - 'ttl': self._get_lowest_ttl(records), - 'values': _values - } + def format_txt(record): + return record.content.replace(';', '\\;') - def _data_for_CAA(self, _type, records): - _values = [] - for record in records: - flags, tag, value = record['content'].split(' ', 2) - _values.append({ - 'flags': flags, - 'tag': tag, - 'value': value - }) + value_formatter = { + 'MX': format_mx, + 'SRV': format_srv, + 'SSHFP': format_sshfp, + 'CAA': format_caa, + 'TXT': format_txt, + }.get(type_, lambda r: r.content) - return { - 'type': _type, - 'ttl': self._get_lowest_ttl(records), - 'values': _values - } + return { + 'type': type_, + 'ttl': _get_lowest_ttl(records), + 'values': [value_formatter(r) for r in records], + } - def _data_for_TXT(self, _type, records): - _values = [] - for record in records: - _values.append(record['content'].replace(';', '\\;')) - return { - 'type': _type, - 'ttl': self._get_lowest_ttl(records), - 'values': _values - } +def _parse_to_fqdn(value, current_zone): + # TransIP allows '@' as value to alias the root record. + # this provider won't set an '@' value, but can be an existing record + if value == TransipProvider.ROOT_RECORD: + value = current_zone.name + + if value[-1] != '.': + value = '{}.{}'.format(value, current_zone.name) + + return value + + +def _get_lowest_ttl(records): + return min([r.expire for r in records] + [100000]) + + +def _entries_for(name, record): + values = record.values if hasattr(record, 'values') else [record.value] + formatter = { + 'MX': lambda v: f'{v.preference} {v.exchange}', + 'SRV': lambda v: f'{v.priority} {v.weight} {v.port} {v.target}', + 'SSHFP': lambda v: ( + f'{v.algorithm} {v.fingerprint_type} {v.fingerprint}' + ), + 'CAA': lambda v: f'{v.flags} {v.tag} {v.value}', + 'TXT': lambda v: v.replace('\\;', ';'), + }.get(record._type, lambda r: r) + return [ + DNSEntry(name, record.ttl, record._type, formatter(value)) + for value in values + ] diff --git a/requirements.txt b/requirements.txt index 13ab92c..4f98856 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,8 +20,8 @@ ovh==0.5.0 pycountry-convert==0.7.2 pycountry==20.7.3 python-dateutil==2.8.1 -requests==2.24.0 +requests==2.25.1 s3transfer==0.3.3 setuptools==44.1.1 six==1.15.0 -transip==2.1.2 +python-transip==0.5.0 \ No newline at end of file diff --git a/tests/test_octodns_provider_transip.py b/tests/test_octodns_provider_transip.py index 8a2e11a..e0ba398 100644 --- a/tests/test_octodns_provider_transip.py +++ b/tests/test_octodns_provider_transip.py @@ -1,291 +1,419 @@ -# -# -# - -from __future__ import absolute_import, division, print_function, \ - unicode_literals +from __future__ import (absolute_import, division, print_function, + unicode_literals) +from operator import itemgetter from os.path import dirname, join -from six import text_type - -from suds import WebFault - -from mock import patch from unittest import TestCase +from unittest.mock import Mock, patch -from octodns.provider.transip import TransipProvider +from octodns.provider.transip import (DNSEntry, TransipConfigException, + TransipException, + TransipNewZoneException, TransipProvider, + _entries_for, _parse_to_fqdn) from octodns.provider.yaml import YamlProvider from octodns.zone import Zone -from transip.service.objects import DnsEntry +from transip.exceptions import TransIPHTTPError -class MockFault(object): - faultstring = "" - faultcode = "" - - def __init__(self, code, string, *args, **kwargs): - self.faultstring = string - self.faultcode = code +def make_expected(): + expected = Zone("unit.tests.", []) + source = YamlProvider("test", join(dirname(__file__), "config")) + source.populate(expected) + return expected -class MockResponse(object): - dnsEntries = [] +def make_mock(): + zone = make_expected() + + # Turn Zone.records into TransIP DNSEntries + api_entries = [] + for record in zone.records: + if record._type in TransipProvider.SUPPORTS: + # Root records have '@' as name + name = record.name + if name == "": + name = TransipProvider.ROOT_RECORD + + api_entries.extend(_entries_for(name, record)) + + # Append bogus entry so test for record type not being in SUPPORTS is + # executed. For 100% test coverage. + api_entries.append(DNSEntry("@", "3600", "BOGUS", "ns.transip.nl")) + + return zone, api_entries -class MockDomainService(object): +def make_mock_empty(): + mock = Mock() + mock.return_value.domains.get.return_value.dns.list.return_value = [] + return mock - def __init__(self, *args, **kwargs): - self.mockupEntries = [] - self.throw_auth_fault = False - def mockup(self, records): - - provider = TransipProvider('', '', '') - - _dns_entries = [] - for record in records: - if record._type in provider.SUPPORTS: - entries_for = getattr(provider, - '_entries_for_{}'.format(record._type)) - - # Root records have '@' as name - name = record.name - if name == '': - name = provider.ROOT_RECORD - - _dns_entries.extend(entries_for(name, record)) - - # Add a non-supported type - # so it triggers the "is supported" (transip.py:115) check and - # give 100% code coverage - _dns_entries.append( - DnsEntry('@', '3600', 'BOGUS', 'ns01.transip.nl.')) - - self.mockupEntries = _dns_entries - - # Skips authentication layer and returns the entries loaded by "Mockup" - def get_info(self, domain_name): - - if self.throw_auth_fault: - self.raiseInvalidAuth() - - # Special 'domain' to trigger error - if str(domain_name) == str('notfound.unit.tests'): - self.raiseZoneNotFound() - - result = MockResponse() - result.dnsEntries = self.mockupEntries - return result - - def set_dns_entries(self, domain_name, dns_entries): - - # Special 'domain' to trigger error - if str(domain_name) == str('failsetdns.unit.tests'): - self.raiseSaveError() - - return True - - def raiseZoneNotFound(self): - fault = MockFault(str('102'), '102 is zone not found') - document = {} - raise WebFault(fault, document) - - def raiseInvalidAuth(self): - fault = MockFault(str('200'), '200 is invalid auth') - document = {} - raise WebFault(fault, document) - - def raiseSaveError(self): - fault = MockFault(str('200'), '202 random error') - document = {} - raise WebFault(fault, document) +def make_failing_mock(response_code): + mock = Mock() + mock.return_value.domains.get.side_effect = [ + TransIPHTTPError(str(response_code), response_code) + ] + return mock class TestTransipProvider(TestCase): - bogus_key = str("""-----BEGIN RSA PRIVATE KEY----- -MIIEowIBAAKCAQEA0U5HGCkLrz423IyUf3u4cKN2WrNz1x5KNr6PvH2M/zxas+zB -elbxkdT3AQ+wmfcIvOuTmFRTHv35q2um1aBrPxVw+2s+lWo28VwIRttwIB1vIeWu -lSBnkEZQRLyPI2tH0i5QoMX4CVPf9rvij3Uslimi84jdzDfPFIh6jZ6C8nLipOTG -0IMhge1ofVfB0oSy5H+7PYS2858QLAf5ruYbzbAxZRivS402wGmQ0d0Lc1KxraAj -kiMM5yj/CkH/Vm2w9I6+tLFeASE4ub5HCP5G/ig4dbYtqZMQMpqyAbGxd5SOVtyn -UHagAJUxf8DT3I8PyjEHjxdOPUsxNyRtepO/7QIDAQABAoIBAQC7fiZ7gxE/ezjD -2n6PsHFpHVTBLS2gzzZl0dCKZeFvJk6ODJDImaeuHhrh7X8ifMNsEI9XjnojMhl8 -MGPzy88mZHugDNK0H8B19x5G8v1/Fz7dG5WHas660/HFkS+b59cfdXOugYiOOn9O -08HBBpLZNRUOmVUuQfQTjapSwGLG8PocgpyRD4zx0LnldnJcqYCxwCdev+AAsPnq -ibNtOd/MYD37w9MEGcaxLE8wGgkv8yd97aTjkgE+tp4zsM4QE4Rag133tsLLNznT -4Qr/of15M3NW/DXq/fgctyRcJjZpU66eCXLCz2iRTnLyyxxDC2nwlxKbubV+lcS0 -S4hbfd/BAoGBAO8jXxEaiybR0aIhhSR5esEc3ymo8R8vBN3ZMJ+vr5jEPXr/ZuFj -/R4cZ2XV3VoQJG0pvIOYVPZ5DpJM7W+zSXtJ/7bLXy4Bnmh/rc+YYgC+AXQoLSil -iD2OuB2xAzRAK71DVSO0kv8gEEXCersPT2i6+vC2GIlJvLcYbOdRKWGxAoGBAOAQ -aJbRLtKujH+kMdoMI7tRlL8XwI+SZf0FcieEu//nFyerTePUhVgEtcE+7eQ7hyhG -fIXUFx/wALySoqFzdJDLc8U8pTLhbUaoLOTjkwnCTKQVprhnISqQqqh/0U5u47IE -RWzWKN6OHb0CezNTq80Dr6HoxmPCnJHBHn5LinT9AoGAQSpvZpbIIqz8pmTiBl2A -QQ2gFpcuFeRXPClKYcmbXVLkuhbNL1BzEniFCLAt4LQTaRf9ghLJ3FyCxwVlkpHV -zV4N6/8hkcTpKOraL38D/dXJSaEFJVVuee/hZl3tVJjEEpA9rDwx7ooLRSdJEJ6M -ciq55UyKBSdt4KssSiDI2RECgYBL3mJ7xuLy5bWfNsrGiVvD/rC+L928/5ZXIXPw -26oI0Yfun7ulDH4GOroMcDF/GYT/Zzac3h7iapLlR0WYI47xxGI0A//wBZLJ3QIu -krxkDo2C9e3Y/NqnHgsbOQR3aWbiDT4wxydZjIeXS3LKA2fl6Hyc90PN3cTEOb8I -hq2gRQKBgEt0SxhhtyB93SjgTzmUZZ7PiEf0YJatfM6cevmjWHexrZH+x31PB72s -fH2BQyTKKzoCLB1k/6HRaMnZdrWyWSZ7JKz3AHJ8+58d0Hr8LTrzDM1L6BbjeDct -N4OiVz1I3rbZGYa396lpxO6ku8yCglisL1yrSP6DdEUp66ntpKVd ------END RSA PRIVATE KEY-----""") + bogus_key = "-----BEGIN RSA PRIVATE KEY-----Z-----END RSA PRIVATE KEY-----" - def make_expected(self): - expected = Zone('unit.tests.', []) - source = YamlProvider('test', join(dirname(__file__), 'config')) - source.populate(expected) - return expected - - @patch('octodns.provider.transip.TransipProvider._domain_service', - return_value=MockDomainService()) - def test_init(self, _): - - # No key nor key_file - with self.assertRaises(Exception) as ctx: - TransipProvider('test', 'unittest') + @patch("octodns.provider.transip.TransIP", make_mock_empty()) + def test_init(self): + with self.assertRaises(TransipConfigException) as ctx: + TransipProvider("test", "unittest") self.assertEquals( - str('Missing `key` or `key_file` parameter in config'), - str(ctx.exception)) + "Missing `key` or `key_file` parameter in config", + str(ctx.exception), + ) - # With key - TransipProvider('test', 'unittest', key=self.bogus_key) - - # With key_file - TransipProvider('test', 'unittest', key_file='/fake/path') - - @patch('suds.client.Client.__init__', new=lambda *args, **kwargs: None) - def test_domain_service(self): - # Special case smoke test for DomainService to get coverage - TransipProvider('test', 'unittest', key=self.bogus_key) - - @patch('octodns.provider.transip.TransipProvider._domain_service', - return_value=MockDomainService()) - def test_populate(self, _): - _expected = self.make_expected() + # Those should work + TransipProvider("test", "unittest", key=self.bogus_key) + TransipProvider("test", "unittest", key_file="/fake/path") + @patch("octodns.provider.transip.TransIP", make_failing_mock(401)) + def test_populate_unauthenticated(self): # Unhappy Plan - Not authenticated - # Live test against API, will fail in an unauthorized error - with self.assertRaises(WebFault) as ctx: - provider = TransipProvider('test', 'unittest', self.bogus_key) - provider._client.throw_auth_fault = True - zone = Zone('unit.tests.', []) + provider = TransipProvider("test", "unittest", self.bogus_key) + zone = Zone("unit.tests.", []) + with self.assertRaises(TransipException): provider.populate(zone, True) - self.assertEquals(str('WebFault'), - str(ctx.exception.__class__.__name__)) - - self.assertEquals(str('200'), ctx.exception.fault.faultcode) - - # No more auth problems - provider._client.throw_auth_fault = False - + @patch("octodns.provider.transip.TransIP", make_failing_mock(404)) + def test_populate_new_zone_as_target(self): # Unhappy Plan - Zone does not exists # Will trigger an exception if provider is used as a target for a # non-existing zone - with self.assertRaises(Exception) as ctx: - provider = TransipProvider('test', 'unittest', self.bogus_key) - zone = Zone('notfound.unit.tests.', []) + provider = TransipProvider("test", "unittest", self.bogus_key) + zone = Zone("notfound.unit.tests.", []) + with self.assertRaises(TransipNewZoneException): provider.populate(zone, True) - self.assertEquals(str('TransipNewZoneException'), - str(ctx.exception.__class__.__name__)) - - self.assertEquals( - 'populate: (102) Transip used as target' + - ' for non-existing zone: notfound.unit.tests.', - text_type(ctx.exception)) - + @patch("octodns.provider.transip.TransIP", make_mock_empty()) + def test_populate_new_zone_not_target(self): # Happy Plan - Zone does not exists # Won't trigger an exception if provider is NOT used as a target for a # non-existing zone. - provider = TransipProvider('test', 'unittest', self.bogus_key) - zone = Zone('notfound.unit.tests.', []) + provider = TransipProvider("test", "unittest", self.bogus_key) + zone = Zone("notfound.unit.tests.", []) provider.populate(zone, False) - # Happy Plan - Populate with mockup records - provider = TransipProvider('test', 'unittest', self.bogus_key) - provider._client.mockup(_expected.records) - zone = Zone('unit.tests.', []) + @patch("octodns.provider.transip.TransIP", make_failing_mock(404)) + def test_populate_zone_does_not_exist(self): + # Happy Plan - Zone does not exists + # Won't trigger an exception if provider is NOT used as a target for a + # non-existing zone. + provider = TransipProvider("test", "unittest", self.bogus_key) + zone = Zone("notfound.unit.tests.", []) provider.populate(zone, False) - # Transip allows relative values for types like cname, mx. - # Test is these are correctly appended with the domain - provider._currentZone = zone - self.assertEquals("www.unit.tests.", provider._parse_to_fqdn("www")) - self.assertEquals("www.unit.tests.", - provider._parse_to_fqdn("www.unit.tests.")) - self.assertEquals("www.sub.sub.sub.unit.tests.", - provider._parse_to_fqdn("www.sub.sub.sub")) - self.assertEquals("unit.tests.", - provider._parse_to_fqdn("@")) + @patch("octodns.provider.transip.TransIP") + def test_populate_zone_exists_not_target(self, mock_client): + # Happy Plan - Populate + source_zone, api_records = make_mock() + mock_client.return_value.domains.get.return_value.dns.list. \ + return_value = api_records + provider = TransipProvider("test", "unittest", self.bogus_key) + zone = Zone("unit.tests.", []) + exists = provider.populate(zone, False) + + self.assertTrue(exists, "populate should return True") + + # Due to the implementation of Record._equality_tuple() we can't do a + # normal compare, as that ingores ttl's for example. We therefor use + # the __repr__ to compare. We do need to filter out `.geo` attributes + # that Transip doesn't support. + expected = set() + for r in source_zone.records: + if r._type in TransipProvider.SUPPORTS: + if hasattr(r, "geo"): + r.geo = None + expected.add(r.__repr__()) + self.assertEqual({r.__repr__() for r in zone.records}, expected) + + @patch("octodns.provider.transip.TransIP", make_mock_empty()) + def test_populate_zone_exists_as_target(self): # Happy Plan - Even if the zone has no records the zone should exist - provider = TransipProvider('test', 'unittest', self.bogus_key) - zone = Zone('unit.tests.', []) + provider = TransipProvider("test", "unittest", self.bogus_key) + zone = Zone("unit.tests.", []) exists = provider.populate(zone, True) - self.assertTrue(exists, 'populate should return true') + self.assertTrue(exists, "populate should return True") - return + @patch("octodns.provider.transip.TransIP", make_mock_empty()) + def test_plan(self): + # Test happy plan, only create + provider = TransipProvider("test", "unittest", self.bogus_key) - @patch('octodns.provider.transip.TransipProvider._domain_service', - return_value=MockDomainService()) - def test_plan(self, _): - _expected = self.make_expected() + plan = provider.plan(make_expected()) - # Test Happy plan, only create - provider = TransipProvider('test', 'unittest', self.bogus_key) - plan = provider.plan(_expected) + self.assertIsNotNone(plan) + self.assertEqual(15, plan.change_counts["Create"]) + self.assertEqual(0, plan.change_counts["Update"]) + self.assertEqual(0, plan.change_counts["Delete"]) - self.assertEqual(15, plan.change_counts['Create']) - self.assertEqual(0, plan.change_counts['Update']) - self.assertEqual(0, plan.change_counts['Delete']) + @patch("octodns.provider.transip.TransIP") + def test_apply(self, client_mock): + # Test happy flow. Create all supported records + domain_mock = Mock() + client_mock.return_value.domains.get.return_value = domain_mock + domain_mock.dns.list.return_value = [] + provider = TransipProvider("test", "unittest", self.bogus_key) - return + plan = provider.plan(make_expected()) + self.assertIsNotNone(plan) + provider.apply(plan) - @patch('octodns.provider.transip.TransipProvider._domain_service', - return_value=MockDomainService()) - def test_apply(self, _): - _expected = self.make_expected() + domain_mock.dns.replace.assert_called_once() - # Test happy flow. Create all supoorted records - provider = TransipProvider('test', 'unittest', self.bogus_key) - plan = provider.plan(_expected) - self.assertEqual(15, len(plan.changes)) - changes = provider.apply(plan) - self.assertEqual(changes, len(plan.changes)) + # These are the supported ones from tests/config/unit.test.yaml + expected_entries = [ + { + "name": "ignored", + "expire": 3600, + "type": "A", + "content": "9.9.9.9", + }, + { + "name": "@", + "expire": 3600, + "type": "CAA", + "content": "0 issue ca.unit.tests", + }, + { + "name": "sub", + "expire": 3600, + "type": "NS", + "content": "6.2.3.4.", + }, + { + "name": "sub", + "expire": 3600, + "type": "NS", + "content": "7.2.3.4.", + }, + { + "name": "spf", + "expire": 600, + "type": "SPF", + "content": "v=spf1 ip4:192.168.0.1/16-all", + }, + { + "name": "_srv._tcp", + "expire": 600, + "type": "SRV", + "content": "10 20 30 foo-1.unit.tests.", + }, + { + "name": "_srv._tcp", + "expire": 600, + "type": "SRV", + "content": "12 20 30 foo-2.unit.tests.", + }, + { + "name": "_pop3._tcp", + "expire": 600, + "type": "SRV", + "content": "0 0 0 .", + }, + { + "name": "_imap._tcp", + "expire": 600, + "type": "SRV", + "content": "0 0 0 .", + }, + { + "name": "txt", + "expire": 600, + "type": "TXT", + "content": "Bah bah black sheep", + }, + { + "name": "txt", + "expire": 600, + "type": "TXT", + "content": "have you any wool.", + }, + { + "name": "txt", + "expire": 600, + "type": "TXT", + "content": ( + "v=DKIM1;k=rsa;s=email;h=sha256;" + "p=A/kinda+of/long/string+with+numb3rs" + ), + }, + {"name": "@", "expire": 3600, "type": "NS", "content": "6.2.3.4."}, + {"name": "@", "expire": 3600, "type": "NS", "content": "7.2.3.4."}, + { + "name": "cname", + "expire": 300, + "type": "CNAME", + "content": "unit.tests.", + }, + { + "name": "excluded", + "expire": 3600, + "type": "CNAME", + "content": "unit.tests.", + }, + { + "name": "www.sub", + "expire": 300, + "type": "A", + "content": "2.2.3.6", + }, + { + "name": "included", + "expire": 3600, + "type": "CNAME", + "content": "unit.tests.", + }, + { + "name": "mx", + "expire": 300, + "type": "MX", + "content": "10 smtp-4.unit.tests.", + }, + { + "name": "mx", + "expire": 300, + "type": "MX", + "content": "20 smtp-2.unit.tests.", + }, + { + "name": "mx", + "expire": 300, + "type": "MX", + "content": "30 smtp-3.unit.tests.", + }, + { + "name": "mx", + "expire": 300, + "type": "MX", + "content": "40 smtp-1.unit.tests.", + }, + { + "name": "aaaa", + "expire": 600, + "type": "AAAA", + "content": "2601:644:500:e210:62f8:1dff:feb8:947a", + }, + {"name": "@", "expire": 300, "type": "A", "content": "1.2.3.4"}, + {"name": "@", "expire": 300, "type": "A", "content": "1.2.3.5"}, + {"name": "www", "expire": 300, "type": "A", "content": "2.2.3.6"}, + { + "name": "@", + "expire": 3600, + "type": "SSHFP", + "content": "1 1 7491973e5f8b39d5327cd4e08bc81b05f7710b49", + }, + { + "name": "@", + "expire": 3600, + "type": "SSHFP", + "content": "1 1 bf6b6825d2977c511a475bbefb88aad54a92ac73", + }, + ] + # Unpack from the transip library magic structure... + seen_entries = [ + e.__dict__["_attrs"] + for e in domain_mock.dns.replace.mock_calls[0][1][0] + ] + self.assertEqual( + sorted(seen_entries, key=itemgetter("name", "type", "expire")), + sorted(expected_entries, key=itemgetter("name", "type", "expire")), + ) + @patch("octodns.provider.transip.TransIP") + def test_apply_unsupported(self, client_mock): + # This triggers the if supported statement to give 100% code coverage + domain_mock = Mock() + client_mock.return_value.domains.get.return_value = domain_mock + domain_mock.dns.list.return_value = [] + provider = TransipProvider("test", "unittest", self.bogus_key) + + plan = provider.plan(make_expected()) + self.assertIsNotNone(plan) + + # Test apply with only support for A records + provider.SUPPORTS = set(("A")) + + provider.apply(plan) + seen_entries = [ + e.__dict__["_attrs"] + for e in domain_mock.dns.replace.mock_calls[0][1][0] + ] + expected_entries = [ + { + "name": "ignored", + "expire": 3600, + "type": "A", + "content": "9.9.9.9", + }, + { + "name": "www.sub", + "expire": 300, + "type": "A", + "content": "2.2.3.6", + }, + {"name": "@", "expire": 300, "type": "A", "content": "1.2.3.4"}, + {"name": "@", "expire": 300, "type": "A", "content": "1.2.3.5"}, + {"name": "www", "expire": 300, "type": "A", "content": "2.2.3.6"}, + ] + self.assertEquals( + sorted(seen_entries, key=itemgetter("name", "type", "expire")), + sorted(expected_entries, key=itemgetter("name", "type", "expire")), + ) + + @patch("octodns.provider.transip.TransIP") + def test_apply_failure_on_not_found(self, client_mock): # Test unhappy flow. Trigger 'not found error' in apply stage # This should normally not happen as populate will capture it first # but just in case. - changes = [] # reset changes - with self.assertRaises(Exception) as ctx: - provider = TransipProvider('test', 'unittest', self.bogus_key) - plan = provider.plan(_expected) - plan.desired.name = 'notfound.unit.tests.' - changes = provider.apply(plan) + domain_mock = Mock() + domain_mock.dns.list.return_value = [] + client_mock.return_value.domains.get.side_effect = [ + domain_mock, + TransIPHTTPError("Not Found", 404), + ] + provider = TransipProvider("test", "unittest", self.bogus_key) - # Changes should not be set due to an Exception - self.assertEqual([], changes) + plan = provider.plan(make_expected()) - self.assertEquals(str('WebFault'), - str(ctx.exception.__class__.__name__)) - - self.assertEquals(str('102'), ctx.exception.fault.faultcode) + with self.assertRaises(TransipException): + provider.apply(plan) + @patch("octodns.provider.transip.TransIP") + def test_apply_failure_on_error(self, client_mock): # Test unhappy flow. Trigger a unrecoverable error while saving - _expected = self.make_expected() # reset expected - changes = [] # reset changes + domain_mock = Mock() + domain_mock.dns.list.return_value = [] + domain_mock.dns.replace.side_effect = [ + TransIPHTTPError("Not Found", 500) + ] + client_mock.return_value.domains.get.return_value = domain_mock + provider = TransipProvider("test", "unittest", self.bogus_key) - with self.assertRaises(Exception) as ctx: - provider = TransipProvider('test', 'unittest', self.bogus_key) - plan = provider.plan(_expected) - plan.desired.name = 'failsetdns.unit.tests.' - changes = provider.apply(plan) + plan = provider.plan(make_expected()) - # Changes should not be set due to an Exception - self.assertEqual([], changes) + with self.assertRaises(TransipException): + provider.apply(plan) - self.assertEquals(str('TransipException'), - str(ctx.exception.__class__.__name__)) + +class TestParseFQDN(TestCase): + def test_parse_fqdn(self): + zone = Zone("unit.tests.", []) + self.assertEquals("www.unit.tests.", _parse_to_fqdn("www", zone)) + self.assertEquals( + "www.unit.tests.", _parse_to_fqdn("www.unit.tests.", zone) + ) + self.assertEquals( + "www.sub.sub.sub.unit.tests.", + _parse_to_fqdn("www.sub.sub.sub", zone), + ) + self.assertEquals("unit.tests.", _parse_to_fqdn("@", zone))