1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Added TransIP provider and tests

This commit is contained in:
Maikel Poot
2019-09-25 07:01:06 +02:00
parent 736d588e86
commit bb3f0c0b4a
2 changed files with 562 additions and 0 deletions

328
octodns/provider/transip.py Normal file
View File

@@ -0,0 +1,328 @@
#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
from suds import WebFault
from collections import defaultdict
from .base import BaseProvider
from logging import getLogger
from ..record import Record
from transip.service.domain import DomainService
from transip.service.objects import DnsEntry
class TransipProvider(BaseProvider):
'''
Transip DNS provider
transip:
class: octodns.provider.transip.TransipProvider
# Your Transip account name (required)
account: yourname
# The api key (required)
key: |
\'''
-----BEGIN PRIVATE KEY-----
...
-----END PRIVATE KEY-----
\'''
'''
SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False
SUPPORTS = set(
('A', 'AAAA', 'CNAME', 'MX', 'SRV', 'SPF', 'TXT', 'SSHFP', 'CAA'))
# unsupported by OctoDNS: 'TLSA', 'CAA'
MIN_TTL = 120
TIMEOUT = 15
ROOT_RECORD = '@'
def __init__(self, id, account, key, *args, **kwargs):
self.log = getLogger('TransipProvider[{}]'.format(id))
self.log.debug('__init__: id=%s, account=%s, token=***', id,
account)
super(TransipProvider, self).__init__(id, *args, **kwargs)
self._client = DomainService(account, key)
self.account = account
self.key = key
self._zones = None
self._zone_records = {}
self._currentZone = {}
def populate(self, zone, target=False, lenient=False):
exists = False
self._currentZone = zone
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
before = len(zone.records)
try:
zoneInfo = self._client.get_info(zone.name[:-1])
except WebFault as e:
if e.fault.faultcode == '102' and target == False:
self.log.warning(
'populate: (%s) Zone %s not found in account ',
e.fault.faultcode, zone.name)
exists = False
return exists
elif e.fault.faultcode == '102' and target == True:
self.log.warning('populate: Transip can\'t create new zones')
raise Exception(
('populate: ({}) Transip used ' +
'as target for non-existing zone: {}').format(
e.fault.faultcode, zone.name))
else:
self.log.error('populate: (%s) %s ', e.fault.faultcode,
e.fault.faultstring)
raise e
self.log.debug('populate: found %s records for zone %s',
len(zoneInfo.dnsEntries), zone.name)
exists = True
if zoneInfo.dnsEntries:
values = defaultdict(lambda: defaultdict(list))
for record in zoneInfo.dnsEntries:
name = zone.hostname_from_fqdn(record['name'])
if name == self.ROOT_RECORD:
name = ''
if record['type'] in self.SUPPORTS:
values[name][record['type']].append(record)
for name, types in values.items():
for _type, records in types.items():
data_for = getattr(self, '_data_for_{}'.format(_type))
record = Record.new(zone, name, data_for(_type, records),
source=self, lenient=lenient)
zone.add_record(record, lenient=lenient)
self.log.info('populate: found %s records, exists = %s',
len(zone.records) - before, exists)
self._currentZone = {}
return exists
def _apply(self, plan):
desired = plan.desired
changes = plan.changes
self.log.debug('apply: zone=%s, changes=%d', desired.name,
len(changes))
# for change in changes:
# class_name = change.__class__.__name__
# getattr(self, '_apply_{}'.format(class_name))(change)
self._currentZone = plan.desired
try:
self._client.get_info(plan.desired.name[:-1])
except WebFault as e:
self.log.warning('_apply: %s ', e.message)
raise e
_dns_entries = []
for record in plan.desired.records:
if record._type in self.SUPPORTS:
entries_for = getattr(self, '_entries_for_{}'.format(record._type))
# Root records have '@' as name
name = record.name
if name == '':
name = self.ROOT_RECORD
_dns_entries.extend(entries_for(name, record))
try:
self._client.set_dns_entries(plan.desired.name[:-1], _dns_entries)
except WebFault as e:
self.log.warning(('_apply: Set DNS returned ' +
'one or more errors: {}').format(
e.fault.faultstring))
raise Exception(200, e.fault.faultstring)
self._currentZone = {}
def _entries_for_multiple(self, name, record):
_entries = []
for value in record.values:
_entries.append(DnsEntry(name, record.ttl, record._type, value))
return _entries
def _entries_for_single(self, name, record):
return [DnsEntry(name, record.ttl, record._type, record.value)]
_entries_for_A = _entries_for_multiple
_entries_for_AAAA = _entries_for_multiple
_entries_for_NS = _entries_for_multiple
_entries_for_SPF = _entries_for_multiple
_entries_for_CNAME = _entries_for_single
def _entries_for_MX(self, name, record):
_entries = []
for value in record.values:
content = "{} {}".format(value.preference, value.exchange)
_entries.append(DnsEntry(name, record.ttl, record._type, content))
return _entries
def _entries_for_SRV(self, name, record):
_entries = []
for value in record.values:
content = "{} {} {} {}".format(value.priority, value.weight,
value.port, value.target)
_entries.append(DnsEntry(name, record.ttl, record._type, content))
return _entries
def _entries_for_SSHFP(self, name, record):
_entries = []
for value in record.values:
content = "{} {} {}".format(value.algorithm, value.fingerprint_type,
value.fingerprint)
_entries.append(DnsEntry(name, record.ttl, record._type, content))
return _entries
def _entries_for_CAA(self, name, record):
_entries = []
for value in record.values:
content = "{} {} {}".format(value.flags, value.tag,
value.value)
_entries.append(DnsEntry(name, record.ttl, record._type, content))
return _entries
def _entries_for_TXT(self, name, record):
_entries = []
for value in record.values:
value = value.replace('\\;', ';')
_entries.append(DnsEntry(name, record.ttl, record._type, value))
return _entries
def _parse_to_fqdn(self, value):
if (value[-1] != '.'):
self.log.debug('parseToFQDN: changed %s to %s', value,
'{}.{}'.format(value, self._currentZone.name))
value = '{}.{}'.format(value, self._currentZone.name)
return value
def _get_lowest_ttl(self, records):
_ttl = 100000
for record in records:
_ttl = min(_ttl, record['expire'])
return _ttl
def _data_for_multiple(self, _type, records):
_values = []
for record in records:
_values.append(record['content'])
return {
'ttl': self._get_lowest_ttl(records),
'type': _type,
'values': _values
}
_data_for_A = _data_for_multiple
_data_for_AAAA = _data_for_multiple
_data_for_NS = _data_for_multiple
_data_for_SPF = _data_for_multiple
def _data_for_CNAME(self, _type, records):
return {
'ttl': records[0]['expire'],
'type': _type,
'value': self._parse_to_fqdn(records[0]['content'])
}
def _data_for_MX(self, _type, records):
values = []
for record in records:
preference, exchange = record['content'].split(" ", 1)
values.append({
'preference': preference,
'exchange': self._parse_to_fqdn(exchange)
})
return {
'ttl': self._get_lowest_ttl(records),
'type': _type,
'values': values
}
def _data_for_SRV(self, _type, records):
values = []
for record in records:
priority, weight, port, target = record['content'].split(' ', 3)
values.append({
'port': port,
'priority': priority,
'target': self._parse_to_fqdn(target),
'weight': weight
})
return {
'type': _type,
'ttl': self._get_lowest_ttl(records),
'values': values
}
def _data_for_SSHFP(self, _type, records):
values = []
for record in records:
algorithm, fp_type, fingerprint = record['content'].split(' ', 2)
values.append({
'algorithm': algorithm,
'fingerprint': fingerprint.lower(),
'fingerprint_type': fp_type
})
return {
'type': _type,
'ttl': self._get_lowest_ttl(records),
'values': values
}
def _data_for_CAA(self, _type, records):
values = []
for record in records:
flags, tag, value = record['content'].split(' ', 2)
values.append({
'flags': flags,
'tag': tag,
'value': value
})
return {
'type': _type,
'ttl': self._get_lowest_ttl(records),
'values': values
}
def _data_for_TXT(self, _type, records):
values = []
for record in records:
values.append(record['content'].replace(';', '\\;'))
return {
'type': _type,
'ttl': self._get_lowest_ttl(records),
'values': values
}

View File

@@ -0,0 +1,234 @@
#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
# from mock import Mock, call
from os.path import dirname, join
from suds import WebFault
from requests_mock import ANY, mock as requests_mock
from unittest import TestCase
from octodns.record import Record
from octodns.provider.transip import TransipProvider
from octodns.provider.yaml import YamlProvider
from octodns.zone import Zone
from transip.service.domain import DomainService
from transip.service.objects import DnsEntry
class MockDomainService(DomainService):
def __init__(self, *args, **kwargs):
super(MockDomainService, self).__init__('MockDomainService', *args, **kwargs)
self.mockupEntries = []
def mockup(self, records):
provider = TransipProvider('', '', '');
_dns_entries = []
for record in records:
if record._type in provider.SUPPORTS:
entries_for = getattr(provider, '_entries_for_{}'.format(record._type))
# Root records have '@' as name
name = record.name
if name == '':
name = provider.ROOT_RECORD
_dns_entries.extend(entries_for(name, record))
_dns_entries.append(DnsEntry('@', '3600', 'NS', 'ns01.transip.nl.'))
self.mockupEntries = _dns_entries
# Skips authentication layer and returns the entries loaded by "Mockup"
def get_info(self, domain_name):
if str(domain_name) == str('notfound.unit.tests'):
self.raiseZoneNotFound()
result = lambda: None
setattr(result, "dnsEntries", self.mockupEntries)
return result
def set_dns_entries(self, domain_name, dns_entries):
if str(domain_name) == str('failsetdns.unit.tests'):
self.raiseSaveError()
return True
def raiseZoneNotFound(self):
fault = lambda: None
setattr(fault, "faultstring", '102 is zone not found')
setattr(fault, "faultcode", str('102'))
document = {}
raise WebFault(fault, document)
def raiseInvalidAuth(self):
fault = lambda: None
setattr(fault, "faultstring", '200 is invalid auth')
setattr(fault, "faultcode", str('200'))
document = {}
raise WebFault(fault, document)
def raiseSaveError(self):
fault = lambda: None
setattr(fault, "faultstring", '202 error while saving')
setattr(fault, "faultcode", str('202'))
document = {}
raise WebFault(fault, document)
class TestTransipProvider(TestCase):
bogus_key = str("""-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEA0U5HGCkLrz423IyUf3u4cKN2WrNz1x5KNr6PvH2M/zxas+zB
elbxkdT3AQ+wmfcIvOuTmFRTHv35q2um1aBrPxVw+2s+lWo28VwIRttwIB1vIeWu
lSBnkEZQRLyPI2tH0i5QoMX4CVPf9rvij3Uslimi84jdzDfPFIh6jZ6C8nLipOTG
0IMhge1ofVfB0oSy5H+7PYS2858QLAf5ruYbzbAxZRivS402wGmQ0d0Lc1KxraAj
kiMM5yj/CkH/Vm2w9I6+tLFeASE4ub5HCP5G/ig4dbYtqZMQMpqyAbGxd5SOVtyn
UHagAJUxf8DT3I8PyjEHjxdOPUsxNyRtepO/7QIDAQABAoIBAQC7fiZ7gxE/ezjD
2n6PsHFpHVTBLS2gzzZl0dCKZeFvJk6ODJDImaeuHhrh7X8ifMNsEI9XjnojMhl8
MGPzy88mZHugDNK0H8B19x5G8v1/Fz7dG5WHas660/HFkS+b59cfdXOugYiOOn9O
08HBBpLZNRUOmVUuQfQTjapSwGLG8PocgpyRD4zx0LnldnJcqYCxwCdev+AAsPnq
ibNtOd/MYD37w9MEGcaxLE8wGgkv8yd97aTjkgE+tp4zsM4QE4Rag133tsLLNznT
4Qr/of15M3NW/DXq/fgctyRcJjZpU66eCXLCz2iRTnLyyxxDC2nwlxKbubV+lcS0
S4hbfd/BAoGBAO8jXxEaiybR0aIhhSR5esEc3ymo8R8vBN3ZMJ+vr5jEPXr/ZuFj
/R4cZ2XV3VoQJG0pvIOYVPZ5DpJM7W+zSXtJ/7bLXy4Bnmh/rc+YYgC+AXQoLSil
iD2OuB2xAzRAK71DVSO0kv8gEEXCersPT2i6+vC2GIlJvLcYbOdRKWGxAoGBAOAQ
aJbRLtKujH+kMdoMI7tRlL8XwI+SZf0FcieEu//nFyerTePUhVgEtcE+7eQ7hyhG
fIXUFx/wALySoqFzdJDLc8U8pTLhbUaoLOTjkwnCTKQVprhnISqQqqh/0U5u47IE
RWzWKN6OHb0CezNTq80Dr6HoxmPCnJHBHn5LinT9AoGAQSpvZpbIIqz8pmTiBl2A
QQ2gFpcuFeRXPClKYcmbXVLkuhbNL1BzEniFCLAt4LQTaRf9ghLJ3FyCxwVlkpHV
zV4N6/8hkcTpKOraL38D/dXJSaEFJVVuee/hZl3tVJjEEpA9rDwx7ooLRSdJEJ6M
ciq55UyKBSdt4KssSiDI2RECgYBL3mJ7xuLy5bWfNsrGiVvD/rC+L928/5ZXIXPw
26oI0Yfun7ulDH4GOroMcDF/GYT/Zzac3h7iapLlR0WYI47xxGI0A//wBZLJ3QIu
krxkDo2C9e3Y/NqnHgsbOQR3aWbiDT4wxydZjIeXS3LKA2fl6Hyc90PN3cTEOb8I
hq2gRQKBgEt0SxhhtyB93SjgTzmUZZ7PiEf0YJatfM6cevmjWHexrZH+x31PB72s
fH2BQyTKKzoCLB1k/6HRaMnZdrWyWSZ7JKz3AHJ8+58d0Hr8LTrzDM1L6BbjeDct
N4OiVz1I3rbZGYa396lpxO6ku8yCglisL1yrSP6DdEUp66ntpKVd
-----END RSA PRIVATE KEY-----""")
def make_expected(self):
expected = Zone('unit.tests.', [])
source = YamlProvider('test', join(dirname(__file__), 'config'))
source.populate(expected)
return expected
def test_populate(self):
_expected = self.make_expected()
with self.assertRaises(WebFault) as ctx:
provider = TransipProvider('test', 'unittest', self.bogus_key)
zone = Zone('unit.tests.', [])
provider.populate(zone, True)
self.assertEquals(str('WebFault'),
str(ctx.exception.__class__.__name__))
self.assertEquals(str('200'), ctx.exception.fault.faultcode)
with self.assertRaises(Exception) as ctx:
provider = TransipProvider('test', 'unittest', self.bogus_key)
provider._client = MockDomainService('unittest', self.bogus_key)
zone = Zone('notfound.unit.tests.', [])
provider.populate(zone, True)
self.assertEquals(str('Exception'),
str(ctx.exception.__class__.__name__))
self.assertEquals('populate: (102) Transip used as target for non-existing zone: notfound.unit.tests.', ctx.exception.message)
provider = TransipProvider('test', 'unittest', self.bogus_key)
provider._client = MockDomainService('unittest', self.bogus_key)
zone = Zone('notfound.unit.tests.', [])
provider.populate(zone, False)
provider = TransipProvider('test', 'unittest', self.bogus_key)
provider._client = MockDomainService('unittest', self.bogus_key)
provider._client.mockup(_expected.records)
zone = Zone('unit.tests.', [])
provider.populate(zone, False)
provider._currentZone = zone
self.assertEquals("www.unit.tests.", provider._parse_to_fqdn("www"))
provider = TransipProvider('test', 'unittest', self.bogus_key)
provider._client = MockDomainService('unittest', self.bogus_key)
zone = Zone('unit.tests.', [])
exists = provider.populate(zone, True)
self.assertTrue(exists, 'populate should return true')
return
def test_plan(self):
_expected = self.make_expected()
print(_expected.name)
provider = TransipProvider('test', 'unittest', self.bogus_key)
provider._client = MockDomainService('unittest', self.bogus_key)
plan = provider.plan(_expected)
self.assertEqual(12, plan.change_counts['Create'])
self.assertEqual(0, plan.change_counts['Update'])
self.assertEqual(0, plan.change_counts['Delete'])
return
def test_apply(self):
_expected = self.make_expected()
provider = TransipProvider('test', 'unittest', self.bogus_key)
provider._client = MockDomainService('unittest', self.bogus_key)
plan = provider.plan(_expected)
#self.assertEqual(11, plan.changes)
changes = provider.apply(plan)
with self.assertRaises(Exception) as ctx:
provider = TransipProvider('test', 'unittest', self.bogus_key)
provider._client = MockDomainService('unittest', self.bogus_key)
plan = provider.plan(_expected)
plan.desired.name = 'notfound.unit.tests.'
changes = provider.apply(plan)
# self.assertEqual(11, changes)
self.assertEquals(str('WebFault'),
str(ctx.exception.__class__.__name__))
_expected = self.make_expected()
with self.assertRaises(Exception) as ctx:
provider = TransipProvider('test', 'unittest', self.bogus_key)
provider._client = MockDomainService('unittest', self.bogus_key)
plan = provider.plan(_expected)
plan.desired.name = 'failsetdns.unit.tests.'
changes = provider.apply(plan)
# self.assertEqual(11, changes)
#provider = TransipProvider('test', 'unittest', self.bogus_key)
#plan = provider.plan(_expected)
# changes = provider.apply(plan)
# self.assertEquals(29, changes)