Skip to content

Commit

Permalink
fixup! storage: use a WriteHandle directly in append_only_write_task
Browse files Browse the repository at this point in the history
  • Loading branch information
danhhz committed Aug 26, 2024
1 parent f184d02 commit b74cea7
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 35 deletions.
12 changes: 6 additions & 6 deletions src/storage-controller/src/collection_mgmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1054,16 +1054,16 @@ async fn monotonic_append<T: Timestamp + Lattice + Codec64 + TimestampManipulati
updates: Vec<TimestamplessUpdate>,
at_least: T,
) {
let mut current_upper = write_handle.shared_upper();
let mut expected_upper = write_handle.shared_upper();
loop {
if updates.is_empty() && current_upper.is_empty() {
if updates.is_empty() && expected_upper.is_empty() {
// Ignore timestamp advancement for
// closed collections. TODO? Make this a
// correctable error
return;
}

let upper = current_upper
let upper = expected_upper
.into_option()
.expect("cannot append data to closed collection");

Expand All @@ -1073,7 +1073,7 @@ async fn monotonic_append<T: Timestamp + Lattice + Codec64 + TimestampManipulati
upper.clone()
};

let upper = TimestampManipulation::step_forward(&lower);
let new_upper = TimestampManipulation::step_forward(&lower);
let updates = updates
.iter()
.map(|TimestamplessUpdate { row, diff }| {
Expand All @@ -1083,15 +1083,15 @@ async fn monotonic_append<T: Timestamp + Lattice + Codec64 + TimestampManipulati
let res = write_handle
.compare_and_append(
updates,
Antichain::from_elem(lower),
Antichain::from_elem(upper),
Antichain::from_elem(new_upper),
)
.await
.expect("valid usage");
match res {
Ok(()) => return,
Err(err) => {
current_upper = err.current;
expected_upper = err.current;
continue;
}
}
Expand Down
30 changes: 1 addition & 29 deletions src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3025,7 +3025,7 @@ where

match introspection_type {
IntrospectionType::ShardMapping => {
self.initialize_shard_mapping().await;
// Done by the `self.append_shard_mappings` call.
}
IntrospectionType::Frontiers | IntrospectionType::ReplicaFrontiers => {
// Differential collections start with an empty
Expand Down Expand Up @@ -3212,34 +3212,6 @@ where
.retain(|k, _| self.exports.contains_key(k));
}

/// Initializes the data expressing which global IDs correspond to which
/// shards. Necessary because we cannot write any of these mappings that we
/// discover before the shard mapping collection exists.
///
/// # Panics
/// - If `IntrospectionType::ShardMapping` is not associated with a
/// `GlobalId` in `self.introspection_ids`.
/// - If `IntrospectionType::ShardMapping`'s `GlobalId` is not registered as
/// a managed collection.
async fn initialize_shard_mapping(&mut self) {
let id =
self.introspection_ids.lock().expect("poisoned lock")[&IntrospectionType::ShardMapping];

let mut row_buf = Row::default();
let collection_metadatas = self.storage_collections.active_collection_metadatas();
let mut updates = Vec::with_capacity(collection_metadatas.len());
for (global_id, CollectionMetadata { data_shard, .. }) in collection_metadatas {
let mut packer = row_buf.packer();
packer.push(Datum::from(global_id.to_string().as_str()));
packer.push(Datum::from(data_shard.to_string().as_str()));
updates.push((row_buf.clone(), 1));
}

self.collection_manager
.differential_append(id, updates)
.await;
}

/// Effectively truncates the status history shard except for the most
/// recent updates from each ID.
///
Expand Down

0 comments on commit b74cea7

Please sign in to comment.