Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: apply column-aware row encoding & enable schema change #8394

Merged
merged 14 commits into from
Mar 9, 2023
Merged
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions dashboard/proto/gen/plan_common.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 100 additions & 28 deletions e2e_test/ddl/alter_table_column.slt
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (v int);

statement ok
create materialized view mv as select * from t;

statement ok
insert into t values (1);

# Errors
statement error column .* already exists
alter table t add column v int;
Expand All @@ -30,57 +36,100 @@ alter table t add column r real;
statement ok
create materialized view mv2 as select * from t;

query IR
query IR rowsort
select v, r from t;
----
1 NULL

query TT
show create table t;
----
public.t CREATE TABLE t (v INT, r REAL)

statement ok
insert into t values (2, 2.2);

query IR rowsort
select v, r from t;
----
1 NULL
2 2.2

statement ok
alter table t add column s varchar;

statement ok
create materialized view mv3 as select * from t;

query IRT
query IRT rowsort
select v, r, s from t;
----
1 NULL NULL
2 2.2 NULL

query TT
show create table t;
----
public.t CREATE TABLE t (v INT, r REAL, s CHARACTER VARYING)

# Insert data
# TODO(#7906): alter after insert.
statement ok
insert into t values (1, 1.1, 'a');
insert into t values (3, 3.3, '3-3');

statement ok
flush;
query IRT rowsort
select v, r, s from t;
----
1 NULL NULL
2 2.2 NULL
3 3.3 3-3

# All materialized views should keep the schema when it's created.
query I
query I rowsort
select * from mv;
----
1
2
3

query IR
query IR rowsort
select * from mv2;
----
1 1.1
1 NULL
2 2.2
3 3.3

query IRT
query IRT rowsort
select * from mv3;
----
1 1.1 a
1 NULL NULL
2 2.2 NULL
3 3.3 3-3

# Clean up
statement ok
drop materialized view mv;
update t set r = 1.1, s = '1-1' where v = 1;

query IRT rowsort
select v, r, s from t where v = 1;
----
1 1.1 1-1

query IR rowsort
select * from mv2;
----
1 1.1
2 2.2
3 3.3

query IRT rowsort
select * from mv3;
----
1 1.1 1-1
2 2.2 NULL
3 3.3 3-3

# Drop column
# TODO(#4529): create mview on partial columns and test whether dropping the unreferenced column works.
statement error being referenced
alter table t drop column s;

statement ok
drop materialized view mv2;
Expand All @@ -89,34 +138,57 @@ statement ok
drop materialized view mv3;

statement ok
drop table t;
alter table t drop column r;

# Drop column
statement ok
create table t (v int, r real);
query TT
show create table t;
----
public.t CREATE TABLE t (v INT, s CHARACTER VARYING)

query IR rowsort
select v, s from t;
----
1 1-1
2 NULL
3 3-3

# Add column after dropping column, to test that the column ID is not reused.
statement ok
alter table t add column s varchar;
alter table t add column r real;

query TT
show create table t;
----
public.t CREATE TABLE t (v INT, r REAL, s CHARACTER VARYING)
public.t CREATE TABLE t (v INT, s CHARACTER VARYING, r REAL)

query ITR rowsort
select v, s, r from t;
----
1 1-1 NULL
2 NULL NULL
3 3-3 NULL

statement ok
alter table t drop column r;
insert into t values (4, '4-4', 4.4);

query TT
show create table t;
query ITR rowsort
select v, s, r from t;
----
public.t CREATE TABLE t (v INT, s CHARACTER VARYING)
1 1-1 NULL
2 NULL NULL
3 3-3 NULL
4 4-4 4.4

# TODO(#4529): create mview on partial columns and test whether dropping the unreferenced column works.
statement ok
create materialized view mv as select * from t;
update t set r = 2.2 where v = 2;

statement error being referenced
alter table t drop column s;
query ITR rowsort
select v, s, r from t;
----
1 1-1 NULL
2 NULL 2.2
3 3-3 NULL
4 4-4 4.4

# Clean up
statement ok
Expand Down
3 changes: 3 additions & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ message StorageTableDesc {
uint32 retention_seconds = 5;
repeated uint32 value_indices = 6;
uint32 read_prefix_len_hint = 7;
// Whether the table is versioned. If `true`, column-aware row encoding will be used
// to be compatible with schema changes.
bool versioned = 8;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that a table contains both basic encoded and column-aware encoded rows?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't consider backward compatibility, then nope. Currently, we follow the version field of TableCatalog to decide the encoding.

}

enum JoinType {
Expand Down
2 changes: 2 additions & 0 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
.map(|&k| k as usize)
.collect_vec();
let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
let versioned = table_desc.versioned;
dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(
state_store,
Expand All @@ -226,6 +227,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
table_option,
value_indices,
prefix_hint_len,
versioned,
);

let inner_side_builder = InnerSideExecutorBuilder::new(
Expand Down
2 changes: 2 additions & 0 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
.map(|&k| k as usize)
.collect_vec();
let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
let versioned = table_desc.versioned;
let scan_ranges = {
let scan_ranges = &seq_scan_node.scan_ranges;
if scan_ranges.is_empty() {
Expand Down Expand Up @@ -263,6 +264,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
table_option,
value_indices,
prefix_hint_len,
versioned,
);
Ok(Box::new(RowSeqScanExecutor::new(
table,
Expand Down
2 changes: 2 additions & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ comfy-table = "6"
crc32fast = "1"
derivative = "2"
easy-ext = "1"
either = "1"
enum-as-inner = "0.5"
fixedbitset = { version = "0.4", features = ["std"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = "0.2"
Expand Down
34 changes: 27 additions & 7 deletions src/common/src/array/data_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::types::to_text::ToText;
use crate::types::{DataType, Datum, NaiveDateTimeWrapper, ToOwnedDatum};
use crate::util::hash_util::finalize_hashers;
use crate::util::iter_util::{ZipEqDebug, ZipEqFast};
use crate::util::value_encoding::serialize_datum_into;
use crate::util::value_encoding::{serialize_datum_into, ValueRowSerializer};

/// `DataChunk` is a collection of arrays with visibility mask.
#[derive(Clone, PartialEq)]
Expand Down Expand Up @@ -377,12 +377,14 @@ impl DataChunk {
DataChunk::new(columns, indexes.len())
}

/// Serialize each rows into value encoding bytes.
/// Serialize each row into value encoding bytes.
///
/// the returned vector's size is self.capacity() and for the invisible row will give a empty
/// vec<u8>
/// The returned vector's size is `self.capacity()`; each invisible row is represented by
/// empty bytes.
// Note(bugen): should we exclude the invisible rows in the output so that the caller won't need
// to handle visibility again?
pub fn serialize(&self) -> Vec<Bytes> {
match &self.vis2 {
let buffers = match &self.vis2 {
Vis::Bitmap(vis) => {
let rows_num = vis.len();
let mut buffers = vec![BytesMut::new(); rows_num];
Expand All @@ -398,7 +400,7 @@ impl DataChunk {
}
}
}
buffers.into_iter().map(BytesMut::freeze).collect_vec()
buffers
}
Vis::Compact(rows_num) => {
let mut buffers = vec![BytesMut::new(); *rows_num];
Expand All @@ -412,9 +414,27 @@ impl DataChunk {
}
}
}
buffers.into_iter().map(BytesMut::freeze).collect_vec()
buffers
}
};

buffers.into_iter().map(BytesMut::freeze).collect_vec()
}

/// Encode every row slot of this chunk with the supplied `serializer`.
///
/// Works like `serialize`, except that the encoding is delegated to a caller-provided
/// serializer. Prefer `serialize` if possible since it might be more efficient due to
/// columnar operations.
///
/// One `Bytes` is produced per item yielded by `rows_with_holes`: a present row is encoded
/// via `serializer`, while a hole (`None`) becomes an empty `Bytes`.
pub fn serialize_with(&self, serializer: &impl ValueRowSerializer) -> Vec<Bytes> {
    // Reserve room for one entry per slot up front.
    let mut encoded = Vec::with_capacity(self.capacity());
    for maybe_row in self.rows_with_holes() {
        let bytes = match maybe_row {
            Some(row) => serializer.serialize(row).into(),
            // Holes keep their position in the output as empty bytes.
            None => Bytes::new(),
        };
        encoded.push(bytes);
    }
    encoded
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ pub struct TableDesc {

/// the column indices which could receive watermarks.
pub watermark_columns: FixedBitSet,

/// Whether the table is versioned. If `true`, column-aware row encoding will be used
/// to be compatible with schema changes.
///
/// See `version` field in `TableCatalog` for more details.
pub versioned: bool,
}

impl TableDesc {
Expand Down Expand Up @@ -78,6 +84,7 @@ impl TableDesc {
retention_seconds: self.retention_seconds,
value_indices: self.value_indices.iter().map(|&v| v as u32).collect(),
read_prefix_len_hint: self.read_prefix_len_hint as u32,
versioned: self.versioned,
}
}

Expand Down
Loading