From 8dd690ac88aa35c0ba4128975fef9574a57bc836 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Thu, 19 Jan 2023 12:19:10 -0800 Subject: [PATCH] helps if you add the files, AutoArpa --- octodns/processor/arpa.py | 59 +++++++++ tests/test_octodns_processor_arpa.py | 177 +++++++++++++++++++++++++++ 2 files changed, 236 insertions(+) create mode 100644 octodns/processor/arpa.py create mode 100644 tests/test_octodns_processor_arpa.py diff --git a/octodns/processor/arpa.py b/octodns/processor/arpa.py new file mode 100644 index 0000000..0f7742a --- /dev/null +++ b/octodns/processor/arpa.py @@ -0,0 +1,59 @@ +# +# +# + +from ipaddress import ip_address +from logging import getLogger + +from ..record import Record +from .base import BaseProcessor + + +class AutoArpa(BaseProcessor): + def __init__(self, name, ttl=3600): + super().__init__(name) + self.log = getLogger(f'AutoArpa[{name}]') + self.ttl = ttl + self._records = {} + + def process_source_zone(self, desired, sources): + for record in desired.records: + if record._type in ('A', 'AAAA'): + ips = record.values + if record.geo: + for geo in record.geo.values(): + ips += geo.values + if record.dynamic: + for pool in record.dynamic.pools.values(): + for value in pool.data['values']: + ips.append(value['value']) + + for ip in ips: + ptr = ip_address(ip).reverse_pointer + self._records[f'{ptr}.'] = record.fqdn + + return desired + + def populate(self, zone, target=False, lenient=False): + self.log.debug( + 'populate: name=%s, target=%s, lenient=%s', + zone.name, + target, + lenient, + ) + + before = len(zone.records) + + zone_name = zone.name + n = len(zone_name) + 1 + for arpa, fqdn in self._records.items(): + if arpa.endswith(zone_name): + name = arpa[:-n] + record = Record.new( + zone, name, {'ttl': self.ttl, 'type': 'PTR', 'value': fqdn} + ) + zone.add_record(record) + + self.log.info( + 'populate: found %s records', len(zone.records) - before + ) diff --git a/tests/test_octodns_processor_arpa.py b/tests/test_octodns_processor_arpa.py new file mode 100644 index 0000000..e02eeeb --- /dev/null +++ b/tests/test_octodns_processor_arpa.py @@ -0,0 +1,177 @@ +# +# +# + +from unittest import TestCase + +from octodns.processor.arpa import AutoArpa +from octodns.record import Record +from octodns.zone import Zone + + +class TestAutoArpa(TestCase): + def test_empty_zone(self): + + # empty zone no records + zone = Zone('unit.tests.', []) + aa = AutoArpa('auto-arpa') + aa.process_source_zone(zone, []) + self.assertFalse(aa._records) + + def test_single_value_A(self): + zone = Zone('unit.tests.', []) + record = Record.new( + zone, 'a', {'ttl': 32, 'type': 'A', 'value': '1.2.3.4'} + ) + zone.add_record(record) + aa = AutoArpa('auto-arpa') + aa.process_source_zone(zone, []) + self.assertEqual( + {'4.3.2.1.in-addr.arpa.': 'a.unit.tests.'}, aa._records + ) + + # matching zone + arpa = Zone('3.2.1.in-addr.arpa.', []) + aa.populate(arpa) + self.assertEqual(1, len(arpa.records)) + (ptr,) = arpa.records + self.assertEqual('4.3.2.1.in-addr.arpa.', ptr.fqdn) + self.assertEqual(record.fqdn, ptr.value) + self.assertEqual(3600, ptr.ttl) + + # other zone + arpa = Zone('4.4.4.in-addr.arpa.', []) + aa.populate(arpa) + self.assertEqual(0, len(arpa.records)) + + def test_multi_value_A(self): + zone = Zone('unit.tests.', []) + record = Record.new( + zone, + 'a', + {'ttl': 32, 'type': 'A', 'values': ['1.2.3.4', '1.2.3.5']}, + ) + zone.add_record(record) + aa = AutoArpa('auto-arpa', ttl=1600) + aa.process_source_zone(zone, []) + self.assertEqual( + { + '4.3.2.1.in-addr.arpa.': 'a.unit.tests.', + '5.3.2.1.in-addr.arpa.': 'a.unit.tests.', + }, + aa._records, + ) + + arpa = Zone('3.2.1.in-addr.arpa.', []) + aa.populate(arpa) + self.assertEqual(2, len(arpa.records)) + ptr_1, ptr_2 = sorted(arpa.records) + self.assertEqual('4.3.2.1.in-addr.arpa.', ptr_1.fqdn) + self.assertEqual(record.fqdn, ptr_1.value) + self.assertEqual('5.3.2.1.in-addr.arpa.', ptr_2.fqdn) + self.assertEqual(record.fqdn, ptr_2.value) + self.assertEqual(1600, ptr_2.ttl) + + def test_AAAA(self): + zone = Zone('unit.tests.', []) + record = Record.new( + zone, 'aaaa', {'ttl': 32, 'type': 'AAAA', 'value': 'ff:0c::4:2'} + ) + zone.add_record(record) + aa = AutoArpa('auto-arpa') + aa.process_source_zone(zone, []) + ip6_arpa = '2.0.0.0.4.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.c.0.0.0.f.f.0.0.ip6.arpa.' + self.assertEqual({ip6_arpa: 'aaaa.unit.tests.'}, aa._records) + + # matching zone + arpa = Zone('c.0.0.0.f.f.0.0.ip6.arpa.', []) + aa.populate(arpa) + self.assertEqual(1, len(arpa.records)) + (ptr,) = arpa.records + self.assertEqual(ip6_arpa, ptr.fqdn) + self.assertEqual(record.fqdn, ptr.value) + + # other zone + arpa = Zone('c.0.0.0.e.f.0.0.ip6.arpa.', []) + aa.populate(arpa) + self.assertEqual(0, len(arpa.records)) + + def test_geo(self): + zone = Zone('unit.tests.', []) + record = Record.new( + zone, + 'geo', + { + 'ttl': 32, + 'type': 'A', + 'values': ['1.2.3.4', '1.2.3.5'], + 'geo': { + 'AF': ['1.1.1.1'], + 'AS-JP': ['2.2.2.2', '3.3.3.3'], + 'NA-US': ['4.4.4.4', '5.5.5.5'], + }, + }, + ) + zone.add_record(record) + aa = AutoArpa('auto-arpa') + aa.process_source_zone(zone, []) + self.assertEqual( + { + '1.1.1.1.in-addr.arpa.': 'geo.unit.tests.', + '2.2.2.2.in-addr.arpa.': 'geo.unit.tests.', + '3.3.3.3.in-addr.arpa.': 'geo.unit.tests.', + '4.4.4.4.in-addr.arpa.': 'geo.unit.tests.', + '5.5.5.5.in-addr.arpa.': 'geo.unit.tests.', + '4.3.2.1.in-addr.arpa.': 'geo.unit.tests.', + '5.3.2.1.in-addr.arpa.': 'geo.unit.tests.', + }, + aa._records, + ) + + def test_dynamic(self): + zone = Zone('unit.tests.', []) + record = Record.new( + zone, + 'dynamic', + { + 'ttl': 32, + 'type': 'A', + 'values': ['1.2.3.4', '1.2.3.5'], + 'dynamic': { + 'pools': { + 'one': {'values': [{'weight': 1, 'value': '3.3.3.3'}]}, + 'two': { + # Testing out of order value sorting here + 'values': [ + {'value': '5.5.5.5'}, + {'value': '4.4.4.4'}, + ] + }, + 'three': { + 'values': [ + {'weight': 10, 'value': '4.4.4.4'}, + {'weight': 12, 'value': '5.5.5.5'}, + ] + }, + }, + 'rules': [ + {'geos': ['AF', 'EU'], 'pool': 'three'}, + {'geos': ['NA-US-CA'], 'pool': 'two'}, + {'pool': 'one'}, + ], + }, + }, + ) + zone.add_record(record) + aa = AutoArpa('auto-arpa') + aa.process_source_zone(zone, []) + self.assertEqual( + { + '3.3.3.3.in-addr.arpa.': 'dynamic.unit.tests.', + '4.4.4.4.in-addr.arpa.': 'dynamic.unit.tests.', + '5.5.5.5.in-addr.arpa.': 'dynamic.unit.tests.', + '4.3.2.1.in-addr.arpa.': 'dynamic.unit.tests.', + '5.3.2.1.in-addr.arpa.': 'dynamic.unit.tests.', + }, + aa._records, + )