@HeartSaVioR
Contributor

What is this PR for?

Zeppelin has been reusing broken Thrift client instances.
Since we can catch TException, we can discard from the client pool any client instance that throws TException.
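
The idea described above can be sketched without Thrift or commons-pool. This is only an illustration under stated assumptions: `FakeClient`, `FakeClientPool`, and `DiscardBrokenClientSketch` are hypothetical stand-ins, not Zeppelin classes, and a plain `Exception` plays the role of `TException`. Unlike the killed-interpreter test, the replacement client here is healthy, so the second call succeeds.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a Thrift client; calls fail once its peer is gone. */
class FakeClient {
    final int id;
    boolean peerAlive = true;

    FakeClient(int id) { this.id = id; }

    String interpret(String code) throws Exception {
        if (!peerAlive) {
            throw new Exception("Connection reset"); // plays the role of TException
        }
        return "ok: " + code;
    }
}

/** Hypothetical minimal pool: healthy clients are reused, broken ones are dropped. */
class FakeClientPool {
    private final Deque<FakeClient> idle = new ArrayDeque<>();
    private int created = 0;

    FakeClient borrow() {
        FakeClient c = idle.pollFirst();
        return c != null ? c : new FakeClient(++created);
    }

    void release(FakeClient c) { idle.addFirst(c); }

    void releaseBroken(FakeClient c) { /* intentionally not returned to the idle queue */ }

    int idleCount() { return idle.size(); }
}

class DiscardBrokenClientSketch {
    /** Borrows a client, runs one call, and discards the client if the call threw. */
    static String run(FakeClientPool pool, String code) {
        FakeClient client = pool.borrow();
        boolean broken = false;
        try {
            return client.interpret(code);
        } catch (Exception e) {
            broken = true;
            return "error: " + e.getMessage();
        } finally {
            if (broken) {
                pool.releaseBroken(client);
            } else {
                pool.release(client);
            }
        }
    }

    public static void main(String[] args) {
        FakeClientPool pool = new FakeClientPool();
        FakeClient first = pool.borrow();
        first.peerAlive = false;                 // simulate killing the remote process
        pool.release(first);                     // the dead client now sits idle in the pool
        System.out.println(run(pool, "1 + 1"));  // the call fails and the client is discarded
        System.out.println(run(pool, "1 + 1"));  // a new client is created for this call
    }
}
```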

What type of PR is it?

Bug Fix | Improvement

Todos

Is there a relevant Jira issue?

https://issues.apache.org/jira/browse/ZEPPELIN-534

How should this be tested?

  1. Run a notebook that uses the Spark interpreter.
  2. Kill the Spark interpreter process with kill -9.
  3. Run a notebook that uses the killed interpreter.
  4. Run the same notebook again and check that the error log has changed.

output of 3

java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:196)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.recv_interpret(RemoteInterpreterService.java:220)
    at org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService$Client.interpret(RemoteInterpreterService.java:205)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:225)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
    at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:211)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:169)
    at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:322)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

output of 4

java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at org.apache.thrift.transport.TSocket.open(TSocket.java:182)
    at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:51)
    at org.apache.zeppelin.interpreter.remote.ClientFactory.create(ClientFactory.java:37)
    at org.apache.commons.pool2.BasePooledObjectFactory.makeObject(BasePooledObjectFactory.java:60)
    at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
    at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.getClient(RemoteInterpreterProcess.java:140)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.interpret(RemoteInterpreter.java:205)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
    at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:211)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:169)
    at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:322)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

The result may differ depending on how many client instances the pool creates in its initial phase.
Before this change, the output of step 4 would be a broken pipe error, which means the previous client instance was not discarded.

Screenshots (if appropriate)

Questions:

  • Do the license files need an update? (No)
  • Are there breaking changes for older versions? (No)
  • Does this need documentation? (No)

* We can treat client as broken when TException occurs

Conflicts:
	zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@Leemoonsoo
Member

Thanks for the improvement.

What do you think about overriding validateObject() in ClientFactory.java? For example:

  // Treat a pooled client as valid only while both of its transports report open.
  @Override
  public boolean validateObject(PooledObject<Client> p) {
    return p.getObject().getInputProtocol().getTransport().isOpen() &&
            p.getObject().getOutputProtocol().getTransport().isOpen();
  }

I'm not 100% sure whether it is equivalent to this PR, but if it is, this could be simpler.

@HeartSaVioR
Contributor Author

@Leemoonsoo
I've addressed it in #576 (comment)

If TTransport.isOpen() behaves like Socket's state checks, it cannot guarantee that the connection is still valid. We need to ping the remote side to be sure that the connection is valid.
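
The Socket behavior referred to above can be observed with plain `java.net` classes. This is a small sketch, not Thrift code, and it does not show what `TTransport.isOpen()` itself does. It only demonstrates that `Socket.isConnected()` and `Socket.isClosed()` report local state, so they do not change when the peer goes away, while an actual read does notice. The class name `SocketStateSketch` is made up for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

class SocketStateSketch {
    /** Connects over loopback, lets the peer close, then reports what the client side sees. */
    static String observeAfterPeerClose() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort())) {
            server.accept().close();                    // the remote side goes away
            int read = client.getInputStream().read();  // real I/O: end of stream is reported
            return "isConnected=" + client.isConnected()
                    + " isClosed=" + client.isClosed()
                    + " read=" + read;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(observeAfterPeerClose());
    }
}
```

The local flags still describe the socket as connected and not closed after the peer has closed it, which is why a check based only on such flags cannot replace a round trip to the remote side.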

Also, we should take care with the GenericObjectPoolConfig settings.
There are several validation modes: testOnBorrow, testOnReturn, and testWhileIdle.
If we want to replace the current PR with this approach, we should at least use testOnReturn.
The safest option is testOnBorrow, since idle connections can be disconnected for various reasons. (I mean a pooled object can become invalid while idle.)

Would it be better to add a "ping" call to this PR and apply the validation modes?
(Actually, I think the new approach should be fine, since JedisPool works like a charm with it. I'm a collaborator on the Jedis project.)
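
To make the validate-on-borrow idea concrete, here is a minimal sketch that uses only the JDK. It is not commons-pool and not Zeppelin code: `ValidatingPool` and `PingableClient` are hypothetical, `ping()` stands in for the proposed Thrift "ping" procedure, and `transportOpen` stands in for a local flag in the style of isOpen().

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical pool that validates idle objects before handing them out. */
class ValidatingPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private final Predicate<T> validator;

    ValidatingPool(Supplier<T> factory, Predicate<T> validator) {
        this.factory = factory;
        this.validator = validator;
    }

    /** Returns the first idle object that passes validation, or a new one if none does. */
    T borrow() {
        T candidate;
        while ((candidate = idle.pollFirst()) != null) {
            if (validator.test(candidate)) {
                return candidate;
            }
            // Failed validation: the candidate is dropped and the next idle object is tried.
        }
        return factory.get();
    }

    void giveBack(T obj) { idle.addFirst(obj); }
}

/** Hypothetical client: the local flag stays true even after the peer has died. */
class PingableClient {
    boolean peerAlive = true;
    boolean transportOpen = true;

    /** Stand-in for a round trip to the remote side. */
    boolean ping() { return peerAlive; }
}

class ValidateOnBorrowSketch {
    public static void main(String[] args) {
        PingableClient dead = new PingableClient();
        dead.peerAlive = false;

        ValidatingPool<PingableClient> byFlag =
                new ValidatingPool<>(PingableClient::new, c -> c.transportOpen);
        byFlag.giveBack(dead);
        System.out.println("flag check reuses dead client: " + (byFlag.borrow() == dead));

        ValidatingPool<PingableClient> byPing =
                new ValidatingPool<>(PingableClient::new, PingableClient::ping);
        byPing.giveBack(dead);
        System.out.println("ping check reuses dead client: " + (byPing.borrow() == dead));
    }
}
```

The trade-off is the one raised in this thread: validating on every borrow costs one extra round trip per call, which matters less when RPC calls are infrequent.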

@HeartSaVioR
Contributor Author

Though it works like a charm, Jedis doesn't recommend testOnBorrow because it reduces performance; it recommends that users handle broken connections manually instead.
(Jedis implements Closeable to hide the handling code.)

By the way, I don't think that concern applies to Zeppelin, since the number of RPC calls is relatively small. It may be feasible to apply testOnBorrow.

@Leemoonsoo
Member

If TTransport.isOpen() is similar to Socket's behavior, TTransport.isOpen() can't ensure that connection is still valid

While we can't be sure about that, this PR looks good to me as it is. Thanks for the explanation.

@HeartSaVioR
Contributor Author

@Leemoonsoo
I think the approach in #576 (comment) would work, so if you don't mind adding "ping" to the Thrift procedure list and calling it, we can realize your original intention.
I'm also fine with leaving it as it is.

@Leemoonsoo
Member

I'll merge this if there is no further discussion.

Member

Do you think this is the intended behaviour? I think it stops the Zeppelin server.

Contributor Author

Actually, releaseClient() also has this issue, since clientPool.returnObject() can throw similar runtime exceptions.
The only difference between returnObject() and invalidateObject() is that invalidateObject() also throws a checked exception.

If we want to release clients quietly (swallowing any exceptions), I can apply that to both releaseClient() and releaseBrokenClient().

Member

If it has no side effects, I think consuming the exception (and logging an error) would be safer.
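
The "consume and log" suggestion could look roughly like the sketch below. It is an assumption about the shape of the fix, not the code merged in this PR: `QuietReleaseSketch`, `PoolAction`, and `quietly` are hypothetical names, and `java.util.logging` is used only to keep the example dependency-free.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class QuietReleaseSketch {
    private static final Logger LOG = Logger.getLogger(QuietReleaseSketch.class.getName());

    /** Hypothetical shape of a pool call that may throw checked or unchecked exceptions. */
    interface PoolAction {
        void run() throws Exception;
    }

    /** Runs the action; on failure, logs the exception and returns false instead of rethrowing. */
    static boolean quietly(String what, PoolAction action) {
        try {
            action.run();
            return true;
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Failed to " + what + "; ignoring", e);
            return false;
        }
    }

    public static void main(String[] args) {
        // A failing release no longer propagates to the caller.
        boolean ok = quietly("invalidate client", () -> {
            throw new IllegalStateException("pool already closed");
        });
        System.out.println("release succeeded: " + ok);
    }
}
```

In the real code, the lambda body would be the returnObject() or invalidateObject() call on the client pool, so that both release paths share the same handling.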

@HeartSaVioR
Contributor Author

@jongyoul @Leemoonsoo I've addressed your comments.

@Leemoonsoo
Member

@HeartSaVioR Thanks! LGTM

@bzz
Member

bzz commented Jan 5, 2016

Looks great to me, @HeartSaVioR thank you for the improvement.

@bzz
Member

bzz commented Jan 5, 2016

Merging if there is no further discussion.

@asfgit asfgit closed this in fc87089 Jan 5, 2016
4 participants