Skip to content

Commit 51f3802

Browse files
Adjust shared subscription sample to better fit docs (#406)
1 parent 4413364 commit 51f3802

File tree

2 files changed

+33
-41
lines changed

2 files changed

+33
-41
lines changed

samples/Mqtt5/SharedSubscription/README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ MQTT5 introduces additional features and enhancements that improve the developme
1010

1111
Note: MQTT5 support is currently in **developer preview**. We encourage feedback at all times, but feedback during the preview window is especially valuable in shaping the final product. During the preview period we may make backwards-incompatible changes to the public API, but in general, this is something we will try our best to avoid.
1212

13-
[Shared Subscriptions](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250) allow IoT devices to connect to a group where messages sent to a topic are then relayed to the group in a round-robin-like fashion. This is useful for distributing message load across multiple subscribing MQTT5 clients automatically. This is helpful for load balancing when you have many messages that need to be processed.
13+
[Shared Subscriptions](https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt5-shared-subscription) allow IoT devices to connect to a group where messages sent to a topic are then relayed to the group in a round-robin-like fashion. This is useful for distributing message load across multiple subscribing MQTT5 clients automatically. This is helpful for load balancing when you have many messages that need to be processed.
1414

15-
Shared Subscriptions rely on a group identifier, which tells the MQTT5 broker/server which IoT devices to treat as a group for message distribution. This is done when subscribing by formatting the subscription topic like the following: `$share/<group identifier>/<topic>`.
15+
Shared Subscriptions rely on a group name/identifier, which tells the MQTT5 broker/server which IoT devices to treat as a group for message distribution. This is done when subscribing by formatting the subscription topic like the following: `$share/<ShareName>/<TopicFilter>`.
1616
* `$share`: Tells the MQTT5 broker/server that the device is subscribing to a Shared Subscription.
17-
* `<group identifier>`: Tells the MQTT5 broker/server which group to add this Shared Subscription to. Messages published to a matching topic will be distributed round-robin amongst the group.
18-
* `<topic>`: The topic that the Shared Subscription is for. Messages published to this topic will be processed in a round-robin fashion. For example, `test/topic`.
17+
* `<ShareName>`: Tells the MQTT5 broker/server which group to add this Shared Subscription to. Messages published to a matching topic will be distributed round-robin amongst the group.
18+
* `<TopicFilter>`: The topic that the Shared Subscription is for. Messages published to this topic will be processed in a round-robin fashion. For example, `test/topic`.
1919

2020
Shared Subscriptions use a round-robbin like method of distributing messages. For example, say you have three MQTT5 clients all subscribed to the same Shared Subscription group and topic. If five messages are sent to the Shared Subscription topic, the messages will likely be delivered in the following order:
2121
* Message 1 -> Client one
@@ -71,7 +71,7 @@ Replace with the following with the data from your AWS account:
7171
* `<region>`: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`.
7272
* `<account>`: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website.
7373

74-
Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id <client ID here>` to send the client ID your policy supports.
74+
Note that in a real application, you may want to avoid the use of wildcards in your ClientID and shared subscription group names/identifiers. Wildcards should only be used selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id <client ID here>` to send the client ID your policy supports.
7575

7676
</details>
7777

samples/Mqtt5/SharedSubscription/src/main/java/sharedsubscription/SharedSubscription.java

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,9 @@ public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) {
107107
*/
108108
static final class SamplePublishEvents implements Mqtt5ClientOptions.PublishEvents {
109109
SampleMqtt5Client sampleClient;
110-
CountDownLatch messagesReceived;
111110

112-
SamplePublishEvents(SampleMqtt5Client client, int messageCount) {
111+
SamplePublishEvents(SampleMqtt5Client client) {
113112
sampleClient = client;
114-
messagesReceived = new CountDownLatch(messageCount);
115113
}
116114

117115
@Override
@@ -132,7 +130,6 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
132130
}
133131
}
134132
}
135-
messagesReceived.countDown();
136133
}
137134
}
138135

@@ -151,10 +148,10 @@ static final class SampleMqtt5Client {
151148
*/
152149
public static SampleMqtt5Client createMqtt5Client(
153150
String input_endpoint, String input_cert, String input_key, String input_ca,
154-
String input_clientId, int input_count, String input_clientName) {
151+
String input_clientId, String input_clientName) {
155152

156153
SampleMqtt5Client sampleClient = new SampleMqtt5Client();
157-
SamplePublishEvents publishEvents = new SamplePublishEvents(sampleClient, input_count / 2);
154+
SamplePublishEvents publishEvents = new SamplePublishEvents(sampleClient);
158155
SampleLifecycleEvents lifecycleEvents = new SampleLifecycleEvents(sampleClient);
159156

160157
Mqtt5Client client;
@@ -201,23 +198,17 @@ public static void main(String[] args) {
201198
SampleMqtt5Client subscriberOne = null;
202199
SampleMqtt5Client subscriberTwo = null;
203200

204-
/* Make sure the message count is even */
205-
if (cmdData.input_count%2 != 0) {
206-
onApplicationFailure(new Throwable("'--count' is an odd number. '--count' must be even or zero for this sample."));
207-
System.exit(1);
208-
}
209-
210201
try {
211202
/* Create the MQTT5 clients: one publisher and two subscribers */
212203
publisher = SampleMqtt5Client.createMqtt5Client(
213204
cmdData.input_endpoint, cmdData.input_cert, cmdData.input_key, cmdData.input_ca,
214-
cmdData.input_clientId + '1', cmdData.input_count, "Publisher");
205+
cmdData.input_clientId + '1', "Publisher");
215206
subscriberOne = SampleMqtt5Client.createMqtt5Client(
216207
cmdData.input_endpoint, cmdData.input_cert, cmdData.input_key, cmdData.input_ca,
217-
cmdData.input_clientId + '2', cmdData.input_count, "Subscriber One");
208+
cmdData.input_clientId + '2', "Subscriber One");
218209
subscriberTwo = SampleMqtt5Client.createMqtt5Client(
219210
cmdData.input_endpoint, cmdData.input_cert, cmdData.input_key, cmdData.input_ca,
220-
cmdData.input_clientId + '3', cmdData.input_count, "Subscriber Two");
211+
cmdData.input_clientId + '3', "Subscriber Two");
221212

222213
/* Connect all the clients */
223214
publisher.client.start();
@@ -231,22 +222,18 @@ public static void main(String[] args) {
231222
System.out.println("[" + subscriberTwo.name + "]: Connected");
232223

233224
/* Subscribe to the shared topic on the two subscribers */
234-
try {
235-
SubscribePacket.SubscribePacketBuilder subscribeBuilder = new SubscribePacket.SubscribePacketBuilder();
236-
subscribeBuilder.withSubscription(input_sharedTopic, QOS.AT_LEAST_ONCE, false, false, SubscribePacket.RetainHandlingType.DONT_SEND);
237-
subscriberOne.client.subscribe(subscribeBuilder.build()).get(60, TimeUnit.SECONDS);
238-
System.out.println("[" + subscriberOne.name + "]: Subscribed");
239-
subscriberTwo.client.subscribe(subscribeBuilder.build()).get(60, TimeUnit.SECONDS);
240-
System.out.println("[" + subscriberTwo.name + "]: Subscribed");
241-
}
242-
// TMP: If this fails subscribing in CI, just exit the sample gracefully.
243-
catch (Exception ex) {
244-
if (isCI) {
245-
return;
246-
} else {
247-
throw ex;
248-
}
249-
}
225+
SubscribePacket.SubscribePacketBuilder subscribeBuilder = new SubscribePacket.SubscribePacketBuilder();
226+
subscribeBuilder.withSubscription(input_sharedTopic, QOS.AT_LEAST_ONCE, false, false, SubscribePacket.RetainHandlingType.DONT_SEND);
227+
subscriberOne.client.subscribe(subscribeBuilder.build()).get(60, TimeUnit.SECONDS);
228+
System.out.println(
229+
"[" + subscriberOne.name + "]: Subscribed to topic '" + cmdData.input_topic +
230+
"' in shared subscription group '" + cmdData.input_groupIdentifier + "'.");
231+
System.out.println("[" + subscriberOne.name + "]: Full subscribed topic is '" + input_sharedTopic + "'.");
232+
subscriberTwo.client.subscribe(subscribeBuilder.build()).get(60, TimeUnit.SECONDS);
233+
System.out.println(
234+
"[" + subscriberTwo.name + "]: Subscribed to topic '" + cmdData.input_topic +
235+
"' in shared subscription group '" + cmdData.input_groupIdentifier + "'.");
236+
System.out.println("[" + subscriberTwo.name + "]: Full subscribed topic is '" + input_sharedTopic + "'.");
250237

251238
/* Publish using the publisher client */
252239
PublishPacket.PublishPacketBuilder publishBuilder = new PublishPacket.PublishPacketBuilder();
@@ -259,9 +246,8 @@ public static void main(String[] args) {
259246
System.out.println("[" + publisher.name + "]: Sent publish");
260247
Thread.sleep(1000);
261248
}
262-
/* Make sure all the messages were gotten on the subscribers */
263-
subscriberOne.publishEvents.messagesReceived.await(60 * 4, TimeUnit.SECONDS);
264-
subscriberTwo.publishEvents.messagesReceived.await(60 * 4, TimeUnit.SECONDS);
249+
/* Wait 5 seconds to let the last publish go out before unsubscribing */
250+
Thread.sleep(5000);
265251
} else {
266252
System.out.println("Skipping publishing messages due to message count being zero...");
267253
}
@@ -270,9 +256,15 @@ public static void main(String[] args) {
270256
UnsubscribePacket.UnsubscribePacketBuilder unsubscribeBuilder = new UnsubscribePacket.UnsubscribePacketBuilder();
271257
unsubscribeBuilder.withSubscription(input_sharedTopic);
272258
subscriberOne.client.unsubscribe(unsubscribeBuilder.build()).get(60, TimeUnit.SECONDS);
273-
System.out.println("[" + subscriberOne.name + "]: Unsubscribed");
259+
System.out.println(
260+
"[" + subscriberOne.name + "]: Unsubscribed to topic '" + cmdData.input_topic +
261+
"' in shared subscription group '" + cmdData.input_groupIdentifier + "'.");
262+
System.out.println("[" + subscriberOne.name + "]: Full unsubscribed topic is '" + input_sharedTopic + "'.");
274263
subscriberTwo.client.unsubscribe(unsubscribeBuilder.build()).get(60, TimeUnit.SECONDS);
275-
System.out.println("[" + subscriberTwo.name + "]: Unsubscribed");
264+
System.out.println(
265+
"[" + subscriberTwo.name + "]: Unsubscribed to topic '" + cmdData.input_topic +
266+
"' in shared subscription group '" + cmdData.input_groupIdentifier + "'.");
267+
System.out.println("[" + subscriberTwo.name + "]: Full unsubscribed topic is '" + input_sharedTopic + "'.");
276268

277269
/* Disconnect all the clients */
278270
publisher.client.stop(null);

0 commit comments

Comments
 (0)