mirror of
				https://github.com/github/octodns.git
				synced 2024-05-11 05:55:00 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1451 lines
		
	
	
		
			56 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1451 lines
		
	
	
		
			56 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#
 | 
						|
#
 | 
						|
#
 | 
						|
 | 
						|
from __future__ import absolute_import, division, print_function, \
 | 
						|
    unicode_literals
 | 
						|
 | 
						|
from octodns.record import Create, Update, Delete, Record
 | 
						|
from octodns.provider.azuredns import _AzureRecord, AzureProvider, \
 | 
						|
    _check_endswith_dot, _parse_azure_type, _traffic_manager_suffix, \
 | 
						|
    _get_monitor, _profile_is_match, AzureException
 | 
						|
from octodns.zone import Zone
 | 
						|
from octodns.provider.base import Plan
 | 
						|
 | 
						|
from azure.mgmt.dns.models import ARecord, AaaaRecord, CaaRecord, \
 | 
						|
    CnameRecord, MxRecord, SrvRecord, NsRecord, PtrRecord, TxtRecord, \
 | 
						|
    RecordSet, SoaRecord, SubResource, Zone as AzureZone
 | 
						|
from azure.mgmt.trafficmanager.models import Profile, DnsConfig, \
 | 
						|
    MonitorConfig, Endpoint, MonitorConfigCustomHeadersItem
 | 
						|
from msrestazure.azure_exceptions import CloudError
 | 
						|
 | 
						|
from six import text_type
 | 
						|
from unittest import TestCase
 | 
						|
from mock import Mock, patch, call
 | 
						|
 | 
						|
 | 
						|
zone = Zone(name='unit.tests.', sub_zones=[])
 | 
						|
octo_records = []
 | 
						|
octo_records.append(Record.new(zone, '', {
 | 
						|
    'ttl': 0,
 | 
						|
    'type': 'A',
 | 
						|
    'values': ['1.2.3.4', '10.10.10.10']}))
 | 
						|
octo_records.append(Record.new(zone, 'a', {
 | 
						|
    'ttl': 1,
 | 
						|
    'type': 'A',
 | 
						|
    'values': ['1.2.3.4', '1.1.1.1']}))
 | 
						|
octo_records.append(Record.new(zone, 'aa', {
 | 
						|
    'ttl': 9001,
 | 
						|
    'type': 'A',
 | 
						|
    'values': ['1.2.4.3']}))
 | 
						|
octo_records.append(Record.new(zone, 'aaa', {
 | 
						|
    'ttl': 2,
 | 
						|
    'type': 'A',
 | 
						|
    'values': ['1.1.1.3']}))
 | 
						|
octo_records.append(Record.new(zone, 'aaaa1', {
 | 
						|
    'ttl': 300,
 | 
						|
    'type': 'AAAA',
 | 
						|
    'values': ['2601:644:500:e210:62f8:1dff:feb8:947a',
 | 
						|
               '2601:642:500:e210:62f8:1dff:feb8:947a'],
 | 
						|
}))
 | 
						|
octo_records.append(Record.new(zone, 'aaaa2', {
 | 
						|
    'ttl': 300,
 | 
						|
    'type': 'AAAA',
 | 
						|
    'value': '2601:644:500:e210:62f8:1dff:feb8:947a'
 | 
						|
}))
 | 
						|
octo_records.append(Record.new(zone, 'caa1', {
 | 
						|
    'ttl': 9,
 | 
						|
    'type': 'CAA',
 | 
						|
    'value': {
 | 
						|
        'flags': 0,
 | 
						|
        'tag': 'issue',
 | 
						|
        'value': 'ca.unit.tests',
 | 
						|
    }}))
 | 
						|
octo_records.append(Record.new(zone, 'caa2', {
 | 
						|
    'ttl': 9,
 | 
						|
    'type': 'CAA',
 | 
						|
    'values': [{
 | 
						|
        'flags': 0,
 | 
						|
        'tag': 'issue',
 | 
						|
        'value': 'ca1.unit.tests',
 | 
						|
    }, {
 | 
						|
        'flags': 0,
 | 
						|
        'tag': 'issue',
 | 
						|
        'value': 'ca2.unit.tests',
 | 
						|
    }]}))
 | 
						|
octo_records.append(Record.new(zone, 'cname', {
 | 
						|
    'ttl': 3,
 | 
						|
    'type': 'CNAME',
 | 
						|
    'value': 'a.unit.tests.'}))
 | 
						|
octo_records.append(Record.new(zone, 'mx1', {
 | 
						|
    'ttl': 3,
 | 
						|
    'type': 'MX',
 | 
						|
    'values': [{
 | 
						|
        'priority': 10,
 | 
						|
        'value': 'mx1.unit.tests.',
 | 
						|
    }, {
 | 
						|
        'priority': 20,
 | 
						|
        'value': 'mx2.unit.tests.',
 | 
						|
    }]}))
 | 
						|
octo_records.append(Record.new(zone, 'mx2', {
 | 
						|
    'ttl': 3,
 | 
						|
    'type': 'MX',
 | 
						|
    'values': [{
 | 
						|
        'priority': 10,
 | 
						|
        'value': 'mx1.unit.tests.',
 | 
						|
    }]}))
 | 
						|
octo_records.append(Record.new(zone, '', {
 | 
						|
    'ttl': 4,
 | 
						|
    'type': 'NS',
 | 
						|
    'values': ['ns1.unit.tests.', 'ns2.unit.tests.']}))
 | 
						|
octo_records.append(Record.new(zone, 'foo', {
 | 
						|
    'ttl': 5,
 | 
						|
    'type': 'NS',
 | 
						|
    'value': 'ns1.unit.tests.'}))
 | 
						|
octo_records.append(Record.new(zone, 'ptr1', {
 | 
						|
    'ttl': 5,
 | 
						|
    'type': 'PTR',
 | 
						|
    'value': 'ptr1.unit.tests.'}))
 | 
						|
octo_records.append(Record.new(zone, '_srv._tcp', {
 | 
						|
    'ttl': 6,
 | 
						|
    'type': 'SRV',
 | 
						|
    'values': [{
 | 
						|
        'priority': 10,
 | 
						|
        'weight': 20,
 | 
						|
        'port': 30,
 | 
						|
        'target': 'foo-1.unit.tests.',
 | 
						|
    }, {
 | 
						|
        'priority': 12,
 | 
						|
        'weight': 30,
 | 
						|
        'port': 30,
 | 
						|
        'target': 'foo-2.unit.tests.',
 | 
						|
    }]}))
 | 
						|
octo_records.append(Record.new(zone, '_srv2._tcp', {
 | 
						|
    'ttl': 7,
 | 
						|
    'type': 'SRV',
 | 
						|
    'values': [{
 | 
						|
        'priority': 12,
 | 
						|
        'weight': 17,
 | 
						|
        'port': 1,
 | 
						|
        'target': 'srvfoo.unit.tests.',
 | 
						|
    }]}))
 | 
						|
octo_records.append(Record.new(zone, 'txt1', {
 | 
						|
    'ttl': 8,
 | 
						|
    'type': 'TXT',
 | 
						|
    'value': 'txt singleton test'}))
 | 
						|
octo_records.append(Record.new(zone, 'txt2', {
 | 
						|
    'ttl': 9,
 | 
						|
    'type': 'TXT',
 | 
						|
    'values': ['txt multiple test', 'txt multiple test 2']}))
 | 
						|
 | 
						|
long_txt = "v=spf1 ip4:10.10.0.0/24 ip4:10.10.1.0/24 ip4:10.10.2.0/24"
 | 
						|
long_txt += " ip4:10.10.3.0/24 ip4:10.10.4.0/24 ip4:10.10.5.0/24 "
 | 
						|
long_txt += " 10.6.0/24 ip4:10.10.7.0/24 ip4:10.10.8.0/24 "
 | 
						|
long_txt += " ip4:10.10.10.0/24 ip4:10.10.11.0/24 ip4:10.10.12.0/24"
 | 
						|
long_txt += " ip4:10.10.13.0/24 ip4:10.10.14.0/24 ip4:10.10.15.0/24"
 | 
						|
long_txt += " ip4:10.10.16.0/24 ip4:10.10.17.0/24 ip4:10.10.18.0/24"
 | 
						|
long_txt += " ip4:10.10.19.0/24 ip4:10.10.20.0/24  ~all"
 | 
						|
octo_records.append(Record.new(zone, 'txt3', {
 | 
						|
    'ttl': 10,
 | 
						|
    'type': 'TXT',
 | 
						|
    'values': ['txt multiple test', long_txt]}))
 | 
						|
 | 
						|
azure_records = []
 | 
						|
_base0 = _AzureRecord('TestAzure', octo_records[0])
 | 
						|
_base0.zone_name = 'unit.tests'
 | 
						|
_base0.relative_record_set_name = '@'
 | 
						|
_base0.record_type = 'A'
 | 
						|
_base0.params['ttl'] = 0
 | 
						|
_base0.params['a_records'] = [ARecord(ipv4_address='1.2.3.4'),
 | 
						|
                              ARecord(ipv4_address='10.10.10.10')]
 | 
						|
azure_records.append(_base0)
 | 
						|
 | 
						|
_base1 = _AzureRecord('TestAzure', octo_records[1])
 | 
						|
_base1.zone_name = 'unit.tests'
 | 
						|
_base1.relative_record_set_name = 'a'
 | 
						|
_base1.record_type = 'A'
 | 
						|
_base1.params['ttl'] = 1
 | 
						|
_base1.params['a_records'] = [ARecord(ipv4_address='1.2.3.4'),
 | 
						|
                              ARecord(ipv4_address='1.1.1.1')]
 | 
						|
azure_records.append(_base1)
 | 
						|
 | 
						|
_base2 = _AzureRecord('TestAzure', octo_records[2])
 | 
						|
_base2.zone_name = 'unit.tests'
 | 
						|
_base2.relative_record_set_name = 'aa'
 | 
						|
_base2.record_type = 'A'
 | 
						|
_base2.params['ttl'] = 9001
 | 
						|
_base2.params['a_records'] = ARecord(ipv4_address='1.2.4.3')
 | 
						|
azure_records.append(_base2)
 | 
						|
 | 
						|
_base3 = _AzureRecord('TestAzure', octo_records[3])
 | 
						|
_base3.zone_name = 'unit.tests'
 | 
						|
_base3.relative_record_set_name = 'aaa'
 | 
						|
_base3.record_type = 'A'
 | 
						|
_base3.params['ttl'] = 2
 | 
						|
_base3.params['a_records'] = ARecord(ipv4_address='1.1.1.3')
 | 
						|
azure_records.append(_base3)
 | 
						|
 | 
						|
_base4 = _AzureRecord('TestAzure', octo_records[4])
 | 
						|
_base4.zone_name = 'unit.tests'
 | 
						|
_base4.relative_record_set_name = 'aaaa1'
 | 
						|
_base4.record_type = 'AAAA'
 | 
						|
_base4.params['ttl'] = 300
 | 
						|
aaaa1 = AaaaRecord(ipv6_address='2601:644:500:e210:62f8:1dff:feb8:947a')
 | 
						|
aaaa2 = AaaaRecord(ipv6_address='2601:642:500:e210:62f8:1dff:feb8:947a')
 | 
						|
_base4.params['aaaa_records'] = [aaaa1, aaaa2]
 | 
						|
azure_records.append(_base4)
 | 
						|
 | 
						|
_base5 = _AzureRecord('TestAzure', octo_records[5])
 | 
						|
_base5.zone_name = 'unit.tests'
 | 
						|
_base5.relative_record_set_name = 'aaaa2'
 | 
						|
_base5.record_type = 'AAAA'
 | 
						|
_base5.params['ttl'] = 300
 | 
						|
_base5.params['aaaa_records'] = [aaaa1]
 | 
						|
azure_records.append(_base5)
 | 
						|
 | 
						|
_base6 = _AzureRecord('TestAzure', octo_records[6])
 | 
						|
_base6.zone_name = 'unit.tests'
 | 
						|
_base6.relative_record_set_name = 'caa1'
 | 
						|
_base6.record_type = 'CAA'
 | 
						|
_base6.params['ttl'] = 9
 | 
						|
_base6.params['caa_records'] = [CaaRecord(flags=0,
 | 
						|
                                          tag='issue',
 | 
						|
                                          value='ca.unit.tests')]
 | 
						|
azure_records.append(_base6)
 | 
						|
 | 
						|
_base7 = _AzureRecord('TestAzure', octo_records[7])
 | 
						|
_base7.zone_name = 'unit.tests'
 | 
						|
_base7.relative_record_set_name = 'caa2'
 | 
						|
_base7.record_type = 'CAA'
 | 
						|
_base7.params['ttl'] = 9
 | 
						|
_base7.params['caa_records'] = [CaaRecord(flags=0,
 | 
						|
                                          tag='issue',
 | 
						|
                                          value='ca1.unit.tests'),
 | 
						|
                                CaaRecord(flags=0,
 | 
						|
                                          tag='issue',
 | 
						|
                                          value='ca2.unit.tests')]
 | 
						|
azure_records.append(_base7)
 | 
						|
 | 
						|
_base8 = _AzureRecord('TestAzure', octo_records[8])
 | 
						|
_base8.zone_name = 'unit.tests'
 | 
						|
_base8.relative_record_set_name = 'cname'
 | 
						|
_base8.record_type = 'CNAME'
 | 
						|
_base8.params['ttl'] = 3
 | 
						|
_base8.params['cname_record'] = CnameRecord(cname='a.unit.tests.')
 | 
						|
azure_records.append(_base8)
 | 
						|
 | 
						|
_base9 = _AzureRecord('TestAzure', octo_records[9])
 | 
						|
_base9.zone_name = 'unit.tests'
 | 
						|
_base9.relative_record_set_name = 'mx1'
 | 
						|
_base9.record_type = 'MX'
 | 
						|
_base9.params['ttl'] = 3
 | 
						|
_base9.params['mx_records'] = [MxRecord(preference=10,
 | 
						|
                                        exchange='mx1.unit.tests.'),
 | 
						|
                               MxRecord(preference=20,
 | 
						|
                                        exchange='mx2.unit.tests.')]
 | 
						|
azure_records.append(_base9)
 | 
						|
 | 
						|
_base10 = _AzureRecord('TestAzure', octo_records[10])
 | 
						|
_base10.zone_name = 'unit.tests'
 | 
						|
_base10.relative_record_set_name = 'mx2'
 | 
						|
_base10.record_type = 'MX'
 | 
						|
_base10.params['ttl'] = 3
 | 
						|
_base10.params['mx_records'] = [MxRecord(preference=10,
 | 
						|
                                         exchange='mx1.unit.tests.')]
 | 
						|
azure_records.append(_base10)
 | 
						|
 | 
						|
_base11 = _AzureRecord('TestAzure', octo_records[11])
 | 
						|
_base11.zone_name = 'unit.tests'
 | 
						|
_base11.relative_record_set_name = '@'
 | 
						|
_base11.record_type = 'NS'
 | 
						|
_base11.params['ttl'] = 4
 | 
						|
_base11.params['ns_records'] = [NsRecord(nsdname='ns1.unit.tests.'),
 | 
						|
                                NsRecord(nsdname='ns2.unit.tests.')]
 | 
						|
azure_records.append(_base11)
 | 
						|
 | 
						|
_base12 = _AzureRecord('TestAzure', octo_records[12])
 | 
						|
_base12.zone_name = 'unit.tests'
 | 
						|
_base12.relative_record_set_name = 'foo'
 | 
						|
_base12.record_type = 'NS'
 | 
						|
_base12.params['ttl'] = 5
 | 
						|
_base12.params['ns_records'] = [NsRecord(nsdname='ns1.unit.tests.')]
 | 
						|
azure_records.append(_base12)
 | 
						|
 | 
						|
_base13 = _AzureRecord('TestAzure', octo_records[13])
 | 
						|
_base13.zone_name = 'unit.tests'
 | 
						|
_base13.relative_record_set_name = 'ptr1'
 | 
						|
_base13.record_type = 'PTR'
 | 
						|
_base13.params['ttl'] = 5
 | 
						|
_base13.params['ptr_records'] = [PtrRecord(ptrdname='ptr1.unit.tests.')]
 | 
						|
azure_records.append(_base13)
 | 
						|
 | 
						|
_base14 = _AzureRecord('TestAzure', octo_records[14])
 | 
						|
_base14.zone_name = 'unit.tests'
 | 
						|
_base14.relative_record_set_name = '_srv._tcp'
 | 
						|
_base14.record_type = 'SRV'
 | 
						|
_base14.params['ttl'] = 6
 | 
						|
_base14.params['srv_records'] = [SrvRecord(priority=10,
 | 
						|
                                           weight=20,
 | 
						|
                                           port=30,
 | 
						|
                                           target='foo-1.unit.tests.'),
 | 
						|
                                 SrvRecord(priority=12,
 | 
						|
                                           weight=30,
 | 
						|
                                           port=30,
 | 
						|
                                           target='foo-2.unit.tests.')]
 | 
						|
azure_records.append(_base14)
 | 
						|
 | 
						|
_base15 = _AzureRecord('TestAzure', octo_records[15])
 | 
						|
_base15.zone_name = 'unit.tests'
 | 
						|
_base15.relative_record_set_name = '_srv2._tcp'
 | 
						|
_base15.record_type = 'SRV'
 | 
						|
_base15.params['ttl'] = 7
 | 
						|
_base15.params['srv_records'] = [SrvRecord(priority=12,
 | 
						|
                                           weight=17,
 | 
						|
                                           port=1,
 | 
						|
                                           target='srvfoo.unit.tests.')]
 | 
						|
azure_records.append(_base15)
 | 
						|
 | 
						|
_base16 = _AzureRecord('TestAzure', octo_records[16])
 | 
						|
_base16.zone_name = 'unit.tests'
 | 
						|
_base16.relative_record_set_name = 'txt1'
 | 
						|
_base16.record_type = 'TXT'
 | 
						|
_base16.params['ttl'] = 8
 | 
						|
_base16.params['txt_records'] = [TxtRecord(value=['txt singleton test'])]
 | 
						|
azure_records.append(_base16)
 | 
						|
 | 
						|
_base17 = _AzureRecord('TestAzure', octo_records[17])
 | 
						|
_base17.zone_name = 'unit.tests'
 | 
						|
_base17.relative_record_set_name = 'txt2'
 | 
						|
_base17.record_type = 'TXT'
 | 
						|
_base17.params['ttl'] = 9
 | 
						|
_base17.params['txt_records'] = [TxtRecord(value=['txt multiple test']),
 | 
						|
                                 TxtRecord(value=['txt multiple test 2'])]
 | 
						|
azure_records.append(_base17)
 | 
						|
 | 
						|
long_txt_az1 = "v=spf1 ip4:10.10.0.0/24 ip4:10.10.1.0/24 ip4:10.10.2.0/24"
 | 
						|
long_txt_az1 += " ip4:10.10.3.0/24 ip4:10.10.4.0/24 ip4:10.10.5.0/24 "
 | 
						|
long_txt_az1 += " 10.6.0/24 ip4:10.10.7.0/24 ip4:10.10.8.0/24 "
 | 
						|
long_txt_az1 += " ip4:10.10.10.0/24 ip4:10.10.11.0/24 ip4:10.10.12.0/24"
 | 
						|
long_txt_az1 += " ip4:10.10.13.0/24 ip4:10.10.14.0/24 ip4:10.10."
 | 
						|
long_txt_az2 = "15.0/24 ip4:10.10.16.0/24 ip4:10.10.17.0/24 ip4:10.10.18.0/24"
 | 
						|
long_txt_az2 += " ip4:10.10.19.0/24 ip4:10.10.20.0/24  ~all"
 | 
						|
_base18 = _AzureRecord('TestAzure', octo_records[18])
 | 
						|
_base18.zone_name = 'unit.tests'
 | 
						|
_base18.relative_record_set_name = 'txt3'
 | 
						|
_base18.record_type = 'TXT'
 | 
						|
_base18.params['ttl'] = 10
 | 
						|
_base18.params['txt_records'] = [TxtRecord(value=['txt multiple test']),
 | 
						|
                                 TxtRecord(value=[long_txt_az1, long_txt_az2])]
 | 
						|
azure_records.append(_base18)
 | 
						|
 | 
						|
 | 
						|
class Test_AzureRecord(TestCase):
 | 
						|
    def test_azure_record(self):
 | 
						|
        assert(len(azure_records) == len(octo_records))
 | 
						|
        for i in range(len(azure_records)):
 | 
						|
            octo = _AzureRecord('TestAzure', octo_records[i])
 | 
						|
            assert(azure_records[i]._equals(octo))
 | 
						|
 | 
						|
 | 
						|
class Test_DynamicAzureRecord(TestCase):
 | 
						|
    def test_azure_record(self):
 | 
						|
        tm_profile = Profile()
 | 
						|
        data = {
 | 
						|
            'ttl': 60,
 | 
						|
            'type': 'CNAME',
 | 
						|
            'value': 'default.unit.tests.',
 | 
						|
            'dynamic': {
 | 
						|
                'pools': {
 | 
						|
                    'one': {
 | 
						|
                        'values': [
 | 
						|
                            {'value': 'one.unit.tests.', 'weight': 1}
 | 
						|
                        ],
 | 
						|
                        'fallback': 'two',
 | 
						|
                    },
 | 
						|
                    'two': {
 | 
						|
                        'values': [
 | 
						|
                            {'value': 'two.unit.tests.', 'weight': 1}
 | 
						|
                        ],
 | 
						|
                    },
 | 
						|
                },
 | 
						|
                'rules': [
 | 
						|
                    {'geos': ['AF'], 'pool': 'one'},
 | 
						|
                    {'pool': 'two'},
 | 
						|
                ],
 | 
						|
            }
 | 
						|
        }
 | 
						|
        octo_record = Record.new(zone, 'foo', data)
 | 
						|
        azure_record = _AzureRecord('TestAzure', octo_record,
 | 
						|
                                    traffic_manager=tm_profile)
 | 
						|
        self.assertEqual(azure_record.zone_name, zone.name[:-1])
 | 
						|
        self.assertEqual(azure_record.relative_record_set_name, 'foo')
 | 
						|
        self.assertEqual(azure_record.record_type, 'CNAME')
 | 
						|
        self.assertEqual(azure_record.params['ttl'], 60)
 | 
						|
        self.assertEqual(azure_record.params['target_resource'], tm_profile)
 | 
						|
 | 
						|
 | 
						|
class Test_ParseAzureType(TestCase):
 | 
						|
    def test_parse_azure_type(self):
 | 
						|
        for expected, test in [['A', 'Microsoft.Network/dnszones/A'],
 | 
						|
                               ['AAAA', 'Microsoft.Network/dnszones/AAAA'],
 | 
						|
                               ['NS', 'Microsoft.Network/dnszones/NS'],
 | 
						|
                               ['MX', 'Microsoft.Network/dnszones/MX']]:
 | 
						|
            self.assertEquals(expected, _parse_azure_type(test))
 | 
						|
 | 
						|
 | 
						|
class Test_CheckEndswithDot(TestCase):
 | 
						|
    def test_check_endswith_dot(self):
 | 
						|
        for expected, test in [['a.', 'a'],
 | 
						|
                               ['a.', 'a.'],
 | 
						|
                               ['foo.bar.', 'foo.bar.'],
 | 
						|
                               ['foo.bar.', 'foo.bar']]:
 | 
						|
            self.assertEquals(expected, _check_endswith_dot(test))
 | 
						|
 | 
						|
 | 
						|
class Test_TrafficManagerSuffix(TestCase):
 | 
						|
    def test_traffic_manager_suffix(self):
 | 
						|
        test = Record.new(zone, 'foo', data={
 | 
						|
            'ttl': 60, 'type': 'CNAME', 'value': 'default.unit.tests.',
 | 
						|
        })
 | 
						|
        self.assertEqual(_traffic_manager_suffix(test), 'foo-unit-tests')
 | 
						|
 | 
						|
 | 
						|
class Test_GetMonitor(TestCase):
 | 
						|
    def test_get_monitor(self):
 | 
						|
        record = Record.new(zone, 'foo', data={
 | 
						|
            'type': 'CNAME', 'ttl': 60, 'value': 'default.unit.tests.',
 | 
						|
            'octodns': {
 | 
						|
                'healthcheck': {
 | 
						|
                    'path': '/_ping',
 | 
						|
                    'port': 4443,
 | 
						|
                    'protocol': 'HTTPS',
 | 
						|
                }
 | 
						|
            },
 | 
						|
        })
 | 
						|
 | 
						|
        monitor = _get_monitor(record)
 | 
						|
        self.assertEqual(monitor.protocol, 'HTTPS')
 | 
						|
        self.assertEqual(monitor.port, 4443)
 | 
						|
        self.assertEqual(monitor.path, '/_ping')
 | 
						|
        headers = monitor.custom_headers
 | 
						|
        self.assertIsInstance(headers, list)
 | 
						|
        self.assertEquals(len(headers), 1)
 | 
						|
        headers = headers[0]
 | 
						|
        self.assertEqual(headers.name, 'Host')
 | 
						|
        self.assertEqual(headers.value, record.healthcheck_host)
 | 
						|
 | 
						|
        # test TCP monitor
 | 
						|
        record._octodns['healthcheck']['protocol'] = 'TCP'
 | 
						|
        monitor = _get_monitor(record)
 | 
						|
        self.assertEqual(monitor.protocol, 'TCP')
 | 
						|
        self.assertIsNone(monitor.custom_headers)
 | 
						|
 | 
						|
 | 
						|
class Test_ProfileIsMatch(TestCase):
 | 
						|
    def test_profile_is_match(self):
 | 
						|
        is_match = _profile_is_match
 | 
						|
 | 
						|
        self.assertFalse(is_match(None, Profile()))
 | 
						|
 | 
						|
        # Profile object builder with default property values that can be
 | 
						|
        # overridden for testing below
 | 
						|
        def profile(
 | 
						|
            name = 'foo-unit-tests',
 | 
						|
            ttl = 60,
 | 
						|
            method = 'Geographic',
 | 
						|
            monitor_proto = 'HTTPS',
 | 
						|
            monitor_port = 4443,
 | 
						|
            monitor_path = '/_ping',
 | 
						|
            endpoints = 1,
 | 
						|
            endpoint_name = 'name',
 | 
						|
            endpoint_type = 'profile/nestedEndpoints',
 | 
						|
            target = 'target.unit.tests',
 | 
						|
            target_id = 'resource/id',
 | 
						|
            geos = ['GEO-AF'],
 | 
						|
            weight = 1,
 | 
						|
            priority = 1,
 | 
						|
        ):
 | 
						|
            dns = DnsConfig(ttl=ttl)
 | 
						|
            return Profile(
 | 
						|
                name=name, traffic_routing_method=method, dns_config=dns,
 | 
						|
                monitor_config=MonitorConfig(
 | 
						|
                    protocol=monitor_proto,
 | 
						|
                    port=monitor_port,
 | 
						|
                    path=monitor_path,
 | 
						|
                ),
 | 
						|
                endpoints=[Endpoint(
 | 
						|
                    name=endpoint_name,
 | 
						|
                    type=endpoint_type,
 | 
						|
                    target=target,
 | 
						|
                    target_resource_id=target_id,
 | 
						|
                    geo_mapping=geos,
 | 
						|
                    weight=weight,
 | 
						|
                    priority=priority,
 | 
						|
                )] + [Endpoint()] * (endpoints - 1),
 | 
						|
            )
 | 
						|
 | 
						|
        self.assertTrue(is_match(profile(), profile()))
 | 
						|
 | 
						|
        self.assertFalse(is_match(profile(), profile(name='two')))
 | 
						|
        self.assertFalse(is_match(profile(), profile(endpoints=2)))
 | 
						|
        self.assertFalse(is_match(profile(), profile(monitor_proto='HTTP')))
 | 
						|
        self.assertFalse(is_match(profile(), profile(endpoint_name='a')))
 | 
						|
        self.assertFalse(is_match(profile(), profile(endpoint_type='b')))
 | 
						|
        self.assertFalse(
 | 
						|
            is_match(profile(endpoint_type='b'), profile(endpoint_type='b'))
 | 
						|
        )
 | 
						|
        self.assertFalse(is_match(profile(), profile(target_id='rsrc/id2')))
 | 
						|
        self.assertFalse(is_match(profile(), profile(geos=['IN'])))
 | 
						|
 | 
						|
        def wprofile(**kwargs):
 | 
						|
            kwargs['method'] = 'Weighted'
 | 
						|
            kwargs['endpoint_type'] = 'profile/externalEndpoints'
 | 
						|
            return profile(**kwargs)
 | 
						|
 | 
						|
        self.assertFalse(is_match(wprofile(), wprofile(target='bar.unit')))
 | 
						|
        self.assertFalse(is_match(wprofile(), wprofile(weight=3)))
 | 
						|
 | 
						|
 | 
						|
class TestAzureDnsProvider(TestCase):
 | 
						|
    def _provider(self):
 | 
						|
        return self._get_provider('mock_spc', 'mock_dns_client')
 | 
						|
 | 
						|
    @patch('octodns.provider.azuredns.TrafficManagerManagementClient')
 | 
						|
    @patch('octodns.provider.azuredns.DnsManagementClient')
 | 
						|
    @patch('octodns.provider.azuredns.ClientSecretCredential')
 | 
						|
    @patch('octodns.provider.azuredns.ServicePrincipalCredentials')
 | 
						|
    def _get_provider(self, mock_spc, mock_css, mock_dns_client,
 | 
						|
                      mock_tm_client):
 | 
						|
        '''Returns a mock AzureProvider object to use in testing.
 | 
						|
 | 
						|
            :param mock_spc: placeholder
 | 
						|
            :type  mock_spc: str
 | 
						|
            :param mock_dns_client: placeholder
 | 
						|
            :type  mock_dns_client: str
 | 
						|
            :param mock_tm_client: placeholder
 | 
						|
            :type  mock_tm_client: str
 | 
						|
 | 
						|
            :type return: AzureProvider
 | 
						|
        '''
 | 
						|
        provider = AzureProvider('mock_id', 'mock_client', 'mock_key',
 | 
						|
                                 'mock_directory', 'mock_sub', 'mock_rg'
 | 
						|
                                 )
 | 
						|
 | 
						|
        # Fetch the client to force it to load the creds
 | 
						|
        provider._dns_client
 | 
						|
 | 
						|
        # set critical functions to return properly
 | 
						|
        tm_list = provider._tm_client.profiles.list_by_resource_group
 | 
						|
        tm_list.return_value = []
 | 
						|
        tm_sync = provider._tm_client.profiles.create_or_update
 | 
						|
 | 
						|
        def side_effect(rg, name, profile):
 | 
						|
            return profile
 | 
						|
 | 
						|
        tm_sync.side_effect = side_effect
 | 
						|
 | 
						|
        return provider
 | 
						|
 | 
						|
    def _get_dynamic_record(self, zone):
 | 
						|
        return Record.new(zone, 'foo', data={
 | 
						|
            'type': 'CNAME',
 | 
						|
            'ttl': 60,
 | 
						|
            'value': 'default.unit.tests.',
 | 
						|
            'dynamic': {
 | 
						|
                'pools': {
 | 
						|
                    'one': {
 | 
						|
                        'values': [
 | 
						|
                            {'value': 'one.unit.tests.', 'weight': 11},
 | 
						|
                        ],
 | 
						|
                        'fallback': 'two',
 | 
						|
                    },
 | 
						|
                    'two': {
 | 
						|
                        'values': [
 | 
						|
                            {'value': 'two1.unit.tests.', 'weight': 3},
 | 
						|
                            {'value': 'two2.unit.tests.', 'weight': 4},
 | 
						|
                        ],
 | 
						|
                        'fallback': 'three',
 | 
						|
                    },
 | 
						|
                    'three': {
 | 
						|
                        'values': [
 | 
						|
                            {'value': 'three.unit.tests.', 'weight': 13},
 | 
						|
                        ],
 | 
						|
                    },
 | 
						|
                },
 | 
						|
                'rules': [
 | 
						|
                    {'geos': ['AF', 'EU-DE', 'NA-US-CA'], 'pool': 'one'},
 | 
						|
                    {'pool': 'three'},
 | 
						|
                ],
 | 
						|
            },
 | 
						|
            'octodns': {
 | 
						|
                'healthcheck': {
 | 
						|
                    'path': '/_ping',
 | 
						|
                    'port': 4443,
 | 
						|
                    'protocol': 'HTTPS',
 | 
						|
                }
 | 
						|
            },
 | 
						|
        })
 | 
						|
 | 
						|
    def _get_tm_profiles(self, provider):
 | 
						|
        sub = provider._dns_client_subscription_id
 | 
						|
        rg = provider._resource_group
 | 
						|
        base_id = '/subscriptions/' + sub + \
 | 
						|
            '/resourceGroups/' + rg + \
 | 
						|
            '/providers/Microsoft.Network/trafficManagerProfiles/'
 | 
						|
        suffix = 'foo-unit-tests'
 | 
						|
        id_format = base_id + '{}--' + suffix
 | 
						|
        name_format = '{}--' + suffix
 | 
						|
 | 
						|
        dns = DnsConfig(ttl=60)
 | 
						|
        header = MonitorConfigCustomHeadersItem(name='Host',
 | 
						|
                                                value='foo.unit.tests')
 | 
						|
        monitor = MonitorConfig(protocol='HTTPS', port=4443, path='/_ping',
 | 
						|
                                custom_headers=[header])
 | 
						|
        external = 'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
 | 
						|
        nested = 'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
 | 
						|
 | 
						|
        return [
 | 
						|
            Profile(
 | 
						|
                id=id_format.format('default'),
 | 
						|
                name=name_format.format('default'),
 | 
						|
                traffic_routing_method='Weighted',
 | 
						|
                dns_config=dns,
 | 
						|
                monitor_config=monitor,
 | 
						|
                endpoints=[
 | 
						|
                    Endpoint(
 | 
						|
                        name='default.unit.tests',
 | 
						|
                        type=external,
 | 
						|
                        target='default.unit.tests',
 | 
						|
                        weight=1,
 | 
						|
                    ),
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
            Profile(
 | 
						|
                id=id_format.format('pool-one'),
 | 
						|
                name=name_format.format('pool-one'),
 | 
						|
                traffic_routing_method='Weighted',
 | 
						|
                dns_config=dns,
 | 
						|
                monitor_config=monitor,
 | 
						|
                endpoints=[
 | 
						|
                    Endpoint(
 | 
						|
                        name='one.unit.tests',
 | 
						|
                        type=external,
 | 
						|
                        target='one.unit.tests',
 | 
						|
                        weight=11,
 | 
						|
                    ),
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
            Profile(
 | 
						|
                id=id_format.format('pool-two'),
 | 
						|
                name=name_format.format('pool-two'),
 | 
						|
                traffic_routing_method='Weighted',
 | 
						|
                dns_config=dns,
 | 
						|
                monitor_config=monitor,
 | 
						|
                endpoints=[
 | 
						|
                    Endpoint(
 | 
						|
                        name='two1.unit.tests',
 | 
						|
                        type=external,
 | 
						|
                        target='two1.unit.tests',
 | 
						|
                        weight=3,
 | 
						|
                    ),
 | 
						|
                    Endpoint(
 | 
						|
                        name='two2.unit.tests',
 | 
						|
                        type=external,
 | 
						|
                        target='two2.unit.tests',
 | 
						|
                        weight=4,
 | 
						|
                    ),
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
            Profile(
 | 
						|
                id=id_format.format('pool-three'),
 | 
						|
                name=name_format.format('pool-three'),
 | 
						|
                traffic_routing_method='Weighted',
 | 
						|
                dns_config=dns,
 | 
						|
                monitor_config=monitor,
 | 
						|
                endpoints=[
 | 
						|
                    Endpoint(
 | 
						|
                        name='three.unit.tests',
 | 
						|
                        type=external,
 | 
						|
                        target='three.unit.tests',
 | 
						|
                        weight=13,
 | 
						|
                    ),
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
            Profile(
 | 
						|
                id=id_format.format('rule-one'),
 | 
						|
                name=name_format.format('rule-one'),
 | 
						|
                traffic_routing_method='Priority',
 | 
						|
                dns_config=dns,
 | 
						|
                monitor_config=monitor,
 | 
						|
                endpoints=[
 | 
						|
                    Endpoint(
 | 
						|
                        name='one',
 | 
						|
                        type=nested,
 | 
						|
                        target_resource_id=id_format.format('pool-one'),
 | 
						|
                        priority=1,
 | 
						|
                    ),
 | 
						|
                    Endpoint(
 | 
						|
                        name='two',
 | 
						|
                        type=nested,
 | 
						|
                        target_resource_id=id_format.format('pool-two'),
 | 
						|
                        priority=2,
 | 
						|
                    ),
 | 
						|
                    Endpoint(
 | 
						|
                        name='three',
 | 
						|
                        type=nested,
 | 
						|
                        target_resource_id=id_format.format('pool-three'),
 | 
						|
                        priority=3,
 | 
						|
                    ),
 | 
						|
                    Endpoint(
 | 
						|
                        name='--default--',
 | 
						|
                        type=nested,
 | 
						|
                        target_resource_id=id_format.format('default'),
 | 
						|
                        priority=4,
 | 
						|
                    ),
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
            Profile(
 | 
						|
                id=id_format.format('rule-three'),
 | 
						|
                name=name_format.format('rule-three'),
 | 
						|
                traffic_routing_method='Priority',
 | 
						|
                dns_config=dns,
 | 
						|
                monitor_config=monitor,
 | 
						|
                endpoints=[
 | 
						|
                    Endpoint(
 | 
						|
                        name='three',
 | 
						|
                        type=nested,
 | 
						|
                        target_resource_id=id_format.format('pool-three'),
 | 
						|
                        priority=1,
 | 
						|
                    ),
 | 
						|
                    Endpoint(
 | 
						|
                        name='--default--',
 | 
						|
                        type=nested,
 | 
						|
                        target_resource_id=id_format.format('default'),
 | 
						|
                        priority=2,
 | 
						|
                    ),
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
            Profile(
 | 
						|
                id=base_id + suffix,
 | 
						|
                name=suffix,
 | 
						|
                traffic_routing_method='Geographic',
 | 
						|
                dns_config=dns,
 | 
						|
                monitor_config=monitor,
 | 
						|
                endpoints=[
 | 
						|
                    Endpoint(
 | 
						|
                        geo_mapping=['GEO-AF', 'DE', 'US-CA'],
 | 
						|
                        name='rule-one',
 | 
						|
                        type=nested,
 | 
						|
                        target_resource_id=id_format.format('rule-one'),
 | 
						|
                    ),
 | 
						|
                    Endpoint(
 | 
						|
                        geo_mapping=['WORLD'],
 | 
						|
                        name='rule-three',
 | 
						|
                        type=nested,
 | 
						|
                        target_resource_id=id_format.format('rule-three'),
 | 
						|
                    ),
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
        ]
 | 
						|
 | 
						|
    def _get_dynamic_package(self):
 | 
						|
        '''Convenience function to setup a sample dynamic record.
 | 
						|
        '''
 | 
						|
        provider = self._get_provider()
 | 
						|
 | 
						|
        # setup traffic manager profiles
 | 
						|
        tm_list = provider._tm_client.profiles.list_by_resource_group
 | 
						|
        tm_list.return_value = self._get_tm_profiles(provider)
 | 
						|
 | 
						|
        # setup zone with dynamic record
 | 
						|
        zone = Zone(name='unit.tests.', sub_zones=[])
 | 
						|
        record = self._get_dynamic_record(zone)
 | 
						|
        zone.add_record(record)
 | 
						|
 | 
						|
        # return everything
 | 
						|
        return provider, zone, record
 | 
						|
 | 
						|
    def test_populate_records(self):
 | 
						|
        provider = self._get_provider()
 | 
						|
 | 
						|
        rs = []
 | 
						|
        recordSet = RecordSet(a_records=[ARecord(ipv4_address='1.1.1.1')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'a1', 0, 'A'
 | 
						|
        recordSet.target_resource = SubResource()
 | 
						|
        rs.append(recordSet)
 | 
						|
        recordSet = RecordSet(a_records=[ARecord(ipv4_address='1.1.1.1'),
 | 
						|
                                         ARecord(ipv4_address='2.2.2.2')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'a2', 1, 'A'
 | 
						|
        recordSet.target_resource = SubResource()
 | 
						|
        rs.append(recordSet)
 | 
						|
        aaaa1 = AaaaRecord(ipv6_address='1:1ec:1::1')
 | 
						|
        recordSet = RecordSet(aaaa_records=[aaaa1])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'aaaa1', 2, 'AAAA'
 | 
						|
        recordSet.target_resource = SubResource()
 | 
						|
        rs.append(recordSet)
 | 
						|
        aaaa2 = AaaaRecord(ipv6_address='1:1ec:1::2')
 | 
						|
        recordSet = RecordSet(aaaa_records=[aaaa1,
 | 
						|
                                            aaaa2])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'aaaa2', 3, 'AAAA'
 | 
						|
        recordSet.target_resource = SubResource()
 | 
						|
        rs.append(recordSet)
 | 
						|
        recordSet = RecordSet(caa_records=[CaaRecord(flags=0,
 | 
						|
                                                     tag='issue',
 | 
						|
                                                     value='caa1.unit.tests')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'caa1', 4, 'CAA'
 | 
						|
        rs.append(recordSet)
 | 
						|
        recordSet = RecordSet(caa_records=[CaaRecord(flags=0,
 | 
						|
                                                     tag='issue',
 | 
						|
                                                     value='caa1.unit.tests'),
 | 
						|
                                           CaaRecord(flags=0,
 | 
						|
                                                     tag='issue',
 | 
						|
                                                     value='caa2.unit.tests')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'caa2', 4, 'CAA'
 | 
						|
        rs.append(recordSet)
 | 
						|
        cname1 = CnameRecord(cname='cname.unit.test.')
 | 
						|
        recordSet = RecordSet(cname_record=cname1)
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'cname1', 5, 'CNAME'
 | 
						|
        recordSet.target_resource = SubResource()
 | 
						|
        rs.append(recordSet)
 | 
						|
        recordSet = RecordSet(mx_records=[MxRecord(preference=10,
 | 
						|
                                                   exchange='mx1.unit.test.')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'mx1', 7, 'MX'
 | 
						|
        rs.append(recordSet)
 | 
						|
        recordSet = RecordSet(mx_records=[MxRecord(preference=10,
 | 
						|
                                                   exchange='mx1.unit.test.'),
 | 
						|
                                          MxRecord(preference=11,
 | 
						|
                                                   exchange='mx2.unit.test.')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'mx2', 8, 'MX'
 | 
						|
        rs.append(recordSet)
 | 
						|
        recordSet = RecordSet(ns_records=[NsRecord(nsdname='ns1.unit.test.')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'ns1', 9, 'NS'
 | 
						|
        rs.append(recordSet)
 | 
						|
        recordSet = RecordSet(ns_records=[NsRecord(nsdname='ns1.unit.test.'),
 | 
						|
                                          NsRecord(nsdname='ns2.unit.test.')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'ns2', 10, 'NS'
 | 
						|
        rs.append(recordSet)
 | 
						|
        ptr1 = PtrRecord(ptrdname='ptr1.unit.test.')
 | 
						|
        recordSet = RecordSet(ptr_records=[ptr1])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'ptr1', 11, 'PTR'
 | 
						|
        rs.append(recordSet)
 | 
						|
        recordSet = RecordSet(srv_records=[SrvRecord(priority=1,
 | 
						|
                                                     weight=2,
 | 
						|
                                                     port=3,
 | 
						|
                                                     target='1unit.tests.')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = '_srv1._tcp', 13, 'SRV'
 | 
						|
        rs.append(recordSet)
 | 
						|
        recordSet = RecordSet(srv_records=[SrvRecord(priority=1,
 | 
						|
                                                     weight=2,
 | 
						|
                                                     port=3,
 | 
						|
                                                     target='1unit.tests.'),
 | 
						|
                                           SrvRecord(priority=4,
 | 
						|
                                                     weight=5,
 | 
						|
                                                     port=6,
 | 
						|
                                                     target='2unit.tests.')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = '_srv2._tcp', 14, 'SRV'
 | 
						|
        rs.append(recordSet)
 | 
						|
        recordSet = RecordSet(txt_records=[TxtRecord(value='sample text1')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'txt1', 15, 'TXT'
 | 
						|
        recordSet.target_resource = SubResource()
 | 
						|
        rs.append(recordSet)
 | 
						|
        recordSet = RecordSet(txt_records=[TxtRecord(value='sample text1'),
 | 
						|
                                           TxtRecord(value='sample text2')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'txt2', 16, 'TXT'
 | 
						|
        recordSet.target_resource = SubResource()
 | 
						|
        rs.append(recordSet)
 | 
						|
        recordSet = RecordSet(soa_record=[SoaRecord()])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = '', 17, 'SOA'
 | 
						|
        rs.append(recordSet)
 | 
						|
        long_txt = "v=spf1 ip4:10.10.0.0/24 ip4:10.10.1.0/24 ip4:10.10.2.0/24"
 | 
						|
        long_txt += " ip4:10.10.3.0/24 ip4:10.10.4.0/24 ip4:10.10.5.0/24 "
 | 
						|
        long_txt += " 10.6.0/24 ip4:10.10.7.0/24 ip4:10.10.8.0/24 "
 | 
						|
        long_txt += " ip4:10.10.10.0/24 ip4:10.10.11.0/24 ip4:10.10.12.0/24"
 | 
						|
        long_txt += " ip4:10.10.13.0/24 ip4:10.10.14.0/24 ip4:10.10.15.0/24"
 | 
						|
        long_txt += " ip4:10.10.16.0/24 ip4:10.10.17.0/24 ip4:10.10.18.0/24"
 | 
						|
        long_txt += " ip4:10.10.19.0/24 ip4:10.10.20.0/24  ~all"
 | 
						|
        recordSet = RecordSet(txt_records=[TxtRecord(value='sample value1'),
 | 
						|
                                           TxtRecord(value=long_txt)])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'txt3', 18, 'TXT'
 | 
						|
        recordSet.target_resource = SubResource()
 | 
						|
        rs.append(recordSet)
 | 
						|
 | 
						|
        record_list = provider._dns_client.record_sets.list_by_dns_zone
 | 
						|
        record_list.return_value = rs
 | 
						|
 | 
						|
        zone_list = provider._dns_client.zones.list_by_resource_group
 | 
						|
        zone_list.return_value = [zone]
 | 
						|
 | 
						|
        exists = provider.populate(zone)
 | 
						|
 | 
						|
        self.assertEquals(len(zone.records), 17)
 | 
						|
        self.assertTrue(exists)
 | 
						|
 | 
						|
    def test_populate_dynamic(self):
 | 
						|
        # Middle east without Asia raises exception
 | 
						|
        provider, zone, record = self._get_dynamic_package()
 | 
						|
        tm_suffix = _traffic_manager_suffix(record)
 | 
						|
        tm_id = provider._profile_name_to_id
 | 
						|
        tm_list = provider._tm_client.profiles.list_by_resource_group
 | 
						|
        rule_name = 'rule-one--{}'.format(tm_suffix)
 | 
						|
        nested = 'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
 | 
						|
        tm_list.return_value = [
 | 
						|
            Profile(
 | 
						|
                id=tm_id(tm_suffix),
 | 
						|
                name=tm_suffix,
 | 
						|
                traffic_routing_method='Geographic',
 | 
						|
                endpoints=[
 | 
						|
                    Endpoint(
 | 
						|
                        geo_mapping=['GEO-ME'],
 | 
						|
                    ),
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
        ]
 | 
						|
        azrecord = RecordSet(
 | 
						|
            ttl=60,
 | 
						|
            target_resource=SubResource(id=tm_id(tm_suffix)),
 | 
						|
        )
 | 
						|
        azrecord.name = record.name or '@'
 | 
						|
        azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
 | 
						|
        with self.assertRaises(AzureException) as ctx:
 | 
						|
            provider._populate_record(zone, azrecord)
 | 
						|
            self.assertTrue(text_type(ctx).startswith(
 | 
						|
                'Middle East (GEO-ME) is not supported'
 | 
						|
            ))
 | 
						|
 | 
						|
        # empty priority profile raises exception
 | 
						|
        provider, zone, record = self._get_dynamic_package()
 | 
						|
        tm_list = provider._tm_client.profiles.list_by_resource_group
 | 
						|
        rule_name = 'rule-one--{}'.format(tm_suffix)
 | 
						|
        nested = 'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
 | 
						|
        tm_list.return_value = [
 | 
						|
            Profile(
 | 
						|
                id=tm_id(rule_name),
 | 
						|
                name=rule_name,
 | 
						|
                traffic_routing_method='Priority',
 | 
						|
                endpoints=[],
 | 
						|
            ),
 | 
						|
            Profile(
 | 
						|
                id=tm_id(tm_suffix),
 | 
						|
                name=tm_suffix,
 | 
						|
                traffic_routing_method='Geographic',
 | 
						|
                endpoints=[
 | 
						|
                    Endpoint(
 | 
						|
                        geo_mapping=['WORLD'],
 | 
						|
                        name='rule-one',
 | 
						|
                        type=nested,
 | 
						|
                        target_resource_id=tm_id(rule_name),
 | 
						|
                    ),
 | 
						|
                ],
 | 
						|
            ),
 | 
						|
        ]
 | 
						|
        with self.assertRaises(AzureException) as ctx:
 | 
						|
            provider._populate_record(zone, azrecord)
 | 
						|
            self.assertTrue(text_type(ctx).startswith(
 | 
						|
                'Expected at least 2 endpoints'
 | 
						|
            ))
 | 
						|
 | 
						|
        # valid set of profiles produce expected dynamic record
 | 
						|
        provider, zone, record = self._get_dynamic_package()
 | 
						|
        root_profile_id = provider._profile_name_to_id(
 | 
						|
            _traffic_manager_suffix(record)
 | 
						|
        )
 | 
						|
        azrecord = RecordSet(
 | 
						|
            ttl=60,
 | 
						|
            target_resource=SubResource(id=root_profile_id),
 | 
						|
        )
 | 
						|
        azrecord.name = record.name or '@'
 | 
						|
        azrecord.type = 'Microsoft.Network/dnszones/{}'.format(record._type)
 | 
						|
 | 
						|
        record = provider._populate_record(zone, azrecord)
 | 
						|
        self.assertEqual(record.name, 'foo')
 | 
						|
        self.assertEqual(record.ttl, 60)
 | 
						|
        self.assertEqual(record.value, 'default.unit.tests.')
 | 
						|
        self.assertEqual(record.dynamic._data(), {
 | 
						|
            'pools': {
 | 
						|
                'one': {
 | 
						|
                    'values': [
 | 
						|
                        {'value': 'one.unit.tests.', 'weight': 11},
 | 
						|
                    ],
 | 
						|
                    'fallback': 'two',
 | 
						|
                },
 | 
						|
                'two': {
 | 
						|
                    'values': [
 | 
						|
                        {'value': 'two1.unit.tests.', 'weight': 3},
 | 
						|
                        {'value': 'two2.unit.tests.', 'weight': 4},
 | 
						|
                    ],
 | 
						|
                    'fallback': 'three',
 | 
						|
                },
 | 
						|
                'three': {
 | 
						|
                    'values': [
 | 
						|
                        {'value': 'three.unit.tests.', 'weight': 13},
 | 
						|
                    ],
 | 
						|
                    'fallback': None,
 | 
						|
                },
 | 
						|
            },
 | 
						|
            'rules': [
 | 
						|
                {'geos': ['AF', 'EU-DE', 'NA-US-CA'], 'pool': 'one'},
 | 
						|
                {'pool': 'three'},
 | 
						|
            ],
 | 
						|
        })
 | 
						|
 | 
						|
        # valid profiles with Middle East test case
 | 
						|
        geo_profile = provider._get_tm_for_dynamic_record(record)
 | 
						|
        geo_profile.endpoints[0].geo_mapping.extend(['GEO-ME', 'GEO-AS'])
 | 
						|
        record = provider._populate_record(zone, azrecord)
 | 
						|
        self.assertIn('AS', record.dynamic.rules[0].data['geos'])
 | 
						|
        self.assertNotIn('ME', record.dynamic.rules[0].data['geos'])
 | 
						|
 | 
						|
    def test_populate_zone(self):
 | 
						|
        provider = self._get_provider()
 | 
						|
 | 
						|
        zone_list = provider._dns_client.zones.list_by_resource_group
 | 
						|
        zone_1 = AzureZone(location='global')
 | 
						|
        # This is far from ideal but the
 | 
						|
        # zone constructor doesn't let me set it on creation
 | 
						|
        zone_1.name = "zone-1"
 | 
						|
        zone_2 = AzureZone(location='global')
 | 
						|
        # This is far from ideal but the
 | 
						|
        # zone constructor doesn't let me set it on creation
 | 
						|
        zone_2.name = "zone-2"
 | 
						|
        zone_list.return_value = [zone_1,
 | 
						|
                                  zone_2,
 | 
						|
                                  zone_1]
 | 
						|
 | 
						|
        provider._populate_zones()
 | 
						|
 | 
						|
        # This should be returning two zones since two zones are the same
 | 
						|
        self.assertEquals(len(provider._azure_zones), 2)
 | 
						|
 | 
						|
    def test_bad_zone_response(self):
 | 
						|
        provider = self._get_provider()
 | 
						|
 | 
						|
        _get = provider._dns_client.zones.get
 | 
						|
        _get.side_effect = CloudError(Mock(status=404), 'Azure Error')
 | 
						|
        self.assertEquals(
 | 
						|
            provider._check_zone('unit.test', create=False),
 | 
						|
            None
 | 
						|
        )
 | 
						|
 | 
						|
    def test_extra_changes(self):
 | 
						|
        provider, existing, record = self._get_dynamic_package()
 | 
						|
 | 
						|
        # test simple records produce no extra changes
 | 
						|
        desired = Zone(name=existing.name, sub_zones=[])
 | 
						|
        desired.add_record(Record.new(desired, 'simple', data={
 | 
						|
            'type': record._type,
 | 
						|
            'ttl': record.ttl,
 | 
						|
            'value': record.value,
 | 
						|
        }))
 | 
						|
        extra = provider._extra_changes(desired, desired, [])
 | 
						|
        self.assertEqual(len(extra), 0)
 | 
						|
 | 
						|
        # test an unchanged dynamic record produces no extra changes
 | 
						|
        desired.add_record(record)
 | 
						|
        extra = provider._extra_changes(existing, desired, [])
 | 
						|
        self.assertEqual(len(extra), 0)
 | 
						|
 | 
						|
        # test unused TM produces the extra change for clean up
 | 
						|
        sample_profile = self._get_tm_profiles(provider)[0]
 | 
						|
        tm_id = provider._profile_name_to_id
 | 
						|
        root_profile_name = _traffic_manager_suffix(record)
 | 
						|
        extra_profile = Profile(
 | 
						|
            id=tm_id('random--{}'.format(root_profile_name)),
 | 
						|
            name='random--{}'.format(root_profile_name),
 | 
						|
            traffic_routing_method='Weighted',
 | 
						|
            dns_config=sample_profile.dns_config,
 | 
						|
            monitor_config=sample_profile.monitor_config,
 | 
						|
            endpoints=sample_profile.endpoints,
 | 
						|
        )
 | 
						|
        tm_list = provider._tm_client.profiles.list_by_resource_group
 | 
						|
        tm_list.return_value.append(extra_profile)
 | 
						|
        provider._populate_traffic_managers()
 | 
						|
        extra = provider._extra_changes(existing, desired, [])
 | 
						|
        self.assertEqual(len(extra), 1)
 | 
						|
        extra = extra[0]
 | 
						|
        self.assertIsInstance(extra, Update)
 | 
						|
        self.assertEqual(extra.new, record)
 | 
						|
        desired._remove_record(record)
 | 
						|
        tm_list.return_value.pop()
 | 
						|
 | 
						|
        # test new dynamic record does not produce an extra change for it
 | 
						|
        new_dynamic = Record.new(desired, record.name + '2', data={
 | 
						|
            'type': record._type,
 | 
						|
            'ttl': record.ttl,
 | 
						|
            'value': record.value,
 | 
						|
            'dynamic': record.dynamic._data(),
 | 
						|
            'octodns': record._octodns,
 | 
						|
        })
 | 
						|
        # test change in healthcheck by using a different port number
 | 
						|
        update_dynamic = Record.new(desired, record.name, data={
 | 
						|
            'type': record._type,
 | 
						|
            'ttl': record.ttl,
 | 
						|
            'value': record.value,
 | 
						|
            'dynamic': record.dynamic._data(),
 | 
						|
            'octodns': {
 | 
						|
                'healthcheck': {
 | 
						|
                    'path': '/_ping',
 | 
						|
                    'port': 443,
 | 
						|
                    'protocol': 'HTTPS',
 | 
						|
                },
 | 
						|
            },
 | 
						|
        })
 | 
						|
        desired.add_record(new_dynamic)
 | 
						|
        desired.add_record(update_dynamic)
 | 
						|
        changes = [Create(new_dynamic)]
 | 
						|
        extra = provider._extra_changes(existing, desired, changes)
 | 
						|
        # implicitly asserts that new_dynamic was not added to extra changes
 | 
						|
        # as it was already in the `changes` list
 | 
						|
        self.assertEqual(len(extra), 1)
 | 
						|
        extra = extra[0]
 | 
						|
        self.assertIsInstance(extra, Update)
 | 
						|
        self.assertEqual(extra.new, update_dynamic)
 | 
						|
 | 
						|
        # test non-CNAME dynamic record throws exception
 | 
						|
        a_dynamic = Record.new(desired, record.name + '3', data={
 | 
						|
            'type': 'A',
 | 
						|
            'ttl': record.ttl,
 | 
						|
            'values': ['1.1.1.1'],
 | 
						|
            'dynamic': {
 | 
						|
                'pools': {
 | 
						|
                    'one': {'values': [{'value': '2.2.2.2'}]},
 | 
						|
                },
 | 
						|
                'rules': [
 | 
						|
                    {'pool': 'one'},
 | 
						|
                ],
 | 
						|
            },
 | 
						|
        })
 | 
						|
        desired.add_record(a_dynamic)
 | 
						|
        changes.append(Create(a_dynamic))
 | 
						|
        with self.assertRaises(AzureException):
 | 
						|
            provider._extra_changes(existing, desired, changes)
 | 
						|
 | 
						|
    def test_generate_tm_profile(self):
 | 
						|
        provider, zone, record = self._get_dynamic_package()
 | 
						|
        profile_gen = provider._generate_tm_profile
 | 
						|
 | 
						|
        name = 'foobar'
 | 
						|
        routing = 'Priority'
 | 
						|
        endpoints = [
 | 
						|
            Endpoint(target='one.unit.tests'),
 | 
						|
            Endpoint(target_resource_id='/s/1/rg/foo/tm/foobar2'),
 | 
						|
            Endpoint(name='invalid'),
 | 
						|
        ]
 | 
						|
 | 
						|
        # invalid endpoint raises exception
 | 
						|
        with self.assertRaises(AzureException):
 | 
						|
            profile_gen(name, routing, endpoints, record)
 | 
						|
 | 
						|
        # regular test
 | 
						|
        endpoints.pop()
 | 
						|
        profile = profile_gen(name, routing, endpoints, record)
 | 
						|
 | 
						|
        # implicitly tests _profile_name_to_id
 | 
						|
        sub = provider._dns_client_subscription_id
 | 
						|
        rg = provider._resource_group
 | 
						|
        expected_id = '/subscriptions/' + sub + \
 | 
						|
            '/resourceGroups/' + rg + \
 | 
						|
            '/providers/Microsoft.Network/trafficManagerProfiles/' + name
 | 
						|
        self.assertEqual(profile.id, expected_id)
 | 
						|
        self.assertEqual(profile.name, name)
 | 
						|
        self.assertEqual(profile.traffic_routing_method, routing)
 | 
						|
        self.assertEqual(profile.dns_config.ttl, record.ttl)
 | 
						|
        self.assertEqual(len(profile.endpoints), len(endpoints))
 | 
						|
 | 
						|
        self.assertEqual(
 | 
						|
            profile.endpoints[0].type,
 | 
						|
            'Microsoft.Network/trafficManagerProfiles/externalEndpoints'
 | 
						|
        )
 | 
						|
        self.assertEqual(
 | 
						|
            profile.endpoints[1].type,
 | 
						|
            'Microsoft.Network/trafficManagerProfiles/nestedEndpoints'
 | 
						|
        )
 | 
						|
 | 
						|
    def test_generate_traffic_managers(self):
 | 
						|
        provider, zone, record = self._get_dynamic_package()
 | 
						|
        profiles = provider._generate_traffic_managers(record)
 | 
						|
        deduped = []
 | 
						|
        seen = set()
 | 
						|
        for profile in profiles:
 | 
						|
            if profile.name not in seen:
 | 
						|
                deduped.append(profile)
 | 
						|
                seen.add(profile.name)
 | 
						|
 | 
						|
        # check that every profile is a match with what we expect
 | 
						|
        expected_profiles = self._get_tm_profiles(provider)
 | 
						|
        self.assertEqual(len(expected_profiles), len(deduped))
 | 
						|
        for have, expected in zip(deduped, expected_profiles):
 | 
						|
            self.assertTrue(_profile_is_match(have, expected))
 | 
						|
 | 
						|
        # check Asia/Middle East test case
 | 
						|
        record.dynamic._data()['rules'][0]['geos'].append('AS')
 | 
						|
        profiles = provider._generate_traffic_managers(record)
 | 
						|
        geo_profile_name = _traffic_manager_suffix(record)
 | 
						|
        geo_profile = next(
 | 
						|
            profile
 | 
						|
            for profile in profiles
 | 
						|
            if profile.name == geo_profile_name
 | 
						|
        )
 | 
						|
        self.assertIn('GEO-ME', geo_profile.endpoints[0].geo_mapping)
 | 
						|
        self.assertIn('GEO-AS', geo_profile.endpoints[0].geo_mapping)
 | 
						|
 | 
						|
    def test_sync_traffic_managers(self):
 | 
						|
        provider, zone, record = self._get_dynamic_package()
 | 
						|
        provider._populate_traffic_managers()
 | 
						|
 | 
						|
        tm_sync = provider._tm_client.profiles.create_or_update
 | 
						|
 | 
						|
        suffix = 'foo-unit-tests'
 | 
						|
        expected_seen = {
 | 
						|
            suffix, 'default--{}'.format(suffix),
 | 
						|
            'rule-one--{}'.format(suffix), 'rule-three--{}'.format(suffix),
 | 
						|
            'pool-one--{}'.format(suffix), 'pool-two--{}'.format(suffix),
 | 
						|
            'pool-three--{}'.format(suffix),
 | 
						|
        }
 | 
						|
 | 
						|
        # test no change
 | 
						|
        seen = provider._sync_traffic_managers(record)
 | 
						|
        self.assertEqual(seen, expected_seen)
 | 
						|
        tm_sync.assert_not_called()
 | 
						|
 | 
						|
        # test that changing weight causes update API call
 | 
						|
        dynamic = record.dynamic._data()
 | 
						|
        dynamic['pools']['one']['values'][0]['weight'] = 14
 | 
						|
        data = {
 | 
						|
            'type': 'CNAME',
 | 
						|
            'ttl': record.ttl,
 | 
						|
            'value': record.value,
 | 
						|
            'dynamic': dynamic,
 | 
						|
            'octodns': record._octodns,
 | 
						|
        }
 | 
						|
        new_record = Record.new(zone, record.name, data)
 | 
						|
        tm_sync.reset_mock()
 | 
						|
        seen2 = provider._sync_traffic_managers(new_record)
 | 
						|
        self.assertEqual(seen2, expected_seen)
 | 
						|
        tm_sync.assert_called_once()
 | 
						|
 | 
						|
        # test that new profile was successfully inserted in cache
 | 
						|
        new_profile = provider._get_tm_profile_by_name(
 | 
						|
            'pool-one--{}'.format(suffix)
 | 
						|
        )
 | 
						|
        self.assertEqual(new_profile.endpoints[0].weight, 14)
 | 
						|
 | 
						|
    def test_find_traffic_managers(self):
 | 
						|
        provider, zone, record = self._get_dynamic_package()
 | 
						|
 | 
						|
        # insert a non-matching profile
 | 
						|
        sample_profile = self._get_tm_profiles(provider)[0]
 | 
						|
        # dummy record for generating suffix
 | 
						|
        record2 = Record.new(zone, record.name + '2', data={
 | 
						|
            'type': record._type,
 | 
						|
            'ttl': record.ttl,
 | 
						|
            'value': record.value,
 | 
						|
        })
 | 
						|
        suffix2 = _traffic_manager_suffix(record2)
 | 
						|
        tm_id = provider._profile_name_to_id
 | 
						|
        extra_profile = Profile(
 | 
						|
            id=tm_id('random--{}'.format(suffix2)),
 | 
						|
            name='random--{}'.format(suffix2),
 | 
						|
            traffic_routing_method='Weighted',
 | 
						|
            dns_config=sample_profile.dns_config,
 | 
						|
            monitor_config=sample_profile.monitor_config,
 | 
						|
            endpoints=sample_profile.endpoints,
 | 
						|
        )
 | 
						|
        tm_list = provider._tm_client.profiles.list_by_resource_group
 | 
						|
        tm_list.return_value.append(extra_profile)
 | 
						|
        provider._populate_traffic_managers()
 | 
						|
 | 
						|
        # implicitly asserts that non-matching profile is not included
 | 
						|
        suffix = _traffic_manager_suffix(record)
 | 
						|
        self.assertEqual(provider._find_traffic_managers(record), {
 | 
						|
            suffix, 'default--{}'.format(suffix),
 | 
						|
            'rule-one--{}'.format(suffix), 'rule-three--{}'.format(suffix),
 | 
						|
            'pool-one--{}'.format(suffix), 'pool-two--{}'.format(suffix),
 | 
						|
            'pool-three--{}'.format(suffix),
 | 
						|
        })
 | 
						|
 | 
						|
    def test_traffic_manager_gc(self):
 | 
						|
        provider, zone, record = self._get_dynamic_package()
 | 
						|
        provider._populate_traffic_managers()
 | 
						|
 | 
						|
        profiles = provider._find_traffic_managers(record)
 | 
						|
        profile_delete_mock = provider._tm_client.profiles.delete
 | 
						|
 | 
						|
        provider._traffic_managers_gc(record, profiles)
 | 
						|
        profile_delete_mock.assert_not_called()
 | 
						|
 | 
						|
        profile_delete_mock.reset_mock()
 | 
						|
        remove = list(profiles)[3]
 | 
						|
        profiles.discard(remove)
 | 
						|
 | 
						|
        provider._traffic_managers_gc(record, profiles)
 | 
						|
        profile_delete_mock.assert_has_calls(
 | 
						|
            [call(provider._resource_group, remove)]
 | 
						|
        )
 | 
						|
 | 
						|
    def test_apply(self):
 | 
						|
        provider = self._get_provider()
 | 
						|
 | 
						|
        half = int(len(octo_records) / 2)
 | 
						|
        changes = [Create(r) for r in octo_records[:half]] + \
 | 
						|
            [Update(r, r) for r in octo_records[half:]]
 | 
						|
        deletes = [Delete(r) for r in octo_records]
 | 
						|
 | 
						|
        self.assertEquals(19, provider.apply(Plan(None, zone,
 | 
						|
                                                  changes, True)))
 | 
						|
        self.assertEquals(19, provider.apply(Plan(zone, zone,
 | 
						|
                                                  deletes, True)))
 | 
						|
 | 
						|
    def test_apply_create_dynamic(self):
 | 
						|
        provider = self._get_provider()
 | 
						|
 | 
						|
        tm_list = provider._tm_client.profiles.list_by_resource_group
 | 
						|
        tm_list.return_value = []
 | 
						|
 | 
						|
        tm_sync = provider._tm_client.profiles.create_or_update
 | 
						|
 | 
						|
        zone = Zone(name='unit.tests.', sub_zones=[])
 | 
						|
        record = self._get_dynamic_record(zone)
 | 
						|
 | 
						|
        profiles = self._get_tm_profiles(provider)
 | 
						|
 | 
						|
        provider._apply_Create(Create(record))
 | 
						|
        # create was called as many times as number of profiles required for
 | 
						|
        # the dynamic record
 | 
						|
        self.assertEqual(tm_sync.call_count, len(profiles))
 | 
						|
 | 
						|
        create = provider._dns_client.record_sets.create_or_update
 | 
						|
        create.assert_called_once()
 | 
						|
 | 
						|
    def test_apply_update_dynamic(self):
 | 
						|
        # existing is simple, new is dynamic
 | 
						|
        provider = self._get_provider()
 | 
						|
        tm_list = provider._tm_client.profiles.list_by_resource_group
 | 
						|
        tm_list.return_value = []
 | 
						|
        profiles = self._get_tm_profiles(provider)
 | 
						|
        dynamic_record = self._get_dynamic_record(zone)
 | 
						|
        simple_record = Record.new(zone, dynamic_record.name, data={
 | 
						|
            'type': 'CNAME',
 | 
						|
            'ttl': 3600,
 | 
						|
            'value': 'cname.unit.tests.',
 | 
						|
        })
 | 
						|
        change = Update(simple_record, dynamic_record)
 | 
						|
        provider._apply_Update(change)
 | 
						|
        tm_sync, dns_update, tm_delete = (
 | 
						|
            provider._tm_client.profiles.create_or_update,
 | 
						|
            provider._dns_client.record_sets.create_or_update,
 | 
						|
            provider._tm_client.profiles.delete
 | 
						|
        )
 | 
						|
        self.assertEqual(tm_sync.call_count, len(profiles))
 | 
						|
        dns_update.assert_called_once()
 | 
						|
        tm_delete.assert_not_called()
 | 
						|
 | 
						|
        # existing is dynamic, new is simple
 | 
						|
        provider, existing, dynamic_record = self._get_dynamic_package()
 | 
						|
        profiles = self._get_tm_profiles(provider)
 | 
						|
        change = Update(dynamic_record, simple_record)
 | 
						|
        provider._apply_Update(change)
 | 
						|
        tm_sync, dns_update, tm_delete = (
 | 
						|
            provider._tm_client.profiles.create_or_update,
 | 
						|
            provider._dns_client.record_sets.create_or_update,
 | 
						|
            provider._tm_client.profiles.delete
 | 
						|
        )
 | 
						|
        tm_sync.assert_not_called()
 | 
						|
        dns_update.assert_called_once()
 | 
						|
        self.assertEqual(tm_delete.call_count, len(profiles))
 | 
						|
 | 
						|
        # both are dynamic, healthcheck port is changed
 | 
						|
        provider, existing, dynamic_record = self._get_dynamic_package()
 | 
						|
        profiles = self._get_tm_profiles(provider)
 | 
						|
        dynamic_record2 = self._get_dynamic_record(existing)
 | 
						|
        dynamic_record2._octodns['healthcheck']['port'] += 1
 | 
						|
        change = Update(dynamic_record, dynamic_record2)
 | 
						|
        provider._apply_Update(change)
 | 
						|
        tm_sync, dns_update, tm_delete = (
 | 
						|
            provider._tm_client.profiles.create_or_update,
 | 
						|
            provider._dns_client.record_sets.create_or_update,
 | 
						|
            provider._tm_client.profiles.delete
 | 
						|
        )
 | 
						|
        self.assertEqual(tm_sync.call_count, len(profiles))
 | 
						|
        dns_update.assert_not_called()
 | 
						|
        tm_delete.assert_not_called()
 | 
						|
 | 
						|
        # both are dynamic, extra profile should be deleted
 | 
						|
        provider, existing, dynamic_record = self._get_dynamic_package()
 | 
						|
        sample_profile = self._get_tm_profiles(provider)[0]
 | 
						|
        tm_id = provider._profile_name_to_id
 | 
						|
        root_profile_name = _traffic_manager_suffix(dynamic_record)
 | 
						|
        extra_profile = Profile(
 | 
						|
            id=tm_id('random--{}'.format(root_profile_name)),
 | 
						|
            name='random--{}'.format(root_profile_name),
 | 
						|
            traffic_routing_method='Weighted',
 | 
						|
            dns_config=sample_profile.dns_config,
 | 
						|
            monitor_config=sample_profile.monitor_config,
 | 
						|
            endpoints=sample_profile.endpoints,
 | 
						|
        )
 | 
						|
        tm_list = provider._tm_client.profiles.list_by_resource_group
 | 
						|
        tm_list.return_value.append(extra_profile)
 | 
						|
        change = Update(dynamic_record, dynamic_record)
 | 
						|
        provider._apply_Update(change)
 | 
						|
        tm_sync, dns_update, tm_delete = (
 | 
						|
            provider._tm_client.profiles.create_or_update,
 | 
						|
            provider._dns_client.record_sets.create_or_update,
 | 
						|
            provider._tm_client.profiles.delete
 | 
						|
        )
 | 
						|
        tm_sync.assert_not_called()
 | 
						|
        dns_update.assert_not_called()
 | 
						|
        tm_delete.assert_called_once()
 | 
						|
 | 
						|
    def test_apply_delete_dynamic(self):
 | 
						|
        provider, existing, record = self._get_dynamic_package()
 | 
						|
        provider._populate_traffic_managers()
 | 
						|
        profiles = self._get_tm_profiles(provider)
 | 
						|
        change = Delete(record)
 | 
						|
        provider._apply_Delete(change)
 | 
						|
        dns_delete, tm_delete = (
 | 
						|
            provider._dns_client.record_sets.delete,
 | 
						|
            provider._tm_client.profiles.delete
 | 
						|
        )
 | 
						|
        dns_delete.assert_called_once()
 | 
						|
        self.assertEqual(tm_delete.call_count, len(profiles))
 | 
						|
 | 
						|
    def test_create_zone(self):
 | 
						|
        provider = self._get_provider()
 | 
						|
 | 
						|
        changes = []
 | 
						|
        for i in octo_records:
 | 
						|
            changes.append(Create(i))
 | 
						|
        desired = Zone('unit2.test.', [])
 | 
						|
 | 
						|
        err_msg = 'The Resource \'Microsoft.Network/dnszones/unit2.test\' '
 | 
						|
        err_msg += 'under resource group \'mock_rg\' was not found.'
 | 
						|
        _get = provider._dns_client.zones.get
 | 
						|
        _get.side_effect = CloudError(Mock(status=404), err_msg)
 | 
						|
 | 
						|
        self.assertEquals(19, provider.apply(Plan(None, desired, changes,
 | 
						|
                                                  True)))
 | 
						|
 | 
						|
    def test_check_zone_no_create(self):
 | 
						|
        provider = self._get_provider()
 | 
						|
 | 
						|
        rs = []
 | 
						|
        recordSet = RecordSet(a_records=[ARecord(ipv4_address='1.1.1.1')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'a1', 0, 'A'
 | 
						|
        rs.append(recordSet)
 | 
						|
        recordSet = RecordSet(a_records=[ARecord(ipv4_address='1.1.1.1'),
 | 
						|
                                         ARecord(ipv4_address='2.2.2.2')])
 | 
						|
        recordSet.name, recordSet.ttl, recordSet.type = 'a2', 1, 'A'
 | 
						|
        rs.append(recordSet)
 | 
						|
 | 
						|
        record_list = provider._dns_client.record_sets.list_by_dns_zone
 | 
						|
        record_list.return_value = rs
 | 
						|
 | 
						|
        err_msg = 'The Resource \'Microsoft.Network/dnszones/unit3.test\' '
 | 
						|
        err_msg += 'under resource group \'mock_rg\' was not found.'
 | 
						|
        _get = provider._dns_client.zones.get
 | 
						|
        _get.side_effect = CloudError(Mock(status=404), err_msg)
 | 
						|
 | 
						|
        exists = provider.populate(Zone('unit3.test.', []))
 | 
						|
        self.assertFalse(exists)
 | 
						|
 | 
						|
        self.assertEquals(len(zone.records), 0)
 |