diff --git a/Cargo.toml b/Cargo.toml index 332a52e..8620d23 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,10 @@ readme = "README.md" [dependencies] chrono = "0.4.11" clap = "2.33" +crossbeam-utils = "0.7.2" fern = "0.6.0" futures = "0.3" +hyper = "0.13.4" log = "0.4" log-reroute = "0.1.5" rand = "0.7.3" diff --git a/etc/rtrtr.conf b/etc/rtrtr.conf index 5ed59b8..6d9f6ea 100644 --- a/etc/rtrtr.conf +++ b/etc/rtrtr.conf @@ -21,6 +21,12 @@ log_facility = "daemon" # If file logging is used, the log file must be given. log_file = "/var/log/rtrtr.log" +# Where should the HTTP server listen on? +# +# Currently, the server is only used for metrics, but eventually we will have +# HTTP targets that publish their data under a path of the HTTP server. +http-listen = ["127.0.0.1:8080"] + # RTRTR uses two classes of components: units and targets. Units take data # from somewhere and produce a single, constantly updated data set. Targets # take the data set from exactly one other unit and serve it in some specific diff --git a/src/comms.rs b/src/comms.rs index 97d78af..83c2858 100644 --- a/src/comms.rs +++ b/src/comms.rs @@ -8,11 +8,19 @@ //! sends its updates. The opposite end is called a `Link` and is held by //! other units or targets. +use std::fmt; +use std::sync::atomic; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, AtomicUsize}; +use chrono::{DateTime, Utc}; +use crossbeam_utils::atomic::AtomicCell; use slab::Slab; use serde_derive::Deserialize; use tokio::sync::{mpsc, oneshot}; -use crate::{manager, payload}; +use crate::{manager, metrics, payload}; use crate::config::Marked; +use crate::metrics::{Metric, MetricType, MetricUnit}; + //------------ Configuration ------------------------------------------------- @@ -39,7 +47,7 @@ const COMMAND_QUEUE_LEN: usize = 16; /// gate becomes active again. /// /// In order for the gate to maintain its own state, the unit needs to -/// regularly run the `process` method. It return, the unit will receive an +/// regularly run the `process` method. In return, the unit will receive an /// update to the gate’s state as soon as it becomes available. /// /// Sending of updates happens via the `update_data` and `update_status` @@ -57,6 +65,9 @@ pub struct Gate { /// The current unit status. unit_status: UnitStatus, + + /// The gate metrics. + metrics: Arc, } @@ -71,12 +82,18 @@ impl Gate { commands: rx, updates: Slab::new(), suspended: 0, - unit_status: UnitStatus::Healthy, + unit_status: UnitStatus::default(), + metrics: Default::default(), }; let agent = GateAgent { commands: tx }; (gate, agent) } + /// Returns the metrics. + pub fn metrics(&self) -> Arc { + self.metrics.clone() + } + /// Runs the gate’s internal machine. /// /// This method returns a future that runs the gate’s internal machine. @@ -127,7 +144,8 @@ impl Gate { } item.sender = None } - self.updates.retain(|_, item| item.sender.is_some()) + self.updates.retain(|_, item| item.sender.is_some()); + self.metrics.update(&update); } /// Updates the unit status. @@ -146,7 +164,8 @@ impl Gate { } item.sender = None } - self.updates.retain(|_, item| item.sender.is_some()) + self.updates.retain(|_, item| item.sender.is_some()); + self.metrics.update_status(update); } /// Returns the current gate status. @@ -207,6 +226,91 @@ impl GateAgent { } +//------------ GateMetrics --------------------------------------------------- + +/// Metrics about the updates distributed via the gate. +#[derive(Debug, Default)] +pub struct GateMetrics { + status: AtomicCell, + serial: AtomicU32, + count: AtomicUsize, + update: AtomicCell>>, +} + +impl GateMetrics { + fn update(&self, update: &payload::Update) { + self.serial.store(update.serial().into(), atomic::Ordering::Relaxed); + self.count.store(update.set().len(), atomic::Ordering::Relaxed); + self.update.store(Some(Utc::now())); + } + + fn update_status(&self, status: UnitStatus) { + self.status.store(status) + } +} + +impl GateMetrics { + const STATUS_METRIC: Metric = Metric::new( + "gate_status", "the operational status of the unit", + MetricType::Text, MetricUnit::Info + ); + const SERIAL_METRIC: Metric = Metric::new( + "serial", "the serial number of the unit’s updates", + MetricType::Counter, MetricUnit::Info + ); + const COUNT_METRIC: Metric = Metric::new( + "vrps", "the number of VRPs in the last update", + MetricType::Gauge, MetricUnit::Total + ); + const UPDATE_METRIC: Metric = Metric::new( + "last_update", "the date and time of the last update", + MetricType::Text, MetricUnit::Info + ); + const UPDATE_AGO_METRIC: Metric = Metric::new( + "since_last_update", "the number of seconds since the last update", + MetricType::Gauge, MetricUnit::Second + ); +} + +impl metrics::Source for GateMetrics { + fn append(&self, unit_name: &str, target: &mut metrics::Target) { + target.append_simple( + &Self::STATUS_METRIC, Some(unit_name), self.status.load() + ); + target.append_simple( + &Self::SERIAL_METRIC, Some(unit_name), + self.serial.load(atomic::Ordering::Relaxed) + ); + target.append_simple( + &Self::COUNT_METRIC, Some(unit_name), + self.count.load(atomic::Ordering::Relaxed) + ); + match self.update.load() { + Some(update) => { + target.append_simple( + &Self::UPDATE_METRIC, Some(unit_name), + update + ); + let ago = Utc::now().signed_duration_since(update); + let ago = (ago.num_milliseconds() as f64) / 1000.; + target.append_simple( + &Self::UPDATE_AGO_METRIC, Some(unit_name), ago + ); + } + None => { + target.append_simple( + &Self::UPDATE_METRIC, Some(unit_name), + "N/A" + ); + target.append_simple( + &Self::UPDATE_AGO_METRIC, Some(unit_name), -1 + ); + } + } + } +} + + //------------ Link ---------------------------------------------------------- /// A link to another unit. @@ -447,10 +551,26 @@ pub enum UnitStatus { Gone, } +impl Default for UnitStatus { + fn default() -> Self { + UnitStatus::Healthy + } +} + +impl fmt::Display for UnitStatus { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(match *self { + UnitStatus::Healthy => "healthy", + UnitStatus::Stalled => "stalled", + UnitStatus::Gone => "gone", + }) + } +} + //------------ Terminated ---------------------------------------------------- -/// A unit has been terminated. +/// An error signalling that a unit has been terminated. /// /// In response to this error, a unit’s run function should return. #[derive(Clone, Copy, Debug)] diff --git a/src/config.rs b/src/config.rs index d86c7a0..eb93652 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use clap::{App, Arg, ArgMatches}; use serde_derive::Deserialize; use toml::Spanned; +use crate::http; use crate::log::{ExitError, Failed, LogConfig}; use crate::manager::{Manager, TargetSet, UnitSet}; @@ -20,6 +21,9 @@ pub struct Config { #[serde(flatten)] pub log: LogConfig, + + #[serde(flatten)] + pub http: http::Server, } impl Config { diff --git a/src/http.rs b/src/http.rs new file mode 100644 index 0000000..250cc75 --- /dev/null +++ b/src/http.rs @@ -0,0 +1,198 @@ +/// The HTTP server. + +use std::io; +use std::convert::Infallible; +use std::net::SocketAddr; +use std::net::TcpListener as StdListener; +use std::pin::Pin; +use std::task::{Context, Poll}; +use futures::pin_mut; +use hyper::{Body, Method, Request, Response, StatusCode}; +use hyper::server::accept::Accept; +use hyper::service::{make_service_fn, service_fn}; +use log::error; +use serde_derive::Deserialize; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::runtime::Runtime; +use tokio::stream::Stream; +use crate::log::ExitError; +use crate::metrics; + + +//------------ Server -------------------------------------------------------- + +#[derive(Clone, Deserialize)] +pub struct Server { + #[serde(rename = "http-listen")] + listen: Vec, +} + +impl Server { + pub fn run( + &self, + metrics: metrics::Collection, + runtime: &Runtime, + ) -> Result<(), ExitError> { + let mut listeners = Vec::new(); + for addr in &self.listen { + // Binding needs to have happened before dropping privileges + // during detach. So we do this here synchronously. + match StdListener::bind(addr) { + Ok(listener) => listeners.push(listener), + Err(err) => { + error!("Fatal: error listening on {}: {}", addr, err); + return Err(ExitError); + } + }; + } + for listener in listeners { + runtime.spawn( + Self::single_listener(listener, metrics.clone()) + ); + } + Ok(()) + } + + async fn single_listener( + listener: StdListener, + metrics: metrics::Collection, + ) { + let make_service = make_service_fn(|_conn| { + let metrics = metrics.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |req| { + let metrics = metrics.clone(); + async move { Self::handle_request(req, &metrics).await } + })) + } + }); + let listener = match TcpListener::from_std(listener) { + Ok(listener) => listener, + Err(err) => { + error!("Error on HTTP listener: {}", err); + return + } + }; + if let Err(err) = hyper::Server::builder( + HttpAccept { sock: listener } + ).serve(make_service).await { + error!("HTTP server error: {}", err); + } + } + + async fn handle_request( + req: Request, + metrics: &metrics::Collection + ) -> Result, Infallible> { + if *req.method() != Method::GET { + return Ok(Self::method_not_allowed()) + } + Ok(match req.uri().path() { + "/metrics" => Self::metrics(metrics), + "/status" => Self::status(metrics), + _ => Self::not_found() + }) + } + + fn metrics(metrics: &metrics::Collection) -> Response { + Response::builder() + .header("Content-Type", "text/plain; version=0.0.4") + .body( + metrics.assemble(metrics::OutputFormat::Prometheus).into() + ) + .unwrap() + } + + fn status(metrics: &metrics::Collection) -> Response { + Response::builder() + .header("Content-Type", "text/plain") + .body( + metrics.assemble(metrics::OutputFormat::Plain).into() + ) + .unwrap() + } + + fn method_not_allowed() -> Response { + Response::builder() + .status(StatusCode::METHOD_NOT_ALLOWED) + .header("Content-Type", "text/plain") + .body("Method Not Allowed".into()) + .unwrap() + } + + fn not_found() -> Response { + Response::builder() + .status(StatusCode::NOT_FOUND) + .header("Content-Type", "text/plain") + .body("Not Found".into()) + .unwrap() + } +} + + +//------------ Wrapped sockets ----------------------------------------------- + +struct HttpAccept { + sock: TcpListener, +} + +impl Accept for HttpAccept { + type Conn = HttpStream; + type Error = io::Error; + + fn poll_accept( + mut self: Pin<&mut Self>, + cx: &mut Context + ) -> Poll>> { + let sock = &mut self.sock; + pin_mut!(sock); + sock.poll_next(cx).map(|sock| sock.map(|sock| sock.map(|sock| { + HttpStream { + sock, + } + }))) + } +} + + +struct HttpStream { + sock: TcpStream, +} + +impl AsyncRead for HttpStream { + fn poll_read( + mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8] + ) -> Poll> { + let sock = &mut self.sock; + pin_mut!(sock); + sock.poll_read(cx, buf) + } +} + +impl AsyncWrite for HttpStream { + fn poll_write( + mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8] + ) -> Poll> { + let sock = &mut self.sock; + pin_mut!(sock); + sock.poll_write(cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, cx: &mut Context + ) -> Poll> { + let sock = &mut self.sock; + pin_mut!(sock); + sock.poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, cx: &mut Context + ) -> Poll> { + let sock = &mut self.sock; + pin_mut!(sock); + sock.poll_shutdown(cx) + } +} + diff --git a/src/lib.rs b/src/lib.rs index 2dc191e..988be1e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,10 @@ pub mod comms; pub mod config; +pub mod http; pub mod log; pub mod manager; +pub mod metrics; pub mod payload; pub mod targets; pub mod units; diff --git a/src/main.rs b/src/main.rs index f2e50d4..6fefe76 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,6 +36,7 @@ fn _main() -> Result<(), ExitError> { .enable_all() .build() .unwrap(); + config.http.run(manager.metrics(), &runtime)?; manager.spawn(&mut config, &runtime); runtime.block_on(pending()) } diff --git a/src/manager.rs b/src/manager.rs index 9819eb3..39deb8c 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -2,9 +2,11 @@ use std::cell::RefCell; use std::collections::HashMap; +use std::sync::Arc; use log::error; use serde_derive::Deserialize; use tokio::runtime::Runtime; +use crate::metrics; use crate::comms::{Gate, GateAgent, Link}; use crate::config::{Config, ConfigFile, Marked}; use crate::log::Failed; @@ -12,6 +14,35 @@ use crate::targets::Target; use crate::units::Unit; +//------------ Component ----------------------------------------------------- + +/// A component. +/// +/// Upon being started, every component receives one of these. It can use it +/// to communicate with the manager. +#[derive(Debug)] +pub struct Component { + name: Arc, + metrics: metrics::Collection, +} + +impl Component { + fn new(name: String, metrics: metrics::Collection) -> Self { + Component { name: name.into(), metrics } + } + + /// Returns the name of the component. + pub fn name(&self) -> &Arc { + &self.name + } + + /// Register a metrics source. + pub fn register_metrics(&mut self, source: Arc) { + self.metrics.register(self.name.clone(), Arc::downgrade(&source)); + } +} + + //------------ Manager ------------------------------------------------------- #[derive(Default)] @@ -19,6 +50,8 @@ pub struct Manager { units: HashMap, pending: HashMap, + + metrics: metrics::Collection, } @@ -78,7 +111,7 @@ impl Manager { Ok(config) } - /// Spawns all units and target in the config unto the given runtime. + /// Spawns all units and targets in the config unto the given runtime. /// /// # Panics /// @@ -87,13 +120,18 @@ impl Manager { pub fn spawn(&mut self, config: &mut Config, runtime: &Runtime) { for (name, unit) in config.units.units.drain() { let gate = self.pending.remove(&name).unwrap(); - runtime.spawn(unit.run(name, gate)); + let controller = Component::new(name, self.metrics.clone()); + runtime.spawn(unit.run(controller, gate)); } for (name, target) in config.targets.targets.drain() { runtime.spawn(target.run(name)); } } + + pub fn metrics(&self) -> metrics::Collection { + self.metrics.clone() + } } diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..d46c68a --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,369 @@ +//! Metrics. + +use std::{fmt, iter}; +use std::sync::{Arc, Mutex, Weak}; +use std::fmt::Write; +use clap::{crate_name, crate_version}; + + +//------------ Module Configuration ------------------------------------------ + +const PROMETHEUS_PREFIX: &str = "rtrtr"; + + +//------------ Collection ---------------------------------------------------- + +#[derive(Clone, Default)] +pub struct Collection { + sources: Arc>>>, +} + +impl Collection { + pub fn register(&self, name: Arc, source: Weak) { + let mut sources = self.sources.lock().unwrap(); + + let old_sources = sources.clone(); + let mut new_sources = Vec::new(); + for item in old_sources.iter() { + if item.source.strong_count() > 0 { + new_sources.push(item.clone()) + } + } + new_sources.push( + RegisteredSource { + name: name.into(), + source + } + ); + *sources = new_sources.into() + } + + pub fn assemble(&self, format: OutputFormat) -> String { + let sources = self.sources.lock().unwrap().clone(); + let mut target = Target::new(format); + for item in sources.iter() { + if let Some(source) = item.source.upgrade() { + source.append(&item.name, &mut target) + } + } + target.into_string() + } +} + + +impl fmt::Debug for Collection { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let len = self.sources.lock().unwrap().len(); + write!(f, "Collection({} sources)", len) + } +} + + +//------------ RegisteredSource ---------------------------------------------- + +#[derive(Clone)] +struct RegisteredSource { + name: Arc, + source: Weak, +} + + +//------------ Source -------------------------------------------------------- + +/// A type producing some metrics. +/// +/// All this type needs to be able to do is output its metrics. +pub trait Source: Send + Sync { + /// Appends the metrics to the target. + /// + /// The unit name is provided so a source doesn’t need to keep it around. + fn append(&self, unit_name: &str, target: &mut Target); +} + +impl Source for Arc { + fn append(&self, unit_name: &str, target: &mut Target) { + AsRef::::as_ref(self).append(unit_name, target) + } +} + + +//------------ Target -------------------------------------------------------- + +#[derive(Clone, Debug)] +pub struct Target { + format: OutputFormat, + target: String, +} + +impl Target { + pub fn new(format: OutputFormat) -> Self { + let mut target = String::new(); + if matches!(format, OutputFormat::Plain) { + target.push_str( + concat!( + "version: ", crate_name!(), "/", crate_version!(), "\n" + ) + ); + } + Target { format, target } + } + + pub fn into_string(self) -> String { + self.target + } + + pub fn append( + &mut self, + metric: &Metric, + unit_name: Option<&str>, + values: F, + ) { + if !self.format.supports_type(metric.metric_type) { + return + } + + if matches!(self.format, OutputFormat::Prometheus) { + self.target.push_str("# HELP "); + self.append_metric_name(metric, unit_name); + self.target.push(' '); + self.target.push_str(metric.help); + self.target.push('\n'); + + self.target.push_str("# TYPE "); + self.append_metric_name(metric, unit_name); + writeln!(&mut self.target, " {}", metric.metric_type).unwrap(); + } + values(&mut Records { target: self, metric, unit_name }) + } + + pub fn append_simple( + &mut self, + metric: &Metric, + unit_name: Option<&str>, + value: impl fmt::Display, + ) { + self.append(metric, unit_name, |records| { + records.value(value) + }) + } + + fn append_metric_name( + &mut self, metric: &Metric, unit_name: Option<&str> + ) { + match self.format { + OutputFormat::Prometheus => { + match unit_name { + Some(unit) => { + write!(&mut self.target, + "{}_{}_{}_{}", + PROMETHEUS_PREFIX, unit, metric.name, metric.unit + ).unwrap(); + } + None => { + write!(&mut self.target, + "{}_{}_{}", + PROMETHEUS_PREFIX, metric.name, metric.unit + ).unwrap(); + } + } + } + OutputFormat::Plain => { + match unit_name { + Some(unit) => { + write!(&mut self.target, + "{} {}", unit, metric.name + ).unwrap(); + } + None => { + write!(&mut self.target, + "{}", metric.name + ).unwrap(); + } + } + } + } + } +} + + +//------------ Records ------------------------------------------------------- + +pub struct Records<'a> { + target: &'a mut Target, + metric: &'a Metric, + unit_name: Option<&'a str>, +} + +impl<'a> Records<'a> { + pub fn value(&mut self, value: impl fmt::Display) { + match self.target.format { + OutputFormat::Prometheus => { + self.target.append_metric_name(self.metric, self.unit_name); + writeln!(&mut self.target.target, " {}", value).unwrap() + } + OutputFormat::Plain => { + self.target.append_metric_name(self.metric, self.unit_name); + writeln!(&mut self.target.target, ": {}", value).unwrap() + } + } + } + + pub fn label_value( + &mut self, + labels: &[(&str, &str)], + value: impl fmt::Display + ) { + match self.target.format { + OutputFormat::Prometheus => { + self.target.append_metric_name(self.metric, self.unit_name); + self.target.target.push('{'); + for ((name, value), comma) in + labels.iter().zip( + iter::once(false).chain(iter::repeat(true)) + ) + { + if comma { + write!(&mut self.target.target, + ", {}=\"{}\"", name, value + ).unwrap(); + } + else { + write!(&mut self.target.target, + "{}=\"{}\"", name, value + ).unwrap(); + } + } + writeln!(&mut self.target.target, "}} {}", value).unwrap() + } + OutputFormat::Plain => { + self.target.append_metric_name(self.metric, self.unit_name); + for (name, value) in labels { + write!(&mut self.target.target, + " {}={}", name, value + ).unwrap(); + } + writeln!(&mut self.target.target, ": {}", value).unwrap() + } + } + } +} + + +//------------ OutputFormat -------------------------------------------------- + +/// The output format for metrics. +/// +/// This is a non-exhaustive enum so that we can add additional metrics +/// without having to do breaking releases. Output for unknown formats should +/// be empty. +#[non_exhaustive] +#[derive(Clone, Copy, Debug)] +pub enum OutputFormat { + /// Prometheus’ text-base exposition format. + /// + /// See https://prometheus.io/docs/instrumenting/exposition_formats/ + /// for details. + Prometheus, + + /// Simple, human-readable plain-text output. + Plain +} + +impl OutputFormat { + /// Returns whether the format supports non-numerical metrics. + pub fn allows_text(self) -> bool { + match self { + OutputFormat::Prometheus => false, + OutputFormat::Plain => false, + } + } + + /// Returns whether this output format supports this metric type. + pub fn supports_type(self, metric: MetricType) -> bool { + match (self, metric) { + (OutputFormat::Prometheus, MetricType::Text) => false, + _ => true + } + } +} + + +//------------ Metric -------------------------------------------------------- + +pub struct Metric { + pub name: &'static str, + pub help: &'static str, + pub metric_type: MetricType, + pub unit: MetricUnit, +} + +impl Metric { + pub const fn new( + name: &'static str, help: &'static str, + metric_type: MetricType, unit: MetricUnit + ) -> Self { + Metric { name, help, metric_type, unit + } + } +} + + +//------------ MetricType ---------------------------------------------------- + +#[derive(Clone, Copy, Debug)] +pub enum MetricType { + Counter, + Gauge, + Histogram, + Summary, + Text, +} + +impl fmt::Display for MetricType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + MetricType::Counter => f.write_str("counter"), + MetricType::Gauge => f.write_str("gauge"), + MetricType::Histogram => f.write_str("histogram"), + MetricType::Summary => f.write_str("summary"), + MetricType::Text => f.write_str("text"), + } + } +} + + +//------------ MetricUnit ---------------------------------------------------- + +#[derive(Clone, Copy, Debug)] +pub enum MetricUnit { + Second, + Celsius, + Meter, + Byte, + Ratio, + Volt, + Ampere, + Joule, + Gram, + Total, + Info, +} + +impl fmt::Display for MetricUnit { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + MetricUnit::Second => f.write_str("seconds"), + MetricUnit::Celsius => f.write_str("celsius"), + MetricUnit::Meter => f.write_str("meters"), + MetricUnit::Byte => f.write_str("bytes"), + MetricUnit::Ratio => f.write_str("ratio"), + MetricUnit::Volt => f.write_str("volts"), + MetricUnit::Ampere => f.write_str("amperes"), + MetricUnit::Joule => f.write_str("joules"), + MetricUnit::Gram => f.write_str("grams"), + MetricUnit::Total => f.write_str("total"), + MetricUnit::Info => f.write_str("info"), + } + } +} + diff --git a/src/units/combine.rs b/src/units/combine.rs index a7f7d8c..0f32ce8 100644 --- a/src/units/combine.rs +++ b/src/units/combine.rs @@ -1,9 +1,12 @@ /// Units that combine the updates from other units. +use std::sync::Arc; use futures::future::{select_all, FutureExt}; use rand::{thread_rng, Rng}; use serde_derive::Deserialize; -use crate::comms::{Gate, Link, Terminated, UnitStatus}; +use crate::metrics; +use crate::comms::{Gate, GateMetrics, Link, Terminated, UnitStatus}; +use crate::manager::Component; use crate::payload; @@ -21,12 +24,14 @@ pub struct Any { impl Any { pub async fn run( - mut self, _name: String, mut gate: Gate + mut self, mut component: Component, mut gate: Gate ) -> Result<(), Terminated> { if self.sources.is_empty() { gate.update_status(UnitStatus::Gone).await; return Err(Terminated) } + let metrics = Arc::new(AnyMetrics::new(&gate)); + component.register_metrics(metrics.clone()); let mut gate = [gate]; @@ -47,8 +52,11 @@ impl Any { ).await.0 }; match res { - Ok(Some(update)) => gate[0].update_data(update).await, - Ok(None) => { } + Ok(Some(update)) => { + gate[0].update_data(update).await + } + Ok(None) => { + } Err(()) => { self.sources[curr_idx].suspend().await; curr_idx = self.pick(Some(curr_idx)); @@ -101,3 +109,25 @@ impl<'a> AnySource<'a> { } } + +//------------ AnyMetrics ---------------------------------------------------- + +#[derive(Debug, Default)] +struct AnyMetrics { + gate: Arc, +} + +impl AnyMetrics { + fn new(gate: &Gate) -> Self { + AnyMetrics { + gate: gate.metrics(), + } + } +} + +impl metrics::Source for AnyMetrics { + fn append(&self, unit_name: &str, target: &mut metrics::Target) { + self.gate.append(unit_name, target); + } +} + diff --git a/src/units/mod.rs b/src/units/mod.rs index ce00a3e..e7b63cb 100644 --- a/src/units/mod.rs +++ b/src/units/mod.rs @@ -25,6 +25,7 @@ mod rtr; use serde_derive::Deserialize; use crate::comms::Gate; +use crate::manager::Component; /// The fundamental entity for data processing. #[derive(Debug, Deserialize)] @@ -38,10 +39,12 @@ pub enum Unit { } impl Unit { - pub async fn run(self, name: String, gate: Gate) { + pub async fn run( + self, component: Component, gate: Gate + ) { let _ = match self { - Unit::Any(unit) => unit.run(name, gate).await, - Unit::RtrTcp(unit) => unit.run(name, gate).await, + Unit::Any(unit) => unit.run(component, gate).await, + Unit::RtrTcp(unit) => unit.run(component, gate).await, }; } } diff --git a/src/units/rtr.rs b/src/units/rtr.rs index 9a6ff71..eb1c336 100644 --- a/src/units/rtr.rs +++ b/src/units/rtr.rs @@ -12,7 +12,9 @@ use rpki_rtr::state::{Serial, State}; use serde_derive::Deserialize; use tokio::net::TcpStream; use tokio::time::{timeout_at, Instant}; -use crate::comms::{Gate, GateStatus, Terminated}; +use crate::metrics; +use crate::comms::{Gate, GateMetrics, GateStatus, Terminated}; +use crate::manager::Component; use crate::payload; @@ -43,9 +45,11 @@ impl Tcp { } pub async fn run( - mut self, name: String, mut gate: Gate + mut self, mut component: Component, mut gate: Gate ) -> Result<(), Terminated> { - let mut target = Target::new(name); + let mut target = Target::new(component.name().clone()); + let metrics = Arc::new(RtrMetrics::new(&gate)); + component.register_metrics(metrics.clone()); loop { let mut client = match self.connect(target, &mut gate).await { Ok(client) => client, @@ -167,11 +171,11 @@ struct Target { state: Option, - name: String, + name: Arc, } impl Target { - pub fn new(name: String) -> Self { + pub fn new(name: Arc) -> Self { Target { current: Default::default(), state: None, @@ -259,3 +263,25 @@ impl VrpUpdate for TargetUpdate { } } + +//------------ RtrMetrics ---------------------------------------------------- + +#[derive(Debug, Default)] +struct RtrMetrics { + gate: Arc, +} + +impl RtrMetrics { + fn new(gate: &Gate) -> Self { + RtrMetrics { + gate: gate.metrics(), + } + } +} + +impl metrics::Source for RtrMetrics { + fn append(&self, unit_name: &str, target: &mut metrics::Target) { + self.gate.append(unit_name, target); + } +} +