1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Terrence Cole
2017-10-19 09:25:36 -07:00
21 changed files with 1652 additions and 26 deletions

View File

@@ -11,6 +11,7 @@ from unittest import TestCase
from octodns.record import Record
from octodns.manager import _AggregateTarget, MainThreadExecutor, Manager
from octodns.yaml import safe_load
from octodns.zone import Zone
from helpers import GeoProvider, NoSshFpProvider, SimpleProvider, \
@@ -211,6 +212,17 @@ class TestManager(TestCase):
with self.assertRaises(IOError):
manager.dump('unknown.zone.', tmpdir.dirname, False, 'in')
def test_dump_empty(self):
with TemporaryDirectory() as tmpdir:
environ['YAML_TMP_DIR'] = tmpdir.dirname
manager = Manager(get_config_filename('simple.yaml'))
manager.dump('empty.', tmpdir.dirname, False, 'in')
with open(join(tmpdir.dirname, 'empty.yaml')) as fh:
data = safe_load(fh, False)
self.assertFalse(data)
def test_validate_configs(self):
Manager(get_config_filename('simple-validate.yaml')).validate_configs()

View File

@@ -23,6 +23,8 @@ class HelperProvider(BaseProvider):
self.__extra_changes = extra_changes
self.apply_disabled = apply_disabled
self.include_change_callback = include_change_callback
self.update_pcent_threshold = Plan.MAX_SAFE_UPDATE_PCENT
self.delete_pcent_threshold = Plan.MAX_SAFE_DELETE_PCENT
def populate(self, zone, target=False, lenient=False):
pass
@@ -288,3 +290,59 @@ class TestBaseProvider(TestCase):
Plan.MAX_SAFE_DELETE_PCENT))]
Plan(zone, zone, changes).raise_if_unsafe()
def test_safe_updates_min_existing_override(self):
safe_pcent = .4
# 40% + 1 fails when more
# than MIN_EXISTING_RECORDS exist
zone = Zone('unit.tests.', [])
record = Record.new(zone, 'a', {
'ttl': 30,
'type': 'A',
'value': '1.2.3.4',
})
for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, str(i), {
'ttl': 60,
'type': 'A',
'value': '2.3.4.5'
}))
changes = [Update(record, record)
for i in range(int(Plan.MIN_EXISTING_RECORDS *
safe_pcent) + 1)]
with self.assertRaises(UnsafePlan) as ctx:
Plan(zone, zone, changes,
update_pcent_threshold=safe_pcent).raise_if_unsafe()
self.assertTrue('Too many updates' in ctx.exception.message)
def test_safe_deletes_min_existing_override(self):
safe_pcent = .4
# 40% + 1 fails when more
# than MIN_EXISTING_RECORDS exist
zone = Zone('unit.tests.', [])
record = Record.new(zone, 'a', {
'ttl': 30,
'type': 'A',
'value': '1.2.3.4',
})
for i in range(int(Plan.MIN_EXISTING_RECORDS)):
zone.add_record(Record.new(zone, str(i), {
'ttl': 60,
'type': 'A',
'value': '2.3.4.5'
}))
changes = [Delete(record)
for i in range(int(Plan.MIN_EXISTING_RECORDS *
safe_pcent) + 1)]
with self.assertRaises(UnsafePlan) as ctx:
Plan(zone, zone, changes,
delete_pcent_threshold=safe_pcent).raise_if_unsafe()
self.assertTrue('Too many deletes' in ctx.exception.message)

View File

@@ -601,6 +601,7 @@ class TestDynProviderGeo(TestCase):
provider = DynProvider('test', 'cust', 'user', 'pass', True)
# short-circuit session checking
provider._dyn_sess = True
provider.log.warn = MagicMock()
# no tds
mock.side_effect = [{'data': []}]
@@ -649,6 +650,10 @@ class TestDynProviderGeo(TestCase):
set(tds.keys()))
self.assertEquals(['A'], tds['unit.tests.'].keys())
self.assertEquals(['A'], tds['geo.unit.tests.'].keys())
provider.log.warn.assert_called_with("Failed to load TraficDirector "
"'%s': %s", 'something else',
'need more than 1 value to '
'unpack')
@patch('dyn.core.SessionEngine.execute')
def test_traffic_director_monitor(self, mock):

View File

@@ -0,0 +1,429 @@
#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
from octodns.record import Create, Delete, Update, Record
from octodns.provider.googlecloud import GoogleCloudProvider
from octodns.zone import Zone
from octodns.provider.base import Plan, BaseProvider
from unittest import TestCase
from mock import Mock, patch, PropertyMock
zone = Zone(name='unit.tests.', sub_zones=[])
octo_records = []
octo_records.append(Record.new(zone, '', {
'ttl': 0,
'type': 'A',
'values': ['1.2.3.4', '10.10.10.10']}))
octo_records.append(Record.new(zone, 'a', {
'ttl': 1,
'type': 'A',
'values': ['1.2.3.4', '1.1.1.1']}))
octo_records.append(Record.new(zone, 'aa', {
'ttl': 9001,
'type': 'A',
'values': ['1.2.4.3']}))
octo_records.append(Record.new(zone, 'aaa', {
'ttl': 2,
'type': 'A',
'values': ['1.1.1.3']}))
octo_records.append(Record.new(zone, 'cname', {
'ttl': 3,
'type': 'CNAME',
'value': 'a.unit.tests.'}))
octo_records.append(Record.new(zone, 'mx1', {
'ttl': 3,
'type': 'MX',
'values': [{
'priority': 10,
'value': 'mx1.unit.tests.',
}, {
'priority': 20,
'value': 'mx2.unit.tests.',
}]}))
octo_records.append(Record.new(zone, 'mx2', {
'ttl': 3,
'type': 'MX',
'values': [{
'priority': 10,
'value': 'mx1.unit.tests.',
}]}))
octo_records.append(Record.new(zone, '', {
'ttl': 4,
'type': 'NS',
'values': ['ns1.unit.tests.', 'ns2.unit.tests.']}))
octo_records.append(Record.new(zone, 'foo', {
'ttl': 5,
'type': 'NS',
'value': 'ns1.unit.tests.'}))
octo_records.append(Record.new(zone, '_srv._tcp', {
'ttl': 6,
'type': 'SRV',
'values': [{
'priority': 10,
'weight': 20,
'port': 30,
'target': 'foo-1.unit.tests.',
}, {
'priority': 12,
'weight': 30,
'port': 30,
'target': 'foo-2.unit.tests.',
}]}))
octo_records.append(Record.new(zone, '_srv2._tcp', {
'ttl': 7,
'type': 'SRV',
'values': [{
'priority': 12,
'weight': 17,
'port': 1,
'target': 'srvfoo.unit.tests.',
}]}))
octo_records.append(Record.new(zone, 'txt1', {
'ttl': 8,
'type': 'TXT',
'value': 'txt singleton test'}))
octo_records.append(Record.new(zone, 'txt2', {
'ttl': 9,
'type': 'TXT',
'values': ['txt multiple test', 'txt multiple test 2']}))
octo_records.append(Record.new(zone, 'naptr', {
'ttl': 9,
'type': 'NAPTR',
'values': [{
'order': 100,
'preference': 10,
'flags': 'S',
'service': 'SIP+D2U',
'regexp': "!^.*$!sip:customer-service@unit.tests!",
'replacement': '_sip._udp.unit.tests.'
}]}))
octo_records.append(Record.new(zone, 'caa', {
'ttl': 9,
'type': 'CAA',
'value': {
'flags': 0,
'tag': 'issue',
'value': 'ca.unit.tests',
}}))
for record in octo_records:
zone.add_record(record)
# This is the format which the google API likes.
resource_record_sets = [
('unit.tests.', u'A', 0, [u'1.2.3.4', u'10.10.10.10']),
(u'a.unit.tests.', u'A', 1, [u'1.1.1.1', u'1.2.3.4']),
(u'aa.unit.tests.', u'A', 9001, [u'1.2.4.3']),
(u'aaa.unit.tests.', u'A', 2, [u'1.1.1.3']),
(u'cname.unit.tests.', u'CNAME', 3, [u'a.unit.tests.']),
(u'mx1.unit.tests.', u'MX', 3,
[u'10 mx1.unit.tests.', u'20 mx2.unit.tests.']),
(u'mx2.unit.tests.', u'MX', 3, [u'10 mx1.unit.tests.']),
('unit.tests.', u'NS', 4, [u'ns1.unit.tests.', u'ns2.unit.tests.']),
(u'foo.unit.tests.', u'NS', 5, [u'ns1.unit.tests.']),
(u'_srv._tcp.unit.tests.', u'SRV', 6,
[u'10 20 30 foo-1.unit.tests.', u'12 30 30 foo-2.unit.tests.']),
(u'_srv2._tcp.unit.tests.', u'SRV', 7, [u'12 17 1 srvfoo.unit.tests.']),
(u'txt1.unit.tests.', u'TXT', 8, [u'txt singleton test']),
(u'txt2.unit.tests.', u'TXT', 9,
[u'txt multiple test', u'txt multiple test 2']),
(u'naptr.unit.tests.', u'NAPTR', 9, [
u'100 10 "S" "SIP+D2U" "!^.*$!sip:customer-service@unit.tests!"'
u' _sip._udp.unit.tests.']),
(u'caa.unit.tests.', u'CAA', 9, [u'0 issue ca.unit.tests'])
]
class DummyResourceRecordSet:
def __init__(self, record_name, record_type, ttl, rrdatas):
self.name = record_name
self.record_type = record_type
self.ttl = ttl
self.rrdatas = rrdatas
def __eq__(self, other):
try:
return self.name == other.name \
and self.record_type == other.record_type \
and self.ttl == other.ttl \
and sorted(self.rrdatas) == sorted(other.rrdatas)
except:
return False
def __repr__(self):
return "{} {} {} {!s}"\
.format(self.name, self.record_type, self.ttl, self.rrdatas)
def __hash__(self):
return hash(repr(self))
class DummyGoogleCloudZone:
def __init__(self, dns_name, name=""):
self.dns_name = dns_name
self.name = name
def resource_record_set(self, *args):
return DummyResourceRecordSet(*args)
def list_resource_record_sets(self, *args):
pass
def create(self, *args, **kwargs):
pass
class DummyIterator:
"""Returns a mock DummyIterator object to use in testing.
This is because API calls for google cloud DNS, if paged, contains a
"next_page_token", which can be used to grab a subsequent
iterator with more results.
:type return: DummyIterator
"""
def __init__(self, list_of_stuff, page_token=None):
self.iterable = iter(list_of_stuff)
self.next_page_token = page_token
def __iter__(self):
return self
def next(self):
return self.iterable.next()
class TestGoogleCloudProvider(TestCase):
@patch('octodns.provider.googlecloud.dns')
def _get_provider(*args):
'''Returns a mock GoogleCloudProvider object to use in testing.
:type return: GoogleCloudProvider
'''
return GoogleCloudProvider(id=1, project="mock")
@patch('octodns.provider.googlecloud.dns')
def test___init__(self, *_):
self.assertIsInstance(GoogleCloudProvider(id=1,
credentials_file="test",
project="unit test"),
BaseProvider)
self.assertIsInstance(GoogleCloudProvider(id=1),
BaseProvider)
@patch('octodns.provider.googlecloud.time.sleep')
@patch('octodns.provider.googlecloud.dns')
def test__apply(self, *_):
class DummyDesired:
def __init__(self, name, changes):
self.name = name
self.changes = changes
apply_z = Zone("unit.tests.", [])
create_r = Record.new(apply_z, '', {
'ttl': 0,
'type': 'A',
'values': ['1.2.3.4', '10.10.10.10']})
delete_r = Record.new(apply_z, 'a', {
'ttl': 1,
'type': 'A',
'values': ['1.2.3.4', '1.1.1.1']})
update_existing_r = Record.new(apply_z, 'aa', {
'ttl': 9001,
'type': 'A',
'values': ['1.2.4.3']})
update_new_r = Record.new(apply_z, 'aa', {
'ttl': 666,
'type': 'A',
'values': ['1.4.3.2']})
gcloud_zone_mock = DummyGoogleCloudZone("unit.tests.", "unit-tests")
status_mock = Mock()
return_values_for_status = iter(
["pending"] * 11 + ['done', 'done'])
type(status_mock).status = PropertyMock(
side_effect=return_values_for_status.next)
gcloud_zone_mock.changes = Mock(return_value=status_mock)
provider = self._get_provider()
provider.gcloud_client = Mock()
provider._gcloud_zones = {"unit.tests.": gcloud_zone_mock}
desired = Mock()
desired.name = "unit.tests."
changes = []
changes.append(Create(create_r))
changes.append(Delete(delete_r))
changes.append(Update(existing=update_existing_r, new=update_new_r))
provider.apply(Plan(
existing=[update_existing_r, delete_r],
desired=desired,
changes=changes
))
calls_mock = gcloud_zone_mock.changes.return_value
mocked_calls = []
for mock_call in calls_mock.add_record_set.mock_calls:
mocked_calls.append(mock_call[1][0])
self.assertEqual(mocked_calls, [
DummyResourceRecordSet(
'unit.tests.', 'A', 0, ['1.2.3.4', '10.10.10.10']),
DummyResourceRecordSet(
'aa.unit.tests.', 'A', 666, ['1.4.3.2'])
])
mocked_calls2 = []
for mock_call in calls_mock.delete_record_set.mock_calls:
mocked_calls2.append(mock_call[1][0])
self.assertEqual(mocked_calls2, [
DummyResourceRecordSet(
'a.unit.tests.', 'A', 1, ['1.2.3.4', '1.1.1.1']),
DummyResourceRecordSet(
'aa.unit.tests.', 'A', 9001, ['1.2.4.3'])
])
type(status_mock).status = "pending"
with self.assertRaises(RuntimeError):
provider.apply(Plan(
existing=[update_existing_r, delete_r],
desired=desired,
changes=changes
))
unsupported_change = Mock()
unsupported_change.__len__ = Mock(return_value=1)
type_mock = Mock()
type_mock._type = "A"
unsupported_change.record = type_mock
mock_plan = Mock()
type(mock_plan).desired = PropertyMock(return_value=DummyDesired(
"dummy name", []))
type(mock_plan).changes = [unsupported_change]
with self.assertRaises(RuntimeError):
provider.apply(mock_plan)
def test__get_gcloud_client(self):
provider = self._get_provider()
self.assertIsInstance(provider, GoogleCloudProvider)
@patch('octodns.provider.googlecloud.dns')
def test_populate(self, _):
def _get_mock_zones(page_token=None):
if not page_token:
return DummyIterator([
DummyGoogleCloudZone('example.com.'),
], page_token="MOCK_PAGE_TOKEN")
elif page_token == "MOCK_PAGE_TOKEN":
return DummyIterator([
DummyGoogleCloudZone('example2.com.'),
], page_token="MOCK_PAGE_TOKEN2")
return DummyIterator([
google_cloud_zone
])
def _get_mock_record_sets(page_token=None):
if not page_token:
return DummyIterator(
[DummyResourceRecordSet(*v) for v in
resource_record_sets[:3]], page_token="MOCK_PAGE_TOKEN")
elif page_token == "MOCK_PAGE_TOKEN":
return DummyIterator(
[DummyResourceRecordSet(*v) for v in
resource_record_sets[3:5]], page_token="MOCK_PAGE_TOKEN2")
return DummyIterator(
[DummyResourceRecordSet(*v) for v in resource_record_sets[5:]])
google_cloud_zone = DummyGoogleCloudZone('unit.tests.')
provider = self._get_provider()
provider.gcloud_client.list_zones = Mock(side_effect=_get_mock_zones)
google_cloud_zone.list_resource_record_sets = Mock(
side_effect=_get_mock_record_sets)
self.assertEqual(provider.gcloud_zones.get("unit.tests.").dns_name,
"unit.tests.")
test_zone = Zone('unit.tests.', [])
provider.populate(test_zone)
# test_zone gets fed the same records as zone does, except it's in
# the format returned by google API, so after populate they should look
# excactly the same.
self.assertEqual(test_zone.records, zone.records)
test_zone2 = Zone('nonexistant.zone.', [])
provider.populate(test_zone2, False, False)
self.assertEqual(len(test_zone2.records), 0,
msg="Zone should not get records from wrong domain")
provider.SUPPORTS = set()
test_zone3 = Zone('unit.tests.', [])
provider.populate(test_zone3)
self.assertEqual(len(test_zone3.records), 0)
@patch('octodns.provider.googlecloud.dns')
def test_populate_corner_cases(self, _):
provider = self._get_provider()
test_zone = Zone('unit.tests.', [])
not_same_fqdn = DummyResourceRecordSet(
'unit.tests.gr', u'A', 0, [u'1.2.3.4']),
provider._get_gcloud_records = Mock(
side_effect=[not_same_fqdn])
provider._gcloud_zones = {
"unit.tests.": DummyGoogleCloudZone("unit.tests.", "unit-tests")}
provider.populate(test_zone)
self.assertEqual(len(test_zone.records), 1)
self.assertEqual(test_zone.records.pop().fqdn,
u'unit.tests.gr.unit.tests.')
def test__get_gcloud_zone(self):
provider = self._get_provider()
provider.gcloud_client = Mock()
provider.gcloud_client.list_zones = Mock(
return_value=DummyIterator([]))
self.assertIsNone(provider.gcloud_zones.get("nonexistant.xone"),
msg="Check that nonexistant zones return None when"
"there's no create=True flag")
def test__get_rrsets(self):
provider = self._get_provider()
dummy_gcloud_zone = DummyGoogleCloudZone("unit.tests")
for octo_record in octo_records:
_rrset_func = getattr(
provider, '_rrset_for_{}'.format(octo_record._type))
self.assertEqual(
_rrset_func(dummy_gcloud_zone, octo_record).record_type,
octo_record._type
)
def test__create_zone(self):
provider = self._get_provider()
provider.gcloud_client = Mock()
provider.gcloud_client.list_zones = Mock(
return_value=DummyIterator([]))
mock_zone = provider._create_gcloud_zone("nonexistant.zone.mock")
mock_zone.create.assert_called()
provider.gcloud_client.zone.assert_called()

View File

@@ -0,0 +1,359 @@
#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
from unittest import TestCase
from mock import patch, call
from ovh import APIError
from octodns.provider.ovh import OvhProvider
from octodns.record import Record
from octodns.zone import Zone
class TestOvhProvider(TestCase):
api_record = []
zone = Zone('unit.tests.', [])
expected = set()
# A, subdomain=''
api_record.append({
'fieldType': 'A',
'ttl': 100,
'target': '1.2.3.4',
'subDomain': '',
'id': 1
})
expected.add(Record.new(zone, '', {
'ttl': 100,
'type': 'A',
'value': '1.2.3.4',
}))
# A, subdomain='sub
api_record.append({
'fieldType': 'A',
'ttl': 200,
'target': '1.2.3.4',
'subDomain': 'sub',
'id': 2
})
expected.add(Record.new(zone, 'sub', {
'ttl': 200,
'type': 'A',
'value': '1.2.3.4',
}))
# CNAME
api_record.append({
'fieldType': 'CNAME',
'ttl': 300,
'target': 'unit.tests.',
'subDomain': 'www2',
'id': 3
})
expected.add(Record.new(zone, 'www2', {
'ttl': 300,
'type': 'CNAME',
'value': 'unit.tests.',
}))
# MX
api_record.append({
'fieldType': 'MX',
'ttl': 400,
'target': '10 mx1.unit.tests.',
'subDomain': '',
'id': 4
})
expected.add(Record.new(zone, '', {
'ttl': 400,
'type': 'MX',
'values': [{
'preference': 10,
'exchange': 'mx1.unit.tests.',
}]
}))
# NAPTR
api_record.append({
'fieldType': 'NAPTR',
'ttl': 500,
'target': '10 100 "S" "SIP+D2U" "!^.*$!sip:info@bar.example.com!" .',
'subDomain': 'naptr',
'id': 5
})
expected.add(Record.new(zone, 'naptr', {
'ttl': 500,
'type': 'NAPTR',
'values': [{
'flags': 'S',
'order': 10,
'preference': 100,
'regexp': '!^.*$!sip:info@bar.example.com!',
'replacement': '.',
'service': 'SIP+D2U',
}]
}))
# NS
api_record.append({
'fieldType': 'NS',
'ttl': 600,
'target': 'ns1.unit.tests.',
'subDomain': '',
'id': 6
})
api_record.append({
'fieldType': 'NS',
'ttl': 600,
'target': 'ns2.unit.tests.',
'subDomain': '',
'id': 7
})
expected.add(Record.new(zone, '', {
'ttl': 600,
'type': 'NS',
'values': ['ns1.unit.tests.', 'ns2.unit.tests.'],
}))
# NS with sub
api_record.append({
'fieldType': 'NS',
'ttl': 700,
'target': 'ns3.unit.tests.',
'subDomain': 'www3',
'id': 8
})
api_record.append({
'fieldType': 'NS',
'ttl': 700,
'target': 'ns4.unit.tests.',
'subDomain': 'www3',
'id': 9
})
expected.add(Record.new(zone, 'www3', {
'ttl': 700,
'type': 'NS',
'values': ['ns3.unit.tests.', 'ns4.unit.tests.'],
}))
api_record.append({
'fieldType': 'SRV',
'ttl': 800,
'target': '10 20 30 foo-1.unit.tests.',
'subDomain': '_srv._tcp',
'id': 10
})
api_record.append({
'fieldType': 'SRV',
'ttl': 800,
'target': '40 50 60 foo-2.unit.tests.',
'subDomain': '_srv._tcp',
'id': 11
})
expected.add(Record.new(zone, '_srv._tcp', {
'ttl': 800,
'type': 'SRV',
'values': [{
'priority': 10,
'weight': 20,
'port': 30,
'target': 'foo-1.unit.tests.',
}, {
'priority': 40,
'weight': 50,
'port': 60,
'target': 'foo-2.unit.tests.',
}]
}))
# PTR
api_record.append({
'fieldType': 'PTR',
'ttl': 900,
'target': 'unit.tests.',
'subDomain': '4',
'id': 12
})
expected.add(Record.new(zone, '4', {
'ttl': 900,
'type': 'PTR',
'value': 'unit.tests.'
}))
# SPF
api_record.append({
'fieldType': 'SPF',
'ttl': 1000,
'target': 'v=spf1 include:unit.texts.rerirect ~all',
'subDomain': '',
'id': 13
})
expected.add(Record.new(zone, '', {
'ttl': 1000,
'type': 'SPF',
'value': 'v=spf1 include:unit.texts.rerirect ~all'
}))
# SSHFP
api_record.append({
'fieldType': 'SSHFP',
'ttl': 1100,
'target': '1 1 bf6b6825d2977c511a475bbefb88aad54a92ac73 ',
'subDomain': '',
'id': 14
})
expected.add(Record.new(zone, '', {
'ttl': 1100,
'type': 'SSHFP',
'value': {
'algorithm': 1,
'fingerprint': 'bf6b6825d2977c511a475bbefb88aad54a92ac73',
'fingerprint_type': 1
}
}))
# AAAA
api_record.append({
'fieldType': 'AAAA',
'ttl': 1200,
'target': '1:1ec:1::1',
'subDomain': '',
'id': 15
})
expected.add(Record.new(zone, '', {
'ttl': 200,
'type': 'AAAA',
'value': '1:1ec:1::1',
}))
@patch('ovh.Client')
def test_populate(self, client_mock):
provider = OvhProvider('test', 'endpoint', 'application_key',
'application_secret', 'consumer_key')
with patch.object(provider._client, 'get') as get_mock:
zone = Zone('unit.tests.', [])
get_mock.side_effect = APIError('boom')
with self.assertRaises(APIError) as ctx:
provider.populate(zone)
self.assertEquals(get_mock.side_effect, ctx.exception)
with patch.object(provider._client, 'get') as get_mock:
zone = Zone('unit.tests.', [])
get_returns = [[record['id'] for record in self.api_record]]
get_returns += self.api_record
get_mock.side_effect = get_returns
provider.populate(zone)
self.assertEquals(self.expected, zone.records)
@patch('ovh.Client')
def test_apply(self, client_mock):
provider = OvhProvider('test', 'endpoint', 'application_key',
'application_secret', 'consumer_key')
desired = Zone('unit.tests.', [])
for r in self.expected:
desired.add_record(r)
with patch.object(provider._client, 'post') as get_mock:
plan = provider.plan(desired)
get_mock.side_effect = APIError('boom')
with self.assertRaises(APIError) as ctx:
provider.apply(plan)
self.assertEquals(get_mock.side_effect, ctx.exception)
with patch.object(provider._client, 'get') as get_mock:
get_returns = [[1, 2], {
'fieldType': 'A',
'ttl': 600,
'target': '5.6.7.8',
'subDomain': '',
'id': 100
}, {'fieldType': 'A',
'ttl': 600,
'target': '5.6.7.8',
'subDomain': 'fake',
'id': 101
}]
get_mock.side_effect = get_returns
plan = provider.plan(desired)
with patch.object(provider._client, 'post') as post_mock:
with patch.object(provider._client, 'delete') as delete_mock:
with patch.object(provider._client, 'get') as get_mock:
get_mock.side_effect = [[100], [101]]
provider.apply(plan)
wanted_calls = [
call(u'/domain/zone/unit.tests/record',
fieldType=u'A',
subDomain=u'', target=u'1.2.3.4', ttl=100),
call(u'/domain/zone/unit.tests/record',
fieldType=u'SRV',
subDomain=u'10 20 30 foo-1.unit.tests.',
target='_srv._tcp', ttl=800),
call(u'/domain/zone/unit.tests/record',
fieldType=u'SRV',
subDomain=u'40 50 60 foo-2.unit.tests.',
target='_srv._tcp', ttl=800),
call(u'/domain/zone/unit.tests/record',
fieldType=u'PTR', subDomain='4',
target=u'unit.tests.', ttl=900),
call(u'/domain/zone/unit.tests/record',
fieldType=u'NS', subDomain='www3',
target=u'ns3.unit.tests.', ttl=700),
call(u'/domain/zone/unit.tests/record',
fieldType=u'NS', subDomain='www3',
target=u'ns4.unit.tests.', ttl=700),
call(u'/domain/zone/unit.tests/record',
fieldType=u'SSHFP',
subDomain=u'1 1 bf6b6825d2977c511a475bbefb88a'
u'ad54'
u'a92ac73',
target=u'', ttl=1100),
call(u'/domain/zone/unit.tests/record',
fieldType=u'AAAA', subDomain=u'',
target=u'1:1ec:1::1', ttl=200),
call(u'/domain/zone/unit.tests/record',
fieldType=u'MX', subDomain=u'',
target=u'10 mx1.unit.tests.', ttl=400),
call(u'/domain/zone/unit.tests/record',
fieldType=u'CNAME', subDomain='www2',
target=u'unit.tests.', ttl=300),
call(u'/domain/zone/unit.tests/record',
fieldType=u'SPF', subDomain=u'',
target=u'v=spf1 include:unit.texts.'
u'rerirect ~all',
ttl=1000),
call(u'/domain/zone/unit.tests/record',
fieldType=u'A',
subDomain='sub', target=u'1.2.3.4', ttl=200),
call(u'/domain/zone/unit.tests/record',
fieldType=u'NAPTR', subDomain='naptr',
target=u'10 100 "S" "SIP+D2U" "!^.*$!sip:'
u'info@bar'
u'.example.com!" .',
ttl=500),
call(u'/domain/zone/unit.tests/refresh')]
post_mock.assert_has_calls(wanted_calls)
# Get for delete calls
get_mock.assert_has_calls(
[call(u'/domain/zone/unit.tests/record',
fieldType=u'A', subDomain=u''),
call(u'/domain/zone/unit.tests/record',
fieldType=u'A', subDomain='fake')]
)
# 2 delete calls, one for update + one for delete
delete_mock.assert_has_calls(
[call(u'/domain/zone/unit.tests/record/100'),
call(u'/domain/zone/unit.tests/record/101')])

View File

@@ -1490,3 +1490,63 @@ class TestRecordValidation(TestCase):
'value': 'this has some; semi-colons\; in it',
})
self.assertEquals(['unescaped ;'], ctx.exception.reasons)
def test_TXT_long_value_chunking(self):
expected = '"Lorem ipsum dolor sit amet, consectetur adipiscing ' \
'elit, sed do eiusmod tempor incididunt ut labore et dolore ' \
'magna aliqua. Ut enim ad minim veniam, quis nostrud ' \
'exercitation ullamco laboris nisi ut aliquip ex ea commodo ' \
'consequat. Duis aute irure dolor i" "n reprehenderit in ' \
'voluptate velit esse cillum dolore eu fugiat nulla pariatur. ' \
'Excepteur sint occaecat cupidatat non proident, sunt in culpa ' \
'qui officia deserunt mollit anim id est laborum."'
long_value = 'Lorem ipsum dolor sit amet, consectetur adipiscing ' \
'elit, sed do eiusmod tempor incididunt ut labore et dolore ' \
'magna aliqua. Ut enim ad minim veniam, quis nostrud ' \
'exercitation ullamco laboris nisi ut aliquip ex ea commodo ' \
'consequat. Duis aute irure dolor in reprehenderit in ' \
'voluptate velit esse cillum dolore eu fugiat nulla ' \
'pariatur. Excepteur sint occaecat cupidatat non proident, ' \
'sunt in culpa qui officia deserunt mollit anim id est ' \
'laborum.'
# Single string
single = Record.new(self.zone, '', {
'type': 'TXT',
'ttl': 600,
'values': [
'hello world',
long_value,
'this has some\; semi-colons\; in it',
]
})
self.assertEquals(3, len(single.values))
self.assertEquals(3, len(single.chunked_values))
# Note we are checking that this normalizes the chunking, not that we
# get out what we put in.
self.assertEquals(expected, single.chunked_values[0])
long_split_value = '"Lorem ipsum dolor sit amet, consectetur ' \
'adipiscing elit, sed do eiusmod tempor incididunt ut ' \
'labore et dolore magna aliqua. Ut enim ad minim veniam, ' \
'quis nostrud exercitation ullamco laboris nisi ut aliquip ' \
'ex" " ea commodo consequat. Duis aute irure dolor in ' \
'reprehenderit in voluptate velit esse cillum dolore eu ' \
'fugiat nulla pariatur. Excepteur sint occaecat cupidatat ' \
'non proident, sunt in culpa qui officia deserunt mollit ' \
'anim id est laborum."'
# Chunked
chunked = Record.new(self.zone, '', {
'type': 'TXT',
'ttl': 600,
'values': [
'"hello world"',
long_split_value,
'"this has some\; semi-colons\; in it"',
]
})
self.assertEquals(expected, chunked.chunked_values[0])
# should be single values, no quoting
self.assertEquals(single.values, chunked.values)
# should be chunked values, with quoting
self.assertEquals(single.chunked_values, chunked.chunked_values)