Skip to content

Commit

Permalink
feat: impl get_table_info in remote_engine_client (apache#798)
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 authored Mar 31, 2023
1 parent 2c997f9 commit e01e891
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 12 deletions.
82 changes: 75 additions & 7 deletions remote_engine_client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Client for accessing remote table engine

Expand All @@ -18,8 +18,11 @@ use common_types::{
use common_util::error::BoxError;
use futures::{Stream, StreamExt};
use router::RouterRef;
use snafu::{ensure, ResultExt};
use table_engine::remote::model::{ReadRequest, TableIdentifier, WriteRequest};
use snafu::{ensure, OptionExt, ResultExt};
use table_engine::{
remote::model::{GetTableInfoRequest, ReadRequest, TableIdentifier, TableInfo, WriteRequest},
table::{SchemaId, TableId},
};
use tonic::{transport::Channel, Request, Streaming};

use crate::{cached_router::CachedRouter, config::Config, error::*, status_code};
Expand All @@ -46,15 +49,15 @@ impl Client {
let request_pb = ceresdbproto::remote_engine::ReadRequest::try_from(request)
.box_err()
.context(Convert {
msg: "convert ReadRequest to pb",
msg: "Failed to convert ReadRequest to pb",
})?;

let result = rpc_client
.read(Request::new(request_pb))
.await
.with_context(|| Rpc {
table_ident: table_ident.clone(),
msg: "read from remote",
msg: "Failed to read from remote engine",
});

let response = match result {
Expand Down Expand Up @@ -85,7 +88,7 @@ impl Client {
let request_pb = ceresdbproto::remote_engine::WriteRequest::try_from(request)
.box_err()
.context(Convert {
msg: "convert WriteRequest to pb failed",
msg: "Failed to convert WriteRequest to pb",
})?;
let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(channel);

Expand All @@ -94,7 +97,7 @@ impl Client {
.await
.with_context(|| Rpc {
table_ident: table_ident.clone(),
msg: "write to remote failed",
msg: "Failed to write to remote engine",
});

let response = match result {
Expand All @@ -119,6 +122,71 @@ impl Client {
Ok(response.affected_rows as usize)
}
}

pub async fn get_table_info(&self, request: GetTableInfoRequest) -> Result<TableInfo> {
// Find the channel from router firstly.
let channel = self.cached_router.route(&request.table).await?;
let table_ident = request.table.clone();
let request_pb = ceresdbproto::remote_engine::GetTableInfoRequest::try_from(request)
.box_err()
.context(Convert {
msg: "Failed to convert GetTableInfoRequest to pb",
})?;

let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(channel);

let result = rpc_client
.get_table_info(Request::new(request_pb))
.await
.with_context(|| Rpc {
table_ident: table_ident.clone(),
msg: "Failed to get table info",
});

let response = match result {
Ok(response) => response,
Err(e) => {
// If occurred error, we simply evict the corresponding channel now.
// TODO: evict according to the type of error.
self.cached_router.evict(&table_ident).await;
return Err(e);
}
};

let response = response.into_inner();
if let Some(header) = response.header && !status_code::is_ok(header.code) {
Server {
table_ident: table_ident.clone(),
code: header.code,
msg: header.error,
}.fail()
} else {
let table_info = response.table_info.context(Server {
table_ident: table_ident.clone(),
code: status_code::StatusCode::Internal.as_u32(),
msg: "Table info is empty",
})?;

Ok(TableInfo {
catalog_name: table_info.catalog_name,
schema_name: table_info.schema_name,
schema_id: SchemaId::from(table_info.schema_id),
table_name: table_info.table_name,
table_id: TableId::from(table_info.table_id),
table_schema: table_info.table_schema.map(TryInto::try_into).transpose().box_err()
.context(Convert { msg: "Failed to covert table schema" })?
.context(Server {
table_ident,
code: status_code::StatusCode::Internal.as_u32(),
msg: "Table schema is empty",
})?,
engine: table_info.engine,
options: table_info.options,
partition_info: table_info.partition_info.map(TryInto::try_into).transpose().box_err()
.context(Convert { msg: "Failed to covert partition info" })?,
})
}
}
}

pub struct ClientReadRecordBatchStream {
Expand Down
12 changes: 10 additions & 2 deletions remote_engine_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Remote table engine implementation

Expand All @@ -25,7 +25,7 @@ use snafu::ResultExt;
use table_engine::{
remote::{
self,
model::{ReadRequest, WriteRequest},
model::{GetTableInfoRequest, ReadRequest, TableInfo, WriteRequest},
RemoteEngine,
},
stream::{self, ErrWithSource, RecordBatchStream, SendableRecordBatchStream},
Expand Down Expand Up @@ -121,6 +121,14 @@ impl RemoteEngine for RemoteEngineImpl {
async fn write(&self, request: WriteRequest) -> remote::Result<usize> {
self.0.write(request).await.box_err().context(remote::Write)
}

async fn get_table_info(&self, request: GetTableInfoRequest) -> remote::Result<TableInfo> {
self.0
.get_table_info(request)
.await
.box_err()
.context(remote::GetTableInfo)
}
}

struct RemoteReadRecordBatchStream(ClientReadRecordBatchStream);
Expand Down
12 changes: 10 additions & 2 deletions table_engine/src/remote/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Remote table engine

Expand All @@ -11,7 +11,10 @@ use common_util::{define_result, error::GenericError};
use model::{ReadRequest, WriteRequest};
use snafu::Snafu;

use crate::stream::SendableRecordBatchStream;
use crate::{
remote::model::{GetTableInfoRequest, TableInfo},
stream::SendableRecordBatchStream,
};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
Expand All @@ -21,6 +24,9 @@ pub enum Error {

#[snafu(display("Failed to write to remote, err:{}", source))]
Write { source: GenericError },

#[snafu(display("Failed to get table info from remote, err:{}", source))]
GetTableInfo { source: GenericError },
}

define_result!(Error);
Expand All @@ -33,6 +39,8 @@ pub trait RemoteEngine: Send + Sync {

/// Write to the remote engine.
async fn write(&self, request: WriteRequest) -> Result<usize>;

async fn get_table_info(&self, request: GetTableInfoRequest) -> Result<TableInfo>;
}

/// Remote engine reference
Expand Down
31 changes: 30 additions & 1 deletion table_engine/src/remote/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

//! Model for remote table engine

use std::collections::HashMap;

use arrow_ext::{
ipc,
ipc::{CompressOptions, CompressionMethod},
Expand All @@ -13,11 +15,17 @@ use ceresdbproto::{
use common_types::{
record_batch::{RecordBatch, RecordBatchWithKeyBuilder},
row::{RowGroup, RowGroupBuilder},
schema::Schema,
};
use common_util::error::{BoxError, GenericError};
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};

use crate::table::{ReadRequest as TableReadRequest, WriteRequest as TableWriteRequest};
use crate::{
partition::PartitionInfo,
table::{
ReadRequest as TableReadRequest, SchemaId, TableId, WriteRequest as TableWriteRequest,
},
};

#[derive(Debug, Snafu)]
pub enum Error {
Expand Down Expand Up @@ -253,6 +261,27 @@ impl TryFrom<GetTableInfoRequest> for ceresdbproto::remote_engine::GetTableInfoR
}
}

pub struct TableInfo {
/// Catalog name
pub catalog_name: String,
/// Schema name
pub schema_name: String,
/// Schema id
pub schema_id: SchemaId,
/// Table name
pub table_name: String,
/// Table id
pub table_id: TableId,
/// Table schema
pub table_schema: Schema,
/// Table engine type
pub engine: String,
/// Table options
pub options: HashMap<String, String>,
/// Partition Info
pub partition_info: Option<PartitionInfo>,
}

fn build_row_group_from_record_batch(
record_batches: Vec<arrow::record_batch::RecordBatch>,
) -> Result<RowGroup> {
Expand Down

0 comments on commit e01e891

Please sign in to comment.