Skip to content

Commit

Permalink
Merge pull request #178 from thingsboard/refactoring/device-publish-m…
Browse files Browse the repository at this point in the history
…sg-handling

Refactor DevicePublishMsg processing to eliminate unnecessary transformation to PublishMsg
  • Loading branch information
dmytro-landiak authored Nov 18, 2024
2 parents e9937e5 + 6c8e03c commit 9d3de7e
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.thingsboard.mqtt.broker.dto.PacketIdDto;
import org.thingsboard.mqtt.broker.dto.SharedSubscriptionPublishPacket;
import org.thingsboard.mqtt.broker.service.analysis.ClientLogger;
import org.thingsboard.mqtt.broker.service.mqtt.PublishMsg;
import org.thingsboard.mqtt.broker.service.mqtt.PublishMsgDeliveryService;
import org.thingsboard.mqtt.broker.service.subscription.shared.SharedSubscriptionCacheService;
import org.thingsboard.mqtt.broker.service.subscription.shared.TopicSharedSubscription;
Expand Down Expand Up @@ -176,11 +175,10 @@ void deliverPersistedMsg(DevicePublishMsg persistedMessage) {
inFlightPacketIds.add(persistedMessage.getPacketId());
}
lastPersistedMsgSentPacketId = persistedMessage.getPacketId();
PublishMsg pubMsg = getPublishMsg(persistedMessage, isDup);
if (msgExpiryResult.isMsgExpiryIntervalPresent()) {
MqttPropertiesUtil.addMsgExpiryIntervalToProps(pubMsg.getProperties(), msgExpiryResult.getMsgExpiryInterval());
MqttPropertiesUtil.addMsgExpiryIntervalToProps(persistedMessage.getProperties(), msgExpiryResult.getMsgExpiryInterval());
}
publishMsgDeliveryService.sendPublishMsgToClient(sessionCtx, pubMsg);
publishMsgDeliveryService.sendPublishMsgToClient(sessionCtx, persistedMessage, isDup);
break;
case PUBREL:
publishMsgDeliveryService.sendPubRelMsgToClient(sessionCtx, persistedMessage.getPacketId());
Expand Down Expand Up @@ -213,11 +211,10 @@ public void process(IncomingPublishMsg msg) {

inFlightPacketIds.add(publishMsg.getPacketId());
try {
PublishMsg pubMsg = getPublishMsg(publishMsg, false);
if (msgExpiryResult.isMsgExpiryIntervalPresent()) {
MqttPropertiesUtil.addMsgExpiryIntervalToProps(publishMsg.getProperties(), msgExpiryResult.getMsgExpiryInterval());
}
publishMsgDeliveryService.sendPublishMsgToClient(sessionCtx, pubMsg);
publishMsgDeliveryService.sendPublishMsgToClient(sessionCtx, publishMsg, false);
clientLogger.logEvent(clientId, this.getClass(), "Delivered msg to device client");
} catch (Exception e) {
log.warn("[{}] Failed to send PUBLISH msg", clientId, e);
Expand All @@ -232,18 +229,6 @@ private void disconnect(String message) {
DisconnectReasonType.ON_ERROR, message)));
}

private PublishMsg getPublishMsg(DevicePublishMsg publishMsg, boolean isDup) {
return PublishMsg.builder()
.packetId(publishMsg.getPacketId())
.topicName(publishMsg.getTopic())
.payload(publishMsg.getPayload())
.qosLevel(publishMsg.getQos())
.isDup(isDup)
.properties(publishMsg.getProperties())
.isRetained(publishMsg.isRetained())
.build();
}

public void processPacketAcknowledge(PacketAcknowledgedEventMsg msg) {
SharedSubscriptionPublishPacket packet = getSharedSubscriptionPublishPacket(msg.getPacketId());
var targetClientId = getTargetClientId(packet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.springframework.stereotype.Service;
import org.thingsboard.mqtt.broker.actors.client.messages.ConnectionAcceptedMsg;
import org.thingsboard.mqtt.broker.actors.client.state.ClientActorStateInfo;
import org.thingsboard.mqtt.broker.common.data.DevicePublishMsg;
import org.thingsboard.mqtt.broker.common.data.util.StringUtils;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos.PublishMsgProto;
import org.thingsboard.mqtt.broker.service.mqtt.retain.RetainedMsg;
Expand Down Expand Up @@ -205,6 +206,12 @@ public MqttPublishMessage createPubMsg(PublishMsg pubMsg) {
pubMsg.getTopicName(), pubMsg.getPacketId(), pubMsg.getPayload(), pubMsg.getProperties());
}

@Override
public MqttPublishMessage createPubMsg(DevicePublishMsg pubMsg, boolean isDup) {
return getMqttPublishMessage(isDup, pubMsg.getQos(), pubMsg.isRetained(),
pubMsg.getTopic(), pubMsg.getPacketId(), pubMsg.getPayload(), pubMsg.getProperties());
}

@Override
public MqttPublishMessage createPubMsg(PublishMsgProto msg, int qos, boolean retain, String topicName, int packetId, MqttProperties properties) {
return getMqttPublishMessage(false, qos, retain, topicName, packetId, msg.getPayload().toByteArray(), properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.thingsboard.mqtt.broker.adaptor.ProtoConverter;
import org.thingsboard.mqtt.broker.common.data.DevicePublishMsg;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos.PublishMsgProto;
import org.thingsboard.mqtt.broker.service.historical.stats.TbMessageStatsReportClient;
import org.thingsboard.mqtt.broker.service.mqtt.retain.RetainedMsg;
Expand Down Expand Up @@ -77,14 +78,14 @@ public DefaultPublishMsgDeliveryService(MqttMessageGenerator mqttMessageGenerato
}

@Override
public void sendPublishMsgToClient(ClientSessionCtx sessionCtx, PublishMsg pubMsg) {
public void sendPublishMsgToClient(ClientSessionCtx sessionCtx, DevicePublishMsg pubMsg, boolean isDup) {
if (isTraceEnabled) {
log.trace("[{}] Executing sendPublishMsgToClient {}", sessionCtx.getClientId(), pubMsg);
log.trace("[{}] Executing sendPublishMsgToClient {}, isDup {}", sessionCtx.getClientId(), pubMsg, isDup);
}
pubMsg = sessionCtx.getTopicAliasCtx().createPublishMsgUsingTopicAlias(pubMsg, minTopicNameLengthForAliasReplacement);
MqttPublishMessage mqttPubMsg = mqttMessageGenerator.createPubMsg(pubMsg);
MqttPublishMessage mqttPubMsg = mqttMessageGenerator.createPubMsg(pubMsg, isDup);
tbMessageStatsReportClient.reportStats(OUTGOING_MSGS);
tbMessageStatsReportClient.reportClientReceiveStats(sessionCtx.getClientId(), pubMsg.getQosLevel());
tbMessageStatsReportClient.reportClientReceiveStats(sessionCtx.getClientId(), pubMsg.getQos());
sendPublishMsgToClient(sessionCtx, mqttPubMsg, persistentWriteAndFlush, persistentBufferedMsgCount);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import org.thingsboard.mqtt.broker.actors.client.messages.ConnectionAcceptedMsg;
import org.thingsboard.mqtt.broker.actors.client.state.ClientActorStateInfo;
import org.thingsboard.mqtt.broker.common.data.DevicePublishMsg;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos.PublishMsgProto;
import org.thingsboard.mqtt.broker.service.mqtt.retain.RetainedMsg;

Expand All @@ -50,6 +51,8 @@ public interface MqttMessageGenerator {

MqttPublishMessage createPubMsg(PublishMsg pubMsg);

MqttPublishMessage createPubMsg(DevicePublishMsg pubMsg, boolean isDup);

MqttPublishMessage createPubMsg(PublishMsgProto publishMsgProto, int qos, boolean retain, String topicName, int packetId, MqttProperties properties);

MqttPublishMessage createPubRetainMsg(int msgId, RetainedMsg retainedMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
*/
package org.thingsboard.mqtt.broker.service.mqtt;

import org.thingsboard.mqtt.broker.common.data.DevicePublishMsg;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos.PublishMsgProto;
import org.thingsboard.mqtt.broker.service.mqtt.retain.RetainedMsg;
import org.thingsboard.mqtt.broker.service.subscription.Subscription;
import org.thingsboard.mqtt.broker.session.ClientSessionCtx;

public interface PublishMsgDeliveryService {

void sendPublishMsgToClient(ClientSessionCtx sessionCtx, PublishMsg publishMsg);
void sendPublishMsgToClient(ClientSessionCtx sessionCtx, DevicePublishMsg publishMsg, boolean isDup);

void sendPublishMsgProtoToClient(ClientSessionCtx sessionCtx, PublishMsgProto publishMsgProto);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
import org.thingsboard.mqtt.broker.common.data.DevicePublishMsg;
import org.thingsboard.mqtt.broker.exception.MqttException;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos.PublishMsgProto;
import org.thingsboard.mqtt.broker.service.mqtt.PublishMsg;
Expand Down Expand Up @@ -79,26 +80,40 @@ public String getTopicNameByAlias(PublishMsg publishMsg) {

public PublishMsg createPublishMsgUsingTopicAlias(PublishMsg publishMsg, int minTopicNameLengthForAliasReplacement) {
if (enabled) {
String topicName = publishMsg.getTopicName();
if (topicName.length() > minTopicNameLengthForAliasReplacement) {
MqttProperties properties = publishMsg.getProperties();
boolean setEmptyTopic = updateTopicAlias(publishMsg.getTopicName(), publishMsg.getProperties(), minTopicNameLengthForAliasReplacement);
if (setEmptyTopic) {
return publishMsg.toBuilder().topicName(BrokerConstants.EMPTY_STR).build();
}
}
return publishMsg;
}

Integer topicAlias = serverMappings.get(topicName);
if (topicAlias == null) {
int nextTopicAlias = getNextTopicAlias(topicName);
if (nextTopicAlias == 0) {
return publishMsg;
}
MqttPropertiesUtil.addTopicAliasToProps(properties, nextTopicAlias);
return getPublishMsg(publishMsg, topicName, properties);
}
MqttPropertiesUtil.addTopicAliasToProps(properties, topicAlias);
return getPublishMsg(publishMsg, BrokerConstants.EMPTY_STR, properties);
public DevicePublishMsg createPublishMsgUsingTopicAlias(DevicePublishMsg publishMsg, int minTopicNameLengthForAliasReplacement) {
if (enabled) {
boolean setEmptyTopic = updateTopicAlias(publishMsg.getTopic(), publishMsg.getProperties(), minTopicNameLengthForAliasReplacement);
if (setEmptyTopic) {
return publishMsg.toBuilder().topic(BrokerConstants.EMPTY_STR).build();
}
}
return publishMsg;
}

private boolean updateTopicAlias(String topicName, MqttProperties properties, int minTopicNameLengthForAliasReplacement) {
if (topicName.length() <= minTopicNameLengthForAliasReplacement) {
return false;
}
Integer topicAlias = serverMappings.get(topicName);
if (topicAlias != null) {
MqttPropertiesUtil.addTopicAliasToProps(properties, topicAlias);
return true;
}
int nextTopicAlias = getNextTopicAlias(topicName);
if (nextTopicAlias != 0) {
MqttPropertiesUtil.addTopicAliasToProps(properties, nextTopicAlias);
}
return false;
}

public TopicAliasResult getTopicAliasResult(PublishMsgProto publishMsgProto, int minTopicNameLengthForAliasReplacement) {
if (enabled) {
String topicName = publishMsgProto.getTopicName();
Expand All @@ -117,13 +132,6 @@ public TopicAliasResult getTopicAliasResult(PublishMsgProto publishMsgProto, int
return null;
}

private PublishMsg getPublishMsg(PublishMsg publishMsg, String topicName, MqttProperties properties) {
return publishMsg.toBuilder()
.topicName(topicName)
.properties(properties)
.build();
}

void validateTopicAlias(int topicAlias) {
if (topicAlias == 0) {
throw new MqttException("Topic Alias is zero.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -237,7 +238,7 @@ public void givenDevicePublishMsg_whenDeliverPersistedPubrelMsg_thenVerifiedMeth
.build();
persistedDeviceActorMessageProcessor.deliverPersistedMsg(devicePublishMsg);

verify(publishMsgDeliveryService, never()).sendPublishMsgToClient(any(), any());
verify(publishMsgDeliveryService, never()).sendPublishMsgToClient(any(), any(), anyBoolean());
verify(publishMsgDeliveryService).sendPubRelMsgToClient(eq(ctx), eq(1));
}

Expand All @@ -256,7 +257,7 @@ public void givenExpiredDevicePublishMsg_whenDeliverPersistedPublishMsg_thenVeri
.build();
persistedDeviceActorMessageProcessor.deliverPersistedMsg(devicePublishMsg);

verify(publishMsgDeliveryService, never()).sendPublishMsgToClient(any(), any());
verify(publishMsgDeliveryService, never()).sendPublishMsgToClient(any(), any(), anyBoolean());
verify(publishMsgDeliveryService, never()).sendPubRelMsgToClient(any(), anyInt());
}

Expand All @@ -276,7 +277,7 @@ public void givenDevicePublishMsg_whenDeliverPersistedPublishMsg_thenVerifiedMet
.build();
persistedDeviceActorMessageProcessor.deliverPersistedMsg(devicePublishMsg);

verify(publishMsgDeliveryService).sendPublishMsgToClient(any(), any());
verify(publishMsgDeliveryService).sendPublishMsgToClient(any(), any(), anyBoolean());
verify(publishMsgDeliveryService, never()).sendPubRelMsgToClient(any(), anyInt());

assertEquals(1, persistedDeviceActorMessageProcessor.getInFlightPacketIds().size());
Expand All @@ -298,7 +299,7 @@ public void givenDevicePublishMsg_whenSerialNumberIsLessThanLastPersistedMsgSent

persistedDeviceActorMessageProcessor.process(new IncomingPublishMsg(devicePublishMsg));

verify(publishMsgDeliveryService, never()).sendPublishMsgToClient(any(), any());
verify(publishMsgDeliveryService, never()).sendPublishMsgToClient(any(), any(), anyBoolean());
}

@Test
Expand All @@ -315,7 +316,7 @@ public void givenDevicePublishMsg_whenMessageIsExpired_thenStopProcessing() {

persistedDeviceActorMessageProcessor.process(new IncomingPublishMsg(devicePublishMsg));

verify(publishMsgDeliveryService, never()).sendPublishMsgToClient(any(), any());
verify(publishMsgDeliveryService, never()).sendPublishMsgToClient(any(), any(), anyBoolean());
}

@Test
Expand All @@ -335,7 +336,7 @@ public void givenDevicePublishMsg_whenMessageIsNotExpired_thenProcessingIsCorrec

persistedDeviceActorMessageProcessor.process(new IncomingPublishMsg(devicePublishMsg));

verify(publishMsgDeliveryService).sendPublishMsgToClient(any(), any());
verify(publishMsgDeliveryService).sendPublishMsgToClient(any(), any(), anyBoolean());

assertTrue(persistedDeviceActorMessageProcessor.getInFlightPacketIds().contains(1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.junit.Assert;
import org.junit.Test;
import org.thingsboard.mqtt.broker.common.data.BrokerConstants;
import org.thingsboard.mqtt.broker.common.data.DevicePublishMsg;
import org.thingsboard.mqtt.broker.exception.MqttException;
import org.thingsboard.mqtt.broker.gen.queue.QueueProtos;
import org.thingsboard.mqtt.broker.service.mqtt.PublishMsg;
Expand Down Expand Up @@ -128,6 +129,21 @@ public void givenPubMsgWithSmallTopic_whenCreatePublishMsgUsingTopicAlias_thenRe
Assert.assertEquals(publishMsg, publishMsgUsingTopicAlias);
}

@Test
public void givenDevicePubMsgWithSmallTopic_whenCreateDevicePublishMsgUsingTopicAlias_thenReturnSamePubMsg() {
topicAliasCtx = new TopicAliasCtx(true, 5, new ConcurrentHashMap<>(Map.of(1, "topic123")), null);

DevicePublishMsg publishMsg = DevicePublishMsg
.builder()
.topic("topic/1")
.properties(new MqttProperties())
.build();

DevicePublishMsg publishMsgUsingTopicAlias = topicAliasCtx.createPublishMsgUsingTopicAlias(publishMsg, minTopicNameLengthForAliasReplacement);

Assert.assertEquals(publishMsg, publishMsgUsingTopicAlias);
}

@Test
public void givenPubMsgWithTopicAndMaxAllowedAliases_whenCreatePublishMsgUsingTopicAlias_thenReturnSamePubMsg() {
topicAliasCtx = new TopicAliasCtx(true, 1, new ConcurrentHashMap<>(), new ConcurrentHashMap<>(Map.of("topic123", 1)));
Expand All @@ -143,6 +159,21 @@ public void givenPubMsgWithTopicAndMaxAllowedAliases_whenCreatePublishMsgUsingTo
Assert.assertEquals(publishMsg, publishMsgUsingTopicAlias);
}

@Test
public void givenDevicePubMsgWithTopicAndMaxAllowedAliases_whenCreateDevicePublishMsgUsingTopicAlias_thenReturnSamePubMsg() {
topicAliasCtx = new TopicAliasCtx(true, 1, new ConcurrentHashMap<>(), new ConcurrentHashMap<>(Map.of("topic123", 1)));

DevicePublishMsg publishMsg = DevicePublishMsg
.builder()
.topic("topic/qwerty")
.properties(new MqttProperties())
.build();

DevicePublishMsg publishMsgUsingTopicAlias = topicAliasCtx.createPublishMsgUsingTopicAlias(publishMsg, minTopicNameLengthForAliasReplacement);

Assert.assertEquals(publishMsg, publishMsgUsingTopicAlias);
}

@Test
public void givenPubMsgWithTopic_whenCreatePublishMsgUsingTopicAlias_thenReturnUpdatedPubMsg() {
topicAliasCtx = new TopicAliasCtx(true, 5, new ConcurrentHashMap<>(), new ConcurrentHashMap<>(Map.of("topic123", 1)));
Expand All @@ -161,6 +192,24 @@ public void givenPubMsgWithTopic_whenCreatePublishMsgUsingTopicAlias_thenReturnU
Assert.assertEquals(2, topicAliasCtx.getServerMappings().size());
}

@Test
public void givenDevicePubMsgWithTopic_whenCreateDevicePublishMsgUsingTopicAlias_thenReturnUpdatedPubMsg() {
topicAliasCtx = new TopicAliasCtx(true, 5, new ConcurrentHashMap<>(), new ConcurrentHashMap<>(Map.of("topic123", 1)));

DevicePublishMsg publishMsg = DevicePublishMsg
.builder()
.topic("topic/qwerty")
.properties(new MqttProperties())
.build();

DevicePublishMsg publishMsgUsingTopicAlias = topicAliasCtx.createPublishMsgUsingTopicAlias(publishMsg, minTopicNameLengthForAliasReplacement);

Assert.assertEquals("topic/qwerty", publishMsgUsingTopicAlias.getTopic());
int topicAlias = (int) publishMsgUsingTopicAlias.getProperties().getProperty(BrokerConstants.TOPIC_ALIAS_PROP_ID).value();
Assert.assertEquals(2, topicAlias);
Assert.assertEquals(2, topicAliasCtx.getServerMappings().size());
}

@Test
public void givenPubMsgWithTopicAndExisingMapping_whenCreatePublishMsgUsingTopicAlias_thenReturnUpdatedPubMsgWithEmptyTopic() {
topicAliasCtx = new TopicAliasCtx(true, 5, new ConcurrentHashMap<>(), new ConcurrentHashMap<>(Map.of("topic123456", 1)));
Expand All @@ -179,6 +228,24 @@ public void givenPubMsgWithTopicAndExisingMapping_whenCreatePublishMsgUsingTopic
Assert.assertEquals(1, topicAliasCtx.getServerMappings().size());
}

@Test
public void givenDevicePubMsgWithTopicAndExisingMapping_whenCreateDevicePublishMsgUsingTopicAlias_thenReturnUpdatedPubMsgWithEmptyTopic() {
topicAliasCtx = new TopicAliasCtx(true, 5, new ConcurrentHashMap<>(), new ConcurrentHashMap<>(Map.of("topic123456", 1)));

DevicePublishMsg publishMsg = DevicePublishMsg
.builder()
.topic("topic123456")
.properties(new MqttProperties())
.build();

DevicePublishMsg publishMsgUsingTopicAlias = topicAliasCtx.createPublishMsgUsingTopicAlias(publishMsg, minTopicNameLengthForAliasReplacement);

Assert.assertEquals(BrokerConstants.EMPTY_STR, publishMsgUsingTopicAlias.getTopic());
int topicAlias = (int) publishMsgUsingTopicAlias.getProperties().getProperty(BrokerConstants.TOPIC_ALIAS_PROP_ID).value();
Assert.assertEquals(1, topicAlias);
Assert.assertEquals(1, topicAliasCtx.getServerMappings().size());
}

@Test
public void givenDisabledTopicAliasCtx_whenGetTopicAliasResult_thenReturnNull() {
topicAliasCtx = new TopicAliasCtx(false, 5, new ConcurrentHashMap<>(), new ConcurrentHashMap<>());
Expand Down

0 comments on commit 9d3de7e

Please sign in to comment.