Skip to content

Commit 73f5262

Browse files
committed
chore(queue): auto scale the task worker
fix #43 Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
1 parent 4c2d646 commit 73f5262

10 files changed

+202
-174
lines changed

.golangci.yml

-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ linters:
88
- depguard
99
- dogsled
1010
- dupl
11-
- errcheck
1211
- exportloopref
1312
- exhaustive
1413
- gochecknoinits

consumer.go

+19-11
Original file line numberDiff line numberDiff line change
@@ -101,26 +101,25 @@ func (s *Consumer) handle(job Job) error {
101101
}
102102

103103
// Run start the worker
104-
func (s *Consumer) Run() error {
104+
func (s *Consumer) Run(task QueuedMessage) error {
105105
// check queue status
106106
select {
107107
case <-s.stop:
108108
return ErrQueueShutdown
109109
default:
110110
}
111111

112-
for task := range s.taskQueue {
113-
var data Job
114-
_ = json.Unmarshal(task.Bytes(), &data)
115-
if v, ok := task.(Job); ok {
116-
if v.Task != nil {
117-
data.Task = v.Task
118-
}
119-
}
120-
if err := s.handle(data); err != nil {
121-
s.logger.Error(err.Error())
112+
var data Job
113+
_ = json.Unmarshal(task.Bytes(), &data)
114+
if v, ok := task.(Job); ok {
115+
if v.Task != nil {
116+
data.Task = v.Task
122117
}
123118
}
119+
if err := s.handle(data); err != nil {
120+
return err
121+
}
122+
124123
return nil
125124
}
126125

@@ -161,6 +160,15 @@ func (s *Consumer) Queue(job QueuedMessage) error {
161160
}
162161
}
163162

163+
func (s *Consumer) Request() (QueuedMessage, error) {
164+
select {
165+
case task := <-s.taskQueue:
166+
return task, nil
167+
default:
168+
return nil, errors.New("no message in queue")
169+
}
170+
}
171+
164172
// NewConsumer for struc
165173
func NewConsumer(opts ...Option) *Consumer {
166174
o := NewOptions(opts...)

consumer_test.go

+17-39
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,16 @@ func TestCustomFuncAndWait(t *testing.T) {
5050
q, err := NewQueue(
5151
WithWorker(w),
5252
WithWorkerCount(2),
53+
WithLogger(NewLogger()),
5354
)
5455
assert.NoError(t, err)
55-
q.Start()
56-
time.Sleep(100 * time.Millisecond)
5756
assert.NoError(t, q.Queue(m))
5857
assert.NoError(t, q.Queue(m))
5958
assert.NoError(t, q.Queue(m))
6059
assert.NoError(t, q.Queue(m))
60+
q.Start()
61+
time.Sleep(100 * time.Millisecond)
62+
assert.Equal(t, 2, int(q.metric.BusyWorkers()))
6163
time.Sleep(600 * time.Millisecond)
6264
q.Shutdown()
6365
q.Wait()
@@ -84,26 +86,6 @@ func TestEnqueueJobAfterShutdown(t *testing.T) {
8486
q.Wait()
8587
}
8688

87-
func TestConsumerNumAfterShutdown(t *testing.T) {
88-
w := NewConsumer()
89-
q, err := NewQueue(
90-
WithWorker(w),
91-
WithWorkerCount(2),
92-
)
93-
assert.NoError(t, err)
94-
q.Start()
95-
q.Start()
96-
time.Sleep(50 * time.Millisecond)
97-
assert.Equal(t, 4, q.Workers())
98-
q.Shutdown()
99-
q.Wait()
100-
assert.Equal(t, 0, q.Workers())
101-
// show queue has been shutdown meesgae
102-
q.Start()
103-
q.Start()
104-
assert.Equal(t, 0, q.Workers())
105-
}
106-
10789
func TestJobReachTimeout(t *testing.T) {
10890
m := mockMessage{
10991
message: "foo",
@@ -131,12 +113,10 @@ func TestJobReachTimeout(t *testing.T) {
131113
WithWorkerCount(2),
132114
)
133115
assert.NoError(t, err)
134-
q.Start()
135-
time.Sleep(50 * time.Millisecond)
136116
assert.NoError(t, q.QueueWithTimeout(30*time.Millisecond, m))
117+
q.Start()
137118
time.Sleep(50 * time.Millisecond)
138-
q.Shutdown()
139-
q.Wait()
119+
q.Release()
140120
}
141121

142122
func TestCancelJobAfterShutdown(t *testing.T) {
@@ -167,11 +147,12 @@ func TestCancelJobAfterShutdown(t *testing.T) {
167147
WithWorkerCount(2),
168148
)
169149
assert.NoError(t, err)
170-
q.Start()
171-
time.Sleep(50 * time.Millisecond)
172150
assert.NoError(t, q.QueueWithTimeout(100*time.Millisecond, m))
173-
q.Shutdown()
174-
q.Wait()
151+
assert.NoError(t, q.QueueWithTimeout(100*time.Millisecond, m))
152+
q.Start()
153+
time.Sleep(10 * time.Millisecond)
154+
assert.Equal(t, 2, int(q.metric.busyWorkers))
155+
q.Release()
175156
}
176157

177158
func TestGoroutineLeak(t *testing.T) {
@@ -205,15 +186,14 @@ func TestGoroutineLeak(t *testing.T) {
205186
WithWorkerCount(10),
206187
)
207188
assert.NoError(t, err)
208-
q.Start()
209-
time.Sleep(50 * time.Millisecond)
210189
for i := 0; i < 500; i++ {
211190
m.message = fmt.Sprintf("foobar: %d", i+1)
212191
assert.NoError(t, q.Queue(m))
213192
}
193+
194+
q.Start()
214195
time.Sleep(2 * time.Second)
215-
q.Shutdown()
216-
q.Wait()
196+
q.Release()
217197
fmt.Println("number of goroutines:", runtime.NumGoroutine())
218198
}
219199

@@ -231,12 +211,10 @@ func TestGoroutinePanic(t *testing.T) {
231211
WithWorkerCount(2),
232212
)
233213
assert.NoError(t, err)
234-
q.Start()
235-
time.Sleep(50 * time.Millisecond)
236214
assert.NoError(t, q.Queue(m))
237-
time.Sleep(50 * time.Millisecond)
238-
q.Shutdown()
239-
q.Wait()
215+
q.Start()
216+
time.Sleep(10 * time.Millisecond)
217+
q.Release()
240218
}
241219

242220
func TestHandleTimeout(t *testing.T) {

pool_test.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package queue
33
import (
44
"context"
55
"testing"
6-
"time"
76

87
"github.com/stretchr/testify/assert"
98
)
@@ -14,9 +13,6 @@ func TestNewPoolWithQueueTask(t *testing.T) {
1413
rets := make(chan struct{}, taskN)
1514

1615
p := NewPool(totalN)
17-
time.Sleep(time.Millisecond * 50)
18-
assert.Equal(t, totalN, p.Workers())
19-
2016
for i := 0; i < taskN; i++ {
2117
assert.NoError(t, p.QueueTask(func(context.Context) error {
2218
rets <- struct{}{}
@@ -30,5 +26,5 @@ func TestNewPoolWithQueueTask(t *testing.T) {
3026

3127
// shutdown all, and now running worker is 0
3228
p.Release()
33-
assert.Equal(t, 0, p.Workers())
29+
assert.Equal(t, 0, p.BusyWorkers())
3430
}

0 commit comments

Comments
 (0)