mirror of
https://github.com/github/octodns.git
synced 2024-05-11 05:55:00 +00:00
add NetworkValueRejectlistFilter and NetworkValueAllowlistFilter processors
This commit is contained in:
@@ -2,6 +2,8 @@
|
||||
#
|
||||
#
|
||||
|
||||
from ipaddress import ip_address, ip_network
|
||||
from itertools import product
|
||||
from logging import getLogger
|
||||
from re import compile as re_compile
|
||||
|
||||
@@ -125,6 +127,35 @@ class _NameBaseFilter(BaseProcessor):
|
||||
process_target_zone = _process
|
||||
|
||||
|
||||
class _NetworkValueBaseFilter(BaseProcessor):
|
||||
def __init__(self, name, _list):
|
||||
super().__init__(name)
|
||||
self.networks = []
|
||||
for value in _list:
|
||||
try:
|
||||
self.networks.append(ip_network(value))
|
||||
except ValueError:
|
||||
raise ValueError(f'{value} is not a valid CIDR to use')
|
||||
|
||||
def _process(self, zone, *args, **kwargs):
|
||||
for record in zone.records:
|
||||
if record._type not in ['A', 'AAAA']:
|
||||
continue
|
||||
|
||||
if any(
|
||||
ip_address(value) in network
|
||||
for value, network in product(record.values, self.networks)
|
||||
):
|
||||
self.matches(zone, record)
|
||||
else:
|
||||
self.doesnt_match(zone, record)
|
||||
|
||||
return zone
|
||||
|
||||
process_source_zone = _process
|
||||
process_target_zone = _process
|
||||
|
||||
|
||||
class NameAllowlistFilter(_NameBaseFilter, AllowsMixin):
|
||||
'''Only manage records with names that match the provider patterns
|
||||
|
||||
@@ -189,6 +220,60 @@ class NameRejectlistFilter(_NameBaseFilter, RejectsMixin):
|
||||
super().__init__(name, rejectlist)
|
||||
|
||||
|
||||
class NetworkValueAllowlistFilter(_NetworkValueBaseFilter, AllowsMixin):
|
||||
'''Only manage records with values that match the provider patterns
|
||||
|
||||
Example usage:
|
||||
|
||||
processors:
|
||||
only-these:
|
||||
class: octodns.processor.filter.NetworkValueAllowlistFilter
|
||||
allowlist:
|
||||
- 127.0.0.1/32
|
||||
- 192.168.0.0/16
|
||||
- fd00::/8
|
||||
|
||||
zones:
|
||||
exxampled.com.:
|
||||
sources:
|
||||
- config
|
||||
processors:
|
||||
- only-these
|
||||
targets:
|
||||
- route53
|
||||
'''
|
||||
|
||||
def __init__(self, name, allowlist):
|
||||
super().__init__(name, allowlist)
|
||||
|
||||
|
||||
class NetworkValueRejectlistFilter(_NetworkValueBaseFilter, RejectsMixin):
|
||||
'''Reject managing records with value matching a that match the provider patterns
|
||||
|
||||
Example usage:
|
||||
|
||||
processors:
|
||||
not-these:
|
||||
class: octodns.processor.filter.NetworkValueRejectlistFilter
|
||||
rejectlist:
|
||||
- 127.0.0.1/32
|
||||
- 192.168.0.0/16
|
||||
- fd00::/8
|
||||
|
||||
zones:
|
||||
exxampled.com.:
|
||||
sources:
|
||||
- config
|
||||
processors:
|
||||
- not-these
|
||||
targets:
|
||||
- route53
|
||||
'''
|
||||
|
||||
def __init__(self, name, rejectlist):
|
||||
super().__init__(name, rejectlist)
|
||||
|
||||
|
||||
class IgnoreRootNsFilter(BaseProcessor):
|
||||
'''Do not manage Root NS Records.
|
||||
|
||||
|
||||
@@ -9,6 +9,8 @@ from octodns.processor.filter import (
|
||||
IgnoreRootNsFilter,
|
||||
NameAllowlistFilter,
|
||||
NameRejectlistFilter,
|
||||
NetworkValueAllowlistFilter,
|
||||
NetworkValueRejectlistFilter,
|
||||
TypeAllowlistFilter,
|
||||
TypeRejectlistFilter,
|
||||
ZoneNameFilter,
|
||||
@@ -161,6 +163,38 @@ class TestNameRejectListFilter(TestCase):
|
||||
)
|
||||
|
||||
|
||||
class TestNetworkValueFilter(TestCase):
|
||||
zone = Zone('unit.tests.', [])
|
||||
matches = Record.new(
|
||||
zone, 'private-ipv4', {'type': 'A', 'ttl': 42, 'value': '10.42.42.42'}
|
||||
)
|
||||
zone.add_record(matches)
|
||||
doesnt = Record.new(
|
||||
zone, 'public-ipv4', {'type': 'A', 'ttl': 42, 'value': '42.42.42.42'}
|
||||
)
|
||||
zone.add_record(doesnt)
|
||||
matchable1 = Record.new(
|
||||
zone, 'private-ipv6', {'type': 'AAAA', 'ttl': 42, 'value': 'fd12:3456:789a:1::1'}
|
||||
)
|
||||
zone.add_record(matchable1)
|
||||
matchable2 = Record.new(
|
||||
zone, 'public-ipv6', {'type': 'AAAA', 'ttl': 42, 'value': 'dead:beef:cafe::1'}
|
||||
)
|
||||
zone.add_record(matchable2)
|
||||
|
||||
def test_reject(self):
|
||||
filter_private = NetworkValueRejectlistFilter('rejectlist', set(('10.0.0.0/8', 'fd00::/8')))
|
||||
|
||||
got = filter_private.process_source_zone(self.zone.copy())
|
||||
self.assertEqual(['public-ipv4', 'public-ipv6'], sorted([r.name for r in got.records]))
|
||||
|
||||
def test_allow(self):
|
||||
filter_private = NetworkValueAllowlistFilter('allowlist', set(('10.0.0.0/8', 'fd00::/8')))
|
||||
|
||||
got = filter_private.process_source_zone(self.zone.copy())
|
||||
self.assertEqual(['private-ipv4', 'private-ipv6'], sorted([r.name for r in got.records]))
|
||||
|
||||
|
||||
class TestIgnoreRootNsFilter(TestCase):
|
||||
zone = Zone('unit.tests.', [])
|
||||
root = Record.new(
|
||||
|
||||
Reference in New Issue
Block a user