From 1994542eb466bfd3403d4cff41f580eb41db6073 Mon Sep 17 00:00:00 2001 From: Jason Longshore Date: Thu, 4 Apr 2019 10:50:46 -0500 Subject: [PATCH] mqtt-streaming: Fix a memory leak if subscribe/unsubscribe fails --- .../akka/stream/alpakka/mqtt/streaming/impl/ServerState.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/impl/ServerState.scala b/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/impl/ServerState.scala index e9307bf796..7fc9675a0c 100644 --- a/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/impl/ServerState.scala +++ b/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/impl/ServerState.scala @@ -880,6 +880,7 @@ import scala.util.{Failure, Success} ) case UnobtainablePacketId => data.local.failure(SubscribeFailed) + data.subscribed.failure(SubscribeFailed) throw SubscribeFailed } @@ -895,6 +896,7 @@ import scala.util.{Failure, Success} data.subscribed.success(remote) Behaviors.stopped case ReceiveSubAckTimeout => + data.subscribed.failure(SubscribeFailed) throw SubscribeFailed } .receiveSignal { @@ -977,6 +979,7 @@ import scala.util.{Failure, Success} ) case UnobtainablePacketId => data.local.failure(UnsubscribeFailed) + data.unsubscribed.failure(UnsubscribeFailed) throw UnsubscribeFailed } } @@ -992,6 +995,7 @@ import scala.util.{Failure, Success} data.unsubscribed.success(Done) Behaviors.stopped case ReceiveUnsubAckTimeout => + data.unsubscribed.failure(UnsubscribeFailed) throw UnsubscribeFailed } .receiveSignal {