Skip to content

Commit

Permalink
Don’t let WatchedActorTerminatedException fail the session streams
Browse files Browse the repository at this point in the history
We want WatchedActorTerminatedException to complete stream processing as before, but not with a failure. A session shutting down isn’t in itself a failure.
  • Loading branch information
huntc authored and longshorej committed Mar 26, 2019
1 parent dfcc3fb commit 1ac2674
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ final class ActorMqttClientSession(settings: MqttSessionSettings)(implicit mat:
case c: Command[A] => throw new IllegalStateException(c + " is not a client command")
}
)
.recover {
case _: WatchedActorTerminatedException => ByteString.empty
}
.filter(_.nonEmpty)
.log("client-commandFlow", _.iterator.decodeControlPacket(settings.maxPacketSize)) // we decode here so we can see the generated packet id
.withAttributes(ActorAttributes.logLevels(onFailure = Logging.DebugLevel))
Expand Down Expand Up @@ -330,6 +333,9 @@ final class ActorMqttClientSession(settings: MqttSessionSettings)(implicit mat:
case _ =>
Supervision.Stop
})
.recoverWithRetries(-1, {
case _: WatchedActorTerminatedException => Source.empty
})
.withAttributes(ActorAttributes.logLevels(onFailure = Logging.DebugLevel))
}

Expand Down Expand Up @@ -538,6 +544,9 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit mat:
case c: Command[A] => throw new IllegalStateException(c + " is not a server command")
}
)
.recover {
case _: WatchedActorTerminatedException => ByteString.empty
}
.filter(_.nonEmpty)
.log("server-commandFlow", _.iterator.decodeControlPacket(settings.maxPacketSize)) // we decode here so we can see the generated packet id
.withAttributes(ActorAttributes.logLevels(onFailure = Logging.DebugLevel))
Expand Down Expand Up @@ -620,5 +629,8 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit mat:
case _ =>
Supervision.Stop
})
.recoverWithRetries(-1, {
case _: WatchedActorTerminatedException => Source.empty
})
.withAttributes(ActorAttributes.logLevels(onFailure = Logging.DebugLevel))
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import akka.stream.alpakka.mqtt.streaming.scaladsl.{
}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source, SourceQueueWithComplete}
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy, WatchedActorTerminatedException}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.testkit._
import akka.util.{ByteString, Timeout}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
Expand Down Expand Up @@ -1172,14 +1172,14 @@ class MqttSessionSpec
.clientSessionFlow(session, ByteString("1"))
.join(pipeToServer)
)
.toMat(Sink.ignore)(Keep.both)
.toMat(Sink.headOption)(Keep.both)
.run()

val connect = Connect("some-client-id", ConnectFlags.None)

client.offer(Command(connect))

result.failed.futureValue shouldBe a[WatchedActorTerminatedException]
result.futureValue shouldBe None

client.complete()
client.watchCompletion().foreach(_ => session.shutdown())
Expand Down Expand Up @@ -1283,7 +1283,7 @@ class MqttSessionSpec

fromClientQueue.complete()

result.futureValue.apply(0) shouldBe Right(Event(connect))
result.futureValue.head shouldBe Right(Event(connect))
result.futureValue.apply(1) match {
case Right(Event(s: Subscribe, _)) => s.topicFilters shouldBe subscribe.topicFilters
case x => fail("Unexpected: " + x)
Expand Down Expand Up @@ -1390,7 +1390,7 @@ class MqttSessionSpec
val unsubscribe = Unsubscribe("some-topic")
val unsubscribeReceived = Promise[Done]

val (server, result) =
val server =
Source
.queue[Command[Nothing]](1, OverflowStrategy.fail)
.via(
Expand All @@ -1406,7 +1406,7 @@ class MqttSessionSpec
case Right(Event(cp: Unsubscribe, _)) if cp.topicFilters == unsubscribe.topicFilters =>
unsubscribeReceived.success(Done)
})
.toMat(Sink.seq)(Keep.both)
.toMat(Sink.ignore)(Keep.left)
.run()

val connectBytes = connect.encode(ByteString.newBuilder).result()
Expand Down

0 comments on commit 1ac2674

Please sign in to comment.