From b5d1786d517595f33acdafe7acbed65a671d3c0b Mon Sep 17 00:00:00 2001 From: Jon Drews Date: Sat, 7 Aug 2021 02:24:53 -0400 Subject: [PATCH] Implement pubsub broker to broadcast lines to all websocket clients fixes case where new websocket clients consume lines, resulting in incomplete logs in all websocket clients --- go.mod | 3 ++- go.sum | 17 +++++++++++++- internal/pubsub.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++ main.go | 33 ++++++++++++++------------ 4 files changed, 94 insertions(+), 17 deletions(-) create mode 100644 internal/pubsub.go diff --git a/go.mod b/go.mod index 7b67d97..6dbd63f 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/fstab/grok_exporter v0.2.8 github.com/gorilla/websocket v1.4.2 github.com/labstack/echo/v4 v4.3.0 - github.com/rakyll/statik v0.1.7 // indirect + github.com/prometheus/common v0.4.1 // indirect + github.com/rakyll/statik v0.1.7 github.com/sirupsen/logrus v1.8.1 ) diff --git a/go.sum b/go.sum index 6783e8a..17c4d2f 100644 --- a/go.sum +++ b/go.sum @@ -11,10 +11,16 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/fstab/grok_exporter v0.2.8 h1:GpMFrTbo/S6bEUxKaLmetvdMaim+q/+rVm1PwYVDr0g= github.com/fstab/grok_exporter v0.2.8/go.mod h1:iGlibhW5JHntDD2pq8Ydx8tmcb+I/gGKYKlULPBKrCc= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/labstack/echo/v4 v4.3.0 h1:DCP6cbtT+Zu++K6evHOJzSgA2115cPMuCx0xg55q1EQ= github.com/labstack/echo/v4 v4.3.0/go.mod h1:PvmtTvhVqKDzDQy4d3bWzPjZLzom4iQbAZy2sgZ/qI8= github.com/labstack/gommon v0.3.0 h1:JEeO0bvc78PKdyHxloTKiF8BD5iGrH8T6MSeGvSgob0= @@ -27,16 +33,22 @@ github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2y github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= -github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rakyll/statik v0.1.7 h1:OF3QCZUuyPxuGEP7B4ypUa7sB/iHtqOTDYZXGM8KOdQ= github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Unghqrcc= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= @@ -55,12 +67,14 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 h1:It14KIkyBFYkHkwZ7k45mi golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4 h1:c2HOrn5iMezYjSlGPncknSEr/8x5LELb/ilJbXi9DEA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -80,5 +94,6 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/pubsub.go b/internal/pubsub.go new file mode 100644 index 0000000..1816893 --- /dev/null +++ b/internal/pubsub.go @@ -0,0 +1,58 @@ +// Sourced from https://github.com/eliben/code-for-blog/blob/master/2020/go-pubsub/pubsub-channel-return.go +// and https://eli.thegreenplace.net/2020/pubsub-using-channels-in-go/ +// License; Public Domain + +package internal + +import ( + "sync" +) + +type Pubsub struct { + mu sync.RWMutex + subs map[string][]chan string + closed bool +} + +func NewPubsub() *Pubsub { + ps := &Pubsub{} + ps.subs = make(map[string][]chan string) + ps.closed = false + return ps +} + +func (ps *Pubsub) Subscribe(topic string) <-chan string { + ps.mu.Lock() + defer ps.mu.Unlock() + + ch := make(chan string, 1) + ps.subs[topic] = append(ps.subs[topic], ch) + return ch +} + +func (ps *Pubsub) Publish(topic string, msg string) { + ps.mu.RLock() + defer ps.mu.RUnlock() + + if ps.closed { + return + } + + for _, ch := range ps.subs[topic] { + ch <- msg + } +} + +func (ps *Pubsub) Close() { + ps.mu.Lock() + defer ps.mu.Unlock() + + if !ps.closed { + ps.closed = true + for _, subs := range ps.subs { + for _, ch := range subs { + close(ch) + } + } + } +} diff --git a/main.go b/main.go index ea05067..21b4e07 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "github.com/fstab/grok_exporter/tailer/fswatcher" "github.com/fstab/grok_exporter/tailer/glob" "github.com/gorilla/websocket" + "github.com/jdrews/logstation/internal" _ "github.com/jdrews/logstation/statik" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" @@ -18,10 +19,10 @@ var ( ) func main() { + pubSub := internal.NewPubsub() - linesChannel := make(chan string, 500) //begin watching the file - go follow("test/logfile.log", linesChannel) + go follow("test/logfile.log", pubSub) e := echo.New() @@ -39,7 +40,7 @@ func main() { // pass channel into handler wsHandlerChan := func(c echo.Context) error { - return wshandler(c, linesChannel) + return wshandler(c, pubSub) } e.GET("/ws", wsHandlerChan) @@ -47,29 +48,31 @@ func main() { e.Logger.Fatal(e.Start(":8081")) } -func wshandler(c echo.Context, linesChannel <-chan string) error { - // Disable the following line in production. Using in development so I can `npm start` and dev the frontend +func wshandler(c echo.Context, pubSub *internal.Pubsub) error { + // Disable the following line in production. Using in development so I can `npm start` and dev the frontend. It bypasses CORS upgrader.CheckOrigin = func(r *http.Request) bool { return true } + var err error + + linesChannel := pubSub.Subscribe("lines") + ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil) if err != nil { return err } defer ws.Close() - for { - select { - case lines := <-linesChannel: - // Write - err := ws.WriteMessage(websocket.TextMessage, []byte(lines)) - if err != nil { - c.Logger().Error(err) - } + for line := range linesChannel { + // Write + err := ws.WriteMessage(websocket.TextMessage, []byte(line)) + if err != nil { + c.Logger().Error(err) } } + return err } -func follow(path string, linesChannel chan<- string) error { +func follow(path string, pubSub *internal.Pubsub) error { logger := logrus.New() logger.Level = logrus.DebugLevel logger.SetOutput(os.Stdout) @@ -85,7 +88,7 @@ func follow(path string, linesChannel chan<- string) error { select { case line := <-tailer.Lines(): logger.Debug(line.Line) - linesChannel <- path + ":" + line.Line + pubSub.Publish("lines", line.Line) default: continue }