Skip to content

Commit

Permalink
downsampling: simplify StreamedBlockWriter interface
Browse files Browse the repository at this point in the history
Reduce of use public Flush method to finalize index and meta files.
In case of error, a caller has to remove block directory with a preserved
garbage inside.

Rid of use tmp directories and renaming, syncing the final block on disk
before upload.
  • Loading branch information
xjewer committed Feb 6, 2019
1 parent e7a6b77 commit 8f06dfc
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 165 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu
pool = downsample.NewPool()
}

b, err := tsdb.OpenBlock(logger, bdir, pool)
b, err := tsdb.OpenBlock(bdir, pool)
if err != nil {
return errors.Wrapf(err, "open block %s", m.ULID)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const (
const (
// MetaFilename is the known JSON filename for meta information.
MetaFilename = "meta.json"

// MetaVersion is a enumeration of versions supported by Thanos.
MetaVersion1 = iota + 1
)

// Meta describes the a block's meta. It wraps the known TSDB meta structure and
Expand Down Expand Up @@ -135,7 +138,7 @@ func Read(dir string) (*Meta, error) {
if err := json.Unmarshal(b, &m); err != nil {
return nil, err
}
if m.Version != 1 {
if m.Version != MetaVersion1 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
}
return &m, nil
Expand Down
44 changes: 34 additions & 10 deletions pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package downsample

import (
"math"
"math/rand"
"os"
"path/filepath"
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -47,9 +51,33 @@ func Downsample(
}
defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader")

// Generate new block id.
uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano())))

// Create block directory to populate with chunks, meta and index files into.
blockDir := filepath.Join(dir, uid.String())
if err := os.MkdirAll(blockDir, 0777); err != nil {
return id, errors.Wrap(err, "mkdir block dir")
}

// Remove blockDir in case of errors.
defer func() {
if err != nil {
var merr tsdb.MultiError
merr.Add(err)
merr.Add(os.RemoveAll(blockDir))
err = merr
}
}()

// Copy original meta to the new one. Update downsampling resolution and ULID for a new block.
newMeta := *origMeta
newMeta.Thanos.Downsample.Resolution = resolution
newMeta.ULID = uid

// Writes downsampled chunks right into the files, avoiding excess memory allocation.
// Flushes index and meta data afterwards aggregations.
streamedBlockWriter, err := NewWriter(dir, indexr, logger, *origMeta, resolution)
streamedBlockWriter, err := NewStreamedBlockWriter(blockDir, indexr, logger, newMeta)
if err != nil {
return id, errors.Wrap(err, "get streamed block writer")
}
Expand Down Expand Up @@ -93,11 +121,11 @@ func Downsample(
return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At())
}
}
if err := streamedBlockWriter.AddSeries(lset, downsampleRaw(all, resolution)); err != nil {
if err := streamedBlockWriter.WriteSeries(lset, downsampleRaw(all, resolution)); err != nil {
return id, errors.Wrapf(err, "downsample raw data, series: %d", postings.At())
}
} else {
// Downsample a block that contains aggregate chunks already.
// Downsample a block that contains aggregated chunks already.
for _, c := range chks {
aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk))
}
Expand All @@ -112,7 +140,7 @@ func Downsample(
if err != nil {
return id, errors.Wrapf(err, "downsample aggregate block, series: %d", postings.At())
}
if err := streamedBlockWriter.AddSeries(lset, downsampledChunks); err != nil {
if err := streamedBlockWriter.WriteSeries(lset, downsampledChunks); err != nil {
return id, errors.Wrapf(err, "downsample aggregated block, series: %d", postings.At())
}
}
Expand All @@ -121,12 +149,8 @@ func Downsample(
return id, errors.Wrap(postings.Err(), "iterate series set")
}

id, err = streamedBlockWriter.Flush()
if err != nil {
return id, errors.Wrap(err, "flush data in stream data")
}

return id, nil
id = uid
return
}

// currentWindow returns the end timestamp of the window that t falls into.
Expand Down
Loading

0 comments on commit 8f06dfc

Please sign in to comment.