Skip to content

Commit

Permalink
feat: add stream api for get, mget and list (#13299)
Browse files Browse the repository at this point in the history
* feat: add stream api for get, mget and list

In this commit, meta grpc service provides new api
`kv_read_v1() -> Stream<StreamItem>`. Service and client are both ready
but not yet used by databend-query yet.

This new API support `get` single kv, `mget` multiple kv and `list` kv
by a prefix. The result is returned in a stream.

Raft service also provides a `kv_read_v1()` API for internal forwarding
such a request to the leader.

Other changes: rename internal term `PrefixList -> List`.

* chore: fix testing impl of MetaService

* chore: fix clippy
  • Loading branch information
drmingdrmer authored Oct 17, 2023
1 parent 51289d7 commit 67ee61b
Show file tree
Hide file tree
Showing 18 changed files with 652 additions and 37 deletions.
47 changes: 47 additions & 0 deletions src/meta/client/src/grpc_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use common_meta_kvapi::kvapi::UpsertKVReq;
use common_meta_types::protobuf::meta_service_client::MetaServiceClient;
use common_meta_types::protobuf::ClientInfo;
use common_meta_types::protobuf::RaftRequest;
use common_meta_types::protobuf::StreamItem;
use common_meta_types::protobuf::WatchRequest;
use common_meta_types::protobuf::WatchResponse;
use common_meta_types::InvalidArgument;
Expand All @@ -36,12 +37,14 @@ use log::debug;
use tonic::codegen::InterceptedService;
use tonic::transport::Channel;
use tonic::Request;
use tonic::Streaming;

use crate::grpc_client::AuthInterceptor;
use crate::message::ExportReq;
use crate::message::GetClientInfo;
use crate::message::GetEndpoints;
use crate::message::MakeClient;
use crate::message::Streamed;

/// Bind a request type to its corresponding response type.
pub trait RequestFor {
Expand Down Expand Up @@ -86,18 +89,62 @@ impl MetaGrpcReq {
}
}

#[derive(
serde::Serialize,
serde::Deserialize,
Debug,
Clone,
PartialEq,
Eq,
derive_more::From,
derive_more::TryInto,
)]
pub enum MetaGrpcReadReq {
GetKV(GetKVReq),
MGetKV(MGetKVReq),
ListKV(ListKVReq),
}

impl MetaGrpcReadReq {
pub fn to_raft_request(&self) -> Result<RaftRequest, InvalidArgument> {
let raft_request = RaftRequest {
data: serde_json::to_string(self)
.map_err(|e| InvalidArgument::new(e, "fail to encode request"))?,
};

debug!(
req = as_debug!(&raft_request);
"build raft_request"
);

Ok(raft_request)
}
}

impl RequestFor for GetKVReq {
type Reply = GetKVReply;
}

impl RequestFor for Streamed<GetKVReq> {
type Reply = Streaming<StreamItem>;
}

impl RequestFor for MGetKVReq {
type Reply = MGetKVReply;
}

impl RequestFor for Streamed<MGetKVReq> {
type Reply = Streaming<StreamItem>;
}

impl RequestFor for ListKVReq {
type Reply = ListKVReply;
}

impl RequestFor for Streamed<ListKVReq> {
type Reply = Streaming<StreamItem>;
}

impl RequestFor for UpsertKVReq {
type Reply = UpsertKVReply;
}
Expand Down
86 changes: 82 additions & 4 deletions src/meta/client/src/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use common_grpc::RpcClientConf;
use common_grpc::RpcClientTlsConfig;
use common_meta_api::reply::reply_to_api_result;
use common_meta_types::anyerror::AnyError;
use common_meta_types::protobuf as pb;
use common_meta_types::protobuf::meta_service_client::MetaServiceClient;
use common_meta_types::protobuf::ClientInfo;
use common_meta_types::protobuf::Empty;
Expand Down Expand Up @@ -81,16 +82,19 @@ use tonic::transport::Channel;
use tonic::Code;
use tonic::Request;
use tonic::Status;
use tonic::Streaming;

use crate::from_digit_ver;
use crate::grpc_action::RequestFor;
use crate::grpc_metrics;
use crate::message;
use crate::to_digit_ver;
use crate::MetaGrpcReadReq;
use crate::MetaGrpcReq;
use crate::METACLI_COMMIT_SEMVER;
use crate::MIN_METASRV_SEMVER;

const RPC_RETRIES: usize = 2;
const AUTH_TOKEN_KEY: &str = "auth-token-bin";

#[derive(Debug)]
Expand Down Expand Up @@ -403,19 +407,46 @@ impl MetaGrpcClient {
.await;
message::Response::Get(resp)
}
message::Request::StreamGet(r) => {
let strm = self
.kv_read_v1(MetaGrpcReadReq::GetKV(r.into_inner()))
.timed_ge(threshold(), info_spent("MetaGrpcClient::kv_read_v1(GetKV)"))
.await;
message::Response::StreamGet(strm)
}
message::Request::MGet(r) => {
let resp = self
.kv_api(r)
.timed_ge(threshold(), info_spent("MetaGrpcClient::kv_api"))
.await;
message::Response::MGet(resp)
}
message::Request::PrefixList(r) => {
message::Request::StreamMGet(r) => {
let strm = self
.kv_read_v1(MetaGrpcReadReq::MGetKV(r.into_inner()))
.timed_ge(
threshold(),
info_spent("MetaGrpcClient::kv_read_v1(MGetKV)"),
)
.await;
message::Response::StreamMGet(strm)
}
message::Request::List(r) => {
let resp = self
.kv_api(r)
.timed_ge(threshold(), info_spent("MetaGrpcClient::kv_api"))
.await;
message::Response::PrefixList(resp)
message::Response::List(resp)
}
message::Request::StreamList(r) => {
let strm = self
.kv_read_v1(MetaGrpcReadReq::ListKV(r.into_inner()))
.timed_ge(
threshold(),
info_spent("MetaGrpcClient::kv_read_v1(ListKV)"),
)
.await;
message::Response::StreamMGet(strm)
}
message::Request::Upsert(r) => {
let resp = self
Expand Down Expand Up @@ -898,7 +929,7 @@ impl MetaGrpcClient {
.to_raft_request()
.map_err(MetaNetworkError::InvalidArgument)?;

for i in 0..2 {
for i in 0..RPC_RETRIES {
let req = common_tracing::inject_span_to_tonic_request(Request::new(raft_req.clone()));

let mut client = self
Expand Down Expand Up @@ -929,7 +960,54 @@ impl MetaGrpcClient {
return Ok(resp);
}

unreachable!("impossible to reach here");
unreachable!("impossible to quit loop without error or success");
}

#[minitrace::trace]
pub(crate) async fn kv_read_v1(
&self,
grpc_req: MetaGrpcReadReq,
) -> Result<Streaming<pb::StreamItem>, MetaError> {
debug!(
req = as_debug!(&grpc_req);
"MetaGrpcClient::kv_api request"
);

let raft_req: RaftRequest = grpc_req
.to_raft_request()
.map_err(MetaNetworkError::InvalidArgument)?;

for i in 0..RPC_RETRIES {
let req = common_tracing::inject_span_to_tonic_request(Request::new(raft_req.clone()));

let mut client = self
.make_client()
.timed_ge(threshold(), info_spent("MetaGrpcClient::make_client"))
.await?;

let result = client
.kv_read_v1(req)
.timed_ge(threshold(), info_spent("client::kv_read_v1"))
.await;

debug!(
result = as_debug!(&result);
"MetaGrpcClient::kv_read_v1 result, {}-th try", i
);

if let Err(ref e) = result {
if status_is_retryable(e) {
self.mark_as_unhealthy();
continue;
}
}

let strm = result?.into_inner();

return Ok(strm);
}

unreachable!("impossible to quit loop without error or success");
}

#[minitrace::trace]
Expand Down
5 changes: 5 additions & 0 deletions src/meta/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ mod message;

pub use common_meta_api::reply::reply_to_api_result;
pub use common_meta_api::reply::reply_to_meta_result;
pub use grpc_action::MetaGrpcReadReq;
pub use grpc_action::MetaGrpcReq;
pub use grpc_action::RequestFor;
pub use grpc_client::ClientHandle;
pub use grpc_client::MetaGrpcClient;
pub use message::ClientWorkerRequest;
pub use message::Streamed;
use once_cell::sync::Lazy;
use semver::BuildMetadata;
use semver::Prerelease;
Expand Down Expand Up @@ -67,6 +69,9 @@ pub static METACLI_COMMIT_SEMVER: Lazy<Version> = Lazy::new(|| {
///
/// - 2023-10-11: since 1.2.153:
/// Meta service: add: pb::SeqV.meta field to support record expiration.
///
/// - 2023-10-17: since TODO(fill in when merged):
/// Meta service: add: stream api: kv_read_v1().
pub static MIN_METASRV_SEMVER: Version = Version {
major: 1,
minor: 1,
Expand Down
62 changes: 43 additions & 19 deletions src/meta/client/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use common_meta_kvapi::kvapi::UpsertKVReq;
use common_meta_types::protobuf::meta_service_client::MetaServiceClient;
use common_meta_types::protobuf::ClientInfo;
use common_meta_types::protobuf::ExportedChunk;
use common_meta_types::protobuf::StreamItem;
use common_meta_types::protobuf::WatchRequest;
use common_meta_types::protobuf::WatchResponse;
use common_meta_types::MetaClientError;
Expand All @@ -34,6 +35,7 @@ use common_meta_types::TxnReply;
use common_meta_types::TxnRequest;
use tonic::codegen::InterceptedService;
use tonic::transport::Channel;
use tonic::Streaming;

use crate::grpc_client::AuthInterceptor;

Expand All @@ -57,6 +59,16 @@ impl fmt::Debug for ClientWorkerRequest {
}
}

/// Mark an RPC to return a stream.
#[derive(Debug, Clone)]
pub struct Streamed<T>(pub T);

impl<T> Streamed<T> {
pub fn into_inner(self) -> T {
self.0
}
}

/// Meta-client handle-to-worker request body
#[derive(Debug, Clone, derive_more::From)]
pub enum Request {
Expand All @@ -67,7 +79,16 @@ pub enum Request {
MGet(MGetKVReq),

/// List KVs by key prefix
PrefixList(ListKVReq),
List(ListKVReq),

/// Get KV, returning a stream
StreamGet(Streamed<GetKVReq>),

/// Get multiple KV, returning a stream.
StreamMGet(Streamed<MGetKVReq>),

/// List KVs by key prefix, returning a stream.
StreamList(Streamed<ListKVReq>),

/// Update or insert KV
Upsert(UpsertKVReq),
Expand Down Expand Up @@ -96,7 +117,10 @@ impl Request {
match self {
Request::Get(_) => "Get",
Request::MGet(_) => "MGet",
Request::PrefixList(_) => "PrefixList",
Request::List(_) => "PrefixList",
Request::StreamGet(_) => "StreamGet",
Request::StreamMGet(_) => "StreamMGet",
Request::StreamList(_) => "StreamPrefixList",
Request::Upsert(_) => "Upsert",
Request::Txn(_) => "Txn",
Request::Watch(_) => "Watch",
Expand All @@ -113,7 +137,10 @@ impl Request {
pub enum Response {
Get(Result<GetKVReply, MetaError>),
MGet(Result<MGetKVReply, MetaError>),
PrefixList(Result<ListKVReply, MetaError>),
List(Result<ListKVReply, MetaError>),
StreamGet(Result<Streaming<StreamItem>, MetaError>),
StreamMGet(Result<Streaming<StreamItem>, MetaError>),
StreamList(Result<Streaming<StreamItem>, MetaError>),
Upsert(Result<UpsertKVReply, MetaError>),
Txn(Result<TxnReply, MetaError>),
Watch(Result<tonic::codec::Streaming<WatchResponse>, MetaError>),
Expand All @@ -126,21 +153,6 @@ pub enum Response {
}

impl Response {
pub fn is_err(&self) -> bool {
match self {
Response::Get(res) => res.is_err(),
Response::MGet(res) => res.is_err(),
Response::PrefixList(res) => res.is_err(),
Response::Upsert(res) => res.is_err(),
Response::Txn(res) => res.is_err(),
Response::Watch(res) => res.is_err(),
Response::Export(res) => res.is_err(),
Response::MakeClient(res) => res.is_err(),
Response::GetEndpoints(res) => res.is_err(),
Response::GetClientInfo(res) => res.is_err(),
}
}

pub fn err(&self) -> Option<&(dyn std::error::Error + 'static)> {
let e = match self {
Response::Get(res) => res
Expand All @@ -151,7 +163,19 @@ impl Response {
.as_ref()
.err()
.map(|x| x as &(dyn std::error::Error + 'static)),
Response::PrefixList(res) => res
Response::List(res) => res
.as_ref()
.err()
.map(|x| x as &(dyn std::error::Error + 'static)),
Response::StreamGet(res) => res
.as_ref()
.err()
.map(|x| x as &(dyn std::error::Error + 'static)),
Response::StreamMGet(res) => res
.as_ref()
.err()
.map(|x| x as &(dyn std::error::Error + 'static)),
Response::StreamList(res) => res
.as_ref()
.err()
.map(|x| x as &(dyn std::error::Error + 'static)),
Expand Down
Loading

1 comment on commit 67ee61b

@vercel
Copy link

@vercel vercel bot commented on 67ee61b Oct 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.