Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CallQueue flushing for clarity #78612

Merged
merged 1 commit into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 49 additions & 45 deletions core/object/message_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,62 +222,66 @@ void CallQueue::_call_function(const Callable &p_callable, const Variant *p_args
}
}

Error CallQueue::flush() {
LOCK_MUTEX;

// Thread overrides are not meant to be flushed, but appended to the main one.
if (this == MessageQueue::thread_singleton) {
if (pages.size() == 0) {
return OK;
}
Error CallQueue::_transfer_messages_to_main_queue() {
if (pages.size() == 0) {
return OK;
}

CallQueue *mq = MessageQueue::main_singleton;
DEV_ASSERT(!mq->allocator_is_custom && !allocator_is_custom); // Transferring pages is only safe if using the same alloator parameters.

mq->mutex.lock();

// Here we're transferring the data from this queue to the main one.
// However, it's very unlikely big amounts of messages will be queued here,
// so PagedArray/Pool would be overkill. Also, in most cases the data will fit
// an already existing page of the main queue.

// Let's see if our first (likely only) page fits the current target queue page.
uint32_t src_page = 0;
{
if (mq->pages_used) {
uint32_t dst_page = mq->pages_used - 1;
uint32_t dst_offset = mq->page_bytes[dst_page];
if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) {
memcpy(mq->pages[dst_page]->data + dst_offset, pages[0]->data, page_bytes[0]);
mq->page_bytes[dst_page] += page_bytes[0];
src_page++;
}
CallQueue *mq = MessageQueue::main_singleton;
DEV_ASSERT(!mq->allocator_is_custom && !allocator_is_custom); // Transferring pages is only safe if using the same alloator parameters.

mq->mutex.lock();

// Here we're transferring the data from this queue to the main one.
// However, it's very unlikely big amounts of messages will be queued here,
// so PagedArray/Pool would be overkill. Also, in most cases the data will fit
// an already existing page of the main queue.

// Let's see if our first (likely only) page fits the current target queue page.
uint32_t src_page = 0;
{
if (mq->pages_used) {
uint32_t dst_page = mq->pages_used - 1;
uint32_t dst_offset = mq->page_bytes[dst_page];
if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) {
memcpy(mq->pages[dst_page]->data + dst_offset, pages[0]->data, page_bytes[0]);
mq->page_bytes[dst_page] += page_bytes[0];
src_page++;
}
}
}

// Any other possibly existing source page needs to be added.
// Any other possibly existing source page needs to be added.

if (mq->pages_used + (pages_used - src_page) > mq->max_pages) {
ERR_PRINT("Failed appending thread queue. Message queue out of memory. " + mq->error_text);
mq->statistics();
mq->mutex.unlock();
return ERR_OUT_OF_MEMORY;
}
if (mq->pages_used + (pages_used - src_page) > mq->max_pages) {
ERR_PRINT("Failed appending thread queue. Message queue out of memory. " + mq->error_text);
mq->statistics();
mq->mutex.unlock();
return ERR_OUT_OF_MEMORY;
}

for (; src_page < pages_used; src_page++) {
mq->_add_page();
memcpy(mq->pages[mq->pages_used - 1]->data, pages[src_page]->data, page_bytes[src_page]);
mq->page_bytes[mq->pages_used - 1] = page_bytes[src_page];
}
for (; src_page < pages_used; src_page++) {
mq->_add_page();
memcpy(mq->pages[mq->pages_used - 1]->data, pages[src_page]->data, page_bytes[src_page]);
mq->page_bytes[mq->pages_used - 1] = page_bytes[src_page];
}

mq->mutex.unlock();
mq->mutex.unlock();

page_bytes[0] = 0;
pages_used = 1;
page_bytes[0] = 0;
pages_used = 1;

return OK;
return OK;
}

Error CallQueue::flush() {
// Thread overrides are not meant to be flushed, but appended to the main one.
if (unlikely(this == MessageQueue::thread_singleton)) {
return _transfer_messages_to_main_queue();
}

LOCK_MUTEX;

if (pages.size() == 0) {
// Never allocated
UNLOCK_MUTEX;
Expand Down
2 changes: 2 additions & 0 deletions core/object/message_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ class CallQueue {
}
}

Error _transfer_messages_to_main_queue();

void _add_page();

void _call_function(const Callable &p_callable, const Variant *p_args, int p_argcount, bool p_show_error);
Expand Down