diff --git a/pkg/kafka/subscriber.go b/pkg/kafka/subscriber.go index 3c445a9..c540693 100644 --- a/pkg/kafka/subscriber.go +++ b/pkg/kafka/subscriber.go @@ -4,6 +4,7 @@ import ( "context" "strings" "sync" + "sync/atomic" "time" "github.com/Shopify/sarama" @@ -22,7 +23,7 @@ type Subscriber struct { closing chan struct{} subscribersWg sync.WaitGroup - closed bool + closed uint32 } // NewSubscriber creates a new Kafka Subscriber. @@ -128,7 +129,7 @@ func DefaultSaramaSubscriberConfig() *sarama.Config { // // There are multiple subscribers spawned func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) { - if s.closed { + if atomic.LoadUint32(&s.closed) == 1 { return nil, errors.New("subscriber closed") } @@ -465,11 +466,10 @@ func (s *Subscriber) createMessagesHandler(output chan *message.Message) message } func (s *Subscriber) Close() error { - if s.closed { + if !atomic.CompareAndSwapUint32(&s.closed, 0, 1) { return nil } - s.closed = true close(s.closing) s.subscribersWg.Wait()