From dc446eefb99f46a75c8f767f87739d692b929351 Mon Sep 17 00:00:00 2001 From: Samuel Parkinson Date: Wed, 15 Feb 2023 15:17:12 +0000 Subject: [PATCH] Error on too many lookups from single SPF mechanisms --- octodns/processor/spf.py | 58 +++++++++++ tests/test_octodns_processor_spf.py | 153 ++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+) create mode 100644 octodns/processor/spf.py create mode 100644 tests/test_octodns_processor_spf.py diff --git a/octodns/processor/spf.py b/octodns/processor/spf.py new file mode 100644 index 0000000..258ad76 --- /dev/null +++ b/octodns/processor/spf.py @@ -0,0 +1,58 @@ +# +# +# + +from logging import getLogger + +from .base import BaseProcessor, ProcessorException + + +class SpfValueException(ProcessorException): + pass + + +class SpfDnsLookupException(ProcessorException): + pass + + +class SpfDnsLookupProcessor(BaseProcessor): + log = getLogger('SpfDnsLookupProcessor') + + def __init__(self, name): + self.log.debug(f"SpfDnsLookupProcessor: {name}") + super().__init__(name) + + def process_source_zone(self, zone, *args, **kwargs): + for record in zone.records: + if record._type != 'TXT': + continue + + if record._octodns.get('lenient'): + continue + + # SPF values must begin with 'v=spf1 ' + values = [ + value for value in record.values if value.startswith('v=spf1 ') + ] + + if len(values) == 0: + continue + + if len(values) > 1: + raise SpfValueException( + f"{record.fqdn} has more than one SPF value" + ) + + lookups = 0 + terms = values[0].removeprefix('v=spf1 ').split(' ') + + for term in terms: + if lookups > 10: + raise SpfDnsLookupException( + f"{record.fqdn} has too many SPF DNS lookups" + ) + + if term in ['a', 'mx', 'exists', 'redirect']: + lookups += 1 + + return zone diff --git a/tests/test_octodns_processor_spf.py b/tests/test_octodns_processor_spf.py new file mode 100644 index 0000000..b4cd3a1 --- /dev/null +++ b/tests/test_octodns_processor_spf.py @@ -0,0 +1,153 @@ +from unittest import TestCase + +from octodns.processor.spf import ( + SpfDnsLookupException, + SpfDnsLookupProcessor, + SpfValueException, +) +from octodns.record.base import Record +from octodns.zone import Zone + + +class TestSpfDnsLookupProcessor(TestCase): + def test_processor(self): + processor = SpfDnsLookupProcessor('test') + assert processor.name == 'test' + + processor = SpfDnsLookupProcessor('test') + zone = Zone('unit.tests.', []) + zone.add_record( + Record.new( + zone, + '', + { + 'type': 'TXT', + 'ttl': 86400, + 'values': ['v=spf1 a ~all', 'v=DMARC1\; p=reject\;'], + }, + ) + ) + + assert zone == processor.process_source_zone(zone) + zone = Zone('unit.tests.', []) + zone.add_record( + Record.new( + zone, + '', + { + 'type': 'TXT', + 'ttl': 86400, + 'values': [ + 'v=spf1 a a a a a a a a a a -all', + 'v=DMARC1\; p=reject\;', + ], + }, + ) + ) + + assert zone == processor.process_source_zone(zone) + + zone = Zone('unit.tests.', []) + zone.add_record( + Record.new( + zone, + '', + { + 'type': 'TXT', + 'ttl': 86400, + 'values': [ + 'v=spf1 a mx exists redirect a a a a a a a ~all', + 'v=DMARC1\; p=reject\;', + ], + }, + ) + ) + + with self.assertRaises(SpfDnsLookupException): + processor.process_source_zone(zone) + + def test_processor_skips_lenient_records(self): + processor = SpfDnsLookupProcessor('test') + zone = Zone('unit.tests.', []) + + lenient = Record.new( + zone, + 'lenient', + { + 'type': 'TXT', + 'ttl': 86400, + 'value': 'v=spf1 a a a a a a a a a a a ~all', + 'octodns': {'lenient': True}, + }, + ) + zone.add_record(lenient) + + processed = processor.process_source_zone(zone) + + assert zone == processed + + def test_processor_errors_on_many_spf_values_in_record(self): + processor = SpfDnsLookupProcessor('test') + zone = Zone('unit.tests.', []) + + record = Record.new( + zone, + '', + { + 'type': 'TXT', + 'ttl': 86400, + 'values': [ + 'v=spf1 include:mailgun.org ~all', + 'v=spf1 include:_spf.google.com ~all', + ], + }, + ) + zone.add_record(record) + + with self.assertRaises(SpfValueException): + processor.process_source_zone(zone) + + def test_processor_filters_to_records_with_spf_values(self): + processor = SpfDnsLookupProcessor('test') + zone = Zone('unit.tests.', []) + + zone.add_record( + Record.new( + zone, '', {'type': 'A', 'ttl': 86400, 'value': '1.2.3.4'} + ) + ) + zone.add_record( + Record.new( + zone, + '', + { + 'type': 'TXT', + 'ttl': 86400, + 'value': 'v=spf1 a a a a a a a a a a a ~all', + }, + ) + ) + + with self.assertRaises(SpfDnsLookupException): + processor.process_source_zone(zone) + + zone = Zone('unit.tests.', []) + + zone.add_record( + Record.new( + zone, '', {'type': 'A', 'ttl': 86400, 'value': '1.2.3.4'} + ) + ) + zone.add_record( + Record.new( + zone, + '', + { + 'type': 'TXT', + 'ttl': 86400, + 'values': ['AAAAAAAAAAA', 'v=spf10'], + }, + ) + ) + + assert zone == processor.process_source_zone(zone)