Allow skipping updates if the stored copy is new enough.

This commit is contained in:
Martin Hoffmann
2023-12-15 14:40:12 +01:00
parent 99f672e51c
commit 8a619dbb2a
9 changed files with 367 additions and 53 deletions
+34 -1
View File
@@ -2,6 +2,7 @@
//!
//! This is a private module. Its types are re-exported by the parent.
use std::io;
use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
@@ -13,6 +14,7 @@ use crate::config::{Config, FallbackPolicy};
use crate::error::{Failed, Fatal, RunFailed};
use crate::metrics::Metrics;
use crate::engine::CaCert;
use crate::utils::binio::ParseError;
use super::{rrdp, rsync};
@@ -149,13 +151,14 @@ impl<'a> Run<'a> {
///
/// If you are not interested in the metrics, you can simply drop the
/// value, instead.
pub fn done(self, metrics: &mut Metrics) {
pub fn done(self, metrics: &mut Metrics) -> StoredStatus {
if let Some(rrdp) = self.rrdp {
rrdp.done(metrics)
}
if let Some(rsync) = self.rsync {
rsync.done(metrics)
}
StoredStatus(())
}
/// Loads the trust anchor certificate at the given URI.
@@ -361,3 +364,33 @@ impl Cleanup {
}
}
//------------ StoredStatus --------------------------------------------------
/// The content of the collectors status.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StoredStatus(());
impl StoredStatus {
#[cfg(test)]
pub fn new_test() -> Self {
StoredStatus(())
}
pub fn is_current(&self, _collector: &Collector) -> bool {
true
}
pub fn read(
_reader: &mut impl io::Read
) -> Result<Self, ParseError> {
Ok(Self(()))
}
pub fn write(
&self, _writer: &mut impl io::Write
) -> Result<(), io::Error> {
Ok(())
}
}
+1 -1
View File
@@ -24,7 +24,7 @@
// mirroring the structure of the base module, i.e., they also have
// `Collector`, `Run`, and `Repository` types.
//
pub use self::base::{Collector, Cleanup, Run, Repository};
pub use self::base::{Collector, Cleanup, Run, Repository, StoredStatus};
pub use self::rrdp::{HttpStatus, SnapshotReason};
mod base;
+260 -18
View File
@@ -18,14 +18,16 @@
/// the accompanying trait [`ProcessPubPoint`] dealing with individual
/// publication points.
use std::{fmt, fs, thread};
use std::{fmt, fs, io, thread};
use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use crossbeam_queue::{ArrayQueue, SegQueue};
use log::{debug, error, info, warn};
use rpki::crypto::keys::KeyIdentifier;
@@ -48,7 +50,10 @@ use crate::metrics::{
Metrics, PublicationMetrics, RepositoryMetrics, TalMetrics
};
use crate::store::{Store, StoredManifest, StoredObject, StoredPoint};
use crate::utils::binio::{Compose, Parse, ParseError};
use crate::utils::fs::ExclusiveFile;
use crate::utils::str::str_from_ascii;
use crate::utils::sync::Mutex;
//------------ Configuration -------------------------------------------------
@@ -101,6 +106,9 @@ pub struct Engine {
/// The store to load stored data from.
store: Store,
/// The path to the status file.
status_path: PathBuf,
/// Should we be strict when decoding data?
strict: bool,
@@ -155,6 +163,7 @@ impl Engine {
tals: Vec::new(),
collector,
store,
status_path: config.cache_dir.join("status.bin"),
strict: config.strict,
stale: config.stale,
validation_threads: config.validation_threads,
@@ -293,20 +302,29 @@ impl Engine {
/// During the run, `processor` will be responsible for dealing with
/// valid objects. It must implement the [`ProcessRun`] trait.
///
/// If `last_updated` is not `None`, it specifies a duration since the
/// last successful validation run before which a new update should be
/// attempted. Ie., if the last validation run was successful, is
/// compatible with the current configuration, and was less than the
/// duration ago, only the already stored data is used.
///
/// The method returns a [`Run`] that drives the validation run.
pub fn start<P: ProcessRun>(
&self, processor: P
) -> Result<Run<P>, Failed> {
&self,
processor: P,
last_updated: Option<Duration>,
) -> Result<Run<P>, RunFailed> {
info!("Using the following TALs:");
for tal in &self.tals {
info!(" * {}", tal.info().name());
}
Ok(Run::new(
Run::new(
self,
self.collector.as_ref().map(Collector::start),
self.store.start(),
last_updated,
processor
))
)
}
/// Dumps the content of the collector and store owned by the engine.
@@ -341,6 +359,12 @@ pub struct Run<'a, P> {
/// The processor for valid data.
processor: P,
/// The status file.
///
/// We keep it open during the run to avoid someone else starting a
/// competing run.
status_file: ExclusiveFile,
/// Was an error encountered during the run?
had_err: AtomicBool,
@@ -355,16 +379,57 @@ impl<'a, P> Run<'a, P> {
/// Creates a new runner from all the parts.
fn new(
validation: &'a Engine,
collector: Option<collector::Run<'a>>,
mut collector: Option<collector::Run<'a>>,
store: store::Run<'a>,
last_updated: Option<Duration>,
processor: P,
) -> Self {
Run {
validation, collector, store, processor,
) -> Result<Self, RunFailed> {
let mut status_file = ExclusiveFile::open(
&validation.status_path
).map_err(|err| {
error!(
"Fatal: Failed to open store status file '{}': {}",
validation.status_path.display(), err
);
error!(
" Is there another Routinator instance running?"
);
RunFailed::fatal()
})?;
if let Ok(status) = StoredStatus::read(&mut status_file) {
if status.is_current(validation, last_updated) {
eprintln!("Store is current, skipping update.");
collector = None;
}
}
// If we have a collector, the status will change. Clear the status
// so that if we crash there isnt a usable yet possibly incorrect
// status.
if collector.is_some() {
io::Seek::rewind(&mut status_file).map_err(|err| {
error!(
"Fatal: Failed to reset store status file '{}': {}",
validation.status_path.display(), err
);
RunFailed::fatal()
})?;
status_file.set_len(0).map_err(|err| {
error!(
"Fatal: Failed to reset store status file '{}': {}",
validation.status_path.display(), err
);
RunFailed::fatal()
})?;
}
Ok(Run {
validation, collector, store, processor, status_file,
had_err: AtomicBool::new(false),
is_fatal: AtomicBool::new(false),
metrics: Default::default()
}
})
}
/// Cleans the collector and store owned by the engine.
@@ -386,13 +451,35 @@ impl<'a, P> Run<'a, P> {
///
/// If you are not interested in the metrics, you can simple drop the
/// value, instead.
pub fn done(self) -> Metrics {
pub fn done(mut self) -> Result<Metrics, RunFailed> {
let mut metrics = self.metrics;
if let Some(collector) = self.collector {
let collector_status = self.collector.map(|collector| {
collector.done(&mut metrics)
}
});
self.store.done(&mut metrics);
metrics
if let Some(collector_status) = collector_status {
StoredStatus::new(
Utc::now(),
self.validation.tals.as_slice(),
collector_status,
).write(&mut self.status_file).map_err(|err| {
error!(
"Fatal: failed to write store status file '{}': {}",
self.validation.status_path.display(), err
);
RunFailed::fatal()
})?;
self.status_file.close().map_err(|err| {
error!(
"Fatal: failed to close store status file '{}': {}",
self.validation.status_path.display(), err
);
RunFailed::fatal()
})?;
}
Ok(metrics)
}
}
@@ -1847,7 +1934,7 @@ impl RunMetrics {
cert.ca_repository.canonical_module()
});
let mut repository_indexes = self.repository_indexes.lock().unwrap();
let mut repository_indexes = self.repository_indexes.lock();
if let Some(index) = repository_indexes.get(uri.as_ref()) {
return *index
}
@@ -1878,7 +1965,7 @@ impl RunMetrics {
/// Prepares the final metrics.
pub fn prepare_final(&self, target: &mut Metrics) {
let mut indexes: Vec<_>
= self.repository_indexes.lock().unwrap().iter().map(|item| {
= self.repository_indexes.lock().iter().map(|item| {
(item.0.clone(), *item.1)
}).collect();
indexes.sort_by_key(|(_, idx)| *idx);
@@ -2049,3 +2136,158 @@ pub trait ProcessPubPoint: Sized + Send + Sync {
}
}
//------------ StoredStatus --------------------------------------------------
/// The content of the stores status.
///
/// This type can be used to check whether the last validation run that
/// updated the store resulted in enough information for doing a run just
/// from the store.
///
/// This requires that the run wasnt too long ago, that all the TALs we
/// are using now were included back then as well, and that the collectors
/// used a compatible configuration.
#[derive(Clone, Debug, Eq, PartialEq)]
struct StoredStatus {
/// When did the last update conclude?
updated: DateTime<Utc>,
/// The public key bits of all the TALs used during the last update.
tals: HashSet<Bytes>,
/// The status of the collectors.
collector: collector::StoredStatus,
}
impl StoredStatus {
/// Creates a new stored status object from the contained data.
fn new(
updated: DateTime<Utc>,
tals: &[Tal],
collector: collector::StoredStatus,
) -> Self {
StoredStatus {
updated,
tals: tals.iter().map(|tal| tal.key_info().bits_bytes()).collect(),
collector,
}
}
/// Returns whether data with this status should be considered current.
pub fn is_current(
&self, engine: &Engine, last_update: Option<Duration>
) -> bool {
// If last_update is None, we can never be current.
let earliest_update = match last_update {
Some(duration) => Utc::now() - duration,
None => return false,
};
// If the update is older that last_update, we are not current.
if self.updated < earliest_update {
return false
}
// If there are TALs configured we didnt use in the last update,
// we are not current.
for tal in &engine.tals {
if !self.tals.contains(tal.key_info().bits()) {
return false
}
}
// If we have a collector, it needs to agree, too.
if let Some(collector) = engine.collector.as_ref() {
if !self.collector.is_current(collector) {
return false
}
}
// Cant find any more reasons why we shouldnt be current.
true
}
/// Reads the stored status from an IO reader.
pub fn read(
reader: &mut impl io::Read
) -> Result<Self, ParseError> {
// Version number. Must be 0u8.
let version = u8::parse(reader)?;
if version != 0 {
return Err(ParseError::format(
format!("unexpected version {}", version)
))
}
let updated_ts = i64::parse(reader)?;
let updated = match Utc.timestamp_opt(updated_ts, 0).single() {
Some(time) => time,
None => {
return Err(ParseError::format(
format!("invalid update time {}", updated_ts)
));
}
};
let tal_len = usize::try_from(u32::parse(reader)?).map_err(|_| {
ParseError::format("too many TALs for this system")
})?;
let mut tals = HashSet::new();
for _ in 0..tal_len {
tals.insert(Bytes::parse(reader)?);
}
let collector = collector::StoredStatus::read(reader)?;
Ok(Self { updated, tals, collector })
}
/// Appends the stored status to a writer.
pub fn write(
&self, writer: &mut impl io::Write
) -> Result<(), io::Error> {
// Version: 0u8.
0u8.compose(writer)?;
self.updated.timestamp().compose(writer)?;
u32::try_from(self.tals.len()).map_err(|_|
io::Error::new(io::ErrorKind::Other, "excessively large TAL set")
)?.compose(writer)?;
self.tals.iter().try_for_each(|tal| tal.compose(writer))?;
self.collector.write(writer)?;
Ok(())
}
}
//============ Tests =========================================================
#[cfg(test)]
mod test {
use super::*;
#[test]
fn write_read_stored_status() {
fn cycle(
tals: impl IntoIterator<Item = &'static str>
) {
let orig = StoredStatus {
updated: Utc::now(),
tals: tals.into_iter().map(Bytes::from).collect(),
collector: collector::StoredStatus::new_test(),
};
let mut written = Vec::new();
orig.write(&mut written).unwrap();
let decoded = StoredStatus::read(
&mut written.as_slice()
).unwrap();
assert_eq!(orig, decoded);
}
cycle([]);
cycle(["foo", "barbar", ""]);
cycle([""]);
}
}
+38 -19
View File
@@ -249,7 +249,15 @@ impl Server {
validation.ignite()?;
let join = thread::spawn(move || {
// On the first iteration we accept old data at most one refresh
// cycle old. This will be set to `None` for subsequent
// iterations.
let mut last_update = Some(process.config().refresh);
// We only want to retry a run that can be retried once. If it
// fails again, something is odd and we should crash.
let mut can_retry = true;
let err = loop {
if let Some(log) = log.as_ref() {
log.start();
@@ -260,9 +268,13 @@ impl Server {
Ok(exceptions) => {
match Self::process_once(
process.config(), &validation, &history,
&mut notify, exceptions,
&mut notify, exceptions, last_update.take(),
) {
Ok(()) => {
// Succcessful run, so we can go back to
// allowing a single retry-able failure.
can_retry = true;
history.read().refresh_wait()
}
Err(err) => {
@@ -386,10 +398,13 @@ impl Server {
history: &SharedHistory,
notify: &mut NotifySender,
exceptions: LocalExceptions,
last_update: Option<Duration>,
) -> Result<(), RunFailed> {
info!("Starting a validation run.");
history.mark_update_start();
let (report, metrics) = ValidationReport::process(engine, config)?;
let (report, metrics) = ValidationReport::process(
engine, config, last_update
)?;
let must_notify = history.update(
report, &exceptions, metrics,
);
@@ -435,6 +450,9 @@ pub struct Vrps {
/// Dont update the repository.
noupdate: bool,
/// Dont update the cache if it isnt older than given.
last_updated: Option<Duration>,
/// Return an error on incomplete update.
complete: bool,
}
@@ -485,9 +503,9 @@ struct VrpsArgs {
#[arg(long)]
no_aspas: bool,
/// Don't update the local cache
#[arg(short, long)]
noupdate: bool,
/// Don't update the local cache (if it is newer than given).
#[arg(short, long, value_name = "SECONDS")]
noupdate: Option<Option<u64>>,
/// Return an error status on incomplete update
#[arg(long)]
@@ -559,18 +577,13 @@ impl Vrps {
path,
format,
output,
noupdate: args.noupdate,
noupdate: matches!(args.noupdate, Some(None)),
last_updated: args.noupdate.flatten().map(Duration::from_secs),
complete: args.complete,
})
}
/// Produces a list of Validated ROA Payload.
///
/// The list will be written to the file identified by `path` or
/// stdout if that is `None`. The format is determined by `format`.
/// If `noupdate` is `false`, the local repository will be updated first
/// and rsync will be enabled during validation to sync any new
/// publication points.
fn run(mut self, process: Process) -> Result<(), ExitError> {
self.output.update_from_config(process.config());
let mut engine = Engine::new(process.config(), !self.noupdate)?;
@@ -582,7 +595,9 @@ impl Vrps {
let mut once = false;
loop {
match ValidationReport::process(&engine, process.config()) {
match ValidationReport::process(
&engine, process.config(), self.last_updated,
) {
Ok(res) => break res,
Err(err) => {
if err.should_retry() {
@@ -667,6 +682,9 @@ pub struct Validate {
/// Dont update the repository.
noupdate: bool,
/// Dont update the cache if it isnt older than given.
last_updated: Option<Duration>,
/// Return an error on incomplete update.
complete: bool,
}
@@ -709,9 +727,9 @@ struct ValidateArgs {
#[arg(short, long, value_name = "PATH", default_value = "-")]
output: PathBuf,
/// Don't update the local cache
#[arg(short, long)]
noupdate: bool,
/// Don't update the local cache (if it is newer than given).
#[arg(short, long, value_name = "SECONDS")]
noupdate: Option<Option<u64>>,
/// Return an error status on incomplete update
#[arg(long)]
@@ -770,7 +788,8 @@ impl Validate {
Some(args.output)
}
},
noupdate: args.noupdate,
noupdate: matches!(args.noupdate, Some(None)),
last_updated: args.noupdate.flatten().map(Duration::from_secs),
complete: args.complete,
})
}
@@ -852,7 +871,7 @@ impl Validate {
engine.ignite()?;
process.switch_logging(false, false)?;
let (report, mut metrics) = ValidationReport::process(
&engine, process.config(),
&engine, process.config(), self.last_updated,
)?;
let snapshot = report.into_snapshot(
&LocalExceptions::load(process.config(), false)?,
@@ -1091,7 +1110,7 @@ impl Update {
engine.ignite()?;
process.switch_logging(false, false)?;
let (_, metrics) = ValidationReport::process(
&engine, process.config(),
&engine, process.config(), None
)?;
if self.complete && !metrics.rsync_complete() {
Err(ExitError::IncompleteUpdate)
+4 -3
View File
@@ -17,6 +17,7 @@ use std::cmp;
use std::collections::hash_map;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use crossbeam_queue::SegQueue;
use log::{info, warn};
use rpki::uri;
@@ -94,13 +95,13 @@ impl ValidationReport {
/// Creates a new validation report by running the engine.
pub fn process(
engine: &Engine, config: &Config,
engine: &Engine, config: &Config, last_update: Option<Duration>
) -> Result<(Self, Metrics), RunFailed> {
let report = Self::new(config);
let mut run = engine.start(&report)?;
let mut run = engine.start(&report, last_update)?;
run.process()?;
run.cleanup()?;
let metrics = run.done();
let metrics = run.done()?;
Ok((report, metrics))
}
+1 -1
View File
@@ -36,7 +36,7 @@ impl<'a> ValidationReport<'a> {
&self,
engine: &Engine,
) -> Result<(), RunFailed> {
let mut run = engine.start(self)?;
let mut run = engine.start(self, None)?;
run.process()?;
run.cleanup()?;
Ok(())
+1 -1
View File
@@ -430,7 +430,7 @@ impl<'a> Run<'a> {
fn new(
store: &'a Store,
) -> Self {
Run { store }
Self { store }
}
/// Finishes the validation run.
+18 -2
View File
@@ -6,8 +6,7 @@
use std::{error, fmt, io, slice};
use bytes::Bytes;
use rpki::rrdp;
use rpki::uri;
use rpki::{rrdp, uri};
use uuid::Uuid;
@@ -40,6 +39,23 @@ impl<R: io::Read> Parse<R> for u8 {
}
//------------ u16 -----------------------------------------------------------
impl<W: io::Write> Compose<W> for u16 {
fn compose(&self, target: &mut W) -> Result<(), io::Error> {
target.write_all(&self.to_be_bytes())
}
}
impl<R: io::Read> Parse<R> for u16 {
fn parse(source: &mut R) -> Result<Self, ParseError> {
let mut res = 0u16.to_ne_bytes();
source.read_exact(&mut res)?;
Ok(u16::from_be_bytes(res))
}
}
//------------ u32 -----------------------------------------------------------
impl<W: io::Write> Compose<W> for u32 {
+10 -7
View File
@@ -33,13 +33,16 @@ pub struct ExclusiveFile {
impl ExclusiveFile {
/// Opens an exclusive file.
///
/// Opens the file specified by `path` for reading and writing and blocks
/// access to it by other processes and possibly re-opening it by the same
/// process again, too. If the file doesnt exist, it will be created.
/// Trys to Open the file specified by `path` for reading and writing and
/// blocks access to it by other processes and possibly re-opening it by
/// the same process again, too. If the file doesnt exist, it will be
/// created.
///
/// Returns an error if opening fails. Because there is no portable way
/// to indicate opening failing because of an already locked file, this
/// will always just be a generic IO error.
/// If the file has already been opened exclusively, the function will
/// fail and return an error. Because there is no portable way
/// to indicate this error case, a generic IO error will be returned.
///
/// An error is also returned if opening fails for other reasons.
pub fn open(path: &Path) -> Result<Self, io::Error> {
Self::_open(path)
}
@@ -67,7 +70,7 @@ impl ExclusiveFile {
let _ = nix::fcntl::fcntl(
file.as_raw_fd(),
nix::fcntl::FcntlArg::F_SETLK(&nix::libc::flock {
l_type: (nix::libc::F_RDLCK | nix::libc::F_WRLCK) as c_short,
l_type: nix::libc::F_WRLCK as c_short,
l_whence: nix::libc::SEEK_SET as c_short,
l_start: 0,
l_len: 0,