1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Add filters for checking record values

Create two new filters, ValueAllowlistFilter and ValueRejectlistFilter
that allow checing the value(s) of records to include or exclude,
similar to the name filters that alread exist
This commit is contained in:
Nullreff
2024-01-14 08:56:51 -08:00
parent 30f8b4f5f5
commit e9d8b02365
3 changed files with 249 additions and 0 deletions

View File

@ -335,6 +335,8 @@ Similar to providers, but can only serve to populate records into a zone, cannot
| [MetaProcessor](/octodns/processor/meta.py) | Adds a special meta record with timing, UUID, providers, and/or version to aid in debugging and monitoring. |
| [NameAllowlistFilter](/octodns/processor/filter.py) | Filter that ONLY manages records that match specified naming patterns, all others will be ignored |
| [NameRejectlistFilter](/octodns/processor/filter.py) | Filter that INGORES records that match specified naming patterns, all others will be managed |
| [ValueAllowlistFilter](/octodns/processor/filter.py) | Filter that ONLY manages records that match specified value patterns, all others will be ignored |
| [ValueRejectlistFilter](/octodns/processor/filter.py) | Filter that INGORES records that match specified value patterns, all others will be managed |
| [OwnershipProcessor](/octodns/processor/ownership.py) | Processor that implements ownership in octoDNS so that it can manage only the records in a zone in sources and will ignore all others. |
| [SpfDnsLookupProcessor](/octodns/processor/spf.py) | Processor that checks SPF values for violations of DNS query limits |
| [TtlRestrictionFilter](/octodns/processor/restrict.py) | Processor that restricts the allow TTL values to a specified range or list of specific values |

View File

@ -215,6 +215,111 @@ class NameRejectlistFilter(_NameBaseFilter, RejectsMixin):
super().__init__(name, rejectlist)
class _ValueBaseFilter(_FilterProcessor):
def __init__(self, name, _list, **kwargs):
super().__init__(name, **kwargs)
exact = set()
regex = []
for pattern in _list:
if pattern.startswith('/'):
regex.append(re_compile(pattern[1:-1]))
else:
exact.add(pattern)
self.exact = exact
self.regex = regex
def _process(self, zone, *args, **kwargs):
for record in zone.records:
values = []
if hasattr(record, 'values'):
values = [str(value) for value in record.values]
else:
values = [str(record.value)]
if any(value in self.exact for value in values):
self.matches(zone, record)
continue
elif any(r.search(value) for r in self.regex for value in values):
self.matches(zone, record)
continue
self.doesnt_match(zone, record)
return zone
class ValueAllowlistFilter(_ValueBaseFilter, AllowsMixin):
'''Only manage records with values that match the provider patterns
Example usage:
processors:
only-these:
class: octodns.processor.filter.ValueAllowlistFilter
allowlist:
# exact string match
- www
# contains/substring match
- /substring/
# regex pattern match
- /some-pattern-\\d\\+/
# regex - anchored so has to match start to end
- /^start-.+-end$/
# Optional param that can be set to False to leave the target zone
# alone, thus allowing deletion of existing records
# (default: true)
# include_target: True
zones:
exxampled.com.:
sources:
- config
processors:
- only-these
targets:
- route53
'''
def __init__(self, name, allowlist):
super().__init__(name, allowlist)
class ValueRejectlistFilter(_ValueBaseFilter, RejectsMixin):
'''Reject managing records with names that match the provider patterns
Example usage:
processors:
not-these:
class: octodns.processor.filter.ValueRejectlistFilter
rejectlist:
# exact string match
- www
# contains/substring match
- /substring/
# regex pattern match
- /some-pattern-\\d\\+/
# regex - anchored so has to match start to end
- /^start-.+-end$/
# Optional param that can be set to False to leave the target zone
# alone, thus allowing deletion of existing records
# (default: true)
# include_target: True
zones:
exxampled.com.:
sources:
- config
processors:
- not-these
targets:
- route53
'''
def __init__(self, name, rejectlist):
super().__init__(name, rejectlist)
class _NetworkValueBaseFilter(BaseProcessor):
def __init__(self, name, _list):
super().__init__(name)

View File

@ -13,6 +13,8 @@ from octodns.processor.filter import (
NetworkValueRejectlistFilter,
TypeAllowlistFilter,
TypeRejectlistFilter,
ValueAllowlistFilter,
ValueRejectlistFilter,
ZoneNameFilter,
)
from octodns.provider.plan import Plan
@ -179,6 +181,146 @@ class TestNameRejectListFilter(TestCase):
)
class TestValueAllowListFilter(TestCase):
zone = Zone('unit.tests.', [])
matches = Record.new(
zone,
'good.exact',
{'type': 'CNAME', 'ttl': 42, 'value': 'matches.example.com.'},
)
zone.add_record(matches)
doesnt = Record.new(
zone,
'bad.exact',
{'type': 'CNAME', 'ttl': 42, 'value': 'doesnt.example.com.'},
)
zone.add_record(doesnt)
matches_many = Record.new(
zone,
'good.values',
{
'type': 'TXT',
'ttl': 42,
'values': ['matches.example.com.', 'another'],
},
)
zone.add_record(matches_many)
doesnt_many = Record.new(
zone,
'bad.values',
{
'type': 'TXT',
'ttl': 42,
'values': ['doesnt.example.com.', 'another'],
},
)
zone.add_record(doesnt_many)
matchable1 = Record.new(
zone,
'first.regex',
{'type': 'CNAME', 'ttl': 42, 'value': 'start.f43ad96.end.'},
)
zone.add_record(matchable1)
matchable2 = Record.new(
zone,
'second.regex',
{'type': 'CNAME', 'ttl': 42, 'value': 'start.a3b444c.end.'},
)
zone.add_record(matchable2)
def test_exact(self):
allows = ValueAllowlistFilter('exact', ('matches.example.com.',))
self.assertEqual(6, len(self.zone.records))
filtered = allows.process_source_zone(self.zone.copy())
self.assertEqual(2, len(filtered.records))
self.assertEqual(
['good.exact', 'good.values'],
sorted([r.name for r in filtered.records]),
)
def test_regex(self):
allows = ValueAllowlistFilter('exact', ('/^start\\..+\\.end\\.$/',))
self.assertEqual(6, len(self.zone.records))
filtered = allows.process_source_zone(self.zone.copy())
self.assertEqual(2, len(filtered.records))
self.assertEqual(
['first.regex', 'second.regex'],
sorted([r.name for r in filtered.records]),
)
class TestValueRejectListFilter(TestCase):
zone = Zone('unit.tests.', [])
matches = Record.new(
zone,
'good.compare',
{'type': 'CNAME', 'ttl': 42, 'value': 'matches.example.com.'},
)
zone.add_record(matches)
doesnt = Record.new(
zone,
'bad.compare',
{'type': 'CNAME', 'ttl': 42, 'value': 'doesnt.example.com.'},
)
zone.add_record(doesnt)
matches_many = Record.new(
zone,
'good.values',
{
'type': 'TXT',
'ttl': 42,
'values': ['matches.example.com.', 'another'],
},
)
zone.add_record(matches_many)
doesnt_many = Record.new(
zone,
'bad.values',
{
'type': 'TXT',
'ttl': 42,
'values': ['doesnt.example.com.', 'another'],
},
)
zone.add_record(doesnt_many)
matchable1 = Record.new(
zone,
'first.regex',
{'type': 'CNAME', 'ttl': 42, 'value': 'start.f43ad96.end.'},
)
zone.add_record(matchable1)
matchable2 = Record.new(
zone,
'second.regex',
{'type': 'CNAME', 'ttl': 42, 'value': 'start.a3b444c.end.'},
)
zone.add_record(matchable2)
def test_exact(self):
rejects = ValueRejectlistFilter('exact', ('matches.example.com.',))
self.assertEqual(6, len(self.zone.records))
filtered = rejects.process_source_zone(self.zone.copy())
self.assertEqual(4, len(filtered.records))
self.assertEqual(
['bad.compare', 'bad.values', 'first.regex', 'second.regex'],
sorted([r.name for r in filtered.records]),
)
def test_regex(self):
rejects = ValueRejectlistFilter('exact', ('/^start\\..+\\.end\\.$/',))
self.assertEqual(6, len(self.zone.records))
filtered = rejects.process_source_zone(self.zone.copy())
self.assertEqual(4, len(filtered.records))
self.assertEqual(
['bad.compare', 'bad.values', 'good.compare', 'good.values'],
sorted([r.name for r in filtered.records]),
)
class TestNetworkValueFilter(TestCase):
zone = Zone('unit.tests.', [])
for record in [