Skip to content

Commit

Permalink
Fix reconnection issues mqtt (#8821)
Browse files Browse the repository at this point in the history
  • Loading branch information
helenosheaa authored Feb 11, 2021
1 parent c25ae52 commit f3a208e
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 6 deletions.
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ following works:
- github.com/googleapis/gax-go [BSD 3-Clause "New" or "Revised" License](https://github.com/googleapis/gax-go/blob/master/LICENSE)
- github.com/gopcua/opcua [MIT License](https://github.com/gopcua/opcua/blob/master/LICENSE)
- github.com/gorilla/mux [BSD 3-Clause "New" or "Revised" License](https://github.com/gorilla/mux/blob/master/LICENSE)
- github.com/gorilla/websocket [BSD 2-Clause "Simplified" License](https://github.com/gorilla/websocket/blob/master/LICENSE)
- github.com/gosnmp/gosnmp [BSD 2-Clause "Simplified" License](https://github.com/gosnmp/gosnmp/blob/master/LICENSE)
- github.com/grpc-ecosystem/grpc-gateway [BSD 3-Clause "New" or "Revised" License](https://github.com/grpc-ecosystem/grpc-gateway/blob/master/LICENSE.txt)
- github.com/hailocab/go-hostpool [MIT License](https://github.com/hailocab/go-hostpool/blob/master/LICENSE)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/docker/go-connections v0.3.0 // indirect
github.com/docker/go-units v0.3.3 // indirect
github.com/docker/libnetwork v0.8.0-dev.2.0.20181012153825-d7b61745d166
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/eclipse/paho.mqtt.golang v1.3.0
github.com/ericchiang/k8s v1.2.0
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-logfmt/logfmt v0.4.0
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/eclipse/paho.mqtt.golang v1.3.0 h1:MU79lqr3FKNKbSrGN7d7bNYqh8MwWW7Zcx0iG+VIw9I=
github.com/eclipse/paho.mqtt.golang v1.3.0/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -336,6 +336,8 @@ github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gosnmp/gosnmp v1.29.0 h1:fEkud7oiYVzR64L+/BQA7uvp+7COI9+XkrUQi8JunYM=
github.com/gosnmp/gosnmp v1.29.0/go.mod h1:Ux0YzU4nV5yDET7dNIijd0VST0BCy8ijBf+gTVFQeaM=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
Expand Down Expand Up @@ -729,6 +731,7 @@ golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA=
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
Expand Down
4 changes: 1 addition & 3 deletions plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (m *MQTTConsumer) Init() error {
}

m.opts = opts
m.messages = map[telegraf.TrackingID]bool{}

return nil
}
Expand Down Expand Up @@ -221,9 +222,6 @@ func (m *MQTTConsumer) connect() error {

m.Log.Infof("Connected %v", m.Servers)
m.state = Connected
m.messagesMutex.Lock()
m.messages = make(map[telegraf.TrackingID]bool)
m.messagesMutex.Unlock()

// Persistent sessions should skip subscription if a session is present, as
// the subscriptions are stored by the server.
Expand Down
5 changes: 5 additions & 0 deletions plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (p *FakeParser) SetDefaultTags(tags map[string]string) {

type FakeToken struct {
sessionPresent bool
complete chan struct{}
}

// FakeToken satisfies mqtt.Token
Expand All @@ -84,6 +85,10 @@ func (t *FakeToken) SessionPresent() bool {
return t.sessionPresent
}

func (t *FakeToken) Done() <-chan struct{} {
return t.complete
}

// Test the basic lifecycle transitions of the plugin.
func TestLifecycleSanity(t *testing.T) {
var acc testutil.Accumulator
Expand Down

0 comments on commit f3a208e

Please sign in to comment.