1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Get lint and coverage tools clean.

This commit is contained in:
Terrence Cole
2017-08-11 14:02:14 -07:00
parent b1ef8a8f8d
commit 17c9b8b527
2 changed files with 173 additions and 348 deletions

View File

@@ -10,7 +10,7 @@ import logging
import string
import time
from ..record import Create, Record
from ..record import Record
from .base import BaseProvider
@@ -26,21 +26,6 @@ def remove_trailing_dot(s):
return s[:-1]
def strip_quotes(s):
assert s
assert len(s) > 2
assert s[0] == '"'
assert s[-1] == '"'
return s[1:-1]
def add_quotes(s):
assert s
assert s[0] != '"'
assert s[-1] != '"'
return '"{}"'.format(s)
def escape_semicolon(s):
assert s
return string.replace(s, ';', '\;')
@@ -55,7 +40,8 @@ class RackspaceProvider(BaseProvider):
SUPPORTS_GEO = False
TIMEOUT = 5
def __init__(self, id, username, api_key, ratelimit_delay, *args, **kwargs):
def __init__(self, id, username, api_key, ratelimit_delay, *args,
**kwargs):
'''
Rackspace API v1 Provider
@@ -84,25 +70,27 @@ class RackspaceProvider(BaseProvider):
def _get_auth_token(self, username, api_key):
ret = post('https://identity.api.rackspacecloud.com/v2.0/tokens',
json={"auth": {"RAX-KSKEY:apiKeyCredentials": {"username": username, "apiKey": api_key}}},
json={"auth": {
"RAX-KSKEY:apiKeyCredentials": {"username": username,
"apiKey": api_key}}},
)
cloud_dns_endpoint = [x for x in ret.json()['access']['serviceCatalog'] if x['name'] == 'cloudDNS'][0]['endpoints'][0]['publicURL']
cloud_dns_endpoint = \
[x for x in ret.json()['access']['serviceCatalog'] if
x['name'] == 'cloudDNS'][0]['endpoints'][0]['publicURL']
return ret.json()['access']['token']['id'], cloud_dns_endpoint
def _get_zone_id_for(self, zone):
ret = self._request('GET', 'domains', pagination_key='domains')
time.sleep(self.ratelimit_delay)
if ret:
return [x for x in ret if x['name'] == zone.name[:-1]][0]['id']
else:
return None
return [x for x in ret if x['name'] == zone.name[:-1]][0]['id']
def _request(self, method, path, data=None, pagination_key=None):
self.log.debug('_request: method=%s, path=%s', method, path)
url = '{}/{}'.format(self.dns_endpoint, path)
if pagination_key:
return self._paginated_request_for_url(method, url, data, pagination_key)
return self._paginated_request_for_url(method, url, data,
pagination_key)
else:
return self._request_for_url(method, url, data)
@@ -122,26 +110,22 @@ class RackspaceProvider(BaseProvider):
resp.raise_for_status()
acc.extend(resp.json()[pagination_key])
next_page = [x for x in resp.json().get('links', []) if x['rel'] == 'next']
next_page = [x for x in resp.json().get('links', []) if
x['rel'] == 'next']
if next_page:
url = next_page[0]['href']
acc.extend(self._paginated_request_for_url(method, url, data, pagination_key))
acc.extend(self._paginated_request_for_url(method, url, data,
pagination_key))
return acc
else:
return acc
def _get(self, path, data=None):
return self._request('GET', path, data=data)
def _post(self, path, data=None):
return self._request('POST', path, data=data)
def _put(self, path, data=None):
return self._request('PUT', path, data=data)
def _patch(self, path, data=None):
return self._request('PATCH', path, data=data)
def _delete(self, path, data=None):
return self._request('DELETE', path, data=data)
@@ -153,9 +137,9 @@ class RackspaceProvider(BaseProvider):
@classmethod
def _key_for_record(cls, rs_record):
return cls._as_unicode(rs_record['type'], 'ascii'),\
cls._as_unicode(rs_record['name'], 'utf-8'),\
cls._as_unicode(rs_record['data'], 'utf-8')
return cls._as_unicode(rs_record['type'], 'ascii'), \
cls._as_unicode(rs_record['name'], 'utf-8'), \
cls._as_unicode(rs_record['data'], 'utf-8')
def _data_for_multiple(self, rrset):
# TODO: geo not supported
@@ -210,67 +194,14 @@ class RackspaceProvider(BaseProvider):
'ttl': rrset[0]['ttl']
}
def _data_for_NAPTR(self, rrset):
raise NotImplementedError("Missing support for reading NAPTR records")
# values = []
# for record in rrset['records']:
# order, preference, flags, service, regexp, replacement = \
# record['content'].split(' ', 5)
# values.append({
# 'order': order,
# 'preference': preference,
# 'flags': flags[1:-1],
# 'service': service[1:-1],
# 'regexp': regexp[1:-1],
# 'replacement': replacement,
# })
# return {
# 'type': rrset['type'],
# 'values': values,
# 'ttl': rrset['ttl']
# }
def _data_for_SSHFP(self, rrset):
raise NotImplementedError("Missing support for reading SSHFP records")
# values = []
# for record in rrset['records']:
# algorithm, fingerprint_type, fingerprint = \
# record['content'].split(' ', 2)
# values.append({
# 'algorithm': algorithm,
# 'fingerprint_type': fingerprint_type,
# 'fingerprint': fingerprint,
# })
# return {
# 'type': rrset['type'],
# 'values': values,
# 'ttl': rrset['ttl']
# }
def _data_for_SRV(self, rrset):
raise NotImplementedError("Missing support for reading SRV records")
# values = []
# for record in rrset['records']:
# priority, weight, port, target = \
# record['content'].split(' ', 3)
# values.append({
# 'priority': priority,
# 'weight': weight,
# 'port': port,
# 'target': target,
# })
# return {
# 'type': rrset['type'],
# 'values': values,
# 'ttl': rrset['ttl']
# }
def populate(self, zone, target=False):
self.log.debug('populate: name=%s', zone.name)
resp_data = None
try:
domain_id = self._get_zone_id_for(zone)
resp_data = self._request('GET', 'domains/{}/records'.format(domain_id), pagination_key='records')
resp_data = self._request('GET',
'domains/{}/records'.format(domain_id),
pagination_key='records')
self.log.debug('populate: loaded')
except HTTPError as e:
if e.response.status_code == 401:
@@ -286,12 +217,12 @@ class RackspaceProvider(BaseProvider):
if resp_data:
records = self._group_records(resp_data)
for record_type, records_of_type in records.items():
if record_type == 'SOA':
continue
for raw_record_name, record_set in records_of_type.items():
data_for = getattr(self, '_data_for_{}'.format(record_type))
data_for = getattr(self,
'_data_for_{}'.format(record_type))
record_name = zone.hostname_from_fqdn(raw_record_name)
record = Record.new(zone, record_name, data_for(record_set),
record = Record.new(zone, record_name,
data_for(record_set),
source=self)
zone.add_record(record)
@@ -313,6 +244,7 @@ class RackspaceProvider(BaseProvider):
'data': value,
'ttl': max(record.ttl, 300),
}
_record_for_A = _record_for_single
_record_for_AAAA = _record_for_single
@@ -324,6 +256,7 @@ class RackspaceProvider(BaseProvider):
'data': remove_trailing_dot(value),
'ttl': max(record.ttl, 300),
}
_record_for_NS = _record_for_named
_record_for_ALIAS = _record_for_named
_record_for_CNAME = _record_for_named
@@ -337,6 +270,7 @@ class RackspaceProvider(BaseProvider):
'data': unescape_semicolon(value),
'ttl': max(record.ttl, 300),
}
_record_for_SPF = _record_for_textual
_record_for_TXT = _record_for_textual
@@ -350,27 +284,15 @@ class RackspaceProvider(BaseProvider):
'priority': value.priority
}
@staticmethod
def _record_for_SRV(record, value):
raise NotImplementedError("Missing support for writing SRV records")
def _record_for_unsupported(self, record, value):
raise NotImplementedError(
"Missing support for writing {} records".format(
record.__class__.__name__))
def _record_for_NAPTR(self, record):
raise NotImplementedError("Missing support for writing NAPTR records")
# return [{
# 'content': '{} {} "{}" "{}" "{}" {}'.format(v.order, v.preference,
# v.flags, v.service,
# v.regexp,
# v.replacement),
# 'disabled': False
# } for v in record.values]
def _record_for_SSHFP(self, record):
raise NotImplementedError("Missing support for writing SSHFP records")
# return [{
# 'content': '{} {} {}'.format(v.algorithm, v.fingerprint_type,
# v.fingerprint),
# 'disabled': False
# } for v in record.values]
_record_for_SOA = _record_for_unsupported
_record_for_SRV = _record_for_unsupported
_record_for_NAPTR = _record_for_unsupported
_record_for_SSHFP = _record_for_unsupported
def _get_values(self, record):
try:
@@ -379,12 +301,14 @@ class RackspaceProvider(BaseProvider):
return [record.value]
def _mod_Create(self, change):
return self._create_given_change_values(change, self._get_values(change.new))
return self._create_given_change_values(change,
self._get_values(change.new))
def _create_given_change_values(self, change, values):
out = []
for value in values:
transformer = getattr(self, "_record_for_{}".format(change.new._type))
transformer = getattr(self,
"_record_for_{}".format(change.new._type))
out.append(transformer(change.new, value))
return out
@@ -405,7 +329,8 @@ class RackspaceProvider(BaseProvider):
update_out = []
update_values = set(new_values).intersection(set(existing_values))
for value in update_values:
transformer = getattr(self, "_record_for_{}".format(change.new._type))
transformer = getattr(self,
"_record_for_{}".format(change.new._type))
prior_rs_record = transformer(change.existing, value)
prior_key = self._key_for_record(prior_rs_record)
next_rs_record = transformer(change.new, value)
@@ -418,12 +343,14 @@ class RackspaceProvider(BaseProvider):
return create_out, update_out, delete_out
def _mod_Delete(self, change):
return self._delete_given_change_values(change, self._get_values(change.existing))
return self._delete_given_change_values(change, self._get_values(
change.existing))
def _delete_given_change_values(self, change, values):
out = []
for value in values:
transformer = getattr(self, "_record_for_{}".format(change.existing._type))
transformer = getattr(self, "_record_for_{}".format(
change.existing._type))
rs_record = transformer(change.existing, value)
key = self._key_for_record(rs_record)
out.append('id=' + self._id_map[key])
@@ -444,11 +371,13 @@ class RackspaceProvider(BaseProvider):
if change.__class__.__name__ == 'Create':
creates += self._mod_Create(change)
elif change.__class__.__name__ == 'Update':
add_creates, add_updates, add_deletes = self._mod_Update(change)
add_creates, add_updates, add_deletes = self._mod_Update(
change)
creates += add_creates
updates += add_updates
deletes += add_deletes
elif change.__class__.__name__ == 'Delete':
else:
assert change.__class__.__name__ == 'Delete'
deletes += self._mod_Delete(change)
if deletes:
@@ -460,6 +389,7 @@ class RackspaceProvider(BaseProvider):
self._put('domains/{}/records'.format(domain_id), data=data)
if creates:
data = {"records": sorted(creates, key=lambda v: v['type'] + v['name'] + v.get('data', ''))}
data = {"records": sorted(creates, key=lambda v: v['type'] +
v['name'] +
v.get('data', ''))}
self._post('domains/{}/records'.format(domain_id), data=data)

View File

@@ -11,6 +11,8 @@ from os.path import dirname, join
from unittest import TestCase
from urlparse import urlparse
from nose.tools import assert_raises
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
@@ -19,8 +21,6 @@ from octodns.provider.yaml import YamlProvider
from octodns.record import Record
from octodns.zone import Zone
from pprint import pprint
EMPTY_TEXT = '''
{
"totalEntries" : 0,
@@ -40,16 +40,14 @@ with open('./tests/fixtures/rackspace-sample-recordset-page1.json') as fh:
with open('./tests/fixtures/rackspace-sample-recordset-page2.json') as fh:
RECORDS_PAGE_2 = fh.read()
with open('./tests/fixtures/rackspace-sample-recordset-existing-nameservers.json') as fh:
RECORDS_EXISTING_NAMESERVERS = fh.read()
class TestRackspaceProvider(TestCase):
def setUp(self):
self.maxDiff = 1000
with requests_mock() as mock:
mock.post(ANY, status_code=200, text=AUTH_RESPONSE)
self.provider = RackspaceProvider(id, 'test', 'api-key', '0')
self.provider = RackspaceProvider('identity', 'test', 'api-key',
'0')
self.assertTrue(mock.called_once)
def test_bad_auth(self):
@@ -85,9 +83,12 @@ class TestRackspaceProvider(TestCase):
def test_multipage_populate(self):
with requests_mock() as mock:
mock.get(re.compile('domains$'), status_code=200, text=LIST_DOMAINS_RESPONSE)
mock.get(re.compile('records'), status_code=200, text=RECORDS_PAGE_1)
mock.get(re.compile('records.*offset=3'), status_code=200, text=RECORDS_PAGE_2)
mock.get(re.compile('domains$'), status_code=200,
text=LIST_DOMAINS_RESPONSE)
mock.get(re.compile('records'), status_code=200,
text=RECORDS_PAGE_1)
mock.get(re.compile('records.*offset=3'), status_code=200,
text=RECORDS_PAGE_2)
zone = Zone('unit.tests.', [])
self.provider.populate(zone)
@@ -105,9 +106,12 @@ class TestRackspaceProvider(TestCase):
# No diffs == no changes
with requests_mock() as mock:
mock.get(re.compile('domains$'), status_code=200, text=LIST_DOMAINS_RESPONSE)
mock.get(re.compile('records'), status_code=200, text=RECORDS_PAGE_1)
mock.get(re.compile('records.*offset=3'), status_code=200, text=RECORDS_PAGE_2)
mock.get(re.compile('domains$'), status_code=200,
text=LIST_DOMAINS_RESPONSE)
mock.get(re.compile('records'), status_code=200,
text=RECORDS_PAGE_1)
mock.get(re.compile('records.*offset=3'), status_code=200,
text=RECORDS_PAGE_2)
zone = Zone('unit.tests.', [])
self.provider.populate(zone)
@@ -127,7 +131,8 @@ class TestRackspaceProvider(TestCase):
'values': ['8.8.8.8.', '9.9.9.9.']
}))
with requests_mock() as mock:
mock.get(re.compile('domains$'), status_code=200, text=LIST_DOMAINS_RESPONSE)
mock.get(re.compile('domains$'), status_code=200,
text=LIST_DOMAINS_RESPONSE)
mock.get(re.compile('records'), status_code=200, text=EMPTY_TEXT)
plan = self.provider.plan(expected)
@@ -141,23 +146,28 @@ class TestRackspaceProvider(TestCase):
# expected.add_record(Record.new(expected, 'foo', '1.2.3.4'))
with requests_mock() as list_mock:
list_mock.get(re.compile('domains$'), status_code=200, text=LIST_DOMAINS_RESPONSE)
list_mock.get(re.compile('records'), status_code=200, json={'records': [
{'type': 'A',
'name': 'foo.example.com',
'id': 'A-111111',
'data': '1.2.3.4',
'ttl': 300}]})
list_mock.get(re.compile('domains$'), status_code=200,
text=LIST_DOMAINS_RESPONSE)
list_mock.get(re.compile('records'), status_code=200,
json={'records': [
{'type': 'A',
'name': 'foo.example.com',
'id': 'A-111111',
'data': '1.2.3.4',
'ttl': 300}]})
plan = self.provider.plan(expected)
self.assertTrue(list_mock.called)
self.assertEqual(1, len(plan.changes))
self.assertTrue(plan.changes[0].existing.fqdn == 'foo.example.com.')
self.assertTrue(
plan.changes[0].existing.fqdn == 'foo.example.com.')
with requests_mock() as mock:
def _assert_deleting(request, context):
parts = urlparse(request.url)
self.assertEqual('id=A-111111', parts.query)
mock.get(re.compile('domains$'), status_code=200, text=LIST_DOMAINS_RESPONSE)
mock.get(re.compile('domains$'), status_code=200,
text=LIST_DOMAINS_RESPONSE)
mock.delete(re.compile('domains/.*/records?.*'), status_code=202,
text=_assert_deleting)
self.provider.apply(plan)
@@ -166,11 +176,14 @@ class TestRackspaceProvider(TestCase):
def _test_apply_with_data(self, data):
expected = Zone('unit.tests.', [])
for record in data.OtherRecords:
expected.add_record(Record.new(expected, record['subdomain'], record['data']))
expected.add_record(
Record.new(expected, record['subdomain'], record['data']))
with requests_mock() as list_mock:
list_mock.get(re.compile('domains$'), status_code=200, text=LIST_DOMAINS_RESPONSE)
list_mock.get(re.compile('records'), status_code=200, json=data.OwnRecords)
list_mock.get(re.compile('domains$'), status_code=200,
text=LIST_DOMAINS_RESPONSE)
list_mock.get(re.compile('records'), status_code=200,
json=data.OwnRecords)
plan = self.provider.plan(expected)
self.assertTrue(list_mock.called)
if not data.ExpectChanges:
@@ -184,25 +197,32 @@ class TestRackspaceProvider(TestCase):
def _assert_sending_right_body(request, _context):
called.add(request.method)
if request.method != 'DELETE':
self.assertEqual(request.headers['content-type'], 'application/json')
self.assertDictEqual(expected, json.loads(request.body))
self.assertEqual(request.headers['content-type'],
'application/json')
self.assertDictEqual(expected,
json.loads(request.body))
else:
parts = urlparse(request.url)
self.assertEqual(expected, parts.query)
return ''
return _assert_sending_right_body
mock.get(re.compile('domains$'), status_code=200, text=LIST_DOMAINS_RESPONSE)
mock.get(re.compile('domains$'), status_code=200,
text=LIST_DOMAINS_RESPONSE)
mock.post(re.compile('domains/.*/records$'), status_code=202,
text=make_assert_sending_right_body(data.ExpectedAdditions))
text=make_assert_sending_right_body(
data.ExpectedAdditions))
mock.delete(re.compile('domains/.*/records?.*'), status_code=202,
text=make_assert_sending_right_body(data.ExpectedDeletions))
text=make_assert_sending_right_body(
data.ExpectedDeletions))
mock.put(re.compile('domains/.*/records$'), status_code=202,
text=make_assert_sending_right_body(data.ExpectedUpdates))
self.provider.apply(plan)
self.assertTrue(data.ExpectedAdditions is None or "POST" in called)
self.assertTrue(data.ExpectedDeletions is None or "DELETE" in called)
self.assertTrue(
data.ExpectedDeletions is None or "DELETE" in called)
self.assertTrue(data.ExpectedUpdates is None or "PUT" in called)
def test_apply_no_change_empty(self):
@@ -216,6 +236,7 @@ class TestRackspaceProvider(TestCase):
ExpectedAdditions = None
ExpectedDeletions = None
ExpectedUpdates = None
return self._test_apply_with_data(TestData)
def test_apply_no_change_a_records(self):
@@ -256,6 +277,7 @@ class TestRackspaceProvider(TestCase):
ExpectedAdditions = None
ExpectedDeletions = None
ExpectedUpdates = None
return self._test_apply_with_data(TestData)
def test_apply_no_change_a_records_cross_zone(self):
@@ -298,6 +320,7 @@ class TestRackspaceProvider(TestCase):
ExpectedAdditions = None
ExpectedDeletions = None
ExpectedUpdates = None
return self._test_apply_with_data(TestData)
def test_apply_one_addition(self):
@@ -340,6 +363,7 @@ class TestRackspaceProvider(TestCase):
}
ExpectedDeletions = None
ExpectedUpdates = None
return self._test_apply_with_data(TestData)
def test_apply_create_MX(self):
@@ -390,9 +414,39 @@ class TestRackspaceProvider(TestCase):
}
ExpectedDeletions = None
ExpectedUpdates = None
return self._test_apply_with_data(TestData)
def test_apply_multiple_additions_exploding(self):
def test_apply_create_SRV(self):
class TestData(object):
OtherRecords = [
{
"subdomain": '_a.b',
"data": {
'type': 'SRV',
'ttl': 300,
'value': {
'priority': 20,
'weight': 999,
'port': 999,
'target': 'foo'
}
}
}
]
OwnRecords = {
"totalEntries": 0,
"records": []
}
ExpectChanges = True
ExpectedAdditions = [{}]
ExpectedDeletions = None
ExpectedUpdates = None
assert_raises(NotImplementedError, self._test_apply_with_data,
TestData)
def test_apply_multiple_additions_splatting(self):
class TestData(object):
OtherRecords = [
{
@@ -447,33 +501,33 @@ class TestRackspaceProvider(TestCase):
}
ExpectedDeletions = None
ExpectedUpdates = None
return self._test_apply_with_data(TestData)
def test_apply_multiple_additions_namespaced(self):
class TestData(object):
OtherRecords = [{
"subdomain": 'foo',
"data": {
'type': 'A',
'ttl': 300,
'value': '1.2.3.4'
}
}, {
"subdomain": 'bar',
"data": {
'type': 'A',
'ttl': 300,
'value': '1.2.3.4'
}
},
{
"subdomain": 'foo',
"data": {
'type': 'NS',
'ttl': 300,
'value': 'ns.example.com.'
}
}]
"subdomain": 'foo',
"data": {
'type': 'A',
'ttl': 300,
'value': '1.2.3.4'
}
}, {
"subdomain": 'bar',
"data": {
'type': 'A',
'ttl': 300,
'value': '1.2.3.4'
}
}, {
"subdomain": 'foo',
"data": {
'type': 'NS',
'ttl': 300,
'value': 'ns.example.com.'
}
}]
OwnRecords = {
"totalEntries": 0,
"records": []
@@ -499,6 +553,7 @@ class TestRackspaceProvider(TestCase):
}
ExpectedDeletions = None
ExpectedUpdates = None
return self._test_apply_with_data(TestData)
def test_apply_single_deletion(self):
@@ -524,6 +579,7 @@ class TestRackspaceProvider(TestCase):
ExpectedAdditions = None
ExpectedDeletions = "id=A-111111&id=NS-111111"
ExpectedUpdates = None
return self._test_apply_with_data(TestData)
def test_apply_multiple_deletions(self):
@@ -577,6 +633,7 @@ class TestRackspaceProvider(TestCase):
"ttl": 300
}]
}
return self._test_apply_with_data(TestData)
def test_apply_multiple_deletions_cross_zone(self):
@@ -617,6 +674,7 @@ class TestRackspaceProvider(TestCase):
ExpectedAdditions = None
ExpectedDeletions = "id=A-222222&id=A-333333"
ExpectedUpdates = None
return self._test_apply_with_data(TestData)
def test_apply_delete_cname(self):
@@ -636,6 +694,7 @@ class TestRackspaceProvider(TestCase):
ExpectedAdditions = None
ExpectedDeletions = "id=CNAME-111111"
ExpectedUpdates = None
return self._test_apply_with_data(TestData)
def test_apply_single_update(self):
@@ -671,6 +730,7 @@ class TestRackspaceProvider(TestCase):
"ttl": 3600
}]
}
return self._test_apply_with_data(TestData)
def test_apply_update_TXT(self):
@@ -706,6 +766,7 @@ class TestRackspaceProvider(TestCase):
}
ExpectedDeletions = 'id=TXT-111111'
ExpectedUpdates = None
return self._test_apply_with_data(TestData)
def test_apply_update_MX(self):
@@ -743,6 +804,7 @@ class TestRackspaceProvider(TestCase):
}
ExpectedDeletions = 'id=MX-111111'
ExpectedUpdates = None
return self._test_apply_with_data(TestData)
def test_apply_multiple_updates(self):
@@ -783,7 +845,7 @@ class TestRackspaceProvider(TestCase):
ExpectedAdditions = None
ExpectedDeletions = None
ExpectedUpdates = {
"records": [ {
"records": [{
"name": "unit.tests",
"id": "A-222222",
"data": "1.2.3.5",
@@ -800,6 +862,7 @@ class TestRackspaceProvider(TestCase):
"ttl": 3600
}]
}
return self._test_apply_with_data(TestData)
def test_apply_multiple_updates_cross_zone(self):
@@ -854,173 +917,5 @@ class TestRackspaceProvider(TestCase):
"ttl": 3600
}]
}
return self._test_apply_with_data(TestData)
"""
def test_provider(self):
expected = self._load_full_config()
# No existing records -> creates for every record in expected
with requests_mock() as mock:
mock.get(re.compile('domains$'), status_code=200, text=LIST_DOMAINS_RESPONSE)
mock.get(re.compile('records'), status_code=200, text=EMPTY_TEXT)
plan = self.provider.plan(expected)
self.assertTrue(mock.called)
self.assertEquals(len(expected.records), len(plan.changes))
# Used in a minute
def assert_rrsets_callback(request, context):
data = loads(request.body)
self.assertEquals(expected_n, len(data['rrsets']))
return ''
with requests_mock() as mock:
# post 201, is response to the create with data
mock.patch(ANY, status_code=201, text=assert_rrsets_callback)
self.assertEquals(expected_n, self.provider.apply(plan))
# Non-existent zone -> creates for every record in expected
# OMG this is fucking ugly, probably better to ditch requests_mocks and
# just mock things for real as it doesn't seem to provide a way to get
# at the request params or verify that things were called from what I
# can tell
not_found = {'error': "Could not find domain 'unit.tests.'"}
with requests_mock() as mock:
# get 422's, unknown zone
mock.get(ANY, status_code=422, text='')
# patch 422's, unknown zone
mock.patch(ANY, status_code=422, text=dumps(not_found))
# post 201, is response to the create with data
mock.post(ANY, status_code=201, text=assert_rrsets_callback)
plan = self.provider.plan(expected)
self.assertEquals(expected_n, len(plan.changes))
self.assertEquals(expected_n, self.provider.apply(plan))
with requests_mock() as mock:
# get 422's, unknown zone
mock.get(ANY, status_code=422, text='')
# patch 422's,
data = {'error': "Key 'name' not present or not a String"}
mock.patch(ANY, status_code=422, text=dumps(data))
with self.assertRaises(HTTPError) as ctx:
plan = self.provider.plan(expected)
self.provider.apply(plan)
response = ctx.exception.response
self.assertEquals(422, response.status_code)
self.assertTrue('error' in response.json())
with requests_mock() as mock:
# get 422's, unknown zone
mock.get(ANY, status_code=422, text='')
# patch 500's, things just blew up
mock.patch(ANY, status_code=500, text='')
with self.assertRaises(HTTPError):
plan = self.provider.plan(expected)
self.provider.apply(plan)
with requests_mock() as mock:
# get 422's, unknown zone
mock.get(ANY, status_code=422, text='')
# patch 500's, things just blew up
mock.patch(ANY, status_code=422, text=dumps(not_found))
# post 422's, something wrong with create
mock.post(ANY, status_code=422, text='Hello Word!')
with self.assertRaises(HTTPError):
plan = self.provider.plan(expected)
self.provider.apply(plan)
"""
def test_plan_no_changes(self):
expected = Zone('unit.tests.', [])
expected.add_record(Record.new(expected, '', {
'type': 'NS',
'ttl': 600,
'values': ['ns1.example.com.', 'ns2.example.com.']
}))
expected.add_record(Record.new(expected, '', {
'type': 'A',
'ttl': 600,
'value': '1.2.3.4'
}))
with requests_mock() as mock:
mock.get(re.compile('domains/.*/records'), status_code=200, text=RECORDS_EXISTING_NAMESERVERS)
mock.get(re.compile('domains$'), status_code=200, text=LIST_DOMAINS_RESPONSE)
plan = self.provider.plan(expected)
self.assertTrue(mock.called)
self.assertFalse(plan)
def test_plan_remove_a_record(self):
expected = Zone('unit.tests.', [])
expected.add_record(Record.new(expected, '', {
'type': 'NS',
'ttl': 600,
'values': ['ns1.example.com.', 'ns2.example.com.']
}))
with requests_mock() as mock:
mock.get(re.compile('domains/.*/records'), status_code=200, text=RECORDS_EXISTING_NAMESERVERS)
mock.get(re.compile('domains$'), status_code=200, text=LIST_DOMAINS_RESPONSE)
plan = self.provider.plan(expected)
self.assertTrue(mock.called)
self.assertEquals(1, len(plan.changes))
self.assertEqual(plan.changes[0].existing.ttl, 600)
self.assertEqual(plan.changes[0].existing.values[0], '1.2.3.4')
def test_plan_create_a_record(self):
expected = Zone('unit.tests.', [])
expected.add_record(Record.new(expected, '', {
'type': 'NS',
'ttl': 600,
'values': ['ns1.example.com.', 'ns2.example.com.']
}))
expected.add_record(Record.new(expected, '', {
'type': 'A',
'ttl': 600,
'values': ['1.2.3.4', '1.2.3.5']
}))
with requests_mock() as mock:
mock.get(re.compile('domains/.*/records'), status_code=200, text=RECORDS_EXISTING_NAMESERVERS)
mock.get(re.compile('domains$'), status_code=200, text=LIST_DOMAINS_RESPONSE)
plan = self.provider.plan(expected)
self.assertTrue(mock.called)
self.assertEquals(1, len(plan.changes))
self.assertEqual(plan.changes[0].new.ttl, 600)
self.assertEqual(plan.changes[0].new.values[0], '1.2.3.4')
self.assertEqual(plan.changes[0].new.values[1], '1.2.3.5')
def test_plan_change_ttl(self):
expected = Zone('unit.tests.', [])
expected.add_record(Record.new(expected, '', {
'type': 'NS',
'ttl': 600,
'values': ['ns1.example.com.', 'ns2.example.com.']
}))
expected.add_record(Record.new(expected, '', {
'type': 'A',
'ttl': 86400,
'value': '1.2.3.4'
}))
with requests_mock() as mock:
mock.get(re.compile('domains/.*/records'), status_code=200, text=RECORDS_EXISTING_NAMESERVERS)
mock.get(re.compile('domains$'), status_code=200, text=LIST_DOMAINS_RESPONSE)
plan = self.provider.plan(expected)
self.assertTrue(mock.called)
self.assertEqual(1, len(plan.changes))
self.assertEqual(plan.changes[0].existing.ttl, 600)
self.assertEqual(plan.changes[0].new.ttl, 86400)
self.assertEqual(plan.changes[0].new.values[0], '1.2.3.4')