Skip to content

Commit

Permalink
[WIP] move output to main
Browse files Browse the repository at this point in the history
  • Loading branch information
agebhar1 committed Feb 4, 2024
1 parent 9e56ef9 commit d803d44
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 275 deletions.
250 changes: 16 additions & 234 deletions cmd/chunks-inspect/loki.go
Original file line number Diff line number Diff line change
@@ -1,104 +1,21 @@
package main

import (
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"fmt"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logql/log"
"github.com/pierrec/lz4/v4"
"hash/crc32"
"io"
"time"

"github.com/golang/snappy"
"github.com/klauspost/compress/flate"
"github.com/klauspost/compress/zstd"
)

type Encoding struct {
code int
name string
readerFn func(io.Reader) (io.Reader, error)
}

func (e Encoding) String() string {
return e.name
}

// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
// before.
var castagnoliTable *crc32.Table

func init() {
castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
}

var (
encNone = Encoding{code: 0, name: "none", readerFn: func(reader io.Reader) (io.Reader, error) { return reader, nil }}
encGZIP = Encoding{code: 1, name: "gzip", readerFn: func(reader io.Reader) (io.Reader, error) { return gzip.NewReader(reader) }}
encDumb = Encoding{code: 2, name: "dumb", readerFn: func(reader io.Reader) (io.Reader, error) { return reader, nil }}
encLZ4 = Encoding{code: 3, name: "lz4", readerFn: func(reader io.Reader) (io.Reader, error) { return lz4.NewReader(reader), nil }}
encSnappy = Encoding{code: 4, name: "snappy", readerFn: func(reader io.Reader) (io.Reader, error) { return snappy.NewReader(reader), nil }}
enclz4_256k = Encoding{code: 5, name: "lz4-256k", readerFn: func(reader io.Reader) (io.Reader, error) { return lz4.NewReader(reader), nil }}
enclz4_1M = Encoding{code: 6, name: "lz4-1M", readerFn: func(reader io.Reader) (io.Reader, error) { return lz4.NewReader(reader), nil }}
enclz4_4M = Encoding{code: 7, name: "lz4-4M", readerFn: func(reader io.Reader) (io.Reader, error) { return lz4.NewReader(reader), nil }}
encFlate = Encoding{code: 8, name: "flate", readerFn: func(reader io.Reader) (io.Reader, error) { return flate.NewReader(reader), nil }}
encZstd = Encoding{code: 9, name: "zstd", readerFn: func(reader io.Reader) (io.Reader, error) {
r, err := zstd.NewReader(reader)
if err != nil {
panic(err)
}
return r, nil
}}

Encodings = []Encoding{encNone, encGZIP, encDumb, encLZ4, encSnappy, enclz4_256k, enclz4_1M, enclz4_4M, encFlate, encZstd}
)

const (
_ byte = iota
chunkFormatV1
chunkFormatV2
chunkFormatV3
)

type LokiChunk struct {
format byte
encoding Encoding

blocks []LokiBlock

metadataChecksum uint32
computedMetadataChecksum uint32
}

type LokiBlock struct {
numEntries uint64 // number of log lines in this block
minT int64 // minimum timestamp, unix nanoseconds
maxT int64 // max timestamp, unix nanoseconds

dataOffset uint64 // offset in the data-part of chunks file

uncompSize uint64 // size of the original data uncompressed

rawData []byte // data as stored in chunk file, compressed
originalData []byte // data uncompressed from rawData

// parsed rawData
entries []LokiEntry
storedChecksum uint32
computedChecksum uint32
}

type LokiEntry struct {
timestamp int64
line string
format byte
encoding chunkenc.Encoding
compressedSize int
uncompressedSize int
blocks []chunkenc.Block
}

func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader, from, through time.Time) (*LokiChunk, error) {
func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) {

/* Loki Chunk Format
Expand Down Expand Up @@ -127,25 +44,12 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader, from, through time.Ti
}

c, _ := chunkenc.NewByteChunk(data, 0, 0)
bs := c.Blocks(from, through)
fmt.Println("Blocks: ", len(bs))

pipeline := log.NewNoopPipeline()
for idx, block := range bs {
fmt.Println("Block : ", idx)
fmt.Println("MinTime: ", time.Unix(0, block.MinTime()).UTC())
fmt.Println("MaxTime: ", time.Unix(0, block.MaxTime()).UTC())
fmt.Println("Offset : ", block.Offset())
iter := block.Iterator(context.Background(), pipeline.ForStream(nil))
for iter.Next() {
e := iter.Entry()
fmt.Println(e.Timestamp.UTC(), " ", e.Line)
for _, meta := range e.StructuredMetadata {
fmt.Println("\t", meta.Name, "=", meta.Value)
}
}
}
encoding := c.Encoding()
compressedSize := c.CompressedSize()
uncompressedSize := c.UncompressedSize()
from, through := c.Bounds()

bs := c.Blocks(from, through)
err := c.Close()
if err != nil {
return nil, err
Expand All @@ -158,135 +62,13 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader, from, through time.Ti
// Chunk format is at position 4
f := data[4]

compression, err := getCompression(f, data[5])
if err != nil {
return nil, fmt.Errorf("failed to read compression: %w", err)
}

// return &LokiChunk{encoding: compression}, nil

metasOffset := binary.BigEndian.Uint64(data[len(data)-8:])

metadata := data[metasOffset : len(data)-(8+4)]

metaChecksum := binary.BigEndian.Uint32(data[len(data)-12 : len(data)-8])
computedMetaChecksum := crc32.Checksum(metadata, castagnoliTable)

blocks, n := binary.Uvarint(metadata)
if n <= 0 {
return nil, fmt.Errorf("failed to read number of blocks")
}
metadata = metadata[n:]

lokiChunk := &LokiChunk{
format: f,
encoding: compression,
metadataChecksum: metaChecksum,
computedMetadataChecksum: computedMetaChecksum,
}

for ix := 0; ix < int(blocks); ix++ {
block := LokiBlock{}
block.numEntries, metadata, err = readUvarint(err, metadata)
block.minT, metadata, err = readVarint(err, metadata)
block.maxT, metadata, err = readVarint(err, metadata)
block.dataOffset, metadata, err = readUvarint(err, metadata)
if f >= chunkFormatV3 {
block.uncompSize, metadata, err = readUvarint(err, metadata)
}
dataLength := uint64(0)
dataLength, metadata, err = readUvarint(err, metadata)

if err != nil {
return nil, err
}

block.rawData = data[block.dataOffset : block.dataOffset+dataLength]
block.storedChecksum = binary.BigEndian.Uint32(data[block.dataOffset+dataLength : block.dataOffset+dataLength+4])
block.computedChecksum = crc32.Checksum(block.rawData, castagnoliTable)
block.originalData, block.entries, err = parseLokiBlock(compression, block.rawData)
lokiChunk.blocks = append(lokiChunk.blocks, block)
format: f,
encoding: encoding,
compressedSize: compressedSize,
uncompressedSize: uncompressedSize,
blocks: bs,
}

return lokiChunk, nil
}

func parseLokiBlock(compression Encoding, data []byte) ([]byte, []LokiEntry, error) {
r, err := compression.readerFn(bytes.NewReader(data))
if err != nil {
return nil, nil, err
}

decompressed, err := io.ReadAll(r)
origDecompressed := decompressed
if err != nil {
return nil, nil, err
}

entries := []LokiEntry(nil)
for len(decompressed) > 0 {
var timestamp int64
var lineLength uint64

timestamp, decompressed, err = readVarint(err, decompressed)
lineLength, decompressed, err = readUvarint(err, decompressed)
if err != nil {
return origDecompressed, nil, err
}

if len(decompressed) < int(lineLength) {
return origDecompressed, nil, fmt.Errorf("not enough line data, need %d, got %d", lineLength, len(decompressed))
}

entries = append(entries, LokiEntry{
timestamp: timestamp,
line: string(decompressed[0:lineLength]),
})

decompressed = decompressed[lineLength:]
}

return origDecompressed, entries, nil
}

func readVarint(prevErr error, buf []byte) (int64, []byte, error) {
if prevErr != nil {
return 0, buf, prevErr
}

val, n := binary.Varint(buf)
if n <= 0 {
return 0, nil, fmt.Errorf("varint: %d", n)
}
return val, buf[n:], nil
}

func readUvarint(prevErr error, buf []byte) (uint64, []byte, error) {
if prevErr != nil {
return 0, buf, prevErr
}

val, n := binary.Uvarint(buf)
if n <= 0 {
return 0, nil, fmt.Errorf("varint: %d", n)
}
return val, buf[n:], nil
}

func getCompression(format byte, code byte) (Encoding, error) {
if format == chunkFormatV1 {
return encGZIP, nil
}

if format >= chunkFormatV2 {
for _, e := range Encodings {
if e.code == int(code) {
return e, nil
}
}

return encNone, fmt.Errorf("unknown encoding: %d", code)
}

return encNone, fmt.Errorf("unknown format: %d", format)
}
64 changes: 23 additions & 41 deletions cmd/chunks-inspect/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package main

import (
"crypto/sha256"
"context"
"flag"
"fmt"
logql "github.com/grafana/loki/pkg/logql/log"
"log"
"os"
"strings"
Expand Down Expand Up @@ -33,12 +34,6 @@ func printFile(filename string, blockDetails, printLines, storeBlocks bool) {
}
defer f.Close()

si, err := f.Stat()
if err != nil {
log.Println("failed to stat file", err)
return
}

h, err := DecodeHeader(f)
if err != nil {
log.Printf("%s: %v", filename, err)
Expand All @@ -59,66 +54,53 @@ func printFile(filename string, blockDetails, printLines, storeBlocks bool) {
fmt.Println("\t", l.Name, "=", l.Value)
}

lokiChunk, err := parseLokiChunk(h, f, from, through)
lokiChunk, err := parseLokiChunk(h, f)
if err != nil {
log.Printf("%s: %v", filename, err)
return
}

fmt.Println("Format (Version):", lokiChunk.format)
fmt.Println("Encoding:", lokiChunk.encoding)
fmt.Print("Blocks Metadata Checksum: ", fmt.Sprintf("%08x", lokiChunk.metadataChecksum))
if lokiChunk.metadataChecksum == lokiChunk.computedMetadataChecksum {
fmt.Println(" OK")
} else {
fmt.Println(" BAD, computed checksum:", fmt.Sprintf("%08x", lokiChunk.computedMetadataChecksum))
}
if blockDetails {
fmt.Println("Found", len(lokiChunk.blocks), "block(s)")
} else {
fmt.Println("Found", len(lokiChunk.blocks), "block(s), use -b to show block details")
}
if len(lokiChunk.blocks) > 0 {
fmt.Println("Minimum time (from first block):", time.Unix(0, lokiChunk.blocks[0].minT).In(timezone).Format(format))
fmt.Println("Maximum time (from last block):", time.Unix(0, lokiChunk.blocks[len(lokiChunk.blocks)-1].maxT).In(timezone).Format(format))
}

if blockDetails {
fmt.Println()
}

totalSize := 0

pipeline := logql.NewNoopPipeline()
for ix, b := range lokiChunk.blocks {
if blockDetails {
cksum := ""
if b.storedChecksum == b.computedChecksum {
cksum = fmt.Sprintf("%08x OK", b.storedChecksum)
} else {
cksum = fmt.Sprintf("%08x BAD (computed: %08x)", b.storedChecksum, b.computedChecksum)
}
fmt.Printf("Block %4d: position: %8d, original length: %6d (stored: %6d, ratio: %.2f), minT: %v maxT: %v, checksum: %s\n",
ix, b.dataOffset, len(b.originalData), len(b.rawData), float64(len(b.originalData))/float64(len(b.rawData)),
time.Unix(0, b.minT).In(timezone).Format(format), time.Unix(0, b.maxT).In(timezone).Format(format),
cksum)
fmt.Printf("Block %4d: digest compressed: %02x, original: %02x\n", ix, sha256.Sum256(b.rawData), sha256.Sum256(b.originalData))
fmt.Printf("Block %4d: position: %8d, minT: %v maxT: %v\n",
ix, b.Offset(),
time.Unix(0, b.MinTime()).In(timezone).Format(format),
time.Unix(0, b.MaxTime()).In(timezone).Format(format),
)
}

totalSize += len(b.originalData)

if printLines {
for _, l := range b.entries {
fmt.Printf("%v\t%s\n", time.Unix(0, l.timestamp).In(timezone).Format(format), strings.TrimSpace(l.line))
iter := b.Iterator(context.Background(), pipeline.ForStream(nil))
for iter.Next() {
e := iter.Entry()
fmt.Printf("%v\t%s\n", e.Timestamp.In(timezone).Format(format), strings.TrimSpace(e.Line))
if e.StructuredMetadata != nil {
fmt.Println("Structured Metadata:")
for _, meta := range e.StructuredMetadata {
fmt.Println("\t", meta.Name, "=", meta.Value)
}
}
}
}

if storeBlocks {
writeBlockToFile(b.rawData, ix, fmt.Sprintf("%s.block.%d", filename, ix))
writeBlockToFile(b.originalData, ix, fmt.Sprintf("%s.original.%d", filename, ix))
}
//if storeBlocks {
// writeBlockToFile(b.rawData, ix, fmt.Sprintf("%s.block.%d", filename, ix))
// writeBlockToFile(b.originalData, ix, fmt.Sprintf("%s.original.%d", filename, ix))
//}
}

fmt.Println("Total size of original data:", totalSize, "file size:", si.Size(), "ratio:", fmt.Sprintf("%0.3g", float64(totalSize)/float64(si.Size())))
}

func writeBlockToFile(data []byte, blockIndex int, filename string) {
Expand Down

0 comments on commit d803d44

Please sign in to comment.