Skip to content

Commit

Permalink
Add dedup test
Browse files Browse the repository at this point in the history
  • Loading branch information
mbrt committed Apr 28, 2024
1 parent 4cc858a commit c12f74d
Showing 1 changed file with 41 additions and 23 deletions.
64 changes: 41 additions & 23 deletions internal/concurr/dedup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestContextExpired(t *testing.T) {
}

func TestMergeDo(t *testing.T) {
tw := &testMergeWorker{}
tw := &testMergeWorker{waitRequests: 1}
ctx := context.Background()
d := NewDedup(tw)
wg := errgroup.Group{}
Expand All @@ -52,11 +52,11 @@ func TestMergeDo(t *testing.T) {
assert.NoError(t, err)
err = wg.Wait()
assert.NoError(t, err)
assert.Equal(t, 2, tw.res)
assert.Equal(t, []int{2}, tw.res)
}

func TestSequentialDo(t *testing.T) {
tw := &testMergeWorker{}
tw := &testMergeWorker{waitRequests: 1}
ctx := context.Background()
d := NewDedup(tw)
wg := errgroup.Group{}
Expand All @@ -67,7 +67,25 @@ func TestSequentialDo(t *testing.T) {
assert.NoError(t, err)
err = wg.Wait()
assert.NoError(t, err)
assert.Equal(t, 1, tw.res)
assert.Equal(t, []int{1, 1}, tw.res)
}

func TestReorderMerge(t *testing.T) {
tw := &testMergeWorker{waitRequests: 2}
ctx := context.Background()
d := NewDedup(tw)
wg := errgroup.Group{}
wg.Go(func() error {
return d.Do(ctx, "key", unmergeableRequest(2))
})
wg.Go(func() error {
return d.Do(ctx, "key", reorderableRequest(3))
})
err := d.Do(ctx, "key", mergeableRequest(5))
assert.NoError(t, err)
err = wg.Wait()
assert.NoError(t, err)
assert.Equal(t, []int{8, 2}, tw.res)
}

type testWorker struct {
Expand All @@ -81,27 +99,22 @@ func (t *testWorker) Work(ctx context.Context, key string, cntr DedupContr) erro
}

type testMergeWorker struct {
res int
waitRequests int
res []int
}

func (t *testMergeWorker) Work(ctx context.Context, key string, cntr DedupContr) error {
r := cntr.Request(key).(testRequest)
if t.res > 0 {
// We are at the second request. Process it immediately.
t.res = r.counter
return nil
for t.waitRequests > 0 {
select {
case <-cntr.OnNextDo(key):
case <-ctx.Done():
return ctx.Err()
}
t.waitRequests--
}

// Wait for the next request to come in.
// This is just to make sure we're merging it with the next.
select {
case <-cntr.OnNextDo(key):
case <-ctx.Done():
return ctx.Err()
}

r = cntr.Request(key).(testRequest)
t.res = r.counter
r := cntr.Request(key).(testRequest)
t.res = append(t.res, r.counter)
return nil
}

Expand All @@ -113,12 +126,17 @@ func unmergeableRequest(counter int) testRequest {
return testRequest{counter: counter}
}

func reorderableRequest(counter int) testRequest {
return testRequest{counter: counter, canMerge: true, canReorder: true}
}

type testRequest struct {
counter int
canMerge bool
counter int
canMerge bool
canReorder bool
}

func (r testRequest) CanReorder() bool { return false }
func (r testRequest) CanReorder() bool { return r.canReorder }

func (r testRequest) Merge(other Request) (Request, bool) {
or, ok := other.(testRequest)
Expand Down

0 comments on commit c12f74d

Please sign in to comment.