diff --git a/octodns/provider/ns1.py b/octodns/provider/ns1.py index 8d230cf..e5cb1ed 100644 --- a/octodns/provider/ns1.py +++ b/octodns/provider/ns1.py @@ -12,10 +12,11 @@ from ns1 import NS1 from ns1.rest.errors import RateLimitException, ResourceException from pycountry_convert import country_alpha2_to_continent_code from time import sleep +from uuid import uuid4 from six import text_type -from ..record import Record +from ..record import Record, Update from .base import BaseProvider @@ -33,6 +34,9 @@ class Ns1Client(object): self._records = client.records() self._zones = client.zones() self._monitors = client.monitors() + self._notifylists = client.notifylists() + self._datasource = client.datasource() + self._datafeed = client.datafeed() def _try(self, method, *args, **kwargs): tries = self.retry_count @@ -49,35 +53,62 @@ class Ns1Client(object): sleep(period) tries -= 1 - def zones_retrieve(self, name): - return self._try(self._zones.retrieve, name) + def datafeed_create(self, sourceid, name, config): + return self._try(self._datafeed.create, sourceid, name, config) - def zones_create(self, name): - return self._try(self._zones.create, name) + def datafeed_delete(self, sourceid, feedid): + return self._try(self._datafeed.delete, sourceid, feedid) - def records_retrieve(self, zone, domain, _type): - return self._try(self._records.retrieve, zone, domain, _type) + def datafeed_list(self, sourceid): + return self._try(self._datafeed.list, sourceid) - def records_create(self, zone, domain, _type, **params): - return self._try(self._records.create, zone, domain, _type, **params) + def datasource_create(self, **body): + return self._try(self._datasource.create, **body) - def records_update(self, zone, domain, _type, **params): - return self._try(self._records.update, zone, domain, _type, **params) + def datasource_list(self): + return self._try(self._datasource.list) - def records_delete(self, zone, domain, _type): - return self._try(self._records.delete, zone, domain, _type) + def monitors_create(self, **params): + body = {} + return self._try(self._monitors.create, body, **params) + + def monitors_delete(self, jobid): + return self._try(self._monitors.delete, jobid) def monitors_list(self): return self._try(self._monitors.list) - def monitors_create(self, **params): - body = {} # TODO: not clear what this is supposed to be - return self._try(self._monitors.create, body, **params) - def monitors_update(self, job_id, **params): - body = {} # TODO: not clear what this is supposed to be + body = {} return self._try(self._monitors.update, job_id, body, **params) + def notifylists_delete(self, nlid): + return self._try(self._notifylists.delete, nlid) + + def notifylists_create(self, **body): + return self._try(self._notifylists.create, body) + + def notifylists_list(self): + return self._try(self._notifylists.list) + + def records_create(self, zone, domain, _type, **params): + return self._try(self._records.create, zone, domain, _type, **params) + + def records_delete(self, zone, domain, _type): + return self._try(self._records.delete, zone, domain, _type) + + def records_retrieve(self, zone, domain, _type): + return self._try(self._records.retrieve, zone, domain, _type) + + def records_update(self, zone, domain, _type, **params): + return self._try(self._records.update, zone, domain, _type, **params) + + def zones_create(self, name): + return self._try(self._zones.create, name) + + def zones_retrieve(self, name): + return self._try(self._zones.retrieve, name) + class Ns1Provider(BaseProvider): ''' @@ -140,7 +171,11 @@ class Ns1Provider(BaseProvider): self.log.debug('__init__: id=%s, api_key=***, retry_count=%d', id, retry_count) super(Ns1Provider, self).__init__(id, *args, **kwargs) + self._client = Ns1Client(api_key, retry_count) + self.__monitors = None + self.__datasource_id = None + self.__feeds_for_monitors = None def _encode_notes(self, data): return ' '.join(['{}:{}'.format(k, v) @@ -462,38 +497,139 @@ class Ns1Provider(BaseProvider): len(zone.records) - before, exists) return exists - def _extra_changes(self, desired, changes, **kwargs): - # TODO: check monitors to see if they need updated - return [] + def _params_for_geo_A(self, record): + # purposefully set non-geo answers to have an empty meta, + # so that we know we did this on purpose if/when troubleshooting + params = { + 'answers': [{"answer": [x], "meta": {}} for x in record.values], + 'ttl': record.ttl, + } + + has_country = False + for iso_region, target in record.geo.items(): + key = 'iso_region_code' + value = iso_region + if not has_country and \ + len(value.split('-')) > 1: # pragma: nocover + has_country = True + for answer in target.values: + params['answers'].append( + { + 'answer': [answer], + 'meta': {key: [value]}, + }, + ) + + params['filters'] = [] + if has_country: + params['filters'].append( + {"filter": "shuffle", "config": {}} + ) + params['filters'].append( + {"filter": "geotarget_country", "config": {}} + ) + params['filters'].append( + {"filter": "select_first_n", + "config": {"N": 1}} + ) + + return params, None + + @property + def _monitors(self): + # TODO: cache in sync, here and for others + if self.__monitors is None: + self.__monitors = {m['id']: m + for m in self._client.monitors_list()} + return self.__monitors def _monitors_for(self, record): - # TODO: should this just be a global cache by fqdn, type, and value? - expected_host = record.fqdn[:-1] - expected_type = record._type - monitors = {} - # TODO: cache here or in Ns1Client - for monitor in self._client.monitors_list(): - data = self._parse_notes(monitor['notes']) - if expected_host == data['host'] or \ - expected_type == data['type']: - # This monitor does not belong to this record - config = monitor['config'] - value = config['host'] - monitors[value] = monitor + if getattr(record, 'dynamic', False): + # TODO: should this just be a global cache by fqdn, type, and + # value? + expected_host = record.fqdn[:-1] + expected_type = record._type + + # TODO: cache here or in Ns1Client + for monitor in self._monitors.values(): + data = self._parse_notes(monitor['notes']) + if expected_host == data['host'] or \ + expected_type == data['type']: + # This monitor does not belong to this record + config = monitor['config'] + value = config['host'] + monitors[value] = monitor return monitors - def _sync_monitor(self, record, value, existing): + @property + def _datasource_id(self): + if self.__datasource_id is None: + name = 'octoDNS NS1 Data Source' + source = None + for candidate in self._client.datasource_list(): + if candidate['name'] == name: + # Found it + source = candidate + break + + if source is None: + # We need to create it + source = self._client \ + .datasource_create(name=name, + sourcetype='nsone_monitoring') + + self.__datasource_id = source['id'] + + return self.__datasource_id + + def _feed_for_monitor(self, monitor): + if self.__feeds_for_monitors is None: + self.__feeds_for_monitors = { + f['config']['jobid']: f['id'] + for f in self._client.datafeed_list(self._datasource_id) + } + + return self.__feeds_for_monitors.get(monitor['id']) + + def _create_monitor(self, monitor): + # TODO: looks like length limit is 64 char + name = '{} - {}'.format(monitor['name'], uuid4().hex[:6]) + + # Create the notify list + notify_list = [{ + 'config': { + 'sourceid': self._datasource_id, + }, + 'type': 'datafeed', + }] + nl = self._client.notifylists_create(name=name, + notify_list=notify_list) + + # Create the monitor + monitor['notify_list'] = nl['id'] + monitor = self._client.monitors_create(**monitor) + + # Create the data feed + config = { + 'jobid': monitor['id'], + } + feed = self._client.datafeed_create(self._datasource_id, name, + config) + + return monitor['id'], feed['id'] + + def _monitor_gen(self, record, value): host = record.fqdn[:-1] _type = record._type request = 'GET {path} HTTP/1.0\\r\\nHost: {host}\\r\\n' \ 'User-agent: NS1\\r\\n\\r\\n'.format(path=record.healthcheck_path, - host=host) + host=record.healthcheck_host) - expected = { + return { 'active': True, 'config': { 'connect_timeout': 2000, @@ -522,177 +658,189 @@ class Ns1Provider(BaseProvider): }], } + def _monitor_is_match(self, expected, have): + # Make sure what we have matches what's in expected exactly. Anything + # else in have will be ignored. + for k, v in expected.items(): + if have.get(k, '--missing--') != v: + return False + + return True + + def _monitor_sync(self, record, value, existing): + expected = self._monitor_gen(record, value) + if existing: monitor_id = existing['id'] - # See if the monitor needs updating - for k, v in expected.items(): - if existing.get(k, '--missing--') != v: - self._client.monitors_update(monitor_id, **expected) - break + + if not self._monitor_is_match(expected, existing): + # Update the monitor to match expected, everything else will be + # left alone and assumed correct + self._client.monitors_update(monitor_id, **expected) + + try: + feed_id = self._feed_for_monitor(existing) + except KeyError: + raise Ns1Exception('Failed to find the feed for {} ({})' + .format(existing['name'], existing['id'])) else: - return self._client.monitors_create(**expected)['id'] + # We don't have an existing monitor create it (and related bits) + monitor_id, feed_id = self._create_monitor(expected) - # TODO: this needs to return the feed - return None + return monitor_id, feed_id - def _params_for_A(self, record): - params = {'ttl': record.ttl} + def _gc_monitors(self, record, active_monitor_ids=None): - if hasattr(record, 'dynamic') and record.dynamic: - pools = record.dynamic.pools + if active_monitor_ids is None: + active_monitor_ids = set() - # Convert rules to regions - regions = {} - for i, rule in enumerate(record.dynamic.rules): - pool_name = rule.data['pool'] + for monitor in self._monitors_for(record).values(): + monitor_id = monitor['id'] + if monitor_id in active_monitor_ids: + continue - notes = { - 'rule-order': i, - } + feed_id = self._feed_for_monitor(monitor) + if feed_id: + self._client.datafeed_delete(self._datasource_id, feed_id) - fallback = pools[pool_name].data.get('fallback', None) - if fallback: - notes['fallback'] = fallback + self._client.monitors_delete(monitor_id) - country = set() - georegion = set() - us_state = set() + notify_list_id = monitor['notify_list'] + self._client.notifylists_delete(notify_list_id) - for geo in rule.data.get('geos', []): - n = len(geo) - if n == 8: - # US state - us_state.add(geo[-2:]) - elif n == 5: - # Country - country.add(geo[-2:]) - else: - # Continent - georegion.update(self._CONTINENT_TO_REGIONS[geo]) + def _params_for_dynamic_A(self, record): + pools = record.dynamic.pools - meta = { - 'note': self._encode_notes(notes), - } - if georegion: - meta['georegion'] = sorted(georegion) - if country: - meta['country'] = sorted(country) - if us_state: - meta['us_state'] = sorted(us_state) + # Convert rules to regions + regions = {} + for i, rule in enumerate(record.dynamic.rules): + pool_name = rule.data['pool'] - regions[pool_name] = { - 'meta': meta, - } + notes = { + 'rule-order': i, + } - existing_monitors = self._monitors_for(record) + fallback = pools[pool_name].data.get('fallback', None) + if fallback: + notes['fallback'] = fallback - # Build a list of primary values for each pool, including their - # monitor - pool_answers = defaultdict(list) - for pool_name, pool in sorted(pools.items()): - for value in pool.data['values']: - weight = value['weight'] - value = value['value'] - existing = existing_monitors.get(value) - monitor_id = self._sync_monitor(record, value, existing) - pool_answers[pool_name].append({ - 'answer': [value], - 'weight': weight, - 'monitor_id': monitor_id, - }) + country = set() + georegion = set() + us_state = set() - default_answers = [{ - 'answer': [v], - 'weight': 1, - } for v in record.values] + for geo in rule.data.get('geos', []): + n = len(geo) + if n == 8: + # US state + us_state.add(geo[-2:]) + elif n == 5: + # Country + country.add(geo[-2:]) + else: + # Continent + georegion.update(self._CONTINENT_TO_REGIONS[geo]) - # Build our list of answers - answers = [] - for pool_name in sorted(pools.keys()): - priority = 1 + meta = { + 'note': self._encode_notes(notes), + } + if georegion: + meta['georegion'] = sorted(georegion) + if country: + meta['country'] = sorted(country) + if us_state: + meta['us_state'] = sorted(us_state) - # Dynamic/health checked - current_pool_name = pool_name - while current_pool_name: - pool = pools[current_pool_name] - for answer in pool_answers[current_pool_name]: - answer = { - 'answer': answer['answer'], - 'meta': { - 'priority': priority, - 'note': self._encode_notes({ - 'from': current_pool_name, - }), - 'weight': answer['weight'], - }, - 'up': True, # TODO: this should be a monitor/feed - 'region': pool_name, # the one we're answering - } - answers.append(answer) + regions[pool_name] = { + 'meta': meta, + } - current_pool_name = pool.data.get('fallback', None) - priority += 1 + existing_monitors = self._monitors_for(record) + active_monitors = set() - # Static/default - for answer in default_answers: + # Build a list of primary values for each pool, including their + # feed_id (monitor) + pool_answers = defaultdict(list) + for pool_name, pool in sorted(pools.items()): + for value in pool.data['values']: + weight = value['weight'] + value = value['value'] + existing = existing_monitors.get(value) + monitor_id, feed_id = self._monitor_sync(record, value, + existing) + active_monitors.add(monitor_id) + pool_answers[pool_name].append({ + 'answer': [value], + 'weight': weight, + 'feed_id': feed_id, + }) + + default_answers = [{ + 'answer': [v], + 'weight': 1, + } for v in record.values] + + # Build our list of answers + answers = [] + for pool_name in sorted(pools.keys()): + priority = 1 + + # Dynamic/health checked + current_pool_name = pool_name + while current_pool_name: + pool = pools[current_pool_name] + for answer in pool_answers[current_pool_name]: answer = { 'answer': answer['answer'], 'meta': { 'priority': priority, 'note': self._encode_notes({ - 'from': '--default--', + 'from': current_pool_name, }), - 'up': True, - 'weight': 1, + 'up': { + 'feed': answer['feed_id'], + }, + 'weight': answer['weight'], }, 'region': pool_name, # the one we're answering } answers.append(answer) - params.update({ - 'answers': answers, - 'filters': self._DYNAMIC_FILTERS, - 'regions': regions, - }) + current_pool_name = pool.data.get('fallback', None) + priority += 1 - return params + # Static/default + for answer in default_answers: + answer = { + 'answer': answer['answer'], + 'meta': { + 'priority': priority, + 'note': self._encode_notes({ + 'from': '--default--', + }), + 'up': True, + 'weight': 1, + }, + 'region': pool_name, # the one we're answering + } + answers.append(answer) + return { + 'answers': answers, + 'filters': self._DYNAMIC_FILTERS, + 'regions': regions, + 'ttl': record.ttl, + }, active_monitors + + def _params_for_A(self, record): + if getattr(record, 'dynamic', False): + return self._params_for_dynamic_A(record) elif hasattr(record, 'geo'): - # purposefully set non-geo answers to have an empty meta, - # so that we know we did this on purpose if/when troubleshooting - params['answers'] = [{"answer": [x], "meta": {}} - for x in record.values] - has_country = False - for iso_region, target in record.geo.items(): - key = 'iso_region_code' - value = iso_region - if not has_country and \ - len(value.split('-')) > 1: # pragma: nocover - has_country = True - for answer in target.values: - params['answers'].append( - { - 'answer': [answer], - 'meta': {key: [value]}, - }, - ) - params['filters'] = [] - if has_country: - params['filters'].append( - {"filter": "shuffle", "config": {}} - ) - params['filters'].append( - {"filter": "geotarget_country", "config": {}} - ) - params['filters'].append( - {"filter": "select_first_n", - "config": {"N": 1}} - ) - else: - params['answers'] = record.values + return self._params_for_geo_A(record) - self.log.debug("params for A: %s", params) - return params + return { + 'answers': record.values, + 'ttl': record.ttl, + }, None _params_for_AAAA = _params_for_A _params_for_NS = _params_for_A @@ -702,49 +850,81 @@ class Ns1Provider(BaseProvider): # escaped in values so we have to strip them here and add # them when going the other way values = [v.replace('\\;', ';') for v in record.values] - return {'answers': values, 'ttl': record.ttl} + return {'answers': values, 'ttl': record.ttl}, None _params_for_TXT = _params_for_SPF def _params_for_CAA(self, record): values = [(v.flags, v.tag, v.value) for v in record.values] - return {'answers': values, 'ttl': record.ttl} + return {'answers': values, 'ttl': record.ttl}, None + # TODO: dynamic CNAME support def _params_for_CNAME(self, record): - return {'answers': [record.value], 'ttl': record.ttl} + return {'answers': [record.value], 'ttl': record.ttl}, None _params_for_ALIAS = _params_for_CNAME _params_for_PTR = _params_for_CNAME def _params_for_MX(self, record): values = [(v.preference, v.exchange) for v in record.values] - return {'answers': values, 'ttl': record.ttl} + return {'answers': values, 'ttl': record.ttl}, None def _params_for_NAPTR(self, record): values = [(v.order, v.preference, v.flags, v.service, v.regexp, v.replacement) for v in record.values] - return {'answers': values, 'ttl': record.ttl} + return {'answers': values, 'ttl': record.ttl}, None def _params_for_SRV(self, record): values = [(v.priority, v.weight, v.port, v.target) for v in record.values] - return {'answers': values, 'ttl': record.ttl} + return {'answers': values, 'ttl': record.ttl}, None + + def _extra_changes(self, desired, changes, **kwargs): + self.log.debug('_extra_changes: desired=%s', desired.name) + + changed = set([c.record for c in changes]) + + extra = [] + for record in desired.records: + if record in changed or not getattr(record, 'dynamic', False): + # Already changed, or no dynamic , no need to check it + continue + + for have in self._monitors_for(record).values(): + value = have['config']['host'] + expected = self._monitor_gen(record, value) + if not expected: + self.log.info('_extra_changes: monitor missing for %s', + expected['name']) + extra.append(Update(record, record)) + break + if not self._monitor_is_match(expected, have): + self.log.info('_extra_changes: monitor mis-match for %s', + expected['name']) + extra.append(Update(record, record)) + break + + return extra def _apply_Create(self, ns1_zone, change): new = change.new zone = new.zone.name[:-1] domain = new.fqdn[:-1] _type = new._type - params = getattr(self, '_params_for_{}'.format(_type))(new) + params, active_monitor_ids = \ + getattr(self, '_params_for_{}'.format(_type))(new) self._client.records_create(zone, domain, _type, **params) + self._gc_monitors(new, active_monitor_ids) def _apply_Update(self, ns1_zone, change): new = change.new zone = new.zone.name[:-1] domain = new.fqdn[:-1] _type = new._type - params = getattr(self, '_params_for_{}'.format(_type))(new) + params, active_monitor_ids = \ + getattr(self, '_params_for_{}'.format(_type))(new) self._client.records_update(zone, domain, _type, **params) + self._gc_monitors(new, active_monitor_ids) def _apply_Delete(self, ns1_zone, change): existing = change.existing @@ -752,6 +932,7 @@ class Ns1Provider(BaseProvider): domain = existing.fqdn[:-1] _type = existing._type self._client.records_delete(zone, domain, _type) + self._gc_monitors(existing) def _apply(self, plan): desired = plan.desired diff --git a/tests/test_octodns_provider_ns1.py b/tests/test_octodns_provider_ns1.py index fedcc2e..539fcfb 100644 --- a/tests/test_octodns_provider_ns1.py +++ b/tests/test_octodns_provider_ns1.py @@ -474,16 +474,16 @@ class TestNs1Provider(TestCase): 'type': 'SPF', 'value': 'foo\\; bar baz\\; blip' }) - self.assertEquals(['foo; bar baz; blip'], - provider._params_for_SPF(record)['answers']) + params, _ = provider._params_for_SPF(record) + self.assertEquals(['foo; bar baz; blip'], params['answers']) record = Record.new(zone, 'txt', { 'ttl': 35, 'type': 'TXT', 'value': 'foo\\; bar baz\\; blip' }) - self.assertEquals(['foo; bar baz; blip'], - provider._params_for_TXT(record)['answers']) + params, _ = provider._params_for_SPF(record) + self.assertEquals(['foo; bar baz; blip'], params['answers']) def test_data_for_CNAME(self): provider = Ns1Provider('test', 'api-key')