
kafka-python consumer used in streamparse spout does not work and throws a timeout #447

Open
lusonzeng opened this issue Aug 24, 2018 · 6 comments


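pyspout.py:

```python
from streamparse import Spout

from kafka import KafkaConsumer


class PySpout(Spout):

    outputs = ['word']

    def initialize(self, stormconf, context):
        self.consumer = KafkaConsumer('test',
                                      group_id='my-group',
                                      bootstrap_servers=['localhost:9092'])

    def next_tuple(self):
        # Iterating a KafkaConsumer blocks while waiting for messages
        # (consumer_timeout_ms defaults to infinity), so this loop never
        # exits and next_tuple never returns.
        for message in self.consumer:
            self.emit([message.value])
```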



wordcount.py(bolt):

```python
import os

from collections import Counter

from streamparse import Bolt


class WordCountBolt(Bolt):

    outputs = ['word', 'count']

    def initialize(self, conf, ctx):
        self.counter = Counter()
        self.pid = os.getpid()
        self.total = 0

    def _increment(self, word, inc_by):
        self.counter[word] += inc_by
        self.total += inc_by

    def process(self, tup):
        word = tup.values[0]
        self._increment(word, 10 if word == "dog" else 1)
        if self.total % 1000 == 0:
            self.logger.info("counted [{:,}] words [pid={}]".format(self.total, self.pid))
        self.emit([word, self.counter[word]])
```


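wordcount.py(topology):

```python
from streamparse import Grouping, Topology

# Assumed module paths: spouts/pyspout.py (matches the log below) and
# bolts/wordcount.py (the streamparse quickstart layout).
from bolts.wordcount import WordCountBolt
from spouts.pyspout import PySpout


class WordCount(Topology):
    new_spout = PySpout.spec()
    new_bolt = WordCountBolt.spec(inputs={new_spout: Grouping.fields('word')}, par=2)
```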

After running `sparse run`, no data arrives, and after a while a timeout is thrown:
```text
5669 [refresh-active-timer] INFO  o.a.s.d.worker - All connections are ready for worker 29a4867e-ce62-4a92-bdae-32a8d28d4b60:1024 with id 85db4c94-fa26-41bc-9ffe-7fc2a15ff0f0
5686 [Thread-22-__acker-executor[1 1]] INFO  o.a.s.d.executor - Preparing bolt __acker:(1)
5688 [Thread-24-__system-executor[-1 -1]] INFO  o.a.s.d.executor - Preparing bolt __system:(-1)
5693 [Thread-24-__system-executor[-1 -1]] INFO  o.a.s.d.executor - Prepared bolt __system:(-1)
5696 [Thread-22-__acker-executor[1 1]] INFO  o.a.s.d.executor - Prepared bolt __acker:(1)
5700 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.d.executor - Opening spout new_spout:(4)
5703 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.u.ShellProcess - Storm multilang serializer: org.apache.storm.multilang.JsonSerializer
5760 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.d.executor - Preparing bolt new_bolt:(2)
5761 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.u.ShellProcess - Storm multilang serializer: org.apache.storm.multilang.JsonSerializer
5772 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.d.executor - Preparing bolt new_bolt:(3)
5773 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.u.ShellProcess - Storm multilang serializer: org.apache.storm.multilang.JsonSerializer
5947 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.s.ShellSpout - Launched subprocess with pid 87117
5951 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.t.ShellBolt - Launched subprocess with pid 87124
5952 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.t.ShellBolt - Start checking heartbeat...
5953 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.d.executor - Opened spout new_spout:(4)
5953 [Thread-18-new_bolt-executor[2 2]] INFO  o.a.s.d.executor - Prepared bolt new_bolt:(2)
5955 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.d.executor - Activating spout new_spout:(4)
5955 [Thread-26-new_spout-executor[4 4]] INFO  o.a.s.s.ShellSpout - Start checking heartbeat...
5964 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.t.ShellBolt - Launched subprocess with pid 87132
5965 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.t.ShellBolt - Start checking heartbeat...
5965 [Thread-20-new_bolt-executor[3 3]] INFO  o.a.s.d.executor - Prepared bolt new_bolt:(3)


52956 [pool-51-thread-1] ERROR o.a.s.s.ShellSpout - Halting process: ShellSpout died. Command: [streamparse_run, -s json spouts.pyspout.PySpout], ProcessInfo pid:87117, name:new_spout exitCode:-1, errorString: 
java.lang.RuntimeException: subprocess heartbeat timeout
	at org.apache.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:299) [storm-core-1.1.3.jar:1.1.3]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_74]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_74]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_74]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_74]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_74]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_74]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_74]
52960 [pool-51-thread-1] ERROR o.a.s.d.executor - 
java.lang.RuntimeException: subprocess heartbeat timeout
	at org.apache.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:299) [storm-core-1.1.3.jar:1.1.3]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_74]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_74]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_74]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_74]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_74]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_74]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_74]




Fatal error: local() encountered an error (return code 11) while executing 'storm jar /data/newqspace/lusonzeng/wordcount/_build/wordcount-0.0.1-SNAPSHOT-standalone.jar org.apache.storm.flux.Flux --local --no-splash --sleep 9223372036854775807 /tmp/tmpx6Q9ki.yaml'
```
amoyiki commented Aug 25, 2018

You can use pykafka instead of kafka-python:
spouts/words.py

```python
from streamparse import Spout
from pykafka import KafkaClient


class WordSpout(Spout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        client = KafkaClient(hosts="c1:9092,c1_1:9092,c1_2:9092")
        topic = client.topics['test'.encode('utf-8')]
        self.balanced_consumer = topic.get_balanced_consumer(
            consumer_group=b"test_group",
            auto_commit_enable=True,
            zookeeper_connect="c1:2181,c1_1:2181,c1_2:2181"
        )

    def next_tuple(self):
        # block=False returns None instead of waiting when no message is
        # available, so next_tuple returns promptly on an idle topic.
        message = self.balanced_consumer.consume(block=False)
        if message is None:
            return
        info = message.value.decode('utf-8')
        self.logger.info('==================={}'.format(info))
        self.emit([info])
```

lusonzeng (issue author) commented Sep 5, 2018

I have found that the exception is thrown consistently whenever the Kafka producer stops pushing data at regular intervals.

It works without any exception when the Kafka producer pushes data every second.

jhhnjhhn commented Dec 4, 2018

> I have found that the exception is thrown consistently whenever the Kafka producer stops pushing data at regular intervals.
>
> It works without any exception when the Kafka producer pushes data every second.

How did you solve it?

tianser commented Apr 23, 2019

> I have found that the exception is thrown consistently whenever the Kafka producer stops pushing data at regular intervals.
>
> It works without any exception when the Kafka producer pushes data every second.

How did you solve it?

tianser commented Apr 23, 2019

@jhhnjhhn How did you solve it?

Abdelhadi92 commented Jan 15, 2020

This code fixes the issue:

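```python
from streamparse import Spout
from kafka import KafkaConsumer


class PySpout(Spout):

    outputs = ['word']

    def initialize(self, stormconf, context):
        # consumer_timeout_ms makes iteration raise StopIteration after 10 ms
        # without a message, instead of blocking forever (the default).
        self.consumer = KafkaConsumer('test', group_id='my-group',
                                      bootstrap_servers=['localhost:9092'],
                                      consumer_timeout_ms=10)

    def next_tuple(self):
        try:
            message = next(self.consumer)
        except StopIteration:
            # No message arrived within the timeout: return so the spout
            # can reply to Storm and keep its heartbeat alive.
            return
        self.emit([message.value])
```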
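An alternative to relying on `consumer_timeout_ms` is to call kafka-python's `KafkaConsumer.poll(timeout_ms=..., max_records=...)`, which waits at most about `timeout_ms` and returns a dict mapping each `TopicPartition` to a list of records (an empty dict when nothing arrived). The sketch below assumes that `poll()` behavior; the helper name `emit_available` and its default values are hypothetical, and the consumer is passed in so the logic can be exercised without a broker.

```python
from typing import Any, Callable, List


def emit_available(consumer: Any,
                   emit: Callable[[List[Any]], None],
                   timeout_ms: int = 100,
                   max_records: int = 100) -> int:
    """Emit the records that arrive within timeout_ms, then return.

    `consumer` is assumed to behave like kafka-python's KafkaConsumer,
    whose poll() returns {TopicPartition: [ConsumerRecord, ...]} and an
    empty dict when nothing arrived in time.
    Returns the number of tuples emitted (0 on an idle topic).
    """
    batches = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
    emitted = 0
    for records in batches.values():
        for record in records:
            emit([record.value])  # same tuple shape as the original spout
            emitted += 1
    return emitted


# Hypothetical wiring inside the spout from this issue (not run here):
#
#     def next_tuple(self):
#         emit_available(self.consumer, self.emit)
#
# with self.consumer created without consumer_timeout_ms, since poll()
# already bounds how long each call waits.
```

With this shape, `next_tuple` should return after roughly `timeout_ms` whether or not the producer is active, so the subprocess keeps answering Storm between calls.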


5 participants