Skip to content

Commit

Permalink
feat: add graph check in risectl for sql meta store (#19764)
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <peng@risingwave-labs.com>
  • Loading branch information
shanicky authored and shanicky committed Jan 8, 2025
1 parent beca9f1 commit 0ec0912
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 16 deletions.
2 changes: 2 additions & 0 deletions src/ctl/src/cmd_impl/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
// limitations under the License.

mod backup_meta;
mod check;
mod cluster_info;
mod connection;
mod pause_resume;
mod reschedule;
mod serving;

pub use backup_meta::*;
pub use check::*;
pub use cluster_info::*;
pub use connection::*;
pub use pause_resume::*;
Expand Down
33 changes: 33 additions & 0 deletions src/ctl/src/cmd_impl/meta/check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::process::exit;

use risingwave_meta::controller::catalog::CatalogController;
use sea_orm::TransactionTrait;

pub async fn graph_check(endpoint: String) -> anyhow::Result<()> {
let conn = sea_orm::Database::connect(sea_orm::ConnectOptions::new(endpoint)).await?;
let txn = conn.begin().await?;
match CatalogController::graph_check(&txn).await {
Ok(_) => {
println!("all integrity check passed!");
exit(0);
}
Err(_) => {
println!("integrity check failed!");
exit(1);
}
}
}
11 changes: 11 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,14 @@ enum MetaCommands {
#[clap(long)]
props: String,
},

/// Performing graph check for scaling.
#[clap(verbatim_doc_comment)]
GraphCheck {
/// SQL endpoint
#[clap(long, required = true)]
endpoint: String,
},
}

#[derive(Subcommand, Clone, Debug)]
Expand Down Expand Up @@ -776,6 +784,9 @@ async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
Commands::Meta(MetaCommands::ValidateSource { props }) => {
cmd_impl::meta::validate_source(context, props).await?
}
Commands::Meta(MetaCommands::GraphCheck { endpoint }) => {
cmd_impl::meta::graph_check(endpoint).await?
}
Commands::AwaitTree => cmd_impl::await_tree::dump(context).await?,
Commands::Profile(ProfileCommands::Cpu { sleep }) => {
cmd_impl::profile::cpu_profile(context, sleep).await?
Expand Down
35 changes: 19 additions & 16 deletions src/meta/src/controller/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,14 @@ impl CatalogController {
pub async fn integrity_check(&self) -> MetaResult<()> {
let inner = self.inner.read().await;
let txn = inner.db.begin().await?;
Self::graph_check(&txn).await
}

// Perform integrity checks on the Actor, ActorDispatcher and Fragment tables.
pub async fn graph_check<C>(txn: &C) -> MetaResult<()>
where
C: ConnectionTrait,
{
#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "ActorDispatcher")]
pub struct PartialActorDispatcher {
Expand Down Expand Up @@ -487,14 +494,14 @@ impl CatalogController {
let mut flag = false;

let fragments: Vec<PartialFragment> =
Fragment::find().into_partial_model().all(&txn).await?;
Fragment::find().into_partial_model().all(txn).await?;

let fragment_map: HashMap<_, _> = fragments
.into_iter()
.map(|fragment| (fragment.fragment_id, fragment))
.collect();

let actors: Vec<PartialActor> = Actor::find().into_partial_model().all(&txn).await?;
let actors: Vec<PartialActor> = Actor::find().into_partial_model().all(txn).await?;

let mut fragment_actors = HashMap::new();
for actor in &actors {
Expand All @@ -511,7 +518,7 @@ impl CatalogController {

let actor_dispatchers: Vec<PartialActorDispatcher> = ActorDispatcher::find()
.into_partial_model()
.all(&txn)
.all(txn)
.await?;

let mut discovered_upstream_fragments = HashMap::new();
Expand Down Expand Up @@ -640,10 +647,7 @@ impl CatalogController {
crit_check_in_loop!(
flag,
actor_map.contains_key(actor_id),
format!(
"ActorDispatcher {} has actor_id {} which does not exist",
id, actor_id
)
format!("ActorDispatcher {id} has actor_id {actor_id} which does not exist",)
);

let actor = &actor_map[actor_id];
Expand All @@ -652,8 +656,7 @@ impl CatalogController {
flag,
fragment_map.contains_key(dispatcher_id),
format!(
"ActorDispatcher {} has dispatcher_id {} which does not exist",
id, dispatcher_id
"ActorDispatcher {id} has dispatcher_id {dispatcher_id} which does not exist",
)
);

Expand All @@ -677,8 +680,8 @@ impl CatalogController {
flag,
fragment_actors.contains_key(dispatcher_id),
format!(
"ActorDispatcher {id} has downstream fragment {dispatcher_id} which has no actors",
)
"ActorDispatcher {id} has downstream fragment {dispatcher_id} which has no actors",
)
);

let dispatcher_downstream_actor_ids: HashSet<_> =
Expand Down Expand Up @@ -718,7 +721,7 @@ impl CatalogController {
flag,
&dispatcher_downstream_actor_ids == target_fragment_actor_ids,
format!(
"ActorDispatcher {id} has downstream fragment {dispatcher_id} which has different actors: {dispatcher_downstream_actor_ids:?} != {target_fragment_actor_ids:?}",
"ActorDispatcher {id} has downstream fragment {dispatcher_id}, but dispatcher downstream actor ids: {dispatcher_downstream_actor_ids:?} != target fragment actor ids: {target_fragment_actor_ids:?}",
)
);
}
Expand Down Expand Up @@ -771,7 +774,7 @@ impl CatalogController {
flag,
&mapping_actors == target_fragment_actor_ids,
format!(
"ActorDispatcher {id} has downstream fragment {dispatcher_id} which has different actors: {mapping_actors:?} != {target_fragment_actor_ids:?}",
"ActorDispatcher {id} has downstream fragment {dispatcher_id}, but dispatcher mapping actor ids {mapping_actors:?} != target fragment actor ids: {target_fragment_actor_ids:?}",
)
);

Expand All @@ -797,7 +800,7 @@ impl CatalogController {
flag,
mapping.to_bitmaps() == downstream_bitmaps,
format!(
"ActorDispatcher {id} has hash downstream fragment {dispatcher_id} which has different bitmaps: {mapping:?} != {downstream_bitmaps:?}"
"ActorDispatcher {id} has hash downstream fragment {dispatcher_id}, but dispatcher mapping {mapping:?} != discovered downstream actor bitmaps: {downstream_bitmaps:?}"
)
);
}
Expand Down Expand Up @@ -868,7 +871,7 @@ impl CatalogController {
flag,
discovered_upstream_fragment_ids == upstream_fragment_ids,
format!(
"Fragment {fragment_id} has different upstream_fragment_ids from discovered: {discovered_upstream_fragment_ids:?} != {upstream_fragment_ids:?}",
"Fragment {fragment_id} has different upstream_fragment_ids from discovered: {discovered_upstream_fragment_ids:?} != fragment upstream fragment ids: {upstream_fragment_ids:?}",
)
);
}
Expand Down Expand Up @@ -901,7 +904,7 @@ impl CatalogController {
flag,
discovered_upstream_actor_ids == upstream_actor_ids,
format!(
"Actor {actor_id} has different upstream_actor_ids from discovered: {discovered_upstream_actor_ids:?} != {upstream_actor_ids:?}",
"Actor {actor_id} has different upstream_actor_ids from discovered: {discovered_upstream_actor_ids:?} != actor upstream actor ids: {upstream_actor_ids:?}",
)
)
}
Expand Down

0 comments on commit 0ec0912

Please sign in to comment.