diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientConfiguration.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientConfiguration.java index 2d9ec70aca..41a53726d2 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientConfiguration.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientConfiguration.java @@ -11,10 +11,7 @@ import com.microsoft.azure.sdk.iot.provisioning.security.SecurityProviderSymmetricKey; import com.microsoft.azure.sdk.iot.provisioning.security.SecurityProviderTpm; import com.microsoft.azure.sdk.iot.provisioning.security.SecurityProviderX509; -import lombok.AccessLevel; -import lombok.Getter; -import lombok.NonNull; -import lombok.Setter; +import lombok.*; import lombok.extern.slf4j.Slf4j; import javax.net.ssl.SSLContext; @@ -86,6 +83,14 @@ public final class ClientConfiguration @Setter(AccessLevel.PACKAGE) private int sendInterval = DEFAULT_SEND_INTERVAL_IN_MILLISECONDS; + @Getter + private String threadNameSuffix = null; + + @Getter + private String threadNamePrefix = null; + + private boolean useIdentifiableThreadNames = true; + private IotHubAuthenticationProvider authenticationProvider; /** @@ -215,6 +220,9 @@ private void setClientOptionValues(ClientOptions clientOptions) this.amqpOpenDeviceSessionsTimeout = clientOptions != null && clientOptions.getAmqpDeviceSessionTimeout() != 0 ? clientOptions.getAmqpDeviceSessionTimeout() : DEFAULT_AMQP_OPEN_DEVICE_SESSIONS_TIMEOUT_IN_SECONDS; this.proxySettings = clientOptions != null && clientOptions.getProxySettings() != null ? clientOptions.getProxySettings() : null; this.sendInterval = clientOptions != null && clientOptions.getSendInterval() != 0 ? clientOptions.getSendInterval() : DEFAULT_SEND_INTERVAL_IN_MILLISECONDS; + this.threadNamePrefix = clientOptions != null ? clientOptions.getThreadNamePrefix() : null; + this.threadNameSuffix = clientOptions != null ? clientOptions.getThreadNameSuffix() : null; + this.useIdentifiableThreadNames = clientOptions == null || clientOptions.isUsingIdentifiableThreadNames(); if (proxySettings != null) { @@ -632,6 +640,12 @@ public AuthType getAuthenticationType() } } + public boolean isUsingIdentifiableThreadNames() + { + // Using a manually written method here to override the name that Lombok would have given it + return this.useIdentifiableThreadNames; + } + /** * Sets the device operation timeout * @param timeout the amount of time, in milliseconds, that a given device operation can last before expiring diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientOptions.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientOptions.java index 411e251e21..58f4defc60 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientOptions.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientOptions.java @@ -136,4 +136,41 @@ public final class ClientOptions @Getter @Builder.Default private final int receiveInterval = RECEIVE_PERIOD_MILLIS; + + /** + * The prefix that will be applied to the names of all threads created by this client. If + * {@link #useIdentifiableThreadNames} is set to true, then this value is ignored and this client will create the + * prefix for you. + */ + @Getter + @Builder.Default + private final String threadNamePrefix = null; + + /** + * The suffix that will be applied to the names of all threads created by this client. If + * {@link #useIdentifiableThreadNames} is set to true, then this value is ignored and this client will create the + * suffix for you. + */ + @Getter + @Builder.Default + private final String threadNameSuffix = null; + + /** + * If true, all threads created by this client will use names that are unique. This is useful in applications that manage + * multiple device/module clients and want to be able to correlate logs to a particular client. In addition, + * the {@link #threadNamePrefix} and {@link #threadNameSuffix} values will be ignored. + * + * If false, all threads created by this client will use simple names that describe the thread's purpose, but are + * indistinguishable from the same threads created by a different client instance. However, users may still alter + * these thread names by providing values for the {@link #threadNamePrefix} and {@link #threadNameSuffix}. + */ + @Builder.Default + private final boolean useIdentifiableThreadNames = true; + + + public boolean isUsingIdentifiableThreadNames() + { + // Using a manually written method here to override the name that Lombok would have given it + return this.useIdentifiableThreadNames; + } } diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/DeviceIO.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/DeviceIO.java index 1b76cb9d2b..2d08fb11fc 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/DeviceIO.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/DeviceIO.java @@ -65,9 +65,9 @@ final class DeviceIO implements IotHubConnectionStatusChangeCallback this.state = IotHubConnectionStatus.DISCONNECTED; - this.sendTask = new IotHubSendTask(this.transport); - this.receiveTask = new IotHubReceiveTask(this.transport); - this.reconnectTask = new IotHubReconnectTask(this.transport); + this.sendTask = new IotHubSendTask(this.transport, config.isUsingIdentifiableThreadNames(), config.getThreadNamePrefix(), config.getThreadNameSuffix()); + this.receiveTask = new IotHubReceiveTask(this.transport, config.isUsingIdentifiableThreadNames(), config.getThreadNamePrefix(), config.getThreadNameSuffix()); + this.reconnectTask = new IotHubReconnectTask(this.transport, config.isUsingIdentifiableThreadNames(), config.getThreadNamePrefix(), config.getThreadNameSuffix()); } DeviceIO( @@ -76,13 +76,16 @@ final class DeviceIO implements IotHubConnectionStatusChangeCallback SSLContext sslContext, ProxySettings proxySettings, int keepAliveInterval, - int sendInterval) + int sendInterval, + boolean useIdentifiableThreadNames, + String threadNamePrefix, + String threadNameSuffix) { this.state = IotHubConnectionStatus.DISCONNECTED; - this.transport = new IotHubTransport(hostName, protocol, sslContext, proxySettings, this, keepAliveInterval, sendInterval); - this.sendTask = new IotHubSendTask(this.transport); - this.receiveTask = new IotHubReceiveTask(this.transport); - this.reconnectTask = new IotHubReconnectTask(this.transport); + this.transport = new IotHubTransport(hostName, protocol, sslContext, proxySettings, this, keepAliveInterval, sendInterval, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix); + this.sendTask = new IotHubSendTask(this.transport, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix); + this.receiveTask = new IotHubReceiveTask(this.transport, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix); + this.reconnectTask = new IotHubReconnectTask(this.transport, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix); } /** diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClient.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClient.java index 53190ed608..5f0b08d7d4 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClient.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClient.java @@ -101,6 +101,9 @@ public MultiplexingClient(String hostName, IotHubClientProtocol protocol, Multip int sendMessagesPerThread = options != null ? options.getMaxMessagesSentPerSendInterval() : DEFAULT_MAX_MESSAGES_TO_SEND_PER_THREAD; int keepAliveInterval = options != null ? options.getKeepAliveInterval() : DEFAULT_KEEP_ALIVE_INTERVAL_IN_SECONDS; int sendInterval = (int) (options != null ? options.getSendInterval() : DEFAULT_SEND_INTERVAL_IN_MILLISECONDS); + String threadNamePrefix = options != null ? options.getThreadNamePrefix() : null; + String threadNameSuffix = options != null ? options.getThreadNameSuffix() : null; + boolean useIdentifiableThreadNames = options == null || options.isUsingIdentifiableThreadNames(); if (sendPeriod < 0) { @@ -127,7 +130,7 @@ else if (receivePeriod == 0) //default builder value for this option, signals th // Optional settings from MultiplexingClientOptions SSLContext sslContext = options != null ? options.getSslContext() : null; - this.deviceIO = new DeviceIO(hostName, protocol, sslContext, proxySettings, keepAliveInterval, sendInterval); + this.deviceIO = new DeviceIO(hostName, protocol, sslContext, proxySettings, keepAliveInterval, sendInterval, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix); this.deviceIO.setMaxNumberOfMessagesSentPerSendThread(sendMessagesPerThread); this.deviceIO.setSendPeriodInMilliseconds(sendPeriod); this.deviceIO.setReceivePeriodInMilliseconds(receivePeriod); diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClientOptions.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClientOptions.java index ca7273d591..ff5f449722 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClientOptions.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClientOptions.java @@ -67,4 +67,40 @@ public final class MultiplexingClientOptions @Getter @Builder.Default public final int keepAliveInterval = DEFAULT_KEEP_ALIVE_INTERVAL_IN_SECONDS; + + /** + * The prefix that will be applied to the names of all threads created by this client. If + * {@link #useIdentifiableThreadNames} is set to true, then this value is ignored and this client will create the + * prefix for you. + */ + @Getter + @Builder.Default + private final String threadNamePrefix = null; + + /** + * The suffix that will be applied to the names of all threads created by this client. If + * {@link #useIdentifiableThreadNames} is set to true, then this value is ignored and this client will create the + * suffix for you. + */ + @Getter + @Builder.Default + private final String threadNameSuffix = null; + + /** + * If true, all threads created by this client will use names that are unique. This is useful in applications that manage + * multiple device/module clients and want to be able to correlate logs to a particular client. In addition, + * the {@link #threadNamePrefix} and {@link #threadNameSuffix} values will be ignored. + * + * If false, all threads created by this client will use simple names that describe the thread's purpose, but are + * indistinguishable from the same threads created by a different client instance. However, users may still alter + * these thread names by providing values for the {@link #threadNamePrefix} and {@link #threadNameSuffix}. + */ + @Builder.Default + private final boolean useIdentifiableThreadNames = true; + + public boolean isUsingIdentifiableThreadNames() + { + // Using a manually written method here to override the name that Lombok would have given it + return this.useIdentifiableThreadNames; + } } diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubReceiveTask.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubReceiveTask.java index 11890a7d1f..6bca1a5ced 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubReceiveTask.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubReceiveTask.java @@ -17,6 +17,9 @@ public final class IotHubReceiveTask implements Runnable { private static final String THREAD_NAME = "azure-iot-sdk-IotHubReceiveTask"; private final IotHubTransport transport; + private final String threadNamePrefix; + private final String threadNameSuffix; + private final boolean useIdentifiableThreadNames; // This lock is used to communicate state between this thread and the IoTHubTransport layer. This thread will // wait until a message has been received in that layer before continuing. This means that if the transport layer @@ -25,7 +28,7 @@ public final class IotHubReceiveTask implements Runnable // layer's responsibility to notify this thread when a message has been received so that this thread can handle it. private final Semaphore receiveThreadSemaphore; - public IotHubReceiveTask(IotHubTransport transport) + public IotHubReceiveTask(IotHubTransport transport, boolean useIdentifiableThreadNames, String threadNamePrefix, String threadNameSuffix) { if (transport == null) { @@ -34,13 +37,35 @@ public IotHubReceiveTask(IotHubTransport transport) this.transport = transport; this.receiveThreadSemaphore = this.transport.getReceiveThreadSemaphore(); + this.useIdentifiableThreadNames = useIdentifiableThreadNames; + this.threadNamePrefix = threadNamePrefix; + this.threadNameSuffix = threadNameSuffix; } public void run() { - String deviceClientId = this.transport.getDeviceClientUniqueIdentifier(); - String connectionId = transport.getTransportConnectionId(); - String threadName = deviceClientId + "-" + "Cxn" + connectionId + "-" + THREAD_NAME; + String threadName = ""; + if (this.useIdentifiableThreadNames) + { + String deviceClientId = this.transport.getDeviceClientUniqueIdentifier(); + String connectionId = transport.getTransportConnectionId(); + threadName += deviceClientId + "-" + "Cxn" + connectionId + "-" + THREAD_NAME; + } + else + { + if (this.threadNamePrefix != null && !this.threadNamePrefix.isEmpty()) + { + threadName += this.threadNamePrefix; + } + + threadName += THREAD_NAME; + + if (this.threadNameSuffix != null && !this.threadNameSuffix.isEmpty()) + { + threadName += this.threadNameSuffix; + } + } + Thread.currentThread().setName(threadName); try diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubReconnectTask.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubReconnectTask.java index e70d1690e3..6a5c834743 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubReconnectTask.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubReconnectTask.java @@ -15,13 +15,16 @@ public final class IotHubReconnectTask implements Runnable { private static final String THREAD_NAME = "azure-iot-sdk-IotHubReconnectTask"; private final IotHubTransport transport; + private final String threadNamePrefix; + private final String threadNameSuffix; + private final boolean useIdentifiableThreadNames; // This lock is used to communicate state between this thread and the IoTHubTransport layer. This thread will // wait until a disconnection event occurs in that layer before continuing. This means that if the transport layer // has no connectivity problems, then this thread will do nothing and cost nothing. private final Semaphore reconnectThreadSemaphore; - public IotHubReconnectTask(IotHubTransport transport) + public IotHubReconnectTask(IotHubTransport transport, boolean useIdentifiableThreadNames, String threadNamePrefix, String threadNameSuffix) { if (transport == null) { @@ -30,13 +33,35 @@ public IotHubReconnectTask(IotHubTransport transport) this.transport = transport; this.reconnectThreadSemaphore = this.transport.getReconnectThreadSemaphore(); + this.useIdentifiableThreadNames = useIdentifiableThreadNames; + this.threadNamePrefix = threadNamePrefix; + this.threadNameSuffix = threadNameSuffix; } public void run() { - String deviceClientId = this.transport.getDeviceClientUniqueIdentifier(); - String connectionId = transport.getTransportConnectionId(); - String threadName = deviceClientId + "-" + "Cxn" + connectionId + "-" + THREAD_NAME; + String threadName = ""; + if (this.useIdentifiableThreadNames) + { + String deviceClientId = this.transport.getDeviceClientUniqueIdentifier(); + String connectionId = transport.getTransportConnectionId(); + threadName += deviceClientId + "-" + "Cxn" + connectionId + "-" + THREAD_NAME; + } + else + { + if (this.threadNamePrefix != null && !this.threadNamePrefix.isEmpty()) + { + threadName += this.threadNamePrefix; + } + + threadName += THREAD_NAME; + + if (this.threadNameSuffix != null && !this.threadNameSuffix.isEmpty()) + { + threadName += this.threadNameSuffix; + } + } + Thread.currentThread().setName(threadName); try diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubSendTask.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubSendTask.java index dd272eb181..af04f875fb 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubSendTask.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubSendTask.java @@ -16,6 +16,9 @@ public final class IotHubSendTask implements Runnable { private static final String THREAD_NAME = "azure-iot-sdk-IotHubSendTask"; private final IotHubTransport transport; + private final String threadNamePrefix; + private final String threadNameSuffix; + private final boolean useIdentifiableThreadNames; // This lock is used to communicate state between this thread and the IoTHubTransport layer. This thread will // wait until a message or callback is queued in that layer before continuing. This means that if the transport layer @@ -25,7 +28,7 @@ public final class IotHubSendTask implements Runnable // so that this thread can handle it. private final Semaphore sendThreadSemaphore; - public IotHubSendTask(IotHubTransport transport) + public IotHubSendTask(IotHubTransport transport, boolean useIdentifiableThreadNames, String threadNamePrefix, String threadNameSuffix) { if (transport == null) { @@ -34,13 +37,35 @@ public IotHubSendTask(IotHubTransport transport) this.transport = transport; this.sendThreadSemaphore = this.transport.getSendThreadSemaphore(); + this.useIdentifiableThreadNames = useIdentifiableThreadNames; + this.threadNamePrefix = threadNamePrefix; + this.threadNameSuffix = threadNameSuffix; } public void run() { - String deviceClientId = this.transport.getDeviceClientUniqueIdentifier(); - String connectionId = transport.getTransportConnectionId(); - String threadName = deviceClientId + "-" + "Cxn" + connectionId + "-" + THREAD_NAME; + String threadName = ""; + if (this.useIdentifiableThreadNames) + { + String deviceClientId = this.transport.getDeviceClientUniqueIdentifier(); + String connectionId = transport.getTransportConnectionId(); + threadName += deviceClientId + "-" + "Cxn" + connectionId + "-" + THREAD_NAME; + } + else + { + if (this.threadNamePrefix != null && !this.threadNamePrefix.isEmpty()) + { + threadName += this.threadNamePrefix; + } + + threadName += THREAD_NAME; + + if (this.threadNameSuffix != null && !this.threadNameSuffix.isEmpty()) + { + threadName += this.threadNameSuffix; + } + } + Thread.currentThread().setName(threadName); try diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java index b348c0c3d2..85f172d2da 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java @@ -114,6 +114,10 @@ public class IotHubTransport implements IotHubListener private SSLContext sslContext; private final boolean isMultiplexing; + private final String threadNamePrefix; + private final String threadNameSuffix; + private final boolean useIdentifiableThreadNames; + // Flag set when close() starts. Acts as a signal to any running reconnection logic to not try again. private boolean isClosing; @@ -151,6 +155,9 @@ public IotHubTransport(ClientConfiguration defaultConfig, IotHubConnectionStatus this.deviceIOConnectionStatusChangeCallback = deviceIOConnectionStatusChangeCallback; this.keepAliveInterval = defaultConfig.getKeepAliveInterval(); this.sendInterval = defaultConfig.getSendInterval(); + this.useIdentifiableThreadNames = defaultConfig.isUsingIdentifiableThreadNames(); + this.threadNamePrefix = defaultConfig.getThreadNamePrefix(); + this.threadNameSuffix = defaultConfig.getThreadNameSuffix(); } public IotHubTransport( @@ -160,7 +167,10 @@ public IotHubTransport( ProxySettings proxySettings, IotHubConnectionStatusChangeCallback deviceIOConnectionStatusChangeCallback, int keepAliveInterval, - int sendInterval) throws IllegalArgumentException + int sendInterval, + boolean useIdentifiableThreadNames, + String threadNamePrefix, + String threadNameSuffix) throws IllegalArgumentException { this.protocol = protocol; this.hostName = hostName; @@ -171,6 +181,9 @@ public IotHubTransport( this.isMultiplexing = true; this.keepAliveInterval = keepAliveInterval; this.sendInterval = sendInterval; + this.useIdentifiableThreadNames = useIdentifiableThreadNames; + this.threadNamePrefix = threadNamePrefix; + this.threadNameSuffix = threadNameSuffix; } public Semaphore getSendThreadSemaphore() @@ -1329,7 +1342,10 @@ private void openConnection() throws TransportException this.sslContext, this.proxySettings, this.keepAliveInterval, - this.sendInterval); + this.sendInterval, + this.useIdentifiableThreadNames, + this.threadNamePrefix, + this.threadNameSuffix); for (ClientConfiguration config : this.deviceClientConfigs.values()) { diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnection.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnection.java index a4fa424278..64087e6e13 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnection.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnection.java @@ -109,6 +109,10 @@ public final class AmqpsIotHubConnection extends BaseHandler implements IotHubTr private final Map queuedAcknowledgements = new ConcurrentHashMap<>(); + private final String threadNamePrefix; + private final String threadNameSuffix; + private final boolean useIdentifiableThreadNames; + public AmqpsIotHubConnection(ClientConfiguration config, String transportUniqueIdentifier) { // This allows us to create thread safe sets despite there being no such type default in Java 7 or 8 @@ -142,12 +146,15 @@ public AmqpsIotHubConnection(ClientConfiguration config, String transportUniqueI this.keepAliveInterval = config.getKeepAliveInterval(); this.sendInterval = clientConfiguration.getSendInterval(); + this.useIdentifiableThreadNames = clientConfiguration.isUsingIdentifiableThreadNames(); + this.threadNamePrefix = clientConfiguration.getThreadNamePrefix(); + this.threadNameSuffix = clientConfiguration.getThreadNameSuffix(); this.state = IotHubConnectionStatus.DISCONNECTED; log.trace("AmqpsIotHubConnection object is created successfully and will use port {}", this.isWebsocketConnection ? WEB_SOCKET_PORT : AMQP_PORT); } - public AmqpsIotHubConnection(String hostName, String transportUniqueIdentifier, boolean isWebsocketConnection, SSLContext sslContext, ProxySettings proxySettings, int keepAliveInterval, int sendInterval) + public AmqpsIotHubConnection(String hostName, String transportUniqueIdentifier, boolean isWebsocketConnection, SSLContext sslContext, ProxySettings proxySettings, int keepAliveInterval, int sendInterval, boolean useIdentifiableThreadNames, String threadNamePrefix, String threadNameSuffix) { // This allows us to create thread safe sets despite there being no such type default in Java 7 or 8 this.clientConfigurations = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -173,6 +180,9 @@ public AmqpsIotHubConnection(String hostName, String transportUniqueIdentifier, this.keepAliveInterval = keepAliveInterval; this.sendInterval = sendInterval; + this.useIdentifiableThreadNames = useIdentifiableThreadNames; + this.threadNamePrefix = threadNamePrefix; + this.threadNameSuffix = threadNameSuffix; } public void registerMultiplexedDevice(ClientConfiguration config) @@ -1249,14 +1259,57 @@ private void openAsync() throws TransportException String runnerUniqueIdentifier = this.isMultiplexing ? "Multiplexed-" + this.transportUniqueIdentifier : this.clientConfiguration.getDeviceClientUniqueIdentifier(); - String reactorRunnerPrefix = this.hostName + "-" + runnerUniqueIdentifier + "-" + "Cnx" + this.connectionId; + + String threadName = ""; + if (this.isMultiplexing) + { + if (this.useIdentifiableThreadNames) + { + threadName += reactorRunnerPrefix + "-" + ReactorRunner.THREAD_NAME + "-ConnectionOwner"; + } + else + { + if (this.threadNamePrefix != null && !this.threadNamePrefix.isEmpty()) + { + threadName += this.threadNamePrefix; + } + + threadName += ReactorRunner.THREAD_NAME; + + if (this.threadNameSuffix != null && !this.threadNameSuffix.isEmpty()) + { + threadName += this.threadNameSuffix; + } + } + } + else + { + if (this.useIdentifiableThreadNames) + { + threadName += reactorRunnerPrefix + "-" + ReactorRunner.THREAD_NAME + "-ConnectionOwner"; + } + else + { + if (this.threadNamePrefix != null && !this.threadNamePrefix.isEmpty()) + { + threadName += this.threadNamePrefix; + } + + threadName += ReactorRunner.THREAD_NAME; + + if (this.threadNameSuffix != null && !this.threadNameSuffix.isEmpty()) + { + threadName += this.threadNameSuffix; + } + } + } + ReactorRunner reactorRunner = new ReactorRunner( this.reactor, this.listener, this.connectionId, - reactorRunnerPrefix, - "ConnectionOwner", + threadName, this); executorService.submit(reactorRunner); diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/ReactorRunner.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/ReactorRunner.java index 0e7f525d62..234ecfb950 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/ReactorRunner.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/ReactorRunner.java @@ -12,28 +12,25 @@ public class ReactorRunner implements Callable { - private static final String THREAD_NAME = "azure-iot-sdk-ReactorRunner"; + static final String THREAD_NAME = "azure-iot-sdk-ReactorRunner"; private final Reactor reactor; private final IotHubListener listener; private final String connectionId; + private final String threadName; private final ReactorRunnerStateCallback reactorRunnerStateCallback; - private final String threadPostfix; - private final String threadPrefix; ReactorRunner( Reactor reactor, IotHubListener listener, String connectionId, - String threadPrefix, - String threadPostfix, + String threadName, ReactorRunnerStateCallback reactorRunnerStateCallback) { this.listener = listener; this.reactor = reactor; this.connectionId = connectionId; + this.threadName = threadName; this.reactorRunnerStateCallback = reactorRunnerStateCallback; - this.threadPrefix = threadPrefix; - this.threadPostfix = threadPostfix; } @Override @@ -41,8 +38,7 @@ public Object call() { try { - String threadName = this.threadPrefix + "-" + THREAD_NAME + "-" + this.threadPostfix; - Thread.currentThread().setName(threadName); + Thread.currentThread().setName(this.threadName); this.reactor.setTimeout(10); this.reactor.start(); diff --git a/iothub/device/iot-device-client/src/test/java/com/microsoft/azure/sdk/iot/device/transport/IotHubReceiveTaskTest.java b/iothub/device/iot-device-client/src/test/java/com/microsoft/azure/sdk/iot/device/transport/IotHubReceiveTaskTest.java index d917321337..2c9fdb1ee4 100644 --- a/iothub/device/iot-device-client/src/test/java/com/microsoft/azure/sdk/iot/device/transport/IotHubReceiveTaskTest.java +++ b/iothub/device/iot-device-client/src/test/java/com/microsoft/azure/sdk/iot/device/transport/IotHubReceiveTaskTest.java @@ -40,7 +40,7 @@ public void runReceivesAllMessages() throws IotHubClientException, TransportExce result = IotHubClientProtocol.AMQPS; } }; - IotHubReceiveTask receiveTask = new IotHubReceiveTask(mockTransport); + IotHubReceiveTask receiveTask = new IotHubReceiveTask(mockTransport, true, null, null); // act receiveTask.run(); @@ -64,7 +64,7 @@ public void runReceivesAllMessagesHTTP() } }; - IotHubReceiveTask receiveTask = new IotHubReceiveTask(mockTransport); + IotHubReceiveTask receiveTask = new IotHubReceiveTask(mockTransport, true, null, null); // act receiveTask.run(); @@ -90,7 +90,7 @@ public void runDoesNotCrashFromIoException() throws IOException, URISyntaxExcept } }; - IotHubReceiveTask receiveTask = new IotHubReceiveTask(mockTransport); + IotHubReceiveTask receiveTask = new IotHubReceiveTask(mockTransport, true, null, null); receiveTask.run(); } @@ -106,7 +106,7 @@ public void runDoesNotCrashFromThrowable() throws IOException, URISyntaxExceptio } }; - IotHubReceiveTask receiveTask = new IotHubReceiveTask(mockTransport); + IotHubReceiveTask receiveTask = new IotHubReceiveTask(mockTransport, true, null, null); receiveTask.run(); } } diff --git a/iothub/device/iot-device-client/src/test/java/com/microsoft/azure/sdk/iot/device/transport/IotHubSendTaskTest.java b/iothub/device/iot-device-client/src/test/java/com/microsoft/azure/sdk/iot/device/transport/IotHubSendTaskTest.java index 7c0ab7d7d1..f72b981b0b 100644 --- a/iothub/device/iot-device-client/src/test/java/com/microsoft/azure/sdk/iot/device/transport/IotHubSendTaskTest.java +++ b/iothub/device/iot-device-client/src/test/java/com/microsoft/azure/sdk/iot/device/transport/IotHubSendTaskTest.java @@ -35,7 +35,7 @@ public void runSendsAllMessages() } }; - IotHubSendTask sendTask = new IotHubSendTask(mockTransport); + IotHubSendTask sendTask = new IotHubSendTask(mockTransport, true, null, null); sendTask.run(); new Verifications() @@ -64,7 +64,7 @@ public void runInvokesAllCallbacks() } }; - IotHubSendTask sendTask = new IotHubSendTask(mockTransport); + IotHubSendTask sendTask = new IotHubSendTask(mockTransport, true, null, null); sendTask.run(); new Verifications() @@ -87,7 +87,7 @@ public void runDoesNotCrashFromIoException() } }; - IotHubSendTask sendTask = new IotHubSendTask(mockTransport); + IotHubSendTask sendTask = new IotHubSendTask(mockTransport, true, null, null); sendTask.run(); } @@ -103,7 +103,7 @@ public void runDoesNotCrashFromThrowable() } }; - IotHubSendTask sendTask = new IotHubSendTask(mockTransport); + IotHubSendTask sendTask = new IotHubSendTask(mockTransport, true, null, null); sendTask.run(); } } diff --git a/iothub/device/iot-device-client/src/test/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnectionTest.java b/iothub/device/iot-device-client/src/test/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnectionTest.java index 583008583d..ebb10ed43c 100644 --- a/iothub/device/iot-device-client/src/test/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnectionTest.java +++ b/iothub/device/iot-device-client/src/test/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnectionTest.java @@ -526,7 +526,7 @@ public void openThrowsIfProtonReactorThrows() throws TransportException new NonStrictExpectations() { { - new ReactorRunner((Reactor) any, (IotHubListener) any, anyString, anyString, anyString, (ReactorRunnerStateCallback) any); + new ReactorRunner((Reactor) any, (IotHubListener) any, anyString, anyString, (ReactorRunnerStateCallback) any); result = new IOException(); } }; @@ -572,7 +572,7 @@ public void openTriggersProtonReactor(@Mocked final Reactor mockedReactor) throw new Verifications() { { - new ReactorRunner((Reactor) any, (IotHubListener) any, anyString, anyString, anyString, (ReactorRunnerStateCallback) any); + new ReactorRunner((Reactor) any, (IotHubListener) any, anyString, anyString, (ReactorRunnerStateCallback) any); times = 1; } }; diff --git a/provisioning/provisioning-device-client/src/main/java/com/microsoft/azure/sdk/iot/provisioning/device/transport/amqp/AmqpsConnection.java b/provisioning/provisioning-device-client/src/main/java/com/microsoft/azure/sdk/iot/provisioning/device/transport/amqp/AmqpsConnection.java index e222049d18..bf95572532 100644 --- a/provisioning/provisioning-device-client/src/main/java/com/microsoft/azure/sdk/iot/provisioning/device/transport/amqp/AmqpsConnection.java +++ b/provisioning/provisioning-device-client/src/main/java/com/microsoft/azure/sdk/iot/provisioning/device/transport/amqp/AmqpsConnection.java @@ -541,20 +541,20 @@ private static class ReactorRunner implements Callable { private final static String THREAD_NAME = "azure-iot-sdk-ReactorRunner"; private final AmqpReactor amqpReactor; - private final String threadPostFix; + private final String threadSuffix; private final String threadPreFix; - ReactorRunner(AmqpReactor reactor, String threadPrefix, String threadPostFix) + ReactorRunner(AmqpReactor reactor, String threadPrefix, String threadSuffix) { this.amqpReactor = reactor; - this.threadPostFix = threadPostFix; + this.threadSuffix = threadSuffix; this.threadPreFix = threadPrefix; } @Override public Object call() { - String threadName = threadPreFix + "-" + THREAD_NAME + "-" + this.threadPostFix; + String threadName = threadPreFix + "-" + THREAD_NAME + "-" + this.threadSuffix; Thread.currentThread().setName(threadName); log.trace("Amqp reactor thread {} has started", threadName);