Introduced lock file to shuffle sharding grouper #4805

Merged
Changes from 19 commits
20 commits
dc65d95
Introduced lock file to shuffle sharding grouper
alexqyle Jul 27, 2022
41e1f08
let redis cache logs log with context (#4785)
mengmengy Jul 22, 2022
20e6502
DoBatch preference to 4xx if error (#4783)
danielblando Jul 26, 2022
735e10d
Updated CHANGELOG and ordered imports
alexqyle Jul 28, 2022
a74e8de
Fixed lint and removed groupCallLimit
alexqyle Jul 29, 2022
91e381a
Changed lock file to json format and make sure planner would not pick…
alexqyle Aug 2, 2022
f45290c
Fix updateCachedShippedBlocks - new thanos (#4806)
alanprot Jul 28, 2022
9c3efbc
Join memberlist on starting with no retry (#4804)
danielblando Jul 28, 2022
275881a
Fix alertmanager log message (#4801)
damnever Jul 29, 2022
d6d9349
Grafana Cloud uses Mimir now, so remove Grafana Cloud as hosted servi…
alvinlin123 Aug 1, 2022
4714e83
Merge branch 'master' into lock-file-for-cortex-compactor
alexqyle Aug 2, 2022
7cf4862
Created block_locker to handle all block lock file operations. Added …
alexqyle Aug 3, 2022
835251a
Moved lock file heart beat into planner and refined planner logic to …
alexqyle Aug 4, 2022
3e5f65d
Updated documents
alexqyle Aug 22, 2022
00ea6da
Merge branch 'master' into lock-file-for-cortex-compactor
alexqyle Aug 22, 2022
a5a47b7
Merge branch 'master' into lock-file-for-cortex-compactor
alanprot Aug 30, 2022
d353004
Return concurrency number of group. Use ticker for lock file heart beat
alexqyle Aug 31, 2022
b7b0663
Renamed lock file to be visit marker file
alexqyle Sep 1, 2022
ad5963b
Fixed unit test
alexqyle Sep 1, 2022
c47eb5c
Make sure visited block can be picked by compactor visited it
alexqyle Sep 2, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -44,6 +44,7 @@
* [ENHANCEMENT] Ring: DoBatch prioritize 4xx errors when failing. #4783
* [ENHANCEMENT] Cortex now built with Go 1.18. #4829
* [ENHANCEMENT] Ingester: Prevent ingesters from becoming unhealthy during WAL replay. #4847
* [ENHANCEMENT] Compactor: Introduced visit marker file for blocks so blocks that are under compaction will not be picked up by another compactor. #4805
* [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784
* [FEATURE] Compactor: Added `-compactor.blocks-fetch-concurrency` allowing to configure number of go routines for fetching blocks during compaction. #4787
* [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818
10 changes: 10 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -254,4 +254,14 @@ compactor:
# Timeout for waiting on compactor to become ACTIVE in the ring.
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]

# How long before a block visit marker file is considered expired and the block
# can be picked up by a compactor again.
# CLI flag: -compactor.block-visit-marker-timeout
[block_visit_marker_timeout: <duration> | default = 5m]

# How frequently the block visit marker file should be updated during
# compaction.
# CLI flag: -compactor.block-visit-marker-file-update-interval
[block_visit_marker_file_update_interval: <duration> | default = 1m]
```
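These two settings interact: the heartbeat that the PR adds refreshes the marker every `block_visit_marker_file_update_interval`, and other compactors treat the block as free again once the marker is older than `block_visit_marker_timeout`, so the interval needs to stay well below the timeout. A minimal sketch of that sanity check (hypothetical; not a check shown in this diff):

```go
package compactor // hypothetical illustration only

import (
	"fmt"
	"time"
)

// validateVisitMarkerDurations is a hypothetical guard: if the heartbeat interval
// is not shorter than the visit marker timeout, an in-progress block could look
// expired between two heartbeats and be picked up by another compactor.
func validateVisitMarkerDurations(updateInterval, timeout time.Duration) error {
	if updateInterval >= timeout {
		return fmt.Errorf("block visit marker update interval (%s) must be shorter than the timeout (%s)", updateInterval, timeout)
	}
	return nil
}
```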
9 changes: 9 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -3846,6 +3846,15 @@ sharding_ring:
# Timeout for waiting on compactor to become ACTIVE in the ring.
# CLI flag: -compactor.ring.wait-active-instance-timeout
[wait_active_instance_timeout: <duration> | default = 10m]

# How long before a block visit marker file is considered expired and the block
# can be picked up by a compactor again.
# CLI flag: -compactor.block-visit-marker-timeout
[block_visit_marker_timeout: <duration> | default = 5m]

# How frequently the block visit marker file should be updated during compaction.
# CLI flag: -compactor.block-visit-marker-file-update-interval
[block_visit_marker_file_update_interval: <duration> | default = 1m]
```

### `store_gateway_config`
116 changes: 116 additions & 0 deletions pkg/compactor/block_visit_marker.go
@@ -0,0 +1,116 @@
package compactor

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"path"
	"strings"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/objstore"
)

const BlockVisitMarkerFile = "block.visit"

var (
	ErrorBlockVisitMarkerNotFound  = errors.New("block visit marker not found")
	ErrorUnmarshalBlockVisitMarker = errors.New("unmarshal block visit marker JSON")
)

type BlockVisitMarker struct {
	CompactorID string    `json:"compactorID"`
	VisitTime   time.Time `json:"visitTime"`
}

func (b *BlockVisitMarker) isVisited(blockVisitMarkerTimeout time.Duration) bool {
	return time.Now().Before(b.VisitTime.Add(blockVisitMarkerTimeout))
}

func (b *BlockVisitMarker) isVisitedByCompactor(blockVisitMarkerTimeout time.Duration, compactorID string) bool {
	return time.Now().Before(b.VisitTime.Add(blockVisitMarkerTimeout)) && b.CompactorID == compactorID
}

func ReadBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) {
	visitMarkerFile := path.Join(blockID, BlockVisitMarkerFile)
	visitMarkerFileReader, err := bkt.Get(ctx, visitMarkerFile)
	if err != nil {
		if bkt.IsObjNotFoundErr(err) {
			return nil, errors.Wrapf(ErrorBlockVisitMarkerNotFound, "block visit marker file: %s", visitMarkerFile)
		}
		blockVisitMarkerReadFailed.Inc()
		return nil, errors.Wrapf(err, "get block visit marker file: %s", visitMarkerFile)
	}
	b, err := ioutil.ReadAll(visitMarkerFileReader)
	if err != nil {
		blockVisitMarkerReadFailed.Inc()
		return nil, errors.Wrapf(err, "read block visit marker file: %s", visitMarkerFile)
	}
	blockVisitMarker := BlockVisitMarker{}
	err = json.Unmarshal(b, &blockVisitMarker)
	if err != nil {
		blockVisitMarkerReadFailed.Inc()
		return nil, errors.Wrapf(ErrorUnmarshalBlockVisitMarker, "block visit marker file: %s, error: %v", visitMarkerFile, err.Error())
	}
	return &blockVisitMarker, nil
}

func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, compactorID string, blockVisitMarkerWriteFailed prometheus.Counter) error {
	blockVisitMarkerFilePath := path.Join(blockID, BlockVisitMarkerFile)
	blockVisitMarker := BlockVisitMarker{
		CompactorID: compactorID,
		VisitTime:   time.Now(),
	}
	visitMarkerFileContent, err := json.Marshal(blockVisitMarker)
	if err != nil {
		blockVisitMarkerWriteFailed.Inc()
		return err
	}
	err = bkt.Upload(ctx, blockVisitMarkerFilePath, bytes.NewReader(visitMarkerFileContent))
	if err != nil {
		blockVisitMarkerWriteFailed.Inc()
		return err
	}
	return nil
}

func markBlocksVisited(ctx context.Context, bkt objstore.Bucket, logger log.Logger, blocks []*metadata.Meta, compactorID string, blockVisitMarkerWriteFailed prometheus.Counter) {
	for _, block := range blocks {
		blockID := block.ULID.String()
		err := UpdateBlockVisitMarker(ctx, bkt, blockID, compactorID, blockVisitMarkerWriteFailed)
		if err != nil {
			level.Error(logger).Log("msg", "unable to upsert visit marker file content for block", "blockID", blockID, "err", err)
		}
	}
}

func markBlocksVisitedHeartBeat(ctx context.Context, bkt objstore.Bucket, logger log.Logger, blocks []*metadata.Meta, compactorID string, blockVisitMarkerFileUpdateInterval time.Duration, blockVisitMarkerWriteFailed prometheus.Counter) {
	var blockIds []string
	for _, block := range blocks {
		blockIds = append(blockIds, block.ULID.String())
	}
	blocksInfo := strings.Join(blockIds, ",")
	level.Info(logger).Log("msg", fmt.Sprintf("start heart beat for blocks: %s", blocksInfo))
	ticker := time.NewTicker(blockVisitMarkerFileUpdateInterval)
	defer ticker.Stop()
heartBeat:
	for {
		level.Debug(logger).Log("msg", fmt.Sprintf("heart beat for blocks: %s", blocksInfo))
		markBlocksVisited(ctx, bkt, logger, blocks, compactorID, blockVisitMarkerWriteFailed)

		select {
		case <-ctx.Done():
			break heartBeat
		case <-ticker.C:
			continue
		}
	}
	level.Info(logger).Log("msg", fmt.Sprintf("stop heart beat for blocks: %s", blocksInfo))
}
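A minimal usage sketch of these primitives, assuming code in the same `compactor` package (so the unexported `isVisited`/`isVisitedByCompactor` methods are reachable); `tryVisitBlock` is a hypothetical helper and not part of this PR. It shows the intended flow: read the marker, skip the block if another compactor's marker is still fresh, otherwise write our own marker to claim it:

```go
package compactor

import (
	"context"
	"time"

	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// tryVisitBlock (hypothetical) returns true only when the block is not freshly
// marked by another compactor and this compactor managed to write its own marker.
func tryVisitBlock(ctx context.Context, bkt objstore.Bucket, blockID, compactorID string, visitMarkerTimeout time.Duration, readFailed, writeFailed prometheus.Counter) (bool, error) {
	marker, err := ReadBlockVisitMarker(ctx, bkt, blockID, readFailed)
	if err != nil && !errors.Is(err, ErrorBlockVisitMarkerNotFound) {
		return false, err
	}
	// A marker newer than the timeout and owned by someone else means the block is in progress.
	if err == nil && marker.isVisited(visitMarkerTimeout) && !marker.isVisitedByCompactor(visitMarkerTimeout, compactorID) {
		return false, nil
	}
	// No marker, an expired marker, or our own marker: (re)claim the block.
	if err := UpdateBlockVisitMarker(ctx, bkt, blockID, compactorID, writeFailed); err != nil {
		return false, err
	}
	return true, nil
}
```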
48 changes: 40 additions & 8 deletions pkg/compactor/compactor.go
@@ -53,7 +53,7 @@ var (
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")

DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper {
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper {
return compact.NewDefaultGrouper(
logger,
bkt,
@@ -68,8 +68,9 @@ var (
cfg.BlocksFetchConcurrency)
}

ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {
return NewShuffleShardingGrouper(
ctx,
logger,
bkt,
false, // Do not accept malformed indexes
@@ -83,10 +84,15 @@
cfg,
ring,
ringLifecycle.Addr,
ringLifecycle.ID,
limits,
userID,
cfg.BlockFilesConcurrency,
cfg.BlocksFetchConcurrency)
cfg.BlocksFetchConcurrency,
cfg.CompactionConcurrency,
cfg.BlockVisitMarkerTimeout,
blockVisitMarkerReadFailed,
blockVisitMarkerWriteFailed)
}

DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
@@ -95,7 +101,7 @@ var (
return nil, nil, err
}

plannerFactory := func(logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Planner {
plannerFactory := func(ctx context.Context, bkt objstore.Bucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner {
return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter)
}

@@ -108,9 +114,9 @@ var (
return nil, nil, err
}

plannerFactory := func(logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Planner {
plannerFactory := func(ctx context.Context, bkt objstore.Bucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter) compact.Planner {

return NewShuffleShardingPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks)
return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
}
return compactor, plannerFactory, nil
}
@@ -127,6 +133,8 @@ type BlocksGrouperFactory func(
blocksMarkedForNoCompact prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
remainingPlannedCompactions prometheus.Gauge,
blockVisitMarkerReadFailed prometheus.Counter,
blockVisitMarkerWriteFailed prometheus.Counter,
ring *ring.Ring,
ringLifecycler *ring.Lifecycler,
limit Limits,
@@ -142,9 +150,14 @@ type BlocksCompactorFactory func(
) (compact.Compactor, PlannerFactory, error)

type PlannerFactory func(
ctx context.Context,
bkt objstore.Bucket,
logger log.Logger,
cfg Config,
noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
ringLifecycle *ring.Lifecycler,
blockVisitMarkerReadFailed prometheus.Counter,
blockVisitMarkerWriteFailed prometheus.Counter,
) compact.Planner

// Limits defines limits used by the Compactor.
@@ -190,6 +203,10 @@ type Config struct {
// Allow downstream projects to customise the blocks compactor.
BlocksGrouperFactory BlocksGrouperFactory `yaml:"-"`
BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`

// Block visit marker file config
BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"`
BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`
}

// RegisterFlags registers the Compactor flags.
@@ -223,6 +240,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")

f.DurationVar(&cfg.BlockVisitMarkerTimeout, "compactor.block-visit-marker-timeout", 5*time.Minute, "How long before a block visit marker file is considered expired and the block can be picked up by a compactor again.")
f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently the block visit marker file should be updated during compaction.")
}

func (cfg *Config) Validate(limits validation.Limits) error {
@@ -306,6 +326,8 @@ type Compactor struct {
blocksMarkedForNoCompaction prometheus.Counter
garbageCollectedBlocks prometheus.Counter
remainingPlannedCompactions prometheus.Gauge
blockVisitMarkerReadFailed prometheus.Counter
blockVisitMarkerWriteFailed prometheus.Counter

// TSDB syncer metrics
syncerMetrics *syncerMetrics
@@ -423,6 +445,14 @@ func newCompactor(
Name: "cortex_compactor_garbage_collected_blocks_total",
Help: "Total number of blocks marked for deletion by compactor.",
}),
blockVisitMarkerReadFailed: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_block_visit_marker_read_failed",
Help: "Number of block visit marker file failed to be read.",
}),
blockVisitMarkerWriteFailed: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_block_visit_marker_write_failed",
Help: "Number of block visit marker file failed to be written.",
}),
remainingPlannedCompactions: remainingPlannedCompactions,
limits: limits,
}
@@ -760,11 +790,13 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
return errors.Wrap(err, "failed to create syncer")
}

currentCtx, cancel := context.WithCancel(ctx)
defer cancel()
compactor, err := compact.NewBucketCompactor(
ulogger,
syncer,
c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.ring, c.ringLifecycler, c.limits, userID),
c.blocksPlannerFactory(ulogger, c.compactorCfg, noCompactMarkerFilter),
c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.ring, c.ringLifecycler, c.limits, userID),
c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed),
c.blocksCompactor,
path.Join(c.compactorCfg.DataDir, "compact"),
bucket,
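To tie the pieces together, a minimal sketch of how the visit markers can be kept fresh for the lifetime of one compaction, mirroring the `currentCtx`/`cancel` pattern added to `compactUser` above. `compactWithHeartBeat` is hypothetical and not part of this PR; it assumes it lives in the `compactor` package and that the caller supplies the group's blocks and a compaction callback:

```go
package compactor

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// compactWithHeartBeat (hypothetical) runs the visit marker heartbeat in the
// background for as long as the supplied compaction function is running, then
// cancels it so the markers stop being refreshed and eventually expire.
func compactWithHeartBeat(ctx context.Context, bkt objstore.Bucket, logger log.Logger, blocks []*metadata.Meta, compactorID string, updateInterval time.Duration, writeFailed prometheus.Counter, doCompact func(context.Context) error) error {
	heartBeatCtx, cancel := context.WithCancel(ctx)
	defer cancel() // stops markBlocksVisitedHeartBeat once compaction returns

	// Refresh the visit markers periodically so other compactors keep seeing these blocks as in progress.
	go markBlocksVisitedHeartBeat(heartBeatCtx, bkt, logger, blocks, compactorID, updateInterval, writeFailed)

	return doCompact(heartBeatCtx)
}
```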