diff --git a/tests/definitions_test.py b/tests/definitions_test.py new file mode 100644 index 000000000..00144e21f --- /dev/null +++ b/tests/definitions_test.py @@ -0,0 +1,224 @@ +from test_configuration import COMPONENT_TYPES, IMAGE_FILETYPES, SCHEMAS, KNOWN_SLUGS, ROOT_DIR, USE_LOCAL_KNOWN_SLUGS, NETBOX_DT_LIBRARY_URL, KNOWN_MODULES, USE_UPSTREAM_DIFF, PRECOMMIT_ALL_SWITCHES +import pickle_operations +from yaml_loader import DecimalSafeLoader +from device_types import DeviceType, ModuleType, verify_filename, validate_components +import decimal +import glob +import json +import os +import tempfile +import psutil +from urllib.request import urlopen + +import pytest +import yaml +from jsonschema import Draft4Validator, RefResolver +from jsonschema.exceptions import ValidationError +from git import Repo + +def _get_definition_files(): + """ + Return a list of all definition files within the specified path. + """ + file_list = [] + + for path, schema in SCHEMAS: + # Initialize the schema + with open(f"schema/{schema}") as schema_file: + schema = json.loads(schema_file.read(), parse_float=decimal.Decimal) + + # Validate that the schema exists + assert schema, f"Schema definition for {path} is empty!" + + # Map each definition file to its schema as a tuple (file, schema) + for file in sorted(glob.glob(f"{path}/*/*", recursive=True)): + file_list.append((file, schema, 'skip')) + + return file_list + +def _get_diff_from_upstream(): + file_list = [] + + repo = Repo(f"{os.path.dirname(os.path.abspath(__file__))}/../") + commits_list = list(repo.iter_commits()) + + if "upstream" not in repo.remotes: + repo.create_remote("upstream", NETBOX_DT_LIBRARY_URL) + + upstream = repo.remotes.upstream + upstream.fetch() + changes = upstream.refs.master.commit.diff(repo.head) + changes = changes + repo.index.diff("HEAD") + + for path, schema in SCHEMAS: + # Initialize the schema + with open(f"schema/{schema}") as schema_file: + schema = json.loads(schema_file.read(), parse_float=decimal.Decimal) + + # Validate that the schema exists + assert schema, f"Schema definition for {path} is empty!" + + # Ensure files are either added, renamed, modified or type changed (do not get deleted files) + CHANGE_TYPE_LIST = ['A', 'R', 'M', 'T'] + + # Iterate through changed files + for file in changes: + # Ensure the files are modified or added, this will disclude deleted files + if file.change_type in CHANGE_TYPE_LIST: + # If the file is renamed, ensure we are picking the right schema + if 'R' in file.change_type and path in file.rename_to: + file_list.append((file.rename_to, schema, file.change_type)) + elif path in file.a_path: + file_list.append((file.a_path, schema, file.change_type)) + elif path in file.b_path: + file_list.append((file.b_path, schema, file.change_type)) + + return file_list + +def _get_image_files(): + """ + Return a list of all image files within the specified path and manufacturer. + """ + file_list = [] + + # Map each image file to its manufacturer + for file in sorted(glob.glob(f"elevation-images{os.path.sep}*{os.path.sep}*", recursive=True)): + # Validate that the file extension is valid + assert file.split(os.path.sep)[2].split('.')[-1] in IMAGE_FILETYPES, f"Invalid file extension: {file}" + + # Map each image file to its manufacturer as a tuple (manufacturer, file) + file_list.append((file.split(os.path.sep)[1], file)) + + return file_list + +def _decimal_file_handler(uri): + """ + Handler to work with floating decimals that fail normal validation. + """ + with urlopen(uri) as url: + result = json.loads(url.read().decode("utf-8"), parse_float=decimal.Decimal) + return result + +def test_environment(): + """ + Run basic sanity checks on the environment to ensure tests are running correctly. + """ + # Validate that definition files exist + if definition_files: + pytest.skip("No changes to definition files found.") + +EVALUATE_ALL = False +if any(x in PRECOMMIT_ALL_SWITCHES for x in psutil.Process(os.getppid()).cmdline()): + EVALUATE_ALL = True + +if USE_UPSTREAM_DIFF and not EVALUATE_ALL: + definition_files = _get_diff_from_upstream() +else: + definition_files = _get_definition_files() +image_files = _get_image_files() + +if USE_LOCAL_KNOWN_SLUGS: + KNOWN_SLUGS = pickle_operations.read_pickle_data(f'{ROOT_DIR}/tests/known-slugs.pickle') + KNOWN_MODULES = pickle_operations.read_pickle_data(f'{ROOT_DIR}/tests/known-modules.pickle') +else: + temp_dir = tempfile.TemporaryDirectory() + repo = Repo.clone_from(url=NETBOX_DT_LIBRARY_URL, to_path=temp_dir.name) + KNOWN_SLUGS = pickle_operations.read_pickle_data(f'{temp_dir.name}/tests/known-slugs.pickle') + KNOWN_MODULES = pickle_operations.read_pickle_data(f'{temp_dir.name}/tests/known-modules.pickle') + + +@pytest.mark.parametrize(('file_path', 'schema', 'change_type'), definition_files) +def test_definitions(file_path, schema, change_type): + """ + Validate each definition file using the provided JSON schema and check for duplicate entries. + """ + # Check file extension. Only .yml or .yaml files are supported. + assert file_path.split('.')[-1] in ('yaml', 'yml'), f"Invalid file extension: {file_path}" + + # Read file + with open(file_path) as definition_file: + content = definition_file.read() + + # Check for trailing newline. YAML files must end with an emtpy newline. + assert content.endswith('\n'), "Missing trailing newline" + + # Load YAML data from file + definition = yaml.load(content, Loader=DecimalSafeLoader) + + # Validate YAML definition against the supplied schema + try: + resolver = RefResolver( + f"file://{os.getcwd()}/schema/devicetype.json", + schema, + handlers={"file": _decimal_file_handler}, + ) + # Validate definition against schema + Draft4Validator(schema, resolver=resolver).validate(definition) + except ValidationError as e: + # Schema validation failure. Ensure you are following the proper format. + pytest.fail(f"{file_path} failed validation: {e}", False) + + # Identify if the definition is for a Device or Module + if "device-types" in file_path: + # A device + this_device = DeviceType(definition, file_path, change_type) + else: + # A module + this_device = ModuleType(definition, file_path, change_type) + + # Verify the slug is valid, only if the definition type is a Device + if this_device.isDevice: + assert this_device.verify_slug(KNOWN_SLUGS), pytest.fail(this_device.failureMessage, False) + + # Verify the filename is valid. Must either be the model or part_number. + assert verify_filename(this_device, (KNOWN_MODULES if not this_device.isDevice else None)), pytest.fail(this_device.failureMessage, False) + + # Check for duplicate components within the definition + assert validate_components(COMPONENT_TYPES, this_device), pytest.fail(this_device.failureMessage, False) + + # Check for empty quotes and fail if found + def iterdict(var): + for dict_value in var.values(): + if isinstance(dict_value, dict): + iterdict(dict_value) + if isinstance(dict_value, list): + iterlist(dict_value) + else: + if(isinstance(dict_value, str) and not dict_value): + pytest.fail(f'{file_path} has empty quotes', False) + + def iterlist(var): + for list_value in var: + if isinstance(list_value, dict): + iterdict(list_value) + elif isinstance(list_value, list): + iterlist(list_value) + + # Check for valid power definitions + if this_device.isDevice: + assert this_device.validate_power(), pytest.fail(this_device.failureMessage, False) + + # Check for images if front_image or rear_image is True + if (definition.get('front_image') or definition.get('rear_image')): + # Find images for given manufacturer, with matching device slug (exact match including case) + manufacturer_images = [image[1] for image in image_files if image[0] == file_path.split('/')[1] and os.path.basename(image[1]).split('.')[0] == this_device.get_slug()] + if not manufacturer_images: + pytest.fail(f'{file_path} has Front or Rear Image set to True but no images found for manufacturer/device (slug={this_device.get_slug()})', False) + elif len(manufacturer_images)>2: + pytest.fail(f'More than 2 images found for device with slug {this_device.get_slug()}: {manufacturer_images}', False) + + # If front_image is True, verify that a front image exists + if(definition.get('front_image')): + front_image = [image_path.split('/')[2] for image_path in manufacturer_images if os.path.basename(image_path).split('.')[1] == 'front'] + + if not front_image: + pytest.fail(f'{file_path} has front_image set to True but no matching image found for device ({manufacturer_images})', False) + + # If rear_image is True, verify that a front image exists + if(definition.get('rear_image')): + rear_image = [image_path.split('/')[2] for image_path in manufacturer_images if os.path.basename(image_path).split('.')[1] == 'rear'] + + if not rear_image: + pytest.fail(f'{file_path} has rear_image set to True but no images found for device', False) + + iterdict(definition) diff --git a/tests/device_types.py b/tests/device_types.py new file mode 100644 index 000000000..b99356c0d --- /dev/null +++ b/tests/device_types.py @@ -0,0 +1,242 @@ +import os + + +class DeviceType: + def __new__(cls, *args, **kwargs): + return super().__new__(cls) + + def __init__(self, definition, file_path, change_type): + self.file_path = file_path + self.isDevice = True + self.definition = definition + self.manufacturer = definition.get('manufacturer') + self._slug_manufacturer = self._slugify_manufacturer() + self.slug = definition.get('slug') + self.model = definition.get('model') + self._slug_model = self._slugify_model() + self.part_number = definition.get('part_number', "") + self._slug_part_number = self._slugify_part_number() + self.failureMessage = None + self.change_type = change_type + + def _slugify_manufacturer(self): + return self.manufacturer.casefold().replace(" ", "-").replace("sfp+", "sfpp").replace("poe+", "poep").replace("-+", "-plus-").replace("+", "-plus").replace("_", "-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-").replace("&", "and") + + def get_slug(self): + if hasattr(self, "slug"): + return self.slug + return None + + def _slugify_model(self): + slugified = self.model.casefold().replace(" ", "-").replace("sfp+", "sfpp").replace("poe+", "poep").replace("-+", "-plus").replace("+", "-plus-").replace("_", "-").replace("&", "-and-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-") + if slugified.endswith("-"): + slugified = slugified[:-1] + return slugified + + def _slugify_part_number(self): + slugified = self.part_number.casefold().replace(" ", "-").replace("-+", "-plus").replace("+", "-plus-").replace("_", "-").replace("&", "-and-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-") + if slugified.endswith("-"): + slugified = slugified[:-1] + return slugified + + def get_filepath(self): + return self.file_path + + def verify_slug(self, KNOWN_SLUGS): + # Verify the slug is unique, and not already known + known_slug_list_intersect = [(slug, file_path) for slug, file_path in KNOWN_SLUGS if slug == self.slug] + + if len(known_slug_list_intersect) == 0: + pass + elif len(known_slug_list_intersect) == 1: + if self.file_path not in known_slug_list_intersect[0][1]: + if 'R' not in self.change_type: + self.failureMessage = f'{self.file_path} has a duplicate slug: "{self.slug}"' + return False + return True + else: + self.failureMessage = f'{self.file_path} has a duplicate slug "{self.slug}"' + return False + + # Verify the manufacturer is appended to the slug + if not self.slug.startswith(self._slug_manufacturer): + self.failureMessage = f'{self.file_path} contains slug "{self.slug}". Does not start with manufacturer: "{self.manufacturer.casefold()}-"' + return False + + # Verify the slug ends with either the model or part number + if not (self.slug.endswith(self._slug_model) or self.slug.endswith(self._slug_part_number)): + self.failureMessage = f'{self.file_path} has slug "{self.slug}". Does not end with the model "{self._slug_model}" or part_number "{self._slug_part_number}"' + return False + + # Add the slug to the list of known slugs + KNOWN_SLUGS.add((self.slug, self.file_path)) + return True + + def validate_power(self): + # Check if power-ports exists + if self.definition.get('power-ports', False): + # Verify that is_powered is not set to False. If so, there should not be any power-ports defined + if not self.definition.get('is_powered', True): + self.failureMessage = f'{self.file_path} has is_powered set to False, but "power-ports" are defined.' + return False + return True + + # Lastly, check if interfaces exists and has a poe_mode defined + interfaces = self.definition.get('interfaces', False) + if interfaces: + for interface in interfaces: + poe_mode = interface.get('poe_mode', "") + if poe_mode != "" and poe_mode == "pd": + return True + + console_ports = self.definition.get('console-ports', False) + if console_ports: + for console_port in console_ports: + poe = console_port.get('poe', False) + if poe: + return True + + rear_ports = self.definition.get('rear-ports', False) + if rear_ports: + for rear_port in rear_ports: + poe = rear_port.get('poe', False) + if poe: + return True + + # Check if the device is a child device, and if so, assume it has a valid power source from the parent + subdevice_role = self.definition.get('subdevice_role', False) + if subdevice_role: + if subdevice_role == "child": + return True + + # Check if module-bays exists + if self.definition.get('module-bays', False): + # There is not a standardized way to define PSUs that are module bays, so we will just assume they are valid + return True + + # As the very last case, check if is_powered is defined and is False. Otherwise assume the device is powered + if not self.definition.get('is_powered', True): # is_powered defaults to True + # Arriving here means is_powered is set to False, so verify that there are no power-outlets defined + if self.definition.get('power-outlets', False): + self.failureMessage = f'{self.file_path} has is_powered set to False, but "power-outlets" are defined.' + return False + return True + + self.failureMessage = f'{self.file_path} has does not appear to have a valid power source. Ensure either "power-ports" or "interfaces" with "poe_mode" is defined.' + return False + +class ModuleType: + def __new__(cls, *args, **kwargs): + return super().__new__(cls) + + def __init__(self, definition, file_path, change_type): + self.file_path = file_path + self.isDevice = False + self.definition = definition + self.manufacturer = definition.get('manufacturer') + self.model = definition.get('model') + self._slug_model = self._slugify_model() + self.part_number = definition.get('part_number', "") + self._slug_part_number = self._slugify_part_number() + self.change_type = change_type + + def get_filepath(self): + return self.file_path + + def _slugify_model(self): + slugified = self.model.casefold().replace(" ", "-").replace("sfp+", "sfpp").replace("poe+", "poep").replace("-+", "-plus").replace("+", "-plus-").replace("_", "-").replace("&", "-and-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-") + if slugified.endswith("-"): + slugified = slugified[:-1] + return slugified + + def _slugify_part_number(self): + slugified = self.part_number.casefold().replace(" ", "-").replace("-+", "-plus").replace("+", "-plus-").replace("_", "-").replace("&", "-and-").replace("!", "").replace("/", "-").replace(",", "").replace("'", "").replace("*", "-") + if slugified.endswith("-"): + slugified = slugified[:-1] + return slugified + +def validate_component_names(component_names: (set or None)): + if len(component_names) > 1: + verify_name = list(component_names[0]) + for index, name in enumerate(component_names): + if index == 0: + continue + + intersection = sorted(set(verify_name) & set(list(name)), key = verify_name.index) + + intersection_len = len(intersection) + verify_subset = verify_name[:intersection_len] + name_subset = list(name)[:intersection_len] + subset_match = sorted(set(verify_subset) & set(name_subset), key = name_subset.index) + + if len(intersection) > 2 and len(subset_match) == len(intersection): + return False + return True + +def verify_filename(device: (DeviceType or ModuleType), KNOWN_MODULES: (set or None)): + head, tail = os.path.split(device.get_filepath()) + filename = tail.rsplit(".", 1)[0].casefold() + + if not (filename == device._slug_model or filename == device._slug_part_number or filename == device.part_number.casefold()): + device.failureMessage = f'{device.file_path} file name is invalid. Must be either the model "{device._slug_model}" or part_number "{device.part_number} / {device._slug_part_number}"' + return False + + if not device.isDevice: + matches = [file_name for file_name, file_path in KNOWN_MODULES if file_name.casefold() == filename.casefold()] + if len(matches) > 1: + device.failureMessage = f'{device.file_path} appears to be duplicated. Found {len(matches)} matches: {", ".join(matches)}' + return False + + return True + +def validate_components(component_types, device_or_module): + for component_type in component_types: + known_names = set() + known_components = [] + defined_components = device_or_module.definition.get(component_type, []) + if not isinstance(defined_components, list): + device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type}.' + return False + for idx, component in enumerate(defined_components): + if not isinstance(component, dict): + device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type} ({idx}).' + return False + name = component.get('name') + position = component.get('position') + eval_component = (name, position) + if not isinstance(name, str): + device_or_module.failureMessage = f'{device_or_module.file_path} has an invalid definition for {component_type} name ({idx}).' + return False + if eval_component[0] in known_names: + device_or_module.failureMessage = f'{device_or_module.file_path} has duplicated names within {component_type} ({name}).' + return False + known_components.append(eval_component) + known_names.add(name) + + # Adding check for duplicate positions within a component type + # Stems from https://github.com/netbox-community/devicetype-library/pull/1586 + # and from https://github.com/netbox-community/devicetype-library/issues/1584 + position_set = {} + index = 0 + for name, position in known_components: + if position is not None: + match = [] + if len(position_set) > 0: + match = [key for key,val in position_set.items() if key == position] + if len(match) == 0: + if len(position_set) == 0: + position_set = {position: {known_components[index]}} + else: + position_set.update({position: {known_components[index]}}) + else: + position_set[position].add(known_components[index]) + index = index + 1 + + for position in position_set: + if len(position_set[position]) > 1: + component_names = [name for name,pos in position_set[position]] + if not validate_component_names(component_names): + device_or_module.failureMessage = f'{device_or_module.file_path} has duplicated positions within {component_type} ({position}).' + return False + + return True diff --git a/tests/generate-slug-list.py b/tests/generate-slug-list.py new file mode 100644 index 000000000..cb901c559 --- /dev/null +++ b/tests/generate-slug-list.py @@ -0,0 +1,96 @@ +import os +import json +import glob +import yaml +import decimal +from yaml_loader import DecimalSafeLoader +from jsonschema import Draft4Validator, RefResolver +from jsonschema.exceptions import ValidationError +from test_configuration import SCHEMAS, KNOWN_SLUGS, ROOT_DIR, KNOWN_MODULES +from urllib.request import urlopen +import pickle_operations + +def _get_type_files(device_or_module): + """ + Return a list of all definition files within the specified path. + """ + file_list = [] + + for path, schema in SCHEMAS: + if path == f'{device_or_module}-types': + # Initialize the schema + with open(f"{ROOT_DIR}/schema/{schema}") as schema_file: + schema = json.loads(schema_file.read(), + parse_float=decimal.Decimal) + + # Validate that the schema exists + if not schema: + print(f"Schema definition for {path} is empty!") + exit(1) + + # Map each definition file to its schema as a tuple (file, schema) + for file in sorted(glob.glob(f"{path}/*/*", recursive=True)): + file_list.append((f'{file}', schema)) + + return file_list + +def _decimal_file_handler(uri): + """ + Handler to work with floating decimals that fail normal validation. + """ + with urlopen(uri) as url: + result = json.loads(url.read().decode("utf-8"), parse_float=decimal.Decimal) + return result + +def load_file(file_path, schema): + # Read file + try: + with open(file_path) as definition_file: + content = definition_file.read() + except Exception as exc: + return (False, f'Error opening "{file_path}". stderr: {exc}') + + # Check for trailing newline. YAML files must end with an emtpy newline. + if not content.endswith('\n'): + return (False, f'{file_path} is missing trailing newline') + + # Load YAML data from file + try: + definition = yaml.load(content, Loader=DecimalSafeLoader) + except Exception as exc: + return (False, f'Error during yaml.load "{file_path}". stderr: {exc}') + + # Validate YAML definition against the supplied schema + try: + resolver = RefResolver( + f"file://{os.getcwd()}/schema/devicetype.json", + schema, + handlers={"file": _decimal_file_handler}, + ) + # Validate definition against schema + Draft4Validator(schema, resolver=resolver).validate(definition) + except ValidationError as exc: + # Schema validation failure. Ensure you are following the proper format. + return (False, f'{file_path} failed validation: {exc}') + + return (True, definition) + +def _generate_knowns(device_or_module): + all_files = _get_type_files(device_or_module) + + for file_path, schema in all_files: + definition_status, definition = load_file(file_path, schema) + if not definition_status: + print(definition) + exit(1) + + if device_or_module == 'device': + KNOWN_SLUGS.add((definition.get('slug'), file_path)) + else: + KNOWN_MODULES.add((os.path.splitext(os.path.basename(file_path))[0], os.path.dirname(file_path))) + +_generate_knowns('device') +pickle_operations.write_pickle_data(KNOWN_SLUGS, f'{ROOT_DIR}/tests/known-slugs.pickle') + +_generate_knowns('module') +pickle_operations.write_pickle_data(KNOWN_MODULES, f'{ROOT_DIR}/tests/known-modules.pickle') \ No newline at end of file diff --git a/tests/known-modules.pickle b/tests/known-modules.pickle new file mode 100644 index 000000000..9126a10cf Binary files /dev/null and b/tests/known-modules.pickle differ diff --git a/tests/known-slugs.pickle b/tests/known-slugs.pickle new file mode 100644 index 000000000..fc710e9a2 Binary files /dev/null and b/tests/known-slugs.pickle differ diff --git a/tests/pickle_operations.py b/tests/pickle_operations.py new file mode 100644 index 000000000..193dd4da5 --- /dev/null +++ b/tests/pickle_operations.py @@ -0,0 +1,14 @@ +import pickle + +def write_pickle_data(data, file_path): + with open(file_path, 'wb') as pickle_file: + pickle.dump(data, pickle_file) + pickle_file.close() + + +def read_pickle_data(file_path): + with open(file_path, 'rb') as pickle_file: + data = pickle.load(pickle_file) + pickle_file.close() + + return data diff --git a/tests/test_configuration.py b/tests/test_configuration.py new file mode 100644 index 000000000..2082033b7 --- /dev/null +++ b/tests/test_configuration.py @@ -0,0 +1,38 @@ +import os + +SCHEMAS = ( + ('device-types', 'devicetype.json'), + ('module-types', 'moduletype.json'), +) + +IMAGE_FILETYPES = ( + 'bmp', 'gif', 'pjp', 'jpg', 'pjpeg', 'jpeg', 'jfif', 'png', 'tif', 'tiff', 'webp' +) + +COMPONENT_TYPES = ( + 'console-ports', + 'console-server-ports', + 'power-ports', + 'power-outlets', + 'interfaces', + 'front-ports', + 'rear-ports', + 'device-bays', + 'module-bays', +) + +PRECOMMIT_ALL_SWITCHES = [ + '-a', + '--all-files', + '--all' +] + +ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname( __file__ ), '..')) + +KNOWN_SLUGS = set() +KNOWN_MODULES = set() + +USE_LOCAL_KNOWN_SLUGS = False +USE_UPSTREAM_DIFF = True + +NETBOX_DT_LIBRARY_URL = "https://github.com/netbox-community/devicetype-library.git" \ No newline at end of file diff --git a/tests/yaml_loader.py b/tests/yaml_loader.py new file mode 100644 index 000000000..dac9d522d --- /dev/null +++ b/tests/yaml_loader.py @@ -0,0 +1,34 @@ +import decimal + +from yaml.composer import Composer +from yaml.constructor import SafeConstructor +from yaml.parser import Parser +from yaml.reader import Reader +from yaml.resolver import Resolver +from yaml.scanner import Scanner + + +class DecimalSafeConstructor(SafeConstructor): + """Special constructor to override construct_yaml_float() in order to cast "Decimal" types to the value""" + + def construct_yaml_float(self, node): + value = super().construct_yaml_float(node) + # We force the string representation of the float here to avoid things like: + # In [11]: decimal.Decimal(10.11) + # Out[11]: Decimal('10.1099999999999994315658113919198513031005859375') + return decimal.Decimal(f"{value}") + + +DecimalSafeConstructor.add_constructor( + "tag:yaml.org,2002:float", DecimalSafeConstructor.construct_yaml_float +) + + +class DecimalSafeLoader(Reader, Scanner, Parser, Composer, DecimalSafeConstructor, Resolver): + def __init__(self, stream): + Reader.__init__(self, stream) + Scanner.__init__(self) + Parser.__init__(self) + Composer.__init__(self) + DecimalSafeConstructor.__init__(self) + Resolver.__init__(self)