Skip to content

Commit

Permalink
DPL: Use X9 to make AsyncQueue atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Sep 19, 2024
1 parent 13baa03 commit 8dddc15
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 1 deletion.
1 change: 1 addition & 0 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ o2_add_library(Framework
O2::Headers
O2::MemoryResources
O2::PCG
O2::X9
RapidJSON::RapidJSON
Arrow::arrow_shared
Microsoft.GSL::GSL
Expand Down
12 changes: 12 additions & 0 deletions Framework/Core/include/Framework/AsyncQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
#include "Framework/TimesliceSlot.h"
#include <string>
#include <vector>
#include <atomic>

typedef struct x9_inbox_internal x9_inbox;
typedef struct x9_node_internal x9_node;

namespace o2::framework
{
Expand Down Expand Up @@ -89,6 +93,12 @@ struct AsyncQueue {
std::vector<AsyncTaskSpec> prototypes;
std::vector<AsyncTask> tasks;
size_t iteration = 0;

std::atomic<bool> first = true;

// Inbox for the message queue used to append
// tasks to this queue.
x9_inbox* inbox = nullptr;
AsyncQueue();
};

Expand All @@ -104,6 +114,8 @@ struct AsyncQueueHelpers {
/// 3. only execute the highest (timeslice, debounce) value
static void run(AsyncQueue& queue, TimesliceId oldestPossibleTimeslice);

// Flush tasks which were posted but not yet committed to the queue
static void flushPending(AsyncQueue& queue);
/// Reset the queue to its initial state
static void reset(AsyncQueue& queue);
};
Expand Down
33 changes: 32 additions & 1 deletion Framework/Core/src/AsyncQueue.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@

#include "Framework/AsyncQueue.h"
#include "Framework/Signpost.h"
#include "x9.h"
#include <numeric>

O2_DECLARE_DYNAMIC_LOG(async_queue);

namespace o2::framework
{
AsyncQueue::AsyncQueue()
: inbox(x9_create_inbox(16, "async_queue", sizeof(AsyncTask)))
{
this->inbox = x9_create_inbox(16, "async_queue", sizeof(AsyncTask));
}

auto AsyncQueueHelpers::create(AsyncQueue& queue, AsyncTaskSpec spec) -> AsyncTaskId
Expand All @@ -31,11 +34,39 @@ auto AsyncQueueHelpers::create(AsyncQueue& queue, AsyncTaskSpec spec) -> AsyncTa

auto AsyncQueueHelpers::post(AsyncQueue& queue, AsyncTask const& task) -> void
{
queue.tasks.push_back(task);
// Until we do not manage to write to the inbox, keep removing
// items from the queue if you are the first one which fails to
// write.
while (!x9_write_to_inbox(queue.inbox, sizeof(AsyncTask), &task)) {
AsyncQueueHelpers::flushPending(queue);
}
}

auto AsyncQueueHelpers::flushPending(AsyncQueue& queue) -> void
{
bool isFirst = true;
if (!std::atomic_compare_exchange_strong(&queue.first, &isFirst, false)) {
// Not the first, try again.
return;
}
// First thread which does not manage to write to the queue.
// Flush it a bit before we try again.
AsyncTask toFlush;
// This potentially stalls if the inserting tasks are faster to insert
// than we are to retrieve. We should probably have a cut-off
while (x9_read_from_inbox(queue.inbox, sizeof(AsyncTask), &toFlush)) {
queue.tasks.push_back(toFlush);
}
queue.first = true;
}

auto AsyncQueueHelpers::run(AsyncQueue& queue, TimesliceId oldestPossible) -> void
{
// We synchronize right before we run to get as many
// tasks as possible. Notice we might still miss some
// which will have to handled on a subsequent iteration.
AsyncQueueHelpers::flushPending(queue);

if (queue.tasks.empty()) {
return;
}
Expand Down

0 comments on commit 8dddc15

Please sign in to comment.