Skip to content

Commit

Permalink
Opt1
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Dec 17, 2024
1 parent 2372541 commit bebea21
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 104 deletions.
3 changes: 1 addition & 2 deletions src/brpc/input_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,7 @@ int InputMessenger::ProcessNewMessage(
// This unique_ptr prevents msg to be lost before transfering
// ownership to last_msg
DestroyingPtr<InputMessageBase> msg(pr.message());
QueueMessage(last_msg.release(), &num_bthread_created,
m->_keytable_pool);
QueueMessage(last_msg.release(), &num_bthread_created, m->_keytable_pool);
if (_handlers[index].process == NULL) {
LOG(ERROR) << "process of index=" << index << " is NULL";
continue;
Expand Down
6 changes: 4 additions & 2 deletions src/bthread/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1013,13 +1013,15 @@ void print_task(std::ostream& os, bthread_t tid) {
<< "}\nhas_tls=" << has_tls
<< "\nuptime_ns=" << butil::cpuwide_time_ns() - cpuwide_start_ns
<< "\ncputime_ns=" << stat.cputime_ns
<< "\nnswitch=" << stat.nswitch
<< "\nnswitch=" << stat.nswitch
#ifdef BRPC_BTHREAD_TRACER
<< "\nstatus=" << status
<< "\ntraced=" << traced
<< "\nworker_tid=" << worker_tid;
#endif // BRPC_BTHREAD_TRACER
#else
;
(void)status;(void)traced;(void)worker_tid;
#endif // BRPC_BTHREAD_TRACER
}
}

Expand Down
203 changes: 139 additions & 64 deletions src/bthread/task_tracer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,45 @@
// specific language governing permissions and limitations
// under the License.

#ifdef BRPC_BTHREAD_TRACER
// #ifdef BRPC_BTHREAD_TRACER

#include "bthread/task_tracer.h"
#include <unistd.h>
#include <poll.h>
#include <gflags/gflags.h>
#include "butil/fd_utility.h"
#include "butil/debug/stack_trace.h"
#include "butil/memory/scope_guard.h"
#include "bthread/task_group.h"
#include "bthread/processor.h"
// todo move到butil?
// #include "brpc/reloadable_flags.h"

namespace bthread {

DEFINE_bool(enable_fast_unwind, true, "Whether to enable fast unwind");
DEFINE_uint32(signal_trace_timeout_ms, 50, "Timeout for signal trace in ms");
// BRPC_VALIDATE_GFLAG(signal_trace_timeout_ms, [](const char*, uint32_t v) {
// return v > 0;
// });

extern BAIDU_THREAD_LOCAL TaskMeta* pthread_fake_meta;

int TaskTracer::Init() {
if (RegisterSignalHandler() != 0) {
if (pipe(_pipe_fds) != 0) {
PLOG(ERROR) << "Fail to pipe";
return -1;
}
if (butil::make_non_blocking(_pipe_fds[0]) != 0) {
PLOG(ERROR) << "Fail to make_non_blocking";
return -1;
}
if (butil::make_non_blocking(_pipe_fds[1]) != 0) {
PLOG(ERROR) << "Fail to make_non_blocking";
return -1;
}
if (sem_init(&_sem, 0, 0) != 0) {
PLOG(ERROR) << "Fail to sem_init";
return -1;
}
if (_trace_time.expose("bthread_trace_time") != 0) {
Expand All @@ -43,6 +65,10 @@ int TaskTracer::Init() {
if (_signal_handler_time.expose("bthread_signal_handler_time") != 0) {
return -1;
}
if (RegisterSignalHandler() != 0) {
return -1;
}
LOG(INFO) << 1;
return 0;
}

Expand Down Expand Up @@ -89,16 +115,22 @@ bool TaskTracer::set_end_status_unsafe(TaskMeta* m) {

std::string TaskTracer::Trace(bthread_t tid) {
Result result = TraceImpl(tid);
// return result.error ? result.err_msg : ToString(result);
if (result.error) {
return result.err_msg;
}

if (result.frame_count == 0) {
return "No frame";
if (!result.OK()) {
std::string result_str;
result_str.reserve(128);
for (size_t i = 0; i < result.err_count; ++i) {
result_str.append(result.err_msg[i]);
if (i + 1 < result.err_count) {
result_str.push_back('\n');
}
}
} else {
return "No frame";
}
}

if (!result.fast_unwind) {
if (result.fast_unwind) {
butil::debug::StackTrace stack_trace((void**)&result.ips, result.frame_count);
return stack_trace.ToString();
}
Expand All @@ -121,17 +153,22 @@ std::string TaskTracer::Trace(bthread_t tid) {

void TaskTracer::Trace(std::ostream& os, bthread_t tid) {
Result result = TraceImpl(tid);
if (result.error) {
os << result.err_msg;
return;
}

if (result.frame_count == 0) {
os << "No frame";
if (!result.OK()) {
os << result.err_msg;
for (size_t i = 0; i < result.err_count; ++i) {
os << result.err_msg[i];
if (i + 1 < result.err_count) {
os << '\n';
}
}
} else {
os << "No frame";
}
return;
}

if (!result.fast_unwind) {
if (result.fast_unwind) {
butil::debug::StackTrace stack_trace((void**)&result.ips, result.frame_count);
stack_trace.OutputToStream(&os);
return;
Expand All @@ -155,6 +192,7 @@ TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
BRPC_SCOPE_EXIT {
timer.stop();
_trace_time << timer.n_elapsed();
LOG(INFO) << "Trace time: " << timer.u_elapsed();
};

if (tid == bthread_self() ||
Expand Down Expand Up @@ -204,7 +242,10 @@ TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
if (TASK_STATUS_RUNNING == status) {
result = SignalTrace(worker_tid);
} else if (TASK_STATUS_SUSPENDED == status || TASK_STATUS_READY == status) {
butil::Timer timer(butil::Timer::STARTED);
result = ContextTrace(m->stack->context);
timer.stop();
LOG(INFO) << "Signal trace time: " << timer.u_elapsed();
}

{
Expand Down Expand Up @@ -272,12 +313,12 @@ int TaskTracer::RegisterSignalHandler() {
sa.sa_sigaction = SignalHandler;
sa.sa_flags = SA_SIGINFO;
sigfillset(&sa.sa_mask);
if (sigaction(SIGURG, &sa, &old_sa) == -1) {
if (sigaction(SIGURG, &sa, &old_sa) != 0) {
PLOG(ERROR) << "Failed to sigaction";
return -1;
}
if (NULL != old_sa.sa_handler || NULL != old_sa.sa_sigaction) {
LOG(ERROR) << "SIGURG is already registered";
LOG(ERROR) << "Signal handler of SIGURG is already registered";
return -1;
}

Expand All @@ -296,37 +337,80 @@ void TaskTracer::SignalHandler(int, siginfo_t* info, void* context) {

// Caution: This function is called in signal handler, so it should be async-signal-safety.
void TaskTracer::SignalTraceHandler(unw_context_t* context) {
// Something wrong, signal trace is not started, do nothing.
if (SIGNAL_TRACE_STATUS_START !=
_signal_handler_flag.load(butil::memory_order_acquire)) {
return;
}

butil::Timer timer(butil::Timer::STARTED);
BRPC_SCOPE_EXIT {
timer.stop();
// todo 会申请内存
_signal_handler_time << timer.n_elapsed();
};

_signal_handler_context = context;
// Use memory_order_seq_cst to ensure the flag is set before loop.
_signal_handler_flag.store(SIGNAL_TRACE_STATUS_TRACING, butil::memory_order_seq_cst);
while (SIGNAL_TRACE_STATUS_TRACING ==
_signal_handler_flag.load(butil::memory_order_seq_cst)) {
SignalSafeUsleep(50); // 50us
// Timeout to avoid deadlock of libunwind.
_signal_context = context;
// Notify SignalTrace that SignalTraceHandler has started.
// Binary semaphore do not fail, so no need to check return value.
sem_post(&_sem);
pollfd poll_fd = {_pipe_fds[0], POLLIN, 0};
while (true) {
timer.stop();
if (timer.m_elapsed() > FLAGS_signal_trace_timeout_ms) {
int timeout = FLAGS_signal_trace_timeout_ms - timer.m_elapsed();
int rc = poll(&poll_fd, 1, timeout);
if (rc > 0) {
break;
} else if (rc == 0) {
errno = ETIMEDOUT;
return;
} else {
if (errno == EINTR) {
continue;
}
return;
}
}
char c;
while (read(_pipe_fds[0], &c, 1) != 0 && EINTR == errno);
}

TaskTracer::Result TaskTracer::SignalTrace(pid_t tid) {
_signal_handler_context = NULL;
// Use memory_order_seq_cst to ensure the flag is set before sending signal.
_signal_handler_flag.store(SIGNAL_TRACE_STATUS_START, butil::memory_order_seq_cst);
bool TaskTracer::WaitForSignalHandler(const timespec* abs_timeout, Result& result) {
while (sem_timedwait(&_sem, abs_timeout) != 0) {
if (EINTR == errno) {
continue;
}
if (ETIMEDOUT == errno) {
result.SetError("Timeout exceed %dms", FLAGS_signal_trace_timeout_ms);
} else {
// During the process of signal handler,
// can not use berro() which is not async-signal-safe.
result.SetError("Fail to sem_timedwait, errno=%d", errno);
}
return false;
}
return true;
}

void TaskTracer::WakeupSignalHandler(Result& result) {
int try_count = 0;
while (true) {
ssize_t nw = write(_pipe_fds[1], "1", 1);
if (0 < nw) {
break;
} else if (0 == nw) {
continue;
} else {
if (EINTR == errno) {
continue;
} else if (EAGAIN == errno && try_count++ < 3) {
usleep(1000); // Wait 1ms, try again.
continue;
}
// During the process of signal handler,
// can not use berro() which is not async-signal-safe.
result.SetError(
"Fail to write pipe to notify signal handler, errno=%d", errno);
break;
}
}
}

TaskTracer::Result TaskTracer::SignalTrace(pid_t tid) {
// CAUTION:
// The signal handler will wait for the backtrace to complete.
// If the worker thread is interrupted when holding a resource(lock, etc),
Expand All @@ -345,8 +429,7 @@ TaskTracer::Result TaskTracer::SignalTrace(pid_t tid) {
// #0 __lll_lock_wait (futex=futex@entry=0x7f0d3d7f0990 <_rtld_global+2352>, private=0) at lowlevellock.c:52
// #1 0x00007f0d3a73c131 in __GI___pthread_mutex_lock (mutex=0x7f0d3d7f0990 <_rtld_global+2352>) at ../nptl/pthread_mutex_lock.c:115
// #2 0x00007f0d38eb0231 in __GI___dl_iterate_phdr (callback=callback@entry=0x7f0d38c456a0 <_ULx86_64_dwarf_callback>, data=data@entry=0x7f0d07defad0) at dl-iteratephdr.c:40
// #3 0x00007f0d38c45d79 in _ULx86_64_dwarf_find_proc_info (as=0x7f0d38c4f340 <local_addr_space>, ip=ip@entry=139694791966897, pi=pi@entry=0x7f0d07df0498, need_unwind_info=need_unwind_info@entry=1, arg=0x7f0
// d07df0340) at dwarf/Gfind_proc_info-lsb.c:759
// #3 0x00007f0d38c45d79 in _ULx86_64_dwarf_find_proc_info (as=0x7f0d38c4f340 <local_addr_space>, ip=ip@entry=139694791966897, pi=pi@entry=0x7f0d07df0498, need_unwind_info=need_unwind_info@entry=1, arg=0x7f0d07df0340) at dwarf/Gfind_proc_info-lsb.c:759
// #4 0x00007f0d38c43260 in fetch_proc_info (c=c@entry=0x7f0d07df0340, ip=139694791966897) at dwarf/Gparser.c:461
// #5 0x00007f0d38c44e46 in find_reg_state (sr=0x7f0d07defd10, c=0x7f0d07df0340) at dwarf/Gparser.c:925
// #6 _ULx86_64_dwarf_step (c=c@entry=0x7f0d07df0340) at dwarf/Gparser.c:972
Expand Down Expand Up @@ -374,6 +457,8 @@ TaskTracer::Result TaskTracer::SignalTrace(pid_t tid) {
// backtracks with dl_iterate_phdr. We introduce a timeout mechanism in signal
// handler to avoid deadlock.

_signal_context = NULL;

union sigval value{};
value.sival_ptr = this;
size_t sigqueue_try = 0;
Expand All @@ -383,36 +468,26 @@ TaskTracer::Result TaskTracer::SignalTrace(pid_t tid) {
}
}

butil::Timer timer(butil::Timer::STARTED);
// Use memory_order_seq_cst to ensure the signal is sent and the flag is set before checking.
for (int i = 0;
SIGNAL_TRACE_STATUS_START == _signal_handler_flag.load(butil::memory_order_seq_cst);
++i) {
if (i < 30) {
sched_yield();
} else {
SignalSafeUsleep(5); // 5us
}
Result result;
// Wakeup the signal handler at the end.
BRPC_SCOPE_EXIT {
WakeupSignalHandler(result);
};

// Timeout to avoid dead loop if handler of SIGURG is covered.
timer.stop();
if (timer.m_elapsed() > FLAGS_signal_trace_timeout_ms) {
return Result::MakeErrorResult(
"Timeout exceed %dms", FLAGS_signal_trace_timeout_ms);
}
timespec abs_timeout = butil::milliseconds_from_now(FLAGS_signal_trace_timeout_ms);
// Wait for the signal handler to start.
if (!WaitForSignalHandler(&abs_timeout, result)) {
return result;
}

unw_cursor_t cursor;
int rc = unw_init_local(&cursor, _signal_handler_context);
Result result;
if (0 == rc) {
result = TraceCore(cursor);
int rc = unw_init_local(&cursor, _signal_context);
if (0 != rc) {
result.SetError("Failed to init local, rc=%d", rc);
return result;
}

// Use memory_order_seq_cst to ensure the flag is set after tracing.
_signal_handler_flag.store(SIGNAL_TRACE_STATUS_UNKNOWN, butil::memory_order_seq_cst);

return 0 == rc ? result : Result::MakeErrorResult("Failed to init local, rc=%d", rc);
return TraceCore(cursor);
}

unw_cursor_t TaskTracer::MakeCursor(bthread_fcontext_t fcontext) {
Expand Down Expand Up @@ -461,7 +536,7 @@ TaskTracer::Result TaskTracer::TraceCore(unw_cursor_t& cursor) {
rc = unw_get_reg(&cursor, UNW_REG_IP, &ip);
result.ips[result.frame_count] = ip;

if (!result.fast_unwind) {
if (result.fast_unwind) {
continue;
}

Expand All @@ -484,4 +559,4 @@ TaskTracer::Result TaskTracer::TraceCore(unw_cursor_t& cursor) {

} // namespace bthread

#endif // BRPC_BTHREAD_TRACER
// #endif // BRPC_BTHREAD_TRACER
Loading

0 comments on commit bebea21

Please sign in to comment.