# Mirror of https://github.com/interdotlink/ripe-updater.git
# Synced 2024-05-06 15:54:57 +00:00
# 171 lines, 5.0 KiB, Python
# -*- coding: utf-8 -*-
|
|
|
|
import json
|
|
import os
|
|
import smtplib
|
|
import socket
|
|
from email.message import EmailMessage
|
|
|
|
from ipaddress import ip_network
|
|
from .exceptions import ErrorSmallPrefix, NotRoutedNetwork
|
|
from .log_manager import LogManager
|
|
from .configuration import *
|
|
|
|
# RIPE documentation pages describing the response codes of each action,
# keyed by the HTTP method that performs the action.
_RIPE_WIKI_BASE = 'https://github.com/RIPE-NCC/whois/wiki/WHOIS-REST-API-'
RIPE_DOCU_URLS = {
    method: _RIPE_WIKI_BASE + page
    for method, page in (
        ('POST', 'Create'),
        ('PUT', 'Update'),
        ('DELETE', 'Delete'),
    )
}
|
|
|
|
# Module-level logger used by the helpers in this file; taken from the
# project's LogManager.
logger = LogManager().logger
|
|
|
|
|
|
def read_json_file(template):
    """
    Read a JSON template file and return its decoded content.

    :param template: path of the JSON template file
    :return: the decoded JSON document (a dict for object templates)
    :raises RuntimeError: if no file exists at ``template``
    :raises json.JSONDecodeError: if the file does not hold valid JSON
    """
    if not os.path.exists(template):
        msg = f'No template file {template}'
        logger.critical(msg)
        raise RuntimeError(msg)

    # JSON is UTF-8 by specification; do not depend on the platform locale.
    with open(template, 'r', encoding='utf-8') as template_file:
        return json.load(template_file)
|
|
|
|
|
|
def flatten_ripe_attributes(obj):
    """
    Flatten the attribute list of a RIPE object into a name -> value dict.

    The list is looked up at ``attributes.attribute`` inside ``obj``; each
    entry contributes its ``name`` as key and its ``value`` as value. If
    several entries share a name, the last one wins.

    :param obj: RIPE object dict
    :return: dict of attribute names to values; empty when ``obj`` has no
             attribute list
    """
    # find() yields None when the path is missing; treat that as "no attributes"
    # instead of failing on iteration.
    ripe_attributes = find('attributes.attribute', obj) or []
    return {attr.get('name'): attr.get('value') for attr in ripe_attributes}
|
|
|
|
|
|
def format_ripe_object(obj, prefix=''):
    """
    Render a RIPE object dict as a flat, multi-line string.

    Every attribute becomes one line of the form
    ``<prefix><name>:<TAB><TAB><value>`` terminated by a newline.

    :param obj: RIPE object dict; a falsy value yields an empty string
    :param prefix: text put in front of every line
    :return: the string representation
    """
    if not obj:
        return ''

    # Join once instead of growing the string attribute by attribute.
    return ''.join(
        f'{prefix}{key}:\t\t{value}\n'
        for key, value in flatten_ripe_attributes(obj).items()
    )
|
|
|
|
|
|
def is_v6(prefix):
    """
    Tell whether prefix parses as an IPv6 network.
    """
    network = ip_network(prefix)
    return network.version == 6
|
|
|
|
|
|
def validate_prefix(prefix):
    """
    Validate that prefix may be pushed to the RIPE DB.

    :param prefix: network in a form accepted by ``ipaddress.ip_network``
    :return: True when the prefix passed every check; failed checks raise
             instead of returning False
    :raises ErrorSmallPrefix: if the prefix length is longer than
            SMALLEST_PREFIX_V6 (IPv6) or SMALLEST_PREFIX_V4 (IPv4)
    :raises NotRoutedNetwork: if the network is loopback, reserved, private
            or multicast; for IPv6 also if it is link-local or not global
    """
    logger.debug('Processing prefix to formating it to valid RIPE format')
    network = ip_network(prefix)
    v6 = network.version == 6

    # Only read the limit that belongs to the address family at hand.
    if v6:
        smallest_prefix = SMALLEST_PREFIX_V6
    else:
        smallest_prefix = SMALLEST_PREFIX_V4

    # Check if prefix big enough; bigger than defined
    if network.prefixlen > int(smallest_prefix):
        raise ErrorSmallPrefix(f'This prefix is too small update only bigger than {smallest_prefix}')

    # Check if private network; no need to continue
    not_routed = (
        network.is_loopback
        or network.is_reserved
        or network.is_private
        or network.is_multicast
    )
    if v6:
        # IPv6 is checked more strictly than IPv4.
        not_routed = not_routed or network.is_link_local or not network.is_global

    if not_routed:
        raise NotRoutedNetwork('This is not routed prefix, it will be ignored')

    return True
|
|
|
|
|
|
def format_cidr(prefix):
    """
    Render prefix as the address range "<first address> - <last address>".
    """
    network = ip_network(prefix)
    first = network.network_address
    last = network.broadcast_address

    return ' - '.join((str(first), str(last)))
|
|
|
|
|
|
def notify(ripe_object, action, prefix, username, response_code, ripe_errors):
    """
    Build a report mail about a RIPE DB action and hand it to the SMTP server.

    The mail is always built and logged at debug level. It is only sent when
    MAIL_REPORT is 'yes'; STARTTLS is used when SMTP_STARTTLS is 'yes'.

    :param ripe_object: representation of the object, embedded in the mail
    :param action: 'POST', 'PUT' or 'DELETE'; selects the RIPE documentation
                   link (any other value is reported with 'n/a' as link)
    :param prefix: prefix the action was applied to
    :param username: user who triggered the action
    :param response_code: 200 is reported as 'succeeded', anything else as
                          'failed'
    :param ripe_errors: iterable of error strings, or None for no errors
    :raises RuntimeError: if the mail cannot be handed to the SMTP server
    """
    # Read hostname and IP Address to send out within mail
    hostname = socket.gethostname()
    try:
        ipaddr = socket.gethostbyname(hostname)
    except socket.gaierror:
        ipaddr = 'unknown'

    # Building mail content
    status = 'succeeded' if response_code == 200 else 'failed'
    error_lines = '\n'.join(ripe_errors or [])
    # An unexpected action must not prevent the report from being built.
    docs_url = RIPE_DOCU_URLS.get(action, 'n/a')
    text = '\n'.join([
        f'{action} inetnum {prefix} has {status}:',
        f'{error_lines}',
        '----------------',
        f'{ripe_object}',
        f'Result: {status}',
        f'Action: {action}',
        f'Response code: {str(response_code)}',
        f'Response codes doc: {docs_url}',
        f'Triggered by: {username}',
        f'FQDN: {hostname}',
        f'RIPE-Service source IP: {ipaddr}',
        '----------------',
        '',
        'For more informations check logs',
        '',
        'Your awesome RIPE-Service!',
    ])

    msg = EmailMessage()
    msg.set_content(text)
    msg['Subject'] = f'{action} {prefix} has {status}'
    msg['From'] = SENDER_MAIL
    msg['To'] = RECIPIENT_MAIL

    logger.debug(msg)

    if MAIL_REPORT != 'yes':
        return

    try:
        logger.debug(f'opening SMTP connection to {SMTP}')
        with smtplib.SMTP(SMTP) as server:
            if SMTP_STARTTLS == 'yes':
                server.starttls()
            server.send_message(msg)
    # ConnectionRefusedError, socket.timeout and the smtplib exceptions are
    # all subclasses of OSError, so this single clause covers them.
    except OSError as err:
        error_text = f'unable to connect to SMTP server: {SMTP} - {err}'
        logger.critical(error_text)
        raise RuntimeError(error_text) from err
|
|
|
|
|
|
def find(path, obj):
    """
    Find an element in nested dictionaries using a dotted path.

    path: 'elem1.elem2.elem3'
    obj: {'elem1': {'elem2': {'elem3': 'foo'}}}

    :param path: keys joined by '.'
    :param obj: outermost dictionary to start the lookup in
    :return: the element at the end of the path, or None when a key is
             missing or the lookup reaches a value that offers no ``get``
             (e.g. None, a string or a list) before the path is used up
    """
    for elem in path.split('.'):
        try:
            obj = obj.get(elem)
        except AttributeError:
            # Not a mapping: the rest of the path cannot exist.
            return None
        if obj is None:
            return None

    return obj
|