Skip to content

Commit

Permalink
[pulsar-function] fix possible deadlock on broker-function service st…
Browse files Browse the repository at this point in the history
…artup (#9499)

### Motivation

Standalone pulsar broker and function service can have deadlock which also sometime causes failures in function unit-test case. Pulsar function tries to create a subscription which blocks the zk thread and can be cause of possible deadlock. Below is the thread-dump when the deadlock happens and function service start up fails. 

So, remove blocking call while creating subscription using admin-api.



```

"pulsar-load-manager-139-1" #717 prio=5 os_prio=31 tid=0x00007f8a68698000 nid=0x3610f waiting on condition [0x00007000156e9000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000792a07020> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1709)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1788)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
        at org.apache.pulsar.zookeeper.ZooKeeperDataCache.get(ZooKeeperDataCache.java:97)
        at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getDynamicConfigurationFromZK(SimpleLoadManagerImpl.java:391)
        at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getDynamicConfigurationDouble(SimpleLoadManagerImpl.java:401)
        at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.getLoadBalancerBrokerOverloadedThresholdPercentage(SimpleLoadManagerImpl.java:450)
        at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.generateLoadReportForcefully(SimpleLoadManagerImpl.java:1139)
        - locked <0x000000078e53f2d0> (a java.util.HashSet)
        at org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl.writeLoadReportOnZookeeper(SimpleLoadManagerImpl.java:1295)
        at org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask.run(LoadReportUpdaterTask.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)



"pulsar-ordered-OrderedExecutor-1-0-EventThread" #621 daemon prio=5 os_prio=31 tid=0x00007f89fd8e1800 nid=0x2fe07 waiting on condition [0x000070000f7ce000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000792ecb8b0> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscriptionForNonPartitionedTopic(PersistentTopicsBase.java:2060)
        at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$69(PersistentTopicsBase.java:2034)
        at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase$$Lambda$676/1709042625.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.zookeeper.ZooKeeperCache.lambda$5(ZooKeeperCache.java:249)
        at org.apache.pulsar.zookeeper.ZooKeeperCache$$Lambda$154/5998675.processResult(Unknown Source)
        at org.apache.bookkeeper.zookeeper.ZooKeeperClient$15$1.processResult(ZooKeeperClient.java:879)
        at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:583)
        at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:510)


main" #1 prio=5 os_prio=31 tid=0x00007f8aa0013000 nid=0x1603 waiting on condition [0x0000700004160000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007930efe68> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
        at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.apply(AsyncHttpConnector.java:178)
        at org.glassfish.jersey.client.ClientRuntime.invoke(ClientRuntime.java:297)
        at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:632)
        at org.glassfish.jersey.client.JerseyInvocation$$Lambda$672/237755480.call(Unknown Source)
        at org.glassfish.jersey.client.JerseyInvocation.call(JerseyInvocation.java:654)
        at org.glassfish.jersey.client.JerseyInvocation.lambda$runInScope$3(JerseyInvocation.java:648)
        at org.glassfish.jersey.client.JerseyInvocation$$Lambda$673/197134325.call(Unknown Source)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:205)
        at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390)
        at org.glassfish.jersey.client.JerseyInvocation.runInScope(JerseyInvocation.java:648)
        at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:631)
        at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:434)
        at org.glassfish.jersey.client.JerseyInvocation$Builder.put(JerseyInvocation.java:318)
        at org.apache.pulsar.client.admin.internal.TopicsImpl.createSubscription(TopicsImpl.java:1143)
        at org.apache.pulsar.functions.worker.PulsarWorkerService.start(PulsarWorkerService.java:454)
        at org.apache.pulsar.broker.PulsarService.startWorkerService(PulsarService.java:1343)
        at org.apache.pulsar.broker.PulsarService.start(PulsarService.java:671)
        at org.apache.pulsar.io.PulsarFunctionE2ETest.setup(PulsarFunctionE2ETest.java:209)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
```
  • Loading branch information
rdhabalia authored Feb 9, 2021
1 parent d5edbd9 commit d654d5e
Showing 1 changed file with 28 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2124,30 +2124,34 @@ private void internalCreateSubscriptionForNonPartitionedTopic(
asyncResponse.resume(new RestException(Status.CONFLICT, "Subscription already exists for topic"));
return;
}
PersistentSubscription subscription = (PersistentSubscription) topic
.createSubscription(subscriptionName, InitialPosition.Latest, replicated).get();
// Mark the cursor as "inactive" as it was created without a real consumer connected
subscription.deactivateCursor();
subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId()))
.thenRun(() -> {
log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(),
topicName, subscriptionName, targetMessageId);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
subscriptionName, targetMessageId, t);
if (t instanceof SubscriptionInvalidCursorPosition) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for position specified: " + t.getMessage()));
} else if (t instanceof SubscriptionBusyException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Failed for Subscription Busy: " + t.getMessage()));
} else {
resumeAsyncResponseExceptionally(asyncResponse, t);
}
return null;
});
topic.createSubscription(subscriptionName, InitialPosition.Latest, replicated).thenApply(subscription -> {
// Mark the cursor as "inactive" as it was created without a real consumer connected
((PersistentSubscription) subscription).deactivateCursor();
subscription.resetCursor(PositionImpl.get(targetMessageId.getLedgerId(), targetMessageId.getEntryId()))
.thenRun(() -> {
log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(),
topicName, subscriptionName, targetMessageId);
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(),
topicName, subscriptionName, targetMessageId, t);
if (t instanceof SubscriptionInvalidCursorPosition) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Unable to find position for position specified: " + t.getMessage()));
} else if (t instanceof SubscriptionBusyException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Failed for Subscription Busy: " + t.getMessage()));
} else {
resumeAsyncResponseExceptionally(asyncResponse, t);
}
return null;
});
return null;
}).exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
return null;
});
} catch (Throwable e) {
log.warn("[{}][{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
subscriptionName, targetMessageId, e);
Expand Down

0 comments on commit d654d5e

Please sign in to comment.