mirror of
https://github.com/github/octodns.git
synced 2024-05-11 05:55:00 +00:00
pass at adding a processor that adds missing trailing dots
This commit is contained in:
@ -1,3 +1,7 @@
|
||||
## v1.?.? - 2024-??-?? -
|
||||
|
||||
* Add EnsureTrailingDots processor
|
||||
|
||||
## v1.5.0 - 2024-02-26 - Checksums, nested expansion, & flexable values
|
||||
|
||||
* Beta support for Manager.enable_checksum and octodns-sync --checksum Allows a
|
||||
|
@ -330,6 +330,7 @@ Similar to providers, but can only serve to populate records into a zone, cannot
|
||||
|--|--|
|
||||
| [AcmeMangingProcessor](/octodns/processor/acme.py) | Useful when processes external to octoDNS are managing acme challenge DNS records, e.g. LetsEncrypt |
|
||||
| [AutoArpa](/octodns/processor/arpa.py) | See [Automatic PTR generation](#automatic-ptr-generation) below |
|
||||
| [EnsureTrailingDots](/octodns/processor/trailing_dots.py) | Processor that ensures ALIAS, CNAME, DNAME, MX, NS, PTR, and SRVs have trailing dots |
|
||||
| [ExcludeRootNsChanges](/octodns/processor/filter.py) | Filter that errors or warns on planned root/APEX NS records changes. |
|
||||
| [IgnoreRootNsFilter](/octodns/processor/filter.py) | Filter that IGNORES root/APEX NS records and prevents octoDNS from trying to manage them (where supported.) |
|
||||
| [MetaProcessor](/octodns/processor/meta.py) | Adds a special meta record with timing, UUID, providers, and/or version to aid in debugging and monitoring. |
|
||||
|
44
octodns/processor/trailing_dots.py
Normal file
44
octodns/processor/trailing_dots.py
Normal file
@ -0,0 +1,44 @@
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
from octodns.processor.base import BaseProcessor
|
||||
|
||||
|
||||
def _no_trailing_dot(record, prop):
|
||||
return any(getattr(v, prop)[-1] != '.' for v in record.values)
|
||||
|
||||
|
||||
def _ensure_trailing_dots(record, prop):
|
||||
new = record.copy()
|
||||
for value in new.values:
|
||||
val = getattr(value, prop)
|
||||
if val[-1] != '.':
|
||||
setattr(value, prop, f'{val}.')
|
||||
return new
|
||||
|
||||
|
||||
class EnsureTrailingDots(BaseProcessor):
|
||||
def process_source_zone(self, desired, sources):
|
||||
for record in desired.records:
|
||||
_type = record._type
|
||||
if _type in ('ALIAS', 'CNAME', 'DNAME') and record.value[-1] != '.':
|
||||
new = record.copy()
|
||||
new.value = f'{new.value}.'
|
||||
desired.add_record(new, replace=True)
|
||||
elif _type in ('NS', 'PTR') and any(
|
||||
v[-1] != '.' for v in record.values
|
||||
):
|
||||
new = record.copy()
|
||||
new.values = [
|
||||
v if v[-1] == '.' else f'{v}.' for v in record.values
|
||||
]
|
||||
desired.add_record(new, replace=True)
|
||||
elif _type == 'MX' and _no_trailing_dot(record, 'exchange'):
|
||||
new = _ensure_trailing_dots(record, 'exchange')
|
||||
desired.add_record(new, replace=True)
|
||||
elif _type == 'SRV' and _no_trailing_dot(record, 'target'):
|
||||
new = _ensure_trailing_dots(record, 'target')
|
||||
desired.add_record(new, replace=True)
|
||||
|
||||
return desired
|
168
tests/test_octodns_processor_trailing_dots.py
Normal file
168
tests/test_octodns_processor_trailing_dots.py
Normal file
@ -0,0 +1,168 @@
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
from unittest import TestCase
|
||||
|
||||
from octodns.processor.trailing_dots import (
|
||||
EnsureTrailingDots,
|
||||
_ensure_trailing_dots,
|
||||
_no_trailing_dot,
|
||||
)
|
||||
from octodns.record import Record
|
||||
from octodns.zone import Zone
|
||||
|
||||
|
||||
def _find(zone, name):
|
||||
return next(r for r in zone.records if r.name == name)
|
||||
|
||||
|
||||
class EnsureTrailingDotsTest(TestCase):
|
||||
def test_cname(self):
|
||||
etd = EnsureTrailingDots('test')
|
||||
|
||||
zone = Zone('unit.tests.', [])
|
||||
has = Record.new(
|
||||
zone,
|
||||
'has',
|
||||
{'type': 'CNAME', 'ttl': 42, 'value': 'absolute.target.'},
|
||||
)
|
||||
zone.add_record(has)
|
||||
missing = Record.new(
|
||||
zone,
|
||||
'missing',
|
||||
{'type': 'CNAME', 'ttl': 42, 'value': 'relative.target'},
|
||||
lenient=True,
|
||||
)
|
||||
zone.add_record(missing)
|
||||
|
||||
got = etd.process_source_zone(zone, None)
|
||||
self.assertEqual('absolute.target.', _find(got, 'has').value)
|
||||
self.assertEqual('relative.target.', _find(got, 'missing').value)
|
||||
|
||||
# HACK: this should never be done to records outside of specific testing
|
||||
# situations like this
|
||||
has._type = 'ALIAS'
|
||||
missing._type = 'ALIAS'
|
||||
got = etd.process_source_zone(zone, None)
|
||||
self.assertEqual('absolute.target.', _find(got, 'has').value)
|
||||
self.assertEqual('relative.target.', _find(got, 'missing').value)
|
||||
|
||||
has._type = 'DNAME'
|
||||
missing._type = 'DNAME'
|
||||
got = etd.process_source_zone(zone, None)
|
||||
self.assertEqual('absolute.target.', _find(got, 'has').value)
|
||||
self.assertEqual('relative.target.', _find(got, 'missing').value)
|
||||
|
||||
def test_mx(self):
|
||||
etd = EnsureTrailingDots('test')
|
||||
|
||||
zone = Zone('unit.tests.', [])
|
||||
record = Record.new(
|
||||
zone,
|
||||
'record',
|
||||
{
|
||||
'type': 'MX',
|
||||
'ttl': 42,
|
||||
'values': [
|
||||
{'preference': 1, 'exchange': 'absolute.target.'},
|
||||
{'preference': 1, 'exchange': 'relative.target'},
|
||||
],
|
||||
},
|
||||
lenient=True,
|
||||
)
|
||||
zone.add_record(record)
|
||||
|
||||
# processor
|
||||
got = etd.process_source_zone(zone, None)
|
||||
got = next(iter(got.records))
|
||||
self.assertEqual(
|
||||
['absolute.target.', 'relative.target.'],
|
||||
[v.exchange for v in got.values],
|
||||
)
|
||||
|
||||
# specifically test the checker
|
||||
self.assertTrue(_no_trailing_dot(record, 'exchange'))
|
||||
# specifically test the fixer
|
||||
self.assertEqual(
|
||||
['absolute.target.', 'relative.target.'],
|
||||
[
|
||||
v.exchange
|
||||
for v in _ensure_trailing_dots(record, 'exchange').values
|
||||
],
|
||||
)
|
||||
# this time with nothing that matches
|
||||
record.values[1].exchange = 'also.absolute.'
|
||||
self.assertFalse(_no_trailing_dot(record, 'exchange'))
|
||||
|
||||
def test_ns(self):
|
||||
etd = EnsureTrailingDots('test')
|
||||
|
||||
zone = Zone('unit.tests.', [])
|
||||
record = Record.new(
|
||||
zone,
|
||||
'record',
|
||||
{
|
||||
'type': 'NS',
|
||||
'ttl': 42,
|
||||
'values': ['absolute.target.', 'relative.target'],
|
||||
},
|
||||
lenient=True,
|
||||
)
|
||||
zone.add_record(record)
|
||||
|
||||
got = etd.process_source_zone(zone, None)
|
||||
got = next(iter(got.records))
|
||||
self.assertEqual(['absolute.target.', 'relative.target.'], got.values)
|
||||
|
||||
# HACK: this should never be done to records outside of specific testing
|
||||
# situations like this
|
||||
record._type = 'PTR'
|
||||
got = etd.process_source_zone(zone, None)
|
||||
got = next(iter(got.records))
|
||||
self.assertEqual(['absolute.target.', 'relative.target.'], got.values)
|
||||
|
||||
def test_srv(self):
|
||||
etd = EnsureTrailingDots('test')
|
||||
|
||||
zone = Zone('unit.tests.', [])
|
||||
record = Record.new(
|
||||
zone,
|
||||
'record',
|
||||
{
|
||||
'type': 'SRV',
|
||||
'ttl': 42,
|
||||
'values': [
|
||||
{
|
||||
'priority': 1,
|
||||
'weight': 1,
|
||||
'port': 99,
|
||||
'target': 'absolute.target.',
|
||||
},
|
||||
{
|
||||
'priority': 1,
|
||||
'weight': 1,
|
||||
'port': 99,
|
||||
'target': 'relative.target',
|
||||
},
|
||||
],
|
||||
},
|
||||
lenient=True,
|
||||
)
|
||||
zone.add_record(record)
|
||||
|
||||
# processor
|
||||
got = etd.process_source_zone(zone, None)
|
||||
got = next(iter(got.records))
|
||||
self.assertEqual(
|
||||
['absolute.target.', 'relative.target.'],
|
||||
[v.target for v in got.values],
|
||||
)
|
||||
|
||||
# specifically test the checker
|
||||
self.assertTrue(_no_trailing_dot(record, 'target'))
|
||||
# specifically test the fixer
|
||||
self.assertEqual(
|
||||
['absolute.target.', 'relative.target.'],
|
||||
[v.target for v in _ensure_trailing_dots(record, 'target').values],
|
||||
)
|
Reference in New Issue
Block a user