Skip to content

Commit 29151ef

Browse files
authored
Merge pull request grpc#22837 from vjpai/channel_acec
Fixes on ApplicationCallbackExecCtx
2 parents 662141e + 611dccb commit 29151ef

File tree

4 files changed

+39
-8
lines changed

4 files changed

+39
-8
lines changed

Diff for: src/core/lib/iomgr/exec_ctx.h

+5
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,11 @@ class ApplicationCallbackExecCtx {
357357
/** Global shutdown for ApplicationCallbackExecCtx. Called by init. */
358358
static void GlobalShutdown(void) { gpr_tls_destroy(&callback_exec_ctx_); }
359359

360+
static bool Available() {
361+
return reinterpret_cast<ApplicationCallbackExecCtx*>(
362+
gpr_tls_get(&callback_exec_ctx_)) != nullptr;
363+
}
364+
360365
private:
361366
uintptr_t flags_{0u};
362367
grpc_experimental_completion_queue_functor* head_{nullptr};

Diff for: src/core/lib/surface/channel.cc

+6
Original file line numberDiff line numberDiff line change
@@ -329,13 +329,15 @@ char* grpc_channel_get_target(grpc_channel* channel) {
329329

330330
void grpc_channel_get_info(grpc_channel* channel,
331331
const grpc_channel_info* channel_info) {
332+
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
332333
grpc_core::ExecCtx exec_ctx;
333334
grpc_channel_element* elem =
334335
grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
335336
elem->filter->get_channel_info(elem, channel_info);
336337
}
337338

338339
void grpc_channel_reset_connect_backoff(grpc_channel* channel) {
340+
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
339341
grpc_core::ExecCtx exec_ctx;
340342
GRPC_API_TRACE("grpc_channel_reset_connect_backoff(channel=%p)", 1,
341343
(channel));
@@ -386,6 +388,7 @@ grpc_call* grpc_channel_create_call(grpc_channel* channel,
386388
grpc_slice method, const grpc_slice* host,
387389
gpr_timespec deadline, void* reserved) {
388390
GPR_ASSERT(!reserved);
391+
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
389392
grpc_core::ExecCtx exec_ctx;
390393
grpc_call* call = grpc_channel_create_call_internal(
391394
channel, parent_call, propagation_mask, cq, nullptr,
@@ -449,6 +452,7 @@ void* grpc_channel_register_call(grpc_channel* channel, const char* method,
449452
"grpc_channel_register_call(channel=%p, method=%s, host=%s, reserved=%p)",
450453
4, (channel, method, host, reserved));
451454
GPR_ASSERT(!reserved);
455+
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
452456
grpc_core::ExecCtx exec_ctx;
453457

454458
grpc_core::MutexLock lock(&channel->registration_table->mu);
@@ -481,6 +485,7 @@ grpc_call* grpc_channel_create_registered_call(
481485
registered_call_handle, deadline.tv_sec, deadline.tv_nsec,
482486
(int)deadline.clock_type, reserved));
483487
GPR_ASSERT(!reserved);
488+
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
484489
grpc_core::ExecCtx exec_ctx;
485490
grpc_call* call = grpc_channel_create_call_internal(
486491
channel, parent_call, propagation_mask, completion_queue, nullptr,
@@ -532,6 +537,7 @@ void grpc_channel_destroy_internal(grpc_channel* channel) {
532537
}
533538

534539
void grpc_channel_destroy(grpc_channel* channel) {
540+
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
535541
grpc_core::ExecCtx exec_ctx;
536542
grpc_channel_destroy_internal(channel);
537543
}

Diff for: src/core/lib/surface/completion_queue.cc

+8-1
Original file line numberDiff line numberDiff line change
@@ -874,8 +874,15 @@ static void cq_end_op_for_callback(
874874
cq_finish_shutdown_callback(cq);
875875
}
876876

877+
// If possible, schedule the callback onto an existing thread-local
878+
// ApplicationCallbackExecCtx, which is a work queue. This is possible for:
879+
// 1. The callback is internally-generated and there is an ACEC available
880+
// 2. The callback is marked inlineable and there is an ACEC available
881+
// 3. We are already running in a background poller thread (which always has
882+
// an ACEC available at the base of the stack).
877883
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
878-
if (internal || functor->inlineable ||
884+
if (((internal || functor->inlineable) &&
885+
grpc_core::ApplicationCallbackExecCtx::Available()) ||
879886
grpc_iomgr_is_any_background_poller_thread()) {
880887
grpc_core::ApplicationCallbackExecCtx::Enqueue(functor,
881888
(error == GRPC_ERROR_NONE));

Diff for: test/cpp/end2end/client_callback_end2end_test.cc

+20-7
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ class ClientCallbackEnd2endTest
299299
}
300300
}
301301

302-
void SendGenericEchoAsBidi(int num_rpcs, int reuses) {
302+
void SendGenericEchoAsBidi(int num_rpcs, int reuses, bool do_writes_done) {
303303
const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
304304
grpc::string test_string("");
305305
for (int i = 0; i < num_rpcs; i++) {
@@ -308,8 +308,8 @@ class ClientCallbackEnd2endTest
308308
ByteBuffer> {
309309
public:
310310
Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name,
311-
const grpc::string& test_str, int reuses)
312-
: reuses_remaining_(reuses) {
311+
const grpc::string& test_str, int reuses, bool do_writes_done)
312+
: reuses_remaining_(reuses), do_writes_done_(do_writes_done) {
313313
activate_ = [this, test, method_name, test_str] {
314314
if (reuses_remaining_ > 0) {
315315
cli_ctx_.reset(new ClientContext);
@@ -329,7 +329,11 @@ class ClientCallbackEnd2endTest
329329
};
330330
activate_();
331331
}
332-
void OnWriteDone(bool /*ok*/) override { StartWritesDone(); }
332+
void OnWriteDone(bool /*ok*/) override {
333+
if (do_writes_done_) {
334+
StartWritesDone();
335+
}
336+
}
333337
void OnReadDone(bool /*ok*/) override {
334338
EchoResponse response;
335339
EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
@@ -355,7 +359,10 @@ class ClientCallbackEnd2endTest
355359
std::mutex mu_;
356360
std::condition_variable cv_;
357361
bool done_ = false;
358-
} rpc{this, kMethodName, test_string, reuses};
362+
const bool do_writes_done_;
363+
};
364+
365+
Client rpc(this, kMethodName, test_string, reuses, do_writes_done);
359366

360367
rpc.Await();
361368
}
@@ -517,13 +524,19 @@ TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
517524
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) {
518525
MAYBE_SKIP_TEST;
519526
ResetStub();
520-
SendGenericEchoAsBidi(10, 1);
527+
SendGenericEchoAsBidi(10, 1, /*do_writes_done=*/true);
521528
}
522529

523530
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) {
524531
MAYBE_SKIP_TEST;
525532
ResetStub();
526-
SendGenericEchoAsBidi(10, 10);
533+
SendGenericEchoAsBidi(10, 10, /*do_writes_done=*/true);
534+
}
535+
536+
TEST_P(ClientCallbackEnd2endTest, GenericRpcNoWritesDone) {
537+
MAYBE_SKIP_TEST;
538+
ResetStub();
539+
SendGenericEchoAsBidi(1, 1, /*do_writes_done=*/false);
527540
}
528541

529542
#if GRPC_ALLOW_EXCEPTIONS

0 commit comments

Comments
 (0)