mirror of
https://github.com/github/octodns.git
synced 2024-05-11 05:55:00 +00:00
Merge remote-tracking branch 'origin/master' into py3-f-strings
This commit is contained in:
@@ -48,19 +48,19 @@ class EasyDNSClient(object):
|
||||
# Domain Portfolio
|
||||
domain_portfolio = 'myport'
|
||||
|
||||
def __init__(self, token, api_key, currency, portfolio, sandbox):
|
||||
def __init__(self, token, api_key, currency, portfolio, sandbox,
|
||||
domain_create_sleep):
|
||||
self.log = logging.getLogger('EasyDNSProvider[{}]'.format(id))
|
||||
self.token = token
|
||||
self.api_key = api_key
|
||||
self.default_currency = currency
|
||||
self.domain_portfolio = portfolio
|
||||
self.apienv = 'sandbox' if sandbox else 'live'
|
||||
auth_key = '{}:{}'.format(self.token, self.api_key)
|
||||
self.auth_key = base64.b64encode(auth_key.encode("utf-8"))
|
||||
self.domain_create_sleep = domain_create_sleep
|
||||
|
||||
auth_key = '{}:{}'.format(token, api_key)
|
||||
auth_key = base64.b64encode(auth_key.encode("utf-8"))
|
||||
self.base_path = self.SANDBOX if sandbox else self.LIVE
|
||||
sess = Session()
|
||||
sess.headers.update({'Authorization': 'Basic {}'
|
||||
.format(self.auth_key.decode('utf-8'))})
|
||||
.format(auth_key.decode('utf-8'))})
|
||||
sess.headers.update({'accept': 'application/json'})
|
||||
self._sess = sess
|
||||
|
||||
@@ -99,7 +99,7 @@ class EasyDNSClient(object):
|
||||
# we need to delete those default record so we can sync with the source
|
||||
# records, first we'll sleep for a second before gathering new records
|
||||
# We also create default NS records, but they won't be deleted
|
||||
sleep(1)
|
||||
sleep(self.domain_create_sleep)
|
||||
records = self.records(name, True)
|
||||
for record in records:
|
||||
if record['host'] in ('', 'www') \
|
||||
@@ -163,12 +163,12 @@ class EasyDNSProvider(BaseProvider):
|
||||
'SRV', 'NAPTR'))
|
||||
|
||||
def __init__(self, id, token, api_key, currency='CAD', portfolio='myport',
|
||||
sandbox=False, *args, **kwargs):
|
||||
sandbox=False, domain_create_sleep=1, *args, **kwargs):
|
||||
self.log = logging.getLogger('EasyDNSProvider[{}]'.format(id))
|
||||
self.log.debug('__init__: id=%s, token=***', id)
|
||||
super(EasyDNSProvider, self).__init__(id, *args, **kwargs)
|
||||
self._client = EasyDNSClient(token, api_key, currency, portfolio,
|
||||
sandbox)
|
||||
sandbox, domain_create_sleep)
|
||||
self._zone_records = {}
|
||||
|
||||
def _data_for_multiple(self, _type, records):
|
||||
|
||||
@@ -57,26 +57,28 @@ class TransipProvider(BaseProvider):
|
||||
TIMEOUT = 15
|
||||
ROOT_RECORD = '@'
|
||||
|
||||
def __init__(self, id, account, key=None, key_file=None, *args, **kwargs):
|
||||
def __init__(self, id, account, key=None, key_file=None, *args, **kwargs):
|
||||
self.log = getLogger('TransipProvider[{}]'.format(id))
|
||||
self.log.debug('__init__: id=%s, account=%s, token=***', id,
|
||||
account)
|
||||
super(TransipProvider, self).__init__(id, *args, **kwargs)
|
||||
|
||||
if key_file is not None:
|
||||
self._client = DomainService(account, private_key_file=key_file)
|
||||
self._client = self._domain_service(account,
|
||||
private_key_file=key_file)
|
||||
elif key is not None:
|
||||
self._client = DomainService(account, private_key=key)
|
||||
self._client = self._domain_service(account, private_key=key)
|
||||
else:
|
||||
raise TransipConfigException(
|
||||
'Missing `key` of `key_file` parameter in config'
|
||||
'Missing `key` or `key_file` parameter in config'
|
||||
)
|
||||
|
||||
self.account = account
|
||||
self.key = key
|
||||
|
||||
self._currentZone = {}
|
||||
|
||||
def _domain_service(self, *args, **kwargs):
|
||||
'This exists only for mocking purposes'
|
||||
return DomainService(*args, **kwargs)
|
||||
|
||||
def populate(self, zone, target=False, lenient=False):
|
||||
|
||||
exists = False
|
||||
|
||||
@@ -1,8 +1,16 @@
|
||||
coverage
|
||||
mock
|
||||
nose
|
||||
nose-no-network
|
||||
nose-timer
|
||||
pycodestyle==2.6.0
|
||||
pyflakes==2.2.0
|
||||
readme_renderer[md]==26.0
|
||||
requests_mock
|
||||
twine==3.2.0; python_version >= '3.2'
|
||||
|
||||
# Profiling tests...
|
||||
# nose-cprof
|
||||
# snakeviz
|
||||
# ./script/test --with-cprof --cprofile-stats-erase
|
||||
# snakeviz stats.dat
|
||||
|
||||
@@ -36,7 +36,7 @@ grep -r -I --line-number "# pragma: +no.*cover" octodns && {
|
||||
exit 1
|
||||
}
|
||||
|
||||
coverage run --branch --source=octodns --omit=octodns/cmds/* "$(command -v nosetests)" --with-xunit "$@"
|
||||
coverage run --branch --source=octodns --omit=octodns/cmds/* "$(command -v nosetests)" --with-no-network --with-xunit "$@"
|
||||
coverage html
|
||||
coverage xml
|
||||
coverage report --show-missing
|
||||
|
||||
@@ -30,4 +30,4 @@ export ARM_CLIENT_SECRET=
|
||||
export ARM_TENANT_ID=
|
||||
export ARM_SUBSCRIPTION_ID=
|
||||
|
||||
nosetests "$@"
|
||||
nosetests --with-no-network "$@"
|
||||
|
||||
@@ -107,7 +107,8 @@ class TestEasyDNSProvider(TestCase):
|
||||
self.assertEquals('Not Found', text_type(ctx.exception))
|
||||
|
||||
def test_apply_not_found(self):
|
||||
provider = EasyDNSProvider('test', 'token', 'apikey')
|
||||
provider = EasyDNSProvider('test', 'token', 'apikey',
|
||||
domain_create_sleep=0)
|
||||
|
||||
wanted = Zone('unit.tests.', [])
|
||||
wanted.add_record(Record.new(wanted, 'test1', {
|
||||
@@ -143,7 +144,8 @@ class TestEasyDNSProvider(TestCase):
|
||||
self.assertEquals('Not Found', text_type(ctx.exception))
|
||||
|
||||
def test_domain_create(self):
|
||||
provider = EasyDNSProvider('test', 'token', 'apikey')
|
||||
provider = EasyDNSProvider('test', 'token', 'apikey',
|
||||
domain_create_sleep=0)
|
||||
domain_after_creation = {
|
||||
"tm": 1000000000,
|
||||
"data": [{
|
||||
|
||||
@@ -394,8 +394,12 @@ class TestRoute53Provider(TestCase):
|
||||
|
||||
return (provider, stubber)
|
||||
|
||||
def test_process_desired_zone(self):
|
||||
# with fallback boto makes an unstubbed call to the 169. metadata api, this
|
||||
# stubs that bit out
|
||||
@patch('botocore.credentials.CredentialResolver.load_credentials')
|
||||
def test_process_desired_zone(self, fetch_metadata_token_mock):
|
||||
provider, stubber = self._get_stubbed_fallback_auth_provider()
|
||||
fetch_metadata_token_mock.side_effect = [None]
|
||||
|
||||
# No records, essentially a no-op
|
||||
desired = Zone('unit.tests.', [])
|
||||
@@ -527,8 +531,12 @@ class TestRoute53Provider(TestCase):
|
||||
list(got.records)[0].dynamic.rules[0].data['geos'])
|
||||
self.assertFalse('geos' in list(got.records)[0].dynamic.rules[1].data)
|
||||
|
||||
def test_populate_with_fallback(self):
|
||||
# with fallback boto makes an unstubbed call to the 169. metadata api, this
|
||||
# stubs that bit out
|
||||
@patch('botocore.credentials.CredentialResolver.load_credentials')
|
||||
def test_populate_with_fallback(self, fetch_metadata_token_mock):
|
||||
provider, stubber = self._get_stubbed_fallback_auth_provider()
|
||||
fetch_metadata_token_mock.side_effect = [None]
|
||||
|
||||
got = Zone('unit.tests.', [])
|
||||
with self.assertRaises(ClientError):
|
||||
|
||||
@@ -10,12 +10,12 @@ from six import text_type
|
||||
|
||||
from suds import WebFault
|
||||
|
||||
from mock import patch
|
||||
from unittest import TestCase
|
||||
|
||||
from octodns.provider.transip import TransipProvider
|
||||
from octodns.provider.yaml import YamlProvider
|
||||
from octodns.zone import Zone
|
||||
from transip.service.domain import DomainService
|
||||
from transip.service.objects import DnsEntry
|
||||
|
||||
|
||||
@@ -32,12 +32,11 @@ class MockResponse(object):
|
||||
dnsEntries = []
|
||||
|
||||
|
||||
class MockDomainService(DomainService):
|
||||
class MockDomainService(object):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(MockDomainService, self).__init__('MockDomainService', *args,
|
||||
**kwargs)
|
||||
self.mockupEntries = []
|
||||
self.throw_auth_fault = False
|
||||
|
||||
def mockup(self, records):
|
||||
|
||||
@@ -67,6 +66,9 @@ class MockDomainService(DomainService):
|
||||
# Skips authentication layer and returns the entries loaded by "Mockup"
|
||||
def get_info(self, domain_name):
|
||||
|
||||
if self.throw_auth_fault:
|
||||
self.raiseInvalidAuth()
|
||||
|
||||
# Special 'domain' to trigger error
|
||||
if str(domain_name) == str('notfound.unit.tests'):
|
||||
self.raiseZoneNotFound()
|
||||
@@ -135,26 +137,39 @@ N4OiVz1I3rbZGYa396lpxO6ku8yCglisL1yrSP6DdEUp66ntpKVd
|
||||
source.populate(expected)
|
||||
return expected
|
||||
|
||||
def test_init(self):
|
||||
@patch('octodns.provider.transip.TransipProvider._domain_service',
|
||||
return_value=MockDomainService())
|
||||
def test_init(self, _):
|
||||
|
||||
# No key nor key_file
|
||||
with self.assertRaises(Exception) as ctx:
|
||||
TransipProvider('test', 'unittest')
|
||||
|
||||
self.assertEquals(
|
||||
str('Missing `key` of `key_file` parameter in config'),
|
||||
str('Missing `key` or `key_file` parameter in config'),
|
||||
str(ctx.exception))
|
||||
|
||||
# With key
|
||||
TransipProvider('test', 'unittest', key=self.bogus_key)
|
||||
|
||||
# Existence and content of the key is tested in the SDK on client call
|
||||
# With key_file
|
||||
TransipProvider('test', 'unittest', key_file='/fake/path')
|
||||
|
||||
def test_populate(self):
|
||||
@patch('suds.client.Client.__init__', new=lambda *args, **kwargs: None)
|
||||
def test_domain_service(self):
|
||||
# Special case smoke test for DomainService to get coverage
|
||||
TransipProvider('test', 'unittest', key=self.bogus_key)
|
||||
|
||||
@patch('octodns.provider.transip.TransipProvider._domain_service',
|
||||
return_value=MockDomainService())
|
||||
def test_populate(self, _):
|
||||
_expected = self.make_expected()
|
||||
|
||||
# Unhappy Plan - Not authenticated
|
||||
# Live test against API, will fail in an unauthorized error
|
||||
with self.assertRaises(WebFault) as ctx:
|
||||
provider = TransipProvider('test', 'unittest', self.bogus_key)
|
||||
provider._client.throw_auth_fault = True
|
||||
zone = Zone('unit.tests.', [])
|
||||
provider.populate(zone, True)
|
||||
|
||||
@@ -163,12 +178,14 @@ N4OiVz1I3rbZGYa396lpxO6ku8yCglisL1yrSP6DdEUp66ntpKVd
|
||||
|
||||
self.assertEquals(str('200'), ctx.exception.fault.faultcode)
|
||||
|
||||
# No more auth problems
|
||||
provider._client.throw_auth_fault = False
|
||||
|
||||
# Unhappy Plan - Zone does not exists
|
||||
# Will trigger an exception if provider is used as a target for a
|
||||
# non-existing zone
|
||||
with self.assertRaises(Exception) as ctx:
|
||||
provider = TransipProvider('test', 'unittest', self.bogus_key)
|
||||
provider._client = MockDomainService('unittest', self.bogus_key)
|
||||
zone = Zone('notfound.unit.tests.', [])
|
||||
provider.populate(zone, True)
|
||||
|
||||
@@ -184,13 +201,11 @@ N4OiVz1I3rbZGYa396lpxO6ku8yCglisL1yrSP6DdEUp66ntpKVd
|
||||
# Won't trigger an exception if provider is NOT used as a target for a
|
||||
# non-existing zone.
|
||||
provider = TransipProvider('test', 'unittest', self.bogus_key)
|
||||
provider._client = MockDomainService('unittest', self.bogus_key)
|
||||
zone = Zone('notfound.unit.tests.', [])
|
||||
provider.populate(zone, False)
|
||||
|
||||
# Happy Plan - Populate with mockup records
|
||||
provider = TransipProvider('test', 'unittest', self.bogus_key)
|
||||
provider._client = MockDomainService('unittest', self.bogus_key)
|
||||
provider._client.mockup(_expected.records)
|
||||
zone = Zone('unit.tests.', [])
|
||||
provider.populate(zone, False)
|
||||
@@ -208,19 +223,19 @@ N4OiVz1I3rbZGYa396lpxO6ku8yCglisL1yrSP6DdEUp66ntpKVd
|
||||
|
||||
# Happy Plan - Even if the zone has no records the zone should exist
|
||||
provider = TransipProvider('test', 'unittest', self.bogus_key)
|
||||
provider._client = MockDomainService('unittest', self.bogus_key)
|
||||
zone = Zone('unit.tests.', [])
|
||||
exists = provider.populate(zone, True)
|
||||
self.assertTrue(exists, 'populate should return true')
|
||||
|
||||
return
|
||||
|
||||
def test_plan(self):
|
||||
@patch('octodns.provider.transip.TransipProvider._domain_service',
|
||||
return_value=MockDomainService())
|
||||
def test_plan(self, _):
|
||||
_expected = self.make_expected()
|
||||
|
||||
# Test Happy plan, only create
|
||||
provider = TransipProvider('test', 'unittest', self.bogus_key)
|
||||
provider._client = MockDomainService('unittest', self.bogus_key)
|
||||
plan = provider.plan(_expected)
|
||||
|
||||
self.assertEqual(15, plan.change_counts['Create'])
|
||||
@@ -229,12 +244,13 @@ N4OiVz1I3rbZGYa396lpxO6ku8yCglisL1yrSP6DdEUp66ntpKVd
|
||||
|
||||
return
|
||||
|
||||
def test_apply(self):
|
||||
@patch('octodns.provider.transip.TransipProvider._domain_service',
|
||||
return_value=MockDomainService())
|
||||
def test_apply(self, _):
|
||||
_expected = self.make_expected()
|
||||
|
||||
# Test happy flow. Create all supoorted records
|
||||
provider = TransipProvider('test', 'unittest', self.bogus_key)
|
||||
provider._client = MockDomainService('unittest', self.bogus_key)
|
||||
plan = provider.plan(_expected)
|
||||
self.assertEqual(15, len(plan.changes))
|
||||
changes = provider.apply(plan)
|
||||
@@ -246,7 +262,6 @@ N4OiVz1I3rbZGYa396lpxO6ku8yCglisL1yrSP6DdEUp66ntpKVd
|
||||
changes = [] # reset changes
|
||||
with self.assertRaises(Exception) as ctx:
|
||||
provider = TransipProvider('test', 'unittest', self.bogus_key)
|
||||
provider._client = MockDomainService('unittest', self.bogus_key)
|
||||
plan = provider.plan(_expected)
|
||||
plan.desired.name = 'notfound.unit.tests.'
|
||||
changes = provider.apply(plan)
|
||||
@@ -265,7 +280,6 @@ N4OiVz1I3rbZGYa396lpxO6ku8yCglisL1yrSP6DdEUp66ntpKVd
|
||||
|
||||
with self.assertRaises(Exception) as ctx:
|
||||
provider = TransipProvider('test', 'unittest', self.bogus_key)
|
||||
provider._client = MockDomainService('unittest', self.bogus_key)
|
||||
plan = provider.plan(_expected)
|
||||
plan.desired.name = 'failsetdns.unit.tests.'
|
||||
changes = provider.apply(plan)
|
||||
|
||||
Reference in New Issue
Block a user