feat: add per-tenant time sharding for long out-of-order ingestion #14711

Merged · 5 commits · Nov 6, 2024
12 changes: 12 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3712,6 +3712,18 @@ shard_streams:
# CLI flag: -shard-streams.enabled
[enabled: <boolean> | default = true]

# Automatically shard streams by adding a __time_shard__ label, with values
# calculated from the log timestamps divided by MaxChunkAge/2. This allows the
# out-of-order ingestion of very old logs. If both flags are enabled,
# time-based sharding will happen before rate-based sharding.
# CLI flag: -shard-streams.time-sharding-enabled
[time_sharding_enabled: <boolean> | default = false]

# Logs with timestamps that are newer than this value will not be
# time-sharded.
# CLI flag: -shard-streams.time-sharding-ignore-recent
[time_sharding_ignore_recent: <duration> | default = 40m]

# Whether to log stream sharding behavior or not. Not recommended for
# production environments.
# CLI flag: -shard-streams.logging-enabled
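For context, a minimal sketch of how these settings might look in a Loki config, assuming the `shard_streams` block sits under `limits_config` as in the surrounding documentation (the non-default values are illustrative):

```yaml
limits_config:
  shard_streams:
    enabled: true
    # Added by this PR: split old out-of-order logs into __time_shard__
    # sub-streams before any rate-based sharding happens.
    time_sharding_enabled: true
    # Logs with timestamps newer than now minus this value are left alone.
    time_sharding_ignore_recent: 40m
```

Per the documentation above, when both flags are enabled, time-based sharding is applied before rate-based sharding.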
137 changes: 125 additions & 12 deletions pkg/distributor/distributor.go
@@ -6,6 +6,7 @@ import (
"fmt"
"math"
"net/http"
"slices"
"sort"
"strconv"
"strings"
@@ -58,6 +59,8 @@ const (
ringKey = "distributor"

ringAutoForgetUnhealthyPeriods = 2

timeShardLabel = "__time_shard__"
)

var (
@@ -120,6 +123,7 @@ type Distributor struct {
services.Service

cfg Config
ingesterCfg ingester.Config
logger log.Logger
clientCfg client.Config
tenantConfigs *runtime.TenantConfigs
@@ -175,6 +179,7 @@ type Distributor struct {
// New creates a distributor.
func New(
cfg Config,
ingesterCfg ingester.Config,
clientCfg client.Config,
configs *runtime.TenantConfigs,
ingestersRing ring.ReadRing,
@@ -233,6 +238,7 @@ func New(

d := &Distributor{
cfg: cfg,
ingesterCfg: ingesterCfg,
logger: logger,
clientCfg: clientCfg,
tenantConfigs: configs,
@@ -434,10 +440,42 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validatedLineCount := 0

var validationErrors util.GroupedErrors
validationContext := d.validator.getValidationContextForTime(time.Now(), tenantID)

now := time.Now()
validationContext := d.validator.getValidationContextForTime(now, tenantID)
levelDetector := newLevelDetector(validationContext)
shouldDiscoverLevels := levelDetector.shouldDiscoverLogLevels()

shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
maybeShardByRate := func(stream logproto.Stream, pushSize int) {
if shardStreamsCfg.Enabled {
streams = append(streams, d.shardStream(stream, pushSize, tenantID)...)
return
}
streams = append(streams, KeyedStream{
HashKey: lokiring.TokenFor(tenantID, stream.Labels),
Stream: stream,
})
}

maybeShardStreams := func(stream logproto.Stream, labels labels.Labels, pushSize int) {
if !shardStreamsCfg.TimeShardingEnabled {
maybeShardByRate(stream, pushSize)
return
}

ignoreRecentFrom := now.Add(-shardStreamsCfg.TimeShardingIgnoreRecent)
streamsByTime, ok := shardStreamByTime(stream, labels, d.ingesterCfg.MaxChunkAge/2, ignoreRecentFrom)
if !ok {
maybeShardByRate(stream, pushSize)
return
}

for _, ts := range streamsByTime {
maybeShardByRate(ts.Stream, ts.linesTotalLen)
}
}

func() {
sp := opentracing.SpanFromContext(ctx)
if sp != nil {
@@ -446,6 +484,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
sp.LogKV("event", "finished to validate request")
}()
}

for _, stream := range req.Streams {
// Return early if stream does not contain any entries
if len(stream.Entries) == 0 {
@@ -512,15 +551,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}

shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
if shardStreamsCfg.Enabled {
streams = append(streams, d.shardStream(stream, pushSize, tenantID)...)
} else {
streams = append(streams, KeyedStream{
HashKey: lokiring.TokenFor(tenantID, stream.Labels),
Stream: stream,
})
}
maybeShardStreams(stream, lbs, pushSize)
}
}()

@@ -534,8 +565,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

now := time.Now()

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.BlockedIngestion)

@@ -690,6 +719,90 @@ func (d *Distributor) trackDiscardedData(
}
}

type streamWithTimeShard struct {
logproto.Stream
linesTotalLen int
}

// This shards the stream into multiple sub-streams based on the log
// timestamps, with no new allocations for the log entries. It sorts them
// in-place in the given stream object (so it may modify it!) and references
// sub-slices of the same stream.Entries slice.
//
// If the second result is false, it means that either there were no logs in the
// stream, or all of the logs in the stream occurred after the given value of
// ignoreLogsFrom, so there was no need to shard - the original `stream` value
// can be used. However, due to the in-place sorting of the logs by timestamp,
// the entries might still have been reordered.
func shardStreamByTime(stream logproto.Stream, lbls labels.Labels, timeShardLen time.Duration, ignoreLogsFrom time.Time) ([]streamWithTimeShard, bool) {
entries := stream.Entries
entriesLen := len(entries)
if entriesLen == 0 {
return nil, false
}

slices.SortStableFunc(entries, func(a, b logproto.Entry) int { return a.Timestamp.Compare(b.Timestamp) })

// Shortcut to do no work if all of the logs are recent
if entries[0].Timestamp.After(ignoreLogsFrom) {
return nil, false
}

result := make([]streamWithTimeShard, 0, (entries[entriesLen-1].Timestamp.Sub(entries[0].Timestamp)/timeShardLen)+1)
labelBuilder := labels.NewBuilder(lbls)

startIdx := 0
for startIdx < entriesLen && entries[startIdx].Timestamp.Before(ignoreLogsFrom) /* the index is changed below */ {
timeShardStart := entries[startIdx].Timestamp.Truncate(timeShardLen)
timeShardEnd := timeShardStart.Add(timeShardLen)

timeShardCutoff := timeShardEnd
if timeShardCutoff.After(ignoreLogsFrom) {
// If the time_sharding_ignore_recent is in the middle of this
// shard, we need to cut off the logs at that point.
timeShardCutoff = ignoreLogsFrom
}

endIdx := startIdx + 1
linesTotalLen := len(entries[startIdx].Line)
for ; endIdx < entriesLen && entries[endIdx].Timestamp.Before(timeShardCutoff); endIdx++ {
linesTotalLen += len(entries[endIdx].Line)
}

shardLbls := labelBuilder.Set(timeShardLabel, fmt.Sprintf("%d_%d", timeShardStart.Unix(), timeShardEnd.Unix())).Labels()
result = append(result, streamWithTimeShard{
Stream: logproto.Stream{
Labels: shardLbls.String(),
Hash: shardLbls.Hash(),
Entries: stream.Entries[startIdx:endIdx],
},
linesTotalLen: linesTotalLen,
})

startIdx = endIdx
}

if startIdx == entriesLen {
// We do not have any remaining entries
return result, true
}

// Append one last shard with all of the logs without a time shard
logsWithoutTimeShardLen := 0
for i := startIdx; i < entriesLen; i++ {
logsWithoutTimeShardLen += len(entries[i].Line)
}

return append(result, streamWithTimeShard{
Stream: logproto.Stream{
Labels: stream.Labels,
Hash: stream.Hash,
Entries: stream.Entries[startIdx:entriesLen],
},
linesTotalLen: logsWithoutTimeShardLen,
}), true
}
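
To make the bucketing concrete, here is a small self-contained sketch of the shard-boundary math used by `shardStreamByTime` (the 1h shard length assumes the ingester's default `max_chunk_age` of 2h; the timestamp is made up for illustration):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// With the default MaxChunkAge of 2h, timeShardLen is MaxChunkAge/2 = 1h.
	timeShardLen := time.Hour

	// A hypothetical old, out-of-order log timestamp.
	ts := time.Date(2024, 11, 6, 10, 37, 0, 0, time.UTC)

	// Same math as the loop above: truncate down to the shard start and
	// add the shard length to get the (exclusive) shard end.
	timeShardStart := ts.Truncate(timeShardLen)
	timeShardEnd := timeShardStart.Add(timeShardLen)

	// Mirrors the fmt.Sprintf("%d_%d", ...) above: every entry in
	// [timeShardStart, timeShardEnd) gets the same __time_shard__ value.
	fmt.Printf("__time_shard__=%d_%d\n", timeShardStart.Unix(), timeShardEnd.Unix())
}
```

All entries falling in one bucket therefore share a sub-stream label such as `__time_shard__=1730887200_1730890800`, which keeps each sub-stream's entries within half the maximum chunk age.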

// shardStream shards (divides) the given stream into N smaller streams, where
// N is the sharding size for the given stream. shardStream returns the smaller
// streams and their associated keys for hashing to ingesters.