1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

Merge remote-tracking branch 'origin/master' into extract-hetzner

This commit is contained in:
Ross McFarland
2022-01-13 10:01:14 -08:00
16 changed files with 76 additions and 3768 deletions

View File

@@ -7,7 +7,7 @@ jobs:
strategy:
matrix:
# Tested versions based on dates in https://devguide.python.org/devcycle/#end-of-life-branches,
python-version: [3.6, 3.7, 3.8, 3.9]
python-version: ['3.7', '3.8', '3.9']
steps:
- uses: actions/checkout@master
- name: Setup python

View File

@@ -16,6 +16,9 @@
* [DynProvider](https://github.com/octodns/octodns-dynprovider/)
* [EasyDnsProvider](https://github.com/octodns/octodns-easydns/)
* [EtcHostsProvider](https://github.com/octodns/octodns-etchosts/)
* [GandiProvider](https://github.com/octodns/octodns-gandi/)
* [GcoreProvider](https://github.com/octodns/octodns-gcore/)
* [GoogleCloudProvider](https://github.com/octodns/octodns-googlecloud/)
* [HetznerProvider](https://github.com/octodns/octodns-hetzner/)
* [Ns1Provider](https://github.com/octodns/octodns-ns1/)
* [PowerDnsProvider](https://github.com/octodns/octodns-powerdns/)

View File

@@ -203,9 +203,9 @@ The table below lists the providers octoDNS supports. We're currently in the pro
| [EasyDnsProvider](https://github.com/octodns/octodns-easydns/) | [octodns_easydns](https://github.com/octodns/octodns-easydns/) | | | | |
| [EtcHostsProvider](https://github.com/octodns/octodns-etchosts/) | [octodns_etchosts](https://github.com/octodns/octodns-etchosts/) | | | | |
| [EnvVarSource](/octodns/source/envvar.py) | | | TXT | No | read-only environment variable injection |
| [GandiProvider](/octodns/provider/gandi.py) | | | A, AAAA, ALIAS, CAA, CNAME, DNAME, MX, NS, PTR, SPF, SRV, SSHFP, TXT | No | |
| [GCoreProvider](/octodns/provider/gcore.py) | | | A, AAAA, NS, MX, TXT, SRV, CNAME, PTR | Dynamic | |
| [GoogleCloudProvider](/octodns/provider/googlecloud.py) | | google-cloud-dns | A, AAAA, CAA, CNAME, MX, NAPTR, NS, PTR, SPF, SRV, TXT | No | |
| [GandiProvider](https://github.com/octodns/octodns-gandi/) | [octodns_gandi](https://github.com/octodns/octodns-gandi/) | | | | |
| [GCoreProvider](https://github.com/octodns/octodns-gcore/) | [octodns_gcore](https://github.com/octodns/octodns-gcore/) | | | | |
| [GoogleCloudProvider](https://github.com/octodns/octodns-googlecloud/) | [octodns_googlecloud](https://github.com/octodns/octodns-googlecloud/) | | | | |
| [HetznerProvider](https://github.com/octodns/octodns-hetzner/) | [octodns_hetzner](https://github.com/octodns/octodns-hetzner/) | | | | |
| [MythicBeastsProvider](/octodns/provider/mythicbeasts.py) | | Mythic Beasts | A, AAAA, ALIAS, CNAME, MX, NS, SRV, SSHFP, CAA, TXT | No | |
| [Ns1Provider](https://github.com/octodns/octodns-ns1/) | [octodns_ns1](https://github.com/octodns/octodns-ns1/) | | | | |

View File

@@ -5,373 +5,19 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
from collections import defaultdict
from requests import Session
import logging
from ..record import Record
from . import ProviderException
from .base import BaseProvider
class GandiClientException(ProviderException):
pass
class GandiClientBadRequest(GandiClientException):
def __init__(self, r):
super(GandiClientBadRequest, self).__init__(r.text)
class GandiClientUnauthorized(GandiClientException):
def __init__(self, r):
super(GandiClientUnauthorized, self).__init__(r.text)
class GandiClientForbidden(GandiClientException):
def __init__(self, r):
super(GandiClientForbidden, self).__init__(r.text)
class GandiClientNotFound(GandiClientException):
def __init__(self, r):
super(GandiClientNotFound, self).__init__(r.text)
class GandiClientUnknownDomainName(GandiClientException):
def __init__(self, msg):
super(GandiClientUnknownDomainName, self).__init__(msg)
class GandiClient(object):
def __init__(self, token):
session = Session()
session.headers.update({'Authorization': f'Apikey {token}'})
self._session = session
self.endpoint = 'https://api.gandi.net/v5'
def _request(self, method, path, params={}, data=None):
url = f'{self.endpoint}{path}'
r = self._session.request(method, url, params=params, json=data)
if r.status_code == 400:
raise GandiClientBadRequest(r)
if r.status_code == 401:
raise GandiClientUnauthorized(r)
elif r.status_code == 403:
raise GandiClientForbidden(r)
elif r.status_code == 404:
raise GandiClientNotFound(r)
r.raise_for_status()
return r
def zone(self, zone_name):
return self._request('GET', f'/livedns/domains/{zone_name}').json()
def zone_create(self, zone_name):
return self._request('POST', '/livedns/domains', data={
'fqdn': zone_name,
'zone': {}
}).json()
def zone_records(self, zone_name):
records = self._request('GET',
f'/livedns/domains/{zone_name}/records').json()
for record in records:
if record['rrset_name'] == '@':
record['rrset_name'] = ''
# Change relative targets to absolute ones.
if record['rrset_type'] in ['ALIAS', 'CNAME', 'DNAME', 'MX',
'NS', 'SRV']:
for i, value in enumerate(record['rrset_values']):
if not value.endswith('.'):
record['rrset_values'][i] = f'{value}.{zone_name}.'
return records
def record_create(self, zone_name, data):
self._request('POST', f'/livedns/domains/{zone_name}/records',
data=data)
def record_delete(self, zone_name, record_name, record_type):
self._request('DELETE', f'/livedns/domains/{zone_name}/records/'
f'{record_name}/{record_type}')
class GandiProvider(BaseProvider):
'''
Gandi provider using API v5.
gandi:
class: octodns.provider.gandi.GandiProvider
# Your API key (required)
token: XXXXXXXXXXXX
'''
SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False
SUPPORTS = set((['A', 'AAAA', 'ALIAS', 'CAA', 'CNAME', 'DNAME',
'MX', 'NS', 'PTR', 'SPF', 'SRV', 'SSHFP', 'TXT']))
def __init__(self, id, token, *args, **kwargs):
self.log = logging.getLogger(f'GandiProvider[{id}]')
self.log.debug('__init__: id=%s, token=***', id)
super(GandiProvider, self).__init__(id, *args, **kwargs)
self._client = GandiClient(token)
self._zone_records = {}
def _data_for_multiple(self, _type, records):
return {
'ttl': records[0]['rrset_ttl'],
'type': _type,
'values': [v.replace(';', '\\;') for v in
records[0]['rrset_values']] if _type == 'TXT' else
records[0]['rrset_values']
}
_data_for_A = _data_for_multiple
_data_for_AAAA = _data_for_multiple
_data_for_TXT = _data_for_multiple
_data_for_SPF = _data_for_multiple
_data_for_NS = _data_for_multiple
def _data_for_CAA(self, _type, records):
values = []
for record in records[0]['rrset_values']:
flags, tag, value = record.split(' ')
values.append({
'flags': flags,
'tag': tag,
# Remove quotes around value.
'value': value[1:-1],
})
return {
'ttl': records[0]['rrset_ttl'],
'type': _type,
'values': values
}
def _data_for_single(self, _type, records):
return {
'ttl': records[0]['rrset_ttl'],
'type': _type,
'value': records[0]['rrset_values'][0]
}
_data_for_ALIAS = _data_for_single
_data_for_CNAME = _data_for_single
_data_for_DNAME = _data_for_single
_data_for_PTR = _data_for_single
def _data_for_MX(self, _type, records):
values = []
for record in records[0]['rrset_values']:
priority, server = record.split(' ')
values.append({
'preference': priority,
'exchange': server
})
return {
'ttl': records[0]['rrset_ttl'],
'type': _type,
'values': values
}
def _data_for_SRV(self, _type, records):
values = []
for record in records[0]['rrset_values']:
priority, weight, port, target = record.split(' ', 3)
values.append({
'priority': priority,
'weight': weight,
'port': port,
'target': target
})
return {
'ttl': records[0]['rrset_ttl'],
'type': _type,
'values': values
}
def _data_for_SSHFP(self, _type, records):
values = []
for record in records[0]['rrset_values']:
algorithm, fingerprint_type, fingerprint = record.split(' ', 2)
values.append({
'algorithm': algorithm,
'fingerprint': fingerprint,
'fingerprint_type': fingerprint_type
})
return {
'ttl': records[0]['rrset_ttl'],
'type': _type,
'values': values
}
def zone_records(self, zone):
if zone.name not in self._zone_records:
try:
self._zone_records[zone.name] = \
self._client.zone_records(zone.name[:-1])
except GandiClientNotFound:
return []
return self._zone_records[zone.name]
def populate(self, zone, target=False, lenient=False):
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
values = defaultdict(lambda: defaultdict(list))
for record in self.zone_records(zone):
_type = record['rrset_type']
if _type not in self.SUPPORTS:
continue
values[record['rrset_name']][record['rrset_type']].append(record)
before = len(zone.records)
for name, types in values.items():
for _type, records in types.items():
data_for = getattr(self, f'_data_for_{_type}')
record = Record.new(zone, name, data_for(_type, records),
source=self, lenient=lenient)
zone.add_record(record, lenient=lenient)
exists = zone.name in self._zone_records
self.log.info('populate: found %s records, exists=%s',
len(zone.records) - before, exists)
return exists
def _record_name(self, name):
return name if name else '@'
def _params_for_multiple(self, record):
return {
'rrset_name': self._record_name(record.name),
'rrset_ttl': record.ttl,
'rrset_type': record._type,
'rrset_values': [v.replace('\\;', ';') for v in
record.values] if record._type == 'TXT'
else record.values
}
_params_for_A = _params_for_multiple
_params_for_AAAA = _params_for_multiple
_params_for_NS = _params_for_multiple
_params_for_TXT = _params_for_multiple
_params_for_SPF = _params_for_multiple
def _params_for_CAA(self, record):
return {
'rrset_name': self._record_name(record.name),
'rrset_ttl': record.ttl,
'rrset_type': record._type,
'rrset_values': [f'{v.flags} {v.tag} "{v.value}"'
for v in record.values]
}
def _params_for_single(self, record):
return {
'rrset_name': self._record_name(record.name),
'rrset_ttl': record.ttl,
'rrset_type': record._type,
'rrset_values': [record.value]
}
_params_for_ALIAS = _params_for_single
_params_for_CNAME = _params_for_single
_params_for_DNAME = _params_for_single
_params_for_PTR = _params_for_single
def _params_for_MX(self, record):
return {
'rrset_name': self._record_name(record.name),
'rrset_ttl': record.ttl,
'rrset_type': record._type,
'rrset_values': [f'{v.preference} {v.exchange}'
for v in record.values]
}
def _params_for_SRV(self, record):
return {
'rrset_name': self._record_name(record.name),
'rrset_ttl': record.ttl,
'rrset_type': record._type,
'rrset_values': [f'{v.priority} {v.weight} {v.port} {v.target}'
for v in record.values]
}
def _params_for_SSHFP(self, record):
return {
'rrset_name': self._record_name(record.name),
'rrset_ttl': record.ttl,
'rrset_type': record._type,
'rrset_values': [f'{v.algorithm} {v.fingerprint_type} '
f'{v.fingerprint}' for v in record.values]
}
def _apply_create(self, change):
new = change.new
data = getattr(self, f'_params_for_{new._type}')(new)
self._client.record_create(new.zone.name[:-1], data)
def _apply_update(self, change):
self._apply_delete(change)
self._apply_create(change)
def _apply_delete(self, change):
existing = change.existing
zone = existing.zone
self._client.record_delete(zone.name[:-1],
self._record_name(existing.name),
existing._type)
def _apply(self, plan):
desired = plan.desired
changes = plan.changes
zone = desired.name[:-1]
self.log.debug('_apply: zone=%s, len(changes)=%d', desired.name,
len(changes))
try:
self._client.zone(zone)
except GandiClientNotFound:
self.log.info('_apply: no existing zone, trying to create it')
try:
self._client.zone_create(zone)
self.log.info('_apply: zone has been successfully created')
except GandiClientNotFound:
# We suppress existing exception before raising
# GandiClientUnknownDomainName.
e = GandiClientUnknownDomainName('This domain is not '
'registered at Gandi. '
'Please register or '
'transfer it here '
'to be able to manage its '
'DNS zone.')
e.__cause__ = None
raise e
# Force records deletion to be done before creation in order to avoid
# "CNAME record must be the only record" error when an existing CNAME
# record is replaced by an A/AAAA record.
changes.reverse()
for change in changes:
class_name = change.__class__.__name__
getattr(self, f'_apply_{class_name.lower()}')(change)
# Clear out the cache if any
self._zone_records.pop(desired.name, None)
from logging import getLogger
logger = getLogger('Gandi')
try:
logger.warn('octodns_gandi shimmed. Update your provider class to '
'octodns_gandi.GandiProvider. '
'Shim will be removed in 1.0')
from octodns_gandi import GandiProvider
GandiProvider # pragma: no cover
except ModuleNotFoundError:
logger.exception('GandiProvider has been moved into a seperate module, '
'octodns_gandi is now required. Provider class should '
'be updated to octodns_gandi.GandiProvider. See '
'https://github.com/octodns/octodns/README.md#updating-'
'to-use-extracted-providers for more information.')
raise

View File

@@ -2,615 +2,20 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from collections import defaultdict
from requests import Session
import http
import logging
import urllib.parse
from ..record import GeoCodes
from ..record import Record
from . import ProviderException
from .base import BaseProvider
class GCoreClientException(ProviderException):
def __init__(self, r):
super(GCoreClientException, self).__init__(r.text)
class GCoreClientBadRequest(GCoreClientException):
def __init__(self, r):
super(GCoreClientBadRequest, self).__init__(r)
class GCoreClientNotFound(GCoreClientException):
def __init__(self, r):
super(GCoreClientNotFound, self).__init__(r)
class GCoreClient(object):
ROOT_ZONES = "zones"
def __init__(
self,
log,
api_url,
auth_url,
token=None,
token_type=None,
login=None,
password=None,
):
self.log = log
self._session = Session()
self._api_url = api_url
if token is not None and token_type is not None:
self._session.headers.update(
{"Authorization": f"{token_type} {token}"}
)
elif login is not None and password is not None:
token = self._auth(auth_url, login, password)
self._session.headers.update(
{"Authorization": f"Bearer {token}"}
)
else:
raise ValueError("either token or login & password must be set")
def _auth(self, url, login, password):
# well, can't use _request, since API returns 400 if credentials
# invalid which will be logged, but we don't want do this
r = self._session.request(
"POST",
self._build_url(url, "auth", "jwt", "login"),
json={"username": login, "password": password},
)
r.raise_for_status()
return r.json()["access"]
def _request(self, method, url, params=None, data=None):
r = self._session.request(
method, url, params=params, json=data, timeout=30.0
)
if r.status_code == http.HTTPStatus.BAD_REQUEST:
self.log.error(
"bad request %r has been sent to %r: %s", data, url, r.text
)
raise GCoreClientBadRequest(r)
elif r.status_code == http.HTTPStatus.NOT_FOUND:
self.log.error("resource %r not found: %s", url, r.text)
raise GCoreClientNotFound(r)
elif r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR:
self.log.error("server error no %r to %r: %s", data, url, r.text)
raise GCoreClientException(r)
r.raise_for_status()
return r
def zone(self, zone_name):
return self._request(
"GET", self._build_url(self._api_url, self.ROOT_ZONES, zone_name)
).json()
def zone_create(self, zone_name):
return self._request(
"POST",
self._build_url(self._api_url, self.ROOT_ZONES),
data={"name": zone_name},
).json()
def zone_records(self, zone_name):
url = self._build_url(self._api_url, self.ROOT_ZONES, zone_name,
"rrsets")
rrsets = self._request("GET", url, params={"all": "true"}).json()
records = rrsets["rrsets"]
return records
def record_create(self, zone_name, rrset_name, type_, data):
self._request(
"POST", self._rrset_url(zone_name, rrset_name, type_), data=data
)
def record_update(self, zone_name, rrset_name, type_, data):
self._request(
"PUT", self._rrset_url(zone_name, rrset_name, type_), data=data
)
def record_delete(self, zone_name, rrset_name, type_):
self._request("DELETE", self._rrset_url(zone_name, rrset_name, type_))
def _rrset_url(self, zone_name, rrset_name, type_):
return self._build_url(
self._api_url, self.ROOT_ZONES, zone_name, rrset_name, type_
)
@staticmethod
def _build_url(base, *items):
for i in items:
base = base.strip("/") + "/"
base = urllib.parse.urljoin(base, i)
return base
class GCoreProvider(BaseProvider):
"""
GCore provider using API v2.
gcore:
class: octodns.provider.gcore.GCoreProvider
# Your API key
token: XXXXXXXXXXXX
# token_type: APIKey
# or login + password
login: XXXXXXXXXXXX
password: XXXXXXXXXXXX
# auth_url: https://api.gcdn.co
# url: https://dnsapi.gcorelabs.com/v2
# records_per_response: 1
"""
SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = True
SUPPORTS = set(("A", "AAAA", "NS", "MX", "TXT", "SRV", "CNAME", "PTR"))
def __init__(self, id, *args, **kwargs):
token = kwargs.pop("token", None)
token_type = kwargs.pop("token_type", "APIKey")
login = kwargs.pop("login", None)
password = kwargs.pop("password", None)
api_url = kwargs.pop("url", "https://dnsapi.gcorelabs.com/v2")
auth_url = kwargs.pop("auth_url", "https://api.gcdn.co")
self.records_per_response = kwargs.pop("records_per_response", 1)
self.log = logging.getLogger(f"GCoreProvider[{id}]")
self.log.debug("__init__: id=%s", id)
super(GCoreProvider, self).__init__(id, *args, **kwargs)
self._client = GCoreClient(
self.log,
api_url,
auth_url,
token=token,
token_type=token_type,
login=login,
password=password,
)
def _add_dot_if_need(self, value):
return f"{value}." if not value.endswith(".") else value
def _build_pools(self, record, default_pool_name, value_transform_fn):
defaults = []
geo_sets, pool_idx = dict(), 0
pools = defaultdict(lambda: {"values": []})
for rr in record["resource_records"]:
meta = rr.get("meta", {}) or {}
value = {"value": value_transform_fn(rr["content"][0])}
countries = meta.get("countries", []) or []
continents = meta.get("continents", []) or []
if meta.get("default", False):
pools[default_pool_name]["values"].append(value)
defaults.append(value["value"])
continue
# defaults is false or missing and no conties or continents
elif len(continents) == 0 and len(countries) == 0:
defaults.append(value["value"])
continue
# RR with the same set of countries and continents are
# combined in single pool
geo_set = frozenset(
[GeoCodes.country_to_code(cc.upper()) for cc in countries]
) | frozenset(cc.upper() for cc in continents)
if geo_set not in geo_sets:
geo_sets[geo_set] = f"pool-{pool_idx}"
pool_idx += 1
pools[geo_sets[geo_set]]["values"].append(value)
return pools, geo_sets, defaults
def _build_rules(self, pools, geo_sets):
rules = []
for name, _ in pools.items():
rule = {"pool": name}
geo_set = next(
(
geo_set
for geo_set, pool_name in geo_sets.items()
if pool_name == name
),
{},
)
if len(geo_set) > 0:
rule["geos"] = list(geo_set)
rules.append(rule)
return sorted(rules, key=lambda x: x["pool"])
def _data_for_dynamic(self, record, value_transform_fn=lambda x: x):
default_pool = "other"
pools, geo_sets, defaults = self._build_pools(
record, default_pool, value_transform_fn
)
if len(pools) == 0:
raise RuntimeError(
f"filter is enabled, but no pools where built for {record}"
)
# defaults can't be empty, so use first pool values
if len(defaults) == 0:
defaults = [
value_transform_fn(v["value"])
for v in next(iter(pools.values()))["values"]
]
# if at least one default RR was found then setup fallback for
# other pools to default
if default_pool in pools:
for pool_name, pool in pools.items():
if pool_name == default_pool:
continue
pool["fallback"] = default_pool
rules = self._build_rules(pools, geo_sets)
return pools, rules, defaults
def _data_for_single(self, _type, record):
return {
"ttl": record["ttl"],
"type": _type,
"value": self._add_dot_if_need(
record["resource_records"][0]["content"][0]
),
}
_data_for_PTR = _data_for_single
def _data_for_CNAME(self, _type, record):
if record.get("filters") is None:
return self._data_for_single(_type, record)
pools, rules, defaults = self._data_for_dynamic(
record, self._add_dot_if_need
)
return {
"ttl": record["ttl"],
"type": _type,
"dynamic": {"pools": pools, "rules": rules},
"value": self._add_dot_if_need(defaults[0]),
}
def _data_for_multiple(self, _type, record):
extra = dict()
if record.get("filters") is not None:
pools, rules, defaults = self._data_for_dynamic(record)
extra = {
"dynamic": {"pools": pools, "rules": rules},
"values": defaults,
}
else:
extra = {
"values": [
rr_value
for resource_record in record["resource_records"]
for rr_value in resource_record["content"]
]
}
return {
"ttl": record["ttl"],
"type": _type,
**extra,
}
_data_for_A = _data_for_multiple
_data_for_AAAA = _data_for_multiple
def _data_for_TXT(self, _type, record):
return {
"ttl": record["ttl"],
"type": _type,
"values": [
rr_value.replace(";", "\\;")
for resource_record in record["resource_records"]
for rr_value in resource_record["content"]
],
}
def _data_for_MX(self, _type, record):
return {
"ttl": record["ttl"],
"type": _type,
"values": [
dict(
preference=preference,
exchange=self._add_dot_if_need(exchange),
)
for preference, exchange in map(
lambda x: x["content"], record["resource_records"]
)
],
}
def _data_for_NS(self, _type, record):
return {
"ttl": record["ttl"],
"type": _type,
"values": [
self._add_dot_if_need(rr_value)
for resource_record in record["resource_records"]
for rr_value in resource_record["content"]
],
}
def _data_for_SRV(self, _type, record):
return {
"ttl": record["ttl"],
"type": _type,
"values": [
dict(
priority=priority,
weight=weight,
port=port,
target=self._add_dot_if_need(target),
)
for priority, weight, port, target in map(
lambda x: x["content"], record["resource_records"]
)
],
}
def zone_records(self, zone):
try:
return self._client.zone_records(zone.name[:-1]), True
except GCoreClientNotFound:
return [], False
def populate(self, zone, target=False, lenient=False):
self.log.debug(
"populate: name=%s, target=%s, lenient=%s",
zone.name,
target,
lenient,
)
values = defaultdict(defaultdict)
records, exists = self.zone_records(zone)
for record in records:
_type = record["type"].upper()
if _type not in self.SUPPORTS:
continue
if self._should_ignore(record):
continue
rr_name = zone.hostname_from_fqdn(record["name"])
values[rr_name][_type] = record
before = len(zone.records)
for name, types in values.items():
for _type, record in types.items():
data_for = getattr(self, f"_data_for_{_type}")
record = Record.new(
zone,
name,
data_for(_type, record),
source=self,
lenient=lenient,
)
zone.add_record(record, lenient=lenient)
self.log.info(
"populate: found %s records, exists=%s",
len(zone.records) - before,
exists,
)
return exists
def _should_ignore(self, record):
name = record.get("name", "name-not-defined")
if record.get("filters") is None:
return False
want_filters = 3
filters = record.get("filters", [])
if len(filters) != want_filters:
self.log.info(
"ignore %s has filters and their count is not %d",
name,
want_filters,
)
return True
types = [v.get("type") for v in filters]
for i, want_type in enumerate(["geodns", "default", "first_n"]):
if types[i] != want_type:
self.log.info(
"ignore %s, filters.%d.type is %s, want %s",
name,
i,
types[i],
want_type,
)
return True
limits = [filters[i].get("limit", 1) for i in [1, 2]]
if limits[0] != limits[1]:
self.log.info(
"ignore %s, filters.1.limit (%d) != filters.2.limit (%d)",
name,
limits[0],
limits[1],
)
return True
return False
def _params_for_dymanic(self, record):
records = []
default_pool_found = False
default_values = set(
record.values if hasattr(record, "values") else [record.value]
)
for rule in record.dynamic.rules:
meta = dict()
# build meta tags if geos information present
if len(rule.data.get("geos", [])) > 0:
for geo_code in rule.data["geos"]:
geo = GeoCodes.parse(geo_code)
country = geo["country_code"]
continent = geo["continent_code"]
if country is not None:
meta.setdefault("countries", []).append(country)
else:
meta.setdefault("continents", []).append(continent)
else:
meta["default"] = True
pool_values = set()
pool_name = rule.data["pool"]
for value in record.dynamic.pools[pool_name].data["values"]:
v = value["value"]
records.append({"content": [v], "meta": meta})
pool_values.add(v)
default_pool_found |= default_values == pool_values
# if default values doesn't match any pool values, then just add this
# values with no any meta
if not default_pool_found:
for value in default_values:
records.append({"content": [value]})
return records
def _params_for_single(self, record):
return {
"ttl": record.ttl,
"resource_records": [{"content": [record.value]}],
}
_params_for_PTR = _params_for_single
def _params_for_CNAME(self, record):
if not record.dynamic:
return self._params_for_single(record)
return {
"ttl": record.ttl,
"resource_records": self._params_for_dymanic(record),
"filters": [
{"type": "geodns"},
{
"type": "default",
"limit": self.records_per_response,
"strict": False,
},
{"type": "first_n", "limit": self.records_per_response},
],
}
def _params_for_multiple(self, record):
extra = dict()
if record.dynamic:
extra["resource_records"] = self._params_for_dymanic(record)
extra["filters"] = [
{"type": "geodns"},
{
"type": "default",
"limit": self.records_per_response,
"strict": False,
},
{"type": "first_n", "limit": self.records_per_response},
]
else:
extra["resource_records"] = [
{"content": [value]} for value in record.values
]
return {
"ttl": record.ttl,
**extra,
}
_params_for_A = _params_for_multiple
_params_for_AAAA = _params_for_multiple
def _params_for_NS(self, record):
return {
"ttl": record.ttl,
"resource_records": [
{"content": [value]} for value in record.values
],
}
def _params_for_TXT(self, record):
return {
"ttl": record.ttl,
"resource_records": [
{"content": [value.replace("\\;", ";")]}
for value in record.values
],
}
def _params_for_MX(self, record):
return {
"ttl": record.ttl,
"resource_records": [
{"content": [rec.preference, rec.exchange]}
for rec in record.values
],
}
def _params_for_SRV(self, record):
return {
"ttl": record.ttl,
"resource_records": [
{"content": [rec.priority, rec.weight, rec.port, rec.target]}
for rec in record.values
],
}
def _apply_create(self, change):
self.log.info("creating: %s", change)
new = change.new
data = getattr(self, f"_params_for_{new._type}")(new)
self._client.record_create(
new.zone.name[:-1], new.fqdn, new._type, data
)
def _apply_update(self, change):
self.log.info("updating: %s", change)
new = change.new
data = getattr(self, f"_params_for_{new._type}")(new)
self._client.record_update(
new.zone.name[:-1], new.fqdn, new._type, data
)
def _apply_delete(self, change):
self.log.info("deleting: %s", change)
existing = change.existing
self._client.record_delete(
existing.zone.name[:-1], existing.fqdn, existing._type
)
def _apply(self, plan):
desired = plan.desired
changes = plan.changes
zone = desired.name[:-1]
self.log.debug(
"_apply: zone=%s, len(changes)=%d", desired.name, len(changes)
)
try:
self._client.zone(zone)
except GCoreClientNotFound:
self.log.info("_apply: no existing zone, trying to create it")
self._client.zone_create(zone)
self.log.info("_apply: zone has been successfully created")
changes.reverse()
for change in changes:
class_name = change.__class__.__name__
getattr(self, f"_apply_{class_name.lower()}")(change)
from __future__ import absolute_import, division, print_function, \
unicode_literals
from logging import getLogger
logger = getLogger('GCore')
try:
logger.warn('octodns_gcore shimmed. Update your provider class to '
'octodns_gcore.GCoreProvider. '
'Shim will be removed in 1.0')
from octodns_gcore import GCoreProvider
GCoreProvider # pragma: no cover
except ModuleNotFoundError:
logger.exception('GCoreProvider has been moved into a seperate module, '
'octodns_gcore is now required. Provider class should '
'be updated to octodns_gcore.GCoreProvider')
raise

View File

@@ -5,334 +5,18 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
import shlex
import time
from logging import getLogger
from uuid import uuid4
import re
from google.cloud import dns
from .base import BaseProvider
from ..record import Record
class GoogleCloudProvider(BaseProvider):
"""
Google Cloud DNS provider
google_cloud:
class: octodns.provider.googlecloud.GoogleCloudProvider
# Credentials file for a service_account or other account can be
# specified with the GOOGLE_APPLICATION_CREDENTIALS environment
# variable. (https://console.cloud.google.com/apis/credentials)
#
# The project to work on (not required)
# project: foobar
#
# The File with the google credentials (not required). If used, the
# "project" parameter needs to be set, else it will fall back to the
# "default credentials"
# credentials_file: ~/google_cloud_credentials_file.json
#
"""
SUPPORTS = set(('A', 'AAAA', 'CAA', 'CNAME', 'MX', 'NAPTR',
'NS', 'PTR', 'SPF', 'SRV', 'TXT'))
SUPPORTS_GEO = False
SUPPORTS_DYNAMIC = False
CHANGE_LOOP_WAIT = 5
def __init__(self, id, project=None, credentials_file=None,
*args, **kwargs):
if credentials_file:
self.gcloud_client = dns.Client.from_service_account_json(
credentials_file, project=project)
else:
self.gcloud_client = dns.Client(project=project)
# Logger
self.log = getLogger(f'GoogleCloudProvider[{id}]')
self.id = id
self._gcloud_zones = {}
super(GoogleCloudProvider, self).__init__(id, *args, **kwargs)
def _apply(self, plan):
"""Required function of manager.py to actually apply a record change.
:param plan: Contains the zones and changes to be made
:type plan: octodns.provider.base.Plan
:type return: void
"""
desired = plan.desired
changes = plan.changes
self.log.debug('_apply: zone=%s, len(changes)=%d', desired.name,
len(changes))
# Get gcloud zone, or create one if none existed before.
if desired.name not in self.gcloud_zones:
gcloud_zone = self._create_gcloud_zone(desired.name)
else:
gcloud_zone = self.gcloud_zones.get(desired.name)
gcloud_changes = gcloud_zone.changes()
for change in changes:
class_name = change.__class__.__name__
_rrset_func = getattr(self, f'_rrset_for_{change.record._type}')
if class_name == 'Create':
gcloud_changes.add_record_set(
_rrset_func(gcloud_zone, change.record))
elif class_name == 'Delete':
gcloud_changes.delete_record_set(
_rrset_func(gcloud_zone, change.record))
elif class_name == 'Update':
gcloud_changes.delete_record_set(
_rrset_func(gcloud_zone, change.existing))
gcloud_changes.add_record_set(
_rrset_func(gcloud_zone, change.new))
else:
msg = f'Change type "{class_name}" for change ' \
f'"{str(change)}" is none of "Create", "Delete" or "Update'
raise RuntimeError(msg)
gcloud_changes.create()
for i in range(120):
gcloud_changes.reload()
# https://cloud.google.com/dns/api/v1/changes#resource
# status can be one of either "pending" or "done"
if gcloud_changes.status != 'pending':
break
self.log.debug("Waiting for changes to complete")
time.sleep(self.CHANGE_LOOP_WAIT)
if gcloud_changes.status != 'done':
timeout = i * self.CHANGE_LOOP_WAIT
raise RuntimeError(f"Timeout reached after {timeout} seconds")
def _create_gcloud_zone(self, dns_name):
"""Creates a google cloud ManagedZone with dns_name, and zone named
derived from it. calls .create() method and returns it.
:param dns_name: fqdn of zone to create
:type dns_name: str
:type return: new google.cloud.dns.ManagedZone
"""
# Zone name must begin with a letter, end with a letter or digit,
# and only contain lowercase letters, digits or dashes,
# and be 63 characters or less
zone_name = f'zone-{dns_name.replace(".", "-")}-{uuid4().hex}'[:63]
gcloud_zone = self.gcloud_client.zone(
name=zone_name,
dns_name=dns_name
)
gcloud_zone.create(client=self.gcloud_client)
# add this new zone to the list of zones.
self._gcloud_zones[gcloud_zone.dns_name] = gcloud_zone
self.log.info(f"Created zone {zone_name}. Fqdn {dns_name}.")
return gcloud_zone
def _get_gcloud_records(self, gcloud_zone, page_token=None):
""" Generator function which yields ResourceRecordSet for the managed
gcloud zone, until there are no more records to pull.
:param gcloud_zone: zone to pull records from
:type gcloud_zone: google.cloud.dns.ManagedZone
:param page_token: page token for the page to get
:return: a resource record set
:type return: google.cloud.dns.ResourceRecordSet
"""
gcloud_iterator = gcloud_zone.list_resource_record_sets(
page_token=page_token)
for gcloud_record in gcloud_iterator:
yield gcloud_record
# This is to get results which may be on a "paged" page.
# (if more than max_results) entries.
if gcloud_iterator.next_page_token:
for gcloud_record in self._get_gcloud_records(
gcloud_zone, gcloud_iterator.next_page_token):
# yield from is in python 3 only.
yield gcloud_record
def _get_cloud_zones(self, page_token=None):
"""Load all ManagedZones into the self._gcloud_zones dict which is
mapped with the dns_name as key.
:return: void
"""
gcloud_zones = self.gcloud_client.list_zones(page_token=page_token)
for gcloud_zone in gcloud_zones:
self._gcloud_zones[gcloud_zone.dns_name] = gcloud_zone
if gcloud_zones.next_page_token:
self._get_cloud_zones(gcloud_zones.next_page_token)
@property
def gcloud_zones(self):
if not self._gcloud_zones:
self._get_cloud_zones()
return self._gcloud_zones
def populate(self, zone, target=False, lenient=False):
"""Required function of manager.py to collect records from zone.
:param zone: A dns zone
:type zone: octodns.zone.Zone
:param target: Unused.
:type target: bool
:param lenient: Unused. Check octodns.manager for usage.
:type lenient: bool
:type return: void
"""
self.log.debug('populate: name=%s, target=%s, lenient=%s', zone.name,
target, lenient)
exists = False
before = len(zone.records)
gcloud_zone = self.gcloud_zones.get(zone.name)
if gcloud_zone:
exists = True
for gcloud_record in self._get_gcloud_records(gcloud_zone):
if gcloud_record.record_type.upper() not in self.SUPPORTS:
continue
record_name = gcloud_record.name
if record_name.endswith(zone.name):
# google cloud always return fqdn. Make relative record
# here. "root" records will then get the '' record_name,
# which is also the way octodns likes it.
record_name = record_name[:-(len(zone.name) + 1)]
typ = gcloud_record.record_type.upper()
data = getattr(self, f'_data_for_{typ}')
data = data(gcloud_record)
data['type'] = typ
data['ttl'] = gcloud_record.ttl
self.log.debug('populate: adding record %s records: %s',
record_name, data)
record = Record.new(zone, record_name, data, source=self)
zone.add_record(record, lenient=lenient)
self.log.info('populate: found %s records, exists=%s',
len(zone.records) - before, exists)
return exists
def _data_for_A(self, gcloud_record):
return {
'values': gcloud_record.rrdatas
}
_data_for_AAAA = _data_for_A
def _data_for_CAA(self, gcloud_record):
return {
'values': [{
'flags': v[0],
'tag': v[1],
'value': v[2]}
for v in [shlex.split(g) for g in gcloud_record.rrdatas]]}
def _data_for_CNAME(self, gcloud_record):
return {
'value': gcloud_record.rrdatas[0]
}
def _data_for_MX(self, gcloud_record):
return {'values': [{
"preference": v[0],
"exchange": v[1]}
for v in [shlex.split(g) for g in gcloud_record.rrdatas]]}
def _data_for_NAPTR(self, gcloud_record):
return {'values': [{
'order': v[0],
'preference': v[1],
'flags': v[2],
'service': v[3],
'regexp': v[4],
'replacement': v[5]}
for v in [shlex.split(g) for g in gcloud_record.rrdatas]]}
_data_for_NS = _data_for_A
_data_for_PTR = _data_for_CNAME
_fix_semicolons = re.compile(r'(?<!\\);')
def _data_for_SPF(self, gcloud_record):
if len(gcloud_record.rrdatas) > 1:
return {
'values': [self._fix_semicolons.sub('\\;', rr)
for rr in gcloud_record.rrdatas]}
return {
'value': self._fix_semicolons.sub('\\;', gcloud_record.rrdatas[0])}
def _data_for_SRV(self, gcloud_record):
return {'values': [{
'priority': v[0],
'weight': v[1],
'port': v[2],
'target': v[3]}
for v in [shlex.split(g) for g in gcloud_record.rrdatas]]}
_data_for_TXT = _data_for_SPF
def _rrset_for_A(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, record.values)
_rrset_for_AAAA = _rrset_for_A
def _rrset_for_CAA(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, [
f'{v.flags} {v.tag} {v.value}' for v in record.values])
def _rrset_for_CNAME(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, [record.value])
def _rrset_for_MX(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, [
f'{v.preference} {v.exchange}' for v in record.values])
def _rrset_for_NAPTR(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, [
f'{v.order} {v.preference} "{v.flags}" "{v.service}" '
f'"{v.regexp}" {v.replacement}' for v in record.values])
_rrset_for_NS = _rrset_for_A
_rrset_for_PTR = _rrset_for_CNAME
def _rrset_for_SPF(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, record.chunked_values)
def _rrset_for_SRV(self, gcloud_zone, record):
return gcloud_zone.resource_record_set(
record.fqdn, record._type, record.ttl, [
f'{v.priority} {v.weight} {v.port} {v.target}'
for v in record.values])
_rrset_for_TXT = _rrset_for_SPF
logger = getLogger('GoogleCloud')
try:
logger.warn('octodns_googlecloud shimmed. Update your provider class to '
'octodns_googlecloud.GoogleCloudProvider. '
'Shim will be removed in 1.0')
from octodns_googlecloud import GoogleCloudProvider
GoogleCloudProvider # pragma: no cover
except ModuleNotFoundError:
logger.exception('GoogleCloudProvider has been moved into a seperate '
'module, octodns_googlecloud is now required. Provider '
'class should be updated to '
'octodns_googlecloud.GoogleCloudProvider')
raise

View File

@@ -2,14 +2,12 @@ PyYaml==5.4
dnspython==1.16.0
docutils==0.16
fqdn==1.5.0
google-cloud-core==1.4.1
google-cloud-dns==0.32.0
jmespath==0.10.0
natsort==6.2.1
ovh==0.5.0
pycountry-convert==0.7.2
pycountry==20.7.3
pycountry==22.1.10
python-dateutil==2.8.1
requests==2.25.1
setuptools==44.1.1
setuptools==60.5.0
python-transip==0.5.0

View File

@@ -1,154 +0,0 @@
[
{
"rrset_type": "A",
"rrset_ttl": 300,
"rrset_name": "@",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/%40/A",
"rrset_values": [
"1.2.3.4",
"1.2.3.5"
]
},
{
"rrset_type": "CAA",
"rrset_ttl": 3600,
"rrset_name": "@",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/%40/CAA",
"rrset_values": [
"0 issue \"ca.unit.tests\""
]
},
{
"rrset_type": "SSHFP",
"rrset_ttl": 3600,
"rrset_name": "@",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/%40/SSHFP",
"rrset_values": [
"1 1 7491973e5f8b39d5327cd4e08bc81b05f7710b49",
"1 1 bf6b6825d2977c511a475bbefb88aad54a92ac73"
]
},
{
"rrset_type": "AAAA",
"rrset_ttl": 600,
"rrset_name": "aaaa",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/aaaa/AAAA",
"rrset_values": [
"2601:644:500:e210:62f8:1dff:feb8:947a"
]
},
{
"rrset_type": "CNAME",
"rrset_ttl": 300,
"rrset_name": "cname",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/cname/CNAME",
"rrset_values": [
"unit.tests."
]
},
{
"rrset_type": "DNAME",
"rrset_ttl": 300,
"rrset_name": "dname",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/dname/DNAME",
"rrset_values": [
"unit.tests."
]
},
{
"rrset_type": "CNAME",
"rrset_ttl": 3600,
"rrset_name": "excluded",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/excluded/CNAME",
"rrset_values": [
"unit.tests."
]
},
{
"rrset_type": "MX",
"rrset_ttl": 300,
"rrset_name": "mx",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/mx/MX",
"rrset_values": [
"10 smtp-4.unit.tests.",
"20 smtp-2.unit.tests.",
"30 smtp-3.unit.tests.",
"40 smtp-1.unit.tests."
]
},
{
"rrset_type": "PTR",
"rrset_ttl": 300,
"rrset_name": "ptr",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/ptr/PTR",
"rrset_values": [
"foo.bar.com."
]
},
{
"rrset_type": "SPF",
"rrset_ttl": 600,
"rrset_name": "spf",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/spf/SPF",
"rrset_values": [
"\"v=spf1 ip4:192.168.0.1/16-all\""
]
},
{
"rrset_type": "TXT",
"rrset_ttl": 600,
"rrset_name": "txt",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/txt/TXT",
"rrset_values": [
"\"Bah bah black sheep\"",
"\"have you any wool.\"",
"\"v=DKIM1;k=rsa;s=email;h=sha256;p=A/kinda+of/long/string+with+numb3rs\""
]
},
{
"rrset_type": "A",
"rrset_ttl": 300,
"rrset_name": "www",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/www/A",
"rrset_values": [
"2.2.3.6"
]
},
{
"rrset_type": "A",
"rrset_ttl": 300,
"rrset_name": "www.sub",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/www.sub/A",
"rrset_values": [
"2.2.3.6"
]
},
{
"rrset_type": "SRV",
"rrset_ttl": 600,
"rrset_name": "_imap._tcp",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/_imap._tcp/SRV",
"rrset_values": [
"0 0 0 ."
]
},
{
"rrset_type": "SRV",
"rrset_ttl": 600,
"rrset_name": "_pop3._tcp",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/_pop3._tcp/SRV",
"rrset_values": [
"0 0 0 ."
]
},
{
"rrset_type": "SRV",
"rrset_ttl": 600,
"rrset_name": "_srv._tcp",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/_srv._tcp/SRV",
"rrset_values": [
"10 20 30 foo-1.unit.tests.",
"12 20 30 foo-2.unit.tests."
]
}
]

View File

@@ -1,111 +0,0 @@
[
{
"rrset_type": "A",
"rrset_ttl": 10800,
"rrset_name": "@",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/%40/A",
"rrset_values": [
"217.70.184.38"
]
},
{
"rrset_type": "MX",
"rrset_ttl": 10800,
"rrset_name": "@",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/%40/MX",
"rrset_values": [
"10 spool.mail.gandi.net.",
"50 fb.mail.gandi.net."
]
},
{
"rrset_type": "TXT",
"rrset_ttl": 10800,
"rrset_name": "@",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/%40/TXT",
"rrset_values": [
"\"v=spf1 include:_mailcust.gandi.net ?all\""
]
},
{
"rrset_type": "CNAME",
"rrset_ttl": 10800,
"rrset_name": "webmail",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/webmail/CNAME",
"rrset_values": [
"webmail.gandi.net."
]
},
{
"rrset_type": "CNAME",
"rrset_ttl": 10800,
"rrset_name": "www",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/www/CNAME",
"rrset_values": [
"webredir.vip.gandi.net."
]
},
{
"rrset_type": "SRV",
"rrset_ttl": 10800,
"rrset_name": "_imap._tcp",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/_imap._tcp/SRV",
"rrset_values": [
"0 0 0 ."
]
},
{
"rrset_type": "SRV",
"rrset_ttl": 10800,
"rrset_name": "_imaps._tcp",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/_imaps._tcp/SRV",
"rrset_values": [
"0 1 993 mail.gandi.net."
]
},
{
"rrset_type": "SRV",
"rrset_ttl": 10800,
"rrset_name": "_pop3._tcp",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/_pop3._tcp/SRV",
"rrset_values": [
"0 0 0 ."
]
},
{
"rrset_type": "SRV",
"rrset_ttl": 10800,
"rrset_name": "_pop3s._tcp",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/_pop3s._tcp/SRV",
"rrset_values": [
"10 1 995 mail.gandi.net."
]
},
{
"rrset_type": "SRV",
"rrset_ttl": 10800,
"rrset_name": "_submission._tcp",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/_submission._tcp/SRV",
"rrset_values": [
"0 1 465 mail.gandi.net."
]
},
{
"rrset_type": "CDS",
"rrset_ttl": 10800,
"rrset_name": "sub",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/sub/CDS",
"rrset_values": [
"32128 13 1 6823D9BB1B03DF714DD0EB163E20B341C96D18C0"
]
},
{
"rrset_type": "CNAME",
"rrset_ttl": 10800,
"rrset_name": "relative",
"rrset_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records/relative/CNAME",
"rrset_values": [
"target"
]
}
]

View File

@@ -1,7 +0,0 @@
{
"domain_keys_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/keys",
"fqdn": "unit.tests",
"automatic_snapshots": true,
"domain_records_href": "https://api.gandi.net/v5/livedns/domains/unit.tests/records",
"domain_href": "https://api.gandi.net/v5/livedns/domains/unit.tests"
}

View File

@@ -1,245 +0,0 @@
{
"rrsets": [
{
"name": "unit.tests",
"type": "A",
"ttl": 300,
"resource_records": [
{
"content": [
"1.2.3.4"
]
},
{
"content": [
"1.2.3.5"
]
}
]
},
{
"name": "unit.tests",
"type": "NS",
"ttl": 300,
"resource_records": [
{
"content": [
"ns2.gcdn.services"
]
},
{
"content": [
"ns1.gcorelabs.net"
]
}
]
},
{
"name": "_imap._tcp",
"type": "SRV",
"ttl": 600,
"resource_records": [
{
"content": [
0,
0,
0,
"."
]
}
]
},
{
"name": "_pop3._tcp",
"type": "SRV",
"ttl": 600,
"resource_records": [
{
"content": [
0,
0,
0,
"."
]
}
]
},
{
"name": "_srv._tcp",
"type": "SRV",
"ttl": 600,
"resource_records": [
{
"content": [
12,
20,
30,
"foo-2.unit.tests"
]
},
{
"content": [
10,
20,
30,
"foo-1.unit.tests"
]
}
]
},
{
"name": "aaaa.unit.tests",
"type": "AAAA",
"ttl": 600,
"resource_records": [
{
"content": [
"2601:644:500:e210:62f8:1dff:feb8:947a"
]
}
]
},
{
"name": "cname.unit.tests",
"type": "CNAME",
"ttl": 300,
"resource_records": [
{
"content": [
"unit.tests."
]
}
]
},
{
"name": "excluded.unit.tests",
"type": "CNAME",
"ttl": 3600,
"resource_records": [
{
"content": [
"unit.tests."
]
}
]
},
{
"name": "mx.unit.tests",
"type": "MX",
"ttl": 300,
"resource_records": [
{
"content": [
40,
"smtp-1.unit.tests."
]
},
{
"content": [
20,
"smtp-2.unit.tests."
]
},
{
"content": [
30,
"smtp-3.unit.tests."
]
},
{
"content": [
10,
"smtp-4.unit.tests."
]
}
]
},
{
"name": "ptr.unit.tests.",
"type": "PTR",
"ttl": 300,
"resource_records": [
{
"content": [
"foo.bar.com"
]
}
]
},
{
"name": "sub.unit.tests",
"type": "NS",
"ttl": 3600,
"resource_records": [
{
"content": [
"6.2.3.4"
]
},
{
"content": [
"7.2.3.4"
]
}
]
},
{
"name": "txt.unit.tests",
"type": "TXT",
"ttl": 600,
"resource_records": [
{
"content": [
"Bah bah black sheep"
]
},
{
"content": [
"have you any wool."
]
},
{
"content": [
"v=DKIM1;k=rsa;s=email;h=sha256;p=A/kinda+of/long/string+with+numb3rs"
]
}
]
},
{
"name": "www.unit.tests.",
"type": "A",
"ttl": 300,
"resource_records": [
{
"content": [
"2.2.3.6"
]
}
]
},
{
"name": "www.sub.unit.tests.",
"type": "A",
"ttl": 300,
"resource_records": [
{
"content": [
"2.2.3.6"
]
}
]
},
{
"name": "spf.sub.unit.tests.",
"type": "SPF",
"ttl": 600,
"resource_records": [
{
"content": [
"v=spf1 ip4:192.168.0.1/16-all"
]
}
]
}
]
}

View File

@@ -1,428 +0,0 @@
{
"rrsets": [
{
"name": "unit.tests",
"type": "A",
"ttl": 300,
"resource_records": [
{
"content": [
"1.2.3.4"
]
}
]
},
{
"name": "unit.tests",
"type": "NS",
"ttl": 300,
"resource_records": [
{
"content": [
"ns2.gcdn.services"
]
},
{
"content": [
"ns1.gcorelabs.net"
]
}
]
},
{
"name": "_imap._tcp",
"type": "SRV",
"ttl": 1200,
"resource_records": [
{
"content": [
0,
0,
0,
"."
]
}
]
},
{
"name": "_pop3._tcp",
"type": "SRV",
"ttl": 1200,
"resource_records": [
{
"content": [
0,
0,
0,
"."
]
}
]
},
{
"name": "_srv._tcp",
"type": "SRV",
"ttl": 1200,
"resource_records": [
{
"content": [
12,
20,
30,
"foo-2.unit.tests."
]
},
{
"content": [
10,
20,
30,
"foo-1.unit.tests."
]
}
]
},
{
"name": "aaaa.unit.tests",
"type": "AAAA",
"ttl": 600,
"resource_records": [
{
"content": [
"2601:644:500:e210:62f8:1dff:feb8:947a"
]
}
]
},
{
"name": "cname.unit.tests",
"type": "CNAME",
"ttl": 300,
"resource_records": [
{
"content": [
"unit.tests."
]
}
]
},
{
"name": "mx.unit.tests",
"type": "MX",
"ttl": 600,
"resource_records": [
{
"content": [
40,
"smtp-1.unit.tests."
]
},
{
"content": [
20,
"smtp-2.unit.tests."
]
}
]
},
{
"name": "ptr.unit.tests.",
"type": "PTR",
"ttl": 300,
"resource_records": [
{
"content": [
"foo.bar.com"
]
}
]
},
{
"name": "sub.unit.tests",
"type": "NS",
"ttl": 300,
"resource_records": [
{
"content": [
"6.2.3.4"
]
},
{
"content": [
"7.2.3.4"
]
}
]
},
{
"name": "txt.unit.tests",
"type": "TXT",
"ttl": 300,
"resource_records": [
{
"content": [
"\"Bah bah black sheep\""
]
},
{
"content": [
"\"have you any wool.\""
]
},
{
"content": [
"\"v=DKIM1;k=rsa;s=email;h=sha256;p=A/kinda+of/long/string+with+numb3rs\""
]
}
]
},
{
"name": "www.unit.tests.",
"type": "A",
"ttl": 300,
"resource_records": [
{
"content": [
"2.2.3.6"
]
}
]
},
{
"name": "www.sub.unit.tests.",
"type": "A",
"ttl": 300,
"resource_records": [
{
"content": [
"2.2.3.6"
]
}
]
},
{
"name": "geo-A-single.unit.tests.",
"type": "A",
"ttl": 300,
"filters": [
{
"type": "geodns"
},
{
"limit": 1,
"strict": false,
"type": "default"
},
{
"limit": 1,
"type": "first_n"
}
],
"resource_records": [
{
"content": [
"7.7.7.7"
],
"meta": {
"countries": [
"RU"
]
}
},
{
"content": [
"8.8.8.8"
],
"meta": {
"countries": [
"RU"
]
}
},
{
"content": [
"9.9.9.9"
],
"meta": {
"continents": [
"EU"
]
}
},
{
"content": [
"10.10.10.10"
],
"meta": {
"default": true
}
}
]
},
{
"name": "geo-no-def.unit.tests.",
"type": "A",
"ttl": 300,
"filters": [
{
"type": "geodns"
},
{
"limit": 1,
"strict": false,
"type": "default"
},
{
"limit": 1,
"type": "first_n"
}
],
"resource_records": [
{
"content": [
"7.7.7.7"
],
"meta": {
"countries": [
"RU"
]
}
}
]
},
{
"name": "geo-CNAME.unit.tests.",
"type": "CNAME",
"ttl": 300,
"filters": [
{
"type": "geodns"
},
{
"limit": 1,
"strict": false,
"type": "default"
},
{
"limit": 1,
"type": "first_n"
}
],
"resource_records": [
{
"content": [
"ru-1.unit.tests"
],
"meta": {
"countries": [
"RU"
]
}
},
{
"content": [
"ru-2.unit.tests"
],
"meta": {
"countries": [
"RU"
]
}
},
{
"content": [
"eu.unit.tests"
],
"meta": {
"continents": [
"EU"
]
}
},
{
"content": [
"any.unit.tests."
],
"meta": {
"default": true
}
}
]
},
{
"name": "geo-ignore-len-filters.unit.tests.",
"type": "A",
"ttl": 300,
"filters": [
{
"limit": 1,
"type": "first_n"
},
{
"limit": 1,
"strict": false,
"type": "default"
}
],
"resource_records": [
{
"content": [
"7.7.7.7"
]
}
]
},
{
"name": "geo-ignore-types.unit.tests.",
"type": "A",
"ttl": 300,
"filters": [
{
"type": "geodistance"
},
{
"limit": 1,
"type": "first_n"
},
{
"limit": 1,
"strict": false,
"type": "default"
}
],
"resource_records": [
{
"content": [
"7.7.7.7"
]
}
]
},
{
"name": "geo-ignore-limits.unit.tests.",
"type": "A",
"ttl": 300,
"filters": [
{
"type": "geodns"
},
{
"limit": 2,
"strict": false,
"type": "default"
},
{
"limit": 1,
"type": "first_n"
}
],
"resource_records": [
{
"content": [
"7.7.7.7"
]
}
]
}
]
}

View File

@@ -1,27 +0,0 @@
{
"id": 27757,
"name": "unit.test",
"nx_ttl": 300,
"retry": 5400,
"refresh": 0,
"expiry": 1209600,
"contact": "support@gcorelabs.com",
"serial": 1614752868,
"primary_server": "ns1.gcorelabs.net",
"records": [
{
"id": 12419,
"name": "unit.test",
"type": "ns",
"ttl": 300,
"short_answers": [
"[ns2.gcdn.services]",
"[ns1.gcorelabs.net]"
]
}
],
"dns_servers": [
"ns1.gcorelabs.net",
"ns2.gcdn.services"
]
}

View File

@@ -5,372 +5,17 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
from mock import Mock, call
from os.path import dirname, join
from requests import HTTPError
from requests_mock import ANY, mock as requests_mock
from unittest import TestCase
from octodns.record import Record
from octodns.provider.gandi import GandiProvider, GandiClientBadRequest, \
GandiClientUnauthorized, GandiClientForbidden, GandiClientNotFound, \
GandiClientUnknownDomainName
from octodns.provider.yaml import YamlProvider
from octodns.zone import Zone
# Just for coverage
import octodns.provider.fastdns
# Quell warnings
octodns.provider.fastdns
class TestGandiProvider(TestCase):
expected = Zone('unit.tests.', [])
source = YamlProvider('test', join(dirname(__file__), 'config'))
source.populate(expected)
class TestGandiShim(TestCase):
# We remove this record from the test zone as Gandi API reject it
# (rightfully).
expected._remove_record(Record.new(expected, 'sub', {
'ttl': 1800,
'type': 'NS',
'values': [
'6.2.3.4.',
'7.2.3.4.'
]
}))
def test_populate(self):
provider = GandiProvider('test_id', 'token')
# 400 - Bad Request.
with requests_mock() as mock:
mock.get(ANY, status_code=400,
text='{"status": "error", "errors": [{"location": '
'"body", "name": "items", "description": '
'"\'6.2.3.4.\': invalid hostname (param: '
'{\'rrset_type\': u\'NS\', \'rrset_ttl\': 3600, '
'\'rrset_name\': u\'sub\', \'rrset_values\': '
'[u\'6.2.3.4.\', u\'7.2.3.4.\']})"}, {"location": '
'"body", "name": "items", "description": '
'"\'7.2.3.4.\': invalid hostname (param: '
'{\'rrset_type\': u\'NS\', \'rrset_ttl\': 3600, '
'\'rrset_name\': u\'sub\', \'rrset_values\': '
'[u\'6.2.3.4.\', u\'7.2.3.4.\']})"}]}')
with self.assertRaises(GandiClientBadRequest) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertIn('"status": "error"', str(ctx.exception))
# 401 - Unauthorized.
with requests_mock() as mock:
mock.get(ANY, status_code=401,
text='{"code":401,"message":"The server could not verify '
'that you authorized to access the document you '
'requested. Either you supplied the wrong '
'credentials (e.g., bad api key), or your access '
'token has expired","object":"HTTPUnauthorized",'
'"cause":"Unauthorized"}')
with self.assertRaises(GandiClientUnauthorized) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertIn('"cause":"Unauthorized"', str(ctx.exception))
# 403 - Forbidden.
with requests_mock() as mock:
mock.get(ANY, status_code=403,
text='{"code":403,"message":"Access was denied to this '
'resource.","object":"HTTPForbidden","cause":'
'"Forbidden"}')
with self.assertRaises(GandiClientForbidden) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertIn('"cause":"Forbidden"', str(ctx.exception))
# 404 - Not Found.
with requests_mock() as mock:
mock.get(ANY, status_code=404,
text='{"code": 404, "message": "The resource could not '
'be found.", "object": "HTTPNotFound", "cause": '
'"Not Found"}')
with self.assertRaises(GandiClientNotFound) as ctx:
zone = Zone('unit.tests.', [])
provider._client.zone(zone)
self.assertIn('"cause": "Not Found"', str(ctx.exception))
# General error
with requests_mock() as mock:
mock.get(ANY, status_code=502, text='Things caught fire')
with self.assertRaises(HTTPError) as ctx:
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(502, ctx.exception.response.status_code)
# No diffs == no changes
with requests_mock() as mock:
base = 'https://api.gandi.net/v5/livedns/domains/unit.tests' \
'/records'
with open('tests/fixtures/gandi-no-changes.json') as fh:
mock.get(base, text=fh.read())
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(16, len(zone.records))
changes = self.expected.changes(zone, provider)
self.assertEquals(0, len(changes))
del provider._zone_records[zone.name]
# Default Gandi zone file.
with requests_mock() as mock:
base = 'https://api.gandi.net/v5/livedns/domains/unit.tests' \
'/records'
with open('tests/fixtures/gandi-records.json') as fh:
mock.get(base, text=fh.read())
zone = Zone('unit.tests.', [])
provider.populate(zone)
self.assertEquals(11, len(zone.records))
changes = self.expected.changes(zone, provider)
self.assertEquals(24, len(changes))
# 2nd populate makes no network calls/all from cache
again = Zone('unit.tests.', [])
provider.populate(again)
self.assertEquals(11, len(again.records))
# bust the cache
del provider._zone_records[zone.name]
def test_apply(self):
provider = GandiProvider('test_id', 'token')
# Zone does not exists but can be created.
with requests_mock() as mock:
mock.get(ANY, status_code=404,
text='{"code": 404, "message": "The resource could not '
'be found.", "object": "HTTPNotFound", "cause": '
'"Not Found"}')
mock.post(ANY, status_code=201,
text='{"message": "Domain Created"}')
plan = provider.plan(self.expected)
provider.apply(plan)
# Zone does not exists and can't be created.
with requests_mock() as mock:
mock.get(ANY, status_code=404,
text='{"code": 404, "message": "The resource could not '
'be found.", "object": "HTTPNotFound", "cause": '
'"Not Found"}')
mock.post(ANY, status_code=404,
text='{"code": 404, "message": "The resource could not '
'be found.", "object": "HTTPNotFound", "cause": '
'"Not Found"}')
with self.assertRaises((GandiClientNotFound,
GandiClientUnknownDomainName)) as ctx:
plan = provider.plan(self.expected)
provider.apply(plan)
self.assertIn('This domain is not registered at Gandi.',
str(ctx.exception))
resp = Mock()
resp.json = Mock()
provider._client._request = Mock(return_value=resp)
with open('tests/fixtures/gandi-zone.json') as fh:
zone = fh.read()
# non-existent domain
resp.json.side_effect = [
GandiClientNotFound(resp), # no zone in populate
GandiClientNotFound(resp), # no domain during apply
zone
]
plan = provider.plan(self.expected)
# No root NS, no ignored, no excluded, no LOC
n = len(self.expected.records) - 6
self.assertEquals(n, len(plan.changes))
self.assertEquals(n, provider.apply(plan))
self.assertFalse(plan.exists)
provider._client._request.assert_has_calls([
call('GET', '/livedns/domains/unit.tests/records'),
call('GET', '/livedns/domains/unit.tests'),
call('POST', '/livedns/domains', data={
'fqdn': 'unit.tests',
'zone': {}
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': 'www.sub',
'rrset_ttl': 300,
'rrset_type': 'A',
'rrset_values': ['2.2.3.6']
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': 'www',
'rrset_ttl': 300,
'rrset_type': 'A',
'rrset_values': ['2.2.3.6']
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': 'txt',
'rrset_ttl': 600,
'rrset_type': 'TXT',
'rrset_values': [
'Bah bah black sheep',
'have you any wool.',
'v=DKIM1;k=rsa;s=email;h=sha256;p=A/kinda+of/long/string'
'+with+numb3rs'
]
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': 'spf',
'rrset_ttl': 600,
'rrset_type': 'SPF',
'rrset_values': ['v=spf1 ip4:192.168.0.1/16-all']
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': 'ptr',
'rrset_ttl': 300,
'rrset_type': 'PTR',
'rrset_values': ['foo.bar.com.']
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': 'mx',
'rrset_ttl': 300,
'rrset_type': 'MX',
'rrset_values': [
'10 smtp-4.unit.tests.',
'20 smtp-2.unit.tests.',
'30 smtp-3.unit.tests.',
'40 smtp-1.unit.tests.'
]
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': 'excluded',
'rrset_ttl': 3600,
'rrset_type': 'CNAME',
'rrset_values': ['unit.tests.']
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': 'dname',
'rrset_ttl': 300,
'rrset_type': 'DNAME',
'rrset_values': ['unit.tests.']
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': 'cname',
'rrset_ttl': 300,
'rrset_type': 'CNAME',
'rrset_values': ['unit.tests.']
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': 'aaaa',
'rrset_ttl': 600,
'rrset_type': 'AAAA',
'rrset_values': ['2601:644:500:e210:62f8:1dff:feb8:947a']
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': '_srv._tcp',
'rrset_ttl': 600,
'rrset_type': 'SRV',
'rrset_values': [
'10 20 30 foo-1.unit.tests.',
'12 20 30 foo-2.unit.tests.'
]
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': '_pop3._tcp',
'rrset_ttl': 600,
'rrset_type': 'SRV',
'rrset_values': [
'0 0 0 .',
]
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': '_imap._tcp',
'rrset_ttl': 600,
'rrset_type': 'SRV',
'rrset_values': [
'0 0 0 .',
]
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': '@',
'rrset_ttl': 3600,
'rrset_type': 'SSHFP',
'rrset_values': [
'1 1 7491973e5f8b39d5327cd4e08bc81b05f7710b49',
'1 1 bf6b6825d2977c511a475bbefb88aad54a92ac73'
]
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': '@',
'rrset_ttl': 3600,
'rrset_type': 'CAA',
'rrset_values': ['0 issue "ca.unit.tests"']
}),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': '@',
'rrset_ttl': 300,
'rrset_type': 'A',
'rrset_values': ['1.2.3.4', '1.2.3.5']
})
])
# expected number of total calls
self.assertEquals(19, provider._client._request.call_count)
provider._client._request.reset_mock()
# delete 1 and update 1
provider._client.zone_records = Mock(return_value=[
{
'rrset_name': 'www',
'rrset_ttl': 300,
'rrset_type': 'A',
'rrset_values': ['1.2.3.4']
},
{
'rrset_name': 'www',
'rrset_ttl': 300,
'rrset_type': 'A',
'rrset_values': ['2.2.3.4']
},
{
'rrset_name': 'ttl',
'rrset_ttl': 600,
'rrset_type': 'A',
'rrset_values': ['3.2.3.4']
}
])
# Domain exists, we don't care about return
resp.json.side_effect = ['{}']
wanted = Zone('unit.tests.', [])
wanted.add_record(Record.new(wanted, 'ttl', {
'ttl': 300,
'type': 'A',
'value': '3.2.3.4'
}))
plan = provider.plan(wanted)
self.assertTrue(plan.exists)
self.assertEquals(2, len(plan.changes))
self.assertEquals(2, provider.apply(plan))
# recreate for update, and deletes for the 2 parts of the other
provider._client._request.assert_has_calls([
call('DELETE', '/livedns/domains/unit.tests/records/www/A'),
call('DELETE', '/livedns/domains/unit.tests/records/ttl/A'),
call('POST', '/livedns/domains/unit.tests/records', data={
'rrset_name': 'ttl',
'rrset_ttl': 300,
'rrset_type': 'A',
'rrset_values': ['3.2.3.4']
})
], any_order=True)
def test_missing(self):
with self.assertRaises(ModuleNotFoundError):
from octodns.provider.gandi import GandiProvider
GandiProvider

View File

@@ -2,670 +2,15 @@
#
#
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
from __future__ import absolute_import, division, print_function, \
unicode_literals
from mock import Mock, call
from os.path import dirname, join
from requests_mock import ANY, mock as requests_mock
from unittest import TestCase
from octodns.record import Record, Update, Delete, Create
from octodns.provider.gcore import (
GCoreProvider,
GCoreClientBadRequest,
GCoreClientNotFound,
GCoreClientException,
)
from octodns.provider.yaml import YamlProvider
from octodns.zone import Zone
class TestGCoreShim(TestCase):
class TestGCoreProvider(TestCase):
expected = Zone("unit.tests.", [])
source = YamlProvider("test", join(dirname(__file__), "config"))
source.populate(expected)
default_filters = [
{"type": "geodns"},
{
"type": "default",
"limit": 1,
"strict": False,
},
{"type": "first_n", "limit": 1},
]
def test_populate(self):
provider = GCoreProvider("test_id", token="token")
# TC: 400 - Bad Request.
with requests_mock() as mock:
mock.get(ANY, status_code=400, text='{"error":"bad body"}')
with self.assertRaises(GCoreClientBadRequest) as ctx:
zone = Zone("unit.tests.", [])
provider.populate(zone)
self.assertIn('"error":"bad body"', str(ctx.exception))
# TC: 404 - Not Found.
with requests_mock() as mock:
mock.get(
ANY, status_code=404, text='{"error":"zone is not found"}'
)
with self.assertRaises(GCoreClientNotFound) as ctx:
zone = Zone("unit.tests.", [])
provider._client.zone(zone.name)
self.assertIn(
'"error":"zone is not found"', str(ctx.exception)
)
# TC: General error
with requests_mock() as mock:
mock.get(ANY, status_code=500, text="Things caught fire")
with self.assertRaises(GCoreClientException) as ctx:
zone = Zone("unit.tests.", [])
provider.populate(zone)
self.assertEqual("Things caught fire", str(ctx.exception))
# TC: No credentials or token error
with requests_mock() as mock:
with self.assertRaises(ValueError) as ctx:
GCoreProvider("test_id")
self.assertEqual(
"either token or login & password must be set",
str(ctx.exception),
)
# TC: Auth with login password
with requests_mock() as mock:
def match_body(request):
return {"username": "foo", "password": "bar"} == request.json()
auth_url = "http://api/auth/jwt/login"
mock.post(
auth_url,
additional_matcher=match_body,
status_code=200,
json={"access": "access"},
)
providerPassword = GCoreProvider(
"test_id",
url="http://dns",
auth_url="http://api",
login="foo",
password="bar",
)
assert mock.called
# make sure token passed in header
zone_rrset_url = "http://dns/zones/unit.tests/rrsets?all=true"
mock.get(
zone_rrset_url,
request_headers={"Authorization": "Bearer access"},
status_code=404,
)
zone = Zone("unit.tests.", [])
assert not providerPassword.populate(zone)
# TC: No diffs == no changes
with requests_mock() as mock:
base = "https://dnsapi.gcorelabs.com/v2/zones/unit.tests/rrsets"
with open("tests/fixtures/gcore-no-changes.json") as fh:
mock.get(base, text=fh.read())
zone = Zone("unit.tests.", [])
provider.populate(zone)
self.assertEqual(14, len(zone.records))
self.assertEqual(
{
"",
"_imap._tcp",
"_pop3._tcp",
"_srv._tcp",
"aaaa",
"cname",
"excluded",
"mx",
"ptr",
"sub",
"txt",
"www",
"www.sub",
},
{r.name for r in zone.records},
)
changes = self.expected.changes(zone, provider)
self.assertEqual(0, len(changes))
# TC: 4 create (dynamic) + 1 removed + 7 modified
with requests_mock() as mock:
base = "https://dnsapi.gcorelabs.com/v2/zones/unit.tests/rrsets"
with open("tests/fixtures/gcore-records.json") as fh:
mock.get(base, text=fh.read())
zone = Zone("unit.tests.", [])
provider.populate(zone)
self.assertEqual(16, len(zone.records))
changes = self.expected.changes(zone, provider)
self.assertEqual(11, len(changes))
self.assertEqual(
3, len([c for c in changes if isinstance(c, Create)])
)
self.assertEqual(
1, len([c for c in changes if isinstance(c, Delete)])
)
self.assertEqual(
7, len([c for c in changes if isinstance(c, Update)])
)
# TC: no pools can be built
with requests_mock() as mock:
base = "https://dnsapi.gcorelabs.com/v2/zones/unit.tests/rrsets"
mock.get(
base,
json={
"rrsets": [
{
"name": "unit.tests.",
"type": "A",
"ttl": 300,
"filters": self.default_filters,
"resource_records": [{"content": ["7.7.7.7"]}],
}
]
},
)
zone = Zone("unit.tests.", [])
with self.assertRaises(RuntimeError) as ctx:
provider.populate(zone)
self.assertTrue(
str(ctx.exception).startswith(
"filter is enabled, but no pools where built for"
),
f"{ctx.exception} - is not start from desired text",
)
def test_apply(self):
provider = GCoreProvider("test_id", url="http://api", token="token")
# TC: Zone does not exists but can be created.
with requests_mock() as mock:
mock.get(
ANY, status_code=404, text='{"error":"zone is not found"}'
)
mock.post(ANY, status_code=200, text='{"id":1234}')
plan = provider.plan(self.expected)
provider.apply(plan)
# TC: Zone does not exists and can't be created.
with requests_mock() as mock:
mock.get(
ANY, status_code=404, text='{"error":"zone is not found"}'
)
mock.post(
ANY,
status_code=400,
text='{"error":"parent zone is already'
' occupied by another client"}',
)
with self.assertRaises(
(GCoreClientNotFound, GCoreClientBadRequest)
) as ctx:
plan = provider.plan(self.expected)
provider.apply(plan)
self.assertIn(
"parent zone is already occupied by another client",
str(ctx.exception),
)
resp = Mock()
resp.json = Mock()
provider._client._request = Mock(return_value=resp)
with open("tests/fixtures/gcore-zone.json") as fh:
zone = fh.read()
# non-existent domain
resp.json.side_effect = [
GCoreClientNotFound(resp), # no zone in populate
GCoreClientNotFound(resp), # no domain during apply
zone,
]
plan = provider.plan(self.expected)
# TC: create all
self.assertEqual(13, len(plan.changes))
self.assertEqual(13, provider.apply(plan))
self.assertFalse(plan.exists)
provider._client._request.assert_has_calls(
[
call(
"GET",
"http://api/zones/unit.tests/rrsets",
params={"all": "true"},
),
call("GET", "http://api/zones/unit.tests"),
call("POST", "http://api/zones", data={"name": "unit.tests"}),
call(
"POST",
"http://api/zones/unit.tests/www.sub.unit.tests./A",
data={
"ttl": 300,
"resource_records": [{"content": ["2.2.3.6"]}],
},
),
call(
"POST",
"http://api/zones/unit.tests/www.unit.tests./A",
data={
"ttl": 300,
"resource_records": [{"content": ["2.2.3.6"]}],
},
),
call(
"POST",
"http://api/zones/unit.tests/txt.unit.tests./TXT",
data={
"ttl": 600,
"resource_records": [
{"content": ["Bah bah black sheep"]},
{"content": ["have you any wool."]},
{
"content": [
"v=DKIM1;k=rsa;s=email;h=sha256;p=A/kinda+"
"of/long/string+with+numb3rs"
]
},
],
},
),
call(
"POST",
"http://api/zones/unit.tests/sub.unit.tests./NS",
data={
"ttl": 3600,
"resource_records": [
{"content": ["6.2.3.4."]},
{"content": ["7.2.3.4."]},
],
},
),
call(
"POST",
"http://api/zones/unit.tests/ptr.unit.tests./PTR",
data={
"ttl": 300,
"resource_records": [
{"content": ["foo.bar.com."]},
],
},
),
call(
"POST",
"http://api/zones/unit.tests/mx.unit.tests./MX",
data={
"ttl": 300,
"resource_records": [
{"content": [10, "smtp-4.unit.tests."]},
{"content": [20, "smtp-2.unit.tests."]},
{"content": [30, "smtp-3.unit.tests."]},
{"content": [40, "smtp-1.unit.tests."]},
],
},
),
call(
"POST",
"http://api/zones/unit.tests/excluded.unit.tests./CNAME",
data={
"ttl": 3600,
"resource_records": [{"content": ["unit.tests."]}],
},
),
call(
"POST",
"http://api/zones/unit.tests/cname.unit.tests./CNAME",
data={
"ttl": 300,
"resource_records": [{"content": ["unit.tests."]}],
},
),
call(
"POST",
"http://api/zones/unit.tests/aaaa.unit.tests./AAAA",
data={
"ttl": 600,
"resource_records": [
{
"content": [
"2601:644:500:e210:62f8:1dff:feb8:947a"
]
}
],
},
),
call(
"POST",
"http://api/zones/unit.tests/_srv._tcp.unit.tests./SRV",
data={
"ttl": 600,
"resource_records": [
{"content": [10, 20, 30, "foo-1.unit.tests."]},
{"content": [12, 20, 30, "foo-2.unit.tests."]},
],
},
),
call(
"POST",
"http://api/zones/unit.tests/_pop3._tcp.unit.tests./SRV",
data={
"ttl": 600,
"resource_records": [{"content": [0, 0, 0, "."]}],
},
),
call(
"POST",
"http://api/zones/unit.tests/_imap._tcp.unit.tests./SRV",
data={
"ttl": 600,
"resource_records": [{"content": [0, 0, 0, "."]}],
},
),
call(
"POST",
"http://api/zones/unit.tests/unit.tests./A",
data={
"ttl": 300,
"resource_records": [
{"content": ["1.2.3.4"]},
{"content": ["1.2.3.5"]},
],
},
),
]
)
# expected number of total calls
self.assertEqual(16, provider._client._request.call_count)
# TC: delete 1 and update 1
provider._client._request.reset_mock()
provider._client.zone_records = Mock(
return_value=[
{
"name": "www",
"ttl": 300,
"type": "A",
"resource_records": [{"content": ["1.2.3.4"]}],
},
{
"name": "ttl",
"ttl": 600,
"type": "A",
"resource_records": [{"content": ["3.2.3.4"]}],
},
]
)
# Domain exists, we don't care about return
resp.json.side_effect = ["{}"]
wanted = Zone("unit.tests.", [])
wanted.add_record(
Record.new(
wanted, "ttl", {"ttl": 300, "type": "A", "value": "3.2.3.4"}
)
)
plan = provider.plan(wanted)
self.assertTrue(plan.exists)
self.assertEqual(2, len(plan.changes))
self.assertEqual(2, provider.apply(plan))
provider._client._request.assert_has_calls(
[
call(
"DELETE", "http://api/zones/unit.tests/www.unit.tests./A"
),
call(
"PUT",
"http://api/zones/unit.tests/ttl.unit.tests./A",
data={
"ttl": 300,
"resource_records": [{"content": ["3.2.3.4"]}],
},
),
]
)
# TC: create dynamics
provider._client._request.reset_mock()
provider._client.zone_records = Mock(return_value=[])
# Domain exists, we don't care about return
resp.json.side_effect = ["{}"]
wanted = Zone("unit.tests.", [])
wanted.add_record(
Record.new(
wanted,
"geo-simple",
{
"ttl": 300,
"type": "A",
"value": "3.3.3.3",
"dynamic": {
"pools": {
"pool-1": {
"fallback": "other",
"values": [
{"value": "1.1.1.1"},
{"value": "1.1.1.2"},
],
},
"pool-2": {
"fallback": "other",
"values": [
{"value": "2.2.2.1"},
],
},
"other": {"values": [{"value": "3.3.3.3"}]},
},
"rules": [
{"pool": "pool-1", "geos": ["EU-RU"]},
{"pool": "pool-2", "geos": ["EU"]},
{"pool": "other"},
],
},
},
),
)
wanted.add_record(
Record.new(
wanted,
"geo-defaults",
{
"ttl": 300,
"type": "A",
"value": "3.2.3.4",
"dynamic": {
"pools": {
"pool-1": {
"values": [
{"value": "2.2.2.1"},
],
},
},
"rules": [
{"pool": "pool-1", "geos": ["EU"]},
],
},
},
),
)
wanted.add_record(
Record.new(
wanted,
"cname-smpl",
{
"ttl": 300,
"type": "CNAME",
"value": "en.unit.tests.",
"dynamic": {
"pools": {
"pool-1": {
"fallback": "other",
"values": [
{"value": "ru-1.unit.tests."},
{"value": "ru-2.unit.tests."},
],
},
"pool-2": {
"fallback": "other",
"values": [
{"value": "eu.unit.tests."},
],
},
"other": {"values": [{"value": "en.unit.tests."}]},
},
"rules": [
{"pool": "pool-1", "geos": ["EU-RU"]},
{"pool": "pool-2", "geos": ["EU"]},
{"pool": "other"},
],
},
},
),
)
wanted.add_record(
Record.new(
wanted,
"cname-dflt",
{
"ttl": 300,
"type": "CNAME",
"value": "en.unit.tests.",
"dynamic": {
"pools": {
"pool-1": {
"values": [
{"value": "eu.unit.tests."},
],
},
},
"rules": [
{"pool": "pool-1", "geos": ["EU"]},
],
},
},
),
)
plan = provider.plan(wanted)
self.assertTrue(plan.exists)
self.assertEqual(4, len(plan.changes))
self.assertEqual(4, provider.apply(plan))
provider._client._request.assert_has_calls(
[
call(
"POST",
"http://api/zones/unit.tests/geo-simple.unit.tests./A",
data={
"ttl": 300,
"filters": self.default_filters,
"resource_records": [
{
"content": ["1.1.1.1"],
"meta": {"countries": ["RU"]},
},
{
"content": ["1.1.1.2"],
"meta": {"countries": ["RU"]},
},
{
"content": ["2.2.2.1"],
"meta": {"continents": ["EU"]},
},
{
"content": ["3.3.3.3"],
"meta": {"default": True},
},
],
},
),
call(
"POST",
"http://api/zones/unit.tests/geo-defaults.unit.tests./A",
data={
"ttl": 300,
"filters": self.default_filters,
"resource_records": [
{
"content": ["2.2.2.1"],
"meta": {"continents": ["EU"]},
},
{
"content": ["3.2.3.4"],
},
],
},
),
call(
"POST",
"http://api/zones/unit.tests/cname-smpl.unit.tests./CNAME",
data={
"ttl": 300,
"filters": self.default_filters,
"resource_records": [
{
"content": ["ru-1.unit.tests."],
"meta": {"countries": ["RU"]},
},
{
"content": ["ru-2.unit.tests."],
"meta": {"countries": ["RU"]},
},
{
"content": ["eu.unit.tests."],
"meta": {"continents": ["EU"]},
},
{
"content": ["en.unit.tests."],
"meta": {"default": True},
},
],
},
),
call(
"POST",
"http://api/zones/unit.tests/cname-dflt.unit.tests./CNAME",
data={
"ttl": 300,
"filters": self.default_filters,
"resource_records": [
{
"content": ["eu.unit.tests."],
"meta": {"continents": ["EU"]},
},
{
"content": ["en.unit.tests."],
},
],
},
),
]
)
def test_missing(self):
with self.assertRaises(ModuleNotFoundError):
from octodns.provider.gcore import GCoreProvider
GCoreProvider

View File

@@ -5,458 +5,12 @@
from __future__ import absolute_import, division, print_function, \
unicode_literals
from octodns.record import Create, Delete, Update, Record
from octodns.provider.googlecloud import GoogleCloudProvider
from octodns.zone import Zone
from octodns.provider.base import Plan, BaseProvider
from unittest import TestCase
from mock import Mock, patch, PropertyMock
zone = Zone(name='unit.tests.', sub_zones=[])
octo_records = []
octo_records.append(Record.new(zone, '', {
'ttl': 0,
'type': 'A',
'values': ['1.2.3.4', '10.10.10.10']}))
octo_records.append(Record.new(zone, 'a', {
'ttl': 1,
'type': 'A',
'values': ['1.2.3.4', '1.1.1.1']}))
octo_records.append(Record.new(zone, 'aa', {
'ttl': 9001,
'type': 'A',
'values': ['1.2.4.3']}))
octo_records.append(Record.new(zone, 'aaa', {
'ttl': 2,
'type': 'A',
'values': ['1.1.1.3']}))
octo_records.append(Record.new(zone, 'cname', {
'ttl': 3,
'type': 'CNAME',
'value': 'a.unit.tests.'}))
octo_records.append(Record.new(zone, 'mx1', {
'ttl': 3,
'type': 'MX',
'values': [{
'priority': 10,
'value': 'mx1.unit.tests.',
}, {
'priority': 20,
'value': 'mx2.unit.tests.',
}]}))
octo_records.append(Record.new(zone, 'mx2', {
'ttl': 3,
'type': 'MX',
'values': [{
'priority': 10,
'value': 'mx1.unit.tests.',
}]}))
octo_records.append(Record.new(zone, '', {
'ttl': 4,
'type': 'NS',
'values': ['ns1.unit.tests.', 'ns2.unit.tests.']}))
octo_records.append(Record.new(zone, 'foo', {
'ttl': 5,
'type': 'NS',
'value': 'ns1.unit.tests.'}))
octo_records.append(Record.new(zone, '_srv._tcp', {
'ttl': 6,
'type': 'SRV',
'values': [{
'priority': 10,
'weight': 20,
'port': 30,
'target': 'foo-1.unit.tests.',
}, {
'priority': 12,
'weight': 30,
'port': 30,
'target': 'foo-2.unit.tests.',
}]}))
octo_records.append(Record.new(zone, '_srv2._tcp', {
'ttl': 7,
'type': 'SRV',
'values': [{
'priority': 12,
'weight': 17,
'port': 1,
'target': 'srvfoo.unit.tests.',
}]}))
octo_records.append(Record.new(zone, 'txt1', {
'ttl': 8,
'type': 'TXT',
'value': 'txt singleton test'}))
octo_records.append(Record.new(zone, 'txt2', {
'ttl': 9,
'type': 'TXT',
'values': ['txt multiple test', 'txt multiple test 2']}))
octo_records.append(Record.new(zone, 'naptr', {
'ttl': 9,
'type': 'NAPTR',
'values': [{
'order': 100,
'preference': 10,
'flags': 'S',
'service': 'SIP+D2U',
'regexp': "!^.*$!sip:customer-service@unit.tests!",
'replacement': '_sip._udp.unit.tests.'
}]}))
octo_records.append(Record.new(zone, 'caa', {
'ttl': 9,
'type': 'CAA',
'value': {
'flags': 0,
'tag': 'issue',
'value': 'ca.unit.tests',
}}))
for record in octo_records:
zone.add_record(record)
# This is the format which the google API likes.
resource_record_sets = [
('unit.tests.', u'A', 0, [u'1.2.3.4', u'10.10.10.10']),
(u'a.unit.tests.', u'A', 1, [u'1.1.1.1', u'1.2.3.4']),
(u'aa.unit.tests.', u'A', 9001, [u'1.2.4.3']),
(u'aaa.unit.tests.', u'A', 2, [u'1.1.1.3']),
(u'cname.unit.tests.', u'CNAME', 3, [u'a.unit.tests.']),
(u'mx1.unit.tests.', u'MX', 3,
[u'10 mx1.unit.tests.', u'20 mx2.unit.tests.']),
(u'mx2.unit.tests.', u'MX', 3, [u'10 mx1.unit.tests.']),
('unit.tests.', u'NS', 4, [u'ns1.unit.tests.', u'ns2.unit.tests.']),
(u'foo.unit.tests.', u'NS', 5, [u'ns1.unit.tests.']),
(u'_srv._tcp.unit.tests.', u'SRV', 6,
[u'10 20 30 foo-1.unit.tests.', u'12 30 30 foo-2.unit.tests.']),
(u'_srv2._tcp.unit.tests.', u'SRV', 7, [u'12 17 1 srvfoo.unit.tests.']),
(u'txt1.unit.tests.', u'TXT', 8, [u'txt singleton test']),
(u'txt2.unit.tests.', u'TXT', 9,
[u'txt multiple test', u'txt multiple test 2']),
(u'naptr.unit.tests.', u'NAPTR', 9, [
u'100 10 "S" "SIP+D2U" "!^.*$!sip:customer-service@unit.tests!"'
u' _sip._udp.unit.tests.']),
(u'caa.unit.tests.', u'CAA', 9, [u'0 issue ca.unit.tests'])
]
class TestGoogleCloudShim(TestCase):
class DummyResourceRecordSet:
def __init__(self, record_name, record_type, ttl, rrdatas):
self.name = record_name
self.record_type = record_type
self.ttl = ttl
self.rrdatas = rrdatas
def __eq__(self, other):
try:
return self.name == other.name \
and self.record_type == other.record_type \
and self.ttl == other.ttl \
and sorted(self.rrdatas) == sorted(other.rrdatas)
except:
return False
def __repr__(self):
return f"{self.name} {self.record_type} {self.ttl} {self.rrdatas}"
def __hash__(self):
return hash(repr(self))
class DummyGoogleCloudZone:
def __init__(self, dns_name, name=""):
self.dns_name = dns_name
self.name = name
def resource_record_set(self, *args):
return DummyResourceRecordSet(*args)
def list_resource_record_sets(self, *args):
pass
def create(self, *args, **kwargs):
pass
class DummyIterator:
"""Returns a mock DummyIterator object to use in testing.
This is because API calls for google cloud DNS, if paged, contains a
"next_page_token", which can be used to grab a subsequent
iterator with more results.
:type return: DummyIterator
"""
def __init__(self, list_of_stuff, page_token=None):
self.iterable = iter(list_of_stuff)
self.next_page_token = page_token
def __iter__(self):
return self
# python2
def next(self):
return next(self.iterable)
# python3
def __next__(self):
return next(self.iterable)
class TestGoogleCloudProvider(TestCase):
@patch('octodns.provider.googlecloud.dns')
def _get_provider(*args):
'''Returns a mock GoogleCloudProvider object to use in testing.
:type return: GoogleCloudProvider
'''
return GoogleCloudProvider(id=1, project="mock")
@patch('octodns.provider.googlecloud.dns')
def test___init__(self, *_):
self.assertIsInstance(GoogleCloudProvider(id=1,
credentials_file="test",
project="unit test"),
BaseProvider)
self.assertIsInstance(GoogleCloudProvider(id=1),
BaseProvider)
@patch('octodns.provider.googlecloud.time.sleep')
@patch('octodns.provider.googlecloud.dns')
def test__apply(self, *_):
class DummyDesired:
def __init__(self, name, changes):
self.name = name
self.changes = changes
apply_z = Zone("unit.tests.", [])
create_r = Record.new(apply_z, '', {
'ttl': 0,
'type': 'A',
'values': ['1.2.3.4', '10.10.10.10']})
delete_r = Record.new(apply_z, 'a', {
'ttl': 1,
'type': 'A',
'values': ['1.2.3.4', '1.1.1.1']})
update_existing_r = Record.new(apply_z, 'aa', {
'ttl': 9001,
'type': 'A',
'values': ['1.2.4.3']})
update_new_r = Record.new(apply_z, 'aa', {
'ttl': 666,
'type': 'A',
'values': ['1.4.3.2']})
gcloud_zone_mock = DummyGoogleCloudZone("unit.tests.", "unit-tests")
status_mock = Mock()
return_values_for_status = iter(
["pending"] * 11 + ['done', 'done'])
type(status_mock).status = PropertyMock(
side_effect=lambda: next(return_values_for_status))
gcloud_zone_mock.changes = Mock(return_value=status_mock)
provider = self._get_provider()
provider.gcloud_client = Mock()
provider._gcloud_zones = {"unit.tests.": gcloud_zone_mock}
desired = Mock()
desired.name = "unit.tests."
changes = []
changes.append(Create(create_r))
changes.append(Delete(delete_r))
changes.append(Update(existing=update_existing_r, new=update_new_r))
provider.apply(Plan(
existing=[update_existing_r, delete_r],
desired=desired,
changes=changes,
exists=True
))
calls_mock = gcloud_zone_mock.changes.return_value
mocked_calls = []
for mock_call in calls_mock.add_record_set.mock_calls:
mocked_calls.append(mock_call[1][0])
self.assertEqual(mocked_calls, [
DummyResourceRecordSet(
'unit.tests.', 'A', 0, ['1.2.3.4', '10.10.10.10']),
DummyResourceRecordSet(
'aa.unit.tests.', 'A', 666, ['1.4.3.2'])
])
mocked_calls2 = []
for mock_call in calls_mock.delete_record_set.mock_calls:
mocked_calls2.append(mock_call[1][0])
self.assertEqual(mocked_calls2, [
DummyResourceRecordSet(
'a.unit.tests.', 'A', 1, ['1.2.3.4', '1.1.1.1']),
DummyResourceRecordSet(
'aa.unit.tests.', 'A', 9001, ['1.2.4.3'])
])
type(status_mock).status = "pending"
with self.assertRaises(RuntimeError):
provider.apply(Plan(
existing=[update_existing_r, delete_r],
desired=desired,
changes=changes,
exists=True
))
unsupported_change = Mock()
unsupported_change.__len__ = Mock(return_value=1)
type_mock = Mock()
type_mock._type = "A"
unsupported_change.record = type_mock
mock_plan = Mock()
type(mock_plan).desired = PropertyMock(return_value=DummyDesired(
"dummy name", []))
type(mock_plan).changes = [unsupported_change]
with self.assertRaises(RuntimeError):
provider.apply(mock_plan)
def test__get_gcloud_client(self):
provider = self._get_provider()
self.assertIsInstance(provider, GoogleCloudProvider)
@patch('octodns.provider.googlecloud.dns')
def test_populate(self, _):
def _get_mock_zones(page_token=None):
if not page_token:
return DummyIterator([
DummyGoogleCloudZone('example.com.'),
], page_token="MOCK_PAGE_TOKEN")
elif page_token == "MOCK_PAGE_TOKEN":
return DummyIterator([
DummyGoogleCloudZone('example2.com.'),
], page_token="MOCK_PAGE_TOKEN2")
return DummyIterator([
google_cloud_zone
])
def _get_mock_record_sets(page_token=None):
if not page_token:
return DummyIterator(
[DummyResourceRecordSet(*v) for v in
resource_record_sets[:3]], page_token="MOCK_PAGE_TOKEN")
elif page_token == "MOCK_PAGE_TOKEN":
return DummyIterator(
[DummyResourceRecordSet(*v) for v in
resource_record_sets[3:5]], page_token="MOCK_PAGE_TOKEN2")
return DummyIterator(
[DummyResourceRecordSet(*v) for v in resource_record_sets[5:]])
google_cloud_zone = DummyGoogleCloudZone('unit.tests.')
provider = self._get_provider()
provider.gcloud_client.list_zones = Mock(side_effect=_get_mock_zones)
google_cloud_zone.list_resource_record_sets = Mock(
side_effect=_get_mock_record_sets)
self.assertEqual(provider.gcloud_zones.get("unit.tests.").dns_name,
"unit.tests.")
test_zone = Zone('unit.tests.', [])
exists = provider.populate(test_zone)
self.assertTrue(exists)
# test_zone gets fed the same records as zone does, except it's in
# the format returned by google API, so after populate they should look
# exactly the same.
self.assertEqual(test_zone.records, zone.records)
test_zone2 = Zone('nonexistent.zone.', [])
exists = provider.populate(test_zone2, False, False)
self.assertFalse(exists)
self.assertEqual(len(test_zone2.records), 0,
msg="Zone should not get records from wrong domain")
provider.SUPPORTS = set()
test_zone3 = Zone('unit.tests.', [])
provider.populate(test_zone3)
self.assertEqual(len(test_zone3.records), 0)
@patch('octodns.provider.googlecloud.dns')
def test_populate_corner_cases(self, _):
provider = self._get_provider()
test_zone = Zone('unit.tests.', [])
not_same_fqdn = DummyResourceRecordSet(
'unit.tests.gr', u'A', 0, [u'1.2.3.4']),
provider._get_gcloud_records = Mock(
side_effect=[not_same_fqdn])
provider._gcloud_zones = {
"unit.tests.": DummyGoogleCloudZone("unit.tests.", "unit-tests")}
provider.populate(test_zone)
self.assertEqual(len(test_zone.records), 1)
self.assertEqual(test_zone.records.pop().fqdn,
u'unit.tests.gr.unit.tests.')
def test__get_gcloud_zone(self):
provider = self._get_provider()
provider.gcloud_client = Mock()
provider.gcloud_client.list_zones = Mock(
return_value=DummyIterator([]))
self.assertIsNone(provider.gcloud_zones.get("nonexistent.zone"),
msg="Check that nonexistent zones return None when"
"there's no create=True flag")
def test__get_rrsets(self):
provider = self._get_provider()
dummy_gcloud_zone = DummyGoogleCloudZone("unit.tests")
for octo_record in octo_records:
_rrset_func = getattr(
provider, f'_rrset_for_{octo_record._type}')
self.assertEqual(
_rrset_func(dummy_gcloud_zone, octo_record).record_type,
octo_record._type
)
def test__create_zone(self):
provider = self._get_provider()
provider.gcloud_client = Mock()
provider.gcloud_client.list_zones = Mock(
return_value=DummyIterator([]))
mock_zone = provider._create_gcloud_zone("nonexistent.zone.mock")
mock_zone.create.assert_called()
provider.gcloud_client.zone.assert_called()
def test__create_zone_ip6_arpa(self):
def _create_dummy_zone(name, dns_name):
return DummyGoogleCloudZone(name=name, dns_name=dns_name)
provider = self._get_provider()
provider.gcloud_client = Mock()
provider.gcloud_client.zone = Mock(side_effect=_create_dummy_zone)
mock_zone = \
provider._create_gcloud_zone('0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa')
self.assertRegexpMatches(mock_zone.name, '^[a-z][a-z0-9-]*[a-z0-9]$')
self.assertEqual(len(mock_zone.name), 63)
def test_semicolon_fixup(self):
provider = self._get_provider()
self.assertEquals({
'values': ['abcd\\; ef\\;g', 'hij\\; klm\\;n']
}, provider._data_for_TXT(
DummyResourceRecordSet(
'unit.tests.', 'TXT', 0, ['abcd; ef;g', 'hij\\; klm\\;n'])
))
def test_missing(self):
with self.assertRaises(ModuleNotFoundError):
from octodns.provider.googlecloud import GoogleCloudProvider
GoogleCloudProvider