mirror of
https://github.com/github/octodns.git
synced 2024-05-11 05:55:00 +00:00
helps if you add the files, AutoArpa
This commit is contained in:
59
octodns/processor/arpa.py
Normal file
59
octodns/processor/arpa.py
Normal file
@@ -0,0 +1,59 @@
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
from ipaddress import ip_address
|
||||
from logging import getLogger
|
||||
|
||||
from ..record import Record
|
||||
from .base import BaseProcessor
|
||||
|
||||
|
||||
class AutoArpa(BaseProcessor):
|
||||
def __init__(self, name, ttl=3600):
|
||||
super().__init__(name)
|
||||
self.log = getLogger(f'AutoArpa[{name}]')
|
||||
self.ttl = ttl
|
||||
self._records = {}
|
||||
|
||||
def process_source_zone(self, desired, sources):
|
||||
for record in desired.records:
|
||||
if record._type in ('A', 'AAAA'):
|
||||
ips = record.values
|
||||
if record.geo:
|
||||
for geo in record.geo.values():
|
||||
ips += geo.values
|
||||
if record.dynamic:
|
||||
for pool in record.dynamic.pools.values():
|
||||
for value in pool.data['values']:
|
||||
ips.append(value['value'])
|
||||
|
||||
for ip in ips:
|
||||
ptr = ip_address(ip).reverse_pointer
|
||||
self._records[f'{ptr}.'] = record.fqdn
|
||||
|
||||
return desired
|
||||
|
||||
def populate(self, zone, target=False, lenient=False):
|
||||
self.log.debug(
|
||||
'populate: name=%s, target=%s, lenient=%s',
|
||||
zone.name,
|
||||
target,
|
||||
lenient,
|
||||
)
|
||||
|
||||
before = len(zone.records)
|
||||
|
||||
zone_name = zone.name
|
||||
n = len(zone_name) + 1
|
||||
for arpa, fqdn in self._records.items():
|
||||
if arpa.endswith(zone_name):
|
||||
name = arpa[:-n]
|
||||
record = Record.new(
|
||||
zone, name, {'ttl': self.ttl, 'type': 'PTR', 'value': fqdn}
|
||||
)
|
||||
zone.add_record(record)
|
||||
|
||||
self.log.info(
|
||||
'populate: found %s records', len(zone.records) - before
|
||||
)
|
||||
177
tests/test_octodns_processor_arpa.py
Normal file
177
tests/test_octodns_processor_arpa.py
Normal file
@@ -0,0 +1,177 @@
|
||||
#
|
||||
#
|
||||
#
|
||||
|
||||
from unittest import TestCase
|
||||
|
||||
from octodns.processor.arpa import AutoArpa
|
||||
from octodns.record import Record
|
||||
from octodns.zone import Zone
|
||||
|
||||
|
||||
class TestAutoArpa(TestCase):
|
||||
def test_empty_zone(self):
|
||||
|
||||
# empty zone no records
|
||||
zone = Zone('unit.tests.', [])
|
||||
aa = AutoArpa('auto-arpa')
|
||||
aa.process_source_zone(zone, [])
|
||||
self.assertFalse(aa._records)
|
||||
|
||||
def test_single_value_A(self):
|
||||
zone = Zone('unit.tests.', [])
|
||||
record = Record.new(
|
||||
zone, 'a', {'ttl': 32, 'type': 'A', 'value': '1.2.3.4'}
|
||||
)
|
||||
zone.add_record(record)
|
||||
aa = AutoArpa('auto-arpa')
|
||||
aa.process_source_zone(zone, [])
|
||||
self.assertEqual(
|
||||
{'4.3.2.1.in-addr.arpa.': 'a.unit.tests.'}, aa._records
|
||||
)
|
||||
|
||||
# matching zone
|
||||
arpa = Zone('3.2.1.in-addr.arpa.', [])
|
||||
aa.populate(arpa)
|
||||
self.assertEqual(1, len(arpa.records))
|
||||
(ptr,) = arpa.records
|
||||
self.assertEqual('4.3.2.1.in-addr.arpa.', ptr.fqdn)
|
||||
self.assertEqual(record.fqdn, ptr.value)
|
||||
self.assertEqual(3600, ptr.ttl)
|
||||
|
||||
# other zone
|
||||
arpa = Zone('4.4.4.in-addr.arpa.', [])
|
||||
aa.populate(arpa)
|
||||
self.assertEqual(0, len(arpa.records))
|
||||
|
||||
def test_multi_value_A(self):
|
||||
zone = Zone('unit.tests.', [])
|
||||
record = Record.new(
|
||||
zone,
|
||||
'a',
|
||||
{'ttl': 32, 'type': 'A', 'values': ['1.2.3.4', '1.2.3.5']},
|
||||
)
|
||||
zone.add_record(record)
|
||||
aa = AutoArpa('auto-arpa', ttl=1600)
|
||||
aa.process_source_zone(zone, [])
|
||||
self.assertEqual(
|
||||
{
|
||||
'4.3.2.1.in-addr.arpa.': 'a.unit.tests.',
|
||||
'5.3.2.1.in-addr.arpa.': 'a.unit.tests.',
|
||||
},
|
||||
aa._records,
|
||||
)
|
||||
|
||||
arpa = Zone('3.2.1.in-addr.arpa.', [])
|
||||
aa.populate(arpa)
|
||||
self.assertEqual(2, len(arpa.records))
|
||||
ptr_1, ptr_2 = sorted(arpa.records)
|
||||
self.assertEqual('4.3.2.1.in-addr.arpa.', ptr_1.fqdn)
|
||||
self.assertEqual(record.fqdn, ptr_1.value)
|
||||
self.assertEqual('5.3.2.1.in-addr.arpa.', ptr_2.fqdn)
|
||||
self.assertEqual(record.fqdn, ptr_2.value)
|
||||
self.assertEqual(1600, ptr_2.ttl)
|
||||
|
||||
def test_AAAA(self):
|
||||
zone = Zone('unit.tests.', [])
|
||||
record = Record.new(
|
||||
zone, 'aaaa', {'ttl': 32, 'type': 'AAAA', 'value': 'ff:0c::4:2'}
|
||||
)
|
||||
zone.add_record(record)
|
||||
aa = AutoArpa('auto-arpa')
|
||||
aa.process_source_zone(zone, [])
|
||||
ip6_arpa = '2.0.0.0.4.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.c.0.0.0.f.f.0.0.ip6.arpa.'
|
||||
self.assertEqual({ip6_arpa: 'aaaa.unit.tests.'}, aa._records)
|
||||
|
||||
# matching zone
|
||||
arpa = Zone('c.0.0.0.f.f.0.0.ip6.arpa.', [])
|
||||
aa.populate(arpa)
|
||||
self.assertEqual(1, len(arpa.records))
|
||||
(ptr,) = arpa.records
|
||||
self.assertEqual(ip6_arpa, ptr.fqdn)
|
||||
self.assertEqual(record.fqdn, ptr.value)
|
||||
|
||||
# other zone
|
||||
arpa = Zone('c.0.0.0.e.f.0.0.ip6.arpa.', [])
|
||||
aa.populate(arpa)
|
||||
self.assertEqual(0, len(arpa.records))
|
||||
|
||||
def test_geo(self):
|
||||
zone = Zone('unit.tests.', [])
|
||||
record = Record.new(
|
||||
zone,
|
||||
'geo',
|
||||
{
|
||||
'ttl': 32,
|
||||
'type': 'A',
|
||||
'values': ['1.2.3.4', '1.2.3.5'],
|
||||
'geo': {
|
||||
'AF': ['1.1.1.1'],
|
||||
'AS-JP': ['2.2.2.2', '3.3.3.3'],
|
||||
'NA-US': ['4.4.4.4', '5.5.5.5'],
|
||||
},
|
||||
},
|
||||
)
|
||||
zone.add_record(record)
|
||||
aa = AutoArpa('auto-arpa')
|
||||
aa.process_source_zone(zone, [])
|
||||
self.assertEqual(
|
||||
{
|
||||
'1.1.1.1.in-addr.arpa.': 'geo.unit.tests.',
|
||||
'2.2.2.2.in-addr.arpa.': 'geo.unit.tests.',
|
||||
'3.3.3.3.in-addr.arpa.': 'geo.unit.tests.',
|
||||
'4.4.4.4.in-addr.arpa.': 'geo.unit.tests.',
|
||||
'5.5.5.5.in-addr.arpa.': 'geo.unit.tests.',
|
||||
'4.3.2.1.in-addr.arpa.': 'geo.unit.tests.',
|
||||
'5.3.2.1.in-addr.arpa.': 'geo.unit.tests.',
|
||||
},
|
||||
aa._records,
|
||||
)
|
||||
|
||||
def test_dynamic(self):
|
||||
zone = Zone('unit.tests.', [])
|
||||
record = Record.new(
|
||||
zone,
|
||||
'dynamic',
|
||||
{
|
||||
'ttl': 32,
|
||||
'type': 'A',
|
||||
'values': ['1.2.3.4', '1.2.3.5'],
|
||||
'dynamic': {
|
||||
'pools': {
|
||||
'one': {'values': [{'weight': 1, 'value': '3.3.3.3'}]},
|
||||
'two': {
|
||||
# Testing out of order value sorting here
|
||||
'values': [
|
||||
{'value': '5.5.5.5'},
|
||||
{'value': '4.4.4.4'},
|
||||
]
|
||||
},
|
||||
'three': {
|
||||
'values': [
|
||||
{'weight': 10, 'value': '4.4.4.4'},
|
||||
{'weight': 12, 'value': '5.5.5.5'},
|
||||
]
|
||||
},
|
||||
},
|
||||
'rules': [
|
||||
{'geos': ['AF', 'EU'], 'pool': 'three'},
|
||||
{'geos': ['NA-US-CA'], 'pool': 'two'},
|
||||
{'pool': 'one'},
|
||||
],
|
||||
},
|
||||
},
|
||||
)
|
||||
zone.add_record(record)
|
||||
aa = AutoArpa('auto-arpa')
|
||||
aa.process_source_zone(zone, [])
|
||||
self.assertEqual(
|
||||
{
|
||||
'3.3.3.3.in-addr.arpa.': 'dynamic.unit.tests.',
|
||||
'4.4.4.4.in-addr.arpa.': 'dynamic.unit.tests.',
|
||||
'5.5.5.5.in-addr.arpa.': 'dynamic.unit.tests.',
|
||||
'4.3.2.1.in-addr.arpa.': 'dynamic.unit.tests.',
|
||||
'5.3.2.1.in-addr.arpa.': 'dynamic.unit.tests.',
|
||||
},
|
||||
aa._records,
|
||||
)
|
||||
Reference in New Issue
Block a user