-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make Python Topology DSL generate Thrift structs #199
Conversation
When this is done, this will close #84. |
if not isinstance(parallelism, int) or parallelism < 1: | ||
|
||
class ComponentSpec(object): | ||
def __init__(self, component_cls, name=None, inputs=None, parallelism=1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we get a p=2
alias for parallelism=2
? My reasoning is, "parallelism" is a lot of keystrokes and is likely to be specified on every single spec call. Plus, :p
is what Clojure DSL uses. I know this is somewhat pedantic but think it would improve signal/noise ratio of topology definitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did @rduplain put you up to this? 😄
He said almost the same thing to me last week. In general, I despise one-letter arguments. If you had never used the Clojure DSL, would you guess that p
meant parallelism? Is par
a reasonable halfway point?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
He didn't, but I guess we have similar taste 😁 I don't mind par
.
i didn't think about the one-letter argument issue -- that's true, those are somewhat despicable, I get you on that.
Just skimmed this really fast but is looking really awesome! Nice work, @dan-blanchard. Had a few small comments but mostly minor stuff. Now, get other people to help you by getting your talk done 😉 |
Oh, and in case someone wants to see an example of how much typing the DSL saves over the Thrift spec. 😄 In [4]: class WordCountTopology(Topology):
word_spout = ShellSpout.spec(command='python', script='words.py',
outputs=['word'])
counting_bolt = ShellBolt.spec(command='python', script='count.py',
inputs=[word_spout])
In [5]: WordCountTopology._topology
Out[5]: StormTopology(state_spouts={},
bolts={'counting_bolt':
Bolt(common=ComponentCommon(streams={},
parallelism_hint=1,
inputs={GlobalStreamId(componentId='word_spout',
streamId='default'):
Grouping(direct=None,
shuffle=NullStruct(),
custom_serialized=None,
none=None,
custom_object=None,
fields=None,
local_or_shuffle=None,
all=None)},
json_conf='{}'),
bolt_object=ComponentObject(java_object=None,
shell=ShellComponent(script='count.py',
execution_command='python'),
serialized_java=None))},
spouts={'word_spout':
SpoutSpec(common=ComponentCommon(streams={'default':
StreamInfo(output_fields=['word'],
direct=False)},
parallelism_hint=1,
inputs={},
json_conf='{}'),
spout_object=ComponentObject(java_object=None,
shell=ShellComponent(script='words.py',
execution_command='python'),
serialized_java=None))}) |
This reverts commit 544c3a8. The changes are no longer valid because of the topology DSL changes.
Can now import pretty much everything directly from streamparse if you want.
Previously the hashes for GlobalStreamIds could change unpredictably, which made using them as keys in dicts not work. Now they have a proper __hash__ method and we reinsert them into the inputs dict when we change something that changes their hash.
Also: - add some more to check for more exceptions. - fix check for invalid fields grouping
At this point, two things need to happen:
|
Because of a limitation of Storm's current thrift file, the number of arguments a particular ShellComponent can execute is limited to 2, so the previous approach of launching things with 'python -m streamparse.run foo.bar' was too long.
submit now works and we've got loads more tests. I'm going to merge tomorrow if there are no objections. |
import sys | ||
import time | ||
|
||
from invoke import run | ||
from six import string_types | ||
import simplejson as json |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still using simplejson? Why not use std lib json?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
simplejson
is much faster on Python 2, and slightly faster on Python 3. We need to use it for message deserialization, so we want it to be fast, and since we can assume people have it installed, might as well use it here too.
No objection from me. I added reactions as comments to the diff. Nice work. |
…mbus is unreachable
This also updates the DSL to make it a bit less verbose (not that anyone was using it before, since it wasn't actually fully functional).
This is not quite done yet. I just want it here as a point of reference. That said, if anyone sees anything horribly wrong with this approach, feel free to let me know now.