Skip to content

Commit

Permalink
feat: move gc::utils to test_kits crate (#11340)
Browse files Browse the repository at this point in the history
* feat: move table_test_fixture to test_utils crate

* feat: move table_test_fixture to test_utils crate

* feat: move table_test_fixture to test_kits crate

* feat: move table_test_fixture to test_kits crate

* feat: move gc::utils to test_kits crate

* feat: move gc::utils to test_kits crate

* feat: move gc::utils to test_kits crate

* feat: move gc::utils to test_kits crate
  • Loading branch information
lichuang authored May 6, 2023
1 parent d23e90a commit 1fe3aa9
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 248 deletions.
1 change: 1 addition & 0 deletions src/query/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#![feature(vec_into_raw_parts)]
#![feature(associated_type_bounds)]
#![feature(hash_drain_filter)]
#![feature(io_error_other)]

extern crate core;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
// Copyright 2021 Datafuse Labs.
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_arrow::parquet::metadata::ThriftFileMetaData;
use common_exception::Result;
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/test_kits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
// limitations under the License.

#![allow(clippy::too_many_arguments)]

pub mod block_writer;
pub mod config;
pub mod context;
#[allow(dead_code)]
pub mod sessions;
pub mod table_test_fixture;
pub mod utils;

pub use config::ConfigBuilder;
pub use context::create_query_context;
Expand Down
228 changes: 228 additions & 0 deletions src/query/service/src/test_kits/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::Error;
use std::sync::Arc;
use std::vec;

use chrono::DateTime;
use chrono::Duration;
use chrono::Utc;
use common_exception::Result;
use common_expression::BlockThresholds;
use common_expression::DataBlock;
use common_storages_factory::Table;
use common_storages_fuse::io::MetaWriter;
use common_storages_fuse::io::SegmentWriter;
use common_storages_fuse::statistics::gen_columns_statistics;
use common_storages_fuse::statistics::merge_statistics;
use common_storages_fuse::statistics::reducers::reduce_block_metas;
use common_storages_fuse::FuseStorageFormat;
use common_storages_fuse::FuseTable;
use common_storages_fuse::FUSE_TBL_SEGMENT_PREFIX;
use futures_util::TryStreamExt;
use opendal::Operator;
use serde::Serialize;
use storages_common_table_meta::meta::BlockMeta;
use storages_common_table_meta::meta::Location;
use storages_common_table_meta::meta::SegmentInfo;
use storages_common_table_meta::meta::SegmentInfoV2;
use storages_common_table_meta::meta::Statistics;
use storages_common_table_meta::meta::TableSnapshot;
use storages_common_table_meta::meta::TableSnapshotV2;
use storages_common_table_meta::meta::Versioned;
use uuid::Uuid;

use super::block_writer::BlockWriter;
use super::TestFixture;

/// Writes a new snapshot, derived from the table's current one, that points at
/// `segment_locations`.
///
/// The new snapshot is created with `TableSnapshot::from_previous` from the snapshot
/// currently read from `fuse_table`. Its segment list is replaced by `segment_locations`,
/// and its timestamp is overridden only when `time_stamp` is `Some`.
///
/// Returns the location the new snapshot was written to.
///
/// # Panics
///
/// Panics if the table currently has no snapshot.
pub async fn generate_snapshot_with_segments(
    fuse_table: &FuseTable,
    segment_locations: Vec<Location>,
    time_stamp: Option<DateTime<Utc>>,
) -> Result<String> {
    let base_snapshot = fuse_table.read_table_snapshot().await?.unwrap();
    let dal = fuse_table.get_operator();
    let locations = fuse_table.meta_location_generator();

    let mut snapshot = TableSnapshot::from_previous(base_snapshot.as_ref());
    snapshot.segments = segment_locations;

    let snapshot_location =
        locations.snapshot_location_from_uuid(&snapshot.snapshot_id, TableSnapshot::VERSION)?;

    // Keep the timestamp chosen by `from_previous` unless the caller supplied one.
    if time_stamp.is_some() {
        snapshot.timestamp = time_stamp;
    }

    snapshot.write_meta(&dal, &snapshot_location).await?;

    Ok(snapshot_location)
}

/// Generates `number_of_segments` segments in the `SegmentInfoV2` format, each holding
/// `blocks_per_segment` freshly written blocks, and persists them as JSON.
///
/// Every segment is stored under
/// `<meta prefix>/<FUSE_TBL_SEGMENT_PREFIX>/<uuid>_v<SegmentInfoV2::VERSION>.json`.
///
/// Returns each segment's location (path plus format version) together with the
/// segment info that was written.
pub async fn generate_segments_v2(
    fuse_table: &FuseTable,
    number_of_segments: usize,
    blocks_per_segment: usize,
) -> Result<Vec<(Location, SegmentInfoV2)>> {
    let mut generated = Vec::new();

    for _ in 0..number_of_segments {
        let operator = fuse_table.get_operator_ref();

        // Write the blocks first, then fold their metas into the segment summary.
        let metas = generate_blocks(fuse_table, blocks_per_segment).await?;
        let stats = reduce_block_metas(&metas, BlockThresholds::default())?;
        let segment = SegmentInfoV2::new(metas, stats);

        let segment_id = Uuid::new_v4();
        let path = format!(
            "{}/{}/{}_v{}.json",
            &fuse_table.meta_location_generator().prefix(),
            FUSE_TBL_SEGMENT_PREFIX,
            segment_id,
            SegmentInfoV2::VERSION,
        );

        write_v2_to_storage(operator, &path, &segment).await?;
        generated.push(((path, SegmentInfoV2::VERSION), segment));
    }

    Ok(generated)
}

/// Generates `number_of_segments` segments in the current `SegmentInfo` format, each
/// holding `blocks_per_segment` freshly written blocks.
///
/// Segments are persisted through `SegmentWriter::write_segment_no_cache`.
///
/// Returns each segment's location together with the segment info that was written.
pub async fn generate_segments(
    fuse_table: &FuseTable,
    number_of_segments: usize,
    blocks_per_segment: usize,
) -> Result<Vec<(Location, SegmentInfo)>> {
    let mut generated = Vec::new();

    for _ in 0..number_of_segments {
        let operator = fuse_table.get_operator_ref();

        // Write the blocks first, then fold their metas into the segment summary.
        let metas = generate_blocks(fuse_table, blocks_per_segment).await?;
        let stats = reduce_block_metas(&metas, BlockThresholds::default())?;
        let segment = SegmentInfo::new(metas, stats);

        let writer = SegmentWriter::new(operator, fuse_table.meta_location_generator());
        let location = writer.write_segment_no_cache(&segment).await?;

        generated.push((location, segment));
    }

    Ok(generated)
}

/// Writes `num_blocks` sample blocks for `fuse_table` in Parquet format and returns
/// their metas, in the order the blocks were produced.
///
/// The blocks come from `TestFixture::gen_sample_blocks_stream_ex`; column statistics
/// are computed per block against the table schema before the block is written.
async fn generate_blocks(fuse_table: &FuseTable, num_blocks: usize) -> Result<Vec<Arc<BlockMeta>>> {
    let operator = fuse_table.get_operator_ref();
    let table_schema = fuse_table.schema();
    let writer = BlockWriter::new(operator, fuse_table.meta_location_generator());

    // does not matter in this suite
    let rows_per_block = 1;
    let value_start_from = 1;

    let blocks: Vec<DataBlock> =
        TestFixture::gen_sample_blocks_stream_ex(num_blocks, rows_per_block, value_start_from)
            .try_collect()
            .await?;

    let mut metas = Vec::with_capacity(blocks.len());
    for block in blocks {
        let column_stats = gen_columns_statistics(&block, None, &table_schema)?;
        let (meta, _index) = writer
            .write(
                FuseStorageFormat::Parquet,
                &table_schema,
                block,
                column_stats,
                None,
            )
            .await?;
        metas.push(Arc::new(meta));
    }

    Ok(metas)
}

/// Builds a chain of three snapshots on the fixture's default table and commits the
/// newest one to the meta server.
///
/// * The oldest snapshot uses the `TableSnapshotV2` format, is timestamped 13 hours
///   before "now", and references one v2 segment (2 blocks).
/// * The middle snapshot uses the current `TableSnapshot` format, is timestamped
///   12 hours before "now", points back at the oldest snapshot, and references the
///   first current-format segment plus the v2 segment.
/// * The latest snapshot is derived from the middle one, is timestamped "now",
///   references all three segments, and is the one committed.
///
/// Each snapshot's summary is the previous summary merged with the summary of the
/// segment it adds.
pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {
    let base_time = Utc::now();
    let table_schema = TestFixture::default_table_schema();

    let latest_table = fixture.latest_default_table().await?;
    let fuse = FuseTable::try_from_table(latest_table.as_ref())?;
    let locations = fuse.meta_location_generator();
    let dal = fuse.get_operator();

    // One v2 segment holding two blocks.
    let v2_segments = generate_segments_v2(fuse, 1, 2).await?;
    let (v2_location, v2_segment) = &v2_segments[0];

    // Oldest snapshot: v2 format, written as plain JSON.
    let mut oldest = TableSnapshotV2::new(
        Uuid::new_v4(),
        &None,
        None,
        table_schema.as_ref().clone(),
        v2_segment.summary.clone(),
        vec![v2_location.clone()],
        None,
        None,
    );
    oldest.timestamp = Some(base_time - Duration::hours(13));

    let oldest_location =
        locations.snapshot_location_from_uuid(&oldest.snapshot_id, TableSnapshotV2::VERSION)?;
    write_v2_to_storage(&dal, &oldest_location, &oldest).await?;

    // Two current-format segments, two blocks each (four blocks in total).
    let num_of_segments = 2;
    let blocks_per_segment = 2;
    let current_segments = generate_segments(fuse, num_of_segments, blocks_per_segment).await?;
    let (first_location, first_segment) = &current_segments[0];

    // Middle snapshot: current format, chained to the oldest snapshot.
    let mut middle = TableSnapshot::new(
        Uuid::new_v4(),
        &oldest.timestamp,
        Some((oldest.snapshot_id, TableSnapshotV2::VERSION)),
        table_schema.as_ref().clone(),
        Statistics::default(),
        vec![first_location.clone(), v2_location.clone()],
        None,
        None,
    );
    middle.timestamp = Some(base_time - Duration::hours(12));
    middle.summary = merge_statistics(&oldest.summary, &first_segment.summary)?;

    let middle_location =
        locations.snapshot_location_from_uuid(&middle.snapshot_id, TableSnapshot::VERSION)?;
    middle.write_meta(&dal, &middle_location).await?;

    // Latest snapshot: current format, derived from the middle one, newest segment first.
    let (second_location, second_segment) = &current_segments[1];
    let mut latest = TableSnapshot::from_previous(&middle);
    latest.segments = vec![
        second_location.clone(),
        first_location.clone(),
        v2_location.clone(),
    ];
    latest.timestamp = Some(base_time);
    latest.summary = merge_statistics(&middle.summary, &second_segment.summary)?;

    let latest_location =
        locations.snapshot_location_from_uuid(&latest.snapshot_id, TableSnapshot::VERSION)?;
    latest.write_meta(&dal, &latest_location).await?;

    // Only the latest snapshot is committed to the meta server.
    FuseTable::commit_to_meta_server(
        fixture.ctx().as_ref(),
        fuse.get_table_info(),
        locations,
        latest,
        None,
        &None,
        &dal,
    )
    .await
}

async fn write_v2_to_storage<T>(data_accessor: &Operator, location: &str, meta: &T) -> Result<()>
where T: Serialize {
let bs = serde_json::to_vec(&meta).map_err(Error::other)?;
data_accessor.write(location, bs).await?;
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use common_expression::TableSchemaRefExt;
use common_storages_fuse::io::TableMetaLocationGenerator;
use common_storages_fuse::statistics::gen_columns_statistics;
use common_storages_fuse::FuseStorageFormat;
use databend_query::test_kits::block_writer::BlockWriter;
use opendal::Operator;
use storages_common_cache::InMemoryCacheBuilder;
use storages_common_cache::InMemoryItemCacheHolder;
Expand All @@ -36,8 +37,6 @@ use sysinfo::System;
use sysinfo::SystemExt;
use uuid::Uuid;

use crate::storages::fuse::block_writer::BlockWriter;

// NOTE:
//
// usage of memory is observed at *process* level, please do not combine them into
Expand Down
1 change: 0 additions & 1 deletion src/query/service/tests/it/storages/fuse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

#![allow(clippy::too_many_arguments)]
mod block_writer;
mod bloom_index_meta_size;
mod io;
mod meta;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ use common_storages_fuse::FuseTable;
use common_storages_fuse::FUSE_TBL_SNAPSHOT_PREFIX;
use dashmap::DashMap;
use databend_query::sessions::QueryContext;
use databend_query::test_kits::block_writer::BlockWriter;
use databend_query::test_kits::table_test_fixture::execute_query;
use databend_query::test_kits::table_test_fixture::TestFixture;
use futures::TryStreamExt;
Expand All @@ -92,7 +93,6 @@ use storages_common_table_meta::meta::SegmentInfo;
use storages_common_table_meta::meta::Statistics;
use walkdir::WalkDir;

use crate::storages::fuse::block_writer::BlockWriter;
use crate::storages::fuse::operations::mutation::CompactSegmentTestFixture;

#[tokio::test(flavor = "multi_thread")]
Expand Down
Loading

1 comment on commit 1fe3aa9

@vercel
Copy link

@vercel vercel bot commented on 1fe3aa9 May 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend.rs
databend.vercel.app
databend-databend.vercel.app

Please sign in to comment.