Gracefully shutdown Ably resources #917

Closed
mighelone opened this issue Feb 8, 2023 · 9 comments

Comments

mighelone commented Feb 8, 2023

I am writing a new application that needs to connect to several Ably clients.
Each client uses a different token and subscribes to a different channel.
The channels have a limited lifetime: when a certain message is received, I want to unsubscribe from the channel.

As soon as that message arrives and the channel is unsubscribed, the client connection is closed as well.

The main problem is that all the threads belonging to the client/channel are still active afterwards, although I don't think they are doing anything.

Since the application should run continuously, adding new channels over time, I would like to know whether there is a clean way of shutting down all of these threads once a channel has received its closure message.

┆Issue is synchronized with this Jira Task by Unito

sync-by-unito bot commented Feb 8, 2023

➤ Automation for Jira commented:

The link to the corresponding Jira issue is https://ably.atlassian.net/browse/SDK-3327

@owenpearson (Member)

Hey @mighelone, thanks for reporting this! We're looking into it now. Could you tell us the names of the active threads and which version of the library you're using?

mighelone (Author) commented Feb 8, 2023

Hey @owenpearson, thanks for the reply.
The version I am using is 1.2.18; I just noticed that some minor updates have been released (1.2.25).

Concerning the threads, here is a partial list of the main threads still alive after closing the Ably realtime client:

Thread :Thread[HttpClient-1-Worker-0,5,main]:state:TIMED_WAITING
Thread :Thread[HttpClient-1-Worker-1,5,main]:state:TIMED_WAITING
Thread :Thread[main,5,main]:state:RUNNABLE
Thread :Thread[HttpClient-1-SelectorManager,5,main]:state:RUNNABLE
Thread :Thread[Timer-0,5,main]:state:TIMED_WAITING
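
A list like the one above is easier to interpret as a before/after comparison. The following is a minimal sketch, using only the JDK, that snapshots the live thread names before a client is created and again after it is closed, then prints the names that were added. The class name `ThreadLeakCheck` and the `demo-leaked-worker` thread are hypothetical stand-ins; in a real check the snapshots would be taken around the creation and closing of the Ably client.

```java
import java.util.Set;
import java.util.TreeSet;

/** Diagnostic helper (hypothetical name): reports threads that appeared between two snapshots. */
final class ThreadLeakCheck {

    /** Names of all threads currently alive in this JVM. */
    static Set<String> liveThreadNames() {
        Set<String> names = new TreeSet<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive()) {
                names.add(t.getName());
            }
        }
        return names;
    }

    /** Names present in 'after' but not in 'before'. */
    static Set<String> newThreads(Set<String> before, Set<String> after) {
        Set<String> diff = new TreeSet<>(after);
        diff.removeAll(before);
        return diff;
    }

    public static void main(String[] args) {
        Set<String> before = liveThreadNames();

        // Stand-in for "create the client, use it, close it":
        // a worker thread that is still sleeping when we take the second snapshot.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
                // interrupted on purpose at the end of the demo
            }
        }, "demo-leaked-worker");
        worker.setDaemon(true);
        worker.start();

        Set<String> after = liveThreadNames();
        System.out.println("Threads added since the first snapshot: " + newThreads(before, after));
        worker.interrupt();
    }
}
```

Because the comparison is by name, two threads that share a name are reported once; that is usually enough to spot which pools or timers are left behind.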

AndyTWF (Contributor) commented Feb 8, 2023

Hi @mighelone, thanks for the extra information!

I've had a look into this for you and I think I've found the issue you're running into - just a couple of questions to confirm my suspicions:

  • I was wondering how you configure the AblyRealtime object - do you use the autoConnect option in ClientOptions at all? The default is true, so if you haven't specifically disabled it in your ClientOptions, then there will be an autoconnect as part of the constructor.
  • Also, after constructing the AblyRealtime object, do you subsequently make an explicit call to connect()?

I've managed to reproduce the issue on 1.2.25 and spotted a bug that occurs when the autoConnect option and an explicit call to connect() are used together. This causes two connection attempts under the hood that can interfere with each other, which can leave various stale threads hanging around even after the connection is closed.

If you are doing both, you should be able to avoid this issue by using either the autoConnect option or an explicit call to connect(), but not both.

In the meantime, we'll look deeper into the underlying issue and look at how we can address it - the autoconnect and explicit connect call shouldn't be interfering with each other, in any case!

@mighelone (Author)

Hey @AndyTWF, thanks for the update.
I don't call connect() directly.
Here are the basic steps of the Ably client creation:

// Client options: token auth via callback, custom environment and fallback hosts
ClientOptions options = new ClientOptions() {
  {
    token = "";
    environment = ENVIRONMENT;
    fallbackHosts = FALLBACK_HOSTS;
    authCallback = params -> {
      // some logic here
      return new TokenDetails(newName.accessToken);
    };
    logLevel = Log.INFO; // TODO pass as parameters
  }
};
AblyRealtime ably = new AblyRealtime(options); // autoConnect is left at its default

// Log connection state changes
ably.connection.on(state -> {
  switch (state.current) {
    case connected:
      log.warn("I am connected!");
      break;
    case closed:
      log.warn("Connection is closed...");
      break;
    default:
      log.atDebug().log("Undefined action for status: " + state.current.name());
  }
});

// Subscribe, and shut the client down once the closing message arrives
Channel channel = ably.channels.get(name.channelName);
channel.subscribe(message -> {
  Status status = checkTheStatus(message);
  switch (status) {
    case Finished:
      ably.connection.close();
      ably.close();
      break;
    default:
      log.atDebug().log("some logs");
  }
});

@owenpearson (Member)

Hey @mighelone I've tried a couple of different setups but haven't been able to reproduce this issue so far.

Here's a simple script which closes a realtime instance upon receipt of a channel message, and then prints out all threads after waiting for 2s.

package org.example;

import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.Channel;
import io.ably.lib.types.*;

import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public class App {
    public static void main(String[] args) throws AblyException, InterruptedException {
        ClientOptions options = new ClientOptions() {
            {
                key = "<ABLY_API_KEY>";
            }
        };
        AblyRealtime ably = new AblyRealtime(options);
        ably.connection.on(state -> {
            switch (state.current) {
                case connected:
                    System.out.println("Connected");
                default:
                    break;
            }
        });

        Channel channel = ably.channels.get("test123");
        AtomicBoolean done = new AtomicBoolean(false);
        channel.subscribe(message -> {
            if (message.data.equals("Test")) {
                ably.connection.close();
                ably.close();
                System.out.println("Done");
                done.set(true);
            }
        });

        channel.publish(new Message("Test", "Test"));

        while (!done.get()) {
            Thread.sleep(5);
        }

        Thread.sleep(2000);
        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
        threadSet.forEach((t) -> System.out.println(t.getName()));

    }
}

When I run this myself on ably-java 1.2.18 it doesn't list any ably-related threads. I have also tried using an authCallback and got the same result. Would you be able to provide a minimal repro script?
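
As a side note on the script above, the `AtomicBoolean` polling loop could also be written with a `CountDownLatch`, which blocks until the callback fires and allows a timeout. The sketch below uses only the JDK; `FakeClient` is a hypothetical stand-in for a client whose `close()` completes asynchronously and is not part of ably-java. In the real script the `countDown()` call would go wherever `done.set(true)` is today.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class AwaitClose {

    /** Hypothetical stand-in for a client whose close() finishes on another thread. */
    static final class FakeClient {
        private final Runnable onClosed;

        FakeClient(Runnable onClosed) {
            this.onClosed = onClosed;
        }

        void close() {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(100); // simulate the time the shutdown takes
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                onClosed.run();
            }, "fake-close");
            t.setDaemon(true);
            t.start();
        }
    }

    /**
     * Closes the client and waits for the closed callback.
     * Returns true if the callback fired before the timeout, false otherwise.
     */
    static boolean closeAndWait(long timeoutMillis) {
        CountDownLatch closed = new CountDownLatch(1);
        FakeClient client = new FakeClient(closed::countDown);
        client.close();
        try {
            return closed.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Closed within timeout: " + closeAndWait(5_000));
    }
}
```

The timeout matters for this issue in particular: if the close never completes, the caller gets `false` back instead of waiting forever.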

mighelone (Author) commented Feb 10, 2023

Hey @owenpearson ,

Here you can find a simple example of my code:

package org.example;

import java.io.IOException;
import java.util.Set;

import com.models.ChannelName;

import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.Channel;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ClientOptions;

class App {

  public static void main(String[] args)
      throws IOException, InterruptedException, ClientException, AblyException {

    ChannelName channelName = getChannel(); // retrieve channel name from 3rd party

    ClientOptions options = new ClientOptions() {
      {
        token = channelName.accessToken;
        environment = ENVIRONMENT; // 3rd party provided
        fallbackHosts = AblyClient.FALLBACK_HOSTS;
      }
    };
    AblyRealtime ably = new AblyRealtime(options);
    ably.connection.on(state -> System.out.println(state.current.name()));
    Channel channel = ably.channels.get(channelName.getChannelName());
    channel.subscribe(message -> System.out.println(message.timestamp));

    Thread.sleep(10000);
    System.out.println("Stopping ably");
    ably.connection.close();
    ably.close();

    // List the threads in the main thread's group that are still alive after close()
    Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
    for (Thread t : threadSet) {
      if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
        System.out.println("Thread :" + t + ":" + "state:" + t.getState());
      }
    }
  }
}

Just for clarity, I am connecting to a cluster managed by one of your customers, so I need to specify the fallbackHosts and the environment in the ClientOptions.
When I run this code, the execution process hangs indefinitely and I have to manually stop.
If I run the same code against the channel at https://ably.com/hub/ably-coindesk/bitcoin, everything works fine and the application exits normally.

The only difference I can see between the two runs, in terms of threads, is the state of the WebSocket write thread.
In the hanging application:
Thread :Thread[WebSocketWriteThread-38,5,main]:state:WAITING
while in the application that completes:
Thread :Thread[WebSocketWriteThread-29,5,main]:state:RUNNABLE
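
When a process hangs at exit like this, it helps to know which of the remaining threads are non-daemon, because the JVM only exits once every non-daemon thread has finished. The sketch below uses only the JDK and filters the live threads down to the non-daemon ones other than the caller; the class name `NonDaemonThreads` is a hypothetical helper and not something from the code above. It says nothing about which Ably threads are daemon threads; it only shows how to check.

```java
import java.util.ArrayList;
import java.util.List;

final class NonDaemonThreads {

    /** Live non-daemon threads, excluding the thread that calls this method. */
    static List<Thread> find() {
        List<Thread> result = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && !t.isDaemon() && t != Thread.currentThread()) {
                result.add(t);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Any thread printed here will keep the JVM alive after main() returns.
        for (Thread t : find()) {
            System.out.println(t.getName() + " state=" + t.getState() + " daemon=" + t.isDaemon());
        }
    }
}
```

Calling `NonDaemonThreads.find()` right after the client has been closed, and again a few seconds later, would show whether a thread such as the WebSocket write thread is the one preventing exit.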

@QuintinWillison (Contributor)

@mighelone I've been asked to look at this as @owenpearson and @AndyTWF are both busy on other things. I'll admit, I'm struggling to try to work out what we're trying to debug and investigate here. I think we need more information from you, please.

First and foremost, we need verbose log output, including the messages you're printing to System.out, ideally with timestamps.

In respect of:

When I run this code, the execution process hangs indefinitely and I have to manually stop.

I'm unclear as to when in your app's execution this is happening. We need more detail here, which the verbose logs will help with.

Also, could you please provide us with the concrete values being returned by your ChannelName API, as this is all very abstract right now and difficult to emulate at our end. There's not much to go on from our side. Ideally it would be good to see some self-contained code without external dependencies that are invisible to us.

If you need to share anything (e.g. sensitive/private) in a venue outside of this public GitHub issue then we can get that arranged.

ttypic (Contributor) commented Apr 23, 2024

Hey @mighelone,

Just wanted to check in on this issue since there hasn't been any activity for a while. We're closing it since there's been no recent discussion.

If you still feel this needs attention or have any updates, please reopen it, and we can keep the conversation going!

ttypic closed this as completed Apr 23, 2024