Broadcast message to all worker nodes while in closure of custom binary frontier operator #583

Open
cygithub54 opened this issue Sep 12, 2024 · 1 comment


cygithub54 commented Sep 12, 2024

Hey, I'm new to timely and I have a question about broadcasting and binary frontiers. I have a custom binary_frontier stream operator that is in a feedback loop with itself and runs for an arbitrary number of iterations. Currently, the ParallelizationContracts I pass into this operator are Exchange pacts, which route feedback messages based on an exchange ID integer defined on the message payload.

However, under some condition I want a message to be sent to all active workers. I can currently implement this by checking the condition on each message, manually cloning every message intended to be global once per worker, and giving each copy to the session for the feedback stream. This feels a bit unwieldy, so I was wondering if there's a more idiomatic way to do it.

I've looked into creating a new stream for the global messages and calling the broadcast() operator on it, but there doesn't seem to be a way to create a new stream object within the binary_frontier closure (I run into lifetime issues with the scope variable required by the to_stream function). Alternatively, is there a way to get this global messaging behavior by modifying the ParallelizationContracts I pass in?
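For reference, the manual approach described above might look roughly like the following std-only sketch. Everything in it is hypothetical: `Msg`, its `target` and `global` fields, and the `fan_out` and `route` helpers are illustrative names, and `route` merely assumes that the Exchange pact sends a key `k` to worker `k % peers`. Inside a real timely operator the copies would be given to the feedback output session rather than collected into a `Vec`, and the worker count would presumably be read from the scope (timely's `peers()` accessor) before the operator is built.

```rust
// Hypothetical message type: `target` is the integer the Exchange pact
// routes on; `global` marks messages that every worker should receive.
#[derive(Clone, Debug, PartialEq)]
struct Msg {
    target: u64,
    global: bool,
    payload: String,
}

/// Expand one outgoing message into the copies to hand to the output session.
/// A global message is cloned once per worker, each copy re-addressed to that
/// worker's index and marked non-global so it is not fanned out a second time.
/// Non-global messages pass through unchanged.
fn fan_out(msg: Msg, peers: usize) -> Vec<Msg> {
    if msg.global {
        (0..peers as u64)
            .map(|worker| Msg {
                target: worker,
                global: false,
                payload: msg.payload.clone(),
            })
            .collect()
    } else {
        vec![msg]
    }
}

/// Stand-in for Exchange routing (assumption): key `k` goes to worker `k % peers`.
fn route(msg: &Msg, peers: usize) -> usize {
    (msg.target % peers as u64) as usize
}

fn main() {
    let peers = 4;
    let global = Msg { target: 0, global: true, payload: "sync".to_string() };
    let local = Msg { target: 6, global: false, payload: "step".to_string() };

    // Simulate delivery: one inbox per worker.
    let mut delivered = vec![Vec::new(); peers];
    for msg in fan_out(global, peers).into_iter().chain(fan_out(local, peers)) {
        let worker = route(&msg, peers);
        delivered[worker].push(msg.payload);
    }
    for (worker, inbox) in delivered.iter().enumerate() {
        println!("worker {worker}: {inbox:?}");
    }
}
```

Running it prints one line per worker: every inbox contains a "sync" copy, while "step" reaches only worker 2 (since 6 % 4 = 2).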

@antiguru
Member

> However, this feels a bit unwieldy so I was wondering if there's a way to do this a bit more idiomatically.

That is the best approach at the moment. Timely's progress tracking is based on counting messages, and currently each datum sent downstream must be consumed exactly once for the counts to add up. A broadcast channel would violate this constraint, because it would receive one datum and produce multiple outputs. This is not a fundamental limitation of Timely, but a specialized implementation offers little benefit over cloning the data eagerly, since the data needs to be cloned anyway.
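To make the counting argument concrete, here is a toy model. It is an illustrative simplification, not timely's actual progress-tracking code: `Counter`, `eager_clone`, and `naive_broadcast` are hypothetical names, and the model only assumes that the sender records +1 per record sent and each receiver records -1 per record consumed.

```rust
/// Toy model of count-based progress tracking (a simplification, not
/// timely's real data structures): the channel is known to be drained
/// only when the net count is exactly zero.
#[derive(Default)]
struct Counter {
    net: i64,
}

impl Counter {
    fn record_sent(&mut self, n: i64) {
        self.net += n;
    }
    fn record_consumed(&mut self, n: i64) {
        self.net -= n;
    }
}

/// Eager cloning: the operator emits `peers` distinct records, so every
/// send is matched by exactly one consumption.
fn eager_clone(peers: i64) -> i64 {
    let mut c = Counter::default();
    c.record_sent(peers); // one record per worker
    for _ in 0..peers {
        c.record_consumed(1); // each worker consumes its own copy
    }
    c.net
}

/// Hypothetical broadcast channel: one record is counted at the sender,
/// but the channel fans it out, so `peers` receivers each consume a copy.
fn naive_broadcast(peers: i64) -> i64 {
    let mut c = Counter::default();
    c.record_sent(1); // counted once at the sender...
    for _ in 0..peers {
        c.record_consumed(1); // ...but consumed `peers` times
    }
    c.net
}

fn main() {
    let peers = 4;
    println!("eager clone net count:     {}", eager_clone(peers));
    println!("naive broadcast net count: {}", naive_broadcast(peers));
}
```

With four workers the eager-clone total is 0, while the naive broadcast ends at -3: receivers consumed more records than the tracker believes were sent, so the counts never add up.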

If you're interested, here's a commit that replaced a special broadcast operator with the current implementation: ca40906
