1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Extract OvhProvider from octoDNS core

This commit is contained in:
Ross McFarland
2022-01-13 15:26:38 -08:00
parent 4d53d05bdb
commit c86b5bb904
5 changed files with 20 additions and 876 deletions

View File

@@ -22,6 +22,7 @@
* [HetznerProvider](https://github.com/octodns/octodns-hetzner/)
* [MythicBeastsProvider](https://github.com/octodns/octodns-mythicbeasts/)
* [Ns1Provider](https://github.com/octodns/octodns-ns1/)
* [OvhProvider](https://github.com/octodns/octodns-ovh/)
* [PowerDnsProvider](https://github.com/octodns/octodns-powerdns/)
* [Route53Provider](https://github.com/octodns/octodns-route53/) also
AwsAcmMangingProcessor

View File

@@ -209,7 +209,7 @@ The table below lists the providers octoDNS supports. We're currently in the pro
| [HetznerProvider](https://github.com/octodns/octodns-hetzner/) | [octodns_hetzner](https://github.com/octodns/octodns-hetzner/) | | | | |
| [MythicBeastsProvider](https://github.com/octodns/octodns-mythicbeasts/) | [octodns_mythicbeasts](https://github.com/octodns/octodns-mythicbeasts/) | | | | |
| [Ns1Provider](https://github.com/octodns/octodns-ns1/) | [octodns_ns1](https://github.com/octodns/octodns-ns1/) | | | | |
| [OVH](/octodns/provider/ovh.py) | | ovh | A, AAAA, CAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, SSHFP, TXT, DKIM | No | |
| [OvhProvider](https://github.com/octodns/octodns-ovh/) | [octodns_ovh](https://github.com/octodns/octodns-ovh/) | | A, AAAA, CAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, SSHFP, TXT, DKIM | No | |
| [PowerDnsProvider](https://github.com/octodns/octodns-powerdns/) | [octodns_powerdns](https://github.com/octodns/octodns-powerdns/) | | | | |
| [Rackspace](/octodns/provider/rackspace.py) | | | A, AAAA, ALIAS, CNAME, MX, NS, PTR, SPF, TXT | No | |
| [Route53](https://github.com/octodns/octodns-route53) | [octodns_route53](https://github.com/octodns/octodns-route53) | | | | |

View File

@@ -5,420 +5,17 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
import base64
import binascii
import logging
from collections import defaultdict
from logging import getLogger
import ovh
from ovh import ResourceNotFoundError
from octodns.record import Record
from .base import BaseProvider
class OvhProvider(BaseProvider):
"""
OVH provider using API v6
ovh:
class: octodns.provider.ovh.OvhProvider
# OVH api v6 endpoint
endpoint: ovh-eu
# API application key
application_key: 1234
# API application secret
application_secret: 1234
# API consumer key
consumer_key: 1234
"""
SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False
ZONE_NOT_FOUND_MESSAGE = 'This service does not exist'
# This variable is also used in populate method to filter which OVH record
# types are supported by octodns
SUPPORTS = set(('A', 'AAAA', 'CAA', 'CNAME', 'DKIM', 'MX', 'NAPTR', 'NS',
'PTR', 'SPF', 'SRV', 'SSHFP', 'TXT'))
def __init__(self, id, endpoint, application_key, application_secret,
consumer_key, *args, **kwargs):
self.log = logging.getLogger(f'OvhProvider[{id}]')
self.log.debug('__init__: id=%s, endpoint=%s, application_key=%s, '
'application_secret=***, consumer_key=%s', id, endpoint,
application_key, consumer_key)
super(OvhProvider, self).__init__(id, *args, **kwargs)
self._client = ovh.Client(
endpoint=endpoint,
application_key=application_key,
application_secret=application_secret,
consumer_key=consumer_key,
)
def populate(self, zone, target=False, lenient=False):
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
zone_name = zone.name[:-1]
try:
records = self.get_records(zone_name=zone_name)
exists = True
except ResourceNotFoundError as e:
if str(e) != self.ZONE_NOT_FOUND_MESSAGE:
raise
exists = False
records = []
values = defaultdict(lambda: defaultdict(list))
for record in records:
values[record['subDomain']][record['fieldType']].append(record)
before = len(zone.records)
for name, types in values.items():
for _type, records in types.items():
if _type not in self.SUPPORTS:
self.log.warning('Not managed record of type %s, skip',
_type)
continue
data_for = getattr(self, f'_data_for_{_type}')
record = Record.new(zone, name, data_for(_type, records),
source=self, lenient=lenient)
zone.add_record(record, lenient=lenient)
self.log.info('populate: found %s records, exists=%s',
len(zone.records) - before, exists)
return exists
def _apply(self, plan):
desired = plan.desired
changes = plan.changes
zone_name = desired.name[:-1]
self.log.info('_apply: zone=%s, len(changes)=%d', desired.name,
len(changes))
for change in changes:
class_name = change.__class__.__name__
getattr(self, f'_apply_{class_name}'.lower())(zone_name, change)
# We need to refresh the zone to really apply the changes
self._client.post(f'/domain/zone/{zone_name}/refresh')
def _apply_create(self, zone_name, change):
new = change.new
params_for = getattr(self, f'_params_for_{new._type}')
for params in params_for(new):
self.create_record(zone_name, params)
def _apply_update(self, zone_name, change):
self._apply_delete(zone_name, change)
self._apply_create(zone_name, change)
def _apply_delete(self, zone_name, change):
existing = change.existing
record_type = existing._type
if record_type == "TXT":
if self._is_valid_dkim(existing.values[0]):
record_type = 'DKIM'
self.delete_records(zone_name, record_type, existing.name)
@staticmethod
def _data_for_multiple(_type, records):
return {
'ttl': records[0]['ttl'],
'type': _type,
'values': [record['target'] for record in records]
}
@staticmethod
def _data_for_single(_type, records):
record = records[0]
return {
'ttl': record['ttl'],
'type': _type,
'value': record['target']
}
@staticmethod
def _data_for_CAA(_type, records):
values = []
for record in records:
flags, tag, value = record['target'].split(' ', 2)
values.append({
'flags': flags,
'tag': tag,
'value': value[1:-1]
})
return {
'ttl': records[0]['ttl'],
'type': _type,
'values': values
}
@staticmethod
def _data_for_MX(_type, records):
values = []
for record in records:
preference, exchange = record['target'].split(' ', 1)
values.append({
'preference': preference,
'exchange': exchange,
})
return {
'ttl': records[0]['ttl'],
'type': _type,
'values': values,
}
@staticmethod
def _data_for_NAPTR(_type, records):
values = []
for record in records:
order, preference, flags, service, regexp, replacement = record[
'target'].split(' ', 5)
values.append({
'flags': flags[1:-1],
'order': order,
'preference': preference,
'regexp': regexp[1:-1],
'replacement': replacement,
'service': service[1:-1],
})
return {
'type': _type,
'ttl': records[0]['ttl'],
'values': values
}
@staticmethod
def _data_for_SRV(_type, records):
values = []
for record in records:
priority, weight, port, target = record['target'].split(' ', 3)
values.append({
'port': port,
'priority': priority,
'target': f'{target}.',
'weight': weight
})
return {
'type': _type,
'ttl': records[0]['ttl'],
'values': values
}
@staticmethod
def _data_for_SSHFP(_type, records):
values = []
for record in records:
algorithm, fingerprint_type, fingerprint = record['target'].split(
' ', 2)
values.append({
'algorithm': algorithm,
'fingerprint': fingerprint,
'fingerprint_type': fingerprint_type
})
return {
'type': _type,
'ttl': records[0]['ttl'],
'values': values
}
@staticmethod
def _data_for_DKIM(_type, records):
return {
'ttl': records[0]['ttl'],
'type': "TXT",
'values': [record['target'].replace(';', '\\;')
for record in records]
}
_data_for_A = _data_for_multiple
_data_for_AAAA = _data_for_multiple
_data_for_NS = _data_for_multiple
_data_for_TXT = _data_for_multiple
_data_for_SPF = _data_for_multiple
_data_for_PTR = _data_for_single
_data_for_CNAME = _data_for_single
@staticmethod
def _params_for_multiple(record):
for value in record.values:
yield {
'target': value,
'subDomain': record.name,
'ttl': record.ttl,
'fieldType': record._type,
}
@staticmethod
def _params_for_single(record):
yield {
'target': record.value,
'subDomain': record.name,
'ttl': record.ttl,
'fieldType': record._type
}
@staticmethod
def _params_for_CAA(record):
for value in record.values:
yield {
'target': f'{value.flags} {value.tag} "{value.value}"',
'subDomain': record.name,
'ttl': record.ttl,
'fieldType': record._type
}
@staticmethod
def _params_for_MX(record):
for value in record.values:
yield {
'target': f'{value.preference:d} {value.exchange}',
'subDomain': record.name,
'ttl': record.ttl,
'fieldType': record._type
}
@staticmethod
def _params_for_NAPTR(record):
for value in record.values:
content = f'{value.order} {value.preference} "{value.flags}" ' \
f'"{value.service}" "{value.regexp}" {value.replacement}'
yield {
'target': content,
'subDomain': record.name,
'ttl': record.ttl,
'fieldType': record._type
}
@staticmethod
def _params_for_SRV(record):
for value in record.values:
yield {
'target': f'{value.priority} {value.weight} {value.port} '
f'{value.target}',
'subDomain': record.name,
'ttl': record.ttl,
'fieldType': record._type
}
@staticmethod
def _params_for_SSHFP(record):
for value in record.values:
yield {
'target': f'{value.algorithm} {value.fingerprint_type} '
f'{value.fingerprint}',
'subDomain': record.name,
'ttl': record.ttl,
'fieldType': record._type
}
def _params_for_TXT(self, record):
for value in record.values:
field_type = 'TXT'
if self._is_valid_dkim(value):
field_type = 'DKIM'
value = value.replace("\\;", ";")
yield {
'target': value,
'subDomain': record.name,
'ttl': record.ttl,
'fieldType': field_type
}
_params_for_A = _params_for_multiple
_params_for_AAAA = _params_for_multiple
_params_for_NS = _params_for_multiple
_params_for_SPF = _params_for_multiple
_params_for_CNAME = _params_for_single
_params_for_PTR = _params_for_single
def _is_valid_dkim(self, value):
"""Check if value is a valid DKIM"""
validator_dict = {'h': lambda val: val in ['sha1', 'sha256'],
's': lambda val: val in ['*', 'email'],
't': lambda val: val in ['y', 's'],
'v': lambda val: val == 'DKIM1',
'k': lambda val: val == 'rsa',
'n': lambda _: True,
'g': lambda _: True}
splitted = [v for v in value.split('\\;') if v]
found_key = False
for splitted_value in splitted:
sub_split = [x.strip() for x in splitted_value.split("=", 1)]
if len(sub_split) < 2:
return False
key, value = sub_split[0], sub_split[1]
if key == "p":
is_valid_key = self._is_valid_dkim_key(value)
if not is_valid_key:
return False
found_key = True
else:
is_valid_key = validator_dict.get(key, lambda _: False)(value)
if not is_valid_key:
return False
return found_key
@staticmethod
def _is_valid_dkim_key(key):
result = True
base64_decode = getattr(base64, 'decodestring', None)
base64_decode = getattr(base64, 'decodebytes', base64_decode)
try:
result = base64_decode(bytearray(key, 'utf-8'))
except binascii.Error:
result = False
return result
def get_records(self, zone_name):
"""
List all records of a DNS zone
:param zone_name: Name of zone
:return: list of id's records
"""
records = self._client.get(f'/domain/zone/{zone_name}/record')
return [self.get_record(zone_name, record_id) for record_id in records]
def get_record(self, zone_name, record_id):
"""
Get record with given id
:param zone_name: Name of the zone
:param record_id: Id of the record
:return: Value of the record
"""
return self._client.get(f'/domain/zone/{zone_name}/record/{record_id}')
def delete_records(self, zone_name, record_type, subdomain):
"""
Delete record from have fieldType=type and subDomain=subdomain
:param zone_name: Name of the zone
:param record_type: fieldType
:param subdomain: subDomain
"""
records = self._client.get(f'/domain/zone/{zone_name}/record',
fieldType=record_type, subDomain=subdomain)
for record in records:
self.delete_record(zone_name, record)
def delete_record(self, zone_name, record_id):
"""
Delete record with a given id
:param zone_name: Name of the zone
:param record_id: Id of the record
"""
self.log.debug('Delete record: zone: %s, id %s', zone_name, record_id)
self._client.delete(f'/domain/zone/{zone_name}/record/{record_id}')
def create_record(self, zone_name, params):
"""
Create a record
:param zone_name: Name of the zone
:param params: {'fieldType': 'A', 'ttl': 60, 'subDomain': 'www',
'target': '1.2.3.4'
"""
self.log.debug('Create record: zone: %s, id %s', zone_name,
params)
return self._client.post(f'/domain/zone/{zone_name}/record', **params)
logger = getLogger('Ovh')
try:
logger.warn('octodns_ovh shimmed. Update your provider class to '
'octodns_ovh.OvhProvider. '
'Shim will be removed in 1.0')
from octodns_ovh import OvhProvider
OvhProvider # pragma: no cover
except ModuleNotFoundError:
logger.exception('OvhProvider has been moved into a seperate module, '
'octodns_ovh is now required. Provider class should '
'be updated to octodns_ovh.OvhProvider')
raise

View File

@@ -4,7 +4,6 @@ docutils==0.16
fqdn==1.5.0
jmespath==0.10.0
natsort==6.2.1
ovh==0.5.0
pycountry-convert==0.7.2
pycountry==22.1.10
python-dateutil==2.8.1

View File

@@ -7,463 +7,10 @@ from __future__ import absolute_import, division, print_function, \
from unittest import TestCase
from mock import patch, call
from ovh import APIError, ResourceNotFoundError, InvalidCredential
from octodns.provider.ovh import OvhProvider
from octodns.record import Record
from octodns.zone import Zone
class TestOvhShim(TestCase):
class TestOvhProvider(TestCase):
api_record = []
valid_dkim = []
invalid_dkim = []
valid_dkim_key = "p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCxLaG16G4SaE" \
"cXVdiIxTg7gKSGbHKQLm30CHib1h9FzS9nkcyvQSyQj1rMFyqC//" \
"tft3ohx3nvJl+bGCWxdtLYDSmir9PW54e5CTdxEh8MWRkBO3StF6" \
"QG/tAh3aTGDmkqhIJGLb87iHvpmVKqURmEUzJPv5KPJfWLofADI+" \
"q9lQIDAQAB"
zone = Zone('unit.tests.', [])
expected = set()
# A, subdomain=''
api_record.append({
'fieldType': 'A',
'ttl': 100,
'target': '1.2.3.4',
'subDomain': '',
'id': 1
})
expected.add(Record.new(zone, '', {
'ttl': 100,
'type': 'A',
'value': '1.2.3.4',
}))
# A, subdomain='sub
api_record.append({
'fieldType': 'A',
'ttl': 200,
'target': '1.2.3.4',
'subDomain': 'sub',
'id': 2
})
expected.add(Record.new(zone, 'sub', {
'ttl': 200,
'type': 'A',
'value': '1.2.3.4',
}))
# CNAME
api_record.append({
'fieldType': 'CNAME',
'ttl': 300,
'target': 'unit.tests.',
'subDomain': 'www2',
'id': 3
})
expected.add(Record.new(zone, 'www2', {
'ttl': 300,
'type': 'CNAME',
'value': 'unit.tests.',
}))
# MX
api_record.append({
'fieldType': 'MX',
'ttl': 400,
'target': '10 mx1.unit.tests.',
'subDomain': '',
'id': 4
})
expected.add(Record.new(zone, '', {
'ttl': 400,
'type': 'MX',
'values': [{
'preference': 10,
'exchange': 'mx1.unit.tests.',
}]
}))
# NAPTR
api_record.append({
'fieldType': 'NAPTR',
'ttl': 500,
'target': '10 100 "S" "SIP+D2U" "!^.*$!sip:info@bar.example.com!" .',
'subDomain': 'naptr',
'id': 5
})
expected.add(Record.new(zone, 'naptr', {
'ttl': 500,
'type': 'NAPTR',
'values': [{
'flags': 'S',
'order': 10,
'preference': 100,
'regexp': '!^.*$!sip:info@bar.example.com!',
'replacement': '.',
'service': 'SIP+D2U',
}]
}))
# NS
api_record.append({
'fieldType': 'NS',
'ttl': 600,
'target': 'ns1.unit.tests.',
'subDomain': '',
'id': 6
})
api_record.append({
'fieldType': 'NS',
'ttl': 600,
'target': 'ns2.unit.tests.',
'subDomain': '',
'id': 7
})
expected.add(Record.new(zone, '', {
'ttl': 600,
'type': 'NS',
'values': ['ns1.unit.tests.', 'ns2.unit.tests.'],
}))
# NS with sub
api_record.append({
'fieldType': 'NS',
'ttl': 700,
'target': 'ns3.unit.tests.',
'subDomain': 'www3',
'id': 8
})
api_record.append({
'fieldType': 'NS',
'ttl': 700,
'target': 'ns4.unit.tests.',
'subDomain': 'www3',
'id': 9
})
expected.add(Record.new(zone, 'www3', {
'ttl': 700,
'type': 'NS',
'values': ['ns3.unit.tests.', 'ns4.unit.tests.'],
}))
api_record.append({
'fieldType': 'SRV',
'ttl': 800,
'target': '10 20 30 foo-1.unit.tests.',
'subDomain': '_srv._tcp',
'id': 10
})
api_record.append({
'fieldType': 'SRV',
'ttl': 800,
'target': '40 50 60 foo-2.unit.tests.',
'subDomain': '_srv._tcp',
'id': 11
})
expected.add(Record.new(zone, '_srv._tcp', {
'ttl': 800,
'type': 'SRV',
'values': [{
'priority': 10,
'weight': 20,
'port': 30,
'target': 'foo-1.unit.tests.',
}, {
'priority': 40,
'weight': 50,
'port': 60,
'target': 'foo-2.unit.tests.',
}]
}))
# PTR
api_record.append({
'fieldType': 'PTR',
'ttl': 900,
'target': 'unit.tests.',
'subDomain': '4',
'id': 12
})
expected.add(Record.new(zone, '4', {
'ttl': 900,
'type': 'PTR',
'value': 'unit.tests.'
}))
# SPF
api_record.append({
'fieldType': 'SPF',
'ttl': 1000,
'target': 'v=spf1 include:unit.texts.redirect ~all',
'subDomain': '',
'id': 13
})
expected.add(Record.new(zone, '', {
'ttl': 1000,
'type': 'SPF',
'value': 'v=spf1 include:unit.texts.redirect ~all'
}))
# SSHFP
api_record.append({
'fieldType': 'SSHFP',
'ttl': 1100,
'target': '1 1 bf6b6825d2977c511a475bbefb88aad54a92ac73 ',
'subDomain': '',
'id': 14
})
expected.add(Record.new(zone, '', {
'ttl': 1100,
'type': 'SSHFP',
'value': {
'algorithm': 1,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73',
'fingerprint_type': 1
}
}))
# AAAA
api_record.append({
'fieldType': 'AAAA',
'ttl': 1200,
'target': '1:1ec:1::1',
'subDomain': '',
'id': 15
})
expected.add(Record.new(zone, '', {
'ttl': 200,
'type': 'AAAA',
'value': '1:1ec:1::1',
}))
# DKIM
api_record.append({
'fieldType': 'DKIM',
'ttl': 1300,
'target': valid_dkim_key,
'subDomain': 'dkim',
'id': 16
})
expected.add(Record.new(zone, 'dkim', {
'ttl': 1300,
'type': 'TXT',
'value': valid_dkim_key,
}))
# TXT
api_record.append({
'fieldType': 'TXT',
'ttl': 1400,
'target': 'TXT text',
'subDomain': 'txt',
'id': 17
})
expected.add(Record.new(zone, 'txt', {
'ttl': 1400,
'type': 'TXT',
'value': 'TXT text',
}))
# LOC
# We do not have associated record for LOC, as it's not managed
api_record.append({
'fieldType': 'LOC',
'ttl': 1500,
'target': '1 1 1 N 1 1 1 E 1m 1m',
'subDomain': '',
'id': 18
})
# CAA
api_record.append({
'fieldType': 'CAA',
'ttl': 1600,
'target': '0 issue "ca.unit.tests"',
'subDomain': 'caa',
'id': 19
})
expected.add(Record.new(zone, 'caa', {
'ttl': 1600,
'type': 'CAA',
'values': [{
'flags': 0,
'tag': 'issue',
'value': 'ca.unit.tests'
}]
}))
valid_dkim = [valid_dkim_key,
'v=DKIM1 \\; %s' % valid_dkim_key,
'h=sha256 \\; %s' % valid_dkim_key,
'h=sha1 \\; %s' % valid_dkim_key,
's=* \\; %s' % valid_dkim_key,
's=email \\; %s' % valid_dkim_key,
't=y \\; %s' % valid_dkim_key,
't=s \\; %s' % valid_dkim_key,
'k=rsa \\; %s' % valid_dkim_key,
'n=notes \\; %s' % valid_dkim_key,
'g=granularity \\; %s' % valid_dkim_key,
]
invalid_dkim = ['p=%invalid%', # Invalid public key
'v=DKIM1', # Missing public key
'v=DKIM2 \\; %s' % valid_dkim_key, # Invalid version
'h=sha512 \\; %s' % valid_dkim_key, # Invalid hash algo
's=fake \\; %s' % valid_dkim_key, # Invalid selector
't=fake \\; %s' % valid_dkim_key, # Invalid flag
'u=invalid \\; %s' % valid_dkim_key, # Invalid key
]
@patch('ovh.Client')
def test_populate(self, client_mock):
provider = OvhProvider('test', 'endpoint', 'application_key',
'application_secret', 'consumer_key')
with patch.object(provider._client, 'get') as get_mock:
zone = Zone('unit.tests.', [])
get_mock.side_effect = ResourceNotFoundError('boom')
with self.assertRaises(APIError) as ctx:
provider.populate(zone)
self.assertEquals(get_mock.side_effect, ctx.exception)
get_mock.side_effect = InvalidCredential('boom')
with self.assertRaises(APIError) as ctx:
provider.populate(zone)
self.assertEquals(get_mock.side_effect, ctx.exception)
zone = Zone('unit.tests.', [])
get_mock.side_effect = ResourceNotFoundError('This service does '
'not exist')
exists = provider.populate(zone)
self.assertEquals(set(), zone.records)
self.assertFalse(exists)
zone = Zone('unit.tests.', [])
get_returns = [[record['id'] for record in self.api_record]]
get_returns += self.api_record
get_mock.side_effect = get_returns
exists = provider.populate(zone)
self.assertEquals(self.expected, zone.records)
self.assertTrue(exists)
@patch('ovh.Client')
def test_is_valid_dkim(self, client_mock):
"""Test _is_valid_dkim"""
provider = OvhProvider('test', 'endpoint', 'application_key',
'application_secret', 'consumer_key')
for dkim in self.valid_dkim:
self.assertTrue(provider._is_valid_dkim(dkim))
for dkim in self.invalid_dkim:
self.assertFalse(provider._is_valid_dkim(dkim))
@patch('ovh.Client')
def test_apply(self, client_mock):
provider = OvhProvider('test', 'endpoint', 'application_key',
'application_secret', 'consumer_key')
desired = Zone('unit.tests.', [])
for r in self.expected:
desired.add_record(r)
with patch.object(provider._client, 'post') as get_mock:
plan = provider.plan(desired)
get_mock.side_effect = APIError('boom')
with self.assertRaises(APIError) as ctx:
provider.apply(plan)
self.assertEquals(get_mock.side_effect, ctx.exception)
# Records get by API call
with patch.object(provider._client, 'get') as get_mock:
get_returns = [
[1, 2, 3, 4],
{'fieldType': 'A', 'ttl': 600, 'target': '5.6.7.8',
'subDomain': '', 'id': 100},
{'fieldType': 'A', 'ttl': 600, 'target': '5.6.7.8',
'subDomain': 'fake', 'id': 101},
{'fieldType': 'TXT', 'ttl': 600, 'target': 'fake txt record',
'subDomain': 'txt', 'id': 102},
{'fieldType': 'DKIM', 'ttl': 600,
'target': 'v=DKIM1; %s' % self.valid_dkim_key,
'subDomain': 'dkim', 'id': 103}
]
get_mock.side_effect = get_returns
plan = provider.plan(desired)
with patch.object(provider._client, 'post') as post_mock, \
patch.object(provider._client, 'delete') as delete_mock:
get_mock.side_effect = [[100], [101], [102], [103]]
provider.apply(plan)
wanted_calls = [
call('/domain/zone/unit.tests/record', fieldType='A',
subDomain='', target='1.2.3.4', ttl=100),
call('/domain/zone/unit.tests/record', fieldType='AAAA',
subDomain='', target='1:1ec:1::1', ttl=200),
call('/domain/zone/unit.tests/record', fieldType='MX',
subDomain='', target='10 mx1.unit.tests.', ttl=400),
call('/domain/zone/unit.tests/record', fieldType='SPF',
subDomain='',
target='v=spf1 include:unit.texts.redirect ~all',
ttl=1000),
call('/domain/zone/unit.tests/record', fieldType='SSHFP',
subDomain='',
target='1 1 bf6b6825d2977c511a475bbefb88aad54a92ac73',
ttl=1100),
call('/domain/zone/unit.tests/record', fieldType='PTR',
subDomain='4', target='unit.tests.', ttl=900),
call('/domain/zone/unit.tests/record', fieldType='SRV',
subDomain='_srv._tcp',
target='10 20 30 foo-1.unit.tests.', ttl=800),
call('/domain/zone/unit.tests/record', fieldType='SRV',
subDomain='_srv._tcp',
target='40 50 60 foo-2.unit.tests.', ttl=800),
call('/domain/zone/unit.tests/record', fieldType='CAA',
subDomain='caa', target='0 issue "ca.unit.tests"',
ttl=1600),
call('/domain/zone/unit.tests/record', fieldType='DKIM',
subDomain='dkim',
target='p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCxLaG'
'16G4SaEcXVdiIxTg7gKSGbHKQLm30CHib1h9FzS9nkcyvQSyQj1r'
'MFyqC//tft3ohx3nvJl+bGCWxdtLYDSmir9PW54e5CTdxEh8MWRk'
'BO3StF6QG/tAh3aTGDmkqhIJGLb87iHvpmVKqURmEUzJPv5KPJfW'
'LofADI+q9lQIDAQAB', ttl=1300),
call('/domain/zone/unit.tests/record', fieldType='NAPTR',
subDomain='naptr',
target='10 100 "S" "SIP+D2U" "!^.*$!sip:info@bar.exam'
'ple.com!" .', ttl=500),
call('/domain/zone/unit.tests/record', fieldType='A',
subDomain='sub', target='1.2.3.4', ttl=200),
call('/domain/zone/unit.tests/record', fieldType='TXT',
subDomain='txt', target='TXT text', ttl=1400),
call('/domain/zone/unit.tests/record', fieldType='CNAME',
subDomain='www2', target='unit.tests.', ttl=300),
call('/domain/zone/unit.tests/record', fieldType='NS',
subDomain='www3', target='ns3.unit.tests.', ttl=700),
call('/domain/zone/unit.tests/record', fieldType='NS',
subDomain='www3', target='ns4.unit.tests.', ttl=700),
call('/domain/zone/unit.tests/refresh')]
post_mock.assert_has_calls(wanted_calls)
# Get for delete calls
wanted_get_calls = [
call(u'/domain/zone/unit.tests/record', fieldType=u'A',
subDomain=u''),
call(u'/domain/zone/unit.tests/record', fieldType=u'DKIM',
subDomain='dkim'),
call(u'/domain/zone/unit.tests/record', fieldType=u'A',
subDomain='fake'),
call(u'/domain/zone/unit.tests/record', fieldType=u'TXT',
subDomain='txt')]
get_mock.assert_has_calls(wanted_get_calls)
# 4 delete calls for update and delete
delete_mock.assert_has_calls(
[call(u'/domain/zone/unit.tests/record/100'),
call(u'/domain/zone/unit.tests/record/101'),
call(u'/domain/zone/unit.tests/record/102'),
call(u'/domain/zone/unit.tests/record/103')])
def test_missing(self):
with self.assertRaises(ModuleNotFoundError):
from octodns.provider.ovh import OvhProvider
OvhProvider