Skip to content

Commit

Permalink
fix(storage-manager): validate presence of timestamp (#1229)
Browse files Browse the repository at this point in the history
This commit introduces checks before accessing the `timestamp`
associated with a Sample — instead of calling `unwrap()`.

In theory, a Sample should never arrive to a Storage without a
Timestamp. In practice, we cannot guarantee this invariant with
certainty (future modifications of the code base?).
With these checks, the Storage will simply discard the Sample instead of
panicking the entire storage manager.

* plugins/zenoh-plugin-storage-manager/src/replica/storage.rs: add
  checks when accessing the timestamp and remove `unwrap`.

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
  • Loading branch information
J-Loudet authored Jul 8, 2024
1 parent 6df74c7 commit 12b11ee
Showing 1 changed file with 29 additions and 12 deletions.
41 changes: 29 additions & 12 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,17 @@ impl StorageService {
// the trimming during PUT and GET should be handled by the plugin
async fn process_sample(&self, sample: Sample) {
tracing::trace!("[STORAGE] Processing sample: {:?}", sample);

// A Sample, in theory, will not arrive to a Storage without a Timestamp. This check (which, again, should
// never enter the `None` branch) ensures that the Storage Manager does not panic even if it ever happens.
let sample_timestamp = match sample.timestamp() {
Some(timestamp) => timestamp,
None => {
tracing::error!("Discarding Sample that has no Timestamp: {:?}", sample);
return;
}
};

// if wildcard, update wildcard_updates
if sample.key_expr().is_wild() {
self.register_wildcard_update(sample.clone()).await;
Expand All @@ -288,12 +299,10 @@ impl StorageService {
);

for k in matching_keys {
if !self
.is_deleted(&k.clone(), sample.timestamp().unwrap())
.await
if !self.is_deleted(&k.clone(), sample_timestamp).await
&& (self.capability.history.eq(&History::All)
|| (self.capability.history.eq(&History::Latest)
&& self.is_latest(&k, sample.timestamp().unwrap()).await))
&& self.is_latest(&k, sample_timestamp).await))
{
tracing::trace!(
"Sample `{:?}` identified as needed processing for key {}",
Expand All @@ -302,9 +311,8 @@ impl StorageService {
);
// there might be the case that the actual update was outdated due to a wild card update, but not stored yet in the storage.
// get the relevant wild card entry and use that value and timestamp to update the storage
let sample_to_store: Sample = if let Some(update) = self
.ovderriding_wild_update(&k, sample.timestamp().unwrap())
.await
let sample_to_store: Sample = if let Some(update) =
self.ovderriding_wild_update(&k, sample_timestamp).await
{
match update.kind {
SampleKind::Put => {
Expand All @@ -323,6 +331,16 @@ impl StorageService {
.into()
};

// A Sample that is to be stored **must** have a Timestamp. In theory, the Sample generated should have
// a Timestamp and, in theory, this check is unneeded.
let sample_to_store_timestamp = match sample_to_store.timestamp() {
Some(timestamp) => *timestamp,
None => {
tracing::error!("Discarding `Sample` generated through `SampleBuilder` that has no Timestamp: {:?}", sample_to_store);
continue;
}
};

let stripped_key = match self.strip_prefix(sample_to_store.key_expr()) {
Ok(stripped) => stripped,
Err(e) => {
Expand All @@ -340,16 +358,15 @@ impl StorageService {
sample_to_store.payload().clone(),
sample_to_store.encoding().clone(),
),
*sample_to_store.timestamp().unwrap(),
sample_to_store_timestamp,
)
.await
}
SampleKind::Delete => {
// register a tombstone
self.mark_tombstone(&k, *sample_to_store.timestamp().unwrap())
.await;
self.mark_tombstone(&k, sample_to_store_timestamp).await;
storage
.delete(stripped_key, *sample_to_store.timestamp().unwrap())
.delete(stripped_key, sample_to_store_timestamp)
.await
}
};
Expand All @@ -363,7 +380,7 @@ impl StorageService {
.as_ref()
.unwrap()
.log_propagation
.send((k.clone(), *sample_to_store.timestamp().unwrap()));
.send((k.clone(), sample_to_store_timestamp));
match sending {
Ok(_) => (),
Err(e) => {
Expand Down

0 comments on commit 12b11ee

Please sign in to comment.