diff --git a/queue.go b/queue.go index 45ad1d1..d5ec5a5 100644 --- a/queue.go +++ b/queue.go @@ -251,6 +251,16 @@ func (q *Queue) start() { tasks := make(chan core.QueuedMessage, 1) for { + // check worker number + q.schedule() + + select { + // wait worker ready + case <-q.ready: + case <-q.quit: + return + } + // request task from queue in background q.routineGroup.Run(func() { for { @@ -289,11 +299,6 @@ func (q *Queue) start() { return } - // check worker number - q.schedule() - // wait worker ready - <-q.ready - // start new task q.metric.IncBusyWorker() q.routineGroup.Run(func() { diff --git a/queue_test.go b/queue_test.go index 3affe42..0bd9232 100644 --- a/queue_test.go +++ b/queue_test.go @@ -24,7 +24,7 @@ func (m mockMessage) Bytes() []byte { return []byte(m.message) } -func TestNewQueue(t *testing.T) { +func TestNewQueueWithZeroWorker(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() @@ -34,18 +34,44 @@ func TestNewQueue(t *testing.T) { w := mocks.NewMockWorker(controller) w.EXPECT().Shutdown().Return(nil) - w.EXPECT().Request().Return(nil, nil) q, err = NewQueue( WithWorker(w), + WithWorkerCount(0), ) assert.NoError(t, err) assert.NotNil(t, q) q.Start() + time.Sleep(50 * time.Millisecond) assert.Equal(t, 0, q.BusyWorkers()) q.Release() } +func TestNewQueueWithDefaultWorker(t *testing.T) { + controller := gomock.NewController(t) + defer controller.Finish() + + q, err := NewQueue() + assert.Error(t, err) + assert.Nil(t, q) + + w := mocks.NewMockWorker(controller) + m := mocks.NewMockQueuedMessage(controller) + m.EXPECT().Bytes().Return([]byte("test")).AnyTimes() + w.EXPECT().Shutdown().Return(nil) + w.EXPECT().Request().Return(m, nil).AnyTimes() + w.EXPECT().Run(m).Return(nil).AnyTimes() + q, err = NewQueue( + WithWorker(w), + ) + assert.NoError(t, err) + assert.NotNil(t, q) + + q.Start() + q.Release() + assert.Equal(t, 0, q.BusyWorkers()) +} + func TestShtdonwOnce(t *testing.T) { w := &messageWorker{ messages: make(chan core.QueuedMessage, 100), diff --git a/worker_test.go b/worker_test.go index 47095bc..79d3c6a 100644 --- a/worker_test.go +++ b/worker_test.go @@ -3,6 +3,7 @@ package queue import ( "errors" "testing" + "time" "github.com/golang-queue/queue/core" "github.com/golang-queue/queue/mocks" @@ -30,5 +31,6 @@ func TestMockWorkerAndMessage(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, q) q.Start() + time.Sleep(50 * time.Millisecond) q.Release() }