From 51efbcd7cb559425c2aa9610a82a7898ddeb571d Mon Sep 17 00:00:00 2001 From: Abhishek Jha Date: Mon, 6 May 2024 14:50:45 -0700 Subject: [PATCH] Added tests for starting, stopping and purging daemon for replicationQueue --- common/domain/replication_queue_test.go | 116 ++++++++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/common/domain/replication_queue_test.go b/common/domain/replication_queue_test.go index e1e1458dc7b..c5e9cce54e7 100644 --- a/common/domain/replication_queue_test.go +++ b/common/domain/replication_queue_test.go @@ -25,7 +25,10 @@ import ( "context" "encoding/binary" "errors" + "github.com/uber/cadence/common" + "sync/atomic" "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -39,6 +42,94 @@ const ( preambleVersion0 byte = 0x59 ) +func TestReplicationQueueImpl_Start(t *testing.T) { + tests := []struct { + name string + initialStatus int32 + expectedStatus int32 + shouldStart bool + }{ + { + name: "Should start when initialized", + initialStatus: common.DaemonStatusInitialized, + expectedStatus: common.DaemonStatusStarted, + shouldStart: true, + }, + { + name: "Should not start when already started", + initialStatus: common.DaemonStatusStarted, + expectedStatus: common.DaemonStatusStarted, + shouldStart: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockQueue := persistence.NewMockQueueManager(ctrl) + rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil).(*replicationQueueImpl) + atomic.StoreInt32(&rq.status, tt.initialStatus) + + rq.Start() + defer rq.Stop() + assert.Equal(t, tt.expectedStatus, atomic.LoadInt32(&rq.status)) + + if tt.shouldStart { + time.Sleep(1 * time.Nanosecond) + select { + case <-rq.done: + t.Error("purgeProcessor should not have stopped") + default: + // expected no action + } + } + }) + } +} + +func TestReplicationQueueImpl_Stop(t *testing.T) { + tests := []struct { + name string + initialStatus int32 + expectedStatus int32 + shouldStop bool + }{ + { + name: "Should stop when started", + initialStatus: common.DaemonStatusStarted, + expectedStatus: common.DaemonStatusStopped, + shouldStop: true, + }, + { + name: "Should not stop when not started", + initialStatus: common.DaemonStatusInitialized, + expectedStatus: common.DaemonStatusInitialized, + shouldStop: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockQueue := persistence.NewMockQueueManager(ctrl) + rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil).(*replicationQueueImpl) + atomic.StoreInt32(&rq.status, tt.initialStatus) + + rq.Stop() + assert.Equal(t, tt.expectedStatus, atomic.LoadInt32(&rq.status)) + + if tt.shouldStop { + select { + case <-rq.done: + // expected channel closed + default: + t.Error("done channel should be closed") + } + } + }) + } +} + func TestReplicationQueueImpl_Publish(t *testing.T) { tests := []struct { name string @@ -619,3 +710,28 @@ func TestPurgeAckedMessages(t *testing.T) { }) } } + +func TestReplicationQueueImpl_purgeProcessor(t *testing.T) { + ctrl := gomock.NewController(t) + mockQueue := persistence.NewMockQueueManager(ctrl) + rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil).(*replicationQueueImpl) + atomic.StoreInt32(&rq.status, common.DaemonStatusStarted) + + done := make(chan bool) + mockQueue.EXPECT().GetAckLevels(gomock.Any()).Return(map[string]int64{}, nil).AnyTimes() + mockQueue.EXPECT().DeleteMessagesBefore(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + go func() { + rq.purgeProcessor() + close(done) + }() + + time.Sleep(1 * time.Nanosecond) + rq.Stop() + select { + case <-done: + // Pass if the goroutine exits + case <-time.After(1 * time.Millisecond): + t.Error("purgeProcessor did not stop within expected time") + } +}