mirror of
https://github.com/NLnetLabs/rtrtr.git
synced 2024-05-11 05:55:07 +00:00
Fix slurm unit start race. (#89)
This PR fixes a race in the slurm unit that caused exceptions not processed if loading the files took too long. As a side-effect, the unit’s output will now be updated immediately when an update to the files has been detected rather than only be considered when the next update from the unit’s source comes in. --------- Co-authored-by: Luuk Hendriks <mail@luukhendriks.eu>
This commit is contained in:
+94
-37
@@ -5,10 +5,10 @@ use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::{Duration, SystemTime};
|
||||
use arc_swap::ArcSwap;
|
||||
use futures::future::{select, Either, FutureExt};
|
||||
use log::debug;
|
||||
use rpki::slurm::{SlurmFile, ValidationOutputFilters};
|
||||
use serde::Deserialize;
|
||||
use tokio::sync::Notify;
|
||||
use crate::payload;
|
||||
use crate::comms::{Gate, Link, Terminated, UnitStatus};
|
||||
use crate::config::ConfigPath;
|
||||
@@ -41,15 +41,44 @@ impl LocalExceptions {
|
||||
let files = ExceptionSet::new(
|
||||
self.files.into_iter().map(Into::into).collect()
|
||||
);
|
||||
|
||||
// The data set we receive from the source.
|
||||
let mut data = None;
|
||||
|
||||
// Whether we are ready to submit an update to our gate.
|
||||
//
|
||||
// This will stay at false until we have received our first
|
||||
// notification from the exception set indicating that it loaded all
|
||||
// files.
|
||||
let mut ready = false;
|
||||
loop {
|
||||
let update = match select(
|
||||
self.source.query().boxed(), gate.process().boxed()
|
||||
).await {
|
||||
Either::Left((Ok(update), _)) => update,
|
||||
Either::Left((Err(UnitStatus::Gone), _)) => return Ok(()),
|
||||
_ => continue
|
||||
};
|
||||
gate.update_data(files.apply(component.name(), update)).await;
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
maybe_update = self.source.query() => {
|
||||
match maybe_update {
|
||||
Ok(update) => {
|
||||
data = Some(update);
|
||||
}
|
||||
Err(UnitStatus::Gone) => return Ok(()),
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
|
||||
_ = files.notified() => {
|
||||
ready = true;
|
||||
}
|
||||
|
||||
_ = gate.process() => {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if let (true, Some(data)) = (ready, data.as_ref()) {
|
||||
gate.update_data(
|
||||
files.apply(component.name(), data.clone())
|
||||
).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -59,14 +88,7 @@ impl LocalExceptions {
|
||||
|
||||
/// A collection of all the local exception files we are using.
|
||||
struct ExceptionSet {
|
||||
/// The paths to the various files.
|
||||
paths: Arc<Vec<PathBuf>>,
|
||||
|
||||
/// The content of the various files.
|
||||
///
|
||||
/// This lives behind an `ArcSwap` so we can cheaply swap out the content
|
||||
/// if a file updates.
|
||||
files: Arc<Vec<ArcSwap<Content>>>,
|
||||
data: Arc<ExceptionSetData>,
|
||||
|
||||
/// An alive check for the update thread.
|
||||
///
|
||||
@@ -77,21 +99,22 @@ struct ExceptionSet {
|
||||
|
||||
impl ExceptionSet {
|
||||
fn new(paths: Vec<PathBuf>) -> Self {
|
||||
let paths = Arc::new(paths);
|
||||
|
||||
// Doing things in this order avoids the need for type annotations.
|
||||
let res = ExceptionSet {
|
||||
paths: paths.clone(),
|
||||
files: Arc::new(
|
||||
paths.iter().map(|_| Default::default()).collect()
|
||||
data: Arc::new(
|
||||
ExceptionSetData {
|
||||
files: paths.iter().map(|_| Default::default()).collect(),
|
||||
paths,
|
||||
notify: Notify::new(),
|
||||
}
|
||||
),
|
||||
alive: Arc::new(()),
|
||||
};
|
||||
let content = res.files.clone();
|
||||
let data = res.data.clone();
|
||||
let alive = Arc::downgrade(&res.alive);
|
||||
|
||||
thread::spawn(move || {
|
||||
Self::update_thread(paths, content, alive)
|
||||
data.update_thread(alive)
|
||||
});
|
||||
|
||||
res
|
||||
@@ -101,7 +124,9 @@ impl ExceptionSet {
|
||||
let serial = update.serial();
|
||||
let mut set = update.into_set();
|
||||
|
||||
for (path, file) in self.paths.iter().zip(self.files.iter()) {
|
||||
for (path, file) in
|
||||
self.data.paths.iter().zip(self.data.files.iter())
|
||||
{
|
||||
set = file.load().apply(unit, path, set);
|
||||
|
||||
}
|
||||
@@ -109,12 +134,31 @@ impl ExceptionSet {
|
||||
payload::Update::new(serial, set, None)
|
||||
}
|
||||
|
||||
fn update_thread(
|
||||
paths: Arc<Vec<PathBuf>>,
|
||||
content: Arc<Vec<ArcSwap<Content>>>,
|
||||
alive: Weak<()>,
|
||||
) {
|
||||
let mut modified = vec![None::<SystemTime>; paths.len()];
|
||||
async fn notified(&self) {
|
||||
self.data.notify.notified().await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//------------ ExceptionSetData ---------------------------------------------
|
||||
|
||||
struct ExceptionSetData {
|
||||
/// The paths to the various files.
|
||||
paths: Vec<PathBuf>,
|
||||
|
||||
/// The content of the various files.
|
||||
///
|
||||
/// This lives behind an `ArcSwap` so we can cheaply swap out the content
|
||||
/// if a file updates.
|
||||
files: Vec<ArcSwap<Content>>,
|
||||
|
||||
/// A notifier for when the set has changed.
|
||||
notify: Notify,
|
||||
}
|
||||
|
||||
impl ExceptionSetData {
|
||||
fn update_thread(self: Arc<Self>, alive: Weak<()>) {
|
||||
let mut modified = vec![None::<SystemTime>; self.paths.len()];
|
||||
|
||||
loop {
|
||||
if alive.upgrade().is_none() {
|
||||
@@ -122,26 +166,39 @@ impl ExceptionSet {
|
||||
return
|
||||
}
|
||||
|
||||
let mut updated = false;
|
||||
|
||||
for (path, (modified, content)) in
|
||||
paths.iter().zip(modified.iter_mut().zip(content.iter()))
|
||||
self.paths.iter().zip(
|
||||
modified.iter_mut().zip(self.files.iter())
|
||||
)
|
||||
{
|
||||
// We simply ignore any errors for now.
|
||||
let _ = Self::update_file(path, modified, content);
|
||||
if let Ok(true) = Self::update_file(path, modified, content) {
|
||||
updated = true;
|
||||
}
|
||||
}
|
||||
|
||||
if updated {
|
||||
self.notify.notify_one();
|
||||
}
|
||||
|
||||
thread::sleep(UPDATE_SLEEP);
|
||||
}
|
||||
}
|
||||
|
||||
/// Updates the given file if it changed.
|
||||
///
|
||||
/// Returns `Ok(true)` if the file was updated or `Ok(false)` if not.
|
||||
fn update_file(
|
||||
path: &Path,
|
||||
old_modified: &mut Option<SystemTime>,
|
||||
content: &ArcSwap<Content>
|
||||
) -> Result<(), io::Error> {
|
||||
) -> Result<bool, io::Error> {
|
||||
let new_modified = fs::metadata(path)?.modified()?;
|
||||
if let Some(old_modified) = old_modified.as_ref() {
|
||||
if new_modified >= *old_modified {
|
||||
return Ok(())
|
||||
if new_modified <= *old_modified {
|
||||
return Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -154,7 +211,7 @@ impl ExceptionSet {
|
||||
));
|
||||
*old_modified = Some(new_modified);
|
||||
debug!("Updated Slurm file {}", path.display());
|
||||
Ok(())
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user