Skip to content

Commit 50b29ac

Browse files
yeya24rubywtl
authored andcommitted
ingester storage shard
1 parent 818aa71 commit 50b29ac

File tree

1 file changed

+48
-2
lines changed

1 file changed

+48
-2
lines changed

pkg/ingester/ingester.go

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"path/filepath"
1414
"runtime"
1515
"slices"
16+
"strconv"
1617
"strings"
1718
"sync"
1819
"time"
@@ -2211,6 +2212,42 @@ func createUserStats(db *userTSDB, activeSeriesMetricsEnabled bool) UserStats {
22112212

22122213
const queryStreamBatchMessageSize = 1 * 1024 * 1024
22132214

2215+
const CortexIngesterShardLabel = "__cortex_ingester_shard__"
2216+
2217+
type IngesterShardInfo struct {
2218+
ShardCount int
2219+
ShardIndex int
2220+
}
2221+
2222+
func ExtractIngesterShardInfo(matchers []*labels.Matcher) ([]*labels.Matcher, *IngesterShardInfo, error) {
2223+
r := make([]*labels.Matcher, 0, len(matchers))
2224+
2225+
shardInfo := IngesterShardInfo{}
2226+
for _, matcher := range matchers {
2227+
if matcher.Name == CortexIngesterShardLabel && matcher.Type == labels.MatchEqual {
2228+
parts := strings.Split(matcher.Value, "_")
2229+
if len(parts) != 2 {
2230+
return r, nil, fmt.Errorf("invalid shard info: %s", matcher.Value)
2231+
}
2232+
2233+
shardCount, err := strconv.Atoi(parts[0])
2234+
if err != nil {
2235+
return r, nil, fmt.Errorf("invalid shard count: %s", parts[0])
2236+
}
2237+
shardIndex, err := strconv.Atoi(parts[1])
2238+
if err != nil {
2239+
return r, nil, fmt.Errorf("invalid shard index: %s", parts[1])
2240+
}
2241+
shardInfo.ShardCount = shardCount
2242+
shardInfo.ShardIndex = shardIndex
2243+
} else {
2244+
r = append(r, matcher)
2245+
}
2246+
}
2247+
2248+
return r, &shardInfo, nil
2249+
}
2250+
22142251
// QueryStream implements service.IngesterServer
22152252
// Streams metrics from a TSDB. This implements the client.IngesterServer interface
22162253
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) (err error) {
@@ -2243,6 +2280,10 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
22432280
if err != nil {
22442281
return err
22452282
}
2283+
matchers, shardInfo, err := ExtractIngesterShardInfo(matchers)
2284+
if err != nil {
2285+
return err
2286+
}
22462287

22472288
defer shardMatcher.Close()
22482289

@@ -2262,7 +2303,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
22622303
numSeries := 0
22632304
totalDataBytes := 0
22642305
numChunks := 0
2265-
numSeries, numSamples, totalDataBytes, numChunks, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, shardMatcher, stream)
2306+
numSeries, numSamples, totalDataBytes, numChunks, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, shardMatcher, shardInfo, stream)
22662307

22672308
if err != nil {
22682309
return err
@@ -2302,7 +2343,7 @@ func (i *Ingester) trackInflightQueryRequest() (func(), error) {
23022343
}
23032344

23042345
// queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
2305-
func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples, totalBatchSizeBytes, numChunks int, _ error) {
2346+
func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, shardInfo *IngesterShardInfo, stream client.Ingester_QueryStreamServer) (numSeries, numSamples, totalBatchSizeBytes, numChunks int, _ error) {
23062347
q, err := db.ChunkQuerier(from, through)
23072348
if err != nil {
23082349
return 0, 0, 0, 0, err
@@ -2318,6 +2359,10 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
23182359
End: through,
23192360
DisableTrimming: i.cfg.DisableChunkTrimming,
23202361
}
2362+
if shardInfo.ShardCount > 0 {
2363+
hints.ShardCount = uint64(shardInfo.ShardCount)
2364+
hints.ShardIndex = uint64(shardInfo.ShardIndex)
2365+
}
23212366
// It's not required to return sorted series because series are sorted by the Cortex querier.
23222367
ss := q.Select(ctx, false, hints, matchers...)
23232368
c()
@@ -2609,6 +2654,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
26092654
BlockChunkQuerierFunc: i.blockChunkQuerierFunc(userID),
26102655
NewCompactorFunc: newCompactorFunc,
26112656
BlockQuerierFunc: i.blockQuerierFunc(),
2657+
EnableSharding: true,
26122658
}, nil)
26132659
if err != nil {
26142660
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)

0 commit comments

Comments
 (0)