Skip to content

Commit

Permalink
Mqtt5 Adapter Binding (#674)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera authored Sep 14, 2023
1 parent c72d878 commit 6288900
Show file tree
Hide file tree
Showing 8 changed files with 1,352 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;
import software.amazon.awssdk.crt.mqtt.MqttConnectionConfig;
import software.amazon.awssdk.crt.mqtt5.Mqtt5Client;
import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions;
import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
Expand Down Expand Up @@ -49,6 +52,32 @@ void deliver(String topic, byte[] payload, boolean dup, int qos, boolean retain)
}
}

/**
* Static help function to create a MqttConnectionConfig from a
* Mqtt5ClientOptions
*/
private static MqttConnectionConfig s_toMqtt3ConnectionConfig(Mqtt5ClientOptions mqtt5options) {
MqttConnectionConfig options = new MqttConnectionConfig();
options.setEndpoint(mqtt5options.getHostName());
options.setPort(mqtt5options.getPort() != null ? Math.toIntExact(mqtt5options.getPort()) : 0);
options.setSocketOptions(mqtt5options.getSocketOptions());
if (mqtt5options.getConnectOptions() != null) {
options.setClientId(mqtt5options.getConnectOptions().getClientId());
options.setKeepAliveSecs(
mqtt5options.getConnectOptions().getKeepAliveIntervalSeconds() != null
? Math.toIntExact(mqtt5options.getConnectOptions().getKeepAliveIntervalSeconds())
: 0);
}
options.setCleanSession(
mqtt5options.getSessionBehavior().compareTo(Mqtt5ClientOptions.ClientSessionBehavior.CLEAN) <= 0);
options.setPingTimeoutMs(
mqtt5options.getPingTimeoutMs() != null ? Math.toIntExact(mqtt5options.getPingTimeoutMs()) : 0);
options.setProtocolOperationTimeoutMs(mqtt5options.getAckTimeoutSeconds() != null
? Math.toIntExact(mqtt5options.getAckTimeoutSeconds()) * 1000
: 0);
return options;
}

/**
* Constructs a new MqttClientConnection. Connections are reusable after being
* disconnected.
Expand All @@ -71,8 +100,62 @@ public MqttClientConnection(MqttConnectionConfig config) throws MqttException {
}

try {
acquireNativeHandle(mqttClientConnectionNew(config.getMqttClient().getNativeHandle(), this));
acquireNativeHandle(mqttClientConnectionNewFrom311Client(config.getMqttClient().getNativeHandle(), this));
SetupConfig(config);

} catch (CrtRuntimeException ex) {
throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage());
}
}

/**
* Constructs a new MqttClientConnection from a Mqtt5Client. Connections are
* reusable after being
* disconnected.
*
* @param mqtt5client the mqtt5 client to setup from
* @param callbacks connection callbacks triggered when receive connection
* events
*
* @throws MqttException If mqttClient is null
*/
public MqttClientConnection(Mqtt5Client mqtt5client, MqttClientConnectionEvents callbacks) throws MqttException {
if (mqtt5client == null) {
throw new MqttException("mqttClient must not be null");
}

try (MqttConnectionConfig config = s_toMqtt3ConnectionConfig(mqtt5client.getClientOptions())) {
config.setMqtt5Client(mqtt5client);
if (callbacks != null) {
config.setConnectionCallbacks(callbacks);
}

if (config.getClientId() == null) {
throw new MqttException("clientId must not be null");
}
if (config.getEndpoint() == null) {
throw new MqttException("endpoint must not be null");
}
if (config.getPort() <= 0 || config.getPort() > 65535) {
throw new MqttException("port must be a positive integer between 1 and 65535");
}

try {
acquireNativeHandle(
mqttClientConnectionNewFrom5Client(config.getMqtt5Client().getNativeHandle(), this));
SetupConfig(config);

} catch (CrtRuntimeException ex) {
throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage());
}
} catch (Exception e) {
throw new MqttException("Failed to setup mqtt3 connection : " + e.getMessage());
}

}

private void SetupConfig(MqttConnectionConfig config) throws MqttException {
try {
if (config.getUsername() != null) {
mqttClientConnectionSetLogin(getNativeHandle(), config.getUsername(), config.getPassword());
}
Expand Down Expand Up @@ -200,7 +283,12 @@ private void onConnectionClosed() {
*/
public CompletableFuture<Boolean> connect() throws MqttException {

TlsContext tls = config.getMqttClient().getTlsContext();
TlsContext tls = null;
if (config.getMqttClient() != null) {
tls = config.getMqttClient().getTlsContext();
} else if (config.getMqtt5Client() != null) {
tls = config.getMqtt5Client().getClientOptions().getTlsContext();
}

// Just clamp the pingTimeout, no point in throwing
short pingTimeout = (short) Math.max(0, Math.min(config.getPingTimeoutMs(), Short.MAX_VALUE));
Expand Down Expand Up @@ -362,7 +450,9 @@ private void onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserD
}

/**
* Returns statistics about the current state of the MqttClientConnection's queue of operations.
* Returns statistics about the current state of the MqttClientConnection's
* queue of operations.
*
* @return Current state of the connection's queue of operations.
*/
public MqttClientConnectionOperationStatistics getOperationStatistics() {
Expand All @@ -372,7 +462,10 @@ public MqttClientConnectionOperationStatistics getOperationStatistics() {
/*******************************************************************************
* Native methods
******************************************************************************/
private static native long mqttClientConnectionNew(long client, MqttClientConnection thisObj)
private static native long mqttClientConnectionNewFrom311Client(long client, MqttClientConnection thisObj)
throws CrtRuntimeException;

private static native long mqttClientConnectionNewFrom5Client(long client, MqttClientConnection thisObj)
throws CrtRuntimeException;

private static native void mqttClientConnectionDestroy(long connection);
Expand Down Expand Up @@ -419,6 +512,7 @@ private static native void mqttClientConnectionSetHttpProxyOptions(long connecti
String proxyAuthorizationUsername,
String proxyAuthorizationPassword) throws CrtRuntimeException;

private static native MqttClientConnectionOperationStatistics mqttClientConnectionGetOperationStatistics(long connection);
private static native MqttClientConnectionOperationStatistics mqttClientConnectionGetOperationStatistics(
long connection);

};
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import software.amazon.awssdk.crt.http.HttpProxyOptions;
import software.amazon.awssdk.crt.io.ClientTlsContext;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.mqtt5.Mqtt5Client;

/**
* Encapsulates all per-mqtt-connection configuration
Expand All @@ -23,6 +24,7 @@ public final class MqttConnectionConfig extends CrtResource {

/* mqtt */
private MqttClient mqttClient;
private Mqtt5Client mqtt5Client;
private String clientId;
private String username;
private String password;
Expand Down Expand Up @@ -314,6 +316,25 @@ public MqttClient getMqttClient() {
return mqttClient;
}

/**
* Configures the mqtt5 client to use for a connection
*
* @param mqtt5Client the mqtt client to use
*/
public void setMqtt5Client(Mqtt5Client mqtt5Client) {
swapReferenceTo(this.mqtt5Client, mqtt5Client);
this.mqtt5Client = mqtt5Client;
}

/**
* Queries the mqtt5 client to use for a connection
*
* @return the mqtt5 client to use
*/
public Mqtt5Client getMqtt5Client() {
return mqtt5Client;
}

/**
* Sets the login credentials for a connection.
*
Expand Down Expand Up @@ -529,6 +550,7 @@ public MqttConnectionConfig clone() {
clone.setSocketOptions(getSocketOptions());

clone.setMqttClient(getMqttClient());
clone.setMqtt5Client(getMqtt5Client());
clone.setClientId(getClientId());
clone.setUsername(getUsername());
clone.setPassword(getPassword());
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public class Mqtt5Client extends CrtResource {
*/
private boolean isConnected;

/**
* A private config used to save config for mqtt3 connection creation
*/
private Mqtt5ClientOptions clientOptions;

/**
* Creates a Mqtt5Client instance using the provided Mqtt5ClientOptions. Once the Mqtt5Client is created,
* changing the settings will not cause a change in already created Mqtt5Client's.
Expand All @@ -54,6 +59,7 @@ public class Mqtt5Client extends CrtResource {
* @throws CrtRuntimeException If the system is unable to allocate space for a native MQTT5 client structure
*/
public Mqtt5Client(Mqtt5ClientOptions options) throws CrtRuntimeException {
clientOptions = options;
ClientBootstrap bootstrap = options.getBootstrap();
SocketOptions socketOptions = options.getSocketOptions();
TlsContext tlsContext = options.getTlsContext();
Expand Down Expand Up @@ -202,6 +208,21 @@ private synchronized void setIsConnected(boolean connected) {
isConnected = connected;
}


/*******************************************************************************
* Mqtt5 to Mqtt3 Adapter
******************************************************************************/

/**
* Returns the Mqtt5ClientOptions used for the Mqtt5Client
*
* @return Mqtt5ClientOptions
*/
public Mqtt5ClientOptions getClientOptions()
{
return clientOptions;
}

/*******************************************************************************
* websocket methods
******************************************************************************/
Expand All @@ -228,6 +249,7 @@ private void onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserD
}
}


/*******************************************************************************
* native methods
******************************************************************************/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import software.amazon.awssdk.crt.io.ExponentialBackoffRetryOptions.JitterMode;

import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket;
import software.amazon.awssdk.crt.mqtt.MqttConnectionConfig;

import java.util.Map;
import java.util.function.Function;
Expand Down
Loading

0 comments on commit 6288900

Please sign in to comment.