Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some threading issues with correlation callback maps #1719

Merged
merged 7 commits into from
May 31, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public class IotHubTransport implements IotHubListener
// Used to store the number of milliseconds since epoch that this packet was created for a correlationId
private final Map<String, Long> correlationStartTimeMillis = new ConcurrentHashMap<>();

// A job that runs periodically to remove any stale correlation callbacks
private Thread correlationCallbackCleanupThread = new Thread(() -> checkForOldMessages());
private static final int CORRELATION_CALLBACK_CLEANUP_PERIOD_MILLISECONDS = 60 * 60 * 1000;
timtay-microsoft marked this conversation as resolved.
Show resolved Hide resolved
private final Object correlationCallbackOperationLock = new Object();

/**
* Constructor for an IotHubTransport object with default values
*
Expand Down Expand Up @@ -264,20 +269,22 @@ public void onMessageSent(Message message, String deviceId, TransportException e

if (!correlationId.isEmpty())
{
CorrelatingMessageCallback callback = correlationCallbacks.get(correlationId);

if (callback != null)
synchronized (this.correlationCallbackOperationLock)
{
Object context = correlationCallbackContexts.get(correlationId);
IotHubClientException clientException = null;
if (e != null)
CorrelatingMessageCallback callback = correlationCallbacks.get(correlationId);

if (callback != null)
{
clientException = e.toIotHubClientException();
Object context = correlationCallbackContexts.get(correlationId);
IotHubClientException clientException = null;
if (e != null)
{
clientException = e.toIotHubClientException();
}
callback.onRequestAcknowledged(packet.getMessage(), context, clientException);
}
callback.onRequestAcknowledged(packet.getMessage(), context, clientException);
}
}

}
catch (Exception ex)
{
Expand Down Expand Up @@ -317,31 +324,34 @@ else if (message != null)
String correlationId = message.getCorrelationId();
if (!correlationId.isEmpty())
{
CorrelatingMessageCallback callback = correlationCallbacks.get(correlationId);

if (callback != null)
synchronized (this.correlationCallbackOperationLock)
{
Object context = correlationCallbackContexts.get(correlationId);
IotHubClientException clientException = null;
if (e != null)
{
// This case indicates that the transport layer failed to construct a valid message out of
// a message delivered by the service
clientException = e.toIotHubClientException();
}
else
CorrelatingMessageCallback callback = correlationCallbacks.get(correlationId);

if (callback != null)
{
// This case indicates that the transport layer constructed a valid message out of a message
// delivered by the service, but that message may contain an unsuccessful status code in cases
// such as if an operation was rejected because it was badly formatted.
IotHubStatusCode statusCode = IotHubStatusCode.getIotHubStatusCode(Integer.parseInt(message.getStatus()));
if (!IotHubStatusCode.isSuccessful(statusCode))
Object context = correlationCallbackContexts.get(correlationId);
IotHubClientException clientException = null;
if (e != null)
{
clientException = new IotHubClientException(statusCode, "Received an unsuccessful operation error code from the service: " + statusCode);
// This case indicates that the transport layer failed to construct a valid message out of
// a message delivered by the service
clientException = e.toIotHubClientException();
}
else
{
// This case indicates that the transport layer constructed a valid message out of a message
// delivered by the service, but that message may contain an unsuccessful status code in cases
// such as if an operation was rejected because it was badly formatted.
IotHubStatusCode statusCode = IotHubStatusCode.getIotHubStatusCode(Integer.parseInt(message.getStatus()));
if (!IotHubStatusCode.isSuccessful(statusCode))
{
clientException = new IotHubClientException(statusCode, "Received an unsuccessful operation error code from the service: " + statusCode);
}
}
}

callback.onResponseReceived(message, context, clientException);
callback.onResponseReceived(message, context, clientException);
}
}
}
}
Expand Down Expand Up @@ -506,6 +516,8 @@ public void open(boolean withRetry) throws TransportException, IotHubClientExcep
openConnection();
}

correlationCallbackCleanupThread.start();
timtay-microsoft marked this conversation as resolved.
Show resolved Hide resolved

log.debug("Client connection opened successfully");
}

Expand Down Expand Up @@ -551,6 +563,8 @@ public void close(IotHubConnectionStatusChangeReason reason, Throwable cause)
}
finally
{
correlationCallbackCleanupThread.interrupt();
timtay-microsoft marked this conversation as resolved.
Show resolved Hide resolved

this.updateStatus(IotHubConnectionStatus.DISCONNECTED, reason, cause);

// Notify send thread to finish up so it doesn't survive this close
Expand Down Expand Up @@ -731,7 +745,6 @@ public IotHubClientProtocol getProtocol()
public void sendMessages()
{
checkForExpiredMessages();
new Thread(() -> checkForOldMessages()).start();
timtay-microsoft marked this conversation as resolved.
Show resolved Hide resolved

if (this.connectionStatus == IotHubConnectionStatus.DISCONNECTED
|| this.connectionStatus == IotHubConnectionStatus.DISCONNECTED_RETRYING)
Expand Down Expand Up @@ -760,12 +773,15 @@ public void sendMessages()

if (!correlationId.isEmpty())
{
CorrelatingMessageCallback callback = correlationCallbacks.get(correlationId);

if (callback != null)
synchronized (this.correlationCallbackOperationLock)
{
Object context = correlationCallbackContexts.get(correlationId);
callback.onRequestSent(message, context);
CorrelatingMessageCallback callback = correlationCallbacks.get(correlationId);

if (callback != null)
{
Object context = correlationCallbackContexts.get(correlationId);
callback.onRequestSent(message, context);
}
}
}
}
Expand Down Expand Up @@ -844,21 +860,37 @@ private void checkForExpiredMessages()
// the size of map will grow endlessly which results in OutOfMemory eventually.
private void checkForOldMessages()
{
List<String> correlationIdsToRemove = new ArrayList<>();

for (String correlationId : correlationCallbacks.keySet())
try
{
if (System.currentTimeMillis() - correlationStartTimeMillis.get(correlationId) >= DEFAULT_CORRELATION_ID_LIVE_TIME)
while (true)
{
correlationIdsToRemove.add(correlationId);
correlationCallbackContexts.remove(correlationId);
correlationStartTimeMillis.remove(correlationId);
Thread.sleep(CORRELATION_CALLBACK_CLEANUP_PERIOD_MILLISECONDS);

List<String> correlationIdsToRemove = new ArrayList<>();

synchronized (this.correlationCallbackOperationLock)
{
for (String correlationId : correlationCallbacks.keySet())
{
if (System.currentTimeMillis() - correlationStartTimeMillis.get(correlationId) >= DEFAULT_CORRELATION_ID_LIVE_TIME)
{
correlationIdsToRemove.add(correlationId);
correlationCallbackContexts.remove(correlationId);
correlationStartTimeMillis.remove(correlationId);
}
}

for (String correlationId : correlationIdsToRemove)
{
correlationCallbacks.remove(correlationId);
}
}
}
}

for (String correlationId : correlationIdsToRemove)
catch (InterruptedException e)
{
correlationCallbacks.remove(correlationId);
// The exception can be ignored since this thread is interrupted when the client is closing.
// Once interrupted, simply end this thread.
}
}

Expand Down Expand Up @@ -1196,22 +1228,22 @@ private void acknowledgeReceivedMessage(IotHubTransportMessage receivedMessage)
String correlationId = receivedMessage.getCorrelationId();
if (!correlationId.isEmpty())
{
CorrelatingMessageCallback callback = correlationCallbacks.get(correlationId);

if (callback != null)
synchronized (this.correlationCallbackOperationLock)
{
Object context = correlationCallbackContexts.get(correlationId);
callback.onResponseAcknowledged(receivedMessage, context);
}
CorrelatingMessageCallback callback = correlationCallbacks.get(correlationId);

// We need to remove the CorrelatingMessageCallback with the current correlation ID from the map after the received C2D
// message has been acknowledged. Otherwise, the size of map will grow endlessly which results in OutOfMemory eventually.
new Thread(() ->
{
if (callback != null)
{
Object context = correlationCallbackContexts.get(correlationId);
callback.onResponseAcknowledged(receivedMessage, context);
}

// We need to remove the CorrelatingMessageCallback with the current correlation ID from the map after the received C2D
// message has been acknowledged. Otherwise, the size of map will grow endlessly which results in OutOfMemory eventually.
correlationCallbacks.remove(correlationId);
correlationCallbackContexts.remove(correlationId);
correlationStartTimeMillis.remove(correlationId);
}).start();
}
}
}
catch (Exception ex)
Expand Down Expand Up @@ -1248,12 +1280,15 @@ private void addReceivedMessagesOverHttpToReceivedQueue() throws TransportExcept
String correlationId = transportMessage.getCorrelationId();
if (!correlationId.isEmpty())
{
CorrelatingMessageCallback callback = correlationCallbacks.get(correlationId);

if (callback != null)
synchronized (this.correlationCallbackOperationLock)
{
Object context = correlationCallbackContexts.get(correlationId);
callback.onResponseReceived(transportMessage, context, null);
CorrelatingMessageCallback callback = correlationCallbacks.get(correlationId);

if (callback != null)
{
Object context = correlationCallbackContexts.get(correlationId);
callback.onResponseReceived(transportMessage, context, null);
}
}
}
}
Expand Down Expand Up @@ -1853,15 +1888,18 @@ private void addToWaitingQueue(IotHubTransportPacket packet)
CorrelatingMessageCallback correlationCallback = message.getCorrelatingMessageCallback();
if (!correlationId.isEmpty() && correlationCallback != null)
{
correlationCallbacks.put(correlationId, correlationCallback);
correlationStartTimeMillis.put(correlationId, System.currentTimeMillis());

Object correlationCallbackContext = message.getCorrelatingMessageCallbackContext();
if (correlationCallbackContext != null)
synchronized (this.correlationCallbackOperationLock)
{
correlationCallbackContexts.put(correlationId, correlationCallbackContext);
correlationCallbacks.put(correlationId, correlationCallback);
correlationStartTimeMillis.put(correlationId, System.currentTimeMillis());

Object correlationCallbackContext = message.getCorrelatingMessageCallbackContext();
if (correlationCallbackContext != null)
{
correlationCallbackContexts.put(correlationId, correlationCallbackContext);
}
correlationCallback.onRequestQueued(message, correlationCallbackContext);
}
correlationCallback.onRequestQueued(message, correlationCallbackContext);
}
}
}
Expand Down