1
0
# mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00
# File: github-octodns/tests/test_octodns_provider_googlecloud.py
# 2019-10-05 08:57:14 -07:00
# 464 lines, 15 KiB, Python

#
#
#
from __future__ import absolute_import, division, print_function, \
unicode_literals
from octodns.record import Create, Delete, Update, Record
from octodns.provider.googlecloud import GoogleCloudProvider
from octodns.zone import Zone
from octodns.provider.base import Plan, BaseProvider
from unittest import TestCase
from mock import Mock, patch, PropertyMock
# Shared fixture: the zone every test in this module works against.
zone = Zone(name='unit.tests.', sub_zones=[])

# One octoDNS record per entry, in the same order as the matching
# Google-API-style tuples kept in `resource_record_sets`.
octo_records = [
    Record.new(zone, '', {
        'ttl': 0,
        'type': 'A',
        'values': ['1.2.3.4', '10.10.10.10'],
    }),
    Record.new(zone, 'a', {
        'ttl': 1,
        'type': 'A',
        'values': ['1.2.3.4', '1.1.1.1'],
    }),
    Record.new(zone, 'aa', {
        'ttl': 9001,
        'type': 'A',
        'values': ['1.2.4.3'],
    }),
    Record.new(zone, 'aaa', {
        'ttl': 2,
        'type': 'A',
        'values': ['1.1.1.3'],
    }),
    Record.new(zone, 'cname', {
        'ttl': 3,
        'type': 'CNAME',
        'value': 'a.unit.tests.',
    }),
    Record.new(zone, 'mx1', {
        'ttl': 3,
        'type': 'MX',
        'values': [{
            'priority': 10,
            'value': 'mx1.unit.tests.',
        }, {
            'priority': 20,
            'value': 'mx2.unit.tests.',
        }],
    }),
    Record.new(zone, 'mx2', {
        'ttl': 3,
        'type': 'MX',
        'values': [{
            'priority': 10,
            'value': 'mx1.unit.tests.',
        }],
    }),
    Record.new(zone, '', {
        'ttl': 4,
        'type': 'NS',
        'values': ['ns1.unit.tests.', 'ns2.unit.tests.'],
    }),
    Record.new(zone, 'foo', {
        'ttl': 5,
        'type': 'NS',
        'value': 'ns1.unit.tests.',
    }),
    Record.new(zone, '_srv._tcp', {
        'ttl': 6,
        'type': 'SRV',
        'values': [{
            'priority': 10,
            'weight': 20,
            'port': 30,
            'target': 'foo-1.unit.tests.',
        }, {
            'priority': 12,
            'weight': 30,
            'port': 30,
            'target': 'foo-2.unit.tests.',
        }],
    }),
    Record.new(zone, '_srv2._tcp', {
        'ttl': 7,
        'type': 'SRV',
        'values': [{
            'priority': 12,
            'weight': 17,
            'port': 1,
            'target': 'srvfoo.unit.tests.',
        }],
    }),
    Record.new(zone, 'txt1', {
        'ttl': 8,
        'type': 'TXT',
        'value': 'txt singleton test',
    }),
    Record.new(zone, 'txt2', {
        'ttl': 9,
        'type': 'TXT',
        'values': ['txt multiple test', 'txt multiple test 2'],
    }),
    Record.new(zone, 'naptr', {
        'ttl': 9,
        'type': 'NAPTR',
        'values': [{
            'order': 100,
            'preference': 10,
            'flags': 'S',
            'service': 'SIP+D2U',
            'regexp': "!^.*$!sip:customer-service@unit.tests!",
            'replacement': '_sip._udp.unit.tests.',
        }],
    }),
    Record.new(zone, 'caa', {
        'ttl': 9,
        'type': 'CAA',
        'value': {
            'flags': 0,
            'tag': 'issue',
            'value': 'ca.unit.tests',
        },
    }),
]

# Register every record with the shared zone.
for record in octo_records:
    zone.add_record(record)
# The same records as `octo_records`, expressed as
# (name, record type, ttl, rrdatas) tuples in the format the google API likes.
# The `u` prefixes are omitted: this module imports `unicode_literals`.
resource_record_sets = [
    ('unit.tests.', 'A', 0, ['1.2.3.4', '10.10.10.10']),
    ('a.unit.tests.', 'A', 1, ['1.1.1.1', '1.2.3.4']),
    ('aa.unit.tests.', 'A', 9001, ['1.2.4.3']),
    ('aaa.unit.tests.', 'A', 2, ['1.1.1.3']),
    ('cname.unit.tests.', 'CNAME', 3, ['a.unit.tests.']),
    (
        'mx1.unit.tests.', 'MX', 3,
        ['10 mx1.unit.tests.', '20 mx2.unit.tests.'],
    ),
    ('mx2.unit.tests.', 'MX', 3, ['10 mx1.unit.tests.']),
    ('unit.tests.', 'NS', 4, ['ns1.unit.tests.', 'ns2.unit.tests.']),
    ('foo.unit.tests.', 'NS', 5, ['ns1.unit.tests.']),
    (
        '_srv._tcp.unit.tests.', 'SRV', 6,
        ['10 20 30 foo-1.unit.tests.', '12 30 30 foo-2.unit.tests.'],
    ),
    ('_srv2._tcp.unit.tests.', 'SRV', 7, ['12 17 1 srvfoo.unit.tests.']),
    ('txt1.unit.tests.', 'TXT', 8, ['txt singleton test']),
    (
        'txt2.unit.tests.', 'TXT', 9,
        ['txt multiple test', 'txt multiple test 2'],
    ),
    (
        'naptr.unit.tests.', 'NAPTR', 9,
        [
            # A single rrdata string, split only to keep the line short.
            '100 10 "S" "SIP+D2U" "!^.*$!sip:customer-service@unit.tests!"'
            ' _sip._udp.unit.tests.',
        ],
    ),
    ('caa.unit.tests.', 'CAA', 9, ['0 issue ca.unit.tests']),
]
class DummyResourceRecordSet:
    """Test double for a resource record set.

    Instances compare equal when their name, record type and ttl match and
    their rrdatas hold the same values, regardless of order. Hashing follows
    the same rule so equal instances always share a hash.
    """

    def __init__(self, record_name, record_type, ttl, rrdatas):
        self.name = record_name
        self.record_type = record_type
        self.ttl = ttl
        self.rrdatas = rrdatas

    def __eq__(self, other):
        # Anything lacking the expected attributes (AttributeError) or whose
        # rrdatas cannot be sorted (TypeError) is simply "not equal".
        try:
            return self.name == other.name \
                and self.record_type == other.record_type \
                and self.ttl == other.ttl \
                and sorted(self.rrdatas) == sorted(other.rrdatas)
        except (AttributeError, TypeError):
            return False

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, so spell it out.
        return not self.__eq__(other)

    def __repr__(self):
        return "{} {} {} {!s}"\
            .format(self.name, self.record_type, self.ttl, self.rrdatas)

    def __hash__(self):
        # __eq__ ignores rrdatas order, so the hash must ignore it as well.
        try:
            rrdatas = tuple(sorted(self.rrdatas))
        except TypeError:
            # Unsortable rrdatas never compare equal to anything, so any
            # stable placeholder keeps the hash/eq contract intact.
            rrdatas = ()
        return hash((self.name, self.record_type, self.ttl, rrdatas))
class DummyGoogleCloudZone:
    """Test double for a Google Cloud DNS zone object.

    Stores `dns_name` and `name`, and builds DummyResourceRecordSet objects
    on request. The listing and creation methods are no-ops that tests
    replace with mocks when they need specific behaviour.
    """

    def __init__(self, dns_name, name=""):
        self.dns_name = dns_name
        self.name = name

    def resource_record_set(self, *args):
        """Return a DummyResourceRecordSet built from the given arguments."""
        return DummyResourceRecordSet(*args)

    def list_resource_record_sets(self, *args):
        """Do nothing and return None; arguments are ignored."""
        pass

    def create(self, *args, **kwargs):
        """Do nothing and return None; arguments are ignored."""
        pass
class DummyIterator:
    """Iterator over a list that also carries a `next_page_token`.

    This is because API calls for google cloud DNS, if paged, contain a
    "next_page_token", which can be used to grab a subsequent
    iterator with more results.

    :param list_of_stuff: iterable whose items are yielded in order
    :param page_token: value exposed as `next_page_token` (default None)
    """

    def __init__(self, list_of_stuff, page_token=None):
        self.iterable = iter(list_of_stuff)
        self.next_page_token = page_token

    def __iter__(self):
        return self

    # python3
    def __next__(self):
        return next(self.iterable)

    # python2 spells the iterator protocol method `next`
    next = __next__
class TestGoogleCloudProvider(TestCase):
    """Tests for GoogleCloudProvider using mocked Google Cloud DNS objects."""

    @patch('octodns.provider.googlecloud.dns')
    def _get_provider(*args):
        '''Returns a mock GoogleCloudProvider object to use in testing.

        Takes *args because it receives both `self` and the mock injected by
        the patch decorator; neither is used.

        :type return: GoogleCloudProvider
        '''
        return GoogleCloudProvider(id=1, project="mock")

    @patch('octodns.provider.googlecloud.dns')
    def test___init__(self, *_):
        # The provider can be built both with and without explicit
        # credentials/project and is a BaseProvider either way.
        self.assertIsInstance(GoogleCloudProvider(id=1,
                                                  credentials_file="test",
                                                  project="unit test"),
                              BaseProvider)

        self.assertIsInstance(GoogleCloudProvider(id=1),
                              BaseProvider)

    @patch('octodns.provider.googlecloud.time.sleep')
    @patch('octodns.provider.googlecloud.dns')
    def test__apply(self, *_):
        class DummyDesired:
            def __init__(self, name, changes):
                self.name = name
                self.changes = changes

        apply_z = Zone("unit.tests.", [])
        create_r = Record.new(apply_z, '', {
            'ttl': 0,
            'type': 'A',
            'values': ['1.2.3.4', '10.10.10.10']})
        delete_r = Record.new(apply_z, 'a', {
            'ttl': 1,
            'type': 'A',
            'values': ['1.2.3.4', '1.1.1.1']})
        update_existing_r = Record.new(apply_z, 'aa', {
            'ttl': 9001,
            'type': 'A',
            'values': ['1.2.4.3']})
        update_new_r = Record.new(apply_z, 'aa', {
            'ttl': 666,
            'type': 'A',
            'values': ['1.4.3.2']})

        gcloud_zone_mock = DummyGoogleCloudZone("unit.tests.", "unit-tests")
        status_mock = Mock()
        # Each read of `status` yields "pending" 11 times, then "done".
        return_values_for_status = iter(
            ["pending"] * 11 + ['done', 'done'])
        type(status_mock).status = PropertyMock(
            side_effect=lambda: next(return_values_for_status))
        gcloud_zone_mock.changes = Mock(return_value=status_mock)

        provider = self._get_provider()
        provider.gcloud_client = Mock()
        provider._gcloud_zones = {"unit.tests.": gcloud_zone_mock}
        desired = Mock()
        desired.name = "unit.tests."
        changes = [
            Create(create_r),
            Delete(delete_r),
            Update(existing=update_existing_r, new=update_new_r),
        ]

        provider.apply(Plan(
            existing=[update_existing_r, delete_r],
            desired=desired,
            changes=changes,
            exists=True
        ))

        calls_mock = gcloud_zone_mock.changes.return_value
        # First positional argument of every add_record_set(...) call.
        mocked_calls = [
            mock_call[1][0]
            for mock_call in calls_mock.add_record_set.mock_calls
        ]

        self.assertEqual(mocked_calls, [
            DummyResourceRecordSet(
                'unit.tests.', 'A', 0, ['1.2.3.4', '10.10.10.10']),
            DummyResourceRecordSet(
                'aa.unit.tests.', 'A', 666, ['1.4.3.2'])
        ])

        # First positional argument of every delete_record_set(...) call.
        mocked_calls2 = [
            mock_call[1][0]
            for mock_call in calls_mock.delete_record_set.mock_calls
        ]

        self.assertEqual(mocked_calls2, [
            DummyResourceRecordSet(
                'a.unit.tests.', 'A', 1, ['1.2.3.4', '1.1.1.1']),
            DummyResourceRecordSet(
                'aa.unit.tests.', 'A', 9001, ['1.2.4.3'])
        ])

        # A change whose status stays "pending" must raise RuntimeError.
        type(status_mock).status = "pending"

        with self.assertRaises(RuntimeError):
            provider.apply(Plan(
                existing=[update_existing_r, delete_r],
                desired=desired,
                changes=changes,
                exists=True
            ))

        # A change that is not a Create/Delete/Update must also raise.
        unsupported_change = Mock()
        unsupported_change.__len__ = Mock(return_value=1)
        type_mock = Mock()
        type_mock._type = "A"
        unsupported_change.record = type_mock

        mock_plan = Mock()
        type(mock_plan).desired = PropertyMock(return_value=DummyDesired(
            "dummy name", []))
        type(mock_plan).changes = [unsupported_change]

        with self.assertRaises(RuntimeError):
            provider.apply(mock_plan)

    def test__get_gcloud_client(self):
        provider = self._get_provider()

        self.assertIsInstance(provider, GoogleCloudProvider)

    @patch('octodns.provider.googlecloud.dns')
    def test_populate(self, _):
        # Zones are served in three pages; the zone under test is on the last.
        def _get_mock_zones(page_token=None):
            if not page_token:
                return DummyIterator([
                    DummyGoogleCloudZone('example.com.'),
                ], page_token="MOCK_PAGE_TOKEN")
            elif page_token == "MOCK_PAGE_TOKEN":
                return DummyIterator([
                    DummyGoogleCloudZone('example2.com.'),
                ], page_token="MOCK_PAGE_TOKEN2")

            return DummyIterator([
                google_cloud_zone
            ])

        # Record sets are likewise split across three pages.
        def _get_mock_record_sets(page_token=None):
            if not page_token:
                return DummyIterator(
                    [DummyResourceRecordSet(*v) for v in
                     resource_record_sets[:3]], page_token="MOCK_PAGE_TOKEN")
            elif page_token == "MOCK_PAGE_TOKEN":
                return DummyIterator(
                    [DummyResourceRecordSet(*v) for v in
                     resource_record_sets[3:5]],
                    page_token="MOCK_PAGE_TOKEN2")
            return DummyIterator(
                [DummyResourceRecordSet(*v) for v in resource_record_sets[5:]])

        google_cloud_zone = DummyGoogleCloudZone('unit.tests.')

        provider = self._get_provider()
        provider.gcloud_client.list_zones = Mock(side_effect=_get_mock_zones)
        google_cloud_zone.list_resource_record_sets = Mock(
            side_effect=_get_mock_record_sets)

        self.assertEqual(provider.gcloud_zones.get("unit.tests.").dns_name,
                         "unit.tests.")

        test_zone = Zone('unit.tests.', [])
        exists = provider.populate(test_zone)
        self.assertTrue(exists)

        # test_zone gets fed the same records as zone does, except it's in
        # the format returned by google API, so after populate they should
        # look exactly the same.
        self.assertEqual(test_zone.records, zone.records)

        test_zone2 = Zone('nonexistent.zone.', [])
        exists = provider.populate(test_zone2, False, False)
        self.assertFalse(exists)

        self.assertEqual(len(test_zone2.records), 0,
                         msg="Zone should not get records from wrong domain")

        # With no supported record types nothing should be populated.
        provider.SUPPORTS = set()
        test_zone3 = Zone('unit.tests.', [])
        provider.populate(test_zone3)
        self.assertEqual(len(test_zone3.records), 0)

    @patch('octodns.provider.googlecloud.dns')
    def test_populate_corner_cases(self, _):
        provider = self._get_provider()
        test_zone = Zone('unit.tests.', [])
        not_same_fqdn = DummyResourceRecordSet(
            'unit.tests.gr', u'A', 0, [u'1.2.3.4'])

        # The single mocked call returns a one-element collection of records.
        provider._get_gcloud_records = Mock(
            side_effect=[[not_same_fqdn]])
        provider._gcloud_zones = {
            "unit.tests.": DummyGoogleCloudZone("unit.tests.", "unit-tests")}

        provider.populate(test_zone)

        self.assertEqual(len(test_zone.records), 1)

        self.assertEqual(test_zone.records.pop().fqdn,
                         u'unit.tests.gr.unit.tests.')

    def test__get_gcloud_zone(self):
        provider = self._get_provider()

        provider.gcloud_client = Mock()
        provider.gcloud_client.list_zones = Mock(
            return_value=DummyIterator([]))

        self.assertIsNone(provider.gcloud_zones.get("nonexistent.zone"),
                          msg="Check that nonexistent zones return None when "
                              "there's no create=True flag")

    def test__get_rrsets(self):
        provider = self._get_provider()
        dummy_gcloud_zone = DummyGoogleCloudZone("unit.tests")
        # Every fixture record type must have a matching _rrset_for_<TYPE>
        # builder that preserves the record type.
        for octo_record in octo_records:
            _rrset_func = getattr(
                provider, '_rrset_for_{}'.format(octo_record._type))
            self.assertEqual(
                _rrset_func(dummy_gcloud_zone, octo_record).record_type,
                octo_record._type
            )

    def test__create_zone(self):
        provider = self._get_provider()

        provider.gcloud_client = Mock()
        provider.gcloud_client.list_zones = Mock(
            return_value=DummyIterator([]))

        mock_zone = provider._create_gcloud_zone("nonexistent.zone.mock")

        mock_zone.create.assert_called()
        provider.gcloud_client.zone.assert_called()

    def test__create_zone_ip6_arpa(self):
        def _create_dummy_zone(name, dns_name):
            return DummyGoogleCloudZone(name=name, dns_name=dns_name)

        provider = self._get_provider()

        provider.gcloud_client = Mock()
        provider.gcloud_client.zone = Mock(side_effect=_create_dummy_zone)

        mock_zone = \
            provider._create_gcloud_zone('0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa')

        # assertRegexpMatches was removed in Python 3.12 and assertRegex does
        # not exist on Python 2.7, so use whichever this interpreter offers.
        assert_regex = getattr(self, 'assertRegex', None) \
            or self.assertRegexpMatches
        assert_regex(mock_zone.name, '^[a-z][a-z0-9-]*[a-z0-9]$')
        self.assertEqual(len(mock_zone.name), 63)

    def test_semicolon_fixup(self):
        provider = self._get_provider()

        # Bare semicolons come back escaped; already-escaped ones are kept.
        self.assertEqual({
            'values': ['abcd\\; ef\\;g', 'hij\\; klm\\;n']
        }, provider._data_for_TXT(
            DummyResourceRecordSet(
                'unit.tests.', 'TXT', 0, ['abcd; ef;g', 'hij\\; klm\\;n'])
        ))