
Failure creating a subscription on a (fairly) new connection #316

Closed
ColinSullivan1 opened this issue May 28, 2020 · 11 comments

@ColinSullivan1
Member

I'm encountering an exception when creating a subscription on a new connection. Based on the message stats, the new connection has only had a few messages pass through it. I'm working on isolating a reproducible case and will add to this issue if successful.

The goal here is to have an application receive a message, and from within the callback create a new connection and subscribe. The connection used here and the attempt to subscribe were created/called from within the callback of a different connection.

            System.out.println("setupMessageHandling, conn="+ c);
            d = c.createDispatcher(this);
            d.subscribe("control.*");
            c.flush(Duration.ofSeconds(2));

What's odd is that if the connection has never been used to publish a message, this does not occur.

The result:

NATS @ ~/Dropbox/go/src/github.com/ColinSullivan1/java-examples () $ ./publoss.sh 
setupMessageHandling, conn=io.nats.client.impl.NatsConnection@31221be2
Sending 100000 messages of 128 bytes on foo, server is nats://localhost:4222
Received control message on control.migrate.
newConn=io.nats.client.impl.NatsConnection@4e3e8e61
oldConn=io.nats.client.impl.NatsConnection@31221be2
publishing paused...
publishing paused...
NATS Connection Event: nats: connection closed
Done with migration.
Conn Statistics: 
### Connection ###
Reconnects:                      0
### Reader ###
Messages in:                     0
Bytes in:                        0
### Writer ###
Messages out:                    9
Bytes out:                       1,152
setupMessageHandling, conn=io.nats.client.impl.NatsConnection@4e3e8e61
Migration exception: Output queue is full 0
java.lang.IllegalStateException: Output queue is full 0
	at io.nats.client.impl.MessageQueue.push(MessageQueue.java:122)
	at io.nats.client.impl.MessageQueue.push(MessageQueue.java:108)
	at io.nats.client.impl.NatsConnectionWriter.queue(NatsConnectionWriter.java:191)
	at io.nats.client.impl.NatsConnection.queueOutgoing(NatsConnection.java:1316)
	at io.nats.client.impl.NatsConnection.sendSubscriptionMessage(NatsConnection.java:938)
	at io.nats.client.impl.NatsConnection.createSubscription(NatsConnection.java:908)
	at io.nats.client.impl.NatsDispatcher.subscribeImpl(NatsDispatcher.java:252)
	at io.nats.client.impl.NatsDispatcher.subscribe(NatsDispatcher.java:194)
	at io.nats.java.examples.ControlPlane.setupMessageHandling(ControlPlane.java:32)
	at io.nats.java.examples.ControlPlane.migrate(ControlPlane.java:63)
	at io.nats.java.examples.ControlPlane.onMessage(ControlPlane.java:77)
	at io.nats.client.impl.NatsDispatcher.run(NatsDispatcher.java:98)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

The problem appears to be in adding to an empty LinkedBlockingQueue (the message reports a size of 0), in this check:

    if (!this.offer(msg)) {
        throw new IllegalStateException("Output queue is full " + queue.size());
    }

Environment

MacOS Catalina

$ java -version
java version "1.8.0_231"
Java(TM) SE Runtime Environment (build 1.8.0_231-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.231-b11, mixed mode)

Any ideas? Thanks!

@ColinSullivan1
Member Author

Here's test code that reproduces this issue. Maybe I'm doing something wrong... If I comment out the close in the callback, it works. That being said, I don't see how closing one connection should affect another.

Code

    package io.nats.java.examples;

    import io.nats.client.Connection;
    import io.nats.client.Dispatcher;
    import io.nats.client.Message;
    import io.nats.client.MessageHandler;
    import io.nats.client.Nats;

    import java.time.Duration;

    public class Example {
        public static void main(String[] argv) {
            try {
                // setup connection
                Connection nc = Nats.connect("localhost:4222");

                // setup a message handler that creates another connection
                // and closes the original connection.
                MessageHandler mh = new MessageHandler() {
                    @Override
                    public void onMessage(Message msg) throws InterruptedException {
                        try {
                            // create a new connection here
                            Connection nc2 = Nats.connect("localhost:4222");

                            // close the original conn
                            try {
                                nc.close();
                            } catch (Exception e) {
                                // Doesn't get hit
                                e.printStackTrace();
                            }

                            // recreate subscription on the new connection
                            Dispatcher d2 = nc2.createDispatcher(this);
                            d2.subscribe("foo");  // <- exception here
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };

                // setup handler and subscription
                Dispatcher d1 = nc.createDispatcher(mh);
                d1.subscribe("foo");
                nc.flush(Duration.ofSeconds(2));

                // Invoke the callback from a completely different connection
                Nats.connect("localhost:4222").publish("foo", new byte[64]);

                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("Done");
        }
    }

Output

bash-3.2$  /Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/bin/java -Dfile.encoding=UTF-8 -cp /var/folders/2x/d1g9l2xd5xl17ktltf8pl7wh0000gn/T/cp_emg817tpcc6tlwijoobi02k26.jar io.nats.java.examples.Example 
java.lang.IllegalStateException: Output queue is full 0
        at io.nats.client.impl.MessageQueue.push(MessageQueue.java:122)
        at io.nats.client.impl.MessageQueue.push(MessageQueue.java:108)
        at io.nats.client.impl.NatsConnectionWriter.queue(NatsConnectionWriter.java:191)
        at io.nats.client.impl.NatsConnection.queueOutgoing(NatsConnection.java:1316)
        at io.nats.client.impl.NatsConnection.sendSubscriptionMessage(NatsConnection.java:938)
        at io.nats.client.impl.NatsConnection.createSubscription(NatsConnection.java:908)
        at io.nats.client.impl.NatsDispatcher.subscribeImpl(NatsDispatcher.java:252)
        at io.nats.client.impl.NatsDispatcher.subscribe(NatsDispatcher.java:194)
        at io.nats.java.examples.Example$1.onMessage(Example.java:32)
        at io.nats.client.impl.NatsDispatcher.run(NatsDispatcher.java:98)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Done

@sasbury
Contributor

sasbury commented May 28, 2020 via email

@ColinSullivan1
Member Author

Looks like it was introduced in 2.6.7...

Version 2.6.6
Done
^C=============
Version 2.6.7
Unable to stop reader thread
java.lang.IllegalStateException: Output queue is full 0
	at io.nats.client.impl.MessageQueue.push(MessageQueue.java:107)
	at io.nats.client.impl.NatsConnectionWriter.queue(NatsConnectionWriter.java:188)
	at io.nats.client.impl.NatsConnection.queueOutgoing(NatsConnection.java:1321)
	at io.nats.client.impl.NatsConnection.sendSubscriptionMessage(NatsConnection.java:943)
	at io.nats.client.impl.NatsConnection.createSubscription(NatsConnection.java:913)
	at io.nats.client.impl.NatsDispatcher.subscribeImpl(NatsDispatcher.java:252)
	at io.nats.client.impl.NatsDispatcher.subscribe(NatsDispatcher.java:194)
	at io.nats.java.examples.Example$1.onMessage(Example.java:32)
	at io.nats.client.impl.NatsDispatcher.run(NatsDispatcher.java:98)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Done
^C=============
Version 2.6.8
java.lang.IllegalStateException: Output queue is full 0
	at io.nats.client.impl.MessageQueue.push(MessageQueue.java:122)
	at io.nats.client.impl.MessageQueue.push(MessageQueue.java:108)
	at io.nats.client.impl.NatsConnectionWriter.queue(NatsConnectionWriter.java:191)
	at io.nats.client.impl.NatsConnection.queueOutgoing(NatsConnection.java:1316)
	at io.nats.client.impl.NatsConnection.sendSubscriptionMessage(NatsConnection.java:938)
	at io.nats.client.impl.NatsConnection.createSubscription(NatsConnection.java:908)
	at io.nats.client.impl.NatsDispatcher.subscribeImpl(NatsDispatcher.java:252)
	at io.nats.client.impl.NatsDispatcher.subscribe(NatsDispatcher.java:194)
	at io.nats.java.examples.Example$1.onMessage(Example.java:32)
	at io.nats.client.impl.NatsDispatcher.run(NatsDispatcher.java:98)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Done
^C=============

@sasbury
Contributor

sasbury commented May 29, 2020

Looks like you and Richard worked on that one; maybe he has some ideas. I will try to look if you don't see anything.

@ColinSullivan1
Member Author

I think I've found the issue, or at least have a working theory. Closing the initial connection from within the callback interrupts the callback thread itself, so the first "interruptible" API called after the close throws an InterruptedException. In the example provided, that is the queue.offer call made during subscribe:

    boolean offer(NatsMessage msg) {
        try {
            return this.queue.offer(msg, 5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            return false;
        }
    }

offer was throwing an InterruptedException, so the method returned false, and the calling code assumed the queue was full whenever it got false back.
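The theory can be checked with the JDK alone, without a NATS server. The sketch below mirrors the offer wrapper quoted above (InterruptedOfferDemo is a made-up name, and a plain LinkedBlockingQueue of strings stands in for the client's internal queue), sets the current thread's interrupt flag the way close() is described as doing, and then offers to an empty, unbounded queue.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not jnats code.
public class InterruptedOfferDemo {

    // Mirrors the wrapper quoted above: an InterruptedException is
    // swallowed and reported as a plain "false".
    static boolean offer(LinkedBlockingQueue<String> queue, String msg) {
        try {
            return queue.offer(msg, 5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Unbounded and empty, so it can never actually be "full" here.
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Stand-in for close() interrupting the callback thread.
        Thread.currentThread().interrupt();

        boolean accepted = offer(queue, "SUB foo 1");

        // The offer fails immediately even though there is plenty of room.
        System.out.println("accepted=" + accepted + ", size=" + queue.size());
    }
}
```

Running main should print `accepted=false, size=0`. The timed `offer(e, timeout, unit)` acquires its lock with `lockInterruptibly()`, so a pending interrupt makes it throw before capacity is even considered, and the wrapper turns that into the same `false` a full queue would produce.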

In the example code, if Thread.sleep(1) is called immediately after the close, the sleep is interrupted instead.

To resolve this: when a connection is closed, should user code in its callback be interrupted, or should the callback be allowed to finish?

@RichardHightower, @sasbury , wdyt?
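Whichever way that question is answered, the misleading message comes from treating every `false` from offer as a full queue. One possible mitigation is sketched below. It is hypothetical and not jnats code: DistinguishingQueue and push are invented names, and reporting the interrupt as an IllegalStateException is an assumption chosen only to match the exception the current code already throws. It restores the interrupt flag and names the real cause, keeping "Output queue is full" for a genuine timeout.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical queue wrapper, not jnats code: it only reports "full"
// when offer() really timed out waiting for capacity.
public class DistinguishingQueue {

    private final LinkedBlockingQueue<String> queue;

    DistinguishingQueue(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    void push(String msg, long timeoutMillis) {
        boolean accepted;
        try {
            accepted = queue.offer(msg, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag for the caller and report the real cause.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while queuing message", ie);
        }
        if (!accepted) {
            // Only reached when the timeout expired with no free capacity.
            throw new IllegalStateException("Output queue is full " + queue.size());
        }
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        DistinguishingQueue q = new DistinguishingQueue(1);
        q.push("first", 10);
        try {
            q.push("second", 10); // capacity 1 is already used
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        Thread.currentThread().interrupt(); // stand-in for the close() interrupt
        try {
            new DistinguishingQueue(1).push("third", 10);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + ", still interrupted=" + Thread.interrupted());
        }
    }
}
```

Running main should print `Output queue is full 1` followed by `Interrupted while queuing message, still interrupted=true`. This does not stop the subscribe from failing on an interrupted thread; it only makes the failure say why.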

@sasbury
Contributor

sasbury commented Jun 9, 2020 via email

@RichardHightower
Contributor

I will reproduce it locally today and try to reason on it a bit.

@ivanr

ivanr commented Aug 17, 2020

Related: #305

@mdford

mdford commented Nov 19, 2020

Was there ever a "fix" for this? I have inherited a project that I am attempting to retrofit with NATS, and I am encountering the same bug/message because of the way the original code was written. At times the current code will "time out" in a function and retry, which calls the section of code containing the .publish() call again, and that call then throws the java.lang.IllegalStateException: Output queue is full 0 exception. Once this happens, the publish() method throws it every time it is called. How did you resolve this, or did you find a workaround that allows publish to be called again from an interrupted thread?

Thanks!

scottf self-assigned this Mar 8, 2021
@scottf
Contributor

scottf commented Mar 13, 2021

The initial connection's close from within the callback will interrupt the callback thread itself, so the first "interruptable" API after the close throws an InterruptedException.

Yes, I came to the same conclusion. Connection 1 subscribes to a subject and then receives a message on it. While processing that message, the code tries to close the connection it is currently on, and close makes sure there are no messages in flight. But there are, so it can't close; it times out and throws an InterruptedException, which basically leaves the connection in a broken state.
Then, when you open a new connection and try to publish, it's still the same VM and the same thread, so even though it's a new connection, the underlying queue mechanism cannot acquire the lock it needs and the queue operation fails. The "queue is full" error message is misleading because the queue is not full; the queue simply does not distinguish a full-queue failure from other failures while offering something to it.

This is why closing after the subscribe works and closing before it fails. So here is a fix: make the second subscribe on a different thread.

    nc2.getOptions().getExecutor().submit(() -> {
        // runs on an executor thread, not on the interrupted callback thread
        Dispatcher d2 = nc2.createDispatcher(this);
        d2.subscribe("foo");
    });
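The fix works because the submitted task runs on a thread that does not carry the callback thread's pending interrupt. The same hand-off can be shown with the JDK alone. In this hypothetical sketch, a plain Thread stands in for the NATS dispatcher thread, a single-thread ExecutorService stands in for the connection's executor, and Thread.interrupt() stands in for close(); HandOffDemo and run are invented names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo, not jnats code.
public class HandOffDemo {

    // Returns {offer made directly on the interrupted callback thread,
    //          offer handed off to an executor thread}.
    static boolean[] run() throws Exception {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean direct = new AtomicBoolean(true);
        AtomicReference<Future<Boolean>> handedOff = new AtomicReference<>();

        // Stand-in for the dispatcher thread running onMessage().
        Thread callback = new Thread(() -> {
            // Stand-in for close() interrupting the callback thread.
            Thread.currentThread().interrupt();

            // Hand the queue operation to the executor; submit() does not
            // wait on anything interruptible here, so it still succeeds.
            Callable<Boolean> task = () -> queue.offer("handed-off", 5, TimeUnit.SECONDS);
            handedOff.set(executor.submit(task));

            // The same operation attempted on the interrupted thread fails.
            try {
                direct.set(queue.offer("direct", 5, TimeUnit.SECONDS));
            } catch (InterruptedException ie) {
                direct.set(false);
            }
        });
        callback.start();
        callback.join();

        // Wait for the hand-off from main, not from the interrupted callback.
        boolean viaExecutor = handedOff.get().get();
        executor.shutdown();
        return new boolean[] {direct.get(), viaExecutor};
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = run();
        System.out.println("direct=" + r[0] + ", viaExecutor=" + r[1]);
    }
}
```

Running main should print `direct=false, viaExecutor=true`. The result is awaited from main rather than from the callback, because a blocking `get()` on the interrupted callback thread could itself be interrupted.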

@scottf
Contributor

scottf commented Feb 23, 2022

Closing, no response. Fixed with the code example in the comments.


7 participants