mirror of
https://github.com/github/octodns.git
synced 2024-05-11 05:55:00 +00:00
397 lines
16 KiB
Python
397 lines
16 KiB
Python
#
|
||
#
|
||
#
|
||
|
||
from __future__ import absolute_import, division, print_function, \
|
||
unicode_literals
|
||
|
||
from os import environ
|
||
from os.path import dirname, join
|
||
from six import text_type
|
||
from unittest import TestCase
|
||
|
||
from octodns.record import Record
|
||
from octodns.manager import _AggregateTarget, MainThreadExecutor, Manager, \
|
||
ManagerException
|
||
from octodns.yaml import safe_load
|
||
from octodns.zone import Zone
|
||
|
||
from helpers import DynamicProvider, GeoProvider, NoSshFpProvider, \
|
||
SimpleProvider, TemporaryDirectory
|
||
|
||
# Directory holding the YAML fixture configs, located next to this test module.
config_dir = join(dirname(__file__), 'config')


def get_config_filename(which):
    """Return the path of fixture config file ``which`` inside ``config_dir``.

    ``which`` is a bare file name such as ``'simple.yaml'``; it is joined onto
    ``config_dir`` without checking that the file exists.
    """
    return join(config_dir, which)
|
||
|
||
|
||
class TestManager(TestCase):
    """Tests for Manager driven by the YAML fixtures under ``config_dir``.

    Several tests point the ``YAML_TMP_DIR`` environment variable at a fresh
    temporary directory before building a Manager; presumably the fixture
    configs reference that variable - verify against the fixture files.
    """

    # Expects a ManagerException mentioning 'missing class'.
    def test_missing_provider_class(self):
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('missing-provider-class.yaml')).sync()
        self.assertIn('missing class', text_type(ctx.exception))

    # The next three fixtures must all be reported as an unknown provider
    # class.
    def test_bad_provider_class(self):
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('bad-provider-class.yaml')).sync()
        self.assertIn('Unknown provider class', text_type(ctx.exception))

    def test_bad_provider_class_module(self):
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('bad-provider-class-module.yaml')) \
                .sync()
        self.assertIn('Unknown provider class', text_type(ctx.exception))

    def test_bad_provider_class_no_module(self):
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('bad-provider-class-no-module.yaml')) \
                .sync()
        self.assertIn('Unknown provider class', text_type(ctx.exception))

    def test_missing_provider_config(self):
        # Missing provider config
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('missing-provider-config.yaml')).sync()
        self.assertIn('provider config', text_type(ctx.exception))

    def test_missing_env_config(self):
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('missing-provider-env.yaml')).sync()
        self.assertIn('missing env var', text_type(ctx.exception))

    # The four tests below sync a single zone from provider-problems.yaml and
    # check the wording of the resulting ManagerException.
    def test_missing_source(self):
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('provider-problems.yaml')) \
                .sync(['missing.sources.'])
        self.assertIn('missing sources', text_type(ctx.exception))

    def test_missing_targets(self):
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('provider-problems.yaml')) \
                .sync(['missing.targets.'])
        self.assertIn('missing targets', text_type(ctx.exception))

    def test_unknown_source(self):
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('provider-problems.yaml')) \
                .sync(['unknown.source.'])
        self.assertIn('unknown source', text_type(ctx.exception))

    def test_unknown_target(self):
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('provider-problems.yaml')) \
                .sync(['unknown.target.'])
        self.assertIn('unknown target', text_type(ctx.exception))

    def test_bad_plan_output_class(self):
        with self.assertRaises(ManagerException) as ctx:
            name = 'bad-plan-output-missing-class.yaml'
            Manager(get_config_filename(name)).sync()
        self.assertEqual('plan_output bad is missing class',
                         text_type(ctx.exception))

    def test_bad_plan_output_config(self):
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('bad-plan-output-config.yaml')).sync()
        self.assertEqual('Incorrect plan_output config for bad',
                         text_type(ctx.exception))

    def test_source_only_as_a_target(self):
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('provider-problems.yaml')) \
                .sync(['not.targetable.'])
        self.assertIn('does not support targeting', text_type(ctx.exception))

    def test_always_dry_run(self):
        with TemporaryDirectory() as tmpdir:
            environ['YAML_TMP_DIR'] = tmpdir.dirname
            tc = Manager(get_config_filename('always-dry-run.yaml')) \
                .sync(dry_run=False)
            # only the stuff from subzone, unit.tests. is always-dry-run
            self.assertEqual(3, tc)

    # sync() returns a count that is compared against fixed totals for the
    # simple.yaml fixture under different sync options.
    def test_simple(self):
        with TemporaryDirectory() as tmpdir:
            environ['YAML_TMP_DIR'] = tmpdir.dirname
            tc = Manager(get_config_filename('simple.yaml')) \
                .sync(dry_run=False)
            self.assertEqual(22, tc)

            # try with just one of the zones
            tc = Manager(get_config_filename('simple.yaml')) \
                .sync(dry_run=False, eligible_zones=['unit.tests.'])
            self.assertEqual(16, tc)

            # the subzone, with 2 targets
            tc = Manager(get_config_filename('simple.yaml')) \
                .sync(dry_run=False, eligible_zones=['subzone.unit.tests.'])
            self.assertEqual(6, tc)

            # and finally the empty zone
            tc = Manager(get_config_filename('simple.yaml')) \
                .sync(dry_run=False, eligible_zones=['empty.'])
            self.assertEqual(0, tc)

            # Again with force
            tc = Manager(get_config_filename('simple.yaml')) \
                .sync(dry_run=False, force=True)
            self.assertEqual(22, tc)

            # Again with max_workers = 1
            tc = Manager(get_config_filename('simple.yaml'), max_workers=1) \
                .sync(dry_run=False, force=True)
            self.assertEqual(22, tc)

            # Include meta
            tc = Manager(get_config_filename('simple.yaml'), max_workers=1,
                         include_meta=True) \
                .sync(dry_run=False, force=True)
            self.assertEqual(26, tc)

    def test_eligible_sources(self):
        with TemporaryDirectory() as tmpdir:
            environ['YAML_TMP_DIR'] = tmpdir.dirname
            # Only allow a source that doesn't exist
            tc = Manager(get_config_filename('simple.yaml')) \
                .sync(eligible_sources=['foo'])
            self.assertEqual(0, tc)

    def test_eligible_targets(self):
        with TemporaryDirectory() as tmpdir:
            environ['YAML_TMP_DIR'] = tmpdir.dirname
            # Only allow a target that doesn't exist
            tc = Manager(get_config_filename('simple.yaml')) \
                .sync(eligible_targets=['foo'])
            self.assertEqual(0, tc)

    def test_aliases(self):
        with TemporaryDirectory() as tmpdir:
            environ['YAML_TMP_DIR'] = tmpdir.dirname
            # Alias zones with a valid target.
            tc = Manager(get_config_filename('simple-alias-zone.yaml')) \
                .sync()
            self.assertEqual(0, tc)

            # Alias zone with an invalid target.
            with self.assertRaises(ManagerException) as ctx:
                Manager(get_config_filename('unknown-source-zone.yaml')) \
                    .sync()
            self.assertEqual('Invalid alias zone alias.tests.: source zone '
                             'does-not-exists.tests. does not exist',
                             text_type(ctx.exception))

            # Alias zone that points to another alias zone.
            with self.assertRaises(ManagerException) as ctx:
                Manager(get_config_filename('alias-zone-loop.yaml')) \
                    .sync()
            self.assertEqual('Invalid alias zone alias-loop.tests.: source '
                             'zone alias.tests. is an alias zone',
                             text_type(ctx.exception))

    def test_compare(self):
        with TemporaryDirectory() as tmpdir:
            environ['YAML_TMP_DIR'] = tmpdir.dirname
            manager = Manager(get_config_filename('simple.yaml'))

            # make sure this was pulled in from the config
            self.assertEqual(2, manager._executor._max_workers)

            changes = manager.compare(['in'], ['in'], 'unit.tests.')
            self.assertEqual([], changes)

            # Create an empty unit.test zone config
            with open(join(tmpdir.dirname, 'unit.tests.yaml'), 'w') as fh:
                fh.write('---\n{}')

            changes = manager.compare(['in'], ['dump'], 'unit.tests.')
            self.assertEqual(16, len(changes))

            # Compound sources with varying support
            changes = manager.compare(['in', 'nosshfp'],
                                      ['dump'],
                                      'unit.tests.')
            self.assertEqual(15, len(changes))

            with self.assertRaises(ManagerException) as ctx:
                manager.compare(['nope'], ['dump'], 'unit.tests.')
            self.assertEqual('Unknown source: nope', text_type(ctx.exception))

    def test_aggregate_target(self):
        simple = SimpleProvider()
        geo = GeoProvider()
        dynamic = DynamicProvider()
        nosshfp = NoSshFpProvider()

        # SUPPORTS_GEO is only expected to be true when every wrapped
        # provider is the geo one.
        self.assertFalse(_AggregateTarget([simple, simple]).SUPPORTS_GEO)
        self.assertFalse(_AggregateTarget([simple, geo]).SUPPORTS_GEO)
        self.assertFalse(_AggregateTarget([geo, simple]).SUPPORTS_GEO)
        self.assertTrue(_AggregateTarget([geo, geo]).SUPPORTS_GEO)

        # Same expectation for SUPPORTS_DYNAMIC.
        self.assertFalse(_AggregateTarget([simple, simple]).SUPPORTS_DYNAMIC)
        self.assertFalse(_AggregateTarget([simple, dynamic]).SUPPORTS_DYNAMIC)
        self.assertFalse(_AggregateTarget([dynamic, simple]).SUPPORTS_DYNAMIC)
        self.assertTrue(_AggregateTarget([dynamic, dynamic]).SUPPORTS_DYNAMIC)

        zone = Zone('unit.tests.', [])
        record = Record.new(zone, 'sshfp', {
            'ttl': 60,
            'type': 'SSHFP',
            'value': {
                'algorithm': 1,
                'fingerprint_type': 1,
                'fingerprint': 'abcdefg',
            },
        })
        # supports() on the aggregate is expected to be false as soon as one
        # wrapped provider rejects the record.
        self.assertTrue(simple.supports(record))
        self.assertFalse(nosshfp.supports(record))
        self.assertTrue(_AggregateTarget([simple, simple]).supports(record))
        self.assertFalse(_AggregateTarget([simple, nosshfp]).supports(record))

    def test_dump(self):
        with TemporaryDirectory() as tmpdir:
            environ['YAML_TMP_DIR'] = tmpdir.dirname
            manager = Manager(get_config_filename('simple.yaml'))

            with self.assertRaises(ManagerException) as ctx:
                manager.dump('unit.tests.', tmpdir.dirname, False, False,
                             'nope')
            self.assertEqual('Unknown source: nope', text_type(ctx.exception))

            manager.dump('unit.tests.', tmpdir.dirname, False, False, 'in')

            # make sure this fails with an IOError and not a KeyError when
            # trying to find sub zones
            with self.assertRaises(IOError):
                manager.dump('unknown.zone.', tmpdir.dirname, False, False,
                             'in')

    def test_dump_empty(self):
        with TemporaryDirectory() as tmpdir:
            environ['YAML_TMP_DIR'] = tmpdir.dirname
            manager = Manager(get_config_filename('simple.yaml'))

            manager.dump('empty.', tmpdir.dirname, False, False, 'in')

            # The dumped file for the empty zone must load as a falsy value.
            with open(join(tmpdir.dirname, 'empty.yaml')) as fh:
                data = safe_load(fh, False)
                self.assertFalse(data)

    def test_dump_split(self):
        with TemporaryDirectory() as tmpdir:
            environ['YAML_TMP_DIR'] = tmpdir.dirname
            manager = Manager(get_config_filename('simple-split.yaml'))

            with self.assertRaises(ManagerException) as ctx:
                manager.dump('unit.tests.', tmpdir.dirname, False, True,
                             'nope')
            self.assertEqual('Unknown source: nope', text_type(ctx.exception))

            manager.dump('unit.tests.', tmpdir.dirname, False, True, 'in')

            # make sure this fails with an OSError and not a KeyError when
            # trying to find sub zones
            with self.assertRaises(OSError):
                manager.dump('unknown.zone.', tmpdir.dirname, False, True,
                             'in')

    def test_validate_configs(self):
        Manager(get_config_filename('simple-validate.yaml')).validate_configs()

        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('missing-sources.yaml')) \
                .validate_configs()
        self.assertIn('missing sources', text_type(ctx.exception))

        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('unknown-provider.yaml')) \
                .validate_configs()
        self.assertIn('unknown source', text_type(ctx.exception))

        # Alias zone using an invalid source zone.
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('unknown-source-zone.yaml')) \
                .validate_configs()
        self.assertIn('does not exist', text_type(ctx.exception))

        # Alias zone that points to another alias zone.
        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('alias-zone-loop.yaml')) \
                .validate_configs()
        self.assertIn('is an alias zone', text_type(ctx.exception))

        # Valid config file using an alias zone.
        Manager(get_config_filename('simple-alias-zone.yaml')) \
            .validate_configs()

    def test_get_zone(self):
        Manager(get_config_filename('simple.yaml')).get_zone('unit.tests.')

        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('simple.yaml')).get_zone('unit.tests')
        self.assertIn('missing ending dot', text_type(ctx.exception))

        with self.assertRaises(ManagerException) as ctx:
            Manager(get_config_filename('simple.yaml')) \
                .get_zone('unknown-zone.tests.')
        self.assertIn('Unknown zone name', text_type(ctx.exception))

    def test_populate_lenient_fallback(self):
        with TemporaryDirectory() as tmpdir:
            environ['YAML_TMP_DIR'] = tmpdir.dirname
            manager = Manager(get_config_filename('simple.yaml'))

            # A provider whose populate() takes no `lenient` keyword.
            class NoLenient(SimpleProvider):

                def populate(self, zone, source=False):
                    pass

            # This should be ok, we'll fall back to not passing it
            manager._populate_and_plan('unit.tests.', [], [NoLenient()], [])

            # A provider whose populate() takes no `zone` argument.
            class NoZone(SimpleProvider):

                def populate(self, lenient=False):
                    pass

            # This will blow up, we don't fallback for source
            # NOTE(review): the original call passed only three arguments,
            # which put NoZone() in the slot the call above fills with an
            # empty list, so the TypeError may have come from the missing
            # argument rather than from populate(). Aligned with the call
            # above - confirm against Manager._populate_and_plan's signature.
            with self.assertRaises(TypeError):
                manager._populate_and_plan('unit.tests.', [], [NoZone()], [])
|
||
|
||
|
||
class TestMainThreadExecutor(TestCase):
    """Tests that MainThreadExecutor.submit() hands back a usable future.

    The ``success`` and ``exception`` methods are not tests themselves; they
    are the callables submitted to the executor.
    """

    def test_success(self):
        mte = MainThreadExecutor()

        # positional argument is forwarded to the callable
        future = mte.submit(self.success, 42)
        self.assertEqual(42, future.result())

        # keyword argument is forwarded to the callable
        future = mte.submit(self.success, ret=43)
        self.assertEqual(43, future.result())

    def test_exception(self):
        mte = MainThreadExecutor()

        e = Exception('boom')
        # The exception raised by the callable must surface from result(),
        # both for positional and keyword submission.
        future = mte.submit(self.exception, e)
        with self.assertRaises(Exception) as ctx:
            future.result()
        self.assertEqual(e, ctx.exception)

        future = mte.submit(self.exception, e=e)
        with self.assertRaises(Exception) as ctx:
            future.result()
        self.assertEqual(e, ctx.exception)

    def success(self, ret):
        # Helper callable: hand back whatever it was given.
        return ret

    def exception(self, e):
        # Helper callable: raise whatever it was given.
        raise e
|