
Commit

Merge pull request #9615 from dantengsky/fix-sqllogic-test-hangs
fix: sqllogic test hangs (cluster mod + clickhouse handler)
BohuTANG authored Jan 16, 2023
2 parents c41ad78 + 9ae1dbf commit 1a3b107
Showing 1 changed file with 72 additions and 72 deletions.
src/query/service/src/servers/http/clickhouse_handler.rs (144 changes: 72 additions & 72 deletions)
@@ -114,14 +114,14 @@ async fn execute(
     let format_typ = format.typ.clone();
 
     // the reason for spawning a new task to execute the interpreter:
-    // (sorry, I failed to describe it in a more concise way)
+    // (FIXME describe this in a more concise way)
     //
     // - there are interpreter executions that will block the caller (NOT an async wait)
     //   e.g. PipelineCompleteExecutor::execute will spawn a thread that executes the pipeline,
     //   and then join the thread handle.
     // - async mutexes (tokio::sync::Mutex) are used while executing queries/statements;
     //   an async task may yield while holding the lock of an async mutex, e.g. the embedded meta store
-    // - this method(execute) is running with default tokio runtime
+    // - this method(execute) is running with default tokio runtime (the "tokio-runtime-worker" thread)
     //
     // if the interpreter is executed "directly" (on the current thread), the following deadlock may happen:
     //
@@ -145,79 +145,79 @@
     //
     // P.S. I think it would be better/more reasonable if we could avoid using pthread_join inside an async stack.
 
-    let mut data_stream = ctx
-        .try_spawn({
-            let ctx = ctx.clone();
-            async move { interpreter.execute(ctx.clone()).await }
-        })?
-        .await
-        .map_err(|err| {
-            ErrorCode::from_string(format!(
-                "clickhouse handler failed to join interpreter thread: {err:?}"
-            ))
-        })??;
-
-    let table_schema = infer_table_schema(&schema)?;
-    let mut output_format = FileFormatOptionsExt::get_output_format_from_clickhouse_format(
-        format,
-        table_schema,
-        &ctx.get_settings(),
-    )?;
-
-    let prefix = Ok(output_format.serialize_prefix()?);
-
-    let compress_fn = move |rb: Result<Vec<u8>>| -> Result<Vec<u8>> {
-        if params.compress() {
-            match rb {
-                Ok(b) => compress_block(b),
-                Err(e) => Err(e),
-            }
-        } else {
-            rb
-        }
-    };
-
-    // try to catch runtime errors before the http response, so the client can get an http 500
-    let first_block = match data_stream.next().await {
-        Some(block) => match block {
-            Ok(block) => Some(compress_fn(output_format.serialize_block(&block))),
-            Err(err) => return Err(err),
-        },
-        None => None,
-    };
-
-    let session = ctx.get_current_session();
-    let stream = stream! {
-        yield compress_fn(prefix);
-        let mut ok = true;
-        // do not pull data_stream if we have already met a None
-        if let Some(block) = first_block {
-            yield block;
-            while let Some(block) = data_stream.next().await {
-                match block {
-                    Ok(block) => {
-                        yield compress_fn(output_format.serialize_block(&block));
-                    },
-                    Err(err) => {
-                        let message = format!("{}", err);
-                        yield compress_fn(Ok(message.into_bytes()));
-                        ok = false;
-                        break
-                    }
-                };
-            }
-        }
-        if ok {
-            yield compress_fn(output_format.finalize());
-        }
-        // hold the session ref until the stream is fully consumed
-        let _ = session.get_id();
-    };
-    if let Some(handle) = handle {
-        handle.await.expect("must")
-    }
-
-    Ok(Body::from_bytes_stream(stream).with_content_type(format_typ.get_content_type()))
+    ctx.try_spawn({
+        let ctx = ctx.clone();
+        async move {
+            let mut data_stream = interpreter.execute(ctx.clone()).await?;
+            let table_schema = infer_table_schema(&schema)?;
+            let mut output_format = FileFormatOptionsExt::get_output_format_from_clickhouse_format(
+                format,
+                table_schema,
+                &ctx.get_settings(),
+            )?;
+
+            let prefix = Ok(output_format.serialize_prefix()?);
+
+            let compress_fn = move |rb: Result<Vec<u8>>| -> Result<Vec<u8>> {
+                if params.compress() {
+                    match rb {
+                        Ok(b) => compress_block(b),
+                        Err(e) => Err(e),
+                    }
+                } else {
+                    rb
+                }
+            };
+
+            // try to catch runtime errors before the http response, so the client can get an http 500
+            let first_block = match data_stream.next().await {
+                Some(block) => match block {
+                    Ok(block) => Some(compress_fn(output_format.serialize_block(&block))),
+                    Err(err) => return Err(err),
+                },
+                None => None,
+            };
+
+            let session = ctx.get_current_session();
+            let stream = stream! {
+                yield compress_fn(prefix);
+                let mut ok = true;
+                // do not pull data_stream if we have already met a None
+                if let Some(block) = first_block {
+                    yield block;
+                    while let Some(block) = data_stream.next().await {
+                        match block {
+                            Ok(block) => {
+                                yield compress_fn(output_format.serialize_block(&block));
+                            },
+                            Err(err) => {
+                                let message = format!("{}", err);
+                                yield compress_fn(Ok(message.into_bytes()));
+                                ok = false;
+                                break
+                            }
+                        };
+                    }
+                }
+                if ok {
+                    yield compress_fn(output_format.finalize());
+                }
+                // hold the session ref until the stream is fully consumed
+                let _ = session.get_id();
+            };
+            if let Some(handle) = handle {
+                handle.await.expect("must")
+            }
+
+            Ok(Body::from_bytes_stream(stream).with_content_type(format_typ.get_content_type()))
+        }
+    })?
+    .await
+    .map_err(|err| {
+        ErrorCode::from_string(format!(
+            "clickhouse handler failed to join interpreter thread: {err:?}"
+        ))
+    })?
 }
 
 #[poem::handler]
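The comment at the top of the changed hunk explains why the whole execution now runs inside ctx.try_spawn: interpreter execution may block its thread (e.g. a pthread_join on the pipeline thread) while async mutexes are held, which can deadlock the handler's own tokio worker. The shape of the fix, reduced to plain tokio, is sketched below; this is a hedged illustration, not Databend's Runtime/QueryContext API, and execute_query / handle_request are illustrative names only.

use tokio::task;

// Stand-in for the blocking-prone work (interpreter execution plus response-stream setup).
async fn execute_query() -> Result<String, String> {
    // In the real handler this may spawn a pipeline thread and join it,
    // which parks whatever thread it runs on.
    Ok("result".to_string())
}

async fn handle_request() -> Result<String, String> {
    // Offload the work to its own task so the current worker thread stays free
    // to drive other tasks (including ones holding or waiting on async mutexes).
    let handle = task::spawn(execute_query());

    // The caller only awaits the JoinHandle (an async wait, not a blocking join);
    // a panic or cancellation surfaces as a JoinError, mapped to an error value
    // much like the handler maps it to an ErrorCode.
    handle
        .await
        .map_err(|e| format!("failed to join interpreter task: {e:?}"))?
}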

1 comment on commit 1a3b107


@vercel vercel bot commented on 1a3b107 Jan 16, 2023


Successfully deployed to the following URLs:

databend – ./

databend-databend.vercel.app
databend.vercel.app
databend.rs
databend-git-main-databend.vercel.app
