mirror of
https://github.com/github/octodns.git
synced 2024-05-11 05:55:00 +00:00
1803 lines
69 KiB
Python
1803 lines
69 KiB
Python
#
|
|
#
|
|
#
|
|
|
|
from __future__ import absolute_import, division, print_function, \
|
|
unicode_literals
|
|
|
|
from octodns.record import Create, Update, Delete, Record
|
|
from octodns.provider.azuredns import _AzureRecord, AzureProvider, \
|
|
_check_endswith_dot, _parse_azure_type, _root_traffic_manager_name, \
|
|
_get_monitor, _profile_is_match, AzureException
|
|
from octodns.zone import Zone
|
|
from octodns.provider.base import Plan
|
|
|
|
from azure.mgmt.dns.models import ARecord, AaaaRecord, CaaRecord, \
|
|
CnameRecord, MxRecord, SrvRecord, NsRecord, PtrRecord, TxtRecord, \
|
|
RecordSet, SoaRecord, SubResource, Zone as AzureZone
|
|
from azure.mgmt.trafficmanager.models import Profile, DnsConfig, \
|
|
MonitorConfig, Endpoint, MonitorConfigCustomHeadersItem
|
|
from msrestazure.azure_exceptions import CloudError
|
|
|
|
from six import text_type
|
|
from unittest import TestCase
|
|
from mock import Mock, patch, call
|
|
|
|
|
|
zone = Zone(name='unit.tests.', sub_zones=[])
|
|
octo_records = []
|
|
octo_records.append(Record.new(zone, '', {
|
|
'ttl': 0,
|
|
'type': 'A',
|
|
'values': ['1.2.3.4', '10.10.10.10']}))
|
|
octo_records.append(Record.new(zone, 'a', {
|
|
'ttl': 1,
|
|
'type': 'A',
|
|
'values': ['1.2.3.4', '1.1.1.1']}))
|
|
octo_records.append(Record.new(zone, 'aa', {
|
|
'ttl': 9001,
|
|
'type': 'A',
|
|
'values': ['1.2.4.3']}))
|
|
octo_records.append(Record.new(zone, 'aaa', {
|
|
'ttl': 2,
|
|
'type': 'A',
|
|
'values': ['1.1.1.3']}))
|
|
octo_records.append(Record.new(zone, 'aaaa1', {
|
|
'ttl': 300,
|
|
'type': 'AAAA',
|
|
'values': ['2601:644:500:e210:62f8:1dff:feb8:947a',
|
|
'2601:642:500:e210:62f8:1dff:feb8:947a'],
|
|
}))
|
|
octo_records.append(Record.new(zone, 'aaaa2', {
|
|
'ttl': 300,
|
|
'type': 'AAAA',
|
|
'value': '2601:644:500:e210:62f8:1dff:feb8:947a'
|
|
}))
|
|
octo_records.append(Record.new(zone, 'caa1', {
|
|
'ttl': 9,
|
|
'type': 'CAA',
|
|
'value': {
|
|
'flags': 0,
|
|
'tag': 'issue',
|
|
'value': 'ca.unit.tests',
|
|
}}))
|
|
octo_records.append(Record.new(zone, 'caa2', {
|
|
'ttl': 9,
|
|
'type': 'CAA',
|
|
'values': [{
|
|
'flags': 0,
|
|
'tag': 'issue',
|
|
'value': 'ca1.unit.tests',
|
|
}, {
|
|
'flags': 0,
|
|
'tag': 'issue',
|
|
'value': 'ca2.unit.tests',
|
|
}]}))
|
|
octo_records.append(Record.new(zone, 'cname', {
|
|
'ttl': 3,
|
|
'type': 'CNAME',
|
|
'value': 'a.unit.tests.'}))
|
|
octo_records.append(Record.new(zone, 'mx1', {
|
|
'ttl': 3,
|
|
'type': 'MX',
|
|
'values': [{
|
|
'priority': 10,
|
|
'value': 'mx1.unit.tests.',
|
|
}, {
|
|
'priority': 20,
|
|
'value': 'mx2.unit.tests.',
|
|
}]}))
|
|
octo_records.append(Record.new(zone, 'mx2', {
|
|
'ttl': 3,
|
|
'type': 'MX',
|
|
'values': [{
|
|
'priority': 10,
|
|
'value': 'mx1.unit.tests.',
|
|
}]}))
|
|
octo_records.append(Record.new(zone, '', {
|
|
'ttl': 4,
|
|
'type': 'NS',
|
|
'values': ['ns1.unit.tests.', 'ns2.unit.tests.']}))
|
|
octo_records.append(Record.new(zone, 'foo', {
|
|
'ttl': 5,
|
|
'type': 'NS',
|
|
'value': 'ns1.unit.tests.'}))
|
|
octo_records.append(Record.new(zone, 'ptr1', {
|
|
'ttl': 5,
|
|
'type': 'PTR',
|
|
'value': 'ptr1.unit.tests.'}))
|
|
octo_records.append(Record.new(zone, '_srv._tcp', {
|
|
'ttl': 6,
|
|
'type': 'SRV',
|
|
'values': [{
|
|
'priority': 10,
|
|
'weight': 20,
|
|
'port': 30,
|
|
'target': 'foo-1.unit.tests.',
|
|
}, {
|
|
'priority': 12,
|
|
'weight': 30,
|
|
'port': 30,
|
|
'target': 'foo-2.unit.tests.',
|
|
}]}))
|
|
octo_records.append(Record.new(zone, '_srv2._tcp', {
|
|
'ttl': 7,
|
|
'type': 'SRV',
|
|
'values': [{
|
|
'priority': 12,
|
|
'weight': 17,
|
|
'port': 1,
|
|
'target': 'srvfoo.unit.tests.',
|
|
}]}))
|
|
octo_records.append(Record.new(zone, 'txt1', {
|
|
'ttl': 8,
|
|
'type': 'TXT',
|
|
'value': 'txt singleton test'}))
|
|
octo_records.append(Record.new(zone, 'txt2', {
|
|
'ttl': 9,
|
|
'type': 'TXT',
|
|
'values': ['txt multiple test', 'txt multiple test 2']}))
|
|
|
|
long_txt = "v=spf1 ip4:10.10.0.0/24 ip4:10.10.1.0/24 ip4:10.10.2.0/24"
|
|
long_txt += " ip4:10.10.3.0/24 ip4:10.10.4.0/24 ip4:10.10.5.0/24 "
|
|
long_txt += " 10.6.0/24 ip4:10.10.7.0/24 ip4:10.10.8.0/24 "
|
|
long_txt += " ip4:10.10.10.0/24 ip4:10.10.11.0/24 ip4:10.10.12.0/24"
|
|
long_txt += " ip4:10.10.13.0/24 ip4:10.10.14.0/24 ip4:10.10.15.0/24"
|
|
long_txt += " ip4:10.10.16.0/24 ip4:10.10.17.0/24 ip4:10.10.18.0/24"
|
|
long_txt += " ip4:10.10.19.0/24 ip4:10.10.20.0/24 ~all"
|
|
octo_records.append(Record.new(zone, 'txt3', {
|
|
'ttl': 10,
|
|
'type': 'TXT',
|
|
'values': ['txt multiple test', long_txt]}))
|
|
|
|
azure_records = []
|
|
_base0 = _AzureRecord('TestAzure', octo_records[0])
|
|
_base0.zone_name = 'unit.tests'
|
|
_base0.relative_record_set_name = '@'
|
|
_base0.record_type = 'A'
|
|
_base0.params['ttl'] = 0
|
|
_base0.params['a_records'] = [ARecord(ipv4_address='1.2.3.4'),
|
|
ARecord(ipv4_address='10.10.10.10')]
|
|
azure_records.append(_base0)
|
|
|
|
_base1 = _AzureRecord('TestAzure', octo_records[1])
|
|
_base1.zone_name = 'unit.tests'
|
|
_base1.relative_record_set_name = 'a'
|
|
_base1.record_type = 'A'
|
|
_base1.params['ttl'] = 1
|
|
_base1.params['a_records'] = [ARecord(ipv4_address='1.2.3.4'),
|
|
ARecord(ipv4_address='1.1.1.1')]
|
|
azure_records.append(_base1)
|
|
|
|
_base2 = _AzureRecord('TestAzure', octo_records[2])
|
|
_base2.zone_name = 'unit.tests'
|
|
_base2.relative_record_set_name = 'aa'
|
|
_base2.record_type = 'A'
|
|
_base2.params['ttl'] = 9001
|
|
_base2.params['a_records'] = ARecord(ipv4_address='1.2.4.3')
|
|
azure_records.append(_base2)
|
|
|
|
_base3 = _AzureRecord('TestAzure', octo_records[3])
|
|
_base3.zone_name = 'unit.tests'
|
|
_base3.relative_record_set_name = 'aaa'
|
|
_base3.record_type = 'A'
|
|
_base3.params['ttl'] = 2
|
|
_base3.params['a_records'] = ARecord(ipv4_address='1.1.1.3')
|
|
azure_records.append(_base3)
|
|
|
|
_base4 = _AzureRecord('TestAzure', octo_records[4])
|
|
_base4.zone_name = 'unit.tests'
|
|
_base4.relative_record_set_name = 'aaaa1'
|
|
_base4.record_type = 'AAAA'
|
|
_base4.params['ttl'] = 300
|
|
aaaa1 = AaaaRecord(ipv6_address='2601:644:500:e210:62f8:1dff:feb8:947a')
|
|
aaaa2 = AaaaRecord(ipv6_address='2601:642:500:e210:62f8:1dff:feb8:947a')
|
|
_base4.params['aaaa_records'] = [aaaa1, aaaa2]
|
|
azure_records.append(_base4)
|
|
|
|
_base5 = _AzureRecord('TestAzure', octo_records[5])
|
|
_base5.zone_name = 'unit.tests'
|
|
_base5.relative_record_set_name = 'aaaa2'
|
|
_base5.record_type = 'AAAA'
|
|
_base5.params['ttl'] = 300
|
|
_base5.params['aaaa_records'] = [aaaa1]
|
|
azure_records.append(_base5)
|
|
|
|
_base6 = _AzureRecord('TestAzure', octo_records[6])
|
|
_base6.zone_name = 'unit.tests'
|
|
_base6.relative_record_set_name = 'caa1'
|
|
_base6.record_type = 'CAA'
|
|
_base6.params['ttl'] = 9
|
|
_base6.params['caa_records'] = [CaaRecord(flags=0,
|
|
tag='issue',
|
|
value='ca.unit.tests')]
|
|
azure_records.append(_base6)
|
|
|
|
_base7 = _AzureRecord('TestAzure', octo_records[7])
|
|
_base7.zone_name = 'unit.tests'
|
|
_base7.relative_record_set_name = 'caa2'
|
|
_base7.record_type = 'CAA'
|
|
_base7.params['ttl'] = 9
|
|
_base7.params['caa_records'] = [CaaRecord(flags=0,
|
|
tag='issue',
|
|
value='ca1.unit.tests'),
|
|
CaaRecord(flags=0,
|
|
tag='issue',
|
|
value='ca2.unit.tests')]
|
|
azure_records.append(_base7)
|
|
|
|
_base8 = _AzureRecord('TestAzure', octo_records[8])
|
|
_base8.zone_name = 'unit.tests'
|
|
_base8.relative_record_set_name = 'cname'
|
|
_base8.record_type = 'CNAME'
|
|
_base8.params['ttl'] = 3
|
|
_base8.params['cname_record'] = CnameRecord(cname='a.unit.tests.')
|
|
azure_records.append(_base8)
|
|
|
|
_base9 = _AzureRecord('TestAzure', octo_records[9])
|
|
_base9.zone_name = 'unit.tests'
|
|
_base9.relative_record_set_name = 'mx1'
|
|
_base9.record_type = 'MX'
|
|
_base9.params['ttl'] = 3
|
|
_base9.params['mx_records'] = [MxRecord(preference=10,
|
|
exchange='mx1.unit.tests.'),
|
|
MxRecord(preference=20,
|
|
exchange='mx2.unit.tests.')]
|
|
azure_records.append(_base9)
|
|
|
|
_base10 = _AzureRecord('TestAzure', octo_records[10])
|
|
_base10.zone_name = 'unit.tests'
|
|
_base10.relative_record_set_name = 'mx2'
|
|
_base10.record_type = 'MX'
|
|
_base10.params['ttl'] = 3
|
|
_base10.params['mx_records'] = [MxRecord(preference=10,
|
|
exchange='mx1.unit.tests.')]
|
|
azure_records.append(_base10)
|
|
|
|
_base11 = _AzureRecord('TestAzure', octo_records[11])
|
|
_base11.zone_name = 'unit.tests'
|
|
_base11.relative_record_set_name = '@'
|
|
_base11.record_type = 'NS'
|
|
_base11.params['ttl'] = 4
|
|
_base11.params['ns_records'] = [NsRecord(nsdname='ns1.unit.tests.'),
|
|
NsRecord(nsdname='ns2.unit.tests.')]
|
|
azure_records.append(_base11)
|
|
|
|
_base12 = _AzureRecord('TestAzure', octo_records[12])
|
|
_base12.zone_name = 'unit.tests'
|
|
_base12.relative_record_set_name = 'foo'
|
|
_base12.record_type = 'NS'
|
|
_base12.params['ttl'] = 5
|
|
_base12.params['ns_records'] = [NsRecord(nsdname='ns1.unit.tests.')]
|
|
azure_records.append(_base12)
|
|
|
|
_base13 = _AzureRecord('TestAzure', octo_records[13])
|
|
_base13.zone_name = 'unit.tests'
|
|
_base13.relative_record_set_name = 'ptr1'
|
|
_base13.record_type = 'PTR'
|
|
_base13.params['ttl'] = 5
|
|
_base13.params['ptr_records'] = [PtrRecord(ptrdname='ptr1.unit.tests.')]
|
|
azure_records.append(_base13)
|
|
|
|
_base14 = _AzureRecord('TestAzure', octo_records[14])
|
|
_base14.zone_name = 'unit.tests'
|
|
_base14.relative_record_set_name = '_srv._tcp'
|
|
_base14.record_type = 'SRV'
|
|
_base14.params['ttl'] = 6
|
|
_base14.params['srv_records'] = [SrvRecord(priority=10,
|
|
weight=20,
|
|
port=30,
|
|
target='foo-1.unit.tests.'),
|
|
SrvRecord(priority=12,
|
|
weight=30,
|
|
port=30,
|
|
target='foo-2.unit.tests.')]
|
|
azure_records.append(_base14)
|
|
|
|
_base15 = _AzureRecord('TestAzure', octo_records[15])
|
|
_base15.zone_name = 'unit.tests'
|
|
_base15.relative_record_set_name = '_srv2._tcp'
|
|
_base15.record_type = 'SRV'
|
|
_base15.params['ttl'] = 7
|
|
_base15.params['srv_records'] = [SrvRecord(priority=12,
|
|
weight=17,
|
|
port=1,
|
|
target='srvfoo.unit.tests.')]
|
|
azure_records.append(_base15)
|
|
|
|
_base16 = _AzureRecord('TestAzure', octo_records[16])
|
|
_base16.zone_name = 'unit.tests'
|
|
_base16.relative_record_set_name = 'txt1'
|
|
_base16.record_type = 'TXT'
|
|
_base16.params['ttl'] = 8
|
|
_base16.params['txt_records'] = [TxtRecord(value=['txt singleton test'])]
|
|
azure_records.append(_base16)
|
|
|
|
_base17 = _AzureRecord('TestAzure', octo_records[17])
|
|
_base17.zone_name = 'unit.tests'
|
|
_base17.relative_record_set_name = 'txt2'
|
|
_base17.record_type = 'TXT'
|
|
_base17.params['ttl'] = 9
|
|
_base17.params['txt_records'] = [TxtRecord(value=['txt multiple test']),
|
|
TxtRecord(value=['txt multiple test 2'])]
|
|
azure_records.append(_base17)
|
|
|
|
long_txt_az1 = "v=spf1 ip4:10.10.0.0/24 ip4:10.10.1.0/24 ip4:10.10.2.0/24"
|
|
long_txt_az1 += " ip4:10.10.3.0/24 ip4:10.10.4.0/24 ip4:10.10.5.0/24 "
|
|
long_txt_az1 += " 10.6.0/24 ip4:10.10.7.0/24 ip4:10.10.8.0/24 "
|
|
long_txt_az1 += " ip4:10.10.10.0/24 ip4:10.10.11.0/24 ip4:10.10.12.0/24"
|
|
long_txt_az1 += " ip4:10.10.13.0/24 ip4:10.10.14.0/24 ip4:10.10."
|
|
long_txt_az2 = "15.0/24 ip4:10.10.16.0/24 ip4:10.10.17.0/24 ip4:10.10.18.0/24"
|
|
long_txt_az2 += " ip4:10.10.19.0/24 ip4:10.10.20.0/24 ~all"
|
|
_base18 = _AzureRecord('TestAzure', octo_records[18])
|
|
_base18.zone_name = 'unit.tests'
|
|
_base18.relative_record_set_name = 'txt3'
|
|
_base18.record_type = 'TXT'
|
|
_base18.params['ttl'] = 10
|
|
_base18.params['txt_records'] = [TxtRecord(value=['txt multiple test']),
|
|
TxtRecord(value=[long_txt_az1, long_txt_az2])]
|
|
azure_records.append(_base18)
|
|
|
|
|
|
class Test_AzureRecord(TestCase):
|
|
def test_azure_record(self):
|
|
assert(len(azure_records) == len(octo_records))
|
|
for i in range(len(azure_records)):
|
|
octo = _AzureRecord('TestAzure', octo_records[i])
|
|
assert(azure_records[i]._equals(octo))
|
|
|
|
|
|
class Test_DynamicAzureRecord(TestCase):
|
|
def test_azure_record(self):
|
|
tm_profile = Profile()
|
|
data = {
|
|
'ttl': 60,
|
|
'type': 'CNAME',
|
|
'value': 'default.unit.tests.',
|
|
'dynamic': {
|
|
'pools': {
|
|
'one': {
|
|
'values': [
|
|
{'value': 'one.unit.tests.', 'weight': 1}
|
|
],
|
|
'fallback': 'two',
|
|
},
|
|
'two': {
|
|
'values': [
|
|
{'value': 'two.unit.tests.', 'weight': 1}
|
|
],
|
|
},
|
|
},
|
|
'rules': [
|
|
{'geos': ['AF'], 'pool': 'one'},
|
|
{'pool': 'two'},
|
|
],
|
|
}
|
|
}
|
|
octo_record = Record.new(zone, 'foo', data)
|
|
azure_record = _AzureRecord('TestAzure', octo_record,
|
|
traffic_manager=tm_profile)
|
|
self.assertEqual(azure_record.zone_name, zone.name[:-1])
|
|
self.assertEqual(azure_record.relative_record_set_name, 'foo')
|
|
self.assertEqual(azure_record.record_type, 'CNAME')
|
|
self.assertEqual(azure_record.params['ttl'], 60)
|
|
self.assertEqual(azure_record.params['target_resource'], tm_profile)
|
|
|
|
|
|
class Test_ParseAzureType(TestCase):
|
|
def test_parse_azure_type(self):
|
|
for expected, test in [['A', 'Microsoft.Network/dnszones/A'],
|
|
['AAAA', 'Microsoft.Network/dnszones/AAAA'],
|
|
['NS', 'Microsoft.Network/dnszones/NS'],
|
|
['MX', 'Microsoft.Network/dnszones/MX']]:
|
|
self.assertEquals(expected, _parse_azure_type(test))
|
|
|
|
|
|
class Test_CheckEndswithDot(TestCase):
|
|
def test_check_endswith_dot(self):
|
|
for expected, test in [['a.', 'a'],
|
|
['a.', 'a.'],
|
|
['foo.bar.', 'foo.bar.'],
|
|
['foo.bar.', 'foo.bar']]:
|
|
self.assertEquals(expected, _check_endswith_dot(test))
|
|
|
|
|
|
class Test_RootTrafficManagerName(TestCase):
|
|
def test_root_traffic_manager_name(self):
|
|
test = Record.new(zone, 'foo', data={
|
|
'ttl': 60, 'type': 'CNAME', 'value': 'default.unit.tests.',
|
|
})
|
|
self.assertEqual(_root_traffic_manager_name(test), 'foo--unit--tests')
|
|
|
|
|
|
class Test_GetMonitor(TestCase):
|
|
def test_get_monitor(self):
|
|
record = Record.new(zone, 'foo', data={
|
|
'type': 'CNAME', 'ttl': 60, 'value': 'default.unit.tests.',
|
|
'octodns': {
|
|
'healthcheck': {
|
|
'path': '/_ping',
|
|
'port': 4443,
|
|
'protocol': 'HTTPS',
|
|
}
|
|
},
|
|
})
|
|
|
|
monitor = _get_monitor(record)
|
|
self.assertEqual(monitor.protocol, 'HTTPS')
|
|
self.assertEqual(monitor.port, 4443)
|
|
self.assertEqual(monitor.path, '/_ping')
|
|
headers = monitor.custom_headers
|
|
self.assertIsInstance(headers, list)
|
|
self.assertEquals(len(headers), 1)
|
|
headers = headers[0]
|
|
self.assertEqual(headers.name, 'Host')
|
|
self.assertEqual(headers.value, record.healthcheck_host)
|
|
|
|
# test TCP monitor
|
|
record._octodns['healthcheck']['protocol'] = 'TCP'
|
|
monitor = _get_monitor(record)
|
|
self.assertEqual(monitor.protocol, 'TCP')
|
|
self.assertIsNone(monitor.custom_headers)
|
|
|
|
|
|
class Test_ProfileIsMatch(TestCase):
|
|
def test_profile_is_match(self):
|
|
is_match = _profile_is_match
|
|
|
|
self.assertFalse(is_match(None, Profile()))
|
|
|
|
# Profile object builder with default property values that can be
|
|
# overridden for testing below
|
|
def profile(
|
|
name = 'foo-unit-tests',
|
|
ttl = 60,
|
|
method = 'Geographic',
|
|
dns_name = None,
|
|
monitor_proto = 'HTTPS',
|
|
monitor_port = 4443,
|
|
monitor_path = '/_ping',
|
|
endpoints = 1,
|
|
endpoint_name = 'name',
|
|
endpoint_type = 'profile/nestedEndpoints',
|
|
target = 'target.unit.tests',
|
|
target_id = 'resource/id',
|
|
geos = ['GEO-AF'],
|
|
weight = 1,
|
|
priority = 1,
|
|
):
|
|
dns = DnsConfig(relative_name=(dns_name or name), ttl=ttl)
|
|
return Profile(
|
|
name=name, traffic_routing_method=method, dns_config=dns,
|
|
monitor_config=MonitorConfig(
|
|
protocol=monitor_proto,
|
|
port=monitor_port,
|
|
path=monitor_path,
|
|
),
|
|
endpoints=[Endpoint(
|
|
name=endpoint_name,
|
|
type=endpoint_type,
|
|
target=target,
|
|
target_resource_id=target_id,
|
|
geo_mapping=geos,
|
|
weight=weight,
|
|
priority=priority,
|
|
)] + [Endpoint()] * (endpoints - 1),
|
|
)
|
|
|
|
self.assertTrue(is_match(profile(), profile()))
|
|
|
|
self.assertFalse(is_match(profile(), profile(name='two')))
|
|
self.assertFalse(is_match(profile(), profile(endpoints=2)))
|
|
self.assertFalse(is_match(profile(), profile(dns_name='two')))
|
|
self.assertFalse(is_match(profile(), profile(monitor_proto='HTTP')))
|
|
self.assertFalse(is_match(profile(), profile(endpoint_name='a')))
|
|
self.assertFalse(is_match(profile(), profile(endpoint_type='b')))
|
|
self.assertFalse(
|
|
is_match(profile(endpoint_type='b'), profile(endpoint_type='b'))
|
|
)
|
|
self.assertFalse(is_match(profile(), profile(target_id='rsrc/id2')))
|
|
self.assertFalse(is_match(profile(), profile(geos=['IN'])))
|
|
|
|
def wprofile(**kwargs):
|
|
kwargs['method'] = 'Weighted'
|
|
kwargs['endpoint_type'] = 'profile/externalEndpoints'
|
|
return profile(**kwargs)
|
|
|
|
self.assertFalse(is_match(wprofile(), wprofile(target='bar.unit')))
|
|
self.assertFalse(is_match(wprofile(), wprofile(weight=3)))
|
|
|
|
|
|
class TestAzureDnsProvider(TestCase):
|
|
def _provider(self):
|
|
return self._get_provider('mock_spc', 'mock_dns_client')
|
|
|
|
@patch('octodns.provider.azuredns.TrafficManagerManagementClient')
|
|
@patch('octodns.provider.azuredns.DnsManagementClient')
|
|
@patch('octodns.provider.azuredns.ClientSecretCredential')
|
|
@patch('octodns.provider.azuredns.ServicePrincipalCredentials')
|
|
def _get_provider(self, mock_spc, mock_css, mock_dns_client,
|
|
mock_tm_client):
|
|
'''Returns a mock AzureProvider object to use in testing.
|
|
|
|
:param mock_spc: placeholder
|
|
:type mock_spc: str
|
|
:param mock_dns_client: placeholder
|
|
:type mock_dns_client: str
|
|
:param mock_tm_client: placeholder
|
|
:type mock_tm_client: str
|
|
|
|
:type return: AzureProvider
|
|
'''
|
|
provider = AzureProvider('mock_id', 'mock_client', 'mock_key',
|
|
'mock_directory', 'mock_sub', 'mock_rg'
|
|
)
|
|
|
|
# Fetch the client to force it to load the creds
|
|
provider._dns_client
|
|
|
|
# set critical functions to return properly
|
|
tm_list = provider._tm_client.profiles.list_by_resource_group
|
|
tm_list.return_value = []
|
|
tm_sync = provider._tm_client.profiles.create_or_update
|
|
|
|
def side_effect(rg, name, profile):
|
|
return profile
|
|
|
|
tm_sync.side_effect = side_effect
|
|
|
|
return provider
|
|
|
|
def _get_dynamic_record(self, zone):
|
|
return Record.new(zone, 'foo', data={
|
|
'type': 'CNAME',
|
|
'ttl': 60,
|
|
'value': 'default.unit.tests.',
|
|
'dynamic': {
|
|
'pools': {
|
|
'one': {
|
|
'values': [
|
|
{'value': 'one.unit.tests.', 'weight': 1},
|
|
],
|
|
'fallback': 'two',
|
|
},
|
|
'two': {
|
|
'values': [
|
|
{'value': 'two1.unit.tests.', 'weight': 3},
|
|
{'value': 'two2.unit.tests.', 'weight': 4},
|
|
],
|
|
'fallback': 'three',
|
|
},
|
|
'three': {
|
|
'values': [
|
|
{'value': 'three.unit.tests.', 'weight': 1},
|
|
],
|
|
},
|
|
},
|
|
'rules': [
|
|
{'geos': ['AF', 'EU-DE', 'NA-US-CA', 'OC'], 'pool': 'one'},
|
|
{'pool': 'two'},
|
|
],
|
|
},
|
|
'octodns': {
|
|
'healthcheck': {
|
|
'path': '/_ping',
|
|
'port': 4443,
|
|
'protocol': 'HTTPS',
|
|
}
|
|
},
|
|
})
|
|
|
|
def _get_tm_profiles(self, provider):
|
|
sub = provider._dns_client_subscription_id
|
|
rg = provider._resource_group
|
|
base_id = '/subscriptions/' + sub + \
|
|
'/resourceGroups/' + rg + \
|
|
'/providers/Microsoft.Network/trafficManagerProfiles/'
|
|
prefix = 'foo--unit--tests'
|
|
name_format = prefix + '-{}'
|
|
id_format = base_id + name_format
|
|
|
|
header = MonitorConfigCustomHeadersItem(name='Host',
|
|
value='foo.unit.tests')
|
|
monitor = MonitorConfig(protocol='HTTPS', port=4443, path='/_ping',
|
|
custom_headers=[header])
|
|
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
|
|
nested = 'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
|
|
|
|
profiles = [
|
|
Profile(
|
|
id=id_format.format('pool-two'),
|
|
name=name_format.format('pool-two'),
|
|
traffic_routing_method='Weighted',
|
|
dns_config=DnsConfig(ttl=60),
|
|
monitor_config=monitor,
|
|
endpoints=[
|
|
Endpoint(
|
|
name='two--two1.unit.tests',
|
|
type=external,
|
|
target='two1.unit.tests',
|
|
weight=3,
|
|
),
|
|
Endpoint(
|
|
name='two--two2.unit.tests',
|
|
type=external,
|
|
target='two2.unit.tests',
|
|
weight=4,
|
|
),
|
|
],
|
|
),
|
|
Profile(
|
|
id=id_format.format('rule-one'),
|
|
name=name_format.format('rule-one'),
|
|
traffic_routing_method='Priority',
|
|
dns_config=DnsConfig(ttl=60),
|
|
monitor_config=monitor,
|
|
endpoints=[
|
|
Endpoint(
|
|
name='one',
|
|
type=external,
|
|
target='one.unit.tests',
|
|
priority=1,
|
|
),
|
|
Endpoint(
|
|
name='two',
|
|
type=nested,
|
|
target_resource_id=id_format.format('pool-two'),
|
|
priority=2,
|
|
),
|
|
Endpoint(
|
|
name='three',
|
|
type=external,
|
|
target='three.unit.tests',
|
|
priority=3,
|
|
),
|
|
Endpoint(
|
|
name='--default--',
|
|
type=external,
|
|
target='default.unit.tests',
|
|
priority=4,
|
|
),
|
|
],
|
|
),
|
|
Profile(
|
|
id=id_format.format('rule-two'),
|
|
name=name_format.format('rule-two'),
|
|
traffic_routing_method='Priority',
|
|
dns_config=DnsConfig(ttl=60),
|
|
monitor_config=monitor,
|
|
endpoints=[
|
|
Endpoint(
|
|
name='two',
|
|
type=nested,
|
|
target_resource_id=id_format.format('pool-two'),
|
|
priority=1,
|
|
),
|
|
Endpoint(
|
|
name='three',
|
|
type=external,
|
|
target='three.unit.tests',
|
|
priority=2,
|
|
),
|
|
Endpoint(
|
|
name='--default--',
|
|
type=external,
|
|
target='default.unit.tests',
|
|
priority=3,
|
|
),
|
|
],
|
|
),
|
|
Profile(
|
|
id=base_id + prefix,
|
|
name=prefix,
|
|
traffic_routing_method='Geographic',
|
|
dns_config=DnsConfig(ttl=60),
|
|
monitor_config=monitor,
|
|
endpoints=[
|
|
Endpoint(
|
|
geo_mapping=['GEO-AF', 'DE', 'US-CA', 'GEO-AP'],
|
|
name='rule-one',
|
|
type=nested,
|
|
target_resource_id=id_format.format('rule-one'),
|
|
),
|
|
Endpoint(
|
|
geo_mapping=['WORLD'],
|
|
name='rule-two',
|
|
type=nested,
|
|
target_resource_id=id_format.format('rule-two'),
|
|
),
|
|
],
|
|
),
|
|
]
|
|
|
|
for profile in profiles:
|
|
profile.dns_config.relative_name = profile.name
|
|
|
|
return profiles
|
|
|
|
def _get_dynamic_package(self):
|
|
'''Convenience function to setup a sample dynamic record.
|
|
'''
|
|
provider = self._get_provider()
|
|
|
|
# setup traffic manager profiles
|
|
tm_list = provider._tm_client.profiles.list_by_resource_group
|
|
tm_list.return_value = self._get_tm_profiles(provider)
|
|
|
|
# setup zone with dynamic record
|
|
zone = Zone(name='unit.tests.', sub_zones=[])
|
|
record = self._get_dynamic_record(zone)
|
|
zone.add_record(record)
|
|
|
|
# return everything
|
|
return provider, zone, record
|
|
|
|
def test_populate_records(self):
|
|
provider = self._get_provider()
|
|
|
|
rs = []
|
|
recordSet = RecordSet(a_records=[ARecord(ipv4_address='1.1.1.1')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'a1', 0, 'A'
|
|
recordSet.target_resource = SubResource()
|
|
rs.append(recordSet)
|
|
recordSet = RecordSet(a_records=[ARecord(ipv4_address='1.1.1.1'),
|
|
ARecord(ipv4_address='2.2.2.2')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'a2', 1, 'A'
|
|
recordSet.target_resource = SubResource()
|
|
rs.append(recordSet)
|
|
aaaa1 = AaaaRecord(ipv6_address='1:1ec:1::1')
|
|
recordSet = RecordSet(aaaa_records=[aaaa1])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'aaaa1', 2, 'AAAA'
|
|
recordSet.target_resource = SubResource()
|
|
rs.append(recordSet)
|
|
aaaa2 = AaaaRecord(ipv6_address='1:1ec:1::2')
|
|
recordSet = RecordSet(aaaa_records=[aaaa1,
|
|
aaaa2])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'aaaa2', 3, 'AAAA'
|
|
recordSet.target_resource = SubResource()
|
|
rs.append(recordSet)
|
|
recordSet = RecordSet(caa_records=[CaaRecord(flags=0,
|
|
tag='issue',
|
|
value='caa1.unit.tests')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'caa1', 4, 'CAA'
|
|
rs.append(recordSet)
|
|
recordSet = RecordSet(caa_records=[CaaRecord(flags=0,
|
|
tag='issue',
|
|
value='caa1.unit.tests'),
|
|
CaaRecord(flags=0,
|
|
tag='issue',
|
|
value='caa2.unit.tests')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'caa2', 4, 'CAA'
|
|
rs.append(recordSet)
|
|
cname1 = CnameRecord(cname='cname.unit.test.')
|
|
recordSet = RecordSet(cname_record=cname1)
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'cname1', 5, 'CNAME'
|
|
recordSet.target_resource = SubResource()
|
|
rs.append(recordSet)
|
|
recordSet = RecordSet(mx_records=[MxRecord(preference=10,
|
|
exchange='mx1.unit.test.')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'mx1', 7, 'MX'
|
|
rs.append(recordSet)
|
|
recordSet = RecordSet(mx_records=[MxRecord(preference=10,
|
|
exchange='mx1.unit.test.'),
|
|
MxRecord(preference=11,
|
|
exchange='mx2.unit.test.')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'mx2', 8, 'MX'
|
|
rs.append(recordSet)
|
|
recordSet = RecordSet(ns_records=[NsRecord(nsdname='ns1.unit.test.')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'ns1', 9, 'NS'
|
|
rs.append(recordSet)
|
|
recordSet = RecordSet(ns_records=[NsRecord(nsdname='ns1.unit.test.'),
|
|
NsRecord(nsdname='ns2.unit.test.')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'ns2', 10, 'NS'
|
|
rs.append(recordSet)
|
|
ptr1 = PtrRecord(ptrdname='ptr1.unit.test.')
|
|
recordSet = RecordSet(ptr_records=[ptr1])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'ptr1', 11, 'PTR'
|
|
rs.append(recordSet)
|
|
recordSet = RecordSet(srv_records=[SrvRecord(priority=1,
|
|
weight=2,
|
|
port=3,
|
|
target='1unit.tests.')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = '_srv1._tcp', 13, 'SRV'
|
|
rs.append(recordSet)
|
|
recordSet = RecordSet(srv_records=[SrvRecord(priority=1,
|
|
weight=2,
|
|
port=3,
|
|
target='1unit.tests.'),
|
|
SrvRecord(priority=4,
|
|
weight=5,
|
|
port=6,
|
|
target='2unit.tests.')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = '_srv2._tcp', 14, 'SRV'
|
|
rs.append(recordSet)
|
|
recordSet = RecordSet(txt_records=[TxtRecord(value='sample text1')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'txt1', 15, 'TXT'
|
|
recordSet.target_resource = SubResource()
|
|
rs.append(recordSet)
|
|
recordSet = RecordSet(txt_records=[TxtRecord(value='sample text1'),
|
|
TxtRecord(value='sample text2')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'txt2', 16, 'TXT'
|
|
recordSet.target_resource = SubResource()
|
|
rs.append(recordSet)
|
|
recordSet = RecordSet(soa_record=[SoaRecord()])
|
|
recordSet.name, recordSet.ttl, recordSet.type = '', 17, 'SOA'
|
|
rs.append(recordSet)
|
|
long_txt = "v=spf1 ip4:10.10.0.0/24 ip4:10.10.1.0/24 ip4:10.10.2.0/24"
|
|
long_txt += " ip4:10.10.3.0/24 ip4:10.10.4.0/24 ip4:10.10.5.0/24 "
|
|
long_txt += " 10.6.0/24 ip4:10.10.7.0/24 ip4:10.10.8.0/24 "
|
|
long_txt += " ip4:10.10.10.0/24 ip4:10.10.11.0/24 ip4:10.10.12.0/24"
|
|
long_txt += " ip4:10.10.13.0/24 ip4:10.10.14.0/24 ip4:10.10.15.0/24"
|
|
long_txt += " ip4:10.10.16.0/24 ip4:10.10.17.0/24 ip4:10.10.18.0/24"
|
|
long_txt += " ip4:10.10.19.0/24 ip4:10.10.20.0/24 ~all"
|
|
recordSet = RecordSet(txt_records=[TxtRecord(value='sample value1'),
|
|
TxtRecord(value=long_txt)])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'txt3', 18, 'TXT'
|
|
recordSet.target_resource = SubResource()
|
|
rs.append(recordSet)
|
|
|
|
record_list = provider._dns_client.record_sets.list_by_dns_zone
|
|
record_list.return_value = rs
|
|
|
|
zone_list = provider._dns_client.zones.list_by_resource_group
|
|
zone_list.return_value = [zone]
|
|
|
|
exists = provider.populate(zone)
|
|
|
|
self.assertEquals(len(zone.records), 17)
|
|
self.assertTrue(exists)
|
|
|
|
def test_populate_zone(self):
|
|
provider = self._get_provider()
|
|
|
|
zone_list = provider._dns_client.zones.list_by_resource_group
|
|
zone_1 = AzureZone(location='global')
|
|
# This is far from ideal but the
|
|
# zone constructor doesn't let me set it on creation
|
|
zone_1.name = "zone-1"
|
|
zone_2 = AzureZone(location='global')
|
|
# This is far from ideal but the
|
|
# zone constructor doesn't let me set it on creation
|
|
zone_2.name = "zone-2"
|
|
zone_list.return_value = [zone_1,
|
|
zone_2,
|
|
zone_1]
|
|
|
|
provider._populate_zones()
|
|
|
|
# This should be returning two zones since two zones are the same
|
|
self.assertEquals(len(provider._azure_zones), 2)
|
|
|
|
def test_bad_zone_response(self):
|
|
provider = self._get_provider()
|
|
|
|
_get = provider._dns_client.zones.get
|
|
_get.side_effect = CloudError(Mock(status=404), 'Azure Error')
|
|
self.assertEquals(
|
|
provider._check_zone('unit.test', create=False),
|
|
None
|
|
)
|
|
|
|
def test_extra_changes(self):
|
|
provider, existing, record = self._get_dynamic_package()
|
|
|
|
# test simple records produce no extra changes
|
|
desired = Zone(name=existing.name, sub_zones=[])
|
|
desired.add_record(Record.new(desired, 'simple', data={
|
|
'type': record._type,
|
|
'ttl': record.ttl,
|
|
'value': record.value,
|
|
}))
|
|
extra = provider._extra_changes(desired, desired, [])
|
|
self.assertEqual(len(extra), 0)
|
|
|
|
# test an unchanged dynamic record produces no extra changes
|
|
desired.add_record(record)
|
|
extra = provider._extra_changes(existing, desired, [])
|
|
self.assertEqual(len(extra), 0)
|
|
|
|
# test unused TM produces the extra change for clean up
|
|
sample_profile = self._get_tm_profiles(provider)[0]
|
|
tm_id = provider._profile_name_to_id
|
|
root_profile_name = _root_traffic_manager_name(record)
|
|
extra_profile = Profile(
|
|
id=tm_id('{}-pool-random'.format(root_profile_name)),
|
|
name='{}-pool-random'.format(root_profile_name),
|
|
traffic_routing_method='Weighted',
|
|
dns_config=sample_profile.dns_config,
|
|
monitor_config=sample_profile.monitor_config,
|
|
endpoints=sample_profile.endpoints,
|
|
)
|
|
tm_list = provider._tm_client.profiles.list_by_resource_group
|
|
tm_list.return_value.append(extra_profile)
|
|
provider._populate_traffic_managers()
|
|
extra = provider._extra_changes(existing, desired, [])
|
|
self.assertEqual(len(extra), 1)
|
|
extra = extra[0]
|
|
self.assertIsInstance(extra, Update)
|
|
self.assertEqual(extra.new, record)
|
|
desired._remove_record(record)
|
|
tm_list.return_value.pop()
|
|
|
|
# test new dynamic record does not produce an extra change for it
|
|
new_dynamic = Record.new(desired, record.name + '2', data={
|
|
'type': record._type,
|
|
'ttl': record.ttl,
|
|
'value': record.value,
|
|
'dynamic': record.dynamic._data(),
|
|
'octodns': record._octodns,
|
|
})
|
|
# test change in healthcheck by using a different port number
|
|
update_dynamic = Record.new(desired, record.name, data={
|
|
'type': record._type,
|
|
'ttl': record.ttl,
|
|
'value': record.value,
|
|
'dynamic': record.dynamic._data(),
|
|
'octodns': {
|
|
'healthcheck': {
|
|
'path': '/_ping',
|
|
'port': 443,
|
|
'protocol': 'HTTPS',
|
|
},
|
|
},
|
|
})
|
|
desired.add_record(new_dynamic)
|
|
desired.add_record(update_dynamic)
|
|
changes = [Create(new_dynamic)]
|
|
extra = provider._extra_changes(existing, desired, changes)
|
|
# implicitly asserts that new_dynamic was not added to extra changes
|
|
# as it was already in the `changes` list
|
|
self.assertEqual(len(extra), 1)
|
|
extra = extra[0]
|
|
self.assertIsInstance(extra, Update)
|
|
self.assertEqual(extra.new, update_dynamic)
|
|
|
|
# test non-CNAME dynamic record throws exception
|
|
a_dynamic = Record.new(desired, record.name + '3', data={
|
|
'type': 'A',
|
|
'ttl': record.ttl,
|
|
'values': ['1.1.1.1'],
|
|
'dynamic': {
|
|
'pools': {
|
|
'one': {'values': [{'value': '2.2.2.2'}]},
|
|
},
|
|
'rules': [
|
|
{'pool': 'one'},
|
|
],
|
|
},
|
|
})
|
|
desired.add_record(a_dynamic)
|
|
changes.append(Create(a_dynamic))
|
|
with self.assertRaises(AzureException) as ctx:
|
|
provider._extra_changes(existing, desired, changes)
|
|
self.assertTrue(text_type(ctx).endswith(
|
|
'must be of type CNAME'
|
|
))
|
|
desired._remove_record(a_dynamic)
|
|
|
|
# test colliding ATM names throws exception
|
|
record1 = Record.new(desired, 'sub.www', data={
|
|
'type': record._type,
|
|
'ttl': record.ttl,
|
|
'value': record.value,
|
|
'dynamic': record.dynamic._data(),
|
|
})
|
|
record2 = Record.new(desired, 'sub--www', data={
|
|
'type': record._type,
|
|
'ttl': record.ttl,
|
|
'value': record.value,
|
|
'dynamic': record.dynamic._data(),
|
|
})
|
|
desired.add_record(record1)
|
|
desired.add_record(record2)
|
|
changes = [Create(record1), Create(record2)]
|
|
with self.assertRaises(AzureException) as ctx:
|
|
provider._extra_changes(existing, desired, changes)
|
|
self.assertTrue(text_type(ctx).startswith(
|
|
'Collision in Traffic Manager'
|
|
))
|
|
|
|
def test_generate_tm_profile(self):
|
|
provider, zone, record = self._get_dynamic_package()
|
|
profile_gen = provider._generate_tm_profile
|
|
|
|
label = 'foobar'
|
|
routing = 'Priority'
|
|
endpoints = [
|
|
Endpoint(target='one.unit.tests'),
|
|
Endpoint(target_resource_id='/s/1/rg/foo/tm/foobar2'),
|
|
Endpoint(name='invalid'),
|
|
]
|
|
|
|
# invalid endpoint raises exception
|
|
with self.assertRaises(AzureException):
|
|
profile_gen(routing, endpoints, record, label)
|
|
|
|
# regular test
|
|
endpoints.pop()
|
|
profile = profile_gen(routing, endpoints, record, label)
|
|
|
|
# implicitly tests _profile_name_to_id
|
|
sub = provider._dns_client_subscription_id
|
|
rg = provider._resource_group
|
|
expected_name = 'foo--unit--tests-rule-foobar'
|
|
expected_id = '/subscriptions/' + sub + \
|
|
'/resourceGroups/' + rg + \
|
|
'/providers/Microsoft.Network/trafficManagerProfiles/' + \
|
|
expected_name
|
|
self.assertEqual(profile.id, expected_id)
|
|
self.assertEqual(profile.name, expected_name)
|
|
self.assertEqual(profile.name, profile.dns_config.relative_name)
|
|
self.assertEqual(profile.traffic_routing_method, routing)
|
|
self.assertEqual(profile.dns_config.ttl, record.ttl)
|
|
self.assertEqual(len(profile.endpoints), len(endpoints))
|
|
|
|
self.assertEqual(
|
|
profile.endpoints[0].type,
|
|
'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
|
|
)
|
|
self.assertEqual(
|
|
profile.endpoints[1].type,
|
|
'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
|
|
)
|
|
|
|
def test_dynamic_record(self):
|
|
provider, zone, record = self._get_dynamic_package()
|
|
profiles = provider._generate_traffic_managers(record)
|
|
|
|
# check that every profile is a match with what we expect
|
|
expected_profiles = self._get_tm_profiles(provider)
|
|
self.assertEqual(len(expected_profiles), len(profiles))
|
|
for have, expected in zip(profiles, expected_profiles):
|
|
self.assertTrue(_profile_is_match(have, expected))
|
|
|
|
# check that dynamic record is populated back from profiles
|
|
azrecord = RecordSet(
|
|
ttl=60,
|
|
target_resource=SubResource(id=profiles[-1].id),
|
|
)
|
|
azrecord.name = record.name or '@'
|
|
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
|
|
record2 = provider._populate_record(zone, azrecord)
|
|
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
|
|
|
|
def test_generate_traffic_managers_middle_east(self):
|
|
# check Asia/Middle East test case
|
|
provider, zone, record = self._get_dynamic_package()
|
|
record.dynamic._data()['rules'][0]['geos'].append('AS')
|
|
profiles = provider._generate_traffic_managers(record)
|
|
self.assertIn('GEO-ME', profiles[-1].endpoints[0].geo_mapping)
|
|
self.assertIn('GEO-AS', profiles[-1].endpoints[0].geo_mapping)
|
|
|
|
def test_populate_dynamic_middle_east(self):
|
|
# Middle east without Asia raises exception
|
|
provider, zone, record = self._get_dynamic_package()
|
|
tm_suffix = _root_traffic_manager_name(record)
|
|
tm_id = provider._profile_name_to_id
|
|
tm_list = provider._tm_client.profiles.list_by_resource_group
|
|
tm_list.return_value = [
|
|
Profile(
|
|
id=tm_id(tm_suffix),
|
|
name=tm_suffix,
|
|
traffic_routing_method='Geographic',
|
|
endpoints=[
|
|
Endpoint(
|
|
geo_mapping=['GEO-ME'],
|
|
),
|
|
],
|
|
),
|
|
]
|
|
azrecord = RecordSet(
|
|
ttl=60,
|
|
target_resource=SubResource(id=tm_id(tm_suffix)),
|
|
)
|
|
azrecord.name = record.name or '@'
|
|
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
|
|
with self.assertRaises(AzureException) as ctx:
|
|
provider._populate_record(zone, azrecord)
|
|
self.assertTrue(text_type(ctx).startswith(
|
|
'Middle East (GEO-ME) is not supported'
|
|
))
|
|
|
|
# valid profiles with Middle East test case
|
|
provider, zone, record = self._get_dynamic_package()
|
|
geo_profile = provider._get_tm_for_dynamic_record(record)
|
|
geo_profile.endpoints[0].geo_mapping.extend(['GEO-ME', 'GEO-AS'])
|
|
record = provider._populate_record(zone, azrecord)
|
|
self.assertIn('AS', record.dynamic.rules[0].data['geos'])
|
|
self.assertNotIn('ME', record.dynamic.rules[0].data['geos'])
|
|
|
|
def test_dynamic_no_geo(self):
|
|
# test that traffic managers are generated as expected
|
|
provider, zone, record = self._get_dynamic_package()
|
|
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
|
|
|
|
record = Record.new(zone, 'foo', data={
|
|
'type': 'CNAME',
|
|
'ttl': 60,
|
|
'value': 'default.unit.tests.',
|
|
'dynamic': {
|
|
'pools': {
|
|
'one': {
|
|
'values': [
|
|
{'value': 'one.unit.tests.'},
|
|
],
|
|
},
|
|
},
|
|
'rules': [
|
|
{'pool': 'one'},
|
|
],
|
|
}
|
|
})
|
|
profiles = provider._generate_traffic_managers(record)
|
|
|
|
self.assertEqual(len(profiles), 1)
|
|
self.assertTrue(_profile_is_match(profiles[0], Profile(
|
|
name='foo--unit--tests',
|
|
traffic_routing_method='Priority',
|
|
dns_config=DnsConfig(
|
|
relative_name='foo--unit--tests', ttl=60),
|
|
monitor_config=_get_monitor(record),
|
|
endpoints=[
|
|
Endpoint(
|
|
name='one',
|
|
type=external,
|
|
target='one.unit.tests',
|
|
priority=1,
|
|
),
|
|
Endpoint(
|
|
name='--default--',
|
|
type=external,
|
|
target='default.unit.tests',
|
|
priority=2,
|
|
),
|
|
],
|
|
)))
|
|
|
|
# test that same record gets populated back from traffic managers
|
|
tm_list = provider._tm_client.profiles.list_by_resource_group
|
|
tm_list.return_value = profiles
|
|
azrecord = RecordSet(
|
|
ttl=60,
|
|
target_resource=SubResource(id=profiles[0].id),
|
|
)
|
|
azrecord.name = record.name or '@'
|
|
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
|
|
record2 = provider._populate_record(zone, azrecord)
|
|
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
|
|
|
|
def test_dynamic_fallback_is_default(self):
|
|
# test that traffic managers are generated as expected
|
|
provider, zone, record = self._get_dynamic_package()
|
|
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
|
|
|
|
record = Record.new(zone, 'foo', data={
|
|
'type': 'CNAME',
|
|
'ttl': 60,
|
|
'value': 'default.unit.tests.',
|
|
'dynamic': {
|
|
'pools': {
|
|
'def': {
|
|
'values': [
|
|
{'value': 'default.unit.tests.'},
|
|
],
|
|
},
|
|
},
|
|
'rules': [
|
|
{'geos': ['AF'], 'pool': 'def'},
|
|
],
|
|
}
|
|
})
|
|
profiles = provider._generate_traffic_managers(record)
|
|
|
|
self.assertEqual(len(profiles), 1)
|
|
self.assertTrue(_profile_is_match(profiles[0], Profile(
|
|
name='foo--unit--tests',
|
|
traffic_routing_method='Geographic',
|
|
dns_config=DnsConfig(
|
|
relative_name='foo--unit--tests', ttl=60),
|
|
monitor_config=_get_monitor(record),
|
|
endpoints=[
|
|
Endpoint(
|
|
name='def--default--',
|
|
type=external,
|
|
target='default.unit.tests',
|
|
geo_mapping=['GEO-AF'],
|
|
),
|
|
],
|
|
)))
|
|
|
|
# test that same record gets populated back from traffic managers
|
|
tm_list = provider._tm_client.profiles.list_by_resource_group
|
|
tm_list.return_value = profiles
|
|
azrecord = RecordSet(
|
|
ttl=60,
|
|
target_resource=SubResource(id=profiles[0].id),
|
|
)
|
|
azrecord.name = record.name or '@'
|
|
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
|
|
record2 = provider._populate_record(zone, azrecord)
|
|
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
|
|
|
|
def test_dynamic_pool_contains_default(self):
|
|
# test that traffic managers are generated as expected
|
|
provider, zone, record = self._get_dynamic_package()
|
|
tm_id = provider._profile_name_to_id
|
|
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
|
|
nested = 'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
|
|
|
|
record = Record.new(zone, 'foo', data={
|
|
'type': 'CNAME',
|
|
'ttl': 60,
|
|
'value': 'default.unit.tests.',
|
|
'dynamic': {
|
|
'pools': {
|
|
'rr': {
|
|
'values': [
|
|
{'value': 'one.unit.tests.'},
|
|
{'value': 'two.unit.tests.'},
|
|
{'value': 'default.unit.tests.'},
|
|
{'value': 'final.unit.tests.'},
|
|
],
|
|
},
|
|
},
|
|
'rules': [
|
|
{'geos': ['AF'], 'pool': 'rr'},
|
|
],
|
|
}
|
|
})
|
|
profiles = provider._generate_traffic_managers(record)
|
|
|
|
self.assertEqual(len(profiles), 2)
|
|
self.assertTrue(_profile_is_match(profiles[0], Profile(
|
|
name='foo--unit--tests-pool-rr',
|
|
traffic_routing_method='Weighted',
|
|
dns_config=DnsConfig(
|
|
relative_name='foo--unit--tests-pool-rr', ttl=60),
|
|
monitor_config=_get_monitor(record),
|
|
endpoints=[
|
|
Endpoint(
|
|
name='rr--one.unit.tests',
|
|
type=external,
|
|
target='one.unit.tests',
|
|
weight=1,
|
|
),
|
|
Endpoint(
|
|
name='rr--two.unit.tests',
|
|
type=external,
|
|
target='two.unit.tests',
|
|
weight=1,
|
|
),
|
|
Endpoint(
|
|
name='rr--default.unit.tests--default--',
|
|
type=external,
|
|
target='default.unit.tests',
|
|
weight=1,
|
|
),
|
|
Endpoint(
|
|
name='rr--final.unit.tests',
|
|
type=external,
|
|
target='final.unit.tests',
|
|
weight=1,
|
|
),
|
|
],
|
|
)))
|
|
self.assertTrue(_profile_is_match(profiles[1], Profile(
|
|
name='foo--unit--tests',
|
|
traffic_routing_method='Geographic',
|
|
dns_config=DnsConfig(
|
|
relative_name='foo--unit--tests', ttl=60),
|
|
monitor_config=_get_monitor(record),
|
|
endpoints=[
|
|
Endpoint(
|
|
name='rule-rr',
|
|
type=nested,
|
|
target_resource_id=tm_id(profiles[0].name),
|
|
geo_mapping=['GEO-AF'],
|
|
),
|
|
],
|
|
)))
|
|
|
|
# test that same record gets populated back from traffic managers
|
|
tm_list = provider._tm_client.profiles.list_by_resource_group
|
|
tm_list.return_value = profiles
|
|
azrecord = RecordSet(
|
|
ttl=60,
|
|
target_resource=SubResource(id=profiles[1].id),
|
|
)
|
|
azrecord.name = record.name or '@'
|
|
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
|
|
record2 = provider._populate_record(zone, azrecord)
|
|
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
|
|
|
|
def test_dynamic_pool_contains_default_no_geo(self):
|
|
# test that traffic managers are generated as expected
|
|
provider, zone, record = self._get_dynamic_package()
|
|
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
|
|
|
|
record = Record.new(zone, 'foo', data={
|
|
'type': 'CNAME',
|
|
'ttl': 60,
|
|
'value': 'default.unit.tests.',
|
|
'dynamic': {
|
|
'pools': {
|
|
'rr': {
|
|
'values': [
|
|
{'value': 'one.unit.tests.'},
|
|
{'value': 'two.unit.tests.'},
|
|
{'value': 'default.unit.tests.'},
|
|
{'value': 'final.unit.tests.'},
|
|
],
|
|
},
|
|
},
|
|
'rules': [
|
|
{'pool': 'rr'},
|
|
],
|
|
}
|
|
})
|
|
profiles = provider._generate_traffic_managers(record)
|
|
|
|
self.assertEqual(len(profiles), 1)
|
|
self.assertTrue(_profile_is_match(profiles[0], Profile(
|
|
name='foo--unit--tests',
|
|
traffic_routing_method='Weighted',
|
|
dns_config=DnsConfig(
|
|
relative_name='foo--unit--tests', ttl=60),
|
|
monitor_config=_get_monitor(record),
|
|
endpoints=[
|
|
Endpoint(
|
|
name='rr--one.unit.tests',
|
|
type=external,
|
|
target='one.unit.tests',
|
|
weight=1,
|
|
),
|
|
Endpoint(
|
|
name='rr--two.unit.tests',
|
|
type=external,
|
|
target='two.unit.tests',
|
|
weight=1,
|
|
),
|
|
Endpoint(
|
|
name='rr--default.unit.tests--default--',
|
|
type=external,
|
|
target='default.unit.tests',
|
|
weight=1,
|
|
),
|
|
Endpoint(
|
|
name='rr--final.unit.tests',
|
|
type=external,
|
|
target='final.unit.tests',
|
|
weight=1,
|
|
),
|
|
],
|
|
)))
|
|
|
|
# test that same record gets populated back from traffic managers
|
|
tm_list = provider._tm_client.profiles.list_by_resource_group
|
|
tm_list.return_value = profiles
|
|
azrecord = RecordSet(
|
|
ttl=60,
|
|
target_resource=SubResource(id=profiles[0].id),
|
|
)
|
|
azrecord.name = record.name or '@'
|
|
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
|
|
record2 = provider._populate_record(zone, azrecord)
|
|
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
|
|
|
|
def test_dynamic_last_pool_contains_default_no_geo(self):
|
|
# test that traffic managers are generated as expected
|
|
provider, zone, record = self._get_dynamic_package()
|
|
tm_id = provider._profile_name_to_id
|
|
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
|
|
nested = 'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
|
|
|
|
record = Record.new(zone, 'foo', data={
|
|
'type': 'CNAME',
|
|
'ttl': 60,
|
|
'value': 'default.unit.tests.',
|
|
'dynamic': {
|
|
'pools': {
|
|
'cloud': {
|
|
'values': [
|
|
{'value': 'cloud.unit.tests.'},
|
|
],
|
|
'fallback': 'rr',
|
|
},
|
|
'rr': {
|
|
'values': [
|
|
{'value': 'one.unit.tests.'},
|
|
{'value': 'two.unit.tests.'},
|
|
{'value': 'default.unit.tests.'},
|
|
{'value': 'final.unit.tests.'},
|
|
],
|
|
},
|
|
},
|
|
'rules': [
|
|
{'pool': 'cloud'},
|
|
],
|
|
}
|
|
})
|
|
profiles = provider._generate_traffic_managers(record)
|
|
|
|
self.assertEqual(len(profiles), 2)
|
|
self.assertTrue(_profile_is_match(profiles[0], Profile(
|
|
name='foo--unit--tests-pool-rr',
|
|
traffic_routing_method='Weighted',
|
|
dns_config=DnsConfig(
|
|
relative_name='foo--unit--tests-pool-rr', ttl=60),
|
|
monitor_config=_get_monitor(record),
|
|
endpoints=[
|
|
Endpoint(
|
|
name='rr--one.unit.tests',
|
|
type=external,
|
|
target='one.unit.tests',
|
|
weight=1,
|
|
),
|
|
Endpoint(
|
|
name='rr--two.unit.tests',
|
|
type=external,
|
|
target='two.unit.tests',
|
|
weight=1,
|
|
),
|
|
Endpoint(
|
|
name='rr--default.unit.tests--default--',
|
|
type=external,
|
|
target='default.unit.tests',
|
|
weight=1,
|
|
),
|
|
Endpoint(
|
|
name='rr--final.unit.tests',
|
|
type=external,
|
|
target='final.unit.tests',
|
|
weight=1,
|
|
),
|
|
],
|
|
)))
|
|
self.assertTrue(_profile_is_match(profiles[1], Profile(
|
|
name='foo--unit--tests',
|
|
traffic_routing_method='Priority',
|
|
dns_config=DnsConfig(
|
|
relative_name='foo--unit--tests', ttl=60),
|
|
monitor_config=_get_monitor(record),
|
|
endpoints=[
|
|
Endpoint(
|
|
name='cloud',
|
|
type=external,
|
|
target='cloud.unit.tests',
|
|
priority=1,
|
|
),
|
|
Endpoint(
|
|
name='rr',
|
|
type=nested,
|
|
target_resource_id=tm_id(profiles[0].name),
|
|
priority=2,
|
|
),
|
|
],
|
|
)))
|
|
|
|
# test that same record gets populated back from traffic managers
|
|
tm_list = provider._tm_client.profiles.list_by_resource_group
|
|
tm_list.return_value = profiles
|
|
azrecord = RecordSet(
|
|
ttl=60,
|
|
target_resource=SubResource(id=profiles[1].id),
|
|
)
|
|
azrecord.name = record.name or '@'
|
|
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
|
|
record2 = provider._populate_record(zone, azrecord)
|
|
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
|
|
|
|
def test_dynamic_unique_traffic_managers(self):
|
|
record = self._get_dynamic_record(zone)
|
|
data = {
|
|
'type': record._type,
|
|
'ttl': record.ttl,
|
|
'value': record.value,
|
|
'dynamic': record.dynamic._data()
|
|
}
|
|
record_names = [
|
|
'www.foo', 'www-foo'
|
|
]
|
|
provider = self._get_provider()
|
|
|
|
seen = set()
|
|
for name in record_names:
|
|
record = Record.new(zone, name, data=data)
|
|
tms = provider._generate_traffic_managers(record)
|
|
for tm in tms:
|
|
self.assertNotIn(tm.name, seen)
|
|
seen.add(tm.name)
|
|
|
|
def test_sync_traffic_managers(self):
|
|
provider, zone, record = self._get_dynamic_package()
|
|
provider._populate_traffic_managers()
|
|
|
|
tm_sync = provider._tm_client.profiles.create_or_update
|
|
|
|
prefix = 'foo--unit--tests'
|
|
expected_seen = {
|
|
prefix, '{}-pool-two'.format(prefix),
|
|
'{}-rule-one'.format(prefix), '{}-rule-two'.format(prefix),
|
|
}
|
|
|
|
# test no change
|
|
seen = provider._sync_traffic_managers(record)
|
|
self.assertEqual(seen, expected_seen)
|
|
tm_sync.assert_not_called()
|
|
|
|
# test that changing weight causes update API call
|
|
dynamic = record.dynamic._data()
|
|
dynamic['pools']['two']['values'][0]['weight'] = 14
|
|
data = {
|
|
'type': 'CNAME',
|
|
'ttl': record.ttl,
|
|
'value': record.value,
|
|
'dynamic': dynamic,
|
|
'octodns': record._octodns,
|
|
}
|
|
new_record = Record.new(zone, record.name, data)
|
|
tm_sync.reset_mock()
|
|
seen2 = provider._sync_traffic_managers(new_record)
|
|
self.assertEqual(seen2, expected_seen)
|
|
tm_sync.assert_called_once()
|
|
|
|
# test that new profile was successfully inserted in cache
|
|
new_profile = provider._get_tm_profile_by_name(
|
|
'{}-pool-two'.format(prefix)
|
|
)
|
|
self.assertEqual(new_profile.endpoints[0].weight, 14)
|
|
|
|
@patch(
|
|
'octodns.provider.azuredns.AzureProvider._generate_traffic_managers')
|
|
def test_sync_traffic_managers_duplicate(self, mock_gen_tms):
|
|
provider, zone, record = self._get_dynamic_package()
|
|
tm_sync = provider._tm_client.profiles.create_or_update
|
|
|
|
# change and duplicate profiles
|
|
profile = self._get_tm_profiles(provider)[0]
|
|
profile.name = 'changing_this_to_trigger_sync'
|
|
mock_gen_tms.return_value = [profile, profile]
|
|
provider._sync_traffic_managers(record)
|
|
|
|
# it should only be called once for duplicate profiles
|
|
tm_sync.assert_called_once()
|
|
|
|
def test_find_traffic_managers(self):
|
|
provider, zone, record = self._get_dynamic_package()
|
|
|
|
# insert a non-matching profile
|
|
sample_profile = self._get_tm_profiles(provider)[0]
|
|
# dummy record for generating suffix
|
|
record2 = Record.new(zone, record.name + '2', data={
|
|
'type': record._type,
|
|
'ttl': record.ttl,
|
|
'value': record.value,
|
|
})
|
|
prefix2 = _root_traffic_manager_name(record2)
|
|
tm_id = provider._profile_name_to_id
|
|
extra_profile = Profile(
|
|
id=tm_id('{}-pool-random'.format(prefix2)),
|
|
name='{}-pool-random'.format(prefix2),
|
|
traffic_routing_method='Weighted',
|
|
dns_config=sample_profile.dns_config,
|
|
monitor_config=sample_profile.monitor_config,
|
|
endpoints=sample_profile.endpoints,
|
|
)
|
|
tm_list = provider._tm_client.profiles.list_by_resource_group
|
|
tm_list.return_value.append(extra_profile)
|
|
provider._populate_traffic_managers()
|
|
|
|
# implicitly asserts that non-matching profile is not included
|
|
prefix = _root_traffic_manager_name(record)
|
|
self.assertEqual(provider._find_traffic_managers(record), {
|
|
prefix, '{}-pool-two'.format(prefix),
|
|
'{}-rule-one'.format(prefix), '{}-rule-two'.format(prefix),
|
|
})
|
|
|
|
def test_traffic_manager_gc(self):
|
|
provider, zone, record = self._get_dynamic_package()
|
|
provider._populate_traffic_managers()
|
|
|
|
profiles = provider._find_traffic_managers(record)
|
|
profile_delete_mock = provider._tm_client.profiles.delete
|
|
|
|
provider._traffic_managers_gc(record, profiles)
|
|
profile_delete_mock.assert_not_called()
|
|
|
|
profile_delete_mock.reset_mock()
|
|
remove = list(profiles)[3]
|
|
profiles.discard(remove)
|
|
|
|
provider._traffic_managers_gc(record, profiles)
|
|
profile_delete_mock.assert_has_calls(
|
|
[call(provider._resource_group, remove)]
|
|
)
|
|
|
|
def test_apply(self):
|
|
provider = self._get_provider()
|
|
|
|
half = int(len(octo_records) / 2)
|
|
changes = [Create(r) for r in octo_records[:half]] + \
|
|
[Update(r, r) for r in octo_records[half:]]
|
|
deletes = [Delete(r) for r in octo_records]
|
|
|
|
self.assertEquals(19, provider.apply(Plan(None, zone,
|
|
changes, True)))
|
|
self.assertEquals(19, provider.apply(Plan(zone, zone,
|
|
deletes, True)))
|
|
|
|
def test_apply_create_dynamic(self):
|
|
provider = self._get_provider()
|
|
|
|
tm_list = provider._tm_client.profiles.list_by_resource_group
|
|
tm_list.return_value = []
|
|
|
|
tm_sync = provider._tm_client.profiles.create_or_update
|
|
|
|
zone = Zone(name='unit.tests.', sub_zones=[])
|
|
record = self._get_dynamic_record(zone)
|
|
|
|
profiles = self._get_tm_profiles(provider)
|
|
|
|
provider._apply_Create(Create(record))
|
|
# create was called as many times as number of profiles required for
|
|
# the dynamic record
|
|
self.assertEqual(tm_sync.call_count, len(profiles))
|
|
|
|
create = provider._dns_client.record_sets.create_or_update
|
|
create.assert_called_once()
|
|
|
|
def test_apply_update_dynamic(self):
|
|
# existing is simple, new is dynamic
|
|
provider = self._get_provider()
|
|
tm_list = provider._tm_client.profiles.list_by_resource_group
|
|
tm_list.return_value = []
|
|
profiles = self._get_tm_profiles(provider)
|
|
dynamic_record = self._get_dynamic_record(zone)
|
|
simple_record = Record.new(zone, dynamic_record.name, data={
|
|
'type': 'CNAME',
|
|
'ttl': 3600,
|
|
'value': 'cname.unit.tests.',
|
|
})
|
|
change = Update(simple_record, dynamic_record)
|
|
provider._apply_Update(change)
|
|
tm_sync, dns_update, tm_delete = (
|
|
provider._tm_client.profiles.create_or_update,
|
|
provider._dns_client.record_sets.create_or_update,
|
|
provider._tm_client.profiles.delete
|
|
)
|
|
self.assertEqual(tm_sync.call_count, len(profiles))
|
|
dns_update.assert_called_once()
|
|
tm_delete.assert_not_called()
|
|
|
|
# existing is dynamic, new is simple
|
|
provider, existing, dynamic_record = self._get_dynamic_package()
|
|
profiles = self._get_tm_profiles(provider)
|
|
change = Update(dynamic_record, simple_record)
|
|
provider._apply_Update(change)
|
|
tm_sync, dns_update, tm_delete = (
|
|
provider._tm_client.profiles.create_or_update,
|
|
provider._dns_client.record_sets.create_or_update,
|
|
provider._tm_client.profiles.delete
|
|
)
|
|
tm_sync.assert_not_called()
|
|
dns_update.assert_called_once()
|
|
self.assertEqual(tm_delete.call_count, len(profiles))
|
|
|
|
# both are dynamic, healthcheck port is changed
|
|
provider, existing, dynamic_record = self._get_dynamic_package()
|
|
profiles = self._get_tm_profiles(provider)
|
|
dynamic_record2 = self._get_dynamic_record(existing)
|
|
dynamic_record2._octodns['healthcheck']['port'] += 1
|
|
change = Update(dynamic_record, dynamic_record2)
|
|
provider._apply_Update(change)
|
|
tm_sync, dns_update, tm_delete = (
|
|
provider._tm_client.profiles.create_or_update,
|
|
provider._dns_client.record_sets.create_or_update,
|
|
provider._tm_client.profiles.delete
|
|
)
|
|
self.assertEqual(tm_sync.call_count, len(profiles))
|
|
dns_update.assert_not_called()
|
|
tm_delete.assert_not_called()
|
|
|
|
# both are dynamic, extra profile should be deleted
|
|
provider, existing, dynamic_record = self._get_dynamic_package()
|
|
sample_profile = self._get_tm_profiles(provider)[0]
|
|
tm_id = provider._profile_name_to_id
|
|
root_profile_name = _root_traffic_manager_name(dynamic_record)
|
|
extra_profile = Profile(
|
|
id=tm_id('{}-pool-random'.format(root_profile_name)),
|
|
name='{}-pool-random'.format(root_profile_name),
|
|
traffic_routing_method='Weighted',
|
|
dns_config=sample_profile.dns_config,
|
|
monitor_config=sample_profile.monitor_config,
|
|
endpoints=sample_profile.endpoints,
|
|
)
|
|
tm_list = provider._tm_client.profiles.list_by_resource_group
|
|
tm_list.return_value.append(extra_profile)
|
|
change = Update(dynamic_record, dynamic_record)
|
|
provider._apply_Update(change)
|
|
tm_sync, dns_update, tm_delete = (
|
|
provider._tm_client.profiles.create_or_update,
|
|
provider._dns_client.record_sets.create_or_update,
|
|
provider._tm_client.profiles.delete
|
|
)
|
|
tm_sync.assert_not_called()
|
|
dns_update.assert_not_called()
|
|
tm_delete.assert_called_once()
|
|
|
|
# both are dynamic but alias is broken
|
|
provider, existing, record1 = self._get_dynamic_package()
|
|
azrecord = RecordSet(
|
|
ttl=record1.ttl, target_resource=SubResource(id=None))
|
|
azrecord.name = record1.name or '@'
|
|
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record1._type)
|
|
|
|
record2 = provider._populate_record(zone, azrecord)
|
|
self.assertEqual(record2.value, 'iam.invalid.')
|
|
|
|
change = Update(record2, record1)
|
|
provider._apply_Update(change)
|
|
tm_sync, dns_update, tm_delete = (
|
|
provider._tm_client.profiles.create_or_update,
|
|
provider._dns_client.record_sets.create_or_update,
|
|
provider._tm_client.profiles.delete
|
|
)
|
|
tm_sync.assert_not_called()
|
|
dns_update.assert_called_once()
|
|
tm_delete.assert_not_called()
|
|
|
|
def test_apply_delete_dynamic(self):
|
|
provider, existing, record = self._get_dynamic_package()
|
|
provider._populate_traffic_managers()
|
|
profiles = self._get_tm_profiles(provider)
|
|
change = Delete(record)
|
|
provider._apply_Delete(change)
|
|
dns_delete, tm_delete = (
|
|
provider._dns_client.record_sets.delete,
|
|
provider._tm_client.profiles.delete
|
|
)
|
|
dns_delete.assert_called_once()
|
|
self.assertEqual(tm_delete.call_count, len(profiles))
|
|
|
|
def test_create_zone(self):
|
|
provider = self._get_provider()
|
|
|
|
changes = []
|
|
for i in octo_records:
|
|
changes.append(Create(i))
|
|
desired = Zone('unit2.test.', [])
|
|
|
|
err_msg = 'The Resource \'Microsoft.Network/dnszones/unit2.test\' '
|
|
err_msg += 'under resource group \'mock_rg\' was not found.'
|
|
_get = provider._dns_client.zones.get
|
|
_get.side_effect = CloudError(Mock(status=404), err_msg)
|
|
|
|
self.assertEquals(19, provider.apply(Plan(None, desired, changes,
|
|
True)))
|
|
|
|
def test_check_zone_no_create(self):
|
|
provider = self._get_provider()
|
|
|
|
rs = []
|
|
recordSet = RecordSet(a_records=[ARecord(ipv4_address='1.1.1.1')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'a1', 0, 'A'
|
|
rs.append(recordSet)
|
|
recordSet = RecordSet(a_records=[ARecord(ipv4_address='1.1.1.1'),
|
|
ARecord(ipv4_address='2.2.2.2')])
|
|
recordSet.name, recordSet.ttl, recordSet.type = 'a2', 1, 'A'
|
|
rs.append(recordSet)
|
|
|
|
record_list = provider._dns_client.record_sets.list_by_dns_zone
|
|
record_list.return_value = rs
|
|
|
|
err_msg = 'The Resource \'Microsoft.Network/dnszones/unit3.test\' '
|
|
err_msg += 'under resource group \'mock_rg\' was not found.'
|
|
_get = provider._dns_client.zones.get
|
|
_get.side_effect = CloudError(Mock(status=404), err_msg)
|
|
|
|
exists = provider.populate(Zone('unit3.test.', []))
|
|
self.assertFalse(exists)
|
|
|
|
self.assertEquals(len(zone.records), 0)
|