Skip to content

Commit

Permalink
fix: use store version on metadata stream filter (#3664)
Browse files Browse the repository at this point in the history
The metadata watch stream was mistakenly filtering by a spec version when it should filter by store version.
  • Loading branch information
Alexander Galibey committed Nov 3, 2023
1 parent 4d4151a commit 4d6b2fe
Showing 1 changed file with 37 additions and 17 deletions.
54 changes: 37 additions & 17 deletions crates/fluvio-stream-dispatcher/src/metadata/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ impl MetadataClient<LocalMetadataItem> for LocalMetadataStorage {
where
S: K8ExtendedSpec,
{
trace!(label = S::LABEL, ?resource_version, "watch stream");
futures_util::stream::once(self.get_store::<S>())
.flat_map(move |store| match store {
Ok(store) => store.watch_stream_since(resource_version.as_ref()),
Expand Down Expand Up @@ -268,9 +269,11 @@ struct SpecStore {
struct SpecPointer {
inner: Arc<dyn Any + Send + Sync>,
revision: u64,
store_revision: u64,
path: PathBuf,
}

#[derive(Debug)]
enum SpecUpdate {
Mod(SpecPointer),
Delete(SpecPointer),
Expand Down Expand Up @@ -433,11 +436,7 @@ impl SpecStore {
if let Some(removed) = write.remove(metadata.uid()) {
removed.delete();
drop(write);
if let Err(err) = self.sender.send(SpecUpdate::Delete(removed)).await {
warn!("store sender failed: {err}");
}
self.version
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.send_update(SpecUpdate::Delete(removed)).await;
}
}

Expand All @@ -460,27 +459,23 @@ impl SpecStore {
write.insert(id, pointer.clone());
pointer.flush::<S>()?;
drop(write);
if let Err(err) = self.sender.send(SpecUpdate::Mod(pointer)).await {
warn!("store sender failed: {err}");
}
self.version
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.send_update(SpecUpdate::Mod(pointer)).await;
Ok(())
}

fn watch_stream_since<'a, S>(
&self,
resource_version: Option<&String>,
resources_version: Option<&String>,
) -> BoxStream<'a, Result<Vec<LSUpdate<S, LocalMetadataItem>>>>
where
S: Spec,
{
match resource_version.map(|rv| rv.parse::<u64>()) {
match resources_version.map(|rv| rv.parse::<u64>()) {
Some(Ok(version)) => self
.receiver
.clone()
.filter(move |update| {
let res = update.revision() >= version;
let res = update.store_revision() >= version;
async move { res }
})
.map(|update| Ok(vec![update.into_ls_update()?]))
Expand Down Expand Up @@ -514,17 +509,30 @@ impl SpecStore {
anyhow::bail!("'{key}' not found");
}
}

async fn send_update(&self, mut update: SpecUpdate) {
let store_revision = self
.version
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
update.set_store_revision(store_revision);
trace!(?update, "spec update sending");
if let Err(err) = self.sender.send(update).await {
warn!("store sender failed: {err}");
}
}
}

impl SpecPointer {
fn new<S: Spec, P: AsRef<Path>>(path: P, obj: LocalStoreObject<S>) -> Self {
let revision = obj.ctx().item().revision;
let inner = Arc::new(obj);
let path = path.as_ref().to_path_buf();
let store_revision = Default::default();
Self {
inner,
path,
revision,
store_revision,
}
}

Expand Down Expand Up @@ -572,10 +580,17 @@ impl SpecUpdate {
})
}

fn revision(&self) -> u64 {
fn store_revision(&self) -> u64 {
match self {
SpecUpdate::Mod(p) => p.store_revision,
SpecUpdate::Delete(p) => p.store_revision,
}
}

fn set_store_revision(&mut self, store_revision: u64) {
match self {
SpecUpdate::Mod(p) => p.revision,
SpecUpdate::Delete(p) => p.revision,
SpecUpdate::Mod(p) => p.store_revision = store_revision,
SpecUpdate::Delete(p) => p.store_revision = store_revision,
}
}
}
Expand Down Expand Up @@ -1103,10 +1118,15 @@ spec:
let meta_store = LocalMetadataStorage::new(&meta_folder);
let mut obj = default_test_store_obj();
let stream =
meta_store.watch_stream_since::<TestSpec>(&NameSpace::All, Some("2".to_string()));
meta_store.watch_stream_since::<TestSpec>(&NameSpace::All, Some("4".to_string()));

//when
meta_store.apply(obj.clone()).await.expect("applied");
meta_store
.delete_item::<TestSpec>(obj.ctx_owned().item_owned())
.await
.expect("deleted");
meta_store.apply(obj.clone()).await.expect("applied");
meta_store
.update_status::<TestSpec>(
obj.ctx().item().clone(),
Expand Down

0 comments on commit 4d6b2fe

Please sign in to comment.