Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[metasrv] feature: exchange protocol version with client #5645

Merged
merged 10 commits into from
May 29, 2022
Merged
14 changes: 1 addition & 13 deletions .github/actions/test_compat/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,7 @@ runs:
- uses: actions/download-artifact@v2
with:
name: ${{ inputs.profile }}-${{ github.sha }}-${{ inputs.target }}
path: ./current/

- uses: dsaltares/fetch-gh-release-asset@master
with:
repo: 'datafuselabs/databend'
# 2022-05-23
version: 'tags/v0.7.57-nightly'
file: 'databend-v0.7.57-nightly-x86_64-unknown-linux-gnu.tar.gz'

- shell: bash
run: |
mkdir ./old
tar -xf databend-v0.7.57-nightly-x86_64-unknown-linux-gnu.tar.gz -C ./old
path: ./bins/current/

- name: Test compatibility
shell: bash
Expand Down
10 changes: 8 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ build_exceptions! {
MetaServiceError(2001),
InvalidConfig(2002),
MetaStorageError(2003),
InvalidArgument(2004),

TableVersionMismatched(2009),
OCCRetryFailure(2011),
Expand Down
5 changes: 5 additions & 0 deletions common/meta/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ common-tracing = { path = "../../tracing" }

derive_more = "0.99.17"
futures = "0.3.21"
once_cell = "1.10.0"
prost = "=0.9.0"
rand = "0.8.5"
semver = "1.0.9"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
thiserror = "1.0.30"
tonic = { version = "=0.6.2", features = ["transport", "codegen", "prost", "tls-roots", "tls"] }

[dev-dependencies]

[build-dependencies]
common-building = { path = "../../building" }
16 changes: 16 additions & 0 deletions common/meta/grpc/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
fn main() {
common_building::setup();
}
116 changes: 96 additions & 20 deletions common/meta/grpc/src/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use common_meta_types::protobuf::RaftRequest;
use common_meta_types::protobuf::WatchRequest;
use common_meta_types::protobuf::WatchResponse;
use common_meta_types::ConnectionError;
use common_meta_types::InvalidArgument;
use common_meta_types::MetaError;
use common_meta_types::MetaNetworkError;
use common_meta_types::MetaResultError;
Expand All @@ -52,6 +53,7 @@ use common_tracing::tracing;
use futures::stream::StreamExt;
use prost::Message;
use rand::Rng;
use semver::Version;
use serde::de::DeserializeOwned;
use tonic::async_trait;
use tonic::client::GrpcService;
Expand All @@ -63,10 +65,14 @@ use tonic::Code;
use tonic::Request;
use tonic::Status;

use crate::from_digit_ver;
use crate::grpc_action::MetaGrpcReadReq;
use crate::grpc_action::MetaGrpcWriteReq;
use crate::grpc_action::RequestFor;
use crate::message;
use crate::to_digit_ver;
use crate::METACLI_COMMIT_SEMVER;
use crate::MIN_METASRV_SEMVER;

const AUTH_TOKEN_KEY: &str = "auth-token-bin";

Expand Down Expand Up @@ -200,31 +206,27 @@ impl MetaGrpcClient {
///
/// The worker is a singleton and the returned handle is cheap to clone.
/// When all handles are dropped the worker will quit, then the runtime will be destroyed.
pub async fn try_new(
conf: &RpcClientConf,
) -> std::result::Result<Arc<ClientHandle>, ErrorCode> {
pub fn try_new(conf: &RpcClientConf) -> std::result::Result<Arc<ClientHandle>, ErrorCode> {
Self::try_create(
conf.get_endpoints(),
&conf.username,
&conf.password,
conf.timeout,
conf.tls_conf.clone(),
)
.await
}

#[tracing::instrument(level = "debug", skip(password))]
pub async fn try_create(
pub fn try_create(
endpoints: Vec<String>,
username: &str,
password: &str,
timeout: Option<Duration>,
conf: Option<RpcClientTlsConfig>,
) -> Result<Arc<ClientHandle>> {
let mgr = MetaChannelManager {
timeout,
conf: conf.clone(),
};
Self::endpoints_non_empty(&endpoints)?;

let mgr = MetaChannelManager { timeout, conf };

let rt = Runtime::with_worker_threads(1, Some("meta-client-rt".to_string()))
.map_err(|e| e.add_message_back("when creating meta-client"))?;
Expand All @@ -238,13 +240,12 @@ impl MetaGrpcClient {

let worker = Arc::new(Self {
conn_pool: Pool::new(mgr, Duration::from_millis(50)),
endpoints: RwLock::new(vec![]),
endpoints: RwLock::new(endpoints),
username: username.to_string(),
password: password.to_string(),
token: RwLock::new(None),
rt: rt.clone(),
});
worker.set_endpoints(endpoints).await?;

rt.spawn(Self::worker_loop(worker, rx));

Expand Down Expand Up @@ -379,8 +380,14 @@ impl MetaGrpcClient {
let token = match t.clone() {
Some(t) => t,
None => {
let new_token =
MetaGrpcClient::handshake(&mut client, &self.username, &self.password).await?;
let new_token = Self::handshake(
&mut client,
&METACLI_COMMIT_SEMVER,
&MIN_METASRV_SEMVER,
&self.username,
&self.password,
)
.await?;
*t = Some(new_token.clone());
new_token
}
Expand All @@ -391,24 +398,66 @@ impl MetaGrpcClient {
Ok(client)
}

pub fn endpoints_non_empty(endpoints: &[String]) -> std::result::Result<(), MetaError> {
if endpoints.is_empty() {
return Err(MetaError::InvalidConfig("endpoints is empty".to_string()));
}
Ok(())
}

#[tracing::instrument(level = "debug", skip(self))]
pub async fn set_endpoints(
&self,
endpoints: Vec<String>,
) -> std::result::Result<(), MetaError> {
if endpoints.is_empty() {
return Err(MetaError::InvalidConfig("endpoints is empty".to_string()));
}
Self::endpoints_non_empty(&endpoints)?;

let mut eps = self.endpoints.write().await;
*eps = endpoints;
Ok(())
}

/// Handshake with metasrv.
///
/// - Check whether the versions of this client(`C`) and the remote metasrv(`S`) are compatible.
/// - Authorize this client.
///
/// ## Check compatibility
///
/// Both client `C` and server `S` maintains two semantic-version:
/// - `C` maintains the its own semver(`C.ver`) and the minimal compatible `S` semver(`C.min_srv_ver`).
/// - `S` maintains the its own semver(`S.ver`) and the minimal compatible `S` semver(`S.min_cli_ver`).
///
/// When handshaking:
/// - `C` sends its ver `C.ver` to `S`,
/// - When `S` receives handshake request, `S` asserts that `C.ver >= S.min_cli_ver`.
/// - Then `S` replies handshake-reply with its `S.ver`.
/// - When `C` receives the reply, `C` asserts that `S.ver >= C.min_srv_ver`.
///
/// Handshake succeeds if both of these two assertions hold.
///
/// E.g.:
/// - `S: (ver=3, min_cli_ver=1)` is compatible with `C: (ver=3, min_srv_ver=2)`.
/// - `S: (ver=4, min_cli_ver=4)` is **NOT** compatible with `C: (ver=3, min_srv_ver=2)`.
/// Because although `S.ver(4) >= C.min_srv_ver(3)` holds,
/// but `C.ver(3) >= S.min_cli_ver(4)` does not hold.
///
/// ```text
/// C.ver: 1 3 4
/// C --------+-------------+------+------------>
/// ^ .------' ^
/// | | |
/// '-------------. |
/// | | |
/// v | |
/// S ---------------+------+------+------------>
/// S.ver: 2 3 4
/// ```
#[tracing::instrument(level = "debug", skip(client, password))]
async fn handshake(
pub async fn handshake(
client: &mut MetaServiceClient<Channel>,
client_ver: &Version,
min_metasrv_ver: &Version,
username: &str,
password: &str,
) -> std::result::Result<Vec<u8>, MetaError> {
Expand All @@ -419,17 +468,44 @@ impl MetaGrpcClient {
let mut payload = vec![];
auth.encode(&mut payload)?;

let req = Request::new(futures::stream::once(async {
let my_ver = to_digit_ver(client_ver);
let req = Request::new(futures::stream::once(async move {
HandshakeRequest {
protocol_version: 0,
protocol_version: my_ver,
payload,
}
}));

let rx = client.handshake(req).await?;
let mut rx = rx.into_inner();

let resp = rx.next().await.expect("Must respond from handshake")?;
let res = rx.next().await.ok_or_else(|| {
MetaNetworkError::ConnectionError(ConnectionError::new(
AnyError::error("handshake returns nothing"),
"",
))
})?;

let resp = res?;

// backward compatibility: no version in handshake.
// TODO(xp): remove this when merged.
if resp.protocol_version > 0 {
let min_compatible = to_digit_ver(min_metasrv_ver);
if resp.protocol_version < min_compatible {
return Err(MetaError::MetaNetworkError(
MetaNetworkError::InvalidArgument(InvalidArgument::new(
AnyError::error(format!(
"metasrv protocol_version({}) < meta-client min-compatible({})",
from_digit_ver(resp.protocol_version),
min_metasrv_ver,
)),
"",
)),
));
}
}

let token = resp.payload;
Ok(token)
}
Expand Down
30 changes: 30 additions & 0 deletions common/meta/grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,33 @@ pub use grpc_action::RequestFor;
pub use grpc_client::ClientHandle;
pub use grpc_client::MetaGrpcClient;
pub use message::ClientWorkerRequest;
use once_cell::sync::Lazy;
use semver::BuildMetadata;
use semver::Prerelease;
use semver::Version;

pub static METACLI_COMMIT_SEMVER: Lazy<Version> = Lazy::new(|| {
let build_semver = option_env!("VERGEN_GIT_SEMVER");
let semver = build_semver.expect("VERGEN_GIT_SEMVER can not be None");

let semver = semver.strip_prefix('v').unwrap_or(semver);

Version::parse(semver).unwrap()
});

/// Oldest compatible nightly metasrv version
pub static MIN_METASRV_SEMVER: Version = Version {
major: 0,
minor: 7,
patch: 63,
pre: Prerelease::EMPTY,
build: BuildMetadata::EMPTY,
};

pub fn to_digit_ver(v: &Version) -> u64 {
v.major * 1_000_000 + v.minor * 1_000 + v.patch
}

pub fn from_digit_ver(u: u64) -> Version {
Version::new(u / 1_000_000, u / 1_000 % 1_000, u % 1_000)
}
8 changes: 2 additions & 6 deletions common/meta/grpc/tests/it/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ async fn test_grpc_client_action_timeout() {
// server's handshake impl will sleep 2secs.
let timeout = Duration::from_secs(3);

let client = MetaGrpcClient::try_create(vec![srv_addr], "", "", Some(timeout), None)
.await
.unwrap();
let client = MetaGrpcClient::try_create(vec![srv_addr], "", "", Some(timeout), None).unwrap();

let res = client
.get_database(GetDatabaseReq::new("tenant1", "xx"))
Expand All @@ -49,9 +47,7 @@ async fn test_grpc_client_handshake_timeout() {
let srv_addr = start_grpc_server();

let timeout = Duration::from_secs(1);
let res = MetaGrpcClient::try_create(vec![srv_addr], "", "", Some(timeout), None)
.await
.unwrap();
let res = MetaGrpcClient::try_create(vec![srv_addr], "", "", Some(timeout), None).unwrap();

let client = res.make_client().await;
let got = client.unwrap_err();
Expand Down
Loading