Skip to content

Commit

Permalink
fixup! storage: use a WriteHandle directly in DifferentialWriteTask
Browse files Browse the repository at this point in the history
  • Loading branch information
danhhz committed Aug 26, 2024
1 parent b74cea7 commit d2d2099
Showing 1 changed file with 30 additions and 26 deletions.
56 changes: 30 additions & 26 deletions src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1636,22 +1636,7 @@ where
&mut self,
id: GlobalId,
) -> Result<Vec<Row>, StorageError<Self::Timestamp>> {
let metadata = &self.storage_collections.collection_metadata(id)?;
let persist_client = self
.persist
.open(metadata.persist_location.clone())
.await
.unwrap();
let write_handle = self
.open_data_handles(
&id,
metadata.data_shard,
metadata.relation_desc.clone(),
&persist_client,
)
.await;
let upper = write_handle.shared_upper();

let upper = self.recent_upper(id).await?;
let res = match upper.as_option() {
Some(f) if f > &T::minimum() => {
let as_of = f.step_back().unwrap();
Expand Down Expand Up @@ -2780,6 +2765,33 @@ where
.map(|(id, e)| (*id, e))
}

async fn recent_upper(&self, id: GlobalId) -> Result<Antichain<T>, StorageError<T>> {
let metadata = &self.storage_collections.collection_metadata(id)?;
let persist_client = self
.persist
.open(metadata.persist_location.clone())
.await
.unwrap();
// Duplicate part of open_data_handles here because we don't need the
// fetch_recent_upper call. The pubsub-updated shared_upper is enough.
let diagnostics = Diagnostics {
shard_name: id.to_string(),
handle_purpose: format!("controller data for {}", id),
};
// NB: Opening a WriteHandle is cheap if it's never used in a
// compare_and_append operation.
let write = persist_client
.open_writer::<SourceData, (), T, Diff>(
metadata.data_shard,
Arc::new(metadata.relation_desc.clone()),
Arc::new(UnitSchema),
diagnostics.clone(),
)
.await
.expect("invalid persist usage");
Ok(write.shared_upper())
}

/// Opens a write and critical since handles for the given `shard`.
///
/// `since` is an optional `since` that the read handle will be forwarded to if it is less than
Expand Down Expand Up @@ -3413,11 +3425,6 @@ where
/// Use a `diff` of 1 to append a new entry; -1 to retract an existing
/// entry.
///
/// However, data is written iff we know of the `GlobalId` of the
/// `IntrospectionType::ShardMapping` collection; in other cases, data is
/// dropped on the floor. In these cases, the data is later written by
/// [`Self::initialize_shard_mapping`].
///
/// # Panics
/// - If `self.collections` does not have an entry for `global_id`.
/// - If `IntrospectionType::ShardMapping`'s `GlobalId` is not registered as
Expand All @@ -3430,15 +3437,12 @@ where
{
mz_ore::soft_assert_or_log!(diff == -1 || diff == 1, "use 1 for insert or -1 for delete");

let id = match self
let id = *self
.introspection_ids
.lock()
.expect("poisoned")
.get(&IntrospectionType::ShardMapping)
{
Some(id) => *id,
_ => return,
};
.expect("should be registered before this call");

let mut updates = vec![];
// Pack updates into rows
Expand Down

0 comments on commit d2d2099

Please sign in to comment.