Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated fix for refs/heads/status-x-test #22

Open
wants to merge 6 commits into
base: status-x-test
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/grpc/impl/codegen/port_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
/*
* Defines GRPC_ERROR_IS_ABSEIL_STATUS to use absl::Status for grpc_error_handle
*/
// #define GRPC_ERROR_IS_ABSEIL_STATUS 1
#define GRPC_ERROR_IS_ABSEIL_STATUS 1

/* Get windows.h included everywhere (we need it) */
#if defined(_WIN64) || defined(WIN64) || defined(_WIN32) || defined(WIN32)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1171,7 +1171,7 @@ class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
if (!discovery_mechanism_errors.empty()) {
grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING(
absl::StrCat("field:discovery_mechanism element: ", i, " error"));
for (grpc_error_handle discovery_mechanism_error :
for (const grpc_error_handle& discovery_mechanism_error :
discovery_mechanism_errors) {
error = grpc_error_add_child(error, discovery_mechanism_error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,8 +718,7 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
hr->qtype, hr->host, hr->is_balancer, ares_strerror(status));
GRPC_CARES_TRACE_LOG("request:%p on_hostbyname_done_locked: %s", r,
error_msg.c_str());
grpc_error_handle error =
GRPC_ERROR_CREATE_FROM_CPP_STRING(std::move(error_msg));
grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING(error_msg);
r->error = grpc_error_add_child(error, r->error);
}
destroy_hostbyname_request_locked(hr);
Expand Down Expand Up @@ -763,8 +762,7 @@ static void on_srv_query_done_locked(void* arg, int status, int /*timeouts*/,
ares_strerror(status));
GRPC_CARES_TRACE_LOG("request:%p on_srv_query_done_locked: %s", r,
error_msg.c_str());
grpc_error_handle error =
GRPC_ERROR_CREATE_FROM_CPP_STRING(std::move(error_msg));
grpc_error_handle error = GRPC_ERROR_CREATE_FROM_CPP_STRING(error_msg);
r->error = grpc_error_add_child(error, r->error);
}
delete q;
Expand Down Expand Up @@ -823,7 +821,7 @@ static void on_txt_done_locked(void* arg, int status, int /*timeouts*/,
q->name(), ares_strerror(status));
GRPC_CARES_TRACE_LOG("request:%p on_txt_done_locked %s", r,
error_msg.c_str());
error = GRPC_ERROR_CREATE_FROM_CPP_STRING(std::move(error_msg));
error = GRPC_ERROR_CREATE_FROM_CPP_STRING(error_msg);
r->error = grpc_error_add_child(error, r->error);
}

Expand Down
4 changes: 3 additions & 1 deletion src/core/ext/transport/chttp2/server/chttp2_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ grpc_error_handle Chttp2ServerListener::Create(
// The bulk of this method is inside of a lambda to make cleanup
// easier without using goto.
grpc_error_handle error = [&]() {
grpc_error_handle error = GRPC_ERROR_NONE;
// Create Chttp2ServerListener.
listener = new Chttp2ServerListener(
server, args, args_modifier,
Expand Down Expand Up @@ -861,6 +862,7 @@ grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
std::vector<grpc_error_handle> error_list;
// Using lambda to avoid use of goto.
grpc_error_handle error = [&]() {
grpc_error_handle error = GRPC_ERROR_NONE;
if (absl::StartsWith(addr, kUnixUriPrefix)) {
error = grpc_resolve_unix_domain_address(
addr + sizeof(kUnixUriPrefix) - 1, &resolved);
Expand Down Expand Up @@ -911,7 +913,7 @@ grpc_error_handle Chttp2ServerAddPort(Server* server, const char* addr,
}
return GRPC_ERROR_NONE;
}(); // lambda end
for (grpc_error_handle error : error_list) {
for (const grpc_error_handle& error : error_list) {
GRPC_ERROR_UNREF(error);
}
grpc_channel_args_destroy(args);
Expand Down
23 changes: 12 additions & 11 deletions src/core/ext/transport/chttp2/transport/chttp2_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1227,26 +1227,28 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
write_state_name(t->write_state));
}
if (error != GRPC_ERROR_NONE) {
if (closure->error_data.error == GRPC_ERROR_NONE) {
closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
if (*closure->error_data.error == GRPC_ERROR_NONE) {
*closure->error_data.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Error in HTTP transport completing operation");
closure->error_data.error = grpc_error_set_str(
closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
*closure->error_data.error = grpc_error_set_str(
*closure->error_data.error, GRPC_ERROR_STR_TARGET_ADDRESS,
grpc_slice_from_copied_string(t->peer_string.c_str()));
}
closure->error_data.error =
grpc_error_add_child(closure->error_data.error, error);
*closure->error_data.error =
grpc_error_add_child(*closure->error_data.error, error);
}
if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
!(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
// Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
// closures earlier than when it is safe to do so.
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure,
closure->error_data.error);
grpc_error_handle run_error = *closure->error_data.error;
*closure->error_data.error = GRPC_ERROR_NONE;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, run_error);

} else {
grpc_closure_list_append(&t->run_after_write, closure,
closure->error_data.error);
*closure->error_data.error);
}
}
}
Expand Down Expand Up @@ -1394,7 +1396,7 @@ static void perform_stream_op_locked(void* stream_op,
// This batch has send ops. Use final_data as a barrier until enqueue time;
// the initial counter is dropped at the end of this function.
on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
on_complete->error_data.error = GRPC_ERROR_NONE;
*on_complete->error_data.error = GRPC_ERROR_NONE;
}

if (op->cancel_stream) {
Expand Down Expand Up @@ -2503,7 +2505,6 @@ static void read_action(void* tp, grpc_error_handle error) {

static void read_action_locked(void* tp, grpc_error_handle error) {
GPR_TIMER_SCOPE("reading_action_locked", 0);

grpc_chttp2_transport* t = static_cast<grpc_chttp2_transport*>(tp);

GRPC_ERROR_REF(error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
result = ACTION_TAKEN_NO_CALLBACK;
}
stream_state->state_op_done[OP_CANCEL_ERROR] = true;
if (!stream_state->cancel_error) {
if (stream_state->cancel_error == GRPC_ERROR_NONE) {
stream_state->cancel_error =
GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
}
Expand Down
57 changes: 44 additions & 13 deletions src/core/lib/iomgr/call_combiner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,26 @@ DebugOnlyTraceFlag grpc_call_combiner_trace(false, "call_combiner");

namespace {

constexpr static intptr_t kErrorBit =
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
// absl::Status: 2nd bit from LSB can be used
2;
#else
// grpc_status LSB can be used
1;
#endif

grpc_error_handle DecodeCancelStateError(gpr_atm cancel_state) {
if (cancel_state & 1) {
return reinterpret_cast<grpc_error_handle>(cancel_state &
~static_cast<gpr_atm>(1));
if (cancel_state & kErrorBit) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
return internal::StatusGetFromPtr(cancel_state & ~kErrorBit);
#else
return reinterpret_cast<grpc_error_handle>(cancel_state & ~kErrorBit);
#endif
}
return GRPC_ERROR_NONE;
}

gpr_atm EncodeCancelStateError(grpc_error_handle error) {
return static_cast<gpr_atm>(1) | reinterpret_cast<gpr_atm>(error);
}

} // namespace

CallCombiner::CallCombiner() {
Expand All @@ -57,7 +65,14 @@ CallCombiner::CallCombiner() {
}

CallCombiner::~CallCombiner() {
GRPC_ERROR_UNREF(DecodeCancelStateError(cancel_state_));
if (cancel_state_ & kErrorBit) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
internal::StatusFreePtr(cancel_state_ & ~kErrorBit);
#else
GRPC_ERROR_UNREF(reinterpret_cast<grpc_error_handle>(
cancel_state_ & ~static_cast<gpr_atm>(kErrorBit)));
#endif
}
}

#ifdef GRPC_TSAN_ENABLED
Expand Down Expand Up @@ -142,7 +157,7 @@ void CallCombiner::Start(grpc_closure* closure, grpc_error_handle error,
gpr_log(GPR_INFO, " QUEUING");
}
// Queue was not empty, so add closure to queue.
closure->error_data.error = error;
*closure->error_data.error = error;
queue_.Push(
reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(closure));
}
Expand Down Expand Up @@ -180,9 +195,11 @@ void CallCombiner::Stop(DEBUG_ARGS const char* reason) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
gpr_log(GPR_INFO, " EXECUTING FROM QUEUE: closure=%p error=%s",
closure,
grpc_error_std_string(closure->error_data.error).c_str());
grpc_error_std_string(*closure->error_data.error).c_str());
}
ScheduleClosure(closure, closure->error_data.error);
grpc_error_handle error = *closure->error_data.error;
*closure->error_data.error = GRPC_ERROR_NONE;
ScheduleClosure(closure, error);
break;
}
} else if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
Expand Down Expand Up @@ -235,15 +252,29 @@ void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {

void CallCombiner::Cancel(grpc_error_handle error) {
GRPC_STATS_INC_CALL_COMBINER_CANCELLED();
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
intptr_t status_ptr = internal::StatusAllocPtr(error);
if ((status_ptr & kErrorBit) > 0) {
/* absl::Status shouldn't have kErrorBit, could be a code bug. */
gpr_log(GPR_ERROR, "CallCombiner::Cancel got an error which has kErrorBit");
abort();
}
gpr_atm new_state = kErrorBit | status_ptr;
#else
gpr_atm new_state = kErrorBit | reinterpret_cast<gpr_atm>(error);
#endif
while (true) {
gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
grpc_error_handle original_error = DecodeCancelStateError(original_state);
if (original_error != GRPC_ERROR_NONE) {
#ifdef GRPC_ERROR_IS_ABSEIL_STATUS
internal::StatusFreePtr(status_ptr);
#else
GRPC_ERROR_UNREF(error);
#endif
break;
}
if (gpr_atm_full_cas(&cancel_state_, original_state,
EncodeCancelStateError(error))) {
if (gpr_atm_full_cas(&cancel_state_, original_state, new_state)) {
if (original_state != 0) {
grpc_closure* notify_on_cancel =
reinterpret_cast<grpc_closure*>(original_state);
Expand Down
5 changes: 3 additions & 2 deletions src/core/lib/iomgr/call_combiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,10 @@ class CallCombiner {
gpr_atm size_ = 0; // size_t, num closures in queue or currently executing
MultiProducerSingleConsumerQueue queue_;
// Either 0 (if not cancelled and no cancellation closure set),
// a grpc_closure* (if the lowest bit is 0),
// or a grpc_error_handle (if the lowest bit is 1).
// a grpc_closure* (if kErrorBit is 0),
// or a grpc_error_handle (if kErrorBit is 1).
gpr_atm cancel_state_ = 0;

#ifdef GRPC_TSAN_ENABLED
// A fake ref-counted lock that is kept alive after the destruction of
// grpc_call_combiner, when we are running the original closure.
Expand Down
10 changes: 5 additions & 5 deletions src/core/lib/iomgr/closure.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ struct grpc_closure {

/** Once queued, the result of the closure. Before then: scratch space */
union {
grpc_error_handle error;
grpc_core::ManualConstructor<grpc_error_handle> error;
uintptr_t scratch;
} error_data;

Expand All @@ -98,7 +98,7 @@ inline grpc_closure* grpc_closure_init(grpc_closure* closure,
#endif
closure->cb = cb;
closure->cb_arg = cb_arg;
closure->error_data.error = GRPC_ERROR_NONE;
closure->error_data.error.Init(GRPC_ERROR_NONE);
#ifndef NDEBUG
closure->scheduled = false;
closure->file_initiated = nullptr;
Expand Down Expand Up @@ -181,7 +181,7 @@ inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
GRPC_ERROR_UNREF(error);
return false;
}
closure->error_data.error = error;
*closure->error_data.error = error;
closure->next_data.next = nullptr;
bool was_empty = (closure_list->head == nullptr);
if (was_empty) {
Expand All @@ -197,8 +197,8 @@ inline bool grpc_closure_list_append(grpc_closure_list* closure_list,
inline void grpc_closure_list_fail_all(grpc_closure_list* list,
grpc_error_handle forced_failure) {
for (grpc_closure* c = list->head; c != nullptr; c = c->next_data.next) {
if (c->error_data.error == GRPC_ERROR_NONE) {
c->error_data.error = GRPC_ERROR_REF(forced_failure);
if (*c->error_data.error == GRPC_ERROR_NONE) {
*c->error_data.error = GRPC_ERROR_REF(forced_failure);
}
}
GRPC_ERROR_UNREF(forced_failure);
Expand Down
16 changes: 10 additions & 6 deletions src/core/lib/iomgr/combiner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ static void combiner_exec(grpc_core::Combiner* lock, grpc_closure* cl,
}
GPR_ASSERT(last & STATE_UNORPHANED); // ensure lock has not been destroyed
assert(cl->cb);
cl->error_data.error = error;
*cl->error_data.error = error;
lock->queue.Push(cl->next_data.mpscq_node.get());
}

Expand Down Expand Up @@ -232,7 +232,8 @@ bool grpc_combiner_continue_exec_ctx() {
}
GPR_TIMER_SCOPE("combiner.exec1", 0);
grpc_closure* cl = reinterpret_cast<grpc_closure*>(n);
grpc_error_handle cl_err = cl->error_data.error;
grpc_error_handle cl_err = *cl->error_data.error;
*cl->error_data.error = GRPC_ERROR_NONE;
#ifndef NDEBUG
cl->scheduled = false;
#endif
Expand All @@ -248,7 +249,8 @@ bool grpc_combiner_continue_exec_ctx() {
GRPC_COMBINER_TRACE(
gpr_log(GPR_INFO, "C:%p execute_final[%d] c=%p", lock, loops, c));
grpc_closure* next = c->next_data.next;
grpc_error_handle error = c->error_data.error;
grpc_error_handle error = *c->error_data.error;
*c->error_data.error = GRPC_ERROR_NONE;
#ifndef NDEBUG
c->scheduled = false;
#endif
Expand Down Expand Up @@ -314,6 +316,7 @@ static void combiner_finally_exec(grpc_core::Combiner* lock,
GPR_TIMER_MARK("slowpath", 0);
// Using error_data.scratch to store the combiner so that it can be accessed
// in enqueue_finally.
*closure->error_data.error = GRPC_ERROR_NONE;
closure->error_data.scratch = reinterpret_cast<uintptr_t>(lock);
lock->Run(GRPC_CLOSURE_CREATE(enqueue_finally, closure, nullptr), error);
return;
Expand All @@ -327,9 +330,10 @@ static void combiner_finally_exec(grpc_core::Combiner* lock,

static void enqueue_finally(void* closure, grpc_error_handle error) {
grpc_closure* cl = static_cast<grpc_closure*>(closure);
combiner_finally_exec(
reinterpret_cast<grpc_core::Combiner*>(cl->error_data.scratch), cl,
GRPC_ERROR_REF(error));
grpc_core::Combiner* lock =
reinterpret_cast<grpc_core::Combiner*>(cl->error_data.scratch);
cl->error_data.scratch = 0;
combiner_finally_exec(lock, cl, GRPC_ERROR_REF(error));
}

namespace grpc_core {
Expand Down
Loading