1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Merge branch 'main' into records-rfc-test

This commit is contained in:
Ross McFarland
2022-09-21 09:37:00 -07:00
committed by GitHub
98 changed files with 1041 additions and 777 deletions

View File

@@ -1,3 +1,7 @@
manager:
processors:
- global-counter
providers:
config:
# This helps us get coverage when printing out provider versions
@@ -19,6 +23,8 @@ processors:
test:
# This helps us get coverage when printing out processor versions
class: helpers.TestBaseProcessor
global-counter:
class: helpers.CountingProcessor
zones:
unit.tests.:

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from shutil import rmtree
from tempfile import mkdtemp
from logging import getLogger
@@ -131,3 +124,13 @@ class TestYamlProvider(YamlProvider):
class TestBaseProcessor(BaseProcessor):
pass
class CountingProcessor(BaseProcessor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.count = 0
def process_source_zone(self, zone, *args, **kwargs):
self.count += len(zone.records)
return zone

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase
from octodns.equality import EqualityTupleMixin

View File

@@ -2,16 +2,9 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase
from octodns.idna import idna_decode, idna_encode
from octodns.idna import IdnaDict, IdnaError, idna_decode, idna_encode
class TestIdna(TestCase):
@@ -56,5 +49,107 @@ class TestIdna(TestCase):
self.assertIdna('bleep_bloop.foo_bar.pl.', 'bleep_bloop.foo_bar.pl.')
def test_case_insensitivity(self):
# Shouldn't be hit by octoDNS use cases, but checked anyway
self.assertEqual('zajęzyk.pl.', idna_decode('XN--ZAJZYK-Y4A.PL.'))
self.assertEqual('xn--zajzyk-y4a.pl.', idna_encode('ZajęzyK.Pl.'))
def test_repeated_encode_decoded(self):
self.assertEqual(
'zajęzyk.pl.', idna_decode(idna_decode('xn--zajzyk-y4a.pl.'))
)
self.assertEqual(
'xn--zajzyk-y4a.pl.', idna_encode(idna_encode('zajęzyk.pl.'))
)
def test_exception_translation(self):
with self.assertRaises(IdnaError) as ctx:
idna_encode('déjà..vu.')
self.assertEqual('Empty Label', str(ctx.exception))
with self.assertRaises(IdnaError) as ctx:
idna_decode('xn--djvu-1na6c..com.')
self.assertEqual('Empty Label', str(ctx.exception))
class TestIdnaDict(TestCase):
plain = 'testing.tests.'
almost = 'tésting.tests.'
utf8 = 'déjà.vu.'
normal = {plain: 42, almost: 43, utf8: 44}
def test_basics(self):
d = IdnaDict()
# plain ascii
d[self.plain] = 42
self.assertEqual(42, d[self.plain])
# almost the same, single utf-8 char
d[self.almost] = 43
# fetch as utf-8
self.assertEqual(43, d[self.almost])
# fetch as idna
self.assertEqual(43, d[idna_encode(self.almost)])
# plain is stil there, unchanged
self.assertEqual(42, d[self.plain])
# lots of utf8
d[self.utf8] = 44
self.assertEqual(44, d[self.utf8])
self.assertEqual(44, d[idna_encode(self.utf8)])
# setting with idna version replaces something set previously with utf8
d[idna_encode(self.almost)] = 45
self.assertEqual(45, d[self.almost])
self.assertEqual(45, d[idna_encode(self.almost)])
# contains
self.assertTrue(self.plain in d)
self.assertTrue(self.almost in d)
self.assertTrue(idna_encode(self.almost) in d)
self.assertTrue(self.utf8 in d)
self.assertTrue(idna_encode(self.utf8) in d)
# we can delete with either form
del d[self.almost]
self.assertFalse(self.almost in d)
self.assertFalse(idna_encode(self.almost) in d)
del d[idna_encode(self.utf8)]
self.assertFalse(self.utf8 in d)
self.assertFalse(idna_encode(self.utf8) in d)
# smoke test of repr
d.__repr__()
def test_keys(self):
d = IdnaDict(self.normal)
# keys are idna versions by default
self.assertEqual(
(self.plain, idna_encode(self.almost), idna_encode(self.utf8)),
tuple(d.keys()),
)
# decoded keys gives the utf8 version
self.assertEqual(
(self.plain, self.almost, self.utf8), tuple(d.decoded_keys())
)
def test_items(self):
d = IdnaDict(self.normal)
# idna keys in items
self.assertEqual(
(
(self.plain, 42),
(idna_encode(self.almost), 43),
(idna_encode(self.utf8), 44),
),
tuple(d.items()),
)
# utf8 keys in decoded_items
self.assertEqual(
((self.plain, 42), (self.almost, 43), (self.utf8, 44)),
tuple(d.decoded_items()),
)

View File

@@ -2,17 +2,11 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from os import environ
from os.path import dirname, isfile, join
from octodns import __VERSION__
from octodns.idna import IdnaDict, idna_encode
from octodns.manager import (
_AggregateTarget,
MainThreadExecutor,
@@ -182,6 +176,50 @@ class TestManager(TestCase):
).sync(dry_run=False, force=True)
self.assertEqual(33, tc)
def test_idna_eligible_zones(self):
# loading w/simple, but we'll be blowing it away and doing some manual
# stuff
manager = Manager(get_config_filename('simple.yaml'))
# these configs won't be valid, but that's fine we can test what we're
# after based on exceptions raised
manager.config['zones'] = manager._config_zones(
{'déjà.vu.': {}, 'deja.vu.': {}, idna_encode('こんにちは.jp.'): {}}
)
from pprint import pprint
pprint(manager.config['zones'])
# refer to them with utf-8
with self.assertRaises(ManagerException) as ctx:
manager.sync(eligible_zones=('déjà.vu.',))
self.assertEqual('Zone déjà.vu. is missing sources', str(ctx.exception))
with self.assertRaises(ManagerException) as ctx:
manager.sync(eligible_zones=('deja.vu.',))
self.assertEqual('Zone deja.vu. is missing sources', str(ctx.exception))
with self.assertRaises(ManagerException) as ctx:
manager.sync(eligible_zones=('こんにちは.jp.',))
self.assertEqual(
'Zone こんにちは.jp. is missing sources', str(ctx.exception)
)
# refer to them with idna (exceptions are still utf-8
with self.assertRaises(ManagerException) as ctx:
manager.sync(eligible_zones=(idna_encode('déjà.vu.'),))
self.assertEqual('Zone déjà.vu. is missing sources', str(ctx.exception))
with self.assertRaises(ManagerException) as ctx:
manager.sync(eligible_zones=(idna_encode('deja.vu.'),))
self.assertEqual('Zone deja.vu. is missing sources', str(ctx.exception))
with self.assertRaises(ManagerException) as ctx:
manager.sync(eligible_zones=(idna_encode('こんにちは.jp.'),))
self.assertEqual(
'Zone こんにちは.jp. is missing sources', str(ctx.exception)
)
def test_eligible_sources(self):
with TemporaryDirectory() as tmpdir:
environ['YAML_TMP_DIR'] = tmpdir.dirname
@@ -236,7 +274,7 @@ class TestManager(TestCase):
get_config_filename('simple-alias-zone.yaml')
).sync(eligible_zones=["alias.tests."])
self.assertEqual(
'Zone alias.tests. cannot be sync without zone '
'Zone alias.tests. cannot be synced without zone '
'unit.tests. sinced it is aliased',
str(ctx.exception),
)
@@ -598,9 +636,16 @@ class TestManager(TestCase):
def test_processor_config(self):
# Smoke test loading a valid config
manager = Manager(get_config_filename('processors.yaml'))
self.assertEqual(['noop', 'test'], list(manager.processors.keys()))
self.assertEqual(
['noop', 'test', 'global-counter'], list(manager.processors.keys())
)
# make sure we got the global processor and that it's count is 0 now
self.assertEqual(['global-counter'], manager.global_processors)
self.assertEqual(0, manager.processors['global-counter'].count)
# This zone specifies a valid processor
manager.sync(['unit.tests.'])
# make sure the global processor ran and counted some records
self.assertTrue(manager.processors['global-counter'].count >= 25)
with self.assertRaises(ManagerException) as ctx:
# This zone specifies a non-existent processor
@@ -831,6 +876,41 @@ class TestManager(TestCase):
set(), manager.configured_sub_zones('bar.foo.unit.tests.')
)
def test_config_zones(self):
manager = Manager(get_config_filename('simple.yaml'))
# empty == empty
self.assertEqual({}, manager._config_zones({}))
# single ascii comes back as-is, but in a IdnaDict
zones = manager._config_zones({'unit.tests.': 42})
self.assertEqual({'unit.tests.': 42}, zones)
self.assertIsInstance(zones, IdnaDict)
# single utf-8 comes back idna encoded
self.assertEqual(
{idna_encode('Déjà.vu.'): 42},
dict(manager._config_zones({'Déjà.vu.': 42})),
)
# ascii and non-matching idna as ok
self.assertEqual(
{idna_encode('déjà.vu.'): 42, 'deja.vu.': 43},
dict(
manager._config_zones(
{idna_encode('déjà.vu.'): 42, 'deja.vu.': 43}
)
),
)
with self.assertRaises(ManagerException) as ctx:
# zone configured with both utf-8 and idna is an error
manager._config_zones({'Déjà.vu.': 42, idna_encode('Déjà.vu.'): 43})
self.assertEqual(
'"déjà.vu." configured both in utf-8 and idna "xn--dj-kia8a.vu."',
str(ctx.exception),
)
class TestMainThreadExecutor(TestCase):
def test_success(self):

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from io import StringIO
from logging import getLogger
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase
from octodns.processor.acme import AcmeMangingProcessor

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,16 +2,14 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase
from octodns.processor.filter import TypeAllowlistFilter, TypeRejectlistFilter
from octodns.processor.filter import (
NameAllowlistFilter,
NameRejectlistFilter,
TypeAllowlistFilter,
TypeRejectlistFilter,
)
from octodns.record import Record
from octodns.zone import Zone
@@ -76,3 +74,83 @@ class TestTypeRejectListFilter(TestCase):
filter_a_aaaa = TypeRejectlistFilter('not-a-aaaa', set(('A', 'AAAA')))
got = filter_a_aaaa.process_target_zone(zone.copy())
self.assertEqual(['txt', 'txt2'], sorted([r.name for r in got.records]))
class TestNameAllowListFilter(TestCase):
zone = Zone('unit.tests.', [])
matches = Record.new(
zone, 'matches', {'type': 'A', 'ttl': 42, 'value': '1.2.3.4'}
)
zone.add_record(matches)
doesnt = Record.new(
zone, 'doesnt', {'type': 'A', 'ttl': 42, 'value': '2.3.4.5'}
)
zone.add_record(doesnt)
matchable1 = Record.new(
zone, 'start-f43ad96-end', {'type': 'A', 'ttl': 42, 'value': '3.4.5.6'}
)
zone.add_record(matchable1)
matchable2 = Record.new(
zone, 'start-a3b444c-end', {'type': 'A', 'ttl': 42, 'value': '4.5.6.7'}
)
zone.add_record(matchable2)
def test_exact(self):
allows = NameAllowlistFilter('exact', ('matches',))
self.assertEqual(4, len(self.zone.records))
filtered = allows.process_source_zone(self.zone.copy())
self.assertEqual(1, len(filtered.records))
self.assertEqual(['matches'], [r.name for r in filtered.records])
def test_regex(self):
allows = NameAllowlistFilter('exact', ('/^start-.+-end$/',))
self.assertEqual(4, len(self.zone.records))
filtered = allows.process_source_zone(self.zone.copy())
self.assertEqual(2, len(filtered.records))
self.assertEqual(
['start-a3b444c-end', 'start-f43ad96-end'],
sorted([r.name for r in filtered.records]),
)
class TestNameRejectListFilter(TestCase):
zone = Zone('unit.tests.', [])
matches = Record.new(
zone, 'matches', {'type': 'A', 'ttl': 42, 'value': '1.2.3.4'}
)
zone.add_record(matches)
doesnt = Record.new(
zone, 'doesnt', {'type': 'A', 'ttl': 42, 'value': '2.3.4.5'}
)
zone.add_record(doesnt)
matchable1 = Record.new(
zone, 'start-f43ad96-end', {'type': 'A', 'ttl': 42, 'value': '3.4.5.6'}
)
zone.add_record(matchable1)
matchable2 = Record.new(
zone, 'start-a3b444c-end', {'type': 'A', 'ttl': 42, 'value': '4.5.6.7'}
)
zone.add_record(matchable2)
def test_exact(self):
rejects = NameRejectlistFilter('exact', ('matches',))
self.assertEqual(4, len(self.zone.records))
filtered = rejects.process_source_zone(self.zone.copy())
self.assertEqual(3, len(filtered.records))
self.assertEqual(
['doesnt', 'start-a3b444c-end', 'start-f43ad96-end'],
sorted([r.name for r in filtered.records]),
)
def test_regex(self):
rejects = NameRejectlistFilter('exact', ('/^start-.+-end$/',))
self.assertEqual(4, len(self.zone.records))
filtered = rejects.process_source_zone(self.zone.copy())
self.assertEqual(2, len(filtered.records))
self.assertEqual(
['doesnt', 'matches'], sorted([r.name for r in filtered.records])
)

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase
from octodns.processor.ownership import OwnershipProcessor

View File

@@ -0,0 +1,113 @@
from unittest import TestCase
from octodns.processor.restrict import (
RestrictionException,
TtlRestrictionFilter,
)
from octodns.record import Record
from octodns.zone import Zone
class TestTtlRestrictionFilter(TestCase):
def test_restrict_ttl(self):
# configured values
restrictor = TtlRestrictionFilter('test', min_ttl=32, max_ttl=1024)
zone = Zone('unit.tests.', [])
good = Record.new(
zone, 'good', {'type': 'A', 'ttl': 42, 'value': '1.2.3.4'}
)
zone.add_record(good)
restricted = restrictor.process_source_zone(zone)
self.assertEqual(zone.records, restricted.records)
# too low
low = Record.new(
zone, 'low', {'type': 'A', 'ttl': 16, 'value': '1.2.3.4'}
)
copy = zone.copy()
copy.add_record(low)
with self.assertRaises(RestrictionException) as ctx:
restrictor.process_source_zone(copy)
self.assertEqual(
'low.unit.tests. ttl=16 too low, min_ttl=32', str(ctx.exception)
)
# with lenient set, we can go lower
lenient = Record.new(
zone,
'low',
{
'octodns': {'lenient': True},
'type': 'A',
'ttl': 16,
'value': '1.2.3.4',
},
)
copy = zone.copy()
copy.add_record(lenient)
restricted = restrictor.process_source_zone(copy)
self.assertEqual(copy.records, restricted.records)
# too high
high = Record.new(
zone, 'high', {'type': 'A', 'ttl': 2048, 'value': '1.2.3.4'}
)
copy = zone.copy()
copy.add_record(high)
with self.assertRaises(RestrictionException) as ctx:
restrictor.process_source_zone(copy)
self.assertEqual(
'high.unit.tests. ttl=2048 too high, max_ttl=1024',
str(ctx.exception),
)
# too low defaults
restrictor = TtlRestrictionFilter('test')
low = Record.new(
zone, 'low', {'type': 'A', 'ttl': 0, 'value': '1.2.3.4'}
)
copy = zone.copy()
copy.add_record(low)
with self.assertRaises(RestrictionException) as ctx:
restrictor.process_source_zone(copy)
self.assertEqual(
'low.unit.tests. ttl=0 too low, min_ttl=1', str(ctx.exception)
)
# too high defaults
high = Record.new(
zone, 'high', {'type': 'A', 'ttl': 999999, 'value': '1.2.3.4'}
)
copy = zone.copy()
copy.add_record(high)
with self.assertRaises(RestrictionException) as ctx:
restrictor.process_source_zone(copy)
self.assertEqual(
'high.unit.tests. ttl=999999 too high, max_ttl=604800',
str(ctx.exception),
)
# allowed_ttls
restrictor = TtlRestrictionFilter('test', allowed_ttls=[42, 300])
# add 300 (42 is already there)
another = Record.new(
zone, 'another', {'type': 'A', 'ttl': 300, 'value': '4.5.6.7'}
)
zone.add_record(another)
# 42 and 300 are allowed through
restricted = restrictor.process_source_zone(zone)
self.assertEqual(zone.records, restricted.records)
# 16 is not
copy = zone.copy()
copy.add_record(low)
with self.assertRaises(RestrictionException) as ctx:
restrictor.process_source_zone(copy)
self.assertEqual(
'low.unit.tests. ttl=0 not an allowed value, allowed_ttls={42, 300}',
str(ctx.exception),
)

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from logging import getLogger
from unittest import TestCase
from unittest.mock import MagicMock, call

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase
# Just for coverage

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase
# Just for coverage

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase

View File

@@ -2,20 +2,15 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from os import makedirs
from os.path import basename, dirname, isdir, isfile, join
from unittest import TestCase
from yaml import safe_load
from yaml.constructor import ConstructorError
from octodns.idna import idna_encode
from octodns.record import _NsValue, Create, Record, ValuesMixin
from octodns.provider import ProviderException
from octodns.provider.base import Plan
from octodns.provider.yaml import (
_list_all_yaml_files,
@@ -172,6 +167,58 @@ class TestYamlProvider(TestCase):
# make sure nothing is left
self.assertEqual([], list(data.keys()))
def test_idna(self):
with TemporaryDirectory() as td:
name = 'déjà.vu.'
filename = f'{name}yaml'
provider = YamlProvider('test', td.dirname)
zone = Zone(idna_encode(name), [])
# create a idna named file
with open(join(td.dirname, idna_encode(filename)), 'w') as fh:
fh.write(
'''---
'':
type: A
value: 1.2.3.4
# something in idna notation
xn--dj-kia8a:
type: A
value: 2.3.4.5
# something with utf-8
これはテストです:
type: A
value: 3.4.5.6
'''
)
# populates fine when there's just the idna version (as a fallback)
provider.populate(zone)
d = {r.name: r for r in zone.records}
self.assertEqual(3, len(d))
# verify that we loaded the expected records, including idna/utf-8
# named ones
self.assertEqual(['1.2.3.4'], d[''].values)
self.assertEqual(['2.3.4.5'], d['xn--dj-kia8a'].values)
self.assertEqual(['3.4.5.6'], d['xn--28jm5b5a8k5k8cra'].values)
# create a utf8 named file (provider always writes utf-8 filenames
plan = provider.plan(zone)
provider.apply(plan)
with open(join(td.dirname, filename), 'r') as fh:
content = fh.read()
# verify that the non-ascii records were written out in utf-8
self.assertTrue('déjà:' in content)
self.assertTrue('これはテストです:' in content)
# does not allow both idna and utf8 named files
with self.assertRaises(ProviderException) as ctx:
provider.populate(zone)
msg = str(ctx.exception)
self.assertTrue('Both UTF-8' in msg)
def test_empty(self):
source = YamlProvider(
'test', join(dirname(__file__), 'config'), supports_root_ns=False

View File

@@ -2,15 +2,9 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase
from octodns.idna import idna_encode
from octodns.record import (
ARecord,
AaaaRecord,
@@ -92,6 +86,18 @@ class TestRecord(TestCase):
)
self.assertEqual('mixedcase', record.name)
def test_utf8(self):
zone = Zone('natación.mx.', [])
utf8 = 'niño'
encoded = idna_encode(utf8)
record = ARecord(
zone, utf8, {'ttl': 30, 'type': 'A', 'value': '1.2.3.4'}
)
self.assertEqual(encoded, record.name)
self.assertEqual(utf8, record.decoded_name)
self.assertTrue(f'{encoded}.{zone.name}', record.fqdn)
self.assertTrue(f'{utf8}.{zone.decoded_name}', record.decoded_fqdn)
def test_alias_lowering_value(self):
upper_record = AliasRecord(
self.zone,
@@ -2552,6 +2558,51 @@ class TestRecordValidation(TestCase):
self.zone, name, {'ttl': 300, 'type': 'A', 'value': '1.2.3.4'}
)
# make sure we're validating with encoded fqdns
utf8 = 'déjà-vu'
padding = ('.' + ('x' * 57)) * 4
utf8_name = f'{utf8}{padding}'
# make sure our test is valid here, we're under 253 chars long as utf8
self.assertEqual(251, len(f'{utf8_name}.{self.zone.name}'))
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
utf8_name,
{'ttl': 300, 'type': 'A', 'value': '1.2.3.4'},
)
reason = ctx.exception.reasons[0]
self.assertTrue(reason.startswith('invalid fqdn, "déjà-vu'))
self.assertTrue(
reason.endswith(
'.unit.tests." is too long at 259' ' chars, max is 253'
)
)
# same, but with ascii version of things
plain = 'deja-vu'
plain_name = f'{plain}{padding}'
self.assertEqual(251, len(f'{plain_name}.{self.zone.name}'))
Record.new(
self.zone, plain_name, {'ttl': 300, 'type': 'A', 'value': '1.2.3.4'}
)
# check that we're validating encoded labels
padding = 'x' * (60 - len(utf8))
utf8_name = f'{utf8}{padding}'
# make sure the test is valid, we're at 63 chars
self.assertEqual(60, len(utf8_name))
with self.assertRaises(ValidationError) as ctx:
Record.new(
self.zone,
utf8_name,
{'ttl': 300, 'type': 'A', 'value': '1.2.3.4'},
)
reason = ctx.exception.reasons[0]
# Unfortunately this is a translated IDNAError so we don't have much
# control over the exact message :-/ (doesn't give context like octoDNS
# does)
self.assertEqual('Label too long', reason)
# no ttl
with self.assertRaises(ValidationError) as ctx:
Record.new(self.zone, '', {'type': 'A', 'value': '1.2.3.4'})

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase
from octodns.record.geo import GeoCodes

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
import dns.zone
from dns.exception import DNSException

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase
from octodns.record import Record

View File

@@ -2,13 +2,6 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from io import StringIO
from unittest import TestCase
from yaml.constructor import ConstructorError

View File

@@ -2,15 +2,9 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from unittest import TestCase
from octodns.idna import idna_encode
from octodns.record import (
ARecord,
AaaaRecord,
@@ -35,6 +29,13 @@ class TestZone(TestCase):
zone = Zone('UniT.TEsTs.', [])
self.assertEqual('unit.tests.', zone.name)
def test_utf8(self):
utf8 = 'grüßen.de.'
encoded = idna_encode(utf8)
zone = Zone(utf8, [])
self.assertEqual(encoded, zone.name)
self.assertEqual(utf8, zone.decoded_name)
def test_hostname_from_fqdn(self):
zone = Zone('unit.tests.', [])
for hostname, fqdn in (
@@ -46,6 +47,27 @@ class TestZone(TestCase):
('foo.bar', 'foo.bar.unit.tests'),
('foo.unit.tests', 'foo.unit.tests.unit.tests.'),
('foo.unit.tests', 'foo.unit.tests.unit.tests'),
('déjà', 'déjà.unit.tests'),
('déjà.foo', 'déjà.foo.unit.tests'),
('bar.déjà', 'bar.déjà.unit.tests'),
('bar.déjà.foo', 'bar.déjà.foo.unit.tests'),
):
self.assertEqual(hostname, zone.hostname_from_fqdn(fqdn))
zone = Zone('grüßen.de.', [])
for hostname, fqdn in (
('', 'grüßen.de.'),
('', 'grüßen.de'),
('foo', 'foo.grüßen.de.'),
('foo', 'foo.grüßen.de'),
('foo.bar', 'foo.bar.grüßen.de.'),
('foo.bar', 'foo.bar.grüßen.de'),
('foo.grüßen.de', 'foo.grüßen.de.grüßen.de.'),
('foo.grüßen.de', 'foo.grüßen.de.grüßen.de'),
('déjà', 'déjà.grüßen.de'),
('déjà.foo', 'déjà.foo.grüßen.de'),
('bar.déjà', 'bar.déjà.grüßen.de'),
('bar.déjà.foo', 'bar.déjà.foo.grüßen.de'),
):
self.assertEqual(hostname, zone.hostname_from_fqdn(fqdn))