Skip to content

Commit

Permalink
feat: fill correct table version ID for DML (#8120)
Browse files Browse the repository at this point in the history
This PR implements versioning for DML statements.

- Frontend: use the correct version ID extracted from the catalog to fill the fields of batch DML plan node protos.
- Connector: do sanity check on the table schema when registering with the same version.
- Meta: fill the `version` field of streaming DML plan node proto when visiting the fragment graph.

Approved-By: chenzl25
Approved-By: st1page
Approved-By: xx01cyx
  • Loading branch information
BugenZhao authored Feb 23, 2023
1 parent 88dc35e commit 5c050ef
Show file tree
Hide file tree
Showing 19 changed files with 138 additions and 29 deletions.
7 changes: 6 additions & 1 deletion src/frontend/src/binder/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::Schema;
use risingwave_common::catalog::{Schema, TableVersionId};
use risingwave_common::error::Result;
use risingwave_sqlparser::ast::{Expr, ObjectName, SelectItem};

Expand All @@ -26,6 +26,9 @@ pub struct BoundDelete {
/// Id of the table to perform deleting.
pub table_id: TableId,

/// Version id of the table.
pub table_version_id: TableVersionId,

/// Name of the table to perform deleting.
pub table_name: String,

Expand Down Expand Up @@ -58,12 +61,14 @@ impl Binder {
let table_catalog = self.resolve_dml_table(schema_name, &table_name, false)?;
let table_id = table_catalog.id;
let owner = table_catalog.owner;
let table_version_id = table_catalog.version_id().expect("table must be versioned");

let table = self.bind_table(schema_name, &table_name, None)?;
let (returning_list, fields) = self.bind_returning_list(returning_items)?;
let returning = !returning_list.is_empty();
let delete = BoundDelete {
table_id,
table_version_id,
table_name,
owner,
table,
Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/binder/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashSet;

use itertools::Itertools;
use risingwave_common::catalog::Schema;
use risingwave_common::catalog::{Schema, TableVersionId};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
Expand All @@ -32,6 +32,9 @@ pub struct BoundInsert {
/// Id of the table to perform inserting.
pub table_id: TableId,

/// Version id of the table.
pub table_version_id: TableVersionId,

/// Name of the table to perform inserting.
pub table_name: String,

Expand Down Expand Up @@ -77,6 +80,7 @@ impl Binder {
let table_catalog = self.resolve_dml_table(schema_name.as_deref(), &table_name, true)?;
let table_id = table_catalog.id;
let owner = table_catalog.owner;
let table_version_id = table_catalog.version_id().expect("table must be versioned");
let columns_to_insert = table_catalog
.columns
.clone()
Expand Down Expand Up @@ -220,6 +224,7 @@ impl Binder {

let insert = BoundInsert {
table_id,
table_version_id,
table_name,
owner,
row_id_index,
Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/binder/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;

use itertools::Itertools;
use risingwave_common::catalog::Schema;
use risingwave_common::catalog::{Schema, TableVersionId};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_sqlparser::ast::{Assignment, Expr, ObjectName, SelectItem};
Expand All @@ -31,6 +31,9 @@ pub struct BoundUpdate {
/// Id of the table to perform updating.
pub table_id: TableId,

/// Version id of the table.
pub table_version_id: TableVersionId,

/// Name of the table to perform updating.
pub table_name: String,

Expand Down Expand Up @@ -68,6 +71,7 @@ impl Binder {
let table_catalog = self.resolve_dml_table(schema_name.as_deref(), &table_name, false)?;
let table_id = table_catalog.id;
let owner = table_catalog.owner;
let table_version_id = table_catalog.version_id().expect("table must be versioned");

let table = self.bind_relation_by_name(name, None)?;

Expand Down Expand Up @@ -132,6 +136,7 @@ impl Binder {

Ok(BoundUpdate {
table_id,
table_version_id,
table_name,
owner,
table,
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ impl TableCatalog {
self.version.as_ref()
}

/// Get the table's version id. Returns `None` if the table has no version field.
pub fn version_id(&self) -> Option<TableVersionId> {
self.version().map(|v| v.version_id)
}

pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> ProstTable {
ProstTable {
id: self.id.table_id,
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_table_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ pub async fn handle_create_as(
}
}

let mut col_id_gen = ColumnIdGenerator::new_initial();

// Generate catalog descs from query
let mut column_descs: Vec<_> = {
let mut binder = Binder::new(&session);
let bound = binder.bind(Statement::Query(query.clone()))?;
if let BoundStatement::Query(query) = bound {
let mut col_id_gen = ColumnIdGenerator::new_initial();

// Create ColumnCatelog by Field
query
.schema()
Expand Down Expand Up @@ -93,7 +93,7 @@ pub async fn handle_create_as(
None,
vec![],
"".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS`
None, // TODO: support `ALTER TABLE` for `CREATE TABLE AS`
Some(col_id_gen.into_version()),
)?;
let mut graph = build_graph(plan);
graph.parallelism = session
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/optimizer/plan_node/batch_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::DeleteNode;
Expand Down Expand Up @@ -76,7 +75,7 @@ impl ToBatchProst for BatchDelete {
fn to_batch_prost_body(&self) -> NodeBody {
NodeBody::Delete(DeleteNode {
table_id: self.logical.table_id().table_id(),
table_version_id: INITIAL_TABLE_VERSION_ID, // TODO: use correct version id
table_version_id: self.logical.table_version_id(),
returning: self.logical.has_returning(),
})
}
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/optimizer/plan_node/batch_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::InsertNode;
Expand Down Expand Up @@ -83,7 +82,7 @@ impl ToBatchProst for BatchInsert {
.collect();
NodeBody::Insert(InsertNode {
table_id: self.logical.table_id().table_id(),
table_version_id: INITIAL_TABLE_VERSION_ID, // TODO: use correct version id
table_version_id: self.logical.table_version_id(),
column_indices,
row_id_index: self
.logical
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/optimizer/plan_node/batch_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::fmt;

use risingwave_common::catalog::INITIAL_TABLE_VERSION_ID;
use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::UpdateNode;
Expand Down Expand Up @@ -85,7 +84,7 @@ impl ToBatchProst for BatchUpdate {
NodeBody::Update(UpdateNode {
exprs,
table_id: self.logical.table_id().table_id(),
table_version_id: INITIAL_TABLE_VERSION_ID, // TODO: use correct version id
table_version_id: self.logical.table_version_id(),
returning: self.logical.has_returning(),
})
}
Expand Down
26 changes: 23 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::{fmt, vec};

use risingwave_common::catalog::{Field, Schema};
use risingwave_common::catalog::{Field, Schema, TableVersionId};
use risingwave_common::error::Result;
use risingwave_common::types::DataType;

Expand All @@ -37,13 +37,20 @@ pub struct LogicalDelete {
pub base: PlanBase,
table_name: String, // explain-only
table_id: TableId,
table_version_id: TableVersionId,
input: PlanRef,
returning: bool,
}

impl LogicalDelete {
/// Create a [`LogicalDelete`] node. Used internally by optimizer.
pub fn new(input: PlanRef, table_name: String, table_id: TableId, returning: bool) -> Self {
pub fn new(
input: PlanRef,
table_name: String,
table_id: TableId,
table_version_id: TableVersionId,
returning: bool,
) -> Self {
let ctx = input.ctx();
let schema = if returning {
input.schema().clone()
Expand All @@ -56,6 +63,7 @@ impl LogicalDelete {
base,
table_name,
table_id,
table_version_id,
input,
returning,
}
Expand All @@ -66,9 +74,16 @@ impl LogicalDelete {
input: PlanRef,
table_name: String,
table_id: TableId,
table_version_id: TableVersionId,
returning: bool,
) -> Result<Self> {
Ok(Self::new(input, table_name, table_id, returning))
Ok(Self::new(
input,
table_name,
table_id,
table_version_id,
returning,
))
}

pub(super) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
Expand All @@ -93,6 +108,10 @@ impl LogicalDelete {
pub fn has_returning(&self) -> bool {
self.returning
}

pub fn table_version_id(&self) -> TableVersionId {
self.table_version_id
}
}

impl PlanTreeNodeUnary for LogicalDelete {
Expand All @@ -105,6 +124,7 @@ impl PlanTreeNodeUnary for LogicalDelete {
input,
self.table_name.clone(),
self.table_id,
self.table_version_id,
self.returning,
)
}
Expand Down
12 changes: 11 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::fmt;

use risingwave_common::catalog::{Field, Schema};
use risingwave_common::catalog::{Field, Schema, TableVersionId};
use risingwave_common::error::Result;
use risingwave_common::types::DataType;

Expand All @@ -38,6 +38,7 @@ pub struct LogicalInsert {
pub base: PlanBase,
table_name: String, // explain-only
table_id: TableId,
table_version_id: TableVersionId,
input: PlanRef,
column_indices: Vec<usize>, // columns in which to insert
row_id_index: Option<usize>,
Expand All @@ -50,6 +51,7 @@ impl LogicalInsert {
input: PlanRef,
table_name: String,
table_id: TableId,
table_version_id: TableVersionId,
column_indices: Vec<usize>,
row_id_index: Option<usize>,
returning: bool,
Expand All @@ -66,6 +68,7 @@ impl LogicalInsert {
base,
table_name,
table_id,
table_version_id,
input,
column_indices,
row_id_index,
Expand All @@ -78,6 +81,7 @@ impl LogicalInsert {
input: PlanRef,
table_name: String,
table_id: TableId,
table_version_id: TableVersionId,
column_indices: Vec<usize>,
row_id_index: Option<usize>,
returning: bool,
Expand All @@ -86,6 +90,7 @@ impl LogicalInsert {
input,
table_name,
table_id,
table_version_id,
column_indices,
row_id_index,
returning,
Expand Down Expand Up @@ -125,6 +130,10 @@ impl LogicalInsert {
pub fn has_returning(&self) -> bool {
self.returning
}

pub fn table_version_id(&self) -> TableVersionId {
self.table_version_id
}
}

impl PlanTreeNodeUnary for LogicalInsert {
Expand All @@ -137,6 +146,7 @@ impl PlanTreeNodeUnary for LogicalInsert {
input,
self.table_name.clone(),
self.table_id,
self.table_version_id,
self.column_indices.clone(),
self.row_id_index,
self.returning,
Expand Down
Loading

0 comments on commit 5c050ef

Please sign in to comment.