Skip to content

Commit

Permalink
add support for auxiliary memory bound in mpmc_queue (#6820)
Browse files Browse the repository at this point in the history
ref #6528
  • Loading branch information
windtalker authored Feb 16, 2023
1 parent 9da6490 commit f2abf0c
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 5 deletions.
75 changes: 70 additions & 5 deletions dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,23 @@ class MPMCQueue
public:
using Status = MPMCQueueStatus;
using Result = MPMCQueueResult;
using ElementAuxiliaryMemoryUsageFunc = std::function<Int64(const T & element)>;

explicit MPMCQueue(size_t capacity_)
: capacity(capacity_)
, max_auxiliary_memory_usage(std::numeric_limits<Int64>::max())
, get_auxiliary_memory_usage([](const T &) { return 0; })
, element_auxiliary_memory(capacity, 0)
, data(capacity * sizeof(T))
{
}

/// max_auxiliary_memory_usage_ <= 0 means no limit on auxiliary memory usage
MPMCQueue(size_t capacity_, Int64 max_auxiliary_memory_usage_, ElementAuxiliaryMemoryUsageFunc && get_auxiliary_memory_usage_)
: capacity(capacity_)
, max_auxiliary_memory_usage(max_auxiliary_memory_usage_ <= 0 ? std::numeric_limits<Int64>::max() : max_auxiliary_memory_usage_)
, get_auxiliary_memory_usage(std::move(get_auxiliary_memory_usage_))
, element_auxiliary_memory(capacity, 0)
, data(capacity * sizeof(T))
{
}
Expand Down Expand Up @@ -281,6 +295,7 @@ class MPMCQueue
thread_local WaitingNode node;
#endif
std::unique_lock lock(mu);
bool is_timeout = false;

if constexpr (need_wait)
{
Expand All @@ -289,26 +304,38 @@ class MPMCQueue
return read_pos < write_pos || !isNormal();
};
if (!wait(lock, reader_head, node, pred, deadline))
return Result::TIMEOUT;
is_timeout = true;
}
/// double check status after potential wait
if (!isCancelled() && read_pos < write_pos)
{
auto & obj = getObj(read_pos);
res = std::move(obj);
destruct(obj);
updateElementAuxiliaryMemory<true>(read_pos);

/// update pos only after all operations that may throw an exception.
++read_pos;
/// assert so in debug mode, we can get notified if some bugs happens when updating current_auxiliary_memory_usage
assert(read_pos != write_pos || current_auxiliary_memory_usage == 0);
if (read_pos == write_pos)
current_auxiliary_memory_usage = 0;

/// Notify next writer within the critical area because:
/// 1. If we remove the next writer node and notify it later,
/// it may find itself can't obtain the lock while not being in the list.
/// This need carefully procesing in `assignObj`.
/// 2. If we do not remove the next writer, only obtain its pointer and notify it later,
/// deadlock can be possible because different readers may notify one writer.
notifyNext(writer_head);
if (current_auxiliary_memory_usage < max_auxiliary_memory_usage)
notifyNext(writer_head);
return Result::OK;
}
if constexpr (need_wait)
{
if (is_timeout)
return Result::TIMEOUT;
}
switch (status)
{
case Status::NORMAL:
Expand All @@ -329,22 +356,24 @@ class MPMCQueue
thread_local WaitingNode node;
#endif
std::unique_lock lock(mu);
bool is_timeout = false;

if constexpr (need_wait)
{
auto pred = [&] {
return write_pos - read_pos < capacity || !isNormal();
return (write_pos - read_pos < capacity && current_auxiliary_memory_usage < max_auxiliary_memory_usage) || !isNormal();
};
if (!wait(lock, writer_head, node, pred, deadline))
return Result::TIMEOUT;
is_timeout = true;
}

/// double check status after potential wait
/// check write_pos because timeouted will also reach here.
if (isNormal() && write_pos - read_pos < capacity)
if (isNormal() && (write_pos - read_pos < capacity && current_auxiliary_memory_usage < max_auxiliary_memory_usage))
{
void * addr = getObjAddr(write_pos);
assigner(addr);
updateElementAuxiliaryMemory<false>(write_pos);

/// update pos only after all operations that may throw an exception.
++write_pos;
Expand All @@ -353,6 +382,11 @@ class MPMCQueue
notifyNext(reader_head);
return Result::OK;
}
if constexpr (need_wait)
{
if (is_timeout)
return Result::TIMEOUT;
}
switch (status)
{
case Status::NORMAL:
Expand Down Expand Up @@ -411,6 +445,7 @@ class MPMCQueue

read_pos = 0;
write_pos = 0;
current_auxiliary_memory_usage = 0;
}

template <typename F>
Expand All @@ -426,8 +461,36 @@ class MPMCQueue
return false;
}

template <bool read>
ALWAYS_INLINE void updateElementAuxiliaryMemory(size_t pos)
{
if constexpr (read)
{
auto & elem_value = element_auxiliary_memory[pos % capacity];
current_auxiliary_memory_usage -= elem_value;
elem_value = 0;
}
else
{
auto auxiliary_memory = get_auxiliary_memory_usage(getObj(pos));
current_auxiliary_memory_usage += auxiliary_memory;
element_auxiliary_memory[pos % capacity] = auxiliary_memory;
}
}

private:
const Int64 capacity;
/// max_auxiliary_memory_usage is the bound of all the element's auxiliary memory
/// for an element stored in the queue, it will take two kinds of memory
/// 1. the memory took by the element itself: sizeof(T) bytes, it is a constant value and already reserved in `data`
/// 2. the auxiliary memory of the element, for example, if the element type is std::vector<Int64>,
/// then the auxiliary memory for each element is sizeof(Int64) * element.capacity()
/// If the element stored in the queue has auxiliary memory usage, and the user wants to set a bound for the total
/// auxiliary memory usage, then the user should provide the function to calculate the auxiliary memory usage
/// Note: unlike capacity, max_auxiliary_memory_usage is actually a soft-limit because we need to make at least
/// one element can be pushed to the queue even if its auxiliary memory exceeds max_auxiliary_memory_usage
const Int64 max_auxiliary_memory_usage;
const ElementAuxiliaryMemoryUsageFunc get_auxiliary_memory_usage;

mutable std::mutex mu;
WaitingNode reader_head;
Expand All @@ -436,7 +499,9 @@ class MPMCQueue
Int64 write_pos = 0;
Status status = Status::NORMAL;
String cancel_reason;
Int64 current_auxiliary_memory_usage = 0;

std::vector<Int64> element_auxiliary_memory;
std::vector<UInt8> data;
};

Expand Down
49 changes: 49 additions & 0 deletions dbms/src/Common/tests/gtest_mpmc_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -685,5 +685,54 @@ try
}
CATCH

TEST_F(MPMCQueueTest, AuxiliaryMemoryBound)
try
{
/// case 1: no auxiliary memory usage bound
size_t max_size = 10;
MPMCQueue<Int64> queue(max_size);
for (size_t i = 0; i < max_size; i++)
ASSERT_TRUE(queue.tryPush(i) == MPMCQueueResult::OK);
ASSERT_TRUE(queue.tryPush(max_size) == MPMCQueueResult::FULL);

/// case 2: less auxiliary memory bound than the capacity bound
size_t actual_max_size = 5;
Int64 auxiliary_memory_bound = sizeof(Int64) * actual_max_size;
MPMCQueue<Int64> queue_1(max_size, auxiliary_memory_bound, [](const Int64 &) { return sizeof(Int64); });
for (size_t i = 0; i < actual_max_size; i++)
ASSERT_TRUE(queue_1.tryPush(i) == MPMCQueueResult::OK);
ASSERT_TRUE(queue_1.tryPush(actual_max_size) == MPMCQueueResult::FULL);
Int64 value;
/// after pop one element, the queue can be pushed again
ASSERT_TRUE(queue_1.tryPop(value) == MPMCQueueResult::OK);
ASSERT_TRUE(queue_1.tryPush(actual_max_size) == MPMCQueueResult::OK);

/// case 3: less capacity bound than the auxiliary memory bound
auxiliary_memory_bound = sizeof(Int64) * (max_size * 10);
MPMCQueue<Int64> queue_2(max_size, auxiliary_memory_bound, [](const Int64 &) { return sizeof(Int64); });
for (size_t i = 0; i < max_size; i++)
ASSERT_TRUE(queue_2.tryPush(i) == MPMCQueueResult::OK);
ASSERT_TRUE(queue_2.tryPush(max_size) == MPMCQueueResult::FULL);

/// case 4, auxiliary memory bound <= 0 means unbounded for auxiliary memory usage
MPMCQueue<Int64> queue_3(max_size, 0, [](const Int64 &) { return 1024 * 1024; });
for (size_t i = 0; i < max_size; i++)
ASSERT_TRUE(queue_3.tryPush(i) == MPMCQueueResult::OK);
ASSERT_TRUE(queue_3.tryPush(max_size) == MPMCQueueResult::FULL);
MPMCQueue<Int64> queue_4(max_size, -1, [](const Int64 &) { return 1024 * 1024; });
for (size_t i = 0; i < max_size; i++)
ASSERT_TRUE(queue_4.tryPush(i) == MPMCQueueResult::OK);
ASSERT_TRUE(queue_4.tryPush(max_size) == MPMCQueueResult::FULL);

/// case 5 even if the element's auxiliary memory is out of bound, at least one element can be pushed
MPMCQueue<Int64> queue_5(max_size, 1, [](const Int64 &) { return 10; });
ASSERT_TRUE(queue_5.tryPush(1) == MPMCQueueResult::OK);
ASSERT_TRUE(queue_5.tryPush(2) == MPMCQueueResult::FULL);
ASSERT_TRUE(queue_5.tryPop(value) == MPMCQueueResult::OK);
ASSERT_TRUE(queue_5.tryPop(value) == MPMCQueueResult::EMPTY);
ASSERT_TRUE(queue_5.tryPush(1) == MPMCQueueResult::OK);
}
CATCH

} // namespace
} // namespace DB::tests

0 comments on commit f2abf0c

Please sign in to comment.