Skip to content

Commit

Permalink
[metasrv] feature: exchange protocol version with client
Browse files Browse the repository at this point in the history
Before transfer any data, `MetaGrpcClient` must ensure it is compatible
with the metasrv it is connecting to, in the handshake RPC.

Add compatibility cross test:

```
latest query -- latest metasrv
old    query -- latest metasrv
latest query -- old    metasrv
```

Features

- Compatibility test: download test suite for query.

  When testing an old query with latest metasrv,
  download the test suite for that version of query instead of using
  the latest suite, which may not pass if new feature test is added
  into the latest suite but not yet impl-ed in the old query.

- Compatibility test:

  Test the compatibility between:
  - The latest query and the latest metasrv.
  - The oldest compatible query with latest metasrv.
  - The latest query with oldest compatible  metasrv.

- Protocol version exchange when meta-client `handshake` with metasrv.

  meta-client(query) keeps track of the min-metasrv version it is compatible
  with.

  metasrv keeps track of the min-meta-client version it is compatible
  with.

  Client handshake check these versions to ensure compatibility.
  Otherwise an error is returned and no further data will be transferred.

- Add admin CLI command `--cmd ver`.

  `databend-query --cmd ver` outputs its build version and the minimal
  compatible metasrv version, such as:

  ```
  version: 0.7.61-nightly
  min-compatible-metasrv-version: 0.7.59
  ```

  `databend-meta --cmd ver` outputs its build version and the minimal
  compatible meta-client(query) version, such as:

  ```
  version: 0.7.61-nightly
  min-compatible-client-version: 0.7.57
  ```

Refactors

- Add a new error `InvalidArgument` to indicate protocol-error

- `MetaGrpcClient::try_new()` does not need to be `async`.

- Metasrv config does not `check()` if `--cmd` is non-empty:

  When `cmd` is specified, it does not mean to run it.

Docs

- Add doc explain how to check versions and how to upgrade.

---

- Fix: databendlabs#5627
- Fix: databendlabs#4948
  • Loading branch information
drmingdrmer committed May 28, 2022
1 parent bda7388 commit dd08e7b
Show file tree
Hide file tree
Showing 43 changed files with 919 additions and 122 deletions.
2 changes: 1 addition & 1 deletion .github/actions/test_compat/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ runs:
- uses: actions/download-artifact@v2
with:
name: ${{ inputs.profile }}-${{ github.sha }}-${{ inputs.target }}
path: ./current/
path: ./bins/current/

- uses: dsaltares/fetch-gh-release-asset@master
with:
Expand Down
10 changes: 8 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ build_exceptions! {
MetaServiceError(2001),
InvalidConfig(2002),
MetaStorageError(2003),
InvalidArgument(2004),

TableVersionMismatched(2009),
OCCRetryFailure(2011),
Expand Down
5 changes: 5 additions & 0 deletions common/meta/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ common-tracing = { path = "../../tracing" }

derive_more = "0.99.17"
futures = "0.3.21"
once_cell = "1.10.0"
prost = "=0.9.0"
rand = "0.8.5"
semver = "1.0.9"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
thiserror = "1.0.30"
tonic = { version = "=0.6.2", features = ["transport", "codegen", "prost", "tls-roots", "tls"] }

[dev-dependencies]

[build-dependencies]
common-building = { path = "../../building" }
16 changes: 16 additions & 0 deletions common/meta/grpc/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
fn main() {
common_building::setup();
}
111 changes: 95 additions & 16 deletions common/meta/grpc/src/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use common_meta_types::protobuf::RaftRequest;
use common_meta_types::protobuf::WatchRequest;
use common_meta_types::protobuf::WatchResponse;
use common_meta_types::ConnectionError;
use common_meta_types::InvalidArgument;
use common_meta_types::MetaError;
use common_meta_types::MetaNetworkError;
use common_meta_types::MetaResultError;
Expand All @@ -52,6 +53,7 @@ use common_tracing::tracing;
use futures::stream::StreamExt;
use prost::Message;
use rand::Rng;
use semver::Version;
use serde::de::DeserializeOwned;
use tonic::async_trait;
use tonic::client::GrpcService;
Expand All @@ -63,10 +65,14 @@ use tonic::Code;
use tonic::Request;
use tonic::Status;

use crate::from_digit_ver;
use crate::grpc_action::MetaGrpcReadReq;
use crate::grpc_action::MetaGrpcWriteReq;
use crate::grpc_action::RequestFor;
use crate::message;
use crate::to_digit_ver;
use crate::METACLI_COMMIT_SEMVER;
use crate::MIN_METASRV_SEMVER;

const AUTH_TOKEN_KEY: &str = "auth-token-bin";

Expand Down Expand Up @@ -200,27 +206,26 @@ impl MetaGrpcClient {
///
/// The worker is a singleton and the returned handle is cheap to clone.
/// When all handles are dropped the worker will quit, then the runtime will be destroyed.
pub async fn try_new(
conf: &RpcClientConf,
) -> std::result::Result<Arc<ClientHandle>, ErrorCode> {
pub fn try_new(conf: &RpcClientConf) -> std::result::Result<Arc<ClientHandle>, ErrorCode> {
Self::try_create(
conf.get_endpoints(),
&conf.username,
&conf.password,
conf.timeout,
conf.tls_conf.clone(),
)
.await
}

#[tracing::instrument(level = "debug", skip(password))]
pub async fn try_create(
pub fn try_create(
endpoints: Vec<String>,
username: &str,
password: &str,
timeout: Option<Duration>,
conf: Option<RpcClientTlsConfig>,
) -> Result<Arc<ClientHandle>> {
Self::endpoints_non_empty(&endpoints)?;

let mgr = MetaChannelManager {
timeout,
conf: conf.clone(),
Expand All @@ -238,13 +243,12 @@ impl MetaGrpcClient {

let worker = Arc::new(Self {
conn_pool: Pool::new(mgr, Duration::from_millis(50)),
endpoints: RwLock::new(vec![]),
endpoints: RwLock::new(endpoints),
username: username.to_string(),
password: password.to_string(),
token: RwLock::new(None),
rt: rt.clone(),
});
worker.set_endpoints(endpoints).await?;

rt.spawn(Self::worker_loop(worker, rx));

Expand Down Expand Up @@ -379,8 +383,14 @@ impl MetaGrpcClient {
let token = match t.clone() {
Some(t) => t,
None => {
let new_token =
MetaGrpcClient::handshake(&mut client, &self.username, &self.password).await?;
let new_token = Self::handshake(
&mut client,
&METACLI_COMMIT_SEMVER,
&MIN_METASRV_SEMVER,
&self.username,
&self.password,
)
.await?;
*t = Some(new_token.clone());
new_token
}
Expand All @@ -391,24 +401,66 @@ impl MetaGrpcClient {
Ok(client)
}

pub fn endpoints_non_empty(endpoints: &[String]) -> std::result::Result<(), MetaError> {
if endpoints.is_empty() {
return Err(MetaError::InvalidConfig("endpoints is empty".to_string()));
}
Ok(())
}

#[tracing::instrument(level = "debug", skip(self))]
pub async fn set_endpoints(
&self,
endpoints: Vec<String>,
) -> std::result::Result<(), MetaError> {
if endpoints.is_empty() {
return Err(MetaError::InvalidConfig("endpoints is empty".to_string()));
}
Self::endpoints_non_empty(&endpoints)?;

let mut eps = self.endpoints.write().await;
*eps = endpoints;
Ok(())
}

/// Handshake with metasrv.
///
/// - Check whether the versions of this client(`C`) and the remote metasrv(`S`) are compatible.
/// - Authorize this client.
///
/// ## Check compatibility
///
/// Both client `C` and server `S` maintains two semantic-version:
/// - `C` maintains the its own semver(`C.ver`) and the minimal compatible `S` semver(`C.min_srv_ver`).
/// - `S` maintains the its own semver(`S.ver`) and the minimal compatible `S` semver(`S.min_cli_ver`).
///
/// When handshaking:
/// - `C` sends its ver `C.ver` to `S`,
/// - When `S` receives handshake request, `S` asserts that `C.ver >= S.min_cli_ver`.
/// - Then `S` replies handshake-reply with its `S.ver`.
/// - When `C` receives the reply, `C` asserts that `S.ver >= C.min_srv_ver`.
///
/// Handshake succeeds if both of these two assertions hold.
///
/// E.g.:
/// - `S: (ver=3, min_cli_ver=1)` is compatible with `C: (ver=3, min_srv_ver=2)`.
/// - `S: (ver=4, min_cli_ver=4)` is **NOT** compatible with `C: (ver=3, min_srv_ver=2)`.
/// Because although `S.ver(4) >= C.min_srv_ver(3)` holds,
/// but `C.ver(3) >= S.min_cli_ver(4)` does not hold.
///
/// ```text
/// C.ver: 1 3 4
/// C --------+-------------+------+------------>
/// ^ .------' ^
/// | | |
/// '-------------. |
/// | | |
/// v | |
/// S ---------------+------+------+------------>
/// S.ver: 2 3 4
/// ```
#[tracing::instrument(level = "debug", skip(client, password))]
async fn handshake(
pub async fn handshake(
client: &mut MetaServiceClient<Channel>,
client_ver: &Version,
min_metasrv_ver: &Version,
username: &str,
password: &str,
) -> std::result::Result<Vec<u8>, MetaError> {
Expand All @@ -419,17 +471,44 @@ impl MetaGrpcClient {
let mut payload = vec![];
auth.encode(&mut payload)?;

let req = Request::new(futures::stream::once(async {
let my_ver = to_digit_ver(client_ver);
let req = Request::new(futures::stream::once(async move {
HandshakeRequest {
protocol_version: 0,
protocol_version: my_ver,
payload,
}
}));

let rx = client.handshake(req).await?;
let mut rx = rx.into_inner();

let resp = rx.next().await.expect("Must respond from handshake")?;
let res = rx.next().await.ok_or_else(|| {
MetaNetworkError::ConnectionError(ConnectionError::new(
AnyError::error("handshake returns nothing"),
"",
))
})?;

let resp = res?;

// backward compatibility: no version in handshake.
// TODO(xp): remove this when merged.
if resp.protocol_version > 0 {
let min_compatible = to_digit_ver(min_metasrv_ver);
if resp.protocol_version < min_compatible {
return Err(MetaError::MetaNetworkError(
MetaNetworkError::InvalidArgument(InvalidArgument::new(
AnyError::error(format!(
"metasrv protocol_version({}) < meta-client min-compatible({})",
from_digit_ver(resp.protocol_version),
min_metasrv_ver,
)),
"",
)),
));
}
}

let token = resp.payload;
Ok(token)
}
Expand Down
34 changes: 34 additions & 0 deletions common/meta/grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,37 @@ pub use grpc_action::RequestFor;
pub use grpc_client::ClientHandle;
pub use grpc_client::MetaGrpcClient;
pub use message::ClientWorkerRequest;
use once_cell::sync::Lazy;
use semver::BuildMetadata;
use semver::Prerelease;
use semver::Version;

pub static METACLI_COMMIT_SEMVER: Lazy<Version> = Lazy::new(|| {
let build_semver = option_env!("VERGEN_GIT_SEMVER");
let semver = build_semver.expect("VERGEN_GIT_SEMVER can not be None");

let semver = if semver.starts_with("v") {
&semver[1..]
} else {
semver
};

Version::parse(semver).unwrap()
});

/// Oldest compatible nightly metasrv version
pub static MIN_METASRV_SEMVER: Version = Version {
major: 0,
minor: 7,
patch: 63,
pre: Prerelease::EMPTY,
build: BuildMetadata::EMPTY,
};

pub fn to_digit_ver(v: &Version) -> u64 {
v.major * 1_000_000 + v.minor * 1_000 + v.patch
}

pub fn from_digit_ver(u: u64) -> Version {
Version::new(u / 1_000_000, u / 1_000 % 1_000, u % 1_000)
}
8 changes: 2 additions & 6 deletions common/meta/grpc/tests/it/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ async fn test_grpc_client_action_timeout() {
// server's handshake impl will sleep 2secs.
let timeout = Duration::from_secs(3);

let client = MetaGrpcClient::try_create(vec![srv_addr], "", "", Some(timeout), None)
.await
.unwrap();
let client = MetaGrpcClient::try_create(vec![srv_addr], "", "", Some(timeout), None).unwrap();

let res = client
.get_database(GetDatabaseReq::new("tenant1", "xx"))
Expand All @@ -49,9 +47,7 @@ async fn test_grpc_client_handshake_timeout() {
let srv_addr = start_grpc_server();

let timeout = Duration::from_secs(1);
let res = MetaGrpcClient::try_create(vec![srv_addr], "", "", Some(timeout), None)
.await
.unwrap();
let res = MetaGrpcClient::try_create(vec![srv_addr], "", "", Some(timeout), None).unwrap();

let client = res.make_client().await;
let got = client.unwrap_err();
Expand Down
2 changes: 1 addition & 1 deletion common/meta/raft-store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub static DATABEND_COMMIT_VERSION: Lazy<String> = Lazy::new(|| {
ver
});

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, serde::Serialize)]
pub struct RaftConfig {
/// Identify a config.
/// This is only meant to make debugging easier with more than one Config involved.
Expand Down
1 change: 1 addition & 0 deletions common/meta/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ pub use meta_errors::MetaError;
pub use meta_errors::MetaResult;
pub use meta_errors_into::ToMetaError;
pub use meta_network_errors::ConnectionError;
pub use meta_network_errors::InvalidArgument;
pub use meta_network_errors::MetaNetworkError;
pub use meta_network_errors::MetaNetworkResult;
pub use meta_raft_errors::ForwardToLeader;
Expand Down
Loading

0 comments on commit dd08e7b

Please sign in to comment.