2019-12-29 23:57:39 -07:00
|
|
|
"""Handle JSON Web Token Encoding & Decoding."""
|
|
|
|
|
2019-12-28 02:00:34 -07:00
|
|
|
# Standard Library Imports
|
|
|
|
import datetime
|
|
|
|
|
|
|
|
# Third Party Imports
|
|
|
|
import jwt
|
|
|
|
|
|
|
|
# Project Imports
|
|
|
|
from hyperglass.exceptions import RestError
|
|
|
|
|
|
|
|
|
|
|
|
async def jwt_decode(payload, secret):
|
2019-12-31 13:30:55 -07:00
|
|
|
"""Decode & validate an encoded JSON Web Token (JWT).
|
|
|
|
|
|
|
|
Arguments:
|
|
|
|
payload {str} -- Raw JWT payload
|
|
|
|
secret {str} -- JWT secret
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
RestError: Raised if decoded payload is improperly formatted
|
|
|
|
or if the JWT is not able to be decoded.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
{str} -- Decoded response payload
|
|
|
|
"""
|
2019-12-28 02:00:34 -07:00
|
|
|
try:
|
|
|
|
decoded = jwt.decode(payload, secret, algorithm="HS256")
|
|
|
|
decoded = decoded["payload"]
|
|
|
|
return decoded
|
|
|
|
except (KeyError, jwt.PyJWTError) as exp:
|
|
|
|
raise RestError(str(exp)) from None
|
|
|
|
|
|
|
|
|
|
|
|
async def jwt_encode(payload, secret, duration):
|
2019-12-31 13:30:55 -07:00
|
|
|
"""Encode a query to a JSON Web Token (JWT).
|
|
|
|
|
|
|
|
Arguments:
|
|
|
|
payload {str} -- Stringified JSON request
|
|
|
|
secret {str} -- JWT secret
|
|
|
|
duration {int} -- Number of seconds claim is valid
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
str -- Encoded request payload
|
|
|
|
"""
|
2019-12-28 02:00:34 -07:00
|
|
|
token = {
|
|
|
|
"payload": payload,
|
|
|
|
"nbf": datetime.datetime.utcnow(),
|
|
|
|
"iat": datetime.datetime.utcnow(),
|
|
|
|
"exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=duration),
|
|
|
|
}
|
|
|
|
encoded = jwt.encode(token, secret, algorithm="HS256").decode("utf-8")
|
|
|
|
return encoded
|