Skip to content

Commit

Permalink
Adding more tests to ringpop provider
Browse files Browse the repository at this point in the history
Also fixed race-condition: we were waiting for ring to stop, but in
order to read stop-channel it sometimes had to finish notifying
subscribers which took the same lock. We need to be careful with
lock-scope.
  • Loading branch information
dkrotx committed Sep 26, 2024
1 parent 876dab1 commit 745ba5a
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 6 deletions.
4 changes: 2 additions & 2 deletions common/membership/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func (r *ring) Stop() {
r.value.Store(emptyHashring())

r.subscribers.Lock()
defer r.subscribers.Unlock()
r.subscribers.keys = make(map[string]chan<- *ChangedEvent)
r.subscribers.Unlock()
close(r.shutdownCh)

if success := common.AwaitWaitGroup(&r.shutdownWG, time.Minute); !success {
Expand Down Expand Up @@ -302,7 +302,7 @@ func (r *ring) refreshRingWorker() {
return
case <-r.refreshChan: // local signal or signal from provider
if err := r.refresh(); err != nil {
r.logger.Error("refreshing ring", tag.Error(err))
r.logger.Error("failed to refresh ring", tag.Error(err))
}
case <-refreshTicker.Chan(): // periodically force refreshing membership
r.signalSelf()
Expand Down
112 changes: 108 additions & 4 deletions common/membership/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,23 @@ package membership

import (
"errors"
"fmt"
"math/rand"
"runtime"
"sync"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"go.uber.org/zap/zaptest/observer"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
)
Expand Down Expand Up @@ -155,6 +159,10 @@ func (td *hashringTestData) startHashRing() {
td.hashRing.Start()
}

func (td *hashringTestData) bypassRefreshRatelimiter() {
td.hashRing.members.refreshed = time.Now().AddDate(0, 0, -1)
}

func TestFailedLookupWillAskProvider(t *testing.T) {
td := newHashringTestData(t)

Expand All @@ -175,6 +183,104 @@ func TestFailedLookupWillAskProvider(t *testing.T) {
require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "Failed Lookup should lead to refresh")
}

func TestFailingToSubscribeIsFatal(t *testing.T) {
defer goleak.VerifyNone(t)
td := newHashringTestData(t)

// we need to intercept logger calls, use mock
mockLogger := &log.MockLogger{}
td.hashRing.logger = mockLogger

mockLogger.On("Fatal", mock.Anything, mock.Anything).Run(
func(arguments mock.Arguments) {
// we need to stop goroutine like log.Fatal() does with an entire program
runtime.Goexit()
},
).Times(1)

td.mockPeerProvider.EXPECT().
Subscribe(gomock.Any(), gomock.Any()).
Return(errors.New("can't subscribe"))

// because we use runtime.Goexit() we need to call .Start in a separate goroutine
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
td.hashRing.Start()
}()

require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "must be finished - failed to subscribe")
require.True(t, mockLogger.AssertExpectations(t), "log.Fatal must be called")
}

func TestHandleUpdatesNeverBlocks(t *testing.T) {
td := newHashringTestData(t)

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
td.hashRing.handleUpdates(ChangedEvent{})
wg.Done()
}()
}

require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "handleUpdates should never block")
}

func TestHandlerSchedulesUpdates(t *testing.T) {
td := newHashringTestData(t)

var wg sync.WaitGroup
td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
td.mockPeerProvider.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) {
wg.Done()
fmt.Println("GetMembers called")
return randomHostInfo(5), nil
}).Times(2)
td.mockPeerProvider.EXPECT().WhoAmI().AnyTimes()

wg.Add(1) // we expect 1st GetMembers to be called during hashring start
td.startHashRing()
require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "GetMembers must be called")

wg.Add(1) // another call to GetMembers should happen because of handleUpdate
td.bypassRefreshRatelimiter()
td.hashRing.handleUpdates(ChangedEvent{})

require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "GetMembers must be called again")
}

func TestFailedRefreshLogsError(t *testing.T) {
td := newHashringTestData(t)

var wg sync.WaitGroup
td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
td.mockPeerProvider.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) {
wg.Done()
return randomHostInfo(5), nil
}).Times(1)
td.mockPeerProvider.EXPECT().WhoAmI().AnyTimes()

wg.Add(1) // we expect 1st GetMembers to be called during hashring start
td.startHashRing()
require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "GetMembers must be called")

td.mockPeerProvider.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) {
wg.Done()
return nil, errors.New("GetMembers failed")
}).Times(1)

wg.Add(1) // another call to GetMembers should happen because of handleUpdate
td.bypassRefreshRatelimiter()
td.hashRing.handleUpdates(ChangedEvent{})

require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "GetMembers must be called again (and fail)")
td.hashRing.Stop()
assert.Equal(t, 1, td.observedLogs.FilterMessageSnippet("failed to refresh ring").Len())
}

func TestRefreshUpdatesRingOnlyWhenRingHasChanged(t *testing.T) {
td := newHashringTestData(t)

Expand Down Expand Up @@ -218,8 +324,7 @@ func TestRefreshWillNotifySubscribers(t *testing.T) {
assert.NotEmpty(t, changedEvent2, "changed event should never be empty")
}()

// to bypass internal check
td.hashRing.members.refreshed = time.Now().AddDate(0, 0, -1)
td.bypassRefreshRatelimiter()
td.hashRing.signalSelf()

// wait until both subscribers will get notification
Expand Down Expand Up @@ -346,8 +451,7 @@ func TestLookupAndRefreshRaceCondition(t *testing.T) {
}()
go func() {
for i := 0; i < 50; i++ {
// to bypass internal check
td.hashRing.members.refreshed = time.Now().AddDate(0, 0, -1)
td.bypassRefreshRatelimiter()
assert.NoError(t, td.hashRing.refresh())
}
wg.Done()
Expand Down
49 changes: 49 additions & 0 deletions common/peerprovider/ringpopprovider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/ringpop-go/events"
"github.com/uber/tchannel-go"
"go.uber.org/goleak"

"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/membership"
)

const testServiceName = "test-service"

type srvAndCh struct {
service string
ch *tchannel.Channel
Expand Down Expand Up @@ -136,6 +141,50 @@ func TestRingpopProvider(t *testing.T) {
}
}

func TestSubscribeAndNotify(t *testing.T) {
provider := NewRingpopProvider(testServiceName, nil, nil, nil, testlogger.New(t))

ringpopEvent := events.RingChangedEvent{
ServersAdded: []string{"aa", "bb", "cc"},
ServersUpdated: []string{"dd"},
ServersRemoved: []string{"ee", "ff"},
}
expectedEvent := membership.ChangedEvent{
HostsAdded: ringpopEvent.ServersAdded,
HostsUpdated: ringpopEvent.ServersUpdated,
HostsRemoved: ringpopEvent.ServersRemoved,
}

var calls1, calls2 int
require.NoError(t,
provider.Subscribe("subscriber1",
func(ev membership.ChangedEvent) {
calls1++
assert.Equal(t, ev, expectedEvent)
},
))

require.NoError(t,
provider.Subscribe("subscriber2",
func(ev membership.ChangedEvent) {
calls2++
assert.Equal(t, ev, expectedEvent)
},
))

require.Error(t,
provider.Subscribe(
"subscriber2",
func(membership.ChangedEvent) { t.Error("Should never be called") },
),
"Subscribe doesn't allow duplicate names",
)

provider.HandleEvent(ringpopEvent)
assert.Equal(t, 1, calls1, "every subscriber must have been called once")
assert.Equal(t, 1, calls2, "every subscriber must have been called once")
}

func createAndListenChannels(serviceName string, n int) ([]*srvAndCh, func(), error) {
var res []*srvAndCh
cleanupFn := func(srvs []*srvAndCh) func() {
Expand Down

0 comments on commit 745ba5a

Please sign in to comment.