From c86b5bb904ee5744e1e6d42a59fcfe5da603dc2c Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Thu, 13 Jan 2022 15:26:38 -0800 Subject: [PATCH] Extract OvhProvider from octoDNS core --- CHANGELOG.md | 1 + README.md | 2 +- octodns/provider/ovh.py | 429 +------------------------- requirements.txt | 1 - tests/test_octodns_provider_ovh.py | 463 +---------------------------- 5 files changed, 20 insertions(+), 876 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a2b8052..3a0f91e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ * [HetznerProvider](https://github.com/octodns/octodns-hetzner/) * [MythicBeastsProvider](https://github.com/octodns/octodns-mythicbeasts/) * [Ns1Provider](https://github.com/octodns/octodns-ns1/) + * [OvhProvider](https://github.com/octodns/octodns-ovh/) * [PowerDnsProvider](https://github.com/octodns/octodns-powerdns/) * [Route53Provider](https://github.com/octodns/octodns-route53/) also AwsAcmMangingProcessor diff --git a/README.md b/README.md index 476d705..82d2cbb 100644 --- a/README.md +++ b/README.md @@ -209,7 +209,7 @@ The table below lists the providers octoDNS supports. We're currently in the pro | [HetznerProvider](https://github.com/octodns/octodns-hetzner/) | [octodns_hetzner](https://github.com/octodns/octodns-hetzner/) | | | | | | [MythicBeastsProvider](https://github.com/octodns/octodns-mythicbeasts/) | [octodns_mythicbeasts](https://github.com/octodns/octodns-mythicbeasts/) | | | | | | [Ns1Provider](https://github.com/octodns/octodns-ns1/) | [octodns_ns1](https://github.com/octodns/octodns-ns1/) | | | | | -| [OVH](/octodns/provider/ovh.py) | | ovh | A, AAAA, CAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, SSHFP, TXT, DKIM | No | | +| [OvhProvider](https://github.com/octodns/octodns-ovh/) | [octodns_ovh](https://github.com/octodns/octodns-ovh/) | | A, AAAA, CAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, SSHFP, TXT, DKIM | No | | | [PowerDnsProvider](https://github.com/octodns/octodns-powerdns/) | [octodns_powerdns](https://github.com/octodns/octodns-powerdns/) | | | | | | [Rackspace](/octodns/provider/rackspace.py) | | | A, AAAA, ALIAS, CNAME, MX, NS, PTR, SPF, TXT | No | | | [Route53](https://github.com/octodns/octodns-route53) | [octodns_route53](https://github.com/octodns/octodns-route53) | | | | | diff --git a/octodns/provider/ovh.py b/octodns/provider/ovh.py index 9b7d4e3..d7fc2e8 100644 --- a/octodns/provider/ovh.py +++ b/octodns/provider/ovh.py @@ -5,420 +5,17 @@ from __future__ import absolute_import, division, print_function, \ unicode_literals -import base64 -import binascii -import logging -from collections import defaultdict +from logging import getLogger -import ovh -from ovh import ResourceNotFoundError - -from octodns.record import Record -from .base import BaseProvider - - -class OvhProvider(BaseProvider): - """ - OVH provider using API v6 - - ovh: - class: octodns.provider.ovh.OvhProvider - # OVH api v6 endpoint - endpoint: ovh-eu - # API application key - application_key: 1234 - # API application secret - application_secret: 1234 - # API consumer key - consumer_key: 1234 - """ - - SUPPORTS_GEO = False - SUPPORTS_DYNAMIC = False - ZONE_NOT_FOUND_MESSAGE = 'This service does not exist' - - # This variable is also used in populate method to filter which OVH record - # types are supported by octodns - SUPPORTS = set(('A', 'AAAA', 'CAA', 'CNAME', 'DKIM', 'MX', 'NAPTR', 'NS', - 'PTR', 'SPF', 'SRV', 'SSHFP', 'TXT')) - - def __init__(self, id, endpoint, application_key, application_secret, - consumer_key, *args, **kwargs): - self.log = logging.getLogger(f'OvhProvider[{id}]') - self.log.debug('__init__: id=%s, endpoint=%s, application_key=%s, ' - 'application_secret=***, consumer_key=%s', id, endpoint, - application_key, consumer_key) - super(OvhProvider, self).__init__(id, *args, **kwargs) - self._client = ovh.Client( - endpoint=endpoint, - application_key=application_key, - application_secret=application_secret, - consumer_key=consumer_key, - ) - - def populate(self, zone, target=False, lenient=False): - self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name, - target, lenient) - zone_name = zone.name[:-1] - try: - records = self.get_records(zone_name=zone_name) - exists = True - except ResourceNotFoundError as e: - if str(e) != self.ZONE_NOT_FOUND_MESSAGE: - raise - exists = False - records = [] - - values = defaultdict(lambda: defaultdict(list)) - for record in records: - values[record['subDomain']][record['fieldType']].append(record) - - before = len(zone.records) - for name, types in values.items(): - for _type, records in types.items(): - if _type not in self.SUPPORTS: - self.log.warning('Not managed record of type %s, skip', - _type) - continue - data_for = getattr(self, f'_data_for_{_type}') - record = Record.new(zone, name, data_for(_type, records), - source=self, lenient=lenient) - zone.add_record(record, lenient=lenient) - - self.log.info('populate: found %s records, exists=%s', - len(zone.records) - before, exists) - return exists - - def _apply(self, plan): - desired = plan.desired - changes = plan.changes - zone_name = desired.name[:-1] - self.log.info('_apply: zone=%s, len(changes)=%d', desired.name, - len(changes)) - for change in changes: - class_name = change.__class__.__name__ - getattr(self, f'_apply_{class_name}'.lower())(zone_name, change) - - # We need to refresh the zone to really apply the changes - self._client.post(f'/domain/zone/{zone_name}/refresh') - - def _apply_create(self, zone_name, change): - new = change.new - params_for = getattr(self, f'_params_for_{new._type}') - for params in params_for(new): - self.create_record(zone_name, params) - - def _apply_update(self, zone_name, change): - self._apply_delete(zone_name, change) - self._apply_create(zone_name, change) - - def _apply_delete(self, zone_name, change): - existing = change.existing - record_type = existing._type - if record_type == "TXT": - if self._is_valid_dkim(existing.values[0]): - record_type = 'DKIM' - self.delete_records(zone_name, record_type, existing.name) - - @staticmethod - def _data_for_multiple(_type, records): - return { - 'ttl': records[0]['ttl'], - 'type': _type, - 'values': [record['target'] for record in records] - } - - @staticmethod - def _data_for_single(_type, records): - record = records[0] - return { - 'ttl': record['ttl'], - 'type': _type, - 'value': record['target'] - } - - @staticmethod - def _data_for_CAA(_type, records): - values = [] - for record in records: - flags, tag, value = record['target'].split(' ', 2) - values.append({ - 'flags': flags, - 'tag': tag, - 'value': value[1:-1] - }) - return { - 'ttl': records[0]['ttl'], - 'type': _type, - 'values': values - } - - @staticmethod - def _data_for_MX(_type, records): - values = [] - for record in records: - preference, exchange = record['target'].split(' ', 1) - values.append({ - 'preference': preference, - 'exchange': exchange, - }) - return { - 'ttl': records[0]['ttl'], - 'type': _type, - 'values': values, - } - - @staticmethod - def _data_for_NAPTR(_type, records): - values = [] - for record in records: - order, preference, flags, service, regexp, replacement = record[ - 'target'].split(' ', 5) - values.append({ - 'flags': flags[1:-1], - 'order': order, - 'preference': preference, - 'regexp': regexp[1:-1], - 'replacement': replacement, - 'service': service[1:-1], - }) - return { - 'type': _type, - 'ttl': records[0]['ttl'], - 'values': values - } - - @staticmethod - def _data_for_SRV(_type, records): - values = [] - for record in records: - priority, weight, port, target = record['target'].split(' ', 3) - values.append({ - 'port': port, - 'priority': priority, - 'target': f'{target}.', - 'weight': weight - }) - return { - 'type': _type, - 'ttl': records[0]['ttl'], - 'values': values - } - - @staticmethod - def _data_for_SSHFP(_type, records): - values = [] - for record in records: - algorithm, fingerprint_type, fingerprint = record['target'].split( - ' ', 2) - values.append({ - 'algorithm': algorithm, - 'fingerprint': fingerprint, - 'fingerprint_type': fingerprint_type - }) - return { - 'type': _type, - 'ttl': records[0]['ttl'], - 'values': values - } - - @staticmethod - def _data_for_DKIM(_type, records): - return { - 'ttl': records[0]['ttl'], - 'type': "TXT", - 'values': [record['target'].replace(';', '\\;') - for record in records] - } - - _data_for_A = _data_for_multiple - _data_for_AAAA = _data_for_multiple - _data_for_NS = _data_for_multiple - _data_for_TXT = _data_for_multiple - _data_for_SPF = _data_for_multiple - _data_for_PTR = _data_for_single - _data_for_CNAME = _data_for_single - - @staticmethod - def _params_for_multiple(record): - for value in record.values: - yield { - 'target': value, - 'subDomain': record.name, - 'ttl': record.ttl, - 'fieldType': record._type, - } - - @staticmethod - def _params_for_single(record): - yield { - 'target': record.value, - 'subDomain': record.name, - 'ttl': record.ttl, - 'fieldType': record._type - } - - @staticmethod - def _params_for_CAA(record): - for value in record.values: - yield { - 'target': f'{value.flags} {value.tag} "{value.value}"', - 'subDomain': record.name, - 'ttl': record.ttl, - 'fieldType': record._type - } - - @staticmethod - def _params_for_MX(record): - for value in record.values: - yield { - 'target': f'{value.preference:d} {value.exchange}', - 'subDomain': record.name, - 'ttl': record.ttl, - 'fieldType': record._type - } - - @staticmethod - def _params_for_NAPTR(record): - for value in record.values: - content = f'{value.order} {value.preference} "{value.flags}" ' \ - f'"{value.service}" "{value.regexp}" {value.replacement}' - yield { - 'target': content, - 'subDomain': record.name, - 'ttl': record.ttl, - 'fieldType': record._type - } - - @staticmethod - def _params_for_SRV(record): - for value in record.values: - yield { - 'target': f'{value.priority} {value.weight} {value.port} ' - f'{value.target}', - 'subDomain': record.name, - 'ttl': record.ttl, - 'fieldType': record._type - } - - @staticmethod - def _params_for_SSHFP(record): - for value in record.values: - yield { - 'target': f'{value.algorithm} {value.fingerprint_type} ' - f'{value.fingerprint}', - 'subDomain': record.name, - 'ttl': record.ttl, - 'fieldType': record._type - } - - def _params_for_TXT(self, record): - for value in record.values: - field_type = 'TXT' - if self._is_valid_dkim(value): - field_type = 'DKIM' - value = value.replace("\\;", ";") - yield { - 'target': value, - 'subDomain': record.name, - 'ttl': record.ttl, - 'fieldType': field_type - } - - _params_for_A = _params_for_multiple - _params_for_AAAA = _params_for_multiple - _params_for_NS = _params_for_multiple - _params_for_SPF = _params_for_multiple - - _params_for_CNAME = _params_for_single - _params_for_PTR = _params_for_single - - def _is_valid_dkim(self, value): - """Check if value is a valid DKIM""" - validator_dict = {'h': lambda val: val in ['sha1', 'sha256'], - 's': lambda val: val in ['*', 'email'], - 't': lambda val: val in ['y', 's'], - 'v': lambda val: val == 'DKIM1', - 'k': lambda val: val == 'rsa', - 'n': lambda _: True, - 'g': lambda _: True} - - splitted = [v for v in value.split('\\;') if v] - found_key = False - for splitted_value in splitted: - sub_split = [x.strip() for x in splitted_value.split("=", 1)] - if len(sub_split) < 2: - return False - key, value = sub_split[0], sub_split[1] - if key == "p": - is_valid_key = self._is_valid_dkim_key(value) - if not is_valid_key: - return False - found_key = True - else: - is_valid_key = validator_dict.get(key, lambda _: False)(value) - if not is_valid_key: - return False - return found_key - - @staticmethod - def _is_valid_dkim_key(key): - result = True - base64_decode = getattr(base64, 'decodestring', None) - base64_decode = getattr(base64, 'decodebytes', base64_decode) - - try: - result = base64_decode(bytearray(key, 'utf-8')) - except binascii.Error: - result = False - return result - - def get_records(self, zone_name): - """ - List all records of a DNS zone - :param zone_name: Name of zone - :return: list of id's records - """ - records = self._client.get(f'/domain/zone/{zone_name}/record') - return [self.get_record(zone_name, record_id) for record_id in records] - - def get_record(self, zone_name, record_id): - """ - Get record with given id - :param zone_name: Name of the zone - :param record_id: Id of the record - :return: Value of the record - """ - return self._client.get(f'/domain/zone/{zone_name}/record/{record_id}') - - def delete_records(self, zone_name, record_type, subdomain): - """ - Delete record from have fieldType=type and subDomain=subdomain - :param zone_name: Name of the zone - :param record_type: fieldType - :param subdomain: subDomain - """ - records = self._client.get(f'/domain/zone/{zone_name}/record', - fieldType=record_type, subDomain=subdomain) - for record in records: - self.delete_record(zone_name, record) - - def delete_record(self, zone_name, record_id): - """ - Delete record with a given id - :param zone_name: Name of the zone - :param record_id: Id of the record - """ - self.log.debug('Delete record: zone: %s, id %s', zone_name, record_id) - self._client.delete(f'/domain/zone/{zone_name}/record/{record_id}') - - def create_record(self, zone_name, params): - """ - Create a record - :param zone_name: Name of the zone - :param params: {'fieldType': 'A', 'ttl': 60, 'subDomain': 'www', - 'target': '1.2.3.4' - """ - self.log.debug('Create record: zone: %s, id %s', zone_name, - params) - return self._client.post(f'/domain/zone/{zone_name}/record', **params) +logger = getLogger('Ovh') +try: + logger.warn('octodns_ovh shimmed. Update your provider class to ' + 'octodns_ovh.OvhProvider. ' + 'Shim will be removed in 1.0') + from octodns_ovh import OvhProvider + OvhProvider # pragma: no cover +except ModuleNotFoundError: + logger.exception('OvhProvider has been moved into a seperate module, ' + 'octodns_ovh is now required. Provider class should ' + 'be updated to octodns_ovh.OvhProvider') + raise diff --git a/requirements.txt b/requirements.txt index d6df2ce..1ba51d9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,6 @@ docutils==0.16 fqdn==1.5.0 jmespath==0.10.0 natsort==6.2.1 -ovh==0.5.0 pycountry-convert==0.7.2 pycountry==22.1.10 python-dateutil==2.8.1 diff --git a/tests/test_octodns_provider_ovh.py b/tests/test_octodns_provider_ovh.py index 3da4276..27ac8f6 100644 --- a/tests/test_octodns_provider_ovh.py +++ b/tests/test_octodns_provider_ovh.py @@ -7,463 +7,10 @@ from __future__ import absolute_import, division, print_function, \ from unittest import TestCase -from mock import patch, call -from ovh import APIError, ResourceNotFoundError, InvalidCredential -from octodns.provider.ovh import OvhProvider -from octodns.record import Record -from octodns.zone import Zone +class TestOvhShim(TestCase): - -class TestOvhProvider(TestCase): - api_record = [] - valid_dkim = [] - invalid_dkim = [] - - valid_dkim_key = "p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCxLaG16G4SaE" \ - "cXVdiIxTg7gKSGbHKQLm30CHib1h9FzS9nkcyvQSyQj1rMFyqC//" \ - "tft3ohx3nvJl+bGCWxdtLYDSmir9PW54e5CTdxEh8MWRkBO3StF6" \ - "QG/tAh3aTGDmkqhIJGLb87iHvpmVKqURmEUzJPv5KPJfWLofADI+" \ - "q9lQIDAQAB" - - zone = Zone('unit.tests.', []) - expected = set() - - # A, subdomain='' - api_record.append({ - 'fieldType': 'A', - 'ttl': 100, - 'target': '1.2.3.4', - 'subDomain': '', - 'id': 1 - }) - expected.add(Record.new(zone, '', { - 'ttl': 100, - 'type': 'A', - 'value': '1.2.3.4', - })) - - # A, subdomain='sub - api_record.append({ - 'fieldType': 'A', - 'ttl': 200, - 'target': '1.2.3.4', - 'subDomain': 'sub', - 'id': 2 - }) - expected.add(Record.new(zone, 'sub', { - 'ttl': 200, - 'type': 'A', - 'value': '1.2.3.4', - })) - - # CNAME - api_record.append({ - 'fieldType': 'CNAME', - 'ttl': 300, - 'target': 'unit.tests.', - 'subDomain': 'www2', - 'id': 3 - }) - expected.add(Record.new(zone, 'www2', { - 'ttl': 300, - 'type': 'CNAME', - 'value': 'unit.tests.', - })) - - # MX - api_record.append({ - 'fieldType': 'MX', - 'ttl': 400, - 'target': '10 mx1.unit.tests.', - 'subDomain': '', - 'id': 4 - }) - expected.add(Record.new(zone, '', { - 'ttl': 400, - 'type': 'MX', - 'values': [{ - 'preference': 10, - 'exchange': 'mx1.unit.tests.', - }] - })) - - # NAPTR - api_record.append({ - 'fieldType': 'NAPTR', - 'ttl': 500, - 'target': '10 100 "S" "SIP+D2U" "!^.*$!sip:info@bar.example.com!" .', - 'subDomain': 'naptr', - 'id': 5 - }) - expected.add(Record.new(zone, 'naptr', { - 'ttl': 500, - 'type': 'NAPTR', - 'values': [{ - 'flags': 'S', - 'order': 10, - 'preference': 100, - 'regexp': '!^.*$!sip:info@bar.example.com!', - 'replacement': '.', - 'service': 'SIP+D2U', - }] - })) - - # NS - api_record.append({ - 'fieldType': 'NS', - 'ttl': 600, - 'target': 'ns1.unit.tests.', - 'subDomain': '', - 'id': 6 - }) - api_record.append({ - 'fieldType': 'NS', - 'ttl': 600, - 'target': 'ns2.unit.tests.', - 'subDomain': '', - 'id': 7 - }) - expected.add(Record.new(zone, '', { - 'ttl': 600, - 'type': 'NS', - 'values': ['ns1.unit.tests.', 'ns2.unit.tests.'], - })) - - # NS with sub - api_record.append({ - 'fieldType': 'NS', - 'ttl': 700, - 'target': 'ns3.unit.tests.', - 'subDomain': 'www3', - 'id': 8 - }) - api_record.append({ - 'fieldType': 'NS', - 'ttl': 700, - 'target': 'ns4.unit.tests.', - 'subDomain': 'www3', - 'id': 9 - }) - expected.add(Record.new(zone, 'www3', { - 'ttl': 700, - 'type': 'NS', - 'values': ['ns3.unit.tests.', 'ns4.unit.tests.'], - })) - - api_record.append({ - 'fieldType': 'SRV', - 'ttl': 800, - 'target': '10 20 30 foo-1.unit.tests.', - 'subDomain': '_srv._tcp', - 'id': 10 - }) - api_record.append({ - 'fieldType': 'SRV', - 'ttl': 800, - 'target': '40 50 60 foo-2.unit.tests.', - 'subDomain': '_srv._tcp', - 'id': 11 - }) - expected.add(Record.new(zone, '_srv._tcp', { - 'ttl': 800, - 'type': 'SRV', - 'values': [{ - 'priority': 10, - 'weight': 20, - 'port': 30, - 'target': 'foo-1.unit.tests.', - }, { - 'priority': 40, - 'weight': 50, - 'port': 60, - 'target': 'foo-2.unit.tests.', - }] - })) - - # PTR - api_record.append({ - 'fieldType': 'PTR', - 'ttl': 900, - 'target': 'unit.tests.', - 'subDomain': '4', - 'id': 12 - }) - expected.add(Record.new(zone, '4', { - 'ttl': 900, - 'type': 'PTR', - 'value': 'unit.tests.' - })) - - # SPF - api_record.append({ - 'fieldType': 'SPF', - 'ttl': 1000, - 'target': 'v=spf1 include:unit.texts.redirect ~all', - 'subDomain': '', - 'id': 13 - }) - expected.add(Record.new(zone, '', { - 'ttl': 1000, - 'type': 'SPF', - 'value': 'v=spf1 include:unit.texts.redirect ~all' - })) - - # SSHFP - api_record.append({ - 'fieldType': 'SSHFP', - 'ttl': 1100, - 'target': '1 1 bf6b6825d2977c511a475bbefb88aad54a92ac73 ', - 'subDomain': '', - 'id': 14 - }) - expected.add(Record.new(zone, '', { - 'ttl': 1100, - 'type': 'SSHFP', - 'value': { - 'algorithm': 1, - 'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73', - 'fingerprint_type': 1 - } - })) - - # AAAA - api_record.append({ - 'fieldType': 'AAAA', - 'ttl': 1200, - 'target': '1:1ec:1::1', - 'subDomain': '', - 'id': 15 - }) - expected.add(Record.new(zone, '', { - 'ttl': 200, - 'type': 'AAAA', - 'value': '1:1ec:1::1', - })) - - # DKIM - api_record.append({ - 'fieldType': 'DKIM', - 'ttl': 1300, - 'target': valid_dkim_key, - 'subDomain': 'dkim', - 'id': 16 - }) - expected.add(Record.new(zone, 'dkim', { - 'ttl': 1300, - 'type': 'TXT', - 'value': valid_dkim_key, - })) - - # TXT - api_record.append({ - 'fieldType': 'TXT', - 'ttl': 1400, - 'target': 'TXT text', - 'subDomain': 'txt', - 'id': 17 - }) - expected.add(Record.new(zone, 'txt', { - 'ttl': 1400, - 'type': 'TXT', - 'value': 'TXT text', - })) - - # LOC - # We do not have associated record for LOC, as it's not managed - api_record.append({ - 'fieldType': 'LOC', - 'ttl': 1500, - 'target': '1 1 1 N 1 1 1 E 1m 1m', - 'subDomain': '', - 'id': 18 - }) - - # CAA - api_record.append({ - 'fieldType': 'CAA', - 'ttl': 1600, - 'target': '0 issue "ca.unit.tests"', - 'subDomain': 'caa', - 'id': 19 - }) - expected.add(Record.new(zone, 'caa', { - 'ttl': 1600, - 'type': 'CAA', - 'values': [{ - 'flags': 0, - 'tag': 'issue', - 'value': 'ca.unit.tests' - }] - })) - - valid_dkim = [valid_dkim_key, - 'v=DKIM1 \\; %s' % valid_dkim_key, - 'h=sha256 \\; %s' % valid_dkim_key, - 'h=sha1 \\; %s' % valid_dkim_key, - 's=* \\; %s' % valid_dkim_key, - 's=email \\; %s' % valid_dkim_key, - 't=y \\; %s' % valid_dkim_key, - 't=s \\; %s' % valid_dkim_key, - 'k=rsa \\; %s' % valid_dkim_key, - 'n=notes \\; %s' % valid_dkim_key, - 'g=granularity \\; %s' % valid_dkim_key, - ] - invalid_dkim = ['p=%invalid%', # Invalid public key - 'v=DKIM1', # Missing public key - 'v=DKIM2 \\; %s' % valid_dkim_key, # Invalid version - 'h=sha512 \\; %s' % valid_dkim_key, # Invalid hash algo - 's=fake \\; %s' % valid_dkim_key, # Invalid selector - 't=fake \\; %s' % valid_dkim_key, # Invalid flag - 'u=invalid \\; %s' % valid_dkim_key, # Invalid key - ] - - @patch('ovh.Client') - def test_populate(self, client_mock): - provider = OvhProvider('test', 'endpoint', 'application_key', - 'application_secret', 'consumer_key') - - with patch.object(provider._client, 'get') as get_mock: - zone = Zone('unit.tests.', []) - get_mock.side_effect = ResourceNotFoundError('boom') - with self.assertRaises(APIError) as ctx: - provider.populate(zone) - self.assertEquals(get_mock.side_effect, ctx.exception) - - get_mock.side_effect = InvalidCredential('boom') - with self.assertRaises(APIError) as ctx: - provider.populate(zone) - self.assertEquals(get_mock.side_effect, ctx.exception) - - zone = Zone('unit.tests.', []) - get_mock.side_effect = ResourceNotFoundError('This service does ' - 'not exist') - exists = provider.populate(zone) - self.assertEquals(set(), zone.records) - self.assertFalse(exists) - - zone = Zone('unit.tests.', []) - get_returns = [[record['id'] for record in self.api_record]] - get_returns += self.api_record - get_mock.side_effect = get_returns - exists = provider.populate(zone) - self.assertEquals(self.expected, zone.records) - self.assertTrue(exists) - - @patch('ovh.Client') - def test_is_valid_dkim(self, client_mock): - """Test _is_valid_dkim""" - provider = OvhProvider('test', 'endpoint', 'application_key', - 'application_secret', 'consumer_key') - for dkim in self.valid_dkim: - self.assertTrue(provider._is_valid_dkim(dkim)) - for dkim in self.invalid_dkim: - self.assertFalse(provider._is_valid_dkim(dkim)) - - @patch('ovh.Client') - def test_apply(self, client_mock): - provider = OvhProvider('test', 'endpoint', 'application_key', - 'application_secret', 'consumer_key') - - desired = Zone('unit.tests.', []) - - for r in self.expected: - desired.add_record(r) - - with patch.object(provider._client, 'post') as get_mock: - plan = provider.plan(desired) - get_mock.side_effect = APIError('boom') - with self.assertRaises(APIError) as ctx: - provider.apply(plan) - self.assertEquals(get_mock.side_effect, ctx.exception) - - # Records get by API call - with patch.object(provider._client, 'get') as get_mock: - get_returns = [ - [1, 2, 3, 4], - {'fieldType': 'A', 'ttl': 600, 'target': '5.6.7.8', - 'subDomain': '', 'id': 100}, - {'fieldType': 'A', 'ttl': 600, 'target': '5.6.7.8', - 'subDomain': 'fake', 'id': 101}, - {'fieldType': 'TXT', 'ttl': 600, 'target': 'fake txt record', - 'subDomain': 'txt', 'id': 102}, - {'fieldType': 'DKIM', 'ttl': 600, - 'target': 'v=DKIM1; %s' % self.valid_dkim_key, - 'subDomain': 'dkim', 'id': 103} - ] - get_mock.side_effect = get_returns - - plan = provider.plan(desired) - - with patch.object(provider._client, 'post') as post_mock, \ - patch.object(provider._client, 'delete') as delete_mock: - get_mock.side_effect = [[100], [101], [102], [103]] - provider.apply(plan) - wanted_calls = [ - call('/domain/zone/unit.tests/record', fieldType='A', - subDomain='', target='1.2.3.4', ttl=100), - call('/domain/zone/unit.tests/record', fieldType='AAAA', - subDomain='', target='1:1ec:1::1', ttl=200), - call('/domain/zone/unit.tests/record', fieldType='MX', - subDomain='', target='10 mx1.unit.tests.', ttl=400), - call('/domain/zone/unit.tests/record', fieldType='SPF', - subDomain='', - target='v=spf1 include:unit.texts.redirect ~all', - ttl=1000), - call('/domain/zone/unit.tests/record', fieldType='SSHFP', - subDomain='', - target='1 1 bf6b6825d2977c511a475bbefb88aad54a92ac73', - ttl=1100), - call('/domain/zone/unit.tests/record', fieldType='PTR', - subDomain='4', target='unit.tests.', ttl=900), - call('/domain/zone/unit.tests/record', fieldType='SRV', - subDomain='_srv._tcp', - target='10 20 30 foo-1.unit.tests.', ttl=800), - call('/domain/zone/unit.tests/record', fieldType='SRV', - subDomain='_srv._tcp', - target='40 50 60 foo-2.unit.tests.', ttl=800), - call('/domain/zone/unit.tests/record', fieldType='CAA', - subDomain='caa', target='0 issue "ca.unit.tests"', - ttl=1600), - call('/domain/zone/unit.tests/record', fieldType='DKIM', - subDomain='dkim', - target='p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCxLaG' - '16G4SaEcXVdiIxTg7gKSGbHKQLm30CHib1h9FzS9nkcyvQSyQj1r' - 'MFyqC//tft3ohx3nvJl+bGCWxdtLYDSmir9PW54e5CTdxEh8MWRk' - 'BO3StF6QG/tAh3aTGDmkqhIJGLb87iHvpmVKqURmEUzJPv5KPJfW' - 'LofADI+q9lQIDAQAB', ttl=1300), - call('/domain/zone/unit.tests/record', fieldType='NAPTR', - subDomain='naptr', - target='10 100 "S" "SIP+D2U" "!^.*$!sip:info@bar.exam' - 'ple.com!" .', ttl=500), - call('/domain/zone/unit.tests/record', fieldType='A', - subDomain='sub', target='1.2.3.4', ttl=200), - call('/domain/zone/unit.tests/record', fieldType='TXT', - subDomain='txt', target='TXT text', ttl=1400), - call('/domain/zone/unit.tests/record', fieldType='CNAME', - subDomain='www2', target='unit.tests.', ttl=300), - call('/domain/zone/unit.tests/record', fieldType='NS', - subDomain='www3', target='ns3.unit.tests.', ttl=700), - call('/domain/zone/unit.tests/record', fieldType='NS', - subDomain='www3', target='ns4.unit.tests.', ttl=700), - call('/domain/zone/unit.tests/refresh')] - - post_mock.assert_has_calls(wanted_calls) - - # Get for delete calls - wanted_get_calls = [ - call(u'/domain/zone/unit.tests/record', fieldType=u'A', - subDomain=u''), - call(u'/domain/zone/unit.tests/record', fieldType=u'DKIM', - subDomain='dkim'), - call(u'/domain/zone/unit.tests/record', fieldType=u'A', - subDomain='fake'), - call(u'/domain/zone/unit.tests/record', fieldType=u'TXT', - subDomain='txt')] - get_mock.assert_has_calls(wanted_get_calls) - # 4 delete calls for update and delete - delete_mock.assert_has_calls( - [call(u'/domain/zone/unit.tests/record/100'), - call(u'/domain/zone/unit.tests/record/101'), - call(u'/domain/zone/unit.tests/record/102'), - call(u'/domain/zone/unit.tests/record/103')]) + def test_missing(self): + with self.assertRaises(ModuleNotFoundError): + from octodns.provider.ovh import OvhProvider + OvhProvider