Document how to handle cycles in Topology #339

Closed
colingabr opened this issue Nov 28, 2016 · 1 comment

@colingabr
In streamparse 3.2 there doesn't appear to be first-class support for cycles, as there was with the Clojure DSL.

If you attempt to define a cycle the naive way, you will reference a variable before it is defined, because inputs are declared when each component spec is instantiated. Declaring components in order works for any graph that can be topologically sorted, but not for a cycle:

	word_spout = WordSpout.spec()
	count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields('word')}, par=1)

	a_bolt = A.spec(name="A", inputs=[count_bolt["count->A"], b_bolt["B->A"]]) # here this fails because b_bolt is not defined
	b_bolt = B.spec(name="B", inputs=[c_bolt["C->B"]])
	c_bolt = C.spec(name="C", inputs=[a_bolt["A->C"]])

The current solution therefore is to init each bolt and then extend inputs:

def extend_inputs(bolt, inputs):
	additions = bolt._sanitize_inputs(inputs)
	bolt.inputs = {**additions, **bolt.inputs}  # dict unpacking requires Python 3.5+
	bolt.common = ComponentCommon(inputs=bolt.inputs, streams=bolt.outputs, parallelism_hint=bolt.par, json_conf=bolt.config)

class WordCount(Topology):
	word_spout = WordSpout.spec()
	count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields('word')}, par=1)
	a_bolt = A.spec(name="A", inputs=[count_bolt["count->A"]])
	b_bolt = B.spec(name="B")
	c_bolt = C.spec(name="C")

	extend_inputs(a_bolt, [b_bolt["B->A"]])
# and so on...
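
One detail of `extend_inputs` worth spelling out is the merge order: in `{**additions, **bolt.inputs}` the later mapping wins, so an input that already exists on the bolt keeps its original grouping. A minimal sketch with plain dicts, where the string keys and grouping names are hypothetical stand-ins for the real stream IDs and groupings:

```python
# Hypothetical stand-ins: real keys would be stream references, not strings.
existing = {"count->A": "shuffle"}
additions = {"B->A": "shuffle", "count->A": "fields"}

# Later unpacking wins on key collisions, so `existing` takes precedence.
merged = {**additions, **existing}

# Reversing the order would let the new inputs override the old ones instead.
overridden = {**existing, **additions}
```

If the intent is for newly added inputs to override existing ones, the two operands would need to be swapped.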

Is there a better way to do this currently?

@dan-blanchard
Member

It's currently undocumented, but the workaround for this is to do:

from streamparse.thrift import storm_thrift
from storm_thrift import GlobalStreamId  # You have to do this in two lines because  
                                         # storm_thrift is generated by thriftpy on import

class WordCount(Topology):
	word_spout = WordSpout.spec()
	count_bolt = WordCountBolt.spec(inputs={word_spout: Grouping.fields('word')}, par=1)
	b_stream = GlobalStreamId(componentId='B', streamId='B->A')  # componentId must match the name given to B below
	a_bolt = A.spec(name="A", inputs=[count_bolt["count->A"], b_stream])
	c_bolt = C.spec(name="C", inputs=[a_bolt["A->C"]])  # defined before b_bolt, which references it
	b_bolt = B.spec(name="B", inputs=[c_bolt["C->B"]])

This works because ComponentSpec.__getitem__ (which is used whenever you do b_bolt['B->A'] in the topology DSL) just returns a GlobalStreamId with the component ID and stream ID set for you automatically.
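
To illustrate why a hand-built stream reference is interchangeable with indexing a spec, here is a rough sketch using stand-in classes. `FakeSpec` and the namedtuple `GlobalStreamId` are hypothetical simplifications for illustration, not streamparse's actual classes, and the real `__getitem__` may do additional checks:

```python
from collections import namedtuple

# Stand-in for the Thrift-generated struct (assumption: only these two fields matter here).
GlobalStreamId = namedtuple("GlobalStreamId", ["componentId", "streamId"])


class FakeSpec:
    """Simplified stand-in for a component spec; not the real streamparse class."""

    def __init__(self, name, inputs=()):
        self.name = name
        self.inputs = list(inputs)

    def __getitem__(self, stream):
        # Indexing by stream name only builds a reference; it needs no live component.
        return GlobalStreamId(componentId=self.name, streamId=stream)


# Forward reference to B, built by hand before B exists.
b_stream = GlobalStreamId(componentId="B", streamId="B->A")

a = FakeSpec("A", inputs=[b_stream])
c = FakeSpec("C", inputs=[a["A->C"]])
b = FakeSpec("B", inputs=[c["C->B"]])  # closes the cycle A -> C -> B -> A

# The hand-built reference equals what indexing B produces once it exists.
same_reference = b_stream == b["B->A"]
```

Only one edge of the cycle needs the manual reference; every other edge can still use the normal indexing syntax as long as the components are declared in an order where each referenced spec already exists.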

Side note: For a real topology, you don't really need to use names like B->A for your streams, especially if each stream only goes to one component. Every component has a default stream called 'default', and unless your component has multiple output streams that produce different content, naming streams is unnecessary.

@dan-blanchard reopened this on Nov 30, 2016
@dan-blanchard changed the title from "Cycles in Topology" to "Document how to handle cycles in Topology" on Nov 30, 2016