Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements subscription options retain as published feature #815

3 changes: 2 additions & 1 deletion ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ Version 0.18-SNAPSHOT:
[feature] subscription option handling: (issue #808)
- Move from qos to subscription option implementing the persistence of SubscriptionOption to/from storage. (#810)
- Exposed the maximum granted QoS by the server with the config setting 'max_server_granted_qos'. (#811)
- Implements handling of noLocal subscription option on MQTT5 connections. (#814)
- Implements handling of noLocal subscription option on MQTT5 connections. (#814)
- Implements subscription options retain as published feature. (#815)
[feature] subscription identifiers: (issue #801)
- Implements the validation of subscription identifier properties in SUBSCRIBE. (#803)
- Store and retrieve the subscription identifier into the subscriptions directory. (#804)
Expand Down
43 changes: 26 additions & 17 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ private void trackWillSpecificationForFutureFire(Session bindedSession, ISession
}

private void publishWill(ISessionsRepository.Will will) {
publish2Subscribers(WILL_PUBLISHER, Unpooled.copiedBuffer(will.payload), new Topic(will.topic), will.qos);
publish2Subscribers(WILL_PUBLISHER, Unpooled.copiedBuffer(will.payload), new Topic(will.topic), will.qos, will.retained);
}

/**
Expand Down Expand Up @@ -528,7 +528,7 @@ CompletableFuture<Void> receivedPublishQos0(Topic topic, String username, String
ReferenceCountUtil.release(msg);
return CompletableFuture.completedFuture(null);
}
final RoutingResults publishResult = publish2Subscribers(clientID, msg.payload(), topic, AT_MOST_ONCE);
final RoutingResults publishResult = publish2Subscribers(clientID, msg.payload(), topic, AT_MOST_ONCE, msg.fixedHeader().isRetain());
if (publishResult.isAllFailed()) {
LOG.info("No one publish was successfully enqueued to session loops");
ReferenceCountUtil.release(msg);
Expand Down Expand Up @@ -564,12 +564,13 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, Topic topic, Strin
}

ByteBuf payload = msg.payload();
boolean retainPublish = msg.fixedHeader().isRetain();
final RoutingResults routes;
if (msg.fixedHeader().isDup()) {
final Set<String> failedClients = failedPublishes.listFailed(clientId, messageID);
routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE, failedClients);
routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE, failedClients, retainPublish);
} else {
routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE);
routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE, retainPublish);
}
if (LOG.isTraceEnabled()) {
LOG.trace("subscriber routes: {}", routes);
Expand Down Expand Up @@ -602,8 +603,9 @@ private void manageRetain(Topic topic, MqttPublishMessage msg) {
}
}

private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf payload, Topic topic, MqttQoS publishingQos) {
return publish2Subscribers(publisherClientId, payload, topic, publishingQos, NO_FILTER);
private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf payload, Topic topic,
MqttQoS publishingQos, boolean isPublishRetained) {
return publish2Subscribers(publisherClientId, payload, topic, publishingQos, NO_FILTER, isPublishRetained);
}

private class BatchingPublishesCollector {
Expand Down Expand Up @@ -670,7 +672,7 @@ public int countBatches() {
}

private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf payload, Topic topic, MqttQoS publishingQos,
Set<String> filterTargetClients) {
Set<String> filterTargetClients, boolean retainPublish) {
List<Subscription> topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic);
if (topicMatchingSubscriptions.isEmpty()) {
// no matching subscriptions, clean exit
Expand Down Expand Up @@ -704,7 +706,7 @@ private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf pay
payload.retain(subscriptionCount);

List<RouteResult> publishResults = collector.routeBatchedPublishes((batch) -> {
publishToSession(payload, topic, batch, publishingQos);
publishToSession(payload, topic, batch, publishingQos, retainPublish);
payload.release();
});

Expand All @@ -727,23 +729,28 @@ private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf pay
return new RoutingResults(successedRoutings, failedRoutings, publishes);
}

private void publishToSession(ByteBuf payload, Topic topic, Collection<Subscription> subscriptions, MqttQoS publishingQos) {
ByteBuf duplicate = payload.duplicate();
private void publishToSession(ByteBuf payload, Topic topic, Collection<Subscription> subscriptions,
MqttQoS publishingQos, boolean retainPublish) {
ByteBuf duplicatedPayload = payload.duplicate();
for (Subscription sub : subscriptions) {
MqttQoS qos = lowerQosToTheSubscriptionDesired(sub, publishingQos);
publishToSession(duplicate, topic, sub, qos);
boolean retained = false;
if (sub.option().isRetainAsPublished()) {
retained = retainPublish;
}
publishToSession(duplicatedPayload, topic, sub, qos, retained);
}
}

private void publishToSession(ByteBuf payload, Topic topic, Subscription sub, MqttQoS qos) {
private void publishToSession(ByteBuf payload, Topic topic, Subscription sub, MqttQoS qos, boolean retained) {
Session targetSession = this.sessionRegistry.retrieve(sub.getClientId());

boolean isSessionPresent = targetSession != null;
if (isSessionPresent) {
LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}",
sub.getClientId(), sub.getTopicFilter(), qos);
final MqttProperties.MqttProperty[] properties = prepareSubscriptionProperties(sub);
targetSession.sendNotRetainedPublishOnSessionAtQos(topic, qos, payload, properties);
targetSession.sendPublishOnSessionAtQos(topic, qos, payload, retained, properties);
} else {
// If we are, the subscriber disconnected after the subscriptions tree selected that session as a
// destination.
Expand Down Expand Up @@ -787,12 +794,13 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage
}

final int messageID = msg.variableHeader().packetId();
boolean retainPublish = msg.fixedHeader().isRetain();
final RoutingResults publishRoutings;
if (msg.fixedHeader().isDup()) {
final Set<String> failedClients = failedPublishes.listFailed(clientId, messageID);
publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE, failedClients);
publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE, failedClients, retainPublish);
} else {
publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE);
publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE, retainPublish);
}
if (publishRoutings.isAllSuccess()) {
// QoS2 PUB message was enqueued successfully to every event loop
Expand Down Expand Up @@ -834,10 +842,11 @@ public RoutingResults internalPublish(MqttPublishMessage msg) {
final ByteBuf payload = msg.payload();
LOG.info("Sending internal PUBLISH message Topic={}, qos={}", topic, qos);

final RoutingResults publishResult = publish2Subscribers(INTERNAL_PUBLISHER, payload, topic, qos);
boolean retainPublish = msg.fixedHeader().isRetain();
final RoutingResults publishResult = publish2Subscribers(INTERNAL_PUBLISHER, payload, topic, qos, retainPublish);
LOG.trace("after routed publishes: {}", publishResult);

if (!msg.fixedHeader().isRetain()) {
if (!retainPublish) {
return publishResult;
}
if (qos == AT_MOST_ONCE || payload.readableBytes() == 0) {
Expand Down
13 changes: 4 additions & 9 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,8 @@ public void sendRetainedPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf
sendPublishOnSessionAtQos(topic, qos, payload, true, mqttProperties);
}

public void sendNotRetainedPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload,
MqttProperties.MqttProperty... mqttProperties) {
sendPublishOnSessionAtQos(topic, qos, payload, false, mqttProperties);
}

private void sendPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload, boolean retained,
MqttProperties.MqttProperty... mqttProperties) {
void sendPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload, boolean retained,
MqttProperties.MqttProperty... mqttProperties) {
switch (qos) {
case AT_MOST_ONCE:
if (connected()) {
Expand Down Expand Up @@ -310,8 +305,8 @@ private void sendPublishInFlightWindowOrQueueing(Topic topic, MqttQoS qos, ByteB
if (resendInflightOnTimeout) {
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));
}
MqttPublishMessage publishMsg = MQTTConnection.createNotRetainedPublishMessage(topic.toString(), qos,
payload, packetId, mqttProperties);
MqttPublishMessage publishMsg = MQTTConnection.createPublishMessage(topic.toString(), qos,
payload, packetId, retained, false, mqttProperties);
localMqttConnectionRef.sendPublish(publishMsg);

drainQueueToConnection();
Expand Down
2 changes: 1 addition & 1 deletion broker/src/test/java/io/moquette/broker/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testPubAckDrainMessagesRemainingInQueue() {

private void sendQoS1To(Session client, Topic destinationTopic, String message) {
final ByteBuf payload = ByteBufUtil.writeUtf8(UnpooledByteBufAllocator.DEFAULT, message);
client.sendNotRetainedPublishOnSessionAtQos(destinationTopic, MqttQoS.AT_LEAST_ONCE, payload);
client.sendPublishOnSessionAtQos(destinationTopic, MqttQoS.AT_LEAST_ONCE, payload, false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ private String createMessage(int qosPub, int qosSub) {
return "Hello world MQTT " + qosPub + " " + qosSub;
}

private String createTopic(int qosPub, int qosSub) {
return "/topic" + qosPub + qosSub;
private String createTopic(String testName, int qosPub, int qosSub) {
return "/topic/" + testName + "/" + qosPub + qosSub;
}

private void sendRetainedAndSubscribe(int qosPub, int qosSub) throws MqttException {
String topic = createTopic(qosPub, qosSub);
private void sendRetainedAndSubscribe(String testName, int qosPub, int qosSub) throws MqttException {
String topic = createTopic(testName, qosPub, qosSub);
String messageString = createMessage(qosPub, qosSub);
clientPublisher.subscribe(topic);
callbackPublisher.reinit();
Expand All @@ -140,13 +140,13 @@ private void sendRetainedAndSubscribe(int qosPub, int qosSub) throws MqttExcepti
}
}

private void unsubscribeSubscriber(int qosPub, int qosSub) throws MqttException {
String topic = createTopic(qosPub, qosSub);
private void unsubscribeSubscriber(String testName, int qosPub, int qosSub) throws MqttException {
String topic = createTopic(testName, qosPub, qosSub);
clientSubscriber.unsubscribe(topic);
}

private void sendEmptyRetainedAndSubscribe(int qosPub, int qosSub) throws MqttException {
String topic = createTopic(qosPub, qosSub);
private void sendEmptyRetainedAndSubscribe(String testName, int qosPub, int qosSub) throws MqttException {
String topic = createTopic(testName, qosPub, qosSub);
callbackPublisher.reinit();
clientPublisher.publish(topic, new byte[0], qosPub, true);
// Wait for the publish to finish
Expand Down Expand Up @@ -204,30 +204,30 @@ static Stream<Arguments> retainedProvider() {
@MethodSource("notRetainedProvider")
public void checkShouldNotRetain(int qosPub, int qosSub) throws MqttException {
LOG.info("*** checkShouldNotRetain: qosPub {}, qosSub {} ***", qosPub, qosSub);
sendRetainedAndSubscribe(qosPub, qosSub);
sendRetainedAndSubscribe("should_not_retain", qosPub, qosSub);
validateMustNotReceive(qosPub);
}

@ParameterizedTest
@MethodSource("retainedProvider")
public void checkShouldRetain(int qosPub, int qosSub) throws MqttException {
LOG.info("*** checkShouldRetain: qosPub {}, qosSub {} ***", qosPub, qosSub);
sendRetainedAndSubscribe(qosPub, qosSub);
sendRetainedAndSubscribe("should_retain", qosPub, qosSub);
validateMustReceive(qosPub, qosSub);
unsubscribeSubscriber(qosPub, qosSub);
sendEmptyRetainedAndSubscribe(qosPub, qosSub);
unsubscribeSubscriber("should_retain", qosPub, qosSub);
sendEmptyRetainedAndSubscribe("should_retain", qosPub, qosSub);
validateMustNotReceive(qosPub);
}

@Test
public void checkQos0CancelsRetain() throws MqttException {
LOG.info("*** checkQos0CancelsRetain ***");
// First send a QoS 2 retain, and check it arrives.
sendRetainedAndSubscribe(2, 2);
sendRetainedAndSubscribe("qos0_cancel_retain", 2, 2);
validateMustReceive(2, 2);
unsubscribeSubscriber(2, 2);
unsubscribeSubscriber("qos0_cancel_retain", 2, 2);
// Then send a QoS 0 retain, and check it cancels the previous retain.
sendRetainedAndSubscribe(0, 2);
sendRetainedAndSubscribe("qos0_cancel_retain", 0, 2);
validateMustNotReceive(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ private static void publish(Mqtt5BlockingClient publisherClient, String topicNam
.send();
}

private static void subscribe(Mqtt5BlockingClient subscriberClient, String topicFilter, MqttQos mqttQos) {
static void subscribe(Mqtt5BlockingClient subscriberClient, String topicFilter, MqttQos mqttQos) {
subscriberClient.subscribeWith()
.topicFilter(topicFilter)
.qos(mqttQos)
Expand Down
Loading