feat: skip add sequence column when compact #1618

Merged: 5 commits, Dec 24, 2024
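Editor's summary, inferred from the diff below (the PR carries no description here): compaction previously built its DataFusion plan without the internal `__seq__` column and then re-attached a synthetic sequence to every output batch. This PR threads a `keep_sequence` flag through `ParquetReader::build_df_plan`, `MergeExec`, and `MergeStream`, so the compaction path keeps the original sequence column while the normal query path continues to strip it. The Cargo.toml changes are housekeeping: the member crates switch to the dotted `key.workspace = true` inheritance form and pick up the shared `repository`, `homepage`, and `description` fields.

A rough sketch of the two call shapes after this change. It is not compilable on its own; `parquet_reader`, `task`, `ssts`, and `req` stand in for the surrounding state shown in the diff:

// Compaction keeps __seq__ so merged batches can be written into the new SST unchanged.
let plan = parquet_reader.build_df_plan(
    task.inputs.clone(),
    None,       // projection
    Vec::new(), // predicate
    true,       // keep_sequence
)?;

// The query path in storage.rs still drops the internal column before returning rows.
let plan = parquet_reader.build_df_plan(
    ssts,
    req.projections.clone(),
    req.predicate.clone(),
    false, // keep_sequence
)?;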
5 changes: 4 additions & 1 deletion Cargo.toml
@@ -17,9 +17,12 @@

[workspace.package]
version = "2.2.0-alpha"
authors = ["HoraeDB Authors"]
authors = ["Apache HoraeDB(incubating) <dev@horaedb.apache.org>"]
edition = "2021"
license = "Apache-2.0"
repository = "https://github.com/apache/horaedb"
homepage = "https://horaedb.apache.org/"
description = "A high-performance, distributed, cloud native time-series database."

[workspace]
resolver = "2"
19 changes: 7 additions & 12 deletions src/benchmarks/Cargo.toml
@@ -17,18 +17,13 @@

[package]
name = "benchmarks"

[package.license]
workspace = true

[package.version]
workspace = true

[package.authors]
workspace = true

[package.edition]
workspace = true
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
description.workspace = true

[dependencies]
bytes = { workspace = true }
19 changes: 7 additions & 12 deletions src/common/Cargo.toml
@@ -17,18 +17,13 @@

[package]
name = "common"

[package.license]
workspace = true

[package.version]
workspace = true

[package.authors]
workspace = true

[package.edition]
workspace = true
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
description.workspace = true

[dependencies]
serde = { workspace = true }
19 changes: 7 additions & 12 deletions src/metric_engine/Cargo.toml
@@ -17,18 +17,13 @@

[package]
name = "metric_engine"

[package.license]
workspace = true

[package.version]
workspace = true

[package.authors]
workspace = true

[package.edition]
workspace = true
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
description.workspace = true

[dependencies]
anyhow = { workspace = true }
35 changes: 16 additions & 19 deletions src/metric_engine/src/compaction/executor.rs
@@ -21,7 +21,6 @@ use std::sync::{
};

use anyhow::Context;
use arrow::array::{RecordBatch, UInt64Array};
use async_scoped::TokioScope;
use datafusion::{execution::TaskContext, physical_plan::execute_stream};
use futures::StreamExt;
@@ -38,7 +37,7 @@ use crate::{
ensure,
manifest::{ManifestRef, ManifestUpdate},
read::ParquetReader,
sst::{FileMeta, SstFile, SstPathGenerator},
sst::{FileId, FileMeta, SstFile, SstPathGenerator},
types::{ObjectStoreRef, RuntimeRef, StorageSchema},
Result,
};
@@ -162,10 +161,12 @@ impl Executor {
for f in &task.inputs[1..] {
time_range.merge(&f.meta().time_range);
}
let plan =
self.inner
.parquet_reader
.build_df_plan(task.inputs.clone(), None, Vec::new())?;
let plan = self.inner.parquet_reader.build_df_plan(
task.inputs.clone(),
None, // projection
Vec::new(), // predicate
true, // keep_sequence
)?;
let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))
.context("execute datafusion plan")?;

@@ -185,16 +186,7 @@
while let Some(batch) = stream.next().await {
let batch = batch.context("execute plan")?;
num_rows += batch.num_rows();
let batch_with_seq = {
let mut new_cols = batch.columns().to_vec();
// Since file_id is in increasing order, we can use it as the sequence.
let seq_column = Arc::new(UInt64Array::from(vec![file_id; batch.num_rows()]));
new_cols.push(seq_column);
RecordBatch::try_new(self.inner.schema.arrow_schema.clone(), new_cols)
.context("construct record batch with seq column")?
};

writer.write(&batch_with_seq).await.context("write batch")?;
writer.write(&batch).await.context("write batch")?;
}
writer.close().await.context("close writer")?;
let object_meta = self
@@ -225,9 +217,16 @@ impl Executor {

// From now on, no error should be returned!
// Because we have already updated manifest.
self.delete_ssts(to_deletes.into_iter());
Ok(())
}

fn delete_ssts<I>(&self, ids: I)
where
I: Iterator<Item = FileId>,
{
let (_, results) = TokioScope::scope_and_block(|scope| {
for id in to_deletes {
for id in ids {
let path = Path::from(self.inner.sst_path_gen.generate(id));
trace!(id, "Delete sst file");
scope.spawn(async move {
@@ -251,8 +250,6 @@ impl Executor {
}
}
}

Ok(())
}
}

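For context on what "add sequence column" meant here: the deleted loop above rebuilt every output batch to append a constant `__seq__` value taken from the new SST's file id. Below is a small, self-contained illustration of that pattern against the arrow crate alone; the column names and values are hypothetical and the real code used the engine's StorageSchema, so treat this as a sketch of the removed technique rather than repository code:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A batch as it comes out of the merge stream, without the sequence column.
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![10, 20, 30])) as ArrayRef],
    )?;

    // The removed loop appended a constant sequence derived from the output file id.
    let file_id: u64 = 42;
    let seq: ArrayRef = Arc::new(UInt64Array::from(vec![file_id; batch.num_rows()]));
    let mut cols = batch.columns().to_vec();
    cols.push(seq);

    // Rebuild the batch against a schema that also contains __seq__.
    let schema_with_seq = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Int64, false),
        Field::new("__seq__", DataType::UInt64, false),
    ]));
    let batch_with_seq = RecordBatch::try_new(schema_with_seq, cols)?;
    assert_eq!(batch_with_seq.num_columns(), 2);
    Ok(())
}

With `keep_sequence` set to true, the merge plan already emits `__seq__`, so this per-batch rebuild is no longer needed and the executor writes each batch unchanged.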
69 changes: 48 additions & 21 deletions src/metric_engine/src/read.rs
@@ -105,6 +105,8 @@ pub(crate) struct MergeExec {
seq_idx: usize,
/// Operator to merge values when primary keys are the same
value_operator: Arc<dyn MergeOperator>,
/// Whether to keep the sequence column in the output
keep_sequence: bool,
}

impl MergeExec {
@@ -113,12 +115,14 @@
num_primary_keys: usize,
seq_idx: usize,
value_operator: Arc<dyn MergeOperator>,
keep_sequence: bool,
) -> Self {
Self {
input,
num_primary_keys,
seq_idx,
value_operator,
keep_sequence,
}
}
}
@@ -130,8 +134,8 @@ impl DisplayAs for MergeExec {
) -> std::fmt::Result {
write!(
f,
"MergeExec: [primary_keys: {}, seq_idx: {}]",
self.num_primary_keys, self.seq_idx
"MergeExec: [primary_keys: {}, seq_idx: {}, keep_sequence: {}]",
self.num_primary_keys, self.seq_idx, self.keep_sequence
)?;
Ok(())
}
@@ -171,6 +175,7 @@ impl ExecutionPlan for MergeExec {
self.num_primary_keys,
self.seq_idx,
self.value_operator.clone(),
self.keep_sequence,
)))
}

@@ -188,6 +193,7 @@
self.num_primary_keys,
self.seq_idx,
self.value_operator.clone(),
self.keep_sequence,
)))
}
}
@@ -197,6 +203,7 @@ struct MergeStream {
num_primary_keys: usize,
seq_idx: usize,
value_operator: MergeOperatorRef,
keep_sequence: bool,

pending_batch: Option<RecordBatch>,
arrow_schema: SchemaRef,
@@ -208,28 +215,37 @@ impl MergeStream {
num_primary_keys: usize,
seq_idx: usize,
value_operator: MergeOperatorRef,
keep_sequence: bool,
) -> Self {
let fields = stream
.schema()
.fields()
.into_iter()
.filter_map(|f| {
if f.name() == SEQ_COLUMN_NAME {
None
} else {
Some(f.clone())
}
})
.collect_vec();
let arrow_schema = Arc::new(Schema::new_with_metadata(
fields,
stream.schema().metadata.clone(),
));
let arrow_schema = if keep_sequence {
let schema = stream.schema();
let found_seq = schema.fields().iter().any(|f| f.name() == SEQ_COLUMN_NAME);
assert!(found_seq, "Sequence column not found");
schema
} else {
let fields = stream
.schema()
.fields()
.into_iter()
.filter_map(|f| {
if f.name() == SEQ_COLUMN_NAME {
None
} else {
Some(f.clone())
}
})
.collect_vec();
Arc::new(Schema::new_with_metadata(
fields,
stream.schema().metadata.clone(),
))
};
Self {
stream,
num_primary_keys,
seq_idx,
value_operator,
keep_sequence,
pending_batch: None,
arrow_schema,
}
@@ -313,6 +329,10 @@

let mut output_batches =
concat_batches(&self.stream.schema(), output_batches.iter()).context("concat batch")?;
if self.keep_sequence {
return Ok(Some(output_batches));
}

// Remove seq column
output_batches.remove_column(self.seq_idx);
Ok(Some(output_batches))
@@ -331,7 +351,9 @@ impl Stream for MergeStream {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
let value = if let Some(mut pending) = self.pending_batch.take() {
pending.remove_column(self.seq_idx);
if !self.keep_sequence {
pending.remove_column(self.seq_idx);
}
let res = self
.value_operator
.merge(pending)
@@ -407,6 +429,7 @@ impl ParquetReader {
ssts: Vec<SstFile>,
projections: Option<Vec<usize>>,
predicates: Vec<Expr>,
keep_sequence: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
// we won't use url for selecting object_store.
let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
@@ -464,6 +487,7 @@
Arc::new(BytesMergeOperator::new(self.schema.value_idxes.clone()))
}
},
keep_sequence,
);
Ok(Arc::new(merge_exec))
}
@@ -536,7 +560,9 @@ mod tests {
.unwrap(),
]);

let stream = MergeStream::new(stream, 1, 2, merge_op);
let stream = MergeStream::new(
stream, 1, 2, merge_op, false, // keep_sequence
);
check_stream(Box::pin(stream), expected).await;
}

@@ -574,13 +600,14 @@
.collect(),
None,
vec![expr],
false, // keep_sequence
)
.unwrap();
let display_plan =
datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref())
.indent(true);
assert_eq!(
r#"MergeExec: [primary_keys: 1, seq_idx: 2]
r#"MergeExec: [primary_keys: 1, seq_idx: 2, keep_sequence: false]
SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC]
FilterExec: pk1@0 = 0
ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]], predicate=pk1@0 = 0, pruning_predicate=CASE WHEN pk1_null_count@2 = pk1_row_count@3 THEN false ELSE pk1_min@0 <= 0 AND 0 <= pk1_max@1 END, required_guarantees=[pk1 in (0)]
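A minimal sketch of how the new flag changes `MergeStream`'s output, mirroring the constructor call in the test above; `input`, `another_input`, and `merge_op` are assumed to be set up as in that test and are not part of the PR:

// keep_sequence = false (query path): __seq__ is dropped from the reported schema
// and removed from every merged batch before it is handed to the caller.
let query_stream = MergeStream::new(input, /* num_primary_keys */ 1, /* seq_idx */ 2, merge_op.clone(), false);

// keep_sequence = true (compaction path): the constructor asserts that __seq__ exists,
// keeps the input schema as-is, and leaves the column in place so the merged batches
// can be written directly into the new SST.
let compact_stream = MergeStream::new(another_input, 1, 2, merge_op, true);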
1 change: 1 addition & 0 deletions src/metric_engine/src/storage.rs
@@ -380,6 +380,7 @@ impl TimeMergeStorage for CloudObjectStorage {
ssts,
req.projections.clone(),
req.predicate.clone(),
false, // keep_sequence
)?;

plan_for_all_segments.push(plan);
19 changes: 7 additions & 12 deletions src/pb_types/Cargo.toml
@@ -17,18 +17,13 @@

[package]
name = "pb_types"

[package.license]
workspace = true

[package.version]
workspace = true

[package.authors]
workspace = true

[package.edition]
workspace = true
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
description.workspace = true

[dependencies]
prost = { workspace = true }