Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttle number of publish messages #306

Open
johnhydroware opened this issue Apr 15, 2019 · 1 comment
Open

Throttle number of publish messages #306

johnhydroware opened this issue Apr 15, 2019 · 1 comment

Comments

@johnhydroware
Copy link
Contributor

Some MQTT brokers have an limit om number of publish messages(AWS IoT). Since resend and other functionality is build into the client, this can't easily be handled by the application code. Therefore I suggest an throttle option. See code below for my suggestion on a parameter to set number of messages per second. I'm not super happy about the implementation. Any suggestions are welcome:

index 83349a5..0671834 100644
--- a/net.go
+++ b/net.go
@@ -165,14 +165,33 @@ func incoming(c *client) {
 func outgoing(c *client) {
        defer c.workers.Done()
        DEBUG.Println(NET, "outgoing started")
+       var ticks <-chan time.Time
+       var throttleTicker *time.Ticker
+
+       obound, oboundP := c.obound, c.oboundP
+
+       if c.options.PublishThrottle > 0 {
+               throttleTicker = time.NewTicker(time.Duration(int64(time.Millisecond) * int64(1000/c.options.PublishThrottle)))
+               defer throttleTicker.Stop()
+               ticks = throttleTicker.C
+       }
 
        for {
                DEBUG.Println(NET, "outgoing waiting for an outbound message")
                select {
+               case <-ticks:
+                       obound = c.obound
+                       oboundP = c.oboundP
+                       ticks = nil
                case <-c.stop:
                        DEBUG.Println(NET, "outgoing stopped")
                        return
-               case pub := <-c.obound:
+               case pub := <-obound:
+                       if c.options.PublishThrottle > 0 {
+                               obound = nil
+                               oboundP = nil
+                               ticks = throttleTicker.C
+                       }
                        msg := pub.p.(*packets.PublishPacket)
 
                        if c.options.WriteTimeout > 0 {
@@ -196,7 +215,12 @@ func outgoing(c *client) {
                                pub.t.flowComplete()
                        }
                        DEBUG.Println(NET, "obound wrote msg, id:", msg.MessageID)
-               case msg := <-c.oboundP:
+               case msg := <-oboundP:
+                       if c.options.PublishThrottle > 0 {
+                               obound = nil
+                               oboundP = nil
+                               ticks = throttleTicker.C
+                       }
                        switch msg.p.(type) {
                        case *packets.SubscribePacket:
                                msg.p.(*packets.SubscribePacket).MessageID = c.getID(msg.t)
@MattBrittan
Copy link
Contributor

Think it's worth mentioning here that the V5 client does support this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants