1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Error on too many lookups from single SPF mechanisms

This commit is contained in:
Samuel Parkinson
2023-02-15 15:17:12 +00:00
parent 9f4a910a08
commit dc446eefb9
2 changed files with 211 additions and 0 deletions

58
octodns/processor/spf.py Normal file
View File

@@ -0,0 +1,58 @@
#
#
#
from logging import getLogger
from .base import BaseProcessor, ProcessorException
class SpfValueException(ProcessorException):
pass
class SpfDnsLookupException(ProcessorException):
pass
class SpfDnsLookupProcessor(BaseProcessor):
log = getLogger('SpfDnsLookupProcessor')
def __init__(self, name):
self.log.debug(f"SpfDnsLookupProcessor: {name}")
super().__init__(name)
def process_source_zone(self, zone, *args, **kwargs):
for record in zone.records:
if record._type != 'TXT':
continue
if record._octodns.get('lenient'):
continue
# SPF values must begin with 'v=spf1 '
values = [
value for value in record.values if value.startswith('v=spf1 ')
]
if len(values) == 0:
continue
if len(values) > 1:
raise SpfValueException(
f"{record.fqdn} has more than one SPF value"
)
lookups = 0
terms = values[0].removeprefix('v=spf1 ').split(' ')
for term in terms:
if lookups > 10:
raise SpfDnsLookupException(
f"{record.fqdn} has too many SPF DNS lookups"
)
if term in ['a', 'mx', 'exists', 'redirect']:
lookups += 1
return zone

View File

@@ -0,0 +1,153 @@
from unittest import TestCase
from octodns.processor.spf import (
SpfDnsLookupException,
SpfDnsLookupProcessor,
SpfValueException,
)
from octodns.record.base import Record
from octodns.zone import Zone
class TestSpfDnsLookupProcessor(TestCase):
def test_processor(self):
processor = SpfDnsLookupProcessor('test')
assert processor.name == 'test'
processor = SpfDnsLookupProcessor('test')
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': ['v=spf1 a ~all', 'v=DMARC1\; p=reject\;'],
},
)
)
assert zone == processor.process_source_zone(zone)
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': [
'v=spf1 a a a a a a a a a a -all',
'v=DMARC1\; p=reject\;',
],
},
)
)
assert zone == processor.process_source_zone(zone)
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': [
'v=spf1 a mx exists redirect a a a a a a a ~all',
'v=DMARC1\; p=reject\;',
],
},
)
)
with self.assertRaises(SpfDnsLookupException):
processor.process_source_zone(zone)
def test_processor_skips_lenient_records(self):
processor = SpfDnsLookupProcessor('test')
zone = Zone('unit.tests.', [])
lenient = Record.new(
zone,
'lenient',
{
'type': 'TXT',
'ttl': 86400,
'value': 'v=spf1 a a a a a a a a a a a ~all',
'octodns': {'lenient': True},
},
)
zone.add_record(lenient)
processed = processor.process_source_zone(zone)
assert zone == processed
def test_processor_errors_on_many_spf_values_in_record(self):
processor = SpfDnsLookupProcessor('test')
zone = Zone('unit.tests.', [])
record = Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': [
'v=spf1 include:mailgun.org ~all',
'v=spf1 include:_spf.google.com ~all',
],
},
)
zone.add_record(record)
with self.assertRaises(SpfValueException):
processor.process_source_zone(zone)
def test_processor_filters_to_records_with_spf_values(self):
processor = SpfDnsLookupProcessor('test')
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone, '', {'type': 'A', 'ttl': 86400, 'value': '1.2.3.4'}
)
)
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'value': 'v=spf1 a a a a a a a a a a a ~all',
},
)
)
with self.assertRaises(SpfDnsLookupException):
processor.process_source_zone(zone)
zone = Zone('unit.tests.', [])
zone.add_record(
Record.new(
zone, '', {'type': 'A', 'ttl': 86400, 'value': '1.2.3.4'}
)
)
zone.add_record(
Record.new(
zone,
'',
{
'type': 'TXT',
'ttl': 86400,
'values': ['AAAAAAAAAAA', 'v=spf10'],
},
)
)
assert zone == processor.process_source_zone(zone)