Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iot-dev): Add configurable message expiration check job #1762

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public final class ClientConfiguration

private static final long MAX_SAS_TOKEN_EXPIRY_TIME_SECONDS = 10 * 365 * 24 * 60 * 60; //10 years

private static final long DEFAULT_MESSAGE_EXPIRATION_CHECK_PERIOD = 10000;

private boolean useWebsocket;

@Getter
Expand Down Expand Up @@ -91,6 +93,9 @@ public final class ClientConfiguration
@Getter
private String threadNamePrefix = null;

@Getter
private long messageExpiredCheckPeriod;

private boolean useIdentifiableThreadNames = true;

private boolean logRoutineDisconnectsAsErrors = true;
Expand Down Expand Up @@ -229,6 +234,7 @@ private void setClientOptionValues(ClientOptions clientOptions)
this.threadNameSuffix = clientOptions != null ? clientOptions.getThreadNameSuffix() : null;
this.useIdentifiableThreadNames = clientOptions == null || clientOptions.isUsingIdentifiableThreadNames();
this.logRoutineDisconnectsAsErrors = clientOptions == null || clientOptions.isLoggingRoutineDisconnectsAsErrors();
this.messageExpiredCheckPeriod = clientOptions != null ? clientOptions.getMessageExpirationCheckPeriod() : DEFAULT_MESSAGE_EXPIRATION_CHECK_PERIOD;

if (proxySettings != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,28 @@ public final class ClientOptions
@Builder.Default
private final boolean logRoutineDisconnectsAsErrors = true;

/**
* The period (in milliseconds) for how often the client will check queued and in-flight messages for expiry.
*
* Higher values mean that messages will be checked for expiry less often but this client will use less CPU. Higher
* values also mean that messages may not execute their callback with MESSAGE_EXPIRED close to the expected
* expiry time.
*
* Lower values mean that messages will be checked for expiry more often but this client will use more CPU. Lower
* values also mean that messages will execute their callback with MESSAGE_EXPIRED closer to the expected
* expiry time.
*
* By default, this value is 10 seconds.
*
* If set to 0, message expiry will never be checked.
*
* If this client will be used in a multiplexed connection, this value is ignored in favor of the same setting in
* {@link MultiplexingClientOptions}.
*/
@Getter
@Builder.Default
private final long messageExpirationCheckPeriod = 10000;

public boolean isUsingIdentifiableThreadNames()
{
// Using a manually written method here to override the name that Lombok would have given it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,23 @@ final class DeviceIO implements IotHubConnectionStatusChangeCallback
int sendInterval,
boolean useIdentifiableThreadNames,
String threadNamePrefix,
String threadNameSuffix)
String threadNameSuffix,
long messageExpirationCheckPeriod)
{
this.state = IotHubConnectionStatus.DISCONNECTED;
this.transport = new IotHubTransport(hostName, protocol, sslContext, proxySettings, this, keepAliveInterval, sendInterval, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix);
this.transport = new IotHubTransport(
hostName,
protocol,
sslContext,
proxySettings,
this,
keepAliveInterval,
sendInterval,
useIdentifiableThreadNames,
threadNamePrefix,
threadNameSuffix,
messageExpirationCheckPeriod);

this.sendTask = new IotHubSendTask(this.transport, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix);
this.receiveTask = new IotHubReceiveTask(this.transport, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix);
this.reconnectTask = new IotHubReconnectTask(this.transport, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class MultiplexingClient
static final int DEFAULT_MAX_MESSAGES_TO_SEND_PER_THREAD = 10;
private static final long DEFAULT_REGISTRATION_TIMEOUT_MILLISECONDS = 60 * 1000; // 1 minute
private static final long DEFAULT_UNREGISTRATION_TIMEOUT_MILLISECONDS = 60 * 1000; // 1 minute
private static final long DEFAULT_MESSAGE_EXPIRATION_CHECK_PERIOD = 10000;

// keys are deviceIds. Helps to optimize look ups later on which device Ids are already registered.
private final Map<String, DeviceClient> multiplexedDeviceClients;
Expand Down Expand Up @@ -104,6 +105,7 @@ public MultiplexingClient(String hostName, IotHubClientProtocol protocol, Multip
String threadNamePrefix = options != null ? options.getThreadNamePrefix() : null;
String threadNameSuffix = options != null ? options.getThreadNameSuffix() : null;
boolean useIdentifiableThreadNames = options == null || options.isUsingIdentifiableThreadNames();
long messageExpiredCheckPeriod = options != null ? options.getMessageExpirationCheckPeriod() : DEFAULT_MESSAGE_EXPIRATION_CHECK_PERIOD;

if (sendPeriod < 0)
{
Expand All @@ -130,7 +132,18 @@ else if (receivePeriod == 0) //default builder value for this option, signals th

// Optional settings from MultiplexingClientOptions
SSLContext sslContext = options != null ? options.getSslContext() : null;
this.deviceIO = new DeviceIO(hostName, protocol, sslContext, proxySettings, keepAliveInterval, sendInterval, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix);
this.deviceIO = new DeviceIO(
hostName,
protocol,
sslContext,
proxySettings,
keepAliveInterval,
sendInterval,
useIdentifiableThreadNames,
threadNamePrefix,
threadNameSuffix,
messageExpiredCheckPeriod);

this.deviceIO.setMaxNumberOfMessagesSentPerSendThread(sendMessagesPerThread);
this.deviceIO.setSendPeriodInMilliseconds(sendPeriod);
this.deviceIO.setReceivePeriodInMilliseconds(receivePeriod);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,25 @@ public final class MultiplexingClientOptions
@Builder.Default
private final boolean useIdentifiableThreadNames = true;

/**
* The period (in milliseconds) for how often the client will check queued and in-flight messages for expiry.
*
* Higher values mean that messages will be checked for expiry less often but this client will use less CPU. Higher
* values also mean that messages may not execute their callback with MESSAGE_EXPIRED close to the expected
* expiry time.
*
* Lower values mean that messages will be checked for expiry more often but this client will use more CPU. Lower
* values also mean that messages will execute their callback with MESSAGE_EXPIRED closer to the expected
* expiry time.
*
* By default, this value is 10 seconds.
*
* If set to 0, message expiry will never be checked.
*/
@Getter
@Builder.Default
private final long messageExpirationCheckPeriod = 10000;

public boolean isUsingIdentifiableThreadNames()
{
// Using a manually written method here to override the name that Lombok would have given it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public class IotHubTransport implements IotHubListener
// should stop spawning send/receive threads when this layer is disconnected or disconnected retrying
private final IotHubConnectionStatusChangeCallback deviceIOConnectionStatusChangeCallback;

// Lock on reading and writing on the waitingPackets queue
final private Object waitingPacketsLock = new Object();

// Lock on reading and writing on the inProgressPackets map
final private Object inProgressMessagesLock = new Object();

Expand Down Expand Up @@ -130,6 +133,10 @@ public class IotHubTransport implements IotHubListener
private final Thread correlationCallbackCleanupThread = new Thread(() -> checkForOldMessages());
private static final int CORRELATION_CALLBACK_CLEANUP_PERIOD_MILLISECONDS = 60 * 60 * 1000;

// A job that runs periodically to remove any expired messages from the in progress and waiting queue
private final Thread expiredMessagesCleanupThread = new Thread(() -> checkForExpiredOutgoingMessages());
private final long messageExpirationCheckPeriod;

/**
* Constructor for an IotHubTransport object with default values
*
Expand Down Expand Up @@ -160,6 +167,7 @@ public IotHubTransport(ClientConfiguration defaultConfig, IotHubConnectionStatus
this.useIdentifiableThreadNames = defaultConfig.isUsingIdentifiableThreadNames();
this.threadNamePrefix = defaultConfig.getThreadNamePrefix();
this.threadNameSuffix = defaultConfig.getThreadNameSuffix();
this.messageExpirationCheckPeriod = defaultConfig.getMessageExpiredCheckPeriod();
}

public IotHubTransport(
Expand All @@ -172,7 +180,8 @@ public IotHubTransport(
int sendInterval,
boolean useIdentifiableThreadNames,
String threadNamePrefix,
String threadNameSuffix) throws IllegalArgumentException
String threadNameSuffix,
long messageExpirationCheckPeriod) throws IllegalArgumentException
{
this.protocol = protocol;
this.hostName = hostName;
Expand All @@ -187,6 +196,7 @@ public IotHubTransport(
this.useIdentifiableThreadNames = useIdentifiableThreadNames;
this.threadNamePrefix = threadNamePrefix;
this.threadNameSuffix = threadNameSuffix;
this.messageExpirationCheckPeriod = messageExpirationCheckPeriod;
}

public Semaphore getSendThreadSemaphore()
Expand Down Expand Up @@ -763,35 +773,38 @@ public void sendMessages()

int timeSlice = maxNumberOfMessagesToSendPerThread;

while (this.connectionStatus == IotHubConnectionStatus.CONNECTED && timeSlice-- > 0)
synchronized (this.waitingPacketsLock)
{
IotHubTransportPacket packet = waitingPacketsQueue.poll();

if (packet != null)
while (this.connectionStatus == IotHubConnectionStatus.CONNECTED && timeSlice-- > 0)
{
Message message = packet.getMessage();
log.trace("Dequeued a message from waiting queue to be sent ({})", message);
IotHubTransportPacket packet = waitingPacketsQueue.poll();

if (message != null && this.isMessageValid(packet))
if (packet != null)
{
sendPacket(packet);
Message message = packet.getMessage();
log.trace("Dequeued a message from waiting queue to be sent ({})", message);

try
if (message != null && this.isMessageValid(packet))
{
String correlationId = message.getCorrelationId();
sendPacket(packet);

if (!correlationId.isEmpty())
try
{
CorrelationCallbackContext callbackContext = correlationCallbacks.get(correlationId);
if (callbackContext != null && callbackContext.getCallback() != null)
String correlationId = message.getCorrelationId();

if (!correlationId.isEmpty())
{
callbackContext.getCallback().onRequestSent(message, callbackContext.getUserContext());
CorrelationCallbackContext callbackContext = correlationCallbacks.get(correlationId);
if (callbackContext != null && callbackContext.getCallback() != null)
{
callbackContext.getCallback().onRequestSent(message, callbackContext.getUserContext());
}
}
}
}
catch (Exception e)
{
log.warn("Exception thrown while calling the onRequestSent callback in sendMessages", e);
catch (Exception e)
{
log.warn("Exception thrown while calling the onRequestSent callback in sendMessages", e);
}
}
}
}
Expand All @@ -816,28 +829,31 @@ String getDeviceClientUniqueIdentifier()

private void checkForExpiredMessages()
{
//Check waiting packets, remove any that have expired.
IotHubTransportPacket packet = this.waitingPacketsQueue.poll();
Queue<IotHubTransportPacket> packetsToAddBackIntoWaitingPacketsQueue = new LinkedBlockingQueue<>();
while (packet != null)
synchronized (this.waitingPacketsQueue)
{
if (packet.getMessage().isExpired())
//Check waiting packets, remove any that have expired.
IotHubTransportPacket packet = this.waitingPacketsQueue.poll();
Queue<IotHubTransportPacket> packetsToAddBackIntoWaitingPacketsQueue = new LinkedBlockingQueue<>();
while (packet != null)
{
packet.setStatus(IotHubStatusCode.MESSAGE_EXPIRED);
this.addToCallbackQueue(packet);
}
else
{
//message not expired, requeue it
packetsToAddBackIntoWaitingPacketsQueue.add(packet);
if (packet.getMessage().isExpired())
{
packet.setStatus(IotHubStatusCode.MESSAGE_EXPIRED);
this.addToCallbackQueue(packet);
}
else
{
//message not expired, requeue it
packetsToAddBackIntoWaitingPacketsQueue.add(packet);
}

packet = this.waitingPacketsQueue.poll();
}

packet = this.waitingPacketsQueue.poll();
//Requeue all the non-expired messages.
this.waitingPacketsQueue.addAll(packetsToAddBackIntoWaitingPacketsQueue);
}

//Requeue all the non-expired messages.
this.waitingPacketsQueue.addAll(packetsToAddBackIntoWaitingPacketsQueue);

//Check in progress messages
synchronized (this.inProgressMessagesLock)
{
Expand Down Expand Up @@ -894,6 +910,26 @@ private void checkForOldMessages()
}
}

private void checkForExpiredOutgoingMessages()
{
try
{
Thread.currentThread().setName("azure-iot-sdk-IotHubMessageExpiryCheckerTask");
while (true)
{
Thread.sleep(this.messageExpirationCheckPeriod);
checkForExpiredMessages();
invokeCallbacks();
}

}
catch (InterruptedException e)
{
// The exception can be ignored since this thread is interrupted when the client is closing.
// Once interrupted, simply end this thread.
}
}

/**
* Invokes the callbacks for all completed requests.
*/
Expand Down Expand Up @@ -1717,10 +1753,24 @@ else if (this.getDefaultConfig() != null
{
// Thread has already started. No need to report this exception
}

try
{
// 0 means that the user doesn't want to ever run this check
if (messageExpirationCheckPeriod != 0)
{
expiredMessagesCleanupThread.start();
}
}
catch (IllegalThreadStateException e)
{
// Thread has already started. No need to report this exception
}
}
else if (newConnectionStatus == IotHubConnectionStatus.DISCONNECTED)
{
correlationCallbackCleanupThread.interrupt();
expiredMessagesCleanupThread.interrupt();
}
}
}
Expand Down Expand Up @@ -1766,20 +1816,8 @@ private void updateStatus(IotHubConnectionStatus newConnectionStatus, IotHubConn
invokeConnectionStatusChangeCallback(newConnectionStatus, previousStatus, reason, throwable, deviceId);
}

if (newConnectionStatus == IotHubConnectionStatus.CONNECTED)
{
try
{
correlationCallbackCleanupThread.start();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't start/stop this thread when one device in a multiplexed connection is lost. Only start/stop it when the whole connection is opened/closed

}
catch (IllegalThreadStateException e)
{
// Thread has already started. No need to report this exception
}
}
else if (newConnectionStatus == IotHubConnectionStatus.DISCONNECTED)
if (newConnectionStatus == IotHubConnectionStatus.DISCONNECTED)
{
correlationCallbackCleanupThread.interrupt();
finalizeMultiplexedDevicesMessages(deviceId);
}
}
Expand Down Expand Up @@ -1913,28 +1951,6 @@ private boolean hasOperationTimedOut(long startTime)
return (System.currentTimeMillis() - startTime) > this.getDefaultConfig().getOperationTimeout();
}

/**
* Returns if the provided packet has lasted longer than the device operation timeout
*
* @return true if the packet has been in the queues for longer than the device operation timeout and false otherwise
*/
private boolean hasOperationTimedOut(long startTime, String deviceId)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused method

{
if (startTime == 0)
{
return false;
}

ClientConfiguration config = this.getConfig(deviceId);
if (config == null)
{
log.debug("Operation has not timed out since the device it was associated with has been unregistered already.");
return false;
}

return (System.currentTimeMillis() - startTime) > config.getOperationTimeout();
}

/**
* Adds the packet to the callback queue if the provided packet has a callback. The packet is ignored otherwise.
*
Expand Down