-
Notifications
You must be signed in to change notification settings - Fork 1
/
queue.go
134 lines (110 loc) · 2.3 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package conveyor
import (
"log"
"os"
"sync"
)
type Queue struct {
workers int
chunkCount int
chunkSize int64
lineProcessor LineProcessor
*QueueOpts
tasks chan Chunk
result chan ChunkResult
}
type QueueOpts struct {
ChunkResultLogger ChunkResultLogger
Logger *log.Logger
ErrLogger *log.Logger
OverflowScanBuffSize int
}
type QueueResult struct {
Results []ChunkResult
Lines int64
FailedChunks int
}
func NewQueue(chunks []Chunk, workers int, lineProcessor LineProcessor, opts ...*QueueOpts) *Queue {
tasks := make(chan Chunk, len(chunks))
for _, chunk := range chunks {
tasks <- chunk
}
close(tasks)
var opt *QueueOpts
if len(opts) > 0 && opts[0] != nil {
opt = opts[0]
} else {
opt = &QueueOpts{}
}
if opt.ChunkResultLogger == nil {
opt.ChunkResultLogger = DefaultChunkResultLogger
}
if opt.OverflowScanBuffSize == 0 {
opt.OverflowScanBuffSize = DefaultOverflowScanSize
}
if opt.Logger == nil {
opt.Logger = log.New(os.Stdout, "", log.LstdFlags)
}
if opt.ErrLogger == nil {
opt.ErrLogger = log.New(os.Stderr, "", log.LstdFlags)
}
return &Queue{
workers: workers,
tasks: tasks,
result: make(chan ChunkResult, workers),
chunkCount: len(chunks),
chunkSize: int64(chunks[0].Size),
lineProcessor: lineProcessor,
QueueOpts: opt,
}
}
func (queue *Queue) Work() QueueResult {
var (
wg sync.WaitGroup
results = make([]ChunkResult, 0, queue.chunkCount)
)
wg.Add(queue.workers + queue.chunkCount)
for i := 0; i < queue.workers; i++ {
go NewWorker(
i+1,
queue.tasks,
queue.result,
queue.lineProcessor,
queue.chunkSize,
queue.OverflowScanBuffSize,
&wg,
).Work()
}
quit := make(chan int)
go func() {
currentChunkNumber := 0
for {
select {
case result := <-queue.result:
currentChunkNumber++
queue.ChunkResultLogger(queue, result, currentChunkNumber)
results = append(results, result)
wg.Done()
case <-quit:
return
}
}
}()
wg.Wait()
quit <- 0
var (
totalLines int64
failedChunks int
)
for _, result := range results {
totalLines += int64(result.Lines)
if !result.Ok() {
failedChunks++
}
}
return QueueResult{
Results: results,
Lines: totalLines,
FailedChunks: failedChunks,
}
}