Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(iot-dev): fix thread-leak issue with IotHubReconnectTask #1567

Merged
merged 5 commits into from
Jul 6, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,35 +135,39 @@ private void startWorkerThreads()
{
// while startWorkerThreads should never be called when threads are already active, it doesn't hurt to double
// check that any previous thread pools have been shut down.
stopWorkerThreads();
stopSendAndReceiveThreads();

log.debug("Starting worker threads");

this.sendTaskScheduler = Executors.newScheduledThreadPool(1);
this.receiveTaskScheduler = Executors.newScheduledThreadPool(1);
this.reconnectTaskScheduler = Executors.newScheduledThreadPool(1);

// Note that even though these threads are scheduled at a fixed interval, the sender/receiver threads will wait
// if no messages are available to process. These waiting threads will still count against the pool size defined above,
// so threads will not be needlessly scheduled during times when this SDK has no messages to process.

// the scheduler waits until each execution is finished before
// scheduling the next one, so executions of a given task
// the scheduler waits until each execution is finished before scheduling the next one, so executions of a given task
// will never overlap.

// Note that this is scheduleWithFixedDelay, not scheduleAtFixedRate. There is no reason to spawn a new
// send/receive thread until after the previous one has finished.
this.sendTaskScheduler.scheduleWithFixedDelay(this.sendTask, 0,
sendPeriodInMilliseconds, TimeUnit.MILLISECONDS);
sendPeriodInMilliseconds, TimeUnit.MILLISECONDS);
this.receiveTaskScheduler.scheduleWithFixedDelay(this.receiveTask, 0,
receivePeriodInMilliseconds, TimeUnit.MILLISECONDS);

// This is only set to null if the client as a whole has been closed. This thread pool stays active through disconnected_retrying.
if (this.reconnectTaskScheduler == null)
{
this.reconnectTaskScheduler = Executors.newScheduledThreadPool(1);
this.reconnectTaskScheduler.scheduleWithFixedDelay(this.reconnectTask, 0,
receivePeriodInMilliseconds, TimeUnit.MILLISECONDS);
this.reconnectTaskScheduler.scheduleWithFixedDelay(this.reconnectTask, 0,
receivePeriodInMilliseconds, TimeUnit.MILLISECONDS);
}

this.state = IotHubConnectionStatus.CONNECTED;
}

private void stopWorkerThreads()
private void stopSendAndReceiveThreads()
brycewang-microsoft marked this conversation as resolved.
Show resolved Hide resolved
{
if (this.sendTaskScheduler != null)
{
Expand Down Expand Up @@ -266,11 +270,8 @@ void setReceivePeriodInMilliseconds(long newIntervalInMilliseconds)
// close the old scheduler and start a new one with the new receive period
this.receiveTaskScheduler.shutdown();
this.receiveTaskScheduler = Executors.newScheduledThreadPool(1);
this.receiveTaskScheduler.scheduleAtFixedRate(
this.receiveTask,
0,
this.receivePeriodInMilliseconds,
TimeUnit.MILLISECONDS);
this.receiveTaskScheduler.scheduleAtFixedRate(this.receiveTask, 0,
this.receivePeriodInMilliseconds, TimeUnit.MILLISECONDS);
}
}

Expand All @@ -294,11 +295,8 @@ void setSendPeriodInMilliseconds(long newIntervalInMilliseconds)
// close the old scheduler and start a new one with the new send period
this.sendTaskScheduler.shutdown();
this.sendTaskScheduler = Executors.newScheduledThreadPool(1);
this.sendTaskScheduler.scheduleAtFixedRate(
this.sendTask,
0,
this.sendPeriodInMilliseconds,
TimeUnit.MILLISECONDS);
this.sendTaskScheduler.scheduleAtFixedRate(this.sendTask, 0,
this.sendPeriodInMilliseconds, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -336,7 +334,7 @@ void setMultiplexingConnectionStateCallback(IotHubConnectionStatusChangeCallback

/*
* IotHubTransport layer will notify this layer when the connection is established and when it is lost. This layer should start/stop
* the send/receive threads accordingly
* the send/receive/reconnect threads accordingly.
*/
@Override
public void onStatusChanged(ConnectionStatusChangeContext connectionStatusChangeContext)
Expand All @@ -347,23 +345,24 @@ public void onStatusChanged(ConnectionStatusChangeContext connectionStatusChange

if (status == this.state)
{
// no change in status, so no need to start/stop worker threads.
// No change in status, so no need to start/stop worker threads.
return;
}

if (status == IotHubConnectionStatus.DISCONNECTED || status == IotHubConnectionStatus.DISCONNECTED_RETRYING)
{
// No need to keep spawning send/receive tasks during reconnection or when the client is closed
this.stopWorkerThreads();
// No need to keep spawning send/receive tasks during reconnection or when the client is closed.
this.stopSendAndReceiveThreads();

if (status == IotHubConnectionStatus.DISCONNECTED)
{
// Stop reconnect task only when the client as a whole has been closed.
this.stopReconnectThreads();
}
}
else if (status == IotHubConnectionStatus.CONNECTED)
{
// Restart the task scheduler so that send/receive tasks start spawning again
// Restart the task scheduler so that send/receive tasks start spawning again.
this.startWorkerThreads();
}

Expand Down