mirror of
https://github.com/github/octodns.git
synced 2024-05-11 05:55:00 +00:00
160 lines
4.1 KiB
Python
160 lines
4.1 KiB
Python
#
|
|
#
|
|
#
|
|
|
|
import re
|
|
|
|
from fqdn import FQDN
|
|
|
|
from ..equality import EqualityTupleMixin
|
|
from ..idna import idna_encode
|
|
from .base import Record, ValuesMixin
|
|
from .rr import RrParseError
|
|
|
|
|
|
class SrvValue(EqualityTupleMixin, dict):
|
|
@classmethod
|
|
def parse_rdata_text(self, value):
|
|
try:
|
|
priority, weight, port, target = value.split(' ')
|
|
except ValueError:
|
|
raise RrParseError()
|
|
try:
|
|
priority = int(priority)
|
|
except ValueError:
|
|
pass
|
|
try:
|
|
weight = int(weight)
|
|
except ValueError:
|
|
pass
|
|
try:
|
|
port = int(port)
|
|
except ValueError:
|
|
pass
|
|
return {
|
|
'priority': priority,
|
|
'weight': weight,
|
|
'port': port,
|
|
'target': target,
|
|
}
|
|
|
|
@classmethod
|
|
def validate(cls, data, _type):
|
|
if not isinstance(data, (list, tuple)):
|
|
data = (data,)
|
|
reasons = []
|
|
for value in data:
|
|
# TODO: validate algorithm and fingerprint_type values
|
|
try:
|
|
int(value['priority'])
|
|
except KeyError:
|
|
reasons.append('missing priority')
|
|
except ValueError:
|
|
reasons.append(f'invalid priority "{value["priority"]}"')
|
|
try:
|
|
int(value['weight'])
|
|
except KeyError:
|
|
reasons.append('missing weight')
|
|
except ValueError:
|
|
reasons.append(f'invalid weight "{value["weight"]}"')
|
|
try:
|
|
int(value['port'])
|
|
except KeyError:
|
|
reasons.append('missing port')
|
|
except ValueError:
|
|
reasons.append(f'invalid port "{value["port"]}"')
|
|
try:
|
|
target = value['target']
|
|
if not target:
|
|
reasons.append('missing target')
|
|
continue
|
|
target = idna_encode(target)
|
|
if not target.endswith('.'):
|
|
reasons.append(f'SRV value "{target}" missing trailing .')
|
|
if (
|
|
target != '.'
|
|
and not FQDN(target, allow_underscores=True).is_valid
|
|
):
|
|
reasons.append(
|
|
f'Invalid SRV target "{target}" is not a valid FQDN.'
|
|
)
|
|
except KeyError:
|
|
reasons.append('missing target')
|
|
return reasons
|
|
|
|
@classmethod
|
|
def process(cls, values):
|
|
return [cls(v) for v in values]
|
|
|
|
def __init__(self, value):
|
|
super().__init__(
|
|
{
|
|
'priority': int(value['priority']),
|
|
'weight': int(value['weight']),
|
|
'port': int(value['port']),
|
|
'target': idna_encode(value['target']),
|
|
}
|
|
)
|
|
|
|
@property
|
|
def priority(self):
|
|
return self['priority']
|
|
|
|
@priority.setter
|
|
def priority(self, value):
|
|
self['priority'] = value
|
|
|
|
@property
|
|
def weight(self):
|
|
return self['weight']
|
|
|
|
@weight.setter
|
|
def weight(self, value):
|
|
self['weight'] = value
|
|
|
|
@property
|
|
def port(self):
|
|
return self['port']
|
|
|
|
@port.setter
|
|
def port(self, value):
|
|
self['port'] = value
|
|
|
|
@property
|
|
def target(self):
|
|
return self['target']
|
|
|
|
@target.setter
|
|
def target(self, value):
|
|
self['target'] = value
|
|
|
|
@property
|
|
def data(self):
|
|
return self
|
|
|
|
def __hash__(self):
|
|
return hash(self.__repr__())
|
|
|
|
def _equality_tuple(self):
|
|
return (self.priority, self.weight, self.port, self.target)
|
|
|
|
def __repr__(self):
|
|
return f"'{self.priority} {self.weight} {self.port} {self.target}'"
|
|
|
|
|
|
class SrvRecord(ValuesMixin, Record):
|
|
_type = 'SRV'
|
|
_value_type = SrvValue
|
|
_name_re = re.compile(r'^(\*|_[^\.]+)\.[^\.]+')
|
|
|
|
@classmethod
|
|
def validate(cls, name, fqdn, data):
|
|
reasons = []
|
|
if not cls._name_re.match(name):
|
|
reasons.append('invalid name for SRV record')
|
|
reasons.extend(super().validate(name, fqdn, data))
|
|
return reasons
|
|
|
|
|
|
Record.register_type(SrvRecord)
|