From 3d9d1e18a259295d0a846a7eba3150a8d7a06875 Mon Sep 17 00:00:00 2001 From: bnerickson Date: Sat, 10 Dec 2022 05:14:41 -0800 Subject: [PATCH] Cleaning up certificate.py code and adding cert_location support for self-signed certificates (#447) --- snmp/certificate.py | 115 +++++++++++++++++++++++++------------------- 1 file changed, 65 insertions(+), 50 deletions(-) diff --git a/snmp/certificate.py b/snmp/certificate.py index c141afc..a1fa87d 100755 --- a/snmp/certificate.py +++ b/snmp/certificate.py @@ -13,8 +13,7 @@ CONFIGFILE = "/etc/snmp/certificate.json" # } -def get_certificate_data(domain, port=443): - +def get_certificate_data(domain, cert_location, port=443): context = ssl.create_default_context() conn = context.wrap_socket( socket.socket(socket.AF_INET), @@ -25,13 +24,22 @@ def get_certificate_data(domain, port=443): error_msg = None ssl_info = {} + + # Load certificate for self-signed certificates if provided + if cert_location: + try: + context.load_verify_locations(cert_location) + except (FileNotFoundError, ssl.SSLError, PermissionError) as err: + error_msg = err + return ssl_info, error_msg + try: conn.connect((domain, port)) ssl_info = conn.getpeercert() - except ConnectionRefusedError as e: - error_msg = e + except ConnectionRefusedError as err: + error_msg = err # Manage expired certificates - except ssl.SSLCertVerificationError as e: + except ssl.SSLCertVerificationError: # Arbitrary start date ssl_info["notBefore"] = "Jan 1 00:00:00 2020 GMT" # End date is now (we don't have the real one but the certificate is expired) @@ -41,53 +49,60 @@ def get_certificate_data(domain, port=443): return ssl_info, error_msg -output = {} -output["error"] = 0 -output["errorString"] = "" -output["version"] = 1 +def main(): + output = {} + output["error"] = 0 + output["errorString"] = "" + output["version"] = 1 -with open(CONFIGFILE, "r") as json_file: - try: - configfile = json.load(json_file) - except json.decoder.JSONDecodeError as e: - output["error"] = 1 - output["errorString"] = "Configfile Error: '%s'" % e - -if not output["error"]: - output_data_list = [] - for domain in configfile["domains"]: - output_data = {} - - if "port" not in domain.keys(): - domain["port"] = 443 - certificate_data, error_msg = get_certificate_data( - domain["fqdn"], domain["port"] - ) - - output_data["cert_name"] = domain["fqdn"] - - if not error_msg: - ssl_date_format = r"%b %d %H:%M:%S %Y %Z" - validity_end = datetime.datetime.strptime( - certificate_data["notAfter"], ssl_date_format - ) - validity_start = datetime.datetime.strptime( - certificate_data["notBefore"], ssl_date_format - ) - cert_age = datetime.datetime.now() - validity_start - cert_still_valid = validity_end - datetime.datetime.now() - - output_data["age"] = cert_age.days - output_data["remaining_days"] = cert_still_valid.days - - else: - output_data["age"] = None - output_data["remaining_days"] = None + with open(CONFIGFILE, "r") as json_file: + try: + configfile = json.load(json_file) + except json.decoder.JSONDecodeError as err: output["error"] = 1 - output["errorString"] = "%s: %s" % (domain["fqdn"], error_msg) + output["errorString"] = "Configfile Error: '%s'" % err - output_data_list.append(output_data) + if not output["error"]: + output_data_list = [] + for domain in configfile["domains"]: + output_data = {} - output["data"] = output_data_list + if "port" not in domain.keys(): + domain["port"] = 443 + if "cert_location" not in domain.keys(): + domain["cert_location"] = None + certificate_data, error_msg = get_certificate_data( + domain["fqdn"], domain["cert_location"], domain["port"] + ) -print(json.dumps(output)) + output_data["cert_name"] = domain["fqdn"] + + if not error_msg: + ssl_date_format = r"%b %d %H:%M:%S %Y %Z" + validity_end = datetime.datetime.strptime( + certificate_data["notAfter"], ssl_date_format + ) + validity_start = datetime.datetime.strptime( + certificate_data["notBefore"], ssl_date_format + ) + cert_age = datetime.datetime.now() - validity_start + cert_still_valid = validity_end - datetime.datetime.now() + + output_data["age"] = cert_age.days + output_data["remaining_days"] = cert_still_valid.days + + else: + output_data["age"] = None + output_data["remaining_days"] = None + output["error"] = 1 + output["errorString"] = "%s: %s" % (domain["fqdn"], error_msg) + + output_data_list.append(output_data) + + output["data"] = output_data_list + + print(json.dumps(output)) + + +if __name__ == "__main__": + main()