diff --git a/common/membership/hashring.go b/common/membership/hashring.go index 94da372bcf4..6eb1f77fba2 100644 --- a/common/membership/hashring.go +++ b/common/membership/hashring.go @@ -151,8 +151,8 @@ func (r *ring) Stop() { r.value.Store(emptyHashring()) r.subscribers.Lock() - defer r.subscribers.Unlock() r.subscribers.keys = make(map[string]chan<- *ChangedEvent) + r.subscribers.Unlock() close(r.shutdownCh) if success := common.AwaitWaitGroup(&r.shutdownWG, time.Minute); !success { @@ -302,7 +302,7 @@ func (r *ring) refreshRingWorker() { return case <-r.refreshChan: // local signal or signal from provider if err := r.refresh(); err != nil { - r.logger.Error("refreshing ring", tag.Error(err)) + r.logger.Error("failed to refresh ring", tag.Error(err)) } case <-refreshTicker.Chan(): // periodically force refreshing membership r.signalSelf() diff --git a/common/membership/hashring_test.go b/common/membership/hashring_test.go index 222612855ac..507715c4dc6 100644 --- a/common/membership/hashring_test.go +++ b/common/membership/hashring_test.go @@ -24,19 +24,23 @@ package membership import ( "errors" + "fmt" "math/rand" + "runtime" "sync" "testing" "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/goleak" "go.uber.org/zap/zaptest/observer" "github.com/uber/cadence/common" "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/metrics" ) @@ -155,6 +159,10 @@ func (td *hashringTestData) startHashRing() { td.hashRing.Start() } +func (td *hashringTestData) bypassRefreshRatelimiter() { + td.hashRing.members.refreshed = time.Now().AddDate(0, 0, -1) +} + func TestFailedLookupWillAskProvider(t *testing.T) { td := newHashringTestData(t) @@ -175,6 +183,104 @@ func TestFailedLookupWillAskProvider(t *testing.T) { require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "Failed Lookup should lead to refresh") } +func TestFailingToSubscribeIsFatal(t *testing.T) { + defer goleak.VerifyNone(t) + td := newHashringTestData(t) + + // we need to intercept logger calls, use mock + mockLogger := &log.MockLogger{} + td.hashRing.logger = mockLogger + + mockLogger.On("Fatal", mock.Anything, mock.Anything).Run( + func(arguments mock.Arguments) { + // we need to stop goroutine like log.Fatal() does with an entire program + runtime.Goexit() + }, + ).Times(1) + + td.mockPeerProvider.EXPECT(). + Subscribe(gomock.Any(), gomock.Any()). + Return(errors.New("can't subscribe")) + + // because we use runtime.Goexit() we need to call .Start in a separate goroutine + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + td.hashRing.Start() + }() + + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "must be finished - failed to subscribe") + require.True(t, mockLogger.AssertExpectations(t), "log.Fatal must be called") +} + +func TestHandleUpdatesNeverBlocks(t *testing.T) { + td := newHashringTestData(t) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + td.hashRing.handleUpdates(ChangedEvent{}) + wg.Done() + }() + } + + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "handleUpdates should never block") +} + +func TestHandlerSchedulesUpdates(t *testing.T) { + td := newHashringTestData(t) + + var wg sync.WaitGroup + td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1) + td.mockPeerProvider.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) { + wg.Done() + fmt.Println("GetMembers called") + return randomHostInfo(5), nil + }).Times(2) + td.mockPeerProvider.EXPECT().WhoAmI().AnyTimes() + + wg.Add(1) // we expect 1st GetMembers to be called during hashring start + td.startHashRing() + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "GetMembers must be called") + + wg.Add(1) // another call to GetMembers should happen because of handleUpdate + td.bypassRefreshRatelimiter() + td.hashRing.handleUpdates(ChangedEvent{}) + + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "GetMembers must be called again") +} + +func TestFailedRefreshLogsError(t *testing.T) { + td := newHashringTestData(t) + + var wg sync.WaitGroup + td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1) + td.mockPeerProvider.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) { + wg.Done() + return randomHostInfo(5), nil + }).Times(1) + td.mockPeerProvider.EXPECT().WhoAmI().AnyTimes() + + wg.Add(1) // we expect 1st GetMembers to be called during hashring start + td.startHashRing() + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "GetMembers must be called") + + td.mockPeerProvider.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) { + wg.Done() + return nil, errors.New("GetMembers failed") + }).Times(1) + + wg.Add(1) // another call to GetMembers should happen because of handleUpdate + td.bypassRefreshRatelimiter() + td.hashRing.handleUpdates(ChangedEvent{}) + + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "GetMembers must be called again (and fail)") + td.hashRing.Stop() + assert.Equal(t, 1, td.observedLogs.FilterMessageSnippet("failed to refresh ring").Len()) +} + func TestRefreshUpdatesRingOnlyWhenRingHasChanged(t *testing.T) { td := newHashringTestData(t) @@ -218,8 +324,7 @@ func TestRefreshWillNotifySubscribers(t *testing.T) { assert.NotEmpty(t, changedEvent2, "changed event should never be empty") }() - // to bypass internal check - td.hashRing.members.refreshed = time.Now().AddDate(0, 0, -1) + td.bypassRefreshRatelimiter() td.hashRing.signalSelf() // wait until both subscribers will get notification @@ -346,8 +451,7 @@ func TestLookupAndRefreshRaceCondition(t *testing.T) { }() go func() { for i := 0; i < 50; i++ { - // to bypass internal check - td.hashRing.members.refreshed = time.Now().AddDate(0, 0, -1) + td.bypassRefreshRatelimiter() assert.NoError(t, td.hashRing.refresh()) } wg.Done() diff --git a/common/peerprovider/ringpopprovider/provider_test.go b/common/peerprovider/ringpopprovider/provider_test.go index 9f10ff3a7c4..ba45742fa45 100644 --- a/common/peerprovider/ringpopprovider/provider_test.go +++ b/common/peerprovider/ringpopprovider/provider_test.go @@ -27,6 +27,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/ringpop-go/events" "github.com/uber/tchannel-go" "go.uber.org/goleak" @@ -34,6 +37,8 @@ import ( "github.com/uber/cadence/common/membership" ) +const testServiceName = "test-service" + type srvAndCh struct { service string ch *tchannel.Channel @@ -136,6 +141,50 @@ func TestRingpopProvider(t *testing.T) { } } +func TestSubscribeAndNotify(t *testing.T) { + provider := NewRingpopProvider(testServiceName, nil, nil, nil, testlogger.New(t)) + + ringpopEvent := events.RingChangedEvent{ + ServersAdded: []string{"aa", "bb", "cc"}, + ServersUpdated: []string{"dd"}, + ServersRemoved: []string{"ee", "ff"}, + } + expectedEvent := membership.ChangedEvent{ + HostsAdded: ringpopEvent.ServersAdded, + HostsUpdated: ringpopEvent.ServersUpdated, + HostsRemoved: ringpopEvent.ServersRemoved, + } + + var calls1, calls2 int + require.NoError(t, + provider.Subscribe("subscriber1", + func(ev membership.ChangedEvent) { + calls1++ + assert.Equal(t, ev, expectedEvent) + }, + )) + + require.NoError(t, + provider.Subscribe("subscriber2", + func(ev membership.ChangedEvent) { + calls2++ + assert.Equal(t, ev, expectedEvent) + }, + )) + + require.Error(t, + provider.Subscribe( + "subscriber2", + func(membership.ChangedEvent) { t.Error("Should never be called") }, + ), + "Subscribe doesn't allow duplicate names", + ) + + provider.HandleEvent(ringpopEvent) + assert.Equal(t, 1, calls1, "every subscriber must have been called once") + assert.Equal(t, 1, calls2, "every subscriber must have been called once") +} + func createAndListenChannels(serviceName string, n int) ([]*srvAndCh, func(), error) { var res []*srvAndCh cleanupFn := func(srvs []*srvAndCh) func() {