channel sender / receiver disagree on content #834

Closed
Licenser opened this issue Jul 14, 2020 · 13 comments

@Licenser
Contributor

Licenser commented Jul 14, 2020

Let me apologize ahead of time for this less-than-ideal bug report, since I couldn't manage to reproduce it outside of a rather complex application.

The issue: "randomly" (or under conditions I couldn't track down yet) the sender and receiver of a channel disagree on the number of messages in it.

My gut feeling says there is some kind of race condition I've not yet found, but I'm filing this issue in the hope that there is an "obvious" answer I'm missing :)

Basically what happens (and I've seen the same issue on two different code paths) is that the sender thinks the channel is full while the receiver thinks it's empty, leading to the sender never sending another message and the receiver never being able to read one.

This seems to trigger more often when using merge() on two channels.

The code that triggers this has a layout of:

source -c1-> processor -c2-> dst -c3-> processor -c4-> source

where the processor prioritizes c3 over c2 and the source loop uses try_send so that it can still read from c4 even if c1 is full.
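Roughly, the source loop has the shape sketched below (made-up names, written against the async-channel API rather than the actual tremor code, so treat it as illustrative only):

use async_channel::{Receiver, Sender, TrySendError};
use async_std::task;

// Illustrative sketch only: c4 is always drained for feedback, and events go
// into c1 via try_send so that a full c1 never blocks the loop.
async fn source_loop<E, A>(
    c1_tx: Sender<E>,
    c4_rx: Receiver<A>,
    mut next_event: impl FnMut() -> Option<E>,
    mut handle_ack: impl FnMut(A),
) {
    let mut pending: Option<E> = None;
    loop {
        // Service feedback on c4 first so the rest of the ring never stalls
        // waiting on the source.
        while let Ok(ack) = c4_rx.try_recv() {
            handle_ack(ack);
        }

        // Forward the next event without awaiting on a full c1; a rejected
        // event is kept around and retried on a later iteration.
        if pending.is_none() {
            pending = next_event();
        }
        if let Some(event) = pending.take() {
            if let Err(TrySendError::Full(ev)) = c1_tx.try_send(event) {
                pending = Some(ev);
            }
        }

        // Give other tasks a chance to run between iterations.
        task::yield_now().await;
    }
}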

The issue manifests as the c4 sender thinking c4 is full:

[src/pipeline.rs:309] o.len() = 64

and the receiver thinking c4 is empty:

[src/source.rs:364] self.rx.len() = 0

It is also worth noting that this only triggers in release mode; as long as each step is "slow enough" it seems not to manifest.

This is reproducible from https://github.com/wayfair-tremor/tremor-runtime/pull/new/async-channel-issue by running:

cargo build -p tremor-server --release;perf record ./bench/run real-workflow-throughput-json

I'll add a few lines of debug output below for the sake of having context.

[src/pipeline.rs:309] o.len() = 1
[src/source.rs:364] self.rx.len() = 0
tremor://localhost/pipeline/main/01/out: 61
[src/pipeline.rs:309] o.len() = 0
[src/pipeline.rs:309] o.len() = 0
[src/pipeline.rs:309] o.len() = 1
[src/pipeline.rs:309] o.len() = 1
[src/pipeline.rs:309] o.len() = 2
[src/pipeline.rs:309] o.len() = 3
[src/pipeline.rs:309] o.len() = 4
[src/pipeline.rs:309] o.len() = 5
[src/pipeline.rs:309] o.len() = 6
[src/pipeline.rs:309] o.len() = 7
[src/pipeline.rs:309] o.len() = 8
[src/pipeline.rs:309] o.len() = 9
[src/pipeline.rs:309] o.len() = 10
[src/pipeline.rs:309] o.len() = 11
[src/pipeline.rs:309] o.len() = 12
[src/pipeline.rs:309] o.len() = 13
[src/pipeline.rs:309] o.len() = 14
[src/pipeline.rs:309] o.len() = 15
[src/pipeline.rs:309] o.len() = 16
[src/pipeline.rs:309] o.len() = 17
[src/pipeline.rs:309] o.len() = 18
[src/pipeline.rs:309] o.len() = 19
[src/pipeline.rs:309] o.len() = 20
[src/pipeline.rs:309] o.len() = 21
[src/pipeline.rs:309] o.len() = 22
[src/pipeline.rs:309] o.len() = 23
[src/pipeline.rs:309] o.len() = 24
[src/pipeline.rs:309] o.len() = 25
[src/pipeline.rs:309] o.len() = 26
[src/pipeline.rs:309] o.len() = 27
[src/pipeline.rs:309] o.len() = 28
[src/pipeline.rs:309] o.len() = 29
[src/pipeline.rs:309] o.len() = 30
[src/pipeline.rs:309] o.len() = 31
[src/pipeline.rs:309] o.len() = 32
[src/pipeline.rs:309] o.len() = 33
[src/pipeline.rs:309] o.len() = 34
[src/pipeline.rs:309] o.len() = 35
[src/pipeline.rs:309] o.len() = 36
[src/pipeline.rs:309] o.len() = 37
[src/pipeline.rs:309] o.len() = 38
[src/pipeline.rs:309] o.len() = 39
[src/pipeline.rs:309] o.len() = 40
[src/pipeline.rs:309] o.len() = 41
[src/pipeline.rs:309] o.len() = 42
[src/pipeline.rs:309] o.len() = 43
[src/pipeline.rs:309] o.len() = 44
[src/pipeline.rs:309] o.len() = 45
[src/pipeline.rs:309] o.len() = 46
[src/pipeline.rs:309] o.len() = 47
[src/pipeline.rs:309] o.len() = 48
[src/pipeline.rs:309] o.len() = 49
[src/pipeline.rs:309] o.len() = 50
[src/pipeline.rs:309] o.len() = 51
[src/pipeline.rs:309] o.len() = 52
[src/pipeline.rs:309] o.len() = 53
[src/pipeline.rs:309] o.len() = 54
[src/pipeline.rs:309] o.len() = 55
[src/pipeline.rs:309] o.len() = 56
[src/pipeline.rs:309] o.len() = 57
[src/pipeline.rs:309] o.len() = 58
[src/pipeline.rs:309] o.len() = 59
[src/pipeline.rs:309] o.len() = 60
[src/pipeline.rs:309] o.len() = 61
[src/pipeline.rs:309] o.len() = 62
[src/pipeline.rs:309] o.len() = 63
[src/pipeline.rs:309] o.len() = 64
[src/source.rs:364] self.rx.len() = 0
tremor://localhost/pipeline/main/01/out: 64
@ghost

ghost commented Jul 14, 2020

I believe #827 is very likely to solve this issue. cc @dignifiedquire

Until then, you can use the async-channel crate.
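The swap is mostly mechanical; a minimal sketch of the bounded API (illustrative only, not your code):

use async_channel::bounded;

// bounded(cap) returns a (Sender, Receiver) pair; both send() and recv()
// are async and return Results that surface when the other side is gone.
async fn demo() {
    let (tx, rx) = bounded::<u64>(64);
    tx.send(1).await.unwrap();
    assert_eq!(rx.recv().await.unwrap(), 1);
}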

@Licenser
Contributor Author

Heya, I threw in async-channel and I still see the same issue :( I'll do some more digging; perhaps I'll find something interesting (fingers crossed).

@ghost

ghost commented Jul 14, 2020

@Licenser I'm a bit confused: how can the sender and receiver side of the same channel report different .len() values if they do exactly the same thing? Receiver::len() is equivalent to Sender::len().
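For example, a quick check with the async-channel crate shows both ends reading the same shared state (illustrative snippet, not taken from tremor):

use async_channel::bounded;

// Both the Sender and the Receiver hold an Arc to the same internal channel,
// so len() on either end reads the same shared counters.
fn main() {
    let (tx, rx) = bounded::<u32>(8);
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    assert_eq!(tx.len(), 2);
    assert_eq!(rx.len(), 2);
    rx.try_recv().unwrap();
    assert_eq!(tx.len(), rx.len()); // both now report 1
}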

Are you sure o and self.rx are two ends of the same channel?

@Licenser
Contributor Author

Ja, I went through that and I'm super confused too. I'm about 95% sure; then again, there is a 5% chance of me just being silly and missing something super obvious. That's totally possible, but after 3 days of hunting this I figured I'd look at the possibility of another cause, especially as I saw the same thing with a different combination of channels too.

I'll go dig more into this tomorrow. I'm completely puzzled by this.

The next thing I came up with is mem::transmuting the sender/receiver into something I can inspect and comparing the pointers in the Arc, to check whether they really are the same or whether things are getting mixed up somewhere.
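Something along these lines, purely as a debugging hack (hypothetical helper; it leans on the current layout of the types, which is not guaranteed, so it is wildly unsound in general):

// Debugging hack only: assumes both Sender<T> and Receiver<T> store the Arc
// to the shared channel as their first field, and reads that pointer out of
// each end so the two allocations can be compared.
fn appear_to_share_channel<T>(
    tx: &async_channel::Sender<T>,
    rx: &async_channel::Receiver<T>,
) -> bool {
    unsafe {
        let tx_ptr = *(tx as *const _ as *const usize);
        let rx_ptr = *(rx as *const _ as *const usize);
        tx_ptr == rx_ptr
    }
}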

@Licenser
Contributor Author

I've run this in debug mode and it looks like they're the same.

Both times the debugger shows the following for the Arc pointer:

self.rx:

{pointer:0x00007ffef7164b00}

o:

{pointer:0x00007ffef7164b00}

I couldn't easily get this in release mode yet, as the compiler is doing so much magic that I lose access to the inspection capabilities.

@Licenser
Contributor Author

I added printing of the Arc pointer to the Debug output of sender and receiver, and the log starts hanging with the last few entries attached below.

I'm about 99% sure now that there is a race condition somewhere in the channels, since replacing the sending code:

            if let Err(e) = o.send(onramp::Msg::Cb(cb, insight.id.clone())).await {
                error!("[Pipeline] failed to send to onramp: {} {:?}", e, &o);
                task::sleep(std::time::Duration::from_secs(1)).await
            }

with:

            while let Err(e) = o.try_send(onramp::Msg::Cb(cb, insight.id.clone())) {
                error!("[Pipeline] failed to send to onramp: {} {:?}", e, &o);
                task::sleep(std::time::Duration::from_secs(1)).await
            }

resolves the issue. That indicates to me (and I might be wrong here, so it's not 100% :P) that send misses the "now you can progress" update in some cases, while the try_send loop refreshes the state of the channel and gets the correct data. I imagine something along the following lines (hypothetical):


(channel)  > is full
(sender)   > checks full
(receiver) > starts reading
(receiver) > empties channel
(sender)   > registers callback on size change
(channel)  > never changes size since it's empty
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 0)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 0)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 0)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 1)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 2)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 3)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 4)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 5)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 6)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 7)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 8)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 9)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 10)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 11)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 12)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 13)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 14)
[src/pipeline.rs:311] &o = Sender { .. } (0x7fa9ae091680: 15)
[src/source.rs:363] &self.rx = Receiver { .. } (0x7fa9ae091680: 15)
[src/source.rs:363] &self.rx = Receiver { .. } (0x7fa9ae091680: 0)
[src/source.rs:363] &self.rx = Receiver { .. } (0x7fa9ae091680: 0)
[src/source.rs:363] &self.rx = Receiver { .. } (0x7fa9ae091680: 0)

@Licenser
Contributor Author

Update:

Changing

https://github.com/stjepang/async-channel/blob/master/src/lib.rs#L242

to

YieldNow(false).await;
l.wait_timeout(std::time::Duration::from_micros(1));

also resolves the issue, so I think everything points to some issue with the notifications.
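(YieldNow here is the usual yield-once future, roughly like this:)

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Rough sketch of a yield-once future: the first poll wakes itself and
// returns Pending so the executor can run other tasks; the second poll
// completes.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if !self.0 {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}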

@Licenser
Contributor Author

Okay, next update: I reduced the reproduction to a single-line change, tremor-rs/tremor-runtime@d3ce267

This seems to happen when there is a try_recv loop without an explicit yield, which apparently prevents the sender from being notified.
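In other words, a loop of this shape (simplified, with made-up names); adding an explicit yield in the empty case makes the hang go away:

use async_channel::{Receiver, TryRecvError};
use async_std::task;

// Simplified sketch: drain a channel with try_recv, but hand control back to
// the executor whenever the channel comes up empty, so other tasks (including
// a sender waiting for its wakeup) still get polled.
async fn drain<T>(rx: Receiver<T>, mut handle: impl FnMut(T)) {
    loop {
        match rx.try_recv() {
            Ok(msg) => handle(msg),
            Err(TryRecvError::Empty) => task::yield_now().await,
            Err(TryRecvError::Closed) => break,
        }
    }
}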

@Licenser
Contributor Author

And good news: with all those things figured out, I managed to create a minimal reproduction case:

https://github.com/Licenser/loop-test

@ghost

ghost commented Jul 24, 2020

@Licenser I took a look at loop-test and found:

task::spawn(async move {
    loop {
        if let Ok(m) = off_rx.try_recv() {
            r_tx.send(m).await;
        }
    }
});

I think this is the core issue: the loop is spinning with try_recv() until it finds a message. If you replace the code with the following, the deadlock is gone:

task::spawn(async move {
    while let Ok(m) = off_rx.recv().await {
        r_tx.send(m).await;
    }
});

The current executor in async-std works under the assumption that futures always yield in a reasonable amount of time. The new scheduler in #836 assumes that some futures might be 'bad' and loop forever, so it more aggressively wakes up worker threads.

Can you perhaps try your original code with the #836 branch and see if that resolves the issue?

@Licenser
Contributor Author

#836 does solve the issue for cores > tasks (while sad, it makes sense that it won't solve it for tasks > cores).

@ghost

ghost commented Jul 27, 2020

while sad, it makes sense that it won't solve it for tasks > cores

But that's kind of an impossible problem to solve, right? :)

@Licenser
Contributor Author

Ja, that's why it makes sense 😂 Since there is a PR, I should probably close this!
