From a48881beb4b51934699e7eecc05932a7a74d3bda Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Thu, 7 Mar 2024 12:50:59 -0800 Subject: [PATCH] pass at adding a processor that adds missing trailing dots --- CHANGELOG.md | 4 + README.md | 1 + octodns/processor/trailing_dots.py | 44 +++++ tests/test_octodns_processor_trailing_dots.py | 168 ++++++++++++++++++ 4 files changed, 217 insertions(+) create mode 100644 octodns/processor/trailing_dots.py create mode 100644 tests/test_octodns_processor_trailing_dots.py diff --git a/CHANGELOG.md b/CHANGELOG.md index d8bc762..9a772e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## v1.?.? - 2024-??-?? - + +* Add EnsureTrailingDots processor + ## v1.5.0 - 2024-02-26 - Checksums, nested expansion, & flexable values * Beta support for Manager.enable_checksum and octodns-sync --checksum Allows a diff --git a/README.md b/README.md index 421d621..0b76a9b 100644 --- a/README.md +++ b/README.md @@ -330,6 +330,7 @@ Similar to providers, but can only serve to populate records into a zone, cannot |--|--| | [AcmeMangingProcessor](/octodns/processor/acme.py) | Useful when processes external to octoDNS are managing acme challenge DNS records, e.g. LetsEncrypt | | [AutoArpa](/octodns/processor/arpa.py) | See [Automatic PTR generation](#automatic-ptr-generation) below | +| [EnsureTrailingDots](/octodns/processor/trailing_dots.py) | Processor that ensures ALIAS, CNAME, DNAME, MX, NS, PTR, and SRVs have trailing dots | | [ExcludeRootNsChanges](/octodns/processor/filter.py) | Filter that errors or warns on planned root/APEX NS records changes. | | [IgnoreRootNsFilter](/octodns/processor/filter.py) | Filter that IGNORES root/APEX NS records and prevents octoDNS from trying to manage them (where supported.) | | [MetaProcessor](/octodns/processor/meta.py) | Adds a special meta record with timing, UUID, providers, and/or version to aid in debugging and monitoring. | diff --git a/octodns/processor/trailing_dots.py b/octodns/processor/trailing_dots.py new file mode 100644 index 0000000..31ac740 --- /dev/null +++ b/octodns/processor/trailing_dots.py @@ -0,0 +1,44 @@ +# +# +# + +from octodns.processor.base import BaseProcessor + + +def _no_trailing_dot(record, prop): + return any(getattr(v, prop)[-1] != '.' for v in record.values) + + +def _ensure_trailing_dots(record, prop): + new = record.copy() + for value in new.values: + val = getattr(value, prop) + if val[-1] != '.': + setattr(value, prop, f'{val}.') + return new + + +class EnsureTrailingDots(BaseProcessor): + def process_source_zone(self, desired, sources): + for record in desired.records: + _type = record._type + if _type in ('ALIAS', 'CNAME', 'DNAME') and record.value[-1] != '.': + new = record.copy() + new.value = f'{new.value}.' + desired.add_record(new, replace=True) + elif _type in ('NS', 'PTR') and any( + v[-1] != '.' for v in record.values + ): + new = record.copy() + new.values = [ + v if v[-1] == '.' else f'{v}.' for v in record.values + ] + desired.add_record(new, replace=True) + elif _type == 'MX' and _no_trailing_dot(record, 'exchange'): + new = _ensure_trailing_dots(record, 'exchange') + desired.add_record(new, replace=True) + elif _type == 'SRV' and _no_trailing_dot(record, 'target'): + new = _ensure_trailing_dots(record, 'target') + desired.add_record(new, replace=True) + + return desired diff --git a/tests/test_octodns_processor_trailing_dots.py b/tests/test_octodns_processor_trailing_dots.py new file mode 100644 index 0000000..0118b4b --- /dev/null +++ b/tests/test_octodns_processor_trailing_dots.py @@ -0,0 +1,168 @@ +# +# +# + +from unittest import TestCase + +from octodns.processor.trailing_dots import ( + EnsureTrailingDots, + _ensure_trailing_dots, + _no_trailing_dot, +) +from octodns.record import Record +from octodns.zone import Zone + + +def _find(zone, name): + return next(r for r in zone.records if r.name == name) + + +class EnsureTrailingDotsTest(TestCase): + def test_cname(self): + etd = EnsureTrailingDots('test') + + zone = Zone('unit.tests.', []) + has = Record.new( + zone, + 'has', + {'type': 'CNAME', 'ttl': 42, 'value': 'absolute.target.'}, + ) + zone.add_record(has) + missing = Record.new( + zone, + 'missing', + {'type': 'CNAME', 'ttl': 42, 'value': 'relative.target'}, + lenient=True, + ) + zone.add_record(missing) + + got = etd.process_source_zone(zone, None) + self.assertEqual('absolute.target.', _find(got, 'has').value) + self.assertEqual('relative.target.', _find(got, 'missing').value) + + # HACK: this should never be done to records outside of specific testing + # situations like this + has._type = 'ALIAS' + missing._type = 'ALIAS' + got = etd.process_source_zone(zone, None) + self.assertEqual('absolute.target.', _find(got, 'has').value) + self.assertEqual('relative.target.', _find(got, 'missing').value) + + has._type = 'DNAME' + missing._type = 'DNAME' + got = etd.process_source_zone(zone, None) + self.assertEqual('absolute.target.', _find(got, 'has').value) + self.assertEqual('relative.target.', _find(got, 'missing').value) + + def test_mx(self): + etd = EnsureTrailingDots('test') + + zone = Zone('unit.tests.', []) + record = Record.new( + zone, + 'record', + { + 'type': 'MX', + 'ttl': 42, + 'values': [ + {'preference': 1, 'exchange': 'absolute.target.'}, + {'preference': 1, 'exchange': 'relative.target'}, + ], + }, + lenient=True, + ) + zone.add_record(record) + + # processor + got = etd.process_source_zone(zone, None) + got = next(iter(got.records)) + self.assertEqual( + ['absolute.target.', 'relative.target.'], + [v.exchange for v in got.values], + ) + + # specifically test the checker + self.assertTrue(_no_trailing_dot(record, 'exchange')) + # specifically test the fixer + self.assertEqual( + ['absolute.target.', 'relative.target.'], + [ + v.exchange + for v in _ensure_trailing_dots(record, 'exchange').values + ], + ) + # this time with nothing that matches + record.values[1].exchange = 'also.absolute.' + self.assertFalse(_no_trailing_dot(record, 'exchange')) + + def test_ns(self): + etd = EnsureTrailingDots('test') + + zone = Zone('unit.tests.', []) + record = Record.new( + zone, + 'record', + { + 'type': 'NS', + 'ttl': 42, + 'values': ['absolute.target.', 'relative.target'], + }, + lenient=True, + ) + zone.add_record(record) + + got = etd.process_source_zone(zone, None) + got = next(iter(got.records)) + self.assertEqual(['absolute.target.', 'relative.target.'], got.values) + + # HACK: this should never be done to records outside of specific testing + # situations like this + record._type = 'PTR' + got = etd.process_source_zone(zone, None) + got = next(iter(got.records)) + self.assertEqual(['absolute.target.', 'relative.target.'], got.values) + + def test_srv(self): + etd = EnsureTrailingDots('test') + + zone = Zone('unit.tests.', []) + record = Record.new( + zone, + 'record', + { + 'type': 'SRV', + 'ttl': 42, + 'values': [ + { + 'priority': 1, + 'weight': 1, + 'port': 99, + 'target': 'absolute.target.', + }, + { + 'priority': 1, + 'weight': 1, + 'port': 99, + 'target': 'relative.target', + }, + ], + }, + lenient=True, + ) + zone.add_record(record) + + # processor + got = etd.process_source_zone(zone, None) + got = next(iter(got.records)) + self.assertEqual( + ['absolute.target.', 'relative.target.'], + [v.target for v in got.values], + ) + + # specifically test the checker + self.assertTrue(_no_trailing_dot(record, 'target')) + # specifically test the fixer + self.assertEqual( + ['absolute.target.', 'relative.target.'], + [v.target for v in _ensure_trailing_dots(record, 'target').values], + )