From dd5a62b64852b1b9cc6f56611f301872259ed8f9 Mon Sep 17 00:00:00 2001
From: Daniel Ferstay <dferstay@abnormalsecurity.com>
Date: Sat, 11 Feb 2023 17:56:27 -0800
Subject: [PATCH] Make Subscriber.Close() thread-safe

---
 pkg/kafka/subscriber.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/pkg/kafka/subscriber.go b/pkg/kafka/subscriber.go
index 3c445a9..c540693 100644
--- a/pkg/kafka/subscriber.go
+++ b/pkg/kafka/subscriber.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/Shopify/sarama"
@@ -22,7 +23,7 @@ type Subscriber struct {
 	closing       chan struct{}
 	subscribersWg sync.WaitGroup
 
-	closed bool
+	closed uint32
 }
 
 // NewSubscriber creates a new Kafka Subscriber.
@@ -128,7 +129,7 @@ func DefaultSaramaSubscriberConfig() *sarama.Config {
 //
 // There are multiple subscribers spawned
 func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) {
-	if s.closed {
+	if atomic.LoadUint32(&s.closed) == 1 {
 		return nil, errors.New("subscriber closed")
 	}
 
@@ -465,11 +466,10 @@ func (s *Subscriber) createMessagesHandler(output chan *message.Message) message
 }
 
 func (s *Subscriber) Close() error {
-	if s.closed {
+	if !atomic.CompareAndSwapUint32(&s.closed, 0, 1) {
 		return nil
 	}
 
-	s.closed = true
 	close(s.closing)
 	s.subscribersWg.Wait()