1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Start sketchin of Rackspace provider, half rewritten from powerdns...

This commit is contained in:
Vietor Davis
2017-06-26 17:03:15 -07:00
parent 3b349c96f0
commit 679c2be0e0
6 changed files with 903 additions and 0 deletions

View File

@@ -0,0 +1,386 @@
#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
from requests import HTTPError, Session, post
import json
import logging
from ..record import Create, Record
from .base import BaseProvider
class RackspaceProvider(BaseProvider):
SUPPORTS_GEO = False
TIMEOUT = 5
def __init__(self, username, api_key, *args, **kwargs):
'''
Rackspace API v1 Provider
rackspace:
class: octodns.provider.rackspace.RackspaceProvider
# The the username to authenticate with (required)
username: username
# The api key that grants access for that user (required)
api_key: api-key
'''
self.log = logging.getLogger('RackspaceProvider[{}]'.format(username))
super(RackspaceProvider, self).__init__(id, *args, **kwargs)
auth_token, dns_endpoint = self._get_auth_token(username, api_key)
self.dns_endpoint = dns_endpoint
sess = Session()
sess.headers.update({'X-Auth-Token': auth_token})
self._sess = sess
def _get_auth_token(self, username, api_key):
ret = post('https://identity.api.rackspacecloud.com/v2.0/tokens',
json={"auth": {"RAX-KSKEY:apiKeyCredentials": {"username": username, "apiKey": api_key}}},
)
cloud_dns_endpoint = [x for x in ret.json()['access']['serviceCatalog'] if x['name'] == 'cloudDNS'][0]['endpoints'][0]['publicURL']
return ret.json()['access']['token']['id'], cloud_dns_endpoint
def _get_zone_id_for(self, zone_name):
ret = self._request('GET', 'domains', pagination_key='domains')
if ret and 'name' in ret:
return [x for x in ret if x['name'] == zone_name][0]['id']
else:
return None
def _request(self, method, path, data=None, pagination_key=None):
self.log.debug('_request: method=%s, path=%s', method, path)
url = '{}/{}'.format(self.dns_endpoint, path)
if pagination_key:
return self._paginated_request_for_url(method, url, data, pagination_key)
else:
return self._request_for_url(method, url, data)
def _request_for_url(self, method, url, data):
resp = self._sess.request(method, url, json=data, timeout=self.TIMEOUT)
self.log.debug('_request: status=%d', resp.status_code)
resp.raise_for_status()
return resp
def _paginated_request_for_url(self, method, url, data, pagination_key):
acc = []
resp = self._sess.request(method, url, json=data, timeout=self.TIMEOUT)
self.log.debug('_request: status=%d', resp.status_code)
resp.raise_for_status()
acc.extend(resp.json()[pagination_key])
next_page = [x for x in resp.json().get('links', []) if x['rel'] == 'next']
if next_page:
url = next_page[0]['href']
return acc.extend(self._paginated_request_for_url(method, url, data, pagination_key))
else:
return acc
def _get(self, path, data=None):
return self._request('GET', path, data=data)
def _post(self, path, data=None):
return self._request('POST', path, data=data)
def _patch(self, path, data=None):
return self._request('PATCH', path, data=data)
def _data_for_multiple(self, rrset):
# TODO: geo not supported
return {
'type': rrset['type'],
'values': [r['content'] for r in rrset['records']],
'ttl': rrset['ttl']
}
_data_for_A = _data_for_multiple
_data_for_AAAA = _data_for_multiple
_data_for_NS = _data_for_multiple
def _data_for_single(self, record):
return {
'type': record['type'],
'value': record['data'],
'ttl': record['ttl']
}
_data_for_ALIAS = _data_for_single
_data_for_CNAME = _data_for_single
_data_for_PTR = _data_for_single
def _data_for_quoted(self, rrset):
return {
'type': rrset['type'],
'values': [r['content'][1:-1] for r in rrset['records']],
'ttl': rrset['ttl']
}
_data_for_SPF = _data_for_quoted
_data_for_TXT = _data_for_quoted
def _data_for_MX(self, rrset):
values = []
for record in rrset['records']:
priority, value = record['content'].split(' ', 1)
values.append({
'priority': priority,
'value': value,
})
return {
'type': rrset['type'],
'values': values,
'ttl': rrset['ttl']
}
def _data_for_NAPTR(self, rrset):
values = []
for record in rrset['records']:
order, preference, flags, service, regexp, replacement = \
record['content'].split(' ', 5)
values.append({
'order': order,
'preference': preference,
'flags': flags[1:-1],
'service': service[1:-1],
'regexp': regexp[1:-1],
'replacement': replacement,
})
return {
'type': rrset['type'],
'values': values,
'ttl': rrset['ttl']
}
def _data_for_SSHFP(self, rrset):
values = []
for record in rrset['records']:
algorithm, fingerprint_type, fingerprint = \
record['content'].split(' ', 2)
values.append({
'algorithm': algorithm,
'fingerprint_type': fingerprint_type,
'fingerprint': fingerprint,
})
return {
'type': rrset['type'],
'values': values,
'ttl': rrset['ttl']
}
def _data_for_SRV(self, rrset):
values = []
for record in rrset['records']:
priority, weight, port, target = \
record['content'].split(' ', 3)
values.append({
'priority': priority,
'weight': weight,
'port': port,
'target': target,
})
return {
'type': rrset['type'],
'values': values,
'ttl': rrset['ttl']
}
def populate(self, zone, target=False):
self.log.debug('populate: name=%s', zone.name)
resp = None
try:
domain_id = self._get_zone_id_for(zone.name)
resp = self._request('GET', '/domains/{}/records'.format(domain_id), pagination_key='records')
self.log.debug('populate: loaded')
except HTTPError as e:
if e.response.status_code == 401:
# Nicer error message for auth problems
raise Exception('Rackspace request unauthorized')
elif e.response.status_code == 422:
# 422 means powerdns doesn't know anything about the requsted
# domain. We'll just ignore it here and leave the zone
# untouched.
pass
else:
# just re-throw
raise
before = len(zone.records)
if resp:
for record in resp.json()['records']:
record_type = record['type']
if record_type == 'SOA':
continue
data_for = getattr(self, '_data_for_{}'.format(record_type))
record_name = zone.hostname_from_fqdn(record['name'])
record = Record.new(zone, record_name, data_for(record),
source=self)
zone.add_record(record)
self.log.info('populate: found %s records',
len(zone.records) - before)
def _records_for_multiple(self, record):
return [{'content': v, 'disabled': False}
for v in record.values]
_records_for_A = _records_for_multiple
_records_for_AAAA = _records_for_multiple
_records_for_NS = _records_for_multiple
def _records_for_single(self, record):
return [{'content': record.value, 'disabled': False}]
_records_for_ALIAS = _records_for_single
_records_for_CNAME = _records_for_single
_records_for_PTR = _records_for_single
def _records_for_quoted(self, record):
return [{'content': '"{}"'.format(v), 'disabled': False}
for v in record.values]
_records_for_SPF = _records_for_quoted
_records_for_TXT = _records_for_quoted
def _records_for_MX(self, record):
return [{
'content': '{} {}'.format(v.priority, v.value),
'disabled': False
} for v in record.values]
def _records_for_NAPTR(self, record):
return [{
'content': '{} {} "{}" "{}" "{}" {}'.format(v.order, v.preference,
v.flags, v.service,
v.regexp,
v.replacement),
'disabled': False
} for v in record.values]
def _records_for_SSHFP(self, record):
return [{
'content': '{} {} {}'.format(v.algorithm, v.fingerprint_type,
v.fingerprint),
'disabled': False
} for v in record.values]
def _records_for_SRV(self, record):
return [{
'content': '{} {} {} {}'.format(v.priority, v.weight, v.port,
v.target),
'disabled': False
} for v in record.values]
def _mod_Create(self, change):
new = change.new
records_for = getattr(self, '_records_for_{}'.format(new._type))
return {
'name': new.fqdn,
'type': new._type,
'ttl': new.ttl,
'changetype': 'REPLACE',
'records': records_for(new)
}
_mod_Update = _mod_Create
def _mod_Delete(self, change):
existing = change.existing
records_for = getattr(self, '_records_for_{}'.format(existing._type))
return {
'name': existing.fqdn,
'type': existing._type,
'ttl': existing.ttl,
'changetype': 'DELETE',
'records': records_for(existing)
}
def _get_nameserver_record(self, existing):
return None
def _extra_changes(self, existing, _):
self.log.debug('_extra_changes: zone=%s', existing.name)
ns = self._get_nameserver_record(existing)
if not ns:
return []
# sorting mostly to make things deterministic for testing, but in
# theory it let us find what we're after quickier (though sorting would
# ve more exepensive.)
for record in sorted(existing.records):
if record == ns:
# We've found the top-level NS record, return any changes
change = record.changes(ns, self)
self.log.debug('_extra_changes: change=%s', change)
if change:
# We need to modify an existing record
return [change]
# No change is necessary
return []
# No existing top-level NS
self.log.debug('_extra_changes: create')
return [Create(ns)]
def _get_error(self, http_error):
try:
return http_error.response.json()['error']
except Exception:
return ''
def _apply(self, plan):
desired = plan.desired
changes = plan.changes
self.log.debug('_apply: zone=%s, len(changes)=%d', desired.name,
len(changes))
mods = []
for change in changes:
class_name = change.__class__.__name__
mods.append(getattr(self, '_mod_{}'.format(class_name))(change))
self.log.debug('_apply: sending change request')
try:
self._patch('zones/{}'.format(desired.name),
data={'rrsets': mods})
self.log.debug('_apply: patched')
except HTTPError as e:
error = self._get_error(e)
if e.response.status_code != 422 or \
not error.startswith('Could not find domain '):
self.log.error('_apply: status=%d, text=%s',
e.response.status_code,
e.response.text)
raise
self.log.info('_apply: creating zone=%s', desired.name)
# 422 means powerdns doesn't know anything about the requsted
# domain. We'll try to create it with the correct records instead
# of update. Hopefully all the mods are creates :-)
data = {
'name': desired.name,
'kind': 'Master',
'masters': [],
'nameservers': [],
'rrsets': mods,
'soa_edit_api': 'INCEPTION-INCREMENT',
'serial': 0,
}
try:
self._post('zones', data)
except HTTPError as e:
self.log.error('_apply: status=%d, text=%s',
e.response.status_code,
e.response.text)
raise
self.log.debug('_apply: created')
self.log.debug('_apply: complete')

View File

@@ -0,0 +1,87 @@
{
"access": {
"token": {
"id": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"expires": "2014-11-24T22:05:39.115Z",
"tenant": {
"id": "110011",
"name": "110011"
},
"RAX-AUTH:authenticatedBy": [
"APIKEY"
]
},
"serviceCatalog": [
{
"name": "cloudDatabases",
"endpoints": [
{
"publicURL": "https://syd.databases.api.rackspacecloud.com/v1.0/110011",
"region": "SYD",
"tenantId": "110011"
},
{
"publicURL": "https://dfw.databases.api.rackspacecloud.com/v1.0/110011",
"region": "DFW",
"tenantId": "110011"
},
{
"publicURL": "https://ord.databases.api.rackspacecloud.com/v1.0/110011",
"region": "ORD",
"tenantId": "110011"
},
{
"publicURL": "https://iad.databases.api.rackspacecloud.com/v1.0/110011",
"region": "IAD",
"tenantId": "110011"
},
{
"publicURL": "https://hkg.databases.api.rackspacecloud.com/v1.0/110011",
"region": "HKG",
"tenantId": "110011"
}
],
"type": "rax:database"
},
{
"name": "cloudDNS",
"endpoints": [
{
"publicURL": "https://dns.api.rackspacecloud.com/v1.0/110011",
"tenantId": "110011"
}
],
"type": "rax:dns"
},
{
"name": "rackCDN",
"endpoints": [
{
"internalURL": "https://global.cdn.api.rackspacecloud.com/v1.0/110011",
"publicURL": "https://global.cdn.api.rackspacecloud.com/v1.0/110011",
"tenantId": "110011"
}
],
"type": "rax:cdn"
}
],
"user": {
"id": "123456",
"roles": [
{
"description": "A Role that allows a user access to keystone Service methods",
"id": "6",
"name": "compute:default",
"tenantId": "110011"
},
{
"description": "User Admin Role.",
"id": "3",
"name": "identity:user-admin"
}
],
"name": "jsmith",
"RAX-AUTH:defaultRegion": "ORD"
}
}
}

View File

@@ -0,0 +1,68 @@
{
"totalEntries" : 10,
"domains" : [ {
"name" : "example.com",
"id" : 2725233,
"comment" : "Optional domain comment...",
"updated" : "2011-06-24T01:23:15.000+0000",
"accountId" : 1234,
"emailAddress" : "sample@rackspace.com",
"created" : "2011-06-24T01:12:51.000+0000"
}, {
"name" : "sub1.example.com",
"id" : 2725257,
"comment" : "1st sample subdomain",
"updated" : "2011-06-23T03:09:34.000+0000",
"accountId" : 1234,
"emailAddress" : "sample@rackspace.com",
"created" : "2011-06-23T03:09:33.000+0000"
}, {
"name" : "sub2.example.com",
"id" : 2725258,
"comment" : "1st sample subdomain",
"updated" : "2011-06-23T03:52:55.000+0000",
"accountId" : 1234,
"emailAddress" : "sample@rackspace.com",
"created" : "2011-06-23T03:52:55.000+0000"
}, {
"name" : "north.example.com",
"id" : 2725260,
"updated" : "2011-06-23T03:53:10.000+0000",
"accountId" : 1234,
"emailAddress" : "sample@rackspace.com",
"created" : "2011-06-23T03:53:09.000+0000"
}, {
"name" : "south.example.com",
"id" : 2725261,
"comment" : "Final sample subdomain",
"updated" : "2011-06-23T03:53:14.000+0000",
"accountId" : 1234,
"emailAddress" : "sample@rackspace.com",
"created" : "2011-06-23T03:53:14.000+0000"
}, {
"name" : "region2.example.net",
"id" : 2725352,
"updated" : "2011-06-23T20:21:06.000+0000",
"accountId" : 1234,
"created" : "2011-06-23T19:24:27.000+0000"
}, {
"name" : "example.org",
"id" : 2718984,
"updated" : "2011-05-03T14:47:32.000+0000",
"accountId" : 1234,
"created" : "2011-05-03T14:47:30.000+0000"
}, {
"name" : "rackspace.example",
"id" : 2722346,
"updated" : "2011-06-21T15:54:31.000+0000",
"accountId" : 1234,
"created" : "2011-06-15T19:02:07.000+0000"
}, {
"name" : "dnsaas.example",
"id" : 2722347,
"comment" : "Sample comment",
"updated" : "2011-06-21T15:54:31.000+0000",
"accountId" : 1234,
"created" : "2011-06-15T19:02:07.000+0000"
} ]
}

View File

@@ -0,0 +1,33 @@
{
"totalEntries" : 6,
"records" : [ {
"name" : "ftp.example.com",
"id" : "A-6817754",
"type" : "A",
"data" : "192.0.2.8",
"updated" : "2011-05-19T13:07:08.000+0000",
"ttl" : 5771,
"created" : "2011-05-18T19:53:09.000+0000"
}, {
"name" : "example.com",
"id" : "A-6822994",
"type" : "A",
"data" : "192.0.2.17",
"updated" : "2011-06-24T01:12:52.000+0000",
"ttl" : 86400,
"created" : "2011-06-24T01:12:52.000+0000"
}, {
"name" : "example.com",
"id" : "NS-6251982",
"type" : "NS",
"data" : "ns.rackspace.com",
"updated" : "2011-06-24T01:12:51.000+0000",
"ttl" : 3600,
"created" : "2011-06-24T01:12:51.000+0000"
} ],
"links" : [ {
"content" : "",
"href" : "https://localhost/v1.0/1234/domains/domain_id/records?limit=3&offset=3",
"rel" : "next"
} ]
}

View File

@@ -0,0 +1,35 @@
{
"totalEntries" : 6,
"records" : [ {
"name" : "example.com",
"id" : "NS-6251983",
"type" : "NS",
"data" : "ns2.rackspace.com",
"updated" : "2011-06-24T01:12:51.000+0000",
"ttl" : 3600,
"created" : "2011-06-24T01:12:51.000+0000"
}, {
"name" : "example.com",
"priority" : 5,
"id" : "MX-3151218",
"type" : "MX",
"data" : "mail.example.com",
"updated" : "2011-06-24T01:12:53.000+0000",
"ttl" : 3600,
"created" : "2011-06-24T01:12:53.000+0000"
}, {
"name" : "www.example.com",
"id" : "CNAME-9778009",
"type" : "CNAME",
"comment" : "This is a comment on the CNAME record",
"data" : "example.com",
"updated" : "2011-06-24T01:12:54.000+0000",
"ttl" : 5400,
"created" : "2011-06-24T01:12:54.000+0000"
} ],
"links" : [ {
"content" : "",
"href" : "https://dns.api.rackspacecloud.com/v1.0/1234/domains/domain_id/records?limit=3&offset=0",
"rel" : "previous"
}]
}

View File

@@ -0,0 +1,294 @@
#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
import re
from json import loads, dumps
from os.path import dirname, join
from unittest import TestCase
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from octodns.provider.rackspace import RackspaceProvider
from octodns.provider.yaml import YamlProvider
from octodns.record import Record
from octodns.zone import Zone
EMPTY_TEXT = '''
{
"totalEntries" : 6,
"records" : []
}
'''
with open('./tests/fixtures/rackspace-auth-response.json') as fh:
AUTH_RESPONSE = fh.read()
with open('./tests/fixtures/rackspace-list-domains-response.json') as fh:
LIST_DOMAINS_RESPONSE = fh.read()
with open('./tests/fixtures/rackspace-sample-recordset-page1.json') as fh:
RECORDS_PAGE_1 = fh.read()
with open('./tests/fixtures/rackspace-sample-recordset-page2.json') as fh:
RECORDS_PAGE_2 = fh.read()
def load_provider():
with requests_mock() as mock:
mock.post(ANY, status_code=200, text=AUTH_RESPONSE)
return RackspaceProvider('test', 'api-key')
class TestRackspaceSource(TestCase):
def test_provider(self):
provider = load_provider()
# Bad auth
with requests_mock() as mock:
mock.get(ANY, status_code=401, text='Unauthorized')
with self.assertRaises(Exception) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertTrue('unauthorized' in ctx.exception.message)
# General error
with requests_mock() as mock:
mock.get(ANY, status_code=502, text='Things caught fire')
with self.assertRaises(HTTPError) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(502, ctx.exception.response.status_code)
# Non-existant zone doesn't populate anything
with requests_mock() as mock:
mock.get(ANY, status_code=422,
json={'error': "Could not find domain 'unit.tests.'"})
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(set(), zone.records)
# The rest of this is messy/complicated b/c it's dealing with mocking
expected = Zone('unit.tests.', [])
source = YamlProvider('test', join(dirname(__file__), 'config'))
source.populate(expected)
expected_n = len(expected.records) - 1
self.assertEquals(14, expected_n)
# No diffs == no changes
with requests_mock() as mock:
mock.get(re.compile('domains$'), status_code=200, text=LIST_DOMAINS_RESPONSE)
mock.get(re.compile('records'), status_code=200, text=RECORDS_PAGE_1)
mock.get(re.compile('records.*offset=3'), status_code=200, text=RECORDS_PAGE_2)
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(14, len(zone.records))
changes = expected.changes(zone, provider)
self.assertEquals(0, len(changes))
# Used in a minute
def assert_rrsets_callback(request, context):
data = loads(request.body)
self.assertEquals(expected_n, len(data['rrsets']))
return ''
# No existing records -> creates for every record in expected
with requests_mock() as mock:
mock.get(ANY, status_code=200, text=EMPTY_TEXT)
# post 201, is reponse to the create with data
mock.patch(ANY, status_code=201, text=assert_rrsets_callback)
plan = provider.plan(expected)
self.assertEquals(expected_n, len(plan.changes))
self.assertEquals(expected_n, provider.apply(plan))
# Non-existent zone -> creates for every record in expected
# OMG this is fucking ugly, probably better to ditch requests_mocks and
# just mock things for real as it doesn't seem to provide a way to get
# at the request params or verify that things were called from what I
# can tell
not_found = {'error': "Could not find domain 'unit.tests.'"}
with requests_mock() as mock:
# get 422's, unknown zone
mock.get(ANY, status_code=422, text='')
# patch 422's, unknown zone
mock.patch(ANY, status_code=422, text=dumps(not_found))
# post 201, is reponse to the create with data
mock.post(ANY, status_code=201, text=assert_rrsets_callback)
plan = provider.plan(expected)
self.assertEquals(expected_n, len(plan.changes))
self.assertEquals(expected_n, provider.apply(plan))
with requests_mock() as mock:
# get 422's, unknown zone
mock.get(ANY, status_code=422, text='')
# patch 422's,
data = {'error': "Key 'name' not present or not a String"}
mock.patch(ANY, status_code=422, text=dumps(data))
with self.assertRaises(HTTPError) as ctx:
plan = provider.plan(expected)
provider.apply(plan)
response = ctx.exception.response
self.assertEquals(422, response.status_code)
self.assertTrue('error' in response.json())
with requests_mock() as mock:
# get 422's, unknown zone
mock.get(ANY, status_code=422, text='')
# patch 500's, things just blew up
mock.patch(ANY, status_code=500, text='')
with self.assertRaises(HTTPError):
plan = provider.plan(expected)
provider.apply(plan)
with requests_mock() as mock:
# get 422's, unknown zone
mock.get(ANY, status_code=422, text='')
# patch 500's, things just blew up
mock.patch(ANY, status_code=422, text=dumps(not_found))
# post 422's, something wrong with create
mock.post(ANY, status_code=422, text='Hello Word!')
with self.assertRaises(HTTPError):
plan = provider.plan(expected)
provider.apply(plan)
def test_small_change(self):
provider = load_provider()
expected = Zone('unit.tests.', [])
source = YamlProvider('test', join(dirname(__file__), 'config'))
source.populate(expected)
self.assertEquals(15, len(expected.records))
# A small change to a single record
with requests_mock() as mock:
mock.get(ANY, status_code=200, text=FULL_TEXT)
missing = Zone(expected.name, [])
# Find and delete the SPF record
for record in expected.records:
if record._type != 'SPF':
missing.add_record(record)
def assert_delete_callback(request, context):
self.assertEquals({
'rrsets': [{
'records': [
{'content': '"v=spf1 ip4:192.168.0.1/16-all"',
'disabled': False}
],
'changetype': 'DELETE',
'type': 'SPF',
'name': 'spf.unit.tests.',
'ttl': 600
}]
}, loads(request.body))
return ''
mock.patch(ANY, status_code=201, text=assert_delete_callback)
plan = provider.plan(missing)
self.assertEquals(1, len(plan.changes))
self.assertEquals(1, provider.apply(plan))
def test_existing_nameservers(self):
ns_values = ['8.8.8.8.', '9.9.9.9.']
provider = load_provider()
expected = Zone('unit.tests.', [])
ns_record = Record.new(expected, '', {
'type': 'NS',
'ttl': 600,
'values': ns_values
})
expected.add_record(ns_record)
# no changes
with requests_mock() as mock:
data = {
'rrsets': [{
'comments': [],
'name': 'unit.tests.',
'records': [
{
'content': '8.8.8.8.',
'disabled': False
},
{
'content': '9.9.9.9.',
'disabled': False
}
],
'ttl': 600,
'type': 'NS'
}, {
'comments': [],
'name': 'unit.tests.',
'records': [{
'content': '1.2.3.4',
'disabled': False,
}],
'ttl': 60,
'type': 'A'
}]
}
mock.get(ANY, status_code=200, json=data)
unrelated_record = Record.new(expected, '', {
'type': 'A',
'ttl': 60,
'value': '1.2.3.4'
})
expected.add_record(unrelated_record)
plan = provider.plan(expected)
self.assertFalse(plan)
# remove it now that we don't need the unrelated change any longer
expected.records.remove(unrelated_record)
# ttl diff
with requests_mock() as mock:
data = {
'rrsets': [{
'comments': [],
'name': 'unit.tests.',
'records': [
{
'content': '8.8.8.8.',
'disabled': False
},
{
'content': '9.9.9.9.',
'disabled': False
},
],
'ttl': 3600,
'type': 'NS'
}]
}
mock.get(ANY, status_code=200, json=data)
plan = provider.plan(expected)
self.assertEquals(1, len(plan.changes))
# create
with requests_mock() as mock:
data = {
'rrsets': []
}
mock.get(ANY, status_code=200, json=data)
plan = provider.plan(expected)
self.assertEquals(1, len(plan.changes))