From bdc141c9e4f287f3b6e6397e09ea821838d273ee Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Mon, 27 May 2024 09:38:03 +0800 Subject: [PATCH] Opt performance of async log (#2602) * Opt performance of async log * Fix ci --- src/butil/logging.cc | 174 ++++++++++++++++++++++++--------------- test/logging_unittest.cc | 43 +++++----- 2 files changed, 132 insertions(+), 85 deletions(-) diff --git a/src/butil/logging.cc b/src/butil/logging.cc index 78436ec6e1..14c977bddf 100644 --- a/src/butil/logging.cc +++ b/src/butil/logging.cc @@ -428,12 +428,9 @@ void Log2File(const std::string& log) { if (InitializeLogFileHandle()) { #if defined(OS_WIN) SetFilePointer(log_file, 0, 0, SEEK_END); - DWORD num_written; - WriteFile(log_file, - static_cast(log.data()), - static_cast(log.size()), - &num_written, - NULL); + DWORD num_written; + WriteFile(log_file, static_cast(log.data()), + static_cast(log.size()), &num_written, NULL); #else fwrite(log.data(), log.size(), 1, log_file); fflush(log_file); @@ -443,11 +440,41 @@ void Log2File(const std::string& log) { } // namespace +#if defined(OS_LINUX) || defined(OS_MACOSX) +typedef timeval TimeVal; +#else +struct TimeVal { + time_t tv_sec; +}; +#endif + +TimeVal GetTimestamp() { +#if defined(OS_LINUX) || defined(OS_MACOSX) + timeval tv; + gettimeofday(&tv, NULL); + return tv; +#else + return { time(NULL) }; +#endif +} + +struct BAIDU_CACHELINE_ALIGNMENT LogInfo { + std::string file; + std::string func; + std::string content; + TimeVal timestamp{}; + int severity{0}; + int line{0}; + // If `raw' is false, content has been a complete log. + // If raw is true, a complete log consists of all properties of LogInfo. + bool raw{false}; +}; + struct BAIDU_CACHELINE_ALIGNMENT LogRequest { static LogRequest* const UNCONNECTED; LogRequest* next{NULL}; - std::string data; + LogInfo log_info; }; LogRequest* const LogRequest::UNCONNECTED = (LogRequest*)(intptr_t)-1; @@ -456,8 +483,8 @@ class AsyncLogger : public butil::SimpleThread { public: static AsyncLogger* GetInstance(); - void Log(const std::string& log); - void Log(std::string&& log); + void Log(const LogInfo& log_info); + void Log(LogInfo&& log_info); void StopAndJoin(); private: @@ -484,7 +511,7 @@ friend struct DefaultSingletonTraits; bool IsLogComplete(LogRequest* old_head); void DoLog(LogRequest* req); - void DoLog(const std::string& log); + void DoLog(const LogInfo& log_info); butil::atomic _log_head; butil::Mutex _mutex; @@ -515,8 +542,26 @@ AsyncLogger::~AsyncLogger() { StopAndJoin(); } -void AsyncLogger::Log(const std::string& log) { - if (log.empty()) { +std::string LogInfoToLogStr(int severity, butil::StringPiece file, + int line, butil::StringPiece func, + butil::StringPiece content) { + // There's a copy here to concatenate prefix and content. Since + // DefaultLogSink is hardly used right now, the copy is irrelevant. + // A LogSink focused on performance should also be able to handle + // non-continuous inputs which is a must to maximize performance. + std::ostringstream os; + PrintLog(os, severity, file.data(), line, func.data(), content); + os << '\n'; + return os.str(); +} + +std::string LogInfo2LogStr(const LogInfo& log_info) { + return LogInfoToLogStr(log_info.severity, log_info.file, log_info.line, + log_info.func, log_info.content); +} + +void AsyncLogger::Log(const LogInfo& log_info) { + if (log_info.content.empty()) { return; } @@ -525,22 +570,22 @@ void AsyncLogger::Log(const std::string& log) { FLAGS_max_async_log_queue_size; if (is_full || _stop.load(butil::memory_order_relaxed)) { // Async logger is full or stopped, fallback to sync log. - DoLog(log); + DoLog(log_info); return; } auto log_req = butil::get_object(); if (!log_req) { // Async log failed, fallback to sync log. - DoLog(log); + DoLog(log_info); return; } - log_req->data = log; + log_req->log_info = log_info; LogImpl(log_req); } -void AsyncLogger::Log(std::string&& log) { - if (log.empty()) { +void AsyncLogger::Log(LogInfo&& log_info) { + if (log_info.content.empty()) { return; } @@ -549,17 +594,17 @@ void AsyncLogger::Log(std::string&& log) { FLAGS_max_async_log_queue_size; if (is_full || _stop.load(butil::memory_order_relaxed)) { // Async logger is full or stopped, fallback to sync log. - DoLog(log); + DoLog(log_info); return; } auto log_req = butil::get_object(); if (!log_req) { // Async log failed, fallback to sync log. - DoLog(log); + DoLog(log_info); return; } - log_req->data = std::move(log); + log_req->log_info = std::move(log_info); LogImpl(log_req); } @@ -632,7 +677,7 @@ void AsyncLogger::Run() { void AsyncLogger::LogTask(LogRequest* req) { do { // req was logged, skip it. - if (req->next != NULL && req->data.empty()) { + if (req->next != NULL && req->log_info.content.empty()) { LogRequest* const saved_req = req; req = req->next; butil::return_object(saved_req); @@ -642,13 +687,13 @@ void AsyncLogger::LogTask(LogRequest* req) { while (req->next != NULL) { LogRequest* const saved_req = req; req = req->next; - if (!saved_req->data.empty()) { + if (!saved_req->log_info.content.empty()) { DoLog(saved_req); } // Release LogRequests until last request. butil::return_object(saved_req); } - if (!req->data.empty()) { + if (!req->log_info.content.empty()) { DoLog(req); } @@ -700,13 +745,17 @@ bool AsyncLogger::IsLogComplete(LogRequest* old_head) { } void AsyncLogger::DoLog(LogRequest* req) { - DoLog(req->data); - req->data.clear(); + DoLog(req->log_info); + req->log_info.content.clear(); } -void AsyncLogger::DoLog(const std::string& log) { - Log2File(log); - _log_request_count.fetch_sub(1); +void AsyncLogger::DoLog(const LogInfo& log_info) { + if (log_info.raw) { + Log2File(LogInfo2LogStr(log_info)); + } else { + Log2File(log_info.content); + } + _log_request_count.fetch_sub(1, butil::memory_order_relaxed); } LoggingSettings::LoggingSettings() @@ -778,16 +827,10 @@ static void PrintLogSeverity(std::ostream& os, int severity) { } void PrintLogPrefix(std::ostream& os, int severity, - const char* file, int line, - const char* func) { + butil::StringPiece file, int line, + butil::StringPiece func, TimeVal tv) { PrintLogSeverity(os, severity); -#if defined(OS_LINUX) || defined(OS_MACOSX) - timeval tv; - gettimeofday(&tv, NULL); time_t t = tv.tv_sec; -#else - time_t t = time(NULL); -#endif struct tm local_tm = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, NULL}; #if _MSC_VER >= 1400 localtime_s(&local_tm, &t); @@ -822,7 +865,7 @@ void PrintLogPrefix(std::ostream& os, int severity, os << ' ' << hostname; } os << ' ' << file << ':' << line; - if (func && *func != '\0') { + if (!func.empty()) { os << " " << func; } os << "] "; @@ -832,12 +875,13 @@ void PrintLogPrefix(std::ostream& os, int severity, void PrintLogPrefix(std::ostream& os, int severity, const char* file, int line) { - PrintLogPrefix(os, severity, file, line, ""); + PrintLogPrefix(os, severity, file, line, "", GetTimestamp()); } static void PrintLogPrefixAsJSON(std::ostream& os, int severity, - const char* file, const char* func, - int line) { + butil::StringPiece file, + butil::StringPiece func, + int line, TimeVal tv) { // severity os << "\"L\":\""; if (severity < 0) { @@ -849,13 +893,7 @@ static void PrintLogPrefixAsJSON(std::ostream& os, int severity, } // time os << "\",\"T\":\""; -#if defined(OS_LINUX) - timeval tv; - gettimeofday(&tv, NULL); time_t t = tv.tv_sec; -#else - time_t t = time(NULL); -#endif struct tm local_tm = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, NULL}; #if _MSC_VER >= 1400 localtime_s(&local_tm, &t); @@ -871,7 +909,7 @@ static void PrintLogPrefixAsJSON(std::ostream& os, int severity, << std::setw(2) << local_tm.tm_hour << ':' << std::setw(2) << local_tm.tm_min << ':' << std::setw(2) << local_tm.tm_sec; -#if defined(OS_LINUX) +#if defined(OS_LINUX) || defined(OS_MACOSX) os << '.' << std::setw(6) << tv.tv_usec; #endif os << "\","; @@ -889,7 +927,7 @@ static void PrintLogPrefixAsJSON(std::ostream& os, int severity, os << "\"host\":\"" << hostname << "\","; } os << "\"C\":\"" << file << ':' << line; - if (func && *func != '\0') { + if (!func.empty()) { os << " " << func; } os << "\""; @@ -922,11 +960,11 @@ inline void OutputLog(std::ostream& os, const butil::StringPiece& s) { void PrintLog(std::ostream& os, int severity, const char* file, int line, const char* func, const butil::StringPiece& content) { if (!FLAGS_log_as_json) { - PrintLogPrefix(os, severity, file, line, func); + PrintLogPrefix(os, severity, file, line, func, GetTimestamp()); OutputLog(os, content); } else { os << '{'; - PrintLogPrefixAsJSON(os, severity, file, func, line); + PrintLogPrefixAsJSON(os, severity, file, func, line, GetTimestamp()); bool pair_quote = false; if (content.empty() || content[0] != '"') { // not a json, add a 'M' field @@ -1246,33 +1284,39 @@ class DefaultLogSink : public LogSink { bool OnLogMessage(int severity, const char* file, int line, const char* func, const butil::StringPiece& content) override { - // There's a copy here to concatenate prefix and content. Since - // DefaultLogSink is hardly used right now, the copy is irrelevant. - // A LogSink focused on performance should also be able to handle - // non-continuous inputs which is a must to maximize performance. - std::ostringstream os; - PrintLog(os, severity, file, line, func, content); - os << '\n'; - std::string log = os.str(); - - if ((logging_destination & LOG_TO_SYSTEM_DEBUG_LOG) != 0) { - fwrite(log.data(), log.size(), 1, stderr); - fflush(stderr); - } else if (severity >= kAlwaysPrintErrorLevel) { + std::string log; + if ((logging_destination & LOG_TO_SYSTEM_DEBUG_LOG) != 0 || + severity >= kAlwaysPrintErrorLevel) { + log = LogInfoToLogStr(severity, file, line, func, content); // When we're only outputting to a log file, above a certain log level, we // should still output to stderr so that we can better detect and diagnose // problems with unit tests, especially on the buildbots. fwrite(log.data(), log.size(), 1, stderr); fflush(stderr); } - // write to log file if ((logging_destination & LOG_TO_FILE) != 0) { if ((FLAGS_crash_on_fatal_log && severity == BLOG_FATAL) || !FLAGS_async_log) { + if (log.empty()) { + log = LogInfoToLogStr(severity, file, line, func, content); + } Log2File(log); } else { - AsyncLogger::GetInstance()->Log(std::move(log)); + LogInfo info; + if (log.empty()) { + info.severity = severity; + info.timestamp = GetTimestamp(); + info.file = file; + info.func = func; + info.line = line; + info.content = content.as_string(); + info.raw = true; + } else { + info.content = std::move(log); + info.raw = false; + } + AsyncLogger::GetInstance()->Log(std::move(info)); } } return true; diff --git a/test/logging_unittest.cc b/test/logging_unittest.cc index 66834b9890..6072044be5 100644 --- a/test/logging_unittest.cc +++ b/test/logging_unittest.cc @@ -17,6 +17,8 @@ DECLARE_bool(crash_on_fatal_log); DECLARE_int32(v); DECLARE_bool(log_func_name); DECLARE_bool(async_log); +DECLARE_bool(async_log_in_background_always); +DECLARE_int32(max_async_log_queue_size); namespace { @@ -513,14 +515,14 @@ TEST_F(LoggingTest, async_log) { ASSERT_EQ(0, pthread_create(&threads[i], NULL, test_async_log, &log)); } - sleep(5); + usleep(1000 * 500); g_stopped = true; for (int i = 0; i < thread_num; ++i) { pthread_join(threads[i], NULL); } // Wait for async log thread to flush all logs to file. - sleep(10); + sleep(15); std::ostringstream oss; std::string cmd = butil::string_printf("grep -c %s %s", @@ -545,6 +547,8 @@ void* test_log(void* void_arg) { auto args = (PerfArgs*)void_arg; args->ready = true; butil::Timer t; + std::string log = *args->log; + int counter = 0; while (!g_stopped) { if (g_started) { break; @@ -554,13 +558,14 @@ void* test_log(void* void_arg) { t.start(); while (!g_stopped) { { - LOG(INFO) << *args->log; + LOG(INFO) << log; test_logging_count.fetch_add(1, butil::memory_order_relaxed); } - ++args->counter; + ++counter; } t.stop(); args->elapse_ns = t.n_elapsed(); + args->counter = counter; return NULL; } @@ -588,11 +593,12 @@ void PerfTest(int thread_num, const std::string& log, bool async) { } usleep(1000); } + int sleep_s = 2; g_started = true; char prof_name[32]; snprintf(prof_name, sizeof(prof_name), "logging_%d.prof", ++g_prof_name_counter); ProfilerStart(prof_name); - sleep(5); + sleep(sleep_s); ProfilerStop(); g_stopped = true; int64_t wait_time = 0; @@ -606,31 +612,28 @@ void PerfTest(int thread_num, const std::string& log, bool async) { << " log_type=" << (async ? "async" : "sync") << " log_size=" << log.size() << " count=" << count - << " average_time=" << wait_time / (double)count + << " duration=" << sleep_s << "s" + << " qps=" << (int)(count / (double)sleep_s) + << " average_time=" << wait_time / (double)count << "us" << std::endl; } TEST_F(LoggingTest, performance) { bool saved_async_log = FLAGS_async_log; + FLAGS_max_async_log_queue_size = + std::numeric_limits::max(); + FLAGS_async_log_in_background_always = true; LoggingSettings settings; settings.logging_dest = LOG_TO_FILE; + settings.delete_old = DELETE_OLD_LOG_FILE; InitLogging(settings); - std::string log(64, 'a'); - int thread_num = 1; - PerfTest(thread_num, log, true); - sleep(10); - PerfTest(thread_num, log, false); - - thread_num = 2; - PerfTest(thread_num, log, true); - sleep(10); - PerfTest(thread_num, log, false); - - thread_num = 4; - PerfTest(thread_num, log, true); + std::string log(100, 'a'); + PerfTest(1, log, false); + PerfTest(8, log, false); + PerfTest(1, log, true); sleep(10); - PerfTest(thread_num, log, false); + PerfTest(8, log, true); FLAGS_async_log = saved_async_log; }