From c18204e8401dbc8cc32135c8fefc99608167e6d5 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Sat, 14 Oct 2023 17:54:05 +0800 Subject: [PATCH 1/5] Support async logging --- src/butil/logging.cc | 282 +++++++++++++++++++++++++++++++++++---- test/logging_unittest.cc | 162 +++++++++++++++++++++- 2 files changed, 418 insertions(+), 26 deletions(-) diff --git a/src/butil/logging.cc b/src/butil/logging.cc index 544a5cab8b..700dd126b1 100644 --- a/src/butil/logging.cc +++ b/src/butil/logging.cc @@ -73,8 +73,11 @@ typedef pthread_mutex_t* MutexHandle; #include "butil/strings/string_util.h" #include "butil/strings/stringprintf.h" #include "butil/strings/utf_string_conversions.h" -#include "butil/synchronization/lock.h" +#include "butil/synchronization/condition_variable.h" #include "butil/threading/platform_thread.h" +#include "butil/threading/simple_thread.h" +#include "butil/object_pool.h" + #if defined(OS_POSIX) #include "butil/errno.h" #include "butil/fd_guard.h" @@ -144,6 +147,8 @@ DEFINE_bool(log_year, false, "Log year in datetime part in each log"); DEFINE_bool(log_func_name, false, "Log function name in each log"); +DEFINE_bool(async_log, false, "Use async log"); + namespace { LoggingDestination logging_destination = LOG_DEFAULT; @@ -399,8 +404,253 @@ void CloseLogFileUnlocked() { log_file = NULL; } +void Log2File(const std::string& log) { + // We can have multiple threads and/or processes, so try to prevent them + // from clobbering each other's writes. + // If the client app did not call InitLogging, and the lock has not + // been created do it now. We do this on demand, but if two threads try + // to do this at the same time, there will be a race condition to create + // the lock. This is why InitLogging should be called from the main + // thread at the beginning of execution. + LoggingLock::Init(LOCK_LOG_FILE, NULL); + LoggingLock logging_lock; + if (InitializeLogFileHandle()) { +#if defined(OS_WIN) + SetFilePointer(log_file, 0, 0, SEEK_END); + DWORD num_written; + WriteFile(log_file, + static_cast(log.data()), + static_cast(log.size()), + &num_written, + NULL); +#else + fwrite(log.data(), log.size(), 1, log_file); + fflush(log_file); +#endif + } +} + } // namespace +struct BAIDU_CACHELINE_ALIGNMENT LogRequest { + static LogRequest* const UNCONNECTED; + + LogRequest* next{NULL}; + std::string data; +}; + +LogRequest* const LogRequest::UNCONNECTED = (LogRequest*)(intptr_t)-1; + +class AsyncLog : public butil::SimpleThread { +public: + static AsyncLog* GetInstance(); + + void Log(const std::string& log); + void Log(std::string&& log); + +private: +friend struct DefaultSingletonTraits; + + AsyncLog(); + ~AsyncLog() override; + + void LogImpl(LogRequest* log_req); + + void Run() override; + + void LogTask(); + + bool IsLogComplete(LogRequest* old_head, + bool singular_node, + LogRequest** new_tail); + + butil::atomic _log_head; + butil::Mutex _mutex; + butil::ConditionVariable _cond; + LogRequest* _current_log_request; + bool _stop; +}; + +AsyncLog* AsyncLog::GetInstance() { + return Singleton>::get(); +} + +AsyncLog::AsyncLog() + : butil::SimpleThread("async_log_thread") + , _log_head(NULL) + , _cond(&_mutex) + , _current_log_request(NULL) + , _stop(false) { + Start(); +} + +AsyncLog::~AsyncLog() { + { + BAIDU_SCOPED_LOCK(_mutex); + _stop = true; + _cond.Signal(); + } + Join(); +} + +void AsyncLog::Log(const std::string& log) { + if (log.empty()) { + return; + } + + auto log_req = butil::get_object(); + if (!log_req) { + // Async log failed, fallback to sync log. + Log2File(log); + return; + } + log_req->data = log; + LogImpl(log_req); +} + +void AsyncLog::Log(std::string&& log) { + if (log.empty()) { + return; + } + + auto log_req = butil::get_object(); + if (!log_req) { + // Async log failed, fallback to sync log. + Log2File(log); + return; + } + log_req->data = std::move(log); + LogImpl(log_req); +} + +void AsyncLog::LogImpl(LogRequest* log_req) { + log_req->next = LogRequest::UNCONNECTED; + LogRequest* const prev_head = + _log_head.exchange(log_req, butil::memory_order_release); + if (prev_head != NULL) { + log_req->next = prev_head; + return; + } + // We've got the right to write. + log_req->next = NULL; + + BAIDU_SCOPED_LOCK(_mutex); + _current_log_request = log_req; + _cond.Signal(); +} + +void AsyncLog::Run() { + while (true) { + BAIDU_SCOPED_LOCK(_mutex); + while (!_stop && !_current_log_request) { + _cond.Wait(); + } + if (_stop) { + break; + } + + LogTask(); + _current_log_request = NULL; + } +} + +void AsyncLog::LogTask() { + LogRequest* req = _current_log_request; + LogRequest* cur_tail = NULL; + do { + // req was written, skip it. + if (req->next != NULL && req->data.empty()) { + LogRequest* const saved_req = req; + req = req->next; + butil::return_object(saved_req); + } + + // Log all req to file. + for (LogRequest* p = req; p != NULL; p = p->next) { + if (p->data.empty()) { + continue; + } + Log2File(p->data); + p->data.clear(); + } + + // Release WriteRequest until non-empty data or last request. + while (req->next != NULL && req->data.empty()) { + LogRequest* const saved_req = req; + req = req->next; + butil::return_object(saved_req); + } + + if (NULL == cur_tail) { + for (cur_tail = req; cur_tail->next != NULL; + cur_tail = cur_tail->next); + } + // Return when there's no more WriteRequests and req is completely + // written. + if (IsLogComplete(cur_tail, (req == cur_tail), &cur_tail)) { + if (cur_tail != req) { + fprintf(stderr, "cur_tail should equal to req\n"); + } + butil::return_object(req); + return; + } + } while (true); +} + +bool AsyncLog::IsLogComplete(LogRequest* old_head, + bool singular_node, + LogRequest** new_tail) { + if (old_head->next) { + fprintf(stderr, "old_head->next should be NULL\n"); + } + LogRequest* new_head = old_head; + LogRequest* desired = NULL; + bool return_when_no_more = true; + if (!old_head->data.empty() || !singular_node) { + desired = old_head; + // Write is obviously not complete if old_head is not fully written. + return_when_no_more = false; + } + if (_log_head.compare_exchange_strong( + new_head, desired, butil::memory_order_acquire)) { + // No one added new requests. + if (new_tail) { + *new_tail = old_head; + } + return return_when_no_more; + } + if (new_head == old_head) { + fprintf(stderr, "new_head should not be equal to old_head\n"); + } + // Above acquire fence pairs release fence of exchange in Log() to make + // sure that we see all fields of requests set. + + // Someone added new requests. + // Reverse the list until old_head. + LogRequest* tail = NULL; + LogRequest* p = new_head; + do { + while (p->next == LogRequest::UNCONNECTED) { + sched_yield(); + } + LogRequest* const saved_next = p->next; + p->next = tail; + tail = p; + p = saved_next; + if (!p) { + fprintf(stderr, "p should not be NULL\n"); + } + } while (p != old_head); + + // Link old list with new list. + old_head->next = tail; + if (new_tail) { + *new_tail = new_head; + } + return false; +} + LoggingSettings::LoggingSettings() : logging_dest(LOG_DEFAULT), log_file(NULL), @@ -957,35 +1207,17 @@ class DefaultLogSink : public LogSink { // write to log file if ((logging_destination & LOG_TO_FILE) != 0) { - // We can have multiple threads and/or processes, so try to prevent them - // from clobbering each other's writes. - // If the client app did not call InitLogging, and the lock has not - // been created do it now. We do this on demand, but if two threads try - // to do this at the same time, there will be a race condition to create - // the lock. This is why InitLogging should be called from the main - // thread at the beginning of execution. - LoggingLock::Init(LOCK_LOG_FILE, NULL); - LoggingLock logging_lock; - if (InitializeLogFileHandle()) { -#if defined(OS_WIN) - SetFilePointer(log_file, 0, 0, SEEK_END); - DWORD num_written; - WriteFile(log_file, - static_cast(log.data()), - static_cast(log.size()), - &num_written, - NULL); -#else - fwrite(log.data(), log.size(), 1, log_file); - fflush(log_file); -#endif + if (FLAGS_async_log) { + AsyncLog::GetInstance()->Log(std::move(log)); + } else { + Log2File(log); } } return true; } private: - DefaultLogSink() {} - ~DefaultLogSink() {} + DefaultLogSink() = default; + ~DefaultLogSink() override = default; friend struct DefaultSingletonTraits; }; diff --git a/test/logging_unittest.cc b/test/logging_unittest.cc index ca025ee98a..66834b9890 100644 --- a/test/logging_unittest.cc +++ b/test/logging_unittest.cc @@ -4,7 +4,9 @@ #include "butil/basictypes.h" #include "butil/logging.h" - +#include "butil/gperftools_profiler.h" +#include "butil/files/temp_file.h" +#include "butil/popen.h" #include #include @@ -14,6 +16,7 @@ namespace logging { DECLARE_bool(crash_on_fatal_log); DECLARE_int32(v); DECLARE_bool(log_func_name); +DECLARE_bool(async_log); namespace { @@ -475,6 +478,163 @@ TEST_F(LoggingTest, log_func) { ::logging::FLAGS_crash_on_fatal_log = old_crash_on_fatal_log; } +bool g_started = false; +bool g_stopped = false; +int g_prof_name_counter = 0; +butil::atomic test_logging_count(0); + +void* test_async_log(void* arg) { + if (arg == NULL) { + return NULL; + } + auto log = (std::string*)(arg); + while (!g_stopped) { + LOG(INFO) << *log; + test_logging_count.fetch_add(1); + } + + return NULL; +} + +TEST_F(LoggingTest, async_log) { + bool saved_async_log = FLAGS_async_log; + FLAGS_async_log = true; + butil::TempFile temp_file; + LoggingSettings settings; + settings.logging_dest = LOG_TO_FILE; + settings.log_file = temp_file.fname(); + settings.delete_old = DELETE_OLD_LOG_FILE; + InitLogging(settings); + + std::string log = "135792468"; + int thread_num = 8; + pthread_t threads[thread_num]; + for (int i = 0; i < thread_num; ++i) { + ASSERT_EQ(0, pthread_create(&threads[i], NULL, test_async_log, &log)); + } + + sleep(5); + + g_stopped = true; + for (int i = 0; i < thread_num; ++i) { + pthread_join(threads[i], NULL); + } + // Wait for async log thread to flush all logs to file. + sleep(10); + + std::ostringstream oss; + std::string cmd = butil::string_printf("grep -c %s %s", + log.c_str(), temp_file.fname()); + ASSERT_LE(0, butil::read_command_output(oss, cmd.c_str())); + uint64_t log_count = std::strtol(oss.str().c_str(), NULL, 10); + ASSERT_EQ(log_count, test_logging_count.load()); + + FLAGS_async_log = saved_async_log; +} + +struct BAIDU_CACHELINE_ALIGNMENT PerfArgs { + const std::string* log; + int64_t counter; + int64_t elapse_ns; + bool ready; + + PerfArgs() : log(NULL), counter(0), elapse_ns(0), ready(false) {} +}; + +void* test_log(void* void_arg) { + auto args = (PerfArgs*)void_arg; + args->ready = true; + butil::Timer t; + while (!g_stopped) { + if (g_started) { + break; + } + usleep(10); + } + t.start(); + while (!g_stopped) { + { + LOG(INFO) << *args->log; + test_logging_count.fetch_add(1, butil::memory_order_relaxed); + } + ++args->counter; + } + t.stop(); + args->elapse_ns = t.n_elapsed(); + return NULL; +} + +void PerfTest(int thread_num, const std::string& log, bool async) { + FLAGS_async_log = async; + + g_started = false; + g_stopped = false; + pthread_t threads[thread_num]; + std::vector args(thread_num); + for (int i = 0; i < thread_num; ++i) { + args[i].log = &log; + ASSERT_EQ(0, pthread_create(&threads[i], NULL, test_log, &args[i])); + } + while (true) { + bool all_ready = true; + for (int i = 0; i < thread_num; ++i) { + if (!args[i].ready) { + all_ready = false; + break; + } + } + if (all_ready) { + break; + } + usleep(1000); + } + g_started = true; + char prof_name[32]; + snprintf(prof_name, sizeof(prof_name), "logging_%d.prof", ++g_prof_name_counter); + ProfilerStart(prof_name); + sleep(5); + ProfilerStop(); + g_stopped = true; + int64_t wait_time = 0; + int64_t count = 0; + for (int i = 0; i < thread_num; ++i) { + pthread_join(threads[i], NULL); + wait_time += args[i].elapse_ns; + count += args[i].counter; + } + std::cout << " thread_num=" << thread_num + << " log_type=" << (async ? "async" : "sync") + << " log_size=" << log.size() + << " count=" << count + << " average_time=" << wait_time / (double)count + << std::endl; +} + +TEST_F(LoggingTest, performance) { + bool saved_async_log = FLAGS_async_log; + + LoggingSettings settings; + settings.logging_dest = LOG_TO_FILE; + InitLogging(settings); + std::string log(64, 'a'); + int thread_num = 1; + PerfTest(thread_num, log, true); + sleep(10); + PerfTest(thread_num, log, false); + + thread_num = 2; + PerfTest(thread_num, log, true); + sleep(10); + PerfTest(thread_num, log, false); + + thread_num = 4; + PerfTest(thread_num, log, true); + sleep(10); + PerfTest(thread_num, log, false); + + FLAGS_async_log = saved_async_log; +} + } // namespace } // namespace logging From 6d5439d6d8eafb1f537ce205d7a2cb30b57b36e9 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Mon, 23 Oct 2023 21:50:09 +0800 Subject: [PATCH 2/5] Set max async log queue size --- src/butil/logging.cc | 54 +++++++++++++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 13 deletions(-) diff --git a/src/butil/logging.cc b/src/butil/logging.cc index 700dd126b1..9e98f5ee71 100644 --- a/src/butil/logging.cc +++ b/src/butil/logging.cc @@ -149,6 +149,9 @@ DEFINE_bool(log_func_name, false, "Log function name in each log"); DEFINE_bool(async_log, false, "Use async log"); +DEFINE_int32(max_async_log_queue_size, 100000, "Max async log size. " + "If current log count of async log > max_async_log_size, Use sync log to protect process."); + namespace { LoggingDestination logging_destination = LOG_DEFAULT; @@ -464,10 +467,14 @@ friend struct DefaultSingletonTraits; bool singular_node, LogRequest** new_tail); + void DoLog(LogRequest* req); + void DoLog(const std::string& log); + butil::atomic _log_head; butil::Mutex _mutex; butil::ConditionVariable _cond; LogRequest* _current_log_request; + butil::atomic _log_request_count; bool _stop; }; @@ -499,10 +506,16 @@ void AsyncLog::Log(const std::string& log) { return; } + if (_log_request_count.fetch_add(1, butil::memory_order_relaxed) > + FLAGS_max_async_log_queue_size) { + DoLog(log); + return; + } + auto log_req = butil::get_object(); if (!log_req) { // Async log failed, fallback to sync log. - Log2File(log); + DoLog(log); return; } log_req->data = log; @@ -514,10 +527,16 @@ void AsyncLog::Log(std::string&& log) { return; } + if (_log_request_count.fetch_add(1, butil::memory_order_relaxed) > + FLAGS_max_async_log_queue_size) { + DoLog(log); + return; + } + auto log_req = butil::get_object(); if (!log_req) { // Async log failed, fallback to sync log. - Log2File(log); + DoLog(log); return; } log_req->data = std::move(log); @@ -567,20 +586,20 @@ void AsyncLog::LogTask() { } // Log all req to file. - for (LogRequest* p = req; p != NULL; p = p->next) { - if (p->data.empty()) { - continue; - } - Log2File(p->data); - p->data.clear(); - } - - // Release WriteRequest until non-empty data or last request. - while (req->next != NULL && req->data.empty()) { + while (req->next != NULL) { LogRequest* const saved_req = req; req = req->next; + if (!saved_req->data.empty()) { + DoLog(saved_req); + saved_req->data.clear(); + } + // Release WriteRequest until last request. butil::return_object(saved_req); } + if (!req->data.empty()) { + DoLog(req); + req->data.clear(); + } if (NULL == cur_tail) { for (cur_tail = req; cur_tail->next != NULL; @@ -607,7 +626,7 @@ bool AsyncLog::IsLogComplete(LogRequest* old_head, LogRequest* new_head = old_head; LogRequest* desired = NULL; bool return_when_no_more = true; - if (!old_head->data.empty() || !singular_node) { + if (!singular_node) { desired = old_head; // Write is obviously not complete if old_head is not fully written. return_when_no_more = false; @@ -651,6 +670,15 @@ bool AsyncLog::IsLogComplete(LogRequest* old_head, return false; } +void AsyncLog::DoLog(LogRequest* req) { + DoLog(req->data); +} + +void AsyncLog::DoLog(const std::string& log) { + Log2File(log); + _log_request_count.fetch_sub(1); +} + LoggingSettings::LoggingSettings() : logging_dest(LOG_DEFAULT), log_file(NULL), From be401a49420ead5b30ebf6688a0c81eeba595468 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Mon, 1 Jan 2024 14:40:09 +0800 Subject: [PATCH 3/5] Flush async log before exit --- src/butil/logging.cc | 145 +++++++++++++++++++++++-------------------- 1 file changed, 76 insertions(+), 69 deletions(-) diff --git a/src/butil/logging.cc b/src/butil/logging.cc index 9e98f5ee71..4da86c1b69 100644 --- a/src/butil/logging.cc +++ b/src/butil/logging.cc @@ -150,7 +150,11 @@ DEFINE_bool(log_func_name, false, "Log function name in each log"); DEFINE_bool(async_log, false, "Use async log"); DEFINE_int32(max_async_log_queue_size, 100000, "Max async log size. " - "If current log count of async log > max_async_log_size, Use sync log to protect process."); + "If current log count of async log > max_async_log_size, " + "Use sync log to protect process."); + +DEFINE_int32(sleep_to_flush_async_log_s, 0, + "If the value > 0, sleep before atexit to flush async log"); namespace { @@ -444,28 +448,36 @@ struct BAIDU_CACHELINE_ALIGNMENT LogRequest { LogRequest* const LogRequest::UNCONNECTED = (LogRequest*)(intptr_t)-1; -class AsyncLog : public butil::SimpleThread { +class AsyncLogger : public butil::SimpleThread { public: - static AsyncLog* GetInstance(); + static AsyncLogger* GetInstance(); void Log(const std::string& log); void Log(std::string&& log); + void StopAndJoin(); private: -friend struct DefaultSingletonTraits; +friend struct DefaultSingletonTraits; + + static LogRequest _stop_req; - AsyncLog(); - ~AsyncLog() override; + AsyncLogger(); + ~AsyncLogger() override; + + static void AtExit() { + GetInstance()->StopAndJoin(); + if (FLAGS_sleep_to_flush_async_log_s > 0) { + ::sleep(FLAGS_sleep_to_flush_async_log_s); + } + } void LogImpl(LogRequest* log_req); void Run() override; - void LogTask(); + void LogTask(LogRequest* req); - bool IsLogComplete(LogRequest* old_head, - bool singular_node, - LogRequest** new_tail); + bool IsLogComplete(LogRequest* old_head); void DoLog(LogRequest* req); void DoLog(const std::string& log); @@ -475,39 +487,40 @@ friend struct DefaultSingletonTraits; butil::ConditionVariable _cond; LogRequest* _current_log_request; butil::atomic _log_request_count; - bool _stop; + butil::atomic _stop; }; -AsyncLog* AsyncLog::GetInstance() { - return Singleton>::get(); +AsyncLogger* AsyncLogger::GetInstance() { + return Singleton>::get(); } -AsyncLog::AsyncLog() +AsyncLogger::AsyncLogger() : butil::SimpleThread("async_log_thread") , _log_head(NULL) , _cond(&_mutex) , _current_log_request(NULL) , _stop(false) { Start(); + // We need to stop async logger and + // flush all async log before exit. + atexit(AtExit); } -AsyncLog::~AsyncLog() { - { - BAIDU_SCOPED_LOCK(_mutex); - _stop = true; - _cond.Signal(); - } - Join(); +AsyncLogger::~AsyncLogger() { + StopAndJoin(); } -void AsyncLog::Log(const std::string& log) { +void AsyncLogger::Log(const std::string& log) { if (log.empty()) { return; } - if (_log_request_count.fetch_add(1, butil::memory_order_relaxed) > - FLAGS_max_async_log_queue_size) { + bool is_full = FLAGS_max_async_log_queue_size > 0 && + _log_request_count.fetch_add(1, butil::memory_order_relaxed) > + FLAGS_max_async_log_queue_size; + if (is_full || _stop) { + // Async logger is full or stopped, fallback to sync log. DoLog(log); return; } @@ -522,13 +535,16 @@ void AsyncLog::Log(const std::string& log) { LogImpl(log_req); } -void AsyncLog::Log(std::string&& log) { +void AsyncLogger::Log(std::string&& log) { if (log.empty()) { return; } - if (_log_request_count.fetch_add(1, butil::memory_order_relaxed) > - FLAGS_max_async_log_queue_size) { + bool is_full = FLAGS_max_async_log_queue_size > 0 && + _log_request_count.fetch_add(1, butil::memory_order_relaxed) > + FLAGS_max_async_log_queue_size; + if (is_full || _stop) { + // Async logger is full or stopped, fallback to sync log. DoLog(log); return; } @@ -543,7 +559,7 @@ void AsyncLog::Log(std::string&& log) { LogImpl(log_req); } -void AsyncLog::LogImpl(LogRequest* log_req) { +void AsyncLogger::LogImpl(LogRequest* log_req) { log_req->next = LogRequest::UNCONNECTED; LogRequest* const prev_head = _log_head.exchange(log_req, butil::memory_order_release); @@ -555,37 +571,50 @@ void AsyncLog::LogImpl(LogRequest* log_req) { log_req->next = NULL; BAIDU_SCOPED_LOCK(_mutex); - _current_log_request = log_req; - _cond.Signal(); + if (_stop) { + LogTask(log_req); + } else { + _current_log_request = log_req; + _cond.Signal(); + } } -void AsyncLog::Run() { +void AsyncLogger::StopAndJoin() { + if (!_stop) { + BAIDU_SCOPED_LOCK(_mutex); + _stop = true; + _cond.Signal(); + } + if (!HasBeenJoined()) { + Join(); + } +} + +void AsyncLogger::Run() { while (true) { BAIDU_SCOPED_LOCK(_mutex); while (!_stop && !_current_log_request) { _cond.Wait(); } - if (_stop) { + if (_stop && !_current_log_request) { break; } - LogTask(); + LogTask(_current_log_request); _current_log_request = NULL; } } -void AsyncLog::LogTask() { - LogRequest* req = _current_log_request; - LogRequest* cur_tail = NULL; +void AsyncLogger::LogTask(LogRequest* req) { do { - // req was written, skip it. + // req was logged, skip it. if (req->next != NULL && req->data.empty()) { LogRequest* const saved_req = req; req = req->next; butil::return_object(saved_req); } - // Log all req to file. + // Log all requests to file. while (req->next != NULL) { LogRequest* const saved_req = req; req = req->next; @@ -593,7 +622,7 @@ void AsyncLog::LogTask() { DoLog(saved_req); saved_req->data.clear(); } - // Release WriteRequest until last request. + // Release LogRequests until last request. butil::return_object(saved_req); } if (!req->data.empty()) { @@ -601,43 +630,24 @@ void AsyncLog::LogTask() { req->data.clear(); } - if (NULL == cur_tail) { - for (cur_tail = req; cur_tail->next != NULL; - cur_tail = cur_tail->next); - } - // Return when there's no more WriteRequests and req is completely - // written. - if (IsLogComplete(cur_tail, (req == cur_tail), &cur_tail)) { - if (cur_tail != req) { - fprintf(stderr, "cur_tail should equal to req\n"); - } + // Return when there's no more LogRequests. + if (IsLogComplete(req)) { butil::return_object(req); return; } } while (true); } -bool AsyncLog::IsLogComplete(LogRequest* old_head, - bool singular_node, - LogRequest** new_tail) { +bool AsyncLogger::IsLogComplete(LogRequest* old_head) { if (old_head->next) { fprintf(stderr, "old_head->next should be NULL\n"); } LogRequest* new_head = old_head; LogRequest* desired = NULL; - bool return_when_no_more = true; - if (!singular_node) { - desired = old_head; - // Write is obviously not complete if old_head is not fully written. - return_when_no_more = false; - } if (_log_head.compare_exchange_strong( new_head, desired, butil::memory_order_acquire)) { // No one added new requests. - if (new_tail) { - *new_tail = old_head; - } - return return_when_no_more; + return true; } if (new_head == old_head) { fprintf(stderr, "new_head should not be equal to old_head\n"); @@ -664,17 +674,14 @@ bool AsyncLog::IsLogComplete(LogRequest* old_head, // Link old list with new list. old_head->next = tail; - if (new_tail) { - *new_tail = new_head; - } return false; } -void AsyncLog::DoLog(LogRequest* req) { +void AsyncLogger::DoLog(LogRequest* req) { DoLog(req->data); } -void AsyncLog::DoLog(const std::string& log) { +void AsyncLogger::DoLog(const std::string& log) { Log2File(log); _log_request_count.fetch_sub(1); } @@ -1236,7 +1243,7 @@ class DefaultLogSink : public LogSink { // write to log file if ((logging_destination & LOG_TO_FILE) != 0) { if (FLAGS_async_log) { - AsyncLog::GetInstance()->Log(std::move(log)); + AsyncLogger::GetInstance()->Log(std::move(log)); } else { Log2File(log); } From 4b84cc39b0c38f2e794b68a9ff227158c7d0891b Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Sun, 14 Jan 2024 14:49:57 +0800 Subject: [PATCH 4/5] Use sync log for the first LogRequest --- src/butil/logging.cc | 39 ++++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/src/butil/logging.cc b/src/butil/logging.cc index 4da86c1b69..9f4e9cbe3a 100644 --- a/src/butil/logging.cc +++ b/src/butil/logging.cc @@ -149,6 +149,8 @@ DEFINE_bool(log_func_name, false, "Log function name in each log"); DEFINE_bool(async_log, false, "Use async log"); +DEFINE_bool(async_log_in_background_always, false, "Async log written in background always."); + DEFINE_int32(max_async_log_queue_size, 100000, "Max async log size. " "If current log count of async log > max_async_log_size, " "Use sync log to protect process."); @@ -519,7 +521,7 @@ void AsyncLogger::Log(const std::string& log) { bool is_full = FLAGS_max_async_log_queue_size > 0 && _log_request_count.fetch_add(1, butil::memory_order_relaxed) > FLAGS_max_async_log_queue_size; - if (is_full || _stop) { + if (is_full || _stop.load(butil::memory_order_relaxed)) { // Async logger is full or stopped, fallback to sync log. DoLog(log); return; @@ -543,7 +545,7 @@ void AsyncLogger::Log(std::string&& log) { bool is_full = FLAGS_max_async_log_queue_size > 0 && _log_request_count.fetch_add(1, butil::memory_order_relaxed) > FLAGS_max_async_log_queue_size; - if (is_full || _stop) { + if (is_full || _stop.load(butil::memory_order_relaxed)) { // Async logger is full or stopped, fallback to sync log. DoLog(log); return; @@ -561,28 +563,46 @@ void AsyncLogger::Log(std::string&& log) { void AsyncLogger::LogImpl(LogRequest* log_req) { log_req->next = LogRequest::UNCONNECTED; + // Release fence makes sure the thread getting request sees *req LogRequest* const prev_head = _log_head.exchange(log_req, butil::memory_order_release); if (prev_head != NULL) { + // Someone is logging. The async_log_thread thread may spin + // until req->next to be non-UNCONNECTED. This process is not + // lock-free, but the duration is so short(1~2 instructions, + // depending on compiler) that the spin rarely occurs in practice + // (I've not seen any spin in highly contended tests). log_req->next = prev_head; return; } // We've got the right to write. log_req->next = NULL; + if (!FLAGS_async_log_in_background_always) { + // Use sync log for the LogRequest + // which has got the right to write. + DoLog(log_req); + // Return when there's no more LogRequests. + if (IsLogComplete(log_req)) { + butil::return_object(log_req); + return; + } + } + BAIDU_SCOPED_LOCK(_mutex); - if (_stop) { + if (_stop.load(butil::memory_order_relaxed)) { + // Async logger is stopped, fallback to sync log. LogTask(log_req); } else { + // Wake up async logger. _current_log_request = log_req; _cond.Signal(); } } void AsyncLogger::StopAndJoin() { - if (!_stop) { + if (!_stop.exchange(true, butil::memory_order_relaxed)) { BAIDU_SCOPED_LOCK(_mutex); - _stop = true; _cond.Signal(); } if (!HasBeenJoined()) { @@ -593,10 +613,12 @@ void AsyncLogger::StopAndJoin() { void AsyncLogger::Run() { while (true) { BAIDU_SCOPED_LOCK(_mutex); - while (!_stop && !_current_log_request) { + while (!_stop.load(butil::memory_order_relaxed) && + !_current_log_request) { _cond.Wait(); } - if (_stop && !_current_log_request) { + if (_stop.load(butil::memory_order_relaxed) && + !_current_log_request) { break; } @@ -620,14 +642,12 @@ void AsyncLogger::LogTask(LogRequest* req) { req = req->next; if (!saved_req->data.empty()) { DoLog(saved_req); - saved_req->data.clear(); } // Release LogRequests until last request. butil::return_object(saved_req); } if (!req->data.empty()) { DoLog(req); - req->data.clear(); } // Return when there's no more LogRequests. @@ -679,6 +699,7 @@ bool AsyncLogger::IsLogComplete(LogRequest* old_head) { void AsyncLogger::DoLog(LogRequest* req) { DoLog(req->data); + req->data.clear(); } void AsyncLogger::DoLog(const std::string& log) { From ff614bc7bf276383bf872f26a9548b2d35b2a5d3 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Sat, 20 Jan 2024 14:34:21 +0800 Subject: [PATCH 5/5] Support usec of logging for macOS --- src/butil/logging.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/butil/logging.cc b/src/butil/logging.cc index 9f4e9cbe3a..032a54481b 100644 --- a/src/butil/logging.cc +++ b/src/butil/logging.cc @@ -779,7 +779,7 @@ void PrintLogPrefix(std::ostream& os, int severity, const char* file, int line, const char* func) { PrintLogSeverity(os, severity); -#if defined(OS_LINUX) +#if defined(OS_LINUX) || defined(OS_MACOSX) timeval tv; gettimeofday(&tv, NULL); time_t t = tv.tv_sec; @@ -801,7 +801,7 @@ void PrintLogPrefix(std::ostream& os, int severity, << std::setw(2) << local_tm.tm_hour << ':' << std::setw(2) << local_tm.tm_min << ':' << std::setw(2) << local_tm.tm_sec; -#if defined(OS_LINUX) +#if defined(OS_LINUX) || defined(OS_MACOSX) os << '.' << std::setw(6) << tv.tv_usec; #endif if (FLAGS_log_pid) {