Skip to content

STORM-513 check heartbeat from multilang subprocess#286

Merged
asfgit merged 10 commits intoapache:masterfrom
HeartSaVioR:STORM-513
Nov 14, 2014
Merged

STORM-513 check heartbeat from multilang subprocess#286
asfgit merged 10 commits intoapache:masterfrom
HeartSaVioR:STORM-513

Conversation

@HeartSaVioR
Copy link
Contributor

Related issue link : https://issues.apache.org/jira/browse/STORM-513

It seems that ShellSpout and ShellBolt doesn't check subprocess, and set heartbeat with their only states.
Subprocess could hang, but it doesn't affect ShellSpout / ShellBolt. It just stops working on tuple.
It's better to check heartbeat from subprocess, and suicide if subprocess stops working.

  • Spout
    • ShellSpout sends "next" to subprocess continuously
    • subprocess sends "sync" to ShellSpout when "next" is received
    • so we can treat "sync", or any messages to heartbeat
  • Bolt
    • ShellBolt sends tuples to subprocess if it's available
    • so we need to send "heartbeat" tuple
    • subprocess sends "sync" to ShellBolt when "heartbeat" tuple is received

* Spout
** ShellSpout sends "next" to subprocess continuously
** subprocess sends "sync" to ShellSpout when "next" is received
** so we can treat "sync", or any messages to heartbeat
* Bolt
** ShellBolt sends tuples to subprocess if it's available
** so we need to send "heartbeat" tuple
** subprocess sends "sync" to ShellBolt when "heartbeat" tuple is
received
@dan-blanchard
Copy link
Contributor

As the person who filed the issue. Thanks for coming up with a solution so quickly!

@HeartSaVioR
Copy link
Contributor Author

@dan-blanchard You're welcome!

Actually it doesn't let subprocess sends heartbeat periodically themselves, so it can't cover some situations.
ex) subprocess is alive but cannot process "heartbeat" tuple in time
It can be possible if subprocess is too busy or subprocess takes long time to process "one" tuple.
Current approach is simpler than others, but if we should cover edge-case, I will try to take care of it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whe doing debug logging try to refrain from using +. Either surraound with isDebugLevel() or use "current time :{}, last heartbeat : {} ....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@itaifrenkel Oh, you're right! It's slf4j so we should use {} to log effectively.

@HeartSaVioR
Copy link
Contributor Author

@itaifrenkel Thanks for pointing out missed spots and points to improve!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. You should consult the comitters if they are happy with another thread, or you are requested to use Tick tuples. A pro for another thread is the fact that maybe maybe a multilang bolt would want to use tick tuples too. But still...
  2. 1 Thread should be enough.
  3. This thread must not halt the process when main exists (as in tests), so it should be daemonized. The way to do it AFAIK is this
    heartBeanExecutor = MoreExecutors.getExitingScheduledExecutorService(Executors.newScheduledThreadPool(1))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@itaifrenkel

  1. AFAIK, ShellSpout's nextTuple() waits infinitely if subprocess is hang (or doesn't send anything).
    (Same thing applies ShellBolt.)
    That's what "sync" is for.
    So we should check heartbeat with "new thread".

Please correct me if I'm wrong. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@itaifrenkel I agree that 2, 3. I'll update it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new ScheduledThreadPoolExecutor(1) --> Executors.newScheduledThreadPool(1)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@itaifrenkel
MoreExecutors.getExitingScheduledExecutorService() receives ScheduledThreadPoolExecutor, not ScheduledExecutorService. I tried to change it, but compiler complained.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could cast, But that's ok as it is

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@itaifrenkel OK, I don't like explicit cast, so I'd not modify it.

@ptgoetz
Copy link
Member

ptgoetz commented Oct 21, 2014

+1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reread the code and think that we need here just to flip an atomicboolean (a priority queue for heartbeats of size 1). The reason is that the size of the _pendingWrites queue is Config.TOPOLOGY_SHELLBOLT_MAX_PENDING which by its name is the number of real tuples to retrieve from the disruptor queue. We set it to 1 to optimize for shortest latency... which would cause this thread to block.... which means you cannot share this thread between bolts event if you wanted too... which we need to think if this is an issue or not. A stronger argument in favor of a priority queue for heartbeats is that the rate of heartbeat messages will not be skewed by the length of the queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@itaifrenkel Oh, I see. I didn't know that options exists. Thanks for letting me know!
Then we can flip (turn on) heartbeat flag which means "it's time to send heartbeat" as you state, and let BoltWriter.run() loop takes care of it first. Definitely it would be flipped (turned off) after sending.
What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds very good 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@itaifrenkel I've updated PR to reflect it.

@itaifrenkel
Copy link
Contributor

@ptgoetz Is there a shared ScheduledExecutor in storm? It seems redundant that each bolt would dedicate a thread for the scheduler. Especially now, that the task itself would be non-blocking.

* size of _pendingWrites is Config.TOPOLOGY_SHELLBOLT_MAX_PENDING
* If users set this to 1 or strict, heartbeat request can affect performance
@ptgoetz
Copy link
Member

ptgoetz commented Oct 24, 2014

@itaifrenkel No, not that's available for use via the bolt API, but it's an interesting idea. You could effectively do the same by making the scheduler static (1 per worker/JVM), but that feels kind of hacky.

@ptgoetz
Copy link
Member

ptgoetz commented Oct 24, 2014

Since there were additional commits added to the pull request, we need to give it more time for others to review before merging, but I am still +1 for the patch.

@HeartSaVioR
Copy link
Contributor Author

Can PR be included to 0.9.3, or should be pending for next version?

@clockfly
Copy link
Contributor

clockfly commented Nov 4, 2014

+1

@harshach
Copy link
Contributor

harshach commented Nov 4, 2014

@HeartSaVioR Thanks for the patch. can you upmerge the changes.

@HeartSaVioR
Copy link
Contributor Author

@harshach Sure. I'll upmerge it.

@HeartSaVioR
Copy link
Contributor Author

I've got a change to discuss about this PR with @clockfly from online yesterday , and he also stated if subprocess is too busy, subprocess cannot send heartbeat in time, which I've stated first of this PR.

Actually it's better to let subprocess have heartbeat thread and send heartbeat periodically.
But there're two things to consider.

  1. ShellSpout runs with PING-PONG communication, and ShellSpout must wait "sync" from nextTuple(). So if we change ShellSpout to have reader thread, we should implement nextTuple() to wait for reading "sync" from reader thread, which is a little complex than current.
  2. We should ensure that main thread and heartbeat thread don't write stdout (maybe Pipe) at the same time. GIL could let us feel free, but there will be other languages that support real (?) thread. Writing operation should be with lock.

Since I'm not a Javascript (nodejs) guy, and I'm a beginner to Ruby, I cannot cover two things with .js.
So I wish to implement it to other PR when we think we can't stand its limitation, or I have some more time.

Btw, Nimbus / Supervisor can find dead process due to subprocess hang up to SUPERVISOR_WORKER_TIMEOUT_SECS * 2 + a (maybe), cause there're two heartbeat check, ShellProcess checks subprocess (and suicide if subprocess cannot respond), Nimbus / Supervisor checks ShellProcess.
(Just for @clockfly )

Conflicts:
	storm-core/src/multilang/py/storm.py

* also fixes diverged py files (there're 3 storm.py files but seems to diverged)
@HeartSaVioR
Copy link
Contributor Author

OK, I've upmerged.
Btw, I found py files are diverged too so I needed to copy and paste one file to another.

@HeartSaVioR
Copy link
Contributor Author

@harshach @ptgoetz @clockfly
Could we merge upmerged PR now, or PR should take care of additional modification before merge?

@itaifrenkel
Copy link
Contributor

Please comment on STORM-528 if you resolved the py files divergence problem.

@itaifrenkel
Copy link
Contributor

@HeartSaVioR @clockfly I think we need to keep the multilang protocl implementation as simple as possible. A full roundtrip of heartbeat messages is not that bad, as long as it does not add too much latency. If you would like an optimization for the rountrip messages then you could consider any emit as an heartbeat, and trigger the heartbeat rountrip only if there are not enough emits from the bolt. It makes the java code more complicated :(, but achieves similar goals, and leaves the multilang implementation simpler :). All-in-all I think this commit is good, and we could discuss various optimizations later on.

@HeartSaVioR
Copy link
Contributor Author

@itaifrenkel @clockfly
I agree @itaifrenkel because there're more implementations (but it doesn't exist on Storm project) on multilang.
I saw php implementation of multilang (Sorry I can't remember where it is), and there could be more.
If we let subprocess take care of many things, implementations should apply it and update.
(Actually we already force them to apply this change because bolt has to treat heartbeat tuple. ;( )
So we should consider trade-off, and maybe we should have documentation of multilang specification.

AND changes of multilang protocol introduced on this PR should be documented when we announce to release 0.9.3.

@HeartSaVioR
Copy link
Contributor Author

@itaifrenkel OK, I've commented to STORM-528.

@HeartSaVioR
Copy link
Contributor Author

Maybe we can have a discussion about multilang from mailing list or other issue.
There seems to leak strategy / etc. about multilang. How do you all think?
(Sorry I found documentation related to multilang later.)

@HeartSaVioR
Copy link
Contributor Author

We should document this feature to http://storm.apache.org/documentation/Multilang-protocol.html
In bolts, bolt should handle heartbeat tuple, and send sync response ASAP.

@itaifrenkel
Copy link
Contributor

@HeartSaVioR I wanted to fix the multilang docs some time ago for nodejs support, but couldn't find a way to provide a pull request ?

@HeartSaVioR
Copy link
Contributor Author

AFAIK, there was a discussion about moving documents (containing website) to git, started by @ptgoetz, and +1 by many committers.
But it's actually not applied, so we can't fix ourselves. We can just report to dev mailing list.

Personally I'm heavily inspired to Redis documentation. Document has been treated to same as code.
It's on http://github.com/antirez/redis-doc.

@harshach
Copy link
Contributor

@HeartSaVioR I am working on doing some tests on this PR. I tried to build the storm with your changes in and I am getting these failures. Can you please check if you see any of these issues. Thanks.

java.lang.Exception: Shell Process Exception: Exception in bolt: undefined method +' for nil:NilClass - tester_bolt.rb:29:inprocess'\n/private/var/folders/yb/67h7c1sx2d95r5c_x5cjdwmh0000gp/T/ddda5ca6-8167-4ed1-bfef-a1a2001f65a2/supervisor/stormdist/test-1-1415984043/resources/storm.rb:186:in run'\ntester_bolt.rb:37:in

'
at backtype.storm.task.ShellBolt.handleError(ShellBolt.java:188) [classes/:na]
at backtype.storm.task.ShellBolt.access$1100(ShellBolt.java:69) [classes/:na]
at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:331) [classes/:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
90960 [Thread-1055] ERROR backtype.storm.task.ShellBolt - Halting process: ShellBolt died.
java.lang.RuntimeException: backtype.storm.multilang.NoOutputException: Pipe to subprocess seems to be broken! No output read.
Serializer Exception:

at backtype.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:101) ~[classes/:na]
at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:318) ~[classes/:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]

@ptgoetz
Copy link
Member

ptgoetz commented Nov 14, 2014

+1 (again)

@harshach The problem you saw was due to two of the storm.rb files being out of sync. I will correct that at merge time.

@asfgit asfgit merged commit 69ddc67 into apache:master Nov 14, 2014
@hustfxj
Copy link
Contributor

hustfxj commented Nov 16, 2015

+1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants