Skip to content

Commit

Permalink
[FEAT] Add uploaded bytes metric
Browse files Browse the repository at this point in the history
Signed-off-by: rita.canavarro <rita.canavarro@farfetch.com>
  • Loading branch information
rita.canavarro committed Jul 20, 2023
1 parent 1dcbd7a commit eb42b40
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ NOTE: Querier's `query.promql-engine` flag enabling new PromQL engine is now unh
- [#5741](https://github.com/thanos-io/thanos/pull/5741) Query: add metrics on how much data is being selected by downstream Store APIs.
- [#5673](https://github.com/thanos-io/thanos/pull/5673) Receive: Reload tenant limit configuration on file change.
- [#5749](https://github.com/thanos-io/thanos/pull/5749) Query Frontend: Added small LRU cache to cache query analysis results.
- [#6500](https://github.com/thanos-io/thanos/pull/6500) Shipper: Add metric `thanos_shipper_uploaded_bytes_total` for the total number of uploaded bytes.

### Changed

Expand Down
57 changes: 42 additions & 15 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ type metrics struct {
uploads prometheus.Counter
uploadFailures prometheus.Counter
uploadedCompacted prometheus.Gauge
uploadedBytes prometheus.Counter
}

func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics {
func newMetrics(reg prometheus.Registerer) *metrics {
var m metrics

m.dirSyncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Expand All @@ -59,15 +60,15 @@ func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics {
Name: "thanos_shipper_upload_failures_total",
Help: "Total number of block upload failures",
})
uploadCompactedGaugeOpts := prometheus.GaugeOpts{
m.uploadedCompacted = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_shipper_upload_compacted_done",
Help: "If 1 it means shipper uploaded all compacted blocks from the filesystem.",
}
if uploadCompacted {
m.uploadedCompacted = promauto.With(reg).NewGauge(uploadCompactedGaugeOpts)
} else {
m.uploadedCompacted = promauto.With(nil).NewGauge(uploadCompactedGaugeOpts)
}
})
m.uploadedBytes = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_shipper_uploaded_bytes_total",
Help: "Total number of uploaded bytes.",
})

return &m
}

Expand All @@ -80,7 +81,7 @@ type Shipper struct {
bucket objstore.Bucket
source metadata.SourceType

uploadCompacted bool
uploadCompactedFunc func() bool
allowOutOfOrderUploads bool
hashFunc metadata.HashFunc

Expand All @@ -98,7 +99,7 @@ func New(
bucket objstore.Bucket,
lbls func() labels.Labels,
source metadata.SourceType,
uploadCompacted bool,
uploadCompactedFunc func() bool,
allowOutOfOrderUploads bool,
hashFunc metadata.HashFunc,
) *Shipper {
Expand All @@ -109,15 +110,20 @@ func New(
lbls = func() labels.Labels { return nil }
}

if uploadCompactedFunc == nil {
uploadCompactedFunc = func() bool {
return false
}
}
return &Shipper{
logger: logger,
dir: dir,
bucket: bucket,
labels: lbls,
metrics: newMetrics(r, uploadCompacted),
metrics: newMetrics(r),
source: source,
allowOutOfOrderUploads: allowOutOfOrderUploads,
uploadCompacted: uploadCompacted,
uploadCompactedFunc: uploadCompactedFunc,
hashFunc: hashFunc,
}
}
Expand Down Expand Up @@ -272,6 +278,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
uploadErrs int
)

uploadCompacted := s.uploadCompactedFunc()
metas, err := s.blockMetasFromOldest()
if err != nil {
return 0, err
Expand All @@ -292,7 +299,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {

// We only ship of the first compacted block level as normal flow.
if m.Compaction.Level > 1 {
if !s.uploadCompacted {
if !uploadCompacted {
continue
}
}
Expand Down Expand Up @@ -339,8 +346,10 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
return uploaded, errors.Errorf("failed to sync %v blocks", uploadErrs)
}

if s.uploadCompacted {
if uploadCompacted {
s.metrics.uploadedCompacted.Set(1)
} else {
s.metrics.uploadedCompacted.Set(0)
}
return uploaded, nil
}
Expand Down Expand Up @@ -380,7 +389,25 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
if err := meta.WriteToDir(s.logger, updir); err != nil {
return errors.Wrap(err, "write meta file")
}
return block.Upload(ctx, s.logger, s.bucket, updir, s.hashFunc)

err := block.Upload(ctx, s.logger, s.bucket, updir, s.hashFunc)

if err != nil {
return errors.Wrap(err, "while upploading the block")
}

files, err := block.GatherFileStats(updir, s.hashFunc, s.logger)

if err != nil {
//The block upload should not stop due to issues gathering data for a metric
return nil
}

for _, x := range files {
s.metrics.uploadedBytes.Add(float64(x.SizeBytes))
}

return nil
}

// blockMetasFromOldest returns the block meta of each block found in dir
Expand Down

0 comments on commit eb42b40

Please sign in to comment.