Skip to content

Commit

Permalink
Merge pull request grpc#11703 from yang-g/cq_shutdown
Browse files Browse the repository at this point in the history
Allow adding events to cq after shutdown is called.
  • Loading branch information
yang-g authored Jul 31, 2017
2 parents 025ca9e + 097cbfc commit f787594
Show file tree
Hide file tree
Showing 15 changed files with 108 additions and 76 deletions.
4 changes: 3 additions & 1 deletion include/grpc/impl/codegen/grpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,9 @@ typedef enum grpc_call_error {
/** this batch of operations leads to more operations than allowed */
GRPC_CALL_ERROR_BATCH_TOO_BIG,
/** payload type requested is not the type registered */
GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH
GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH,
/** completion queue has been shutdown */
GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN
} grpc_call_error;

/** Default send/receive message size limits in bytes. -1 for unlimited. */
Expand Down
2 changes: 1 addition & 1 deletion src/core/ext/filters/client_channel/channel_connectivity.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ void grpc_channel_watch_connectivity_state(
7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec,
(int)deadline.clock_type, cq, tag));

grpc_cq_begin_op(cq, tag);
GPR_ASSERT(grpc_cq_begin_op(cq, tag));

gpr_mu_init(&w->mu);
GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w,
Expand Down
3 changes: 2 additions & 1 deletion src/core/lib/surface/alarm.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/completion_queue.h"

Expand Down Expand Up @@ -49,7 +50,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
alarm->cq = cq;
alarm->tag = tag;

grpc_cq_begin_op(cq, tag);
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
Expand Down
6 changes: 4 additions & 2 deletions src/core/lib/surface/call.c
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,

if (nops == 0) {
if (!is_notify_tag_closure) {
grpc_cq_begin_op(call->cq, notify_tag);
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
free_no_op_completion, NULL,
gpr_malloc(sizeof(grpc_cq_completion)));
Expand Down Expand Up @@ -1723,7 +1723,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,

GRPC_CALL_INTERNAL_REF(call, "completion");
if (!is_notify_tag_closure) {
grpc_cq_begin_op(call->cq, notify_tag);
GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);

Expand Down Expand Up @@ -1844,6 +1844,8 @@ const char *grpc_call_error_to_string(grpc_call_error error) {
return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
case GRPC_CALL_OK:
return "GRPC_CALL_OK";
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/surface/channel_ping.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
op->send_ping = &pr->closure;
op->bind_pollset = grpc_cq_pollset(cq);
grpc_cq_begin_op(cq, tag);
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
grpc_exec_ctx_finish(&exec_ctx);
}
68 changes: 38 additions & 30 deletions src/core/lib/surface/completion_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ typedef struct cq_vtable {
void (*init)(void *data);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
void (*destroy)(void *data);
void (*begin_op)(grpc_completion_queue *cq, void *tag);
bool (*begin_op)(grpc_completion_queue *cq, void *tag);
void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
Expand Down Expand Up @@ -288,8 +288,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq);

static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);

static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq, void *tag,
Expand Down Expand Up @@ -522,33 +522,6 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
}
}

static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
cq_next_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(!cqd->shutdown_called);
gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
}

static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(!cqd->shutdown_called);
gpr_ref(&cqd->pending_events);
}

void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
#ifndef NDEBUG
gpr_mu_lock(cq->mu);
if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
cq->outstanding_tags =
gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
cq->outstanding_tag_capacity);
}
cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
gpr_mu_unlock(cq->mu);
#endif
cq->vtable->begin_op(cq, tag);
}

#ifndef NDEBUG
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
int found = 0;
Expand Down Expand Up @@ -576,6 +549,41 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
#endif

static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
cq_next_data *cqd = DATA_FROM_CQ(cq);
while (true) {
gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events);
if (count == 0) {
return false;
} else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) {
break;
}
}
return true;
}

static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(!cqd->shutdown_called);
gpr_ref(&cqd->pending_events);
return true;
}

bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
#ifndef NDEBUG
gpr_mu_lock(cq->mu);
if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
cq->outstanding_tags =
gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
cq->outstanding_tag_capacity);
}
cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
gpr_mu_unlock(cq->mu);
#endif
return cq->vtable->begin_op(cq, tag);
}

/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_NEXT) */
Expand Down
5 changes: 3 additions & 2 deletions src/core/lib/surface/completion_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc);

/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made.
\a tag is currently used only in debug builds. */
void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
\a tag is currently used only in debug builds. Return true on success, and
false if completion_queue has been shutdown. */
bool grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);

/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
grpc_cq_begin_op */
Expand Down
14 changes: 11 additions & 3 deletions src/core/lib/surface/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
}

/* stay locked, and gather up some stuff to do */
grpc_cq_begin_op(cq, tag);
GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,
NULL, gpr_malloc(sizeof(grpc_cq_completion)));
Expand Down Expand Up @@ -1446,7 +1446,11 @@ grpc_call_error grpc_server_request_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
grpc_cq_begin_op(cq_for_notification, tag);
if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
gpr_free(rc);
error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
goto done;
}
details->reserved = NULL;
rc->cq_idx = cq_idx;
rc->type = BATCH_CALL;
Expand Down Expand Up @@ -1496,7 +1500,11 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
goto done;
}
grpc_cq_begin_op(cq_for_notification, tag);
if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
gpr_free(rc);
error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
goto done;
}
rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
rc->server = server;
Expand Down
50 changes: 28 additions & 22 deletions src/cpp/server/server_cc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,25 @@ class Server::SyncRequest final : public CompletionQueueTag {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
if (tag_) {
GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_registered_call(
server, tag_, &call_, &deadline_, &request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_,
notify_cq, this));
if (GRPC_CALL_OK !=
grpc_server_request_registered_call(
server, tag_, &call_, &deadline_, &request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_,
notify_cq, this)) {
TeardownRequest();
return;
}
} else {
if (!call_details_) {
call_details_ = new grpc_call_details;
grpc_call_details_init(call_details_);
}
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
server, &call_, call_details_,
&request_metadata_, cq_, notify_cq, this));
if (grpc_server_request_call(server, &call_, call_details_,
&request_metadata_, cq_, notify_cq,
this) != GRPC_CALL_OK) {
TeardownRequest();
return;
}
}
}

Expand Down Expand Up @@ -286,12 +292,10 @@ class Server::SyncRequestThreadManager : public ThreadManager {
if (ok) {
// Calldata takes ownership of the completion queue inside sync_req
SyncRequest::CallData cd(server_, sync_req);
{
// Prepare for the next request
if (!IsShutdown()) {
sync_req->SetupRequest(); // Create new completion queue for sync_req
sync_req->Request(server_->c_server(), server_cq_->cq());
}
// Prepare for the next request
if (!IsShutdown()) {
sync_req->SetupRequest(); // Create new completion queue for sync_req
sync_req->Request(server_->c_server(), server_cq_->cq());
}

GPR_TIMER_SCOPE("cd.Run()", 0);
Expand All @@ -316,8 +320,8 @@ class Server::SyncRequestThreadManager : public ThreadManager {
}

void Shutdown() override {
server_cq_->Shutdown();
ThreadManager::Shutdown();
server_cq_->Shutdown();
}

void Wait() override {
Expand Down Expand Up @@ -652,10 +656,11 @@ ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
void ServerInterface::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq) {
grpc_server_request_registered_call(
server_->server(), registered_method, &call_, &context_->deadline_,
context_->client_metadata_.arr(), payload, call_cq_->cq(),
notification_cq->cq(), this);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call(
server_->server(), registered_method, &call_,
&context_->deadline_,
context_->client_metadata_.arr(), payload,
call_cq_->cq(), notification_cq->cq(), this));
}

ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
Expand All @@ -667,9 +672,10 @@ ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
grpc_server_request_call(server->server(), &call_, &call_details_,
context->client_metadata_.arr(), call_cq->cq(),
notification_cq->cq(), this);
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
server->server(), &call_, &call_details_,
context->client_metadata_.arr(), call_cq->cq(),
notification_cq->cq(), this));
}

bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
Expand Down
5 changes: 3 additions & 2 deletions test/core/end2end/fuzzers/server_fuzzer.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
grpc_metadata_array_init(&request_metadata1);
int requested_calls = 0;

grpc_server_request_call(server, &call1, &call_details1, &request_metadata1,
cq, cq, tag(1));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_call(server, &call1, &call_details1,
&request_metadata1, cq, cq, tag(1)));
requested_calls++;

grpc_event ev;
Expand Down
6 changes: 4 additions & 2 deletions test/core/fling/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ typedef struct {

static void request_call(void) {
grpc_metadata_array_init(&request_metadata_recv);
grpc_server_request_call(server, &call, &call_details, &request_metadata_recv,
cq, cq, tag(FLING_SERVER_NEW_REQUEST));
GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_call(server, &call, &call_details,
&request_metadata_recv, cq, cq,
tag(FLING_SERVER_NEW_REQUEST)));
}

static void handle_unary_method(void) {
Expand Down
6 changes: 3 additions & 3 deletions test/core/surface/completion_queue_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ static void test_cq_end_op(void) {
cc = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, NULL);

grpc_cq_begin_op(cc, tag);
GPR_ASSERT(grpc_cq_begin_op(cc, tag));
grpc_cq_end_op(&exec_ctx, cc, tag, GRPC_ERROR_NONE,
do_nothing_end_completion, NULL, &completion);

Expand Down Expand Up @@ -233,7 +233,7 @@ static void test_pluck(void) {
grpc_completion_queue_factory_lookup(&attr), &attr, NULL);

for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
grpc_cq_begin_op(cc, tags[i]);
GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE,
do_nothing_end_completion, NULL, &completions[i]);
}
Expand All @@ -245,7 +245,7 @@ static void test_pluck(void) {
}

for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
grpc_cq_begin_op(cc, tags[i]);
GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE,
do_nothing_end_completion, NULL, &completions[i]);
}
Expand Down
4 changes: 2 additions & 2 deletions test/core/surface/completion_queue_threading_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ static void test_too_many_plucks(void) {
GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);

for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
grpc_cq_begin_op(cc, tags[i]);
GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE,
do_nothing_end_completion, NULL, &completions[i]);
}
Expand Down Expand Up @@ -153,7 +153,7 @@ static void producer_thread(void *arg) {

gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
for (i = 0; i < TEST_THREAD_EVENTS; i++) {
grpc_cq_begin_op(opt->cc, (void *)(intptr_t)1);
GPR_ASSERT(grpc_cq_begin_op(opt->cc, (void *)(intptr_t)1));
}

gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
Expand Down
Loading

0 comments on commit f787594

Please sign in to comment.