Skip to content

Commit

Permalink
Merge pull request #522 from ChIoT-Tech/master
Browse files Browse the repository at this point in the history
Test for PR 521 (forgot to commit)
  • Loading branch information
MattBrittan authored Jul 19, 2021
2 parents 06f46aa + 37eb0f2 commit b9726c5
Showing 1 changed file with 72 additions and 0 deletions.
72 changes: 72 additions & 0 deletions fvt_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"runtime"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1545,3 +1546,74 @@ func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
}
p.Disconnect(250) // Close publisher
}

// Test_ResumeSubsMaxInflight - Check the MaxResumePubInFlight option.
// This is difficult to test without control of the broker (because we will be communicating via the broker not
// directly. However due to the way resume works when there is no limit to inflight messages message ordering is not
// guaranteed. However with SetMaxResumePubInFlight(1) it is guaranteed so we use that to test.
// On my PC (using mosquitto under docker) running this without SetMaxResumePubInFlight(1) will fail with 1000 messages
// (generally passes if only 100 are sent). With the option set it always passes.
func Test_ResumeSubsMaxInflight(t *testing.T) {
topic := "/test/ResumeSubsMaxInflight"
var qos byte = 1

// When a connection is made with messages in the store normally it would be expected that many messages will be
// transmitted simultaneously; using MaxResumePubInFlight we can limit this to 1.
// subscribe to topic before establishing a connection, and publish a message after the publish client has connected successfully
sops := NewClientOptions().SetClientID("rsmif-Sub").AddBroker(FVTTCP).SetOrderMatters(true)
s := NewClient(sops) // s = subscriber
if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on subscriber Client.Connect(): %v", sToken.Error())
}

incommingMsg := make(chan int, 1000)
var f MessageHandler = func(client Client, msg Message) {
num, _ := strconv.Atoi(string(msg.Payload()))
incommingMsg <- num
}

if sToken := s.Subscribe(topic, qos, f); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on subscriber Client.Subscribe(): %v", sToken.Error())
}

// Now we preload an ordered memory store with 100 messages and connect...
memStore := NewOrderedMemoryStore()
memStore.Open()

for i := 0; i < 1000; i++ {
pub := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
pub.Qos = qos
pub.TopicName = topic
pub.Payload = []byte(strconv.Itoa(i))
pub.MessageID = uint16(i + 1)
memStore.Put(outboundKeyFromMID(pub.Details().MessageID), pub)
time.Sleep(time.Nanosecond)
}

pops := NewClientOptions().AddBroker(FVTTCP).SetClientID("rsmif-Pub").SetOrderMatters(false).
SetCleanSession(false).SetStore(memStore).SetMaxResumePubInFlight(1)
p := NewClient(pops)
if pToken := p.Connect(); pToken.Wait() && pToken.Error() != nil { // Note: messages will be received before this completes
t.Fatalf("Error on publisher Client.Connect(): %v", pToken.Error())
}
// We should receive 100 * 1's
timeOut := time.NewTimer(30 * time.Second)
defer timeOut.Stop()
getLoop:
for i := 0; i < 1000; i++ {
select {
case <-timeOut.C:
t.Errorf("timed out waiting for messages (after receiving %d)", i)
break getLoop
case s := <-incommingMsg:
if s != i {
t.Errorf("received message out of order (expected %d, got %d)", i, s)
break getLoop
}
continue
}
}

p.Disconnect(250)
s.Disconnect(250)
}

0 comments on commit b9726c5

Please sign in to comment.