From 8a619dbb2a0d24a4aef66b4e1dcb81aacb35bc1f Mon Sep 17 00:00:00 2001 From: Martin Hoffmann Date: Fri, 15 Dec 2023 14:40:12 +0100 Subject: [PATCH] Allow skipping updates if the stored copy is new enough. --- src/collector/base.rs | 35 ++++- src/collector/mod.rs | 2 +- src/engine.rs | 278 +++++++++++++++++++++++++++++++++++--- src/operation.rs | 57 +++++--- src/payload/validation.rs | 7 +- src/rta.rs | 2 +- src/store.rs | 2 +- src/utils/binio.rs | 20 ++- src/utils/fs.rs | 17 ++- 9 files changed, 367 insertions(+), 53 deletions(-) diff --git a/src/collector/base.rs b/src/collector/base.rs index 92c7842..c05e5e1 100644 --- a/src/collector/base.rs +++ b/src/collector/base.rs @@ -2,6 +2,7 @@ //! //! This is a private module. It’s types are re-exported by the parent. +use std::io; use std::collections::HashSet; use std::path::Path; use std::sync::Arc; @@ -13,6 +14,7 @@ use crate::config::{Config, FallbackPolicy}; use crate::error::{Failed, Fatal, RunFailed}; use crate::metrics::Metrics; use crate::engine::CaCert; +use crate::utils::binio::ParseError; use super::{rrdp, rsync}; @@ -149,13 +151,14 @@ impl<'a> Run<'a> { /// /// If you are not interested in the metrics, you can simply drop the /// value, instead. - pub fn done(self, metrics: &mut Metrics) { + pub fn done(self, metrics: &mut Metrics) -> StoredStatus { if let Some(rrdp) = self.rrdp { rrdp.done(metrics) } if let Some(rsync) = self.rsync { rsync.done(metrics) } + StoredStatus(()) } /// Loads the trust anchor certificate at the given URI. @@ -361,3 +364,33 @@ impl Cleanup { } } + +//------------ StoredStatus -------------------------------------------------- + +/// The content of the collector’s status. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct StoredStatus(()); + +impl StoredStatus { + #[cfg(test)] + pub fn new_test() -> Self { + StoredStatus(()) + } + + pub fn is_current(&self, _collector: &Collector) -> bool { + true + } + + pub fn read( + _reader: &mut impl io::Read + ) -> Result { + Ok(Self(())) + } + + pub fn write( + &self, _writer: &mut impl io::Write + ) -> Result<(), io::Error> { + Ok(()) + } +} + diff --git a/src/collector/mod.rs b/src/collector/mod.rs index 88a3e2a..b2e731e 100644 --- a/src/collector/mod.rs +++ b/src/collector/mod.rs @@ -24,7 +24,7 @@ // mirroring the structure of the base module, i.e., they also have // `Collector`, `Run`, and `Repository` types. // -pub use self::base::{Collector, Cleanup, Run, Repository}; +pub use self::base::{Collector, Cleanup, Run, Repository, StoredStatus}; pub use self::rrdp::{HttpStatus, SnapshotReason}; mod base; diff --git a/src/engine.rs b/src/engine.rs index 9d4aba9..2f485b7 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -18,14 +18,16 @@ /// the accompanying trait [`ProcessPubPoint`] dealing with individual /// publication points. -use std::{fmt, fs, thread}; +use std::{fmt, fs, io, thread}; use std::borrow::Cow; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fs::File; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; use bytes::Bytes; +use chrono::{DateTime, TimeZone, Utc}; use crossbeam_queue::{ArrayQueue, SegQueue}; use log::{debug, error, info, warn}; use rpki::crypto::keys::KeyIdentifier; @@ -48,7 +50,10 @@ use crate::metrics::{ Metrics, PublicationMetrics, RepositoryMetrics, TalMetrics }; use crate::store::{Store, StoredManifest, StoredObject, StoredPoint}; +use crate::utils::binio::{Compose, Parse, ParseError}; +use crate::utils::fs::ExclusiveFile; use crate::utils::str::str_from_ascii; +use crate::utils::sync::Mutex; //------------ Configuration ------------------------------------------------- @@ -101,6 +106,9 @@ pub struct Engine { /// The store to load stored data from. store: Store, + /// The path to the status file. + status_path: PathBuf, + /// Should we be strict when decoding data? strict: bool, @@ -155,6 +163,7 @@ impl Engine { tals: Vec::new(), collector, store, + status_path: config.cache_dir.join("status.bin"), strict: config.strict, stale: config.stale, validation_threads: config.validation_threads, @@ -293,20 +302,29 @@ impl Engine { /// During the run, `processor` will be responsible for dealing with /// valid objects. It must implement the [`ProcessRun`] trait. /// + /// If `last_updated` is not `None`, it specifies a duration since the + /// last successful validation run before which a new update should be + /// attempted. Ie., if the last validation run was successful, is + /// compatible with the current configuration, and was less than the + /// duration ago, only the already stored data is used. + /// /// The method returns a [`Run`] that drives the validation run. pub fn start( - &self, processor: P - ) -> Result, Failed> { + &self, + processor: P, + last_updated: Option, + ) -> Result, RunFailed> { info!("Using the following TALs:"); for tal in &self.tals { info!(" * {}", tal.info().name()); } - Ok(Run::new( + Run::new( self, self.collector.as_ref().map(Collector::start), self.store.start(), + last_updated, processor - )) + ) } /// Dumps the content of the collector and store owned by the engine. @@ -341,6 +359,12 @@ pub struct Run<'a, P> { /// The processor for valid data. processor: P, + /// The status file. + /// + /// We keep it open during the run to avoid someone else starting a + /// competing run. + status_file: ExclusiveFile, + /// Was an error encountered during the run? had_err: AtomicBool, @@ -355,16 +379,57 @@ impl<'a, P> Run<'a, P> { /// Creates a new runner from all the parts. fn new( validation: &'a Engine, - collector: Option>, + mut collector: Option>, store: store::Run<'a>, + last_updated: Option, processor: P, - ) -> Self { - Run { - validation, collector, store, processor, + ) -> Result { + let mut status_file = ExclusiveFile::open( + &validation.status_path + ).map_err(|err| { + error!( + "Fatal: Failed to open store status file '{}': {}", + validation.status_path.display(), err + ); + error!( + " Is there another Routinator instance running?" + ); + RunFailed::fatal() + })?; + + if let Ok(status) = StoredStatus::read(&mut status_file) { + if status.is_current(validation, last_updated) { + eprintln!("Store is current, skipping update."); + collector = None; + } + } + + // If we have a collector, the status will change. Clear the status + // so that if we crash there isn’t a usable yet possibly incorrect + // status. + if collector.is_some() { + io::Seek::rewind(&mut status_file).map_err(|err| { + error!( + "Fatal: Failed to reset store status file '{}': {}", + validation.status_path.display(), err + ); + RunFailed::fatal() + })?; + status_file.set_len(0).map_err(|err| { + error!( + "Fatal: Failed to reset store status file '{}': {}", + validation.status_path.display(), err + ); + RunFailed::fatal() + })?; + } + + Ok(Run { + validation, collector, store, processor, status_file, had_err: AtomicBool::new(false), is_fatal: AtomicBool::new(false), metrics: Default::default() - } + }) } /// Cleans the collector and store owned by the engine. @@ -386,13 +451,35 @@ impl<'a, P> Run<'a, P> { /// /// If you are not interested in the metrics, you can simple drop the /// value, instead. - pub fn done(self) -> Metrics { + pub fn done(mut self) -> Result { let mut metrics = self.metrics; - if let Some(collector) = self.collector { + let collector_status = self.collector.map(|collector| { collector.done(&mut metrics) - } + }); self.store.done(&mut metrics); - metrics + + if let Some(collector_status) = collector_status { + StoredStatus::new( + Utc::now(), + self.validation.tals.as_slice(), + collector_status, + ).write(&mut self.status_file).map_err(|err| { + error!( + "Fatal: failed to write store status file '{}': {}", + self.validation.status_path.display(), err + ); + RunFailed::fatal() + })?; + self.status_file.close().map_err(|err| { + error!( + "Fatal: failed to close store status file '{}': {}", + self.validation.status_path.display(), err + ); + RunFailed::fatal() + })?; + } + + Ok(metrics) } } @@ -1847,7 +1934,7 @@ impl RunMetrics { cert.ca_repository.canonical_module() }); - let mut repository_indexes = self.repository_indexes.lock().unwrap(); + let mut repository_indexes = self.repository_indexes.lock(); if let Some(index) = repository_indexes.get(uri.as_ref()) { return *index } @@ -1878,7 +1965,7 @@ impl RunMetrics { /// Prepares the final metrics. pub fn prepare_final(&self, target: &mut Metrics) { let mut indexes: Vec<_> - = self.repository_indexes.lock().unwrap().iter().map(|item| { + = self.repository_indexes.lock().iter().map(|item| { (item.0.clone(), *item.1) }).collect(); indexes.sort_by_key(|(_, idx)| *idx); @@ -2049,3 +2136,158 @@ pub trait ProcessPubPoint: Sized + Send + Sync { } } + +//------------ StoredStatus -------------------------------------------------- + +/// The content of the store’s status. +/// +/// This type can be used to check whether the last validation run that +/// updated the store resulted in enough information for doing a run just +/// from the store. +/// +/// This requires that the run wasn’t too long ago, that all the TALs we +/// are using now were included back then as well, and that the collectors +/// used a compatible configuration. +#[derive(Clone, Debug, Eq, PartialEq)] +struct StoredStatus { + /// When did the last update conclude? + updated: DateTime, + + /// The public key bits of all the TALs used during the last update. + tals: HashSet, + + /// The status of the collectors. + collector: collector::StoredStatus, +} + +impl StoredStatus { + /// Creates a new stored status object from the contained data. + fn new( + updated: DateTime, + tals: &[Tal], + collector: collector::StoredStatus, + ) -> Self { + StoredStatus { + updated, + tals: tals.iter().map(|tal| tal.key_info().bits_bytes()).collect(), + collector, + } + } + + /// Returns whether data with this status should be considered current. + pub fn is_current( + &self, engine: &Engine, last_update: Option + ) -> bool { + // If last_update is None, we can never be current. + let earliest_update = match last_update { + Some(duration) => Utc::now() - duration, + None => return false, + }; + + // If the update is older that last_update, we are not current. + if self.updated < earliest_update { + return false + } + + // If there are TALs configured we didn’t use in the last update, + // we are not current. + for tal in &engine.tals { + if !self.tals.contains(tal.key_info().bits()) { + return false + } + } + + // If we have a collector, it needs to agree, too. + if let Some(collector) = engine.collector.as_ref() { + if !self.collector.is_current(collector) { + return false + } + } + + // Can’t find any more reasons why we shouldn’t be current. + true + } + + /// Reads the stored status from an IO reader. + pub fn read( + reader: &mut impl io::Read + ) -> Result { + // Version number. Must be 0u8. + let version = u8::parse(reader)?; + if version != 0 { + return Err(ParseError::format( + format!("unexpected version {}", version) + )) + } + + let updated_ts = i64::parse(reader)?; + let updated = match Utc.timestamp_opt(updated_ts, 0).single() { + Some(time) => time, + None => { + return Err(ParseError::format( + format!("invalid update time {}", updated_ts) + )); + } + }; + + let tal_len = usize::try_from(u32::parse(reader)?).map_err(|_| { + ParseError::format("too many TALs for this system") + })?; + let mut tals = HashSet::new(); + for _ in 0..tal_len { + tals.insert(Bytes::parse(reader)?); + } + + let collector = collector::StoredStatus::read(reader)?; + + Ok(Self { updated, tals, collector }) + } + + /// Appends the stored status to a writer. + pub fn write( + &self, writer: &mut impl io::Write + ) -> Result<(), io::Error> { + // Version: 0u8. + 0u8.compose(writer)?; + self.updated.timestamp().compose(writer)?; + u32::try_from(self.tals.len()).map_err(|_| + io::Error::new(io::ErrorKind::Other, "excessively large TAL set") + )?.compose(writer)?; + self.tals.iter().try_for_each(|tal| tal.compose(writer))?; + self.collector.write(writer)?; + + Ok(()) + } +} + + +//============ Tests ========================================================= + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn write_read_stored_status() { + fn cycle( + tals: impl IntoIterator + ) { + let orig = StoredStatus { + updated: Utc::now(), + tals: tals.into_iter().map(Bytes::from).collect(), + collector: collector::StoredStatus::new_test(), + }; + let mut written = Vec::new(); + orig.write(&mut written).unwrap(); + let decoded = StoredStatus::read( + &mut written.as_slice() + ).unwrap(); + assert_eq!(orig, decoded); + } + + cycle([]); + cycle(["foo", "barbar", ""]); + cycle([""]); + } +} + diff --git a/src/operation.rs b/src/operation.rs index 3ac29e1..51d4096 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -249,7 +249,15 @@ impl Server { validation.ignite()?; let join = thread::spawn(move || { + // On the first iteration we accept old data at most one refresh + // cycle old. This will be set to `None` for subsequent + // iterations. + let mut last_update = Some(process.config().refresh); + + // We only want to retry a run that can be retried once. If it + // fails again, something is odd and we should crash. let mut can_retry = true; + let err = loop { if let Some(log) = log.as_ref() { log.start(); @@ -260,9 +268,13 @@ impl Server { Ok(exceptions) => { match Self::process_once( process.config(), &validation, &history, - &mut notify, exceptions, + &mut notify, exceptions, last_update.take(), ) { Ok(()) => { + // Succcessful run, so we can go back to + // allowing a single retry-able failure. + can_retry = true; + history.read().refresh_wait() } Err(err) => { @@ -386,10 +398,13 @@ impl Server { history: &SharedHistory, notify: &mut NotifySender, exceptions: LocalExceptions, + last_update: Option, ) -> Result<(), RunFailed> { info!("Starting a validation run."); history.mark_update_start(); - let (report, metrics) = ValidationReport::process(engine, config)?; + let (report, metrics) = ValidationReport::process( + engine, config, last_update + )?; let must_notify = history.update( report, &exceptions, metrics, ); @@ -435,6 +450,9 @@ pub struct Vrps { /// Don’t update the repository. noupdate: bool, + /// Don’t update the cache if it isn’t older than given. + last_updated: Option, + /// Return an error on incomplete update. complete: bool, } @@ -485,9 +503,9 @@ struct VrpsArgs { #[arg(long)] no_aspas: bool, - /// Don't update the local cache - #[arg(short, long)] - noupdate: bool, + /// Don't update the local cache (if it is newer than given). + #[arg(short, long, value_name = "SECONDS")] + noupdate: Option>, /// Return an error status on incomplete update #[arg(long)] @@ -559,18 +577,13 @@ impl Vrps { path, format, output, - noupdate: args.noupdate, + noupdate: matches!(args.noupdate, Some(None)), + last_updated: args.noupdate.flatten().map(Duration::from_secs), complete: args.complete, }) } /// Produces a list of Validated ROA Payload. - /// - /// The list will be written to the file identified by `path` or - /// stdout if that is `None`. The format is determined by `format`. - /// If `noupdate` is `false`, the local repository will be updated first - /// and rsync will be enabled during validation to sync any new - /// publication points. fn run(mut self, process: Process) -> Result<(), ExitError> { self.output.update_from_config(process.config()); let mut engine = Engine::new(process.config(), !self.noupdate)?; @@ -582,7 +595,9 @@ impl Vrps { let mut once = false; loop { - match ValidationReport::process(&engine, process.config()) { + match ValidationReport::process( + &engine, process.config(), self.last_updated, + ) { Ok(res) => break res, Err(err) => { if err.should_retry() { @@ -667,6 +682,9 @@ pub struct Validate { /// Don’t update the repository. noupdate: bool, + /// Don’t update the cache if it isn’t older than given. + last_updated: Option, + /// Return an error on incomplete update. complete: bool, } @@ -709,9 +727,9 @@ struct ValidateArgs { #[arg(short, long, value_name = "PATH", default_value = "-")] output: PathBuf, - /// Don't update the local cache - #[arg(short, long)] - noupdate: bool, + /// Don't update the local cache (if it is newer than given). + #[arg(short, long, value_name = "SECONDS")] + noupdate: Option>, /// Return an error status on incomplete update #[arg(long)] @@ -770,7 +788,8 @@ impl Validate { Some(args.output) } }, - noupdate: args.noupdate, + noupdate: matches!(args.noupdate, Some(None)), + last_updated: args.noupdate.flatten().map(Duration::from_secs), complete: args.complete, }) } @@ -852,7 +871,7 @@ impl Validate { engine.ignite()?; process.switch_logging(false, false)?; let (report, mut metrics) = ValidationReport::process( - &engine, process.config(), + &engine, process.config(), self.last_updated, )?; let snapshot = report.into_snapshot( &LocalExceptions::load(process.config(), false)?, @@ -1091,7 +1110,7 @@ impl Update { engine.ignite()?; process.switch_logging(false, false)?; let (_, metrics) = ValidationReport::process( - &engine, process.config(), + &engine, process.config(), None )?; if self.complete && !metrics.rsync_complete() { Err(ExitError::IncompleteUpdate) diff --git a/src/payload/validation.rs b/src/payload/validation.rs index c6fa567..6abd757 100644 --- a/src/payload/validation.rs +++ b/src/payload/validation.rs @@ -17,6 +17,7 @@ use std::cmp; use std::collections::hash_map; use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use crossbeam_queue::SegQueue; use log::{info, warn}; use rpki::uri; @@ -94,13 +95,13 @@ impl ValidationReport { /// Creates a new validation report by running the engine. pub fn process( - engine: &Engine, config: &Config, + engine: &Engine, config: &Config, last_update: Option ) -> Result<(Self, Metrics), RunFailed> { let report = Self::new(config); - let mut run = engine.start(&report)?; + let mut run = engine.start(&report, last_update)?; run.process()?; run.cleanup()?; - let metrics = run.done(); + let metrics = run.done()?; Ok((report, metrics)) } diff --git a/src/rta.rs b/src/rta.rs index 34fb24f..7d11052 100644 --- a/src/rta.rs +++ b/src/rta.rs @@ -36,7 +36,7 @@ impl<'a> ValidationReport<'a> { &self, engine: &Engine, ) -> Result<(), RunFailed> { - let mut run = engine.start(self)?; + let mut run = engine.start(self, None)?; run.process()?; run.cleanup()?; Ok(()) diff --git a/src/store.rs b/src/store.rs index 2315887..55bd3ab 100644 --- a/src/store.rs +++ b/src/store.rs @@ -430,7 +430,7 @@ impl<'a> Run<'a> { fn new( store: &'a Store, ) -> Self { - Run { store } + Self { store } } /// Finishes the validation run. diff --git a/src/utils/binio.rs b/src/utils/binio.rs index b7860d2..1fbd89b 100644 --- a/src/utils/binio.rs +++ b/src/utils/binio.rs @@ -6,8 +6,7 @@ use std::{error, fmt, io, slice}; use bytes::Bytes; -use rpki::rrdp; -use rpki::uri; +use rpki::{rrdp, uri}; use uuid::Uuid; @@ -40,6 +39,23 @@ impl Parse for u8 { } +//------------ u16 ----------------------------------------------------------- + +impl Compose for u16 { + fn compose(&self, target: &mut W) -> Result<(), io::Error> { + target.write_all(&self.to_be_bytes()) + } +} + +impl Parse for u16 { + fn parse(source: &mut R) -> Result { + let mut res = 0u16.to_ne_bytes(); + source.read_exact(&mut res)?; + Ok(u16::from_be_bytes(res)) + } +} + + //------------ u32 ----------------------------------------------------------- impl Compose for u32 { diff --git a/src/utils/fs.rs b/src/utils/fs.rs index 70116a9..c5881d4 100644 --- a/src/utils/fs.rs +++ b/src/utils/fs.rs @@ -33,13 +33,16 @@ pub struct ExclusiveFile { impl ExclusiveFile { /// Opens an exclusive file. /// - /// Opens the file specified by `path` for reading and writing and blocks - /// access to it by other processes and possibly re-opening it by the same - /// process again, too. If the file doesn’t exist, it will be created. + /// Trys to Open the file specified by `path` for reading and writing and + /// blocks access to it by other processes and possibly re-opening it by + /// the same process again, too. If the file doesn’t exist, it will be + /// created. /// - /// Returns an error if opening fails. Because there is no portable way - /// to indicate opening failing because of an already locked file, this - /// will always just be a generic IO error. + /// If the file has already been opened exclusively, the function will + /// fail and return an error. Because there is no portable way + /// to indicate this error case, a generic IO error will be returned. + /// + /// An error is also returned if opening fails for other reasons. pub fn open(path: &Path) -> Result { Self::_open(path) } @@ -67,7 +70,7 @@ impl ExclusiveFile { let _ = nix::fcntl::fcntl( file.as_raw_fd(), nix::fcntl::FcntlArg::F_SETLK(&nix::libc::flock { - l_type: (nix::libc::F_RDLCK | nix::libc::F_WRLCK) as c_short, + l_type: nix::libc::F_WRLCK as c_short, l_whence: nix::libc::SEEK_SET as c_short, l_start: 0, l_len: 0,