From cd9ef343768b5d39e68039d8f6127e3850045fcd Mon Sep 17 00:00:00 2001 From: zhyass <34016424+zhyass@users.noreply.github.com> Date: Sun, 26 Nov 2023 01:08:15 +0800 Subject: [PATCH] add stream_count in snapshot --- src/query/ee/src/stream/handler.rs | 50 +++++++++++-- src/query/service/src/test_kits/utils.rs | 1 + .../tests/it/storages/fuse/meta/snapshot.rs | 4 +- .../it/storages/fuse/operations/commit.rs | 1 + .../mutation/block_compact_mutator.rs | 1 + .../operations/mutation/recluster_mutator.rs | 1 + .../common/table_meta/src/meta/v4/snapshot.rs | 18 +++++ .../common/table_meta/src/meta/v5/snapshot.rs | 18 +++++ src/query/storages/fuse/src/fuse_table.rs | 67 +++--------------- .../storages/fuse/src/io/write/meta_writer.rs | 2 + .../common/processors/sink_commit.rs | 37 ++++++---- .../operations/common/snapshot_generator.rs | 4 ++ .../storages/fuse/src/operations/truncate.rs | 1 + src/query/storages/stream/src/stream_table.rs | 70 +++++++++++-------- 14 files changed, 165 insertions(+), 110 deletions(-) diff --git a/src/query/ee/src/stream/handler.rs b/src/query/ee/src/stream/handler.rs index 610368fe622ba..7edf1f91e0063 100644 --- a/src/query/ee/src/stream/handler.rs +++ b/src/query/ee/src/stream/handler.rs @@ -38,6 +38,7 @@ use common_storages_stream::stream_table::OPT_KEY_TABLE_ID; use common_storages_stream::stream_table::OPT_KEY_TABLE_NAME; use common_storages_stream::stream_table::OPT_KEY_TABLE_VER; use common_storages_stream::stream_table::STREAM_ENGINE; +use storages_common_table_meta::meta::TableSnapshot; use storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION; use stream_handler::StreamHandler; use stream_handler::StreamHandlerWrapper; @@ -52,8 +53,10 @@ impl StreamHandler for RealStreamHandler { ctx: Arc, plan: &CreateStreamPlan, ) -> Result { - let table = ctx - .get_table(&plan.catalog, &plan.table_database, &plan.table_name) + let tenant = plan.tenant.as_str(); + let catalog = ctx.get_catalog(&plan.catalog).await?; + let table = catalog + .get_table(tenant, &plan.table_database, &plan.table_name) .await?; if !table.change_tracking_enabled() { return Err(ErrorCode::IllegalStream(format!( @@ -66,7 +69,45 @@ impl StreamHandler for RealStreamHandler { let table_info = fuse_table.get_table_info(); let table_version = table_info.ident.seq; let table_id = table_info.ident.table_id; - let schema = table_info.schema().clone(); + + match catalog + .get_table(tenant, &plan.database, &plan.stream_name) + .await + { + Ok(stream) => { + if plan.if_not_exists { + return Ok(CreateTableReply { + table_id: stream.get_id(), + new_table: false, + }); + } + return Err(ErrorCode::StreamAlreadyExists(format!( + "'{}.{}' as stream Already Exists", + plan.database, plan.stream_name + ))); + } + Err(e) => { + if e.code() != ErrorCode::UNKNOWN_TABLE { + return Err(e); + } + + // update table meta. + let previous = fuse_table.read_table_snapshot().await?; + let mut new_snapshot = + TableSnapshot::from_opt_previous(previous, &table_info.meta.schema); + new_snapshot.stream_count += 1; + FuseTable::commit_to_meta_server( + ctx.as_ref(), + table_info, + fuse_table.meta_location_generator(), + new_snapshot, + None, + &None, + fuse_table.get_operator_ref(), + ) + .await?; + } + } let mut options = BTreeMap::new(); match &plan.navigation { @@ -121,12 +162,11 @@ impl StreamHandler for RealStreamHandler { engine: STREAM_ENGINE.to_string(), options, comment: plan.comment.clone().unwrap_or("".to_string()), - schema, + schema: table_info.schema(), ..Default::default() }, }; - let catalog = ctx.get_catalog(&plan.catalog).await?; catalog.create_table(req).await } diff --git a/src/query/service/src/test_kits/utils.rs b/src/query/service/src/test_kits/utils.rs index 7bdcfde1d23f3..0eb257b580f06 100644 --- a/src/query/service/src/test_kits/utils.rs +++ b/src/query/service/src/test_kits/utils.rs @@ -196,6 +196,7 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> { &snapshot_0.timestamp, Some((snapshot_0.snapshot_id, TableSnapshotV2::VERSION)), schema.as_ref().clone(), + 0, Statistics::default(), locations, None, diff --git a/src/query/service/tests/it/storages/fuse/meta/snapshot.rs b/src/query/service/tests/it/storages/fuse/meta/snapshot.rs index 46f94418b49e9..229f3568c625c 100644 --- a/src/query/service/tests/it/storages/fuse/meta/snapshot.rs +++ b/src/query/service/tests/it/storages/fuse/meta/snapshot.rs @@ -26,7 +26,7 @@ fn default_snapshot() -> TableSnapshot { let uuid = Uuid::new_v4(); let schema = TableSchema::empty(); let stats = Default::default(); - TableSnapshot::new(uuid, &None, None, schema, stats, vec![], None, None) + TableSnapshot::new(uuid, &None, None, schema, 0, stats, vec![], None, None) } #[test] @@ -45,6 +45,7 @@ fn snapshot_timestamp_monotonic_increase() { &prev.timestamp, prev.prev_snapshot_id, schema, + 0, Default::default(), vec![], None, @@ -69,6 +70,7 @@ fn snapshot_timestamp_time_skew_tolerance() { &prev.timestamp, prev.prev_snapshot_id, schema, + 0, Default::default(), vec![], None, diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 698be71b25c94..e8f0818bd3fe7 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -243,6 +243,7 @@ async fn test_commit_to_meta_server() -> Result<()> { &None, None, table.schema().as_ref().clone(), + 0, Statistics::default(), new_segments, None, diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs index 83d238fae79b2..3f9e10133cb0d 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs @@ -208,6 +208,7 @@ async fn test_safety() -> Result<()> { &None, None, schema.as_ref().clone(), + 0, summary, locations.clone(), None, diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs index a118ce2ddf993..819249253aa2a 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs @@ -221,6 +221,7 @@ async fn test_safety_for_recluster() -> Result<()> { &None, None, schema.as_ref().clone(), + 0, summary, locations.clone(), None, diff --git a/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs b/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs index ff1ea66eae7d4..1c33ee2bc83e1 100644 --- a/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs +++ b/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::io::Cursor; +use std::sync::Arc; use chrono::DateTime; use chrono::Utc; @@ -71,6 +72,10 @@ pub struct TableSnapshot { /// For each snapshot, we keep a schema for it (in case of schema evolution) pub schema: TableSchema, + /// The counter of streams ever created on the table, incremented when create stream. + #[serde(default)] + pub stream_count: u64, + /// Summary Statistics pub summary: Statistics, @@ -91,6 +96,7 @@ impl TableSnapshot { prev_timestamp: &Option>, prev_snapshot_id: Option<(SnapshotId, FormatVersion)>, schema: TableSchema, + stream_count: u64, summary: Statistics, segments: Vec, cluster_key_meta: Option, @@ -110,6 +116,7 @@ impl TableSnapshot { timestamp, prev_snapshot_id, schema, + stream_count, summary, segments, cluster_key_meta, @@ -123,6 +130,7 @@ impl TableSnapshot { &None, None, schema, + 0, Statistics::default(), vec![], None, @@ -139,6 +147,7 @@ impl TableSnapshot { &clone.timestamp, Some((clone.snapshot_id, clone.format_version)), clone.schema, + clone.stream_count, clone.summary, clone.segments, clone.cluster_key_meta, @@ -146,6 +155,13 @@ impl TableSnapshot { ) } + pub fn from_opt_previous(previous: Option>, schema: &TableSchema) -> Self { + match previous { + Some(prev) => Self::from_previous(&prev), + None => Self::new_empty_snapshot(schema.clone()), + } + } + /// Serializes the struct to a byte vector. /// /// The byte vector contains the format version, encoding, compression, and compressed data. The encoding @@ -217,6 +233,7 @@ impl From for TableSnapshot { timestamp: s.timestamp, prev_snapshot_id: s.prev_snapshot_id, schema: s.schema, + stream_count: 0, summary: s.summary, segments: s.segments, cluster_key_meta: s.cluster_key_meta, @@ -238,6 +255,7 @@ where T: Into timestamp: s.timestamp, prev_snapshot_id: s.prev_snapshot_id, schema: s.schema.into(), + stream_count: 0, summary: s.summary.into(), segments: s.segments, cluster_key_meta: s.cluster_key_meta, diff --git a/src/query/storages/common/table_meta/src/meta/v5/snapshot.rs b/src/query/storages/common/table_meta/src/meta/v5/snapshot.rs index fcadab0a0c985..ca53f116ca45d 100644 --- a/src/query/storages/common/table_meta/src/meta/v5/snapshot.rs +++ b/src/query/storages/common/table_meta/src/meta/v5/snapshot.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::io::Cursor; +use std::sync::Arc; use chrono::DateTime; use chrono::Utc; @@ -71,6 +72,9 @@ pub struct TableSnapshot { /// For each snapshot, we keep a schema for it (in case of schema evolution) pub schema: TableSchema, + /// The counter of streams ever created on the table, incremented when create stream. + pub stream_count: u64, + /// Summary Statistics pub summary: Statistics, @@ -91,6 +95,7 @@ impl TableSnapshot { prev_timestamp: &Option>, prev_snapshot_id: Option<(SnapshotId, FormatVersion)>, schema: TableSchema, + stream_count: u64, summary: Statistics, segments: Vec, cluster_key_meta: Option, @@ -110,6 +115,7 @@ impl TableSnapshot { timestamp, prev_snapshot_id, schema, + stream_count, summary, segments, cluster_key_meta, @@ -123,6 +129,7 @@ impl TableSnapshot { &None, None, schema, + 0, Statistics::default(), vec![], None, @@ -139,6 +146,7 @@ impl TableSnapshot { &clone.timestamp, Some((clone.snapshot_id, clone.format_version)), clone.schema, + clone.stream_count, clone.summary, clone.segments, clone.cluster_key_meta, @@ -146,6 +154,13 @@ impl TableSnapshot { ) } + pub fn from_opt_previous(previous: Option>, schema: &TableSchema) -> Self { + match previous { + Some(prev) => Self::from_previous(&prev), + None => Self::new_empty_snapshot(schema.clone()), + } + } + /// Serializes the struct to a byte vector. /// /// The byte vector contains the format version, encoding, compression, and compressed data. The encoding @@ -218,6 +233,7 @@ impl From for TableSnapshot { timestamp: s.timestamp, prev_snapshot_id: s.prev_snapshot_id, schema: s.schema, + stream_count: 0, summary: Statistics::from_v2(s.summary), segments: s.segments, cluster_key_meta: s.cluster_key_meta, @@ -239,6 +255,7 @@ where T: Into timestamp: s.timestamp, prev_snapshot_id: s.prev_snapshot_id, schema: s.schema.into(), + stream_count: 0, summary: s.summary.into(), segments: s.segments, cluster_key_meta: s.cluster_key_meta, @@ -257,6 +274,7 @@ impl From for TableSnapshot { timestamp: s.timestamp, prev_snapshot_id: s.prev_snapshot_id, schema: s.schema, + stream_count: s.stream_count, summary: Statistics::from_v2(s.summary), segments: s.segments, cluster_key_meta: s.cluster_key_meta, diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 4cf38b63cb975..256599ce2213f 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -57,7 +57,6 @@ use opendal::Operator; use storages_common_cache::LoadParams; use storages_common_table_meta::meta::ClusterKey; use storages_common_table_meta::meta::SnapshotId; -use storages_common_table_meta::meta::Statistics as FuseStatistics; use storages_common_table_meta::meta::TableSnapshot; use storages_common_table_meta::meta::TableSnapshotStatistics; use storages_common_table_meta::meta::Versioned; @@ -73,7 +72,6 @@ use storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX; use storages_common_table_meta::table::OPT_KEY_TABLE_ATTACHED_DATA_URI; use storages_common_table_meta::table::OPT_KEY_TABLE_ATTACHED_READ_ONLY; use storages_common_table_meta::table::OPT_KEY_TABLE_COMPRESSION; -use uuid::Uuid; use crate::fuse_column::FuseTableColumnStatisticsProvider; use crate::fuse_type::FuseTableType; @@ -499,34 +497,12 @@ impl Table for FuseTable { if let Some(old_cluster_key_str) = self.cluster_key_str() && *old_cluster_key_str == cluster_key_str{ return Ok(()) } + + let previous = self.read_table_snapshot().await?; + let mut new_snapshot = TableSnapshot::from_opt_previous(previous, self.schema().as_ref()); let mut new_table_meta = self.get_table_info().meta.clone(); new_table_meta = new_table_meta.push_cluster_key(cluster_key_str); - let cluster_key_meta = new_table_meta.cluster_key(); - let schema = self.schema().as_ref().clone(); - - let prev = self.read_table_snapshot().await?; - let prev_version = self.snapshot_format_version(None).await?; - let prev_timestamp = prev.as_ref().and_then(|v| v.timestamp); - let prev_snapshot_id = prev.as_ref().map(|v| (v.snapshot_id, prev_version)); - let prev_statistics_location = prev - .as_ref() - .and_then(|v| v.table_statistics_location.clone()); - let (summary, segments) = if let Some(v) = prev { - (v.summary.clone(), v.segments.clone()) - } else { - (FuseStatistics::default(), vec![]) - }; - - let new_snapshot = TableSnapshot::new( - Uuid::new_v4(), - &prev_timestamp, - prev_snapshot_id, - schema, - summary, - segments, - cluster_key_meta, - prev_statistics_location, - ); + new_snapshot.cluster_key_meta = new_table_meta.cluster_key(); let mut table_info = self.table_info.clone(); table_info.meta = new_table_meta; @@ -549,38 +525,13 @@ impl Table for FuseTable { return Ok(()); } - let mut new_table_meta = self.get_table_info().meta.clone(); - new_table_meta.default_cluster_key = None; - new_table_meta.default_cluster_key_id = None; - - let schema = self.schema().as_ref().clone(); - - let prev = self.read_table_snapshot().await?; - let prev_version = self.snapshot_format_version(None).await?; - let prev_timestamp = prev.as_ref().and_then(|v| v.timestamp); - let prev_statistics_location = prev - .as_ref() - .and_then(|v| v.table_statistics_location.clone()); - let prev_snapshot_id = prev.as_ref().map(|v| (v.snapshot_id, prev_version)); - let (summary, segments) = if let Some(v) = prev { - (v.summary.clone(), v.segments.clone()) - } else { - (FuseStatistics::default(), vec![]) - }; - - let new_snapshot = TableSnapshot::new( - Uuid::new_v4(), - &prev_timestamp, - prev_snapshot_id, - schema, - summary, - segments, - None, - prev_statistics_location, - ); + let previous = self.read_table_snapshot().await?; + let mut new_snapshot = TableSnapshot::from_opt_previous(previous, self.schema().as_ref()); + new_snapshot.cluster_key_meta = None; let mut table_info = self.table_info.clone(); - table_info.meta = new_table_meta; + table_info.meta.default_cluster_key = None; + table_info.meta.default_cluster_key_id = None; FuseTable::commit_to_meta_server( ctx.as_ref(), diff --git a/src/query/storages/fuse/src/io/write/meta_writer.rs b/src/query/storages/fuse/src/io/write/meta_writer.rs index ea4ae9522a34b..62b1ffb101949 100644 --- a/src/query/storages/fuse/src/io/write/meta_writer.rs +++ b/src/query/storages/fuse/src/io/write/meta_writer.rs @@ -140,6 +140,7 @@ mod tests { &None, None, TableSchema::default(), + 0, Statistics::default(), vec![], None, @@ -157,6 +158,7 @@ mod tests { &None, None, TableSchema::default(), + 0, Statistics::default(), vec![], None, diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 65b0e99108aed..3494627a6caf2 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -58,7 +58,7 @@ enum State { None, FillDefault, TryLock, - RefreshTable, + RefreshTable(u64), GenerateSnapshot { previous: Option>, cluster_key_meta: Option, @@ -204,7 +204,7 @@ where F: SnapshotGenerator + Send + 'static &self.state, State::FillDefault | State::TryCommit { .. } - | State::RefreshTable + | State::RefreshTable(_) | State::AbortOperation ) { return Ok(Event::Async); @@ -329,6 +329,7 @@ where F: SnapshotGenerator + Send + 'static .snapshot_location_from_uuid(&snapshot.snapshot_id, TableSnapshot::VERSION)?; self.dal.write(&location, data).await?; + let stream_count = snapshot.stream_count; match FuseTable::update_table_meta( self.ctx.as_ref(), @@ -398,11 +399,8 @@ where F: SnapshotGenerator + Send + 'static Err(e) if self.is_error_recoverable(&e) => { let table_info = self.table.get_table_info(); // If change tracking is enabled, we cannot retry the commit operation. - match ( - self.backoff.next_backoff(), - self.table.change_tracking_enabled(), - ) { - (Some(d), false) => { + match self.backoff.next_backoff() { + Some(d) => { let name = table_info.name.clone(); debug!( "got error TableVersionMismatched, tx will be retried {} ms later. table name {}, identity {}", @@ -412,7 +410,7 @@ where F: SnapshotGenerator + Send + 'static ); common_base::base::tokio::time::sleep(d).await; self.retries += 1; - self.state = State::RefreshTable; + self.state = State::RefreshTable(stream_count); } _ => { // Commit not fulfilled. try to abort the operations. @@ -442,16 +440,25 @@ where F: SnapshotGenerator + Send + 'static } }; } - State::RefreshTable => { + State::RefreshTable(stream_count) => { self.table = self.table.refresh(self.ctx.as_ref()).await?; let fuse_table = FuseTable::try_from_table(self.table.as_ref())?.to_owned(); let previous = fuse_table.read_table_snapshot().await?; - let cluster_key_meta = fuse_table.cluster_key_meta.clone(); - self.state = State::GenerateSnapshot { - previous, - cluster_key_meta, - table_info: fuse_table.table_info.clone(), - }; + let new_stream_count = previous.as_ref().map_or(0, |p| p.stream_count); + if new_stream_count != stream_count { + // stream count has changed, abort the operation. + warn!( + "stream count has changed when commit, previous stream count: {}, current stream count: {}", + stream_count, new_stream_count + ); + self.state = State::AbortOperation; + } else { + self.state = State::GenerateSnapshot { + previous, + cluster_key_meta: fuse_table.cluster_key_meta.clone(), + table_info: fuse_table.table_info.clone(), + }; + } } State::AbortOperation => { let duration = self.start_time.elapsed(); diff --git a/src/query/storages/fuse/src/operations/common/snapshot_generator.rs b/src/query/storages/fuse/src/operations/common/snapshot_generator.rs index 91672ef032944..b95fc6f3b149c 100644 --- a/src/query/storages/fuse/src/operations/common/snapshot_generator.rs +++ b/src/query/storages/fuse/src/operations/common/snapshot_generator.rs @@ -262,6 +262,7 @@ impl SnapshotGenerator for MutationGenerator { &previous.timestamp, Some((previous.snapshot_id, previous.format_version)), schema, + previous.stream_count, new_summary, new_segments, cluster_key_meta, @@ -374,12 +375,14 @@ impl SnapshotGenerator for AppendGenerator { let mut prev_timestamp = None; let mut prev_snapshot_id = None; let mut table_statistics_location = None; + let mut stream_count = 0; let mut new_segments = snapshot_merged.merged_segments.clone(); let mut new_summary = snapshot_merged.merged_statistics.clone(); if let Some(snapshot) = &previous { prev_timestamp = snapshot.timestamp; prev_snapshot_id = Some((snapshot.snapshot_id, snapshot.format_version)); + stream_count = snapshot.stream_count; table_statistics_location = snapshot.table_statistics_location.clone(); if !self.overwrite { @@ -426,6 +429,7 @@ impl SnapshotGenerator for AppendGenerator { &prev_timestamp, prev_snapshot_id, schema, + stream_count, new_summary, new_segments, cluster_key_meta, diff --git a/src/query/storages/fuse/src/operations/truncate.rs b/src/query/storages/fuse/src/operations/truncate.rs index b70b0267d5e3a..2b0f08f27d215 100644 --- a/src/query/storages/fuse/src/operations/truncate.rs +++ b/src/query/storages/fuse/src/operations/truncate.rs @@ -40,6 +40,7 @@ impl FuseTable { &prev_snapshot.timestamp, Some((prev_id, prev_format_version)), prev_snapshot.schema.clone(), + prev_snapshot.stream_count, Default::default(), vec![], self.cluster_key_meta.clone(), diff --git a/src/query/storages/stream/src/stream_table.rs b/src/query/storages/stream/src/stream_table.rs index 7b9052d12495b..bf2875fbfa98a 100644 --- a/src/query/storages/stream/src/stream_table.rs +++ b/src/query/storages/stream/src/stream_table.rs @@ -228,39 +228,47 @@ impl StreamTable { HashSet::new() }; - let fuse_segment_io = - SegmentsIO::create(ctx.clone(), operator.clone(), fuse_table.schema()); - - let diff_in_base = base_segments - .difference(&latest_segments) - .cloned() - .collect::>(); - let diff_in_base = fuse_segment_io - .read_segments::(&diff_in_base, true) - .await?; - let mut base_blocks = HashSet::new(); - for segment in diff_in_base { - let segment = segment?; - segment.blocks.into_iter().for_each(|block| { - base_blocks.insert(block.location.clone()); - }) - } - - let diff_in_latest = latest_segments - .difference(&base_segments) - .cloned() - .collect::>(); - let diff_in_latest = fuse_segment_io - .read_segments::(&diff_in_latest, true) - .await?; let mut latest_blocks = Vec::new(); - for segment in diff_in_latest { - let segment = segment?; - segment.blocks.into_iter().for_each(|block| { - if !base_blocks.contains(&block.location) { - latest_blocks.push(block); + { + let fuse_segment_io = + SegmentsIO::create(ctx.clone(), operator.clone(), fuse_table.schema()); + let chunk_size = ctx.get_settings().get_max_threads()? as usize * 4; + + let mut base_blocks = HashSet::new(); + let diff_in_base = base_segments + .difference(&latest_segments) + .cloned() + .collect::>(); + for chunk in diff_in_base.chunks(chunk_size) { + let segments = fuse_segment_io + .read_segments::(chunk, true) + .await?; + for segment in segments { + let segment = segment?; + segment.blocks.into_iter().for_each(|block| { + base_blocks.insert(block.location.clone()); + }) + } + } + + let diff_in_latest = latest_segments + .difference(&base_segments) + .cloned() + .collect::>(); + for chunk in diff_in_latest.chunks(chunk_size) { + let segments = fuse_segment_io + .read_segments::(chunk, true) + .await?; + + for segment in segments { + let segment = segment?; + segment.blocks.into_iter().for_each(|block| { + if !base_blocks.contains(&block.location) { + latest_blocks.push(block); + } + }); } - }); + } } if latest_blocks.is_empty() { return Ok((PartStatistics::default(), Partitions::default()));