1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Near complete rework of chunked rdata handling/parsing

This commit is contained in:
Ross McFarland
2024-05-10 15:05:02 -07:00
parent 89b3650c4c
commit dd745f9ca6
2 changed files with 177 additions and 16 deletions

View File

@ -3,8 +3,10 @@
#
import re
from io import StringIO
from .base import ValuesMixin
from .rr import RrParseError
class _ChunkedValuesMixin(ValuesMixin):
@ -32,16 +34,78 @@ class _ChunkedValuesMixin(ValuesMixin):
return self.chunked_values
def _parse(s, spec_unquoted=False, strict=False):
whitespace = {' ', '\t', '\n', '\r', '\f', '\v'}
n = len(s)
pos = 0
while pos < n:
if s[pos] in whitespace:
# skip whitespace (outside of piece)
pos += 1
elif s[pos] == '"':
# it's a quoted chunk, run until we reach the closing quote,
# handling escaped quotes as we go
buf = StringIO()
pos += 1
start = pos
while pos < n:
i = s.find('"', pos)
if i == -1:
if strict:
raise RrParseError()
# we didn't find a closing quote, best effort... return
# whatever we have left
yield s[start:]
# we've returned everything
pos = n
elif s[i - 1] == '\\':
# it was an escaped quote, grab everything before the escape
buf.write(s[start : i - 1])
# we'll get the " as part of the next piece
start = i
pos = i + 1
else:
# it was our closing quote, we have our chunk
buf.write(s[start:i])
yield buf.getvalue()
pos = i + 1
break
elif spec_unquoted:
# it's not quoted, we want everything up until the next whitespace
locs = sorted(
i for i in [s.find(c, pos) for c in whitespace] if i != -1
)
if locs:
i = locs[0]
# we have our whitespace, everything before it is our chunk
yield s[pos:i]
pos = i + 1
else:
# we hit the end of s, whatever is left is our chunk
yield s[pos:]
pos += 1
break
else:
# it's not quoted, we want everything verbatim, excluding any
# trailing whitespace
end = n - 1
while end >= pos and s[end] in whitespace:
end -= 1
yield s[pos : end + 1]
break
class _ChunkedValue(str):
_unescaped_semicolon_re = re.compile(r'\w;')
_chunk_sep_re = re.compile(r'"\s+"')
@classmethod
def parse_rdata_text(cls, value):
try:
return value.replace(';', '\\;')
except AttributeError:
if not value or not isinstance(value, str):
return value
chunks = _parse(value, spec_unquoted=True, strict=True)
value = ''.join(chunks)
return value.replace(';', '\\;')
@classmethod
def validate(cls, data, _type):
@ -62,12 +126,9 @@ class _ChunkedValue(str):
@classmethod
def process(cls, values):
ret = []
for v in values:
# remove leading/trailing whitespace
v = v.strip()
if v and v[0] == '"':
v = v[1:-1]
ret.append(cls(cls._chunk_sep_re.sub('', v)))
for value in values:
value = ''.join(_parse(value))
ret.append(cls(value))
return ret
@property

View File

@ -4,7 +4,8 @@
from unittest import TestCase
from octodns.record.chunked import _ChunkedValue
from octodns.record.chunked import _ChunkedValue, _parse
from octodns.record.rr import RrParseError
from octodns.record.spf import SpfRecord
from octodns.zone import Zone
@ -21,16 +22,40 @@ class TestRecordChunked(TestCase):
'some.words.that.here',
'1.2.word.4',
'1.2.3.4',
# quotes are not removed
'"Hello World!"',
):
self.assertEqual(s, _ChunkedValue.parse_rdata_text(s))
# quotes are removed
s = '"Hello World!"'
self.assertEqual(s.replace('"', ''), _ChunkedValue.parse_rdata_text(s))
# semi-colons are escaped
self.assertEqual(
'Hello\\; World!', _ChunkedValue.parse_rdata_text('Hello; World!')
'Hello\\; World!', _ChunkedValue.parse_rdata_text('"Hello; World!"')
)
# unquoted whitespace separated pieces are concatenated
self.assertEqual(
'thisrunstogether',
_ChunkedValue.parse_rdata_text('this runs\ttogether'),
)
# mix of quoted and unquoted
self.assertEqual(
'This is quoted andthisisnot, this is back to being quoted',
_ChunkedValue.parse_rdata_text(
'"This is quoted " and this is not ", this is back to being quoted"'
),
)
for s in (
'"no closing quote',
'"no closing quote ',
'"no closing \\" quote',
):
with self.assertRaises(RrParseError):
_ChunkedValue.parse_rdata_text(s)
# since we're always a string validate and __init__ don't
# parse_rdata_text
@ -68,6 +93,81 @@ class TestChunkedValue(TestCase):
_ChunkedValue.validate('Déjà vu', 'TXT'),
)
def test_quoted(self):
# test escaped double quotes
for value, expected in (
(
'"This is a quoted string with escaped \\"quotes\\""',
'This is a quoted string with escaped "quotes"',
),
):
chunked = _ChunkedValue.process([value])
self.assertEqual(1, len(chunked))
chunked = chunked[0]
self.assertEqual(expected, chunked)
# all whitespace
chunked = _ChunkedValue.process(['" \t\t"'])
self.assertEqual(1, len(chunked))
self.assertEqual(' \t\t', chunked[0])
# TODO: missing closing quote
value = '"This is quoted, but has no end'
chunked = _ChunkedValue.process([value])
self.assertEqual(1, len(chunked))
self.assertEqual(value[1:], chunked[0])
# TODO: missing opening quote
def test_unquoted(self):
for value in (
'This is not quoted',
' This has leading space',
' This has leading spaces',
'\tThis has a leading tab',
'\t\tThis has leading tabs',
' \tThis has leading tabs',
'This has trailing space ',
'This has trailing spaces ',
'This has a trailing tab\t',
'This has trailing tabs\t\t',
' \tThis has leading tabs\t ',
' This has leading and trailing space ',
' This has leading and trailing space ',
'\tThis has a leading and trailing tab\t',
'\t\tThis has leading and trailing tabs\t\t',
'This has a quote " in the middle',
):
chunked = _ChunkedValue.process([value])
self.assertEqual(1, len(chunked))
self.assertEqual(value.strip(), chunked[0])
# all whitespace
chunked = _ChunkedValue.process([' '])
self.assertEqual(1, len(chunked))
self.assertEqual('', chunked[0])
def test_spec_unquoted(self):
for value in (
'This is not quoted',
' This has leading space',
' This has leading spaces',
'\tThis has a leading tab',
'\t\tThis has leading tabs',
' \tThis has leading tabs',
'This has trailing space ',
'This has trailing spaces ',
'This has a trailing tab\t',
'This has trailing tabs\t\t',
' \tThis has leading tabs\t ',
' This has leading and trailing space ',
' This has leading and trailing space ',
'\tThis has a leading and trailing tab\t',
'\t\tThis has leading and trailing tabs\t\t',
):
parsed = list(_parse(value, spec_unquoted=True))
self.assertEqual(value.strip().split(), parsed)
def test_large_values(self):
# There is additional testing in TXT
@ -100,7 +200,7 @@ class TestChunkedValue(TestCase):
)
self.assertEqual(dechunked_value, chunked)
# already dechunked, noop
# non-quoted is a no-op
chunked = _ChunkedValue.process([dechunked_value])[0]
self.assertEqual(dechunked_value, chunked)
@ -153,7 +253,7 @@ class TestChunkedValue(TestCase):
# ~real world test case
values = [
'before',
' "v=DKIM1\\; h=sha256\\; k=rsa\\; p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx78E7PtJvr8vpoNgHdIAe+llFKoy8WuTXDd6Z5mm3D4AUva9MBt5fFetxg/kcRy3KMDnMw6kDybwbpS/oPw1ylk6DL1xit7Cr5xeYYSWKukxXURAlHwT2K72oUsFKRUvN1X9lVysAeo+H8H/22Z9fJ0P30sOuRIRqCaiz+OiUYicxy4x" "rpfH2s9a+o3yRwX3zhlp8GjRmmmyK5mf7CkQTCfjnKVsYtB7mabXXmClH9tlcymnBMoN9PeXxaS5JRRysVV8RBCC9/wmfp9y//cck8nvE/MavFpSUHvv+TfTTdVKDlsXPjKX8iZQv0nO3xhspgkqFquKjydiR8nf4meHhwIDAQAB" ',
'"v=DKIM1\\; h=sha256\\; k=rsa\\; p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx78E7PtJvr8vpoNgHdIAe+llFKoy8WuTXDd6Z5mm3D4AUva9MBt5fFetxg/kcRy3KMDnMw6kDybwbpS/oPw1ylk6DL1xit7Cr5xeYYSWKukxXURAlHwT2K72oUsFKRUvN1X9lVysAeo+H8H/22Z9fJ0P30sOuRIRqCaiz+OiUYicxy4x" "rpfH2s9a+o3yRwX3zhlp8GjRmmmyK5mf7CkQTCfjnKVsYtB7mabXXmClH9tlcymnBMoN9PeXxaS5JRRysVV8RBCC9/wmfp9y//cck8nvE/MavFpSUHvv+TfTTdVKDlsXPjKX8iZQv0nO3xhspgkqFquKjydiR8nf4meHhwIDAQAB"',
'z after',
]
chunked = _ChunkedValue.process(values)