Skip to content

Commit

Permalink
Fix data race in matching test suite (cadence-workflow#5781)
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored Mar 13, 2024
1 parent 56175e1 commit 3c7c303
Showing 1 changed file with 29 additions and 7 deletions.
36 changes: 29 additions & 7 deletions service/matching/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package matching
import (
"context"
"math/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -441,15 +442,22 @@ func (t *MatcherTestSuite) TestMustOfferRemoteMatch() {
).Return(nil)

// Poll needs to happen before MustOffer, or else it goes into the non-blocking path.
var pollResultMu sync.Mutex
var polledTask *InternalTask
var pollErr error
wait := ensureAsyncReady(time.Second, func(ctx context.Context) {
task, err := t.matcher.Poll(ctx, "")
t.Nil(err)
t.NotNil(task)
pollResultMu.Lock()
defer pollResultMu.Unlock()
polledTask, pollErr = t.matcher.Poll(ctx, "")
})

t.NoError(t.matcher.MustOffer(ctx, task))
cancel()
wait()
pollResultMu.Lock()
defer pollResultMu.Unlock()
t.NoError(pollErr)
t.NotNil(polledTask)
t.NotNil(req)
t.NoError(err)
t.True(remoteSyncMatch)
Expand Down Expand Up @@ -499,15 +507,22 @@ func (t *MatcherTestSuite) TestIsolationMustOfferRemoteMatch() {
).Return(nil)

// Poll needs to happen before MustOffer, or else it goes into the non-blocking path.
var pollResultMu sync.Mutex
var polledTask *InternalTask
var pollErr error
wait := ensureAsyncReady(time.Second, func(ctx context.Context) {
task, err := t.matcher.Poll(ctx, "dca1")
t.Nil(err)
t.NotNil(task)
pollResultMu.Lock()
defer pollResultMu.Unlock()
polledTask, pollErr = t.matcher.Poll(ctx, "dca1")
})

t.NoError(t.matcher.MustOffer(ctx, task))
cancel()
wait()
pollResultMu.Lock()
defer pollResultMu.Unlock()
t.NoError(pollErr)
t.NotNil(polledTask)
t.NotNil(req)
t.NoError(err)
t.True(remoteSyncMatch)
Expand Down Expand Up @@ -627,6 +642,7 @@ func ensureAsyncAfterReady(ctxTimeout time.Duration, cb func(ctx context.Context

// Try to ensure a blocking callback is actively blocked in a goroutine before returning, so tests can
// ensure that the callback contents happen first.
// Do NOT access shared variables in the callback without mutex, as it may cause data races.
//
// This is a best-effort technique, as there is no way to reliably synchronize this kind of thing
// without exposing internal latches or having a more sophisticated locking library than Go offers.
Expand All @@ -635,8 +651,14 @@ func ensureAsyncAfterReady(ctxTimeout time.Duration, cb func(ctx context.Context
// Note that adding fmt.Println() calls touches synchronization code (for I/O), so it may change behavior.
func ensureAsyncReady(ctxTimeout time.Duration, cb func(ctx context.Context)) (wait func()) {
running := make(chan struct{})
closed := make(chan struct{})
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
go func() {
// Defers are stacked. The last one added is the first one to run.
// We want to cancel the context which will make the callback return because it's a Poll,
// and then close the closed channel.
// This way the returned wait function will block until the callback has returned.
defer close(closed)
defer cancel()

close(running)
Expand All @@ -650,6 +672,6 @@ func ensureAsyncReady(ctxTimeout time.Duration, cb func(ctx context.Context)) (w
time.Sleep(1 * time.Millisecond)

return func() {
<-ctx.Done()
<-closed
}
}

0 comments on commit 3c7c303

Please sign in to comment.