add reset_stream to accumulate #249

Open
wants to merge 1 commit into
base: master

Conversation

CJ-Wright
Member

One issue I've run into is resetting the state of accumulate nodes.
The current operating procedure is that when a signal to clear the accumulate node arrives, it is sent to a sink, and that sink resets the accumulate node's state.
However this causes "spooky action at a distance" where two nodes which are completely unrelated by the graph have a very close relationship, making the structure of the pipeline more difficult to understand (naming credit to tacaswell).

This PR fixes this by providing a dedicated stream via a kwarg.
When this stream receives data, the accumulate node's state is reset.
Since this stream is formally tracked in the node's upstreams, it is properly recorded in the visualized graph and behaves properly when connected or disconnected.

@martindurant
Member

(without looking at the code) can you comment on the connection between this and things like zip_latest, where different input streams have different roles and (in your other PR) possibly different representations? I agree that many nodes may have a concept of "reset" which will mean different things in context - often rerunning code that currently lives in __init__ functions. Packing this in with the input streams of a node may be confusing and problematic, as you say.

@CJ-Wright CJ-Wright self-assigned this May 8, 2019
@CJ-Wright
Member Author

Overall, I think it is a good thing for incoming edges to mean different things to a node. It provides a level of flexibility that would be difficult to come by otherwise, and the alternatives could create confusion. For example, I think

import operator
from streamz import Stream

s1 = Stream()
s2 = Stream()
# reset_stream is the kwarg proposed in this PR
s1.accumulate(operator.add, reset_stream=s2).sink(print)

is more readable than

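import operator
from streamz import Stream
from streamz.core import no_default

s1 = Stream()
s2 = Stream()
s3 = s1.accumulate(operator.add)
s3.sink(print)
# a lambda cannot contain an assignment, so use setattr to reset the state
s2.sink(lambda x: setattr(s3, "state", no_default))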

In the implementation, I think everything will need to be stored in self.upstreams, since this is how the system knows about itself and passes data around. The nodes themselves offer a nice way to handle different edges doing different things, since every update method receives a who argument that identifies the edge providing the data.
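As a rough illustration of that idea, here is a minimal, self-contained sketch that does not use streamz at all. The classes ToyStream, ToyAccumulate and ToySink and the _NO_DEFAULT sentinel are hypothetical names made up for this example; only the general shape (a reset stream kept in upstreams, and an update method that branches on who) follows the description above. It is not the code in this PR.

```python
import operator

_NO_DEFAULT = object()  # hypothetical sentinel meaning "no accumulated state yet"


class ToyStream:
    """Tiny stand-in for a stream node (an assumption, not the streamz class)."""

    def __init__(self, upstreams=()):
        self.upstreams = list(upstreams)
        self.downstreams = []
        # Register with every upstream so the graph knows about each edge.
        for up in self.upstreams:
            up.downstreams.append(self)

    def emit(self, x):
        # Push a value downstream, identifying this node as the sender.
        for down in self.downstreams:
            down.update(x, who=self)

    def update(self, x, who=None):
        self.emit(x)


class ToyAccumulate(ToyStream):
    def __init__(self, upstream, func, reset_stream=None):
        self.func = func
        self.state = _NO_DEFAULT
        self.reset_stream = reset_stream
        # The reset stream is an ordinary upstream, so it is part of the graph.
        ups = [upstream] if reset_stream is None else [upstream, reset_stream]
        super().__init__(ups)

    def update(self, x, who=None):
        if who is not None and who is self.reset_stream:
            # Control edge: drop the state and emit nothing.
            self.state = _NO_DEFAULT
            return
        # Data edge: fold the new value into the state and emit it.
        if self.state is _NO_DEFAULT:
            self.state = x
        else:
            self.state = self.func(self.state, x)
        self.emit(self.state)


class ToySink(ToyStream):
    def __init__(self, upstream, func):
        self.func = func
        super().__init__([upstream])

    def update(self, x, who=None):
        self.func(x)


data, reset = ToyStream(), ToyStream()
results = []
acc = ToyAccumulate(data, operator.add, reset_stream=reset)
ToySink(acc, results.append)

for value in (1, 2, 3):
    data.emit(value)
reset.emit("clear")  # any value on the reset stream clears the state
data.emit(10)
print(results)
```

In this toy version the value sent on the reset stream is ignored; whether a real implementation should inspect it is left open here.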

zip_latest might not be a good example. I don't use it in my production pipelines because it has issues with state storage and resetting. Maybe combine_latest should have been written to be more explicit about which edges are emit_on, but that could have made the interface a bit rougher.

@martindurant
Member

Certainly agree that your second snippet there is hard to read; in fact, it took me a moment to realise what it was doing at all.

I really want to spend some time thinking about how this could be generalised, though. There are a number of streamz types that could be "reset", and some others that might also take a different control input (like emit_on and flush), while some, like map, are always purely functional. This also gets at the problem of graphing such relationships (normal inputs versus control inputs).

@CJ-Wright
Member Author

I like the idea of having different edge properties for the visualization of control edges (e.g., dotted, dashed, or colored).
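To make the visualization idea concrete, here is a small sketch that writes Graphviz DOT text by hand, drawing data edges as solid lines and control edges as dashed ones. The to_dot helper and the node names are hypothetical and exist only for this example; it does not reflect how the streamz visualization is actually built.

```python
def to_dot(data_edges, control_edges):
    """Render (source, target) name pairs as Graphviz DOT text.

    Hypothetical helper: data edges use the default solid style,
    control edges are marked with the standard style=dashed attribute.
    """
    lines = ["digraph pipeline {"]
    for src, dst in data_edges:
        lines.append(f'    "{src}" -> "{dst}";')
    for src, dst in control_edges:
        lines.append(f'    "{src}" -> "{dst}" [style=dashed];')
    lines.append("}")
    return "\n".join(lines)


dot_text = to_dot(
    data_edges=[("s1", "accumulate"), ("accumulate", "sink")],
    control_edges=[("s2", "accumulate")],
)
print(dot_text)
```

The resulting text can be passed to any Graphviz renderer; the reset edge from s2 then stands out from the data path.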

I need to think about the generalization. Maybe have a super class which has a clear method which can be overridden as needed? I need to think/prototype around this more since I'd like to find corner cases where reset produces odd behavior.
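One possible shape for such a super class, as a sketch only: the names Resettable, RunningSum and LastN are invented for this example and none of them exist in streamz. The base class restores a remembered initial state, and a subclass overrides clear when that default is not enough, for instance when the state is a mutable container.

```python
class Resettable:
    """Hypothetical base class: remembers the initial state for clear()."""

    def __init__(self, initial_state=None):
        self._initial_state = initial_state
        self.state = initial_state

    def clear(self):
        # Default behaviour: go back to whatever the node started with.
        self.state = self._initial_state


class RunningSum(Resettable):
    """Uses the default clear(); an immutable initial state is safe to reuse."""

    def __init__(self):
        super().__init__(initial_state=0)

    def update(self, x):
        self.state += x
        return self.state


class LastN(Resettable):
    """Overrides clear() so every reset gets a fresh list."""

    def __init__(self, n):
        super().__init__(initial_state=None)
        self.n = n
        self.clear()

    def clear(self):
        self.state = []

    def update(self, x):
        # Keep only the most recent n values.
        self.state = (self.state + [x])[-self.n:]
        return list(self.state)


total = RunningSum()
total.update(2)
before_clear = total.update(3)
total.clear()
after_clear = total.update(4)

window = LastN(2)
for value in (1, 2, 3):
    last_window = window.update(value)
window.clear()
fresh_window = window.update(9)
print(before_clear, after_clear, last_window, fresh_window)
```

The mutable-state case is one of the corner cases worth probing: restoring a shared list object instead of building a new one would leak values across resets.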

@martindurant do you want the generalization in this PR or can it go into a separate one?

@martindurant martindurant mentioned this pull request Jun 24, 2020
2 participants