This repository was archived by the owner on Feb 24, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathimport_tsdb.go
106 lines (83 loc) · 2.5 KB
/
import_tsdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package main
import (
gokitlog "github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"log"
"math"
"os"
)
func ImportTSDB(blockPath string) (Block, error) {
logger := gokitlog.NewLogfmtLogger(os.Stderr)
var newBlock Block
block, err := tsdb.OpenBlock(logger, blockPath, chunkenc.NewPool())
if err != nil {
log.Println("Failed to open Block", err)
return Block{}, errors.Wrap(err, "tsdb.OpenBlock")
}
metaInfo := block.Meta()
newBlock.NumSeries = metaInfo.Stats.NumSeries
newBlock.NumChunks = metaInfo.Stats.NumChunks
newBlock.NumSamples = metaInfo.Stats.NumSamples
newBlock.NumTombstones = metaInfo.Stats.NumTombstones
newBlock.EndingTime = metaInfo.MaxTime
newBlock.StartingTime = metaInfo.MinTime
newBlock.ULID = metaInfo.ULID.String()
indexr, err := block.Index()
if err != nil {
return Block{}, errors.Wrap(err, "block.Index")
}
defer indexr.Close()
newBlock.LabelNames, err = indexr.LabelNames()
newBlock.Symbols = indexr.Symbols()
newBlock.Postings, err = indexr.Postings("", "")
chunkr, err := block.Chunks()
if err != nil {
return Block{}, errors.Wrap(err, "block.Chunks")
}
var it chunkenc.Iterator
for newBlock.Postings.Next() {
var customTimeSeries TimeSeries
ref := newBlock.Postings.At()
lset := labels.Labels{}
chks := []chunks.Meta{}
if err := indexr.Series(ref, &lset, &chks); err != nil {
return Block{}, errors.Wrap(err, "index.Series")
}
customTimeSeries.Ref = ref
customTimeSeries.Labels = lset
for _, meta := range chks {
chunk, err := chunkr.Chunk(meta.Ref)
if err != nil {
return Block{}, errors.Wrap(err, "chunkr.Chunk")
}
var customChunk Chunk
customChunk.Ref = meta.Ref
customChunk.NumSamples = chunk.NumSamples()
it := chunk.Iterator(it)
for it.Next() {
t, v := it.At()
if math.IsNaN(v) {
continue
}
if math.IsInf(v, -1) || math.IsInf(v, 1) {
continue
}
customChunk.TimeStamps = append(customChunk.TimeStamps, t)
customChunk.Values = append(customChunk.Values, v)
}
if it.Err() != nil {
return Block{}, errors.Wrap(err, "iterator.Err")
}
if len(customChunk.TimeStamps) == 0 {
continue
}
customTimeSeries.Chunks = append(customTimeSeries.Chunks, customChunk)
}
newBlock.CustomTimeSeries = append(newBlock.CustomTimeSeries, customTimeSeries)
}
return newBlock, nil
}