Skip to content

Commit

Permalink
feat: hotspot record remote engine requests (apache#1127)
Browse files Browse the repository at this point in the history
## Rationale
Currently hotspot will not records remote engine requests 

## Detailed Changes
Add hotspot for remote engine

## Test Plan
No need
  • Loading branch information
jiacai2050 authored Aug 4, 2023
1 parent 7a701c6 commit cfe178a
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions proxy/src/hotspot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Default for Config {
}
}

enum Message {
pub enum Message {
Query(QueryKey),
Write {
key: WriteKey,
Expand Down Expand Up @@ -117,6 +117,7 @@ pub struct Dump {
pub write_field_hots: Vec<String>,
}

// TODO: move HotspotRecorder to components dir for reuse.
impl HotspotRecorder {
pub fn new(config: Config, runtime: Arc<Runtime>) -> Self {
let hotspot_query = Self::init_lru(config.query_cap);
Expand Down Expand Up @@ -287,7 +288,7 @@ impl HotspotRecorder {
}
}

async fn send_msg_or_log(&self, method: &str, msg: Message) {
pub async fn send_msg_or_log(&self, method: &str, msg: Message) {
if let Err(e) = self.tx.send(msg).await {
warn!(
"HotspotRecoder::{} fail to send \
Expand Down
6 changes: 1 addition & 5 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
resp_compress_min_length: usize,
auto_create_table: bool,
schema_config_provider: SchemaConfigProviderRef,
hotspot_config: hotspot::Config,
hotspot_recorder: Arc<HotspotRecorder>,
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
) -> Self {
Expand All @@ -113,10 +113,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
router.clone(),
local_endpoint,
));
let hotspot_recorder = Arc::new(HotspotRecorder::new(
hotspot_config,
engine_runtimes.default_runtime.clone(),
));

Self {
router,
Expand Down
1 change: 1 addition & 0 deletions remote_engine_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ futures = { workspace = true }
generic_error = { workspace = true }
log = { workspace = true }
macros = { workspace = true }
proxy = { workspace = true }
router = { workspace = true }
runtime = { workspace = true }
serde = { workspace = true }
Expand Down
73 changes: 65 additions & 8 deletions remote_engine_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ use common_types::{record_batch::RecordBatch, schema::RecordSchema};
pub use config::Config;
use futures::{Stream, StreamExt};
use generic_error::BoxError;
use proxy::hotspot::{HotspotRecorder, Message};
use router::RouterRef;
use runtime::Runtime;
use snafu::ResultExt;
use table_engine::{
remote::{
self,
model::{GetTableInfoRequest, ReadRequest, TableInfo, WriteBatchResult, WriteRequest},
model::{
GetTableInfoRequest, ReadRequest, TableIdentifier, TableInfo, WriteBatchResult,
WriteRequest,
},
RemoteEngine,
},
stream::{self, ErrWithSource, RecordBatchStream, SendableRecordBatchStream},
Expand Down Expand Up @@ -104,40 +108,93 @@ pub mod error {
define_result!(Error);
}

pub struct RemoteEngineImpl(Client);
pub struct RemoteEngineImpl {
client: Client,
hotspot_recorder: Arc<HotspotRecorder>,
}

impl RemoteEngineImpl {
pub fn new(config: Config, router: RouterRef, worker_runtime: Arc<Runtime>) -> Self {
pub fn new(
config: Config,
router: RouterRef,
worker_runtime: Arc<Runtime>,
hotspot_recorder: Arc<HotspotRecorder>,
) -> Self {
let client = Client::new(config, router, worker_runtime);

Self(client)
Self {
client,
hotspot_recorder,
}
}

fn format_hot_key(table: &TableIdentifier) -> String {
format!("{}/{}", table.schema, table.table)
}

async fn record_write(&self, request: &WriteRequest) {
let hot_key = Self::format_hot_key(&request.table);
let row_count = request.write_request.row_group.num_rows();
let field_count = row_count * request.write_request.row_group.schema().num_columns();
self.hotspot_recorder
.send_msg_or_log(
"inc_write_reqs",
Message::Write {
key: hot_key,
row_count,
field_count,
},
)
.await;
}
}

#[async_trait]
impl RemoteEngine for RemoteEngineImpl {
async fn read(&self, request: ReadRequest) -> remote::Result<SendableRecordBatchStream> {
let client_read_stream = self.0.read(request).await.box_err().context(remote::Read)?;
self.hotspot_recorder
.send_msg_or_log(
"inc_query_reqs",
Message::Query(Self::format_hot_key(&request.table)),
)
.await;

let client_read_stream = self
.client
.read(request)
.await
.box_err()
.context(remote::Read)?;
Ok(Box::pin(RemoteReadRecordBatchStream(client_read_stream)))
}

async fn write(&self, request: WriteRequest) -> remote::Result<usize> {
self.0.write(request).await.box_err().context(remote::Write)
self.record_write(&request).await;

self.client
.write(request)
.await
.box_err()
.context(remote::Write)
}

async fn write_batch(
&self,
requests: Vec<WriteRequest>,
) -> remote::Result<Vec<WriteBatchResult>> {
self.0
for req in &requests {
self.record_write(req).await;
}

self.client
.write_batch(requests)
.await
.box_err()
.context(remote::Write)
}

async fn get_table_info(&self, request: GetTableInfoRequest) -> remote::Result<TableInfo> {
self.0
self.client
.get_table_info(request)
.await
.box_err()
Expand Down
8 changes: 7 additions & 1 deletion server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use logger::RuntimeLevel;
use macros::define_result;
use partition_table_engine::PartitionTableEngine;
use proxy::{
hotspot::HotspotRecorder,
instance::{Instance, InstanceRef},
limiter::Limiter,
schema_config_provider::SchemaConfigProviderRef,
Expand Down Expand Up @@ -316,10 +317,15 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?;
let config_content = self.config_content.expect("Missing config content");

let hotspot_recorder = Arc::new(HotspotRecorder::new(
self.server_config.hotspot,
engine_runtimes.default_runtime.clone(),
));
let remote_engine_ref = Arc::new(RemoteEngineImpl::new(
self.remote_engine_client_config.clone(),
router.clone(),
engine_runtimes.io_runtime.clone(),
hotspot_recorder.clone(),
));

let partition_table_engine = Arc::new(PartitionTableEngine::new(remote_engine_ref.clone()));
Expand Down Expand Up @@ -363,7 +369,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
self.server_config.resp_compress_min_length.as_byte() as usize,
self.server_config.auto_create_table,
provider.clone(),
self.server_config.hotspot,
hotspot_recorder,
engine_runtimes.clone(),
self.cluster.is_some(),
));
Expand Down

0 comments on commit cfe178a

Please sign in to comment.