Skip to content

Commit 1e176ac

Browse files
replayAergonus
authored andcommitted
keep import position in whisper reader
fixes issue grafana#727
1 parent 4d6184f commit 1e176ac

File tree

4 files changed

+169
-6
lines changed

4 files changed

+169
-6
lines changed

cmd/mt-whisper-importer-reader/main.go

+29-6
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ var (
8888
0,
8989
"Only import after the specified timestamp",
9090
)
91+
positionFile = flag.String(
92+
"position-file",
93+
"",
94+
"file to store position and load position from",
95+
)
9196
verbose = flag.Bool(
9297
"verbose",
9398
false,
@@ -114,19 +119,28 @@ func main() {
114119
panic(fmt.Sprintf("Error when parsing schemas file: %q", err))
115120
}
116121

122+
var pos *posTracker
123+
if len(*positionFile) > 0 {
124+
pos, err = NewPositionKeeper(*positionFile)
125+
if err != nil {
126+
log.Fatalf("Error instantiating position keeper: %s", err)
127+
}
128+
defer pos.Close()
129+
}
130+
117131
fileChan := make(chan string)
118132

119133
wg := &sync.WaitGroup{}
120134
wg.Add(*threads)
121135
for i := 0; i < *threads; i++ {
122-
go processFromChan(fileChan, wg)
136+
go processFromChan(pos, fileChan, wg)
123137
}
124138

125-
getFileListIntoChan(fileChan)
139+
getFileListIntoChan(pos, fileChan)
126140
wg.Wait()
127141
}
128142

129-
func processFromChan(files chan string, wg *sync.WaitGroup) {
143+
func processFromChan(pos *posTracker, files chan string, wg *sync.WaitGroup) {
130144
tr := &http.Transport{
131145
TLSClientConfig: &tls.Config{InsecureSkipVerify: *insecureSSL},
132146
}
@@ -194,6 +208,9 @@ func processFromChan(files chan string, wg *sync.WaitGroup) {
194208
resp.Body.Close()
195209
}
196210

211+
if pos != nil {
212+
pos.Done(file)
213+
}
197214
processed := atomic.AddUint32(&processedCount, 1)
198215
if processed%100 == 0 {
199216
skipped := atomic.LoadUint32(&skippedCount)
@@ -362,7 +379,7 @@ func encodedChunksFromPoints(points []whisper.Point, intervalIn, chunkSpan uint3
362379
}
363380

364381
// scan a directory and feed the list of whisper files relative to base into the given channel
365-
func getFileListIntoChan(fileChan chan string) {
382+
func getFileListIntoChan(pos *posTracker, fileChan chan string) {
366383
filepath.Walk(
367384
*whisperDirectory,
368385
func(path string, info os.FileInfo, err error) error {
@@ -375,9 +392,15 @@ func getFileListIntoChan(fileChan chan string) {
375392
atomic.AddUint32(&skippedCount, 1)
376393
return nil
377394
}
378-
if len(path) >= 4 && path[len(path)-4:] == ".wsp" {
379-
fileChan <- path
395+
if len(path) < 4 || path[len(path)-4:] != ".wsp" {
396+
return nil
380397
}
398+
if pos != nil && pos.IsDone(path) {
399+
log.Debugf("Skipping file %s because it was listed as already done", path)
400+
return nil
401+
}
402+
403+
fileChan <- path
381404
return nil
382405
},
383406
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"io"
7+
"os"
8+
"sync"
9+
)
10+
11+
type posTracker struct {
12+
sync.Mutex
13+
file string
14+
fd *os.File
15+
completedMap sync.Map
16+
wg sync.WaitGroup
17+
}
18+
19+
func NewPositionKeeper(file string) (*posTracker, error) {
20+
p := &posTracker{file: file}
21+
22+
fd, err := os.Open(file)
23+
if err != nil {
24+
if !os.IsNotExist(err) {
25+
return nil, err
26+
}
27+
} else {
28+
reader := bufio.NewReader(fd)
29+
var path string
30+
for {
31+
line, isPrefix, err := reader.ReadLine()
32+
if err != nil {
33+
if err == io.EOF {
34+
break
35+
}
36+
return nil, err
37+
}
38+
39+
path += string(line)
40+
if isPrefix {
41+
continue
42+
} else {
43+
p.completedMap.Store(path, struct{}{})
44+
path = ""
45+
}
46+
}
47+
48+
fd.Close()
49+
}
50+
51+
p.fd, err = os.OpenFile(file, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
52+
if err != nil {
53+
return nil, err
54+
}
55+
56+
return p, nil
57+
}
58+
59+
func (p *posTracker) IsDone(path string) bool {
60+
_, ok := p.completedMap.Load(path)
61+
return ok
62+
}
63+
64+
func (p *posTracker) Done(path string) {
65+
p.completedMap.Store(path, struct{}{})
66+
p.wg.Add(1)
67+
go func() {
68+
p.Lock()
69+
defer p.Unlock()
70+
p.fd.WriteString(fmt.Sprintf("%s\n", path))
71+
p.fd.Sync()
72+
p.wg.Done()
73+
}()
74+
}
75+
76+
func (p *posTracker) Close() {
77+
p.wg.Wait()
78+
p.fd.Close()
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package main
2+
3+
import (
4+
"os"
5+
"testing"
6+
)
7+
8+
func TestPositionKeeper(t *testing.T) {
9+
filePath := "/tmp/positionKeeperTest"
10+
clearFile := func() { os.Remove(filePath) }
11+
clearFile()
12+
defer clearFile()
13+
14+
testValue1 := "file1"
15+
testValue2 := "file2"
16+
testValue3 := "file3"
17+
18+
p1, err := NewPositionKeeper(filePath)
19+
if err != nil {
20+
t.Fatalf("Error instantiating position keeper: %s", err)
21+
}
22+
p1.Done(testValue1)
23+
if !p1.IsDone(testValue1) {
24+
t.Fatalf("Expected %s to be done, but it was not", testValue1)
25+
}
26+
if p1.IsDone(testValue2) {
27+
t.Fatalf("Expected %s to not be done, but it was", testValue2)
28+
}
29+
p1.Done(testValue2)
30+
if !p1.IsDone(testValue2) {
31+
t.Fatalf("Expected %s to be done, but it was not", testValue2)
32+
}
33+
p1.Close()
34+
35+
// read the file into new instance of position keeper
36+
p2, err := NewPositionKeeper(filePath)
37+
if err != nil {
38+
t.Fatalf("Error instantiating position keeper: %s", err)
39+
}
40+
if !p2.IsDone(testValue1) || !p2.IsDone(testValue2) {
41+
t.Fatalf("Expected %s and %s to be done, but it was not", testValue1, testValue2)
42+
}
43+
44+
if p2.IsDone(testValue3) {
45+
t.Fatalf("Expected %s to not be done, but it was", testValue3)
46+
}
47+
48+
p2.Done(testValue3)
49+
p2.Close()
50+
51+
// read the file into new instance of position keeper
52+
p3, err := NewPositionKeeper(filePath)
53+
if err != nil {
54+
t.Fatalf("Error instantiating position keeper: %s", err)
55+
}
56+
if !p3.IsDone(testValue1) || !p3.IsDone(testValue2) || !p3.IsDone(testValue3) {
57+
t.Fatalf("Expected %s, %s and %s to be done, but it was not", testValue1, testValue2, testValue3)
58+
}
59+
}

docs/tools.md

+2
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,8 @@ Usage of ./mt-whisper-importer-reader:
471471
Prefix to prepend before every metric name, should include the '.' if necessary
472472
-orgid int
473473
Organization ID the data belongs to (default 1)
474+
-position-file string
475+
file to store position and load position from
474476
-threads int
475477
Number of workers threads to process and convert .wsp files (default 10)
476478
-verbose

0 commit comments

Comments
 (0)