Commit

Merge branch 'main' into cyx/non-proto-vnode-bitmap
xx01cyx committed Jun 7, 2022
2 parents 3eee2cb + 109e7d0 commit 728f899
Showing 61 changed files with 1,483 additions and 605 deletions.
21 changes: 19 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -29,6 +29,7 @@ members = [
"src/utils/memcomparable",
"src/utils/pgwire",
"src/utils/static-link",
"src/utils/stats_alloc",
"src/utils/value-encoding",
"src/workspace-hack",
]
7 changes: 6 additions & 1 deletion e2e_test/batch/join.slt
@@ -39,4 +39,9 @@ statement ok
drop table t2;

statement ok
drop table t3;
drop table t3;

query I
select count(*) from (values (1, 2), (3, 4)) as a, (values (9),(4),(1)) as b;
----
6
5 changes: 4 additions & 1 deletion e2e_test/batch/local_mode.slt
@@ -1,4 +1,7 @@
statement ok
SET QUERY_MODE TO local;

include ./boolean.slt.part
#include ./boolean.slt.part

statement ok
SET QUERY_MODE TO distributed;
38 changes: 27 additions & 11 deletions proto/hummock.proto
@@ -126,7 +126,7 @@ message CompactTask {
repeated KeyRange splits = 2;
// low watermark in 'ts-aware compaction'
uint64 watermark = 3;
// compacion output, which will be added to [`target_level`] of LSM after compaction
// compaction output, which will be added to [`target_level`] of LSM after compaction
repeated SstableInfo sorted_output_ssts = 4;
// task id assigned by hummock storage service
uint64 task_id = 5;
@@ -135,18 +135,11 @@ message CompactTask {
bool is_target_ultimate_and_leveling = 7;
CompactMetrics metrics = 8;
bool task_status = 9;
repeated CompactionGroup.PrefixPair prefix_pairs = 10;
// Hash mapping from virtual node to parallel unit. Since one compactor might deal with SSTs
// with data for more than one relational state tables, here a vector is required.
repeated common.ParallelUnitMapping vnode_mappings = 11;
}

message CompactionGroup {
message PrefixPair {
// key value with `prefix` belongs to compaction group `group_id`
uint64 group_id = 1;
bytes prefix = 2;
}
// compaction group the task belongs to
uint64 compaction_group_id = 12;
}

message LevelHandler {
@@ -160,7 +153,15 @@ message LevelHandler {
}

message CompactStatus {
repeated LevelHandler level_handlers = 1;
uint64 compaction_group_id = 1;
repeated LevelHandler level_handlers = 2;
CompactionConfig compaction_config = 3;
}

message CompactionGroup {
uint64 id = 1;
repeated bytes member_prefixes = 2;
CompactionConfig compaction_config = 3;
}

message CompactTaskAssignment {
@@ -240,3 +241,18 @@ service HummockManagerService {
}

service CompactorService {}

message CompactionConfig {
enum CompactionMode {
RANGE = 0;
CONSISTENT_HASH = 1;
}
uint64 max_bytes_for_level_base = 1;
uint64 max_level = 2;
uint64 max_bytes_for_level_multiplier = 3;
uint64 max_compaction_bytes = 4;
uint64 min_compaction_bytes = 5;
uint64 level0_tigger_file_numer = 6;
uint64 level0_tier_compact_file_number = 7;
CompactionMode compaction_mode = 8;
}
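
The proto change above folds the compaction tuning knobs into a dedicated CompactionConfig message, now carried by both CompactStatus and CompactionGroup, while CompactTask drops the per-task prefix_pairs and vnode_mappings in favour of a single compaction_group_id. Purely as an illustration (not part of this commit), a prost-generated binding for the new message could be filled in roughly as below; the risingwave_pb::hummock path mirrors the crate usage visible elsewhere in this diff, the struct-literal style assumes stock prost codegen, and every numeric value is made up:

use risingwave_pb::hummock::compaction_config::CompactionMode;
use risingwave_pb::hummock::CompactionConfig;

fn example_compaction_config() -> CompactionConfig {
    CompactionConfig {
        // Illustrative values only; the commit does not define defaults here.
        max_bytes_for_level_base: 512 * 1024 * 1024,
        max_level: 6,
        max_bytes_for_level_multiplier: 10,
        max_compaction_bytes: 2 * 1024 * 1024 * 1024,
        min_compaction_bytes: 64 * 1024 * 1024,
        level0_tigger_file_numer: 16, // field name as spelled in the proto above
        level0_tier_compact_file_number: 4,
        // prost represents proto enum fields as i32 on the message struct.
        compaction_mode: CompactionMode::ConsistentHash as i32,
    }
}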
3 changes: 2 additions & 1 deletion src/batch/src/executor/generate_series.rs
@@ -105,8 +105,9 @@ where
}

let arr = builder.finish()?;
let len = arr.len();
let columns = vec![Column::new(Arc::new(arr.into()))];
let chunk: DataChunk = DataChunk::builder().columns(columns).build();
let chunk: DataChunk = DataChunk::new(columns, len);

yield chunk;
}
4 changes: 3 additions & 1 deletion src/batch/src/executor/hash_agg.rs
@@ -232,8 +232,10 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
.collect::<Result<Vec<_>>>()?;

let mut has_next = false;
let mut array_len = 0;
for (key, states) in result.by_ref().take(cardinality) {
has_next = true;
array_len += 1;
key.deserialize_to_builders(&mut group_builders[..])?;
states
.into_iter()
@@ -250,7 +252,7 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
.map(|b| Ok(Column::new(Arc::new(b.finish()?))))
.collect::<Result<Vec<_>>>()?;

let output = DataChunk::builder().columns(columns).build();
let output = DataChunk::new(columns, array_len);
yield output;
}
}
5 changes: 4 additions & 1 deletion src/batch/src/executor/join/hash_join.rs
@@ -402,6 +402,7 @@ mod tests {
struct DataChunkMerger {
data_types: Vec<DataType>,
array_builders: Vec<ArrayBuilderImpl>,
array_len: usize,
}

impl DataChunkMerger {
@@ -414,6 +415,7 @@
Ok(Self {
data_types,
array_builders,
array_len: 0,
})
}

@@ -422,6 +424,7 @@
for idx in 0..self.array_builders.len() {
self.array_builders[idx].append_array(data_chunk.column_at(idx).array_ref())?;
}
self.array_len += data_chunk.capacity();

Ok(())
}
@@ -433,7 +436,7 @@
.map(|array_builder| array_builder.finish().map(|arr| Column::new(Arc::new(arr))))
.collect::<Result<Vec<Column>>>()?;

DataChunk::try_from(columns)
Ok(DataChunk::new(columns, self.array_len))
}
}

6 changes: 5 additions & 1 deletion src/batch/src/executor/join/hash_join_state.rs
@@ -160,6 +160,7 @@ pub(super) struct ProbeTable<K> {
params: EquiJoinParams,

array_builders: Vec<ArrayBuilderImpl>,
array_len: usize,
}

/// Iterator for joined row ids for one key.
@@ -216,6 +217,7 @@ impl<K: HashKey> TryFrom<BuildTable> for ProbeTable<K> {
cur_remaining_build_row_id: remaining_build_row_id,
params: build_table.params,
array_builders,
array_len: 0,
})
}
}
@@ -972,6 +974,7 @@ impl<K: HashKey> ProbeTable<K> {
// The indices before the offset are already appended and dirty.
let offset = self.result_offset;
self.result_offset = self.result_build_index.len();
self.array_len += self.result_offset - offset;
for col_idx in 0..self.params.left_len() {
let builder_idx = col_idx;
for probe_row_id in &self.result_probe_index[offset..] {
@@ -1021,13 +1024,14 @@ impl<K: HashKey> ProbeTable<K> {
.into_iter()
.map(|builder| builder.finish())
.collect::<Result<Vec<_>>>()?;
let new_len = mem::replace(&mut self.array_len, 0);

let new_columns = new_arrays
.into_iter()
.map(|array| Column::new(Arc::new(array)))
.collect_vec();

let data_chunk = DataChunk::try_from(new_columns)?;
let data_chunk = DataChunk::new(new_columns, new_len);

Ok(data_chunk)
}
19 changes: 7 additions & 12 deletions src/batch/src/executor/join/nested_loop_join.rs
@@ -19,7 +19,7 @@ use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::column::Column;
use risingwave_common::array::data_chunk_iter::RowRef;
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, Row};
use risingwave_common::array::{ArrayBuilderImpl, DataChunk, Row, Vis};
use risingwave_common::catalog::Schema;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::{DataType, DatumRef};
@@ -172,7 +172,7 @@ impl NestedLoopJoinExecutor {
.map(|builder| builder.finish().map(|arr| Column::new(Arc::new(arr))))
.collect::<Result<Vec<Column>>>()?;

Ok(DataChunk::builder().columns(result_columns).build())
Ok(DataChunk::new(result_columns, num_tuples))
}

/// Create constant data chunk (one tuple repeat `num_tuples` times).
@@ -550,10 +550,10 @@ impl NestedLoopJoinExecutor {
concated_columns.extend_from_slice(left.columns());
concated_columns.extend_from_slice(right.columns());
// Only handle one side is constant row chunk: One of visibility must be None.
let vis = match (left.visibility(), right.visibility()) {
(None, _) => right.visibility().cloned(),
(_, None) => left.visibility().cloned(),
(Some(_), Some(_)) => {
let vis = match (left.vis(), right.vis()) {
(Vis::Compact(_), _) => right.vis().clone(),
(_, Vis::Compact(_)) => left.vis().clone(),
(Vis::Bitmap(_), Vis::Bitmap(_)) => {
return Err(ErrorCode::NotImplemented(
"The concatenate behaviour of two chunk with visibility is undefined"
.to_string(),
@@ -562,12 +562,7 @@
.into())
}
};
let builder = DataChunk::builder().columns(concated_columns);
let data_chunk = if let Some(vis) = vis {
builder.visibility(vis).build()
} else {
builder.build()
};
let data_chunk = DataChunk::new(concated_columns, vis);
Ok(data_chunk)
}
}
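
This hunk is part of a pattern repeated across the commit: DataChunk::builder().columns(..).build() becomes DataChunk::new(columns, ..), with the second argument being either an explicit row count or, as here, a Vis visibility value. Below is a minimal sketch of the visibility semantics the new match arms appear to rely on; it assumes that Vis::Compact carries a plain row count while Vis::Bitmap wraps a per-row visibility bitmap, and the bitmap iter() call is an assumption rather than an API shown in this diff:

use risingwave_common::array::Vis;

// Sketch only: number of visible rows in a chunk, given its visibility.
fn visible_rows(vis: &Vis) -> usize {
    match vis {
        // A compact chunk carries no bitmap: all n rows are visible.
        Vis::Compact(n) => *n,
        // Otherwise count the bits that are set in the visibility bitmap.
        Vis::Bitmap(bitmap) => bitmap.iter().filter(|visible| *visible).count(),
    }
}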
4 changes: 3 additions & 1 deletion src/batch/src/executor/merge_sort_exchange.rs
@@ -146,6 +146,7 @@ impl<CS: 'static + CreateSource, C: BatchTaskContext> MergeSortExchangeExecutorI
.create_array_builder(K_PROCESSING_WINDOW_SIZE)
})
.collect::<Result<Vec<ArrayBuilderImpl>>>()?;
let mut array_len = 0;
while want_to_produce > 0 && !self.min_heap.is_empty() {
let top_elem = self.min_heap.pop().unwrap();
let child_idx = top_elem.chunk_idx;
@@ -158,6 +159,7 @@
builder.append_datum(&datum)?;
}
want_to_produce -= 1;
array_len += 1;
// check whether we have another row from the same chunk being popped
let possible_next_row_idx = cur_chunk.next_visible_row_idx(row_idx + 1);
match possible_next_row_idx {
@@ -178,7 +180,7 @@ impl<CS: 'static + CreateSource, C: BatchTaskContext> MergeSortExchangeExecutorI
.into_iter()
.map(|builder| Ok(Column::new(Arc::new(builder.finish()?))))
.collect::<Result<Vec<_>>>()?;
let chunk = DataChunk::builder().columns(columns).build();
let chunk = DataChunk::new(columns, array_len);
yield chunk
}
}
14 changes: 10 additions & 4 deletions src/batch/src/executor/row_seq_scan.rs
@@ -19,7 +19,9 @@ use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{ColumnDesc, Schema, TableId};
use risingwave_common::error::{Result, RwError};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_storage::table::cell_based_table::{CellBasedTable, CellBasedTableRowIter};
use risingwave_storage::table::cell_based_table::{
CellBasedTable, CellTableChunkIter, DedupPkCellBasedTableRowIter,
};
use risingwave_storage::{dispatch_state_store, Keyspace, StateStore, StateStoreImpl};

use crate::executor::monitor::BatchMetrics;
@@ -35,13 +37,13 @@ pub struct RowSeqScanExecutor<S: StateStore> {
schema: Schema,
identity: String,
stats: Arc<BatchMetrics>,
row_iter: CellBasedTableRowIter<S>,
row_iter: DedupPkCellBasedTableRowIter<S>,
}

impl<S: StateStore> RowSeqScanExecutor<S> {
pub fn new(
schema: Schema,
row_iter: CellBasedTableRowIter<S>,
row_iter: DedupPkCellBasedTableRowIter<S>,
chunk_size: usize,
primary: bool,
identity: String,
@@ -100,7 +102,11 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
let storage_stats = state_store.stats();
let batch_stats = source.context().stats();
let table = CellBasedTable::new_adhoc(keyspace, column_descs, storage_stats);
let iter = table.iter(source.epoch).await?;

let pk_descs_proto = &seq_scan_node.table_desc.as_ref().unwrap().pk;
let pk_descs = pk_descs_proto.iter().map(|d| d.into()).collect();
let iter = table.iter_with_pk(source.epoch, pk_descs).await?;

Ok(Box::new(RowSeqScanExecutor::new(
table.schema().clone(),
iter,
8 changes: 2 additions & 6 deletions src/batch/src/executor/sort_agg.rs
@@ -173,7 +173,7 @@ impl SortAggExecutor {
.map(|b| Ok(Column::new(Arc::new(b.finish()?))))
.collect::<Result<Vec<_>>>()?;

let output = DataChunk::builder().columns(columns).build();
let output = DataChunk::new(columns, self.output_size_limit);
yield output;

// reset builders and capactiy to build next output chunk
@@ -207,11 +207,7 @@ impl SortAggExecutor {
.map(|b| Ok(Column::new(Arc::new(b.finish()?))))
.collect::<Result<Vec<_>>>()?;

let output = match columns.is_empty() {
// Zero group column means SimpleAgg, which always returns 1 row.
true => DataChunk::new_dummy(1),
false => DataChunk::builder().columns(columns).build(),
};
let output = DataChunk::new(columns, self.output_size_limit - left_capacity + 1);

yield output;
}