chore: simple read segments #11433

Merged · 1 commit · May 13, 2023
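What this PR does: `SegmentsIO` previously exposed two overlapping readers, `read_segments` (returning `Vec<Result<Arc<SegmentInfo>>>`) and the generic `read_segments_into`. This change deletes the former, renames the latter to `read_segments<T>`, and makes `read_segment` return an owned `SegmentInfo` instead of `Arc<SegmentInfo>`. Call sites now state the type they want: most request `Arc<SegmentInfo>` via the turbofish, while `gc.rs` keeps converting directly to `LocationTuple`.

A minimal sketch (toy types, not Databend's real ones) of why one generic entry point serves both kinds of caller: the standard library's blanket `impl<T> From<T> for Arc<T>` means `Arc<SegmentInfo>` already satisfies the `From<SegmentInfo>` bound.

```rust
use std::sync::Arc;

// Toy stand-in for the real table-meta type.
struct SegmentInfo {
    block_count: u64,
}

// One generic reader; callers pick the target type with a turbofish.
fn read_segments<T>(count: u64) -> Vec<Result<T, String>>
where
    T: From<SegmentInfo> + Send + 'static,
{
    (0..count)
        .map(|i| Ok(T::from(SegmentInfo { block_count: i })))
        .collect()
}

fn main() {
    // Arc<SegmentInfo> needs no extra code thanks to std's `From<T> for Arc<T>`.
    let segments: Vec<Result<Arc<SegmentInfo>, String>> = read_segments(2);
    assert_eq!(segments[0].as_ref().unwrap().block_count, 0);
}
```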
90 changes: 14 additions & 76 deletions src/query/storages/fuse/src/io/segments.rs
@@ -60,7 +60,7 @@ impl SegmentsIO {
         segment_location: Location,
         table_schema: TableSchemaRef,
         put_cache: bool,
-    ) -> Result<Arc<SegmentInfo>> {
+    ) -> Result<SegmentInfo> {
         let (path, ver) = segment_location;
         let reader = MetaReaders::segment_info_reader(dal, table_schema);

@@ -73,95 +73,33 @@ impl SegmentsIO {
         };
 
         let raw_bytes = reader.read(&load_params).await?;
-        let segment_info = SegmentInfo::try_from(raw_bytes.as_ref())?;
-        Ok(Arc::new(segment_info))
+        SegmentInfo::try_from(raw_bytes.as_ref())
     }
 
     // Read all segments information from s3 in concurrently.
-    #[tracing::instrument(level = "debug", skip_all)]
-    #[async_backtrace::framed]
-    pub async fn read_segments(
-        &self,
-        segment_locations: &[Location],
-        put_cache: bool,
-    ) -> Result<Vec<Result<Arc<SegmentInfo>>>> {
-        if segment_locations.is_empty() {
-            return Ok(vec![]);
-        }
-
-        // combine all the tasks.
-        let mut iter = segment_locations.iter();
-        let schema = self.schema.clone();
-        let tasks = std::iter::from_fn(move || {
-            if let Some(location) = iter.next() {
-                let location = location.clone();
-                Some(
-                    Self::read_segment(self.operator.clone(), location, schema.clone(), put_cache)
-                        .instrument(tracing::debug_span!("read_segment")),
-                )
-            } else {
-                None
-            }
-        });
-
-        let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
-        let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
-        execute_futures_in_parallel(
-            tasks,
-            threads_nums,
-            permit_nums,
-            "fuse-req-segments-worker".to_owned(),
-        )
-        .await
-    }
-
-    #[async_backtrace::framed]
-    pub async fn read_segment_into<T>(
-        dal: Operator,
-        segment_location: Location,
-        table_schema: TableSchemaRef,
-        put_cache: bool,
-    ) -> Result<T>
-    where
-        T: From<Arc<SegmentInfo>> + Send + 'static,
-    {
-        let (path, ver) = segment_location;
-        let reader = MetaReaders::segment_info_reader(dal, table_schema);
-
-        // Keep in mind that segment_info_read must need a schema
-        let load_params = LoadParams {
-            location: path.clone(),
-            len_hint: None,
-            ver,
-            put_cache,
-        };
-
-        let compact_segment = reader.read(&load_params).await?;
-        let segment = Arc::new(SegmentInfo::try_from(compact_segment.as_ref())?);
-        Ok(segment.into())
-    }
-
     #[tracing::instrument(level = "debug", skip_all)]
     #[async_backtrace::framed]
-    pub async fn read_segments_into<T>(
+    pub async fn read_segments<T>(
         &self,
         segment_locations: &[Location],
         put_cache: bool,
     ) -> Result<Vec<Result<T>>>
     where
-        T: From<Arc<SegmentInfo>> + Send + 'static,
+        T: From<SegmentInfo> + Send + 'static,
     {
         // combine all the tasks.
         let mut iter = segment_locations.iter();
-        let tasks = std::iter::from_fn(move || {
+        let tasks = std::iter::from_fn(|| {
             iter.next().map(|location| {
-                Self::read_segment_into(
-                    self.operator.clone(),
-                    location.clone(),
-                    self.schema.clone(),
-                    put_cache,
-                )
-                .instrument(tracing::debug_span!("read_location_tuples"))
+                let dal = self.operator.clone();
+                let table_schema = self.schema.clone();
+                let segment_location = location.clone();
+                async move {
+                    let segment =
+                        Self::read_segment(dal, segment_location, table_schema, put_cache).await?;
+                    Ok(segment.into())
+                }
+                .instrument(tracing::debug_span!("read_segments"))
             })
         });
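The surviving `read_segments` keeps the lazy task construction; the executor call that follows is outside the visible hunk, but the tasks presumably still flow into `execute_futures_in_parallel` as before. A small sketch of the `std::iter::from_fn` pattern, with plain closures standing in for the instrumented futures: each task, and the clones it needs, is only built when the consumer pulls the next element.

```rust
fn main() {
    let locations = vec!["seg-1".to_string(), "seg-2".to_string()];
    let mut iter = locations.iter();

    // Nothing is cloned up front; each pull of the iterator builds one task.
    let tasks = std::iter::from_fn(|| {
        iter.next().map(|location| {
            let location = location.clone(); // per-task clone, done lazily
            move || format!("read {location}")
        })
    });

    for task in tasks {
        println!("{}", task());
    }
}
```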

5 changes: 4 additions & 1 deletion src/query/storages/fuse/src/operations/analyze.rs
@@ -20,6 +20,7 @@ use common_catalog::table::Table;
 use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
+use storages_common_table_meta::meta::SegmentInfo;
 use storages_common_table_meta::meta::TableSnapshot;
 use storages_common_table_meta::meta::TableSnapshotStatistics;
 use tracing::info;
@@ -66,7 +67,9 @@ impl FuseTable {
                 stats_of_columns.push(col_stats.clone());
             }
 
-            let segments = segments_io.read_segments(chunk, true).await?;
+            let segments = segments_io
+                .read_segments::<Arc<SegmentInfo>>(chunk, true)
+                .await?;
             for segment in segments {
                 let segment = segment?;
                 stats_of_columns.push(segment.summary.col_stats.clone());
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/commit.rs
@@ -665,7 +665,7 @@ impl FuseTable {

         let fuse_segment_io = SegmentsIO::create(ctx, operator, schema);
         let concurrent_appended_segment_infos = fuse_segment_io
-            .read_segments(concurrently_appended_segment_locations, true)
+            .read_segments::<Arc<SegmentInfo>>(concurrently_appended_segment_locations, true)
             .await?;
 
         let mut new_statistics = base_summary.clone();
6 changes: 3 additions & 3 deletions src/query/storages/fuse/src/operations/gc.rs
@@ -544,7 +544,7 @@ impl FuseTable {

         let fuse_segments = SegmentsIO::create(ctx.clone(), self.operator.clone(), self.schema());
         let results = fuse_segments
-            .read_segments_into::<LocationTuple>(segment_locations, put_cache)
+            .read_segments::<LocationTuple>(segment_locations, put_cache)
             .await?;
         for (idx, location_tuple) in results.into_iter().enumerate() {
             let location_tuple = match location_tuple {
@@ -586,8 +586,8 @@ pub struct LocationTuple {
     pub bloom_location: HashSet<String>,
 }
 
-impl From<Arc<SegmentInfo>> for LocationTuple {
-    fn from(value: Arc<SegmentInfo>) -> Self {
+impl From<SegmentInfo> for LocationTuple {
+    fn from(value: SegmentInfo) -> Self {
         let mut block_location = HashSet::new();
         let mut bloom_location = HashSet::new();
         for block_meta in &value.blocks {
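Dropping the `Arc` from the `From` impl matters here: `read_segments::<LocationTuple>` hands each conversion the single owned `SegmentInfo` it just decoded, so no reference counting is needed just to tear a segment apart into locations. A self-contained sketch of the conversion, with toy types standing in for `BlockMeta` and `Location` (field names are illustrative):

```rust
use std::collections::HashSet;

// Toy stand-ins for the real meta types.
struct BlockMeta {
    location: (String, u64),
    bloom_filter_index_location: Option<(String, u64)>,
}

struct SegmentInfo {
    blocks: Vec<BlockMeta>,
}

#[derive(Default)]
struct LocationTuple {
    block_location: HashSet<String>,
    bloom_location: HashSet<String>,
}

// Consumes the owned segment; no Arc wrapper is required first.
impl From<SegmentInfo> for LocationTuple {
    fn from(value: SegmentInfo) -> Self {
        let mut tuple = LocationTuple::default();
        for block_meta in &value.blocks {
            tuple.block_location.insert(block_meta.location.0.clone());
            if let Some(bloom) = &block_meta.bloom_filter_index_location {
                tuple.bloom_location.insert(bloom.0.clone());
            }
        }
        tuple
    }
}

fn main() {
    let segment = SegmentInfo {
        blocks: vec![BlockMeta {
            location: ("1/2/block-a.parquet".to_string(), 0),
            bloom_filter_index_location: None,
        }],
    };
    let tuple = LocationTuple::from(segment);
    assert_eq!(tuple.block_location.len(), 1);
    assert!(tuple.bloom_location.is_empty());
}
```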
Changes to MutationAccumulator
@@ -245,31 +245,29 @@ impl MutationAccumulator {

             tasks.push(async move {
                 // read the old segment
-                let segment_info =
+                let mut segment_info =
                     SegmentsIO::read_segment(op.clone(), location, schema, false).await?;
-                // prepare the new segment
-                let mut new_segment =
-                    SegmentInfo::new(segment_info.blocks.clone(), segment_info.summary.clone());
 
                 // take away the blocks, they are being mutated
                 let mut block_editor = BTreeMap::<_, _>::from_iter(
-                    std::mem::take(&mut new_segment.blocks)
+                    std::mem::take(&mut segment_info.blocks)
                         .into_iter()
                         .enumerate(),
                 );
 
                 for (idx, new_meta) in segment_mutation.replaced_blocks {
                     block_editor.insert(idx, new_meta);
                 }
                 for idx in segment_mutation.deleted_blocks {
                     block_editor.remove(&idx);
                 }
 
-                // assign back the mutated blocks to segment
-                new_segment.blocks = block_editor.into_values().collect();
-                if !new_segment.blocks.is_empty() {
+                if !block_editor.is_empty() {
+                    // assign back the mutated blocks to segment
+                    let new_blocks = block_editor.into_values().collect::<Vec<_>>();
                     // re-calculate the segment statistics
-                    let new_summary = reduce_block_metas(&new_segment.blocks, thresholds)?;
-                    new_segment.summary = new_summary.clone();
+                    let new_summary = reduce_block_metas(&new_blocks, thresholds)?;
+                    // create new segment info
+                    let new_segment = SegmentInfo::new(new_blocks, new_summary.clone());
 
                     // write the segment info.
                     let location = location_gen.gen_segment_info_location();
@@ -282,13 +280,13 @@ impl MutationAccumulator {
                     Ok(SegmentLite {
                         index,
                         new_segment_info: Some((location, new_summary)),
-                        origin_summary: segment_info.summary.clone(),
+                        origin_summary: segment_info.summary,
                     })
                 } else {
                     Ok(SegmentLite {
                         index,
                         new_segment_info: None,
-                        origin_summary: segment_info.summary.clone(),
+                        origin_summary: segment_info.summary,
                     })
                 }
             });
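The rewrite above also removes the up-front clone: instead of building `new_segment` by cloning all blocks and the summary, the owned segment is made mutable, its blocks are taken out and edited by position, and a fresh `SegmentInfo` is only constructed for segments that still have blocks. The `BTreeMap` "block editor" is worth seeing in isolation; a runnable toy version with strings in place of `Arc<BlockMeta>`:

```rust
use std::collections::BTreeMap;

fn main() {
    let mut blocks = vec!["b0".to_string(), "b1".to_string(), "b2".to_string()];

    // Index the blocks by position, leaving an empty Vec behind.
    let mut block_editor =
        BTreeMap::<_, _>::from_iter(std::mem::take(&mut blocks).into_iter().enumerate());

    block_editor.insert(1, "b1-replaced".to_string()); // a replaced block
    block_editor.remove(&2); // a deleted block

    // BTreeMap keeps keys sorted, so into_values() preserves block order.
    let new_blocks: Vec<_> = block_editor.into_values().collect();
    assert_eq!(new_blocks, vec!["b0".to_string(), "b1-replaced".to_string()]);
}
```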
Changes to CommitSink
@@ -24,6 +24,7 @@ use common_expression::BlockMetaInfoDowncast;
 use common_expression::BlockMetaInfoPtr;
 use opendal::Operator;
 use storages_common_table_meta::meta::Location;
+use storages_common_table_meta::meta::SegmentInfo;
 use storages_common_table_meta::meta::Statistics;
 use storages_common_table_meta::meta::TableSnapshot;

@@ -250,8 +251,9 @@ impl Processor for CommitSink {
                 .collect();
             let segments_io =
                 SegmentsIO::create(self.ctx.clone(), self.dal.clone(), self.table.schema());
-            let append_segment_infos =
-                segments_io.read_segments(&appended_segments, true).await?;
+            let append_segment_infos = segments_io
+                .read_segments::<Arc<SegmentInfo>>(&appended_segments, true)
+                .await?;
             for result in append_segment_infos.into_iter() {
                 let appended_segment = result?;
                 merge_statistics_mut(
Changes to BlockCompactMutator
@@ -112,14 +112,13 @@ impl BlockCompactMutator {
         for chunk in segment_locations.chunks(max_io_requests) {
             // Read the segments information in parallel.
             let segment_infos = segments_io
-                .read_segments(chunk, false)
-                .await?
-                .into_iter()
-                .collect::<Result<Vec<_>>>()?;
+                .read_segments::<Arc<SegmentInfo>>(chunk, false)
+                .await?;
 
             // Check the segment to be compacted.
             // Size of compacted segment should be in range R == [threshold, 2 * threshold)
-            for (idx, segment) in segment_infos.iter().enumerate() {
+            for (idx, segment) in segment_infos.into_iter().enumerate() {
+                let segment = segment?;
                 let segments_vec = checker.add(chunk[idx].clone(), segment.clone());
                 for segments in segments_vec {
                     if SegmentCompactChecker::check_for_compact(&segments) {
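Since `read_segments` returns `Vec<Result<T>>`, each call site picks its own error policy: `gc.rs` matches on every element, this compact mutator and the segment compactor below propagate per item with `?`, and `ClusteringInformation` collects everything into one `Result<Vec<_>>` up front. Both styles, sketched on toy data with integers in place of segments:

```rust
fn main() {
    let results: Vec<Result<u32, String>> = vec![Ok(1), Err("io error".into()), Ok(3)];

    // Fail-fast style: collecting into Result stops at the first error.
    let all: Result<Vec<u32>, String> = results.clone().into_iter().collect();
    assert!(all.is_err());

    // Per-item style: inspect each result and decide what to do.
    for result in results {
        match result {
            Ok(segment) => println!("segment {segment}"),
            Err(e) => eprintln!("skipped: {e}"),
        }
    }
}
```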
Changes to SegmentCompactor
@@ -210,12 +210,11 @@ impl<'a> SegmentCompactor<'a> {
         let mut is_end = false;
         for chunk in reverse_locations.chunks(chunk_size) {
             let segment_infos = segments_io
-                .read_segments(chunk, false)
-                .await?
-                .into_iter()
-                .collect::<Result<Vec<_>>>()?;
+                .read_segments::<Arc<SegmentInfo>>(chunk, false)
+                .await?;
 
             for (segment, location) in segment_infos.into_iter().zip(chunk.iter()) {
+                let segment = segment?;
                 self.add(segment, location.clone()).await?;
                 let compacted = self.num_fragments_compacted();
                 checked_end_at += 1;
Changes to ClusteringInformation
@@ -35,6 +35,7 @@ use jsonb::Value as JsonbValue;
 use serde_json::json;
 use serde_json::Value as JsonValue;
 use storages_common_table_meta::meta::BlockMeta;
+use storages_common_table_meta::meta::SegmentInfo;
 
 use crate::io::SegmentsIO;
 use crate::sessions::TableContext;
@@ -96,7 +97,7 @@ impl<'a> ClusteringInformation<'a> {
             self.table.schema(),
         );
         let segments = segments_io
-            .read_segments(segment_locations, true)
+            .read_segments::<Arc<SegmentInfo>>(segment_locations, true)
             .await?
             .into_iter()
             .collect::<Result<Vec<_>>>()?;
Changes to FuseBlock
@@ -33,6 +33,7 @@ use common_expression::TableSchema;
 use common_expression::TableSchemaRefExt;
 use common_expression::Value;
 use futures_util::TryStreamExt;
+use storages_common_table_meta::meta::SegmentInfo;
 use storages_common_table_meta::meta::TableSnapshot;
 
 use crate::io::MetaReaders;
@@ -118,7 +119,7 @@ impl<'a> FuseBlock<'a> {
             self.table.schema(),
         );
         let segments = segments_io
-            .read_segments(&snapshot.segments[..len], true)
+            .read_segments::<Arc<SegmentInfo>>(&snapshot.segments[..len], true)
             .await?;
         for segment in segments {
             let segment = segment?;
Changes to FuseSegment
@@ -27,6 +27,7 @@ use common_expression::TableSchema;
 use common_expression::TableSchemaRefExt;
 use futures_util::TryStreamExt;
 use storages_common_table_meta::meta::Location;
+use storages_common_table_meta::meta::SegmentInfo;
 
 use crate::io::MetaReaders;
 use crate::io::SegmentsIO;
@@ -94,9 +95,11 @@ impl<'a> FuseSegment<'a> {
             self.table.operator.clone(),
             self.table.schema(),
         );
-        let segments = segments_io.read_segments(segment_locations, true).await?;
-        for (idx, segment) in segments.iter().enumerate() {
-            let segment = segment.clone()?;
+        let segments = segments_io
+            .read_segments::<Arc<SegmentInfo>>(segment_locations, true)
+            .await?;
+        for (idx, segment) in segments.into_iter().enumerate() {
+            let segment = segment?;
             format_versions.push(segment_locations[idx].1);
             block_count.push(segment.summary.block_count);
             row_count.push(segment.summary.row_count);