Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add v4 chunk format to chunks-inspect tool #13057

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 124 additions & 10 deletions cmd/chunks-inspect/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const (
chunkFormatV1
chunkFormatV2
chunkFormatV3
chunkFormatV4
)

type LokiChunk struct {
Expand Down Expand Up @@ -89,9 +90,15 @@ type LokiBlock struct {
computedChecksum uint32
}

type label struct {
name string
val string
}

type LokiEntry struct {
timestamp int64
line string
timestamp int64
line string
structuredMetadata []label
}

func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) {
Expand Down Expand Up @@ -122,27 +129,96 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) {
return nil, fmt.Errorf("failed to read rawData for Loki chunk into memory: %w", err)
}

//Magic Number
if num := binary.BigEndian.Uint32(data[0:4]); num != 0x012EE56A {
return nil, fmt.Errorf("invalid magic number: %0x", num)
}

// Chunk format is at position 4
f := data[4]

// Compression
compression, err := getCompression(f, data[5])
if err != nil {
return nil, fmt.Errorf("failed to read compression: %w", err)
}

// return &LokiChunk{encoding: compression}, nil

metasOffset := binary.BigEndian.Uint64(data[len(data)-8:])
// This was copied from memchunk.go newByteChunk() as a helper function to use here also
// In format >=v4 there are multiple metadata sections at the end of the chunk file,
// the offset and length of each is stored as the last bytes in the file
readSectionLenAndOffset := func(idx int) (uint64, uint64) {
lenAndOffsetPos := len(data) - (idx * 16)
lenAndOffset := data[lenAndOffsetPos : lenAndOffsetPos+16]
return binary.BigEndian.Uint64(lenAndOffset[:8]), binary.BigEndian.Uint64(lenAndOffset[8:])
}

metadata := data[metasOffset : len(data)-(8+4)]
// Chunk formats v1-v3 had a single metadata section which was stored at the end of the chunk with the last 8 bytes a pointer to the beginning of the metadata table section
metasOffset := uint64(0)
metasLen := uint64(0)
if f < chunkFormatV4 {
// Metadata table is at the end, read the last 8 bytes to get the offset to the start of the table
metasOffset = binary.BigEndian.Uint64(data[len(data)-8:])
// Exclude the last 8 bytes which is the offset, and the 4 bytes before that which is the checksum from the length
metasLen = uint64(len(data)-(8+4)) - metasOffset
} else {
// the chunk metas section is index 1
metasLen, metasOffset = readSectionLenAndOffset(1)
}

metaChecksum := binary.BigEndian.Uint32(data[len(data)-12 : len(data)-8])
// Read metadata
metadata := data[metasOffset : metasOffset+metasLen]

// Read metadata block checksum, the last 4 bytes before the last 8 bytes containing the metadata table start offset
metaChecksum := binary.BigEndian.Uint32(data[metasOffset+metasLen:])
computedMetaChecksum := crc32.Checksum(metadata, castagnoliTable)

// If chunkFormat >= v4 we also need to read the structured metadata section
var structuredMetadataSymbols []string
if f >= chunkFormatV4 {
// the chunk metas section is index 2
structuredMetadataLength, structuredMetadataOffset := readSectionLenAndOffset(2)

//expCRC := binary.BigEndian.Uint32(data[structuredMetadataOffset+structuredMetadataLength:])
//computedMetaChecksum := crc32.Checksum(lb, castagnoliTable)
Comment on lines +183 to +184
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented code, looks like a left over


structuredMetadata := data[structuredMetadataOffset : structuredMetadataOffset+structuredMetadataLength]

// Structured Metadata is "normalized" or "compressed" by storing an index to a string with each log line, and then all the strings in the chunk metadata section
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: By reading the code not sure I understand the "normalized" part of this comment, but maybe I'm just missing context

// Here we need to extract the list of string from the metadata to be used for look ups when decompressing the log lines
// First we read the number of symbols
symbols, n := binary.Uvarint(structuredMetadata)
if n <= 0 {
return nil, fmt.Errorf("failed to read number of labels in structured metadata")
}
structuredMetadata = structuredMetadata[n:]

// Next we need to decompress the list of strings
lr, err := compression.readerFn(bytes.NewReader(structuredMetadata))
decompressed, err := io.ReadAll(lr)
if err != nil {
return nil, err
}

structuredMetadataSymbols = make([]string, 0, symbols)
// Read every label and add it to a map for easy lookup
for i := 0; i < int(symbols); i++ {
// Read the length of the string
strLen, read := binary.Uvarint(decompressed)
if read <= 0 {
return nil, fmt.Errorf("expected to find a length for a structured metadata string but did not find one")
}
decompressed = decompressed[read:]

// Read the bytes of the string and advance the buffer
str := string(decompressed[:strLen])
decompressed = decompressed[strLen:]
// Append to our slice of symbols
structuredMetadataSymbols = append(structuredMetadataSymbols, str)
}
}

blocks, n := binary.Uvarint(metadata)
if n <= 0 {
return nil, fmt.Errorf("failed to read number of blocks")
Expand All @@ -158,13 +234,19 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) {

for ix := 0; ix < int(blocks); ix++ {
block := LokiBlock{}
// Read number of entries in block
block.numEntries, metadata, err = readUvarint(err, metadata)
// Read block minimum time
block.minT, metadata, err = readVarint(err, metadata)
// Read block max time
block.maxT, metadata, err = readVarint(err, metadata)
// Read offset to block data
block.dataOffset, metadata, err = readUvarint(err, metadata)
if f >= chunkFormatV3 {
// Read uncompressed size
block.uncompSize, metadata, err = readUvarint(err, metadata)
}
// Read block length
dataLength := uint64(0)
dataLength, metadata, err = readUvarint(err, metadata)

Expand All @@ -175,14 +257,14 @@ func parseLokiChunk(chunkHeader *ChunkHeader, r io.Reader) (*LokiChunk, error) {
block.rawData = data[block.dataOffset : block.dataOffset+dataLength]
block.storedChecksum = binary.BigEndian.Uint32(data[block.dataOffset+dataLength : block.dataOffset+dataLength+4])
block.computedChecksum = crc32.Checksum(block.rawData, castagnoliTable)
block.originalData, block.entries, err = parseLokiBlock(compression, block.rawData)
block.originalData, block.entries, err = parseLokiBlock(f, compression, block.rawData, structuredMetadataSymbols)
lokiChunk.blocks = append(lokiChunk.blocks, block)
}

return lokiChunk, nil
}

func parseLokiBlock(compression Encoding, data []byte) ([]byte, []LokiEntry, error) {
func parseLokiBlock(format byte, compression Encoding, data []byte, symbols []string) ([]byte, []LokiEntry, error) {
r, err := compression.readerFn(bytes.NewReader(data))
if err != nil {
return nil, nil, err
Expand All @@ -208,13 +290,45 @@ func parseLokiBlock(compression Encoding, data []byte) ([]byte, []LokiEntry, err
if len(decompressed) < int(lineLength) {
return origDecompressed, nil, fmt.Errorf("not enough line data, need %d, got %d", lineLength, len(decompressed))
}
line := string(decompressed[0:lineLength])
decompressed = decompressed[lineLength:]

var structuredMetdata []label
if format >= chunkFormatV4 {
// The length of the symbols section is encoded first, but we don't really need it here because everything is a Uvarint
// Read it to advance the buffer to the next element.
_, decompressed, err = readUvarint(err, decompressed)

// Read number of structured metadata pairs
var structuredMetadataPairs uint64
structuredMetadataPairs, decompressed, err = readUvarint(err, decompressed)
if err != nil {
return origDecompressed, nil, err
}
structuredMetdata = make([]label, 0, structuredMetadataPairs)
// Read all the pairs
for i := 0; i < int(structuredMetadataPairs); i++ {
var nameIdx uint64
nameIdx, decompressed, err = readUvarint(err, decompressed)
if err != nil {
return origDecompressed, nil, err
}
var valIdx uint64
valIdx, decompressed, err = readUvarint(err, decompressed)
if err != nil {
return origDecompressed, nil, err
}
lbl := label{name: symbols[nameIdx], val: symbols[valIdx]}
structuredMetdata = append(structuredMetdata, lbl)
}
}

entries = append(entries, LokiEntry{
timestamp: timestamp,
line: string(decompressed[0:lineLength]),
timestamp: timestamp,
line: line,
structuredMetadata: structuredMetdata,
})

decompressed = decompressed[lineLength:]
}

return origDecompressed, entries, nil
Expand Down
6 changes: 5 additions & 1 deletion cmd/chunks-inspect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ func printFile(filename string, blockDetails, printLines, storeBlocks bool) {

if printLines {
for _, l := range b.entries {
fmt.Printf("%v\t%s\n", time.Unix(0, l.timestamp).In(timezone).Format(format), strings.TrimSpace(l.line))
fmt.Printf("TS(%v) LINE(%s) STRUCTURED_METADATA(", time.Unix(0, l.timestamp).In(timezone).Format(format), strings.TrimSpace(l.line))
for _, s := range l.structuredMetadata {
fmt.Printf("%s=%s ", s.name, s.val)
}
fmt.Println(")")
}
}

Expand Down
Loading