From dd745f9ca61a8bb83e22417f6938ef0c082f7c64 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 10 May 2024 15:05:02 -0700 Subject: [PATCH] Near complete rework of chunked rdata handling/parsing --- octodns/record/chunked.py | 81 ++++++++++++++++--- tests/test_octodns_record_chunked.py | 112 +++++++++++++++++++++++++-- 2 files changed, 177 insertions(+), 16 deletions(-) diff --git a/octodns/record/chunked.py b/octodns/record/chunked.py index ea49782..3122704 100644 --- a/octodns/record/chunked.py +++ b/octodns/record/chunked.py @@ -3,8 +3,10 @@ # import re +from io import StringIO from .base import ValuesMixin +from .rr import RrParseError class _ChunkedValuesMixin(ValuesMixin): @@ -32,16 +34,78 @@ class _ChunkedValuesMixin(ValuesMixin): return self.chunked_values +def _parse(s, spec_unquoted=False, strict=False): + whitespace = {' ', '\t', '\n', '\r', '\f', '\v'} + + n = len(s) + pos = 0 + while pos < n: + if s[pos] in whitespace: + # skip whitespace (outside of piece) + pos += 1 + elif s[pos] == '"': + # it's a quoted chunk, run until we reach the closing quote, + # handling escaped quotes as we go + buf = StringIO() + pos += 1 + start = pos + while pos < n: + i = s.find('"', pos) + if i == -1: + if strict: + raise RrParseError() + # we didn't find a closing quote, best effort... 
return + # whatever we have left + yield s[start:] + # we've returned everything + pos = n + elif s[i - 1] == '\\': + # it was an escaped quote, grab everything before the escape + buf.write(s[start : i - 1]) + # we'll get the " as part of the next piece + start = i + pos = i + 1 + else: + # it was our closing quote, we have our chunk + buf.write(s[start:i]) + yield buf.getvalue() + pos = i + 1 + break + elif spec_unquoted: + # it's not quoted, we want everything up until the next whitespace + locs = sorted( + i for i in [s.find(c, pos) for c in whitespace] if i != -1 + ) + if locs: + i = locs[0] + # we have our whitespace, everything before it is our chunk + yield s[pos:i] + pos = i + 1 + else: + # we hit the end of s, whatever is left is our chunk + yield s[pos:] + pos += 1 + break + else: + # it's not quoted, we want everything verbatim, excluding any + # trailing whitespace + end = n - 1 + while end >= pos and s[end] in whitespace: + end -= 1 + yield s[pos : end + 1] + break + + class _ChunkedValue(str): _unescaped_semicolon_re = re.compile(r'\w;') - _chunk_sep_re = re.compile(r'"\s+"') @classmethod def parse_rdata_text(cls, value): - try: - return value.replace(';', '\\;') - except AttributeError: + if not value or not isinstance(value, str): return value + chunks = _parse(value, spec_unquoted=True, strict=True) + value = ''.join(chunks) + return value.replace(';', '\\;') @classmethod def validate(cls, data, _type): @@ -62,12 +126,9 @@ class _ChunkedValue(str): @classmethod def process(cls, values): ret = [] - for v in values: - # remove leading/trailing whitespace - v = v.strip() - if v and v[0] == '"': - v = v[1:-1] - ret.append(cls(cls._chunk_sep_re.sub('', v))) + for value in values: + value = ''.join(_parse(value)) + ret.append(cls(value)) return ret @property diff --git a/tests/test_octodns_record_chunked.py b/tests/test_octodns_record_chunked.py index fb6f991..46a221d 100644 --- a/tests/test_octodns_record_chunked.py +++ 
b/tests/test_octodns_record_chunked.py @@ -4,7 +4,8 @@ from unittest import TestCase -from octodns.record.chunked import _ChunkedValue +from octodns.record.chunked import _ChunkedValue, _parse +from octodns.record.rr import RrParseError from octodns.record.spf import SpfRecord from octodns.zone import Zone @@ -21,16 +22,40 @@ class TestRecordChunked(TestCase): 'some.words.that.here', '1.2.word.4', '1.2.3.4', - # quotes are not removed - '"Hello World!"', ): self.assertEqual(s, _ChunkedValue.parse_rdata_text(s)) + # quotes are removed + s = '"Hello World!"' + self.assertEqual(s.replace('"', ''), _ChunkedValue.parse_rdata_text(s)) + # semi-colons are escaped self.assertEqual( - 'Hello\\; World!', _ChunkedValue.parse_rdata_text('Hello; World!') + 'Hello\\; World!', _ChunkedValue.parse_rdata_text('"Hello; World!"') ) + # unquoted whitespace separated pieces are concatenated + self.assertEqual( + 'thisrunstogether', + _ChunkedValue.parse_rdata_text('this runs\ttogether'), + ) + + # mix of quoted and unquoted + self.assertEqual( + 'This is quoted andthisisnot, this is back to being quoted', + _ChunkedValue.parse_rdata_text( + '"This is quoted " and this is not ", this is back to being quoted"' + ), + ) + + for s in ( + '"no closing quote', + '"no closing quote ', + '"no closing \\" quote', + ): + with self.assertRaises(RrParseError): + _ChunkedValue.parse_rdata_text(s) + # since we're always a string validate and __init__ don't # parse_rdata_text @@ -68,6 +93,81 @@ class TestChunkedValue(TestCase): _ChunkedValue.validate('Déjà vu', 'TXT'), ) + def test_quoted(self): + # test escaped double quotes + for value, expected in ( + ( + '"This is a quoted string with escaped \\"quotes\\""', + 'This is a quoted string with escaped "quotes"', + ), + ): + chunked = _ChunkedValue.process([value]) + self.assertEqual(1, len(chunked)) + chunked = chunked[0] + self.assertEqual(expected, chunked) + + # all whitespace + chunked = _ChunkedValue.process(['" \t\t"']) + self.assertEqual(1, 
len(chunked)) + self.assertEqual(' \t\t', chunked[0]) + + # TODO: missing closing quote + value = '"This is quoted, but has no end' + chunked = _ChunkedValue.process([value]) + self.assertEqual(1, len(chunked)) + self.assertEqual(value[1:], chunked[0]) + + # TODO: missing opening quote + + def test_unquoted(self): + for value in ( + 'This is not quoted', + ' This has leading space', + ' This has leading spaces', + '\tThis has a leading tab', + '\t\tThis has leading tabs', + ' \tThis has leading tabs', + 'This has trailing space ', + 'This has trailing spaces ', + 'This has a trailing tab\t', + 'This has trailing tabs\t\t', + ' \tThis has leading tabs\t ', + ' This has leading and trailing space ', + ' This has leading and trailing space ', + '\tThis has a leading and trailing tab\t', + '\t\tThis has leading and trailing tabs\t\t', + 'This has a quote " in the middle', + ): + chunked = _ChunkedValue.process([value]) + self.assertEqual(1, len(chunked)) + self.assertEqual(value.strip(), chunked[0]) + + # all whitespace + chunked = _ChunkedValue.process([' ']) + self.assertEqual(1, len(chunked)) + self.assertEqual('', chunked[0]) + + def test_spec_unquoted(self): + for value in ( + 'This is not quoted', + ' This has leading space', + ' This has leading spaces', + '\tThis has a leading tab', + '\t\tThis has leading tabs', + ' \tThis has leading tabs', + 'This has trailing space ', + 'This has trailing spaces ', + 'This has a trailing tab\t', + 'This has trailing tabs\t\t', + ' \tThis has leading tabs\t ', + ' This has leading and trailing space ', + ' This has leading and trailing space ', + '\tThis has a leading and trailing tab\t', + '\t\tThis has leading and trailing tabs\t\t', + ): + parsed = list(_parse(value, spec_unquoted=True)) + self.assertEqual(value.strip().split(), parsed) + def test_large_values(self): # There is additional testing in TXT @@ -100,7 +200,7 @@ class TestChunkedValue(TestCase): ) self.assertEqual(dechunked_value, chunked) - # already 
dechunked, noop + # non-quoted is a no-op chunked = _ChunkedValue.process([dechunked_value])[0] self.assertEqual(dechunked_value, chunked) @@ -153,7 +253,7 @@ class TestChunkedValue(TestCase): # ~real world test case values = [ 'before', - ' "v=DKIM1\\; h=sha256\\; k=rsa\\; p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx78E7PtJvr8vpoNgHdIAe+llFKoy8WuTXDd6Z5mm3D4AUva9MBt5fFetxg/kcRy3KMDnMw6kDybwbpS/oPw1ylk6DL1xit7Cr5xeYYSWKukxXURAlHwT2K72oUsFKRUvN1X9lVysAeo+H8H/22Z9fJ0P30sOuRIRqCaiz+OiUYicxy4x" "rpfH2s9a+o3yRwX3zhlp8GjRmmmyK5mf7CkQTCfjnKVsYtB7mabXXmClH9tlcymnBMoN9PeXxaS5JRRysVV8RBCC9/wmfp9y//cck8nvE/MavFpSUHvv+TfTTdVKDlsXPjKX8iZQv0nO3xhspgkqFquKjydiR8nf4meHhwIDAQAB" ', + '"v=DKIM1\\; h=sha256\\; k=rsa\\; p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx78E7PtJvr8vpoNgHdIAe+llFKoy8WuTXDd6Z5mm3D4AUva9MBt5fFetxg/kcRy3KMDnMw6kDybwbpS/oPw1ylk6DL1xit7Cr5xeYYSWKukxXURAlHwT2K72oUsFKRUvN1X9lVysAeo+H8H/22Z9fJ0P30sOuRIRqCaiz+OiUYicxy4x" "rpfH2s9a+o3yRwX3zhlp8GjRmmmyK5mf7CkQTCfjnKVsYtB7mabXXmClH9tlcymnBMoN9PeXxaS5JRRysVV8RBCC9/wmfp9y//cck8nvE/MavFpSUHvv+TfTTdVKDlsXPjKX8iZQv0nO3xhspgkqFquKjydiR8nf4meHhwIDAQAB"', 'z after', ] chunked = _ChunkedValue.process(values)