From c0cd3b423592de16616544694b234d3f5a97665b Mon Sep 17 00:00:00 2001 From: kevindiu Date: Wed, 22 Jul 2020 15:56:45 +0900 Subject: [PATCH 01/16] add test --- internal/worker/worker_test.go | 157 +++++++++++++++++++++++++-------- 1 file changed, 118 insertions(+), 39 deletions(-) diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go index ae5b8c477d..c9a1cce900 100644 --- a/internal/worker/worker_test.go +++ b/internal/worker/worker_test.go @@ -22,13 +22,21 @@ import ( "reflect" "sync/atomic" "testing" + "time" + "github.com/google/go-cmp/cmp" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" - "go.uber.org/goleak" ) +var ( + // Goroutine leak is detected by `fastime`, but it should be ignored in the test because it is an external package. + goleakIgnoreOptions = []goleak.Option{ + goleak.IgnoreTopFunction("github.com/kpango/fastime.(*Fastime).StartTimerD.func1"), + } +) + func TestNew(t *testing.T) { type args struct { opts []WorkerOption @@ -49,42 +57,113 @@ func TestNew(t *testing.T) { if !errors.Is(err, w.err) { return errors.Errorf("got error = %v, want %v", err, w.err) } - if !reflect.DeepEqual(got, w.want) { - return errors.Errorf("got = %v, want %v", got, w.want) + + want := w.want.(*worker) + + avComparer := func(x, y atomic.Value) bool { + return reflect.DeepEqual(x.Load(), y.Load()) + } + egComparer := func(x, y errgroup.Group) bool { + return reflect.DeepEqual(x, y) + } + + queueOpts := []cmp.Option{ + cmp.AllowUnexported(*(want.queue.(*queue))), + cmp.Comparer(func(x, y chan JobFunc) bool { + return len(x) == len(y) + }), + cmp.Comparer(egComparer), + cmp.Comparer(avComparer), + } + + opts := []cmp.Option{ + cmp.AllowUnexported(*want), + cmp.Comparer(func(x, y queue) bool { + return cmp.Equal(x, y, queueOpts...) + }), + cmp.Comparer(avComparer), + cmp.Comparer(egComparer), + } + if diff := cmp.Diff(want, got, opts...); diff != "" { + return errors.New(diff) } return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - args: args { - opts: nil, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - args: args { - opts: nil, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + { + name: "return worker without option", + want: want{ + want: &worker{ + name: "worker", + limitation: 10, + eg: errgroup.Get(), + running: func() atomic.Value { + v := new(atomic.Value) + v.Store(false) + return *v + }(), + queue: &queue{ + buffer: 10, + eg: errgroup.Get(), + qcdur: 200 * time.Millisecond, + qLen: func() atomic.Value { + v := new(atomic.Value) + v.Store(uint64(0)) + return *v + }(), + running: func() atomic.Value { + v := new(atomic.Value) + v.Store(false) + return *v + }(), + inCh: make(chan JobFunc, 10), + outCh: make(chan JobFunc, 1), + }, + }, + }, + }, + { + name: "return worker with option", + args: args{ + opts: []WorkerOption{ + WithName("test1"), + }, + }, + want: want{ + want: &worker{ + name: "test1", + limitation: 10, + running: func() atomic.Value { + v := new(atomic.Value) + v.Store(false) + return *v + }(), + eg: errgroup.Get(), + queue: &queue{ + buffer: 10, + eg: errgroup.Get(), + qcdur: 200 * time.Millisecond, + qLen: func() atomic.Value { + v := new(atomic.Value) + v.Store(uint64(0)) + return *v + }(), + running: func() atomic.Value { + v := new(atomic.Value) + v.Store(false) + return *v + }(), + inCh: make(chan JobFunc, 10), + outCh: make(chan JobFunc, 1), + }, + }, + }, + }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc(test.args) } @@ -190,7 +269,7 @@ func Test_worker_Start(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc(test.args) } @@ -302,7 +381,7 @@ func Test_worker_startJobLoop(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc(test.args) } @@ -400,7 +479,7 @@ func Test_worker_Pause(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc() } @@ -497,7 +576,7 @@ func Test_worker_Resume(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc() } @@ -598,7 +677,7 @@ func Test_worker_IsRunning(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc() } @@ -700,7 +779,7 @@ func Test_worker_Name(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc() } @@ -802,7 +881,7 @@ func Test_worker_Len(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc() } @@ -904,7 +983,7 @@ func Test_worker_TotalRequested(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc() } @@ -1006,7 +1085,7 @@ func Test_worker_TotalCompleted(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc() } @@ -1121,7 +1200,7 @@ func Test_worker_Dispatch(t *testing.T) { for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone(tt) if test.beforeFunc != nil { test.beforeFunc(test.args) } From d4248ccefe0da456f2004c29d3140712d0c8c474 Mon Sep 17 00:00:00 2001 From: kevindiu Date: Wed, 22 Jul 2020 17:33:02 +0900 Subject: [PATCH 02/16] create comparator --- internal/test/comparator/standard.go | 47 +++++++++ internal/worker/worker_test.go | 136 +++++++++++++++------------ 2 files changed, 123 insertions(+), 60 deletions(-) create mode 100644 internal/test/comparator/standard.go diff --git a/internal/test/comparator/standard.go b/internal/test/comparator/standard.go new file mode 100644 index 0000000000..fb1a2cb557 --- /dev/null +++ b/internal/test/comparator/standard.go @@ -0,0 +1,47 @@ +package comparator + +import ( + "reflect" + "sync/atomic" + + "github.com/vdaas/vald/internal/errgroup" +) + +type ( + atomicValue = atomic.Value + errorGroup = errgroup.Group +) + +var ( + AtomicValue = func(x, y atomicValue) bool { + return reflect.DeepEqual(x.Load(), y.Load()) + } + + ErrorGroup = func(x, y errorGroup) bool { + return reflect.DeepEqual(x, y) + } + + // channel comparator + + ErrorChannel = func(x, y <-chan error) bool { + if x == nil && y == nil { + return true + } + if x == nil || y == nil { + return false + } + + chanToSlice := func(c <-chan error) []error { + s := make([]error, 0) + for v := range c { + s = append(s, v) + } + return s + } + + s1 := chanToSlice(x) + s2 := chanToSlice(y) + + return reflect.DeepEqual(s1, s2) + } +) diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go index c9a1cce900..cddeff91fe 100644 --- a/internal/worker/worker_test.go +++ b/internal/worker/worker_test.go @@ -27,6 +27,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/test/comparator" "go.uber.org/goleak" ) @@ -60,29 +61,22 @@ func TestNew(t *testing.T) { want := w.want.(*worker) - avComparer := func(x, y atomic.Value) bool { - return reflect.DeepEqual(x.Load(), y.Load()) - } - egComparer := func(x, y errgroup.Group) bool { - return reflect.DeepEqual(x, y) - } - queueOpts := []cmp.Option{ cmp.AllowUnexported(*(want.queue.(*queue))), cmp.Comparer(func(x, y chan JobFunc) bool { return len(x) == len(y) }), - cmp.Comparer(egComparer), - cmp.Comparer(avComparer), + cmp.Comparer(comparator.ErrorGroup), + cmp.Comparer(comparator.AtomicValue), } opts := []cmp.Option{ cmp.AllowUnexported(*want), - cmp.Comparer(func(x, y queue) bool { + cmp.Comparer(func(x, y Queue) bool { return cmp.Equal(x, y, queueOpts...) }), - cmp.Comparer(avComparer), - cmp.Comparer(egComparer), + cmp.Comparer(comparator.ErrorGroup), + cmp.Comparer(comparator.AtomicValue), } if diff := cmp.Diff(want, got, opts...); diff != "" { return errors.New(diff) @@ -214,62 +208,84 @@ func Test_worker_Start(t *testing.T) { if !errors.Is(err, w.err) { return errors.Errorf("got error = %v, want %v", err, w.err) } - if !reflect.DeepEqual(got, w.want) { - return errors.Errorf("got = %v, want %v", got, w.want) + + opts := []cmp.Option{ + cmp.Comparer(comparator.ErrorChannel), + } + + if diff := cmp.Diff(w.want, got, opts...); diff != "" { + return errors.New(diff) } return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - args: args { - ctx: nil, - }, - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - args: args { - ctx: nil, - }, - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + func() test { + ctx, cancel := context.WithCancel(context.Background()) + return test{ + name: "Start without error", + args: args{ + ctx: ctx, + }, + fields: fields{ + name: "worker", + limitation: 10, + eg: errgroup.Get(), + running: func() atomic.Value { + v := new(atomic.Value) + v.Store(false) + return *v + }(), + queue: &queue{ + buffer: 10, + eg: errgroup.Get(), + qcdur: 200 * time.Millisecond, + qLen: func() atomic.Value { + v := new(atomic.Value) + v.Store(uint64(0)) + return *v + }(), + running: func() atomic.Value { + v := new(atomic.Value) + v.Store(false) + return *v + }(), + inCh: make(chan JobFunc, 10), + outCh: make(chan JobFunc, 1), + }, + }, + want: want{ + want: func() <-chan error { + ch := make(chan error, 2) + close(ch) + return ch + }(), + }, + checkFunc: func(w want, got <-chan error, err error) error { + cancel() + return defaultCheckFunc(w, got, err) + }, + } + }(), + { + name: "return error if it is running", + args: args{}, + fields: fields{ + name: "test", + running: func() atomic.Value { + v := new(atomic.Value) + v.Store(true) + return *v + }(), + }, + want: want{ + err: errors.ErrWorkerIsAlreadyRunning("test"), + }, + }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc(test.args) } From dc8ac26127ef5c8a4275397858585398556a3171 Mon Sep 17 00:00:00 2001 From: kevindiu Date: Wed, 29 Jul 2020 18:16:24 +0900 Subject: [PATCH 03/16] add test --- internal/worker/queue_mock_test.go | 50 ++ internal/worker/worker.go | 18 +- internal/worker/worker_test.go | 816 +++++++++++++++-------------- 3 files changed, 478 insertions(+), 406 deletions(-) create mode 100644 internal/worker/queue_mock_test.go diff --git a/internal/worker/queue_mock_test.go b/internal/worker/queue_mock_test.go new file mode 100644 index 0000000000..3adde1cc5e --- /dev/null +++ b/internal/worker/queue_mock_test.go @@ -0,0 +1,50 @@ +package worker + +import "context" + +var ( + DefaultStartFunc = func(context.Context) (<-chan error, error) { + return nil, nil + } + DefaultPushFunc = func(context.Context, JobFunc) error { + return nil + } + DefaultPopFunc = func(context.Context) (JobFunc, error) { + return nil, nil + } + DefaultLenFunc = func() uint64 { + return uint64(0) + } +) + +type QueueMock struct { + StartFunc func(context.Context) (<-chan error, error) + PushFunc func(context.Context, JobFunc) error + PopFunc func(context.Context) (JobFunc, error) + LenFunc func() uint64 +} + +func NewQueueMock() Queue { + return &QueueMock{ + StartFunc: DefaultStartFunc, + PushFunc: DefaultPushFunc, + PopFunc: DefaultPopFunc, + LenFunc: DefaultLenFunc, + } +} + +func (q *QueueMock) Start(ctx context.Context) (<-chan error, error) { + return q.StartFunc(ctx) +} + +func (q *QueueMock) Push(ctx context.Context, job JobFunc) error { + return q.PushFunc(ctx, job) +} + +func (q *QueueMock) Pop(ctx context.Context) (JobFunc, error) { + return q.PopFunc(ctx) +} + +func (q *QueueMock) Len() uint64 { + return q.LenFunc() +} diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 769c4058d4..739de80035 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -19,6 +19,7 @@ package worker import ( "context" + "fmt" "reflect" "sync/atomic" @@ -29,8 +30,10 @@ import ( "github.com/vdaas/vald/internal/safety" ) +// JobFunc represent the function of a job that work in the worker. type JobFunc func(context.Context) error +// Worker represent the worker interface to execute type Worker interface { Start(ctx context.Context) (<-chan error, error) Pause() @@ -126,17 +129,23 @@ func (w *worker) startJobLoop(ctx context.Context) <-chan error { ech := make(chan error, w.limitation) w.eg.Go(safety.RecoverFunc(func() (err error) { + defer close(ech) eg, ctx := errgroup.New(ctx) eg.Limitation(w.limitation) + limitation := make(chan struct{}, w.limitation) defer close(limitation) + for { select { case <-ctx.Done(): + var egErr error if err = ctx.Err(); err != nil { - return errors.Wrap(eg.Wait(), err.Error()) + egErr = eg.Wait() + return errors.Wrap(egErr, err.Error()) } - return eg.Wait() + fmt.Printf("return egerr: %v\n", egErr) + return egErr case limitation <- struct{}{}: } @@ -152,9 +161,8 @@ func (w *worker) startJobLoop(ctx context.Context) <-chan error { if job != nil { eg.Go(safety.RecoverFunc(func() (err error) { - defer atomic.AddUint64(&w.completedCount, 1) - err = job(ctx) - if err != nil { + atomic.AddUint64(&w.completedCount, 1) + if err = job(ctx); err != nil { log.Debugf("an error occurred while executing a job: %s", err) ech <- err } diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go index cddeff91fe..739fac10e3 100644 --- a/internal/worker/worker_test.go +++ b/internal/worker/worker_test.go @@ -27,6 +27,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/test/comparator" "go.uber.org/goleak" ) @@ -235,23 +236,7 @@ func Test_worker_Start(t *testing.T) { v.Store(false) return *v }(), - queue: &queue{ - buffer: 10, - eg: errgroup.Get(), - qcdur: 200 * time.Millisecond, - qLen: func() atomic.Value { - v := new(atomic.Value) - v.Store(uint64(0)) - return *v - }(), - running: func() atomic.Value { - v := new(atomic.Value) - v.Store(false) - return *v - }(), - inCh: make(chan JobFunc, 10), - outCh: make(chan JobFunc, 1), - }, + queue: NewQueueMock(), }, want: want{ want: func() <-chan error { @@ -281,6 +266,30 @@ func Test_worker_Start(t *testing.T) { err: errors.ErrWorkerIsAlreadyRunning("test"), }, }, + { + name: "return queue start error", + args: args{ + ctx: context.Background(), + }, + fields: fields{ + name: "worker", + limitation: 10, + eg: errgroup.Get(), + running: func() atomic.Value { + v := new(atomic.Value) + v.Store(false) + return *v + }(), + queue: &QueueMock{ + StartFunc: func(context.Context) (<-chan error, error) { + return nil, errors.New("error") + }, + }, + }, + want: want{ + err: errors.New("error"), + }, + }, } for _, test := range tests { @@ -342,62 +351,200 @@ func Test_worker_startJobLoop(t *testing.T) { afterFunc func(args) } defaultCheckFunc := func(w want, got <-chan error) error { - if !reflect.DeepEqual(got, w.want) { - return errors.Errorf("got = %v, want %v", got, w.want) + if !comparator.ErrorChannel(w.want, got) { + return errors.New("error") } return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - args: args { - ctx: nil, - }, - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ + func() test { + ctx, cancel := context.WithCancel(context.Background()) + return test{ + name: "start job loop with empty job list", + args: args{ + ctx: ctx, + }, + fields: fields{ + name: "test", + limitation: 1, + running: atomic.Value{}, + eg: errgroup.Get(), + queue: NewQueueMock(), + requestedCount: 0, + completedCount: 0, + }, + want: want{ + want: func() <-chan error { + ch := make(chan error, 1) + close(ch) + return ch + }(), + }, + checkFunc: func(w want, got <-chan error) error { + time.Sleep(time.Millisecond * 200) + cancel() + time.Sleep(time.Millisecond * 200) - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - args: args { - ctx: nil, - }, - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + return defaultCheckFunc(w, got) + }, + } + }(), + func() test { + ctx, cancel := context.WithCancel(context.Background()) + err := errors.New("error") + return test{ + name: "start job loop with queue pop error", + args: args{ + ctx: ctx, + }, + fields: fields{ + name: "test", + limitation: 1, + running: atomic.Value{}, + eg: errgroup.Get(), + queue: &QueueMock{ + StartFunc: DefaultStartFunc, + PopFunc: func(context.Context) (JobFunc, error) { + return nil, err + }, + }, + requestedCount: 0, + completedCount: 0, + }, + checkFunc: func(w want, got <-chan error) error { + time.Sleep(time.Millisecond * 200) + cancel() + time.Sleep(time.Millisecond * 200) + + if len(got) == 0 { + return errors.New("got chan len 0") + } + for e := range got { + if e != err { + return errors.New("invalid error") + } + } + return nil + }, + } + }(), + func() test { + ctx, cancel := context.WithCancel(context.Background()) + return test{ + name: "start job loop with a job", + args: args{ + ctx: ctx, + }, + fields: fields{ + name: "test", + limitation: 1, + running: atomic.Value{}, + eg: errgroup.Get(), + queue: &QueueMock{ + StartFunc: DefaultStartFunc, + PopFunc: func(context.Context) (JobFunc, error) { + f := JobFunc(func(context.Context) error { + return nil + }) + return f, nil + }, + }, + requestedCount: 0, + completedCount: 0, + }, + checkFunc: func(w want, got <-chan error) error { + time.Sleep(time.Millisecond * 200) + cancel() + time.Sleep(time.Millisecond * 200) + + if len(got) != 0 { + return errors.New("error returned") + } + return nil + }, + } + }(), + func() test { + ctx, cancel := context.WithCancel(context.Background()) + err := errors.New("error") + return test{ + name: "start job loop with a job which return error", + args: args{ + ctx: ctx, + }, + fields: fields{ + name: "test", + limitation: 1, + running: atomic.Value{}, + eg: errgroup.Get(), + queue: &QueueMock{ + StartFunc: DefaultStartFunc, + PopFunc: func(context.Context) (JobFunc, error) { + f := JobFunc(func(context.Context) error { + return err + }) + return f, nil + }, + }, + requestedCount: 0, + completedCount: 0, + }, + checkFunc: func(w want, got <-chan error) error { + time.Sleep(time.Millisecond * 200) + cancel() + time.Sleep(time.Millisecond * 200) + + if len(got) == 0 { + return errors.New("got chan len 0") + } + for e := range got { + if e != err { + return errors.New("invalid error") + } + } + return nil + }, + } + }(), + func() test { + ctx, cancel := context.WithCancel(context.Background()) + return test{ + name: "start job loop with queue pop a nil job without error", + args: args{ + ctx: ctx, + }, + fields: fields{ + name: "test", + limitation: 1, + running: atomic.Value{}, + eg: errgroup.Get(), + queue: &QueueMock{ + StartFunc: DefaultStartFunc, + PopFunc: func(context.Context) (JobFunc, error) { + return nil, nil + }, + }, + requestedCount: 0, + completedCount: 0, + }, + checkFunc: func(w want, got <-chan error) error { + time.Sleep(time.Millisecond * 200) + cancel() + time.Sleep(time.Millisecond * 200) + + if len(got) != 0 { + return errors.New("got error") + } + return nil + }, + } + }(), } + log.Init() for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc(test.args) } @@ -444,58 +591,35 @@ func Test_worker_Pause(t *testing.T) { name string fields fields want want - checkFunc func(want) error + checkFunc func(want, *worker) error beforeFunc func() afterFunc func() } - defaultCheckFunc := func(w want) error { + defaultCheckFunc := func(w want, worker *worker) error { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + { + name: "Pause success", + fields: fields{ + running: func() atomic.Value { + v := &atomic.Value{} + v.Store(true) + return *v + }(), + }, + checkFunc: func(w want, worker *worker) error { + if worker.running.Load().(bool) != false { + return errors.New("running is not false") + } + return nil + }, + }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc() } @@ -517,7 +641,7 @@ func Test_worker_Pause(t *testing.T) { } w.Pause() - if err := test.checkFunc(test.want); err != nil { + if err := test.checkFunc(test.want, w); err != nil { tt.Errorf("error = %v", err) } }) @@ -541,58 +665,35 @@ func Test_worker_Resume(t *testing.T) { name string fields fields want want - checkFunc func(want) error + checkFunc func(want, *worker) error beforeFunc func() afterFunc func() } - defaultCheckFunc := func(w want) error { + defaultCheckFunc := func(w want, worker *worker) error { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + { + name: "Resume success", + fields: fields{ + running: func() atomic.Value { + v := &atomic.Value{} + v.Store(false) + return *v + }(), + }, + checkFunc: func(w want, worker *worker) error { + if worker.running.Load().(bool) != true { + return errors.New("running is not false") + } + return nil + }, + }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc() } @@ -614,7 +715,7 @@ func Test_worker_Resume(t *testing.T) { } w.Resume() - if err := test.checkFunc(test.want); err != nil { + if err := test.checkFunc(test.want, w); err != nil { tt.Errorf("error = %v", err) } }) @@ -650,50 +751,37 @@ func Test_worker_IsRunning(t *testing.T) { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + { + name: "return true if it is running", + fields: fields{ + running: func() atomic.Value { + v := &atomic.Value{} + v.Store(true) + return *v + }(), + }, + want: want{ + want: true, + }, + }, + { + name: "return false if it is not running", + fields: fields{ + running: func() atomic.Value { + v := &atomic.Value{} + v.Store(false) + return *v + }(), + }, + want: want{ + want: false, + }, + }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc() } @@ -752,50 +840,20 @@ func Test_worker_Name(t *testing.T) { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + { + name: "return name", + fields: fields{ + name: "testname", + }, + want: want{ + want: "testname", + }, + }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc() } @@ -854,50 +912,24 @@ func Test_worker_Len(t *testing.T) { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + { + name: "return queue length", + fields: fields{ + queue: &QueueMock{ + LenFunc: func() uint64 { + return uint64(100) + }, + }, + }, + want: want{ + want: uint64(100), + }, + }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc() } @@ -956,50 +988,20 @@ func Test_worker_TotalRequested(t *testing.T) { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + { + name: "return total count", + fields: fields{ + requestedCount: 1000, + }, + want: want{ + want: 1000, + }, + }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc() } @@ -1058,50 +1060,20 @@ func Test_worker_TotalCompleted(t *testing.T) { return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + { + name: "return total count", + fields: fields{ + completedCount: 1000, + }, + want: want{ + want: 1000, + }, + }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc() } @@ -1154,69 +1126,111 @@ func Test_worker_Dispatch(t *testing.T) { args args fields fields want want - checkFunc func(want, error) error + checkFunc func(*worker, want, error) error beforeFunc func(args) afterFunc func(args) } - defaultCheckFunc := func(w want, err error) error { + defaultCheckFunc := func(worker *worker, w want, err error) error { if !errors.Is(err, w.err) { return errors.Errorf("got error = %v, want %v", err, w.err) } return nil } tests := []test{ - // TODO test cases - /* - { - name: "test_case_1", - args: args { - ctx: nil, - f: nil, - }, - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - }, - */ - - // TODO test cases - /* - func() test { - return test { - name: "test_case_2", - args: args { - ctx: nil, - f: nil, - }, - fields: fields { - name: "", - limitation: 0, - running: nil, - eg: nil, - queue: nil, - qopts: nil, - requestedCount: 0, - completedCount: 0, - }, - want: want{}, - checkFunc: defaultCheckFunc, - } - }(), - */ + { + name: "return error if the worker is not started yet", + args: args{ + ctx: context.Background(), + }, + fields: fields{ + name: "test", + running: func() atomic.Value { + v := &atomic.Value{} + v.Store(false) + return *v + }(), + }, + want: want{ + err: errors.ErrWorkerIsNotRunning("test"), + }, + }, + { + name: "return error if the job is failed to push to worker queue", + args: args{ + ctx: context.Background(), + f: JobFunc(func(context.Context) error { + return nil + }), + }, + fields: fields{ + name: "test", + running: func() atomic.Value { + v := &atomic.Value{} + v.Store(true) + return *v + }(), + queue: &QueueMock{ + PushFunc: func(context.Context, JobFunc) error { + return errors.New("queue push error") + }, + }, + }, + want: want{ + err: errors.New("queue push error"), + }, + }, + { + name: "return nil if the job is nil", + args: args{ + ctx: context.Background(), + f: nil, + }, + fields: fields{ + name: "test", + running: func() atomic.Value { + v := &atomic.Value{} + v.Store(true) + return *v + }(), + queue: &QueueMock{}, + }, + want: want{}, + }, + { + name: "request count is incremented if the job is pushed", + args: args{ + ctx: context.Background(), + f: JobFunc(func(context.Context) error { + return nil + }), + }, + fields: fields{ + name: "test", + running: func() atomic.Value { + v := &atomic.Value{} + v.Store(true) + return *v + }(), + queue: &QueueMock{ + PushFunc: func(context.Context, JobFunc) error { + return nil + }, + }, + requestedCount: uint64(999), + }, + want: want{}, + checkFunc: func(worker *worker, w want, err error) error { + if worker.requestedCount != uint64(1000) { + return errors.New("requestedCount is not incremented") + } + return defaultCheckFunc(worker, w, err) + }, + }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - defer goleak.VerifyNone(tt) + defer goleak.VerifyNone(tt, goleakIgnoreOptions...) if test.beforeFunc != nil { test.beforeFunc(test.args) } @@ -1238,7 +1252,7 @@ func Test_worker_Dispatch(t *testing.T) { } err := w.Dispatch(test.args.ctx, test.args.f) - if err := test.checkFunc(test.want, err); err != nil { + if err := test.checkFunc(w, test.want, err); err != nil { tt.Errorf("error = %v", err) } From 9d751ca63c02d38e8972364b0bd1f5c8070fea59 Mon Sep 17 00:00:00 2001 From: kevindiu Date: Wed, 29 Jul 2020 18:21:40 +0900 Subject: [PATCH 04/16] remove unused code --- internal/worker/worker.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 739de80035..18c26d2e2e 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -19,7 +19,6 @@ package worker import ( "context" - "fmt" "reflect" "sync/atomic" @@ -33,7 +32,7 @@ import ( // JobFunc represent the function of a job that work in the worker. type JobFunc func(context.Context) error -// Worker represent the worker interface to execute +// Worker represent the worker interface to execute jobs. type Worker interface { Start(ctx context.Context) (<-chan error, error) Pause() @@ -57,6 +56,7 @@ type worker struct { completedCount uint64 } +// New initialize and return the worker, or return initialization error if occurred. func New(opts ...WorkerOption) (Worker, error) { w := new(worker) for _, opt := range append(defaultWorkerOpts, opts...) { @@ -81,6 +81,7 @@ func New(opts ...WorkerOption) (Worker, error) { return w, nil } +// Start start execute jobs in the worker queue. It returns the error channel that the job return, and the error if start failed. func (w *worker) Start(ctx context.Context) (<-chan error, error) { if w.IsRunning() { return nil, errors.ErrWorkerIsAlreadyRunning(w.Name()) @@ -144,7 +145,6 @@ func (w *worker) startJobLoop(ctx context.Context) <-chan error { egErr = eg.Wait() return errors.Wrap(egErr, err.Error()) } - fmt.Printf("return egerr: %v\n", egErr) return egErr case limitation <- struct{}{}: } @@ -161,11 +161,11 @@ func (w *worker) startJobLoop(ctx context.Context) <-chan error { if job != nil { eg.Go(safety.RecoverFunc(func() (err error) { - atomic.AddUint64(&w.completedCount, 1) if err = job(ctx); err != nil { log.Debugf("an error occurred while executing a job: %s", err) ech <- err } + atomic.AddUint64(&w.completedCount, 1) select { case <-limitation: case <-ctx.Done(): @@ -184,34 +184,44 @@ func (w *worker) startJobLoop(ctx context.Context) <-chan error { return ech } +// Pause pause the execution of the worker. func (w *worker) Pause() { w.running.Store(false) } +// Resume resume the execution of the worker. func (w *worker) Resume() { w.running.Store(true) } +// IsRunning return if the worker is running or not. func (w *worker) IsRunning() bool { return w.running.Load().(bool) } +// Name return the worker name. func (w *worker) Name() string { return w.name } +// Len return the length of the worker queue. func (w *worker) Len() uint64 { return w.queue.Len() } +// TotalRequested return the number of job that dispatched to the worker. func (w *worker) TotalRequested() uint64 { return atomic.LoadUint64(&w.requestedCount) } +// TotalCompleted return the number of completed job. func (w *worker) TotalCompleted() uint64 { return atomic.LoadUint64(&w.completedCount) } +// Dispatch dispatch the job to the worker and waiting for worker to process it. +// The job error is push to the error channel that Start() return. +// This function will return error if the job cannot be dispatch to the worker queue, or the worker is not running. func (w *worker) Dispatch(ctx context.Context, f JobFunc) error { ctx, span := trace.StartSpan(ctx, "vald/internal/worker/Worker.Dispatch") defer func() { From d05d6e317bd2d20d85b66c7ddd534e4750822035 Mon Sep 17 00:00:00 2001 From: kevindiu Date: Thu, 30 Jul 2020 10:17:15 +0900 Subject: [PATCH 05/16] update comment --- internal/worker/worker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 18c26d2e2e..4dd877fe51 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -184,12 +184,12 @@ func (w *worker) startJobLoop(ctx context.Context) <-chan error { return ech } -// Pause pause the execution of the worker. +// Pause stop allowing new job to be dispatched to the worker. func (w *worker) Pause() { w.running.Store(false) } -// Resume resume the execution of the worker. +// Resume resume to allow new job to be dispatched to the worker. func (w *worker) Resume() { w.running.Store(true) } From c59a6fd33f3e6b6c4ad601aee835b3df146d0f71 Mon Sep 17 00:00:00 2001 From: kevindiu Date: Thu, 30 Jul 2020 12:06:47 +0900 Subject: [PATCH 06/16] do not use comparator --- internal/worker/worker_test.go | 54 +++++++++++++++++++++++++++------- 1 file changed, 43 insertions(+), 11 deletions(-) diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go index 739fac10e3..abe9d5d365 100644 --- a/internal/worker/worker_test.go +++ b/internal/worker/worker_test.go @@ -28,7 +28,6 @@ import ( "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" - "github.com/vdaas/vald/internal/test/comparator" "go.uber.org/goleak" ) @@ -60,6 +59,12 @@ func TestNew(t *testing.T) { return errors.Errorf("got error = %v, want %v", err, w.err) } + egComparator := func(x, y errgroup.Group) bool { + return reflect.DeepEqual(x, y) + } + atomicValueComparator := func(x, y atomic.Value) bool { + return reflect.DeepEqual(x.Load(), y.Load()) + } want := w.want.(*worker) queueOpts := []cmp.Option{ @@ -67,17 +72,16 @@ func TestNew(t *testing.T) { cmp.Comparer(func(x, y chan JobFunc) bool { return len(x) == len(y) }), - cmp.Comparer(comparator.ErrorGroup), - cmp.Comparer(comparator.AtomicValue), + cmp.Comparer(egComparator), + cmp.Comparer(atomicValueComparator), } - opts := []cmp.Option{ cmp.AllowUnexported(*want), cmp.Comparer(func(x, y Queue) bool { return cmp.Equal(x, y, queueOpts...) }), - cmp.Comparer(comparator.ErrorGroup), - cmp.Comparer(comparator.AtomicValue), + cmp.Comparer(egComparator), + cmp.Comparer(atomicValueComparator), } if diff := cmp.Diff(want, got, opts...); diff != "" { return errors.New(diff) @@ -210,12 +214,24 @@ func Test_worker_Start(t *testing.T) { return errors.Errorf("got error = %v, want %v", err, w.err) } - opts := []cmp.Option{ - cmp.Comparer(comparator.ErrorChannel), + ecComparator := func(x, y <-chan error) bool { + if x == nil && y == nil { + return true + } + if x == nil || y == nil || len(x) != len(y) { + return false + } + + for e := range x { + if e1 := <-y; !errors.Is(e, e1) { + return false + } + } + return true } - if diff := cmp.Diff(w.want, got, opts...); diff != "" { - return errors.New(diff) + if !ecComparator(w.want, got) { + return errors.New("error") } return nil } @@ -351,7 +367,23 @@ func Test_worker_startJobLoop(t *testing.T) { afterFunc func(args) } defaultCheckFunc := func(w want, got <-chan error) error { - if !comparator.ErrorChannel(w.want, got) { + ecComparator := func(x, y <-chan error) bool { + if x == nil && y == nil { + return true + } + if x == nil || y == nil || len(x) != len(y) { + return false + } + + for e := range x { + if e1 := <-y; !errors.Is(e, e1) { + return false + } + } + return true + } + + if !ecComparator(w.want, got) { return errors.New("error") } return nil From 2e02a39c3997397386fc7671e2467581c42d073a Mon Sep 17 00:00:00 2001 From: kevindiu Date: Fri, 31 Jul 2020 13:32:48 +0900 Subject: [PATCH 07/16] comment unused code --- internal/test/comparator/standard.go | 33 ++++++++++++---------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/internal/test/comparator/standard.go b/internal/test/comparator/standard.go index fb1a2cb557..97be32b813 100644 --- a/internal/test/comparator/standard.go +++ b/internal/test/comparator/standard.go @@ -1,7 +1,6 @@ package comparator import ( - "reflect" "sync/atomic" "github.com/vdaas/vald/internal/errgroup" @@ -12,6 +11,7 @@ type ( errorGroup = errgroup.Group ) +/* var ( AtomicValue = func(x, y atomicValue) bool { return reflect.DeepEqual(x.Load(), y.Load()) @@ -23,25 +23,20 @@ var ( // channel comparator - ErrorChannel = func(x, y <-chan error) bool { - if x == nil && y == nil { - return true - } - if x == nil || y == nil { - return false - } + ErrChannel := func(x, y <-chan error) bool { + if x == nil && y == nil { + return true + } + if x == nil || y == nil || len(x) != len(y) { + return false + } - chanToSlice := func(c <-chan error) []error { - s := make([]error, 0) - for v := range c { - s = append(s, v) + for e := range x { + if e1 := <-y; !errors.Is(e, e1) { + return false + } } - return s + return true } - - s1 := chanToSlice(x) - s2 := chanToSlice(y) - - return reflect.DeepEqual(s1, s2) - } ) +*/ From eba71b85398522888ae5bdce8fc10b785f595f8b Mon Sep 17 00:00:00 2001 From: kevindiu Date: Mon, 3 Aug 2020 10:43:19 +0900 Subject: [PATCH 08/16] fix --- internal/test/comparator/standard.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/test/comparator/standard.go b/internal/test/comparator/standard.go index 97be32b813..5c5b4aba1f 100644 --- a/internal/test/comparator/standard.go +++ b/internal/test/comparator/standard.go @@ -3,12 +3,14 @@ package comparator import ( "sync/atomic" + "github.com/google/go-cmp/cmp" "github.com/vdaas/vald/internal/errgroup" ) type ( atomicValue = atomic.Value errorGroup = errgroup.Group + Option = cmp.Option ) /* From deba800b2658b9d5a90f5505429ba232e14d93e9 Mon Sep 17 00:00:00 2001 From: kevindiu Date: Mon, 3 Aug 2020 14:39:33 +0900 Subject: [PATCH 09/16] fix --- internal/worker/worker.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 4dd877fe51..53f427ebfb 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -29,10 +29,10 @@ import ( "github.com/vdaas/vald/internal/safety" ) -// JobFunc represent the function of a job that work in the worker. +// JobFunc represents the function of a job that works in the worker. type JobFunc func(context.Context) error -// Worker represent the worker interface to execute jobs. +// Worker represents the worker interface to execute jobs. type Worker interface { Start(ctx context.Context) (<-chan error, error) Pause() @@ -56,7 +56,7 @@ type worker struct { completedCount uint64 } -// New initialize and return the worker, or return initialization error if occurred. +// New initializes and return the worker, or return initialization error if occurred. func New(opts ...WorkerOption) (Worker, error) { w := new(worker) for _, opt := range append(defaultWorkerOpts, opts...) { @@ -81,7 +81,7 @@ func New(opts ...WorkerOption) (Worker, error) { return w, nil } -// Start start execute jobs in the worker queue. It returns the error channel that the job return, and the error if start failed. +// Start starts execute jobs in the worker queue. It returns the error channel that the job return, and the error if start failed. func (w *worker) Start(ctx context.Context) (<-chan error, error) { if w.IsRunning() { return nil, errors.ErrWorkerIsAlreadyRunning(w.Name()) @@ -184,44 +184,44 @@ func (w *worker) startJobLoop(ctx context.Context) <-chan error { return ech } -// Pause stop allowing new job to be dispatched to the worker. +// Pause stops allowing new job to be dispatched to the worker. func (w *worker) Pause() { w.running.Store(false) } -// Resume resume to allow new job to be dispatched to the worker. +// Resume resumes to allow new jobs to be dispatched to the worker. func (w *worker) Resume() { w.running.Store(true) } -// IsRunning return if the worker is running or not. +// IsRunning returns if the worker is running or not. func (w *worker) IsRunning() bool { return w.running.Load().(bool) } -// Name return the worker name. +// Name returns the worker name. func (w *worker) Name() string { return w.name } -// Len return the length of the worker queue. +// Len returns the length of the worker queue. func (w *worker) Len() uint64 { return w.queue.Len() } -// TotalRequested return the number of job that dispatched to the worker. +// TotalRequested returns the number of jobs that dispatched to the worker. func (w *worker) TotalRequested() uint64 { return atomic.LoadUint64(&w.requestedCount) } -// TotalCompleted return the number of completed job. +// TotalCompleted returns the number of completed job. func (w *worker) TotalCompleted() uint64 { return atomic.LoadUint64(&w.completedCount) } -// Dispatch dispatch the job to the worker and waiting for worker to process it. -// The job error is push to the error channel that Start() return. -// This function will return error if the job cannot be dispatch to the worker queue, or the worker is not running. +// Dispatch dispatches the job to the worker and waiting for the worker to process it. +// The job error is pushed to the error channel that Start() return. +// This function will return an error if the job cannot be dispatch to the worker queue, or the worker is not running. func (w *worker) Dispatch(ctx context.Context, f JobFunc) error { ctx, span := trace.StartSpan(ctx, "vald/internal/worker/Worker.Dispatch") defer func() { From 4c684b9a26ce627249b9ef45117a8f4ad5a68aa0 Mon Sep 17 00:00:00 2001 From: kevindiu Date: Mon, 3 Aug 2020 16:02:35 +0900 Subject: [PATCH 10/16] fix --- internal/worker/worker.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 53f427ebfb..613746f21e 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -81,7 +81,8 @@ func New(opts ...WorkerOption) (Worker, error) { return w, nil } -// Start starts execute jobs in the worker queue. It returns the error channel that the job return, and the error if start failed. +// Start starts execute jobs in the worker queue. +// It returns the error channel that the job return, and the error if start failed. func (w *worker) Start(ctx context.Context) (<-chan error, error) { if w.IsRunning() { return nil, errors.ErrWorkerIsAlreadyRunning(w.Name()) From 3a62b48aa88c75cd4180f23ca6f69740ddc836ce Mon Sep 17 00:00:00 2001 From: kevindiu Date: Tue, 4 Aug 2020 11:24:37 +0900 Subject: [PATCH 11/16] revert changes and use alias for go-cmp --- internal/test/comparator/standard.go | 10 +++++++++- internal/worker/worker.go | 8 +++----- internal/worker/worker_test.go | 27 ++++++++++++++------------- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/internal/test/comparator/standard.go b/internal/test/comparator/standard.go index 5c5b4aba1f..d428156f95 100644 --- a/internal/test/comparator/standard.go +++ b/internal/test/comparator/standard.go @@ -10,7 +10,15 @@ import ( type ( atomicValue = atomic.Value errorGroup = errgroup.Group - Option = cmp.Option + + Option = cmp.Option +) + +var ( + AllowUnexported = cmp.AllowUnexported + Comparer = cmp.Comparer + Diff = cmp.Diff + Equal = cmp.Equal ) /* diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 613746f21e..0acbd5d03b 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -141,12 +141,10 @@ func (w *worker) startJobLoop(ctx context.Context) <-chan error { for { select { case <-ctx.Done(): - var egErr error if err = ctx.Err(); err != nil { - egErr = eg.Wait() - return errors.Wrap(egErr, err.Error()) + return errors.Wrap(eg.Wait(), err.Error()) } - return egErr + return eg.Wait() case limitation <- struct{}{}: } @@ -162,11 +160,11 @@ func (w *worker) startJobLoop(ctx context.Context) <-chan error { if job != nil { eg.Go(safety.RecoverFunc(func() (err error) { + defer atomic.AddUint64(&w.completedCount, 1) if err = job(ctx); err != nil { log.Debugf("an error occurred while executing a job: %s", err) ech <- err } - atomic.AddUint64(&w.completedCount, 1) select { case <-limitation: case <-ctx.Done(): diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go index abe9d5d365..c6b6527dcf 100644 --- a/internal/worker/worker_test.go +++ b/internal/worker/worker_test.go @@ -24,10 +24,11 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp" "github.com/vdaas/vald/internal/errgroup" "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/test/comparator" + "go.uber.org/goleak" ) @@ -67,23 +68,23 @@ func TestNew(t *testing.T) { } want := w.want.(*worker) - queueOpts := []cmp.Option{ - cmp.AllowUnexported(*(want.queue.(*queue))), - cmp.Comparer(func(x, y chan JobFunc) bool { + queueOpts := []comparator.Option{ + comparator.AllowUnexported(*(want.queue.(*queue))), + comparator.Comparer(func(x, y chan JobFunc) bool { return len(x) == len(y) }), - cmp.Comparer(egComparator), - cmp.Comparer(atomicValueComparator), + comparator.Comparer(egComparator), + comparator.Comparer(atomicValueComparator), } - opts := []cmp.Option{ - cmp.AllowUnexported(*want), - cmp.Comparer(func(x, y Queue) bool { - return cmp.Equal(x, y, queueOpts...) + opts := []comparator.Option{ + comparator.AllowUnexported(*want), + comparator.Comparer(func(x, y Queue) bool { + return comparator.Equal(x, y, queueOpts...) }), - cmp.Comparer(egComparator), - cmp.Comparer(atomicValueComparator), + comparator.Comparer(egComparator), + comparator.Comparer(atomicValueComparator), } - if diff := cmp.Diff(want, got, opts...); diff != "" { + if diff := comparator.Diff(want, got, opts...); diff != "" { return errors.New(diff) } return nil From b474efba2fa180cb79e84278ead91a847cb48c1e Mon Sep 17 00:00:00 2001 From: kevindiu Date: Tue, 4 Aug 2020 17:02:45 +0900 Subject: [PATCH 12/16] fix --- internal/worker/worker_test.go | 49 +++++++++++++--------------------- 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go index c6b6527dcf..e50e405a1e 100644 --- a/internal/worker/worker_test.go +++ b/internal/worker/worker_test.go @@ -215,25 +215,19 @@ func Test_worker_Start(t *testing.T) { return errors.Errorf("got error = %v, want %v", err, w.err) } - ecComparator := func(x, y <-chan error) bool { - if x == nil && y == nil { - return true - } - if x == nil || y == nil || len(x) != len(y) { - return false - } + if w.want == nil && got == nil { + return nil + } + if w.want == nil || got == nil || len(w.want) != len(got) { + return errors.New("want is not equal to got") + } - for e := range x { - if e1 := <-y; !errors.Is(e, e1) { - return false - } + for e := range w.want { + if e1 := <-got; !errors.Is(e, e1) { + return errors.New("want is not equal to got") } - return true } - if !ecComparator(w.want, got) { - return errors.New("error") - } return nil } tests := []test{ @@ -368,24 +362,17 @@ func Test_worker_startJobLoop(t *testing.T) { afterFunc func(args) } defaultCheckFunc := func(w want, got <-chan error) error { - ecComparator := func(x, y <-chan error) bool { - if x == nil && y == nil { - return true - } - if x == nil || y == nil || len(x) != len(y) { - return false - } - - for e := range x { - if e1 := <-y; !errors.Is(e, e1) { - return false - } - } - return true + if w.want == nil && got == nil { + return nil + } + if w.want == nil || got == nil || len(w.want) != len(got) { + return errors.New("want is not equal to got") } - if !ecComparator(w.want, got) { - return errors.New("error") + for e := range w.want { + if e1 := <-got; !errors.Is(e, e1) { + return errors.New("want is not equal to got") + } } return nil } From 6afc93aaa27aba6a401a518eae9641479e35d1f9 Mon Sep 17 00:00:00 2001 From: kevindiu Date: Tue, 4 Aug 2020 18:02:46 +0900 Subject: [PATCH 13/16] fix --- internal/worker/worker_test.go | 85 ++++++++++++++-------------------- 1 file changed, 34 insertions(+), 51 deletions(-) diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go index e50e405a1e..abcf9c012f 100644 --- a/internal/worker/worker_test.go +++ b/internal/worker/worker_test.go @@ -97,24 +97,21 @@ func TestNew(t *testing.T) { name: "worker", limitation: 10, eg: errgroup.Get(), - running: func() atomic.Value { - v := new(atomic.Value) + running: func() (v atomic.Value) { v.Store(false) - return *v + return v }(), queue: &queue{ buffer: 10, eg: errgroup.Get(), qcdur: 200 * time.Millisecond, - qLen: func() atomic.Value { - v := new(atomic.Value) + qLen: func() (v atomic.Value) { v.Store(uint64(0)) - return *v + return v }(), - running: func() atomic.Value { - v := new(atomic.Value) + running: func() (v atomic.Value) { v.Store(false) - return *v + return v }(), inCh: make(chan JobFunc, 10), outCh: make(chan JobFunc, 1), @@ -133,25 +130,22 @@ func TestNew(t *testing.T) { want: &worker{ name: "test1", limitation: 10, - running: func() atomic.Value { - v := new(atomic.Value) + running: func() (v atomic.Value) { v.Store(false) - return *v + return v }(), eg: errgroup.Get(), queue: &queue{ buffer: 10, eg: errgroup.Get(), qcdur: 200 * time.Millisecond, - qLen: func() atomic.Value { - v := new(atomic.Value) + qLen: func() (v atomic.Value) { v.Store(uint64(0)) - return *v + return v }(), - running: func() atomic.Value { - v := new(atomic.Value) + running: func() (v atomic.Value) { v.Store(false) - return *v + return v }(), inCh: make(chan JobFunc, 10), outCh: make(chan JobFunc, 1), @@ -242,10 +236,9 @@ func Test_worker_Start(t *testing.T) { name: "worker", limitation: 10, eg: errgroup.Get(), - running: func() atomic.Value { - v := new(atomic.Value) + running: func() (v atomic.Value) { v.Store(false) - return *v + return v }(), queue: NewQueueMock(), }, @@ -267,10 +260,9 @@ func Test_worker_Start(t *testing.T) { args: args{}, fields: fields{ name: "test", - running: func() atomic.Value { - v := new(atomic.Value) + running: func() (v atomic.Value) { v.Store(true) - return *v + return v }(), }, want: want{ @@ -286,10 +278,9 @@ func Test_worker_Start(t *testing.T) { name: "worker", limitation: 10, eg: errgroup.Get(), - running: func() atomic.Value { - v := new(atomic.Value) + running: func() (v atomic.Value) { v.Store(false) - return *v + return v }(), queue: &QueueMock{ StartFunc: func(context.Context) (<-chan error, error) { @@ -622,10 +613,9 @@ func Test_worker_Pause(t *testing.T) { { name: "Pause success", fields: fields{ - running: func() atomic.Value { - v := &atomic.Value{} + running: func() (v atomic.Value) { v.Store(true) - return *v + return v }(), }, checkFunc: func(w want, worker *worker) error { @@ -696,10 +686,9 @@ func Test_worker_Resume(t *testing.T) { { name: "Resume success", fields: fields{ - running: func() atomic.Value { - v := &atomic.Value{} + running: func() (v atomic.Value) { v.Store(false) - return *v + return v }(), }, checkFunc: func(w want, worker *worker) error { @@ -774,10 +763,9 @@ func Test_worker_IsRunning(t *testing.T) { { name: "return true if it is running", fields: fields{ - running: func() atomic.Value { - v := &atomic.Value{} + running: func() (v atomic.Value) { v.Store(true) - return *v + return v }(), }, want: want{ @@ -787,10 +775,9 @@ func Test_worker_IsRunning(t *testing.T) { { name: "return false if it is not running", fields: fields{ - running: func() atomic.Value { - v := &atomic.Value{} + running: func() (v atomic.Value) { v.Store(false) - return *v + return v }(), }, want: want{ @@ -1164,10 +1151,9 @@ func Test_worker_Dispatch(t *testing.T) { }, fields: fields{ name: "test", - running: func() atomic.Value { - v := &atomic.Value{} + running: func() (v atomic.Value) { v.Store(false) - return *v + return v }(), }, want: want{ @@ -1184,10 +1170,9 @@ func Test_worker_Dispatch(t *testing.T) { }, fields: fields{ name: "test", - running: func() atomic.Value { - v := &atomic.Value{} + running: func() (v atomic.Value) { v.Store(true) - return *v + return v }(), queue: &QueueMock{ PushFunc: func(context.Context, JobFunc) error { @@ -1207,10 +1192,9 @@ func Test_worker_Dispatch(t *testing.T) { }, fields: fields{ name: "test", - running: func() atomic.Value { - v := &atomic.Value{} + running: func() (v atomic.Value) { v.Store(true) - return *v + return v }(), queue: &QueueMock{}, }, @@ -1226,10 +1210,9 @@ func Test_worker_Dispatch(t *testing.T) { }, fields: fields{ name: "test", - running: func() atomic.Value { - v := &atomic.Value{} + running: func() (v atomic.Value) { v.Store(true) - return *v + return v }(), queue: &QueueMock{ PushFunc: func(context.Context, JobFunc) error { From ebd41d02ee39422307c40c23e1c7fc6a5d58f316 Mon Sep 17 00:00:00 2001 From: vdaas-ci Date: Tue, 4 Aug 2020 09:36:35 +0000 Subject: [PATCH 14/16] :robot: Update license headers / Format go codes and yaml files Signed-off-by: vdaas-ci --- internal/test/comparator/standard.go | 15 +++++++++++++++ internal/worker/queue_mock_test.go | 15 +++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/internal/test/comparator/standard.go b/internal/test/comparator/standard.go index d428156f95..3346b70f42 100644 --- a/internal/test/comparator/standard.go +++ b/internal/test/comparator/standard.go @@ -1,3 +1,18 @@ +// +// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt ) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// package comparator import ( diff --git a/internal/worker/queue_mock_test.go b/internal/worker/queue_mock_test.go index 3adde1cc5e..f013aa7914 100644 --- a/internal/worker/queue_mock_test.go +++ b/internal/worker/queue_mock_test.go @@ -1,3 +1,18 @@ +// +// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt ) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// package worker import "context" From 2f1b3ec81db95ff2362c49ec995c33aa34484340 Mon Sep 17 00:00:00 2001 From: kevindiu Date: Wed, 5 Aug 2020 10:31:47 +0900 Subject: [PATCH 15/16] fix Makefile --- Makefile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Makefile b/Makefile index 39beaa8360..343003d168 100644 --- a/Makefile +++ b/Makefile @@ -111,6 +111,7 @@ GO_SOURCES = $(eval GO_SOURCES := $(shell find \ ./pkg \ -not -path './cmd/cli/*' \ -not -path './internal/core/ngt/*' \ + -not -path './internal/test/*' \ -not -path './hack/benchmark/internal/client/ngtd/*' \ -not -path './hack/benchmark/internal/starter/agent/*' \ -not -path './hack/benchmark/internal/starter/external/*' \ @@ -130,6 +131,7 @@ GO_OPTION_SOURCES = $(eval GO_OPTION_SOURCES := $(shell find \ ./pkg \ -not -path './cmd/cli/*' \ -not -path './internal/core/ngt/*' \ + -not -path './internal/test/*' \ -not -path './hack/benchmark/internal/client/ngtd/*' \ -not -path './hack/benchmark/internal/starter/agent/*' \ -not -path './hack/benchmark/internal/starter/external/*' \ From ad07b1b3fcdbed8d5eb345b1b581c6c22c9af11c Mon Sep 17 00:00:00 2001 From: kevindiu Date: Wed, 5 Aug 2020 11:58:28 +0900 Subject: [PATCH 16/16] fix --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 343003d168..26264c1cda 100644 --- a/Makefile +++ b/Makefile @@ -111,7 +111,7 @@ GO_SOURCES = $(eval GO_SOURCES := $(shell find \ ./pkg \ -not -path './cmd/cli/*' \ -not -path './internal/core/ngt/*' \ - -not -path './internal/test/*' \ + -not -path './internal/test/comparator/*' \ -not -path './hack/benchmark/internal/client/ngtd/*' \ -not -path './hack/benchmark/internal/starter/agent/*' \ -not -path './hack/benchmark/internal/starter/external/*' \ @@ -131,7 +131,7 @@ GO_OPTION_SOURCES = $(eval GO_OPTION_SOURCES := $(shell find \ ./pkg \ -not -path './cmd/cli/*' \ -not -path './internal/core/ngt/*' \ - -not -path './internal/test/*' \ + -not -path './internal/test/comparator/*' \ -not -path './hack/benchmark/internal/client/ngtd/*' \ -not -path './hack/benchmark/internal/starter/agent/*' \ -not -path './hack/benchmark/internal/starter/external/*' \