Skip to content

Commit

Permalink
Added tests for starting, stopping and purging daemon for replication…
Browse files Browse the repository at this point in the history
…Queue
  • Loading branch information
abhishekj720 committed May 6, 2024
1 parent a843a9e commit 51efbcd
Showing 1 changed file with 116 additions and 0 deletions.
116 changes: 116 additions & 0 deletions common/domain/replication_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
"context"
"encoding/binary"
"errors"
"github.com/uber/cadence/common"
"sync/atomic"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
Expand All @@ -39,6 +42,94 @@ const (
preambleVersion0 byte = 0x59
)

func TestReplicationQueueImpl_Start(t *testing.T) {
tests := []struct {
name string
initialStatus int32
expectedStatus int32
shouldStart bool
}{
{
name: "Should start when initialized",
initialStatus: common.DaemonStatusInitialized,
expectedStatus: common.DaemonStatusStarted,
shouldStart: true,
},
{
name: "Should not start when already started",
initialStatus: common.DaemonStatusStarted,
expectedStatus: common.DaemonStatusStarted,
shouldStart: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueue := persistence.NewMockQueueManager(ctrl)
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil).(*replicationQueueImpl)
atomic.StoreInt32(&rq.status, tt.initialStatus)

rq.Start()
defer rq.Stop()
assert.Equal(t, tt.expectedStatus, atomic.LoadInt32(&rq.status))

if tt.shouldStart {
time.Sleep(1 * time.Nanosecond)
select {
case <-rq.done:
t.Error("purgeProcessor should not have stopped")
default:
// expected no action
}
}
})
}
}

func TestReplicationQueueImpl_Stop(t *testing.T) {
tests := []struct {
name string
initialStatus int32
expectedStatus int32
shouldStop bool
}{
{
name: "Should stop when started",
initialStatus: common.DaemonStatusStarted,
expectedStatus: common.DaemonStatusStopped,
shouldStop: true,
},
{
name: "Should not stop when not started",
initialStatus: common.DaemonStatusInitialized,
expectedStatus: common.DaemonStatusInitialized,
shouldStop: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueue := persistence.NewMockQueueManager(ctrl)
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil).(*replicationQueueImpl)
atomic.StoreInt32(&rq.status, tt.initialStatus)

rq.Stop()
assert.Equal(t, tt.expectedStatus, atomic.LoadInt32(&rq.status))

if tt.shouldStop {
select {
case <-rq.done:
// expected channel closed
default:
t.Error("done channel should be closed")
}
}
})
}
}

func TestReplicationQueueImpl_Publish(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -619,3 +710,28 @@ func TestPurgeAckedMessages(t *testing.T) {
})
}
}

func TestReplicationQueueImpl_purgeProcessor(t *testing.T) {
ctrl := gomock.NewController(t)
mockQueue := persistence.NewMockQueueManager(ctrl)
rq := NewReplicationQueue(mockQueue, "testCluster", nil, nil).(*replicationQueueImpl)
atomic.StoreInt32(&rq.status, common.DaemonStatusStarted)

done := make(chan bool)
mockQueue.EXPECT().GetAckLevels(gomock.Any()).Return(map[string]int64{}, nil).AnyTimes()
mockQueue.EXPECT().DeleteMessagesBefore(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

go func() {
rq.purgeProcessor()
close(done)
}()

time.Sleep(1 * time.Nanosecond)
rq.Stop()
select {
case <-done:
// Pass if the goroutine exits
case <-time.After(1 * time.Millisecond):
t.Error("purgeProcessor did not stop within expected time")
}
}

0 comments on commit 51efbcd

Please sign in to comment.