mirror of
				https://github.com/github/octodns.git
				synced 2024-05-11 05:55:00 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			935 lines
		
	
	
		
			31 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			935 lines
		
	
	
		
			31 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#
 | 
						|
#
 | 
						|
#
 | 
						|
 | 
						|
from __future__ import (
 | 
						|
    absolute_import,
 | 
						|
    division,
 | 
						|
    print_function,
 | 
						|
    unicode_literals,
 | 
						|
)
 | 
						|
 | 
						|
from logging import getLogger
 | 
						|
from unittest import TestCase
 | 
						|
from unittest.mock import MagicMock, call
 | 
						|
 | 
						|
from octodns.processor.base import BaseProcessor
 | 
						|
from octodns.provider import SupportsException
 | 
						|
from octodns.provider.base import BaseProvider
 | 
						|
from octodns.provider.plan import Plan, UnsafePlan
 | 
						|
from octodns.record import Create, Delete, Record, Update
 | 
						|
from octodns.zone import Zone
 | 
						|
 | 
						|
 | 
						|
class HelperProvider(BaseProvider):
    """Minimal concrete provider used as a test double in this module.

    BaseProvider.__init__ is not invoked here; the attributes the tests rely
    on are assigned directly in __init__ instead.
    """

    log = getLogger('HelperProvider')

    SUPPORTS = {'A', 'NS', 'PTR'}
    SUPPORTS_MULTIVALUE_PTR = False
    SUPPORTS_DYNAMIC = False

    id = 'test'
    strict_supports = False

    def __init__(
        self,
        extra_changes=None,
        apply_disabled=False,
        include_change_callback=None,
    ):
        """Set up the helper.

        Args:
            extra_changes: value handed back verbatim by ``_extra_changes``.
                Defaults to a fresh empty list per instance (a ``[]`` default
                in the signature would be shared by every instance).
            apply_disabled: stored as ``self.apply_disabled``.
            include_change_callback: optional callable taking a change; when
                set, ``_include_change`` returns whatever it returns.
        """
        # Only substitute when the caller passed nothing; any other value
        # (including falsy ones) is stored untouched, as before.
        self.__extra_changes = [] if extra_changes is None else extra_changes
        self.apply_disabled = apply_disabled
        self.include_change_callback = include_change_callback
        # Use the Plan defaults for the safety thresholds
        self.update_pcent_threshold = Plan.MAX_SAFE_UPDATE_PCENT
        self.delete_pcent_threshold = Plan.MAX_SAFE_DELETE_PCENT

    def populate(self, zone, target=False, lenient=False):
        """Add nothing to ``zone`` and return True."""
        return True

    def _include_change(self, change):
        """Return True when no callback is configured, else its result."""
        callback = self.include_change_callback
        if not callback:
            return True
        return callback(change)

    def _extra_changes(self, **kwargs):
        """Return the ``extra_changes`` value given at construction time."""
        return self.__extra_changes

    def _apply(self, plan):
        """Do nothing; the helper never applies anything."""
        pass
 | 
						|
 | 
						|
 | 
						|
class TrickyProcessor(BaseProcessor):
    """Processor test double that remembers its arguments and injects records.

    ``process_target_zone`` stores the zones it was called with and returns a
    copy of ``existing`` with the configured extra records added.
    """

    def __init__(self, name, add_during_process_target_zone):
        super().__init__(name)
        # Records to add to the zone returned from process_target_zone
        self.add_during_process_target_zone = add_during_process_target_zone
        self.reset()

    def reset(self):
        """Forget the zones seen by the last process_target_zone call."""
        self.existing = self.target = None

    def process_target_zone(self, existing, target):
        """Record the arguments and return an augmented copy of ``existing``."""
        self.existing, self.target = existing, target

        augmented = existing.copy()
        # First re-add what was already there, then the injected records;
        # replace=True in both passes.
        for batch in (existing.records, self.add_during_process_target_zone):
            for rec in batch:
                augmented.add_record(rec, replace=True)
        return augmented
 | 
						|
 | 
						|
 | 
						|
class TestBaseProvider(TestCase):
 | 
						|
    def test_base_provider(self):
        # Walks up a ladder of increasingly complete BaseProvider subclasses,
        # checking the NotImplementedError message reported at each rung.

        def assert_not_implemented(expected_message, trigger):
            with self.assertRaises(NotImplementedError) as raised:
                trigger()
            self.assertEqual(expected_message, str(raised.exception))

        assert_not_implemented(
            'Abstract base class, log property missing',
            lambda: BaseProvider('base'),
        )

        class HasLog(BaseProvider):
            log = getLogger('HasLog')

        assert_not_implemented(
            'Abstract base class, SUPPORTS_GEO property missing',
            lambda: HasLog('haslog'),
        )

        class HasSupportsGeo(HasLog):
            SUPPORTS_GEO = False

        zone = Zone('unit.tests.', ['sub'])
        assert_not_implemented(
            'Abstract base class, SUPPORTS property missing',
            lambda: HasSupportsGeo('hassupportsgeo').populate(zone),
        )

        class HasSupports(HasSupportsGeo):
            SUPPORTS = {'A'}

        assert_not_implemented(
            'Abstract base class, populate method missing',
            lambda: HasSupports('hassupports').populate(zone),
        )

        # SUPPORTS_DYNAMIC has a default/fallback
        self.assertFalse(HasSupports('hassupports').SUPPORTS_DYNAMIC)

        # But can be overridden
        class HasSupportsDynamic(HasSupports):
            SUPPORTS_DYNAMIC = True

        self.assertTrue(
            HasSupportsDynamic('hassupportsdynamic').SUPPORTS_DYNAMIC
        )

        class HasPopulate(HasSupports):
            def populate(self, zone, target=False, lenient=False):
                # (name, ttl, value) of the A records this provider "has"
                existing = (
                    ('', 60, '2.3.4.5'),
                    ('going', 60, '3.4.5.6'),
                    ('foo.sub', 61, '4.5.6.7'),
                )
                for name, ttl, value in existing:
                    zone.add_record(
                        Record.new(
                            zone,
                            name,
                            {'ttl': ttl, 'type': 'A', 'value': value},
                        ),
                        lenient=lenient,
                    )

        root_a = Record.new(
            zone, '', {'ttl': 60, 'type': 'A', 'value': '1.2.3.4'}
        )
        zone.add_record(root_a)

        first_record = next(iter(zone.records))
        self.assertTrue(HasSupports('hassupportsgeo').supports(first_record))

        plan = HasPopulate('haspopulate').plan(zone)
        self.assertEqual(3, len(plan.changes))

        assert_not_implemented(
            'Abstract base class, _apply method missing',
            lambda: HasPopulate('haspopulate').apply(plan),
        )
 | 
						|
 | 
						|
    def test_plan(self):
        # plan() yields None without changes and a one-change plan when the
        # provider reports a single extra Create.
        zone = Zone('unit.tests.', [])

        # No change, thus no plan
        self.assertEqual(None, HelperProvider([]).plan(zone))

        create = Create(
            Record.new(zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'})
        )
        plan = HelperProvider([create]).plan(zone)
        self.assertTrue(plan)
        self.assertEqual(1, len(plan.changes))
 | 
						|
 | 
						|
    def test_plan_with_process_desired_zone_kwarg_fallback(self):
        # Covers providers whose _process_desired_zone only takes `desired`,
        # and checks that a TypeError raised inside the method is propagated.
        # NOTE(review): the fallback logic itself lives in BaseProvider.plan,
        # which is not visible from this file.
        zone = Zone('unit.tests.', [])

        class OldApiProvider(HelperProvider):
            def _process_desired_zone(self, desired):
                return desired

        # No change, thus no plan
        self.assertEqual(None, OldApiProvider([]).plan(zone))

        class OtherTypeErrorProvider(HelperProvider):
            def _process_desired_zone(self, desired, exists=False):
                raise TypeError('foo')

        failing = OtherTypeErrorProvider([])
        with self.assertRaises(TypeError) as raised:
            failing.plan(zone)
        self.assertEqual('foo', str(raised.exception))
 | 
						|
 | 
						|
    def test_plan_with_unsupported_type(self):
        # HelperProvider.SUPPORTS has 'A' but not 'AAAA', so only the A record
        # is expected to appear in the plan.
        zone = Zone('unit.tests.', [])

        # supported
        a_data = {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'}
        supported = Record.new(zone, 'a', a_data)
        zone.add_record(supported)

        # not supported
        aaaa_data = {
            'ttl': 30,
            'type': 'AAAA',
            'value': '2601:644:500:e210:62f8:1dff:feb8:947a',
        }
        not_supported = Record.new(zone, 'aaaa', aaaa_data)
        zone.add_record(not_supported)

        plan = HelperProvider().plan(zone)
        self.assertTrue(plan)
        self.assertEqual(1, len(plan.changes))
        only_change = plan.changes[0]
        self.assertEqual(supported, only_change.new)
 | 
						|
 | 
						|
    def test_plan_with_processors(self):
        # Each TrickyProcessor injects a record during process_target_zone;
        # the desired zone is empty, so every injected record becomes a Delete.
        zone = Zone('unit.tests.', [])
        provider = HelperProvider()

        first_record = Record.new(
            zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'}
        )
        # Processor that adds a record to the zone, which planning will then
        # delete since it won't know anything about it
        tricky = TrickyProcessor('tricky', [first_record])
        plan = provider.plan(zone, processors=[tricky])
        self.assertTrue(plan)
        self.assertEqual(1, len(plan.changes))
        self.assertIsInstance(plan.changes[0], Delete)
        # Called processor stored its params
        self.assertTrue(tricky.existing)
        self.assertEqual(zone.name, tricky.existing.name)

        # Chain of processors happen one after the other
        second_record = Record.new(
            zone, 'b', {'ttl': 30, 'type': 'A', 'value': '5.6.7.8'}
        )
        # Another processor will add its record, thus 2 deletes
        another = TrickyProcessor('tricky', [second_record])
        plan = provider.plan(zone, processors=[tricky, another])
        self.assertTrue(plan)
        self.assertEqual(2, len(plan.changes))
        for change in plan.changes:
            self.assertIsInstance(change, Delete)
        # 2nd processor stored its params, and we'll see the record the
        # first one added
        seen_by_second = another.existing
        self.assertTrue(seen_by_second)
        self.assertEqual(zone.name, seen_by_second.name)
        self.assertEqual(1, len(seen_by_second.records))
 | 
						|
 | 
						|
    def test_plan_with_root_ns(self):
        # A zone holding only a root NS record plans to nothing until the
        # provider advertises SUPPORTS_ROOT_NS.
        zone = Zone('unit.tests.', [])
        root_ns = Record.new(
            zone, '', {'ttl': 30, 'type': 'NS', 'value': '1.2.3.4.'}
        )
        zone.add_record(root_ns)

        provider = HelperProvider()

        # No root NS support, no change, thus no plan
        without_support = provider.plan(zone)
        self.assertEqual(None, without_support)

        # set Support root NS records, see the record
        provider.SUPPORTS_ROOT_NS = True
        with_support = provider.plan(zone)
        self.assertTrue(with_support)
        self.assertEqual(1, len(with_support.changes))
 | 
						|
 | 
						|
    def test_apply(self):
        # apply() is first exercised with apply_disabled set (its return value
        # is not checked), then enabled, where it must report 1.
        zone = Zone('unit.tests.', [])
        create = Create(
            Record.new(zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'})
        )

        provider = HelperProvider([create], apply_disabled=True)
        plan = provider.plan(zone)
        provider.apply(plan)

        provider.apply_disabled = False
        applied = provider.apply(plan)
        self.assertEqual(1, applied)
 | 
						|
 | 
						|
    def test_include_change(self):
        # A callback that rejects every change leaves nothing to plan.
        zone = Zone('unit.tests.', [])
        zone.add_record(
            Record.new(zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'})
        )

        reject_all = HelperProvider(
            [], include_change_callback=lambda _change: False
        )
        # We filtered out the only change
        self.assertFalse(reject_all.plan(zone))
 | 
						|
 | 
						|
    def test_plan_order_of_operations(self):
        # Instruments populate and both zone-processing hooks so the order in
        # which plan() invokes them can be asserted.
        class MockProvider(BaseProvider):
            log = getLogger('mock-provider')
            SUPPORTS = {'A'}
            SUPPORTS_GEO = False

            def __init__(self):
                super().__init__('mock-provider')
                # Names of the hooks, in the order they were called
                self.calls = []

            def populate(self, *args, **kwargs):
                self.calls.append('populate')

            def _process_desired_zone(self, *args, **kwargs):
                self.calls.append('_process_desired_zone')
                return super()._process_desired_zone(*args, **kwargs)

            def _process_existing_zone(self, *args, **kwargs):
                self.calls.append('_process_existing_zone')
                return super()._process_existing_zone(*args, **kwargs)

        provider = MockProvider()
        empty_zone = Zone('unit.tests.', [])
        self.assertFalse(provider.plan(empty_zone))

        # ensure the calls were made in the expected order, populate comes
        # first, then desired, then existing
        expected_order = [
            'populate',
            '_process_desired_zone',
            '_process_existing_zone',
        ]
        self.assertEqual(expected_order, provider.calls)
 | 
						|
 | 
						|
    def test_process_desired_zone(self):
        # Toggles each SUPPORTS_* flag on one provider instance and checks how
        # _process_desired_zone transforms a copy of the desired zone.
        provider = HelperProvider('test')

        def process(zone):
            # Always hand over a copy; the source zone is reused below
            return provider._process_desired_zone(zone.copy())

        def first_record(zone):
            return list(zone.records)[0]

        # SUPPORTS_MULTIVALUE_PTR
        provider.SUPPORTS_MULTIVALUE_PTR = False
        desired = Zone('unit.tests.', [])
        ptr = Record.new(
            desired,
            'ptr',
            {'type': 'PTR', 'ttl': 3600, 'values': ['foo.com.', 'bar.com.']},
        )
        desired.add_record(ptr)

        self.assertEqual(len(first_record(process(desired)).values), 1)

        provider.SUPPORTS_MULTIVALUE_PTR = True
        self.assertEqual(len(first_record(process(desired)).values), 2)

        # SUPPORTS_DYNAMIC
        provider.SUPPORTS_DYNAMIC = False
        desired = Zone('unit.tests.', [])
        dynamic_data = {
            'dynamic': {
                'pools': {'one': {'values': [{'value': '1.1.1.1'}]}},
                'rules': [{'pool': 'one'}],
            },
            'type': 'A',
            'ttl': 3600,
            'values': ['2.2.2.2'],
        }
        dynamic_a = Record.new(desired, 'a', dynamic_data)
        self.assertTrue(dynamic_a.dynamic)
        desired.add_record(dynamic_a)

        self.assertFalse(first_record(process(desired)).dynamic)

        provider.SUPPORTS_DYNAMIC = True
        self.assertTrue(first_record(process(desired)).dynamic)

        # SUPPORTS_POOL_VALUE_STATUS
        provider.SUPPORTS_POOL_VALUE_STATUS = False
        desired = Zone('unit.tests.', [])
        # Reuses the dynamic A record from the previous section, now with an
        # explicit status on its single pool value
        dynamic_a.dynamic.pools['one'].data['values'][0]['status'] = 'up'
        desired.add_record(dynamic_a)

        processed_record = first_record(process(desired))
        self.assertEqual(
            processed_record.dynamic.pools['one'].data['values'][0]['status'],
            'obey',
        )

        # SUPPORTS_ROOT_NS
        provider.SUPPORTS_ROOT_NS = False
        desired = Zone('unit.tests.', [])
        root_ns = Record.new(
            desired,
            '',
            {'type': 'NS', 'ttl': 3600, 'values': ['foo.com.', 'bar.com.']},
        )
        desired.add_record(root_ns)

        processed = process(desired)
        self.assertEqual(0, len(processed.records))

        provider.SUPPORTS_ROOT_NS = True
        processed = process(desired)
        self.assertEqual(1, len(processed.records))
        self.assertEqual(root_ns, first_record(processed))
 | 
						|
 | 
						|
    def test_process_existing_zone(self):
        # With SUPPORTS_ROOT_NS off the root NS record is dropped from the
        # processed existing zone; with it on the record is kept.
        provider = HelperProvider('test')

        zone = Zone('unit.tests.', [])
        root_ns = Record.new(
            zone,
            '',
            {'type': 'NS', 'ttl': 3600, 'values': ['foo.com.', 'bar.com.']},
        )
        zone.add_record(root_ns)

        # SUPPORTS_ROOT_NS
        provider.SUPPORTS_ROOT_NS = False
        processed = provider._process_existing_zone(zone.copy(), zone)
        self.assertEqual(0, len(processed.records))

        provider.SUPPORTS_ROOT_NS = True
        processed = provider._process_existing_zone(zone.copy(), zone)
        self.assertEqual(1, len(processed.records))
        self.assertEqual(root_ns, list(processed.records)[0])
 | 
						|
 | 
						|
    def test_safe_none(self):
        # No changes is safe
        empty_plan = Plan(None, None, [], True)
        empty_plan.raise_if_unsafe()
 | 
						|
 | 
						|
    def test_safe_creates(self):
        # Creates are safe when existing records is under MIN_EXISTING_RECORDS
        zone = Zone('unit.tests.', [])
        record = Record.new(
            zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'}
        )

        creates = [Create(record) for _ in range(10)]
        plan = Plan(zone, zone, creates, True)
        # Must not raise
        plan.raise_if_unsafe()
 | 
						|
 | 
						|
    def test_safe_min_existing_creates(self):
        # Creates are safe when existing records is over MIN_EXISTING_RECORDS
        zone = Zone('unit.tests.', [])
        record = Record.new(
            zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'}
        )

        # Fill the zone with MIN_EXISTING_RECORDS records named 0, 1, 2, ...
        existing_count = int(Plan.MIN_EXISTING_RECORDS)
        for index in range(existing_count):
            filler = Record.new(
                zone, str(index), {'ttl': 60, 'type': 'A', 'value': '2.3.4.5'}
            )
            zone.add_record(filler)

        creates = [Create(record) for _ in range(10)]
        # Must not raise
        Plan(zone, zone, creates, True).raise_if_unsafe()
 | 
						|
 | 
						|
    def test_safe_no_existing(self):
        # existing records fewer than MIN_EXISTING_RECORDS is safe
        zone = Zone('unit.tests.', [])
        record = Record.new(
            zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'}
        )

        # Two updates against an otherwise empty zone
        updates = [Update(record, record) for _ in range(2)]
        plan = Plan(zone, zone, updates, True)
        plan.raise_if_unsafe()
 | 
						|
 | 
						|
    def test_safe_updates_min_existing(self):
        # MAX_SAFE_UPDATE_PCENT+1 fails when more
        # than MIN_EXISTING_RECORDS exist
        zone = Zone('unit.tests.', [])
        record = Record.new(
            zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'}
        )

        # Populate the zone with MIN_EXISTING_RECORDS records
        for i in range(int(Plan.MIN_EXISTING_RECORDS)):
            zone.add_record(
                Record.new(
                    zone, str(i), {'ttl': 60, 'type': 'A', 'value': '2.3.4.5'}
                )
            )

        # One update more than the default threshold allows
        too_many = (
            int(Plan.MIN_EXISTING_RECORDS * Plan.MAX_SAFE_UPDATE_PCENT) + 1
        )
        changes = [Update(record, record) for _ in range(too_many)]

        with self.assertRaises(UnsafePlan) as ctx:
            Plan(zone, zone, changes, True).raise_if_unsafe()

        # assertIn reports the actual message when the check fails
        self.assertIn('Too many updates', str(ctx.exception))
 | 
						|
 | 
						|
    def test_safe_updates_min_existing_pcent(self):
        # MAX_SAFE_UPDATE_PCENT is safe when more
        # than MIN_EXISTING_RECORDS exist
        zone = Zone('unit.tests.', [])
        record = Record.new(
            zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'}
        )

        existing_count = int(Plan.MIN_EXISTING_RECORDS)
        for index in range(existing_count):
            filler = Record.new(
                zone, str(index), {'ttl': 60, 'type': 'A', 'value': '2.3.4.5'}
            )
            zone.add_record(filler)

        # Exactly at the default threshold, not over it
        update_count = int(
            Plan.MIN_EXISTING_RECORDS * Plan.MAX_SAFE_UPDATE_PCENT
        )
        updates = [Update(record, record) for _ in range(update_count)]

        # Must not raise
        Plan(zone, zone, updates, True).raise_if_unsafe()
 | 
						|
 | 
						|
    def test_safe_deletes_min_existing(self):
        # MAX_SAFE_DELETE_PCENT+1 fails when more
        # than MIN_EXISTING_RECORDS exist
        zone = Zone('unit.tests.', [])
        record = Record.new(
            zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'}
        )

        # Populate the zone with MIN_EXISTING_RECORDS records
        for i in range(int(Plan.MIN_EXISTING_RECORDS)):
            zone.add_record(
                Record.new(
                    zone, str(i), {'ttl': 60, 'type': 'A', 'value': '2.3.4.5'}
                )
            )

        # One delete more than the default threshold allows
        too_many = (
            int(Plan.MIN_EXISTING_RECORDS * Plan.MAX_SAFE_DELETE_PCENT) + 1
        )
        changes = [Delete(record) for _ in range(too_many)]

        with self.assertRaises(UnsafePlan) as ctx:
            Plan(zone, zone, changes, True).raise_if_unsafe()

        # assertIn reports the actual message when the check fails
        self.assertIn('Too many deletes', str(ctx.exception))
 | 
						|
 | 
						|
    def test_safe_deletes_min_existing_pcent(self):
        # MAX_SAFE_DELETE_PCENT is safe when more
        # than MIN_EXISTING_RECORDS exist
        zone = Zone('unit.tests.', [])
        record = Record.new(
            zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'}
        )

        existing_count = int(Plan.MIN_EXISTING_RECORDS)
        for index in range(existing_count):
            filler = Record.new(
                zone, str(index), {'ttl': 60, 'type': 'A', 'value': '2.3.4.5'}
            )
            zone.add_record(filler)

        # Exactly at the default threshold, not over it
        delete_count = int(
            Plan.MIN_EXISTING_RECORDS * Plan.MAX_SAFE_DELETE_PCENT
        )
        deletes = [Delete(record) for _ in range(delete_count)]

        # Must not raise
        Plan(zone, zone, deletes, True).raise_if_unsafe()
 | 
						|
 | 
						|
    def test_safe_updates_min_existing_override(self):
        safe_pcent = 0.4
        # 40% + 1 fails when more
        # than MIN_EXISTING_RECORDS exist
        zone = Zone('unit.tests.', [])
        record = Record.new(
            zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'}
        )

        # Populate the zone with MIN_EXISTING_RECORDS records
        for i in range(int(Plan.MIN_EXISTING_RECORDS)):
            zone.add_record(
                Record.new(
                    zone, str(i), {'ttl': 60, 'type': 'A', 'value': '2.3.4.5'}
                )
            )

        # One update more than the overridden threshold allows
        too_many = int(Plan.MIN_EXISTING_RECORDS * safe_pcent) + 1
        changes = [Update(record, record) for _ in range(too_many)]

        with self.assertRaises(UnsafePlan) as ctx:
            Plan(
                zone, zone, changes, True, update_pcent_threshold=safe_pcent
            ).raise_if_unsafe()

        # assertIn reports the actual message when the check fails
        self.assertIn('Too many updates', str(ctx.exception))
 | 
						|
 | 
						|
    def test_safe_deletes_min_existing_override(self):
        safe_pcent = 0.4
        # 40% + 1 fails when more
        # than MIN_EXISTING_RECORDS exist
        zone = Zone('unit.tests.', [])
        record = Record.new(
            zone, 'a', {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'}
        )

        # Populate the zone with MIN_EXISTING_RECORDS records
        for i in range(int(Plan.MIN_EXISTING_RECORDS)):
            zone.add_record(
                Record.new(
                    zone, str(i), {'ttl': 60, 'type': 'A', 'value': '2.3.4.5'}
                )
            )

        # One delete more than the overridden threshold allows
        too_many = int(Plan.MIN_EXISTING_RECORDS * safe_pcent) + 1
        changes = [Delete(record) for _ in range(too_many)]

        with self.assertRaises(UnsafePlan) as ctx:
            Plan(
                zone, zone, changes, True, delete_pcent_threshold=safe_pcent
            ).raise_if_unsafe()

        # assertIn reports the actual message when the check fails
        self.assertIn('Too many deletes', str(ctx.exception))
 | 
						|
 | 
						|
    def test_supports_warn_or_except(self):
        # supports_warn_or_except warns through the logger when
        # strict_supports is off and raises SupportsException when it is on.
        class MinimalProvider(BaseProvider):
            SUPPORTS = set()
            SUPPORTS_GEO = False

            def __init__(self, **kwargs):
                # Mock logger so the warning calls can be inspected
                self.log = MagicMock()
                super().__init__('minimal', **kwargs)

        lenient = MinimalProvider(strict_supports=False)
        # Should log a warning and not raise
        lenient.supports_warn_or_except('Hello World!', 'Goodbye')
        lenient.log.warning.assert_called_once_with(
            '%s; %s', 'Hello World!', 'Goodbye'
        )

        strict = MinimalProvider(strict_supports=True)
        # Should raise and not log a warning
        with self.assertRaises(SupportsException) as raised:
            strict.supports_warn_or_except('Hello World!', 'Will not see')
        self.assertEqual('minimal: Hello World!', str(raised.exception))
        strict.log.warning.assert_not_called()
 | 
						|
 | 
						|
 | 
						|
class TestBaseProviderSupportsRootNs(TestCase):
 | 
						|
    class Provider(BaseProvider):
        # Minimal provider for the root NS tests: it supports A and NS records
        # and reports the records of the zone it was constructed with (if any)
        # as its existing state.
        log = getLogger('Provider')

        SUPPORTS = {'A', 'NS'}
        SUPPORTS_GEO = False
        SUPPORTS_ROOT_NS = False

        strict_supports = False

        def __init__(self, existing=None):
            super().__init__('test')
            # zone whose records populate() will report, None/empty for none
            self.existing = existing

        def populate(self, zone, target=False, lenient=False):
            # Nothing canned: add no records and report False.
            if not self.existing:
                return False
            # Otherwise copy every canned record into the zone being
            # populated and report True.
            for canned in self.existing.records:
                zone.add_record(canned)
            return True
 | 
						|
 | 
						|
    # Fixtures shared by every test in this class, built once when the class
    # body is evaluated.
    zone = Zone('unit.tests.', [])

    # plain (non-root) records that every populated zone below contains
    a_record = Record.new(
        zone,
        'ptr',
        {'type': 'A', 'ttl': 3600, 'values': ['1.2.3.4', '2.3.4.5']},
    )
    ns_record = Record.new(
        zone,
        'sub',
        {'type': 'NS', 'ttl': 3600, 'values': ['ns2.foo.com.', 'ns2.bar.com.']},
    )

    # two root NS records with different values
    root_ns_record = Record.new(
        zone,
        '',
        {'type': 'NS', 'ttl': 3600, 'values': ['ns1.foo.com.', 'ns1.bar.com.']},
    )
    other_root_ns_record = Record.new(
        zone,
        '',
        {'type': 'NS', 'ttl': 3600, 'values': ['ns4.foo.com.', 'ns4.bar.com.']},
    )

    # zone with only the plain records, no root NS
    no_root = zone.copy()
    no_root.add_record(a_record)
    no_root.add_record(ns_record)

    # the plain records plus root_ns_record
    has_root = no_root.copy()
    has_root.add_record(root_ns_record)

    # the plain records plus other_root_ns_record
    different_root = no_root.copy()
    different_root.add_record(other_root_ns_record)
 | 
						|
 | 
						|
    # Tests with SUPPORTS_ROOT_NS = False
 | 
						|
 | 
						|
    def test_supports_root_ns_false_matches(self):
        # the provider's existing state has the same root NS as the desired
        desired = self.has_root
        target = self.Provider(desired)
        target.SUPPORTS_ROOT_NS = False

        # no root NS support on the target provider, so the matching root NS
        # in the desired doesn't matter and there are no changes
        self.assertFalse(target.plan(desired))

        # plan again with strict_supports enabled, we should get an exception
        # b/c the desired has something configured that can't be managed
        target.strict_supports = True
        expected = 'test: root NS record not supported for unit.tests.'
        with self.assertRaises(SupportsException) as raised:
            target.plan(desired)
        self.assertEqual(expected, str(raised.exception))
 | 
						|
 | 
						|
    def test_supports_root_ns_false_different(self):
        # the provider's existing root NS differs from the one in the desired
        desired = self.has_root
        target = self.Provider(self.different_root)
        target.SUPPORTS_ROOT_NS = False

        # the target can't manage root NS records, so they're dropped from
        # both sides and the mismatch produces no changes
        self.assertFalse(target.plan(desired))

        # plan again with strict_supports enabled, we should get an exception
        # b/c the desired has something configured that can't be managed
        # (whether or not the values match is irrelevant)
        target.strict_supports = True
        expected = 'test: root NS record not supported for unit.tests.'
        with self.assertRaises(SupportsException) as raised:
            target.plan(desired)
        self.assertEqual(expected, str(raised.exception))
 | 
						|
 | 
						|
    def test_supports_root_ns_false_missing(self):
        # the provider has an existing root NS, the desired has none
        desired = self.no_root
        target = self.Provider(self.has_root)
        target.SUPPORTS_ROOT_NS = False

        # the target can't manage root NS records, so the existing one is
        # ignored and there are no changes
        self.assertFalse(target.plan(desired))

        # plan again with strict_supports enabled, still no changes since the
        # desired isn't asking to manage a root NS
        target.strict_supports = True
        self.assertFalse(target.plan(desired))
 | 
						|
 | 
						|
    def test_supports_root_ns_false_create_zone(self):
        # the provider has no existing records (zone create) and the desired
        # includes a root NS
        desired = self.has_root
        target = self.Provider()
        target.SUPPORTS_ROOT_NS = False

        # no support for root NS, so only the other two records are created
        creation = target.plan(desired)
        self.assertTrue(creation)
        self.assertEqual(2, len(creation.changes))

        # plan again with strict_supports enabled, we'll get an exception b/c
        # the target provider can't manage something in the desired
        target.strict_supports = True
        expected = 'test: root NS record not supported for unit.tests.'
        with self.assertRaises(SupportsException) as raised:
            target.plan(desired)
        self.assertEqual(expected, str(raised.exception))
 | 
						|
 | 
						|
    def test_supports_root_ns_false_create_zone_missing(self):
        # the provider has no existing records (zone create) and the desired
        # has no root NS either
        desired = self.no_root
        target = self.Provider()
        target.SUPPORTS_ROOT_NS = False

        # nothing root related is involved, the two plain records are created
        creation = target.plan(desired)
        self.assertTrue(creation)
        self.assertEqual(2, len(creation.changes))

        # plan again with strict_supports enabled, same result since we're
        # not asking for a root NS, it's just the 2 other changes
        target.strict_supports = True
        creation = target.plan(desired)
        self.assertTrue(creation)
        self.assertEqual(2, len(creation.changes))
 | 
						|
 | 
						|
    # Tests with SUPPORTS_ROOT_NS = True
 | 
						|
 | 
						|
    def test_supports_root_ns_true_matches(self):
        # the provider's existing state has the same root NS as the desired
        desired = self.has_root
        target = self.Provider(desired)
        target.SUPPORTS_ROOT_NS = True

        # root NS is supported by the target provider, but existing and
        # desired match so there's nothing to change
        self.assertFalse(target.plan(desired))

        # again with strict_supports enabled, no difference
        target.strict_supports = True
        self.assertFalse(target.plan(desired))
 | 
						|
 | 
						|
    def test_supports_root_ns_true_different(self):
        # the provider's existing root NS differs from the one in the desired
        desired = self.has_root
        target = self.Provider(self.different_root)
        target.SUPPORTS_ROOT_NS = True

        # root NS mismatch on a target provider that supports it, the plan
        # carries the change from the existing values to the desired ones
        result = target.plan(desired)
        self.assertTrue(result)
        first = result.changes[0]
        self.assertEqual(self.other_root_ns_record, first.existing)
        self.assertEqual(self.root_ns_record, first.new)

        # again with strict_supports enabled, no difference, the same change
        # shows up
        target.strict_supports = True
        result = target.plan(desired)
        self.assertTrue(result)
        first = result.changes[0]
        self.assertEqual(self.other_root_ns_record, first.existing)
        self.assertEqual(self.root_ns_record, first.new)
 | 
						|
 | 
						|
    def test_supports_root_ns_true_missing(self):
        # the provider has an existing root NS, the desired has none
        desired = self.no_root
        target = self.Provider(self.has_root)
        target.SUPPORTS_ROOT_NS = True

        # the existing root NS in the target is left alone/as is since we
        # aren't configured with one to manage
        self.assertFalse(target.plan(desired))

        # again with strict_supports enabled, no difference as a
        # non-configured root NS is a special case that we always just warn
        # about. This is because we can't know the values before the zone is
        # created and some people may choose to leave them unmanaged
        # indefinitely, which has been the behavior up until now.
        target.strict_supports = True
        self.assertFalse(target.plan(desired))
 | 
						|
 | 
						|
    def test_supports_root_ns_true_create_zone(self):
        # the provider has no existing records (zone create) and the desired
        # includes a root NS
        desired = self.has_root
        target = self.Provider()
        target.SUPPORTS_ROOT_NS = True

        # there's no existing root record since we're creating the zone, so
        # the plan creates everything, including the root NS
        creation = target.plan(desired)
        self.assertTrue(creation)
        self.assertEqual(3, len(creation.changes))
        root_changes = [
            c
            for c in creation.changes
            if c.new.name == '' and c.new._type == 'NS'
        ]
        root_change = root_changes[0]
        self.assertFalse(root_change.existing)
        self.assertEqual(self.root_ns_record, root_change.new)

        # again with strict_supports enabled, no difference, we see all 3
        # changes
        target.strict_supports = True
        creation = target.plan(desired)
        self.assertTrue(creation)
        self.assertEqual(3, len(creation.changes))
        root_changes = [
            c
            for c in creation.changes
            if c.new.name == '' and c.new._type == 'NS'
        ]
        root_change = root_changes[0]
        self.assertFalse(root_change.existing)
        self.assertEqual(self.root_ns_record, root_change.new)
 | 
						|
 | 
						|
    def test_supports_root_ns_true_create_zone_missing(self):
        # the provider has no existing records (zone create) and the desired
        # has no root NS
        desired = self.no_root
        target = self.Provider()
        target.SUPPORTS_ROOT_NS = True

        # we don't have a root NS configured so it's ignored and only the
        # other records are managed
        creation = target.plan(desired)
        self.assertEqual(2, len(creation.changes))

        # again with strict_supports enabled, we'd normally throw an
        # exception, but since this is a create and we often can't know the
        # root NS values before the zone is created it's special cased and
        # will only warn
        target.strict_supports = True
        creation = target.plan(desired)
        self.assertEqual(2, len(creation.changes))
 |