Skip to content

Commit

Permalink
Merge branch 'main' into lwz/selective-agg-frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jul 5, 2022
2 parents 28ae796 + ce7612d commit 37cf174
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 17 deletions.
6 changes: 3 additions & 3 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ message DropMaterializedSourceResponse {
}

// Used by risectl (and in the future, dashboard)
message ListMaterializedViewRequest {}
message RisectlListStateTablesRequest {}

// Used by risectl (and in the future, dashboard)
message ListMaterializedViewResponse {
message RisectlListStateTablesResponse {
repeated catalog.Table tables = 1;
}

Expand All @@ -127,5 +127,5 @@ service DdlService {
rpc DropMaterializedView(DropMaterializedViewRequest) returns (DropMaterializedViewResponse);
rpc CreateMaterializedSource(CreateMaterializedSourceRequest) returns (CreateMaterializedSourceResponse);
rpc DropMaterializedSource(DropMaterializedSourceRequest) returns (DropMaterializedSourceResponse);
rpc ListMaterializedView(ListMaterializedViewRequest) returns (ListMaterializedViewResponse);
rpc RisectlListStateTables(RisectlListStateTablesRequest) returns (RisectlListStateTablesResponse);
}
3 changes: 3 additions & 0 deletions src/ctl/src/cmd_impl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@

mod scan;
pub use scan::*;

mod list;
pub use list::*;
27 changes: 27 additions & 0 deletions src/ctl/src/cmd_impl/table/list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Result;

use crate::common::MetaServiceOpts;

pub async fn list() -> Result<()> {
let meta_opts = MetaServiceOpts::from_env()?;
let meta = meta_opts.create_meta_client().await?;
let mvs = meta.risectl_list_state_tables().await?;
for mv in mvs {
println!("#{}: {}", mv.id, mv.name);
}
Ok(())
}
37 changes: 32 additions & 5 deletions src/ctl/src/cmd_impl/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,29 @@ use anyhow::{anyhow, Result};
use futures::{pin_mut, StreamExt};
use risingwave_frontend::catalog::TableCatalog;
use risingwave_rpc_client::MetaClient;
use risingwave_storage::hummock::HummockStorage;
use risingwave_storage::monitor::MonitoredStateStore;
use risingwave_storage::table::state_table::StateTable;
use risingwave_storage::table::Distribution;
use risingwave_storage::StateStore;

use crate::common::HummockServiceOpts;

pub async fn get_table_catalog(meta: MetaClient, table_id: String) -> Result<TableCatalog> {
let mvs = meta.list_materialize_view().await?;
pub async fn get_table_catalog(meta: MetaClient, mv_name: String) -> Result<TableCatalog> {
let mvs = meta.risectl_list_state_tables().await?;
let mv = mvs
.iter()
.find(|x| x.name == table_id)
.find(|x| x.name == mv_name)
.ok_or_else(|| anyhow!("mv not found"))?
.clone();
Ok(TableCatalog::from(&mv))
}

pub async fn get_table_catalog_by_id(meta: MetaClient, table_id: u32) -> Result<TableCatalog> {
let mvs = meta.risectl_list_state_tables().await?;
let mv = mvs
.iter()
.find(|x| x.id == table_id)
.ok_or_else(|| anyhow!("mv not found"))?
.clone();
Ok(TableCatalog::from(&mv))
Expand All @@ -53,10 +65,25 @@ pub fn make_state_table<S: StateStore>(hummock: S, table: &TableCatalog) -> Stat
)
}

pub async fn scan(table_id: String) -> Result<()> {
pub async fn scan(mv_name: String) -> Result<()> {
let mut hummock_opts = HummockServiceOpts::from_env()?;
let (meta, hummock) = hummock_opts.create_hummock_store().await?;
let table = get_table_catalog(meta.clone(), table_id).await?;
let table = get_table_catalog(meta.clone(), mv_name).await?;
do_scan(table, hummock, hummock_opts).await
}

pub async fn scan_id(table_id: u32) -> Result<()> {
let mut hummock_opts = HummockServiceOpts::from_env()?;
let (meta, hummock) = hummock_opts.create_hummock_store().await?;
let table = get_table_catalog_by_id(meta.clone(), table_id).await?;
do_scan(table, hummock, hummock_opts).await
}

async fn do_scan(
table: TableCatalog,
hummock: MonitoredStateStore<HummockStorage>,
mut hummock_opts: HummockServiceOpts,
) -> Result<()> {
print_table_catalog(&table);

// We use state table here instead of cell-based table to support iterating with u64::MAX epoch.
Expand Down
13 changes: 12 additions & 1 deletion src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,18 @@ enum HummockCommands {

#[derive(Subcommand)]
enum TableCommands {
/// benchmark state table
/// scan a state table with MV name
Scan {
/// name of the materialized view to operate on
mv_name: String,
},
/// scan a state table using Id
ScanById {
/// id of the state table to operate on
table_id: u32,
},
/// list all state tables
List,
}

pub async fn start(opts: CliOpts) -> Result<()> {
Expand All @@ -105,6 +112,10 @@ pub async fn start(opts: CliOpts) -> Result<()> {
Commands::Table(TableCommands::Scan { mv_name }) => {
tokio::spawn(cmd_impl::table::scan(mv_name)).await??
}
Commands::Table(TableCommands::ScanById { table_id }) => {
tokio::spawn(cmd_impl::table::scan_id(table_id)).await??
}
Commands::Table(TableCommands::List) => tokio::spawn(cmd_impl::table::list()).await??,
Commands::Bench(cmd) => tokio::spawn(cmd_impl::bench::do_bench(cmd)).await??,
}
Ok(())
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/rpc/service/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,15 +398,15 @@ where
}))
}

async fn list_materialized_view(
async fn risectl_list_state_tables(
&self,
_request: Request<ListMaterializedViewRequest>,
) -> Result<Response<ListMaterializedViewResponse>, Status> {
_request: Request<RisectlListStateTablesRequest>,
) -> Result<Response<RisectlListStateTablesResponse>, Status> {
use crate::model::MetadataModel;
let tables = Table::list(self.env.meta_store())
.await
.map_err(tonic_err)?;
Ok(Response::new(ListMaterializedViewResponse { tables }))
Ok(Response::new(RisectlListStateTablesResponse { tables }))
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,9 @@ impl MetaClient {
(join_handle, shutdown_tx)
}

pub async fn list_materialize_view(&self) -> Result<Vec<ProstTable>> {
let request = ListMaterializedViewRequest {};
let resp = self.inner.list_materialized_view(request).await?;
pub async fn risectl_list_state_tables(&self) -> Result<Vec<ProstTable>> {
let request = RisectlListStateTablesRequest {};
let resp = self.inner.risectl_list_state_tables(request).await?;
Ok(resp.tables)
}

Expand Down Expand Up @@ -563,7 +563,7 @@ macro_rules! for_all_meta_rpc {
,{ ddl_client, drop_source, DropSourceRequest, DropSourceResponse }
,{ ddl_client, drop_database, DropDatabaseRequest, DropDatabaseResponse }
,{ ddl_client, drop_schema, DropSchemaRequest, DropSchemaResponse }
,{ ddl_client, list_materialized_view, ListMaterializedViewRequest, ListMaterializedViewResponse }
,{ ddl_client, risectl_list_state_tables, RisectlListStateTablesRequest, RisectlListStateTablesResponse }
,{ hummock_client, pin_version, PinVersionRequest, PinVersionResponse }
,{ hummock_client, unpin_version, UnpinVersionRequest, UnpinVersionResponse }
,{ hummock_client, pin_snapshot, PinSnapshotRequest, PinSnapshotResponse }
Expand Down

0 comments on commit 37cf174

Please sign in to comment.