Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support A Multiple Producer, Single Consumer Queue #2492

Merged
merged 1 commit into from
Jan 23, 2024

Conversation

chenBright
Copy link
Contributor

What problem does this PR solve?

Issue Number:

Problem Summary:

What is changed and the side effects?

Changed:

使用ExecutionQueue的相关技术实现一个多生产者单消费者的无锁队列。

Side effects:

  • Performance effects(性能影响):

  • Breaking backward compatibility(向后兼容性):


Check List:

  • Please make sure your changes are compilable(请确保你的更改可以通过编译).
  • When providing us with a new feature, it is best to add related tests(如果你向我们增加一个新的功能, 请添加相关测试).
  • Please follow Contributor Covenant Code of Conduct.(请遵循贡献者准则).

@chenBright chenBright changed the title Support A Multiple Producer, Single Consumer Wait-Free Queue Support A Multiple Producer, Single Consumer Queue Jan 4, 2024
@wwbmmm
Copy link
Contributor

wwbmmm commented Jan 9, 2024

LGTM

@wwbmmm wwbmmm merged commit 3bacab8 into apache:master Jan 23, 2024
18 checks passed
@chenBright chenBright deleted the mpsc_queue branch January 23, 2024 06:32
@chenBright chenBright mentioned this pull request Jan 25, 2024
jiangdongzi pushed a commit to jiangdongzi/brpc that referenced this pull request Jan 31, 2024
@yanglimingcn
Copy link
Contributor

@chenBright 这个对列,消费者是需要轮询吗?

@chenBright
Copy link
Contributor Author

需要主动调出队的接口

@yanglimingcn
Copy link
Contributor

如果对列没有内容,不会阻塞,需要轮询,或者自己实现一个唤醒的方式,是这样吧。

@chenBright
Copy link
Contributor Author

是的,没有同步机制。如果有需求,可以加一个同步机制吧。

@yanglimingcn
Copy link
Contributor

@chenBright PR里面我提了点问题,你有时间帮忙看看。

@chenBright
Copy link
Contributor Author

@chenBright PR里面我提了点问题,你有时间帮忙看看。

好像没看到

@yanglimingcn
Copy link
Contributor

@chenBright PR里面我提了点问题,你有时间帮忙看看。

好像没看到

_cur_enqueue_node看上去每次执行都要设置成NULL,这个是必要的吗?
是不是把 node = _cur_enqueue_node.load(memory_order_relaxed);换成
node = _cur_enqueue_node.exchange(NULL, memory_order_relaxed); 就可以了呢?
你们在实际场景用了吗?我这边有需求,打算用用。

@chenBright
Copy link
Contributor Author

chenBright commented Aug 19, 2024

确实没有必要,改成node = _cur_enqueue_node.exchange(NULL, memory_order_relaxed);就行。

之前排查问题的时候在profiler采样场景下用了。

@chenBright
Copy link
Contributor Author

如果对列没有内容,不会阻塞,需要轮询,或者自己实现一个唤醒的方式,是这样吧。

有同步的需求吗?

@yanglimingcn
Copy link
Contributor

确实没有必要,改成node = _cur_enqueue_node.exchange(NULL, memory_order_relaxed);就行。

之前排查问题的时候在profiler采样场景下用了。

那我来改一下也行。

@yanglimingcn
Copy link
Contributor

如果对列没有内容,不会阻塞,需要轮询,或者自己实现一个唤醒的方式,是这样吧。

有同步的需求吗?

我琢磨了一下,同步应该用户自己实现,在这个mpsc里面不好做,因为不同用户的判断逻辑会很不一样。

@chenBright
Copy link
Contributor Author

用户自己实现更灵活。

mpsc提供同步机制应该也不复杂,抽象一个同步基类作为mpsc模板参数,让用户传决定实现wait和wakeup(一般就是pthread实现和bthread实现,这两个可以由框架提供实现)。入队之后是队头且有waiter,就唤醒waiter。出队时没有数据就wait。

@yanglimingcn
Copy link
Contributor

yanglimingcn commented Aug 19, 2024

你实现的这个无锁队列,理论上性能很好的,不希望这个唤醒影响到性能。
我这边有这个唤醒需求的话,我根据自己的场景去实现,应该只需要一个原子变量就可以了,具体描述:
1、消费者消费队列的内容,直到返回false,在持续返回false一段时间,就会设置一个flag为,need_wakeup_flag。
2、生产者在往队列放内容的后,会根据need_wakeup_flag决定是否执行一次唤醒。
因为这个need_wakeup_flag不会被频繁设置,因此,这个原子变量的更新预期不会很频繁。

我建议这块不要做了,让用户自己去实现,因为场景不同,不太能很好的支持。比如,我们这边消费者可能是个异步的动作,判断消费者是否完成这些任务,不光要判断Dequeue是否返回false,还得有自己的逻辑判断那些异步请求是否都执行完了。

@yanglimingcn
Copy link
Contributor

@chenBright PR里面我提了点问题,你有时间帮忙看看。

好像没看到

_cur_enqueue_node看上去每次执行都要设置成NULL,这个是必要的吗? 是不是把 node = _cur_enqueue_node.load(memory_order_relaxed);换成 node = _cur_enqueue_node.exchange(NULL, memory_order_relaxed); 就可以了呢? 你们在实际场景用了吗?我这边有需求,打算用用。

#2739

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants