simple read segments (#11433)
zhyass authored May 13, 2023
1 parent 7046a9f commit 75281fc
Showing 11 changed files with 54 additions and 110 deletions.
90 changes: 14 additions & 76 deletions src/query/storages/fuse/src/io/segments.rs
@@ -60,7 +60,7 @@ impl SegmentsIO {
         segment_location: Location,
         table_schema: TableSchemaRef,
         put_cache: bool,
-    ) -> Result<Arc<SegmentInfo>> {
+    ) -> Result<SegmentInfo> {
         let (path, ver) = segment_location;
         let reader = MetaReaders::segment_info_reader(dal, table_schema);
 
@@ -73,95 +73,33 @@ impl SegmentsIO {
         };
 
         let raw_bytes = reader.read(&load_params).await?;
-        let segment_info = SegmentInfo::try_from(raw_bytes.as_ref())?;
-        Ok(Arc::new(segment_info))
+        SegmentInfo::try_from(raw_bytes.as_ref())
     }
 
-    // Read all segments information from s3 in concurrently.
-    #[tracing::instrument(level = "debug", skip_all)]
-    #[async_backtrace::framed]
-    pub async fn read_segments(
-        &self,
-        segment_locations: &[Location],
-        put_cache: bool,
-    ) -> Result<Vec<Result<Arc<SegmentInfo>>>> {
-        if segment_locations.is_empty() {
-            return Ok(vec![]);
-        }
-
-        // combine all the tasks.
-        let mut iter = segment_locations.iter();
-        let schema = self.schema.clone();
-        let tasks = std::iter::from_fn(move || {
-            if let Some(location) = iter.next() {
-                let location = location.clone();
-                Some(
-                    Self::read_segment(self.operator.clone(), location, schema.clone(), put_cache)
-                        .instrument(tracing::debug_span!("read_segment")),
-                )
-            } else {
-                None
-            }
-        });
-
-        let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
-        let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
-        execute_futures_in_parallel(
-            tasks,
-            threads_nums,
-            permit_nums,
-            "fuse-req-segments-worker".to_owned(),
-        )
-        .await
-    }
-
-    #[async_backtrace::framed]
-    pub async fn read_segment_into<T>(
-        dal: Operator,
-        segment_location: Location,
-        table_schema: TableSchemaRef,
-        put_cache: bool,
-    ) -> Result<T>
-    where
-        T: From<Arc<SegmentInfo>> + Send + 'static,
-    {
-        let (path, ver) = segment_location;
-        let reader = MetaReaders::segment_info_reader(dal, table_schema);
-
-        // Keep in mind that segment_info_read must need a schema
-        let load_params = LoadParams {
-            location: path.clone(),
-            len_hint: None,
-            ver,
-            put_cache,
-        };
-
-        let compact_segment = reader.read(&load_params).await?;
-        let segment = Arc::new(SegmentInfo::try_from(compact_segment.as_ref())?);
-        Ok(segment.into())
-    }
-
     #[tracing::instrument(level = "debug", skip_all)]
     #[async_backtrace::framed]
-    pub async fn read_segments_into<T>(
+    pub async fn read_segments<T>(
         &self,
         segment_locations: &[Location],
         put_cache: bool,
     ) -> Result<Vec<Result<T>>>
     where
-        T: From<Arc<SegmentInfo>> + Send + 'static,
+        T: From<SegmentInfo> + Send + 'static,
     {
         // combine all the tasks.
         let mut iter = segment_locations.iter();
-        let tasks = std::iter::from_fn(move || {
+        let tasks = std::iter::from_fn(|| {
             iter.next().map(|location| {
-                Self::read_segment_into(
-                    self.operator.clone(),
-                    location.clone(),
-                    self.schema.clone(),
-                    put_cache,
-                )
-                .instrument(tracing::debug_span!("read_location_tuples"))
+                let dal = self.operator.clone();
+                let table_schema = self.schema.clone();
+                let segment_location = location.clone();
+                async move {
+                    let segment =
+                        Self::read_segment(dal, segment_location, table_schema, put_cache).await?;
+                    Ok(segment.into())
+                }
+                .instrument(tracing::debug_span!("read_segments"))
             })
         });
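
The net effect in this file: the old `read_segments`, `read_segment_into`, and `read_segments_into` collapse into one generic `read_segments<T>`, where the caller picks the output type through the `T: From<SegmentInfo>` bound and the conversion runs inside each parallel read task. Callers that want `Arc<SegmentInfo>` need no extra code, because the standard library ships a blanket `impl<T> From<T> for Arc<T>`. A minimal standalone sketch of this dispatch pattern (hypothetical `Segment` type and `read_segment_as` function, not the databend API):

    use std::sync::Arc;

    // Hypothetical stand-in for SegmentInfo; not the databend type.
    #[derive(Debug)]
    struct Segment {
        block_count: u64,
    }

    // One generic reader replaces several concrete ones: the caller picks the
    // output type T, and the conversion runs right after the segment is decoded.
    fn read_segment_as<T: From<Segment>>(block_count: u64) -> T {
        let segment = Segment { block_count }; // stands in for the real decode step
        T::from(segment)
    }

    fn main() {
        // Shared ownership needs no extra code: std ships the blanket
        // `impl<T> From<T> for Arc<T>`.
        let shared: Arc<Segment> = read_segment_as(3);

        // A plain value works through the reflexive `impl<T> From<T> for T`.
        let owned: Segment = read_segment_as(4);

        println!("{} / {}", shared.block_count, owned.block_count);
    }

Returning `SegmentInfo` by value from `read_segment` is what makes this work: each conversion consumes a freshly decoded segment instead of being handed a shared `Arc`.
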
5 changes: 4 additions & 1 deletion src/query/storages/fuse/src/operations/analyze.rs
@@ -20,6 +20,7 @@ use common_catalog::table::Table;
 use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
+use storages_common_table_meta::meta::SegmentInfo;
 use storages_common_table_meta::meta::TableSnapshot;
 use storages_common_table_meta::meta::TableSnapshotStatistics;
 use tracing::info;
@@ -66,7 +67,9 @@ impl FuseTable {
             stats_of_columns.push(col_stats.clone());
         }
 
-        let segments = segments_io.read_segments(chunk, true).await?;
+        let segments = segments_io
+            .read_segments::<Arc<SegmentInfo>>(chunk, true)
+            .await?;
         for segment in segments {
             let segment = segment?;
             stats_of_columns.push(segment.summary.col_stats.clone());
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/commit.rs
@@ -665,7 +665,7 @@ impl FuseTable {
 
         let fuse_segment_io = SegmentsIO::create(ctx, operator, schema);
         let concurrent_appended_segment_infos = fuse_segment_io
-            .read_segments(concurrently_appended_segment_locations, true)
+            .read_segments::<Arc<SegmentInfo>>(concurrently_appended_segment_locations, true)
             .await?;
 
         let mut new_statistics = base_summary.clone();
6 changes: 3 additions & 3 deletions src/query/storages/fuse/src/operations/gc.rs
@@ -544,7 +544,7 @@ impl FuseTable {
 
         let fuse_segments = SegmentsIO::create(ctx.clone(), self.operator.clone(), self.schema());
         let results = fuse_segments
-            .read_segments_into::<LocationTuple>(segment_locations, put_cache)
+            .read_segments::<LocationTuple>(segment_locations, put_cache)
             .await?;
         for (idx, location_tuple) in results.into_iter().enumerate() {
             let location_tuple = match location_tuple {
@@ -586,8 +586,8 @@ pub struct LocationTuple {
     pub bloom_location: HashSet<String>,
 }
 
-impl From<Arc<SegmentInfo>> for LocationTuple {
-    fn from(value: Arc<SegmentInfo>) -> Self {
+impl From<SegmentInfo> for LocationTuple {
+    fn from(value: SegmentInfo) -> Self {
         let mut block_location = HashSet::new();
         let mut bloom_location = HashSet::new();
         for block_meta in &value.blocks {
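
With the bound relaxed from `From<Arc<SegmentInfo>>` to `From<SegmentInfo>`, a projection such as `LocationTuple` receives the segment by value, so an implementation is free to move owned data out rather than clone from behind a shared pointer. A rough standalone sketch of the same shape, with simplified stand-in types (string paths and illustrative file names instead of the real `Location` tuples):

    use std::collections::HashSet;

    // Simplified stand-ins: string paths instead of the real Location tuples.
    struct BlockMeta {
        location: String,
        bloom_index_location: Option<String>,
    }

    struct Segment {
        blocks: Vec<BlockMeta>,
    }

    // Analogue of LocationTuple: the distinct files a segment references.
    struct LocationTuple {
        block_location: HashSet<String>,
        bloom_location: HashSet<String>,
    }

    impl From<Segment> for LocationTuple {
        // By-value conversion: owned paths are moved out of the segment,
        // not cloned from behind a shared pointer.
        fn from(value: Segment) -> Self {
            let mut block_location = HashSet::new();
            let mut bloom_location = HashSet::new();
            for block_meta in value.blocks {
                block_location.insert(block_meta.location);
                if let Some(bloom) = block_meta.bloom_index_location {
                    bloom_location.insert(bloom);
                }
            }
            Self {
                block_location,
                bloom_location,
            }
        }
    }

    fn main() {
        let segment = Segment {
            blocks: vec![BlockMeta {
                location: "blocks/block-0.parquet".to_string(),
                bloom_index_location: Some("blooms/bloom-0.parquet".to_string()),
            }],
        };
        let tuple = LocationTuple::from(segment);
        println!(
            "{} block files, {} bloom files",
            tuple.block_location.len(),
            tuple.bloom_location.len()
        );
    }
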
@@ -245,31 +245,29 @@ impl MutationAccumulator {
             tasks.push(async move {
                 // read the old segment
-                let segment_info =
+                let mut segment_info =
                     SegmentsIO::read_segment(op.clone(), location, schema, false).await?;
-                // prepare the new segment
-                let mut new_segment =
-                    SegmentInfo::new(segment_info.blocks.clone(), segment_info.summary.clone());
 
                 // take away the blocks, they are being mutated
                 let mut block_editor = BTreeMap::<_, _>::from_iter(
-                    std::mem::take(&mut new_segment.blocks)
+                    std::mem::take(&mut segment_info.blocks)
                         .into_iter()
                         .enumerate(),
                 );
 
                 for (idx, new_meta) in segment_mutation.replaced_blocks {
                     block_editor.insert(idx, new_meta);
                 }
                 for idx in segment_mutation.deleted_blocks {
                     block_editor.remove(&idx);
                 }
 
-                // assign back the mutated blocks to segment
-                new_segment.blocks = block_editor.into_values().collect();
-                if !new_segment.blocks.is_empty() {
+                if !block_editor.is_empty() {
+                    // assign back the mutated blocks to segment
+                    let new_blocks = block_editor.into_values().collect::<Vec<_>>();
                     // re-calculate the segment statistics
-                    let new_summary = reduce_block_metas(&new_segment.blocks, thresholds)?;
-                    new_segment.summary = new_summary.clone();
+                    let new_summary = reduce_block_metas(&new_blocks, thresholds)?;
+                    // create new segment info
+                    let new_segment = SegmentInfo::new(new_blocks, new_summary.clone());
 
                     // write the segment info.
                     let location = location_gen.gen_segment_info_location();
@@ -282,13 +280,13 @@ impl MutationAccumulator {
                     Ok(SegmentLite {
                         index,
                         new_segment_info: Some((location, new_summary)),
-                        origin_summary: segment_info.summary.clone(),
+                        origin_summary: segment_info.summary,
                     })
                 } else {
                     Ok(SegmentLite {
                         index,
                         new_segment_info: None,
-                        origin_summary: segment_info.summary.clone(),
+                        origin_summary: segment_info.summary,
                     })
                 }
             });
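
The rewrite above also makes the block-editing idiom easier to see: take the blocks out of the segment, index them by position in a `BTreeMap`, apply replacements and deletions by index, then collect the survivors back in order (a `BTreeMap` iterates keys in ascending order). A self-contained sketch of that edit pattern, with plain strings standing in for block metas:

    use std::collections::BTreeMap;

    fn main() {
        // Take ownership of the blocks and index them by position in the segment.
        let blocks = vec![
            "block-0".to_string(),
            "block-1".to_string(),
            "block-2".to_string(),
            "block-3".to_string(),
        ];
        let mut block_editor = BTreeMap::<_, _>::from_iter(blocks.into_iter().enumerate());

        // Mutations recorded for this segment: replace block 1, delete block 2.
        for (idx, new_meta) in [(1usize, "block-1-rewritten".to_string())] {
            block_editor.insert(idx, new_meta);
        }
        for idx in [2usize] {
            block_editor.remove(&idx);
        }

        // BTreeMap yields values in ascending key order, so the original
        // ordering of surviving and replaced blocks is preserved.
        let new_blocks: Vec<String> = block_editor.into_values().collect();
        assert_eq!(new_blocks, vec!["block-0", "block-1-rewritten", "block-3"]);
    }
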
@@ -24,6 +24,7 @@ use common_expression::BlockMetaInfoDowncast;
 use common_expression::BlockMetaInfoPtr;
 use opendal::Operator;
 use storages_common_table_meta::meta::Location;
+use storages_common_table_meta::meta::SegmentInfo;
 use storages_common_table_meta::meta::Statistics;
 use storages_common_table_meta::meta::TableSnapshot;
 
@@ -250,8 +251,9 @@ impl Processor for CommitSink {
                 .collect();
             let segments_io =
                 SegmentsIO::create(self.ctx.clone(), self.dal.clone(), self.table.schema());
-            let append_segment_infos =
-                segments_io.read_segments(&appended_segments, true).await?;
+            let append_segment_infos = segments_io
+                .read_segments::<Arc<SegmentInfo>>(&appended_segments, true)
+                .await?;
             for result in append_segment_infos.into_iter() {
                 let appended_segment = result?;
                 merge_statistics_mut(
@@ -112,14 +112,13 @@ impl BlockCompactMutator {
         for chunk in segment_locations.chunks(max_io_requests) {
             // Read the segments information in parallel.
             let segment_infos = segments_io
-                .read_segments(chunk, false)
-                .await?
-                .into_iter()
-                .collect::<Result<Vec<_>>>()?;
+                .read_segments::<Arc<SegmentInfo>>(chunk, false)
+                .await?;
 
             // Check the segment to be compacted.
             // Size of compacted segment should be in range R == [threshold, 2 * threshold)
-            for (idx, segment) in segment_infos.iter().enumerate() {
+            for (idx, segment) in segment_infos.into_iter().enumerate() {
+                let segment = segment?;
                 let segments_vec = checker.add(chunk[idx].clone(), segment.clone());
                 for segments in segments_vec {
                     if SegmentCompactChecker::check_for_compact(&segments) {
@@ -210,12 +210,11 @@ impl<'a> SegmentCompactor<'a> {
         let mut is_end = false;
         for chunk in reverse_locations.chunks(chunk_size) {
             let segment_infos = segments_io
-                .read_segments(chunk, false)
-                .await?
-                .into_iter()
-                .collect::<Result<Vec<_>>>()?;
+                .read_segments::<Arc<SegmentInfo>>(chunk, false)
+                .await?;
 
             for (segment, location) in segment_infos.into_iter().zip(chunk.iter()) {
+                let segment = segment?;
                 self.add(segment, location.clone()).await?;
                 let compacted = self.num_fragments_compacted();
                 checked_end_at += 1;
@@ -35,6 +35,7 @@ use jsonb::Value as JsonbValue;
 use serde_json::json;
 use serde_json::Value as JsonValue;
 use storages_common_table_meta::meta::BlockMeta;
+use storages_common_table_meta::meta::SegmentInfo;
 
 use crate::io::SegmentsIO;
 use crate::sessions::TableContext;
@@ -96,7 +97,7 @@ impl<'a> ClusteringInformation<'a> {
             self.table.schema(),
         );
         let segments = segments_io
-            .read_segments(segment_locations, true)
+            .read_segments::<Arc<SegmentInfo>>(segment_locations, true)
            .await?
            .into_iter()
            .collect::<Result<Vec<_>>>()?;
@@ -33,6 +33,7 @@ use common_expression::TableSchema;
 use common_expression::TableSchemaRefExt;
 use common_expression::Value;
 use futures_util::TryStreamExt;
+use storages_common_table_meta::meta::SegmentInfo;
 use storages_common_table_meta::meta::TableSnapshot;
 
 use crate::io::MetaReaders;
@@ -118,7 +119,7 @@ impl<'a> FuseBlock<'a> {
             self.table.schema(),
         );
         let segments = segments_io
-            .read_segments(&snapshot.segments[..len], true)
+            .read_segments::<Arc<SegmentInfo>>(&snapshot.segments[..len], true)
             .await?;
         for segment in segments {
             let segment = segment?;
@@ -27,6 +27,7 @@ use common_expression::TableSchema;
 use common_expression::TableSchemaRefExt;
 use futures_util::TryStreamExt;
 use storages_common_table_meta::meta::Location;
+use storages_common_table_meta::meta::SegmentInfo;
 
 use crate::io::MetaReaders;
 use crate::io::SegmentsIO;
@@ -94,9 +95,11 @@ impl<'a> FuseSegment<'a> {
             self.table.operator.clone(),
             self.table.schema(),
         );
-        let segments = segments_io.read_segments(segment_locations, true).await?;
-        for (idx, segment) in segments.iter().enumerate() {
-            let segment = segment.clone()?;
+        let segments = segments_io
+            .read_segments::<Arc<SegmentInfo>>(segment_locations, true)
+            .await?;
+        for (idx, segment) in segments.into_iter().enumerate() {
+            let segment = segment?;
             format_versions.push(segment_locations[idx].1);
             block_count.push(segment.summary.block_count);
             row_count.push(segment.summary.row_count);

1 comment on commit 75281fc

@vercel vercel bot commented on 75281fc May 13, 2023

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend.vercel.app
databend-databend.vercel.app
databend.rs
