diff --git a/src/units/slurm.rs b/src/units/slurm.rs index 69bfb96..21f295e 100644 --- a/src/units/slurm.rs +++ b/src/units/slurm.rs @@ -5,10 +5,10 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, Weak}; use std::time::{Duration, SystemTime}; use arc_swap::ArcSwap; -use futures::future::{select, Either, FutureExt}; use log::debug; use rpki::slurm::{SlurmFile, ValidationOutputFilters}; use serde::Deserialize; +use tokio::sync::Notify; use crate::payload; use crate::comms::{Gate, Link, Terminated, UnitStatus}; use crate::config::ConfigPath; @@ -41,15 +41,44 @@ impl LocalExceptions { let files = ExceptionSet::new( self.files.into_iter().map(Into::into).collect() ); + + // The data set we receive from the source. + let mut data = None; + + // Whether we are ready to submit an update to our gate. + // + // This will stay at false until we have received our first + // notification from the exception set indicating that it loaded all + // files. + let mut ready = false; loop { - let update = match select( - self.source.query().boxed(), gate.process().boxed() - ).await { - Either::Left((Ok(update), _)) => update, - Either::Left((Err(UnitStatus::Gone), _)) => return Ok(()), - _ => continue - }; - gate.update_data(files.apply(component.name(), update)).await; + tokio::select! { + biased; + + maybe_update = self.source.query() => { + match maybe_update { + Ok(update) => { + data = Some(update); + } + Err(UnitStatus::Gone) => return Ok(()), + _ => continue, + } + } + + _ = files.notified() => { + ready = true; + } + + _ = gate.process() => { + continue + } + } + + if let (true, Some(data)) = (ready, data.as_ref()) { + gate.update_data( + files.apply(component.name(), data.clone()) + ).await; + } } } } @@ -59,14 +88,7 @@ impl LocalExceptions { /// A collection of all the local exception files we are using. struct ExceptionSet { - /// The paths to the various files. - paths: Arc>, - - /// The content of the various files. - /// - /// This lives behind an `ArcSwap` so we can cheaply swap out the content - /// if a file updates. - files: Arc>>, + data: Arc, /// An alive check for the update thread. /// @@ -77,21 +99,22 @@ struct ExceptionSet { impl ExceptionSet { fn new(paths: Vec) -> Self { - let paths = Arc::new(paths); - // Doing things in this order avoids the need for type annotations. let res = ExceptionSet { - paths: paths.clone(), - files: Arc::new( - paths.iter().map(|_| Default::default()).collect() + data: Arc::new( + ExceptionSetData { + files: paths.iter().map(|_| Default::default()).collect(), + paths, + notify: Notify::new(), + } ), alive: Arc::new(()), }; - let content = res.files.clone(); + let data = res.data.clone(); let alive = Arc::downgrade(&res.alive); thread::spawn(move || { - Self::update_thread(paths, content, alive) + data.update_thread(alive) }); res @@ -101,7 +124,9 @@ impl ExceptionSet { let serial = update.serial(); let mut set = update.into_set(); - for (path, file) in self.paths.iter().zip(self.files.iter()) { + for (path, file) in + self.data.paths.iter().zip(self.data.files.iter()) + { set = file.load().apply(unit, path, set); } @@ -109,12 +134,31 @@ impl ExceptionSet { payload::Update::new(serial, set, None) } - fn update_thread( - paths: Arc>, - content: Arc>>, - alive: Weak<()>, - ) { - let mut modified = vec![None::; paths.len()]; + async fn notified(&self) { + self.data.notify.notified().await + } +} + + +//------------ ExceptionSetData --------------------------------------------- + +struct ExceptionSetData { + /// The paths to the various files. + paths: Vec, + + /// The content of the various files. + /// + /// This lives behind an `ArcSwap` so we can cheaply swap out the content + /// if a file updates. + files: Vec>, + + /// A notifier for when the set has changed. + notify: Notify, +} + +impl ExceptionSetData { + fn update_thread(self: Arc, alive: Weak<()>) { + let mut modified = vec![None::; self.paths.len()]; loop { if alive.upgrade().is_none() { @@ -122,26 +166,39 @@ impl ExceptionSet { return } + let mut updated = false; + for (path, (modified, content)) in - paths.iter().zip(modified.iter_mut().zip(content.iter())) + self.paths.iter().zip( + modified.iter_mut().zip(self.files.iter()) + ) { // We simply ignore any errors for now. - let _ = Self::update_file(path, modified, content); + if let Ok(true) = Self::update_file(path, modified, content) { + updated = true; + } + } + + if updated { + self.notify.notify_one(); } thread::sleep(UPDATE_SLEEP); } } + /// Updates the given file if it changed. + /// + /// Returns `Ok(true)` if the file was updated or `Ok(false)` if not. fn update_file( path: &Path, old_modified: &mut Option, content: &ArcSwap - ) -> Result<(), io::Error> { + ) -> Result { let new_modified = fs::metadata(path)?.modified()?; if let Some(old_modified) = old_modified.as_ref() { - if new_modified >= *old_modified { - return Ok(()) + if new_modified <= *old_modified { + return Ok(false) } } @@ -154,7 +211,7 @@ impl ExceptionSet { )); *old_modified = Some(new_modified); debug!("Updated Slurm file {}", path.display()); - Ok(()) + Ok(true) } }