Skip to content

feat(v2): metadata string interning #3744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 115 additions & 105 deletions api/gen/proto/go/metastore/v1/types.pb.go

Large diffs are not rendered by default.

625 changes: 343 additions & 282 deletions api/gen/proto/go/metastore/v1/types_vtproto.pb.go

Large diffs are not rendered by default.

33 changes: 19 additions & 14 deletions api/metastore/v1/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,35 @@ syntax = "proto3";

package metastore.v1;

import "types/v1/types.proto";

message BlockList {
string tenant = 1;
uint32 shard = 2;
repeated string blocks = 3;
}

message BlockMeta {
uint64 format_version = 1;
uint32 format_version = 1;
// Block ID is a unique identifier for the block.
// This is the only field that is not included into
// the string table.
string id = 2;
int64 min_time = 3;
int64 max_time = 4;
uint32 shard = 5;
uint32 compaction_level = 6;
// Optional. Empty if compaction level is 0.
string tenant_id = 7;
repeated Dataset datasets = 8;
int32 tenant = 3;
uint32 shard = 4;
uint32 compaction_level = 5;
int64 min_time = 6;
int64 max_time = 7;
int32 created_by = 8;
uint64 size = 9;
string created_by = 10;
repeated Dataset datasets = 10;
// String table contains strings of the block.
// By convention, the first string is always an empty string.
repeated string string_table = 11;
}

message Dataset {
string tenant_id = 1;
string name = 2;
int32 tenant = 1;
int32 name = 2;
int64 min_time = 3;
int64 max_time = 4;

Expand All @@ -44,6 +48,7 @@ message Dataset {

// TODO: Delete. Use labels instead.
// Profile types present in the tenant service data.
repeated string profile_types = 7;
repeated types.v1.Labels labels = 8;
repeated int32 profile_types = 7;
// Length prefixed label key-value pairs.
repeated int32 labels = 8;
}
62 changes: 38 additions & 24 deletions api/openapiv2/gen/phlare.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -602,19 +602,17 @@
"type": "object",
"properties": {
"formatVersion": {
"type": "string",
"format": "uint64"
"type": "integer",
"format": "int64"
},
"id": {
"type": "string"
},
"minTime": {
"type": "string",
"format": "int64"
"description": "Block ID is a unique identifier for the block.\nThis is the only field that is not included into\nthe string table."
},
"maxTime": {
"type": "string",
"format": "int64"
"tenant": {
"type": "integer",
"format": "int32",
"description": "Optional. Empty if compaction level is 0."
},
"shard": {
"type": "integer",
Expand All @@ -624,9 +622,21 @@
"type": "integer",
"format": "int64"
},
"tenantId": {
"minTime": {
"type": "string",
"description": "Optional. Empty if compaction level is 0."
"format": "int64"
},
"maxTime": {
"type": "string",
"format": "int64"
},
"createdBy": {
"type": "integer",
"format": "int32"
},
"size": {
"type": "string",
"format": "uint64"
},
"datasets": {
"type": "array",
Expand All @@ -635,12 +645,12 @@
"$ref": "#/definitions/v1Dataset"
}
},
"size": {
"type": "string",
"format": "uint64"
},
"createdBy": {
"type": "string"
"stringTable": {
"type": "array",
"items": {
"type": "string"
},
"description": "String table contains strings of the block.\nBy convention, the first string is always an empty string."
}
}
},
Expand Down Expand Up @@ -824,11 +834,13 @@
"v1Dataset": {
"type": "object",
"properties": {
"tenantId": {
"type": "string"
"tenant": {
"type": "integer",
"format": "int32"
},
"name": {
"type": "string"
"type": "integer",
"format": "int32"
},
"minTime": {
"type": "string",
Expand All @@ -854,16 +866,18 @@
"profileTypes": {
"type": "array",
"items": {
"type": "string"
"type": "integer",
"format": "int32"
},
"description": "TODO: Delete. Use labels instead.\nProfile types present in the tenant service data."
},
"labels": {
"type": "array",
"items": {
"type": "object",
"$ref": "#/definitions/v1Labels"
}
"type": "integer",
"format": "int32"
},
"description": "Length prefixed label key-value pairs."
}
}
},
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ require (
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/valyala/bytebufferpool v1.0.0
github.com/xlab/treeprint v1.2.0
go.etcd.io/bbolt v1.3.10
go.etcd.io/bbolt v1.3.11
go.opentelemetry.io/otel v1.30.0
go.opentelemetry.io/proto/otlp v1.1.0
go.uber.org/atomic v1.11.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -760,8 +760,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0=
go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ=
go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0=
go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
go.etcd.io/etcd/api/v3 v3.5.7 h1:sbcmosSVesNrWOJ58ZQFitHMdncusIifYcrBfwrlJSY=
go.etcd.io/etcd/api/v3 v3.5.7/go.mod h1:9qew1gCdDDLu+VwmeG+iFpL+QlpHTo7iubavdVDgCAA=
go.etcd.io/etcd/client/pkg/v3 v3.5.7 h1:y3kf5Gbp4e4q7egZdn5T7W9TSHUvkClN6u+Rq9mEOmg=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package block

import (
"context"
"crypto/rand"
"fmt"
"os"
"path/filepath"
Expand All @@ -12,7 +11,6 @@ import (
"sync"

"github.com/grafana/dskit/multierror"
"github.com/oklog/ulid"
"github.com/parquet-go/parquet-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage"
Expand Down Expand Up @@ -115,18 +113,17 @@ func PlanCompaction(objects Objects) ([]*CompactionPlan, error) {
}
level++

// Assuming that the first block in the job is the oldest one.
timestamp := ulid.MustParse(r.meta.Id).Time()
g := NewULIDGenerator(objects)
m := make(map[string]*CompactionPlan)
for _, obj := range objects {
for _, s := range obj.meta.Datasets {
tm, ok := m[s.TenantId]
tm, ok := m[obj.meta.StringTable[s.Tenant]]
if !ok {
tm = newBlockCompaction(timestamp, s.TenantId, r.meta.Shard, level)
m[s.TenantId] = tm
tm = newBlockCompaction(g.ULID().String(), obj.meta.StringTable[s.Tenant], r.meta.Shard, level)
m[obj.meta.StringTable[s.Tenant]] = tm
}
sm := tm.addDataset(s)
// Bind objects to datasets.
sm := tm.addDataset(obj.meta, s)
sm.append(NewDataset(s, obj))
}
}
Expand All @@ -135,44 +132,49 @@ func PlanCompaction(objects Objects) ([]*CompactionPlan, error) {
for _, tm := range m {
ordered = append(ordered, tm)
slices.SortFunc(tm.datasets, func(a, b *datasetCompaction) int {
return strings.Compare(a.meta.Name, b.meta.Name)
return strings.Compare(a.name, b.name)
})
}
slices.SortFunc(ordered, func(a, b *CompactionPlan) int {
return strings.Compare(a.tenantID, b.tenantID)
return strings.Compare(a.tenant, b.tenant)
})

return ordered, nil
}

type CompactionPlan struct {
tenantID string
datasetMap map[string]*datasetCompaction
tenant string
path string
datasetMap map[int32]*datasetCompaction
datasets []*datasetCompaction
meta *metastorev1.BlockMeta
strings *MetadataStrings
}

func newBlockCompaction(unixMilli uint64, tenantID string, shard uint32, compactionLevel uint32) *CompactionPlan {
return &CompactionPlan{
tenantID: tenantID,
datasetMap: make(map[string]*datasetCompaction),
meta: &metastorev1.BlockMeta{
FormatVersion: 1,
// TODO(kolesnikovae): Make it deterministic?
Id: ulid.MustNew(unixMilli, rand.Reader).String(),
TenantId: tenantID,
Shard: shard,
CompactionLevel: compactionLevel,
Datasets: nil,
MinTime: 0,
MaxTime: 0,
Size: 0,
},
}
func newBlockCompaction(
id string,
tenant string,
shard uint32,
compactionLevel uint32,
) *CompactionPlan {
p := &CompactionPlan{
tenant: tenant,
datasetMap: make(map[int32]*datasetCompaction),
strings: NewMetadataStringTable(),
}
p.path = BuildObjectPath(tenant, shard, compactionLevel, id)
p.meta = &metastorev1.BlockMeta{
FormatVersion: 1,
Id: id,
Tenant: p.strings.Put(tenant),
Shard: shard,
CompactionLevel: compactionLevel,
}
return p
}

func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdir string) (m *metastorev1.BlockMeta, err error) {
w := NewBlockWriter(dst, ObjectPath(b.meta), tmpdir)
w := NewBlockWriter(dst, b.path, tmpdir)
defer func() {
err = multierror.New(err, w.Close()).Err()
}()
Expand All @@ -187,14 +189,17 @@ func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdi
return nil, fmt.Errorf("flushing block writer: %w", err)
}
b.meta.Size = w.Offset()
b.meta.StringTable = b.strings.Strings
return b.meta, nil
}

func (b *CompactionPlan) addDataset(s *metastorev1.Dataset) *datasetCompaction {
sm, ok := b.datasetMap[s.Name]
func (b *CompactionPlan) addDataset(md *metastorev1.BlockMeta, s *metastorev1.Dataset) *datasetCompaction {
name := b.strings.Put(md.StringTable[s.Name])
tenant := b.strings.Put(md.StringTable[s.Tenant])
sm, ok := b.datasetMap[name]
if !ok {
sm = newDatasetCompaction(s.TenantId, s.Name)
b.datasetMap[s.Name] = sm
sm = b.newDatasetCompaction(tenant, name)
b.datasetMap[name] = sm
b.datasets = append(b.datasets, sm)
}
if b.meta.MinTime == 0 || s.MinTime < b.meta.MinTime {
Expand All @@ -207,8 +212,12 @@ func (b *CompactionPlan) addDataset(s *metastorev1.Dataset) *datasetCompaction {
}

type datasetCompaction struct {
// Dataset name.
name string
parent *CompactionPlan

meta *metastorev1.Dataset
ptypes map[string]struct{}
ptypes map[int32]struct{}
path string // Set at open.

datasets []*Dataset
Expand All @@ -224,12 +233,14 @@ type datasetCompaction struct {
flushOnce sync.Once
}

func newDatasetCompaction(tenantID, name string) *datasetCompaction {
func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompaction {
return &datasetCompaction{
ptypes: make(map[string]struct{}, 10),
name: b.strings.Strings[name],
parent: b,
ptypes: make(map[int32]struct{}, 10),
meta: &metastorev1.Dataset{
TenantId: tenantID,
Name: name,
Tenant: tenant,
Name: name,
// Updated at append.
MinTime: 0,
MaxTime: 0,
Expand All @@ -250,7 +261,8 @@ func (m *datasetCompaction) append(s *Dataset) {
m.meta.MaxTime = s.meta.MaxTime
}
for _, pt := range s.meta.ProfileTypes {
m.ptypes[pt] = struct{}{}
ptn := m.parent.strings.Put(s.obj.meta.StringTable[pt])
m.ptypes[ptn] = struct{}{}
}
}

Expand Down Expand Up @@ -388,11 +400,10 @@ func (m *datasetCompaction) writeTo(w *Writer) (err error) {
return err
}
m.meta.Size = w.Offset() - off
m.meta.ProfileTypes = make([]string, 0, len(m.ptypes))
m.meta.ProfileTypes = make([]int32, 0, len(m.ptypes))
for pt := range m.ptypes {
m.meta.ProfileTypes = append(m.meta.ProfileTypes, pt)
}
sort.Strings(m.meta.ProfileTypes)
return nil
}

Expand Down
Loading
Loading