From 6445c471ac1924ba26619c25c8b33899042e6dc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Snorre=20Magnus=20Dav=C3=B8en?= Date: Thu, 9 Nov 2023 11:39:34 +0100 Subject: [PATCH] fix: Broadcast of post to sse blocks execution It seems some issues around the Broadcast functionality was blocking my entire post db writer causing the feed to go stale as no posts were being written. Two fixes have been implemented for this. First the Broadcast receiver function no longer requests a mutex lock. If it tries to send a post to a closed or nil channel, we don't really care. Locking for every post coming in would likely lead to situations where multiple calls to Broadcast would be waiting for the previous to complete sending channel events. Secondly the Broadcast call is called in a go routine (i.e. go server.Broadcast(post)). This should allow the post subscriber in the serve command to quickly process the posts regardless of how long the broadcast function uses to loop over SSE clients. --- cmd/serve.go | 5 ++++- server/server.go | 9 +++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/cmd/serve.go b/cmd/serve.go index a0a0bff..ee61554 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -55,6 +55,7 @@ func serveCmd() *cli.Command { Value: 3000, }, }, + Action: func(ctx *cli.Context) error { log.Info("Starting Norsky feed generator") @@ -100,9 +101,11 @@ func serveCmd() *cli.Command { go func() { for post := range postChan { switch post := post.(type) { + // Don't crash if broadcast fails case models.CreatePostEvent: dbPostChan <- post - broadcaster.Broadcast(post) // Broadcast new post to SSE clients + // Broadcast without blocking + go broadcaster.Broadcast(post) // Broadcast new post to SSE clients default: dbPostChan <- post } diff --git a/server/server.go b/server/server.go index 30c37ab..445caca 100644 --- a/server/server.go +++ b/server/server.go @@ -59,9 +59,6 @@ func (b *Broadcaster) Broadcast(post models.CreatePostEvent) { log.WithFields(log.Fields{ "clients": len(b.clients), }).Info("Broadcasting post to SSE clients") - - b.Lock() - defer b.Unlock() for _, client := range b.clients { client <- post } @@ -82,8 +79,12 @@ func (b *Broadcaster) AddClient(key string, client chan models.CreatePostEvent) func (b *Broadcaster) RemoveClient(key string) { b.Lock() defer b.Unlock() + // Check if client channel exists in map if _, ok := b.clients[key]; !ok { - close(b.clients[key]) + // Close the channel unless it's already closed + if b.clients[key] != nil { + close(b.clients[key]) + } delete(b.clients, key) }