1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

OVH: Add support of DKIM records

This commit is contained in:
trnsnt
2017-10-23 17:12:32 +02:00
parent cf76cb1446
commit 6b1a8f8ccf
2 changed files with 231 additions and 84 deletions

View File

@@ -5,6 +5,8 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
import base64
import binascii
import logging
from collections import defaultdict
@@ -32,8 +34,10 @@ class OvhProvider(BaseProvider):
SUPPORTS_GEO = False
SUPPORTS = set(('A', 'AAAA', 'CNAME', 'MX', 'NAPTR', 'NS', 'PTR', 'SPF',
'SRV', 'SSHFP', 'TXT'))
# This variable is also used in populate method to filter which OVH record
# types are supported by octodns
SUPPORTS = set(('A', 'AAAA', 'CNAME', 'DKIM', 'MX', 'NAPTR', 'NS', 'PTR',
'SPF', 'SRV', 'SSHFP', 'TXT'))
def __init__(self, id, endpoint, application_key, application_secret,
consumer_key, *args, **kwargs):
@@ -62,6 +66,10 @@ class OvhProvider(BaseProvider):
before = len(zone.records)
for name, types in values.items():
for _type, records in types.items():
if _type not in self.SUPPORTS:
self.log.warning('Not managed record of type %s, skip',
_type)
continue
data_for = getattr(self, '_data_for_{}'.format(_type))
record = Record.new(zone, name, data_for(_type, records),
source=self, lenient=lenient)
@@ -96,7 +104,11 @@ class OvhProvider(BaseProvider):
def _apply_delete(self, zone_name, change):
existing = change.existing
self.delete_records(zone_name, existing._type, existing.name)
record_type = existing._type
if record_type == "TXT":
if self._is_valid_dkim(existing.values[0]):
record_type = 'DKIM'
self.delete_records(zone_name, record_type, existing.name)
@staticmethod
def _data_for_multiple(_type, records):
@@ -184,6 +196,15 @@ class OvhProvider(BaseProvider):
'values': values
}
@staticmethod
def _data_for_DKIM(_type, records):
return {
'ttl': records[0]['ttl'],
'type': "TXT",
'values': [record['target'].replace(';', '\;')
for record in records]
}
_data_for_A = _data_for_multiple
_data_for_AAAA = _data_for_multiple
_data_for_NS = _data_for_multiple
@@ -258,15 +279,63 @@ class OvhProvider(BaseProvider):
'fieldType': record._type
}
def _params_for_TXT(self, record):
for value in record.values:
field_type = 'TXT'
if self._is_valid_dkim(value):
field_type = 'DKIM'
value = value.replace("\;", ";")
yield {
'target': value,
'subDomain': record.name,
'ttl': record.ttl,
'fieldType': field_type
}
_params_for_A = _params_for_multiple
_params_for_AAAA = _params_for_multiple
_params_for_NS = _params_for_multiple
_params_for_SPF = _params_for_multiple
_params_for_TXT = _params_for_multiple
_params_for_CNAME = _params_for_single
_params_for_PTR = _params_for_single
def _is_valid_dkim(self, value):
"""Check if value is a valid DKIM"""
validator_dict = {'h': lambda val: val in ['sha1', 'sha256'],
's': lambda val: val in ['*', 'email'],
't': lambda val: val in ['y', 's'],
'v': lambda val: val == 'DKIM1',
'k': lambda val: val == 'rsa',
'n': lambda _: True,
'g': lambda _: True}
splitted = value.split('\;')
found_key = False
for splitted_value in splitted:
sub_split = map(lambda x: x.strip(), splitted_value.split("=", 1))
if len(sub_split) < 2:
return False
key, value = sub_split[0], sub_split[1]
if key == "p":
is_valid_key = self._is_valid_dkim_key(value)
if not is_valid_key:
return False
found_key = True
else:
is_valid_key = validator_dict.get(key, lambda _: False)(value)
if not is_valid_key:
return False
return found_key
@staticmethod
def _is_valid_dkim_key(key):
try:
base64.decodestring(key)
except binascii.Error:
return False
return True
def get_records(self, zone_name):
"""
List all records of a DNS zone