-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtask.go
88 lines (70 loc) · 1.5 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package eagle
import (
"log"
"sync"
)
// keyFileLoader is a background worker that receives key files over a
// channel and replays their entries into the database's in-memory table.
type keyFileLoader struct {
	// db is the database whose table is updated while files are loaded.
	db *DB

	// fileChan delivers the files to load; a nil value is the quit signal.
	fileChan chan *keyLog

	// err is the error returned by a failed loadFile call, if any. It is
	// written by the worker goroutine.
	err error

	// wg is signalled (Done) when the worker goroutine exits.
	wg *sync.WaitGroup

	// nProcessed counts the entries of fully loaded files; nTombstones
	// counts those of them that had a zero ValueSize.
	nProcessed, nTombstones int

	// maxSeqNumber is the largest entry sequence number seen so far.
	maxSeqNumber uint64
}
// loadFile replays every entry of one key file into the database table.
//
// For each entry it builds a recordInfo pointing at the entry, flags it as
// deleted when the entry has a zero ValueSize (a tombstone), and installs it
// with db.table.Update. When Update hands back an earlier record for the key,
// markPreviousAsStale is called. task.maxSeqNumber is raised as soon as a
// larger sequence number is seen.
//
// The per-file entry and tombstone counts are added to the loader's totals
// only when the whole file was read. If the iterator fails, the position is
// logged, the error is returned, and this file's counts are dropped (entries
// already applied to the table stay applied).
func (task *keyFileLoader) loadFile(file *keyLog) error {
	var processed, tombstones int

	for iter := file.Iterator(); iter.HasNext(); {
		entry, _, err := iter.Next()
		if err != nil {
			log.Println("error ", file.fileId, file.size, iter.readOffset)
			return err
		}
		processed++

		ptr := getPointerFromEntry(entry, file.fileId)
		info := &recordInfo{seqNumber: entry.SeqNumber, ptr: ptr}
		if entry.ValueSize == 0 {
			// No value stored: record the key as deleted.
			info = info.markDeleted(entry.SeqNumber)
			tombstones++
		}

		// The second result of Update is discarded, as is the second result
		// of Next above; their meaning is not visible from this function.
		if previous, _ := task.db.table.Update(entry.Key, info); previous != nil {
			// NOTE(review): this passes the current file's id and the current
			// entry's size rather than anything taken from the previous
			// record. Confirm that is what markPreviousAsStale expects.
			staleSize := recordSize(len(entry.Key), int(entry.ValueSize))
			task.db.markPreviousAsStale(file.fileId, staleSize)
		}

		if task.maxSeqNumber < entry.SeqNumber {
			task.maxSeqNumber = entry.SeqNumber
		}
	}

	task.nProcessed += processed
	task.nTombstones += tombstones
	return nil
}
// chanSize is the buffer capacity of each loader's file channel.
const chanSize = 100

// newKeyFileProcessTask builds a loader bound to db whose goroutine will
// signal wg when it exits. The loader starts with no recorded error, zeroed
// counters and an empty file channel buffered to chanSize; the goroutine is
// not started here.
func newKeyFileProcessTask(db *DB, wg *sync.WaitGroup) *keyFileLoader {
	loader := new(keyFileLoader)
	loader.db = db
	loader.wg = wg
	loader.fileChan = make(chan *keyLog, chanSize)
	return loader
}
// start launches the goroutine that loads every file received on
// task.fileChan.
//
// The goroutine runs until it receives a nil file (the quit signal) or the
// channel is closed, and calls task.wg.Done on exit. The first error returned
// by loadFile is stored in task.err; read it only after waiting on the
// WaitGroup.
//
// After a failure the goroutine keeps receiving but discards the remaining
// files instead of returning right away. Returning early would leave nobody
// reading fileChan, so a producer that keeps sending would block forever once
// the channel buffer filled up and would never reach the quit signal.
func (task *keyFileLoader) start() {
	go func() {
		defer task.wg.Done()
		for file := range task.fileChan {
			if file == nil { // quit signal
				return
			}
			if task.err != nil {
				// A previous file failed: drain without loading anything more.
				continue
			}
			if err := task.loadFile(file); err != nil {
				task.err = err
			}
		}
	}()
}