Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/cluster client #465

Merged
merged 3 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion xline-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Official Xline API client for Rust that supports the [CURP](https://github.com/x
- [ ] MemberAdd
- [ ] MemberRemove
- [ ] MemberUpdate
- [ ] MemberList
- [x] MemberList
- [ ] MemberPromote
- Election
- [ ] Campaign
Expand Down
58 changes: 58 additions & 0 deletions xline-client/examples/cluster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use xline_client::types::cluster::{
MemberAddRequest, MemberListRequest, MemberPromoteRequest, MemberRemoveRequest,
MemberUpdateRequest,
};
use xline_client::{error::Result, Client, ClientOptions};

#[tokio::main]
async fn main() -> Result<()> {
// the name and address of all curp members
let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"];

let mut client = Client::connect(curp_members, ClientOptions::default())
.await?
.cluster_client();

// send a linearizable member list request
let resp = client.member_list(MemberListRequest::new(true)).await?;
println!("members: {:?}", resp.members);

// whether the added member is a learner.
// the learner does not participate in voting and will only catch up with the progress of the leader.
let is_learner = true;

// add a normal node into the cluster
let resp = client
.member_add(MemberAddRequest::new(
vec!["127.0.0.1:2379".to_owned()],
is_learner,
))
.await?;
let added_member = resp.member.unwrap();
println!("members: {:?}, added: {}", resp.members, added_member.id);

if is_learner {
// promote the learner to a normal node
let resp = client
.member_promote(MemberPromoteRequest::new(added_member.id))
.await?;
println!("members: {:?}", resp.members);
}

// update the peer_ur_ls of the added member if the network topology has changed.
let resp = client
.member_update(MemberUpdateRequest::new(
added_member.id,
vec!["127.0.0.2:2379".to_owned()],
))
.await?;
println!("members: {:?}", resp.members);

// remove the member from the cluster if it is no longer needed.
let resp = client
.member_remove(MemberRemoveRequest::new(added_member.id))
.await?;
println!("members: {:?}", resp.members);

Ok(())
}
214 changes: 208 additions & 6 deletions xline-client/src/clients/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,219 @@
// TODO: Remove these when the placeholder is implemented.
#![allow(missing_copy_implementations)]
#![allow(clippy::new_without_default)]
use std::sync::Arc;
use tonic::transport::Channel;

use crate::error::Result;
use crate::types::cluster::{
MemberAddRequest, MemberAddResponse, MemberListRequest, MemberListResponse,
MemberPromoteRequest, MemberPromoteResponse, MemberRemoveRequest, MemberRemoveResponse,
MemberUpdateRequest, MemberUpdateResponse,
};
use crate::AuthService;

/// Client for Cluster operations.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct ClusterClient;
pub struct ClusterClient {
/// Inner client
inner: xlineapi::ClusterClient<AuthService<Channel>>,
}

impl ClusterClient {
/// Create a new cluster client
#[inline]
#[must_use]
pub fn new() -> Self {
Self
pub fn new(channel: Channel, token: Option<String>) -> Self {
Self {
inner: xlineapi::ClusterClient::new(AuthService::new(
channel,
token.and_then(|t| t.parse().ok().map(Arc::new)),
)),
}
}

/// Add a new member to the cluster.
///
/// # Errors
///
/// Returns an error if the request could not be sent or if the response is invalid.
///
/// # Examples
///
/// ```no_run
/// use xline_client::{error::Result, Client, ClientOptions};
/// use xline_client::types::cluster::*;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"];
///
/// let mut client = Client::connect(curp_members, ClientOptions::default())
/// .await?
/// .cluster_client();
///
/// let resp = client.member_add(MemberAddRequest::new(vec!["127.0.0.1:2380".to_owned()], true)).await?;
///
/// println!(
/// "members: {:?}, added: {:?}",
/// resp.members, resp.member
/// );
///
/// Ok(())
/// }
/// ```
#[inline]
pub async fn member_add(&mut self, request: MemberAddRequest) -> Result<MemberAddResponse> {
Ok(self
.inner
.member_add(xlineapi::MemberAddRequest::from(request))
.await?
.into_inner())
}

/// Remove an existing member from the cluster.
///
/// # Errors
///
/// Returns an error if the request could not be sent or if the response is invalid.
///
/// # Examples
///
/// ```no_run
/// use xline_client::{error::Result, Client, ClientOptions};
/// use xline_client::types::cluster::*;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"];
///
/// let mut client = Client::connect(curp_members, ClientOptions::default())
/// .await?
/// .cluster_client();
/// let resp = client.member_remove(MemberRemoveRequest::new(1)).await?;
///
/// println!("members: {:?}", resp.members);
///
/// Ok(())
/// }
///
#[inline]
pub async fn member_remove(
&mut self,
request: MemberRemoveRequest,
) -> Result<MemberRemoveResponse> {
Ok(self
.inner
.member_remove(xlineapi::MemberRemoveRequest::from(request))
.await?
.into_inner())
}

/// Promote an existing member to be the leader of the cluster.
///
/// # Errors
///
/// Returns an error if the request could not be sent or if the response is invalid.
///
/// # Examples
///
/// ```no_run
/// use xline_client::{error::Result, Client, ClientOptions};
/// use xline_client::types::cluster::*;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"];
///
/// let mut client = Client::connect(curp_members, ClientOptions::default())
/// .await?
/// .cluster_client();
/// let resp = client.member_promote(MemberPromoteRequest::new(1)).await?;
///
/// println!("members: {:?}", resp.members);
///
/// Ok(())
/// }
///
#[inline]
pub async fn member_promote(
&mut self,
request: MemberPromoteRequest,
) -> Result<MemberPromoteResponse> {
Ok(self
.inner
.member_promote(xlineapi::MemberPromoteRequest::from(request))
.await?
.into_inner())
}

/// Update an existing member in the cluster.
///
/// # Errors
///
/// Returns an error if the request could not be sent or if the response is invalid.
///
/// # Examples
///
/// ```no_run
/// use xline_client::{error::Result, Client, ClientOptions};
/// use xline_client::types::cluster::*;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"];
///
/// let mut client = Client::connect(curp_members, ClientOptions::default())
/// .await?
/// .cluster_client();
/// let resp = client.member_update(MemberUpdateRequest::new(1, vec!["127.0.0.1:2379".to_owned()])).await?;
///
/// println!("members: {:?}", resp.members);
///
/// Ok(())
/// }
///
#[inline]
pub async fn member_update(
&mut self,
request: MemberUpdateRequest,
) -> Result<MemberUpdateResponse> {
Ok(self
.inner
.member_update(xlineapi::MemberUpdateRequest::from(request))
.await?
.into_inner())
}

/// List all members in the cluster.
///
/// # Errors
///
/// Returns an error if the request could not be sent or if the response is invalid.
///
/// # Examples
///
/// ```no_run
/// use xline_client::{error::Result, Client, ClientOptions};
/// use xline_client::types::cluster::*;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"];
///
/// let mut client = Client::connect(curp_members, ClientOptions::default())
/// .await?
/// .cluster_client();
/// let resp = client.member_list(MemberListRequest::new(false)).await?;
///
/// println!("members: {:?}", resp.members);
///
/// Ok(())
/// }
#[inline]
pub async fn member_list(&mut self, request: MemberListRequest) -> Result<MemberListResponse> {
Ok(self
.inner
.member_list(xlineapi::MemberListRequest::from(request))
.await?
.into_inner())
}
}
2 changes: 1 addition & 1 deletion xline-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ impl Client {
);
let auth = AuthClient::new(curp_client, channel.clone(), token.clone());
let maintenance = MaintenanceClient::new(channel.clone(), token.clone());
let cluster = ClusterClient::new(channel.clone(), token.clone());
let watch = WatchClient::new(channel, token);
let cluster = ClusterClient::new();
let election = ElectionClient::new();

Ok(Self {
Expand Down
Loading
Loading