server is not making qos 1/2 assurance when subscribed client not response puback #439

Open
cxshun opened this issue Nov 25, 2024 · 1 comment

cxshun commented Nov 25, 2024

Background

We are testing QoS assurance and found that mochi-mqtt does not guarantee delivery of QoS 1 messages.
However, the mochi-mqtt README says it fully implements QoS 1/2.

Our test case is as follows:

  1. We create an MQTT client that subscribes to a specific topic such as /testTopic, but does not respond with a PUBACK packet.
  2. We publish an MQTT message to the topic /testTopic with another client.
    With this test case, we expect the message to be resent at intervals.

Code walk through

I'm not sure whether I have drawn the right conclusion.
I walked through the code but could not find anything that retries QoS 1/2 messages.
Below is the code I walked through. Please let me know if I missed something.

  1. publishToClient - github.com/mochi-mqtt/server/v2/server.go
select {
	case cl.State.outbound <- &out:
		atomic.AddInt32(&cl.State.outboundQty, 1)
	default:
		atomic.AddInt64(&s.Info.MessagesDropped, 1)
		cl.ops.hooks.OnPublishDropped(cl, pk)
		if out.FixedHeader.Qos > 0 {
			cl.State.Inflight.Delete(out.PacketID) // packet was dropped due to irregular circumstances, so rollback inflight.
			cl.State.Inflight.IncreaseSendQuota()
		}
		return out, packets.ErrPendingClientWritesExceeded
	}
  2. WriteLoop - github.com/mochi-mqtt/server/v2/clients.go
// WriteLoop ranges over pending outbound messages and writes them to the client connection.
func (cl *Client) WriteLoop() {
	for {
		select {
		case pk := <-cl.State.outbound:
			if err := cl.WritePacket(*pk); err != nil {
				// TODO : Figure out what to do with error
				cl.ops.log.Debug("failed publishing packet", "error", err, "client", cl.ID, "packet", pk)
			}
			atomic.AddInt32(&cl.State.outboundQty, -1)
		case <-cl.State.open.Done():
			return
		}
	}
}

I found that if WritePacket does not encounter an error, the message is treated as successfully published.

Expect

When a subscribed client does not respond with a PUBACK, the server must resend the message to guarantee QoS 1 or 2.

Please let me know if I missed something.

@cxshun cxshun changed the title server is not making qos 1 assurance when subscribed client not response puback server is not making qos 1/2 assurance when subscribed client not response puback Nov 25, 2024
Collaborator

thedevop commented Nov 27, 2024

Since MQTT runs over TCP, with a well-behaved (MQTT spec-conforming) client and server, the server fails to receive a PUBACK only when the TCP connection breaks. Hence, unacknowledged messages are resent only when the client reconnects to the server.

server/server.go

Lines 467 to 468 in 8f52b89

if sessionPresent {
err = cl.ResendInflightMessages(true)
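
As a rough illustration of the resend-on-reconnect behaviour described above, the following sketch tracks unacknowledged packets per session and returns the ones to resend when the client reconnects with an existing session. The `session` type and its methods are hypothetical and do not reflect how `ResendInflightMessages` is implemented; the sketch assumes that a reconnect without a stored session discards any old inflight state.

```go
package main

import (
	"fmt"
	"sort"
)

// session is a hypothetical model of per-client state kept by a broker.
type session struct {
	inflight map[uint16]string // packet ID -> payload awaiting PUBACK
}

func newSession() *session {
	return &session{inflight: make(map[uint16]string)}
}

// publish records a QoS 1 message as inflight until it is acknowledged.
func (s *session) publish(id uint16, payload string) {
	s.inflight[id] = payload
}

// ack removes a message from inflight once the PUBACK arrives.
func (s *session) ack(id uint16) {
	delete(s.inflight, id)
}

// resendOnReconnect returns the packet IDs to resend, in ascending order.
// Without an existing session the old inflight state is discarded (an
// assumption of this sketch) and nothing is resent.
func (s *session) resendOnReconnect(sessionPresent bool) []uint16 {
	if !sessionPresent {
		s.inflight = make(map[uint16]string)
		return nil
	}
	ids := make([]uint16, 0, len(s.inflight))
	for id := range s.inflight {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids
}

func main() {
	s := newSession()
	s.publish(1, "a")
	s.publish(2, "b")
	s.publish(3, "c")
	s.ack(2) // only packet 2 was acknowledged before the connection broke

	fmt.Println(s.resendOnReconnect(true))
}
```

Note that nothing in this model fires on a timer: while the connection stays up, an unacknowledged message simply remains inflight.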

In addition, with MQTT, there is Publisher, Broker and Subscriber:

Publisher ---> Broker ---> Subscriber

Each segment has its own QoS. For example, the publish QoS only dictates the delivery guarantee for the Publisher-to-Broker portion. The Subscriber defines its QoS in the subscription request. Generally, you can consider the end-to-end QoS to be min(Publisher QoS, Subscriber QoS), assuming the server allows up to QoS 2.
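
The min() rule can be written out as a small helper. This is only a sketch: `effectiveQoS` and its `serverMaxQoS` parameter are hypothetical names introduced here to cover the case where the server caps QoS below 2.

```go
package main

import "fmt"

// effectiveQoS returns the end-to-end QoS for one publisher/subscriber pair:
// the lowest of the publish QoS, the subscription QoS, and the maximum QoS
// the server allows.
func effectiveQoS(publishQoS, subscribeQoS, serverMaxQoS byte) byte {
	q := publishQoS
	if subscribeQoS < q {
		q = subscribeQoS
	}
	if serverMaxQoS < q {
		q = serverMaxQoS
	}
	return q
}

func main() {
	// Published at QoS 2, subscribed at QoS 1, server allows up to QoS 2.
	fmt.Println(effectiveQoS(2, 1, 2))
}
```

So a message published at QoS 2 is still delivered at QoS 1 to a subscriber that asked for QoS 1.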

Since the Publish API is meant to "simulate" a Publisher publishing to a topic, it returns as soon as the Broker can accept the message.
