Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where configured send interval isn't reflected in AMQP layer #1673

Merged
merged 5 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -258,5 +258,7 @@ package-list
!/.idea/inspectionProfiles
!/.idea/scopes

/device/iot-device-samples/android-sample/.idea/*
/iot-e2e-tests/android/.idea/*
/iothub/device/iot-device-samples/android-sample/.idea/*
/iot-e2e-tests/android/.idea/*
/iothub/device/iot-device-samples/android-sample/app/.idea/*
/iot-e2e-tests/android/app/.idea/*
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ public final class ClientConfiguration
private static final int DEFAULT_HTTPS_CONNECT_TIMEOUT_MILLIS = 0; //no connect timeout

public static final int DEFAULT_KEEP_ALIVE_INTERVAL_IN_SECONDS = 230;
public static final int DEFAULT_SEND_INTERVAL_IN_SECONDS = 10;

public static final int DEFAULT_AMQP_OPEN_AUTHENTICATION_SESSION_TIMEOUT_IN_SECONDS = 20;
public static final int DEFAULT_AMQP_OPEN_DEVICE_SESSIONS_TIMEOUT_IN_SECONDS = 60;

public static final int DEFAULT_SEND_INTERVAL_IN_MILLISECONDS = 10;

/** The default value for messageLockTimeoutSecs. */
private static final int DEFAULT_MESSAGE_LOCK_TIMEOUT_SECS = 180;

Expand Down Expand Up @@ -79,6 +82,10 @@ public final class ClientConfiguration
@Setter(AccessLevel.PACKAGE)
private int keepAliveInterval = DEFAULT_KEEP_ALIVE_INTERVAL_IN_SECONDS;

@Getter
@Setter(AccessLevel.PACKAGE)
private int sendInterval = DEFAULT_SEND_INTERVAL_IN_MILLISECONDS;

private IotHubAuthenticationProvider authenticationProvider;

/**
Expand Down Expand Up @@ -207,6 +214,7 @@ private void setClientOptionValues(ClientOptions clientOptions)
this.amqpOpenAuthenticationSessionTimeout = clientOptions != null && clientOptions.getAmqpAuthenticationSessionTimeout() != 0 ? clientOptions.getAmqpAuthenticationSessionTimeout() : DEFAULT_AMQP_OPEN_AUTHENTICATION_SESSION_TIMEOUT_IN_SECONDS;
this.amqpOpenDeviceSessionsTimeout = clientOptions != null && clientOptions.getAmqpDeviceSessionTimeout() != 0 ? clientOptions.getAmqpDeviceSessionTimeout() : DEFAULT_AMQP_OPEN_DEVICE_SESSIONS_TIMEOUT_IN_SECONDS;
this.proxySettings = clientOptions != null && clientOptions.getProxySettings() != null ? clientOptions.getProxySettings() : null;
this.sendInterval = clientOptions != null && clientOptions.getSendInterval() != 0 ? clientOptions.getSendInterval() : DEFAULT_SEND_INTERVAL_IN_MILLISECONDS;

if (proxySettings != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ final class DeviceIO implements IotHubConnectionStatusChangeCallback
IotHubClientProtocol protocol,
SSLContext sslContext,
ProxySettings proxySettings,
int keepAliveInterval)
int keepAliveInterval,
int sendInterval)
{
this.state = IotHubConnectionStatus.DISCONNECTED;
this.transport = new IotHubTransport(hostName, protocol, sslContext, proxySettings, this, keepAliveInterval);
this.transport = new IotHubTransport(hostName, protocol, sslContext, proxySettings, this, keepAliveInterval, sendInterval);
this.sendTask = new IotHubSendTask(this.transport);
this.receiveTask = new IotHubReceiveTask(this.transport);
this.reconnectTask = new IotHubReconnectTask(this.transport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.*;

import static com.microsoft.azure.sdk.iot.device.ClientConfiguration.DEFAULT_KEEP_ALIVE_INTERVAL_IN_SECONDS;
import static com.microsoft.azure.sdk.iot.device.ClientConfiguration.DEFAULT_SEND_INTERVAL_IN_MILLISECONDS;

/**
* A client for creating multiplexed connections to IoT hub. A multiplexed connection allows for multiple device clients
Expand Down Expand Up @@ -99,6 +100,7 @@ public MultiplexingClient(String hostName, IotHubClientProtocol protocol, Multip
long receivePeriod = options != null ? options.getReceiveInterval() : DEFAULT_RECEIVE_PERIOD_MILLIS;
int sendMessagesPerThread = options != null ? options.getMaxMessagesSentPerSendInterval() : DEFAULT_MAX_MESSAGES_TO_SEND_PER_THREAD;
int keepAliveInterval = options != null ? options.getKeepAliveInterval() : DEFAULT_KEEP_ALIVE_INTERVAL_IN_SECONDS;
int sendInterval = (int) (options != null ? options.getSendInterval() : DEFAULT_SEND_INTERVAL_IN_MILLISECONDS);

if (sendPeriod < 0)
{
Expand All @@ -125,7 +127,7 @@ else if (receivePeriod == 0) //default builder value for this option, signals th

// Optional settings from MultiplexingClientOptions
SSLContext sslContext = options != null ? options.getSslContext() : null;
this.deviceIO = new DeviceIO(hostName, protocol, sslContext, proxySettings, keepAliveInterval);
this.deviceIO = new DeviceIO(hostName, protocol, sslContext, proxySettings, keepAliveInterval, sendInterval);
this.deviceIO.setMaxNumberOfMessagesSentPerSendThread(sendMessagesPerThread);
this.deviceIO.setSendPeriodInMilliseconds(sendPeriod);
this.deviceIO.setReceivePeriodInMilliseconds(receivePeriod);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class IotHubTransport implements IotHubListener
private final String hostName;
private final ProxySettings proxySettings;
private final int keepAliveInterval;
private final int sendInterval;
private SSLContext sslContext;
private final boolean isMultiplexing;

Expand Down Expand Up @@ -148,6 +149,7 @@ public IotHubTransport(ClientConfiguration defaultConfig, IotHubConnectionStatus

this.deviceIOConnectionStatusChangeCallback = deviceIOConnectionStatusChangeCallback;
this.keepAliveInterval = defaultConfig.getKeepAliveInterval();
this.sendInterval = defaultConfig.getSendInterval();
}

public IotHubTransport(
Expand All @@ -156,7 +158,8 @@ public IotHubTransport(
SSLContext sslContext,
ProxySettings proxySettings,
IotHubConnectionStatusChangeCallback deviceIOConnectionStatusChangeCallback,
int keepAliveInterval) throws IllegalArgumentException
int keepAliveInterval,
int sendInterval) throws IllegalArgumentException
{
this.protocol = protocol;
this.hostName = hostName;
Expand All @@ -166,6 +169,7 @@ public IotHubTransport(
this.deviceIOConnectionStatusChangeCallback = deviceIOConnectionStatusChangeCallback;
this.isMultiplexing = true;
this.keepAliveInterval = keepAliveInterval;
this.sendInterval = sendInterval;
}

public Semaphore getSendThreadSemaphore()
Expand Down Expand Up @@ -1324,7 +1328,8 @@ private void openConnection() throws TransportException
this.protocol == IotHubClientProtocol.AMQPS_WS,
this.sslContext,
this.proxySettings,
this.keepAliveInterval);
this.keepAliveInterval,
this.sendInterval);

for (ClientConfiguration config : this.deviceClientConfigs.values())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public final class AmqpsIotHubConnection extends BaseHandler implements IotHubTr
private static final int CBS_SESSION_COUNT = 1; //even for multiplex scenarios

// Message send constants
private static final int SEND_MESSAGES_PERIOD_MILLIS = 50; //every 50 milliseconds, the method onTimerTask will fire to send, at most, MAX_MESSAGES_TO_SEND_PER_CALLBACK queued messages
private final int sendInterval; //every X milliseconds, the method onTimerTask will fire to send, at most, MAX_MESSAGES_TO_SEND_PER_CALLBACK queued messages
private static final int MAX_MESSAGES_TO_SEND_PER_CALLBACK = 1000; //Max number of queued messages to send per periodic sending task

// States of outgoing messages, incoming messages, and outgoing subscriptions
Expand Down Expand Up @@ -141,12 +141,13 @@ public AmqpsIotHubConnection(ClientConfiguration config, String transportUniqueI
this.isMultiplexing = false;

this.keepAliveInterval = config.getKeepAliveInterval();
this.sendInterval = clientConfiguration.getSendInterval();

this.state = IotHubConnectionStatus.DISCONNECTED;
log.trace("AmqpsIotHubConnection object is created successfully and will use port {}", this.isWebsocketConnection ? WEB_SOCKET_PORT : AMQP_PORT);
}

public AmqpsIotHubConnection(String hostName, String transportUniqueIdentifier, boolean isWebsocketConnection, SSLContext sslContext, ProxySettings proxySettings, int keepAliveInterval)
public AmqpsIotHubConnection(String hostName, String transportUniqueIdentifier, boolean isWebsocketConnection, SSLContext sslContext, ProxySettings proxySettings, int keepAliveInterval, int sendInterval)
{
// This allows us to create thread safe sets despite there being no such type default in Java 7 or 8
this.clientConfigurations = Collections.newSetFromMap(new ConcurrentHashMap<>());
Expand All @@ -171,6 +172,7 @@ public AmqpsIotHubConnection(String hostName, String transportUniqueIdentifier,
log.trace("AmqpsIotHubConnection object is created successfully and will use port {}", this.isWebsocketConnection ? WEB_SOCKET_PORT : AMQP_PORT);

this.keepAliveInterval = keepAliveInterval;
this.sendInterval = sendInterval;
}

public void registerMultiplexedDevice(ClientConfiguration config)
Expand Down Expand Up @@ -394,7 +396,7 @@ public void onReactorInit(Event event)
}

this.reactor.connectionToHost(hostName, port, this);
this.reactor.schedule(SEND_MESSAGES_PERIOD_MILLIS, this);
this.reactor.schedule(this.sendInterval, this);
}

@Override
Expand Down Expand Up @@ -611,7 +613,7 @@ public void onTimerTask(Event event)
checkForNewlyUnregisteredMultiplexedClientsToStop();
checkForNewlyRegisteredMultiplexedClientsToStart();

event.getReactor().schedule(SEND_MESSAGES_PERIOD_MILLIS, this);
event.getReactor().schedule(this.sendInterval, this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,6 @@ public void onReactorInit(final @Mocked AmqpsSessionHandler mockAmqpsSessionHand
{
baseExpectations();

final int sendPeriod = Deencapsulation.getField(AmqpsIotHubConnection.class, "SEND_MESSAGES_PERIOD_MILLIS");
final int expectedSasTokenRenewalPeriod = 444;

new NonStrictExpectations()
Expand Down Expand Up @@ -760,7 +759,7 @@ public void onReactorInit(final @Mocked AmqpsSessionHandler mockAmqpsSessionHand
{
{
mockEvent.getReactor();
mockReactor.schedule(sendPeriod, connection);
mockReactor.schedule(anyInt, connection);
mockReactor.connectionToHost(anyString, anyInt, connection);
}
};
Expand Down Expand Up @@ -814,7 +813,6 @@ public void onReactorInitX509() throws TransportException
{
baseExpectations();

final int sendPeriod = Deencapsulation.getField(AmqpsIotHubConnection.class, "SEND_MESSAGES_PERIOD_MILLIS");
final int expectedSasTokenRenewalPeriod = 444;

new NonStrictExpectations()
Expand All @@ -840,7 +838,7 @@ public void onReactorInitX509() throws TransportException
{
{
mockEvent.getReactor();
mockReactor.schedule(sendPeriod, connection);
mockReactor.schedule(anyInt, connection);

mockReactor.schedule(expectedSasTokenRenewalPeriod, mockAmqpsCbsSessionHandler);
times = 0;
Expand Down