1
0
mirror of https://github.com/librenms/librenms-agent.git synced 2024-05-09 09:54:52 +00:00

Cleaning up certificate.py code and adding cert_location support for self-signed certificates (#447)

This commit is contained in:
bnerickson
2022-12-10 05:14:41 -08:00
committed by GitHub
parent f0d1b10e57
commit 3d9d1e18a2

View File

@@ -13,8 +13,7 @@ CONFIGFILE = "/etc/snmp/certificate.json"
# }
def get_certificate_data(domain, port=443):
def get_certificate_data(domain, cert_location, port=443):
context = ssl.create_default_context()
conn = context.wrap_socket(
socket.socket(socket.AF_INET),
@@ -25,13 +24,22 @@ def get_certificate_data(domain, port=443):
error_msg = None
ssl_info = {}
# Load certificate for self-signed certificates if provided
if cert_location:
try:
context.load_verify_locations(cert_location)
except (FileNotFoundError, ssl.SSLError, PermissionError) as err:
error_msg = err
return ssl_info, error_msg
try:
conn.connect((domain, port))
ssl_info = conn.getpeercert()
except ConnectionRefusedError as e:
error_msg = e
except ConnectionRefusedError as err:
error_msg = err
# Manage expired certificates
except ssl.SSLCertVerificationError as e:
except ssl.SSLCertVerificationError:
# Arbitrary start date
ssl_info["notBefore"] = "Jan 1 00:00:00 2020 GMT"
# End date is now (we don't have the real one but the certificate is expired)
@@ -41,53 +49,60 @@ def get_certificate_data(domain, port=443):
return ssl_info, error_msg
output = {}
output["error"] = 0
output["errorString"] = ""
output["version"] = 1
def main():
output = {}
output["error"] = 0
output["errorString"] = ""
output["version"] = 1
with open(CONFIGFILE, "r") as json_file:
try:
configfile = json.load(json_file)
except json.decoder.JSONDecodeError as e:
output["error"] = 1
output["errorString"] = "Configfile Error: '%s'" % e
if not output["error"]:
output_data_list = []
for domain in configfile["domains"]:
output_data = {}
if "port" not in domain.keys():
domain["port"] = 443
certificate_data, error_msg = get_certificate_data(
domain["fqdn"], domain["port"]
)
output_data["cert_name"] = domain["fqdn"]
if not error_msg:
ssl_date_format = r"%b %d %H:%M:%S %Y %Z"
validity_end = datetime.datetime.strptime(
certificate_data["notAfter"], ssl_date_format
)
validity_start = datetime.datetime.strptime(
certificate_data["notBefore"], ssl_date_format
)
cert_age = datetime.datetime.now() - validity_start
cert_still_valid = validity_end - datetime.datetime.now()
output_data["age"] = cert_age.days
output_data["remaining_days"] = cert_still_valid.days
else:
output_data["age"] = None
output_data["remaining_days"] = None
with open(CONFIGFILE, "r") as json_file:
try:
configfile = json.load(json_file)
except json.decoder.JSONDecodeError as err:
output["error"] = 1
output["errorString"] = "%s: %s" % (domain["fqdn"], error_msg)
output["errorString"] = "Configfile Error: '%s'" % err
output_data_list.append(output_data)
if not output["error"]:
output_data_list = []
for domain in configfile["domains"]:
output_data = {}
output["data"] = output_data_list
if "port" not in domain.keys():
domain["port"] = 443
if "cert_location" not in domain.keys():
domain["cert_location"] = None
certificate_data, error_msg = get_certificate_data(
domain["fqdn"], domain["cert_location"], domain["port"]
)
print(json.dumps(output))
output_data["cert_name"] = domain["fqdn"]
if not error_msg:
ssl_date_format = r"%b %d %H:%M:%S %Y %Z"
validity_end = datetime.datetime.strptime(
certificate_data["notAfter"], ssl_date_format
)
validity_start = datetime.datetime.strptime(
certificate_data["notBefore"], ssl_date_format
)
cert_age = datetime.datetime.now() - validity_start
cert_still_valid = validity_end - datetime.datetime.now()
output_data["age"] = cert_age.days
output_data["remaining_days"] = cert_still_valid.days
else:
output_data["age"] = None
output_data["remaining_days"] = None
output["error"] = 1
output["errorString"] = "%s: %s" % (domain["fqdn"], error_msg)
output_data_list.append(output_data)
output["data"] = output_data_list
print(json.dumps(output))
if __name__ == "__main__":
main()