diff --git a/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/DeviceIO.java b/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/DeviceIO.java index 45d1ac171a..427320adf0 100644 --- a/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/DeviceIO.java +++ b/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/DeviceIO.java @@ -135,35 +135,39 @@ private void startWorkerThreads() { // while startWorkerThreads should never be called when threads are already active, it doesn't hurt to double // check that any previous thread pools have been shut down. - stopWorkerThreads(); + stopSendAndReceiveThreads(); log.debug("Starting worker threads"); this.sendTaskScheduler = Executors.newScheduledThreadPool(1); this.receiveTaskScheduler = Executors.newScheduledThreadPool(1); - this.reconnectTaskScheduler = Executors.newScheduledThreadPool(1); // Note that even though these threads are scheduled at a fixed interval, the sender/receiver threads will wait // if no messages are available to process. These waiting threads will still count against the pool size defined above, // so threads will not be needlessly scheduled during times when this SDK has no messages to process. - // the scheduler waits until each execution is finished before - // scheduling the next one, so executions of a given task + // the scheduler waits until each execution is finished before scheduling the next one, so executions of a given task // will never overlap. // Note that this is scheduleWithFixedDelay, not scheduleAtFixedRate. There is no reason to spawn a new // send/receive thread until after the previous one has finished. this.sendTaskScheduler.scheduleWithFixedDelay(this.sendTask, 0, - sendPeriodInMilliseconds, TimeUnit.MILLISECONDS); + sendPeriodInMilliseconds, TimeUnit.MILLISECONDS); this.receiveTaskScheduler.scheduleWithFixedDelay(this.receiveTask, 0, + receivePeriodInMilliseconds, TimeUnit.MILLISECONDS); + + // This is only set to null if the client as a whole has been closed. This thread pool stays active through disconnected_retrying. + if (this.reconnectTaskScheduler == null) + { + this.reconnectTaskScheduler = Executors.newScheduledThreadPool(1); + this.reconnectTaskScheduler.scheduleWithFixedDelay(this.reconnectTask, 0, receivePeriodInMilliseconds, TimeUnit.MILLISECONDS); - this.reconnectTaskScheduler.scheduleWithFixedDelay(this.reconnectTask, 0, - receivePeriodInMilliseconds, TimeUnit.MILLISECONDS); + } this.state = IotHubConnectionStatus.CONNECTED; } - private void stopWorkerThreads() + private void stopSendAndReceiveThreads() { if (this.sendTaskScheduler != null) { @@ -266,11 +270,8 @@ void setReceivePeriodInMilliseconds(long newIntervalInMilliseconds) // close the old scheduler and start a new one with the new receive period this.receiveTaskScheduler.shutdown(); this.receiveTaskScheduler = Executors.newScheduledThreadPool(1); - this.receiveTaskScheduler.scheduleAtFixedRate( - this.receiveTask, - 0, - this.receivePeriodInMilliseconds, - TimeUnit.MILLISECONDS); + this.receiveTaskScheduler.scheduleAtFixedRate(this.receiveTask, 0, + this.receivePeriodInMilliseconds, TimeUnit.MILLISECONDS); } } @@ -294,11 +295,8 @@ void setSendPeriodInMilliseconds(long newIntervalInMilliseconds) // close the old scheduler and start a new one with the new send period this.sendTaskScheduler.shutdown(); this.sendTaskScheduler = Executors.newScheduledThreadPool(1); - this.sendTaskScheduler.scheduleAtFixedRate( - this.sendTask, - 0, - this.sendPeriodInMilliseconds, - TimeUnit.MILLISECONDS); + this.sendTaskScheduler.scheduleAtFixedRate(this.sendTask, 0, + this.sendPeriodInMilliseconds, TimeUnit.MILLISECONDS); } } @@ -336,7 +334,7 @@ void setMultiplexingConnectionStateCallback(IotHubConnectionStatusChangeCallback /* * IotHubTransport layer will notify this layer when the connection is established and when it is lost. This layer should start/stop - * the send/receive threads accordingly + * the send/receive/reconnect threads accordingly. */ @Override public void onStatusChanged(ConnectionStatusChangeContext connectionStatusChangeContext) @@ -347,23 +345,24 @@ public void onStatusChanged(ConnectionStatusChangeContext connectionStatusChange if (status == this.state) { - // no change in status, so no need to start/stop worker threads. + // No change in status, so no need to start/stop worker threads. return; } if (status == IotHubConnectionStatus.DISCONNECTED || status == IotHubConnectionStatus.DISCONNECTED_RETRYING) { - // No need to keep spawning send/receive tasks during reconnection or when the client is closed - this.stopWorkerThreads(); + // No need to keep spawning send/receive tasks during reconnection or when the client is closed. + this.stopSendAndReceiveThreads(); if (status == IotHubConnectionStatus.DISCONNECTED) { + // Stop reconnect task only when the client as a whole has been closed. this.stopReconnectThreads(); } } else if (status == IotHubConnectionStatus.CONNECTED) { - // Restart the task scheduler so that send/receive tasks start spawning again + // Restart the task scheduler so that send/receive tasks start spawning again. this.startWorkerThreads(); }