Skip to content

Commit

Permalink
Updated step-3 with nit improvements
Browse files Browse the repository at this point in the history
Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

Updated step-4 with nit improvements

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

cached, loaded

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

updated sh file

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

updated sh file

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

updated sh file

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

nit improvements

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

nit improvements

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

nit improvements

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

Update tutorials/katacoda/thanos/2-lts/intro.md

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

Update tutorials/katacoda/thanos/2-lts/step1.md

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

Update tutorials/katacoda/thanos/2-lts/step1.md

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

Update tutorials/katacoda/thanos/2-lts/step1.md

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

Update tutorials/katacoda/thanos/2-lts/step1.md

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

refactored states

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

Revert "Replace sync/atomic with uber-go/atomic (thanos-io#2935)"

This reverts commit 20a82b9.

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

Revert "cleanup shipper NewWithCompacted function (thanos-io#2940)"

This reverts commit 2a54885.

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

running querier

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

Adding updated thanos version

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>

tutorials/katacoda/thanos/2-lts/index.json
~
~
~
~
~
~
~
~
~
~
~
~users querying data

Signed-off-by: soniasingla <soniasingla.1812@gmail.com>
  • Loading branch information
soniasingla committed Sep 28, 2020
1 parent 7fe647a commit 30404d3
Show file tree
Hide file tree
Showing 29 changed files with 196 additions and 93 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,7 @@ github.com/prometheus/prometheus/pkg/testutils=github.com/thanos-io/thanos/pkg/t
github.com/prometheus/client_golang/prometheus.{DefaultGatherer,DefBuckets,NewUntypedFunc,UntypedFunc},\
github.com/prometheus/client_golang/prometheus.{NewCounter,NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,\
NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec}=github.com/prometheus/client_golang/prometheus/promauto.{NewCounter,\
NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec},\
sync/atomic=go.uber.org/atomic" ./...
NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec}" ./...
@$(FAILLINT) -paths "fmt.{Print,Println,Sprint}" -ignore-tests ./...
@echo ">> linting all of the Go files GOGC=${GOGC}"
@$(GOLANGCI_LINT) run
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func runRule(
}
}()

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload)
s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, allowOutOfOrderUpload)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
8 changes: 6 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,12 @@ func runSidecar(
return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout)
}

s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload)
var s *shipper.Shipper
if conf.shipper.uploadCompacted {
s = shipper.NewWithCompacted(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload)
} else {
s = shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload)
}

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Prometheus servers connected to the Thanos cluster via the sidecar are subject t
If you choose to use the sidecar to also upload data to object storage:

* Must specify object storage (`--objstore.*` flags)
* It only uploads uncompacted Prometheus blocks. For compacted blocks, see [Upload compacted blocks](./sidecar.md/#upload-compacted-blocks).
* It only uploads uncompacted Prometheus blocks. For compacted blocks, see [Upload compacted blocks](./sidecar.md/#upload-compacted-blocks-experimental).
* The `--storage.tsdb.min-block-duration` and `--storage.tsdb.max-block-duration` must be set to equal values to disable local compaction on order to use Thanos sidecar upload, otherwise leave local compaction on if sidecar just exposes StoreAPI and your retention is normal. The default of `2h` is recommended.
Mentioned parameters set to equal values disable the internal Prometheus compaction, which is needed to avoid the uploaded data corruption when Thanos compactor does its job, this is critical for data consistency and should not be ignored if you plan to use Thanos compactor. Even though you set mentioned parameters equal, you might observe Prometheus internal metric `prometheus_tsdb_compactions_total` being incremented, don't be confused by that: Prometheus writes initial head block to filesytem via its internal compaction mechanism, but if you have followed recommendations - data won't be modified by Prometheus before the sidecar uploads it. Thanos sidecar will also check sanity of the flags set to Prometheus on the startup and log errors or warning if they have been configured improperly (#838).
* The retention is recommended to not be lower than three times the min block duration, so 6 hours. This achieves resilience in the face of connectivity issues to the object storage since all local data will remain available within the Thanos cluster. If connectivity gets restored the backlog of blocks gets uploaded to the object storage.
Expand Down Expand Up @@ -70,7 +70,7 @@ config:
bucket: example-bucket
```
## Upload compacted blocks
## Upload compacted blocks (EXPERIMENTAL)
If you want to migrate from a pure Prometheus setup to Thanos and have to keep the historical data, you can use the flag `--shipper.upload-compacted`. This will also upload blocks that were compacted by Prometheus. Values greater than 1 in the `compaction.level` field of a Prometheus block’s `meta.json` file indicate level of compaction.

Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ require (
github.com/uber/jaeger-lib v2.2.0+incompatible
go.elastic.co/apm v1.5.0
go.elastic.co/apm/module/apmot v1.5.0
go.uber.org/atomic v1.6.0
go.uber.org/automaxprocs v1.2.0
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
Expand Down
8 changes: 4 additions & 4 deletions pkg/alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/url"
"path"
"sync"
"sync/atomic"
"time"

"github.com/go-kit/kit/log"
Expand All @@ -24,7 +25,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"go.uber.org/atomic"

"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/tracing"
Expand Down Expand Up @@ -370,7 +370,7 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {

var (
wg sync.WaitGroup
numSuccess atomic.Uint64
numSuccess uint64
)
for _, am := range s.alertmanagers {
for _, u := range am.dispatcher.Endpoints() {
Expand All @@ -396,14 +396,14 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
s.latency.WithLabelValues(u.Host).Observe(time.Since(start).Seconds())
s.sent.WithLabelValues(u.Host).Add(float64(len(alerts)))

numSuccess.Inc()
atomic.AddUint64(&numSuccess, 1)
})
}(am, *u)
}
}
wg.Wait()

if numSuccess.Load() > 0 {
if numSuccess > 0 {
return
}

Expand Down
19 changes: 10 additions & 9 deletions pkg/prober/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ package prober
import (
"io"
"net/http"
"sync/atomic"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.uber.org/atomic"
)

type check func() bool

// HTTPProbe represents health and readiness status of given component, and provides HTTP integration.
type HTTPProbe struct {
ready atomic.Uint32
healthy atomic.Uint32
ready uint32
healthy uint32
}

// NewHTTP returns HTTPProbe representing readiness and healthiness of given component.
Expand Down Expand Up @@ -49,33 +49,34 @@ func (p *HTTPProbe) handler(logger log.Logger, c check) http.HandlerFunc {

// isReady returns true if component is ready.
func (p *HTTPProbe) isReady() bool {
ready := p.ready.Load()
ready := atomic.LoadUint32(&p.ready)
return ready > 0
}

// isHealthy returns true if component is healthy.
func (p *HTTPProbe) isHealthy() bool {
healthy := p.healthy.Load()
healthy := atomic.LoadUint32(&p.healthy)
return healthy > 0
}

// Ready sets components status to ready.
func (p *HTTPProbe) Ready() {
p.ready.Swap(1)
atomic.SwapUint32(&p.ready, 1)
}

// NotReady sets components status to not ready with given error as a cause.
func (p *HTTPProbe) NotReady(err error) {
p.ready.Swap(0)
atomic.SwapUint32(&p.ready, 0)

}

// Healthy sets components status to healthy.
func (p *HTTPProbe) Healthy() {
p.healthy.Swap(1)
atomic.SwapUint32(&p.healthy, 1)

}

// NotHealthy sets components status to not healthy with given error as a cause.
func (p *HTTPProbe) NotHealthy(err error) {
p.healthy.Swap(0)
atomic.SwapUint32(&p.healthy, 0)
}
1 change: 0 additions & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.bucket,
func() labels.Labels { return lbls },
metadata.ReceiveSource,
false,
t.allowOutOfOrderUpload,
)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/reloader/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (r *Reloader) apply(ctx context.Context) error {
return err
}

// filepath.Walk uses Lstat to retrieve os.FileInfo. Lstat does not
// filepath.Walk uses Lstat to retriev os.FileInfo. Lstat does not
// follow symlinks. Make sure to follow a symlink before checking
// if it is a directory.
targetFile, err := os.Stat(path)
Expand Down
2 changes: 1 addition & 1 deletion pkg/reloader/reloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/thanos-io/thanos/pkg/testutil"
"go.uber.org/atomic"
)

func TestReloader_ConfigApply(t *testing.T) {
Expand Down
40 changes: 34 additions & 6 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,15 @@ type Shipper struct {
allowOutOfOrderUploads bool
}

// New creates a new shipper that detects new TSDB blocks in dir and uploads them to
// remote if necessary. It attaches the Thanos metadata section in each meta JSON file.
// If uploadCompacted is enabled, it also uploads compacted blocks which are already in filesystem.
// New creates a new shipper that detects new TSDB blocks in dir and uploads them
// to remote if necessary. It attaches the Thanos metadata section in each meta JSON file.
func New(
logger log.Logger,
r prometheus.Registerer,
dir string,
bucket objstore.Bucket,
lbls func() labels.Labels,
source metadata.SourceType,
uploadCompacted bool,
allowOutOfOrderUploads bool,
) *Shipper {
if logger == nil {
Expand All @@ -108,10 +106,40 @@ func New(
dir: dir,
bucket: bucket,
labels: lbls,
metrics: newMetrics(r, uploadCompacted),
metrics: newMetrics(r, false),
source: source,
allowOutOfOrderUploads: allowOutOfOrderUploads,
uploadCompacted: uploadCompacted,
}
}

// NewWithCompacted creates a new shipper that detects new TSDB blocks in dir and uploads them
// to remote if necessary, including compacted blocks which are already in filesystem.
// It attaches the Thanos metadata section in each meta JSON file.
func NewWithCompacted(
logger log.Logger,
r prometheus.Registerer,
dir string,
bucket objstore.Bucket,
lbls func() labels.Labels,
source metadata.SourceType,
allowOutOfOrderUploads bool,
) *Shipper {
if logger == nil {
logger = log.NewNopLogger()
}
if lbls == nil {
lbls = func() labels.Labels { return nil }
}

return &Shipper{
logger: logger,
dir: dir,
bucket: bucket,
labels: lbls,
metrics: newMetrics(r, true),
source: source,
uploadCompacted: true,
allowOutOfOrderUploads: allowOutOfOrderUploads,
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/shipper/shipper_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) {
}()

extLset := labels.FromStrings("prometheus", "prom-1")
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false)
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) {
defer upcancel2()
testutil.Ok(t, p.WaitPrometheusUp(upctx2))

shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false)
shipper := NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, false)

// Create 10 new blocks. 9 of them (non compacted) should be actually uploaded.
var (
Expand Down
6 changes: 3 additions & 3 deletions pkg/shipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestShipperTimestamps(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()

s := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false)
s := New(nil, nil, dir, nil, nil, metadata.TestSource, false)

// Missing thanos meta file.
_, _, err = s.Timestamps()
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestIterBlockMetas(t *testing.T) {
},
}))

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false)
shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false)
metas, err := shipper.blockMetasFromOldest()
testutil.Ok(t, err)
testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool {
Expand Down Expand Up @@ -162,7 +162,7 @@ func BenchmarkIterBlockMetas(b *testing.B) {
})
b.ResetTimer()

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false)
shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false)

_, err = shipper.blockMetasFromOldest()
testutil.Ok(b, err)
Expand Down
18 changes: 9 additions & 9 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sort"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -51,7 +52,6 @@ import (
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
"go.uber.org/atomic"
)

var emptyRelabelConfig = make([]*relabel.Config, 0)
Expand Down Expand Up @@ -1281,10 +1281,10 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request

if !t.IsBenchmark() {
// Make sure the pool is correctly used. This is expected for 200k numbers.
testutil.Equals(t, numOfBlocks, int(chunkPool.(*mockedPool).gets.Load()))
testutil.Equals(t, numOfBlocks, int(chunkPool.(*mockedPool).gets))
// TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate.
testutil.Equals(t, 0, int(chunkPool.(*mockedPool).balance.Load()))
chunkPool.(*mockedPool).gets.Store(0)
testutil.Equals(t, 0, int(chunkPool.(*mockedPool).balance))
chunkPool.(*mockedPool).gets = 0

for _, b := range blocks {
// NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of make sense: long series.
Expand All @@ -1306,22 +1306,22 @@ func (m fakePool) Put(_ *[]byte) {}

type mockedPool struct {
parent pool.BytesPool
balance atomic.Uint64
gets atomic.Uint64
balance uint64
gets uint64
}

func (m *mockedPool) Get(sz int) (*[]byte, error) {
b, err := m.parent.Get(sz)
if err != nil {
return nil, err
}
m.balance.Add(uint64(cap(*b)))
m.gets.Add(uint64(1))
atomic.AddUint64(&m.balance, uint64(cap(*b)))
atomic.AddUint64(&m.gets, uint64(1))
return b, nil
}

func (m *mockedPool) Put(b *[]byte) {
m.balance.Sub(uint64(cap(*b)))
atomic.AddUint64(&m.balance, ^uint64(cap(*b)-1))
m.parent.Put(b)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/store/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ package store

import (
"sync"
"sync/atomic"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
)

type ChunksLimiter interface {
Expand All @@ -25,7 +25,7 @@ type ChunksLimiterFactory func(failedCounter prometheus.Counter) ChunksLimiter
// Limiter is a simple mechanism for checking if something has passed a certain threshold.
type Limiter struct {
limit uint64
reserved atomic.Uint64
reserved uint64

// Counter metric which we will increase if limit is exceeded.
failedCounter prometheus.Counter
Expand All @@ -42,7 +42,7 @@ func (l *Limiter) Reserve(num uint64) error {
if l.limit == 0 {
return nil
}
if reserved := l.reserved.Add(num); reserved > l.limit {
if reserved := atomic.AddUint64(&l.reserved, num); reserved > l.limit {
// We need to protect from the counter being incremented twice due to concurrency
// while calling Reserve().
l.failedOnce.Do(l.failedCounter.Inc)
Expand Down
2 changes: 1 addition & 1 deletion tutorials/katacoda/thanos/2-lts/courseBase.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

docker pull dockerenginesonia/thanosbench:v7
docker pull quay.io/prometheus/prometheus:v2.19.0
docker pull quay.io/thanos/thanos:v0.13.0
docker pull quay.io/thanos/thanos:v0.15.0
docker pull minio/minio:RELEASE.2019-01-31T00-31-19Z
Loading

0 comments on commit 30404d3

Please sign in to comment.