Skip to content

Commit

Permalink
impl remote sdk.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Dec 27, 2022
1 parent d1b7826 commit 19edda7
Show file tree
Hide file tree
Showing 14 changed files with 809 additions and 7 deletions.
18 changes: 18 additions & 0 deletions common_types/src/projected_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,24 @@ impl ProjectedSchema {
}
}

impl From<ProjectedSchema> for proto::remote_engine::ProjectedSchema {
fn from(request: ProjectedSchema) -> Self {
let table_schema_pb = (&request.0.original_schema).into();
let projection_pb = request.0.projection.as_ref().map(|project| {
let project = project
.iter()
.map(|one_project| *one_project as u64)
.collect::<Vec<u64>>();
proto::remote_engine::Projection { idx: project }
});

Self {
table_schema: Some(table_schema_pb),
projection: projection_pb,
}
}
}

/// Schema with projection informations
struct ProjectedSchemaInner {
/// The schema before projection that the reader intended to read, may
Expand Down
2 changes: 1 addition & 1 deletion common_types/src/request_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
};

#[derive(Debug, Clone, Copy)]
pub struct RequestId(u64);
pub struct RequestId(pub u64);

impl RequestId {
/// Acquire next request id.
Expand Down
2 changes: 1 addition & 1 deletion components/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ workspace = true
async-trait = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
clru = "0.6.1"
clru = { workspace = true }
common_util = { workspace = true }
crc = "3.0.0"
futures = { workspace = true }
Expand Down
26 changes: 26 additions & 0 deletions remote_engine_client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "remote_engine_client"

[package.version]
workspace = true

[package.authors]
workspace = true

[package.edition]
workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = { workspace = true }
ceresdbproto = { workspace = true }
clru = { workspace = true }
common_types = { workspace = true }
common_util = { workspace = true }
futures = { workspace = true }
proto = { workspace = true }
router = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
92 changes: 92 additions & 0 deletions remote_engine_client/src/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

//! Channel pool

use std::num::NonZeroUsize;

use clru::CLruCache;
use snafu::ResultExt;
use tokio::sync::Mutex;
use tonic::transport::{Channel, Endpoint};

use super::config::Config;
use crate::error::*;

/// Pool for reusing the built channel
pub struct ChannelPool {
/// Channels in pool
// TODO: should be replaced with a cache(like "moka")
// or partition the lock.
channels: Mutex<CLruCache<String, Channel>>,

/// Channel builder
builder: ChannelBuilder,
}

impl ChannelPool {
pub fn new(config: Config) -> Self {
let channels = Mutex::new(CLruCache::new(
NonZeroUsize::new(config.channel_pool_max_size).unwrap(),
));
let builder = ChannelBuilder::new(config);

Self { channels, builder }
}

pub async fn get(&self, endpoint: &str) -> Result<Channel> {
{
let mut inner = self.channels.lock().await;
if let Some(channel) = inner.get(endpoint) {
return Ok(channel.clone());
}
}

let mut inner = self.channels.lock().await;
// Double check here.
if let Some(channel) = inner.get(endpoint) {
return Ok(channel.clone());
}

let channel = self.builder.build(endpoint).await?;
inner.put(endpoint.to_string(), channel.clone());

Ok(channel)
}
}

/// Channel builder
struct ChannelBuilder {
config: Config,
}

impl ChannelBuilder {
fn new(config: Config) -> Self {
Self { config }
}

async fn build(&self, endpoint: &str) -> Result<Channel> {
let formatted_endpoint = make_formatted_endpoint(endpoint);
let configured_endpoint =
Endpoint::from_shared(formatted_endpoint.clone()).context(BuildChannel {
addr: formatted_endpoint.clone(),
msg: "invalid endpoint",
})?;

let configured_endpoint = configured_endpoint
.connect_timeout(self.config.connect_timeout.0)
.keep_alive_timeout(self.config.channel_keep_alive_timeout.0)
.http2_keep_alive_interval(self.config.channel_keep_alive_interval.0)
.keep_alive_while_idle(true);

let channel = configured_endpoint.connect().await.context(BuildChannel {
addr: formatted_endpoint.clone(),
msg: "connect failed",
})?;

Ok(channel)
}
}

fn make_formatted_endpoint(endpoint: &str) -> String {
format!("http://{}", endpoint)
}
Loading

0 comments on commit 19edda7

Please sign in to comment.