From 3fbad36a4cd119bc04274df3f5a415ae1b5cea58 Mon Sep 17 00:00:00 2001 From: Wei Li Date: Thu, 6 Apr 2023 14:53:21 +0800 Subject: [PATCH] fix: fix panic when redis client closed before server closed --- subscriber.go | 7 ++++++- subscriber_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/subscriber.go b/subscriber.go index 3c1b1eaa..5192b770 100644 --- a/subscriber.go +++ b/subscriber.go @@ -46,7 +46,7 @@ func newSubscriber(params subscriberParams) *subscriber { func (s *subscriber) shutdown() { s.logger.Debug("Subscriber shutting down...") // Signal the subscriber goroutine to stop. - s.done <- struct{}{} + close(s.done) } func (s *subscriber) start(wg *sync.WaitGroup) { @@ -80,6 +80,11 @@ func (s *subscriber) start(wg *sync.WaitGroup) { s.logger.Debug("Subscriber done") return case msg := <-cancelCh: + if msg == nil { + s.logger.Warn("channel closed") + pubsub.Close() + return + } cancel, ok := s.cancelations.Get(msg.Payload) if ok { cancel() diff --git a/subscriber_test.go b/subscriber_test.go index ec4e65b8..3afdb61a 100644 --- a/subscriber_test.go +++ b/subscriber_test.go @@ -70,6 +70,38 @@ func TestSubscriber(t *testing.T) { } } +func TestSubscriberWithRedisClientClosed(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Errorf("panic occurred: %v", r) + } + }() + redisClient := setup(t) + r := rdb.NewRDB(redisClient) + defer r.Close() + testBroker := testbroker.NewTestBroker(r) + + cancelations := base.NewCancelations() + subscriber := newSubscriber(subscriberParams{ + logger: testLogger, + broker: testBroker, + cancelations: cancelations, + }) + subscriber.retryTimeout = 1 * time.Second // set shorter retry timeout for testing purpose. + + var wg sync.WaitGroup + subscriber.start(&wg) + defer subscriber.shutdown() + + time.Sleep(5 * time.Second) // allow some time for subscriber to connect + + if err := redisClient.Close(); err != nil { + t.Fatalf("close redis client,%+v", err) + } + + time.Sleep(1 * time.Second) // make sure client is closed +} + func TestSubscriberWithRedisDown(t *testing.T) { defer func() { if r := recover(); r != nil {