Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscription status callback to subscribeToTopic #1086

Merged
merged 4 commits into from
Aug 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Change Log - AWS SDK for Android


## [Release 2.14.2](https://github.com/aws/aws-sdk-android/releases/tag/release_v2.14.2)

### New Features

- **AWS IoT**
- Added an overloaded version of `subscribeToTopic()` method, `public void subscribeToTopic(final String topic, final AWSIotMqttQos qos, final AWSIotMqttSubscriptionStatusCallback subscriptionStatusCallback, final AWSIotMqttNewMessageCallback callback);`, in `AWSIotMqttManager` which accepts subscription status callback to notify users of the status of subscription operation. See [Issue#1005](https://github.com/aws-amplify/aws-sdk-android/issues/1005) for details.

## [Release 2.14.1](https://github.com/aws/aws-sdk-android/releases/tag/release_v2.14.1)

### New Features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import java.io.File;
Expand Down Expand Up @@ -705,6 +704,23 @@ public void onMessageArrived(String topic, byte[] data) {
assertEquals((int)ONE_TWENTY_KB, messages.get(0).length());
}

/**
* Test Subscribe status callback
*/
private class TestSubscriptionStatusCallback implements AWSIotMqttSubscriptionStatusCallback {
String subscriptionStatus = null;

@Override
public void onSuccess() {
subscriptionStatus = "Subscription successful";
}

@Override
public void onFailure(Throwable exception) {
subscriptionStatus = "Subscription failed";
}
}

@Test
public void mqttWebSocket() throws Exception {

Expand All @@ -728,15 +744,18 @@ public void onStatusChanged(AWSIotMqttClientStatus status, Throwable throwable)
assertEquals(AWSIotMqttClientStatusCallback.AWSIotMqttClientStatus.Connecting, statuses.get(0));
assertEquals(AWSIotMqttClientStatusCallback.AWSIotMqttClientStatus.Connected, statuses.get(1));

TestSubscriptionStatusCallback sscb = new TestSubscriptionStatusCallback();

// subscribe to MQTT topic
mqttManager.subscribeToTopic("sdk/test/integration/ws", AWSIotMqttQos.QOS0, new AWSIotMqttNewMessageCallback() {
mqttManager.subscribeToTopic("sdk/test/integration/ws", AWSIotMqttQos.QOS0, sscb, new AWSIotMqttNewMessageCallback() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you keep this test and add a new test for subscribeToTopic with callback for subscription status, that way we don't lose the coverage on the original method.

@Override
public void onMessageArrived(String topic, byte[] data) {
messages.add(new String(data));
}
});
// ensure subscription propagates
Thread.sleep(2000);
assertEquals("Subscription successful", sscb.subscriptionStatus);
// publish 20 messages
for (int i=0; i<20; ++i) {
mqttManager.publishString("integration test " + i, "sdk/test/integration/ws", AWSIotMqttQos.QOS0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,23 @@ public void resetReconnect() {
*/
public void subscribeToTopic(String topic, AWSIotMqttQos qos,
AWSIotMqttNewMessageCallback callback) {
subscribeToTopic(topic, qos, null, callback);
}

/**
* Subscribes to an MQTT topic
*
* @param topic The topic to which to subscribe.
* @param qos Quality of Service Level of the subscription.
* @param subscriptionStatusCallback Callback that will be notified when the subscribe has completed.
* Any exception encountered during the subscribe operation is reported on the callback
* if avalialble, else AmazonClientException is thrown by this method.
* @param callback Callback to be called when new message is received on this
* topic for this subscription.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment in the Javadoc which says when the callback is invoked and when the AmazonClientException is thrown?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*/
public void subscribeToTopic(final String topic, final AWSIotMqttQos qos,
final AWSIotMqttSubscriptionStatusCallback subscriptionStatusCallback,
final AWSIotMqttNewMessageCallback callback) {

if (topic == null || topic.isEmpty()) {
throw new IllegalArgumentException("topic is null or empty");
Expand All @@ -1196,9 +1213,27 @@ public void subscribeToTopic(String topic, AWSIotMqttQos qos,

if (null != mqttClient) {
try {
mqttClient.subscribe(topic, qos.asInt());
if (subscriptionStatusCallback != null) {
mqttClient.subscribe(topic, qos.asInt(), null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
subscriptionStatusCallback.onSuccess();
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
subscriptionStatusCallback.onFailure(exception);
}
});
} else {
mqttClient.subscribe(topic, qos.asInt());
}
} catch (final MqttException e) {
throw new AmazonClientException("Client error when subscribing.", e);
if(subscriptionStatusCallback != null) {
subscriptionStatusCallback.onFailure(e);
} else {
throw new AmazonClientException("Client error when subscribing.", e);
}
}
final AWSIotMqttTopic topicModel = new AWSIotMqttTopic(topic, qos, callback);
topicListeners.put(topic, topicModel);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://aws.amazon.com/apache2.0
*
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
* OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and
* limitations under the License.
*/

package com.amazonaws.mobileconnectors.iot;

/**
* Enables an application to be notified of status of call to subscribe to a topic.
*/
public interface AWSIotMqttSubscriptionStatusCallback {
/**
* This method is invoked when a susbcription has completed successfully.
*/
void onSuccess();
desokroshan marked this conversation as resolved.
Show resolved Hide resolved

/**
* This method is invoked when subscription fails.
* If a client is disconnected while an subscription is in progress
* onFailure will be called.
*/
void onFailure(Throwable exception);
}
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,37 @@ public void testDisconnectException() throws Exception {
testClient.disconnect();
}

@Test
public void testSubscribeToTopicWithSubscriptionCallback() throws Exception {
MockMqttClient mockClient = new MockMqttClient();

AWSIotMqttManager testClient = new AWSIotMqttManager("test-client",
Region.getRegion(Regions.US_EAST_1), TEST_ENDPOINT_PREFIX);
testClient.setMqttClient(mockClient);

KeyStore testKeystore = AWSIotKeystoreHelper.getIotKeystore(CERT_ID, KEYSTORE_PATH,
KEYSTORE_NAME, KEYSTORE_PASSWORD);
testClient.connect(testKeystore, null);

TestNewMessageCallback mcb = new TestNewMessageCallback();

TestAWSIotMqttSubscriptionStatusCallback sscb = new TestAWSIotMqttSubscriptionStatusCallback();

testClient.subscribeToTopic("unit/test/topic", AWSIotMqttQos.QOS0, sscb, mcb);

assertEquals(1, mockClient.subscribeCalls);
assertTrue(sscb.subscribed);
assertTrue(mockClient.mockSubscriptions.containsKey("unit/test/topic"));
assertEquals((Integer) 0, mockClient.mockSubscriptions.get("unit/test/topic"));

MqttMessage msg = new MqttMessage();
msg.setPayload("test payload".getBytes(StringUtils.UTF8));
mockClient.mockCallback.messageArrived("unit/test/topic", msg);

assertEquals(1, mcb.receivedMessages.size());
assertEquals("unit/test/topic" + "test payload", mcb.receivedMessages.get(0));
}

@Test
public void testSubscribeToTopic() throws Exception {
MockMqttClient mockClient = new MockMqttClient();
Expand Down Expand Up @@ -2951,6 +2982,20 @@ public void onMessageArrived(String topic, byte[] data) {
}
}

private class TestAWSIotMqttSubscriptionStatusCallback implements AWSIotMqttSubscriptionStatusCallback {
boolean subscribed;

@Override
public void onSuccess(){
subscribed = true;
}

@Override
public void onFailure(Throwable exception){
subscribed = false;
}
}

/**
* Test Publish Status Callback
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class MockMqttClient extends MqttAsyncClient {
public boolean isConnected;
public MqttCallback mockCallback;
public IMqttActionListener mockConnectionStatusCallback;
public IMqttActionListener mockSubscriptionStatusCallback;
public boolean throwsExceptionOnConnect;
public MqttException connectException;
public boolean throwsExceptionOnPublish;
Expand Down Expand Up @@ -102,6 +103,18 @@ public IMqttToken subscribe(String topicFilter, int qos) throws MqttException {
return testToken;
}

public IMqttToken subscribe(String topicFilter, int qos, Object userContext,
IMqttActionListener callback) throws MqttException {
if (throwsExceptionOnSubscribe) {
throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION);
}
++subscribeCalls;
mockSubscriptionStatusCallback = callback;
mockSubscriptions.put(topicFilter, qos);
callback.onSuccess(testToken);
return testToken;
}

public IMqttToken unsubscribe(String topicFilter) throws MqttException {
if (throwsExceptionOnUnsubscribe) {
throw new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION);
Expand Down