1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Use path.join, add loop detection & tests

This commit is contained in:
Ross McFarland
2018-06-09 16:21:25 -07:00
parent 50e1ce3881
commit 1e2da34361
2 changed files with 84 additions and 35 deletions

View File

@@ -5,8 +5,8 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
from os import makedirs
from os.path import isdir, join
from os import makedirs, path
from os.path import isdir
import logging
from .base import BaseProvider
@@ -66,60 +66,50 @@ class EtcHostsProvider(BaseProvider):
if not isdir(self.directory):
makedirs(self.directory)
filename = '{}hosts'.format(join(self.directory, desired.name))
filename = '{}hosts'.format(path.join(self.directory, desired.name))
self.log.info('_apply: filename=%s', filename)
with open(filename, 'w') as fh:
fh.write('##################################################\n')
fh.write('# octoDNS ')
fh.write(self.id)
fh.write(' ')
fh.write(desired.name)
fh.write('\n')
fh.write('# octoDNS {} {}\n'.format(self.id, desired.name))
fh.write('##################################################\n\n')
if values:
fh.write('## A & AAAA\n\n')
for fqdn, value in sorted(values.items()):
if fqdn[0] == '*':
fh.write('# ')
fh.write(value)
fh.write('\t')
fh.write(fqdn)
fh.write('\n\n')
fh.write('{}\t{}\n\n'.format(value, fqdn))
if cnames:
fh.write('\n')
fh.write('## CNAME (mapped)\n\n')
fh.write('\n## CNAME (mapped)\n\n')
for fqdn, value in sorted(cnames.items()):
# Print out a comment of the first level
fh.write('# ')
fh.write(fqdn)
fh.write(' -> ')
fh.write(value)
fh.write('\n')
# No loop protection :-/
fh.write('# {} -> {}\n'.format(fqdn, value))
seen = set()
while True:
seen.add(value)
try:
value = values[value]
# If we're here we've found the target, print it
# and break the loop
fh.write(value)
fh.write('\t')
fh.write(fqdn)
fh.write('\n')
fh.write('{}\t{}\n'.format(value, fqdn))
break
except KeyError:
# Try and step down one level
orig = value
value = cnames.get(value, None)
# Print out this step
fh.write('# ')
fh.write(orig)
fh.write(' -> ')
if value:
fh.write(value)
if value in seen:
# We'd loop here, break it
fh.write('# {} -> {} **loop**\n'
.format(orig, value))
break
else:
fh.write('# {} -> {}\n'
.format(orig, value))
else:
# Don't have anywhere else to go
fh.write('**unknown**')
fh.write('# {} -> **unknown**\n'.format(orig))
break
fh.write('\n')
fh.write('\n')

View File

@@ -5,7 +5,8 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
from os.path import dirname, isfile, join
from os import path
from os.path import dirname, isfile
from unittest import TestCase
from octodns.provider.etc_hosts import EtcHostsProvider
@@ -19,7 +20,8 @@ from helpers import TemporaryDirectory
class TestEtcHostsProvider(TestCase):
def test_provider(self):
source = EtcHostsProvider('test', join(dirname(__file__), 'config'))
source = EtcHostsProvider('test', path.join(dirname(__file__),
'config'))
zone = Zone('unit.tests.', [])
@@ -86,8 +88,8 @@ class TestEtcHostsProvider(TestCase):
with TemporaryDirectory() as td:
# Add some subdirs to make sure that it can create them
directory = join(td.dirname, 'sub', 'dir')
hosts_file = join(directory, 'unit.tests.hosts')
directory = path.join(td.dirname, 'sub', 'dir')
hosts_file = path.join(directory, 'unit.tests.hosts')
target = EtcHostsProvider('test', directory)
# We add everything
@@ -102,7 +104,7 @@ class TestEtcHostsProvider(TestCase):
with open(hosts_file) as fh:
data = fh.read()
# v6
self.assertTrue('2001:4860:4860::8844\tv6.unit.tests')
self.assertTrue('2001:4860:4860::8844\tv6.unit.tests' in data)
# www
self.assertTrue('1.1.1.1\twww.unit.tests' in data)
# root ALIAS
@@ -118,3 +120,60 @@ class TestEtcHostsProvider(TestCase):
# second empty run that won't create dirs and overwrites file
plan = Plan(zone, zone, [], True)
self.assertEquals(0, target.apply(plan))
def test_cname_loop(self):
source = EtcHostsProvider('test', path.join(dirname(__file__),
'config'))
zone = Zone('unit.tests.', [])
# We never populate anything, when acting as a source
source.populate(zone, target=source)
self.assertEquals(0, len(zone.records))
# Same if we're acting as a target
source.populate(zone)
self.assertEquals(0, len(zone.records))
record = Record.new(zone, 'start', {
'ttl': 60,
'type': 'CNAME',
'value': 'middle.unit.tests.',
})
zone.add_record(record)
record = Record.new(zone, 'middle', {
'ttl': 60,
'type': 'CNAME',
'value': 'loop.unit.tests.',
})
zone.add_record(record)
record = Record.new(zone, 'loop', {
'ttl': 60,
'type': 'CNAME',
'value': 'start.unit.tests.',
})
zone.add_record(record)
with TemporaryDirectory() as td:
# Add some subdirs to make sure that it can create them
directory = path.join(td.dirname, 'sub', 'dir')
hosts_file = path.join(directory, 'unit.tests.hosts')
target = EtcHostsProvider('test', directory)
# We add everything
plan = target.plan(zone)
self.assertEquals(len(zone.records), len(plan.changes))
self.assertFalse(isfile(hosts_file))
# Now actually do it
self.assertEquals(len(zone.records), target.apply(plan))
self.assertTrue(isfile(hosts_file))
with open(hosts_file) as fh:
data = fh.read()
print(data)
self.assertTrue('# loop.unit.tests -> start.unit.tests '
'**loop**' in data)
self.assertTrue('# middle.unit.tests -> loop.unit.tests '
'**loop**' in data)
self.assertTrue('# start.unit.tests -> middle.unit.tests '
'**loop**' in data)