Merge pull request #751 from octodns/acme-ignoring-processor

Implement AcmeManagingProcessor
This commit is contained in:
Ross McFarland
2021-08-14 08:05:37 -07:00
committed by GitHub
2 changed files with 167 additions and 0 deletions
+64
View File
@@ -0,0 +1,64 @@
#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
from logging import getLogger
from .base import BaseProcessor
class AcmeMangingProcessor(BaseProcessor):
log = getLogger('AcmeMangingProcessor')
def __init__(self, name):
'''
processors:
acme:
class: octodns.processor.acme.AcmeMangingProcessor
...
zones:
something.com.:
...
processors:
- acme
...
'''
super(AcmeMangingProcessor, self).__init__(name)
self._owned = set()
def process_source_zone(self, zone, *args, **kwargs):
ret = self._clone_zone(zone)
for record in zone.records:
if record._type == 'TXT' and \
record.name.startswith('_acme-challenge'):
# We have a managed acme challenge record (owned by octoDNS) so
# we should mark it as such
record = record.copy()
record.values.append('*octoDNS*')
record.values.sort()
# This assumes we'll see things as sources before targets,
# which is the case...
self._owned.add(record)
ret.add_record(record)
return ret
def process_target_zone(self, zone, *args, **kwargs):
ret = self._clone_zone(zone)
for record in zone.records:
# Uses a startswith rather than == to ignore subdomain challenges,
# e.g. _acme-challenge.foo.domain.com when managing domain.com
if record._type == 'TXT' and \
record.name.startswith('_acme-challenge') and \
'*octoDNS*' not in record.values and \
record not in self._owned:
self.log.info('_process: ignoring %s', record.fqdn)
continue
ret.add_record(record)
return ret
+103
View File
@@ -0,0 +1,103 @@
#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
from unittest import TestCase
from octodns.processor.acme import AcmeMangingProcessor
from octodns.record import Record
from octodns.zone import Zone
zone = Zone('unit.tests.', [])
records = {
'root-unowned': Record.new(zone, '_acme-challenge', {
'ttl': 30,
'type': 'TXT',
'value': 'magic bit',
}),
'sub-unowned': Record.new(zone, '_acme-challenge.sub-unowned', {
'ttl': 30,
'type': 'TXT',
'value': 'magic bit',
}),
'not-txt': Record.new(zone, '_acme-challenge.not-txt', {
'ttl': 30,
'type': 'AAAA',
'value': '::1',
}),
'not-acme': Record.new(zone, 'not-acme', {
'ttl': 30,
'type': 'TXT',
'value': 'Hello World!',
}),
'managed': Record.new(zone, '_acme-challenge.managed', {
'ttl': 30,
'type': 'TXT',
'value': 'magic bit',
}),
'owned': Record.new(zone, '_acme-challenge.owned', {
'ttl': 30,
'type': 'TXT',
'values': ['*octoDNS*', 'magic bit'],
}),
'going-away': Record.new(zone, '_acme-challenge.going-away', {
'ttl': 30,
'type': 'TXT',
'values': ['*octoDNS*', 'magic bit'],
}),
}
class TestAcmeMangingProcessor(TestCase):
def test_process_zones(self):
acme = AcmeMangingProcessor('acme')
source = Zone(zone.name, [])
# Unrelated stuff that should be untouched
source.add_record(records['not-txt'])
source.add_record(records['not-acme'])
# A managed acme that will have ownership value added
source.add_record(records['managed'])
got = acme.process_source_zone(source)
self.assertEquals([
'_acme-challenge.managed',
'_acme-challenge.not-txt',
'not-acme',
], sorted([r.name for r in got.records]))
managed = None
for record in got.records:
print(record.name)
if record.name.endswith('managed'):
managed = record
break
self.assertTrue(managed)
# Ownership was marked with an extra value
self.assertEquals(['*octoDNS*', 'magic bit'], record.values)
existing = Zone(zone.name, [])
# Unrelated stuff that should be untouched
existing.add_record(records['not-txt'])
existing.add_record(records['not-acme'])
# Stuff that will be ignored
existing.add_record(records['root-unowned'])
existing.add_record(records['sub-unowned'])
# A managed acme that needs ownership value added
existing.add_record(records['managed'])
# A managed acme that has ownershp managed
existing.add_record(records['owned'])
# A managed acme that needs to go away
existing.add_record(records['going-away'])
got = acme.process_target_zone(existing)
self.assertEquals([
'_acme-challenge.going-away',
'_acme-challenge.managed',
'_acme-challenge.not-txt',
'_acme-challenge.owned',
'not-acme'
], sorted([r.name for r in got.records]))