Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Added example topologies in python (#1198)
Browse files Browse the repository at this point in the history
  • Loading branch information
Taishi Nojima authored and objmagic committed Aug 15, 2016
1 parent 6c2e6bb commit c2c3725
Show file tree
Hide file tree
Showing 15 changed files with 451 additions and 0 deletions.
37 changes: 37 additions & 0 deletions heron/examples/src/python/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package(default_visibility = ["//visibility:public"])

load("/tools/rules/pex_rules", "pex_library", "pex_binary")

pex_library(
name = "example_lib",
srcs = glob(['bolt/*.py', 'spout/*.py', 'misc/*.py']),
deps = [
'//heron/streamparse/src/python:pyheron_lib',
],
)

pex_library(
name = "pyheron_example_pkg",
srcs = glob(['**/*.py']),
deps = ['//heron/streamparse/src/python:pyheron_lib']
)

# with main method
pex_binary(
name = "word_count",
srcs = ["word_count_topology.py"],
deps = [":example_lib"],
)

# without main method
pex_binary(
name = "custom_grouping",
srcs = ["custom_grouping_topology.py"],
deps = [":example_lib"],
)

pex_binary(
name = "multi_stream",
srcs = ["multi_stream_topology.py"],
deps = [":example_lib"],
)
2 changes: 2 additions & 0 deletions heron/examples/src/python/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
"""Excample python topologies"""
__all__ = ['bolt', 'spout', 'misc', 'custom_grouping_topology', 'word_count_topology']
7 changes: 7 additions & 0 deletions heron/examples/src/python/bolt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""example python spouts"""
__all__ = ['consume_bolt', 'count_bolt', 'half_ack_bolt', 'stream_aggregate_bolt']

from .consume_bolt import ConsumeBolt
from .count_bolt import CountBolt
from .half_ack_bolt import HalfAckBolt
from .stream_aggregate_bolt import StreamAggregateBolt
28 changes: 28 additions & 0 deletions heron/examples/src/python/bolt/consume_bolt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# copyright 2016 twitter. all rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''module for example bolt: Consume Bolt'''
from heron.streamparse.src.python import Bolt

# pylint: disable=unused-argument
class ConsumeBolt(Bolt):
def initialize(self, config, context):
self.logger.info("In prepare() of ConsumerBolt")
self.total = 0

def process(self, tup):
if self.is_tick(tup):
self.log("Got tick tuple!")
self.log("Total received data tuple: %d" % self.total)
else:
self.total += 1
41 changes: 41 additions & 0 deletions heron/examples/src/python/bolt/count_bolt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""module for example bolt: CountBolt"""
from collections import Counter
from heron.streamparse.src.python import Bolt

class CountBolt(Bolt):
"""CountBolt"""
# output field declarer
#outputs = ['word', 'count']

def initialize(self, config, context):
self.logger.info("In prepare() of CountBolt")
self.counter = Counter()
self.total = 0

self.logger.info("Component-specific config: \n%s" % str(config))
self.logger.info("Context: \n%s" % str(context))

def _increment(self, word, inc_by):
self.counter[word] += inc_by
self.total += inc_by

def process(self, tup):
if self.is_tick(tup):
self.log("Got tick tuple!")
self.log("Current map: %s" % str(self.counter))
return
word = tup.values[0]
self._increment(word, 10 if word == "heron" else 1)
35 changes: 35 additions & 0 deletions heron/examples/src/python/bolt/half_ack_bolt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# copyright 2016 twitter. all rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''half ack bolt'''
from heron.streamparse.src.python import Bolt

class HalfAckBolt(Bolt):
"""Half of data tuples will be acked and the other half will be failed"""
# pylint: disable=unused-argument
def initialize(self, config, context):
self.total = 0

def process(self, tup):
if self.is_tick(tup):
self.log("Got tick tuple!")
self.log("Total received: %d" % self.total)
return

self.total += 1
if self.total % 2 == 0:
self.logger.debug("Failing a tuple: %s" % str(tup))
self.fail(tup)
else:
self.logger.debug("Acking a tuple: %s" % str(tup))
self.ack(tup)
29 changes: 29 additions & 0 deletions heron/examples/src/python/bolt/stream_aggregate_bolt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# copyright 2016 twitter. all rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''stream aggregator'''
from collections import Counter
from heron.streamparse.src.python import Bolt

# pylint: disable=unused-argument
class StreamAggregateBolt(Bolt):
"""Stream counts are aggregated"""
def initialize(self, config, context):
self.stream_counter = Counter()

def process(self, tup):
self.stream_counter[tup.stream] += 1

if self.is_tick(tup):
self.log("Got tick tuple!")
self.log("Current stream counter: %s" % str(self.stream_counter))
42 changes: 42 additions & 0 deletions heron/examples/src/python/custom_grouping_topology.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# copyright 2016 twitter. all rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''module for example topology: CustomGroupingTopology'''

from heron.common.src.python.utils.log import Log
from heron.streamparse.src.python import Topology, Grouping, ICustomGrouping, constants

from heron.examples.src.python.spout import WordSpout
from heron.examples.src.python.bolt import ConsumeBolt

# pylint: disable=unused-argument
class SampleCustomGrouping(ICustomGrouping):
def prepare(self, context, component, stream, target_tasks):
Log.info("In prepare of SampleCustomGrouping, "
"with src component: %s, "
"with stream id: %s, "
"with target tasks: %s"
% (component, stream, str(target_tasks)))
self.target_tasks = target_tasks

def choose_tasks(self, values):
# only emits to the first task id
return [self.target_tasks[0]]

class CustomGrouping(Topology):
custom_grouping_path = "heron.examples.src.python.custom_grouping_topology.SampleCustomGrouping"

word_spout = WordSpout.spec(par=1)
consume_bolt = ConsumeBolt.spec(par=3,
inputs={word_spout: Grouping.custom(custom_grouping_path)},
config={constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS: 10})
4 changes: 4 additions & 0 deletions heron/examples/src/python/misc/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Miscellaneous example topology-related modules"""
__all__ = ['test_task_hook']

from .test_task_hook import TestTaskHook
61 changes: 61 additions & 0 deletions heron/examples/src/python/misc/test_task_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''module for example task hook'''
from collections import Counter

from heron.common.src.python.utils.log import Log
from heron.common.src.python.utils.topology import ITaskHook

# pylint: disable=unused-argument
class TestTaskHook(ITaskHook):
"""TestTaskHook logs event information every 10000 times"""
CONST = 10000

def prepare(self, conf, context):
Log.info("In prepare of TestTaskHook")
self.counter = Counter()

# pylint: disable=no-self-use
def clean_up(self):
Log.info("In clean_up of TestTaskHook")

def emit(self, emit_info):
self.counter['emit'] += 1
if self.counter['emit'] % self.CONST == 0:
Log.info("TestTaskHook: emitted %s tuples" % str(self.counter['emit']))

def spout_ack(self, spout_ack_info):
self.counter['sp_ack'] += 1
if self.counter['sp_ack'] % self.CONST == 0:
Log.info("TestTaskHook: spout acked %s tuples" % str(self.counter['sp_ack']))

def spout_fail(self, spout_fail_info):
self.counter['sp_fail'] += 1
if self.counter['sp_fail'] % self.CONST == 0:
Log.info("TestTaskHook: spout failed %s tuples" % str(self.counter['sp_fail']))

def bolt_execute(self, bolt_execute_info):
self.counter['bl_exec'] += 1
if self.counter['bl_exec'] % self.CONST == 0:
Log.info("TestTaskHook: bolt executed %s tuples" % str(self.counter['bl_exec']))

def bolt_ack(self, bolt_ack_info):
self.counter['bl_ack'] += 1
if self.counter['bl_ack'] % self.CONST == 0:
Log.info("TestTaskHook: bolt acked %s tuples" % str(self.counter['bl_ack']))

def bolt_fail(self, bolt_fail_info):
self.counter['bl_fail'] += 1
if self.counter['bl_fail'] % self.CONST == 0:
Log.info("TestTaskHook: bolt failed %s tuples" % str(self.counter['bl_fail']))
29 changes: 29 additions & 0 deletions heron/examples/src/python/multi_stream_topology.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# copyright 2016 twitter. all rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''module for example topology: CustomGroupingTopology'''

from heron.streamparse.src.python import Topology, Grouping, constants

from heron.examples.src.python.spout import MultiStreamSpout
from heron.examples.src.python.bolt import CountBolt, StreamAggregateBolt

class MultiStream(Topology):
spout = MultiStreamSpout.spec(par=2)
count_bolt = CountBolt.spec(par=2,
inputs={spout: Grouping.fields('word')},
config={constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS: 10})
stream_aggregator = StreamAggregateBolt.spec(par=1,
inputs={spout: Grouping.ALL,
spout['error']: Grouping.ALL},
config={constants.TOPOLOGY_TICK_TUPLE_FREQ_SECS: 15})
5 changes: 5 additions & 0 deletions heron/examples/src/python/spout/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""example python spouts"""
__all__ = ['multi_stream_spout', 'word_spout']

from .multi_stream_spout import MultiStreamSpout
from .word_spout import WordSpout
41 changes: 41 additions & 0 deletions heron/examples/src/python/spout/multi_stream_spout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Word spout with error streams"""

from itertools import cycle
from heron.streamparse.src.python import Spout, Stream

class MultiStreamSpout(Spout):
"""WordSpout: emits a set of words repeatedly"""
# output field declarer
outputs = ['word', Stream(fields=['error_msg'], name='error')]

def initialize(self, config, context):
self.logger.info("In initialize() of WordSpout")
self.words = cycle(["hello", "bye", "good", "bad", "heron", "storm"])

self.emit_count = 0

self.logger.info("Component-specific config: \n%s" % str(config))
self.logger.info("Context: \n%s" % str(context))

def next_tuple(self):
word = next(self.words)
self.emit([word])
self.emit_count += 1

if self.emit_count % 100000 == 0:
self.logger.info("Emitted %s" % str(self.emit_count))
self.logger.info("Emitting to error stream")
self.emit(["test error message"], stream='error')
Loading

0 comments on commit c2c3725

Please sign in to comment.