1
0
mirror of https://github.com/NLnetLabs/rtrtr.git synced 2024-05-11 05:55:07 +00:00

Add metrics.

This commit is contained in:
Martin Hoffmann
2020-11-05 11:08:57 +01:00
parent b74f56cf46
commit f2d1591ef2
12 changed files with 819 additions and 20 deletions

View File

@ -12,8 +12,10 @@ readme = "README.md"
[dependencies]
chrono = "0.4.11"
clap = "2.33"
crossbeam-utils = "0.7.2"
fern = "0.6.0"
futures = "0.3"
hyper = "0.13.4"
log = "0.4"
log-reroute = "0.1.5"
rand = "0.7.3"

View File

@ -21,6 +21,12 @@ log_facility = "daemon"
# If file logging is used, the log file must be given.
log_file = "/var/log/rtrtr.log"
# Where should the HTTP server listen on?
#
# Currently, the server is only used for metrics, but eventually we will have
# HTTP targets that publish their data under a path of the HTTP server.
http-listen = ["127.0.0.1:8080"]
# RTRTR uses two classes of components: units and targets. Units take data
# from somewhere and produce a single, constantly updated data set. Targets
# take the data set from exactly one other unit and serve it in some specific

View File

@ -8,11 +8,19 @@
//! sends its updates. The opposite end is called a `Link` and is held by
//! other units or targets.
use std::fmt;
use std::sync::atomic;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicUsize};
use chrono::{DateTime, Utc};
use crossbeam_utils::atomic::AtomicCell;
use slab::Slab;
use serde_derive::Deserialize;
use tokio::sync::{mpsc, oneshot};
use crate::{manager, payload};
use crate::{manager, metrics, payload};
use crate::config::Marked;
use crate::metrics::{Metric, MetricType, MetricUnit};
//------------ Configuration -------------------------------------------------
@ -39,7 +47,7 @@ const COMMAND_QUEUE_LEN: usize = 16;
/// gate becomes active again.
///
/// In order for the gate to maintain its own state, the unit needs to
/// regularly run the `process` method. It return, the unit will receive an
/// regularly run the `process` method. In return, the unit will receive an
/// update to the gates state as soon as it becomes available.
///
/// Sending of updates happens via the `update_data` and `update_status`
@ -57,6 +65,9 @@ pub struct Gate {
/// The current unit status.
unit_status: UnitStatus,
/// The gate metrics.
metrics: Arc<GateMetrics>,
}
@ -71,12 +82,18 @@ impl Gate {
commands: rx,
updates: Slab::new(),
suspended: 0,
unit_status: UnitStatus::Healthy,
unit_status: UnitStatus::default(),
metrics: Default::default(),
};
let agent = GateAgent { commands: tx };
(gate, agent)
}
/// Returns the metrics.
pub fn metrics(&self) -> Arc<GateMetrics> {
self.metrics.clone()
}
/// Runs the gates internal machine.
///
/// This method returns a future that runs the gates internal machine.
@ -127,7 +144,8 @@ impl Gate {
}
item.sender = None
}
self.updates.retain(|_, item| item.sender.is_some())
self.updates.retain(|_, item| item.sender.is_some());
self.metrics.update(&update);
}
/// Updates the unit status.
@ -146,7 +164,8 @@ impl Gate {
}
item.sender = None
}
self.updates.retain(|_, item| item.sender.is_some())
self.updates.retain(|_, item| item.sender.is_some());
self.metrics.update_status(update);
}
/// Returns the current gate status.
@ -207,6 +226,91 @@ impl GateAgent {
}
//------------ GateMetrics ---------------------------------------------------
/// Metrics about the updates distributed via the gate.
#[derive(Debug, Default)]
pub struct GateMetrics {
status: AtomicCell<UnitStatus>,
serial: AtomicU32,
count: AtomicUsize,
update: AtomicCell<Option<DateTime<Utc>>>,
}
impl GateMetrics {
fn update(&self, update: &payload::Update) {
self.serial.store(update.serial().into(), atomic::Ordering::Relaxed);
self.count.store(update.set().len(), atomic::Ordering::Relaxed);
self.update.store(Some(Utc::now()));
}
fn update_status(&self, status: UnitStatus) {
self.status.store(status)
}
}
impl GateMetrics {
const STATUS_METRIC: Metric = Metric::new(
"gate_status", "the operational status of the unit",
MetricType::Text, MetricUnit::Info
);
const SERIAL_METRIC: Metric = Metric::new(
"serial", "the serial number of the units updates",
MetricType::Counter, MetricUnit::Info
);
const COUNT_METRIC: Metric = Metric::new(
"vrps", "the number of VRPs in the last update",
MetricType::Gauge, MetricUnit::Total
);
const UPDATE_METRIC: Metric = Metric::new(
"last_update", "the date and time of the last update",
MetricType::Text, MetricUnit::Info
);
const UPDATE_AGO_METRIC: Metric = Metric::new(
"since_last_update", "the number of seconds since the last update",
MetricType::Gauge, MetricUnit::Second
);
}
impl metrics::Source for GateMetrics {
fn append(&self, unit_name: &str, target: &mut metrics::Target) {
target.append_simple(
&Self::STATUS_METRIC, Some(unit_name), self.status.load()
);
target.append_simple(
&Self::SERIAL_METRIC, Some(unit_name),
self.serial.load(atomic::Ordering::Relaxed)
);
target.append_simple(
&Self::COUNT_METRIC, Some(unit_name),
self.count.load(atomic::Ordering::Relaxed)
);
match self.update.load() {
Some(update) => {
target.append_simple(
&Self::UPDATE_METRIC, Some(unit_name),
update
);
let ago = Utc::now().signed_duration_since(update);
let ago = (ago.num_milliseconds() as f64) / 1000.;
target.append_simple(
&Self::UPDATE_AGO_METRIC, Some(unit_name), ago
);
}
None => {
target.append_simple(
&Self::UPDATE_METRIC, Some(unit_name),
"N/A"
);
target.append_simple(
&Self::UPDATE_AGO_METRIC, Some(unit_name), -1
);
}
}
}
}
//------------ Link ----------------------------------------------------------
/// A link to another unit.
@ -447,10 +551,26 @@ pub enum UnitStatus {
Gone,
}
impl Default for UnitStatus {
fn default() -> Self {
UnitStatus::Healthy
}
}
impl fmt::Display for UnitStatus {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(match *self {
UnitStatus::Healthy => "healthy",
UnitStatus::Stalled => "stalled",
UnitStatus::Gone => "gone",
})
}
}
//------------ Terminated ----------------------------------------------------
/// A unit has been terminated.
/// An error signalling that a unit has been terminated.
///
/// In response to this error, a units run function should return.
#[derive(Clone, Copy, Debug)]

View File

@ -6,6 +6,7 @@ use std::sync::Arc;
use clap::{App, Arg, ArgMatches};
use serde_derive::Deserialize;
use toml::Spanned;
use crate::http;
use crate::log::{ExitError, Failed, LogConfig};
use crate::manager::{Manager, TargetSet, UnitSet};
@ -20,6 +21,9 @@ pub struct Config {
#[serde(flatten)]
pub log: LogConfig,
#[serde(flatten)]
pub http: http::Server,
}
impl Config {

198
src/http.rs Normal file
View File

@ -0,0 +1,198 @@
/// The HTTP server.
use std::io;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::net::TcpListener as StdListener;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::pin_mut;
use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::accept::Accept;
use hyper::service::{make_service_fn, service_fn};
use log::error;
use serde_derive::Deserialize;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Runtime;
use tokio::stream::Stream;
use crate::log::ExitError;
use crate::metrics;
//------------ Server --------------------------------------------------------
#[derive(Clone, Deserialize)]
pub struct Server {
#[serde(rename = "http-listen")]
listen: Vec<SocketAddr>,
}
impl Server {
pub fn run(
&self,
metrics: metrics::Collection,
runtime: &Runtime,
) -> Result<(), ExitError> {
let mut listeners = Vec::new();
for addr in &self.listen {
// Binding needs to have happened before dropping privileges
// during detach. So we do this here synchronously.
match StdListener::bind(addr) {
Ok(listener) => listeners.push(listener),
Err(err) => {
error!("Fatal: error listening on {}: {}", addr, err);
return Err(ExitError);
}
};
}
for listener in listeners {
runtime.spawn(
Self::single_listener(listener, metrics.clone())
);
}
Ok(())
}
async fn single_listener(
listener: StdListener,
metrics: metrics::Collection,
) {
let make_service = make_service_fn(|_conn| {
let metrics = metrics.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let metrics = metrics.clone();
async move { Self::handle_request(req, &metrics).await }
}))
}
});
let listener = match TcpListener::from_std(listener) {
Ok(listener) => listener,
Err(err) => {
error!("Error on HTTP listener: {}", err);
return
}
};
if let Err(err) = hyper::Server::builder(
HttpAccept { sock: listener }
).serve(make_service).await {
error!("HTTP server error: {}", err);
}
}
async fn handle_request(
req: Request<Body>,
metrics: &metrics::Collection
) -> Result<Response<Body>, Infallible> {
if *req.method() != Method::GET {
return Ok(Self::method_not_allowed())
}
Ok(match req.uri().path() {
"/metrics" => Self::metrics(metrics),
"/status" => Self::status(metrics),
_ => Self::not_found()
})
}
fn metrics(metrics: &metrics::Collection) -> Response<Body> {
Response::builder()
.header("Content-Type", "text/plain; version=0.0.4")
.body(
metrics.assemble(metrics::OutputFormat::Prometheus).into()
)
.unwrap()
}
fn status(metrics: &metrics::Collection) -> Response<Body> {
Response::builder()
.header("Content-Type", "text/plain")
.body(
metrics.assemble(metrics::OutputFormat::Plain).into()
)
.unwrap()
}
fn method_not_allowed() -> Response<Body> {
Response::builder()
.status(StatusCode::METHOD_NOT_ALLOWED)
.header("Content-Type", "text/plain")
.body("Method Not Allowed".into())
.unwrap()
}
fn not_found() -> Response<Body> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.header("Content-Type", "text/plain")
.body("Not Found".into())
.unwrap()
}
}
//------------ Wrapped sockets -----------------------------------------------
struct HttpAccept {
sock: TcpListener,
}
impl Accept for HttpAccept {
type Conn = HttpStream;
type Error = io::Error;
fn poll_accept(
mut self: Pin<&mut Self>,
cx: &mut Context
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let sock = &mut self.sock;
pin_mut!(sock);
sock.poll_next(cx).map(|sock| sock.map(|sock| sock.map(|sock| {
HttpStream {
sock,
}
})))
}
}
struct HttpStream {
sock: TcpStream,
}
impl AsyncRead for HttpStream {
fn poll_read(
mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]
) -> Poll<Result<usize, io::Error>> {
let sock = &mut self.sock;
pin_mut!(sock);
sock.poll_read(cx, buf)
}
}
impl AsyncWrite for HttpStream {
fn poll_write(
mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]
) -> Poll<Result<usize, io::Error>> {
let sock = &mut self.sock;
pin_mut!(sock);
sock.poll_write(cx, buf)
}
fn poll_flush(
mut self: Pin<&mut Self>, cx: &mut Context
) -> Poll<Result<(), io::Error>> {
let sock = &mut self.sock;
pin_mut!(sock);
sock.poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>, cx: &mut Context
) -> Poll<Result<(), io::Error>> {
let sock = &mut self.sock;
pin_mut!(sock);
sock.poll_shutdown(cx)
}
}

View File

@ -1,8 +1,10 @@
pub mod comms;
pub mod config;
pub mod http;
pub mod log;
pub mod manager;
pub mod metrics;
pub mod payload;
pub mod targets;
pub mod units;

View File

@ -36,6 +36,7 @@ fn _main() -> Result<(), ExitError> {
.enable_all()
.build()
.unwrap();
config.http.run(manager.metrics(), &runtime)?;
manager.spawn(&mut config, &runtime);
runtime.block_on(pending())
}

View File

@ -2,9 +2,11 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;
use log::error;
use serde_derive::Deserialize;
use tokio::runtime::Runtime;
use crate::metrics;
use crate::comms::{Gate, GateAgent, Link};
use crate::config::{Config, ConfigFile, Marked};
use crate::log::Failed;
@ -12,6 +14,35 @@ use crate::targets::Target;
use crate::units::Unit;
//------------ Component -----------------------------------------------------
/// A component.
///
/// Upon being started, every component receives one of these. It can use it
/// to communicate with the manager.
#[derive(Debug)]
pub struct Component {
name: Arc<str>,
metrics: metrics::Collection,
}
impl Component {
fn new(name: String, metrics: metrics::Collection) -> Self {
Component { name: name.into(), metrics }
}
/// Returns the name of the component.
pub fn name(&self) -> &Arc<str> {
&self.name
}
/// Register a metrics source.
pub fn register_metrics(&mut self, source: Arc<dyn metrics::Source>) {
self.metrics.register(self.name.clone(), Arc::downgrade(&source));
}
}
//------------ Manager -------------------------------------------------------
#[derive(Default)]
@ -19,6 +50,8 @@ pub struct Manager {
units: HashMap<String, GateAgent>,
pending: HashMap<String, Gate>,
metrics: metrics::Collection,
}
@ -78,7 +111,7 @@ impl Manager {
Ok(config)
}
/// Spawns all units and target in the config unto the given runtime.
/// Spawns all units and targets in the config unto the given runtime.
///
/// # Panics
///
@ -87,13 +120,18 @@ impl Manager {
pub fn spawn(&mut self, config: &mut Config, runtime: &Runtime) {
for (name, unit) in config.units.units.drain() {
let gate = self.pending.remove(&name).unwrap();
runtime.spawn(unit.run(name, gate));
let controller = Component::new(name, self.metrics.clone());
runtime.spawn(unit.run(controller, gate));
}
for (name, target) in config.targets.targets.drain() {
runtime.spawn(target.run(name));
}
}
pub fn metrics(&self) -> metrics::Collection {
self.metrics.clone()
}
}

369
src/metrics.rs Normal file
View File

@ -0,0 +1,369 @@
//! Metrics.
use std::{fmt, iter};
use std::sync::{Arc, Mutex, Weak};
use std::fmt::Write;
use clap::{crate_name, crate_version};
//------------ Module Configuration ------------------------------------------
const PROMETHEUS_PREFIX: &str = "rtrtr";
//------------ Collection ----------------------------------------------------
#[derive(Clone, Default)]
pub struct Collection {
sources: Arc<Mutex<Arc<Vec<RegisteredSource>>>>,
}
impl Collection {
pub fn register(&self, name: Arc<str>, source: Weak<dyn Source>) {
let mut sources = self.sources.lock().unwrap();
let old_sources = sources.clone();
let mut new_sources = Vec::new();
for item in old_sources.iter() {
if item.source.strong_count() > 0 {
new_sources.push(item.clone())
}
}
new_sources.push(
RegisteredSource {
name: name.into(),
source
}
);
*sources = new_sources.into()
}
pub fn assemble(&self, format: OutputFormat) -> String {
let sources = self.sources.lock().unwrap().clone();
let mut target = Target::new(format);
for item in sources.iter() {
if let Some(source) = item.source.upgrade() {
source.append(&item.name, &mut target)
}
}
target.into_string()
}
}
impl fmt::Debug for Collection {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let len = self.sources.lock().unwrap().len();
write!(f, "Collection({} sources)", len)
}
}
//------------ RegisteredSource ----------------------------------------------
#[derive(Clone)]
struct RegisteredSource {
name: Arc<str>,
source: Weak<dyn Source>,
}
//------------ Source --------------------------------------------------------
/// A type producing some metrics.
///
/// All this type needs to be able to do is output its metrics.
pub trait Source: Send + Sync {
/// Appends the metrics to the target.
///
/// The unit name is provided so a source doesnt need to keep it around.
fn append(&self, unit_name: &str, target: &mut Target);
}
impl<T: Source> Source for Arc<T> {
fn append(&self, unit_name: &str, target: &mut Target) {
AsRef::<T>::as_ref(self).append(unit_name, target)
}
}
//------------ Target --------------------------------------------------------
#[derive(Clone, Debug)]
pub struct Target {
format: OutputFormat,
target: String,
}
impl Target {
pub fn new(format: OutputFormat) -> Self {
let mut target = String::new();
if matches!(format, OutputFormat::Plain) {
target.push_str(
concat!(
"version: ", crate_name!(), "/", crate_version!(), "\n"
)
);
}
Target { format, target }
}
pub fn into_string(self) -> String {
self.target
}
pub fn append<F: FnOnce(&mut Records)>(
&mut self,
metric: &Metric,
unit_name: Option<&str>,
values: F,
) {
if !self.format.supports_type(metric.metric_type) {
return
}
if matches!(self.format, OutputFormat::Prometheus) {
self.target.push_str("# HELP ");
self.append_metric_name(metric, unit_name);
self.target.push(' ');
self.target.push_str(metric.help);
self.target.push('\n');
self.target.push_str("# TYPE ");
self.append_metric_name(metric, unit_name);
writeln!(&mut self.target, " {}", metric.metric_type).unwrap();
}
values(&mut Records { target: self, metric, unit_name })
}
pub fn append_simple(
&mut self,
metric: &Metric,
unit_name: Option<&str>,
value: impl fmt::Display,
) {
self.append(metric, unit_name, |records| {
records.value(value)
})
}
fn append_metric_name(
&mut self, metric: &Metric, unit_name: Option<&str>
) {
match self.format {
OutputFormat::Prometheus => {
match unit_name {
Some(unit) => {
write!(&mut self.target,
"{}_{}_{}_{}",
PROMETHEUS_PREFIX, unit, metric.name, metric.unit
).unwrap();
}
None => {
write!(&mut self.target,
"{}_{}_{}",
PROMETHEUS_PREFIX, metric.name, metric.unit
).unwrap();
}
}
}
OutputFormat::Plain => {
match unit_name {
Some(unit) => {
write!(&mut self.target,
"{} {}", unit, metric.name
).unwrap();
}
None => {
write!(&mut self.target,
"{}", metric.name
).unwrap();
}
}
}
}
}
}
//------------ Records -------------------------------------------------------
pub struct Records<'a> {
target: &'a mut Target,
metric: &'a Metric,
unit_name: Option<&'a str>,
}
impl<'a> Records<'a> {
pub fn value(&mut self, value: impl fmt::Display) {
match self.target.format {
OutputFormat::Prometheus => {
self.target.append_metric_name(self.metric, self.unit_name);
writeln!(&mut self.target.target, " {}", value).unwrap()
}
OutputFormat::Plain => {
self.target.append_metric_name(self.metric, self.unit_name);
writeln!(&mut self.target.target, ": {}", value).unwrap()
}
}
}
pub fn label_value(
&mut self,
labels: &[(&str, &str)],
value: impl fmt::Display
) {
match self.target.format {
OutputFormat::Prometheus => {
self.target.append_metric_name(self.metric, self.unit_name);
self.target.target.push('{');
for ((name, value), comma) in
labels.iter().zip(
iter::once(false).chain(iter::repeat(true))
)
{
if comma {
write!(&mut self.target.target,
", {}=\"{}\"", name, value
).unwrap();
}
else {
write!(&mut self.target.target,
"{}=\"{}\"", name, value
).unwrap();
}
}
writeln!(&mut self.target.target, "}} {}", value).unwrap()
}
OutputFormat::Plain => {
self.target.append_metric_name(self.metric, self.unit_name);
for (name, value) in labels {
write!(&mut self.target.target,
" {}={}", name, value
).unwrap();
}
writeln!(&mut self.target.target, ": {}", value).unwrap()
}
}
}
}
//------------ OutputFormat --------------------------------------------------
/// The output format for metrics.
///
/// This is a non-exhaustive enum so that we can add additional metrics
/// without having to do breaking releases. Output for unknown formats should
/// be empty.
#[non_exhaustive]
#[derive(Clone, Copy, Debug)]
pub enum OutputFormat {
/// Prometheus text-base exposition format.
///
/// See https://prometheus.io/docs/instrumenting/exposition_formats/
/// for details.
Prometheus,
/// Simple, human-readable plain-text output.
Plain
}
impl OutputFormat {
/// Returns whether the format supports non-numerical metrics.
pub fn allows_text(self) -> bool {
match self {
OutputFormat::Prometheus => false,
OutputFormat::Plain => false,
}
}
/// Returns whether this output format supports this metric type.
pub fn supports_type(self, metric: MetricType) -> bool {
match (self, metric) {
(OutputFormat::Prometheus, MetricType::Text) => false,
_ => true
}
}
}
//------------ Metric --------------------------------------------------------
pub struct Metric {
pub name: &'static str,
pub help: &'static str,
pub metric_type: MetricType,
pub unit: MetricUnit,
}
impl Metric {
pub const fn new(
name: &'static str, help: &'static str,
metric_type: MetricType, unit: MetricUnit
) -> Self {
Metric { name, help, metric_type, unit
}
}
}
//------------ MetricType ----------------------------------------------------
#[derive(Clone, Copy, Debug)]
pub enum MetricType {
Counter,
Gauge,
Histogram,
Summary,
Text,
}
impl fmt::Display for MetricType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
MetricType::Counter => f.write_str("counter"),
MetricType::Gauge => f.write_str("gauge"),
MetricType::Histogram => f.write_str("histogram"),
MetricType::Summary => f.write_str("summary"),
MetricType::Text => f.write_str("text"),
}
}
}
//------------ MetricUnit ----------------------------------------------------
#[derive(Clone, Copy, Debug)]
pub enum MetricUnit {
Second,
Celsius,
Meter,
Byte,
Ratio,
Volt,
Ampere,
Joule,
Gram,
Total,
Info,
}
impl fmt::Display for MetricUnit {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
MetricUnit::Second => f.write_str("seconds"),
MetricUnit::Celsius => f.write_str("celsius"),
MetricUnit::Meter => f.write_str("meters"),
MetricUnit::Byte => f.write_str("bytes"),
MetricUnit::Ratio => f.write_str("ratio"),
MetricUnit::Volt => f.write_str("volts"),
MetricUnit::Ampere => f.write_str("amperes"),
MetricUnit::Joule => f.write_str("joules"),
MetricUnit::Gram => f.write_str("grams"),
MetricUnit::Total => f.write_str("total"),
MetricUnit::Info => f.write_str("info"),
}
}
}

View File

@ -1,9 +1,12 @@
/// Units that combine the updates from other units.
use std::sync::Arc;
use futures::future::{select_all, FutureExt};
use rand::{thread_rng, Rng};
use serde_derive::Deserialize;
use crate::comms::{Gate, Link, Terminated, UnitStatus};
use crate::metrics;
use crate::comms::{Gate, GateMetrics, Link, Terminated, UnitStatus};
use crate::manager::Component;
use crate::payload;
@ -21,12 +24,14 @@ pub struct Any {
impl Any {
pub async fn run(
mut self, _name: String, mut gate: Gate
mut self, mut component: Component, mut gate: Gate
) -> Result<(), Terminated> {
if self.sources.is_empty() {
gate.update_status(UnitStatus::Gone).await;
return Err(Terminated)
}
let metrics = Arc::new(AnyMetrics::new(&gate));
component.register_metrics(metrics.clone());
let mut gate = [gate];
@ -47,8 +52,11 @@ impl Any {
).await.0
};
match res {
Ok(Some(update)) => gate[0].update_data(update).await,
Ok(None) => { }
Ok(Some(update)) => {
gate[0].update_data(update).await
}
Ok(None) => {
}
Err(()) => {
self.sources[curr_idx].suspend().await;
curr_idx = self.pick(Some(curr_idx));
@ -101,3 +109,25 @@ impl<'a> AnySource<'a> {
}
}
//------------ AnyMetrics ----------------------------------------------------
#[derive(Debug, Default)]
struct AnyMetrics {
gate: Arc<GateMetrics>,
}
impl AnyMetrics {
fn new(gate: &Gate) -> Self {
AnyMetrics {
gate: gate.metrics(),
}
}
}
impl metrics::Source for AnyMetrics {
fn append(&self, unit_name: &str, target: &mut metrics::Target) {
self.gate.append(unit_name, target);
}
}

View File

@ -25,6 +25,7 @@ mod rtr;
use serde_derive::Deserialize;
use crate::comms::Gate;
use crate::manager::Component;
/// The fundamental entity for data processing.
#[derive(Debug, Deserialize)]
@ -38,10 +39,12 @@ pub enum Unit {
}
impl Unit {
pub async fn run(self, name: String, gate: Gate) {
pub async fn run(
self, component: Component, gate: Gate
) {
let _ = match self {
Unit::Any(unit) => unit.run(name, gate).await,
Unit::RtrTcp(unit) => unit.run(name, gate).await,
Unit::Any(unit) => unit.run(component, gate).await,
Unit::RtrTcp(unit) => unit.run(component, gate).await,
};
}
}

View File

@ -12,7 +12,9 @@ use rpki_rtr::state::{Serial, State};
use serde_derive::Deserialize;
use tokio::net::TcpStream;
use tokio::time::{timeout_at, Instant};
use crate::comms::{Gate, GateStatus, Terminated};
use crate::metrics;
use crate::comms::{Gate, GateMetrics, GateStatus, Terminated};
use crate::manager::Component;
use crate::payload;
@ -43,9 +45,11 @@ impl Tcp {
}
pub async fn run(
mut self, name: String, mut gate: Gate
mut self, mut component: Component, mut gate: Gate
) -> Result<(), Terminated> {
let mut target = Target::new(name);
let mut target = Target::new(component.name().clone());
let metrics = Arc::new(RtrMetrics::new(&gate));
component.register_metrics(metrics.clone());
loop {
let mut client = match self.connect(target, &mut gate).await {
Ok(client) => client,
@ -167,11 +171,11 @@ struct Target {
state: Option<State>,
name: String,
name: Arc<str>,
}
impl Target {
pub fn new(name: String) -> Self {
pub fn new(name: Arc<str>) -> Self {
Target {
current: Default::default(),
state: None,
@ -259,3 +263,25 @@ impl VrpUpdate for TargetUpdate {
}
}
//------------ RtrMetrics ----------------------------------------------------
#[derive(Debug, Default)]
struct RtrMetrics {
gate: Arc<GateMetrics>,
}
impl RtrMetrics {
fn new(gate: &Gate) -> Self {
RtrMetrics {
gate: gate.metrics(),
}
}
}
impl metrics::Source for RtrMetrics {
fn append(&self, unit_name: &str, target: &mut metrics::Target) {
self.gate.append(unit_name, target);
}
}