Skip to content

Commit

Permalink
fix: improve remote write performance by using separate runtime (apache#837)
Browse files Browse the repository at this point in the history

* feat: call remote_engine service in io-runtime

* feat: spawn the sub io task immediately
  • Loading branch information
ShiKaiWi authored Apr 13, 2023
1 parent 8cc6371 commit 9d42f6e
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 37 deletions.
3 changes: 2 additions & 1 deletion analytic_engine/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,8 @@ impl Builder {
write_runtime: runtime.clone(),
meta_runtime: runtime.clone(),
compact_runtime: runtime.clone(),
default_runtime: runtime,
default_runtime: runtime.clone(),
io_runtime: runtime,
}),
}
}
Expand Down
20 changes: 14 additions & 6 deletions partition_table_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod partition;
use std::sync::Arc;

use async_trait::async_trait;
use common_util::error::BoxError;
use common_util::{error::BoxError, runtime::Runtime};
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::{
Expand All @@ -26,11 +26,15 @@ use crate::partition::{PartitionTableImpl, TableData};
/// Partition table engine implementation.
///
/// Holds the shared handles that are handed to every `PartitionTableImpl`
/// this engine builds.
pub struct PartitionTableEngine {
/// Remote engine handle; a clone is passed to each `PartitionTableImpl`
/// constructed by this engine.
remote_engine_ref: RemoteEngineRef,
/// Runtime handle; a clone is passed to each `PartitionTableImpl`
/// constructed by this engine alongside the remote engine handle.
io_runtime: Arc<Runtime>,
}

impl PartitionTableEngine {
pub fn new(remote_engine_ref: RemoteEngineRef) -> Self {
Self { remote_engine_ref }
pub fn new(remote_engine_ref: RemoteEngineRef, io_runtime: Arc<Runtime>) -> Self {
Self {
remote_engine_ref,
io_runtime,
}
}
}

Expand Down Expand Up @@ -58,9 +62,13 @@ impl TableEngine for PartitionTableEngine {
engine_type: request.engine,
};
Ok(Arc::new(
PartitionTableImpl::new(table_data, self.remote_engine_ref.clone())
.box_err()
.context(Unexpected)?,
PartitionTableImpl::new(
table_data,
self.remote_engine_ref.clone(),
self.io_runtime.clone(),
)
.box_err()
.context(Unexpected)?,
))
}

Expand Down
54 changes: 38 additions & 16 deletions partition_table_engine/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

//! Distributed Table implementation
use std::{collections::HashMap, fmt};
use std::{collections::HashMap, fmt, sync::Arc};

use async_trait::async_trait;
use common_types::{
row::{Row, RowGroupBuilder},
schema::Schema,
};
use common_util::error::BoxError;
use common_util::{error::BoxError, runtime::Runtime};
use futures::future::try_join_all;
use snafu::ResultExt;
use table_engine::{
Expand Down Expand Up @@ -50,13 +50,19 @@ pub struct TableData {
pub struct PartitionTableImpl {
// Table data supplied by the caller at construction time.
table_data: TableData,
// Remote engine handle; the write path clones it into each spawned task
// and calls `write` on it with a `RemoteWriteRequest`.
remote_engine: RemoteEngineRef,
// Runtime on which the write path spawns the remote write tasks and the
// task that joins their handles.
io_runtime: Arc<Runtime>,
}

impl PartitionTableImpl {
pub fn new(table_data: TableData, remote_engine: RemoteEngineRef) -> Result<Self> {
pub fn new(
table_data: TableData,
remote_engine: RemoteEngineRef,
io_runtime: Arc<Runtime>,
) -> Result<Self> {
Ok(Self {
table_data,
remote_engine,
io_runtime,
})
}

Expand Down Expand Up @@ -158,26 +164,42 @@ impl Table for PartitionTableImpl {
table: self.get_sub_table_ident(partition).table,
})?
.build();
futures.push(async move {
self.remote_engine
.write(RemoteWriteRequest {
table: self.get_sub_table_ident(partition),
write_request: WriteRequest { row_group },
})
.await
});

let request = RemoteWriteRequest {
table: self.get_sub_table_ident(partition),
write_request: WriteRequest { row_group },
};
let remote_engine = self.remote_engine.clone();
let write_handle = self
.io_runtime
.spawn(async move { remote_engine.write(request).await });
futures.push(write_handle);
}

let result = {
let write_results = {
// TODO: make this a local timer
let _remote_timer = PARTITION_TABLE_WRITE_DURATION_HISTOGRAM
.with_label_values(&["remote_write"])
.start_timer();
try_join_all(futures).await.box_err().context(Write {
table: self.name().to_string(),
})?

let handle = self.io_runtime.spawn(try_join_all(futures));
handle
.await
.box_err()
.context(Write { table: self.name() })?
.box_err()
.context(Write { table: self.name() })?
};

Ok(result.into_iter().sum())
let mut total_rows = 0;
for write_result in write_results {
let written_rows = write_result
.box_err()
.context(Write { table: self.name() })?;
total_rows += written_rows;
}

Ok(total_rows)
}

async fn read(&self, _request: ReadRequest) -> Result<SendableRecordBatchStream> {
Expand Down
14 changes: 6 additions & 8 deletions remote_engine_client/src/cached_router.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Cached router
Expand Down Expand Up @@ -43,14 +43,14 @@ impl CachedRouter {
cache.get(table_ident).cloned()
};

let channel = if let Some(channel) = channel_opt {
if let Some(channel) = channel_opt {
// If found, return it.
debug!(
"CachedRouter found channel in cache, table_ident:{:?}",
table_ident
);

channel
Ok(channel)
} else {
// If not found, do the real routing work and try to put the result into the
// cache (another thread may already have put it there).
Expand All @@ -63,7 +63,7 @@ impl CachedRouter {
{
let mut cache = self.cache.write().unwrap();
// Double check here, if still not found, we put it.
let channel_opt = cache.get(table_ident).cloned();
let channel_opt = cache.get(table_ident);
if channel_opt.is_none() {
debug!(
"CachedRouter put the new channel to cache, table_ident:{:?}",
Expand All @@ -73,10 +73,8 @@ impl CachedRouter {
}
}

channel
};

Ok(channel)
Ok(channel)
}
}

pub async fn evict(&self, table_ident: &TableIdentifier) {
Expand Down
7 changes: 2 additions & 5 deletions remote_engine_client/src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Channel pool
Expand Down Expand Up @@ -35,10 +35,7 @@ impl ChannelPool {
}
}

let channel = self
.builder
.build(endpoint.clone().to_string().as_str())
.await?;
let channel = self.builder.build(&endpoint.to_string()).await?;
let mut inner = self.channels.write().unwrap();
// Double check here.
if let Some(channel) = inner.get(endpoint) {
Expand Down
5 changes: 4 additions & 1 deletion server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,10 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
router.clone(),
));

let partition_table_engine = Arc::new(PartitionTableEngine::new(remote_engine_ref.clone()));
let partition_table_engine = Arc::new(PartitionTableEngine::new(
remote_engine_ref.clone(),
engine_runtimes.io_runtime.clone(),
));

let instance = {
let instance = Instance {
Expand Down
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ pub struct RuntimeConfig {
pub compact_thread_num: usize,
/// Runtime for other tasks which may not be important
pub default_thread_num: usize,
/// Runtime for io
pub io_thread_num: usize,
}

impl Default for RuntimeConfig {
Expand All @@ -97,6 +99,7 @@ impl Default for RuntimeConfig {
meta_thread_num: 2,
compact_thread_num: 4,
default_thread_num: 8,
io_thread_num: 4,
}
}
}
1 change: 1 addition & 0 deletions src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ fn build_engine_runtimes(config: &RuntimeConfig) -> EngineRuntimes {
compact_runtime: Arc::new(build_runtime("ceres-compact", config.compact_thread_num)),
meta_runtime: Arc::new(build_runtime("ceres-meta", config.meta_thread_num)),
default_runtime: Arc::new(build_runtime("ceres-default", config.default_thread_num)),
io_runtime: Arc::new(build_runtime("ceres-io", config.io_thread_num)),
}
}

Expand Down
2 changes: 2 additions & 0 deletions table_engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,4 +313,6 @@ pub struct EngineRuntimes {
pub meta_runtime: Arc<Runtime>,
/// Runtime for some other tasks which are not so important
pub default_runtime: Arc<Runtime>,
/// Runtime for io task
pub io_runtime: Arc<Runtime>,
}

0 comments on commit 9d42f6e

Please sign in to comment.