1
0
mirror of https://github.com/checktheroads/hyperglass synced 2024-05-11 05:55:08 +00:00
Files
checktheroads-hyperglass/hyperglass/external/http_client.py
2022-12-24 17:53:05 -05:00

235 lines
6.6 KiB
Python

"""HTTP Client for plugin use."""
# Standard Library
import typing as t
# Project
from hyperglass.models.fields import JsonValue, Primitives
# Local
from ._base import BaseExternal
class HTTPClient(BaseExternal, name="HTTPClient"):
"""Wrapper around a standard HTTP Client."""
def __init__(self: "HTTPClient", base_url: str, timeout: int = 10) -> None:
"""Create an HTTPClient instance."""
super().__init__(base_url=base_url, timeout=timeout, parse=False)
async def aget(
self: "HTTPClient",
endpoint: str,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
) -> t.Any:
"""Perform an async HTTP GET request."""
return await self._arequest(
method="GET",
endpoint=endpoint,
headers=headers,
params=params,
data=data,
timeout=timeout,
)
async def apost(
self: "HTTPClient",
endpoint: str,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
) -> t.Any:
"""Perform an async HTTP POST request."""
return await self._arequest(
method="POST",
endpoint=endpoint,
headers=headers,
params=params,
data=data,
timeout=timeout,
)
async def aput(
self: "HTTPClient",
endpoint: str,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
) -> t.Any:
"""Perform an async HTTP PUT request."""
return await self._arequest(
method="PUT",
endpoint=endpoint,
headers=headers,
params=params,
data=data,
timeout=timeout,
)
async def adelete(
self: "HTTPClient",
endpoint: str,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
) -> t.Any:
"""Perform an async HTTP DELETE request."""
return await self._arequest(
method="DELETE",
endpoint=endpoint,
headers=headers,
params=params,
data=data,
timeout=timeout,
)
async def apatch(
self: "HTTPClient",
endpoint: str,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
) -> t.Any:
"""Perform an async HTTP PATCH request."""
return await self._arequest(
method="PATCH",
endpoint=endpoint,
headers=headers,
params=params,
data=data,
timeout=timeout,
)
async def ahead(
self: "HTTPClient",
endpoint: str,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
) -> t.Any:
"""Perform an async HTTP HEAD request."""
return await self._arequest(
method="HEAD",
endpoint=endpoint,
headers=headers,
params=params,
data=data,
timeout=timeout,
)
def get(
self: "HTTPClient",
endpoint: str,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
) -> t.Any:
"""Perform an HTTP GET request."""
return self._request(
method="GET",
endpoint=endpoint,
headers=headers,
params=params,
data=data,
timeout=timeout,
)
def post(
self: "HTTPClient",
endpoint: str,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
) -> t.Any:
"""Perform an HTTP POST request."""
return self._request(
method="POST",
endpoint=endpoint,
headers=headers,
params=params,
data=data,
timeout=timeout,
)
def put(
self: "HTTPClient",
endpoint: str,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
) -> t.Any:
"""Perform an HTTP PUT request."""
return self._request(
method="PUT",
endpoint=endpoint,
headers=headers,
params=params,
data=data,
timeout=timeout,
)
def delete(
self: "HTTPClient",
endpoint: str,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
) -> t.Any:
"""Perform an HTTP DELETE request."""
return self._request(
method="DELETE",
endpoint=endpoint,
headers=headers,
params=params,
data=data,
timeout=timeout,
)
def patch(
self: "HTTPClient",
endpoint: str,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
) -> t.Any:
"""Perform an HTTP PATCH request."""
return self._request(
method="PATCH",
endpoint=endpoint,
headers=headers,
params=params,
data=data,
timeout=timeout,
)
def head(
self: "HTTPClient",
endpoint: str,
headers: t.Dict[str, str] = None,
params: t.Dict[str, JsonValue[Primitives]] = None,
data: t.Optional[t.Any] = None,
timeout: t.Optional[int] = None,
) -> t.Any:
"""Perform an HTTP HEAD request."""
return self._request(
method="HEAD",
endpoint=endpoint,
headers=headers,
params=params,
data=data,
timeout=timeout,
)