mirror of
				https://github.com/github/octodns.git
				synced 2024-05-11 05:55:00 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			459 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			459 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #
 | |
| #
 | |
| #
 | |
| 
 | |
| from __future__ import absolute_import, division, print_function, \
 | |
|     unicode_literals
 | |
| 
 | |
| from octodns.record import Create, Delete, Update, Record
 | |
| from octodns.provider.googlecloud import GoogleCloudProvider
 | |
| 
 | |
| from octodns.zone import Zone
 | |
| from octodns.provider.base import Plan, BaseProvider
 | |
| 
 | |
| from unittest import TestCase
 | |
| from mock import Mock, patch, PropertyMock
 | |
| 
 | |
# Shared fixture: the 'unit.tests.' zone and the octoDNS records (A, CNAME,
# MX, NS, SRV, TXT, NAPTR and CAA) that the tests below compare against.
zone = Zone(name='unit.tests.', sub_zones=[])
octo_records = [
    Record.new(zone, '', {
        'ttl': 0,
        'type': 'A',
        'values': ['1.2.3.4', '10.10.10.10'],
    }),
    Record.new(zone, 'a', {
        'ttl': 1,
        'type': 'A',
        'values': ['1.2.3.4', '1.1.1.1'],
    }),
    Record.new(zone, 'aa', {
        'ttl': 9001,
        'type': 'A',
        'values': ['1.2.4.3'],
    }),
    Record.new(zone, 'aaa', {
        'ttl': 2,
        'type': 'A',
        'values': ['1.1.1.3'],
    }),
    Record.new(zone, 'cname', {
        'ttl': 3,
        'type': 'CNAME',
        'value': 'a.unit.tests.',
    }),
    Record.new(zone, 'mx1', {
        'ttl': 3,
        'type': 'MX',
        'values': [
            {'priority': 10, 'value': 'mx1.unit.tests.'},
            {'priority': 20, 'value': 'mx2.unit.tests.'},
        ],
    }),
    Record.new(zone, 'mx2', {
        'ttl': 3,
        'type': 'MX',
        'values': [
            {'priority': 10, 'value': 'mx1.unit.tests.'},
        ],
    }),
    Record.new(zone, '', {
        'ttl': 4,
        'type': 'NS',
        'values': ['ns1.unit.tests.', 'ns2.unit.tests.'],
    }),
    Record.new(zone, 'foo', {
        'ttl': 5,
        'type': 'NS',
        'value': 'ns1.unit.tests.',
    }),
    Record.new(zone, '_srv._tcp', {
        'ttl': 6,
        'type': 'SRV',
        'values': [
            {
                'priority': 10,
                'weight': 20,
                'port': 30,
                'target': 'foo-1.unit.tests.',
            },
            {
                'priority': 12,
                'weight': 30,
                'port': 30,
                'target': 'foo-2.unit.tests.',
            },
        ],
    }),
    Record.new(zone, '_srv2._tcp', {
        'ttl': 7,
        'type': 'SRV',
        'values': [
            {
                'priority': 12,
                'weight': 17,
                'port': 1,
                'target': 'srvfoo.unit.tests.',
            },
        ],
    }),
    Record.new(zone, 'txt1', {
        'ttl': 8,
        'type': 'TXT',
        'value': 'txt singleton test',
    }),
    Record.new(zone, 'txt2', {
        'ttl': 9,
        'type': 'TXT',
        'values': ['txt multiple test', 'txt multiple test 2'],
    }),
    Record.new(zone, 'naptr', {
        'ttl': 9,
        'type': 'NAPTR',
        'values': [
            {
                'order': 100,
                'preference': 10,
                'flags': 'S',
                'service': 'SIP+D2U',
                'regexp': "!^.*$!sip:customer-service@unit.tests!",
                'replacement': '_sip._udp.unit.tests.',
            },
        ],
    }),
    Record.new(zone, 'caa', {
        'ttl': 9,
        'type': 'CAA',
        'value': {
            'flags': 0,
            'tag': 'issue',
            'value': 'ca.unit.tests',
        },
    }),
]
# Register every fixture record with the shared zone.
for record in octo_records:
    zone.add_record(record)
 | |
| 
 | |
# The fixture records again, this time as (name, type, ttl, rrdatas) tuples,
# which is the format the google API likes.
resource_record_sets = [
    # A
    ('unit.tests.', 'A', 0, ['1.2.3.4', '10.10.10.10']),
    ('a.unit.tests.', 'A', 1, ['1.1.1.1', '1.2.3.4']),
    ('aa.unit.tests.', 'A', 9001, ['1.2.4.3']),
    ('aaa.unit.tests.', 'A', 2, ['1.1.1.3']),
    # CNAME
    ('cname.unit.tests.', 'CNAME', 3, ['a.unit.tests.']),
    # MX
    (
        'mx1.unit.tests.', 'MX', 3,
        ['10 mx1.unit.tests.', '20 mx2.unit.tests.'],
    ),
    ('mx2.unit.tests.', 'MX', 3, ['10 mx1.unit.tests.']),
    # NS
    ('unit.tests.', 'NS', 4, ['ns1.unit.tests.', 'ns2.unit.tests.']),
    ('foo.unit.tests.', 'NS', 5, ['ns1.unit.tests.']),
    # SRV
    (
        '_srv._tcp.unit.tests.', 'SRV', 6,
        ['10 20 30 foo-1.unit.tests.', '12 30 30 foo-2.unit.tests.'],
    ),
    ('_srv2._tcp.unit.tests.', 'SRV', 7, ['12 17 1 srvfoo.unit.tests.']),
    # TXT
    ('txt1.unit.tests.', 'TXT', 8, ['txt singleton test']),
    (
        'txt2.unit.tests.', 'TXT', 9,
        ['txt multiple test', 'txt multiple test 2'],
    ),
    # NAPTR
    (
        'naptr.unit.tests.', 'NAPTR', 9,
        [
            '100 10 "S" "SIP+D2U" "!^.*$!sip:customer-service@unit.tests!"'
            ' _sip._udp.unit.tests.',
        ],
    ),
    # CAA
    ('caa.unit.tests.', 'CAA', 9, ['0 issue ca.unit.tests']),
]
 | |
| 
 | |
| 
 | |
class DummyResourceRecordSet:
    """Lightweight stand-in for a google cloud DNS resource record set.

    Holds the four values the tests care about (name, record type, ttl and
    rrdatas) and compares by value, ignoring the order of ``rrdatas``.
    """

    def __init__(self, record_name, record_type, ttl, rrdatas):
        self.name = record_name
        self.record_type = record_type
        self.ttl = ttl
        self.rrdatas = rrdatas

    def __eq__(self, other):
        """Return True when all four fields match; rrdatas order is ignored.

        Objects that lack the expected attributes, or whose rrdatas cannot
        be sorted, compare unequal instead of raising.
        """
        try:
            return self.name == other.name \
                and self.record_type == other.record_type \
                and self.ttl == other.ttl \
                and sorted(self.rrdatas) == sorted(other.rrdatas)
        except (AttributeError, TypeError):
            # Only "not a comparable record set" is swallowed; anything else
            # (e.g. KeyboardInterrupt) must propagate.
            return False

    def __repr__(self):
        return "{} {} {} {!s}"\
            .format(self.name, self.record_type, self.ttl, self.rrdatas)

    def __hash__(self):
        # Hash the sorted rrdatas so that two instances which compare equal
        # (same values, different order) also share a hash.
        return hash((self.name, self.record_type, self.ttl,
                     tuple(sorted(self.rrdatas))))
 | |
| 
 | |
| 
 | |
class DummyGoogleCloudZone:
    """Fake google cloud DNS zone exposing only what the tests touch.

    Carries a ``dns_name`` and a ``name``; ``resource_record_set`` builds a
    DummyResourceRecordSet, while the listing and create calls are no-ops
    that individual tests replace with mocks where needed.
    """

    def __init__(self, dns_name, name=""):
        self.name = name
        self.dns_name = dns_name

    def resource_record_set(self, *args):
        # Forward the positional arguments untouched to the dummy record set.
        rrset = DummyResourceRecordSet(*args)
        return rrset

    def list_resource_record_sets(self, *args):
        # Placeholder: does nothing and returns None.
        return None

    def create(self, *args, **kwargs):
        # Placeholder: does nothing and returns None.
        return None
 | |
| 
 | |
| 
 | |
class DummyIterator:
    """Iterator over a fixed list that also carries a ``next_page_token``.

    API calls for google cloud DNS, if paged, expose a "next_page_token"
    which can be used to grab a subsequent iterator with more results; this
    class imitates that shape for the tests.

        :param list_of_stuff: items to yield, in order
        :param page_token: value exposed as ``next_page_token``
            (``None`` by default)
    """

    def __init__(self, list_of_stuff, page_token=None):
        self.iterable = iter(list_of_stuff)
        self.next_page_token = page_token

    def __iter__(self):
        return self

    def __next__(self):
        # The builtin next() works on both Python 2 and 3, unlike calling
        # the wrapped iterator's ``.next()`` method directly.
        return next(self.iterable)

    # Python 2 looks up ``next`` rather than ``__next__``.
    next = __next__
 | |
| 
 | |
| 
 | |
class TestGoogleCloudProvider(TestCase):
    """Tests for GoogleCloudProvider, run against mocked google cloud DNS
    objects (the ``dns`` module used by the provider is patched out)."""

    @patch('octodns.provider.googlecloud.dns')
    def _get_provider(*args):
        '''Returns a mock GoogleCloudProvider object to use in testing.

        ``*args`` absorbs both ``self`` and the mock injected by ``patch``.

            :type return: GoogleCloudProvider
        '''
        return GoogleCloudProvider(id=1, project="mock")

    @patch('octodns.provider.googlecloud.dns')
    def test___init__(self, *_):
        # With explicit credentials file and project ...
        self.assertIsInstance(GoogleCloudProvider(id=1,
                                                  credentials_file="test",
                                                  project="unit test"),
                              BaseProvider)

        # ... and with nothing but an id.
        self.assertIsInstance(GoogleCloudProvider(id=1),
                              BaseProvider)

    @patch('octodns.provider.googlecloud.time.sleep')
    @patch('octodns.provider.googlecloud.dns')
    def test__apply(self, *_):
        class DummyDesired:
            def __init__(self, name, changes):
                self.name = name
                self.changes = changes

        apply_z = Zone("unit.tests.", [])
        create_r = Record.new(apply_z, '', {
            'ttl': 0,
            'type': 'A',
            'values': ['1.2.3.4', '10.10.10.10']})
        delete_r = Record.new(apply_z, 'a', {
            'ttl': 1,
            'type': 'A',
            'values': ['1.2.3.4', '1.1.1.1']})
        update_existing_r = Record.new(apply_z, 'aa', {
            'ttl': 9001,
            'type': 'A',
            'values': ['1.2.4.3']})
        update_new_r = Record.new(apply_z, 'aa', {
            'ttl': 666,
            'type': 'A',
            'values': ['1.4.3.2']})

        gcloud_zone_mock = DummyGoogleCloudZone("unit.tests.", "unit-tests")
        status_mock = Mock()
        # Each read of ``status`` yields the next value: "pending" eleven
        # times, then "done". The builtin next() is used (rather than the
        # iterator's ``.next`` method) so this works on Python 2 and 3.
        statuses = iter(["pending"] * 11 + ['done', 'done'])
        type(status_mock).status = PropertyMock(
            side_effect=lambda: next(statuses))
        gcloud_zone_mock.changes = Mock(return_value=status_mock)

        provider = self._get_provider()
        provider.gcloud_client = Mock()
        provider._gcloud_zones = {"unit.tests.": gcloud_zone_mock}
        desired = Mock()
        desired.name = "unit.tests."
        changes = [
            Create(create_r),
            Delete(delete_r),
            Update(existing=update_existing_r, new=update_new_r),
        ]

        provider.apply(Plan(
            existing=[update_existing_r, delete_r],
            desired=desired,
            changes=changes,
            exists=True
        ))

        calls_mock = gcloud_zone_mock.changes.return_value

        # First positional argument of every add_record_set() call.
        added = [mock_call[1][0]
                 for mock_call in calls_mock.add_record_set.mock_calls]
        self.assertEqual(added, [
            DummyResourceRecordSet(
                'unit.tests.', 'A', 0, ['1.2.3.4', '10.10.10.10']),
            DummyResourceRecordSet(
                'aa.unit.tests.', 'A', 666, ['1.4.3.2'])
        ])

        # First positional argument of every delete_record_set() call.
        deleted = [mock_call[1][0]
                   for mock_call in calls_mock.delete_record_set.mock_calls]
        self.assertEqual(deleted, [
            DummyResourceRecordSet(
                'a.unit.tests.', 'A', 1, ['1.2.3.4', '1.1.1.1']),
            DummyResourceRecordSet(
                'aa.unit.tests.', 'A', 9001, ['1.2.4.3'])
        ])

        # A status that stays "pending" forever must surface as an error.
        type(status_mock).status = "pending"

        with self.assertRaises(RuntimeError):
            provider.apply(Plan(
                existing=[update_existing_r, delete_r],
                desired=desired,
                changes=changes,
                exists=True
            ))

        # A change that is a plain Mock (not a Create, Delete or Update)
        # must also be rejected.
        unsupported_change = Mock()
        unsupported_change.__len__ = Mock(return_value=1)
        type_mock = Mock()
        type_mock._type = "A"
        unsupported_change.record = type_mock

        mock_plan = Mock()
        type(mock_plan).desired = PropertyMock(return_value=DummyDesired(
            "dummy name", []))
        type(mock_plan).changes = [unsupported_change]

        with self.assertRaises(RuntimeError):
            provider.apply(mock_plan)

    def test__get_gcloud_client(self):
        provider = self._get_provider()

        self.assertIsInstance(provider, GoogleCloudProvider)

    @patch('octodns.provider.googlecloud.dns')
    def test_populate(self, _):
        # Zones are served in three pages; the last page holds the zone
        # under test.
        def _get_mock_zones(page_token=None):
            if not page_token:
                return DummyIterator([
                    DummyGoogleCloudZone('example.com.'),
                ], page_token="MOCK_PAGE_TOKEN")
            elif page_token == "MOCK_PAGE_TOKEN":
                return DummyIterator([
                    DummyGoogleCloudZone('example2.com.'),
                ], page_token="MOCK_PAGE_TOKEN2")

            return DummyIterator([
                google_cloud_zone
            ])

        # Record sets are likewise split across three pages.
        def _get_mock_record_sets(page_token=None):
            if not page_token:
                return DummyIterator(
                    [DummyResourceRecordSet(*v) for v in
                     resource_record_sets[:3]], page_token="MOCK_PAGE_TOKEN")
            elif page_token == "MOCK_PAGE_TOKEN":

                return DummyIterator(
                    [DummyResourceRecordSet(*v) for v in
                     resource_record_sets[3:5]], page_token="MOCK_PAGE_TOKEN2")
            return DummyIterator(
                [DummyResourceRecordSet(*v) for v in resource_record_sets[5:]])

        google_cloud_zone = DummyGoogleCloudZone('unit.tests.')

        provider = self._get_provider()
        provider.gcloud_client.list_zones = Mock(side_effect=_get_mock_zones)
        google_cloud_zone.list_resource_record_sets = Mock(
            side_effect=_get_mock_record_sets)

        self.assertEqual(provider.gcloud_zones.get("unit.tests.").dns_name,
                         "unit.tests.")

        test_zone = Zone('unit.tests.', [])
        exists = provider.populate(test_zone)
        self.assertTrue(exists)

        # test_zone gets fed the same records as zone does, except it's in
        # the format returned by google API, so after populate they should look
        # exactly the same.
        self.assertEqual(test_zone.records, zone.records)

        test_zone2 = Zone('nonexistent.zone.', [])
        exists = provider.populate(test_zone2, False, False)
        self.assertFalse(exists)

        self.assertEqual(len(test_zone2.records), 0,
                         msg="Zone should not get records from wrong domain")

        # With no supported record types nothing should be populated.
        provider.SUPPORTS = set()
        test_zone3 = Zone('unit.tests.', [])
        provider.populate(test_zone3)
        self.assertEqual(len(test_zone3.records), 0)

    @patch('octodns.provider.googlecloud.dns')
    def test_populate_corner_cases(self, _):
        provider = self._get_provider()
        test_zone = Zone('unit.tests.', [])
        # Deliberately a one-element tuple: it is what the mocked
        # _get_gcloud_records call returns, i.e. an iterable of record sets.
        not_same_fqdn = (
            DummyResourceRecordSet('unit.tests.gr', u'A', 0, [u'1.2.3.4']),
        )

        provider._get_gcloud_records = Mock(
            side_effect=[not_same_fqdn])
        provider._gcloud_zones = {
            "unit.tests.": DummyGoogleCloudZone("unit.tests.", "unit-tests")}

        provider.populate(test_zone)

        self.assertEqual(len(test_zone.records), 1)

        self.assertEqual(test_zone.records.pop().fqdn,
                         u'unit.tests.gr.unit.tests.')

    def test__get_gcloud_zone(self):
        provider = self._get_provider()

        provider.gcloud_client = Mock()
        provider.gcloud_client.list_zones = Mock(
            return_value=DummyIterator([]))

        self.assertIsNone(provider.gcloud_zones.get("nonexistent.zone"),
                          msg="Check that nonexistent zones return None when "
                              "there's no create=True flag")

    def test__get_rrsets(self):
        provider = self._get_provider()
        dummy_gcloud_zone = DummyGoogleCloudZone("unit.tests")
        # Every fixture record type must have a matching _rrset_for_<TYPE>
        # method that produces a record set of that same type.
        for octo_record in octo_records:
            _rrset_func = getattr(
                provider, '_rrset_for_{}'.format(octo_record._type))
            self.assertEqual(
                _rrset_func(dummy_gcloud_zone, octo_record).record_type,
                octo_record._type
            )

    def test__create_zone(self):
        provider = self._get_provider()

        provider.gcloud_client = Mock()
        provider.gcloud_client.list_zones = Mock(
            return_value=DummyIterator([]))

        mock_zone = provider._create_gcloud_zone("nonexistent.zone.mock")

        mock_zone.create.assert_called()
        provider.gcloud_client.zone.assert_called()

    def test__create_zone_ip6_arpa(self):
        def _create_dummy_zone(name, dns_name):
            return DummyGoogleCloudZone(name=name, dns_name=dns_name)

        provider = self._get_provider()

        provider.gcloud_client = Mock()
        provider.gcloud_client.zone = Mock(side_effect=_create_dummy_zone)

        mock_zone = \
            provider._create_gcloud_zone('0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa')

        # NOTE(review): assertRegexpMatches is the Python 2.7 spelling; switch
        # to assertRegex once Python 2 support is dropped.
        self.assertRegexpMatches(mock_zone.name, '^[a-z][a-z0-9-]*[a-z0-9]$')
        self.assertEqual(len(mock_zone.name), 63)

    def test_semicolon_fixup(self):
        provider = self._get_provider()

        # Unescaped semicolons get escaped; already-escaped ones are left
        # as they are.
        self.assertEqual({
            'values': ['abcd\\; ef\\;g', 'hij\\; klm\\;n']
        }, provider._data_for_TXT(
            DummyResourceRecordSet(
                'unit.tests.', 'TXT', 0, ['abcd; ef;g', 'hij\\; klm\\;n'])
        ))
 |