Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Filip and Matej's comments
Browse files Browse the repository at this point in the history
Signed-off-by: haanhvu <haanh6594@gmail.com>
haanhvu committed May 12, 2023

Verified

This commit was signed with the committer’s verified signature.
snyk-bot Snyk bot
1 parent bb3c72c commit 5fc9011
Showing 6 changed files with 350 additions and 243 deletions.
3 changes: 2 additions & 1 deletion pkg/exemplars/tsdb.go
Original file line number Diff line number Diff line change
@@ -18,7 +18,8 @@ import (

// TSDB allows fetching exemplars from a TSDB instance.
type TSDB struct {
db storage.ExemplarQueryable
db storage.ExemplarQueryable

extLabels labels.Labels
mtx sync.RWMutex
}
2 changes: 1 addition & 1 deletion pkg/receive/config.go
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@ type HashringConfig struct {
Tenants []string `json:"tenants,omitempty"`
Endpoints []string `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
ExternalLabels []string `json:"external_labels,omitempty"`
ExternalLabels map[string]string `json:"external_labels,omitempty"`
}

// ConfigWatcher is able to watch a file containing a hashring configuration
73 changes: 40 additions & 33 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
@@ -567,26 +567,14 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg)
reg = NewUnRegisterer(reg)

lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID))
initialLset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID))

// Extract tenant's external labels from hashring configs.
// If one tenant appears in multiple hashring configs,
// only the external label set from the first hashring config is applied.
hcLoop:
for _, hc := range t.hashringConfigs {
for _, tenant := range hc.Tenants {
if tenant != tenantID {
continue
}

var err error
lset, err = parseAndExtendSortedLabels(lset, hc.ExternalLabels, t.logger)
if err != nil {
return errors.Wrap(err, "failed to parse external labels for tenant "+tenantID)
}

break hcLoop
}
lset, err := t.extractTenantsLabels(tenantID, initialLset)
if err != nil {
t.mtx.Lock()
delete(t.tenants, tenantID)
t.mtx.Unlock()
return err
}

dataDir := t.defaultTenantDataDir(tenantID)
@@ -691,9 +679,9 @@ func (t *MultiTSDB) SetHashringConfig(cfg []HashringConfig) error {

lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantId))
var err error
lset, err = parseAndExtendSortedLabels(lset, hc.ExternalLabels, t.logger)
lset, err = extendLabels(lset, hc.ExternalLabels, t.logger)
if err != nil {
return errors.Wrap(err, "failed to parse external labels for tenant "+tenantId)
return errors.Wrap(err, "failed to extend external labels for tenant "+tenantId)
}

if t.tenants[tenantId].ship != nil {
@@ -863,29 +851,46 @@ func (u *UnRegisterer) MustRegister(cs ...prometheus.Collector) {
}
}

// Parse, sort, and extend external labels to an initial label set.
// extractTenantsLabels extractes tenant's external labels from hashring configs.
// If one tenant appears in multiple hashring configs,
// only the external label set from the first hashring config is applied.
func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Labels) (labels.Labels, error) {
for _, hc := range t.hashringConfigs {
for _, tenant := range hc.Tenants {
if tenant != tenantID {
continue
}

lset, err := extendLabels(initialLset, hc.ExternalLabels, t.logger)
if err != nil {
return nil, errors.Wrap(err, "failed to extend external labels for tenant "+tenantID)
}
return lset, nil
}
}

return nil, nil
}

// extendLabels extends external labels of the initial label set.
// If an external label shares same name with a label in the initial label set,
// use the label in the initial label set and inform user about it.
func parseAndExtendSortedLabels(labelSet labels.Labels, extend []string, logger log.Logger) (labels.Labels, error) {
func extendLabels(labelSet labels.Labels, extend map[string]string, logger log.Logger) (labels.Labels, error) {
var extendLabels labels.Labels
for _, s := range extend {
parts := strings.SplitN(s, "=", 2)
if len(parts) != 2 {
return nil, errors.Errorf("unrecognized label %q", s)
}
if !model.LabelName.IsValid(model.LabelName(parts[0])) {
return nil, errors.Errorf("unsupported format for label %s", s)
for name, value := range extend {
if !model.LabelName.IsValid(model.LabelName(name)) {
return nil, errors.Errorf("unsupported format for label's name: %s", name)
}
extendLabels = append(extendLabels, labels.Label{Name: parts[0], Value: parts[1]})
extendLabels = append(extendLabels, labels.Label{Name: name, Value: value})
}
sort.Sort(extendLabels)

extendedLabelSet := make(labels.Labels, 0, len(labelSet)+len(extendLabels))
for len(labelSet) > 0 && len(extendLabels) > 0 {
d := strings.Compare(labelSet[0].Name, extendLabels[0].Name)
if d == 0 {
extendedLabelSet = append(extendedLabelSet, labelSet[0])
level.Info(logger).Log("msg", "Duplicate label's name "+extendLabels[0].Name+". Using initial label instead.")
level.Info(logger).Log("msg", "Duplicate label found. Using initial label instead.",
"label's name", extendLabels[0].Name)
labelSet, extendLabels = labelSet[1:], extendLabels[1:]
} else if d < 0 {
extendedLabelSet = append(extendedLabelSet, labelSet[0])
@@ -898,5 +903,7 @@ func parseAndExtendSortedLabels(labelSet labels.Labels, extend []string, logger
extendedLabelSet = append(extendedLabelSet, labelSet...)
extendedLabelSet = append(extendedLabelSet, extendLabels...)

sort.Sort(extendedLabelSet)

return extendedLabelSet, nil
}
506 changes: 302 additions & 204 deletions pkg/receive/receive_test.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
@@ -78,14 +78,14 @@ type Shipper struct {
dir string
metrics *metrics
bucket objstore.Bucket
labels func() labels.Labels
source metadata.SourceType

uploadCompacted bool
allowOutOfOrderUploads bool
hashFunc metadata.HashFunc

mtx sync.RWMutex
labels func() labels.Labels
mtx sync.RWMutex
}

// New creates a new shipper that detects new TSDB blocks in dir and uploads them to
5 changes: 3 additions & 2 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
@@ -41,10 +41,11 @@ type TSDBStore struct {
logger log.Logger
db TSDBReader
component component.StoreAPI
extLset labels.Labels
buffers sync.Pool
maxBytesPerFrame int
mtx sync.RWMutex

extLset labels.Labels
mtx sync.RWMutex
}

func RegisterWritableStoreServer(storeSrv storepb.WriteableStoreServer) func(*grpc.Server) {

0 comments on commit 5fc9011

Please sign in to comment.