From c85beb8106a1e6633ee2156c524d5f000f3e12d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Fri, 23 Jun 2023 13:27:34 +0200 Subject: [PATCH] Refactor CallQueue flushing for clarity --- core/object/message_queue.cpp | 94 ++++++++++++++++++----------------- core/object/message_queue.h | 2 + 2 files changed, 51 insertions(+), 45 deletions(-) diff --git a/core/object/message_queue.cpp b/core/object/message_queue.cpp index 18ba5d5b3006..506f8291eb2c 100644 --- a/core/object/message_queue.cpp +++ b/core/object/message_queue.cpp @@ -222,62 +222,66 @@ void CallQueue::_call_function(const Callable &p_callable, const Variant *p_args } } -Error CallQueue::flush() { - LOCK_MUTEX; - - // Thread overrides are not meant to be flushed, but appended to the main one. - if (this == MessageQueue::thread_singleton) { - if (pages.size() == 0) { - return OK; - } +Error CallQueue::_transfer_messages_to_main_queue() { + if (pages.size() == 0) { + return OK; + } - CallQueue *mq = MessageQueue::main_singleton; - DEV_ASSERT(!mq->allocator_is_custom && !allocator_is_custom); // Transferring pages is only safe if using the same alloator parameters. - - mq->mutex.lock(); - - // Here we're transferring the data from this queue to the main one. - // However, it's very unlikely big amounts of messages will be queued here, - // so PagedArray/Pool would be overkill. Also, in most cases the data will fit - // an already existing page of the main queue. - - // Let's see if our first (likely only) page fits the current target queue page. - uint32_t src_page = 0; - { - if (mq->pages_used) { - uint32_t dst_page = mq->pages_used - 1; - uint32_t dst_offset = mq->page_bytes[dst_page]; - if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) { - memcpy(mq->pages[dst_page]->data + dst_offset, pages[0]->data, page_bytes[0]); - mq->page_bytes[dst_page] += page_bytes[0]; - src_page++; - } + CallQueue *mq = MessageQueue::main_singleton; + DEV_ASSERT(!mq->allocator_is_custom && !allocator_is_custom); // Transferring pages is only safe if using the same alloator parameters. + + mq->mutex.lock(); + + // Here we're transferring the data from this queue to the main one. + // However, it's very unlikely big amounts of messages will be queued here, + // so PagedArray/Pool would be overkill. Also, in most cases the data will fit + // an already existing page of the main queue. + + // Let's see if our first (likely only) page fits the current target queue page. + uint32_t src_page = 0; + { + if (mq->pages_used) { + uint32_t dst_page = mq->pages_used - 1; + uint32_t dst_offset = mq->page_bytes[dst_page]; + if (dst_offset + page_bytes[0] < uint32_t(PAGE_SIZE_BYTES)) { + memcpy(mq->pages[dst_page]->data + dst_offset, pages[0]->data, page_bytes[0]); + mq->page_bytes[dst_page] += page_bytes[0]; + src_page++; } } + } - // Any other possibly existing source page needs to be added. + // Any other possibly existing source page needs to be added. - if (mq->pages_used + (pages_used - src_page) > mq->max_pages) { - ERR_PRINT("Failed appending thread queue. Message queue out of memory. " + mq->error_text); - mq->statistics(); - mq->mutex.unlock(); - return ERR_OUT_OF_MEMORY; - } + if (mq->pages_used + (pages_used - src_page) > mq->max_pages) { + ERR_PRINT("Failed appending thread queue. Message queue out of memory. " + mq->error_text); + mq->statistics(); + mq->mutex.unlock(); + return ERR_OUT_OF_MEMORY; + } - for (; src_page < pages_used; src_page++) { - mq->_add_page(); - memcpy(mq->pages[mq->pages_used - 1]->data, pages[src_page]->data, page_bytes[src_page]); - mq->page_bytes[mq->pages_used - 1] = page_bytes[src_page]; - } + for (; src_page < pages_used; src_page++) { + mq->_add_page(); + memcpy(mq->pages[mq->pages_used - 1]->data, pages[src_page]->data, page_bytes[src_page]); + mq->page_bytes[mq->pages_used - 1] = page_bytes[src_page]; + } - mq->mutex.unlock(); + mq->mutex.unlock(); - page_bytes[0] = 0; - pages_used = 1; + page_bytes[0] = 0; + pages_used = 1; - return OK; + return OK; +} + +Error CallQueue::flush() { + // Thread overrides are not meant to be flushed, but appended to the main one. + if (unlikely(this == MessageQueue::thread_singleton)) { + return _transfer_messages_to_main_queue(); } + LOCK_MUTEX; + if (pages.size() == 0) { // Never allocated UNLOCK_MUTEX; diff --git a/core/object/message_queue.h b/core/object/message_queue.h index 9f567e4dd045..c2f4ad16435e 100644 --- a/core/object/message_queue.h +++ b/core/object/message_queue.h @@ -98,6 +98,8 @@ class CallQueue { } } + Error _transfer_messages_to_main_queue(); + void _add_page(); void _call_function(const Callable &p_callable, const Variant *p_args, int p_argcount, bool p_show_error);