Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl get_table_info in remote_engine_client #798

Merged
merged 1 commit into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 75 additions & 7 deletions remote_engine_client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Client for accessing remote table engine

Expand All @@ -18,8 +18,11 @@ use common_types::{
use common_util::error::BoxError;
use futures::{Stream, StreamExt};
use router::RouterRef;
use snafu::{ensure, ResultExt};
use table_engine::remote::model::{ReadRequest, TableIdentifier, WriteRequest};
use snafu::{ensure, OptionExt, ResultExt};
use table_engine::{
remote::model::{GetTableInfoRequest, ReadRequest, TableIdentifier, TableInfo, WriteRequest},
table::{SchemaId, TableId},
};
use tonic::{transport::Channel, Request, Streaming};

use crate::{cached_router::CachedRouter, config::Config, error::*, status_code};
Expand All @@ -46,15 +49,15 @@ impl Client {
let request_pb = ceresdbproto::remote_engine::ReadRequest::try_from(request)
.box_err()
.context(Convert {
msg: "convert ReadRequest to pb",
msg: "Failed to convert ReadRequest to pb",
})?;

let result = rpc_client
.read(Request::new(request_pb))
.await
.with_context(|| Rpc {
table_ident: table_ident.clone(),
msg: "read from remote",
msg: "Failed to read from remote engine",
});

let response = match result {
Expand Down Expand Up @@ -85,7 +88,7 @@ impl Client {
let request_pb = ceresdbproto::remote_engine::WriteRequest::try_from(request)
.box_err()
.context(Convert {
msg: "convert WriteRequest to pb failed",
msg: "Failed to convert WriteRequest to pb",
})?;
let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(channel);

Expand All @@ -94,7 +97,7 @@ impl Client {
.await
.with_context(|| Rpc {
table_ident: table_ident.clone(),
msg: "write to remote failed",
msg: "Failed to write to remote engine",
});

let response = match result {
Expand All @@ -119,6 +122,71 @@ impl Client {
Ok(response.affected_rows as usize)
}
}

pub async fn get_table_info(&self, request: GetTableInfoRequest) -> Result<TableInfo> {
// Find the channel from router firstly.
let channel = self.cached_router.route(&request.table).await?;
let table_ident = request.table.clone();
let request_pb = ceresdbproto::remote_engine::GetTableInfoRequest::try_from(request)
.box_err()
.context(Convert {
msg: "Failed to convert GetTableInfoRequest to pb",
})?;

let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(channel);

let result = rpc_client
.get_table_info(Request::new(request_pb))
.await
.with_context(|| Rpc {
table_ident: table_ident.clone(),
msg: "Failed to get table info",
});

let response = match result {
Ok(response) => response,
Err(e) => {
// If occurred error, we simply evict the corresponding channel now.
// TODO: evict according to the type of error.
self.cached_router.evict(&table_ident).await;
return Err(e);
}
};

let response = response.into_inner();
if let Some(header) = response.header && !status_code::is_ok(header.code) {
Server {
table_ident: table_ident.clone(),
code: header.code,
msg: header.error,
}.fail()
} else {
let table_info = response.table_info.context(Server {
table_ident: table_ident.clone(),
code: status_code::StatusCode::Internal.as_u32(),
msg: "Table info is empty",
})?;

Ok(TableInfo {
catalog_name: table_info.catalog_name,
schema_name: table_info.schema_name,
schema_id: SchemaId::from(table_info.schema_id),
table_name: table_info.table_name,
table_id: TableId::from(table_info.table_id),
table_schema: table_info.table_schema.map(TryInto::try_into).transpose().box_err()
.context(Convert { msg: "Failed to covert table schema" })?
.context(Server {
table_ident,
code: status_code::StatusCode::Internal.as_u32(),
msg: "Table schema is empty",
})?,
engine: table_info.engine,
options: table_info.options,
partition_info: table_info.partition_info.map(TryInto::try_into).transpose().box_err()
.context(Convert { msg: "Failed to covert partition info" })?,
})
}
}
}

pub struct ClientReadRecordBatchStream {
Expand Down
12 changes: 10 additions & 2 deletions remote_engine_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Remote table engine implementation

Expand All @@ -25,7 +25,7 @@ use snafu::ResultExt;
use table_engine::{
remote::{
self,
model::{ReadRequest, WriteRequest},
model::{GetTableInfoRequest, ReadRequest, TableInfo, WriteRequest},
RemoteEngine,
},
stream::{self, ErrWithSource, RecordBatchStream, SendableRecordBatchStream},
Expand Down Expand Up @@ -121,6 +121,14 @@ impl RemoteEngine for RemoteEngineImpl {
async fn write(&self, request: WriteRequest) -> remote::Result<usize> {
self.0.write(request).await.box_err().context(remote::Write)
}

async fn get_table_info(&self, request: GetTableInfoRequest) -> remote::Result<TableInfo> {
self.0
.get_table_info(request)
.await
.box_err()
.context(remote::GetTableInfo)
}
}

struct RemoteReadRecordBatchStream(ClientReadRecordBatchStream);
Expand Down
12 changes: 10 additions & 2 deletions table_engine/src/remote/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Remote table engine

Expand All @@ -11,7 +11,10 @@ use common_util::{define_result, error::GenericError};
use model::{ReadRequest, WriteRequest};
use snafu::Snafu;

use crate::stream::SendableRecordBatchStream;
use crate::{
remote::model::{GetTableInfoRequest, TableInfo},
stream::SendableRecordBatchStream,
};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
Expand All @@ -21,6 +24,9 @@ pub enum Error {

#[snafu(display("Failed to write to remote, err:{}", source))]
Write { source: GenericError },

#[snafu(display("Failed to get table info from remote, err:{}", source))]
GetTableInfo { source: GenericError },
}

define_result!(Error);
Expand All @@ -33,6 +39,8 @@ pub trait RemoteEngine: Send + Sync {

/// Write to the remote engine.
async fn write(&self, request: WriteRequest) -> Result<usize>;

async fn get_table_info(&self, request: GetTableInfoRequest) -> Result<TableInfo>;
}

/// Remote engine reference
Expand Down
31 changes: 30 additions & 1 deletion table_engine/src/remote/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

//! Model for remote table engine

use std::collections::HashMap;

use arrow_ext::{
ipc,
ipc::{CompressOptions, CompressionMethod},
Expand All @@ -13,11 +15,17 @@ use ceresdbproto::{
use common_types::{
record_batch::{RecordBatch, RecordBatchWithKeyBuilder},
row::{RowGroup, RowGroupBuilder},
schema::Schema,
};
use common_util::error::{BoxError, GenericError};
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};

use crate::table::{ReadRequest as TableReadRequest, WriteRequest as TableWriteRequest};
use crate::{
partition::PartitionInfo,
table::{
ReadRequest as TableReadRequest, SchemaId, TableId, WriteRequest as TableWriteRequest,
},
};

#[derive(Debug, Snafu)]
pub enum Error {
Expand Down Expand Up @@ -253,6 +261,27 @@ impl TryFrom<GetTableInfoRequest> for ceresdbproto::remote_engine::GetTableInfoR
}
}

pub struct TableInfo {
/// Catalog name
pub catalog_name: String,
/// Schema name
pub schema_name: String,
/// Schema id
pub schema_id: SchemaId,
/// Table name
pub table_name: String,
/// Table id
pub table_id: TableId,
/// Table schema
pub table_schema: Schema,
/// Table engine type
pub engine: String,
/// Table options
pub options: HashMap<String, String>,
/// Partition Info
pub partition_info: Option<PartitionInfo>,
}

fn build_row_group_from_record_batch(
record_batches: Vec<arrow::record_batch::RecordBatch>,
) -> Result<RowGroup> {
Expand Down