2020-04-18 23:18:50 -07:00
|
|
|
"""Session handler for Slack API."""
|
|
|
|
|
2021-12-14 22:59:05 -07:00
|
|
|
# Standard Library
|
|
|
|
import typing as t
|
|
|
|
|
2020-04-18 23:18:50 -07:00
|
|
|
# Project
|
|
|
|
from hyperglass.log import log
|
|
|
|
from hyperglass.external._base import BaseExternal
|
2020-10-05 12:07:34 -07:00
|
|
|
from hyperglass.models.webhook import Webhook
|
2020-04-18 23:18:50 -07:00
|
|
|
|
2021-12-14 22:59:05 -07:00
|
|
|
if t.TYPE_CHECKING:
|
|
|
|
# Project
|
|
|
|
from hyperglass.models.config.logging import Http
|
|
|
|
|
2020-04-18 23:18:50 -07:00
|
|
|
|
|
|
|
class SlackHook(BaseExternal, name="Slack"):
    """External session handler that delivers webhooks to Slack.

    Registers itself with `BaseExternal` under the name "Slack" and is
    always initialized against the `https://hooks.slack.com` base URL.
    """

    def __init__(self: "SlackHook", config: "Http") -> None:
        """Set up the base external session for Slack.

        Args:
            config: HTTP logging configuration, forwarded unchanged to
                `BaseExternal`. `send` later reads `config.host.path` from
                the instance's `config` attribute (presumably set by the
                base class from this argument; confirm in `BaseExternal`).
        """
        # NOTE(review): `parse=False` is handed straight to the base class;
        # presumably it disables response parsing; confirm in BaseExternal.
        super().__init__(
            base_url="https://hooks.slack.com",
            config=config,
            parse=False,
        )

    async def send(self: "SlackHook", query: t.Dict[str, t.Any]):
        """Build a webhook from `query` and post it to Slack.

        Args:
            query: Keyword data used to construct a `Webhook` model.

        Returns:
            Whatever the awaited `_apost` call returns.
        """
        webhook = Webhook(**query)
        log.debug("Sending query data to Slack:\n{}", webhook)

        # The target path comes from the configured host; the request body is
        # the webhook rendered by its `slack()` method.
        target_path = self.config.host.path
        slack_body = webhook.slack()
        response = await self._apost(endpoint=target_path, data=slack_body)
        return response
|