Skip to content

chore(worker): remove BeforeRun and AfterRun interface #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@ func (s *Consumer) BusyWorkers() uint64 {
return s.metric.BusyWorkers()
}

// BeforeRun run script before start worker
func (s *Consumer) BeforeRun() error {
return nil
}

// AfterRun run script after start worker
func (s *Consumer) AfterRun() error {
return nil
}

func (s *Consumer) handle(job Job) error {
// create channel with buffer size 1 to avoid goroutine leak
done := make(chan error, 1)
Expand Down
8 changes: 0 additions & 8 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,6 @@ func (q *Queue) QueueTaskWithTimeout(timeout time.Duration, task TaskFunc) error
}

func (q *Queue) work(task QueuedMessage) {
if err := q.worker.BeforeRun(); err != nil {
q.logger.Error(err)
}

// to handle panic cases from inside the worker
// in such case, we start a new goroutine
defer func() {
Expand All @@ -196,10 +192,6 @@ func (q *Queue) work(task QueuedMessage) {
if err := q.worker.Run(task); err != nil {
q.logger.Errorf("runtime error: %s", err.Error())
}

if err := q.worker.AfterRun(); err != nil {
q.logger.Error(err)
}
}

func (q *Queue) schedule() {
Expand Down
4 changes: 0 additions & 4 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@ package queue

// Worker interface
type Worker interface {
// BeforeRun is called before starting the worker
BeforeRun() error
// Run is called to start the worker
Run(task QueuedMessage) error
// BeforeRun is called after starting the worker
AfterRun() error
// Shutdown is called if stop all worker
Shutdown() error
// Queue to send message in Queue
Expand Down
2 changes: 0 additions & 2 deletions worker_empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ var _ Worker = (*emptyWorker)(nil)
// just for unit testing, don't use it.
type emptyWorker struct{}

func (w *emptyWorker) BeforeRun() error { return nil }
func (w *emptyWorker) AfterRun() error { return nil }
func (w *emptyWorker) Run(task QueuedMessage) error { return nil }
func (w *emptyWorker) Shutdown() error { return nil }
func (w *emptyWorker) Queue(task QueuedMessage) error { return nil }
Expand Down
2 changes: 0 additions & 2 deletions worker_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ type messageWorker struct {
messages chan QueuedMessage
}

func (w *messageWorker) BeforeRun() error { return nil }
func (w *messageWorker) AfterRun() error { return nil }
func (w *messageWorker) Run(task QueuedMessage) error {
if string(task.Bytes()) == "panic" {
panic("show panic")
Expand Down
2 changes: 0 additions & 2 deletions worker_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ type taskWorker struct {
messages chan QueuedMessage
}

func (w *taskWorker) BeforeRun() error { return nil }
func (w *taskWorker) AfterRun() error { return nil }
func (w *taskWorker) Run(task QueuedMessage) error {
if v, ok := task.(Job); ok {
if v.Task != nil {
Expand Down