Skip to content

Commit

Permalink
feat: impl getTableInfo in remoteEngine service (#793)
Browse files Browse the repository at this point in the history
* feat: impl getTableInfo in remoteEngine service

* refactor by CR
  • Loading branch information
chunshao90 authored Mar 30, 2023
1 parent c7efe44 commit 37d8746
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 38 deletions.
40 changes: 26 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ bytes = "1.1.0"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
ceresdbproto = "1.0.1"
chrono = "0.4"
clap = "3.0"
clru = "0.6.1"
Expand Down Expand Up @@ -150,6 +149,10 @@ table_engine = { workspace = true }
toml = { workspace = true }
tracing_util = { workspace = true }

[workspace.dependencies.ceresdbproto]
git = "https://github.com/CeresDB/ceresdbproto.git"
rev = "1c3bf4e803ef8b7a1fbc8c3d4a07fdd372e1830b"

[build-dependencies]
vergen = { version = "7", default-features = false, features = ["build", "git", "rustc"] }

Expand Down
2 changes: 1 addition & 1 deletion common_types/src/schema.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Schema of table

Expand Down
3 changes: 2 additions & 1 deletion server/src/grpc/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

// Grpc server metrics

Expand All @@ -24,6 +24,7 @@ make_auto_flush_static_metric! {
pub label_enum RemoteEngineTypeKind {
stream_read,
write,
get_table_info,
}

pub struct RemoteEngineGrpcHandlerDurationHistogramVec: LocalHistogram {
Expand Down
125 changes: 105 additions & 20 deletions server/src/grpc/remote_engine_service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

// Remote engine rpc service implementation.

use std::{sync::Arc, time::Instant};

use arrow_ext::ipc::{self, CompressOptions, CompressOutput, CompressionMethod};
use async_trait::async_trait;
use catalog::manager::ManagerRef;
use catalog::{manager::ManagerRef, schema::SchemaRef};
use ceresdbproto::{
remote_engine::{
read_response::Output::Arrow, remote_engine_service_server::RemoteEngineService,
ReadRequest, ReadResponse, WriteRequest, WriteResponse,
GetTableInfoRequest, GetTableInfoResponse, ReadRequest, ReadResponse, WriteRequest,
WriteResponse,
},
storage::{arrow_payload, ArrowPayload},
};
Expand All @@ -31,9 +32,7 @@ use tonic::{Request, Response, Status};
use crate::{
grpc::{
metrics::REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC,
remote_engine_service::error::{
build_ok_header, ErrNoCause, ErrWithCause, Result, StatusCode,
},
remote_engine_service::error::{ErrNoCause, ErrWithCause, Result, StatusCode},
},
instance::InstanceRef,
};
Expand Down Expand Up @@ -123,6 +122,39 @@ impl<Q: QueryExecutor + 'static> RemoteEngineServiceImpl<Q> {
Ok(Response::new(resp))
}

async fn get_table_info_internal(
&self,
request: Request<GetTableInfoRequest>,
) -> std::result::Result<Response<GetTableInfoResponse>, Status> {
let begin_instant = Instant::now();
let ctx = self.handler_ctx();
let handle = self.runtimes.read_runtime.spawn(async move {
let request = request.into_inner();
handle_get_table_info(ctx, request).await
});

let res = handle.await.box_err().context(ErrWithCause {
code: StatusCode::Internal,
msg: "fail to join task",
});

let mut resp = GetTableInfoResponse::default();
match res {
Ok(Ok(v)) => {
resp.header = Some(error::build_ok_header());
resp.table_info = v.table_info;
}
Ok(Err(e)) | Err(e) => {
resp.header = Some(error::build_err_header(e));
}
};

REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
.get_table_info
.observe(begin_instant.saturating_elapsed().as_secs_f64());
Ok(Response::new(resp))
}

fn handler_ctx(&self) -> HandlerContext {
HandlerContext {
catalog_manager: self.instance.catalog_manager.clone(),
Expand Down Expand Up @@ -170,7 +202,7 @@ impl<Q: QueryExecutor + 'static> RemoteEngineService for RemoteEngineServiceImpl
};

ReadResponse {
header: Some(build_ok_header()),
header: Some(error::build_ok_header()),
output: Some(Arrow(ArrowPayload {
record_batches: vec![payload],
compression: compression as i32,
Expand Down Expand Up @@ -209,6 +241,13 @@ impl<Q: QueryExecutor + 'static> RemoteEngineService for RemoteEngineServiceImpl
) -> std::result::Result<Response<WriteResponse>, Status> {
self.write_internal(request).await
}

async fn get_table_info(
&self,
request: Request<GetTableInfoRequest>,
) -> std::result::Result<Response<GetTableInfoResponse>, Status> {
self.get_table_info_internal(request).await
}
}

async fn handle_stream_read(
Expand Down Expand Up @@ -258,10 +297,68 @@ async fn handle_write(ctx: HandlerContext, request: WriteRequest) -> Result<Writ
})
}

async fn handle_get_table_info(
ctx: HandlerContext,
request: GetTableInfoRequest,
) -> Result<GetTableInfoResponse> {
let request: table_engine::remote::model::GetTableInfoRequest =
request.try_into().box_err().context(ErrWithCause {
code: StatusCode::BadRequest,
msg: "fail to convert get table info request",
})?;

let schema = find_schema_by_identifier(&ctx, &request.table)?;
let table = schema
.table_by_name(&request.table.table)
.box_err()
.context(ErrWithCause {
code: StatusCode::Internal,
msg: format!("fail to get table, table:{}", request.table.table),
})?
.context(ErrNoCause {
code: StatusCode::NotFound,
msg: format!("table is not found, table:{}", request.table.table),
})?;

Ok(GetTableInfoResponse {
header: None,
table_info: Some(ceresdbproto::remote_engine::TableInfo {
catalog_name: request.table.catalog,
schema_name: schema.name().to_string(),
schema_id: schema.id().as_u32(),
table_name: table.name().to_string(),
table_id: table.id().as_u64(),
table_schema: Some((&table.schema()).into()),
engine: table.engine_type().to_string(),
options: table.options(),
partition_info: table.partition_info().map(Into::into),
}),
})
}

fn find_table_by_identifier(
ctx: &HandlerContext,
table_identifier: &TableIdentifier,
) -> Result<TableRef> {
let schema = find_schema_by_identifier(ctx, table_identifier)?;

schema
.table_by_name(&table_identifier.table)
.box_err()
.context(ErrWithCause {
code: StatusCode::Internal,
msg: format!("fail to get table, table:{}", table_identifier.table),
})?
.context(ErrNoCause {
code: StatusCode::NotFound,
msg: format!("table is not found, table:{}", table_identifier.table),
})
}

fn find_schema_by_identifier(
ctx: &HandlerContext,
table_identifier: &TableIdentifier,
) -> Result<SchemaRef> {
let catalog = ctx
.catalog_manager
.catalog_by_name(&table_identifier.catalog)
Expand All @@ -274,7 +371,7 @@ fn find_table_by_identifier(
code: StatusCode::NotFound,
msg: format!("catalog is not found, catalog:{}", table_identifier.catalog),
})?;
let schema = catalog
catalog
.schema_by_name(&table_identifier.schema)
.box_err()
.context(ErrWithCause {
Expand All @@ -290,17 +387,5 @@ fn find_table_by_identifier(
"schema of table is not found, schema:{}",
table_identifier.schema
),
})?;

schema
.table_by_name(&table_identifier.table)
.box_err()
.context(ErrWithCause {
code: StatusCode::Internal,
msg: format!("fail to get table, table:{}", table_identifier.table),
})?
.context(ErrNoCause {
code: StatusCode::NotFound,
msg: format!("table is not found, table:{}", table_identifier.table),
})
}
Loading

0 comments on commit 37d8746

Please sign in to comment.