1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Add Provider._process_existing_zone to mirror _process_desired_zone

This commit is contained in:
Ross McFarland
2022-02-08 13:46:04 -08:00
parent 33a10eada4
commit 3bcb6c8cec
2 changed files with 72 additions and 0 deletions

View File

@@ -105,6 +105,37 @@ class BaseProvider(BaseSource):
return desired
def _process_existing_zone(self, existing):
'''
An opportunity for providers to modify the existing zone records before
planning. `existing` is a "shallow" copy, see `Zone.copy` for more
information
- Must call `super` at an appropriate point for their work, generally
that means as the final step of the method, returning the result of
the `super` call.
- May modify `existing` directly.
- Must not modify records directly, `record.copy` should be called,
the results of which can be modified, and then `Zone.add_record` may
be used with `replace=True`.
- May call `Zone.remove_record` to remove records from `existing`.
- Must call supports_warn_or_except with information about any changes
that are made to have them logged or throw errors depending on the
provider configuration.
'''
for record in existing.records:
if record._type == 'NS' and record.name == '' and \
not self.SUPPORTS_ROOT_NS:
# ignore, we can't manage root NS records
msg = \
f'root NS record not supported for {record.fqdn}'
fallback = 'ignoring it'
self.supports_warn_or_except(msg, fallback)
existing.remove_record(record)
return existing
def _include_change(self, change):
'''
An opportunity for providers to filter out false positives due to
@@ -143,6 +174,8 @@ class BaseProvider(BaseSource):
self.log.warning('Provider %s used in target mode did not return '
'exists', self.id)
existing = self._process_existing_zone(existing)
for processor in processors:
existing = processor.process_target_zone(existing, target=self)

View File

@@ -344,6 +344,45 @@ class TestBaseProvider(TestCase):
'obey'
)
# SUPPORTS_ROOT_NS
provider.SUPPORTS_ROOT_NS = False
zone1 = Zone('unit.tests.', [])
record1 = Record.new(zone1, '', {
'type': 'NS',
'ttl': 3600,
'values': ['foo.com.', 'bar.com.'],
})
zone1.add_record(record1)
zone2 = provider._process_desired_zone(zone1.copy())
self.assertEqual(0, len(zone2.records))
provider.SUPPORTS_ROOT_NS = True
zone2 = provider._process_desired_zone(zone1.copy())
self.assertEqual(1, len(zone2.records))
self.assertEqual(record1, list(zone2.records)[0])
def test_process_existing_zone(self):
provider = HelperProvider('test')
# SUPPORTS_ROOT_NS
provider.SUPPORTS_ROOT_NS = False
zone1 = Zone('unit.tests.', [])
record1 = Record.new(zone1, '', {
'type': 'NS',
'ttl': 3600,
'values': ['foo.com.', 'bar.com.'],
})
zone1.add_record(record1)
zone2 = provider._process_existing_zone(zone1.copy())
self.assertEqual(0, len(zone2.records))
provider.SUPPORTS_ROOT_NS = True
zone2 = provider._process_existing_zone(zone1.copy())
self.assertEqual(1, len(zone2.records))
self.assertEqual(record1, list(zone2.records)[0])
def test_safe_none(self):
# No changes is safe
Plan(None, None, [], True).raise_if_unsafe()