-
Notifications
You must be signed in to change notification settings - Fork 233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(iot-dev): Add configurable message expiration check job #1762
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,6 +79,9 @@ public class IotHubTransport implements IotHubListener | |
// should stop spawning send/receive threads when this layer is disconnected or disconnected retrying | ||
private final IotHubConnectionStatusChangeCallback deviceIOConnectionStatusChangeCallback; | ||
|
||
// Lock on reading and writing on the waitingPackets queue | ||
final private Object waitingPacketsLock = new Object(); | ||
|
||
// Lock on reading and writing on the inProgressPackets map | ||
final private Object inProgressMessagesLock = new Object(); | ||
|
||
|
@@ -130,6 +133,10 @@ public class IotHubTransport implements IotHubListener | |
private final Thread correlationCallbackCleanupThread = new Thread(() -> checkForOldMessages()); | ||
private static final int CORRELATION_CALLBACK_CLEANUP_PERIOD_MILLISECONDS = 60 * 60 * 1000; | ||
|
||
// A job that runs periodically to remove any expired messages from the in progress and waiting queue | ||
private final Thread expiredMessagesCleanupThread = new Thread(() -> checkForExpiredOutgoingMessages()); | ||
private final long messageExpirationCheckPeriod; | ||
|
||
/** | ||
* Constructor for an IotHubTransport object with default values | ||
* | ||
|
@@ -160,6 +167,7 @@ public IotHubTransport(ClientConfiguration defaultConfig, IotHubConnectionStatus | |
this.useIdentifiableThreadNames = defaultConfig.isUsingIdentifiableThreadNames(); | ||
this.threadNamePrefix = defaultConfig.getThreadNamePrefix(); | ||
this.threadNameSuffix = defaultConfig.getThreadNameSuffix(); | ||
this.messageExpirationCheckPeriod = defaultConfig.getMessageExpiredCheckPeriod(); | ||
} | ||
|
||
public IotHubTransport( | ||
|
@@ -172,7 +180,8 @@ public IotHubTransport( | |
int sendInterval, | ||
boolean useIdentifiableThreadNames, | ||
String threadNamePrefix, | ||
String threadNameSuffix) throws IllegalArgumentException | ||
String threadNameSuffix, | ||
long messageExpirationCheckPeriod) throws IllegalArgumentException | ||
{ | ||
this.protocol = protocol; | ||
this.hostName = hostName; | ||
|
@@ -187,6 +196,7 @@ public IotHubTransport( | |
this.useIdentifiableThreadNames = useIdentifiableThreadNames; | ||
this.threadNamePrefix = threadNamePrefix; | ||
this.threadNameSuffix = threadNameSuffix; | ||
this.messageExpirationCheckPeriod = messageExpirationCheckPeriod; | ||
} | ||
|
||
public Semaphore getSendThreadSemaphore() | ||
|
@@ -763,35 +773,38 @@ public void sendMessages() | |
|
||
int timeSlice = maxNumberOfMessagesToSendPerThread; | ||
|
||
while (this.connectionStatus == IotHubConnectionStatus.CONNECTED && timeSlice-- > 0) | ||
synchronized (this.waitingPacketsLock) | ||
{ | ||
IotHubTransportPacket packet = waitingPacketsQueue.poll(); | ||
|
||
if (packet != null) | ||
while (this.connectionStatus == IotHubConnectionStatus.CONNECTED && timeSlice-- > 0) | ||
{ | ||
Message message = packet.getMessage(); | ||
log.trace("Dequeued a message from waiting queue to be sent ({})", message); | ||
IotHubTransportPacket packet = waitingPacketsQueue.poll(); | ||
|
||
if (message != null && this.isMessageValid(packet)) | ||
if (packet != null) | ||
{ | ||
sendPacket(packet); | ||
Message message = packet.getMessage(); | ||
log.trace("Dequeued a message from waiting queue to be sent ({})", message); | ||
|
||
try | ||
if (message != null && this.isMessageValid(packet)) | ||
{ | ||
String correlationId = message.getCorrelationId(); | ||
sendPacket(packet); | ||
|
||
if (!correlationId.isEmpty()) | ||
try | ||
{ | ||
CorrelationCallbackContext callbackContext = correlationCallbacks.get(correlationId); | ||
if (callbackContext != null && callbackContext.getCallback() != null) | ||
String correlationId = message.getCorrelationId(); | ||
|
||
if (!correlationId.isEmpty()) | ||
{ | ||
callbackContext.getCallback().onRequestSent(message, callbackContext.getUserContext()); | ||
CorrelationCallbackContext callbackContext = correlationCallbacks.get(correlationId); | ||
if (callbackContext != null && callbackContext.getCallback() != null) | ||
{ | ||
callbackContext.getCallback().onRequestSent(message, callbackContext.getUserContext()); | ||
} | ||
} | ||
} | ||
} | ||
catch (Exception e) | ||
{ | ||
log.warn("Exception thrown while calling the onRequestSent callback in sendMessages", e); | ||
catch (Exception e) | ||
{ | ||
log.warn("Exception thrown while calling the onRequestSent callback in sendMessages", e); | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -816,28 +829,31 @@ String getDeviceClientUniqueIdentifier() | |
|
||
private void checkForExpiredMessages() | ||
{ | ||
//Check waiting packets, remove any that have expired. | ||
IotHubTransportPacket packet = this.waitingPacketsQueue.poll(); | ||
Queue<IotHubTransportPacket> packetsToAddBackIntoWaitingPacketsQueue = new LinkedBlockingQueue<>(); | ||
while (packet != null) | ||
synchronized (this.waitingPacketsQueue) | ||
{ | ||
if (packet.getMessage().isExpired()) | ||
//Check waiting packets, remove any that have expired. | ||
IotHubTransportPacket packet = this.waitingPacketsQueue.poll(); | ||
Queue<IotHubTransportPacket> packetsToAddBackIntoWaitingPacketsQueue = new LinkedBlockingQueue<>(); | ||
while (packet != null) | ||
{ | ||
packet.setStatus(IotHubStatusCode.MESSAGE_EXPIRED); | ||
this.addToCallbackQueue(packet); | ||
} | ||
else | ||
{ | ||
//message not expired, requeue it | ||
packetsToAddBackIntoWaitingPacketsQueue.add(packet); | ||
if (packet.getMessage().isExpired()) | ||
{ | ||
packet.setStatus(IotHubStatusCode.MESSAGE_EXPIRED); | ||
this.addToCallbackQueue(packet); | ||
} | ||
else | ||
{ | ||
//message not expired, requeue it | ||
packetsToAddBackIntoWaitingPacketsQueue.add(packet); | ||
} | ||
|
||
packet = this.waitingPacketsQueue.poll(); | ||
} | ||
|
||
packet = this.waitingPacketsQueue.poll(); | ||
//Requeue all the non-expired messages. | ||
this.waitingPacketsQueue.addAll(packetsToAddBackIntoWaitingPacketsQueue); | ||
} | ||
|
||
//Requeue all the non-expired messages. | ||
this.waitingPacketsQueue.addAll(packetsToAddBackIntoWaitingPacketsQueue); | ||
|
||
//Check in progress messages | ||
synchronized (this.inProgressMessagesLock) | ||
{ | ||
|
@@ -894,6 +910,26 @@ private void checkForOldMessages() | |
} | ||
} | ||
|
||
private void checkForExpiredOutgoingMessages() | ||
{ | ||
try | ||
{ | ||
Thread.currentThread().setName("azure-iot-sdk-IotHubMessageExpiryCheckerTask"); | ||
while (true) | ||
{ | ||
Thread.sleep(this.messageExpirationCheckPeriod); | ||
checkForExpiredMessages(); | ||
invokeCallbacks(); | ||
} | ||
|
||
} | ||
catch (InterruptedException e) | ||
{ | ||
// The exception can be ignored since this thread is interrupted when the client is closing. | ||
// Once interrupted, simply end this thread. | ||
} | ||
} | ||
|
||
/** | ||
* Invokes the callbacks for all completed requests. | ||
*/ | ||
|
@@ -1717,10 +1753,24 @@ else if (this.getDefaultConfig() != null | |
{ | ||
// Thread has already started. No need to report this exception | ||
} | ||
|
||
try | ||
{ | ||
// 0 means that the user doesn't want to ever run this check | ||
if (messageExpirationCheckPeriod != 0) | ||
{ | ||
expiredMessagesCleanupThread.start(); | ||
} | ||
} | ||
catch (IllegalThreadStateException e) | ||
{ | ||
// Thread has already started. No need to report this exception | ||
} | ||
} | ||
else if (newConnectionStatus == IotHubConnectionStatus.DISCONNECTED) | ||
{ | ||
correlationCallbackCleanupThread.interrupt(); | ||
expiredMessagesCleanupThread.interrupt(); | ||
} | ||
} | ||
} | ||
|
@@ -1766,20 +1816,8 @@ private void updateStatus(IotHubConnectionStatus newConnectionStatus, IotHubConn | |
invokeConnectionStatusChangeCallback(newConnectionStatus, previousStatus, reason, throwable, deviceId); | ||
} | ||
|
||
if (newConnectionStatus == IotHubConnectionStatus.CONNECTED) | ||
{ | ||
try | ||
{ | ||
correlationCallbackCleanupThread.start(); | ||
} | ||
catch (IllegalThreadStateException e) | ||
{ | ||
// Thread has already started. No need to report this exception | ||
} | ||
} | ||
else if (newConnectionStatus == IotHubConnectionStatus.DISCONNECTED) | ||
if (newConnectionStatus == IotHubConnectionStatus.DISCONNECTED) | ||
{ | ||
correlationCallbackCleanupThread.interrupt(); | ||
finalizeMultiplexedDevicesMessages(deviceId); | ||
} | ||
} | ||
|
@@ -1913,28 +1951,6 @@ private boolean hasOperationTimedOut(long startTime) | |
return (System.currentTimeMillis() - startTime) > this.getDefaultConfig().getOperationTimeout(); | ||
} | ||
|
||
/** | ||
* Returns if the provided packet has lasted longer than the device operation timeout | ||
* | ||
* @return true if the packet has been in the queues for longer than the device operation timeout and false otherwise | ||
*/ | ||
private boolean hasOperationTimedOut(long startTime, String deviceId) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unused method |
||
{ | ||
if (startTime == 0) | ||
{ | ||
return false; | ||
} | ||
|
||
ClientConfiguration config = this.getConfig(deviceId); | ||
if (config == null) | ||
{ | ||
log.debug("Operation has not timed out since the device it was associated with has been unregistered already."); | ||
return false; | ||
} | ||
|
||
return (System.currentTimeMillis() - startTime) > config.getOperationTimeout(); | ||
} | ||
|
||
/** | ||
* Adds the packet to the callback queue if the provided packet has a callback. The packet is ignored otherwise. | ||
* | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't start/stop this thread when one device in a multiplexed connection is lost. Only start/stop it when the whole connection is opened/closed