Skip to content

Commit

Permalink
Support rvalue task in execution_queue_execute (apache#2308)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright authored and Yang Liming committed Oct 31, 2023
1 parent bd38aec commit 0618247
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 14 deletions.
38 changes: 27 additions & 11 deletions src/bthread/execution_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ friend class ExecutionQueueBase;
// more tasks and you can safely release all the related resources ever
// after.
bool is_queue_stopped() const { return _is_stopped; }
operator bool() const;
explicit operator bool() const;
protected:
TaskIteratorBase(TaskNode* head, ExecutionQueueBase* queue,
bool is_stopped, bool high_priority)
Expand Down Expand Up @@ -120,7 +120,7 @@ struct TaskOptions {
// If |in_place_if_possible| is true, execution_queue_execute would call
// execute immediately instead of starting a bthread if possible
//
// Note: Running callbacks in place might cause the dead lock issue, you
// Note: Running callbacks in place might cause the deadlock issue, you
// should be very careful turning this flag on.
//
// Default: false
Expand Down Expand Up @@ -151,10 +151,10 @@ struct ExecutionQueueOptions {
Executor * executor;
};

// Start a ExecutionQueue. If |options| is NULL, the queue will be created with
// Start an ExecutionQueue. If |options| is NULL, the queue will be created with
// the default options.
// Returns 0 on success, errno otherwise
// NOTE: type |T| can be non-POD but must be copy-constructible
// NOTE: type |T| can be non-POD but must be copy-constructive
template <typename T>
int execution_queue_start(
ExecutionQueueId<T>* id,
Expand All @@ -168,17 +168,17 @@ int execution_queue_start(
// - The executor will call |execute| with TaskIterator::is_queue_stopped() being
// true exactly once when all the pending tasks have been executed, and after
// this point it's ok to release the resource referenced by |meta|.
// Returns 0 on success, errno othrwise
// Returns 0 on success, errno otherwise.
template <typename T>
int execution_queue_stop(ExecutionQueueId<T> id);

// Wait until the the stop task (Iterator::is_queue_stopped() returns true) has
// Wait until the stop task (Iterator::is_queue_stopped() returns true) has
// been executed
template <typename T>
int execution_queue_join(ExecutionQueueId<T> id);

// Thread-safe and Wait-free.
// Execute a task with defaut TaskOptions (normal task);
// Execute a task with default TaskOptions (normal task);
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task);
Expand All @@ -187,7 +187,7 @@ int execution_queue_execute(ExecutionQueueId<T> id,
// Execute a task with options. e.g
// bthread::execution_queue_execute(queue, task, &bthread::TASK_OPTIONS_URGENT)
// If |options| is NULL, we will use default options (normal task)
// If |handle| is not NULL, we will assign it with the hanlder of this task.
// If |handle| is not NULL, we will assign it with the handler of this task.
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task,
Expand All @@ -198,6 +198,22 @@ int execution_queue_execute(ExecutionQueueId<T> id,
const TaskOptions* options,
TaskHandle* handle);


template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
T&& task);

template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
T&& task,
const TaskOptions* options);

template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
T&& task,
const TaskOptions* options,
TaskHandle* handle);

// [Thread safe and ABA free] Cancel the corresponding task.
// Returns:
// -1: The task was executed or h is an invalid handle
Expand All @@ -210,15 +226,15 @@ int execution_queue_cancel(const TaskHandle& h);
// ExecutionQueue
//
// |execution_queue_execute| internally fetches a reference of ExecutionQueue at
// the begining and releases it at the end, which makes 2 additional cache
// the beginning and releases it at the end, which makes 2 additional cache
// updates. In some critical situation where the overhead of
// execution_queue_execute matters, you can avoid this by addressing the
// reference at the begining of every producer, and execute tasks execatly
// reference at the beginning of every producer, and execute tasks execatly
// through the reference instead of id.
//
// Note: It makes |execution_queue_stop| a little complicated in the user level,
// as we don't pass the `stop task' to |execute| until no one holds any reference.
// If you are not sure about the ownership of the return value (which releasees
// If you are not sure about the ownership of the return value (which releases
// the reference of the very ExecutionQueue in the destructor) and don't that
// care the overhead of ExecutionQueue, DON'T use this function
template <typename T>
Expand Down
43 changes: 40 additions & 3 deletions src/bthread/execution_queue_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,16 @@ friend class TaskIterator<T>;

int execute(typename butil::add_const_reference<T>::type task,
const TaskOptions* options, TaskHandle* handle) {
return execute(std::forward<T>(const_cast<T&>(task)), options, handle);
}


int execute(T&& task) {
return execute(std::forward<T>(task), NULL, NULL);
}

int execute(T&& task,
const TaskOptions* options, TaskHandle* handle) {
if (stopped()) {
return EINVAL;
}
Expand All @@ -302,7 +312,7 @@ friend class TaskIterator<T>;
return_task_node(node);
return ENOMEM;
}
new (mem) T(task);
new (mem) T(std::forward<T>(task));
node->stop_task = false;
TaskOptions opt;
if (options) {
Expand Down Expand Up @@ -356,7 +366,7 @@ inline int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task,
const TaskOptions* options,
TaskHandle* handle) {
typename ExecutionQueue<T>::scoped_ptr_t
typename ExecutionQueue<T>::scoped_ptr_t
ptr = ExecutionQueue<T>::address(id);
if (ptr != NULL) {
return ptr->execute(task, options, handle);
Expand All @@ -365,6 +375,33 @@ inline int execution_queue_execute(ExecutionQueueId<T> id,
}
}

template <typename T>
inline int execution_queue_execute(ExecutionQueueId<T> id,
T&& task) {
return execution_queue_execute(id, std::forward<T>(task), NULL);
}

template <typename T>
inline int execution_queue_execute(ExecutionQueueId<T> id,
T&& task,
const TaskOptions* options) {
return execution_queue_execute(id, std::forward<T>(task), options, NULL);
}

template <typename T>
inline int execution_queue_execute(ExecutionQueueId<T> id,
T&& task,
const TaskOptions* options,
TaskHandle* handle) {
typename ExecutionQueue<T>::scoped_ptr_t
ptr = ExecutionQueue<T>::address(id);
if (ptr != NULL) {
return ptr->execute(std::forward<T>(task), options, handle);
} else {
return EINVAL;
}
}

template <typename T>
inline int execution_queue_stop(ExecutionQueueId<T> id) {
typename ExecutionQueue<T>::scoped_ptr_t
Expand Down Expand Up @@ -518,7 +555,7 @@ inline int ExecutionQueueBase::dereference() {
butil::memory_order_acquire,
butil::memory_order_relaxed)) {
_on_recycle();
// We don't return m immediatly when the reference count
// We don't return m immediately when the reference count
// reaches 0 as there might be in processing tasks. Instead
// _on_recycle would push a `stop_task' after which is executed
// m would be finally returned and reset
Expand Down
51 changes: 51 additions & 0 deletions test/bthread_execution_queue_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,57 @@ TEST_F(ExecutionQueueTest, single_thread) {
ASSERT_TRUE(stopped);
}

class RValue {
public:
RValue() : _value(0) {}
explicit RValue(int v) : _value(v) {}
RValue(RValue&& rhs) noexcept : _value(rhs._value) {}
RValue& operator=(RValue&& rhs) noexcept {
if (this != &rhs) {
_value = rhs._value;
}
return *this;
}

DISALLOW_COPY_AND_ASSIGN(RValue);

int value() const { return _value; }


private:
int _value;
};

int add(void* meta, bthread::TaskIterator<RValue> &iter) {
stopped = iter.is_queue_stopped();
int* result = (int*)meta;
for (; iter; ++iter) {
*result += iter->value();
}
return 0;
}

TEST_F(ExecutionQueueTest, rvalue) {
int64_t result = 0;
int64_t expected_result = 0;
stopped = false;
bthread::ExecutionQueueId<RValue> queue_id;
bthread::ExecutionQueueOptions options;
ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
add, &result));
for (int i = 0; i < 100; ++i) {
expected_result += i;
RValue v(i);
ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, std::move(v)));
}
LOG(INFO) << "stop";
ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
ASSERT_NE(0, bthread::execution_queue_execute(queue_id, RValue(0)));
ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
ASSERT_EQ(expected_result, result);
ASSERT_TRUE(stopped);
}

struct PushArg {
bthread::ExecutionQueueId<LongIntTask> id {0};
butil::atomic<int64_t> total_num {0};
Expand Down

0 comments on commit 0618247

Please sign in to comment.