PubSub: Deadlocks with nested Subscriber #26
Comments
Thanks for posting this issue. I had a quick look yesterday and was able to reproduce it using the code below. I believe you also observed the
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TestListener {
  private static final Logger LOG = Logger.getLogger(TestListener.class.getName());
  private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
  private static final List<MessageReceiver> childMessageReceiver = new ArrayList<>();
  private static final String SUBS_1 = "test_pubsub_sample";
  private static final String SUBS_2 = "test_pubsub_sample_2";

  // Receiver for the "main" subscription; may start a nested subscriber.
  static class OuterMessageReceiver implements MessageReceiver {
    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      String messageData = message.getData().toStringUtf8();
      LOG.info("OUTER SUBS=> Message Id: " + message.getMessageId() + " Data: " + messageData);
      consumer.ack();
      if (messageData.startsWith("START_INNER_LISTENER")) { // Listener for another subscription
        ProjectSubscriptionName subscription2 = ProjectSubscriptionName.of(PROJECT_ID, SUBS_2);
        try {
          MessageReceiver messageReceiver = new InnerMessageReceiver();
          childMessageReceiver.add(messageReceiver);
          Subscriber childSubs = Subscriber.newBuilder(subscription2, messageReceiver).build();
          childSubs.startAsync().awaitRunning();
          // Expected to stop the subscriber after 2 minutes, but it continues to live on.
          // Note: awaitTerminated only waits for termination; it does not stop the
          // subscriber, and it throws TimeoutException when the wait expires.
          childSubs.awaitTerminated(2, TimeUnit.MINUTES);
        } catch (TimeoutException e) {
          // Keep the original exception as the cause so the stack trace is not lost.
          throw new IllegalStateException(
              "Exception occurred while closing the inner subscriber", e);
        }
      }
    }
  }

  // Receiver for the nested subscription; logs and acknowledges each message.
  static class InnerMessageReceiver implements MessageReceiver {
    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer ackReplyConsumer) {
      LOG.info(
          "INNER_SUBS=> Message Id: "
              + message.getMessageId()
              + " Data: "
              + message.getData().toStringUtf8());
      ackReplyConsumer.ack();
    }
  }

  public static void main(String[] args) throws Exception {
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PROJECT_ID, SUBS_1);
    try {
      Subscriber outerSubscriber =
          Subscriber.newBuilder(subscriptionName, new OuterMessageReceiver()).build();
      outerSubscriber.startAsync().awaitRunning();
      // Block the main thread for as long as the outer subscriber is running.
      outerSubscriber.awaitTerminated();
    } catch (IllegalStateException e) {
      LOG.log(Level.SEVERE, "Subscriber unexpectedly stopped: ", e);
    }
  }
}
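One possible reading of the reproduction above, offered as an assumption rather than something this thread confirms: the timed wait runs inside receiveMessage, so it holds a callback thread for up to two minutes. If the work being waited on needs a thread from the same bounded pool, the wait can only end by timing out. The sketch below shows that general effect with plain java.util.concurrent and does not use the Pub/Sub client at all. NestedWaitDemo and innerCompletes are hypothetical names, and whether the two Subscribers actually share threads depends on executor configuration that is not established here.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class NestedWaitDemo {

    /**
     * Runs an "outer" task on a fixed pool. The outer task submits an "inner"
     * task to the same pool and waits for it with a timeout, similar in shape
     * to a receiver that waits on a nested subscriber.
     * Returns true if the inner task finished before the timeout.
     */
    public static boolean innerCompletes(int poolSize, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Future<Boolean> outer = pool.submit(() -> {
                // The inner task needs a free thread from the same pool.
                Future<?> inner = pool.submit(() -> { /* inner work */ });
                try {
                    inner.get(timeoutMillis, TimeUnit.MILLISECONDS);
                    return true;
                } catch (TimeoutException e) {
                    // No thread was available: the outer task held the only one.
                    return false;
                }
            });
            return outer.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Demo failed unexpectedly", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // One thread: the outer task occupies it, so the inner task never starts.
        System.out.println("pool of 1: inner completed = " + innerCompletes(1, 500));
        // Two threads: the inner task gets its own thread and finishes.
        System.out.println("pool of 2: inner completed = " + innerCompletes(2, 500));
    }
}
```

If this is what happens, it would also fit the report that the hang is more common on Kubernetes: containers often expose fewer CPUs, and pools sized from the CPU count are smaller there. That link is a hypothesis, not a finding from this issue.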
I tried to reproduce the issue but could not. Either the issue has been fixed since it was reported, or we need more information about how you get into the "deadlock":
I tried reproducing this issue, but was unable to do so. It is possible that this was caused by a bug that has now been fixed (#22), in which threads were not shut down properly when stopping a Subscriber. I am going to close this issue, as I am unable to reproduce it. We can re-open it if the problem is seen again.
Can I "nest" subscribers?
Let's say I have this application, with a "main" subscriber, always listening; and in the receiver I create a new subscriber (for another subscription), with an awaitTerminated with a timeout.
Not always, but sometimes the application remains "stuck"; when this happens, it is in the awaitTerminated.
Is there a way to debug this problem?
BTW: it's more common to get stuck if I run the code on Kubernetes than on my developer PC...
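On the debugging question, a common JVM approach is to take a thread dump while the application is stuck (for example with the JDK's jstack <pid> or jcmd <pid> Thread.print) and look for threads whose stack contains awaitTerminated. The sketch below does the same from inside the process using Thread.getAllStackTraces. StuckThreadFinder, threadsInMethod, startStuckThread, and parkForever are hypothetical names used only for illustration, and the parked thread is a stand-in for a receiver thread, not real Pub/Sub code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

public class StuckThreadFinder {

    /** Returns the names of live threads whose stack contains a frame for methodName. */
    public static List<String> threadsInMethod(String methodName) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> entry
                : Thread.getAllStackTraces().entrySet()) {
            for (StackTraceElement frame : entry.getValue()) {
                if (frame.getMethodName().equals(methodName)) {
                    names.add(entry.getKey().getName());
                    break; // one match per thread is enough
                }
            }
        }
        return names;
    }

    /** Starts a daemon thread that blocks inside parkForever; returns once it has entered. */
    public static Thread startStuckThread(String name) {
        CountDownLatch entered = new CountDownLatch(1);
        Thread thread = new Thread(() -> parkForever(entered), name);
        thread.setDaemon(true);
        thread.start();
        try {
            entered.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return thread;
    }

    // Stand-in for a blocking wait such as awaitTerminated.
    static void parkForever(CountDownLatch entered) {
        entered.countDown();
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        startStuckThread("demo-stuck-thread");
        // In the real application the method of interest would be "awaitTerminated".
        System.out.println(threadsInMethod("parkForever"));
    }
}
```

In the application from this issue, calling threadsInMethod("awaitTerminated") from a watchdog thread or a diagnostic endpoint would list the threads currently waiting there, which helps tell a receiver thread that is still inside its timed wait apart from one that never returned.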