1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00
Files
github-octodns/tests/test_octodns_provider_azuredns.py
2021-07-04 04:55:18 -07:00

2343 lines
88 KiB
Python

#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
from octodns.record import Create, Update, Delete, Record
from octodns.provider.azuredns import _AzureRecord, AzureProvider, \
_check_endswith_dot, _parse_azure_type, _root_traffic_manager_name, \
_get_monitor, _profile_is_match, AzureException
from octodns.zone import Zone
from octodns.provider.base import Plan
from azure.mgmt.dns.models import ARecord, AaaaRecord, CaaRecord, \
CnameRecord, MxRecord, SrvRecord, NsRecord, PtrRecord, TxtRecord, \
RecordSet, SoaRecord, SubResource, Zone as AzureZone
from azure.mgmt.trafficmanager.models import Profile, DnsConfig, \
MonitorConfig, Endpoint, MonitorConfigCustomHeadersItem
from msrestazure.azure_exceptions import CloudError
from six import text_type
from unittest import TestCase
from mock import Mock, patch, call
zone = Zone(name='unit.tests.', sub_zones=[])
octo_records = []
octo_records.append(Record.new(zone, '', {
'ttl': 0,
'type': 'A',
'values': ['1.2.3.4', '10.10.10.10']}))
octo_records.append(Record.new(zone, 'a', {
'ttl': 1,
'type': 'A',
'values': ['1.2.3.4', '1.1.1.1']}))
octo_records.append(Record.new(zone, 'aa', {
'ttl': 9001,
'type': 'A',
'values': ['1.2.4.3']}))
octo_records.append(Record.new(zone, 'aaa', {
'ttl': 2,
'type': 'A',
'values': ['1.1.1.3']}))
octo_records.append(Record.new(zone, 'aaaa1', {
'ttl': 300,
'type': 'AAAA',
'values': ['2601:644:500:e210:62f8:1dff:feb8:947a',
'2601:642:500:e210:62f8:1dff:feb8:947a'],
}))
octo_records.append(Record.new(zone, 'aaaa2', {
'ttl': 300,
'type': 'AAAA',
'value': '2601:644:500:e210:62f8:1dff:feb8:947a'
}))
octo_records.append(Record.new(zone, 'caa1', {
'ttl': 9,
'type': 'CAA',
'value': {
'flags': 0,
'tag': 'issue',
'value': 'ca.unit.tests',
}}))
octo_records.append(Record.new(zone, 'caa2', {
'ttl': 9,
'type': 'CAA',
'values': [{
'flags': 0,
'tag': 'issue',
'value': 'ca1.unit.tests',
}, {
'flags': 0,
'tag': 'issue',
'value': 'ca2.unit.tests',
}]}))
octo_records.append(Record.new(zone, 'cname', {
'ttl': 3,
'type': 'CNAME',
'value': 'a.unit.tests.'}))
octo_records.append(Record.new(zone, 'mx1', {
'ttl': 3,
'type': 'MX',
'values': [{
'priority': 10,
'value': 'mx1.unit.tests.',
}, {
'priority': 20,
'value': 'mx2.unit.tests.',
}]}))
octo_records.append(Record.new(zone, 'mx2', {
'ttl': 3,
'type': 'MX',
'values': [{
'priority': 10,
'value': 'mx1.unit.tests.',
}]}))
octo_records.append(Record.new(zone, '', {
'ttl': 4,
'type': 'NS',
'values': ['ns1.unit.tests.', 'ns2.unit.tests.']}))
octo_records.append(Record.new(zone, 'foo', {
'ttl': 5,
'type': 'NS',
'value': 'ns1.unit.tests.'}))
octo_records.append(Record.new(zone, 'ptr1', {
'ttl': 5,
'type': 'PTR',
'value': 'ptr1.unit.tests.'}))
octo_records.append(Record.new(zone, '_srv._tcp', {
'ttl': 6,
'type': 'SRV',
'values': [{
'priority': 10,
'weight': 20,
'port': 30,
'target': 'foo-1.unit.tests.',
}, {
'priority': 12,
'weight': 30,
'port': 30,
'target': 'foo-2.unit.tests.',
}]}))
octo_records.append(Record.new(zone, '_srv2._tcp', {
'ttl': 7,
'type': 'SRV',
'values': [{
'priority': 12,
'weight': 17,
'port': 1,
'target': 'srvfoo.unit.tests.',
}]}))
octo_records.append(Record.new(zone, 'txt1', {
'ttl': 8,
'type': 'TXT',
'value': 'txt singleton test'}))
octo_records.append(Record.new(zone, 'txt2', {
'ttl': 9,
'type': 'TXT',
'values': ['txt multiple test', 'txt multiple test 2']}))
long_txt = "v=spf1 ip4:10.10.0.0/24 ip4:10.10.1.0/24 ip4:10.10.2.0/24"
long_txt += " ip4:10.10.3.0/24 ip4:10.10.4.0/24 ip4:10.10.5.0/24 "
long_txt += " 10.6.0/24 ip4:10.10.7.0/24 ip4:10.10.8.0/24 "
long_txt += " ip4:10.10.10.0/24 ip4:10.10.11.0/24 ip4:10.10.12.0/24"
long_txt += " ip4:10.10.13.0/24 ip4:10.10.14.0/24 ip4:10.10.15.0/24"
long_txt += " ip4:10.10.16.0/24 ip4:10.10.17.0/24 ip4:10.10.18.0/24"
long_txt += " ip4:10.10.19.0/24 ip4:10.10.20.0/24 ~all"
octo_records.append(Record.new(zone, 'txt3', {
'ttl': 10,
'type': 'TXT',
'values': ['txt multiple test', long_txt]}))
azure_records = []
_base0 = _AzureRecord('TestAzure', octo_records[0])
_base0.zone_name = 'unit.tests'
_base0.relative_record_set_name = '@'
_base0.record_type = 'A'
_base0.params['ttl'] = 0
_base0.params['a_records'] = [ARecord(ipv4_address='1.2.3.4'),
ARecord(ipv4_address='10.10.10.10')]
azure_records.append(_base0)
_base1 = _AzureRecord('TestAzure', octo_records[1])
_base1.zone_name = 'unit.tests'
_base1.relative_record_set_name = 'a'
_base1.record_type = 'A'
_base1.params['ttl'] = 1
_base1.params['a_records'] = [ARecord(ipv4_address='1.2.3.4'),
ARecord(ipv4_address='1.1.1.1')]
azure_records.append(_base1)
_base2 = _AzureRecord('TestAzure', octo_records[2])
_base2.zone_name = 'unit.tests'
_base2.relative_record_set_name = 'aa'
_base2.record_type = 'A'
_base2.params['ttl'] = 9001
_base2.params['a_records'] = ARecord(ipv4_address='1.2.4.3')
azure_records.append(_base2)
_base3 = _AzureRecord('TestAzure', octo_records[3])
_base3.zone_name = 'unit.tests'
_base3.relative_record_set_name = 'aaa'
_base3.record_type = 'A'
_base3.params['ttl'] = 2
_base3.params['a_records'] = ARecord(ipv4_address='1.1.1.3')
azure_records.append(_base3)
_base4 = _AzureRecord('TestAzure', octo_records[4])
_base4.zone_name = 'unit.tests'
_base4.relative_record_set_name = 'aaaa1'
_base4.record_type = 'AAAA'
_base4.params['ttl'] = 300
aaaa1 = AaaaRecord(ipv6_address='2601:644:500:e210:62f8:1dff:feb8:947a')
aaaa2 = AaaaRecord(ipv6_address='2601:642:500:e210:62f8:1dff:feb8:947a')
_base4.params['aaaa_records'] = [aaaa1, aaaa2]
azure_records.append(_base4)
_base5 = _AzureRecord('TestAzure', octo_records[5])
_base5.zone_name = 'unit.tests'
_base5.relative_record_set_name = 'aaaa2'
_base5.record_type = 'AAAA'
_base5.params['ttl'] = 300
_base5.params['aaaa_records'] = [aaaa1]
azure_records.append(_base5)
_base6 = _AzureRecord('TestAzure', octo_records[6])
_base6.zone_name = 'unit.tests'
_base6.relative_record_set_name = 'caa1'
_base6.record_type = 'CAA'
_base6.params['ttl'] = 9
_base6.params['caa_records'] = [CaaRecord(flags=0,
tag='issue',
value='ca.unit.tests')]
azure_records.append(_base6)
_base7 = _AzureRecord('TestAzure', octo_records[7])
_base7.zone_name = 'unit.tests'
_base7.relative_record_set_name = 'caa2'
_base7.record_type = 'CAA'
_base7.params['ttl'] = 9
_base7.params['caa_records'] = [CaaRecord(flags=0,
tag='issue',
value='ca1.unit.tests'),
CaaRecord(flags=0,
tag='issue',
value='ca2.unit.tests')]
azure_records.append(_base7)
_base8 = _AzureRecord('TestAzure', octo_records[8])
_base8.zone_name = 'unit.tests'
_base8.relative_record_set_name = 'cname'
_base8.record_type = 'CNAME'
_base8.params['ttl'] = 3
_base8.params['cname_record'] = CnameRecord(cname='a.unit.tests.')
azure_records.append(_base8)
_base9 = _AzureRecord('TestAzure', octo_records[9])
_base9.zone_name = 'unit.tests'
_base9.relative_record_set_name = 'mx1'
_base9.record_type = 'MX'
_base9.params['ttl'] = 3
_base9.params['mx_records'] = [MxRecord(preference=10,
exchange='mx1.unit.tests.'),
MxRecord(preference=20,
exchange='mx2.unit.tests.')]
azure_records.append(_base9)
_base10 = _AzureRecord('TestAzure', octo_records[10])
_base10.zone_name = 'unit.tests'
_base10.relative_record_set_name = 'mx2'
_base10.record_type = 'MX'
_base10.params['ttl'] = 3
_base10.params['mx_records'] = [MxRecord(preference=10,
exchange='mx1.unit.tests.')]
azure_records.append(_base10)
_base11 = _AzureRecord('TestAzure', octo_records[11])
_base11.zone_name = 'unit.tests'
_base11.relative_record_set_name = '@'
_base11.record_type = 'NS'
_base11.params['ttl'] = 4
_base11.params['ns_records'] = [NsRecord(nsdname='ns1.unit.tests.'),
NsRecord(nsdname='ns2.unit.tests.')]
azure_records.append(_base11)
_base12 = _AzureRecord('TestAzure', octo_records[12])
_base12.zone_name = 'unit.tests'
_base12.relative_record_set_name = 'foo'
_base12.record_type = 'NS'
_base12.params['ttl'] = 5
_base12.params['ns_records'] = [NsRecord(nsdname='ns1.unit.tests.')]
azure_records.append(_base12)
_base13 = _AzureRecord('TestAzure', octo_records[13])
_base13.zone_name = 'unit.tests'
_base13.relative_record_set_name = 'ptr1'
_base13.record_type = 'PTR'
_base13.params['ttl'] = 5
_base13.params['ptr_records'] = [PtrRecord(ptrdname='ptr1.unit.tests.')]
azure_records.append(_base13)
_base14 = _AzureRecord('TestAzure', octo_records[14])
_base14.zone_name = 'unit.tests'
_base14.relative_record_set_name = '_srv._tcp'
_base14.record_type = 'SRV'
_base14.params['ttl'] = 6
_base14.params['srv_records'] = [SrvRecord(priority=10,
weight=20,
port=30,
target='foo-1.unit.tests.'),
SrvRecord(priority=12,
weight=30,
port=30,
target='foo-2.unit.tests.')]
azure_records.append(_base14)
_base15 = _AzureRecord('TestAzure', octo_records[15])
_base15.zone_name = 'unit.tests'
_base15.relative_record_set_name = '_srv2._tcp'
_base15.record_type = 'SRV'
_base15.params['ttl'] = 7
_base15.params['srv_records'] = [SrvRecord(priority=12,
weight=17,
port=1,
target='srvfoo.unit.tests.')]
azure_records.append(_base15)
_base16 = _AzureRecord('TestAzure', octo_records[16])
_base16.zone_name = 'unit.tests'
_base16.relative_record_set_name = 'txt1'
_base16.record_type = 'TXT'
_base16.params['ttl'] = 8
_base16.params['txt_records'] = [TxtRecord(value=['txt singleton test'])]
azure_records.append(_base16)
_base17 = _AzureRecord('TestAzure', octo_records[17])
_base17.zone_name = 'unit.tests'
_base17.relative_record_set_name = 'txt2'
_base17.record_type = 'TXT'
_base17.params['ttl'] = 9
_base17.params['txt_records'] = [TxtRecord(value=['txt multiple test']),
TxtRecord(value=['txt multiple test 2'])]
azure_records.append(_base17)
long_txt_az1 = "v=spf1 ip4:10.10.0.0/24 ip4:10.10.1.0/24 ip4:10.10.2.0/24"
long_txt_az1 += " ip4:10.10.3.0/24 ip4:10.10.4.0/24 ip4:10.10.5.0/24 "
long_txt_az1 += " 10.6.0/24 ip4:10.10.7.0/24 ip4:10.10.8.0/24 "
long_txt_az1 += " ip4:10.10.10.0/24 ip4:10.10.11.0/24 ip4:10.10.12.0/24"
long_txt_az1 += " ip4:10.10.13.0/24 ip4:10.10.14.0/24 ip4:10.10."
long_txt_az2 = "15.0/24 ip4:10.10.16.0/24 ip4:10.10.17.0/24 ip4:10.10.18.0/24"
long_txt_az2 += " ip4:10.10.19.0/24 ip4:10.10.20.0/24 ~all"
_base18 = _AzureRecord('TestAzure', octo_records[18])
_base18.zone_name = 'unit.tests'
_base18.relative_record_set_name = 'txt3'
_base18.record_type = 'TXT'
_base18.params['ttl'] = 10
_base18.params['txt_records'] = [TxtRecord(value=['txt multiple test']),
TxtRecord(value=[long_txt_az1, long_txt_az2])]
azure_records.append(_base18)
class Test_AzureRecord(TestCase):
def test_azure_record(self):
assert(len(azure_records) == len(octo_records))
for i in range(len(azure_records)):
octo = _AzureRecord('TestAzure', octo_records[i])
assert(azure_records[i]._equals(octo))
class Test_DynamicAzureRecord(TestCase):
def test_azure_record(self):
tm_profile = Profile()
data = {
'ttl': 60,
'type': 'CNAME',
'value': 'default.unit.tests.',
'dynamic': {
'pools': {
'one': {
'values': [
{'value': 'one.unit.tests.', 'weight': 1}
],
'fallback': 'two',
},
'two': {
'values': [
{'value': 'two.unit.tests.', 'weight': 1}
],
},
},
'rules': [
{'geos': ['AF'], 'pool': 'one'},
{'pool': 'two'},
],
}
}
octo_record = Record.new(zone, 'foo', data)
azure_record = _AzureRecord('TestAzure', octo_record,
traffic_manager=tm_profile)
self.assertEqual(azure_record.zone_name, zone.name[:-1])
self.assertEqual(azure_record.relative_record_set_name, 'foo')
self.assertEqual(azure_record.record_type, 'CNAME')
self.assertEqual(azure_record.params['ttl'], 60)
self.assertEqual(azure_record.params['target_resource'], tm_profile)
class Test_ParseAzureType(TestCase):
def test_parse_azure_type(self):
for expected, test in [['A', 'Microsoft.Network/dnszones/A'],
['AAAA', 'Microsoft.Network/dnszones/AAAA'],
['NS', 'Microsoft.Network/dnszones/NS'],
['MX', 'Microsoft.Network/dnszones/MX']]:
self.assertEquals(expected, _parse_azure_type(test))
class Test_CheckEndswithDot(TestCase):
def test_check_endswith_dot(self):
for expected, test in [['a.', 'a'],
['a.', 'a.'],
['foo.bar.', 'foo.bar.'],
['foo.bar.', 'foo.bar']]:
self.assertEquals(expected, _check_endswith_dot(test))
class Test_RootTrafficManagerName(TestCase):
def test_root_traffic_manager_name(self):
test = Record.new(zone, 'foo', data={
'ttl': 60, 'type': 'CNAME', 'value': 'default.unit.tests.',
})
self.assertEqual(_root_traffic_manager_name(test), 'foo--unit--tests')
class Test_GetMonitor(TestCase):
def test_get_monitor(self):
record = Record.new(zone, 'foo', data={
'type': 'CNAME', 'ttl': 60, 'value': 'default.unit.tests.',
'octodns': {
'healthcheck': {
'path': '/_ping',
'port': 4443,
'protocol': 'HTTPS',
}
},
})
monitor = _get_monitor(record)
self.assertEqual(monitor.protocol, 'HTTPS')
self.assertEqual(monitor.port, 4443)
self.assertEqual(monitor.path, '/_ping')
headers = monitor.custom_headers
self.assertIsInstance(headers, list)
self.assertEquals(len(headers), 1)
headers = headers[0]
self.assertEqual(headers.name, 'Host')
self.assertEqual(headers.value, record.healthcheck_host())
# test TCP monitor
record._octodns['healthcheck']['protocol'] = 'TCP'
monitor = _get_monitor(record)
self.assertEqual(monitor.protocol, 'TCP')
self.assertIsNone(monitor.custom_headers)
class Test_ProfileIsMatch(TestCase):
def test_profile_is_match(self):
is_match = _profile_is_match
self.assertFalse(is_match(None, Profile()))
# Profile object builder with default property values that can be
# overridden for testing below
def profile(
name = 'foo-unit-tests',
ttl = 60,
method = 'Geographic',
dns_name = None,
monitor_proto = 'HTTPS',
monitor_port = 4443,
monitor_path = '/_ping',
endpoints = 1,
endpoint_name = 'name',
endpoint_type = 'profile/nestedEndpoints',
target = 'target.unit.tests',
target_id = 'resource/id',
geos = ['GEO-AF'],
weight = 1,
priority = 1,
):
dns = DnsConfig(relative_name=(dns_name or name), ttl=ttl)
return Profile(
name=name, traffic_routing_method=method, dns_config=dns,
monitor_config=MonitorConfig(
protocol=monitor_proto,
port=monitor_port,
path=monitor_path,
),
endpoints=[Endpoint(
name=endpoint_name,
type=endpoint_type,
target=target,
target_resource_id=target_id,
geo_mapping=geos,
weight=weight,
priority=priority,
)] + [Endpoint()] * (endpoints - 1),
)
self.assertTrue(is_match(profile(), profile()))
self.assertFalse(is_match(profile(), profile(name='two')))
self.assertFalse(is_match(profile(), profile(endpoints=2)))
self.assertFalse(is_match(profile(), profile(dns_name='two')))
self.assertFalse(is_match(profile(), profile(monitor_proto='HTTP')))
self.assertFalse(is_match(profile(), profile(endpoint_name='a')))
self.assertFalse(is_match(profile(), profile(endpoint_type='b')))
self.assertFalse(
is_match(profile(endpoint_type='b'), profile(endpoint_type='b'))
)
self.assertFalse(is_match(profile(), profile(target_id='rsrc/id2')))
self.assertFalse(is_match(profile(), profile(geos=['IN'])))
def wprofile(**kwargs):
kwargs['method'] = 'Weighted'
kwargs['endpoint_type'] = 'profile/externalEndpoints'
return profile(**kwargs)
self.assertFalse(is_match(wprofile(), wprofile(target='bar.unit')))
self.assertFalse(is_match(wprofile(), wprofile(weight=3)))
class TestAzureDnsProvider(TestCase):
def _provider(self):
return self._get_provider('mock_spc', 'mock_dns_client')
@patch('octodns.provider.azuredns.TrafficManagerManagementClient')
@patch('octodns.provider.azuredns.DnsManagementClient')
@patch('octodns.provider.azuredns.ClientSecretCredential')
@patch('octodns.provider.azuredns.ServicePrincipalCredentials')
def _get_provider(self, mock_spc, mock_css, mock_dns_client,
mock_tm_client):
'''Returns a mock AzureProvider object to use in testing.
:param mock_spc: placeholder
:type mock_spc: str
:param mock_dns_client: placeholder
:type mock_dns_client: str
:param mock_tm_client: placeholder
:type mock_tm_client: str
:type return: AzureProvider
'''
provider = AzureProvider('mock_id', 'mock_client', 'mock_key',
'mock_directory', 'mock_sub', 'mock_rg'
)
# Fetch the client to force it to load the creds
provider._dns_client
# set critical functions to return properly
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value = []
tm_sync = provider._tm_client.profiles.create_or_update
def side_effect(rg, name, profile):
return Profile(
id=profile.id,
name=profile.name,
traffic_routing_method=profile.traffic_routing_method,
dns_config=profile.dns_config,
monitor_config=profile.monitor_config,
endpoints=profile.endpoints,
)
tm_sync.side_effect = side_effect
return provider
def _get_dynamic_record(self, zone):
return Record.new(zone, 'foo', data={
'type': 'CNAME',
'ttl': 60,
'value': 'default.unit.tests.',
'dynamic': {
'pools': {
'one': {
'values': [
{'value': 'one.unit.tests.', 'weight': 1},
],
'fallback': 'two',
},
'two': {
'values': [
{'value': 'two1.unit.tests.', 'weight': 3},
{'value': 'two2.unit.tests.', 'weight': 4},
],
'fallback': 'three',
},
'three': {
'values': [
{'value': 'three.unit.tests.', 'weight': 1},
],
},
},
'rules': [
{'geos': ['AF', 'EU-DE', 'NA-US-CA', 'OC'], 'pool': 'one'},
{'pool': 'two'},
],
},
'octodns': {
'healthcheck': {
'path': '/_ping',
'port': 4443,
'protocol': 'HTTPS',
}
},
})
def _get_tm_profiles(self, provider):
sub = provider._dns_client_subscription_id
rg = provider._resource_group
base_id = '/subscriptions/' + sub + \
'/resourceGroups/' + rg + \
'/providers/Microsoft.Network/trafficManagerProfiles/'
prefix = 'foo--unit--tests'
name_format = prefix + '-{}'
id_format = base_id + name_format
header = MonitorConfigCustomHeadersItem(name='Host',
value='foo.unit.tests')
monitor = MonitorConfig(protocol='HTTPS', port=4443, path='/_ping',
custom_headers=[header])
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
nested = 'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
profiles = [
Profile(
id=id_format.format('pool-two'),
name=name_format.format('pool-two'),
traffic_routing_method='Weighted',
dns_config=DnsConfig(ttl=60),
monitor_config=monitor,
endpoints=[
Endpoint(
name='two--two1.unit.tests',
type=external,
target='two1.unit.tests',
weight=3,
),
Endpoint(
name='two--two2.unit.tests',
type=external,
target='two2.unit.tests',
weight=4,
),
],
),
Profile(
id=id_format.format('rule-one'),
name=name_format.format('rule-one'),
traffic_routing_method='Priority',
dns_config=DnsConfig(ttl=60),
monitor_config=monitor,
endpoints=[
Endpoint(
name='one',
type=external,
target='one.unit.tests',
priority=1,
),
Endpoint(
name='two',
type=nested,
target_resource_id=id_format.format('pool-two'),
priority=2,
),
Endpoint(
name='three',
type=external,
target='three.unit.tests',
priority=3,
),
Endpoint(
name='--default--',
type=external,
target='default.unit.tests',
priority=4,
),
],
),
Profile(
id=id_format.format('rule-two'),
name=name_format.format('rule-two'),
traffic_routing_method='Priority',
dns_config=DnsConfig(ttl=60),
monitor_config=monitor,
endpoints=[
Endpoint(
name='two',
type=nested,
target_resource_id=id_format.format('pool-two'),
priority=1,
),
Endpoint(
name='three',
type=external,
target='three.unit.tests',
priority=2,
),
Endpoint(
name='--default--',
type=external,
target='default.unit.tests',
priority=3,
),
],
),
Profile(
id=base_id + prefix,
name=prefix,
traffic_routing_method='Geographic',
dns_config=DnsConfig(ttl=60),
monitor_config=monitor,
endpoints=[
Endpoint(
geo_mapping=['GEO-AF', 'DE', 'US-CA', 'GEO-AP'],
name='one',
type=nested,
target_resource_id=id_format.format('rule-one'),
),
Endpoint(
geo_mapping=['WORLD'],
name='two',
type=nested,
target_resource_id=id_format.format('rule-two'),
),
],
),
]
for profile in profiles:
profile.dns_config.relative_name = profile.name
return profiles
def _get_dynamic_package(self):
'''Convenience function to setup a sample dynamic record.
'''
provider = self._get_provider()
# setup traffic manager profiles
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value = self._get_tm_profiles(provider)
# setup zone with dynamic record
zone = Zone(name='unit.tests.', sub_zones=[])
record = self._get_dynamic_record(zone)
zone.add_record(record)
# return everything
return provider, zone, record
def test_populate_records(self):
provider = self._get_provider()
rs = []
recordSet = RecordSet(a_records=[ARecord(ipv4_address='1.1.1.1')])
recordSet.name, recordSet.ttl, recordSet.type = 'a1', 0, 'A'
recordSet.target_resource = SubResource()
rs.append(recordSet)
recordSet = RecordSet(a_records=[ARecord(ipv4_address='1.1.1.1'),
ARecord(ipv4_address='2.2.2.2')])
recordSet.name, recordSet.ttl, recordSet.type = 'a2', 1, 'A'
recordSet.target_resource = SubResource()
rs.append(recordSet)
aaaa1 = AaaaRecord(ipv6_address='1:1ec:1::1')
recordSet = RecordSet(aaaa_records=[aaaa1])
recordSet.name, recordSet.ttl, recordSet.type = 'aaaa1', 2, 'AAAA'
recordSet.target_resource = SubResource()
rs.append(recordSet)
aaaa2 = AaaaRecord(ipv6_address='1:1ec:1::2')
recordSet = RecordSet(aaaa_records=[aaaa1,
aaaa2])
recordSet.name, recordSet.ttl, recordSet.type = 'aaaa2', 3, 'AAAA'
recordSet.target_resource = SubResource()
rs.append(recordSet)
recordSet = RecordSet(caa_records=[CaaRecord(flags=0,
tag='issue',
value='caa1.unit.tests')])
recordSet.name, recordSet.ttl, recordSet.type = 'caa1', 4, 'CAA'
rs.append(recordSet)
recordSet = RecordSet(caa_records=[CaaRecord(flags=0,
tag='issue',
value='caa1.unit.tests'),
CaaRecord(flags=0,
tag='issue',
value='caa2.unit.tests')])
recordSet.name, recordSet.ttl, recordSet.type = 'caa2', 4, 'CAA'
rs.append(recordSet)
cname1 = CnameRecord(cname='cname.unit.test.')
recordSet = RecordSet(cname_record=cname1)
recordSet.name, recordSet.ttl, recordSet.type = 'cname1', 5, 'CNAME'
recordSet.target_resource = SubResource()
rs.append(recordSet)
recordSet = RecordSet(mx_records=[MxRecord(preference=10,
exchange='mx1.unit.test.')])
recordSet.name, recordSet.ttl, recordSet.type = 'mx1', 7, 'MX'
rs.append(recordSet)
recordSet = RecordSet(mx_records=[MxRecord(preference=10,
exchange='mx1.unit.test.'),
MxRecord(preference=11,
exchange='mx2.unit.test.')])
recordSet.name, recordSet.ttl, recordSet.type = 'mx2', 8, 'MX'
rs.append(recordSet)
recordSet = RecordSet(ns_records=[NsRecord(nsdname='ns1.unit.test.')])
recordSet.name, recordSet.ttl, recordSet.type = 'ns1', 9, 'NS'
rs.append(recordSet)
recordSet = RecordSet(ns_records=[NsRecord(nsdname='ns1.unit.test.'),
NsRecord(nsdname='ns2.unit.test.')])
recordSet.name, recordSet.ttl, recordSet.type = 'ns2', 10, 'NS'
rs.append(recordSet)
ptr1 = PtrRecord(ptrdname='ptr1.unit.test.')
recordSet = RecordSet(ptr_records=[ptr1])
recordSet.name, recordSet.ttl, recordSet.type = 'ptr1', 11, 'PTR'
rs.append(recordSet)
recordSet = RecordSet(srv_records=[SrvRecord(priority=1,
weight=2,
port=3,
target='1unit.tests.')])
recordSet.name, recordSet.ttl, recordSet.type = '_srv1._tcp', 13, 'SRV'
rs.append(recordSet)
recordSet = RecordSet(srv_records=[SrvRecord(priority=1,
weight=2,
port=3,
target='1unit.tests.'),
SrvRecord(priority=4,
weight=5,
port=6,
target='2unit.tests.')])
recordSet.name, recordSet.ttl, recordSet.type = '_srv2._tcp', 14, 'SRV'
rs.append(recordSet)
recordSet = RecordSet(txt_records=[TxtRecord(value='sample text1')])
recordSet.name, recordSet.ttl, recordSet.type = 'txt1', 15, 'TXT'
recordSet.target_resource = SubResource()
rs.append(recordSet)
recordSet = RecordSet(txt_records=[TxtRecord(value='sample text1'),
TxtRecord(value='sample text2')])
recordSet.name, recordSet.ttl, recordSet.type = 'txt2', 16, 'TXT'
recordSet.target_resource = SubResource()
rs.append(recordSet)
recordSet = RecordSet(soa_record=[SoaRecord()])
recordSet.name, recordSet.ttl, recordSet.type = '', 17, 'SOA'
rs.append(recordSet)
long_txt = "v=spf1 ip4:10.10.0.0/24 ip4:10.10.1.0/24 ip4:10.10.2.0/24"
long_txt += " ip4:10.10.3.0/24 ip4:10.10.4.0/24 ip4:10.10.5.0/24 "
long_txt += " 10.6.0/24 ip4:10.10.7.0/24 ip4:10.10.8.0/24 "
long_txt += " ip4:10.10.10.0/24 ip4:10.10.11.0/24 ip4:10.10.12.0/24"
long_txt += " ip4:10.10.13.0/24 ip4:10.10.14.0/24 ip4:10.10.15.0/24"
long_txt += " ip4:10.10.16.0/24 ip4:10.10.17.0/24 ip4:10.10.18.0/24"
long_txt += " ip4:10.10.19.0/24 ip4:10.10.20.0/24 ~all"
recordSet = RecordSet(txt_records=[TxtRecord(value='sample value1'),
TxtRecord(value=long_txt)])
recordSet.name, recordSet.ttl, recordSet.type = 'txt3', 18, 'TXT'
recordSet.target_resource = SubResource()
rs.append(recordSet)
record_list = provider._dns_client.record_sets.list_by_dns_zone
record_list.return_value = rs
zone_list = provider._dns_client.zones.list_by_resource_group
zone_list.return_value = [zone]
exists = provider.populate(zone)
self.assertEquals(len(zone.records), 17)
self.assertTrue(exists)
def test_populate_zone(self):
provider = self._get_provider()
zone_list = provider._dns_client.zones.list_by_resource_group
zone_1 = AzureZone(location='global')
# This is far from ideal but the
# zone constructor doesn't let me set it on creation
zone_1.name = "zone-1"
zone_2 = AzureZone(location='global')
# This is far from ideal but the
# zone constructor doesn't let me set it on creation
zone_2.name = "zone-2"
zone_list.return_value = [zone_1,
zone_2,
zone_1]
provider._populate_zones()
# This should be returning two zones since two zones are the same
self.assertEquals(len(provider._azure_zones), 2)
def test_bad_zone_response(self):
provider = self._get_provider()
_get = provider._dns_client.zones.get
_get.side_effect = CloudError(Mock(status=404), 'Azure Error')
self.assertEquals(
provider._check_zone('unit.test', create=False),
None
)
def test_extra_changes(self):
provider, existing, record = self._get_dynamic_package()
# test simple records produce no extra changes
desired = Zone(name=existing.name, sub_zones=[])
simple = Record.new(desired, 'simple', data={
'type': record._type,
'ttl': record.ttl,
'value': record.value,
})
desired.add_record(simple)
extra = provider._extra_changes(desired, desired, [Create(simple)])
self.assertEqual(len(extra), 0)
# test an unchanged dynamic record produces no extra changes
desired.add_record(record)
extra = provider._extra_changes(existing, desired, [])
self.assertEqual(len(extra), 0)
# test unused TM produces the extra change for clean up
sample_profile = self._get_tm_profiles(provider)[0]
tm_id = provider._profile_name_to_id
root_profile_name = _root_traffic_manager_name(record)
extra_profile = Profile(
id=tm_id('{}-pool-random'.format(root_profile_name)),
name='{}-pool-random'.format(root_profile_name),
traffic_routing_method='Weighted',
dns_config=sample_profile.dns_config,
monitor_config=sample_profile.monitor_config,
endpoints=sample_profile.endpoints,
)
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value.append(extra_profile)
provider._populate_traffic_managers()
extra = provider._extra_changes(existing, desired, [])
self.assertEqual(len(extra), 1)
extra = extra[0]
self.assertIsInstance(extra, Update)
self.assertEqual(extra.new, record)
desired._remove_record(record)
tm_list.return_value.pop()
# test new dynamic record does not produce an extra change for it
new_dynamic = Record.new(desired, record.name + '2', data={
'type': record._type,
'ttl': record.ttl,
'value': record.value,
'dynamic': record.dynamic._data(),
'octodns': record._octodns,
})
# test change in healthcheck by using a different port number
update_dynamic = Record.new(desired, record.name, data={
'type': record._type,
'ttl': record.ttl,
'value': record.value,
'dynamic': record.dynamic._data(),
'octodns': {
'healthcheck': {
'path': '/_ping',
'port': 443,
'protocol': 'HTTPS',
},
},
})
desired.add_record(new_dynamic)
desired.add_record(update_dynamic)
changes = [Create(new_dynamic)]
extra = provider._extra_changes(existing, desired, changes)
# implicitly asserts that new_dynamic was not added to extra changes
# as it was already in the `changes` list
self.assertEqual(len(extra), 1)
extra = extra[0]
self.assertIsInstance(extra, Update)
self.assertEqual(extra.new, update_dynamic)
# test dynamic record of unsupported type throws exception
unsupported_dynamic = Record.new(desired, record.name + '3', data={
'type': 'DNAME',
'ttl': record.ttl,
'value': 'default.unit.tests.',
'dynamic': {
'pools': {
'one': {'values': [{'value': 'one.unit.tests.'}]},
},
'rules': [
{'pool': 'one'},
],
},
})
desired.add_record(unsupported_dynamic)
changes = [Create(unsupported_dynamic)]
with self.assertRaises(AzureException) as ctx:
provider._extra_changes(existing, desired, changes)
self.assertTrue(text_type(ctx).endswith(
'must be of type CNAME'
))
desired._remove_record(unsupported_dynamic)
# test colliding ATM names throws exception
record1 = Record.new(desired, 'sub.www', data={
'type': record._type,
'ttl': record.ttl,
'value': record.value,
'dynamic': record.dynamic._data(),
})
record2 = Record.new(desired, 'sub--www', data={
'type': record._type,
'ttl': record.ttl,
'value': record.value,
'dynamic': record.dynamic._data(),
})
desired.add_record(record1)
desired.add_record(record2)
changes = [Create(record1), Create(record2)]
with self.assertRaises(AzureException) as ctx:
provider._extra_changes(existing, desired, changes)
self.assertTrue(text_type(ctx).startswith(
'Collision in Traffic Manager'
))
@patch(
'octodns.provider.azuredns.AzureProvider._generate_traffic_managers')
def test_extra_changes_non_last_fallback_contains_default(self, mock_gtm):
provider = self._get_provider()
desired = Zone(zone.name, sub_zones=[])
record = Record.new(desired, 'foo', {
'type': 'CNAME',
'ttl': 60,
'value': 'default.unit.tests.',
'dynamic': {
'pools': {
'one': {
'values': [{'value': 'one.unit.tests.'}],
'fallback': 'def',
},
'def': {
'values': [{'value': 'default.unit.tests.'}],
'fallback': 'two',
},
'two': {
'values': [{'value': 'two.unit.tests.'}],
},
},
'rules': [
{'pool': 'one'},
]
}
})
desired.add_record(record)
changes = [Create(record)]
# assert that no exception is raised
provider._extra_changes(zone, desired, changes)
# simulate duplicate endpoint and assert exception
endpoint = Endpoint(target='dup.unit.tests.')
mock_gtm.return_value = [Profile(
name='test-profile',
endpoints=[endpoint, endpoint],
)]
with self.assertRaises(AzureException) as ctx:
provider._extra_changes(zone, desired, changes)
self.assertTrue('duplicate endpoint' in text_type(ctx))
def test_extra_changes_A_multi_defaults(self):
provider = self._get_provider()
record = Record.new(zone, 'foo', data={
'type': 'A',
'ttl': 60,
'values': ['1.1.1.1', '8.8.8.8'],
'dynamic': {
'pools': {
'one': {
'values': [{'value': '1.1.1.1'}],
},
},
'rules': [
{'pool': 'one'},
],
}
})
# test that extra changes doesn't show any changes
desired = Zone(zone.name, sub_zones=[])
desired.add_record(record)
with self.assertRaises(AzureException) as ctx:
provider._extra_changes(zone, desired, [])
self.assertEqual('single value' in text_type(ctx))
def test_generate_tm_profile(self):
provider, zone, record = self._get_dynamic_package()
profile_gen = provider._generate_tm_profile
label = 'foobar'
routing = 'Priority'
endpoints = [
Endpoint(target='one.unit.tests'),
Endpoint(target_resource_id='/s/1/rg/foo/tm/foobar2'),
Endpoint(name='invalid'),
]
# invalid endpoint raises exception
with self.assertRaises(AzureException):
profile_gen(routing, endpoints, record, label)
# regular test
endpoints.pop()
profile = profile_gen(routing, endpoints, record, label)
# implicitly tests _profile_name_to_id
sub = provider._dns_client_subscription_id
rg = provider._resource_group
expected_name = 'foo--unit--tests-rule-foobar'
expected_id = '/subscriptions/' + sub + \
'/resourceGroups/' + rg + \
'/providers/Microsoft.Network/trafficManagerProfiles/' + \
expected_name
self.assertEqual(profile.id, expected_id)
self.assertEqual(profile.name, expected_name)
self.assertEqual(profile.name, profile.dns_config.relative_name)
self.assertEqual(profile.traffic_routing_method, routing)
self.assertEqual(profile.dns_config.ttl, record.ttl)
self.assertEqual(len(profile.endpoints), len(endpoints))
self.assertEqual(
profile.endpoints[0].type,
'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
)
self.assertEqual(
profile.endpoints[1].type,
'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
)
def test_dynamic_record(self):
provider, zone, record = self._get_dynamic_package()
profiles = provider._generate_traffic_managers(record)
# check that every profile is a match with what we expect
expected_profiles = self._get_tm_profiles(provider)
self.assertEqual(len(expected_profiles), len(profiles))
for have, expected in zip(profiles, expected_profiles):
self.assertTrue(_profile_is_match(have, expected))
# check that dynamic record is populated back from profiles
azrecord = RecordSet(
ttl=60,
target_resource=SubResource(id=profiles[-1].id),
)
azrecord.name = record.name or '@'
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
record2 = provider._populate_record(zone, azrecord)
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
# test that extra changes doesn't show any changes
desired = Zone(zone.name, sub_zones=[])
desired.add_record(record)
changes = provider._extra_changes(zone, desired, [])
self.assertEqual(len(changes), 0)
def test_generate_traffic_managers_middle_east(self):
# check Asia/Middle East test case
provider, zone, record = self._get_dynamic_package()
record.dynamic._data()['rules'][0]['geos'].append('AS')
profiles = provider._generate_traffic_managers(record)
self.assertIn('GEO-ME', profiles[-1].endpoints[0].geo_mapping)
self.assertIn('GEO-AS', profiles[-1].endpoints[0].geo_mapping)
def test_populate_dynamic_middle_east(self):
# Middle east without Asia raises exception
provider, zone, record = self._get_dynamic_package()
tm_suffix = _root_traffic_manager_name(record)
tm_id = provider._profile_name_to_id
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value = [
Profile(
id=tm_id(tm_suffix),
name=tm_suffix,
traffic_routing_method='Geographic',
endpoints=[
Endpoint(
geo_mapping=['GEO-ME'],
),
],
),
]
azrecord = RecordSet(
ttl=60,
target_resource=SubResource(id=tm_id(tm_suffix)),
)
azrecord.name = record.name or '@'
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
with self.assertRaises(AzureException) as ctx:
provider._populate_record(zone, azrecord)
self.assertTrue(text_type(ctx).startswith(
'Middle East (GEO-ME) is not supported'
))
# valid profiles with Middle East test case
provider, zone, record = self._get_dynamic_package()
geo_profile = provider._get_tm_for_dynamic_record(record)
geo_profile.endpoints[0].geo_mapping.extend(['GEO-ME', 'GEO-AS'])
record = provider._populate_record(zone, azrecord)
self.assertIn('AS', record.dynamic.rules[0].data['geos'])
self.assertNotIn('ME', record.dynamic.rules[0].data['geos'])
def test_dynamic_no_geo(self):
# test that traffic managers are generated as expected
provider = self._get_provider()
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
record = Record.new(zone, 'foo', data={
'type': 'CNAME',
'ttl': 60,
'value': 'default.unit.tests.',
'dynamic': {
'pools': {
'one': {
'values': [
{'value': 'one.unit.tests.'},
],
},
},
'rules': [
{'pool': 'one'},
],
}
})
profiles = provider._generate_traffic_managers(record)
self.assertEqual(len(profiles), 1)
self.assertTrue(_profile_is_match(profiles[0], Profile(
name='foo--unit--tests',
traffic_routing_method='Priority',
dns_config=DnsConfig(
relative_name='foo--unit--tests', ttl=60),
monitor_config=_get_monitor(record),
endpoints=[
Endpoint(
name='one',
type=external,
target='one.unit.tests',
priority=1,
),
Endpoint(
name='--default--',
type=external,
target='default.unit.tests',
priority=2,
),
],
)))
# test that same record gets populated back from traffic managers
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value = profiles
azrecord = RecordSet(
ttl=60,
target_resource=SubResource(id=profiles[-1].id),
)
azrecord.name = record.name or '@'
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
record2 = provider._populate_record(zone, azrecord)
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
# test that extra changes doesn't show any changes
desired = Zone(zone.name, sub_zones=[])
desired.add_record(record)
changes = provider._extra_changes(zone, desired, [])
self.assertEqual(len(changes), 0)
def test_dynamic_fallback_is_default(self):
# test that traffic managers are generated as expected
provider = self._get_provider()
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
record = Record.new(zone, 'foo', data={
'type': 'CNAME',
'ttl': 60,
'value': 'default.unit.tests.',
'dynamic': {
'pools': {
'def': {
'values': [
{'value': 'default.unit.tests.'},
],
},
},
'rules': [
{'geos': ['AF'], 'pool': 'def'},
],
}
})
profiles = provider._generate_traffic_managers(record)
self.assertEqual(len(profiles), 1)
self.assertTrue(_profile_is_match(profiles[0], Profile(
name='foo--unit--tests',
traffic_routing_method='Geographic',
dns_config=DnsConfig(
relative_name='foo--unit--tests', ttl=60),
monitor_config=_get_monitor(record),
endpoints=[
Endpoint(
name='def--default--',
type=external,
target='default.unit.tests',
geo_mapping=['GEO-AF'],
),
],
)))
# test that same record gets populated back from traffic managers
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value = profiles
azrecord = RecordSet(
ttl=60,
target_resource=SubResource(id=profiles[-1].id),
)
azrecord.name = record.name or '@'
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
record2 = provider._populate_record(zone, azrecord)
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
# test that extra changes doesn't show any changes
desired = Zone(zone.name, sub_zones=[])
desired.add_record(record)
changes = provider._extra_changes(zone, desired, [])
self.assertEqual(len(changes), 0)
def test_dynamic_pool_contains_default(self):
# test that traffic managers are generated as expected
provider = self._get_provider()
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
nested = 'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
record = Record.new(zone, 'foo', data={
'type': 'CNAME',
'ttl': 60,
'value': 'default.unit.tests.',
'dynamic': {
'pools': {
'rr': {
'values': [
{'value': 'one.unit.tests.'},
{'value': 'two.unit.tests.'},
{'value': 'default.unit.tests.'},
{'value': 'final.unit.tests.'},
],
},
},
'rules': [
{'geos': ['AF'], 'pool': 'rr'},
],
}
})
profiles = provider._generate_traffic_managers(record)
self.assertEqual(len(profiles), 2)
self.assertTrue(_profile_is_match(profiles[0], Profile(
name='foo--unit--tests-pool-rr',
traffic_routing_method='Weighted',
dns_config=DnsConfig(
relative_name='foo--unit--tests-pool-rr', ttl=60),
monitor_config=_get_monitor(record),
endpoints=[
Endpoint(
name='rr--one.unit.tests',
type=external,
target='one.unit.tests',
weight=1,
),
Endpoint(
name='rr--two.unit.tests',
type=external,
target='two.unit.tests',
weight=1,
),
Endpoint(
name='rr--default.unit.tests--default--',
type=external,
target='default.unit.tests',
weight=1,
),
Endpoint(
name='rr--final.unit.tests',
type=external,
target='final.unit.tests',
weight=1,
),
],
)))
self.assertTrue(_profile_is_match(profiles[1], Profile(
name='foo--unit--tests',
traffic_routing_method='Geographic',
dns_config=DnsConfig(
relative_name='foo--unit--tests', ttl=60),
monitor_config=_get_monitor(record),
endpoints=[
Endpoint(
name='rr',
type=nested,
target_resource_id=profiles[0].id,
geo_mapping=['GEO-AF'],
),
],
)))
# test that same record gets populated back from traffic managers
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value = profiles
azrecord = RecordSet(
ttl=60,
target_resource=SubResource(id=profiles[-1].id),
)
azrecord.name = record.name or '@'
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
record2 = provider._populate_record(zone, azrecord)
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
# test that extra changes doesn't show any changes
desired = Zone(zone.name, sub_zones=[])
desired.add_record(record)
changes = provider._extra_changes(zone, desired, [])
self.assertEqual(len(changes), 0)
def test_dynamic_pool_contains_default_no_geo(self):
# test that traffic managers are generated as expected
provider = self._get_provider()
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
record = Record.new(zone, 'foo', data={
'type': 'CNAME',
'ttl': 60,
'value': 'default.unit.tests.',
'dynamic': {
'pools': {
'rr': {
'values': [
{'value': 'one.unit.tests.'},
{'value': 'two.unit.tests.'},
{'value': 'default.unit.tests.'},
{'value': 'final.unit.tests.'},
],
},
},
'rules': [
{'pool': 'rr'},
],
}
})
profiles = provider._generate_traffic_managers(record)
self.assertEqual(len(profiles), 1)
self.assertTrue(_profile_is_match(profiles[0], Profile(
name='foo--unit--tests',
traffic_routing_method='Weighted',
dns_config=DnsConfig(
relative_name='foo--unit--tests', ttl=60),
monitor_config=_get_monitor(record),
endpoints=[
Endpoint(
name='rr--one.unit.tests',
type=external,
target='one.unit.tests',
weight=1,
),
Endpoint(
name='rr--two.unit.tests',
type=external,
target='two.unit.tests',
weight=1,
),
Endpoint(
name='rr--default.unit.tests--default--',
type=external,
target='default.unit.tests',
weight=1,
),
Endpoint(
name='rr--final.unit.tests',
type=external,
target='final.unit.tests',
weight=1,
),
],
)))
# test that same record gets populated back from traffic managers
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value = profiles
azrecord = RecordSet(
ttl=60,
target_resource=SubResource(id=profiles[-1].id),
)
azrecord.name = record.name or '@'
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
record2 = provider._populate_record(zone, azrecord)
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
# test that extra changes doesn't show any changes
desired = Zone(zone.name, sub_zones=[])
desired.add_record(record)
changes = provider._extra_changes(zone, desired, [])
self.assertEqual(len(changes), 0)
def test_dynamic_last_pool_contains_default_no_geo(self):
# test that traffic managers are generated as expected
provider = self._get_provider()
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
nested = 'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
record = Record.new(zone, 'foo', data={
'type': 'CNAME',
'ttl': 60,
'value': 'default.unit.tests.',
'dynamic': {
'pools': {
'cloud': {
'values': [
{'value': 'cloud.unit.tests.'},
],
'fallback': 'rr',
},
'rr': {
'values': [
{'value': 'one.unit.tests.'},
{'value': 'two.unit.tests.'},
{'value': 'default.unit.tests.'},
{'value': 'final.unit.tests.'},
],
},
},
'rules': [
{'pool': 'cloud'},
],
}
})
profiles = provider._generate_traffic_managers(record)
self.assertEqual(len(profiles), 2)
self.assertTrue(_profile_is_match(profiles[0], Profile(
name='foo--unit--tests-pool-rr',
traffic_routing_method='Weighted',
dns_config=DnsConfig(
relative_name='foo--unit--tests-pool-rr', ttl=60),
monitor_config=_get_monitor(record),
endpoints=[
Endpoint(
name='rr--one.unit.tests',
type=external,
target='one.unit.tests',
weight=1,
),
Endpoint(
name='rr--two.unit.tests',
type=external,
target='two.unit.tests',
weight=1,
),
Endpoint(
name='rr--default.unit.tests--default--',
type=external,
target='default.unit.tests',
weight=1,
),
Endpoint(
name='rr--final.unit.tests',
type=external,
target='final.unit.tests',
weight=1,
),
],
)))
self.assertTrue(_profile_is_match(profiles[1], Profile(
name='foo--unit--tests',
traffic_routing_method='Priority',
dns_config=DnsConfig(
relative_name='foo--unit--tests', ttl=60),
monitor_config=_get_monitor(record),
endpoints=[
Endpoint(
name='cloud',
type=external,
target='cloud.unit.tests',
priority=1,
),
Endpoint(
name='rr',
type=nested,
target_resource_id=profiles[0].id,
priority=2,
),
],
)))
# test that same record gets populated back from traffic managers
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value = profiles
azrecord = RecordSet(
ttl=60,
target_resource=SubResource(id=profiles[-1].id),
)
azrecord.name = record.name or '@'
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
record2 = provider._populate_record(zone, azrecord)
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
# test that extra changes doesn't show any changes
desired = Zone(zone.name, sub_zones=[])
desired.add_record(record)
changes = provider._extra_changes(zone, desired, [])
self.assertEqual(len(changes), 0)
def test_dynamic_unique_traffic_managers(self):
record = self._get_dynamic_record(zone)
data = {
'type': record._type,
'ttl': record.ttl,
'value': record.value,
'dynamic': record.dynamic._data()
}
record_names = [
'www.foo', 'www-foo'
]
provider = self._get_provider()
seen = set()
for name in record_names:
record = Record.new(zone, name, data=data)
tms = provider._generate_traffic_managers(record)
for tm in tms:
self.assertNotIn(tm.name, seen)
seen.add(tm.name)
def test_dynamic_reused_pool(self):
# test that traffic managers are generated as expected
provider = self._get_provider()
nested = 'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
record = Record.new(zone, 'foo', data={
'type': 'CNAME',
'ttl': 60,
'value': 'default.unit.tests.',
'dynamic': {
'pools': {
'sto': {
'values': [
{'value': 'sto.unit.tests.'},
],
'fallback': 'iad',
},
'iad': {
'values': [
{'value': 'iad.unit.tests.'},
],
'fallback': 'lhr',
},
'lhr': {
'values': [
{'value': 'lhr.unit.tests.'},
],
},
},
'rules': [
{'geos': ['EU'], 'pool': 'iad'},
{'geos': ['EU-GB'], 'pool': 'lhr'},
{'geos': ['EU-SE'], 'pool': 'sto'},
{'pool': 'lhr'},
],
}
})
profiles = provider._generate_traffic_managers(record)
self.assertEqual(len(profiles), 4)
self.assertTrue(_profile_is_match(profiles[-1], Profile(
name='foo--unit--tests',
traffic_routing_method='Geographic',
dns_config=DnsConfig(
relative_name='foo--unit--tests', ttl=record.ttl),
monitor_config=_get_monitor(record),
endpoints=[
Endpoint(
name='iad',
type=nested,
target_resource_id=profiles[0].id,
geo_mapping=['GEO-EU'],
),
Endpoint(
name='lhr',
type=nested,
target_resource_id=profiles[1].id,
geo_mapping=['GB', 'WORLD'],
),
Endpoint(
name='sto',
type=nested,
target_resource_id=profiles[2].id,
geo_mapping=['SE'],
),
],
)))
# test that same record gets populated back from traffic managers
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value = profiles
azrecord = RecordSet(
ttl=60,
target_resource=SubResource(id=profiles[-1].id),
)
azrecord.name = record.name or '@'
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
record2 = provider._populate_record(zone, azrecord)
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
# test that extra changes doesn't show any changes
desired = Zone(zone.name, sub_zones=[])
desired.add_record(record)
changes = provider._extra_changes(zone, desired, [])
self.assertEqual(len(changes), 0)
def test_dynamic_A(self):
provider = self._get_provider()
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
nested = 'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
record = Record.new(zone, 'foo', data={
'type': 'A',
'ttl': 60,
'values': ['9.9.9.9'],
'dynamic': {
'pools': {
'one': {
'values': [
{'value': '11.11.11.11'},
{'value': '12.12.12.12'},
],
'fallback': 'two'
},
'two': {
'values': [
{'value': '2.2.2.2'},
],
},
},
'rules': [
{'geos': ['AF'], 'pool': 'one'},
{'pool': 'two'},
],
}
})
profiles = provider._generate_traffic_managers(record)
self.assertEqual(len(profiles), 4)
self.assertTrue(_profile_is_match(profiles[0], Profile(
name='foo--unit--tests-A-pool-one',
traffic_routing_method='Weighted',
dns_config=DnsConfig(
relative_name='foo--unit--tests-a-pool-one', ttl=record.ttl),
monitor_config=_get_monitor(record),
endpoints=[
Endpoint(
name='one--11.11.11.11',
type=external,
target='11.11.11.11',
weight=1,
),
Endpoint(
name='one--12.12.12.12',
type=external,
target='12.12.12.12',
weight=1,
),
],
)))
self.assertTrue(_profile_is_match(profiles[1], Profile(
name='foo--unit--tests-A-rule-one',
traffic_routing_method='Priority',
dns_config=DnsConfig(
relative_name='foo--unit--tests-a-rule-one', ttl=record.ttl),
monitor_config=_get_monitor(record),
endpoints=[
Endpoint(
name='one',
type=nested,
target_resource_id=profiles[0].id,
priority=1,
),
Endpoint(
name='two',
type=external,
target='2.2.2.2',
priority=2,
),
Endpoint(
name='--default--',
type=external,
target='9.9.9.9',
priority=3,
),
],
)))
self.assertTrue(_profile_is_match(profiles[2], Profile(
name='foo--unit--tests-A-rule-two',
traffic_routing_method='Priority',
dns_config=DnsConfig(
relative_name='foo--unit--tests-a-rule-two', ttl=record.ttl),
monitor_config=_get_monitor(record),
endpoints=[
Endpoint(
name='two',
type=external,
target='2.2.2.2',
priority=1,
),
Endpoint(
name='--default--',
type=external,
target='9.9.9.9',
priority=2,
),
],
)))
self.assertTrue(_profile_is_match(profiles[3], Profile(
name='foo--unit--tests-A',
traffic_routing_method='Geographic',
dns_config=DnsConfig(
relative_name='foo--unit--tests-a', ttl=record.ttl),
monitor_config=_get_monitor(record),
endpoints=[
Endpoint(
name='one',
type=nested,
target_resource_id=profiles[1].id,
geo_mapping=['GEO-AF'],
),
Endpoint(
name='two',
type=nested,
target_resource_id=profiles[2].id,
geo_mapping=['WORLD'],
),
],
)))
# test that the record and ATM profile gets created
tm_sync = provider._tm_client.profiles.create_or_update
create = provider._dns_client.record_sets.create_or_update
provider._apply_Create(Create(record))
self.assertEqual(tm_sync.call_count, len(profiles) + 1)
create.assert_called_once()
# test broken alias
azrecord = RecordSet(
ttl=60, target_resource=SubResource(id=None))
azrecord.name = record.name or '@'
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
record2 = provider._populate_record(zone, azrecord)
self.assertEqual(record2.values, ['255.255.255.255'])
# test that same record gets populated back from traffic managers
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value = profiles
provider._populate_traffic_managers()
azrecord = RecordSet(
ttl=60,
target_resource=SubResource(id=profiles[-1].id),
)
azrecord.name = record.name or '@'
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
record2 = provider._populate_record(zone, azrecord)
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
# test that extra changes doesn't show any changes
desired = Zone(zone.name, sub_zones=[])
desired.add_record(record)
changes = provider._extra_changes(zone, desired, [])
self.assertEqual(len(changes), 0)
def test_dynamic_AAAA(self):
provider = self._get_provider()
external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
record = Record.new(zone, 'foo', data={
'type': 'AAAA',
'ttl': 60,
'values': ['1::1'],
'dynamic': {
'pools': {
'one': {
'values': [
{'value': '1::1'},
],
},
},
'rules': [
{'pool': 'one'},
],
}
})
profiles = provider._generate_traffic_managers(record)
self.assertEqual(len(profiles), 1)
self.assertTrue(_profile_is_match(profiles[0], Profile(
name='foo--unit--tests-AAAA',
traffic_routing_method='Geographic',
dns_config=DnsConfig(
relative_name='foo--unit--tests-aaaa', ttl=record.ttl),
monitor_config=_get_monitor(record),
endpoints=[
Endpoint(
name='one--default--',
type=external,
target='1::1',
geo_mapping=['WORLD'],
),
],
)))
# test that the record and ATM profile gets created
tm_sync = provider._tm_client.profiles.create_or_update
create = provider._dns_client.record_sets.create_or_update
provider._apply_Create(Create(record))
# A dynamic record can only have 1 profile
tm_sync.assert_called_once()
create.assert_called_once()
# test broken alias
azrecord = RecordSet(
ttl=60, target_resource=SubResource(id=None))
azrecord.name = record.name or '@'
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
record2 = provider._populate_record(zone, azrecord)
self.assertEqual(record2.values, ['::1'])
# test that same record gets populated back from traffic managers
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value = profiles
azrecord = RecordSet(
ttl=60,
target_resource=SubResource(id=profiles[-1].id),
)
azrecord.name = record.name or '@'
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
record2 = provider._populate_record(zone, azrecord)
self.assertEqual(record2.dynamic._data(), record.dynamic._data())
# test that extra changes doesn't show any changes
desired = Zone(zone.name, sub_zones=[])
desired.add_record(record)
changes = provider._extra_changes(zone, desired, [])
self.assertEqual(len(changes), 0)
def test_sync_traffic_managers(self):
provider, zone, record = self._get_dynamic_package()
provider._populate_traffic_managers()
tm_sync = provider._tm_client.profiles.create_or_update
prefix = 'foo--unit--tests'
expected_seen = {
prefix, '{}-pool-two'.format(prefix),
'{}-rule-one'.format(prefix), '{}-rule-two'.format(prefix),
}
# test no change
profiles = provider._generate_traffic_managers(record)
seen = provider._sync_traffic_managers(profiles)
self.assertEqual(seen, expected_seen)
tm_sync.assert_not_called()
# test that changing weight causes update API call
dynamic = record.dynamic._data()
dynamic['pools']['two']['values'][0]['weight'] = 14
data = {
'type': 'CNAME',
'ttl': record.ttl,
'value': record.value,
'dynamic': dynamic,
'octodns': record._octodns,
}
new_record = Record.new(zone, record.name, data)
tm_sync.reset_mock()
profiles = provider._generate_traffic_managers(new_record)
seen2 = provider._sync_traffic_managers(profiles)
self.assertEqual(seen2, expected_seen)
tm_sync.assert_called_once()
# test that new profile was successfully inserted in cache
new_profile = provider._get_tm_profile_by_name(
'{}-pool-two'.format(prefix)
)
self.assertEqual(new_profile.endpoints[0].weight, 14)
def test_sync_traffic_managers_duplicate(self):
provider, zone, record = self._get_dynamic_package()
tm_sync = provider._tm_client.profiles.create_or_update
# change and duplicate profiles
profile = self._get_tm_profiles(provider)[0]
profile.name = 'changing_this_to_trigger_sync'
provider._sync_traffic_managers([profile, profile])
# it should only be called once for duplicate profiles
tm_sync.assert_called_once()
def test_find_traffic_managers(self):
provider, zone, record = self._get_dynamic_package()
# insert a non-matching profile
sample_profile = self._get_tm_profiles(provider)[0]
# dummy record for generating suffix
record2 = Record.new(zone, record.name + '2', data={
'type': record._type,
'ttl': record.ttl,
'value': record.value,
})
prefix2 = _root_traffic_manager_name(record2)
tm_id = provider._profile_name_to_id
extra_profile = Profile(
id=tm_id('{}-pool-random'.format(prefix2)),
name='{}-pool-random'.format(prefix2),
traffic_routing_method='Weighted',
dns_config=sample_profile.dns_config,
monitor_config=sample_profile.monitor_config,
endpoints=sample_profile.endpoints,
)
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value.append(extra_profile)
provider._populate_traffic_managers()
# implicitly asserts that non-matching profile is not included
prefix = _root_traffic_manager_name(record)
self.assertEqual(provider._find_traffic_managers(record), {
prefix, '{}-pool-two'.format(prefix),
'{}-rule-one'.format(prefix), '{}-rule-two'.format(prefix),
})
def test_traffic_manager_gc(self):
provider, zone, record = self._get_dynamic_package()
provider._populate_traffic_managers()
profiles = provider._find_traffic_managers(record)
profile_delete_mock = provider._tm_client.profiles.delete
provider._traffic_managers_gc(record, profiles)
profile_delete_mock.assert_not_called()
profile_delete_mock.reset_mock()
remove = list(profiles)[3]
profiles.discard(remove)
provider._traffic_managers_gc(record, profiles)
profile_delete_mock.assert_has_calls(
[call(provider._resource_group, remove)]
)
def test_apply(self):
provider = self._get_provider()
half = int(len(octo_records) / 2)
changes = [Create(r) for r in octo_records[:half]] + \
[Update(r, r) for r in octo_records[half:]]
deletes = [Delete(r) for r in octo_records]
self.assertEquals(19, provider.apply(Plan(None, zone,
changes, True)))
self.assertEquals(19, provider.apply(Plan(zone, zone,
deletes, True)))
def test_apply_create_dynamic(self):
provider = self._get_provider()
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value = []
tm_sync = provider._tm_client.profiles.create_or_update
record = self._get_dynamic_record(zone)
profiles = self._get_tm_profiles(provider)
provider._apply_Create(Create(record))
# create was called as many times as number of profiles required for
# the dynamic record
self.assertEqual(tm_sync.call_count, len(profiles))
create = provider._dns_client.record_sets.create_or_update
create.assert_called_once()
def test_apply_update_dynamic(self):
# existing is simple, new is dynamic
provider = self._get_provider()
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value = []
profiles = self._get_tm_profiles(provider)
dynamic_record = self._get_dynamic_record(zone)
simple_record = Record.new(zone, dynamic_record.name, data={
'type': 'CNAME',
'ttl': 3600,
'value': 'cname.unit.tests.',
})
change = Update(simple_record, dynamic_record)
provider._apply_Update(change)
tm_sync, dns_update, tm_delete = (
provider._tm_client.profiles.create_or_update,
provider._dns_client.record_sets.create_or_update,
provider._tm_client.profiles.delete
)
self.assertEqual(tm_sync.call_count, len(profiles))
dns_update.assert_called_once()
tm_delete.assert_not_called()
# existing is dynamic, new is simple
provider, existing, dynamic_record = self._get_dynamic_package()
profiles = self._get_tm_profiles(provider)
change = Update(dynamic_record, simple_record)
provider._apply_Update(change)
tm_sync, dns_update, tm_delete = (
provider._tm_client.profiles.create_or_update,
provider._dns_client.record_sets.create_or_update,
provider._tm_client.profiles.delete
)
tm_sync.assert_not_called()
dns_update.assert_called_once()
self.assertEqual(tm_delete.call_count, len(profiles))
# both are dynamic, healthcheck port is changed
provider, existing, dynamic_record = self._get_dynamic_package()
profiles = self._get_tm_profiles(provider)
dynamic_record2 = self._get_dynamic_record(existing)
dynamic_record2._octodns['healthcheck']['port'] += 1
change = Update(dynamic_record, dynamic_record2)
provider._apply_Update(change)
tm_sync, dns_update, tm_delete = (
provider._tm_client.profiles.create_or_update,
provider._dns_client.record_sets.create_or_update,
provider._tm_client.profiles.delete
)
self.assertEqual(tm_sync.call_count, len(profiles))
dns_update.assert_not_called()
tm_delete.assert_not_called()
# both are dynamic, extra profile should be deleted
provider, existing, dynamic_record = self._get_dynamic_package()
sample_profile = self._get_tm_profiles(provider)[0]
tm_id = provider._profile_name_to_id
root_profile_name = _root_traffic_manager_name(dynamic_record)
extra_profile = Profile(
id=tm_id('{}-pool-random'.format(root_profile_name)),
name='{}-pool-random'.format(root_profile_name),
traffic_routing_method='Weighted',
dns_config=sample_profile.dns_config,
monitor_config=sample_profile.monitor_config,
endpoints=sample_profile.endpoints,
)
tm_list = provider._tm_client.profiles.list_by_resource_group
tm_list.return_value.append(extra_profile)
change = Update(dynamic_record, dynamic_record)
provider._apply_Update(change)
tm_sync, dns_update, tm_delete = (
provider._tm_client.profiles.create_or_update,
provider._dns_client.record_sets.create_or_update,
provider._tm_client.profiles.delete
)
tm_sync.assert_not_called()
dns_update.assert_not_called()
tm_delete.assert_called_once()
# both are dynamic but alias is broken
provider, existing, record1 = self._get_dynamic_package()
azrecord = RecordSet(
ttl=record1.ttl, target_resource=SubResource(id=None))
azrecord.name = record1.name or '@'
azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record1._type)
record2 = provider._populate_record(zone, azrecord)
self.assertEqual(record2.value, 'iam.invalid.')
change = Update(record2, record1)
provider._apply_Update(change)
tm_sync, dns_update, tm_delete = (
provider._tm_client.profiles.create_or_update,
provider._dns_client.record_sets.create_or_update,
provider._tm_client.profiles.delete
)
tm_sync.assert_not_called()
dns_update.assert_called_once()
tm_delete.assert_not_called()
def test_apply_update_dynamic_A(self):
# existing is simple, new is dynamic
provider = self._get_provider()
simple_record = Record.new(zone, 'foo', data={
'type': 'A',
'ttl': 3600,
'values': ['1.1.1.1', '2.2.2.2'],
})
dynamic_record = Record.new(zone, simple_record.name, data={
'type': 'A',
'ttl': 60,
'values': ['1.1.1.1'],
'dynamic': {
'pools': {
'one': {
'values': [
{'value': '8.8.8.8'},
{'value': '4.4.4.4'},
],
'fallback': 'two',
},
'two': {
'values': [{'value': '9.9.9.9'}],
},
},
'rules': [
{'geos': ['AF'], 'pool': 'two'},
{'pool': 'one'},
],
}
})
num_tms = len(provider._generate_traffic_managers(dynamic_record))
change = Update(simple_record, dynamic_record)
provider._apply_Update(change)
tm_sync, dns_update, tm_delete = (
provider._tm_client.profiles.create_or_update,
provider._dns_client.record_sets.create_or_update,
provider._tm_client.profiles.delete
)
# sync is called once for each profile, plus 1 at the end for nested
# endpoints to workaround A/AAAA nesting limitation in Azure
self.assertEqual(tm_sync.call_count, num_tms + 1)
dns_update.assert_called_once()
tm_delete.assert_not_called()
# both are dynamic, healthcheck port is changed to trigger sync on
# all profiles
provider = self._get_provider()
dynamic_record2 = Record.new(zone, dynamic_record.name, data={
'type': dynamic_record._type,
'ttl': 300,
'values': dynamic_record.values,
'dynamic': dynamic_record.dynamic._data(),
'octodns': {
'healthcheck': {'port': 4433},
}
})
change = Update(dynamic_record, dynamic_record2)
provider._apply_Update(change)
tm_sync, dns_update, tm_delete = (
provider._tm_client.profiles.create_or_update,
provider._dns_client.record_sets.create_or_update,
provider._tm_client.profiles.delete
)
# sync is called once for each profile, extra call at the end is not
# needed when existing dynamic record is already aliased to its root
# profile
self.assertEqual(tm_sync.call_count, num_tms)
dns_update.assert_not_called()
tm_delete.assert_not_called()
def test_apply_update_dynamic_A_singluar(self):
# existing is simple, new is dynamic that needs only one profile
provider = self._get_provider()
simple_record = Record.new(zone, 'foo', data={
'type': 'A',
'ttl': 3600,
'values': ['1.1.1.1', '2.2.2.2'],
})
dynamic_record = Record.new(zone, simple_record.name, data={
'type': 'A',
'ttl': 60,
'values': ['1.1.1.1'],
'dynamic': {
'pools': {
'one': {
'values': [
{'value': '8.8.8.8'},
{'value': '1.1.1.1'},
],
},
},
'rules': [
{'pool': 'one'},
],
}
})
num_tms = len(provider._generate_traffic_managers(dynamic_record))
self.assertEqual(num_tms, 1)
change = Update(simple_record, dynamic_record)
provider._apply_Update(change)
tm_sync, dns_update, tm_delete = (
provider._tm_client.profiles.create_or_update,
provider._dns_client.record_sets.create_or_update,
provider._tm_client.profiles.delete
)
self.assertEqual(tm_sync.call_count, num_tms)
dns_update.assert_called_once()
tm_delete.assert_not_called()
def test_apply_delete_dynamic(self):
provider, existing, record = self._get_dynamic_package()
provider._populate_traffic_managers()
profiles = self._get_tm_profiles(provider)
change = Delete(record)
provider._apply_Delete(change)
dns_delete, tm_delete = (
provider._dns_client.record_sets.delete,
provider._tm_client.profiles.delete
)
dns_delete.assert_called_once()
self.assertEqual(tm_delete.call_count, len(profiles))
def test_create_zone(self):
provider = self._get_provider()
changes = []
for i in octo_records:
changes.append(Create(i))
desired = Zone('unit2.test.', [])
err_msg = 'The Resource \'Microsoft.Network/dnszones/unit2.test\' '
err_msg += 'under resource group \'mock_rg\' was not found.'
_get = provider._dns_client.zones.get
_get.side_effect = CloudError(Mock(status=404), err_msg)
self.assertEquals(19, provider.apply(Plan(None, desired, changes,
True)))
def test_check_zone_no_create(self):
provider = self._get_provider()
rs = []
recordSet = RecordSet(a_records=[ARecord(ipv4_address='1.1.1.1')])
recordSet.name, recordSet.ttl, recordSet.type = 'a1', 0, 'A'
rs.append(recordSet)
recordSet = RecordSet(a_records=[ARecord(ipv4_address='1.1.1.1'),
ARecord(ipv4_address='2.2.2.2')])
recordSet.name, recordSet.ttl, recordSet.type = 'a2', 1, 'A'
rs.append(recordSet)
record_list = provider._dns_client.record_sets.list_by_dns_zone
record_list.return_value = rs
err_msg = 'The Resource \'Microsoft.Network/dnszones/unit3.test\' '
err_msg += 'under resource group \'mock_rg\' was not found.'
_get = provider._dns_client.zones.get
_get.side_effect = CloudError(Mock(status=404), err_msg)
exists = provider.populate(Zone('unit3.test.', []))
self.assertFalse(exists)
self.assertEquals(len(zone.records), 0)