mirror of
https://github.com/github/octodns.git
synced 2024-05-11 05:55:00 +00:00
ZoneNameFilter to error/ignore when record names end with the zone name
This commit is contained in:
@@ -1,3 +1,8 @@
|
||||
## v1.3.0 - 2023-??-?? - ???
|
||||
|
||||
* Added ZoneNameFilter processor to enable ignoring/alerting on type-os like
|
||||
octodns.com.octodns.com
|
||||
|
||||
## v1.2.1 - 2023-09-29 - Now with fewer stale files
|
||||
|
||||
* Update script/release to do clean room dist builds
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
from re import compile as re_compile
|
||||
|
||||
from ..record.exception import ValidationError
|
||||
from .base import BaseProcessor
|
||||
|
||||
|
||||
@@ -215,3 +216,53 @@ class IgnoreRootNsFilter(BaseProcessor):
|
||||
|
||||
process_source_zone = _process
|
||||
process_target_zone = _process
|
||||
|
||||
|
||||
class ZoneNameFilter(BaseProcessor):
|
||||
'''Filter or error on record names that contain the zone name
|
||||
|
||||
Example usage:
|
||||
|
||||
processors:
|
||||
zone-name:
|
||||
class: octodns.processor.filter.ZoneNameFilter
|
||||
# If true a ValidationError will be throw when such records are
|
||||
# encouterd, if false the records will just be ignored/omitted.
|
||||
# (default: false)
|
||||
|
||||
zones:
|
||||
exxampled.com.:
|
||||
sources:
|
||||
- config
|
||||
processors:
|
||||
- zone-name
|
||||
targets:
|
||||
- azure
|
||||
'''
|
||||
|
||||
def __init__(self, name, error=False):
|
||||
super().__init__(name)
|
||||
self.error = error
|
||||
|
||||
def _process(self, zone, *args, **kwargs):
|
||||
zone_name_with_dot = zone.name
|
||||
zone_name_without_dot = zone_name_with_dot[:-1]
|
||||
for record in zone.records:
|
||||
name = record.name
|
||||
if name.endswith(zone_name_with_dot) or name.endswith(
|
||||
zone_name_without_dot
|
||||
):
|
||||
if self.error:
|
||||
raise ValidationError(
|
||||
record.fqdn,
|
||||
['record name ends with zone name'],
|
||||
record.context,
|
||||
)
|
||||
else:
|
||||
# just remove it
|
||||
zone.remove_record(record)
|
||||
|
||||
return zone
|
||||
|
||||
process_source_zone = _process
|
||||
process_target_zone = _process
|
||||
|
||||
@@ -10,8 +10,10 @@ from octodns.processor.filter import (
|
||||
NameRejectlistFilter,
|
||||
TypeAllowlistFilter,
|
||||
TypeRejectlistFilter,
|
||||
ZoneNameFilter,
|
||||
)
|
||||
from octodns.record import Record
|
||||
from octodns.record.exception import ValidationError
|
||||
from octodns.zone import Zone
|
||||
|
||||
zone = Zone('unit.tests.', [])
|
||||
@@ -180,3 +182,70 @@ class TestIgnoreRootNsFilter(TestCase):
|
||||
[('A', ''), ('NS', 'sub')],
|
||||
sorted([(r._type, r.name) for r in filtered.records]),
|
||||
)
|
||||
|
||||
|
||||
class TestZoneNameFilter(TestCase):
|
||||
def test_ends_with_zone(self):
|
||||
zone_name_filter = ZoneNameFilter('zone-name')
|
||||
|
||||
zone = Zone('unit.tests.', [])
|
||||
|
||||
# something that doesn't come into play
|
||||
zone.add_record(
|
||||
Record.new(
|
||||
zone, 'www', {'type': 'A', 'ttl': 43, 'value': '1.2.3.4'}
|
||||
)
|
||||
)
|
||||
|
||||
# something that has the zone name, but doesn't end with it
|
||||
zone.add_record(
|
||||
Record.new(
|
||||
zone,
|
||||
f'{zone.name}more',
|
||||
{'type': 'A', 'ttl': 43, 'value': '1.2.3.4'},
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(2, len(zone.records))
|
||||
filtered = zone_name_filter.process_source_zone(zone.copy())
|
||||
# get everything back
|
||||
self.assertEqual(2, len(filtered.records))
|
||||
|
||||
with_dot = zone.copy()
|
||||
with_dot.add_record(
|
||||
Record.new(
|
||||
zone, zone.name, {'type': 'A', 'ttl': 43, 'value': '1.2.3.4'}
|
||||
)
|
||||
)
|
||||
self.assertEqual(3, len(with_dot.records))
|
||||
filtered = zone_name_filter.process_source_zone(with_dot.copy())
|
||||
# don't get the one that ends with the zone name
|
||||
self.assertEqual(2, len(filtered.records))
|
||||
|
||||
without_dot = zone.copy()
|
||||
without_dot.add_record(
|
||||
Record.new(
|
||||
zone,
|
||||
zone.name[:-1],
|
||||
{'type': 'A', 'ttl': 43, 'value': '1.2.3.4'},
|
||||
)
|
||||
)
|
||||
self.assertEqual(3, len(without_dot.records))
|
||||
filtered = zone_name_filter.process_source_zone(without_dot.copy())
|
||||
# don't get the one that ends with the zone name
|
||||
self.assertEqual(2, len(filtered.records))
|
||||
|
||||
def test_error(self):
|
||||
errors = ZoneNameFilter('zone-name', error=True)
|
||||
|
||||
zone = Zone('unit.tests.', [])
|
||||
zone.add_record(
|
||||
Record.new(
|
||||
zone, zone.name, {'type': 'A', 'ttl': 43, 'value': '1.2.3.4'}
|
||||
)
|
||||
)
|
||||
with self.assertRaises(ValidationError) as ctx:
|
||||
errors.process_source_zone(zone)
|
||||
self.assertEqual(
|
||||
['record name ends with zone name'], ctx.exception.reasons
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user