Skip to content

Commit

Permalink
Implement stalling mode for ping-pong buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
jdksjolen committed Dec 15, 2024
1 parent 22845a7 commit 90bc26e
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 37 deletions.
77 changes: 54 additions & 23 deletions src/hotspot/share/logging/logAsyncWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,36 @@
#include "runtime/atomic.hpp"
#include "runtime/os.inline.hpp"

class AsyncLogWriter::AsyncLogLocker : public StackObj {
class AsyncLogWriter::OuterLocker : public StackObj {
public:
AsyncLogLocker() {
OuterLocker() {
assert(_instance != nullptr, "AsyncLogWriter::_lock is unavailable");
_instance->_lock.lock();
_instance->_outer_lock.lock();
}

~AsyncLogLocker() {
_instance->_lock.unlock();
~OuterLocker() {
_instance->_outer_lock.unlock();
}
};

class AsyncLogWriter::InnerLocker : public StackObj {
public:
InnerLocker() {
assert(_instance != nullptr, "AsyncLogWriter::_lock is unavailable");
_instance->_inner_lock.lock();
}

~InnerLocker() {
_instance->_inner_lock.unlock();
}
};

// LogDecorator::None applies to 'constant initialization' because of its constexpr constructor.
const LogDecorations& AsyncLogWriter::None = LogDecorations(LogLevel::Warning, LogTagSetMapping<LogTag::__NO_TAG>::tagset(),
LogDecorators::None);

bool AsyncLogWriter::Buffer::push_back(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg) {
const size_t len = strlen(msg);
bool AsyncLogWriter::Buffer::push_back(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg, const size_t msg_len) {
const size_t len = msg_len;
const size_t sz = Message::calc_size(len);
const bool is_token = output == nullptr;
// Always leave headroom for the flush token. Pushing a token must succeed.
Expand All @@ -65,7 +77,7 @@ bool AsyncLogWriter::Buffer::push_back(LogFileStreamOutput* output, const LogDec
}

void AsyncLogWriter::Buffer::push_flush_token() {
bool result = push_back(nullptr, AsyncLogWriter::None, "");
bool result = push_back(nullptr, AsyncLogWriter::None, "", 0);
assert(result, "fail to enqueue the flush token.");
}

Expand All @@ -74,34 +86,46 @@ void AsyncLogWriter::enqueue_locked(LogFileStreamOutput* output, const LogDecora
// client should use "" instead.
assert(msg != nullptr, "enqueuing a null message!");

if (!_buffer->push_back(output, decorations, msg)) {
size_t msg_len = strlen(msg);

if (_buffer->push_back(output, decorations, msg, msg_len)) {
_data_available = true;
_inner_lock.notify();
return;
}

if (true /* stalling mode */) {
void* ptr = os::malloc(Message::calc_size(msg_len), mtLogging);
new (ptr) Message(output, decorations, msg, msg_len);
_stalled_message = (Message*)ptr;
while (_stalled_message != nullptr) {
_inner_lock.wait(0 /* no timeout */);
}
} else {
bool p_created;
uint32_t* counter = _stats.put_if_absent(output, 0, &p_created);
*counter = *counter + 1;
return;
}

_data_available = true;
_lock.notify();
}

void AsyncLogWriter::enqueue(LogFileStreamOutput& output, const LogDecorations& decorations, const char* msg) {
AsyncLogLocker locker;
OuterLocker locker;
InnerLocker ilocker;
enqueue_locked(&output, decorations, msg);
}

// LogMessageBuffer consists of a multiple-part/multiple-line message.
// The lock here guarantees its integrity.
void AsyncLogWriter::enqueue(LogFileStreamOutput& output, LogMessageBuffer::Iterator msg_iterator) {
AsyncLogLocker locker;

OuterLocker locker;
InnerLocker ilocker;
for (; !msg_iterator.is_at_end(); msg_iterator++) {
enqueue_locked(&output, msg_iterator.decorations(), msg_iterator.message());
}
}

AsyncLogWriter::AsyncLogWriter()
: _flush_sem(0), _lock(), _data_available(false),
: _flush_sem(0), _outer_lock(), _data_available(false),
_initialized(false),
_stats() {

Expand Down Expand Up @@ -153,10 +177,10 @@ void AsyncLogWriter::run() {
ResourceMark rm;
AsyncLogMap<AnyObj::RESOURCE_AREA> snapshot;
{
AsyncLogLocker locker;
InnerLocker ilocker;

while (!_data_available) {
_lock.wait(0/* no timeout */);
_inner_lock.wait(0/* no timeout */);
}
// Only doing a swap and statistics under the lock to
// guarantee that I/O jobs don't block logsites.
Expand All @@ -175,6 +199,13 @@ void AsyncLogWriter::run() {
_data_available = false;
}
write(snapshot);
if (_stalled_message != nullptr) {
InnerLocker ilocker;
Message* m = (Message*)_stalled_message;
m->output()->write_blocking(m->decorations(), m->message());
_stalled_message = nullptr;
_inner_lock.notify();
}
}
}

Expand Down Expand Up @@ -212,19 +243,19 @@ AsyncLogWriter* AsyncLogWriter::instance() {
void AsyncLogWriter::flush() {
if (_instance != nullptr) {
{
AsyncLogLocker locker;
OuterLocker locker;
// Push directly in-case we are at logical max capacity, as this must not get dropped.
_instance->_buffer->push_flush_token();
_instance->_data_available = true;
_instance->_lock.notify();
_instance->_outer_lock.notify();
}

_instance->_flush_sem.wait();
}
}

AsyncLogWriter::BufferUpdater::BufferUpdater(size_t newsize) {
AsyncLogLocker locker;
OuterLocker locker;
auto p = AsyncLogWriter::_instance;

_buf1 = p->_buffer;
Expand All @@ -238,7 +269,7 @@ AsyncLogWriter::BufferUpdater::~BufferUpdater() {
auto p = AsyncLogWriter::_instance;

{
AsyncLogLocker locker;
OuterLocker locker;

delete p->_buffer;
delete p->_buffer_staging;
Expand Down
16 changes: 13 additions & 3 deletions src/hotspot/share/logging/logAsyncWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class LogFileStreamOutput;
class AsyncLogWriter : public NonJavaThread {
friend class AsyncLogTest;
friend class AsyncLogTest_logBuffer_vm_Test;
class AsyncLogLocker;
class OuterLocker;
class InnerLocker;

// account for dropped messages
template <AnyObj::allocation_type ALLOC_TYPE>
Expand Down Expand Up @@ -125,7 +126,7 @@ class AsyncLogWriter : public NonJavaThread {
}

void push_flush_token();
bool push_back(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg);
bool push_back(LogFileStreamOutput* output, const LogDecorations& decorations, const char* msg, const size_t msg_len);

void reset() {
// Ensure _pos is Message-aligned
Expand Down Expand Up @@ -159,7 +160,13 @@ class AsyncLogWriter : public NonJavaThread {
static AsyncLogWriter* _instance;
Semaphore _flush_sem;
// Can't use a Monitor here as we need a low-level API that can be used without Thread::current().
PlatformMonitor _lock;
// Producers take both locks in the order 0. _outer_lock 1. _inner_lock
// The consumer thread only takes the _inner_lock.
// The _inner_lock protects the buffers and performs all communication between producer and consumer via wait/notify.
// This allows a producer to await progress from the consumer thread (by only releasing the _inner_lock)), whilst preventing all other consumers from progressing.
// Stalling is implemented by writing to _stalled_message, notifying the _inner_lock and releasing it _inner_lock.
PlatformMonitor _outer_lock;
PlatformMonitor _inner_lock;
bool _data_available;
volatile bool _initialized;
AsyncLogMap<AnyObj::C_HEAP> _stats;
Expand All @@ -168,6 +175,9 @@ class AsyncLogWriter : public NonJavaThread {
Buffer* _buffer;
Buffer* _buffer_staging;

// Stalled message
volatile Message* _stalled_message;

static const LogDecorations& None;

AsyncLogWriter();
Expand Down
9 changes: 7 additions & 2 deletions src/hotspot/share/logging/logConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,10 +637,15 @@ void LogConfiguration::print_command_line_help(outputStream* out) {
out->cr();

out->print_cr("Asynchronous logging (off by default):");
out->print_cr(" -Xlog:async");
out->print_cr(" -Xlog:async[:[mode]]");
out->print_cr(" All log messages are written to an intermediate buffer first and will then be flushed"
" to the corresponding log outputs by a standalone thread. Write operations at logsites are"
" guaranteed non-blocking.");
out->print_cr(" A mode, either 'drop' or 'stall', may be provided. If 'drop' is provided then"
" messages will be dropped if there is no room in the intermediate buffer,"
" if 'stall' is provided then the log operation will wait for room to be made by the output thread."
" The default mode is 'drop'.");

out->cr();

out->print_cr("Some examples:");
Expand Down Expand Up @@ -716,4 +721,4 @@ void LogConfiguration::notify_update_listeners() {
}
}

bool LogConfiguration::_async_mode = false;
LogConfiguration::AsyncMode LogConfiguration::_async_mode = AsyncMode::Off;
16 changes: 12 additions & 4 deletions src/hotspot/share/logging/logConfiguration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ class LogConfiguration : public AllStatic {

static UpdateListenerFunction* _listener_callbacks;
static size_t _n_listener_callbacks;
static bool _async_mode;

public:
enum class AsyncMode {
Off, Stall, Drop
};

private:
static AsyncMode _async_mode;

// Create a new output. Returns null if failed.
static LogOutput* new_output(const char* name, const char* options, outputStream* errstream);
Expand Down Expand Up @@ -129,9 +136,10 @@ class LogConfiguration : public AllStatic {
// Rotates all LogOutput
static void rotate_all_outputs();

static bool is_async_mode() { return _async_mode; }
static void set_async_mode(bool value) {
_async_mode = value;
static AsyncMode async_mode() { return _async_mode; }
static bool is_async_mode() { return _async_mode != AsyncMode::Off; }
static void set_async_mode(AsyncMode mode) {
_async_mode = mode;
}
};

Expand Down
15 changes: 13 additions & 2 deletions src/hotspot/share/runtime/arguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2593,9 +2593,20 @@ jint Arguments::parse_each_vm_init_arg(const JavaVMInitArgs* args, JVMFlagOrigin
} else if (strcmp(tail, ":disable") == 0) {
LogConfiguration::disable_logging();
ret = true;
} else if (strcmp(tail, ":async") == 0) {
LogConfiguration::set_async_mode(true);
} else if (strncmp(tail, ":async", strlen(":async")) == 0) {
ret = true;
const char* async_tail = tail + strlen(":async");
if (*async_tail == '\0') {
// Default is to drop.
LogConfiguration::set_async_mode(LogConfiguration::AsyncMode::Drop);
} else if (strcmp(async_tail, ":stall") == 0) {
LogConfiguration::set_async_mode(LogConfiguration::AsyncMode::Stall);
} else if (strcmp(async_tail, ":drop") == 0) {
LogConfiguration::set_async_mode(LogConfiguration::AsyncMode::Drop);
} else {
// User provided unknown async option
ret = false;
}
} else if (*tail == '\0') {
ret = LogConfiguration::parse_command_line_arguments();
assert(ret, "-Xlog without arguments should never fail to parse");
Expand Down
6 changes: 3 additions & 3 deletions test/hotspot/gtest/logging/test_asynclog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,10 @@ TEST_VM_F(AsyncLogTest, logBuffer) {
const uintptr_t mask = (uintptr_t)(sizeof(void*) - 1);
bool res;

res = buffer->push_back(output, Default, "a log line");
res = buffer->push_back(output, Default, "a log line", strlen("a log line"));
EXPECT_TRUE(res) << "first message should succeed.";
line++;
res = buffer->push_back(output, Default, "yet another");
res = buffer->push_back(output, Default, "yet another", strlen("yet another"));
EXPECT_TRUE(res) << "second message should succeed.";
line++;

Expand All @@ -202,7 +202,7 @@ TEST_VM_F(AsyncLogTest, logBuffer) {
written = e->output()->write_blocking(e->decorations(), e->message());
EXPECT_GT(written, 0);

while (buffer->push_back(output, Default, "0123456789abcdef")) {
while (buffer->push_back(output, Default, "0123456789abcdef", strlen("0123456789abcdef"))) {
line++;
}

Expand Down

0 comments on commit 90bc26e

Please sign in to comment.