-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathqueue.go
138 lines (115 loc) · 2.66 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package ant
import (
"context"
"io"
"sync"
)
// Queue represents a URL queue.
//
// A queue must be safe to use from multiple goroutines.
type Queue interface {
// Enqueue enqueues the given set of URLs.
//
// The method returns an io.EOF if the queue was
// closed and a context error if the context was
// canceled.
//
// Any other error will be treated as a critical
// error and will be porpagated.
Enqueue(ctx context.Context, urls URLs) error
// Dequeue dequeues a URL.
//
// The method returns a URL or io.EOF error if
// the queue was stopped.
//
// The method blocks until a URL is available or
// until the queue is closed.
Dequeue(ctx context.Context) (*URL, error)
// Done acknowledges a URL.
//
// When a URL has been handled by the engine the method
// is called with the URL.
Done(ctx context.Context, url *URL) error
// Wait blocks until the queue is closed.
//
// When the engine encounters an error, or there are
// no more URLs to handle the method should unblock.
Wait()
// Close closes the queue.
//
// The method blocks until the queue is closed
// any queued URLs are discarded.
Close(context.Context) error
}
// MemoryQueue implements a naive in-memory queue.
type memoryQueue struct {
pending URLs
cond *sync.Cond
stopped bool
wg *sync.WaitGroup
}
// MemoryQueue returns a new memory queue.
func MemoryQueue(size int) Queue {
return &memoryQueue{
pending: make(URLs, 0, size),
cond: sync.NewCond(&sync.RWMutex{}),
stopped: false,
wg: &sync.WaitGroup{},
}
}
// Enqueue implementation.
func (mq *memoryQueue) Enqueue(ctx context.Context, urls URLs) error {
if len(urls) == 0 {
return nil
}
mq.cond.L.Lock()
defer mq.cond.L.Unlock()
if mq.stopped {
return io.EOF
}
if ctx.Err() != nil {
return ctx.Err()
}
mq.pending = append(mq.pending, urls...)
mq.wg.Add(len(urls))
mq.cond.Broadcast()
return nil
}
// Dequeue implementation.
func (mq *memoryQueue) Dequeue(ctx context.Context) (*URL, error) {
mq.cond.L.Lock()
defer mq.cond.L.Unlock()
for len(mq.pending) == 0 {
if mq.stopped {
return nil, io.EOF
}
if err := ctx.Err(); err != nil {
return nil, err
}
mq.cond.Wait()
}
url := mq.pending[0]
mq.pending = mq.pending[1:]
return url, nil
}
// Done implementation.
func (mq *memoryQueue) Done(_ context.Context, _ *URL) error {
mq.wg.Done()
return nil
}
// Wait implementation.
func (mq *memoryQueue) Wait() {
mq.wg.Wait()
}
// Close implementation.
func (mq *memoryQueue) Close(_ context.Context) error {
mq.cond.L.Lock()
defer mq.cond.L.Unlock()
for range mq.pending {
mq.wg.Done()
}
mq.stopped = true
mq.pending = mq.pending[:0]
mq.cond.Broadcast()
return nil
}