1
0
mirror of https://github.com/interdotlink/ripe-updater.git synced 2024-05-06 15:54:57 +00:00
Files
Christian Harendt dc08fddac3 initial import
2022-05-13 14:33:30 +02:00

171 lines
5.0 KiB
Python

# -*- coding: utf-8 -*-
import json
import os
import smtplib
import socket
from email.message import EmailMessage
from ipaddress import ip_network
from .exceptions import ErrorSmallPrefix, NotRoutedNetwork
from .log_manager import LogManager
from .configuration import *
# RIPE documentation pages describing the response codes of each action,
# keyed by the action name (POST = create, PUT = update, DELETE = delete).
RIPE_DOCU_URLS = {'POST': 'https://github.com/RIPE-NCC/whois/wiki/WHOIS-REST-API-Create',
'PUT': 'https://github.com/RIPE-NCC/whois/wiki/WHOIS-REST-API-Update',
'DELETE': 'https://github.com/RIPE-NCC/whois/wiki/WHOIS-REST-API-Delete'}
# Module-level logger shared by all helpers in this file.
logger = LogManager().logger
def read_json_file(template):
    """
    Read a JSON template file and return its parsed content.

    template: path of the JSON file to load.

    Returns the value produced by json.load for the file (a dict when the
    template is a JSON object).

    Raises RuntimeError, after logging a critical message, when the path
    does not exist.
    """
    if not os.path.exists(template):
        msg = f'No template file {template}'
        logger.critical(msg)
        raise RuntimeError(msg)
    # JSON text is UTF-8 by specification; do not rely on the locale default.
    with open(template, 'r', encoding='utf-8') as template_file:
        return json.load(template_file)
def flatten_ripe_attributes(obj):
    """
    Flatten the attribute list of a RIPE object into a name -> value dict.

    obj: dict in which the path 'attributes.attribute' holds a list of
         dicts, each carrying a 'name' and a 'value' key.

    Returns a dict mapping each attribute name to its value. When several
    entries share the same name, only the last one is kept. An object
    without the 'attributes.attribute' path yields an empty dict.
    """
    # find() returns None when the path is missing; treat that as "no attributes"
    # instead of failing with "'NoneType' object is not iterable".
    ripe_attributes = find('attributes.attribute', obj) or []
    return {attr.get('name'): attr.get('value') for attr in ripe_attributes}
def format_ripe_object(obj, prefix=''):
    """
    Render a ripe_object dict as flat text.

    Every attribute becomes one line of the form
    "<prefix><name>:<TAB><TAB><value>" terminated by a newline.
    A falsy obj (None, empty dict) produces an empty string.
    """
    if not obj:
        return ''
    attributes = flatten_ripe_attributes(obj)
    return ''.join(
        f'{prefix}{name}:\t\t{content}\n'
        for name, content in attributes.items()
    )
def is_v6(prefix):
    """
    Tell whether prefix is an IPv6 network.

    prefix is parsed with ip_network, so an invalid prefix raises ValueError.
    """
    network = ip_network(prefix)
    return network.version == 6
def validate_prefix(prefix):
    """
    Validate that prefix may be pushed to the RIPE DB.

    prefix: IPv4 or IPv6 network in a form accepted by ip_network.

    Returns True when the prefix passes all checks; it never returns False,
    every rejection is signalled by an exception.

    Raises ErrorSmallPrefix when the prefix length is longer than
    SMALLEST_PREFIX_V6 / SMALLEST_PREFIX_V4 (i.e. the network is smaller
    than the configured minimum).
    Raises NotRoutedNetwork when the network is loopback, reserved, private
    or multicast; IPv6 networks are additionally rejected when link-local
    or not global.
    """
    logger.debug('Processing prefix to formating it to valid RIPE format')
    network = ip_network(prefix)
    v6 = network.version == 6

    # Size check first: a longer prefix length means a smaller network.
    smallest = SMALLEST_PREFIX_V6 if v6 else SMALLEST_PREFIX_V4
    if network.prefixlen > int(smallest):
        raise ErrorSmallPrefix(f'This prefix is too small update only bigger than {smallest}')

    # Checks shared by both address families.
    not_routed = (
        network.is_loopback
        or network.is_reserved
        or network.is_private
        or network.is_multicast
    )
    # Only IPv6 is additionally tested for link-local / non-global space.
    if v6:
        not_routed = not_routed or network.is_link_local or not network.is_global
    if not_routed:
        raise NotRoutedNetwork('This is not routed prefix, it will be ignored')
    return True
def format_cidr(prefix):
    """
    Convert a CIDR prefix into a "first address - last address" range string.

    Example: '192.0.2.0/24' becomes '192.0.2.0 - 192.0.2.255'.
    """
    net = ip_network(prefix)
    first, last = net.network_address, net.broadcast_address
    return f'{first} - {last}'
def notify(ripe_object, action, prefix, username, response_code, ripe_errors):
    """
    Build a status mail about a RIPE action and hand it to the SMTP server.

    ripe_object:   text representation of the object, embedded in the mail body.
    action:        action name ('POST', 'PUT' or 'DELETE'); used in the subject
                   and to pick the matching RIPE documentation link.
    prefix:        prefix the action was performed on.
    username:      who triggered the action.
    response_code: response code of the action; 200 is reported as
                   'succeeded', anything else as 'failed'.
    ripe_errors:   iterable of error strings (None is treated as empty).

    The mail is always built and logged at debug level; it is only sent
    when MAIL_REPORT is 'yes'. STARTTLS is used when SMTP_STARTTLS is 'yes'.

    Raises RuntimeError (chained to the original error) when sending fails.
    """
    # Read hostname and IP address to include them in the mail
    hostname = socket.gethostname()
    try:
        ipaddr = socket.gethostbyname(hostname)
    except socket.gaierror:
        # Best effort only: the mail is still useful without the address.
        ipaddr = 'unknown'

    # Building mail content
    error_text = '\n'.join(ripe_errors or [])
    status = 'succeeded' if response_code == 200 else 'failed'
    # An unexpected action must not abort the notification with a KeyError.
    docs_url = RIPE_DOCU_URLS.get(action, 'n/a')
    text = f"""{action} inetnum {prefix} has {status}:
{error_text}
----------------
{ripe_object}
Result: {status}
Action: {action}
Response code: {response_code}
Response codes doc: {docs_url}
Triggered by: {username}
FQDN: {hostname}
RIPE-Service source IP: {ipaddr}
----------------
\nFor more informations check logs
\nYour awesome RIPE-Service!"""

    mail = EmailMessage()
    mail.set_content(text)
    mail['Subject'] = f'{action} {prefix} has {status}'
    mail['From'] = SENDER_MAIL
    mail['To'] = RECIPIENT_MAIL
    logger.debug(mail)

    if MAIL_REPORT != 'yes':
        return

    try:
        logger.debug(f'opening SMTP connection to {SMTP}')
        with smtplib.SMTP(SMTP) as server:
            if SMTP_STARTTLS == 'yes':
                server.starttls()
            server.send_message(mail)
    except OSError as err:
        # ConnectionRefusedError, socket.timeout and the smtplib exceptions
        # are all OSError subclasses, so this covers the same failures.
        error_msg = f'unable to connect to SMTP server: {SMTP} - {err}'
        logger.critical(error_msg)
        raise RuntimeError(error_msg) from err
def find(path, obj):
    """
    Look up a nested element of a dictionary by a dotted path.

    path: 'elem1.elem2.elem3'
    obj: {'elem1': {'elem2': {'elem3': 'foo'}}}

    Returns the element at the end of the path ('foo' in the example above),
    or None as soon as one path component is missing.
    """
    current = obj
    for key in path.split('.'):
        current = current.get(key)
        if current is None:
            return None
    return current