Skip to content

Commit

Permalink
Add internal/worker/worker test (#602)
Browse files Browse the repository at this point in the history
* add test

* create comparator

* add test

* remove unused code

* update comment

* do not use comparator

* comment unused code

* fix

* fix

* fix

* revert changes and use alias for go-cmp

* fix

* fix

* 🤖 Update license headers / Format go codes and yaml files

Signed-off-by: vdaas-ci <ci@vdaas.org>

* fix Makefile

* fix

Co-authored-by: vdaas-ci <ci@vdaas.org>
  • Loading branch information
kevindiu and vdaas-ci authored Aug 7, 2020
1 parent 2e2581b commit 0d955e9
Show file tree
Hide file tree
Showing 5 changed files with 725 additions and 462 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ GO_SOURCES = $(eval GO_SOURCES := $(shell find \
./pkg \
-not -path './cmd/cli/*' \
-not -path './internal/core/ngt/*' \
-not -path './internal/test/comparator/*' \
-not -path './hack/benchmark/internal/client/ngtd/*' \
-not -path './hack/benchmark/internal/starter/agent/*' \
-not -path './hack/benchmark/internal/starter/external/*' \
Expand All @@ -130,6 +131,7 @@ GO_OPTION_SOURCES = $(eval GO_OPTION_SOURCES := $(shell find \
./pkg \
-not -path './cmd/cli/*' \
-not -path './internal/core/ngt/*' \
-not -path './internal/test/comparator/*' \
-not -path './hack/benchmark/internal/client/ngtd/*' \
-not -path './hack/benchmark/internal/starter/agent/*' \
-not -path './hack/benchmark/internal/starter/external/*' \
Expand Down
67 changes: 67 additions & 0 deletions internal/test/comparator/standard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package comparator

import (
"sync/atomic"

"github.com/google/go-cmp/cmp"
"github.com/vdaas/vald/internal/errgroup"
)

type (
atomicValue = atomic.Value
errorGroup = errgroup.Group

Option = cmp.Option
)

var (
AllowUnexported = cmp.AllowUnexported
Comparer = cmp.Comparer
Diff = cmp.Diff
Equal = cmp.Equal
)

/*
var (
AtomicValue = func(x, y atomicValue) bool {
return reflect.DeepEqual(x.Load(), y.Load())
}
ErrorGroup = func(x, y errorGroup) bool {
return reflect.DeepEqual(x, y)
}
// channel comparator
ErrChannel := func(x, y <-chan error) bool {
if x == nil && y == nil {
return true
}
if x == nil || y == nil || len(x) != len(y) {
return false
}
for e := range x {
if e1 := <-y; !errors.Is(e, e1) {
return false
}
}
return true
}
)
*/
65 changes: 65 additions & 0 deletions internal/worker/queue_mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package worker

import "context"

var (
DefaultStartFunc = func(context.Context) (<-chan error, error) {
return nil, nil
}
DefaultPushFunc = func(context.Context, JobFunc) error {
return nil
}
DefaultPopFunc = func(context.Context) (JobFunc, error) {
return nil, nil
}
DefaultLenFunc = func() uint64 {
return uint64(0)
}
)

type QueueMock struct {
StartFunc func(context.Context) (<-chan error, error)
PushFunc func(context.Context, JobFunc) error
PopFunc func(context.Context) (JobFunc, error)
LenFunc func() uint64
}

func NewQueueMock() Queue {
return &QueueMock{
StartFunc: DefaultStartFunc,
PushFunc: DefaultPushFunc,
PopFunc: DefaultPopFunc,
LenFunc: DefaultLenFunc,
}
}

func (q *QueueMock) Start(ctx context.Context) (<-chan error, error) {
return q.StartFunc(ctx)
}

func (q *QueueMock) Push(ctx context.Context, job JobFunc) error {
return q.PushFunc(ctx, job)
}

func (q *QueueMock) Pop(ctx context.Context) (JobFunc, error) {
return q.PopFunc(ctx)
}

func (q *QueueMock) Len() uint64 {
return q.LenFunc()
}
21 changes: 19 additions & 2 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import (
"github.com/vdaas/vald/internal/safety"
)

// JobFunc represents the function of a job that works in the worker.
type JobFunc func(context.Context) error

// Worker represents the worker interface to execute jobs.
type Worker interface {
Start(ctx context.Context) (<-chan error, error)
Pause()
Expand All @@ -54,6 +56,7 @@ type worker struct {
completedCount uint64
}

// New initializes and return the worker, or return initialization error if occurred.
func New(opts ...WorkerOption) (Worker, error) {
w := new(worker)
for _, opt := range append(defaultWorkerOpts, opts...) {
Expand All @@ -78,6 +81,8 @@ func New(opts ...WorkerOption) (Worker, error) {
return w, nil
}

// Start starts execute jobs in the worker queue.
// It returns the error channel that the job return, and the error if start failed.
func (w *worker) Start(ctx context.Context) (<-chan error, error) {
if w.IsRunning() {
return nil, errors.ErrWorkerIsAlreadyRunning(w.Name())
Expand Down Expand Up @@ -126,10 +131,13 @@ func (w *worker) startJobLoop(ctx context.Context) <-chan error {
ech := make(chan error, w.limitation)

w.eg.Go(safety.RecoverFunc(func() (err error) {
defer close(ech)
eg, ctx := errgroup.New(ctx)
eg.Limitation(w.limitation)

limitation := make(chan struct{}, w.limitation)
defer close(limitation)

for {
select {
case <-ctx.Done():
Expand All @@ -153,8 +161,7 @@ func (w *worker) startJobLoop(ctx context.Context) <-chan error {
if job != nil {
eg.Go(safety.RecoverFunc(func() (err error) {
defer atomic.AddUint64(&w.completedCount, 1)
err = job(ctx)
if err != nil {
if err = job(ctx); err != nil {
log.Debugf("an error occurred while executing a job: %s", err)
ech <- err
}
Expand All @@ -176,34 +183,44 @@ func (w *worker) startJobLoop(ctx context.Context) <-chan error {
return ech
}

// Pause stops allowing new job to be dispatched to the worker.
func (w *worker) Pause() {
w.running.Store(false)
}

// Resume resumes to allow new jobs to be dispatched to the worker.
func (w *worker) Resume() {
w.running.Store(true)
}

// IsRunning returns if the worker is running or not.
func (w *worker) IsRunning() bool {
return w.running.Load().(bool)
}

// Name returns the worker name.
func (w *worker) Name() string {
return w.name
}

// Len returns the length of the worker queue.
func (w *worker) Len() uint64 {
return w.queue.Len()
}

// TotalRequested returns the number of jobs that dispatched to the worker.
func (w *worker) TotalRequested() uint64 {
return atomic.LoadUint64(&w.requestedCount)
}

// TotalCompleted returns the number of completed job.
func (w *worker) TotalCompleted() uint64 {
return atomic.LoadUint64(&w.completedCount)
}

// Dispatch dispatches the job to the worker and waiting for the worker to process it.
// The job error is pushed to the error channel that Start() return.
// This function will return an error if the job cannot be dispatch to the worker queue, or the worker is not running.
func (w *worker) Dispatch(ctx context.Context, f JobFunc) error {
ctx, span := trace.StartSpan(ctx, "vald/internal/worker/Worker.Dispatch")
defer func() {
Expand Down
Loading

0 comments on commit 0d955e9

Please sign in to comment.