Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support async logging #2413

Merged
merged 5 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
342 changes: 315 additions & 27 deletions src/butil/logging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ typedef pthread_mutex_t* MutexHandle;
#include "butil/strings/string_util.h"
#include "butil/strings/stringprintf.h"
#include "butil/strings/utf_string_conversions.h"
#include "butil/synchronization/lock.h"
#include "butil/synchronization/condition_variable.h"
#include "butil/threading/platform_thread.h"
#include "butil/threading/simple_thread.h"
#include "butil/object_pool.h"

#if defined(OS_POSIX)
#include "butil/errno.h"
#include "butil/fd_guard.h"
Expand Down Expand Up @@ -144,6 +147,17 @@ DEFINE_bool(log_year, false, "Log year in datetime part in each log");

DEFINE_bool(log_func_name, false, "Log function name in each log");

DEFINE_bool(async_log, false, "Use async log");

DEFINE_bool(async_log_in_background_always, false, "Async log written in background always.");

DEFINE_int32(max_async_log_queue_size, 100000, "Max async log size. "
"If current log count of async log > max_async_log_size, "
"Use sync log to protect process.");

DEFINE_int32(sleep_to_flush_async_log_s, 0,
"If the value > 0, sleep before atexit to flush async log");

namespace {

LoggingDestination logging_destination = LOG_DEFAULT;
Expand Down Expand Up @@ -399,8 +413,300 @@ void CloseLogFileUnlocked() {
log_file = NULL;
}

void Log2File(const std::string& log) {
// We can have multiple threads and/or processes, so try to prevent them
// from clobbering each other's writes.
// If the client app did not call InitLogging, and the lock has not
// been created do it now. We do this on demand, but if two threads try
// to do this at the same time, there will be a race condition to create
// the lock. This is why InitLogging should be called from the main
// thread at the beginning of execution.
LoggingLock::Init(LOCK_LOG_FILE, NULL);
LoggingLock logging_lock;
if (InitializeLogFileHandle()) {
#if defined(OS_WIN)
SetFilePointer(log_file, 0, 0, SEEK_END);
DWORD num_written;
WriteFile(log_file,
static_cast<const void*>(log.data()),
static_cast<DWORD>(log.size()),
&num_written,
NULL);
#else
fwrite(log.data(), log.size(), 1, log_file);
fflush(log_file);
#endif
}
}

} // namespace

struct BAIDU_CACHELINE_ALIGNMENT LogRequest {
static LogRequest* const UNCONNECTED;

LogRequest* next{NULL};
std::string data;
};

LogRequest* const LogRequest::UNCONNECTED = (LogRequest*)(intptr_t)-1;

class AsyncLogger : public butil::SimpleThread {
public:
static AsyncLogger* GetInstance();

void Log(const std::string& log);
void Log(std::string&& log);
void StopAndJoin();

private:
friend struct DefaultSingletonTraits<AsyncLogger>;

static LogRequest _stop_req;

AsyncLogger();
~AsyncLogger() override;

static void AtExit() {
GetInstance()->StopAndJoin();
if (FLAGS_sleep_to_flush_async_log_s > 0) {
::sleep(FLAGS_sleep_to_flush_async_log_s);
}
}

void LogImpl(LogRequest* log_req);

void Run() override;

void LogTask(LogRequest* req);

bool IsLogComplete(LogRequest* old_head);

void DoLog(LogRequest* req);
void DoLog(const std::string& log);

butil::atomic<LogRequest*> _log_head;
butil::Mutex _mutex;
butil::ConditionVariable _cond;
LogRequest* _current_log_request;
butil::atomic<int32_t> _log_request_count;
butil::atomic<bool> _stop;
};

AsyncLogger* AsyncLogger::GetInstance() {
return Singleton<AsyncLogger,
LeakySingletonTraits<AsyncLogger>>::get();
}

AsyncLogger::AsyncLogger()
: butil::SimpleThread("async_log_thread")
, _log_head(NULL)
, _cond(&_mutex)
, _current_log_request(NULL)
, _stop(false) {
Start();
// We need to stop async logger and
// flush all async log before exit.
atexit(AtExit);
}

AsyncLogger::~AsyncLogger() {
StopAndJoin();
}

void AsyncLogger::Log(const std::string& log) {
if (log.empty()) {
return;
}

bool is_full = FLAGS_max_async_log_queue_size > 0 &&
_log_request_count.fetch_add(1, butil::memory_order_relaxed) >
FLAGS_max_async_log_queue_size;
if (is_full || _stop.load(butil::memory_order_relaxed)) {
// Async logger is full or stopped, fallback to sync log.
DoLog(log);
return;
}

auto log_req = butil::get_object<LogRequest>();
if (!log_req) {
// Async log failed, fallback to sync log.
DoLog(log);
return;
}
log_req->data = log;
LogImpl(log_req);
}

void AsyncLogger::Log(std::string&& log) {
if (log.empty()) {
return;
}

bool is_full = FLAGS_max_async_log_queue_size > 0 &&
_log_request_count.fetch_add(1, butil::memory_order_relaxed) >
FLAGS_max_async_log_queue_size;
if (is_full || _stop.load(butil::memory_order_relaxed)) {
// Async logger is full or stopped, fallback to sync log.
DoLog(log);
return;
}

auto log_req = butil::get_object<LogRequest>();
if (!log_req) {
// Async log failed, fallback to sync log.
chenBright marked this conversation as resolved.
Show resolved Hide resolved
DoLog(log);
return;
}
log_req->data = std::move(log);
LogImpl(log_req);
}

void AsyncLogger::LogImpl(LogRequest* log_req) {
log_req->next = LogRequest::UNCONNECTED;
// Release fence makes sure the thread getting request sees *req
LogRequest* const prev_head =
_log_head.exchange(log_req, butil::memory_order_release);
if (prev_head != NULL) {
// Someone is logging. The async_log_thread thread may spin
// until req->next to be non-UNCONNECTED. This process is not
// lock-free, but the duration is so short(1~2 instructions,
// depending on compiler) that the spin rarely occurs in practice
// (I've not seen any spin in highly contended tests).
log_req->next = prev_head;
return;
}
// We've got the right to write.
log_req->next = NULL;

if (!FLAGS_async_log_in_background_always) {
// Use sync log for the LogRequest
// which has got the right to write.
DoLog(log_req);
// Return when there's no more LogRequests.
if (IsLogComplete(log_req)) {
butil::return_object(log_req);
return;
}
}

BAIDU_SCOPED_LOCK(_mutex);
if (_stop.load(butil::memory_order_relaxed)) {
// Async logger is stopped, fallback to sync log.
LogTask(log_req);
} else {
// Wake up async logger.
_current_log_request = log_req;
_cond.Signal();
}
}

void AsyncLogger::StopAndJoin() {
if (!_stop.exchange(true, butil::memory_order_relaxed)) {
BAIDU_SCOPED_LOCK(_mutex);
_cond.Signal();
}
if (!HasBeenJoined()) {
Join();
}
}

void AsyncLogger::Run() {
while (true) {
BAIDU_SCOPED_LOCK(_mutex);
while (!_stop.load(butil::memory_order_relaxed) &&
!_current_log_request) {
_cond.Wait();
}
if (_stop.load(butil::memory_order_relaxed) &&
!_current_log_request) {
break;
}

LogTask(_current_log_request);
_current_log_request = NULL;
}
}

void AsyncLogger::LogTask(LogRequest* req) {
do {
// req was logged, skip it.
if (req->next != NULL && req->data.empty()) {
LogRequest* const saved_req = req;
req = req->next;
butil::return_object(saved_req);
}

// Log all requests to file.
while (req->next != NULL) {
LogRequest* const saved_req = req;
req = req->next;
if (!saved_req->data.empty()) {
DoLog(saved_req);
}
// Release LogRequests until last request.
butil::return_object(saved_req);
}
if (!req->data.empty()) {
DoLog(req);
}

// Return when there's no more LogRequests.
if (IsLogComplete(req)) {
butil::return_object(req);
return;
}
} while (true);
}

bool AsyncLogger::IsLogComplete(LogRequest* old_head) {
if (old_head->next) {
fprintf(stderr, "old_head->next should be NULL\n");
}
LogRequest* new_head = old_head;
LogRequest* desired = NULL;
if (_log_head.compare_exchange_strong(
new_head, desired, butil::memory_order_acquire)) {
// No one added new requests.
return true;
}
if (new_head == old_head) {
fprintf(stderr, "new_head should not be equal to old_head\n");
}
// Above acquire fence pairs release fence of exchange in Log() to make
// sure that we see all fields of requests set.

// Someone added new requests.
// Reverse the list until old_head.
LogRequest* tail = NULL;
LogRequest* p = new_head;
do {
while (p->next == LogRequest::UNCONNECTED) {
sched_yield();
}
LogRequest* const saved_next = p->next;
p->next = tail;
tail = p;
p = saved_next;
if (!p) {
fprintf(stderr, "p should not be NULL\n");
}
} while (p != old_head);

// Link old list with new list.
old_head->next = tail;
return false;
}

void AsyncLogger::DoLog(LogRequest* req) {
DoLog(req->data);
req->data.clear();
}

void AsyncLogger::DoLog(const std::string& log) {
Log2File(log);
_log_request_count.fetch_sub(1);
}

LoggingSettings::LoggingSettings()
: logging_dest(LOG_DEFAULT),
log_file(NULL),
Expand Down Expand Up @@ -473,7 +779,7 @@ void PrintLogPrefix(std::ostream& os, int severity,
const char* file, int line,
const char* func) {
PrintLogSeverity(os, severity);
#if defined(OS_LINUX)
#if defined(OS_LINUX) || defined(OS_MACOSX)
timeval tv;
gettimeofday(&tv, NULL);
time_t t = tv.tv_sec;
Expand All @@ -495,7 +801,7 @@ void PrintLogPrefix(std::ostream& os, int severity,
<< std::setw(2) << local_tm.tm_hour << ':'
<< std::setw(2) << local_tm.tm_min << ':'
<< std::setw(2) << local_tm.tm_sec;
#if defined(OS_LINUX)
#if defined(OS_LINUX) || defined(OS_MACOSX)
os << '.' << std::setw(6) << tv.tv_usec;
#endif
if (FLAGS_log_pid) {
Expand Down Expand Up @@ -957,35 +1263,17 @@ class DefaultLogSink : public LogSink {

// write to log file
if ((logging_destination & LOG_TO_FILE) != 0) {
// We can have multiple threads and/or processes, so try to prevent them
// from clobbering each other's writes.
// If the client app did not call InitLogging, and the lock has not
// been created do it now. We do this on demand, but if two threads try
// to do this at the same time, there will be a race condition to create
// the lock. This is why InitLogging should be called from the main
// thread at the beginning of execution.
LoggingLock::Init(LOCK_LOG_FILE, NULL);
LoggingLock logging_lock;
if (InitializeLogFileHandle()) {
#if defined(OS_WIN)
SetFilePointer(log_file, 0, 0, SEEK_END);
DWORD num_written;
WriteFile(log_file,
static_cast<const void*>(log.data()),
static_cast<DWORD>(log.size()),
&num_written,
NULL);
#else
fwrite(log.data(), log.size(), 1, log_file);
fflush(log_file);
#endif
if (FLAGS_async_log) {
AsyncLogger::GetInstance()->Log(std::move(log));
} else {
Log2File(log);
}
}
return true;
}
private:
DefaultLogSink() {}
~DefaultLogSink() {}
DefaultLogSink() = default;
~DefaultLogSink() override = default;
friend struct DefaultSingletonTraits<DefaultLogSink>;
};

Expand Down
Loading
Loading