Alpakka MQTT streaming continuous failure #1833

Closed · idarlington opened this issue Jul 22, 2019 · 16 comments · Milestone: 1.1.1
@idarlington commented Jul 22, 2019

Versions used

  • Akka version: 2.5.23
  • HiveMQ: 4.1.1
  • Alpakka-mqtt-streaming: 2.12_1.1.0
  • Scala: 2.12.8

The source is created this way:

// TCP transport to the MQTT broker
private val tcpConnectionFlow: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] =
  Tcp().outgoingConnection(mqttBrokerConfig.host, mqttBrokerConfig.port)

// MQTT client session layered over the TCP transport
private val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
  Mqtt
    .clientSessionFlow(
      mqttClientSession,
      ByteString(mqttBrokerConfig.clientId)
    )
    .join(tcpConnectionFlow)

// Queue for outgoing commands; the pre-materialized source emits decoded
// Publish events (Left decode errors are dropped by collect)
val (commands, source) = Source
  .queue(
    mqttBrokerConfig.connectorBuffer,
    OverflowStrategy.backpressure
  )
  .via(mqttFlow)
  .collect {
    case Right(Event(publishEvent: Publish, _)) => publishEvent
  }
  .preMaterialize()

commands.offer(
  Command(
    Connect(
      mqttBrokerConfig.clientId,
      ConnectFlags.CleanSession,
      mqttBrokerConfig.username,
      mqttBrokerConfig.password
    )
  )
)

commands.offer(Command(Subscribe(mqttBrokerConfig.topics)))

Expected Behavior

The source is not killed and continues streaming events.

Actual Behavior

The source is killed and the following error shows up:

akka.stream.alpakka.mqtt.streaming.impl.Subscriber$SubscribeFailed$: null

Relevant logs

Stack trace:

2019-07-18 08:00:38,343 ERROR a.a.t.i.a.ActorAdapter [default-akka.actor.default-dispatcher-17] null
akka.stream.alpakka.mqtt.streaming.impl.Subscriber$SubscribeFailed$: null
2019-07-18 08:00:38,345 DEBUG a.s.a.m.s.i.Subscriber$ [default-akka.actor.default-dispatcher-2] Cancel all timers
2019-07-18 08:00:38,357 DEBUG a.s.Materializer [default-akka.actor.default-dispatcher-16] [client-commandFlow] Upstream failed, cause: ActorMqttClientSession$SubscribeFailed$: null
2019-07-18 08:00:38,358 DEBUG a.a.RepointableActorRef [default-akka.actor.default-dispatcher-14] Aborting tcp connection to [redacted] because of upstream failure: akka.stream.alpakka.mqtt.streaming.scaladsl.ActorMqttClientSession$SubscribeFailed$
2019-07-18 08:00:38,359 DEBUG a.s.Materializer [default-akka.actor.default-dispatcher-4] [client-events] Upstream failed, cause: ActorMqttClientSession$SubscribeFailed$: null

Reproducible Test Case

I noticed this in the HiveMQ logs when it happens:

broker_1               | 2019-07-18 14:57:26,859 DEBUG - Flushing Flusher@7b3c4d7c[queueSize=1,aggregateSize=0,terminated=null]
broker_1               | 2019-07-18 14:57:26,859 DEBUG - Flusher@7b3c4d7c[queueSize=0,aggregateSize=0,terminated=null] processing 1 entries: [FrameEntry[TEXT[len=2415,fin=true,rsv=1..,masked=false],org.eclipse.jetty.websocket.common.extensions.compress.CompressExtension$Flusher@58044887[PROCESSING],OFF,null]]
broker_1               | 2019-07-18 14:57:26,859 DEBUG - Flusher@7b3c4d7c[queueSize=0,aggregateSize=0,terminated=null] flushing 1 frames: [FrameEntry[TEXT[len=2415,fin=true,rsv=1..,masked=false],org.eclipse.jetty.websocket.common.extensions.compress.CompressExtension$Flusher@58044887[PROCESSING],OFF,null]]
broker_1               | 2019-07-18 14:57:26,859 DEBUG - write: WriteFlusher@1893f5e1{IDLE}->null [DirectByteBuffer@137b706c[p=0,l=4,c=1024,r=4]={<<<\xC1~\to>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00},HeapByteBuffer@4f766ff0[p=0,l=2415,c=2419,r=2415]={<<<\xEc\x9c]\x8f\xDbD\x14\x86\xFf\x8a\x15\t\t\xA4\xA8x<...\x0b\x91q\x7f\xDc\xAd\xBb_\xA8\xA1\xDa}\xD7\xFe\x04>>>\x00\x00\xFf\xFf}]
broker_1               | 2019-07-18 14:57:26,859 DEBUG - update WriteFlusher@1893f5e1{WRITING}->null:IDLE-->WRITING
broker_1               | 2019-07-18 14:57:26,860 DEBUG - flushed 2419 SocketChannelEndPoint@45d16ef7{/172.23.0.1:58168<->/172.23.0.2:8080,OPEN,fill=FI,flush=W,to=5034/300000}{io=1/1,kio=1,kro=1}->WebSocketServerConnection@82e4b9a8[ios=IOState@dbe784c[OPEN,in,out],f=Flusher@7b3c4d7c[queueSize=0,aggregateSize=0,terminated=null],g=Generator[SERVER,validating,+rsv1],p=Parser@17a19726[ExtensionStack,s=START,c=0,len=0,f=null]]
broker_1               | 2019-07-18 14:57:26,861 DEBUG - Flushed=true written=2419 remaining=0 WriteFlusher@1893f5e1{WRITING}->null
broker_1               | 2019-07-18 14:57:26,861 DEBUG - update WriteFlusher@1893f5e1{IDLE}->null:WRITING-->IDLE
broker_1               | 2019-07-18 14:57:26,861 DEBUG - Flushing Flusher@7b3c4d7c[queueSize=0,aggregateSize=0,terminated=null]
broker_1               | 2019-07-18 14:57:26,862 DEBUG - Flusher@7b3c4d7c[queueSize=0,aggregateSize=0,terminated=null] processing 0 entries: []
broker_1               | 2019-07-18 14:57:26,862 DEBUG - Processing null
@huntc (Contributor) commented Jul 22, 2019 via email

@idarlington (Author)
@huntc The source actually consumes messages and then shuts down. In the actual implementation we use a restartable source, and we found the bug because it was continuously restarting after consuming events for a while.

@huntc (Contributor) commented Jul 22, 2019

The exception you’re seeing is because there’s no SUBACK. Wireshark should confirm this for you. If you see there’s a SUBACK then we have a problem. Can you investigate a bit more? Also, a reproducer would be great.

Note we’ve not seen problems in this area.

Thanks again for the report.

@huntc (Contributor) commented Jul 22, 2019

Also, you may want to pass in a promise to the SUBSCRIBE command. That promise will be completed when you receive the SUBACK. Maybe you need this synchronisation.
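
A minimal sketch of that suggestion, assuming the Command constructor that accepts a completion promise (Command(command, completed, carry)) and reusing the commands queue from the issue description:

import akka.Done
import scala.concurrent.Promise
import scala.util.{Failure, Success}

// Assumes the ActorSystem's dispatcher is in scope for onComplete;
// "system" is a stand-in for the app's actual ActorSystem.
import system.dispatcher

val subscribed = Promise[Done]()

// The completion promise is fulfilled when the corresponding SUBACK arrives.
commands.offer(Command(Subscribe(mqttBrokerConfig.topics), Some(subscribed), None))

subscribed.future.onComplete {
  case Success(_)  => // subscription acknowledged; safe to expect Publish events
  case Failure(ex) => // e.g. the SubscribeFailed timeout; handle or restart here
}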

@idarlington (Author)

Thanks for the pointer on SUBACK @huntc

It turned out the client was trying to subscribe to some topics for which it didn't have permissions. It got a successful SUBACK for the topics with the right permissions and a BadSubAckMessage for the topics without.

This was what I noticed in the log:

[info] 2019-07-24 11:44:35,217 DEBUG a.s.Materializer [default-akka.actor.default-dispatcher-6] [client-events] Element: Left(BadSubAckMessage(PacketId(1),Vector(ControlPacketFlags(128), ControlPacketFlags(128), ControlPacketFlags(128), ControlPacketFlags(128), ControlPacketFlags(128), ControlPacketFlags(128), ControlPacketFlags(128), ControlPacketFlags(128), ControlPacketFlags(128))))

I resolved this by configuring the app to subscribe only to permitted topics. It would help if the error messages were more informative.
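
As a diagnostic aid, here is a hedged sketch of surfacing the Left values that the collect stage in the original source silently drops, so a BadSubAckMessage is visible immediately (the log call is illustrative):

// In addition to what's already in scope in the original snippet:
import akka.stream.scaladsl.Sink

// Variation on the original source: tap decode errors before collect drops them.
val (commands, source) = Source
  .queue(mqttBrokerConfig.connectorBuffer, OverflowStrategy.backpressure)
  .via(mqttFlow)
  .alsoTo(Sink.foreach[Either[MqttCodec.DecodeError, Event[Nothing]]] {
    case Left(decodeError) => log.warning(s"MQTT decode error: $decodeError")
    case _                 => ()
  })
  .collect { case Right(Event(publishEvent: Publish, _)) => publishEvent }
  .preMaterialize()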

@huntc (Contributor) commented Jul 24, 2019

I'm very pleased that you found the problem. Any thoughts on how we can improve the messages? It is always hard to know what to log, as we don't want to log too much, and not being able to subscribe isn't technically an error. Subscriptions should, of course, always be checked for their success.

@huntc (Contributor) commented Jul 24, 2019

Perhaps we should even consider updating our docs to cover off subscription checking?

@idarlington (Author)

Thanks @huntc

For the error messages: rather than a subscribe failure without any information, like:

[info] akka.stream.alpakka.mqtt.streaming.impl.Subscriber$SubscribeFailed$: null

I would prefer that the SubscribeFailed failure come with some description, like:

[info] akka.stream.alpakka.mqtt.streaming.impl.Subscriber$SubscribeFailed$: BadSubAckMessage(...)

Let me know what you think.

@huntc (Contributor) commented Jul 25, 2019

In the case of SubscribeFailed, it is failing due to a timeout. A more explicit reason, along with the topic in question, might help though. Feel like a PR? :-)
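
For illustration only, a hypothetical shape such a failure could take; SubscribeFailed is currently a case object, which is why its message renders as null. None of this is the library's actual API:

import scala.util.control.NoStackTrace

// Hypothetical replacement for the existing case object: carry the topic
// filters and a reason so the exception message is self-explanatory.
final case class SubscribeFailed(topicFilters: Seq[String], reason: String)
    extends RuntimeException(
      s"SUBSCRIBE failed for [${topicFilters.mkString(", ")}]: $reason"
    )
    with NoStackTrace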

@idarlington (Author)

Sure :)

@idarlington (Author) commented Jul 25, 2019

So @huntc, I noticed:

When the event flow receives a Subscribe Ack composed of both failures and successes, it decodes it as a BadSubAckMessage.

I am not sure that is correct behavior. Should the flow fail because of a partial subscription failure?

For example, this Subscribe Ack:

MQ Telemetry Transport Protocol, Subscribe Ack
    Header Flags: 0x90, Message Type: Subscribe Ack
    Msg Len: 11
    Message Identifier: 1
    Granted QoS: At most once delivery (Fire and Forget) (0)
    Granted QoS: At most once delivery (Fire and Forget) (0)
    Granted QoS: Failure (128)
    Granted QoS: Failure (128)
    Granted QoS: Failure (128)
    Granted QoS: Failure (128)
    Granted QoS: Failure (128)
    Granted QoS: At most once delivery (Fire and Forget) (0)
    Granted QoS: At most once delivery (Fire and Forget) (0)

results in:

[info] 2019-07-25 16:23:47,576 DEBUG a.s.Materializer [default-akka.actor.default-dispatcher-28] [client-events] Element: Left(BadSubAckMessage(PacketId(2),Vector(ControlPacketFlags(0), ControlPacketFlags(0), ControlPacketFlags(128), ControlPacketFlags(128), ControlPacketFlags(128), ControlPacketFlags(128), ControlPacketFlags(128), ControlPacketFlags(0), ControlPacketFlags(0))))
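
For reference, an MQTT 3.1.1 SUBACK carries one return code per requested topic filter: 0x00, 0x01 or 0x02 is the granted QoS, and 0x80 marks a failure for that filter. A small hedged sketch (the helper name is hypothetical) of pairing return codes back with the requested topics:

// Split a SUBACK into granted (topic -> QoS) and failed topic filters.
// 0x80 means the broker rejected that filter; other values are the granted QoS.
def splitSubAck(
    topics: Seq[String],
    returnCodes: Seq[Int]
): (Seq[(String, Int)], Seq[String]) = {
  val (granted, failed) =
    topics.zip(returnCodes).partition { case (_, code) => code != 0x80 }
  (granted, failed.map(_._1))
}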

@huntc (Contributor) commented Jul 25, 2019

I think that it is an application concern as to whether the flow should fail or not.

@idarlington (Author)

I am not sure the application has control over that, because the Subscriber never receives the Subscribe Ack: it is decoded as a BadSubAckMessage instead. The Subscriber therefore times out and the flow shuts down.

Is there some other way to control this?

@huntc (Contributor) commented Jul 26, 2019

Sounds like a bug. I can try and take a look over the weekend.

@huntc (Contributor) commented Jul 26, 2019

If you get a chance to raise a PR adding a unit test that reproduces the condition, that could help me too. Thanks for the report.

@huntc (Contributor) commented Jul 28, 2019

@idarlington Please review #1845 - perhaps verify by building the PR branch locally.

@ennru added this to the 1.1.1 milestone on Aug 5, 2019