1
0
mirror of https://github.com/peeringdb/peeringdb.git synced 2024-05-11 05:55:09 +00:00
Files
ch f0a6bb3f33 Validator: Add validator for X usernames, were requirements (#1580)
are different for the generalized regex match.

> A username can only contain alphanumeric characters (letters A-Z, numbers 0-9) with the
> exception of underscores, as noted above. Check to make sure your desired username
> does not contain any symbols, dashes, or spaces.

https://help.twitter.com/en/managing-your-account/x-username-rules

Add a specific if-else regex match for X usernames to take into account
that PeeringDB users are able to have X usernames of less than 4 characters
that are the current minimum our generalized username regexer checks in
the string length match. I.e. We currently accept 4-32. X allows 1-15 in length.
2024-05-01 14:45:33 -05:00

610 lines
17 KiB
Python

"""
peeringdb model / field validators
"""
import ipaddress
import re
import phonenumbers
from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator, validate_email
from django.utils.translation import gettext_lazy as _
from geopy.distance import geodesic
from rest_framework.exceptions import ValidationError as RestValidationError
from schema import Schema
import peeringdb_server.geo as geo
import peeringdb_server.models
from peeringdb_server.inet import IRR_SOURCE, network_is_pdb_valid
from peeringdb_server.request import bypass_validation
from peeringdb_server.verified_update import const
def validate_email_domains(text):
if not text:
return ""
lines_in = text.split("\n")
lines_out = []
for line in lines_in:
if not line:
continue
line = line.strip()
try:
validate_email(f"name@{line}")
except ValidationError:
raise ValidationError(_("Invalid format"))
lines_out.append(line.lower())
return "\n".join(lines_out)
def validate_poc_visible(visible):
# we no longer allow "Private" network contacts
# however until all private network contacts have
# been either changed or deleted we cannot remove
# the value from the choices set for the field
#
# for now we handle validation here (see #944)
if visible == "Private":
raise ValidationError(_("Private contacts are no longer supported."))
return visible
def validate_phonenumber(phonenumber, country=None):
"""
Validate a phonenumber to E.164
Arguments:
- phonenumber (str)
Raises:
- ValidationError if phone number isn't valid E.164 and cannot
be made E.164 valid
Returns:
- str: validated phonenumber
"""
if not phonenumber:
return ""
try:
parsed_number = phonenumbers.parse(phonenumber, country)
validated_number = phonenumbers.format_number(
parsed_number, phonenumbers.PhoneNumberFormat.E164
)
return f"{validated_number}"
except phonenumbers.phonenumberutil.NumberParseException:
raise ValidationError(_("Not a valid phone number (E.164)"))
def validate_zipcode(zipcode, country):
"""
Validate a zipcode for a country. If a country has zipcodes, a zipcode
is required. If a country does not have zipcodes, it's not required.
Arguments:
- zipcode (can be Str or None at this point)
- country (two-letter country-code provided in data)
Raises:
- ValidationError if Zipcode is missing from a country WITH
zipcodes
Returns:
- str: zipcode
"""
if country in settings.NON_ZIPCODE_COUNTRIES:
return ""
else:
if (zipcode is None) or (zipcode == ""):
raise ValidationError(_("Input required"))
else:
return zipcode
def validate_prefix(prefix):
"""
Validate ip prefix.
Arguments:
- prefix: ipaddress.IPv4Network or an ipaddress.IPv6Network
Raises:
- ValidationError on failed validation
Returns:
- ipaddress.ip_network instance
"""
if isinstance(prefix, str):
try:
prefix = ipaddress.ip_network(prefix)
except ValueError:
raise ValidationError(_("Invalid prefix: {}").format(prefix))
return prefix
def validate_address_space(prefix):
"""
Validate an ip prefix according to peeringdb specs.
Arguments:
- prefix: ipaddress.IPv4Network or an ipaddress.IPv6Network
Raises:
- ValidationError on failed validation
"""
prefix = validate_prefix(prefix)
if not network_is_pdb_valid(prefix):
raise ValidationError(_("Address space invalid: {}").format(prefix))
# bypass validation according to #741
if bypass_validation():
return
prefixlen_min = getattr(settings, f"DATA_QUALITY_MIN_PREFIXLEN_V{prefix.version}")
prefixlen_max = getattr(settings, f"DATA_QUALITY_MAX_PREFIXLEN_V{prefix.version}")
if prefix.prefixlen < prefixlen_min:
raise ValidationError(
_("Maximum allowed prefix length is {}").format(prefixlen_min)
)
elif prefix.prefixlen > prefixlen_max:
raise ValidationError(
_("Minimum allowed prefix length is {}").format(prefixlen_max)
)
def validate_info_prefixes4(value):
if not value:
value = 0
if value < 0:
raise ValidationError(_("Negative value not allowed"))
# bypass validation according to #741
if bypass_validation():
return value
if value > settings.DATA_QUALITY_MAX_PREFIX_V4_LIMIT:
raise ValidationError(
_("Maximum value allowed {}").format(
settings.DATA_QUALITY_MAX_PREFIX_V4_LIMIT
)
)
return value
def validate_info_prefixes6(value):
if not value:
value = 0
if value < 0:
raise ValidationError(_("Negative value not allowed"))
# bypass validation according to #741
if bypass_validation():
return value
if value > settings.DATA_QUALITY_MAX_PREFIX_V6_LIMIT:
raise ValidationError(
_("Maximum value allowed {}").format(
settings.DATA_QUALITY_MAX_PREFIX_V6_LIMIT
)
)
return value
def validate_prefix_overlap(prefix):
"""
Validate that a prefix does not overlap with another prefix
on an already existing ixlan.
Arguments:
- prefix: ipaddress.IPv4Network or an ipaddress.IPv6Network
Raises:
- ValidationError on failed validation
"""
prefix = validate_prefix(prefix)
qs = peeringdb_server.models.IXLanPrefix.objects.filter(
protocol=f"IPv{prefix.version}", status="ok"
)
qs = qs.exclude(prefix=prefix)
for ixpfx in qs:
if ixpfx.prefix.overlaps(prefix):
raise ValidationError(
_(
"Prefix overlaps with {}'s prefix: {}".format(
ixpfx.ixlan.ix.name, ixpfx.prefix
)
)
)
def validate_irr_as_set(value):
"""
Validate irr as-set string.
- the as-set/rs-set name has to conform to RFC 2622 (5.1 and 5.2)
- the source may be specified by AS-SET@SOURCE or SOURCE::AS-SET
- multiple values must be separated by either comma, space or comma followed by space
Arguments:
- value: irr as-set string
Returns:
- str: validated irr as-set string
"""
if not isinstance(value, str):
raise ValueError(_("IRR AS-SET value must be string type"))
# split multiple values
# normalize value separation to commas
value = value.replace(", ", ",")
value = value.replace(" ", ",")
validated = []
# validate
for item in value.split(","):
item = item.upper()
source = None
as_set = None
# <name>@<source>
parts_match = re.match(r"^([\w\d\-:]+)@([\w\d\-:]+)$", item)
if parts_match:
source = parts_match.group(2)
as_set = parts_match.group(1)
# <source>::<name>
else:
parts_match = re.match(r"^([\w\d\-:]+)::([\w\d\-:]+)$", item)
if parts_match:
source = parts_match.group(1)
as_set = parts_match.group(2)
else:
sourceless_match = re.match(r"^([\w\d\-:]+)$", item)
if not sourceless_match:
raise ValidationError(
_(
"Invalid formatting: {} - should be AS-SET, ASx, AS-SET@SOURCE or SOURCE::AS-SET"
).format(item)
)
as_set = sourceless_match.group(1)
if source and source not in IRR_SOURCE:
raise ValidationError(_("Unknown IRR source: {}").format(source))
# validate set name and as hierarchy
as_parts = as_set.split(":")
# validate max depth (superusers are allowed to bypass this validation, see #741)
if (
len(as_parts) > settings.DATA_QUALITY_MAX_IRR_DEPTH
and not bypass_validation()
):
raise ValidationError(
_("Maximum AS-SET hierarchy depth: {}").format(
settings.DATA_QUALITY_MAX_IRR_DEPTH
)
)
set_found = False
types = []
for part in as_parts:
match_set = re.match(r"^(AS|RS)-[\w\d\-]+$", part)
match_as = re.match(r"^(AS)[\d]+$", part)
# set name found
if match_set:
set_found = True
types.append(match_set.group(1))
elif not match_as:
raise ValidationError(
_(
"Invalid formatting: {} - should be RS-SET, AS-SET or AS123"
).format(part)
)
if len(list(set(types))) > 1:
raise ValidationError(
_("All parts of an hierarchical name have to be of the same type")
)
if not set_found and len(as_parts) > 1:
raise ValidationError(
_("At least one component must be an actual set name")
)
validated.append(item)
return " ".join(validated)
def validate_bool(value):
"""
Validates a boolean value
This can be passed a string for `True` or `False` or an integer as 1, 0 as well
to convert and return a boolean value
Will raise ValidationError on failure.
Arguments:
- value (`str`|`int`|`bool`)
Returns:
- validated value (`bool`)
"""
try:
if isinstance(value, bool):
return value
if isinstance(value, str):
if value.lower() == "true":
return True
if value.lower() == "false":
return False
return bool(int(value))
except TypeError:
raise ValidationError(_("Needs to be 'True', 'False', 1 or 0"))
def validate_api_rate(value):
"""
Validates a number/time-unit format used to determine rate limits
e.g., 10/second or 100/minute
Will raise a ValidationError on failure
Arguments:
- value(`str`)
Returns:
- validated value (`str`)
"""
value = str(value)
if re.match(r"([/\d]+)\s*(?:minute|hour|seconds|day|week|month|year)", value):
return value
else:
raise ValidationError(
_(
"Invalid setting! Acceptable value is a number followed by one of the following: minute, hour, seconds, day, week, month, year. eg (10/minute, 1/hour, 5/day, 1/week, 1/month, 1/year)"
)
)
def validate_social_media(value):
"""
Validates a social media value
Will raise a ValidationError on failure
Arguments:
- value(`dict`)
Returns:
- validated value (`dict`)
"""
if value:
schema = Schema([{"service": str, "identifier": str}])
try:
schema.validate(value)
except Exception:
raise ValidationError(_("Malformed social media data."))
service = [sc.get("service") for sc in value]
if not len(set(service)) == len(service):
raise ValidationError(_("Duplicate social media services set."))
for data in value:
service = data.get("service")
identifier = data.get("identifier")
if service == "":
raise ValidationError(_("Service should not be empty!"))
elif identifier == "":
raise ValidationError(_("Identifier should not be empty!"))
if service in ["website"]:
# validate URL
try:
URLValidator(identifier)
except Exception:
raise ValidationError(
_("Invalid {service} URL!").format(service=service)
)
elif service in ["instagram", "x", "tiktok", "facebook", "linkedin"]:
# validate username
if service in ["x"]:
regex = r"^[a-zA-Z0-9_]{1,15}$"
else:
regex = r"^(-*)(?=.{4,32}$)(?![.\-])(?!.*[.]{2})[a-zA-Z0-9._\-]+(?<![.])$"
matches = re.search(regex, identifier)
if not matches:
raise ValidationError(
_("Invalid {service} username!").format(service=service)
)
elif not service:
# service can't be None and empty.
raise ValidationError(_("Invalid service!"))
elif not identifier:
# identifier can't be None and empty.
raise ValidationError(_("Invalid identifier!"))
return value
def validate_website_override(website, org_website):
"""
Validates a website value
Will raise a ValidationError on failure
Arguments:
- value(`str`)
Returns:
- validated value (`str`)
"""
if not website and not org_website:
raise ValidationError({"website": ["Website required"]})
elif not website and org_website:
return org_website
return website
def validate_verified_update_data(ref_tag, obj_id, data):
"""
Validates a VerifiedUpdate updates value
Will return a False and message on failure
Arguments:
- ref_tag(`str`)
- obj_id(`int`)
- data(`dict`)
Returns:
- status (`bool`)
- validated data (`dict`)
"""
if not data:
return False, _("Data is empty")
if ref_tag not in const.SUPPORTED_FIELDS:
return False, _(f"Unknown object type: {ref_tag}")
model = peeringdb_server.models.REFTAG_MAP[ref_tag]
try:
obj = model.objects.get(id=obj_id)
except model.DoesNotExist:
return False, _(f"object {ref_tag}.{obj_id} not found")
result = {}
for field, value in data.items():
if field not in const.SUPPORTED_FIELDS[ref_tag]:
continue
if not hasattr(obj, field):
continue
if value == "true":
value = True
elif value == "false":
value = False
else:
try:
value = int(value)
except (TypeError, ValueError):
pass
if value != getattr(obj, field):
result.update({field: value})
return True, result
def validate_asn_prefix(asn):
"""
Validates a ASN prefix value
Will raise RestValidationError on failure
Arguments:
- asn(`str`)
Returns:
- status (`bool`)
- validated_value (`int`)
"""
value = str(asn)
validated_value = re.match(r"^(asn|as|)(\d+)$", value.lower())
if validated_value:
return validated_value.group(2)
else:
raise RestValidationError({"asn": ["ASN contains invalid value"]})
def validate_latitude(latitude):
try:
value = float(latitude)
is_valid = -90 <= value <= 90
except ValueError:
is_valid = False
if not is_valid:
raise ValidationError({"latitude": f"Invalid {latitude} latitude!"})
return value
def validate_longitude(longitude):
try:
value = float(longitude)
is_valid = -180 <= value <= 180
except ValueError:
is_valid = False
if not is_valid:
raise ValidationError({"longitude": f"Invalid {longitude} longitude!"})
return value
def validate_distance_geocode(current_geocode, new_geocode, city):
if (
current_geocode
and type(tuple)
and all(value is not None for value in current_geocode)
):
# When geocode already
max_distance = settings.FACILITY_MAX_DISTANCE_GEOCODE_EXISTS
distance = geodesic(current_geocode, new_geocode).km
if distance > max_distance:
message = f"exceeds the maximum distance of {max_distance}KM from the previous geocode"
raise ValidationError({"latitude": message, "longitude": message})
else:
# When no geocode currently exists
gmaps = geo.GoogleMaps(settings.GOOGLE_GEOLOC_API_KEY, timeout=5)
city_geocode = ()
max_distance = settings.FACILITY_MAX_DISTANCE_GEOCODE_NOT_EXISTS
try:
result = gmaps.geocode_freeform(city)
city_geocode = (result.get("lat"), result.get("lng"))
except geo.Timeout:
raise ValidationError(_("Geo coding timed out"))
except geo.RequestError as exc:
raise ValidationError(_("Geo coding failed: {}").format(exc))
except geo.NotFound:
raise ValidationError(_("Geo coding failed: City not found"))
distance = geodesic(city_geocode, new_geocode).km
if distance > max_distance:
message = (
f"exceeds a maximum distance of {max_distance}KM from the city center"
)
raise ValidationError({"latitude": message, "longitude": message})
return new_geocode