mirror of
https://github.com/github/octodns.git
synced 2024-05-11 05:55:00 +00:00
finish up rr text Record coverage (hopefully)
This commit is contained in:
+111
-24
@@ -1954,16 +1954,29 @@ Record.register_type(SpfRecord)
|
||||
|
||||
class SrvValue(EqualityTupleMixin, dict):
|
||||
@classmethod
|
||||
def preprocess_value(self, value):
|
||||
if isinstance(value, str):
|
||||
priority, weight, port, target = value.split(' ', 3)
|
||||
return {
|
||||
'priority': priority,
|
||||
'weight': weight,
|
||||
'port': port,
|
||||
'target': target,
|
||||
}
|
||||
return value
|
||||
def parse_rr_text(self, value):
|
||||
try:
|
||||
priority, weight, port, target = value.split(' ')
|
||||
except ValueError:
|
||||
raise RrParseError()
|
||||
try:
|
||||
priority = int(priority)
|
||||
except ValueError:
|
||||
pass
|
||||
try:
|
||||
weight = int(weight)
|
||||
except ValueError:
|
||||
pass
|
||||
try:
|
||||
port = int(port)
|
||||
except ValueError:
|
||||
pass
|
||||
return {
|
||||
'priority': priority,
|
||||
'weight': weight,
|
||||
'port': port,
|
||||
'target': target,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def validate(cls, data, _type):
|
||||
@@ -1971,6 +1984,14 @@ class SrvValue(EqualityTupleMixin, dict):
|
||||
data = (data,)
|
||||
reasons = []
|
||||
for value in data:
|
||||
if isinstance(value, str):
|
||||
# it's hopefully RR formatted, give parsing a try
|
||||
try:
|
||||
value = cls.parse_rr_text(value)
|
||||
except RrParseError as e:
|
||||
reasons.append(str(e))
|
||||
# not a dict so no point in continuing
|
||||
continue
|
||||
# TODO: validate algorithm and fingerprint_type values
|
||||
try:
|
||||
int(value['priority'])
|
||||
@@ -2010,6 +2031,8 @@ class SrvValue(EqualityTupleMixin, dict):
|
||||
return [cls(v) for v in values]
|
||||
|
||||
def __init__(self, value):
|
||||
if isinstance(value, str):
|
||||
value = self.parse_rr_text(value)
|
||||
super().__init__(
|
||||
{
|
||||
'priority': int(value['priority']),
|
||||
@@ -2084,21 +2107,34 @@ Record.register_type(SrvRecord)
|
||||
|
||||
class TlsaValue(EqualityTupleMixin, dict):
|
||||
@classmethod
|
||||
def preprocess_value(self, value):
|
||||
if isinstance(value, str):
|
||||
def parse_rr_text(self, value):
|
||||
try:
|
||||
(
|
||||
certificate_usage,
|
||||
certificate_association_data,
|
||||
selector,
|
||||
matching_type,
|
||||
certificate_association_data,
|
||||
) = value.split(' ', 3)
|
||||
return {
|
||||
'certificate_usage': certificate_usage,
|
||||
'certificate_association_data': certificate_association_data,
|
||||
'matching_type': matching_type,
|
||||
'certificate_association_data': certificate_association_data,
|
||||
}
|
||||
return value
|
||||
) = value.split(' ')
|
||||
except ValueError:
|
||||
raise RrParseError()
|
||||
try:
|
||||
certificate_usage = int(certificate_usage)
|
||||
except ValueError:
|
||||
pass
|
||||
try:
|
||||
selector = int(selector)
|
||||
except ValueError:
|
||||
pass
|
||||
try:
|
||||
matching_type = int(matching_type)
|
||||
except ValueError:
|
||||
pass
|
||||
return {
|
||||
'certificate_usage': certificate_usage,
|
||||
'selector': selector,
|
||||
'matching_type': matching_type,
|
||||
'certificate_association_data': certificate_association_data,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def validate(cls, data, _type):
|
||||
@@ -2106,6 +2142,14 @@ class TlsaValue(EqualityTupleMixin, dict):
|
||||
data = (data,)
|
||||
reasons = []
|
||||
for value in data:
|
||||
if isinstance(value, str):
|
||||
# it's hopefully RR formatted, give parsing a try
|
||||
try:
|
||||
value = cls.parse_rr_text(value)
|
||||
except RrParseError as e:
|
||||
reasons.append(str(e))
|
||||
# not a dict so no point in continuing
|
||||
continue
|
||||
try:
|
||||
certificate_usage = int(value.get('certificate_usage', 0))
|
||||
if certificate_usage < 0 or certificate_usage > 3:
|
||||
@@ -2149,6 +2193,8 @@ class TlsaValue(EqualityTupleMixin, dict):
|
||||
return [cls(v) for v in values]
|
||||
|
||||
def __init__(self, value):
|
||||
if isinstance(value, str):
|
||||
value = self.parse_rr_text(value)
|
||||
super().__init__(
|
||||
{
|
||||
'certificate_usage': int(value.get('certificate_usage', 0)),
|
||||
@@ -2192,6 +2238,10 @@ class TlsaValue(EqualityTupleMixin, dict):
|
||||
def certificate_association_data(self, value):
|
||||
self['certificate_association_data'] = value
|
||||
|
||||
@property
|
||||
def rr_text(self):
|
||||
return f'{self.certificate_usage} {self.selector} {self.matching_type} {self.certificate_association_data}'
|
||||
|
||||
def _equality_tuple(self):
|
||||
return (
|
||||
self.certificate_usage,
|
||||
@@ -2233,9 +2283,30 @@ class UrlfwdValue(EqualityTupleMixin, dict):
|
||||
VALID_QUERY = (0, 1)
|
||||
|
||||
@classmethod
|
||||
def preprocess_value(self, value):
|
||||
# TODO:
|
||||
return value
|
||||
def parse_rr_text(self, value):
|
||||
try:
|
||||
code, masking, query, path, target = value.split(' ')
|
||||
except ValueError:
|
||||
raise RrParseError()
|
||||
try:
|
||||
code = int(code)
|
||||
except ValueError:
|
||||
pass
|
||||
try:
|
||||
masking = int(masking)
|
||||
except ValueError:
|
||||
pass
|
||||
try:
|
||||
query = int(query)
|
||||
except ValueError:
|
||||
pass
|
||||
return {
|
||||
'code': code,
|
||||
'masking': masking,
|
||||
'query': query,
|
||||
'path': path,
|
||||
'target': target,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def validate(cls, data, _type):
|
||||
@@ -2243,6 +2314,14 @@ class UrlfwdValue(EqualityTupleMixin, dict):
|
||||
data = (data,)
|
||||
reasons = []
|
||||
for value in data:
|
||||
if isinstance(value, str):
|
||||
# it's hopefully RR formatted, give parsing a try
|
||||
try:
|
||||
value = cls.parse_rr_text(value)
|
||||
except RrParseError as e:
|
||||
reasons.append(str(e))
|
||||
# not a dict so no point in continuing
|
||||
continue
|
||||
try:
|
||||
code = int(value['code'])
|
||||
if code not in cls.VALID_CODES:
|
||||
@@ -2277,6 +2356,8 @@ class UrlfwdValue(EqualityTupleMixin, dict):
|
||||
return [UrlfwdValue(v) for v in values]
|
||||
|
||||
def __init__(self, value):
|
||||
if isinstance(value, str):
|
||||
value = self.parse_rr_text(value)
|
||||
super().__init__(
|
||||
{
|
||||
'path': value['path'],
|
||||
@@ -2327,6 +2408,12 @@ class UrlfwdValue(EqualityTupleMixin, dict):
|
||||
def query(self, value):
|
||||
self['query'] = value
|
||||
|
||||
@property
|
||||
def rr_text(self):
|
||||
return (
|
||||
f'{self.code} {self.masking} {self.query} {self.path} {self.target}'
|
||||
)
|
||||
|
||||
def _equality_tuple(self):
|
||||
return (self.path, self.target, self.code, self.masking, self.query)
|
||||
|
||||
|
||||
+230
-12
@@ -52,6 +52,7 @@ from octodns.record import (
|
||||
_DynamicPool,
|
||||
_DynamicRule,
|
||||
_NsValue,
|
||||
_TargetValue,
|
||||
)
|
||||
from octodns.zone import Zone
|
||||
|
||||
@@ -419,7 +420,7 @@ class TestRecord(TestCase):
|
||||
'1.2.word.4',
|
||||
'1.2.3.4',
|
||||
):
|
||||
self.assertEqual(s, Ipv4Address.parse_rr_text(s))
|
||||
self.assertEqual(s, _TargetValue.parse_rr_text(s))
|
||||
|
||||
# since we're a noop there's no need/way to check whether validate or
|
||||
# __init__ call parse_rr_text
|
||||
@@ -656,12 +657,33 @@ class TestRecord(TestCase):
|
||||
a.__repr__()
|
||||
|
||||
def test_loc_value_rr_text(self):
|
||||
# only the exact correct number of words
|
||||
# only the exact correct number of words is allowed
|
||||
for i in tuple(range(0, 12)) + (13,):
|
||||
s = ''.join(['word'] * i)
|
||||
with self.assertRaises(RrParseError):
|
||||
LocValue.parse_rr_text(s)
|
||||
|
||||
# type conversions are best effort
|
||||
self.assertEqual(
|
||||
{
|
||||
'altitude': 'six',
|
||||
'lat_degrees': 'zero',
|
||||
'lat_direction': 'S',
|
||||
'lat_minutes': 'one',
|
||||
'lat_seconds': 'two',
|
||||
'long_degrees': 'three',
|
||||
'long_direction': 'W',
|
||||
'long_minutes': 'four',
|
||||
'long_seconds': 'five',
|
||||
'precision_horz': 'eight',
|
||||
'precision_vert': 'nine',
|
||||
'size': 'seven',
|
||||
},
|
||||
LocValue.parse_rr_text(
|
||||
'zero one two S three four five W six seven eight nine'
|
||||
),
|
||||
)
|
||||
|
||||
# valid
|
||||
s = '0 1 2.2 N 3 4 5.5 E 6.6m 7.7m 8.8m 9.9m'
|
||||
self.assertEqual(
|
||||
@@ -1385,16 +1407,6 @@ class TestRecord(TestCase):
|
||||
self.assertEqual(a_values[0]['weight'], a.values[0].weight)
|
||||
self.assertEqual(a_values[0]['port'], a.values[0].port)
|
||||
self.assertEqual(a_values[0]['target'], a.values[0].target)
|
||||
from pprint import pprint
|
||||
|
||||
pprint(
|
||||
{
|
||||
'a_values': a_values,
|
||||
'self': a_data,
|
||||
'other': a.data,
|
||||
'a.values': a.values,
|
||||
}
|
||||
)
|
||||
self.assertEqual(a_data, a.data)
|
||||
|
||||
b_value = SrvValue(
|
||||
@@ -1441,6 +1453,73 @@ class TestRecord(TestCase):
|
||||
# __repr__ doesn't blow up
|
||||
a.__repr__()
|
||||
|
||||
def test_srv_value_rr_text(self):
|
||||
|
||||
# empty string won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
SrvValue.parse_rr_text('')
|
||||
|
||||
# single word won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
SrvValue.parse_rr_text('nope')
|
||||
|
||||
# 2nd word won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
SrvValue.parse_rr_text('1 2')
|
||||
|
||||
# 3rd word won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
SrvValue.parse_rr_text('1 2 3')
|
||||
|
||||
# 5th word won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
SrvValue.parse_rr_text('1 2 3 4 5')
|
||||
|
||||
# priority weight and port not ints
|
||||
self.assertEqual(
|
||||
{
|
||||
'priority': 'one',
|
||||
'weight': 'two',
|
||||
'port': 'three',
|
||||
'target': 'srv.unit.tests.',
|
||||
},
|
||||
SrvValue.parse_rr_text('one two three srv.unit.tests.'),
|
||||
)
|
||||
|
||||
# valid
|
||||
self.assertEqual(
|
||||
{
|
||||
'priority': 1,
|
||||
'weight': 2,
|
||||
'port': 3,
|
||||
'target': 'srv.unit.tests.',
|
||||
},
|
||||
SrvValue.parse_rr_text('1 2 3 srv.unit.tests.'),
|
||||
)
|
||||
|
||||
# make sure that validate is using parse_rr_text when passed string
|
||||
# value(s)
|
||||
reasons = SrvRecord.validate(
|
||||
'_srv._tcp', '_srv._tcp.unit.tests.', {'ttl': 32, 'value': ''}
|
||||
)
|
||||
self.assertEqual(['failed to parse string value as RR text'], reasons)
|
||||
reasons = SrvRecord.validate(
|
||||
'_srv._tcp',
|
||||
'_srv._tcp.unit.tests.',
|
||||
{'ttl': 32, 'value': '1 2 3 srv.unit.tests.'},
|
||||
)
|
||||
self.assertFalse(reasons)
|
||||
|
||||
# make sure that the cstor is using parse_rr_text
|
||||
zone = Zone('unit.tests.', [])
|
||||
a = SrvRecord(
|
||||
zone, '_srv._tcp', {'ttl': 32, 'value': '1 2 3 srv.unit.tests.'}
|
||||
)
|
||||
self.assertEqual(1, a.values[0].priority)
|
||||
self.assertEqual(2, a.values[0].weight)
|
||||
self.assertEqual(3, a.values[0].port)
|
||||
self.assertEqual('srv.unit.tests.', a.values[0].target)
|
||||
|
||||
def test_tlsa(self):
|
||||
a_values = [
|
||||
TlsaValue(
|
||||
@@ -1542,6 +1621,70 @@ class TestRecord(TestCase):
|
||||
# __repr__ doesn't blow up
|
||||
a.__repr__()
|
||||
|
||||
def test_tsla_value_rr_text(self):
|
||||
|
||||
# empty string won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
TlsaValue.parse_rr_text('')
|
||||
|
||||
# single word won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
TlsaValue.parse_rr_text('nope')
|
||||
|
||||
# 2nd word won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
TlsaValue.parse_rr_text('1 2')
|
||||
|
||||
# 3rd word won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
TlsaValue.parse_rr_text('1 2 3')
|
||||
|
||||
# 5th word won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
TlsaValue.parse_rr_text('1 2 3 abcd another')
|
||||
|
||||
# non-ints
|
||||
self.assertEqual(
|
||||
{
|
||||
'certificate_usage': 'one',
|
||||
'selector': 'two',
|
||||
'matching_type': 'three',
|
||||
'certificate_association_data': 'abcd',
|
||||
},
|
||||
TlsaValue.parse_rr_text('one two three abcd'),
|
||||
)
|
||||
|
||||
# valid
|
||||
self.assertEqual(
|
||||
{
|
||||
'certificate_usage': 1,
|
||||
'selector': 2,
|
||||
'matching_type': 3,
|
||||
'certificate_association_data': 'abcd',
|
||||
},
|
||||
TlsaValue.parse_rr_text('1 2 3 abcd'),
|
||||
)
|
||||
|
||||
# make sure that validate is using parse_rr_text when passed string
|
||||
# value(s)
|
||||
reasons = TlsaRecord.validate(
|
||||
'tlsa', 'tlsa.unit.tests.', {'ttl': 32, 'value': ''}
|
||||
)
|
||||
self.assertEqual(['failed to parse string value as RR text'], reasons)
|
||||
reasons = TlsaRecord.validate(
|
||||
'tlsa', 'tlsa.unit.tests.', {'ttl': 32, 'value': '2 1 0 abcd'}
|
||||
)
|
||||
self.assertFalse(reasons)
|
||||
|
||||
# make sure that the cstor is using parse_rr_text
|
||||
zone = Zone('unit.tests.', [])
|
||||
a = TlsaRecord(zone, 'tlsa', {'ttl': 32, 'value': '2 1 0 abcd'})
|
||||
self.assertEqual(2, a.values[0].certificate_usage)
|
||||
self.assertEqual(1, a.values[0].selector)
|
||||
self.assertEqual(0, a.values[0].matching_type)
|
||||
self.assertEqual('abcd', a.values[0].certificate_association_data)
|
||||
self.assertEqual('2 1 0 abcd', a.values[0].rr_text)
|
||||
|
||||
def test_txt(self):
|
||||
a_values = ['a one', 'a two']
|
||||
b_value = 'b other'
|
||||
@@ -1666,6 +1809,81 @@ class TestRecord(TestCase):
|
||||
# __repr__ doesn't blow up
|
||||
a.__repr__()
|
||||
|
||||
def test_urlfwd_value_rr_text(self):
|
||||
|
||||
# empty string won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
UrlfwdValue.parse_rr_text('')
|
||||
|
||||
# single word won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
UrlfwdValue.parse_rr_text('nope')
|
||||
|
||||
# 2nd word won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
UrlfwdValue.parse_rr_text('one two')
|
||||
|
||||
# 3rd word won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
UrlfwdValue.parse_rr_text('one two three')
|
||||
|
||||
# 4th word won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
UrlfwdValue.parse_rr_text('one two three four')
|
||||
|
||||
# 6th word won't parse
|
||||
with self.assertRaises(RrParseError):
|
||||
UrlfwdValue.parse_rr_text('one two three four five size')
|
||||
|
||||
# non-ints
|
||||
self.assertEqual(
|
||||
{
|
||||
'code': 'one',
|
||||
'masking': 'two',
|
||||
'query': 'three',
|
||||
'path': 'four',
|
||||
'target': 'five',
|
||||
},
|
||||
UrlfwdValue.parse_rr_text('one two three four five'),
|
||||
)
|
||||
|
||||
# valid
|
||||
self.assertEqual(
|
||||
{
|
||||
'code': 301,
|
||||
'masking': 0,
|
||||
'query': 1,
|
||||
'path': 'four',
|
||||
'target': 'five',
|
||||
},
|
||||
UrlfwdValue.parse_rr_text('301 0 1 four five'),
|
||||
)
|
||||
|
||||
# make sure that validate is using parse_rr_text when passed string
|
||||
# value(s)
|
||||
reasons = UrlfwdRecord.validate(
|
||||
'urlfwd', 'urlfwd.unit.tests.', {'ttl': 32, 'value': ''}
|
||||
)
|
||||
self.assertEqual(['failed to parse string value as RR text'], reasons)
|
||||
reasons = UrlfwdRecord.validate(
|
||||
'urlfwd',
|
||||
'urlfwd.unit.tests.',
|
||||
{'ttl': 32, 'value': '301 0 1 four five'},
|
||||
)
|
||||
self.assertFalse(reasons)
|
||||
|
||||
# make sure that the cstor is using parse_rr_text
|
||||
zone = Zone('unit.tests.', [])
|
||||
a = UrlfwdRecord(
|
||||
zone, 'urlfwd', {'ttl': 32, 'value': '301 0 1 four five'}
|
||||
)
|
||||
self.assertEqual(301, a.values[0].code)
|
||||
self.assertEqual(0, a.values[0].masking)
|
||||
self.assertEqual(1, a.values[0].query)
|
||||
self.assertEqual('four', a.values[0].path)
|
||||
self.assertEqual('five', a.values[0].target)
|
||||
self.assertEqual('301 0 1 four five', a.values[0].rr_text)
|
||||
|
||||
def test_record_new(self):
|
||||
txt = Record.new(
|
||||
self.zone, 'txt', {'ttl': 44, 'type': 'TXT', 'value': 'some text'}
|
||||
|
||||
Reference in New Issue
Block a user