1
0
mirror of https://github.com/NLnetLabs/rtrtr.git synced 2024-05-11 05:55:07 +00:00

Add a merge unit. (#110)

This PR adds a new merge unit that merges the payload from all its healthy
sources.
This commit is contained in:
Martin Hoffmann
2024-04-29 10:36:39 +02:00
committed by GitHub
parent 01cda627a7
commit bbcb3b0a6f
6 changed files with 311 additions and 84 deletions

View File

@ -240,6 +240,16 @@ random
at random. If the value is ``false`` or not given, the source units are at random. If the value is ``false`` or not given, the source units are
picked in the order given. picked in the order given.
Merge Unit
----------
A unit of type ``"merge"`` will merge the data from all data sets of its
source units. It has the following configuration options:
sources
A list of strings each containing the name of a unit to use as a
source.
SLURM Unit SLURM Unit
---------- ----------

View File

@ -94,6 +94,12 @@ sources = [ "local-3323", "local-3324", "cloudflare-json" ]
random = false random = false
# We can also merge the data from multiple sources
[units.merged-rtr]
type = "merge"
sources = [ "local-3323", "local-3324", "cloudflare-json" ]
# Local exceptions can be applied, too. # Local exceptions can be applied, too.
[units.slurm] [units.slurm]
type = "slurm" type = "slurm"
@ -115,6 +121,14 @@ listen = [ "127.0.0.1:9001" ]
unit = "any-rtr" unit = "any-rtr"
# Lets have a target for the merged data, too.
[targets.local-9002]
type = "rtr"
listen = [ "127.0.0.1:9002" ]
unit = "merged-rtr"
# We can also provide the data as JSON over HTTP.
[targets.http-json] [targets.http-json]
type = "http" type = "http"
path = "/json" path = "/json"

View File

@ -47,7 +47,7 @@
//! base type yet returns references to the items. For now, these need to //! base type yet returns references to the items. For now, these need to
//! separate because the `Iterator` trait requires the returned items to have //! separate because the `Iterator` trait requires the returned items to have
//! the same lifetime as the iterator type itself. //! the same lifetime as the iterator type itself.
use std::slice; use std::{mem, slice};
use std::borrow::Borrow; use std::borrow::Borrow;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::HashSet; use std::collections::HashSet;
@ -254,6 +254,19 @@ impl Block {
} }
} }
/// Splits off the part of the block before the given pack index.
///
/// Moves the start of this block to the given index and returns a block
/// from the original start to the new start.
fn split_off_at(&mut self, pack_index: usize) -> Block {
assert!(pack_index >= self.range.start);
assert!(pack_index <= self.range.end);
let mut res = self.clone();
res.range.end = pack_index;
self.range.start = res.range.end;
res
}
/// Returns an owned iterator-like for the block. /// Returns an owned iterator-like for the block.
pub fn owned_iter(&self) -> OwnedBlockIter { pub fn owned_iter(&self) -> OwnedBlockIter {
OwnedBlockIter::new(self.clone()) OwnedBlockIter::new(self.clone())
@ -417,15 +430,6 @@ impl Set {
OwnedSetIter::new(self) OwnedSetIter::new(self)
} }
/// Returns a set which has this set and the other set merged.
///
/// The two sets may overlap.
pub fn merge(&self, other: &Set) -> Set {
let mut res = self.to_builder();
res.insert_set(other.clone());
res.finalize()
}
/// Returns a set with the indicated elements removed. /// Returns a set with the indicated elements removed.
/// ///
/// Each element in the current set is presented to the closure and only /// Each element in the current set is presented to the closure and only
@ -478,6 +482,108 @@ impl Set {
} }
} }
/// Returns a set merging the elements from this and another set.
pub fn merge(&self, other: &Set) -> Set {
let mut left_tail = self.blocks.iter().cloned();
let mut right_tail = other.blocks.iter().cloned();
let mut left_head = left_tail.next();
let mut right_head = right_tail.next();
let mut target = Vec::new();
let mut target_len = 0;
// Merge potentially overlapping blocks.
loop {
// Skip over empty blocks. If either side runs out of block, we
// are done with this difficult part.
let left = loop {
match left_head.as_mut() {
Some(block) if block.is_empty() => { }
Some(block) => break Some(block),
None => break None,
}
left_head = left_tail.next();
};
let right = loop {
match right_head.as_mut() {
Some(block) if block.is_empty() => { }
Some(block) => break Some(block),
None => break None,
}
right_head = right_tail.next();
};
let (left, right) = match (left, right) {
(Some(left), Some(right)) => (left, right),
_ => break,
};
// Make left the block that starts first. Since neither block is
// empty, we can unwrap.
if right.first().unwrap() < left.first().unwrap() {
mem::swap(left, right);
}
// Find out how much of left we can add.
//
// First, find the part of left that is before right.
let first_right = right.first().unwrap();
let mut left_idx = left.range.start;
while let Some(item) = left.get_from_pack(left_idx) {
if item >= first_right {
break;
}
left_idx += 1;
}
// Now progress left_idx as long as elements are equal with right.
let mut right_idx = right.range.start;
while let (Some(left_item), Some(right_item)) = (
left.get_from_pack(left_idx), right.get_from_pack(right_idx)
) {
if left_item == right_item {
left_idx += 1;
right_idx += 1;
}
else {
break
}
}
// left_idx now is the end of the range in left we need to add to
// the target.
let new = left.split_off_at(left_idx);
target_len += new.len();
target.push(new);
// Finally, right to its new start.
right.range.start = right_idx;
}
// At least one of the two iterators is now exhausted. So we can now
// just push whatever is left on either to the target. Dont forget
// the heads, though, only one of which at most should not be empty.
if let Some(block) = left_head {
if !block.is_empty() {
target_len += block.len();
target.push(block);
}
}
if let Some(block) = right_head {
if !block.is_empty() {
target_len += block.len();
target.push(block);
}
}
for block in left_tail.chain(right_tail) {
target_len += block.len();
target.push(block)
}
Set {
blocks: target.into(),
len: target_len
}
}
/// Returns the diff to get from `other` to `self`. /// Returns the diff to get from `other` to `self`.
pub fn diff_from(&self, other: &Set) -> Diff { pub fn diff_from(&self, other: &Set) -> Diff {
let mut diff = DiffBuilder::empty(); let mut diff = DiffBuilder::empty();
@ -1188,31 +1294,33 @@ pub(crate) mod testrig {
) )
} }
/// Create a pack of payload from a slice of `u32`s. /// Create a pack of payload from an array of `u32`s.
pub fn pack(values: &[u32]) -> Pack { pub fn pack<const N: usize>(values: [u32; N]) -> Pack {
Pack { Pack {
items: items:
values.iter().cloned().map(p).collect::<Vec<_>>().into() values.into_iter().map(p).collect::<Vec<_>>().into()
} }
} }
/// Creates a set from a vec of blocks. /// Create a block of payload from an array of `u32`s.
pub fn set(values: Vec<Block>) -> Set { pub fn block<const N: usize>(
let len = values.iter().map(|item| item.len()).sum(); values: [u32; N], range: Range<usize>
Set { ) -> Block {
blocks: Arc::from(values.into_boxed_slice()),
len
}
}
/// Create a block of payload from a slice of `u32`s.
pub fn block(values: &[u32], range: Range<usize>) -> Block {
Block { Block {
pack: pack(values), pack: pack(values),
range range
} }
} }
/// Create a set from an array of blocks.
pub fn set<const N: usize>(blocks: [Block; N]) -> Set {
let len = blocks.iter().map(|item| item.len()).sum();
Set {
blocks: Arc::from(blocks.as_slice()),
len
}
}
/// Checks that a pack fulfils all invariants. /// Checks that a pack fulfils all invariants.
pub fn check_pack(pack: &Pack) { pub fn check_pack(pack: &Pack) {
// Empty pack is allowed. // Empty pack is allowed.
@ -1242,10 +1350,10 @@ pub(crate) mod testrig {
} }
} }
/// Creates an update from a bunch of integers /// Creates an update from an array of `u32`s.
pub fn update(values: &[u32]) -> Update { pub fn update<const N: usize>(values: [u32; N]) -> Update {
Update::new( Update::new(
set(vec![ set([
block(values, 0..values.len()) block(values, 0..values.len())
]) ])
) )
@ -1272,13 +1380,52 @@ mod test {
use super::*; use super::*;
use super::testrig::*; use super::testrig::*;
#[test]
fn set_merge() {
assert!(
set([block([], 0..0)]).merge(
&set([block([], 0..0)])
).iter().eq(set([block([], 0..0)]).iter())
);
assert!(
set([block([1, 3, 4], 0..3)]).merge(
&set([block([1, 3, 4], 0..3)])
).iter().eq(set([block([1, 3, 4], 0..3)]).iter())
);
assert!(
set([block([1, 3, 4], 0..3)]).merge(
&set([block([], 0..0)])
).iter().eq(set([block([1, 3, 4], 0..3)]).iter())
);
assert!(
set([block([], 0..0)]).merge(
&set([block([1, 3, 4], 0..3)])
).iter().eq(set([block([1, 3, 4], 0..3)]).iter())
);
assert!(
set([block([1, 3, 4, 5], 0..4)]).merge(
&set([block([1, 3, 4], 0..3)])
).iter().eq(set([block([1, 3, 4, 5], 0..4)]).iter())
);
assert!(
set([block([1, 3, 5], 0..3)]).merge(
&set([block([1, 3, 4], 0..3)])
).iter().eq(set([block([1, 3, 4, 5], 0..4)]).iter())
);
assert!(
set([block([1, 3, 5], 0..3), block([10, 11], 0..2)]).merge(
&set([block([3, 4], 0..2)])
).iter().eq(set([block([1, 3, 4, 5, 10, 11], 0..6)]).iter())
);
}
#[test] #[test]
fn set_iter() { fn set_iter() {
assert_eq!( assert_eq!(
Set { Set {
blocks: vec![ blocks: vec![
block(&[1, 2, 4], 0..3), block([1, 2, 4], 0..3),
block(&[4, 5], 1..2) block([4, 5], 1..2)
].into(), ].into(),
len: 4 len: 4
}.iter().cloned().collect::<Vec<_>>(), }.iter().cloned().collect::<Vec<_>>(),
@ -1289,11 +1436,11 @@ mod test {
#[test] #[test]
fn set_builder() { fn set_builder() {
let mut builder = SetBuilder::empty(); let mut builder = SetBuilder::empty();
builder.insert_pack(pack(&[1, 2, 11, 12])); builder.insert_pack(pack([1, 2, 11, 12]));
builder.insert_pack(pack(&[5, 6, 7, 15, 18])); builder.insert_pack(pack([5, 6, 7, 15, 18]));
builder.insert_pack(pack(&[6, 7])); builder.insert_pack(pack([6, 7]));
builder.insert_pack(pack(&[7])); builder.insert_pack(pack([7]));
builder.insert_pack(pack(&[17])); builder.insert_pack(pack([17]));
let set = builder.finalize(); let set = builder.finalize();
check_set(&set); check_set(&set);
assert_eq!( assert_eq!(
@ -1308,8 +1455,8 @@ mod test {
assert_eq!( assert_eq!(
Diff { Diff {
announced: pack(&[6, 7, 15, 18]), announced: pack([6, 7, 15, 18]),
withdrawn: pack(&[2, 8, 9]), withdrawn: pack([2, 8, 9]),
}.iter().collect::<Vec<_>>(), }.iter().collect::<Vec<_>>(),
[ [
(&p(2), W), (&p(6), A), (&p(7), A), (&p(8), W), (&p(9), W), (&p(2), W), (&p(6), A), (&p(7), A), (&p(8), W), (&p(9), W),
@ -1420,7 +1567,7 @@ mod test {
#[test] #[test]
fn owned_block_iter() { fn owned_block_iter() {
fn test_iter(payload: &[Payload], block: Block) { fn test_iter<const N: usize>(payload: [Payload; N], block: Block) {
let piter = payload.iter(); let piter = payload.iter();
let mut oiter = block.owned_iter(); let mut oiter = block.owned_iter();
@ -1434,46 +1581,46 @@ mod test {
// Empty set. // Empty set.
test_iter( test_iter(
&[], [],
block(&[], 0..0) block([], 0..0)
); );
// Empty range over a non-empty block. // Empty range over a non-empty block.
test_iter( test_iter(
&[], [],
block(&[7, 8, 10, 12, 18, 19], 3..3) block([7, 8, 10, 12, 18, 19], 3..3)
); );
// Blocks with a range. // Blocks with a range.
test_iter( test_iter(
&[p(7), p(8), p(10), p(12), p(18), p(19)], [p(7), p(8), p(10), p(12), p(18), p(19)],
block(&[7, 8, 10, 12, 18, 19], 0..6) block([7, 8, 10, 12, 18, 19], 0..6)
); );
test_iter( test_iter(
&[p(7), p(8), p(10), p(12), p(18), p(19)], [p(7), p(8), p(10), p(12), p(18), p(19)],
block(&[2, 3, 7, 8, 10, 12, 18, 19], 2..8) block([2, 3, 7, 8, 10, 12, 18, 19], 2..8)
); );
test_iter( test_iter(
&[p(7), p(8), p(10), p(12), p(18), p(19)], [p(7), p(8), p(10), p(12), p(18), p(19)],
block(&[7, 8, 10, 12, 18, 19, 21, 22], 0..6) block([7, 8, 10, 12, 18, 19, 21, 22], 0..6)
); );
test_iter( test_iter(
&[p(7), p(8), p(10), p(12), p(18), p(19)], [p(7), p(8), p(10), p(12), p(18), p(19)],
block(&[2, 3, 7, 8, 10, 12, 18, 19, 21], 2..8) block([2, 3, 7, 8, 10, 12, 18, 19, 21], 2..8)
); );
test_iter( test_iter(
&[p(7)], [p(7)],
block(&[2, 3, 7, 8, 10, 12, 18, 19, 21], 2..3) block([2, 3, 7, 8, 10, 12, 18, 19, 21], 2..3)
); );
test_iter( test_iter(
&[p(7), p(8), p(10), p(12), p(18), p(19)], [p(7), p(8), p(10), p(12), p(18), p(19)],
block(&[7, 8, 10, 12, 18, 19], 0..6) block([7, 8, 10, 12, 18, 19], 0..6)
); );
} }
#[test] #[test]
fn set_iters() { fn set_iters() {
fn test_iter(payload: &[Payload], set: Set) { fn test_iter<const N: usize>(payload: [Payload; N], set: Set) {
let piter = payload.iter(); let piter = payload.iter();
let mut iter = set.iter(); let mut iter = set.iter();
let mut oiter = set.owned_iter(); let mut oiter = set.owned_iter();
@ -1490,51 +1637,51 @@ mod test {
// Empty set. // Empty set.
test_iter( test_iter(
&[], [],
Set::from(pack(&[])) Set::from(pack([]))
); );
// Complete single pack. // Complete single pack.
test_iter( test_iter(
&[p(7), p(8), p(10), p(12), p(18), p(19)], [p(7), p(8), p(10), p(12), p(18), p(19)],
Set::from(pack(&[7, 8, 10, 12, 18, 19])) Set::from(pack([7, 8, 10, 12, 18, 19]))
); );
// Empty range over a non-empty block. // Empty range over a non-empty block.
test_iter( test_iter(
&[], [],
Set::from(block(&[7, 8, 10, 12, 18, 19], 3..3)) Set::from(block([7, 8, 10, 12, 18, 19], 3..3))
); );
// Blocks with a range. // Blocks with a range.
test_iter( test_iter(
&[p(7), p(8), p(10), p(12), p(18), p(19)], [p(7), p(8), p(10), p(12), p(18), p(19)],
Set::from(block(&[7, 8, 10, 12, 18, 19], 0..6)) Set::from(block([7, 8, 10, 12, 18, 19], 0..6))
); );
test_iter( test_iter(
&[p(7), p(8), p(10), p(12), p(18), p(19)], [p(7), p(8), p(10), p(12), p(18), p(19)],
Set::from(block(&[2, 3, 7, 8, 10, 12, 18, 19], 2..8)) Set::from(block([2, 3, 7, 8, 10, 12, 18, 19], 2..8))
); );
test_iter( test_iter(
&[p(7), p(8), p(10), p(12), p(18), p(19)], [p(7), p(8), p(10), p(12), p(18), p(19)],
Set::from(block(&[7, 8, 10, 12, 18, 19, 21, 22], 0..6)) Set::from(block([7, 8, 10, 12, 18, 19, 21, 22], 0..6))
); );
test_iter( test_iter(
&[p(7), p(8), p(10), p(12), p(18), p(19)], [p(7), p(8), p(10), p(12), p(18), p(19)],
Set::from(block(&[2, 3, 7, 8, 10, 12, 18, 19, 21], 2..8)) Set::from(block([2, 3, 7, 8, 10, 12, 18, 19, 21], 2..8))
); );
test_iter( test_iter(
&[p(7)], [p(7)],
Set::from(block(&[2, 3, 7, 8, 10, 12, 18, 19, 21], 2..3)) Set::from(block([2, 3, 7, 8, 10, 12, 18, 19, 21], 2..3))
); );
// Multiple blocks. // Multiple blocks.
test_iter( test_iter(
&[p(7), p(8), p(10), p(12), p(18), p(19)], [p(7), p(8), p(10), p(12), p(18), p(19)],
set(vec![ set([
block(&[2, 7, 8, 10], 1..3), block([2, 7, 8, 10], 1..3),
block(&[10], 0..1), block([10], 0..1),
block(&[2, 12, 18, 19], 1..4) block([2, 12, 18, 19], 1..4)
]) ])
); );
} }

View File

@ -168,8 +168,8 @@ async fn simple_comms() {
} }
).unwrap(); ).unwrap();
u.send_payload(testrig::update(&[2])).await; u.send_payload(testrig::update([2])).await;
assert_eq!(t.recv_payload().await.unwrap(), testrig::update(&[2])); assert_eq!(t.recv_payload().await.unwrap(), testrig::update([2]));
} }

View File

@ -6,7 +6,7 @@ use futures::future::{select, select_all, Either, FutureExt};
use log::debug; use log::debug;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use serde::Deserialize; use serde::Deserialize;
use crate::metrics; use crate::{metrics, payload};
use crate::metrics::{Metric, MetricType, MetricUnit}; use crate::metrics::{Metric, MetricType, MetricUnit};
use crate::comms::{ use crate::comms::{
Gate, GateMetrics, Link, Terminated, UnitHealth, UnitUpdate Gate, GateMetrics, Link, Terminated, UnitHealth, UnitUpdate
@ -195,6 +195,58 @@ impl metrics::Source for AnyMetrics {
} }
//------------ Merge ---------------------------------------------------------
/// A unit merging the data sets of all upstream units.
#[derive(Debug, Deserialize)]
pub struct Merge {
/// The set of units whose data set should be merged.
sources: Vec<Link>,
}
impl Merge {
pub async fn run(
mut self, mut component: Component, mut gate: Gate
) -> Result<(), Terminated> {
if self.sources.is_empty() {
gate.update(UnitUpdate::Gone).await;
return Err(Terminated)
}
let metrics = gate.metrics();
component.register_metrics(metrics.clone());
loop {
{
let res = select(
select_all(
self.sources.iter_mut().map(|link|
link.query().boxed()
)
),
gate.process().boxed()
).await;
if let Either::Right(_) = res {
continue
}
}
let mut output = payload::Set::default();
for source in self.sources.iter() {
if matches!(source.health(), UnitHealth::Healthy) {
if let Some(update) = source.payload() {
output = output.merge(update.set())
}
}
}
gate.update(
UnitUpdate::Payload(payload::Update::new(output))
).await;
}
}
}
//============ Tests ========================================================= //============ Tests =========================================================
#[cfg(test)] #[cfg(test)]
@ -245,25 +297,25 @@ mod test {
// Set one unit to healthy by sending a data update. Check that // Set one unit to healthy by sending a data update. Check that
// the target unstalls with an update. // the target unstalls with an update.
u1.send_payload(testrig::update(&[1])).await; u1.send_payload(testrig::update([1])).await;
assert_eq!(t.recv_payload().await.unwrap(), testrig::update(&[1])); assert_eq!(t.recv_payload().await.unwrap(), testrig::update([1]));
// Set another unit to healthy. This shouldnt change anything. // Set another unit to healthy. This shouldnt change anything.
u2.send_payload(testrig::update(&[2])).await; u2.send_payload(testrig::update([2])).await;
t.recv_nothing().unwrap(); t.recv_nothing().unwrap();
// Now stall the first one and check that we get an update with the // Now stall the first one and check that we get an update with the
// seconds data. // seconds data.
u1.send_stalled().await; u1.send_stalled().await;
assert_eq!(t.recv_payload().await.unwrap(), testrig::update(&[2])); assert_eq!(t.recv_payload().await.unwrap(), testrig::update([2]));
// Now stall the second one, too, and watch us stall. // Now stall the second one, too, and watch us stall.
u2.send_stalled().await; u2.send_stalled().await;
t.recv_stalled().await.unwrap(); t.recv_stalled().await.unwrap();
// Now unstall the third one and receive its data. // Now unstall the third one and receive its data.
u3.send_payload(testrig::update(&[3])).await; u3.send_payload(testrig::update([3])).await;
assert_eq!(t.recv_payload().await.unwrap(), testrig::update(&[3])); assert_eq!(t.recv_payload().await.unwrap(), testrig::update([3]));
} }
} }

View File

@ -45,6 +45,9 @@ pub enum Unit {
#[serde(rename = "json")] #[serde(rename = "json")]
Json(json::Json), Json(json::Json),
#[serde(rename = "merge")]
Merge(combine::Merge),
#[serde(rename = "slurm")] #[serde(rename = "slurm")]
Slurm(slurm::LocalExceptions), Slurm(slurm::LocalExceptions),
@ -62,6 +65,7 @@ impl Unit {
Unit::RtrTcp(unit) => unit.run(component, gate).await, Unit::RtrTcp(unit) => unit.run(component, gate).await,
Unit::RtrTls(unit) => unit.run(component, gate).await, Unit::RtrTls(unit) => unit.run(component, gate).await,
Unit::Json(unit) => unit.run(component, gate).await, Unit::Json(unit) => unit.run(component, gate).await,
Unit::Merge(unit) => unit.run(component, gate).await,
Unit::Slurm(unit) => unit.run(component, gate).await, Unit::Slurm(unit) => unit.run(component, gate).await,
#[cfg(test)] #[cfg(test)]