feat(meta): support schema change in actor graph builder #7878

Merged · 5 commits · Feb 16, 2023
Changes from 4 commits
25 changes: 20 additions & 5 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default.

9 changes: 5 additions & 4 deletions proto/stream_plan.proto
@@ -44,6 +44,10 @@ message UpdateMutation {
// Merge executor can be uniquely identified by a combination of actor id and upstream fragment id.
uint32 actor_id = 1;
uint32 upstream_fragment_id = 2;
// - For scaling, this is always `None`.
// - For plan change, the upstream fragment will be changed to a new one, and this will be `Some`.
// In this case, all the upstream actors should be removed and replaced by the `new` ones.
optional uint32 new_upstream_fragment_id = 5;
Member Author commented:
We may reuse the Update barrier of scaling for schema change.

// Added upstream actors.
repeated uint32 added_upstream_actor_id = 3;
// Removed upstream actors.
@@ -561,10 +565,7 @@ message Dispatcher {
// For dispatcher types other than HASH, this is ignored.
ActorMapping hash_mapping = 3;
// Dispatcher can be uniquely identified by a combination of actor id and dispatcher id.
// - For dispatchers within actors, the id is the same as its downstream fragment id.
// We can't use the exchange operator id directly as the dispatch id, because an exchange
// could belong to more than one downstream in DAG.
// - For MV on MV, the id is the same as the actor id of chain node in the downstream MV.
// This is exactly the same as its downstream fragment id.
uint64 dispatcher_id = 4;
// Number of downstreams decides how many endpoints a dispatcher should dispatch.
repeated uint32 downstream_actor_id = 5;
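
To make the two cases in the new `MergeUpdate` comment concrete, here is a minimal sketch (not code from this PR) of how the field could be filled for scaling versus plan change. The field names match the proto above and the constructor in src/meta/src/barrier/command.rs below; the helper functions and the prost module path are assumptions.

// Illustrative only. Assumes the prost-generated struct for the nested
// `UpdateMutation.MergeUpdate` message shown in the diff above.
use risingwave_pb::stream_plan::update_mutation::MergeUpdate;

/// Scaling: the upstream fragment stays the same; only its actor set changes.
fn merge_update_for_scaling(
    actor_id: u32,
    upstream_fragment_id: u32,
    added: Vec<u32>,
    removed: Vec<u32>,
) -> MergeUpdate {
    MergeUpdate {
        actor_id,
        upstream_fragment_id,
        new_upstream_fragment_id: None, // always `None` for scaling
        added_upstream_actor_id: added,
        removed_upstream_actor_id: removed,
    }
}

/// Plan change: the merge switches to a brand-new upstream fragment, so all
/// old upstream actors are removed and replaced by the new ones.
fn merge_update_for_plan_change(
    actor_id: u32,
    old_upstream_fragment_id: u32,
    new_upstream_fragment_id: u32,
    new_upstream_actors: Vec<u32>,
    old_upstream_actors: Vec<u32>,
) -> MergeUpdate {
    MergeUpdate {
        actor_id,
        upstream_fragment_id: old_upstream_fragment_id,
        new_upstream_fragment_id: Some(new_upstream_fragment_id),
        added_upstream_actor_id: new_upstream_actors,
        removed_upstream_actor_id: old_upstream_actors,
    }
}
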
7 changes: 7 additions & 0 deletions src/frontend/src/catalog/catalog_service.rs
@@ -78,6 +78,8 @@ pub trait CatalogWriter: Send + Sync {
graph: StreamFragmentGraph,
) -> Result<()>;

async fn replace_table(&self, table: ProstTable, graph: StreamFragmentGraph) -> Result<()>;

async fn create_index(
&self,
index: ProstIndex,
@@ -186,6 +188,11 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn replace_table(&self, table: ProstTable, graph: StreamFragmentGraph) -> Result<()> {
let version = self.meta_client.replace_table(table, graph).await?;
self.wait_version(version).await
}

async fn create_source(&self, source: ProstSource) -> Result<()> {
let (_id, version) = self.meta_client.create_source(source).await?;
self.wait_version(version).await
33 changes: 25 additions & 8 deletions src/frontend/src/handler/alter_table.rs
@@ -15,7 +15,9 @@
use anyhow::Context;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::catalog::Table;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{ColumnDef, ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;

@@ -93,35 +95,50 @@ pub async fn handle_add_column(
panic!("unexpected statement type: {:?}", definition);
};

let (graph, source, table) = {
let (graph, table) = {
let context = OptimizerContext::from_handler_args(handler_args);
let (plan, source, table) =
gen_create_table_plan(context, table_name, columns, constraints, col_id_gen)?;

// We should already have rejected the case where the table has a connector.
assert!(source.is_none());

// TODO: avoid this backward conversion.
if TableCatalog::from(&table).pk_column_ids() != original_catalog.pk_column_ids() {
Err(ErrorCode::InvalidInputSyntax(
"alter primary key of table is not supported".to_owned(),
))?
}

let mut graph = build_graph(plan);
graph.parallelism = session
.config()
.get_streaming_parallelism()
.map(|parallelism| Parallelism { parallelism });
(graph, source, table)
let graph = StreamFragmentGraph {
parallelism: session
.config()
.get_streaming_parallelism()
.map(|parallelism| Parallelism { parallelism }),
..build_graph(plan)
};

// Fill the original table ID.
let table = Table {
id: original_catalog.id().table_id(),
..table
};

(graph, table)
};

// TODO: for test purpose only, we drop the original table and create a new one. This is wrong
// and really dangerous in production.
if cfg!(debug_assertions) {
let catalog_writer = session.env().catalog_writer();

// TODO: call replace_table RPC
// catalog_writer.replace_table(table, graph).await?;

catalog_writer
.drop_table(None, original_catalog.id())
.await?;
catalog_writer.create_table(source, table, graph).await?;
catalog_writer.create_table(None, table, graph).await?;

Ok(PgResponse::empty_result_with_notice(
StatementType::ALTER_TABLE,
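
For orientation, a sketch of what the commit step above is expected to look like once the meta-side `replace_table` RPC exists; the drop-and-recreate branch in this PR is an explicitly debug-only stopgap. The helper function and import paths are hypothetical; only the `CatalogWriter::replace_table` signature comes from this PR.

// Hypothetical final shape of the ALTER TABLE commit step: hand the new table
// definition and fragment graph to the catalog writer and let the meta service
// swap the streaming job in place, instead of dropping and recreating the table.
use risingwave_common::error::Result;
use risingwave_pb::catalog::Table as ProstTable;
use risingwave_pb::stream_plan::StreamFragmentGraph;

use crate::catalog::catalog_service::CatalogWriter; // assumed path

async fn commit_alter_table(
    catalog_writer: &dyn CatalogWriter,
    table: ProstTable,
    graph: StreamFragmentGraph,
) -> Result<()> {
    // Like `create_table`, this returns only after the local catalog has
    // observed the new catalog version pushed back by the meta service.
    catalog_writer.replace_table(table, graph).await
}
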
5 changes: 5 additions & 0 deletions src/frontend/src/test_utils.rs
@@ -249,6 +249,11 @@ impl CatalogWriter for MockCatalogWriter {
Ok(())
}

async fn replace_table(&self, table: ProstTable, _graph: StreamFragmentGraph) -> Result<()> {
self.catalog.write().update_table(&table);
Ok(())
}

async fn create_source(&self, source: ProstSource) -> Result<()> {
self.create_source_inner(source).map(|_| ())
}
1 change: 1 addition & 0 deletions src/meta/src/barrier/command.rs
@@ -341,6 +341,7 @@ where
MergeUpdate {
actor_id,
upstream_fragment_id: fragment_id,
new_upstream_fragment_id: None,
added_upstream_actor_id: reschedule.added_actors.clone(),
removed_upstream_actor_id: reschedule
.removed_actors
75 changes: 58 additions & 17 deletions src/meta/src/manager/catalog/fragment.rs
@@ -174,7 +174,7 @@ where
Ok(map
.get(table_id)
.cloned()
.context(format!("table_fragment not exist: id={}", table_id))?)
.with_context(|| format!("table_fragment not exist: id={}", table_id))?)
}

pub async fn select_table_fragments_by_ids(
@@ -187,7 +187,7 @@
table_fragments.push(
map.get(table_id)
.cloned()
.context(format!("table_fragment not exist: id={}", table_id))?,
.with_context(|| format!("table_fragment not exist: id={}", table_id))?,
);
}
Ok(table_fragments)
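
A note on the repeated `.context(format!(...))` to `.with_context(|| format!(...))` change in this file: with `anyhow`, `context` evaluates its argument (and thus formats the message string) even on the success path, while `with_context` takes a closure that only runs on error. A minimal, standalone illustration with plain `anyhow` types (not code from this PR):

use std::collections::HashMap;

use anyhow::{Context, Result};

fn lookup(map: &HashMap<u32, String>, id: u32) -> Result<String> {
    // Eager variant (old style): the `format!` runs and allocates even when
    // `get` succeeds, because the argument is built before `context` is called.
    //     map.get(&id).cloned().context(format!("table_fragment not exist: id={}", id))

    // Lazy variant (new style): the closure runs only when `get` returns
    // `None`, so the happy path pays nothing for the error message.
    map.get(&id)
        .cloned()
        .with_context(|| format!("table_fragment not exist: id={}", id))
}
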
@@ -228,7 +228,7 @@ where
let mut table_fragments = BTreeMapTransaction::new(map);
let mut table_fragment = table_fragments
.get_mut(*table_id)
.context(format!("table_fragment not exist: id={}", table_id))?;
.with_context(|| format!("table_fragment not exist: id={}", table_id))?;

assert_eq!(table_fragment.state(), State::Initial);
table_fragment.set_state(State::Creating);
@@ -240,10 +240,12 @@
let mut dependent_table =
table_fragments
.get_mut(dependent_table_id)
.context(format!(
"dependent table_fragment not exist: id={}",
dependent_table_id
))?;
.with_context(|| {
format!(
"dependent table_fragment not exist: id={}",
dependent_table_id
)
})?;
for fragment in dependent_table.fragments.values_mut() {
for actor in &mut fragment.actors {
// Extend new dispatchers to table fragments.
@@ -268,7 +270,7 @@
let mut table_fragments = BTreeMapTransaction::new(map);
let mut table_fragment = table_fragments
.get_mut(table_id)
.context(format!("table_fragment not exist: id={}", table_id))?;
.with_context(|| format!("table_fragment not exist: id={}", table_id))?;

assert_eq!(table_fragment.state(), State::Creating);
table_fragment.set_state(State::Created);
@@ -293,13 +295,14 @@
if table_ids.contains(&dependent_table_id) {
continue;
}
let mut dependent_table =
table_fragments
.get_mut(dependent_table_id)
.context(format!(
let mut dependent_table = table_fragments
.get_mut(dependent_table_id)
.with_context(|| {
format!(
"dependent table_fragment not exist: id={}",
dependent_table_id
))?;
)
})?;

dependent_table
.fragments
@@ -814,27 +817,65 @@
let map = &self.core.read().await.table_fragments;
Ok(map
.get(table_id)
.context(format!("table_fragment not exist: id={}", table_id))?
.with_context(|| format!("table_fragment not exist: id={}", table_id))?
.mview_actor_ids())
}

/// Get the upstream `Materialize` fragments of the specified tables.
pub async fn get_upstream_mview_fragments(
&self,
table_ids: &HashSet<TableId>,
upstream_table_ids: &HashSet<TableId>,
) -> MetaResult<HashMap<TableId, Fragment>> {
let map = &self.core.read().await.table_fragments;
let mut fragments = HashMap::new();

for &table_id in table_ids {
for &table_id in upstream_table_ids {
let table_fragments = map
.get(&table_id)
.context(format!("table_fragment not exist: id={}", table_id))?;
.with_context(|| format!("table_fragment not exist: id={}", table_id))?;
if let Some(fragment) = table_fragments.mview_fragment() {
fragments.insert(table_id, fragment);
}
}

Ok(fragments)
}

/// Get the downstream `Chain` fragments of the specified table.
pub async fn get_downstream_chain_fragments(
&self,
table_id: TableId,
) -> MetaResult<Vec<Fragment>> {
let map = &self.core.read().await.table_fragments;

let table_fragments = map
.get(&table_id)
.with_context(|| format!("table_fragment not exist: id={}", table_id))?;

let mview_fragment = table_fragments.mview_fragment().unwrap();
let downstream_fragment_ids: HashSet<_> = mview_fragment.actors[0]
.dispatcher
.iter()
.map(|d| d.dispatcher_id as FragmentId)
.collect();

// Find the fragments based on the fragment ids.
let fragments = map
.values()
.flat_map(|table_fragments| {
table_fragments
.fragments
.values()
.filter(|fragment| downstream_fragment_ids.contains(&fragment.fragment_id))
.inspect(|f| {
assert!((f.fragment_type_mask & FragmentTypeFlag::ChainNode as u32) != 0)
})
})
.cloned()
.collect_vec();

assert_eq!(downstream_fragment_ids.len(), fragments.len());

Ok(fragments)
}
}
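
The two new lookups above are the natural building blocks for wiring a replaced table into the existing graph: the upstream `Materialize` fragments feed the new table's `Chain` nodes, while the existing downstream `Chain` fragments must be re-pointed to the new `Materialize` fragment via `MergeUpdate.new_upstream_fragment_id`. A sketch of how a meta-side handler might combine them; the surrounding function and import paths are assumptions, not code from this PR:

use std::collections::{HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_pb::meta::table_fragments::Fragment;

use crate::manager::FragmentManager; // assumed paths
use crate::storage::MetaStore;
use crate::MetaResult;

async fn collect_replace_context<S: MetaStore>(
    fragment_manager: &FragmentManager<S>,
    table_id: TableId,
    upstream_table_ids: &HashSet<TableId>,
) -> MetaResult<(HashMap<TableId, Fragment>, Vec<Fragment>)> {
    // Where the replaced table's own `Chain` nodes will read from.
    let upstream_mviews = fragment_manager
        .get_upstream_mview_fragments(upstream_table_ids)
        .await?;
    // The readers of the replaced table, whose merge executors need a
    // `MergeUpdate` pointing at the new upstream fragment.
    let downstream_chains = fragment_manager
        .get_downstream_chain_fragments(table_id)
        .await?;
    Ok((upstream_mviews, downstream_chains))
}
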
2 changes: 1 addition & 1 deletion src/meta/src/manager/streaming_job.rs
@@ -21,7 +21,7 @@ use crate::model::FragmentId;

// This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and
// Sink.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum StreamingJob {
MaterializedView(Table),
Sink(Sink),