Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
kevindiu committed Aug 3, 2020
1 parent 4d7920a commit 88b9da3
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"github.com/vdaas/vald/internal/safety"
)

// JobFunc represent the function of a job that work in the worker.
// JobFunc represents the function of a job that works in the worker.
type JobFunc func(context.Context) error

// Worker represent the worker interface to execute jobs.
// Worker represents the worker interface to execute jobs.
type Worker interface {
Start(ctx context.Context) (<-chan error, error)
Pause()
Expand All @@ -56,7 +56,7 @@ type worker struct {
completedCount uint64
}

// New initialize and return the worker, or return initialization error if occurred.
// New initializes and return the worker, or return initialization error if occurred.
func New(opts ...WorkerOption) (Worker, error) {
w := new(worker)
for _, opt := range append(defaultWorkerOpts, opts...) {
Expand All @@ -81,7 +81,7 @@ func New(opts ...WorkerOption) (Worker, error) {
return w, nil
}

// Start start execute jobs in the worker queue. It returns the error channel that the job return, and the error if start failed.
// Start starts execute jobs in the worker queue. It returns the error channel that the job return, and the error if start failed.
func (w *worker) Start(ctx context.Context) (<-chan error, error) {
if w.IsRunning() {
return nil, errors.ErrWorkerIsAlreadyRunning(w.Name())
Expand Down Expand Up @@ -184,44 +184,44 @@ func (w *worker) startJobLoop(ctx context.Context) <-chan error {
return ech
}

// Pause stop allowing new job to be dispatched to the worker.
// Pause stops allowing new job to be dispatched to the worker.
func (w *worker) Pause() {
w.running.Store(false)
}

// Resume resume to allow new job to be dispatched to the worker.
// Resume resumes to allow new jobs to be dispatched to the worker.
func (w *worker) Resume() {
w.running.Store(true)
}

// IsRunning return if the worker is running or not.
// IsRunning returns if the worker is running or not.
func (w *worker) IsRunning() bool {
return w.running.Load().(bool)
}

// Name return the worker name.
// Name returns the worker name.
func (w *worker) Name() string {
return w.name
}

// Len return the length of the worker queue.
// Len returns the length of the worker queue.
func (w *worker) Len() uint64 {
return w.queue.Len()
}

// TotalRequested return the number of job that dispatched to the worker.
// TotalRequested returns the number of jobs that dispatched to the worker.
func (w *worker) TotalRequested() uint64 {
return atomic.LoadUint64(&w.requestedCount)
}

// TotalCompleted return the number of completed job.
// TotalCompleted returns the number of completed job.
func (w *worker) TotalCompleted() uint64 {
return atomic.LoadUint64(&w.completedCount)
}

// Dispatch dispatch the job to the worker and waiting for worker to process it.
// The job error is push to the error channel that Start() return.
// This function will return error if the job cannot be dispatch to the worker queue, or the worker is not running.
// Dispatch dispatches the job to the worker and waiting for the worker to process it.
// The job error is pushed to the error channel that Start() return.
// This function will return an error if the job cannot be dispatch to the worker queue, or the worker is not running.
func (w *worker) Dispatch(ctx context.Context, f JobFunc) error {
ctx, span := trace.StartSpan(ctx, "vald/internal/worker/Worker.Dispatch")
defer func() {
Expand Down

0 comments on commit 88b9da3

Please sign in to comment.