From cdf084fdaeaf632e7c078022c6ad4322bfef2989 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Fri, 20 Sep 2024 15:55:56 +0200 Subject: [PATCH] perf(blooms): Remove compression of `.tar` archived bloom blocks (#14159) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Decompression is a CPU-intensive task, especially un-gzipping. The gain of compressing a tar archive of storage-optimized binary blocks is negligible (question: is it?). In this example, the block of ~170MiB is ~3.3MiB bigger when not compressed, which is a ratio of ~2% ```bash $ ls -las 2bc017f79711e12a-2bffc5dcc0e8e964_1726004114913-1726106283939-bc42f529.tar 177048 -rw-rw-r-- 1 christian christian 181293056 Sep 18 13:49 2bc017f79711e12a-2bffc5dcc0e8e964_1726004114913-1726106283939-bc42f529.tar $ ls -las 2bc017f79711e12a-2bffc5dcc0e8e964_1726004114913-1726106283939-bc42f529.tar.gz 173732 -rw-rw-r-- 1 christian christian 177897689 Sep 18 13:49 2bc017f79711e12a-2bffc5dcc0e8e964_1726004114913-1726106283939-bc42f529.tar.gz $ qalc '(181293056 - 177897689) / 1024/ 1024' ((181293056 − 177897689) / 1024) / 1024 ≈ 3.238074303 $ qalc '181293056 / 177897689' 181293056 / 177897689 ≈ 1.019086066 ``` After some consideration, we decided to store the encoding of the bloom block in the `BlockRef`. This means that the changes in this PR do not break compatibility with existing blocks compressed with gzip, although new blocks will not be compressed any more. However, the PR adds support for different compression algorithms, such as gzip, snappy, lz4, flate, and zstd. Compression is not configurable yet. 
--- Signed-off-by: Christian Haudum --- pkg/bloombuild/builder/builder.go | 5 +- pkg/bloombuild/planner/planner_test.go | 4 +- pkg/bloomgateway/bloomgateway_test.go | 2 +- pkg/compression/encoding.go | 5 +- pkg/compression/fileext.go | 50 ++++++ pkg/storage/bloom/v1/archive.go | 36 +++-- pkg/storage/bloom/v1/archive_test.go | 91 ++++++++++- .../stores/shipper/bloomshipper/cache.go | 8 +- .../stores/shipper/bloomshipper/cache_test.go | 8 +- .../stores/shipper/bloomshipper/client.go | 40 ++--- .../shipper/bloomshipper/client_test.go | 152 ++++++++++-------- .../shipper/bloomshipper/compress_utils.go | 29 ---- .../bloomshipper/compress_utils_test.go | 49 ------ .../stores/shipper/bloomshipper/fetcher.go | 10 +- .../shipper/bloomshipper/fetcher_test.go | 16 +- .../stores/shipper/bloomshipper/resolver.go | 40 ++++- .../shipper/bloomshipper/resolver_test.go | 66 +++++--- .../stores/shipper/bloomshipper/store_test.go | 7 +- 18 files changed, 387 insertions(+), 231 deletions(-) create mode 100644 pkg/compression/fileext.go delete mode 100644 pkg/storage/stores/shipper/bloomshipper/compress_utils.go delete mode 100644 pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 78932e3f3e9e..fdeab9cf92c7 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -33,6 +33,9 @@ import ( "github.com/grafana/loki/v3/pkg/util/ring" ) +// TODO(chaudum): Make configurable via (per-tenant?) setting. 
+var blockCompressionAlgo = compression.EncNone + type Builder struct { services.Service @@ -404,7 +407,7 @@ func (b *Builder) processTask( blockCt++ blk := newBlocks.At() - built, err := bloomshipper.BlockFrom(tenant, task.Table.Addr(), blk) + built, err := bloomshipper.BlockFrom(blockCompressionAlgo, tenant, task.Table.Addr(), blk) if err != nil { level.Error(logger).Log("msg", "failed to build block", "err", err) if err = blk.Reader().Cleanup(); err != nil { diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index 482d277589c3..ea780c98e8ee 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -202,7 +202,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) { block := v1.NewBlock(reader, v1.NewMetrics(nil)) buf := bytes.NewBuffer(nil) - if err := v1.TarGz(buf, block.Reader()); err != nil { + if err := v1.TarCompress(ref.Encoding, buf, block.Reader()); err != nil { return bloomshipper.Block{}, err } @@ -1019,7 +1019,7 @@ func Test_deleteOutdatedMetas(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { logger := log.NewNopLogger() - //logger := log.NewLogfmtLogger(os.Stdout) + // logger := log.NewLogfmtLogger(os.Stdout) cfg := Config{ PlanningInterval: 1 * time.Hour, diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 8fdc3989510a..698b1ecf40e4 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -165,7 +165,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { Through: now, Refs: groupRefs(t, chunkRefs), Plan: plan.QueryPlan{AST: expr}, - Blocks: []string{"bloom/invalid/block.tar.gz"}, + Blocks: []string{"bloom/invalid/block.tar"}, } ctx := user.InjectOrgID(context.Background(), tenantID) diff --git a/pkg/compression/encoding.go b/pkg/compression/encoding.go index 6b421ed97644..ecef31f09325 100644 --- a/pkg/compression/encoding.go +++ 
b/pkg/compression/encoding.go @@ -13,7 +13,7 @@ type Encoding byte const ( EncNone Encoding = iota EncGZIP - EncDumb + EncDumb // not supported EncLZ4_64k EncSnappy EncLZ4_256k @@ -41,8 +41,6 @@ func (e Encoding) String() string { return "gzip" case EncNone: return "none" - case EncDumb: - return "dumb" case EncLZ4_64k: return "lz4-64k" case EncLZ4_256k: @@ -70,7 +68,6 @@ func ParseEncoding(enc string) (Encoding, error) { } } return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedEncoding()) - } // SupportedEncoding returns the list of supported Encoding. diff --git a/pkg/compression/fileext.go b/pkg/compression/fileext.go new file mode 100644 index 000000000000..8cd09c392d08 --- /dev/null +++ b/pkg/compression/fileext.go @@ -0,0 +1,50 @@ +package compression + +import "fmt" + +const ( + ExtNone = "" + ExtGZIP = ".gz" + ExtSnappy = ".sz" + ExtLZ4 = ".lz4" + ExtFlate = ".zz" + ExtZstd = ".zst" +) + +func ToFileExtension(e Encoding) string { + switch e { + case EncNone: + return ExtNone + case EncGZIP: + return ExtGZIP + case EncLZ4_64k, EncLZ4_256k, EncLZ4_1M, EncLZ4_4M: + return ExtLZ4 + case EncSnappy: + return ExtSnappy + case EncFlate: + return ExtFlate + case EncZstd: + return ExtZstd + default: + panic(fmt.Sprintf("invalid encoding: %d, supported: %s", e, SupportedEncoding())) + } +} + +func FromFileExtension(ext string) Encoding { + switch ext { + case ExtNone: + return EncNone + case ExtGZIP: + return EncGZIP + case ExtLZ4: + return EncLZ4_4M + case ExtSnappy: + return EncSnappy + case ExtFlate: + return EncFlate + case ExtZstd: + return EncZstd + default: + panic(fmt.Sprintf("invalid file extension: %s", ext)) + } +} diff --git a/pkg/storage/bloom/v1/archive.go b/pkg/storage/bloom/v1/archive.go index 201b071b2500..fce83d69e41d 100644 --- a/pkg/storage/bloom/v1/archive.go +++ b/pkg/storage/bloom/v1/archive.go @@ -11,22 +11,34 @@ import ( "github.com/grafana/loki/v3/pkg/compression" ) +const ( + ExtTar = ".tar" +) + type TarEntry struct { 
Name string Size int64 Body io.ReadSeeker } -func TarGz(dst io.Writer, reader BlockReader) error { +func TarCompress(enc compression.Encoding, dst io.Writer, reader BlockReader) error { + comprPool := compression.GetWriterPool(enc) + comprWriter := comprPool.GetWriter(dst) + defer func() { + comprWriter.Close() + comprPool.PutWriter(comprWriter) + }() + + return Tar(comprWriter, reader) +} + +func Tar(dst io.Writer, reader BlockReader) error { itr, err := reader.TarEntries() if err != nil { return errors.Wrap(err, "error getting tar entries") } - gzipper := compression.GetWriterPool(compression.EncGZIP).GetWriter(dst) - defer gzipper.Close() - - tarballer := tar.NewWriter(gzipper) + tarballer := tar.NewWriter(dst) defer tarballer.Close() for itr.Next() { @@ -49,13 +61,19 @@ func TarGz(dst io.Writer, reader BlockReader) error { return itr.Err() } -func UnTarGz(dst string, r io.Reader) error { - gzipper, err := compression.GetReaderPool(compression.EncGZIP).GetReader(r) +func UnTarCompress(enc compression.Encoding, dst string, r io.Reader) error { + comprPool := compression.GetReaderPool(enc) + comprReader, err := comprPool.GetReader(r) if err != nil { - return errors.Wrap(err, "error getting gzip reader") + return errors.Wrapf(err, "error getting %s reader", enc.String()) } + defer comprPool.PutReader(comprReader) + + return UnTar(dst, comprReader) +} - tarballer := tar.NewReader(gzipper) +func UnTar(dst string, r io.Reader) error { + tarballer := tar.NewReader(r) for { header, err := tarballer.Next() diff --git a/pkg/storage/bloom/v1/archive_test.go b/pkg/storage/bloom/v1/archive_test.go index e0d2f69a1c84..b7857a4b5ed1 100644 --- a/pkg/storage/bloom/v1/archive_test.go +++ b/pkg/storage/bloom/v1/archive_test.go @@ -24,7 +24,7 @@ func TestArchive(t *testing.T) { BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncSnappy, + encoding: compression.EncNone, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -40,9 +40,9 @@ func 
TestArchive(t *testing.T) { reader := NewDirectoryBlockReader(dir1) w := bytes.NewBuffer(nil) - require.Nil(t, TarGz(w, reader)) + require.Nil(t, Tar(w, reader)) - require.Nil(t, UnTarGz(dir2, w)) + require.Nil(t, UnTar(dir2, w)) reader2 := NewDirectoryBlockReader(dir2) @@ -78,3 +78,88 @@ func TestArchive(t *testing.T) { require.Nil(t, err) require.Equal(t, srcBloomsBytes, dstBloomsBytes) } + +func TestArchiveCompression(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + enc compression.Encoding + }{ + {compression.EncNone}, + {compression.EncGZIP}, + {compression.EncSnappy}, + {compression.EncLZ4_64k}, + {compression.EncLZ4_256k}, + {compression.EncLZ4_1M}, + {compression.EncLZ4_4M}, + {compression.EncFlate}, + {compression.EncZstd}, + } { + t.Run(tc.enc.String(), func(t *testing.T) { + // for writing files to two dirs for comparison and ensuring they're equal + dir1 := t.TempDir() + dir2 := t.TempDir() + + numSeries := 100 + data, _ := MkBasicSeriesWithBlooms(numSeries, 0x0000, 0xffff, 0, 10000) + + builder, err := NewBlockBuilder( + BlockOptions{ + Schema: Schema{ + version: CurrentSchemaVersion, + encoding: compression.EncNone, + }, + SeriesPageSize: 100, + BloomPageSize: 10 << 10, + }, + NewDirectoryBlockWriter(dir1), + ) + + require.Nil(t, err) + itr := v2.NewSliceIter[SeriesWithBlooms](data) + _, err = builder.BuildFrom(itr) + require.Nil(t, err) + + reader := NewDirectoryBlockReader(dir1) + + w := bytes.NewBuffer(nil) + require.Nil(t, TarCompress(tc.enc, w, reader)) + + require.Nil(t, UnTarCompress(tc.enc, dir2, w)) + + reader2 := NewDirectoryBlockReader(dir2) + + // Check Index is byte for byte equivalent + srcIndex, err := reader.Index() + require.Nil(t, err) + _, err = srcIndex.Seek(0, io.SeekStart) + require.Nil(t, err) + dstIndex, err := reader2.Index() + require.Nil(t, err) + _, err = dstIndex.Seek(0, io.SeekStart) + require.Nil(t, err) + + srcIndexBytes, err := io.ReadAll(srcIndex) + require.Nil(t, err) + dstIndexBytes, err := 
io.ReadAll(dstIndex) + require.Nil(t, err) + require.Equal(t, srcIndexBytes, dstIndexBytes) + + // Check Blooms is byte for byte equivalent + srcBlooms, err := reader.Blooms() + require.Nil(t, err) + _, err = srcBlooms.Seek(0, io.SeekStart) + require.Nil(t, err) + dstBlooms, err := reader2.Blooms() + require.Nil(t, err) + _, err = dstBlooms.Seek(0, io.SeekStart) + require.Nil(t, err) + + srcBloomsBytes, err := io.ReadAll(srcBlooms) + require.Nil(t, err) + dstBloomsBytes, err := io.ReadAll(dstBlooms) + require.Nil(t, err) + require.Equal(t, srcBloomsBytes, dstBloomsBytes) + + }) + } +} diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index 203d15684502..838866e1dee8 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -94,15 +94,15 @@ func loadBlockDirectories(root string, logger log.Logger) (keys []string, values return nil } - ref, err := resolver.ParseBlockKey(key(path)) + // The block file extension (.tar) needs to be added so the key can be parsed. + // This is because the extension is stripped off when the tar archive is extracted. 
+ ref, err := resolver.ParseBlockKey(key(path + blockExtension)) if err != nil { return nil } if ok, clean := isBlockDir(path, logger); ok { - // the cache key must not contain the directory prefix - // therefore we use the defaultKeyResolver to resolve the block's address - key := defaultKeyResolver{}.Block(ref).Addr() + key := cacheKey(ref) keys = append(keys, key) values = append(values, NewBlockDirectory(ref, path)) level.Debug(logger).Log("msg", "found block directory", "path", path, "key", key) diff --git a/pkg/storage/stores/shipper/bloomshipper/cache_test.go b/pkg/storage/stores/shipper/bloomshipper/cache_test.go index 941b7fa29e99..763036e56ac7 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache_test.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/log" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config" ) @@ -63,7 +64,8 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) { wd := t.TempDir() // plain file - fp, _ := os.Create(filepath.Join(wd, "regular-file.tar.gz")) + ext := blockExtension + compression.ExtGZIP + fp, _ := os.Create(filepath.Join(wd, "regular-file"+ext)) fp.Close() // invalid directory @@ -99,8 +101,8 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) { require.Equal(t, 1, len(c.entries)) - key := validDir + ".tar.gz" // cache key must not contain directory prefix - elem, found := c.entries[key] + // cache key does neither contain directory prefix nor file extension suffix + elem, found := c.entries[validDir] require.True(t, found) blockDir := elem.Value.(*Entry).Value require.Equal(t, filepath.Join(wd, validDir), blockDir.Path) diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index 2ce0e0a149ee..1390b0d9c52e 100644 --- 
a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -7,7 +7,6 @@ import ( "fmt" "hash" "io" - "strings" "sync" "time" @@ -18,6 +17,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/grafana/loki/v3/pkg/compression" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/client" "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" @@ -73,6 +73,7 @@ func (r Ref) Interval() Interval { type BlockRef struct { Ref + compression.Encoding } func (r BlockRef) String() string { @@ -208,29 +209,31 @@ func (c ClosableReadSeekerAdapter) Close() error { return nil } -func BlockRefFrom(tenant, table string, md v1.BlockMetadata) BlockRef { - return BlockRef{ - Ref: Ref{ - TenantID: tenant, - TableName: table, - Bounds: md.Series.Bounds, - StartTimestamp: md.Series.FromTs, - EndTimestamp: md.Series.ThroughTs, - Checksum: md.Checksum, - }, +func newRefFrom(tenant, table string, md v1.BlockMetadata) Ref { + return Ref{ + TenantID: tenant, + TableName: table, + Bounds: md.Series.Bounds, + StartTimestamp: md.Series.FromTs, + EndTimestamp: md.Series.ThroughTs, + Checksum: md.Checksum, } } -func BlockFrom(tenant, table string, blk *v1.Block) (Block, error) { +func newBlockRefWithEncoding(ref Ref, enc compression.Encoding) BlockRef { + return BlockRef{Ref: ref, Encoding: enc} +} + +func BlockFrom(enc compression.Encoding, tenant, table string, blk *v1.Block) (Block, error) { md, _ := blk.Metadata() - ref := BlockRefFrom(tenant, table, md) + ref := newBlockRefWithEncoding(newRefFrom(tenant, table, md), enc) // TODO(owen-d): pool buf := bytes.NewBuffer(nil) - err := v1.TarGz(buf, blk.Reader()) + err := v1.TarCompress(ref.Encoding, buf, blk.Reader()) if err != nil { - return Block{}, errors.Wrap(err, "archiving+compressing block") + return Block{}, err } reader := bytes.NewReader(buf.Bytes()) @@ -320,15 +323,14 @@ func (b *BloomClient) GetBlock(ctx 
context.Context, ref BlockRef) (BlockDirector } defer rc.Close() - path := b.fsResolver.Block(ref).LocalPath() - // the block directory should not contain the .tar.gz extension - path = strings.TrimSuffix(path, ".tar.gz") + // the block directory must not contain the .tar(.compression) extension + path := localFilePathWithoutExtension(ref, b.fsResolver) err = util.EnsureDirectory(path) if err != nil { return BlockDirectory{}, fmt.Errorf("failed to create block directory %s: %w", path, err) } - err = v1.UnTarGz(path, rc) + err = v1.UnTarCompress(ref.Encoding, path, rc) if err != nil { return BlockDirectory{}, fmt.Errorf("failed to extract block file %s: %w", key, err) } diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go index dff01dcae50a..13ce7a7c97ae 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go @@ -14,12 +14,25 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/compression" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/client" "github.com/grafana/loki/v3/pkg/storage/chunk/client/testutils" "github.com/grafana/loki/v3/pkg/storage/config" ) +var supportedCompressions = []compression.Encoding{ + compression.EncNone, + compression.EncGZIP, + compression.EncSnappy, + compression.EncLZ4_64k, + compression.EncLZ4_256k, + compression.EncLZ4_1M, + compression.EncLZ4_4M, + compression.EncFlate, + compression.EncZstd, +} + func parseTime(s string) model.Time { t, err := time.Parse("2006-01-02 15:04", s) if err != nil { @@ -196,18 +209,18 @@ func TestBloomClient_DeleteMetas(t *testing.T) { }) } -func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Block, error) { +func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, 
minFp, maxFp model.Fingerprint, enc compression.Encoding) (Block, error) { step := int64((24 * time.Hour).Seconds()) day := start.Unix() / step tmpDir := t.TempDir() - fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz") + fp, _ := os.CreateTemp(t.TempDir(), "*"+blockExtension+compression.ToFileExtension(enc)) blockWriter := v1.NewDirectoryBlockWriter(tmpDir) err := blockWriter.Init() require.NoError(t, err) - err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir)) + err = v1.TarCompress(enc, fp, v1.NewDirectoryBlockReader(tmpDir)) require.NoError(t, err) _, _ = fp.Seek(0, 0) @@ -221,40 +234,48 @@ func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, min StartTimestamp: start, EndTimestamp: start.Add(12 * time.Hour), }, + Encoding: enc, }, Data: fp, } - return block, c.client.PutObject(context.Background(), c.Block(block.BlockRef).Addr(), block.Data) + key := c.Block(block.BlockRef).Addr() + t.Logf("PUT block to storage: %s", key) + return block, c.client.PutObject(context.Background(), key, block.Data) } func TestBloomClient_GetBlock(t *testing.T) { - c, _ := newMockBloomClient(t) - ctx := context.Background() - - b, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff) - require.NoError(t, err) + for _, enc := range supportedCompressions { + c, _ := newMockBloomClient(t) + ctx := context.Background() - t.Run("exists", func(t *testing.T) { - blockDir, err := c.GetBlock(ctx, b.BlockRef) + b, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff, enc) require.NoError(t, err) - require.Equal(t, b.BlockRef, blockDir.BlockRef) - }) - t.Run("does not exist", func(t *testing.T) { - blockDir, err := c.GetBlock(ctx, BlockRef{}) - require.Error(t, err) - require.True(t, c.client.IsObjectNotFoundErr(err)) - require.Equal(t, blockDir, BlockDirectory{}) - }) + t.Run(enc.String(), func(t *testing.T) { + + t.Run("exists", func(t *testing.T) { + blockDir, err := c.GetBlock(ctx, b.BlockRef) + require.NoError(t, err) 
+ require.Equal(t, b.BlockRef, blockDir.BlockRef) + }) + + t.Run("does not exist", func(t *testing.T) { + blockDir, err := c.GetBlock(ctx, BlockRef{}) + require.Error(t, err) + require.True(t, c.client.IsObjectNotFoundErr(err)) + require.Equal(t, blockDir, BlockDirectory{}) + }) + }) + } } func TestBloomClient_GetBlocks(t *testing.T) { c, _ := newMockBloomClient(t) ctx := context.Background() - b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff) + b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff, compression.EncGZIP) require.NoError(t, err) - b2, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff) + b2, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff, compression.EncNone) require.NoError(t, err) t.Run("exists", func(t *testing.T) { @@ -271,57 +292,62 @@ func TestBloomClient_GetBlocks(t *testing.T) { } func TestBloomClient_PutBlock(t *testing.T) { - c, _ := newMockBloomClient(t) - ctx := context.Background() - - start := parseTime("2024-02-05 12:00") - - tmpDir := t.TempDir() - fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz") - - blockWriter := v1.NewDirectoryBlockWriter(tmpDir) - err := blockWriter.Init() - require.NoError(t, err) - - err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir)) - require.NoError(t, err) - - block := Block{ - BlockRef: BlockRef{ - Ref: Ref{ - TenantID: "tenant", - Bounds: v1.NewBounds(0x0000, 0xffff), - TableName: "table_1234", - StartTimestamp: start, - EndTimestamp: start.Add(12 * time.Hour), - }, - }, - Data: fp, + for _, enc := range supportedCompressions { + t.Run(enc.String(), func(t *testing.T) { + c, _ := newMockBloomClient(t) + ctx := context.Background() + + start := parseTime("2024-02-05 12:00") + + tmpDir := t.TempDir() + fp, _ := os.CreateTemp(t.TempDir(), "*"+blockExtension+compression.ToFileExtension(enc)) + + blockWriter := v1.NewDirectoryBlockWriter(tmpDir) + err := blockWriter.Init() + 
require.NoError(t, err) + + err = v1.TarCompress(enc, fp, v1.NewDirectoryBlockReader(tmpDir)) + require.NoError(t, err) + + block := Block{ + BlockRef: BlockRef{ + Ref: Ref{ + TenantID: "tenant", + Bounds: v1.NewBounds(0x0000, 0xffff), + TableName: "table_1234", + StartTimestamp: start, + EndTimestamp: start.Add(12 * time.Hour), + }, + Encoding: enc, + }, + Data: fp, + } + + err = c.PutBlock(ctx, block) + require.NoError(t, err) + + oc := c.client.(*testutils.InMemoryObjectClient) + stored := oc.Internals() + _, found := stored[c.Block(block.BlockRef).Addr()] + require.True(t, found) + + blockDir, err := c.GetBlock(ctx, block.BlockRef) + require.NoError(t, err) + + require.Equal(t, block.BlockRef, blockDir.BlockRef) + }) } - - err = c.PutBlock(ctx, block) - require.NoError(t, err) - - oc := c.client.(*testutils.InMemoryObjectClient) - stored := oc.Internals() - _, found := stored[c.Block(block.BlockRef).Addr()] - require.True(t, found) - - blockDir, err := c.GetBlock(ctx, block.BlockRef) - require.NoError(t, err) - - require.Equal(t, block.BlockRef, blockDir.BlockRef) } func TestBloomClient_DeleteBlocks(t *testing.T) { c, _ := newMockBloomClient(t) ctx := context.Background() - b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff) + b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff, compression.EncNone) require.NoError(t, err) - b2, err := putBlock(t, c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff) + b2, err := putBlock(t, c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff, compression.EncGZIP) require.NoError(t, err) - b3, err := putBlock(t, c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff) + b3, err := putBlock(t, c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff, compression.EncSnappy) require.NoError(t, err) oc := c.client.(*testutils.InMemoryObjectClient) diff --git a/pkg/storage/stores/shipper/bloomshipper/compress_utils.go 
b/pkg/storage/stores/shipper/bloomshipper/compress_utils.go deleted file mode 100644 index 52de4a4da582..000000000000 --- a/pkg/storage/stores/shipper/bloomshipper/compress_utils.go +++ /dev/null @@ -1,29 +0,0 @@ -package bloomshipper - -import ( - "os" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - - v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" -) - -func CompressBloomBlock(ref BlockRef, archivePath, localDst string, logger log.Logger) (Block, error) { - blockToUpload := Block{} - archiveFile, err := os.Create(archivePath) - if err != nil { - return blockToUpload, err - } - - err = v1.TarGz(archiveFile, v1.NewDirectoryBlockReader(localDst)) - if err != nil { - level.Error(logger).Log("msg", "creating bloom block archive file", "err", err) - return blockToUpload, err - } - - blockToUpload.BlockRef = ref - blockToUpload.Data = archiveFile - - return blockToUpload, nil -} diff --git a/pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go b/pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go deleted file mode 100644 index f0b1598dadf9..000000000000 --- a/pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package bloomshipper - -import ( - "bytes" - "io" - "os" - "path/filepath" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" - - v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" -) - -func directoryDoesNotExist(path string) bool { - _, err := os.Lstat(path) - return err != nil -} - -const testArchiveFileName = "test-block-archive" - -func createBlockArchive(t *testing.T) (string, io.Reader, string, string) { - dir := t.TempDir() - mockBlockDir := filepath.Join(dir, "mock-block-dir") - err := os.MkdirAll(mockBlockDir, 0777) - require.NoError(t, err) - bloomFile, err := os.Create(filepath.Join(mockBlockDir, v1.BloomFileName)) - require.NoError(t, err) - bloomFileContent := uuid.NewString() - _, err = io.Copy(bloomFile, 
bytes.NewReader([]byte(bloomFileContent))) - require.NoError(t, err) - - seriesFile, err := os.Create(filepath.Join(mockBlockDir, v1.SeriesFileName)) - require.NoError(t, err) - seriesFileContent := uuid.NewString() - _, err = io.Copy(seriesFile, bytes.NewReader([]byte(seriesFileContent))) - require.NoError(t, err) - - blockFilePath := filepath.Join(dir, testArchiveFileName) - file, err := os.OpenFile(blockFilePath, os.O_CREATE|os.O_RDWR, 0700) - require.NoError(t, err) - err = v1.TarGz(file, v1.NewDirectoryBlockReader(mockBlockDir)) - require.NoError(t, err) - - _, _ = file.Seek(0, 0) - - return blockFilePath, file, bloomFileContent, seriesFileContent -} diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher.go b/pkg/storage/stores/shipper/bloomshipper/fetcher.go index 42d8d116b64a..053078180547 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher.go @@ -5,7 +5,6 @@ import ( "encoding/json" "os" "path/filepath" - "strings" "sync" "time" @@ -240,7 +239,7 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc var enqueueTime time.Duration for i := 0; i < n; i++ { - key := f.client.Block(refs[i]).Addr() + key := cacheKey(refs[i]) dir, isFound, err := f.fromCache(ctx, key) if err != nil { return results, err @@ -346,7 +345,7 @@ func (f *Fetcher) processTask(ctx context.Context, task downloadRequest[BlockRef return } - key := f.client.Block(result.BlockRef).Addr() + key := cacheKey(result.BlockRef) if task.async { // put item into cache err = f.blocksCache.Put(ctx, key, result) @@ -407,10 +406,9 @@ func (f *Fetcher) loadBlocksFromFS(_ context.Context, refs []BlockRef) ([]BlockD missing := make([]BlockRef, 0, len(refs)) for _, ref := range refs { - path := f.localFSResolver.Block(ref).LocalPath() - // the block directory does not contain the .tar.gz extension + // the block directory does not contain the .tar(.compression) extension // since it is stripped when the 
archive is extracted into a folder - path = strings.TrimSuffix(path, ".tar.gz") + path := localFilePathWithoutExtension(ref, f.localFSResolver) if ok, clean := f.isBlockDir(path); ok { blockDirs = append(blockDirs, NewBlockDirectory(ref, path)) } else { diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go index e7723b6d2653..9361c35e90eb 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "path/filepath" - "strings" "testing" "time" @@ -15,6 +14,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/compression" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" "github.com/grafana/loki/v3/pkg/storage/chunk/client/local" @@ -329,16 +329,16 @@ func TestFetcher_LoadBlocksFromFS(t *testing.T) { refs := []BlockRef{ // no directory for block - {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x0000, 0x0fff)}}, + {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x0000, 0x0fff)}, Encoding: compression.EncNone}, // invalid directory for block - {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x1000, 0x1fff)}}, + {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x1000, 0x1fff)}, Encoding: compression.EncSnappy}, // valid directory for block - {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x2000, 0x2fff)}}, + {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x2000, 0x2fff)}, Encoding: compression.EncGZIP}, } dirs := []string{ - strings.TrimSuffix(resolver.Block(refs[0]).LocalPath(), ".tar.gz"), - strings.TrimSuffix(resolver.Block(refs[1]).LocalPath(), ".tar.gz"), - strings.TrimSuffix(resolver.Block(refs[2]).LocalPath(), ".tar.gz"), + 
localFilePathWithoutExtension(refs[0], resolver), + localFilePathWithoutExtension(refs[1], resolver), + localFilePathWithoutExtension(refs[2], resolver), } createBlockDir(t, dirs[1]) @@ -360,7 +360,7 @@ func TestFetcher_LoadBlocksFromFS(t *testing.T) { require.Len(t, found, 1) require.Len(t, missing, 2) - require.Equal(t, refs[2], found[0].BlockRef) + require.Equal(t, refs[2].Ref, found[0].Ref) require.ElementsMatch(t, refs[0:2], missing) } diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver.go b/pkg/storage/stores/shipper/bloomshipper/resolver.go index 8f86ce7cb09e..3115f731fe13 100644 --- a/pkg/storage/stores/shipper/bloomshipper/resolver.go +++ b/pkg/storage/stores/shipper/bloomshipper/resolver.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" + "github.com/grafana/loki/v3/pkg/compression" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" ) @@ -17,8 +18,8 @@ const ( MetasPrefix = "metas" BlocksPrefix = "blocks" - extTarGz = ".tar.gz" - extJSON = ".json" + metaExtension = ".json" + blockExtension = v1.ExtTar ) // KeyResolver is an interface for resolving keys to locations. 
@@ -44,7 +45,7 @@ func (defaultKeyResolver) Meta(ref MetaRef) Location { fmt.Sprintf("%v", ref.TableName), ref.TenantID, MetasPrefix, - fmt.Sprintf("%v-%x%s", ref.Bounds, ref.Checksum, extJSON), + fmt.Sprintf("%v-%x%s", ref.Bounds, ref.Checksum, metaExtension), } } @@ -58,7 +59,7 @@ func (defaultKeyResolver) ParseMetaKey(loc Location) (MetaRef, error) { if err != nil { return MetaRef{}, fmt.Errorf("failed to parse bounds of meta key %s : %w", loc, err) } - withoutExt := strings.TrimSuffix(fnParts[2], extJSON) + withoutExt := strings.TrimSuffix(fnParts[2], metaExtension) checksum, err := strconv.ParseUint(withoutExt, 16, 64) if err != nil { return MetaRef{}, fmt.Errorf("failed to parse checksum of meta key %s : %w", loc, err) @@ -80,28 +81,44 @@ func (defaultKeyResolver) ParseMetaKey(loc Location) (MetaRef, error) { } func (defaultKeyResolver) Block(ref BlockRef) Location { + ext := blockExtension + compression.ToFileExtension(ref.Encoding) return simpleLocation{ BloomPrefix, fmt.Sprintf("%v", ref.TableName), ref.TenantID, BlocksPrefix, ref.Bounds.String(), - fmt.Sprintf("%d-%d-%x%s", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum, extTarGz), + fmt.Sprintf("%d-%d-%x%s", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum, ext), } } func (defaultKeyResolver) ParseBlockKey(loc Location) (BlockRef, error) { dir, fn := path.Split(loc.Addr()) + + ext, enc := path.Ext(fn), compression.EncNone + if ext != "" && ext != blockExtension { + // trim compression extension + fn = strings.TrimSuffix(fn, ext) + enc = compression.FromFileExtension(ext) + ext = path.Ext(fn) + if ext != "" && ext != blockExtension { + return BlockRef{}, fmt.Errorf("failed to parse block. 
invalid block extension: %s, expected %s", ext, blockExtension) + } + } + // trim tar extension + fn = strings.TrimSuffix(fn, ext) + fnParts := strings.Split(fn, "-") if len(fnParts) != 3 { return BlockRef{}, fmt.Errorf("failed to split filename parts of block key %s : len must be 3, but was %d", loc, len(fnParts)) } + interval, err := ParseIntervalFromParts(fnParts[0], fnParts[1]) if err != nil { return BlockRef{}, fmt.Errorf("failed to parse bounds of meta key %s : %w", loc, err) } - withoutExt := strings.TrimSuffix(fnParts[2], extTarGz) - checksum, err := strconv.ParseUint(withoutExt, 16, 64) + + checksum, err := strconv.ParseUint(fnParts[2], 16, 64) if err != nil { return BlockRef{}, fmt.Errorf("failed to parse checksum of meta key %s : %w", loc, err) } @@ -125,6 +142,7 @@ func (defaultKeyResolver) ParseBlockKey(loc Location) (BlockRef, error) { EndTimestamp: interval.End, Checksum: uint32(checksum), }, + Encoding: enc, }, nil } @@ -266,3 +284,11 @@ func (ls locations) LocalPath() string { return filepath.Join(xs...) 
} + +func cacheKey(ref BlockRef) string { + return strings.TrimSuffix(defaultKeyResolver{}.Block(ref).Addr(), blockExtension+compression.ToFileExtension(ref.Encoding)) +} + +func localFilePathWithoutExtension(ref BlockRef, res KeyResolver) string { + return strings.TrimSuffix(res.Block(ref).LocalPath(), blockExtension+compression.ToFileExtension(ref.Encoding)) +} diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver_test.go b/pkg/storage/stores/shipper/bloomshipper/resolver_test.go index ba45845ea9ba..259bf7b2db3a 100644 --- a/pkg/storage/stores/shipper/bloomshipper/resolver_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/resolver_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/compression" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" ) @@ -31,27 +32,50 @@ func TestResolver_ParseMetaKey(t *testing.T) { } func TestResolver_ParseBlockKey(t *testing.T) { - r := defaultKeyResolver{} - ref := BlockRef{ - Ref: Ref{ - TenantID: "tenant", - TableName: "table_1", - Bounds: v1.NewBounds(0x0000, 0xffff), - StartTimestamp: 0, - EndTimestamp: 3600000, - Checksum: 43981, - }, - } + for _, tc := range []struct { + srcEnc, dstEnc compression.Encoding + }{ + {compression.EncNone, compression.EncNone}, + {compression.EncGZIP, compression.EncGZIP}, + {compression.EncSnappy, compression.EncSnappy}, + {compression.EncLZ4_64k, compression.EncLZ4_4M}, + {compression.EncLZ4_256k, compression.EncLZ4_4M}, + {compression.EncLZ4_1M, compression.EncLZ4_4M}, + {compression.EncLZ4_4M, compression.EncLZ4_4M}, + {compression.EncFlate, compression.EncFlate}, + {compression.EncZstd, compression.EncZstd}, + } { + t.Run(tc.srcEnc.String(), func(t *testing.T) { + r := defaultKeyResolver{} + ref := BlockRef{ + Ref: Ref{ + TenantID: "tenant", + TableName: "table_1", + Bounds: v1.NewBounds(0x0000, 0xffff), + StartTimestamp: 0, + EndTimestamp: 3600000, + Checksum: 43981, + }, + Encoding: tc.srcEnc, + } - // encode block ref as 
string - loc := r.Block(ref) - path := loc.LocalPath() - require.Equal(t, "bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-abcd.tar.gz", path) + // encode block ref as string + loc := r.Block(ref) + path := loc.LocalPath() + fn := "bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-abcd" + require.Equal(t, fn+blockExtension+compression.ToFileExtension(tc.srcEnc), path) + + // parse encoded string into block ref + parsed, err := r.ParseBlockKey(key(path)) + require.NoError(t, err) + expected := BlockRef{ + Ref: ref.Ref, + Encoding: tc.dstEnc, + } + require.Equal(t, expected, parsed) + }) + } - // parse encoded string into block ref - parsed, err := r.ParseBlockKey(key(path)) - require.NoError(t, err) - require.Equal(t, ref, parsed) } func TestResolver_ShardedPrefixedResolver(t *testing.T) { @@ -87,7 +111,7 @@ func TestResolver_ShardedPrefixedResolver(t *testing.T) { loc := r.Meta(metaRef) require.Equal(t, "prefix/bloom/table_1/tenant/metas/0000000000000000-000000000000ffff-abcd.json", loc.LocalPath()) loc = r.Block(blockRef) - require.Equal(t, "prefix/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar.gz", loc.LocalPath()) + require.Equal(t, "prefix/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar", loc.LocalPath()) }) t.Run("multiple prefixes", func(t *testing.T) { @@ -96,6 +120,6 @@ func TestResolver_ShardedPrefixedResolver(t *testing.T) { loc := r.Meta(metaRef) require.Equal(t, "b/bloom/table_1/tenant/metas/0000000000000000-000000000000ffff-abcd.json", loc.LocalPath()) loc = r.Block(blockRef) - require.Equal(t, "d/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar.gz", loc.LocalPath()) + require.Equal(t, "d/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar", loc.LocalPath()) }) } diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go 
b/pkg/storage/stores/shipper/bloomshipper/store_test.go index 6a6705f8f0be..674e0c02a506 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/store_test.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/storage" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" @@ -109,13 +110,14 @@ func createMetaInStorage(store *BloomStore, tenant string, start model.Time, min func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Block, error) { tmpDir := t.TempDir() - fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz") + fp, _ := os.CreateTemp(t.TempDir(), "*.tar") blockWriter := v1.NewDirectoryBlockWriter(tmpDir) err := blockWriter.Init() require.NoError(t, err) - err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir)) + enc := compression.EncGZIP + err = v1.TarCompress(enc, fp, v1.NewDirectoryBlockReader(tmpDir)) require.NoError(t, err) _, _ = fp.Seek(0, 0) @@ -128,6 +130,7 @@ func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start StartTimestamp: start, EndTimestamp: start.Add(12 * time.Hour), }, + Encoding: enc, }, Data: fp, }