Skip to content

Commit

Permalink
fix: deadlock when dedup stream read (#1199)
Browse files Browse the repository at this point in the history
## Rationale
The deadlock will happen when dedup stream read requests because the
serial execution of sending and receiving data on a bounded channel.

## Detailed Changes
Send the data in background.

## Test Plan
Test it manually.
  • Loading branch information
ShiKaiWi authored Sep 7, 2023
1 parent 70996c5 commit 395debb
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 60 deletions.
26 changes: 11 additions & 15 deletions server/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use ceresdbproto::{
storage::storage_service_server::StorageServiceServer,
};
use cluster::ClusterRef;
use common_types::{column_schema, record_batch::RecordBatch};
use common_types::column_schema;
use futures::FutureExt;
use generic_error::GenericError;
use log::{info, warn};
Expand All @@ -44,15 +44,11 @@ use proxy::{
use runtime::{JoinHandle, Runtime};
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::engine::EngineRuntimes;
use tokio::sync::{
mpsc,
oneshot::{self, Sender},
};
use tokio::sync::oneshot::{self, Sender};
use tonic::transport::Server;

use crate::grpc::{
meta_event_service::MetaServiceImpl,
remote_engine_service::{error, RemoteEngineServiceImpl, StreamReadReqKey},
meta_event_service::MetaServiceImpl, remote_engine_service::RemoteEngineServiceImpl,
storage_service::StorageServiceImpl,
};

Expand Down Expand Up @@ -216,8 +212,7 @@ pub struct Builder {
cluster: Option<ClusterRef>,
opened_wals: Option<OpenedWals>,
proxy: Option<Arc<Proxy>>,
request_notifiers:
Option<Arc<RequestNotifiers<StreamReadReqKey, mpsc::Sender<error::Result<RecordBatch>>>>>,
enable_dedup_stream_read: bool,
hotspot_recorder: Option<Arc<HotspotRecorder>>,
}

Expand All @@ -231,7 +226,7 @@ impl Builder {
cluster: None,
opened_wals: None,
proxy: None,
request_notifiers: None,
enable_dedup_stream_read: false,
hotspot_recorder: None,
}
}
Expand Down Expand Up @@ -277,10 +272,8 @@ impl Builder {
self
}

pub fn request_notifiers(mut self, enable_query_dedup: bool) -> Self {
if enable_query_dedup {
self.request_notifiers = Some(Arc::new(RequestNotifiers::default()));
}
pub fn request_notifiers(mut self, v: bool) -> Self {
self.enable_dedup_stream_read = v;
self
}
}
Expand All @@ -304,10 +297,13 @@ impl Builder {
});

let remote_engine_server = {
let request_notifiers = self
.enable_dedup_stream_read
.then(|| Arc::new(RequestNotifiers::default()));
let service = RemoteEngineServiceImpl {
instance,
runtimes: runtimes.clone(),
request_notifiers: self.request_notifiers,
request_notifiers,
hotspot_recorder,
};
RemoteEngineServiceServer::new(service)
Expand Down
114 changes: 69 additions & 45 deletions server/src/grpc/remote_engine_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

// Remote engine rpc service implementation.

use std::{hash::Hash, sync::Arc, time::Instant};
use std::{
hash::Hash,
sync::Arc,
time::{Duration, Instant},
};

use arrow_ext::ipc::{self, CompressOptions, CompressOutput, CompressionMethod};
use async_trait::async_trait;
Expand Down Expand Up @@ -45,7 +49,7 @@ use table_engine::{
table::TableRef,
};
use time_ext::InstantExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{self, Sender};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

Expand All @@ -59,7 +63,8 @@ use crate::grpc::{

pub mod error;

const STREAM_QUERY_CHANNEL_LEN: usize = 20;
const DEDUP_RESP_NOTIFY_TIMEOUT_MS: u64 = 500;
const STREAM_QUERY_CHANNEL_LEN: usize = 200;
const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -105,12 +110,14 @@ impl<F: FnMut()> Drop for ExecutionGuard<F> {
}
}

pub type StreamReadRequestNotifiers =
Arc<RequestNotifiers<StreamReadReqKey, mpsc::Sender<Result<RecordBatch>>>>;

#[derive(Clone)]
pub struct RemoteEngineServiceImpl {
pub instance: InstanceRef,
pub runtimes: Arc<EngineRuntimes>,
pub request_notifiers:
Option<Arc<RequestNotifiers<StreamReadReqKey, mpsc::Sender<Result<RecordBatch>>>>>,
pub request_notifiers: Option<StreamReadRequestNotifiers>,
pub hotspot_recorder: Arc<HotspotRecorder>,
}

Expand All @@ -119,7 +126,6 @@ impl RemoteEngineServiceImpl {
&self,
request: Request<ReadRequest>,
) -> Result<ReceiverStream<Result<RecordBatch>>> {
let instant = Instant::now();
let ctx = self.handler_ctx();
let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN);
let handle = self.runtimes.read_runtime.spawn(async move {
Expand Down Expand Up @@ -156,22 +162,15 @@ impl RemoteEngineServiceImpl {
});
}

// TODO(shuangxiao): this metric is invalid, refactor it.
REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
.stream_read
.observe(instant.saturating_elapsed().as_secs_f64());
// TODO: add metrics to collect the time cost of the reading.
Ok(ReceiverStream::new(rx))
}

async fn deduped_stream_read_internal(
async fn dedup_stream_read_internal(
&self,
request_notifiers: Arc<
RequestNotifiers<StreamReadReqKey, mpsc::Sender<Result<RecordBatch>>>,
>,
request_notifiers: StreamReadRequestNotifiers,
request: Request<ReadRequest>,
) -> Result<ReceiverStream<Result<RecordBatch>>> {
let instant = Instant::now();
let ctx = self.handler_ctx();
let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN);

let request = request.into_inner();
Expand All @@ -189,25 +188,34 @@ impl RemoteEngineServiceImpl {
read_request.projected_schema.projection(),
);

let mut guard = match request_notifiers.insert_notifier(request_key.clone(), tx) {
match request_notifiers.insert_notifier(request_key.clone(), tx) {
// The first request, need to handle it, and then notify the other requests.
RequestResult::First => {
// This is used to remove key when future is cancelled.
ExecutionGuard::new(|| {
request_notifiers.take_notifiers(&request_key);
})
self.read_and_send_dedupped_resps(request, request_key, request_notifiers)
.await?;
}
// The request is waiting for the result of first request.
RequestResult::Wait => {
// TODO(shuangxiao): this metric is invalid, refactor it.
REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
.stream_read
.observe(instant.saturating_elapsed().as_secs_f64());

return Ok(ReceiverStream::new(rx));
// TODO: add metrics to collect the time cost of waited stream
// read.
}
};
}
Ok(ReceiverStream::new(rx))
}

async fn read_and_send_dedupped_resps(
&self,
request: ReadRequest,
request_key: StreamReadReqKey,
request_notifiers: StreamReadRequestNotifiers,
) -> Result<()> {
let instant = Instant::now();
let ctx = self.handler_ctx();

// This is used to remove key when future is cancelled.
let mut guard = ExecutionGuard::new(|| {
request_notifiers.take_notifiers(&request_key);
});
let handle = self
.runtimes
.read_runtime
Expand Down Expand Up @@ -237,27 +245,46 @@ impl RemoteEngineServiceImpl {
stream_read.push(handle);
}

let mut batches = Vec::new();
// Collect all the data from the stream to let more duplicate request query to
// be batched.
let mut resps = Vec::new();
while let Some(result) = stream_read.next().await {
let batch = result.box_err().context(ErrWithCause {
code: StatusCode::Internal,
msg: "failed to join task",
})?;
batches.extend(batch);
resps.extend(batch);
}

// We should set cancel to guard, otherwise the key will be removed twice.
guard.cancel();
let notifiers = request_notifiers.take_notifiers(&request_key).unwrap();

let num_notifiers = notifiers.len();
// Do send in background to avoid blocking the rpc procedure.
self.runtimes.read_runtime.spawn(async move {
Self::send_dedupped_resps(resps, notifiers).await;

REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
.stream_read
.observe(instant.saturating_elapsed().as_secs_f64());
});

Ok(())
}

/// Send the response to the queriers that share the same query request.
async fn send_dedupped_resps(
resps: Vec<Result<RecordBatch>>,
notifiers: Vec<Sender<Result<RecordBatch>>>,
) {
let mut num_rows = 0;
for batch in batches {
match batch {
let timeout = Duration::from_millis(DEDUP_RESP_NOTIFY_TIMEOUT_MS);
for resp in resps {
match resp {
Ok(batch) => {
num_rows += batch.num_rows() * num_notifiers;
num_rows += batch.num_rows();
for notifier in &notifiers {
if let Err(e) = notifier.send(Ok(batch.clone())).await {
if let Err(e) = notifier.send_timeout(Ok(batch.clone()), timeout).await {
error!("Failed to send handler result, err:{}.", e);
}
}
Expand All @@ -269,7 +296,8 @@ impl RemoteEngineServiceImpl {
msg: "failed to handler request".to_string(),
}
.fail();
if let Err(e) = notifier.send(err).await {

if let Err(e) = notifier.send_timeout(err, timeout).await {
error!("Failed to send handler result, err:{}.", e);
}
}
Expand All @@ -278,18 +306,14 @@ impl RemoteEngineServiceImpl {
}
}

let total_num_rows = (num_rows * notifiers.len()) as u64;
let num_dedupped_reqs = (notifiers.len() - 1) as u64;
REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC
.query_succeeded_row
.inc_by(num_rows as u64);
.inc_by(total_num_rows);
REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC
.dedupped_stream_query
.inc_by((num_notifiers - 1) as u64);
// TODO(shuangxiao): this metric is invalid, refactor it.
REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
.stream_read
.observe(instant.saturating_elapsed().as_secs_f64());

Ok(ReceiverStream::new(rx))
.inc_by(num_dedupped_reqs);
}

async fn write_internal(
Expand Down Expand Up @@ -445,7 +469,7 @@ impl RemoteEngineService for RemoteEngineServiceImpl {
REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC.stream_query.inc();
let result = match self.request_notifiers.clone() {
Some(request_notifiers) => {
self.deduped_stream_read_internal(request_notifiers, request)
self.dedup_stream_read_internal(request_notifiers, request)
.await
}
None => self.stream_read_internal(request).await,
Expand Down

0 comments on commit 395debb

Please sign in to comment.