Store RRDP data in one file per repository. (#886)

This PR implements a very simple archive that can be used to store, update,
and access the objects published by an RRDP repository in a single file.
This commit is contained in:
Martin Hoffmann
2023-11-29 17:26:22 +01:00
committed by GitHub
parent 53e79f93cc
commit c47461c17c
23 changed files with 5167 additions and 2849 deletions
Generated
+7
View File
@@ -1160,6 +1160,7 @@ dependencies = [
"rustls-pemfile",
"serde",
"serde_json",
"siphasher",
"syslog",
"tempfile",
"tokio",
@@ -1367,6 +1368,12 @@ dependencies = [
"libc",
]
[[package]]
name = "siphasher"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "slab"
version = "0.4.9"
+2 -1
View File
@@ -34,6 +34,7 @@ rpki = { version = "0.17.2", features = [ "repository", "rrdp", "rtr"
rustls-pemfile = "1"
serde = { version = "1.0.95", features = [ "derive" ] }
serde_json = "1.0.57"
siphasher = "0.3.10"
tempfile = "3.1.0"
tokio = { version = "1.24", features = [ "io-util", "macros", "process", "rt", "rt-multi-thread", "signal", "sync" ] }
tokio-rustls = "0.24.1"
@@ -44,7 +45,7 @@ routinator-ui = { version = "0.3.4", optional = true }
[target.'cfg(unix)'.dependencies]
nix = { version = "0.27.1", features = ["fs", "net", "process", "socket", "user"] }
nix = { version = "0.27.1", features = ["fs", "mman", "net", "process", "socket", "user"] }
syslog = "6"
[features]
+56 -101
View File
@@ -140,9 +140,9 @@ checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635"
[[package]]
name = "bumpalo"
version = "3.13.0"
version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
[[package]]
name = "bytes"
@@ -168,9 +168,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.30"
version = "0.4.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877"
checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
dependencies = [
"android-tzdata",
"arbitrary",
@@ -184,9 +184,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.4.3"
version = "4.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84ed82781cea27b43c9b106a979fe450a13a31aab0500595fb3fc06616de08e6"
checksum = "b1d7b8d5ec32af0fadc644bf1fd509a688c2103b185644bb1e29d164e0703136"
dependencies = [
"clap_builder",
"clap_derive",
@@ -194,9 +194,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.4.2"
version = "4.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bb9faaa7c2ef94b2743a21f5a29e6f0010dff4caa69ac8e9d6cf8b6fa74da08"
checksum = "5179bb514e4d7c2051749d8fcefa2ed6d06a9f4e6d69faf3805f5d80b8cf8d56"
dependencies = [
"anstream",
"anstyle",
@@ -540,9 +540,9 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "hermit-abi"
version = "0.3.2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b"
checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
[[package]]
name = "hostname"
@@ -680,17 +680,6 @@ dependencies = [
"hashbrown 0.14.0",
]
[[package]]
name = "io-lifetimes"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [
"hermit-abi",
"libc",
"windows-sys",
]
[[package]]
name = "ipnet"
version = "2.8.0"
@@ -738,12 +727,6 @@ dependencies = [
"once_cell",
]
[[package]]
name = "linux-raw-sys"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "linux-raw-sys"
version = "0.4.7"
@@ -781,9 +764,9 @@ checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c"
[[package]]
name = "memoffset"
version = "0.7.1"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c"
dependencies = [
"autocfg",
]
@@ -816,15 +799,14 @@ dependencies = [
[[package]]
name = "nix"
version = "0.26.4"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b"
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
dependencies = [
"bitflags 1.3.2",
"bitflags 2.4.0",
"cfg-if",
"libc",
"memoffset",
"pin-utils",
]
[[package]]
@@ -902,9 +884,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro2"
version = "1.0.66"
version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
dependencies = [
"unicode-ident",
]
@@ -1043,10 +1025,9 @@ dependencies = [
[[package]]
name = "routinator"
version = "0.13.0-dev"
version = "0.13.1-dev"
dependencies = [
"arbitrary",
"bcder",
"bytes",
"chrono",
"clap",
@@ -1064,10 +1045,11 @@ dependencies = [
"reqwest",
"ring",
"routinator-ui",
"rpki 0.17.1",
"rpki",
"rustls-pemfile",
"serde",
"serde_json",
"siphasher",
"syslog",
"tempfile",
"tokio",
@@ -1081,9 +1063,11 @@ dependencies = [
name = "routinator-fuzz"
version = "0.0.0"
dependencies = [
"arbitrary",
"libfuzzer-sys",
"routinator",
"rpki 0.17.2-dev",
"rpki",
"tempfile",
]
[[package]]
@@ -1099,9 +1083,9 @@ dependencies = [
[[package]]
name = "rpki"
version = "0.17.1"
version = "0.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2e2cf92592175551ef134dba1b30f8d1526479e680399d3a1eef27136023373"
checksum = "98a05b958a41ba8c923cf14bd2ad5f1aca3f3509c8ffd147c36e094346a0290b"
dependencies = [
"arbitrary",
"base64",
@@ -1120,27 +1104,6 @@ dependencies = [
"uuid",
]
[[package]]
name = "rpki"
version = "0.17.2-dev"
source = "git+https://github.com/NLnetLabs/rpki-rs.git#026ed37ec736ad74d6f8cd6bcf09c472f78ed7b8"
dependencies = [
"base64",
"bcder",
"bytes",
"chrono",
"futures-util",
"log",
"quick-xml",
"ring",
"serde",
"serde_json",
"tokio",
"tokio-stream",
"untrusted",
"uuid",
]
[[package]]
name = "rustc-demangle"
version = "0.1.23"
@@ -1149,28 +1112,14 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustix"
version = "0.37.23"
version = "0.38.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06"
dependencies = [
"bitflags 1.3.2",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys 0.3.8",
"windows-sys",
]
[[package]]
name = "rustix"
version = "0.38.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7db8590df6dfcd144d22afd1b83b36c21a18d7cbc1dc4bb5295a8712e9eb662"
checksum = "747c788e9ce8e92b12cd485c49ddf90723550b654b32508f979b71a7b1ecda4f"
dependencies = [
"bitflags 2.4.0",
"errno",
"libc",
"linux-raw-sys 0.4.7",
"linux-raw-sys",
"windows-sys",
]
@@ -1197,9 +1146,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
version = "0.101.5"
version = "0.101.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45a27e3b59326c16e23d30aeb7a36a24cc0d29e71d68ff611cdfb4a01d013bed"
checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe"
dependencies = [
"ring",
"untrusted",
@@ -1243,9 +1192,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.106"
version = "1.0.107"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cc66a619ed80bf7a0f6b17dd063a84b88f6dea1813737cf469aef1d081142c2"
checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
dependencies = [
"itoa",
"ryu",
@@ -1273,6 +1222,12 @@ dependencies = [
"libc",
]
[[package]]
name = "siphasher"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d"
[[package]]
name = "slab"
version = "0.4.9"
@@ -1284,9 +1239,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.11.0"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9"
checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a"
[[package]]
name = "socket2"
@@ -1322,9 +1277,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "syn"
version = "2.0.32"
version = "2.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "239814284fd6f1a4ffe4ca893952cdd93c224b6a1571c9a9eadd670295c0c9e2"
checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8"
dependencies = [
"proc-macro2",
"quote",
@@ -1364,17 +1319,17 @@ dependencies = [
"cfg-if",
"fastrand",
"redox_syscall 0.3.5",
"rustix 0.38.13",
"rustix",
"windows-sys",
]
[[package]]
name = "terminal_size"
version = "0.2.6"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e6bf6f19e9f8ed8d4048dc22981458ebcf406d67e94cd422e5ecd73d63b3237"
checksum = "21bebf2b7c9e0a515f6e0f8c51dc0f8e4696391e6f1ff30379559f8365fb0df7"
dependencies = [
"rustix 0.37.23",
"rustix",
"windows-sys",
]
@@ -1400,9 +1355,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.28"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48"
checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe"
dependencies = [
"deranged",
"itoa",
@@ -1415,15 +1370,15 @@ dependencies = [
[[package]]
name = "time-core"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb"
checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3"
[[package]]
name = "time-macros"
version = "0.2.14"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572"
checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20"
dependencies = [
"time-core",
]
@@ -1507,9 +1462,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.8"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d"
dependencies = [
"bytes",
"futures-core",
@@ -1527,9 +1482,9 @@ checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b"
[[package]]
name = "toml_edit"
version = "0.19.15"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
checksum = "8ff63e60a958cefbb518ae1fd6566af80d9d4be430a33f3723dfc47d1d411d95"
dependencies = [
"indexmap 2.0.0",
"toml_datetime",
+9 -2
View File
@@ -8,8 +8,10 @@ edition = "2021"
cargo-fuzz = true
[dependencies]
arbitrary = "1"
libfuzzer-sys = "0.4"
rpki = { git = "https://github.com/NLnetLabs/rpki-rs.git", features = [ "repository", "rrdp", "rtr", "serde", "slurm" ] }
rpki = { version = "0.17.1", features = [ "repository", "rrdp", "rtr", "serde", "slurm" ] }
tempfile = "3.1.0"
[dependencies.routinator]
path = ".."
@@ -22,13 +24,18 @@ members = ["."]
[profile.release]
debug = 1
[[bin]]
name = "archive"
path = "fuzz_targets/archive.rs"
test = false
doc = false
[[bin]]
name = "construct_delta"
path = "fuzz_targets/construct_delta.rs"
test = false
doc = false
[[bin]]
name = "merge_deltas"
path = "fuzz_targets/merge_deltas.rs"
+105
View File
@@ -0,0 +1,105 @@
#![no_main]
use arbitrary::Arbitrary;
use std::collections::HashMap;
use libfuzzer_sys::fuzz_target;
use routinator::utils::archive::{
AccessError, Archive, ArchiveError, ObjectMeta, PublishError,
StorageRead, StorageWrite,
};
#[derive(Arbitrary, Clone, Debug)]
enum Op {
Publish { name: Vec<u8>, data: Vec<u8> },
Update { name: Vec<u8>, data: Vec<u8> },
Delete { name: Vec<u8> },
}
struct Meta;
impl ObjectMeta for Meta {
const SIZE: usize = 4;
type ConsistencyError = ();
fn write(
&self, write: &mut StorageWrite
) -> Result<(), ArchiveError> {
write.write(b"abcd")
}
fn read(
read: &mut StorageRead
) -> Result<Self, ArchiveError> {
let slice = read.read_slice(4).unwrap();
assert_eq!(slice.as_ref(), b"abcd");
Ok(Meta)
}
}
fn check_archive(
archive: &Archive<Meta>,
content: &HashMap<Vec<u8>, Vec<u8>>,
) {
archive.verify().unwrap();
let mut content = content.clone();
for item in archive.objects().unwrap() {
let (name, _, data) = item.unwrap();
assert_eq!(
content.remove(name.as_ref()).as_ref().map(|x| x.as_slice()),
Some(data.as_ref())
);
}
assert!(content.is_empty());
}
fn run_archive(ops: impl IntoIterator<Item = Op>) {
let mut archive = Archive::create_with_file(
tempfile::tempfile().unwrap()
).unwrap();
let mut content = HashMap::new();
for item in ops {
match item {
Op::Publish { name, data } => {
if name.is_empty() { continue }
let res = archive.publish(name.as_ref(), &Meta, data.as_ref());
if content.contains_key(&name) {
assert!(matches!(res, Err(PublishError::AlreadyExists)))
}
else {
content.insert(name, data);
assert!(matches!(res, Ok(())));
}
}
Op::Update { name, data } => {
if name.is_empty() { continue }
let res = archive.update(
name.as_ref(), &Meta, data.as_ref(), |_| Ok(())
);
if content.contains_key(&name) {
content.insert(name, data);
assert!(matches!(res, Ok(())));
}
else {
assert!(matches!(res, Err(AccessError::NotFound)))
}
}
Op::Delete { name } => {
if name.is_empty() { continue }
let res = archive.delete(name.as_ref(), |_| Ok(()));
if content.remove(name.as_slice()).is_some() {
assert!(matches!(res, Ok(())))
}
else {
assert!(matches!(res, Err(AccessError::NotFound)))
}
}
}
check_archive(&archive, &content);
}
}
fuzz_target!{|actions: Vec<Op>| {
run_archive(actions)
}}
+4 -4
View File
@@ -47,10 +47,10 @@ fuzz_target!{|data: (PayloadSnapshot, PayloadSnapshot, Serial)| {
assert_eq!(delta_keys, set_keys);
let old_aspas: HashMap<_, _> = old.aspas().map(|x| {
((x.0.customer, x.0.afi), x.0.providers.clone())
(x.0.customer, x.0.providers.clone())
}).collect();
let new_aspas: HashMap<_, _> = new.aspas().map(|x| {
((x.0.customer, x.0.afi), x.0.providers.clone())
(x.0.customer, x.0.providers.clone())
}).collect();
let delta_aspas: Vec<_> = delta.aspa_actions().map(|x| (x.0.clone(), x.1)).collect();
@@ -60,12 +60,12 @@ fuzz_target!{|data: (PayloadSnapshot, PayloadSnapshot, Serial)| {
return None
}
}
Some((Aspa::new(key.0, key.1, val.clone()), Action::Announce))
Some((Aspa::new(*key, val.clone()), Action::Announce))
}).chain(
old_aspas.keys().filter_map(|key| {
if !new_aspas.contains_key(key) {
Some((
Aspa::new(key.0, key.1, ProviderAsns::empty()),
Aspa::new(*key, ProviderAsns::empty()),
Action::Withdraw
))
}
+17 -5
View File
@@ -4,12 +4,13 @@
use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
use bytes::Bytes;
use log::info;
use rpki::repository::tal::TalUri;
use rpki::uri;
use crate::config::{Config, FallbackPolicy};
use crate::error::Failed;
use crate::error::{Failed, Fatal, RunFailed};
use crate::metrics::Metrics;
use crate::engine::CaCert;
use super::{rrdp, rsync};
@@ -80,6 +81,17 @@ impl Collector {
Ok(())
}
/// Sanitizes the stored data.
pub fn sanitize(&self) -> Result<(), Fatal> {
if let Some(rrdp) = self.rrdp.as_ref() {
rrdp.sanitize()?;
}
if let Some(rsync) = self.rsync.as_ref() {
rsync.sanitize()?;
}
Ok(())
}
/// Starts a new validation run using this collector.
pub fn start(&self) -> Run {
Run::new(self)
@@ -178,7 +190,7 @@ impl<'a> Run<'a> {
/// `Ok(None)`.
pub fn repository<'s>(
&'s self, ca: &'s CaCert
) -> Result<Option<Repository<'s>>, Failed> {
) -> Result<Option<Repository<'s>>, RunFailed> {
// See if we should and can use RRDP
if let Some(rrdp_uri) = ca.rpki_notify() {
if let Some(ref rrdp) = self.rrdp {
@@ -273,7 +285,7 @@ enum RepoInner<'a> {
/// The repository is accessed via RRDP.
Rrdp {
/// The repository.
repository: rrdp::Repository,
repository: Arc<rrdp::ReadRepository>,
},
/// The repository is accessed via rsync.
@@ -285,7 +297,7 @@ enum RepoInner<'a> {
impl<'a> Repository<'a> {
/// Creates a RRDP repository.
fn rrdp(repository: rrdp::Repository) -> Self {
fn rrdp(repository: Arc<rrdp::ReadRepository>) -> Self {
Repository(RepoInner::Rrdp { repository })
}
@@ -307,7 +319,7 @@ impl<'a> Repository<'a> {
/// information and returns `None`.
pub fn load_object(
&self, uri: &uri::Rsync
) -> Result<Option<Bytes>, Failed> {
) -> Result<Option<Bytes>, RunFailed> {
match self.0 {
RepoInner::Rrdp { ref repository } => {
repository.load_object(uri)
File diff suppressed because it is too large Load Diff
+519
View File
@@ -0,0 +1,519 @@
use std::{cmp, io, fs};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use log::{error, warn};
use rand::Rng;
use rpki::{rrdp, uri};
use uuid::Uuid;
use crate::config::Config;
use crate::error::RunFailed;
use crate::utils::archive;
use crate::utils::archive::{
Archive, ArchiveError, FetchError, OpenError, PublishError
};
use crate::utils::binio::{Compose, Parse};
//------------ RrdpArchive ---------------------------------------------------
#[derive(Debug)]
pub struct RrdpArchive {
/// The path where everything from this repository lives.
path: Arc<PathBuf>,
/// The archive for the repository.
archive: archive::Archive<RrdpObjectMeta>,
}
impl RrdpArchive {
pub fn create(
path: Arc<PathBuf>
) -> Result<Self, RunFailed> {
let archive = Archive::create(path.as_ref()).map_err(|err| {
archive_err(err, path.as_ref())
})?;
Ok(Self { path, archive })
}
pub fn create_with_file(
file: fs::File,
path: Arc<PathBuf>,
) -> Result<Self, RunFailed> {
let archive = Archive::create_with_file(file).map_err(|err| {
archive_err(err, path.as_ref())
})?;
Ok(Self { path, archive })
}
pub fn try_open(path: Arc<PathBuf>) -> Result<Option<Self>, RunFailed> {
let archive = match Archive::open(path.as_ref(), true) {
Ok(archive) => archive,
Err(OpenError::NotFound) => return Ok(None),
Err(OpenError::Archive(err)) => {
return Err(archive_err(err, path.as_ref()))
}
};
Ok(Some(Self { path, archive }))
}
pub fn open(path: Arc<PathBuf>) -> Result<Self, RunFailed> {
let archive = archive::Archive::open(
path.as_ref(), false
).map_err(|err| match err {
OpenError::NotFound => {
warn!(
"RRDP repository file {} not found.", path.display()
);
RunFailed::retry()
}
OpenError::Archive(err) => archive_err(err, path.as_ref())
})?;
Ok(Self { path, archive })
}
pub fn path(&self) -> &Arc<PathBuf> {
&self.path
}
}
impl RrdpArchive {
pub fn verify(path: &Path) -> Result<(), OpenError> {
let archive = archive::Archive::<RrdpObjectMeta>::open(path, false)?;
archive.verify()?;
Ok(())
}
/// Loads an object from the archive.
///
/// The object is identified by its rsync URI. If the object doesnt
/// exist, returns `None`.
pub fn load_object(
&self,
uri: &uri::Rsync
) -> Result<Option<Bytes>, RunFailed> {
let res = self.archive.fetch_bytes(uri.as_ref());
match res {
Ok(res) => Ok(Some(res)),
Err(FetchError::NotFound) => Ok(None),
Err(FetchError::Archive(err)) => {
Err(archive_err(err, self.path.as_ref()))
}
}
}
/// Loads the repository state.
///
/// Returns an error if the state is missing or broken.
pub fn load_state(&self) -> Result<RepositoryState, RunFailed> {
let data = match self.archive.fetch(b"state") {
Ok(data) => data,
Err(archive::FetchError::NotFound) => {
return Err(
archive_err(ArchiveError::Corrupt, self.path.as_ref())
)
}
Err(archive::FetchError::Archive(err)) => {
return Err(archive_err(err, self.path.as_ref()))
}
};
let mut data = data.as_ref();
RepositoryState::parse(&mut data).map_err(|_| {
archive_err(ArchiveError::Corrupt, self.path.as_ref())
})
}
/// Iterates over all the objects in the repository.
pub fn objects(
&self
) -> Result<
impl Iterator<Item = Result<(uri::Rsync, Bytes), RunFailed>> + '_,
RunFailed
> {
self.archive.objects().map(|iter| {
iter.filter_map(|item| {
let (name, _meta, data) = match item {
Ok(some) => some,
Err(ArchiveError::Corrupt) => {
return Some(Err(RunFailed::retry()))
}
Err(ArchiveError::Io(_)) => {
return Some(Err(RunFailed::fatal()))
}
};
let name = uri::Rsync::from_bytes(
name.into_owned().into()
).ok()?;
Some(Ok((name, data.into_owned().into())))
})
}).map_err(|err| {
match err {
ArchiveError::Corrupt => RunFailed::retry(),
ArchiveError::Io(_) => RunFailed::fatal(),
}
})
}
}
impl RrdpArchive {
/// Publishes a new object to the archie.
pub fn publish_object(
&mut self,
uri: &uri::Rsync,
content: &[u8]
) -> Result<(), PublishError> {
self.archive.publish(
uri.as_ref(),
&RrdpObjectMeta::from_content(content),
content
)
}
/// Updates an object in the archive.
pub fn update_object(
&mut self,
uri: &uri::Rsync,
hash: rrdp::Hash,
content: &[u8]
) -> Result<(), AccessError> {
Ok(self.archive.update(
uri.as_ref(),
&RrdpObjectMeta::from_content(content),
content,
|meta| {
if meta.hash == hash {
Ok(())
}
else {
Err(HashMismatch)
}
}
)?)
}
/// Deletes an object from the archive.
pub fn delete_object(
&mut self, uri: &uri::Rsync, hash: rrdp::Hash,
) -> Result<(), AccessError> {
Ok(self.archive.delete(
uri.as_ref(),
|meta| {
if meta.hash == hash {
Ok(())
}
else {
Err(HashMismatch)
}
}
)?)
}
pub fn publish_state(
&mut self, state: &RepositoryState
) -> Result<(), RunFailed> {
let mut buf = Vec::new();
state.compose(&mut buf).expect("writing to vec failed");
self.archive.publish(
b"state", &Default::default(), &buf
).map_err(|err| match err {
archive::PublishError::Archive(ArchiveError::Io(err)) => {
error!(
"Fatal: Failed write to RRDP repository archive {}: {}",
self.path.display(), err
);
RunFailed::fatal()
}
_ => {
warn!(
"Failed to write local RRDP repository state in {}.",
self.path.display()
);
RunFailed::retry()
}
})
}
pub fn update_state(
&mut self, state: &RepositoryState
) -> Result<(), RunFailed> {
let mut buf = Vec::new();
state.compose(&mut buf).expect("writing to vec failed");
self.archive.update(
b"state", &Default::default(), &buf,
|_| Ok(())
).map_err(|err| match err {
archive::AccessError::Archive(ArchiveError::Io(err)) => {
error!(
"Fatal: Failed write to RRDP repository archive {}: {}",
self.path.display(), err
);
RunFailed::fatal()
}
_ => {
warn!(
"Failed to update local RRDP repository state in {}.",
self.path.display()
);
RunFailed::retry()
}
})
}
}
//------------ archive_err ---------------------------------------------------
fn archive_err(err: ArchiveError, path: &Path) -> RunFailed {
match err {
ArchiveError::Corrupt => {
warn!(
"RRDP repository file '{}' is corrupt. \
Deleting and starting again.",
path.display()
);
match fs::remove_file(path) {
Ok(()) => {
RunFailed::retry()
}
Err(err) => {
warn!(
"Deleting RRDP repository archive '{}' failed: {}",
path.display(),
err
);
RunFailed::fatal()
}
}
}
ArchiveError::Io(err) => {
error!(
"Fatal: Failed to access RRDP repository archive '{}': {}",
path.display(),
err
);
RunFailed::fatal()
}
}
}
//------------ RrdpObjectMeta ------------------------------------------------
/// The meta data for an RRDP object.
#[derive(Clone, Copy, Debug)]
pub struct RrdpObjectMeta {
hash: rrdp::Hash,
}
impl Default for RrdpObjectMeta {
fn default() -> Self {
Self {
hash: [0; 32].into(),
}
}
}
impl RrdpObjectMeta {
pub fn from_content(content: &[u8]) -> Self {
Self {
hash: rrdp::Hash::from_data(content)
}
}
}
impl archive::ObjectMeta for RrdpObjectMeta {
const SIZE: usize = 32;
type ConsistencyError = HashMismatch;
fn write(
&self, write: &mut archive::StorageWrite
) -> Result<(), ArchiveError> {
write.write(self.hash.as_slice())
}
fn read(
read: &mut archive::StorageRead
) -> Result<Self, ArchiveError> {
Ok(Self { hash: read.read_array()?.into() })
}
}
//------------ RepositoryState -----------------------------------------------
/// The current state of an RRDP repository.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct RepositoryState {
/// The rpkiNotify URI of the repository.
pub rpki_notify: uri::Https,
/// The UUID of the current session of repository.
pub session: Uuid,
/// The serial number within the current session.
pub serial: u64,
/// Unix timestamp in seconds of the time of last update of the server.
///
/// We are not using `DateTime<Utc>` here since we dont need sub-second
/// precision and converting on the fly makes a value change when cycled
/// through the database as its sub-second portion is forced to zero.
pub updated_ts: i64,
/// The time when we consider the stored data to be expired.
pub best_before_ts: i64,
/// The value of the date header of the notification file if present.
///
/// Given as the Unix timestamp in seconds.
pub last_modified_ts: Option<i64>,
/// The value of the ETag header of the notification file if present.
///
/// This is the complete tag including the quotation marks and possibly
/// the weak prefix.
pub etag: Option<Bytes>,
}
impl RepositoryState {
/// Reads the state from an IO reader.
fn parse(reader: &mut impl io::Read) -> Result<Self, io::Error> {
// Version number. Must be 0u8.
let version = u8::parse(reader)?;
if version != 0 {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("unexpected version {}", version)
))
}
Ok(RepositoryState {
rpki_notify: Parse::parse(reader)?,
session: Parse::parse(reader)?,
serial: Parse::parse(reader)?,
updated_ts: Parse::parse(reader)?,
best_before_ts: Parse::parse(reader)?,
last_modified_ts: Parse::parse(reader)?,
etag: Parse::parse(reader)?,
})
}
/// Composes the encoded state.
fn compose(&self, writer: &mut impl io::Write) -> Result<(), io::Error> {
0u8.compose(writer)?; // version
self.rpki_notify.compose(writer)?;
self.session.compose(writer)?;
self.serial.compose(writer)?;
self.updated_ts.compose(writer)?;
self.best_before_ts.compose(writer)?;
self.last_modified_ts.compose(writer)?;
self.etag.compose(writer)?;
Ok(())
}
/// Returns the last update time as proper timestamp.
///
/// Returns `None` if the time cannot be converted into a timestamp for
/// some reason.
pub fn updated(&self) -> Option<DateTime<Utc>> {
Utc.timestamp_opt(self.updated_ts, 0).single()
}
/// Returns the best before time as a proper timestamp.
///
/// Returns `None` if the time cannot be converted into a timestamp for
/// some reason.
pub fn best_before(&self) -> Option<DateTime<Utc>> {
Utc.timestamp_opt(self.best_before_ts, 0).single()
}
/// Sets the update time to now.
pub fn touch(&mut self, fallback: FallbackTime) {
self.updated_ts = Utc::now().timestamp();
self.best_before_ts = fallback.best_before().timestamp();
}
/// Returns whether this repository should be considered expired.
///
/// If in doubt, this will return `true`.
pub fn is_expired(&self) -> bool {
match self.best_before() {
Some(best_before) => Utc::now() > best_before,
None => true,
}
}
/// Returns the last modified time.
///
/// Returns `None` if there we do not have a last modifed time or if
/// it cannot be converted from a Unix timestamp into a date-time.
pub fn last_modified(&self) -> Option<DateTime<Utc>> {
self.last_modified_ts.and_then(|ts| Utc.timestamp_opt(ts, 0).single())
}
}
//------------ FallbackTime --------------------------------------------------
/// Parameters for calculating the best-before time of repositories.
#[derive(Clone, Copy, Debug)]
pub struct FallbackTime {
min: Duration,
max: Duration,
}
impl FallbackTime {
/// Creates a new value from the configuration.
pub fn from_config(config: &Config) -> Self {
FallbackTime {
min: config.refresh,
max: cmp::max(2 * config.refresh, config.rrdp_fallback_time)
}
}
/// Picks a best-before date for a repository updated around now.
pub fn best_before(self) -> DateTime<Utc> {
// Saturating conversion between stds and chronos Duration types.
Utc::now() + chrono::Duration::from_std(
rand::thread_rng().gen_range(self.min..self.max)
).unwrap_or_else(|_| chrono::Duration::milliseconds(i64::MAX))
}
}
//============ Errors ========================================================
//------------ HashMismatch --------------------------------------------------
#[derive(Debug)]
pub struct HashMismatch;
//------------ AccessError ---------------------------------------------------
/// An error happened while publishing an object.
#[derive(Debug)]
pub enum AccessError {
/// The object does not exist.
NotFound,
/// The objects hash is wrong
HashMismatch,
/// An error happened while trying to access the archive.
Archive(ArchiveError),
}
impl From<archive::AccessError<HashMismatch>> for AccessError {
fn from(err: archive::AccessError<HashMismatch>) -> Self {
match err {
archive::AccessError::NotFound => AccessError::NotFound,
archive::AccessError::Inconsistent(_) => AccessError::HashMismatch,
archive::AccessError::Archive(err) => AccessError::Archive(err),
}
}
}
File diff suppressed because it is too large Load Diff
+486
View File
@@ -0,0 +1,486 @@
use std::{fs, io};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::Duration;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use log::{error, warn};
use reqwest::header;
use reqwest::{Certificate, Proxy, StatusCode};
use reqwest::blocking::{Client, ClientBuilder, RequestBuilder, Response};
use rpki::uri;
use crate::config::Config;
use crate::error::Fatal;
use crate::utils::date::{format_http_date, parse_http_date};
//------------ HttpClient ----------------------------------------------------
/// The HTTP client for updating RRDP repositories.
#[derive(Debug)]
pub struct HttpClient {
/// The (blocking) reqwest client.
///
/// This will be of the error variant until `ignite` has been called. Yes,
/// that is not ideal but
client: Result<Client, Option<ClientBuilder>>,
/// The base directory for storing copies of responses if that is enabled.
response_dir: Option<PathBuf>,
/// The timeout for requests.
timeout: Option<Duration>,
}
impl HttpClient {
/// Creates a new, not-yet-ignited client based on the config.
pub fn new(config: &Config) -> Result<Self, Fatal> {
// Deal with the reqwests TLS features by defining a creator
// function for the two cases.
#[cfg(not(feature = "native-tls"))]
fn create_builder() -> ClientBuilder {
Client::builder().use_rustls_tls()
}
#[cfg(feature = "native-tls")]
fn create_builder() -> ClientBuilder {
Client::builder().use_native_tls()
}
let mut builder = create_builder();
builder = builder.user_agent(&config.rrdp_user_agent);
builder = builder.tcp_keepalive(config.rrdp_tcp_keepalive);
builder = builder.timeout(None); // Set per request.
if let Some(timeout) = config.rrdp_connect_timeout {
builder = builder.connect_timeout(timeout);
}
if let Some(addr) = config.rrdp_local_addr {
builder = builder.local_address(addr)
}
for path in &config.rrdp_root_certs {
builder = builder.add_root_certificate(
Self::load_cert(path)?
);
}
for proxy in &config.rrdp_proxies {
let proxy = match Proxy::all(proxy) {
Ok(proxy) => proxy,
Err(err) => {
error!(
"Invalid rrdp-proxy '{}': {}", proxy, err
);
return Err(Fatal)
}
};
builder = builder.proxy(proxy);
}
Ok(HttpClient {
client: Err(Some(builder)),
response_dir: config.rrdp_keep_responses.clone(),
timeout: config.rrdp_timeout,
})
}
/// Ignites the client.
///
/// This _must_ be called before any other methods can be called. It must
/// be called after any potential fork on Unix systems because it spawns
/// threads.
pub fn ignite(&mut self) -> Result<(), Fatal> {
let builder = match self.client.as_mut() {
Ok(_) => return Ok(()),
Err(builder) => match builder.take() {
Some(builder) => builder,
None => {
error!("Previously failed to initialize HTTP client.");
return Err(Fatal)
}
}
};
let client = match builder.build() {
Ok(client) => client,
Err(err) => {
error!("Failed to initialize HTTP client: {}.", err);
return Err(Fatal)
}
};
self.client = Ok(client);
Ok(())
}
/// Loads a WebPKI trusted certificate.
fn load_cert(path: &Path) -> Result<Certificate, Fatal> {
let mut file = match fs::File::open(path) {
Ok(file) => file,
Err(err) => {
error!(
"Cannot open rrdp-root-cert file '{}': {}'",
path.display(), err
);
return Err(Fatal);
}
};
let mut data = Vec::new();
if let Err(err) = io::Read::read_to_end(&mut file, &mut data) {
error!(
"Cannot read rrdp-root-cert file '{}': {}'",
path.display(), err
);
return Err(Fatal);
}
Certificate::from_pem(&data).map_err(|err| {
error!(
"Cannot decode rrdp-root-cert file '{}': {}'",
path.display(), err
);
Fatal
})
}
/// Returns a reference to the reqwest client.
///
/// # Panics
///
/// The method panics if the client hasnt been ignited yet.
fn client(&self) -> &Client {
self.client.as_ref().expect("HTTP client has not been ignited")
}
/// Performs an HTTP GET request for the given URI.
///
/// If keeping responses is enabled, the response is written to a file
/// corresponding to the URI. If the resource behind the URI changes over
/// time and this change should be tracked, set `multi` to `true` to
/// include the current time in the file name.
pub fn response(
&self,
uri: &uri::Https,
multi: bool,
) -> Result<HttpResponse, reqwest::Error> {
self._response(uri, self.client().get(uri.as_str()), multi)
}
pub fn conditional_response(
&self,
uri: &uri::Https,
etag: Option<&Bytes>,
last_modified: Option<DateTime<Utc>>,
multi: bool,
) -> Result<HttpResponse, reqwest::Error> {
let mut request = self.client().get(uri.as_str());
if let Some(etag) = etag {
request = request.header(
header::IF_NONE_MATCH, etag.as_ref()
);
}
if let Some(last_modified) = last_modified {
request = request.header(
header::IF_MODIFIED_SINCE,
format_http_date(last_modified)
);
}
self._response(uri, request, multi)
}
/// Creates a response from a request builder.
fn _response(
&self,
uri: &uri::Https,
mut request: RequestBuilder,
multi: bool
) -> Result<HttpResponse, reqwest::Error> {
if let Some(timeout) = self.timeout {
request = request.timeout(timeout);
}
request.send().and_then(|response| {
response.error_for_status()
}).map(|response| {
HttpResponse::create(response, uri, &self.response_dir, multi)
})
}
/*
/// Requests, parses, and returns the given RRDP notification file.
///
/// The value referred to by `status` will be updated to the received
/// status code or `HttpStatus::Error` if the request failed.
///
/// Returns the notification file on success.
pub fn notification_file(
&self,
uri: &uri::Https,
state: Option<&RepositoryState>,
status: &mut HttpStatus,
) -> Result<Option<Notification>, Failed> {
let mut request = self.client().get(uri.as_str());
if let Some(state) = state {
if let Some(etag) = state.etag.as_ref() {
request = request.header(
header::IF_NONE_MATCH, etag.as_ref()
);
}
if let Some(ts) = state.last_modified_ts {
if let Some(datetime) = Utc.timestamp_opt(ts, 0).single() {
request = request.header(
header::IF_MODIFIED_SINCE,
format_http_date(datetime)
);
}
}
}
let response = match self._response(uri, request, true) {
Ok(response) => {
*status = response.status().into();
response
}
Err(err) => {
warn!("RRDP {}: {}", uri, err);
*status = HttpStatus::Error;
return Err(Failed)
}
};
if response.status() == StatusCode::NOT_MODIFIED {
Ok(None)
}
else if response.status() != StatusCode::OK {
warn!(
"RRDP {}: Getting notification file failed with status {}",
uri, response.status()
);
Err(Failed)
}
else {
Notification::from_response(uri, response).map(Some)
}
}
*/
}
//------------ HttpResponse --------------------------------------------------
/// Wraps a reqwest response for added features.
pub struct HttpResponse {
/// The wrapped reqwest response.
response: Response,
/// A file to also store read data into.
file: Option<fs::File>,
}
impl HttpResponse {
/// Creates a new response wrapping a reqwest reponse.
///
/// If `response_dir` is some path, the response will also be written to
/// a file under this directory based on `uri`. Each URI component
/// starting with the authority will be a directory name. If `multi` is
/// `false` the last component will be the file name. If `multi` is
/// `true` the last component will be a directory, too, and the file name
/// will be the ISO timestamp of the current time.
pub fn create(
response: Response,
uri: &uri::Https,
response_dir: &Option<PathBuf>,
multi: bool
) -> Self {
HttpResponse {
response,
file: response_dir.as_ref().and_then(|base| {
Self::open_file(base, uri, multi)
})
}
}
/// Opens the file mirroring file.
///
/// See [`create`][Self::create] for the rules.
fn open_file(
base: &Path, uri: &uri::Https, multi: bool
) -> Option<fs::File> {
let path = base.join(&uri.as_str()[8..]);
let path = if multi {
path.join(Utc::now().to_rfc3339())
}
else {
path
};
let parent = match path.parent() {
Some(parent) => parent,
None => {
warn!(
"Cannot keep HTTP response; \
URI translated into a bad path '{}'",
path.display()
);
return None
}
};
if let Err(err) = fs::create_dir_all(parent) {
warn!(
"Cannot keep HTTP response; \
creating directory {} failed: {}",
parent.display(), err
);
return None
}
match fs::File::create(&path) {
Ok(file) => Some(file),
Err(err) => {
warn!(
"Cannot keep HTTP response; \
creating file {} failed: {}",
path.display(), err
);
None
}
}
}
/// Returns the value of the content length header if present.
pub fn content_length(&self) -> Option<u64> {
self.response.content_length()
}
/// Copies the full content of the response to the given writer.
pub fn copy_to<W: io::Write + ?Sized>(
&mut self, w: &mut W
) -> Result<u64, io::Error> {
// We cannot use the reqwest responses `copy_to` impl because we need
// to use our own `io::Read` impl which sneaks in the copying to file
// if necessary.
io::copy(self, w)
}
/// Returns the status code of the response.
pub fn status(&self) -> StatusCode {
self.response.status()
}
/// Returns the value of the ETag header if present.
///
/// The returned value is the complete content. That is, it includes the
/// quotation marks and a possible `W/` prefix.
///
/// The method quietly returns `None` if the content of a header is
/// malformed or if there is more than one occurence of the header.
///
/// The method returns a `Bytes` value as there is a good chance the
/// tag is short enough to be be inlined.
pub fn etag(&self) -> Option<Bytes> {
let mut etags = self.response.headers()
.get_all(header::ETAG)
.into_iter();
let etag = etags.next()?;
if etags.next().is_some() {
return None
}
Self::parse_etag(etag.as_bytes())
}
/// Parses the ETag value.
///
/// This is a separate function to make testing easier.
fn parse_etag(etag: &[u8]) -> Option<Bytes> {
// The tag starts with an optional case-sensitive `W/` followed by
// `"`. Lets remember where the actual tag starts.
let start = if etag.starts_with(b"W/\"") {
3
}
else if etag.first() == Some(&b'"') {
1
}
else {
return None
};
// We need at least one more character. Empty tags are allowed.
if etag.len() <= start {
return None
}
// The tag ends with a `"`.
if etag.last() != Some(&b'"') {
return None
}
Some(Bytes::copy_from_slice(etag))
}
/// Returns the value of the Last-Modified header if present.
///
/// The method quietly returns `None` if the content of a header is
/// malformed or if there is more than one occurence of the header.
pub fn last_modified(&self) -> Option<DateTime<Utc>> {
let mut iter = self.response.headers()
.get_all(header::LAST_MODIFIED)
.into_iter();
let value = iter.next()?;
if iter.next().is_some() {
return None
}
parse_http_date(value.to_str().ok()?)
}
}
//--- Read
impl io::Read for HttpResponse {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
let res = self.response.read(buf)?;
if let Some(file) = self.file.as_mut() {
file.write_all(&buf[..res])?;
}
Ok(res)
}
}
//------------ HttpStatus ----------------------------------------------------
/// The result of an HTTP request.
#[derive(Clone, Copy, Debug)]
pub enum HttpStatus {
/// A response was received with the given status code.
Response(StatusCode),
/// The repository URI was rejected.
Rejected,
/// An error happened.
Error
}
impl HttpStatus {
pub fn into_i16(self) -> i16 {
match self {
HttpStatus::Response(code) => code.as_u16() as i16,
HttpStatus::Rejected => -2,
HttpStatus::Error => -1,
}
}
pub fn is_not_modified(self) -> bool {
matches!(
self,
HttpStatus::Response(code) if code == StatusCode::NOT_MODIFIED
)
}
pub fn is_success(self) -> bool {
matches!(
self,
HttpStatus::Response(code) if code.is_success()
)
}
}
impl From<StatusCode> for HttpStatus {
fn from(code: StatusCode) -> Self {
HttpStatus::Response(code)
}
}
+11
View File
@@ -0,0 +1,11 @@
#![allow(dead_code)]
pub use self::base::{Collector, LoadResult, ReadRepository, Run};
pub use self::http::HttpStatus;
pub use self::update::SnapshotReason;
mod archive;
mod base;
mod http;
mod update;
+871
View File
@@ -0,0 +1,871 @@
use std::{error, fmt, io};
use std::collections::HashSet;
use std::io::Read;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use log::{error, warn};
use reqwest::StatusCode;
use ring::digest;
use ring::constant_time::verify_slices_are_equal;
use rpki::{rrdp, uri};
use rpki::rrdp::{DeltaInfo, NotificationFile, ProcessDelta, ProcessSnapshot};
use uuid::Uuid;
use crate::error::{Failed, RunFailed};
use crate::metrics::RrdpRepositoryMetrics;
use crate::utils::archive::{ArchiveError, PublishError};
use super::archive::{AccessError, FallbackTime, RepositoryState, RrdpArchive};
use super::base::Collector;
use super::http::{HttpClient, HttpResponse, HttpStatus};
//------------ Notification --------------------------------------------------
/// The notification file of an RRDP repository.
pub struct Notification {
/// The URI of the notification file.
uri: uri::Https,
/// The content of the file.
content: NotificationFile,
/// The Etag value if provided.
etag: Option<Bytes>,
/// The Last-Modified value if provided,
last_modified: Option<DateTime<Utc>>,
}
impl Notification {
/// Requests, parses, and returns the given RRDP notification file.
///
/// The value referred to by `status` will be updated to the received
/// status code or `HttpStatus::Error` if the request failed.
///
/// Returns the notification file on success. Returns `Ok(None)` if a
/// response was received successfully but indicated that the
/// notification file was not updated.
pub fn get(
http: &HttpClient,
uri: &uri::Https,
state: Option<&RepositoryState>,
status: &mut HttpStatus,
) -> Result<Option<Self>, Failed> {
let response = match http.conditional_response(
uri,
state.and_then(|state| state.etag.as_ref()),
state.and_then(|state| state.last_modified()),
true
) {
Ok(response) => {
*status = response.status().into();
response
}
Err(err) => {
warn!("RRDP {}: {}", uri, err);
*status = HttpStatus::Error;
return Err(Failed)
}
};
if response.status() == StatusCode::NOT_MODIFIED {
Ok(None)
}
else if response.status() != StatusCode::OK {
warn!(
"RRDP {}: Getting notification file failed with status {}",
uri, response.status()
);
Err(Failed)
}
else {
Notification::from_response(uri.clone(), response).map(Some)
}
}
/// Creates a new notification from a successful HTTP response.
///
/// Assumes that the response status was 200 OK.
fn from_response(
uri: uri::Https, response: HttpResponse
) -> Result<Self, Failed> {
let etag = response.etag();
let last_modified = response.last_modified();
let mut content = NotificationFile::parse(
io::BufReader::new(response)
).map_err(|err| {
warn!("RRDP {}: {}", uri, err);
Failed
})?;
content.sort_deltas();
Ok(Notification { uri, content, etag, last_modified })
}
/// Returns a reference to the content of the notification file.
pub fn content(&self) -> &NotificationFile {
&self.content
}
/// Creates repository state for this notification.
pub fn to_repository_state(
&self, fallback: FallbackTime,
) -> RepositoryState {
RepositoryState {
rpki_notify: self.uri.clone(),
session: self.content.session_id(),
serial: self.content.serial(),
updated_ts: Utc::now().timestamp(),
best_before_ts: fallback.best_before().timestamp(),
last_modified_ts: self.last_modified.map(|x| x.timestamp()),
etag: self.etag.clone(),
}
}
}
//------------ SnapshotUpdate ------------------------------------------------
/// An update to a repository performed from a snapshot file.
///
/// For this type of update, we collect all the published objects in the
/// repositorys temp directory and move it over to the object directory upon
/// success.
pub struct SnapshotUpdate<'a> {
/// The collector.
collector: &'a Collector,
/// The archive to store the snapshot into.
archive: &'a mut RrdpArchive,
/// The notification file pointing to the snapshot.
notify: &'a Notification,
/// The metrics for the update.
metrics: &'a mut RrdpRepositoryMetrics,
}
impl<'a> SnapshotUpdate<'a> {
pub fn new(
collector: &'a Collector,
archive: &'a mut RrdpArchive,
notify: &'a Notification,
metrics: &'a mut RrdpRepositoryMetrics,
) -> Self {
SnapshotUpdate { collector, archive, notify, metrics }
}
pub fn try_update(mut self) -> Result<(), SnapshotError> {
let response = match self.collector.http().response(
self.notify.content.snapshot().uri(), false
) {
Ok(response) => {
self.metrics.payload_status = Some(response.status().into());
if response.status() != StatusCode::OK {
return Err(response.status().into())
}
else {
response
}
}
Err(err) => {
self.metrics.payload_status = Some(HttpStatus::Error);
return Err(err.into())
}
};
let mut reader = io::BufReader::new(HashRead::new(response));
self.process(&mut reader)?;
let hash = reader.into_inner().into_hash();
if verify_slices_are_equal(
hash.as_ref(),
self.notify.content.snapshot().hash().as_ref()
).is_err() {
return Err(SnapshotError::HashMismatch)
}
self.archive.publish_state(
&self.notify.to_repository_state(
self.collector.config().fallback_time
)
)?;
Ok(())
}
}
impl<'a> ProcessSnapshot for SnapshotUpdate<'a> {
type Err = SnapshotError;
fn meta(
&mut self,
session_id: Uuid,
serial: u64,
) -> Result<(), Self::Err> {
if session_id != self.notify.content.session_id() {
return Err(SnapshotError::SessionMismatch {
expected: self.notify.content.session_id(),
received: session_id
})
}
if serial != self.notify.content.serial() {
return Err(SnapshotError::SerialMismatch {
expected: self.notify.content.serial(),
received: serial
})
}
Ok(())
}
fn publish(
&mut self,
uri: uri::Rsync,
data: &mut rrdp::ObjectReader,
) -> Result<(), Self::Err> {
let content = RrdpDataRead::new(
data, &uri, self.collector.config().max_object_size,
).read_all()?;
self.archive.publish_object(&uri, &content).map_err(|err| match err {
PublishError::AlreadyExists => {
SnapshotError::DuplicateObject(uri.clone())
}
PublishError::Archive(ArchiveError::Corrupt) => {
warn!(
"Temporary RRDP repository file {} became corrupt.",
self.archive.path().display(),
);
SnapshotError::RunFailed(RunFailed::retry())
}
PublishError::Archive(ArchiveError::Io(err)) => {
error!(
"Fatal: Failed to write to temporary RRDP repository file \
{}: {}",
self.archive.path().display(), err,
);
SnapshotError::RunFailed(RunFailed::fatal())
}
})
}
}
//------------ DeltaUpdate ---------------------------------------------------
/// An update to a repository performed from a delta file.
///
/// For this kind of update, we collect newly published and updated objects in
/// the repositorys temp directory and remember them as well as all deleted
/// objects and if everything is okay, copy files over to and delete files in
/// the object directory.
pub struct DeltaUpdate<'a> {
/// The collector.
collector: &'a Collector,
/// The archive the repository is stored in.
archive: &'a mut RrdpArchive,
/// The session ID of the RRDP session.
session_id: Uuid,
/// Information about the delta file.
info: &'a DeltaInfo,
/// The metrics for the update.
metrics: &'a mut RrdpRepositoryMetrics,
/// The URIs weve already seen in this delta.
///
/// This is so we can error out if a URI was touched more than once.
seen: HashSet<uri::Rsync>,
}
impl<'a> DeltaUpdate<'a> {
/// Creates a new delta update.
pub fn new(
collector: &'a Collector,
archive: &'a mut RrdpArchive,
session_id: Uuid,
info: &'a DeltaInfo,
metrics: &'a mut RrdpRepositoryMetrics,
) -> Self {
DeltaUpdate {
collector, archive, session_id, info, metrics,
seen: Default::default(),
}
}
pub fn try_update(mut self) -> Result<(), DeltaError> {
let response = match self.collector.http().response(
self.info.uri(), false
) {
Ok(response) => {
self.metrics.payload_status = Some(response.status().into());
if response.status() != StatusCode::OK {
return Err(response.status().into())
}
else {
response
}
}
Err(err) => {
self.metrics.payload_status = Some(HttpStatus::Error);
return Err(err.into())
}
};
let mut reader = io::BufReader::new(HashRead::new(response));
self.process(&mut reader)?;
let hash = reader.into_inner().into_hash();
if verify_slices_are_equal(
hash.as_ref(),
self.info.hash().as_ref()
).is_err() {
return Err(DeltaError::DeltaHashMismatch)
}
Ok(())
}
}
impl<'a> ProcessDelta for DeltaUpdate<'a> {
type Err = DeltaError;
fn meta(
&mut self, session_id: Uuid, serial: u64
) -> Result<(), Self::Err> {
if session_id != self.session_id {
return Err(DeltaError::SessionMismatch {
expected: self.session_id,
received: session_id
})
}
if serial != self.info.serial() {
return Err(DeltaError::SerialMismatch {
expected: self.info.serial(),
received: serial
})
}
Ok(())
}
fn publish(
&mut self,
uri: uri::Rsync,
hash: Option<rrdp::Hash>,
data: &mut rrdp::ObjectReader<'_>
) -> Result<(), Self::Err> {
if !self.seen.insert(uri.clone()) {
return Err(DeltaError::ObjectRepeated { uri })
}
let content = RrdpDataRead::new(
data, &uri, self.collector.config().max_object_size
).read_all()?;
match hash {
Some(hash) => {
self.archive.update_object(
&uri, hash, &content
).map_err(|err| match err {
AccessError::NotFound => {
DeltaError::MissingObject { uri: uri.clone() }
}
AccessError::HashMismatch => {
DeltaError::ObjectHashMismatch { uri: uri.clone() }
}
AccessError::Archive(err) => DeltaError::Archive(err),
})
}
None => {
self.archive.publish_object(&uri, &content).map_err(|err| {
match err {
PublishError::AlreadyExists => {
DeltaError::ObjectAlreadyPresent {
uri: uri.clone()
}
}
PublishError::Archive(err) => {
DeltaError::Archive(err)
}
}
})
}
}
}
fn withdraw(
&mut self,
uri: uri::Rsync,
hash: rrdp::Hash
) -> Result<(), Self::Err> {
if !self.seen.insert(uri.clone()) {
return Err(DeltaError::ObjectRepeated { uri })
}
self.archive.delete_object(&uri, hash).map_err(|err| match err {
AccessError::NotFound => {
DeltaError::MissingObject { uri: uri.clone() }
}
AccessError::HashMismatch => {
DeltaError::ObjectHashMismatch { uri: uri.clone() }
}
AccessError::Archive(err) => DeltaError::Archive(err),
})
}
}
//------------ HashRead ------------------------------------------------------
/// A reader wrapper that calculates the SHA-256 hash of all read data.
struct HashRead<R> {
/// The wrapped reader.
reader: R,
/// The context for hash calculation.
context: digest::Context,
}
impl<R> HashRead<R> {
/// Creates a new hash reader.
pub fn new(reader: R) -> Self {
HashRead {
reader,
context: digest::Context::new(&digest::SHA256)
}
}
/// Converts the reader into the hash.
pub fn into_hash(self) -> rrdp::Hash {
// Unwrap should be safe: This can only fail if the slice has the
// wrong length.
rrdp::Hash::try_from(self.context.finish()).unwrap()
}
}
impl<R: io::Read> io::Read for HashRead<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
let res = self.reader.read(buf)?;
self.context.update(&buf[..res]);
Ok(res)
}
}
//------------ RrdpDataRead --------------------------------------------------
/// A reader that reads the data of objects in a snapshot or delta.
///
/// The type ensures the size limit of objects and allows treating read errors
/// differently than write errors by storing any error and making it available
/// after the fact.
struct RrdpDataRead<'a, R> {
/// The wrapped reader.
reader: R,
/// The URI of the object we are reading.
uri: &'a uri::Rsync,
/// The number of bytes left to read.
///
/// If this is `None` we are allowed to read an unlimited amount.
left: Option<u64>,
/// The last error that happend.
err: Option<RrdpDataReadError>,
}
impl<'a, R> RrdpDataRead<'a, R> {
/// Creates a new read from necessary information.
///
/// The returned value will wrap `reader`. The `uri` should be the rsync
/// URI of the published object. It is only used for generating meaningful
/// error messages. If `max_size` is some value, the size of the object
/// will be limited to that value in bytes. Larger objects lead to an
/// error.
pub fn new(reader: R, uri: &'a uri::Rsync, max_size: Option<u64>) -> Self {
RrdpDataRead { reader, uri, left: max_size, err: None }
}
/// Returns a stored error if available.
///
/// If it returns some error, that error happened during reading before
/// an `io::Error` was returned.
///
/// The method takes the stored error and replaces it internally with
/// `None`.
pub fn take_err(&mut self) -> Option<RrdpDataReadError> {
self.err.take()
}
}
impl<'a, R: io::Read> RrdpDataRead<'a, R> {
/// Reads the data into a vec.
pub fn read_all(mut self) -> Result<Vec<u8>, RrdpDataReadError> {
let mut content = Vec::new();
if let Err(io_err) = self.read_to_end(&mut content) {
return Err(
match self.take_err() {
Some(data_err) => data_err,
None => RrdpDataReadError::Read(io_err),
}
)
}
Ok(content)
}
}
impl<'a, R: io::Read> io::Read for RrdpDataRead<'a, R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
let res = match self.reader.read(buf) {
Ok(res) => res,
Err(err) => {
self.err = Some(RrdpDataReadError::Read(err));
return Err(io::Error::new(
io::ErrorKind::Other, "reading data failed",
))
}
};
if let Some(left) = self.left {
let res64 = match u64::try_from(res) {
Ok(res) => res,
Err(_) => {
// If the usize doesnt fit into a u64, things are
// definitely way too big.
self.left = Some(0);
self.err = Some(
RrdpDataReadError::LargeObject(self.uri.clone())
);
return Err(io::Error::new(
io::ErrorKind::Other, "size limit exceeded"
))
}
};
if res64 > left {
self.left = Some(0);
self.err = Some(
RrdpDataReadError::LargeObject(self.uri.clone())
);
Err(io::Error::new(
io::ErrorKind::Other, "size limit exceeded")
)
}
else {
self.left = Some(left - res64);
Ok(res)
}
}
else {
Ok(res)
}
}
}
//------------ SnapshotReason ------------------------------------------------
/// The reason why a snapshot was used.
#[derive(Clone, Copy, Debug)]
pub enum SnapshotReason {
/// The respository is new.
NewRepository,
/// A new session was encountered.
NewSession,
/// The delta set in the notification file is inconsistent.
BadDeltaSet,
/// A larger-than-supported serial number was encountered.
LargeSerial,
/// The local copy is outdated and cannot be updated via deltas.
OutdatedLocal,
/// A delta file was conflicting with locally stored data.
ConflictingDelta,
/// There were too many deltas to process.
TooManyDeltas,
/// The local copy was corrupt.
CorruptArchive,
}
impl SnapshotReason {
/// Returns a shorthand code for the reason.
pub fn code(self) -> &'static str {
use SnapshotReason::*;
match self {
NewRepository => "new-repository",
NewSession => "new-session",
BadDeltaSet => "inconsistent-delta-set",
LargeSerial => "large-serial",
OutdatedLocal => "outdate-local",
ConflictingDelta => "conflicting-delta",
TooManyDeltas => "too-many-deltas",
CorruptArchive => "corrupt-local-copy",
}
}
}
//============ Errors ========================================================
//------------ RrdpDataReadError ---------------------------------------------
/// An error happened while reading object data.
///
/// This covers both the case where the maximum allowed file size was
/// exhausted as well as where reading data failed. Neither of them is fatal,
/// so we need to process them separately.
#[derive(Debug)]
enum RrdpDataReadError {
LargeObject(uri::Rsync),
Read(io::Error),
}
//------------ SnapshotError -------------------------------------------------
/// An error happened during snapshot processing.
///
/// This is an internal error type only necessary for error handling during
/// RRDP processing. Values will be logged and converted into failures or
/// negative results as necessary.
#[derive(Debug)]
pub enum SnapshotError {
Http(reqwest::Error),
HttpStatus(StatusCode),
Rrdp(rrdp::ProcessError),
SessionMismatch {
expected: Uuid,
received: Uuid
},
SerialMismatch {
expected: u64,
received: u64,
},
DuplicateObject(uri::Rsync),
HashMismatch,
LargeObject(uri::Rsync),
RunFailed(RunFailed),
}
impl From<reqwest::Error> for SnapshotError {
fn from(err: reqwest::Error) -> Self {
SnapshotError::Http(err)
}
}
impl From<StatusCode> for SnapshotError {
fn from(code: StatusCode) -> Self {
SnapshotError::HttpStatus(code)
}
}
impl From<rrdp::ProcessError> for SnapshotError {
fn from(err: rrdp::ProcessError) -> Self {
SnapshotError::Rrdp(err)
}
}
impl From<io::Error> for SnapshotError {
fn from(err: io::Error) -> Self {
SnapshotError::Rrdp(err.into())
}
}
impl From<RunFailed> for SnapshotError {
fn from(err: RunFailed) -> Self {
SnapshotError::RunFailed(err)
}
}
impl From<RrdpDataReadError> for SnapshotError {
fn from(err: RrdpDataReadError) -> Self {
match err {
RrdpDataReadError::LargeObject(uri) => {
SnapshotError::LargeObject(uri)
}
RrdpDataReadError::Read(err) => {
SnapshotError::Rrdp(err.into())
}
}
}
}
impl fmt::Display for SnapshotError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
SnapshotError::Http(ref err) => err.fmt(f),
SnapshotError::HttpStatus(status) => {
write!(f, "HTTP {}", status)
}
SnapshotError::Rrdp(ref err) => err.fmt(f),
SnapshotError::SessionMismatch { ref expected, ref received } => {
write!(
f,
"session ID mismatch (notification_file: {}, \
snapshot file: {}",
expected, received
)
}
SnapshotError::SerialMismatch { ref expected, ref received } => {
write!(
f,
"serial number mismatch (notification_file: {}, \
snapshot file: {}",
expected, received
)
}
SnapshotError::DuplicateObject(ref uri) => {
write!(f, "duplicate object: {}", uri)
}
SnapshotError::HashMismatch => {
write!(f, "hash value mismatch")
}
SnapshotError::LargeObject(ref uri) => {
write!(f, "object exceeds size limit: {}", uri)
}
SnapshotError::RunFailed(_) => Ok(()),
}
}
}
impl error::Error for SnapshotError { }
//------------ DeltaError ----------------------------------------------------
/// An error happened during delta processing.
///
/// This is an internal error type only necessary for error handling during
/// RRDP processing. Values will be logged and converted into failures or
/// negative results as necessary.
#[derive(Debug)]
pub enum DeltaError {
Http(reqwest::Error),
HttpStatus(StatusCode),
Rrdp(rrdp::ProcessError),
SessionMismatch {
expected: Uuid,
received: Uuid
},
SerialMismatch {
expected: u64,
received: u64,
},
MissingObject {
uri: uri::Rsync,
},
ObjectAlreadyPresent {
uri: uri::Rsync,
},
ObjectHashMismatch {
uri: uri::Rsync,
},
ObjectRepeated {
uri: uri::Rsync,
},
DeltaHashMismatch,
LargeObject(uri::Rsync),
Archive(ArchiveError),
}
impl From<reqwest::Error> for DeltaError {
fn from(err: reqwest::Error) -> Self {
DeltaError::Http(err)
}
}
impl From<StatusCode> for DeltaError {
fn from(code: StatusCode) -> Self {
DeltaError::HttpStatus(code)
}
}
impl From<rrdp::ProcessError> for DeltaError {
fn from(err: rrdp::ProcessError) -> Self {
DeltaError::Rrdp(err)
}
}
impl From<io::Error> for DeltaError {
fn from(err: io::Error) -> Self {
DeltaError::Rrdp(err.into())
}
}
impl From<RrdpDataReadError> for DeltaError {
fn from(err: RrdpDataReadError) -> Self {
match err {
RrdpDataReadError::LargeObject(uri) => {
DeltaError::LargeObject(uri)
}
RrdpDataReadError::Read(err) => {
DeltaError::Rrdp(err.into())
}
}
}
}
impl fmt::Display for DeltaError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
DeltaError::Http(ref err) => err.fmt(f),
DeltaError::HttpStatus(status) => {
write!(f, "HTTP {}", status)
}
DeltaError::Rrdp(ref err) => err.fmt(f),
DeltaError::SessionMismatch { ref expected, ref received } => {
write!(
f,
"session ID mismatch (notification_file: {}, \
snapshot file: {}",
expected, received
)
}
DeltaError::SerialMismatch { ref expected, ref received } => {
write!(
f,
"serial number mismatch (notification_file: {}, \
snapshot file: {}",
expected, received
)
}
DeltaError::MissingObject { ref uri } => {
write!(
f,
"reference to missing object {}",
uri
)
}
DeltaError::ObjectAlreadyPresent { ref uri } => {
write!(
f,
"attempt to add already present object {}",
uri
)
}
DeltaError::ObjectHashMismatch { ref uri } => {
write!(
f,
"local object {} has different hash",
uri
)
}
DeltaError::ObjectRepeated { ref uri } => {
write!(f, "object appears multiple times: {}", uri)
}
DeltaError::LargeObject(ref uri) => {
write!(f, "object exceeds size limit: {}", uri)
}
DeltaError::DeltaHashMismatch => {
write!(f, "delta file hash value mismatch")
}
DeltaError::Archive(ref err) => {
write!(f, "archive error: {}", err)
}
}
}
}
impl error::Error for DeltaError { }
+8 -1
View File
@@ -28,7 +28,7 @@ use log::{debug, error, info, warn};
use rpki::uri;
use tokio::process::Command as AsyncCommand;
use crate::config::Config;
use crate::error::Failed;
use crate::error::{Failed, Fatal};
use crate::metrics::{Metrics, RsyncModuleMetrics};
use crate::utils::fatal;
use crate::utils::sync::{Mutex, RwLock};
@@ -114,6 +114,13 @@ impl Collector {
Ok(())
}
/// Sanitizes the stored data.
///
/// Currently doesnt do anything.
pub fn sanitize(&self) -> Result<(), Fatal> {
Ok(())
}
/// Start a validation run on the collector.
pub fn start(&self) -> Run {
Run::new(self)
+54 -25
View File
@@ -43,7 +43,7 @@ use rpki::uri;
use crate::{collector, store, tals};
use crate::config::{Config, FilterPolicy};
use crate::collector::Collector;
use crate::error::Failed;
use crate::error::{Failed, Fatal, RunFailed};
use crate::metrics::{
Metrics, PublicationMetrics, RepositoryMetrics, TalMetrics
};
@@ -276,6 +276,18 @@ impl Engine {
Ok(())
}
/// Sanitizes the stored data.
///
/// This goes over the stored data and deletes what looks broken. It
/// should be called before retrying a failed restartable run.
pub fn sanitize(&self) -> Result<(), Fatal> {
self.store.sanitize()?;
if let Some(collector) = self.collector.as_ref() {
collector.sanitize()?;
}
Ok(())
}
/// Starts a validation run.
///
/// During the run, `processor` will be responsible for dealing with
@@ -329,6 +341,12 @@ pub struct Run<'a, P> {
/// The processor for valid data.
processor: P,
/// Was an error encountered during the run?
had_err: AtomicBool,
/// Was a fatal error encountered during the run?
is_fatal: AtomicBool,
/// The metrics collected during the run.
metrics: Metrics,
}
@@ -343,6 +361,8 @@ impl<'a, P> Run<'a, P> {
) -> Self {
Run {
validation, collector, store, processor,
had_err: AtomicBool::new(false),
is_fatal: AtomicBool::new(false),
metrics: Default::default()
}
}
@@ -378,7 +398,7 @@ impl<'a, P> Run<'a, P> {
impl<'a, P: ProcessRun> Run<'a, P> {
/// Performs the validation run.
pub fn process(&mut self) -> Result<(), Failed> {
pub fn process(&mut self) -> Result<(), RunFailed> {
// If we dont have any TALs, we aint got nothing to do.
if self.validation.tals.is_empty() {
return Ok(())
@@ -395,7 +415,6 @@ impl<'a, P: ProcessRun> Run<'a, P> {
// And off we trot.
// Keep a flag to cancel everything if something goes wrong.
let had_err = AtomicBool::new(false);
let thread_metrics = ArrayQueue::new(
self.validation.validation_threads
);
@@ -405,7 +424,7 @@ impl<'a, P: ProcessRun> Run<'a, P> {
let mut metrics = metrics.fork();
while let Some(task) = tasks.pop() {
if self.process_task(
task, &tasks, &mut metrics, &had_err,
task, &tasks, &mut metrics,
).is_err() {
break;
}
@@ -415,8 +434,13 @@ impl<'a, P: ProcessRun> Run<'a, P> {
}
});
if had_err.load(Ordering::Relaxed) {
return Err(Failed);
if self.had_err.load(Ordering::Relaxed) {
if self.is_fatal.load(Ordering::Relaxed) {
return Err(RunFailed::fatal())
}
else {
return Err(RunFailed::retry())
}
}
metrics.prepare_final(&mut self.metrics);
@@ -433,14 +457,13 @@ impl<'a, P: ProcessRun> Run<'a, P> {
task: Task<P::PubPoint>,
tasks: &SegQueue<Task<P::PubPoint>>,
metrics: &mut RunMetrics,
had_err: &AtomicBool,
) -> Result<(), Failed> {
match task {
Task::Tal(task) => {
self.process_tal_task(task, tasks, metrics, had_err)
self.process_tal_task(task, tasks, metrics)
}
Task::Ca(task) => {
self.process_ca_task(task, tasks, metrics, had_err)
self.process_ca_task(task, tasks, metrics)
}
}
}
@@ -450,7 +473,6 @@ impl<'a, P: ProcessRun> Run<'a, P> {
&self, task: TalTask,
tasks: &SegQueue<Task<P::PubPoint>>,
metrics: &mut RunMetrics,
had_err: &AtomicBool,
) -> Result<(), Failed> {
for uri in task.tal.uris() {
let cert = match self.load_ta(uri, task.tal.info())? {
@@ -489,7 +511,7 @@ impl<'a, P: ProcessRun> Run<'a, P> {
repository_index: None,
defer: false,
},
tasks, metrics, had_err
tasks, metrics,
)
}
None => {
@@ -533,29 +555,36 @@ impl<'a, P: ProcessRun> Run<'a, P> {
task: CaTask<P::PubPoint>,
tasks: &SegQueue<Task<P::PubPoint>>,
metrics: &mut RunMetrics,
had_err: &AtomicBool,
) -> Result<(), Failed> {
let more_tasks = PubPoint::new(
self, &task.cert, task.processor, task.repository_index,
).and_then(|point| {
point.process(metrics)
}).map_err(|_| {
had_err.store(true, Ordering::Relaxed);
}).map_err(|err| {
self.run_failed(err);
Failed
})?;
for task in more_tasks {
if had_err.load(Ordering::Relaxed) {
if self.had_err.load(Ordering::Relaxed) {
return Err(Failed)
}
if task.defer {
tasks.push(Task::Ca(task))
}
else {
self.process_ca_task(task, tasks, metrics, had_err)?;
self.process_ca_task(task, tasks, metrics)?;
}
}
Ok(())
}
/// Marks the run as failed.
fn run_failed(&self, err: RunFailed) {
self.had_err.store(true, Ordering::Relaxed);
if err.is_fatal() {
self.is_fatal.store(true, Ordering::Relaxed);
}
}
}
@@ -591,7 +620,7 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
cert: &'a Arc<CaCert>,
processor: P::PubPoint,
repository_index: Option<usize>,
) -> Result<Self, Failed> {
) -> Result<Self, RunFailed> {
Ok(PubPoint {
run, cert, processor, repository_index,
metrics: Default::default(),
@@ -605,7 +634,7 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
pub fn process(
self,
metrics: &mut RunMetrics,
) -> Result<Vec<CaTask<P::PubPoint>>, Failed> {
) -> Result<Vec<CaTask<P::PubPoint>>, RunFailed> {
let mut store = self.run.store.pub_point(self.cert)?;
if let Some(collector) = self.run.collector.as_ref() {
if let Some(collector) = collector.repository(self.cert)? {
@@ -615,12 +644,12 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
Ok(res) => return Ok(res),
Err(mut this) => {
this.metrics = Default::default();
return this.process_stored(store, metrics)
return Ok(this.process_stored(store, metrics)?)
}
}
}
}
self.process_stored(store, metrics)
Ok(self.process_stored(store, metrics)?)
}
/// Tries to update the stored data and validate at the same time.
@@ -640,7 +669,7 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
collector: collector::Repository,
store: &mut StoredPoint,
metrics: &mut RunMetrics,
) -> Result<Result<Vec<CaTask<P::PubPoint>>, Self>, Failed> {
) -> Result<Result<Vec<CaTask<P::PubPoint>>, Self>, RunFailed> {
// Try to load the manifest from the collector. If there isnt one,
// we are done, too.
let collected = match collector.load_object(
@@ -767,9 +796,9 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
// Update was aborted. We need to use the store.
Ok(Err(self))
}
Err(store::UpdateError::Fatal) => {
Err(store::UpdateError::Failed(err)) => {
// We are doomed.
Err(Failed)
Err(err)
}
}
}
@@ -784,7 +813,7 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
&mut self,
manifest_bytes: Bytes,
repository: &collector::Repository,
) -> Result<Option<ValidPointManifest>, Failed> {
) -> Result<Option<ValidPointManifest>, RunFailed> {
let manifest = match Manifest::decode(
manifest_bytes.clone(), self.run.validation.strict
) {
@@ -854,7 +883,7 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
ee_cert: &ResourceCert,
manifest: &ManifestContent,
repository: &collector::Repository
) -> Result<Option<(uri::Rsync, Crl, Bytes)>, Failed> {
) -> Result<Option<(uri::Rsync, Crl, Bytes)>, RunFailed> {
// Lets first get the manifest CRLs name relative to repo_uri. If
// it aint relative at all, this is already invalid.
let crl_uri = match ee_cert.crl_uri() {
+89
View File
@@ -24,6 +24,81 @@ use log::error;
#[derive(Clone, Copy, Debug)]
pub struct Failed;
impl From<Fatal> for Failed {
fn from(_: Fatal) -> Failed {
Failed
}
}
//------------ RunFailed -----------------------------------------------------
/// A validation run has failed to complete.
///
/// This error may be recoverable, which typically happens after some local
/// data corruption has been discovered and the offending was data removed. A
/// new validation run should then be started immediately to hopefully lead
/// to a success.
///
/// The error may also be fatal in which Routinator should just exit.
#[derive(Clone, Copy, Debug)]
pub struct RunFailed {
/// Was the error fatal?
fatal: bool,
}
impl RunFailed {
/// Create a new fatal run failure.
pub fn fatal() -> Self {
RunFailed { fatal: true }
}
/// Create a new “retry” run failure.
pub fn retry() -> Self {
RunFailed { fatal: false }
}
/// Returns whether the error is fatal.
pub fn is_fatal(self) -> bool {
self.fatal
}
/// Returns whether the run should be retried.
pub fn should_retry(self) -> bool {
!self.fatal
}
}
impl From<Fatal> for RunFailed {
fn from(_: Fatal) -> Self {
RunFailed::fatal()
}
}
impl From<Failed> for RunFailed {
fn from(_: Failed) -> Self {
RunFailed::fatal()
}
}
//------------ Fatal ---------------------------------------------------------
/// An operation has failed and continuing is pointless.
///
/// This error types is used to indicate that an operation has failed,
/// diagnostic information has been printed or logged, and continuing is
/// pointless or even dangerous.
#[derive(Clone, Copy, Debug)]
pub struct Fatal;
// XXX This shouldnt be here.
impl From<Failed> for Fatal {
fn from(_: Failed) -> Self {
Self
}
}
//------------ ExitError -----------------------------------------------------
@@ -53,3 +128,17 @@ impl From<Failed> for ExitError {
}
}
impl From<RunFailed> for ExitError {
fn from(_: RunFailed) -> ExitError {
error!("Fatal error. Exiting.");
ExitError::Generic
}
}
impl From<Fatal> for ExitError {
fn from(_: Fatal) -> ExitError {
error!("Fatal error. Exiting.");
ExitError::Generic
}
}
+57 -9
View File
@@ -31,7 +31,7 @@ use tokio::sync::oneshot;
#[cfg(feature = "rta")] use crate::rta;
use crate::{output, validity};
use crate::config::Config;
use crate::error::{ExitError, Failed};
use crate::error::{ExitError, Failed, RunFailed};
use crate::http::http_listener;
use crate::metrics::{SharedRtrServerMetrics};
use crate::output::{Output, OutputFormat};
@@ -249,6 +249,7 @@ impl Server {
validation.ignite()?;
let join = thread::spawn(move || {
let mut can_retry = true;
let err = loop {
if let Some(log) = log.as_ref() {
log.start();
@@ -257,13 +258,39 @@ impl Server {
process.config(), true
) {
Ok(exceptions) => {
if Self::process_once(
match Self::process_once(
process.config(), &validation, &history,
&mut notify, exceptions,
).is_err() {
break Err(Failed);
) {
Ok(()) => {
history.read().refresh_wait()
}
Err(err) => {
if err.should_retry() {
if can_retry {
if validation.sanitize().is_err() {
break Err(Failed)
}
info!(
"Validation failed but \
can be retried."
);
can_retry = false;
Duration::from_secs(0)
}
else {
error!(
"Retried validation failed again."
);
break Err(Failed);
}
}
else {
break Err(Failed);
}
}
}
history.read().refresh_wait()
}
Err(_) => {
error!(
@@ -359,7 +386,7 @@ impl Server {
history: &SharedHistory,
notify: &mut NotifySender,
exceptions: LocalExceptions,
) -> Result<(), Failed> {
) -> Result<(), RunFailed> {
info!("Starting a validation run.");
history.mark_update_start();
let (report, metrics) = ValidationReport::process(engine, config)?;
@@ -550,9 +577,30 @@ impl Vrps {
engine.ignite()?;
process.switch_logging(false, false)?;
let exceptions = LocalExceptions::load(process.config(), true)?;
let (report, mut metrics) = ValidationReport::process(
&engine, process.config(),
)?;
let (report, mut metrics) = {
// Retry once if we get a non-fatal error.
let mut once = false;
loop {
match ValidationReport::process(&engine, process.config()) {
Ok(res) => break res,
Err(err) => {
if err.should_retry() {
if once {
error!(
"Restarted run failed again. Aborting."
);
}
if engine.sanitize().is_ok() {
once = true;
continue
}
}
return Err(ExitError::Generic)
}
}
}
};
let vrps = Arc::new(report.into_snapshot(&exceptions, &mut metrics));
let rsync_complete = metrics.rsync_complete();
let metrics = Arc::new(metrics);
+2 -2
View File
@@ -34,7 +34,7 @@ use rpki::rtr::payload::{Aspa, RouteOrigin, RouterKey};
use rpki::rtr::pdu::{ProviderAsns, RouterKeyInfo};
use crate::config::{Config, FilterPolicy};
use crate::engine::{CaCert, Engine, ProcessPubPoint, ProcessRun};
use crate::error::Failed;
use crate::error::{Failed, RunFailed};
use crate::metrics::{Metrics, PayloadMetrics, VrpMetrics};
use crate::slurm::LocalExceptions;
use super::info::{PayloadInfo, PublishInfo};
@@ -95,7 +95,7 @@ impl ValidationReport {
/// Creates a new validation report by running the engine.
pub fn process(
engine: &Engine, config: &Config,
) -> Result<(Self, Metrics), Failed> {
) -> Result<(Self, Metrics), RunFailed> {
let report = Self::new(config);
let mut run = engine.start(&report)?;
run.process()?;
+2 -2
View File
@@ -8,7 +8,7 @@ use rpki::repository::rta::{ResourceTaggedAttestation, Rta};
use rpki::repository::tal::{Tal, TalUri};
use crate::config::Config;
use crate::engine::{CaCert, ProcessPubPoint, ProcessRun, Engine};
use crate::error::Failed;
use crate::error::{Failed, RunFailed};
//------------ ValidationReport ----------------------------------------------
@@ -35,7 +35,7 @@ impl<'a> ValidationReport<'a> {
pub fn process(
&self,
engine: &Engine,
) -> Result<(), Failed> {
) -> Result<(), RunFailed> {
let mut run = engine.start(self)?;
run.process()?;
run.cleanup()?;
+29 -8
View File
@@ -79,7 +79,7 @@ use rpki::uri;
use crate::collector;
use crate::config::Config;
use crate::engine::CaCert;
use crate::error::Failed;
use crate::error::{Failed, Fatal, RunFailed};
use crate::metrics::Metrics;
use crate::utils::fatal;
use crate::utils::binio::{Compose, Parse, ParseError};
@@ -146,6 +146,13 @@ impl Store {
})
}
/// Sanitizes the stored data.
///
/// Currently doesnt do anything.
pub fn sanitize(&self) -> Result<(), Fatal> {
Ok(())
}
/// Start a validation run with the store.
pub fn start(&self) -> Run {
Run::new(self)
@@ -761,7 +768,7 @@ impl<'a> StoredPoint<'a> {
"Fatal: failed to write to file {}: {}",
tmp_path.display(), err
);
return Err(UpdateError::Fatal)
return Err(UpdateError::fatal())
}
let tmp_object_start = match tmp_file.stream_position() {
Ok(some) => some,
@@ -770,7 +777,7 @@ impl<'a> StoredPoint<'a> {
"Fatal: failed to get position in file {}: {}",
tmp_path.display(), err
);
return Err(UpdateError::Fatal)
return Err(UpdateError::fatal())
}
};
@@ -782,7 +789,7 @@ impl<'a> StoredPoint<'a> {
"Fatal: failed to write to file {}: {}",
tmp_path.display(), err
);
return Err(UpdateError::Fatal)
return Err(UpdateError::fatal())
}
}
Ok(None) => break,
@@ -811,7 +818,7 @@ impl<'a> StoredPoint<'a> {
"Fatal: failed to position file {}: {}",
self.path.display(), err
);
return Err(UpdateError::Fatal)
return Err(UpdateError::fatal())
}
self.file = Some(file);
@@ -1117,19 +1124,33 @@ impl StoredObject {
//============ Error Types ===================================================
//------------ UpdateError ---------------------------------------------------
/// An error happend while updating a publication point.
#[derive(Clone, Copy, Debug)]
pub enum UpdateError {
/// The update needs to be aborted and rolled back.
Abort,
/// Something really bad and fatal happened.
Fatal,
/// Something really bad happened that requires aborting the run.
Failed(RunFailed),
}
impl UpdateError {
pub fn fatal() -> Self {
UpdateError::Failed(RunFailed::fatal())
}
}
impl From<Failed> for UpdateError {
fn from(_: Failed) -> Self {
UpdateError::Fatal
UpdateError::Failed(RunFailed::fatal())
}
}
impl From<RunFailed> for UpdateError {
fn from(err: RunFailed) -> Self {
UpdateError::Failed(err)
}
}
+1825
View File
File diff suppressed because it is too large Load Diff
+1
View File
@@ -1,5 +1,6 @@
//! Various useful things.
pub mod archive;
pub mod binio;
pub mod date;
pub mod dump;
-1
View File
@@ -2,7 +2,6 @@
use std::sync::{Mutex as StdMutex, RwLock as StdRwLock};
pub use std::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard};