Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add force_send method to channel Sender #1135

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

bergkvist
Copy link

@bergkvist bergkvist commented Sep 13, 2024

Add a force_push method to the Sender part of a channel.

In particular, this is useful for bounded channels of non-zero size.

Motivation:

Design goals:

  • It is fine to lose messages, but messages should never appear in a different order from which they were sent
  • For bounded (non-zero-sized) channels, force_send should never fail (except when the receiver has been dropped)
  • Should not be blocking
use crossbeam_channel::{bounded, ForceSendError};

let (s, r) = bounded(3);

assert_eq!(s.force_send(0), Ok(None));
assert_eq!(s.force_send(1), Ok(None));
assert_eq!(s.force_send(2), Ok(None));
assert_eq!(s.force_send(3), Ok(Some(0)));

assert_eq!(r.recv(), Ok(1));

assert_eq!(s.force_send(4), Ok(None));

assert_eq!(r.recv(), Ok(2));
assert_eq!(r.recv(), Ok(3));

assert_eq!(s.force_send(5), Ok(None));
assert_eq!(s.force_send(6), Ok(None));
assert_eq!(s.force_send(7), Ok(Some(4)));
assert_eq!(s.force_send(8), Ok(Some(5)));

assert_eq!(r.recv(), Ok(6));
assert_eq!(r.recv(), Ok(7));
assert_eq!(r.recv(), Ok(8));

drop(r);

assert_eq!(s.force_send(9), Err(ForceSendError(9)));

/// the channel is full. The returned error contains the original message.
///
/// If called on a zero-capacity channel, this method will send the message only if there
/// happens to be a receive operation on the other side of the channel at the same time.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear from the description what would happen if there is no receive operation. Would the call fail (despite what the start of the comment says)? Would the data be returned to the caller?

@proski
Copy link

proski commented Feb 4, 2025

The "lossy channel" proposal is about removal the oldest element rather than the latest:

By "lossy channel", I mean one that discards old data when the buffer is full

Removing the latest element makes it impossible to force send multiple messages unless the reader reads a message at the right time. That's very limiting and unreliable. The "force" is one element deep.

Figuratively speaking, users expect a bulldozer, and you give them a chisel.

Copy link
Member

@taiki-e taiki-e left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. I would prefer that force_push in crossbeam_channel have the same semantics as force_push in crossbeam_queue::ArrayQueue (#789):

let q = crossbeam_queue::ArrayQueue::new(2);

assert_eq!(q.force_push(10), None);
assert_eq!(q.force_push(20), None);
assert_eq!(q.force_push(30), Some(10));

As said in #400 (comment), it should be able to be implemented by porting #789.

@bergkvist
Copy link
Author

bergkvist commented Feb 17, 2025

I agree that removing the oldest entry is more desirable than removing the most recent one.

I've been putting some more thought into this, and force_send seems to be tricky compared to try_send.

Consider the following, a bounded(6) channel (H=head (read index), T=tail (write index)):

step  next state        op
       
        H
                    T
1     [ 0, 1, 2, 3, _, _ ]   force_send <- 3
      
        H
                       T
2     [ 0, 1, 2, 3, 4, _ ]   force_send <- 4
      
           H
                       T
3     [ _, 1, 2, 3, 4, _ ]   recv -> 0
      
           H
        T              
4     [ _, 1, 2, 3, 4, 5 ]   force_send <- 1
              
           H
           T            
5     [ 6, 1, 2, 3, 4, 5 ]   force_send <- 6
              
              H
              T            
6     [ 6, 7, 2, 3, 4, 5 ]   force_send <- 7 (removes/returns 1)
      
                 H
              T            
7     [ 6, 7, _, 3, 4, 5 ]   recv -> 2

Notice that force_send only has different behavior from try_send in step 6 (where the channel is full), where try_send will fail, but force_send should overwrite the oldest value, updating both H and T indices at the same time.

And here lies the problem, H and T both are atomic usizes, meaning we can ensure atomicity when updating one of them using compare_exchange_weak(..), but since we need to update both of them at the same time, we have a problem (on x86 we have https://docs.rs/mwcas/latest/mwcas/).

…recent one first

Note that this is a very naive implementation of force_send, that works when used within a single thread, but can easily fail when multiple threads/cores are involved.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants