1
0
mirror of https://github.com/github/octodns.git synced 2024-05-11 05:55:00 +00:00

f-strings for PwerDnsProvider

This commit is contained in:
Ross McFarland
2021-09-04 18:09:09 -07:00
parent e1cdfbdc88
commit 7f6b08b460

View File

@@ -38,8 +38,8 @@ class PowerDnsBaseProvider(BaseProvider):
def _request(self, method, path, data=None): def _request(self, method, path, data=None):
self.log.debug('_request: method=%s, path=%s', method, path) self.log.debug('_request: method=%s, path=%s', method, path)
url = '{}://{}:{}/api/v1/servers/localhost/{}' \ url = f'{self.scheme}://{self.host}:{self.port}/api/v1/servers/' \
.format(self.scheme, self.host, self.port, path).rstrip("/") f'localhost/{path}'.rstrip('/')
# Strip trailing / from url. # Strip trailing / from url.
resp = self._sess.request(method, url, json=data, timeout=self.timeout) resp = self._sess.request(method, url, json=data, timeout=self.timeout)
self.log.debug('_request: status=%d', resp.status_code) self.log.debug('_request: status=%d', resp.status_code)
@@ -204,8 +204,7 @@ class PowerDnsBaseProvider(BaseProvider):
except HTTPError as e: except HTTPError as e:
if e.response.status_code == 401: if e.response.status_code == 401:
# Nicer error message for auth problems # Nicer error message for auth problems
raise Exception('PowerDNS unauthorized host={}' raise Exception(f'PowerDNS unauthorized host={self.host}')
.format(self.host))
raise raise
version = resp.json()['version'] version = resp.json()['version']
@@ -241,14 +240,13 @@ class PowerDnsBaseProvider(BaseProvider):
resp = None resp = None
try: try:
resp = self._get('zones/{}'.format(zone.name)) resp = self._get(f'zones/{zone.name}')
self.log.debug('populate: loaded') self.log.debug('populate: loaded')
except HTTPError as e: except HTTPError as e:
error = self._get_error(e) error = self._get_error(e)
if e.response.status_code == 401: if e.response.status_code == 401:
# Nicer error message for auth problems # Nicer error message for auth problems
raise Exception('PowerDNS unauthorized host={}' raise Exception(f'PowerDNS unauthorized host={self.host}')
.format(self.host))
elif e.response.status_code == 404 \ elif e.response.status_code == 404 \
and self.check_status_not_found: and self.check_status_not_found:
# 404 means powerdns doesn't know anything about the requested # 404 means powerdns doesn't know anything about the requested
@@ -275,7 +273,7 @@ class PowerDnsBaseProvider(BaseProvider):
_type = rrset['type'] _type = rrset['type']
if _type == 'SOA': if _type == 'SOA':
continue continue
data_for = getattr(self, '_data_for_{}'.format(_type)) data_for = getattr(self, f'_data_for_{_type}')
record_name = zone.hostname_from_fqdn(rrset['name']) record_name = zone.hostname_from_fqdn(rrset['name'])
record = Record.new(zone, record_name, data_for(rrset), record = Record.new(zone, record_name, data_for(rrset),
source=self, lenient=lenient) source=self, lenient=lenient)
@@ -295,7 +293,7 @@ class PowerDnsBaseProvider(BaseProvider):
def _records_for_CAA(self, record): def _records_for_CAA(self, record):
return [{ return [{
'content': '{} {} "{}"'.format(v.flags, v.tag, v.value), 'content': f'{v.flags} {v.tag} "{v.value}"',
'disabled': False 'disabled': False
} for v in record.values] } for v in record.values]
@@ -307,7 +305,7 @@ class PowerDnsBaseProvider(BaseProvider):
_records_for_PTR = _records_for_single _records_for_PTR = _records_for_single
def _records_for_quoted(self, record): def _records_for_quoted(self, record):
return [{'content': '"{}"'.format(v), 'disabled': False} return [{'content': f'"{v}"', 'disabled': False}
for v in record.values] for v in record.values]
_records_for_SPF = _records_for_quoted _records_for_SPF = _records_for_quoted
@@ -336,36 +334,32 @@ class PowerDnsBaseProvider(BaseProvider):
def _records_for_MX(self, record): def _records_for_MX(self, record):
return [{ return [{
'content': '{} {}'.format(v.preference, v.exchange), 'content': f'{v.preference} {v.exchange}',
'disabled': False 'disabled': False
} for v in record.values] } for v in record.values]
def _records_for_NAPTR(self, record): def _records_for_NAPTR(self, record):
return [{ return [{
'content': '{} {} "{}" "{}" "{}" {}'.format(v.order, v.preference, 'content': f'{v.order} {v.preference} "{v.flags}" "{v.service}" '
v.flags, v.service, f'"{v.regexp}" {v.replacement}',
v.regexp,
v.replacement),
'disabled': False 'disabled': False
} for v in record.values] } for v in record.values]
def _records_for_SSHFP(self, record): def _records_for_SSHFP(self, record):
return [{ return [{
'content': '{} {} {}'.format(v.algorithm, v.fingerprint_type, 'content': f'{v.algorithm} {v.fingerprint_type} {v.fingerprint}',
v.fingerprint),
'disabled': False 'disabled': False
} for v in record.values] } for v in record.values]
def _records_for_SRV(self, record): def _records_for_SRV(self, record):
return [{ return [{
'content': '{} {} {} {}'.format(v.priority, v.weight, v.port, 'content': f'{v.priority} {v.weight} {v.port} {v.target}',
v.target),
'disabled': False 'disabled': False
} for v in record.values] } for v in record.values]
def _mod_Create(self, change): def _mod_Create(self, change):
new = change.new new = change.new
records_for = getattr(self, '_records_for_{}'.format(new._type)) records_for = getattr(self, f'_records_for_{new._type}')
return { return {
'name': new.fqdn, 'name': new.fqdn,
'type': new._type, 'type': new._type,
@@ -378,7 +372,7 @@ class PowerDnsBaseProvider(BaseProvider):
def _mod_Delete(self, change): def _mod_Delete(self, change):
existing = change.existing existing = change.existing
records_for = getattr(self, '_records_for_{}'.format(existing._type)) records_for = getattr(self, f'_records_for_{existing._type}')
return { return {
'name': existing.fqdn, 'name': existing.fqdn,
'type': existing._type, 'type': existing._type,
@@ -429,7 +423,7 @@ class PowerDnsBaseProvider(BaseProvider):
mods = [] mods = []
for change in changes: for change in changes:
class_name = change.__class__.__name__ class_name = change.__class__.__name__
mods.append(getattr(self, '_mod_{}'.format(class_name))(change)) mods.append(getattr(self, f'_mod_{class_name}')(change))
# Ensure that any DELETE modifications always occur before any REPLACE # Ensure that any DELETE modifications always occur before any REPLACE
# modifications. This ensures that an A record can be replaced by a # modifications. This ensures that an A record can be replaced by a
@@ -439,8 +433,7 @@ class PowerDnsBaseProvider(BaseProvider):
self.log.debug('_apply: sending change request') self.log.debug('_apply: sending change request')
try: try:
self._patch('zones/{}'.format(desired.name), self._patch(f'zones/{desired.name}', data={'rrsets': mods})
data={'rrsets': mods})
self.log.debug('_apply: patched') self.log.debug('_apply: patched')
except HTTPError as e: except HTTPError as e:
error = self._get_error(e) error = self._get_error(e)
@@ -510,7 +503,7 @@ class PowerDnsProvider(PowerDnsBaseProvider):
def __init__(self, id, host, api_key, port=8081, nameserver_values=None, def __init__(self, id, host, api_key, port=8081, nameserver_values=None,
nameserver_ttl=600, nameserver_ttl=600,
*args, **kwargs): *args, **kwargs):
self.log = logging.getLogger('PowerDnsProvider[{}]'.format(id)) self.log = logging.getLogger(f'PowerDnsProvider[{id}]')
self.log.debug('__init__: id=%s, host=%s, port=%d, ' self.log.debug('__init__: id=%s, host=%s, port=%d, '
'nameserver_values=%s, nameserver_ttl=%d', 'nameserver_values=%s, nameserver_ttl=%d',
id, host, port, nameserver_values, nameserver_ttl) id, host, port, nameserver_values, nameserver_ttl)