Skip to content

Commit

Permalink
fix registry grpc unit test intermittent failure
Browse files Browse the repository at this point in the history
Signed-off-by: akihikokuroda <akihikokuroda2020@gmail.com>
  • Loading branch information
akihikokuroda committed Jan 3, 2022
1 parent bee0b68 commit 52103a3
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions pkg/controller/registry/grpc/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type FakeSourceSyncer struct {
History map[registry.CatalogKey][]connectivity.State

sync.Mutex
expectedEvents int
done chan struct{}
expectedReadies int
done chan struct{}
}

func (f *FakeSourceSyncer) sync(state SourceState) {
Expand All @@ -60,18 +60,20 @@ func (f *FakeSourceSyncer) sync(state SourceState) {
f.History[state.Key] = []connectivity.State{}
}
f.History[state.Key] = append(f.History[state.Key], state.State)
f.expectedEvents--
if f.expectedEvents == 0 {
if state.State == connectivity.Ready {
f.expectedReadies -= 1
}
if f.expectedReadies == 0 {
f.done <- struct{}{}
}
f.Unlock()
}

func NewFakeSourceSyncer(expectedEvents int) *FakeSourceSyncer {
func NewFakeSourceSyncer(expectedReadies int) *FakeSourceSyncer {
return &FakeSourceSyncer{
History: map[registry.CatalogKey][]connectivity.State{},
expectedEvents: expectedEvents,
done: make(chan struct{}),
History: map[registry.CatalogKey][]connectivity.State{},
expectedReadies: expectedReadies,
done: make(chan struct{}),
}
}

Expand All @@ -84,21 +86,19 @@ func TestConnectionEvents(t *testing.T) {
test := func(tt testcase) func(t *testing.T) {
return func(t *testing.T) {
// start server for each catalog
totalEvents := 0
addresses := map[registry.CatalogKey]string{}

for catalog, events := range tt.expectedHistory {
totalEvents += len(events)
for catalog, _ := range tt.expectedHistory {
serve, address, stop := server(&fakes.FakeQuery{})
addresses[catalog] = address
go serve()
defer stop()
}

// start source manager
syncer := NewFakeSourceSyncer(totalEvents)
syncer := NewFakeSourceSyncer(len(tt.expectedHistory))
sources := NewSourceStore(logrus.New(), 1*time.Second, 5*time.Second, syncer.sync)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
sources.Start(ctx)

Expand All @@ -115,7 +115,13 @@ func TestConnectionEvents(t *testing.T) {
for catalog, events := range tt.expectedHistory {
recordedEvents := syncer.History[catalog]
for i := 0; i < len(recordedEvents); i++ {
require.Equal(t, (events[i]).String(), (recordedEvents[i]).String())
found := false
for _, event := range events {
if event.String() == recordedEvents[i].String() {
found = true
}
}
require.True(t, found)
}
}
}
Expand Down

0 comments on commit 52103a3

Please sign in to comment.