diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 78932e3f3e9e..fdeab9cf92c7 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -33,6 +33,9 @@ import ( "github.com/grafana/loki/v3/pkg/util/ring" ) +// TODO(chaudum): Make configurable via (per-tenant?) setting. +var blockCompressionAlgo = compression.EncNone + type Builder struct { services.Service @@ -404,7 +407,7 @@ func (b *Builder) processTask( blockCt++ blk := newBlocks.At() - built, err := bloomshipper.BlockFrom(tenant, task.Table.Addr(), blk) + built, err := bloomshipper.BlockFrom(blockCompressionAlgo, tenant, task.Table.Addr(), blk) if err != nil { level.Error(logger).Log("msg", "failed to build block", "err", err) if err = blk.Reader().Cleanup(); err != nil { diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index 482d277589c3..ea780c98e8ee 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -202,7 +202,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) { block := v1.NewBlock(reader, v1.NewMetrics(nil)) buf := bytes.NewBuffer(nil) - if err := v1.TarGz(buf, block.Reader()); err != nil { + if err := v1.TarCompress(ref.Encoding, buf, block.Reader()); err != nil { return bloomshipper.Block{}, err } @@ -1019,7 +1019,7 @@ func Test_deleteOutdatedMetas(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { logger := log.NewNopLogger() - //logger := log.NewLogfmtLogger(os.Stdout) + // logger := log.NewLogfmtLogger(os.Stdout) cfg := Config{ PlanningInterval: 1 * time.Hour, diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 8fdc3989510a..698b1ecf40e4 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -165,7 +165,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { Through: now, Refs: groupRefs(t, chunkRefs), Plan: plan.QueryPlan{AST: expr}, - Blocks: []string{"bloom/invalid/block.tar.gz"}, + Blocks: []string{"bloom/invalid/block.tar"}, } ctx := user.InjectOrgID(context.Background(), tenantID) diff --git a/pkg/compression/encoding.go b/pkg/compression/encoding.go index 6b421ed97644..ecef31f09325 100644 --- a/pkg/compression/encoding.go +++ b/pkg/compression/encoding.go @@ -13,7 +13,7 @@ type Encoding byte const ( EncNone Encoding = iota EncGZIP - EncDumb + EncDumb // not supported EncLZ4_64k EncSnappy EncLZ4_256k @@ -41,8 +41,6 @@ func (e Encoding) String() string { return "gzip" case EncNone: return "none" - case EncDumb: - return "dumb" case EncLZ4_64k: return "lz4-64k" case EncLZ4_256k: @@ -70,7 +68,6 @@ func ParseEncoding(enc string) (Encoding, error) { } } return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedEncoding()) - } // SupportedEncoding returns the list of supported Encoding. 
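[Editor's note] The builder hunk above pins blockCompressionAlgo to compression.EncNone, with a TODO to make it a (per-tenant) setting. Below is a minimal sketch of what that wiring could look like once such a setting exists — the encName parameter and the helper itself are hypothetical; compression.ParseEncoding and the new bloomshipper.BlockFrom signature are the APIs actually touched by this diff:

    package blocksketch // illustrative only, not part of this patch

    import (
    	"github.com/grafana/loki/v3/pkg/compression"
    	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
    	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
    )

    // buildBlock resolves a configured encoding name and builds the block
    // archive with it, mirroring the processTask call site above.
    func buildBlock(encName, tenant, table string, blk *v1.Block) (bloomshipper.Block, error) {
    	// Accepts the names listed by compression.SupportedEncoding(),
    	// e.g. "none", "gzip", "snappy".
    	enc, err := compression.ParseEncoding(encName)
    	if err != nil {
    		return bloomshipper.Block{}, err
    	}
    	// The encoding travels on the BlockRef and ends up in the object
    	// key's file extension (see the resolver.go changes further down).
    	return bloomshipper.BlockFrom(enc, tenant, table, blk)
    }
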
diff --git a/pkg/compression/fileext.go b/pkg/compression/fileext.go new file mode 100644 index 000000000000..8cd09c392d08 --- /dev/null +++ b/pkg/compression/fileext.go @@ -0,0 +1,50 @@ +package compression + +import "fmt" + +const ( + ExtNone = "" + ExtGZIP = ".gz" + ExtSnappy = ".sz" + ExtLZ4 = ".lz4" + ExtFlate = ".zz" + ExtZstd = ".zst" +) + +func ToFileExtension(e Encoding) string { + switch e { + case EncNone: + return ExtNone + case EncGZIP: + return ExtGZIP + case EncLZ4_64k, EncLZ4_256k, EncLZ4_1M, EncLZ4_4M: + return ExtLZ4 + case EncSnappy: + return ExtSnappy + case EncFlate: + return ExtFlate + case EncZstd: + return ExtZstd + default: + panic(fmt.Sprintf("invalid encoding: %d, supported: %s", e, SupportedEncoding())) + } +} + +func FromFileExtension(ext string) Encoding { + switch ext { + case ExtNone: + return EncNone + case ExtGZIP: + return EncGZIP + case ExtLZ4: + return EncLZ4_4M + case ExtSnappy: + return EncSnappy + case ExtFlate: + return EncFlate + case ExtZstd: + return EncZstd + default: + panic(fmt.Sprintf("invalid file extension: %s", ext)) + } +} diff --git a/pkg/storage/bloom/v1/archive.go b/pkg/storage/bloom/v1/archive.go index 201b071b2500..fce83d69e41d 100644 --- a/pkg/storage/bloom/v1/archive.go +++ b/pkg/storage/bloom/v1/archive.go @@ -11,22 +11,34 @@ import ( "github.com/grafana/loki/v3/pkg/compression" ) +const ( + ExtTar = ".tar" +) + type TarEntry struct { Name string Size int64 Body io.ReadSeeker } -func TarGz(dst io.Writer, reader BlockReader) error { +func TarCompress(enc compression.Encoding, dst io.Writer, reader BlockReader) error { + comprPool := compression.GetWriterPool(enc) + comprWriter := comprPool.GetWriter(dst) + defer func() { + comprWriter.Close() + comprPool.PutWriter(comprWriter) + }() + + return Tar(comprWriter, reader) +} + +func Tar(dst io.Writer, reader BlockReader) error { itr, err := reader.TarEntries() if err != nil { return errors.Wrap(err, "error getting tar entries") } - gzipper := compression.GetWriterPool(compression.EncGZIP).GetWriter(dst) - defer gzipper.Close() - - tarballer := tar.NewWriter(gzipper) + tarballer := tar.NewWriter(dst) defer tarballer.Close() for itr.Next() { @@ -49,13 +61,19 @@ func TarGz(dst io.Writer, reader BlockReader) error { return itr.Err() } -func UnTarGz(dst string, r io.Reader) error { - gzipper, err := compression.GetReaderPool(compression.EncGZIP).GetReader(r) +func UnTarCompress(enc compression.Encoding, dst string, r io.Reader) error { + comprPool := compression.GetReaderPool(enc) + comprReader, err := comprPool.GetReader(r) if err != nil { - return errors.Wrap(err, "error getting gzip reader") + return errors.Wrapf(err, "error getting %s reader", enc.String()) } + defer comprPool.PutReader(comprReader) + + return UnTar(dst, comprReader) +} - tarballer := tar.NewReader(gzipper) +func UnTar(dst string, r io.Reader) error { + tarballer := tar.NewReader(r) for { header, err := tarballer.Next() diff --git a/pkg/storage/bloom/v1/archive_test.go b/pkg/storage/bloom/v1/archive_test.go index e0d2f69a1c84..b7857a4b5ed1 100644 --- a/pkg/storage/bloom/v1/archive_test.go +++ b/pkg/storage/bloom/v1/archive_test.go @@ -24,7 +24,7 @@ func TestArchive(t *testing.T) { BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: compression.EncSnappy, + encoding: compression.EncNone, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -40,9 +40,9 @@ func TestArchive(t *testing.T) { reader := NewDirectoryBlockReader(dir1) w := bytes.NewBuffer(nil) - require.Nil(t, TarGz(w, reader)) + 
require.Nil(t, Tar(w, reader)) - require.Nil(t, UnTarGz(dir2, w)) + require.Nil(t, UnTar(dir2, w)) reader2 := NewDirectoryBlockReader(dir2) @@ -78,3 +78,88 @@ func TestArchive(t *testing.T) { require.Nil(t, err) require.Equal(t, srcBloomsBytes, dstBloomsBytes) } + +func TestArchiveCompression(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + enc compression.Encoding + }{ + {compression.EncNone}, + {compression.EncGZIP}, + {compression.EncSnappy}, + {compression.EncLZ4_64k}, + {compression.EncLZ4_256k}, + {compression.EncLZ4_1M}, + {compression.EncLZ4_4M}, + {compression.EncFlate}, + {compression.EncZstd}, + } { + t.Run(tc.enc.String(), func(t *testing.T) { + // for writing files to two dirs for comparison and ensuring they're equal + dir1 := t.TempDir() + dir2 := t.TempDir() + + numSeries := 100 + data, _ := MkBasicSeriesWithBlooms(numSeries, 0x0000, 0xffff, 0, 10000) + + builder, err := NewBlockBuilder( + BlockOptions{ + Schema: Schema{ + version: CurrentSchemaVersion, + encoding: compression.EncNone, + }, + SeriesPageSize: 100, + BloomPageSize: 10 << 10, + }, + NewDirectoryBlockWriter(dir1), + ) + + require.Nil(t, err) + itr := v2.NewSliceIter[SeriesWithBlooms](data) + _, err = builder.BuildFrom(itr) + require.Nil(t, err) + + reader := NewDirectoryBlockReader(dir1) + + w := bytes.NewBuffer(nil) + require.Nil(t, TarCompress(tc.enc, w, reader)) + + require.Nil(t, UnTarCompress(tc.enc, dir2, w)) + + reader2 := NewDirectoryBlockReader(dir2) + + // Check Index is byte for byte equivalent + srcIndex, err := reader.Index() + require.Nil(t, err) + _, err = srcIndex.Seek(0, io.SeekStart) + require.Nil(t, err) + dstIndex, err := reader2.Index() + require.Nil(t, err) + _, err = dstIndex.Seek(0, io.SeekStart) + require.Nil(t, err) + + srcIndexBytes, err := io.ReadAll(srcIndex) + require.Nil(t, err) + dstIndexBytes, err := io.ReadAll(dstIndex) + require.Nil(t, err) + require.Equal(t, srcIndexBytes, dstIndexBytes) + + // Check Blooms is byte for byte equivalent + srcBlooms, err := reader.Blooms() + require.Nil(t, err) + _, err = srcBlooms.Seek(0, io.SeekStart) + require.Nil(t, err) + dstBlooms, err := reader2.Blooms() + require.Nil(t, err) + _, err = dstBlooms.Seek(0, io.SeekStart) + require.Nil(t, err) + + srcBloomsBytes, err := io.ReadAll(srcBlooms) + require.Nil(t, err) + dstBloomsBytes, err := io.ReadAll(dstBlooms) + require.Nil(t, err) + require.Equal(t, srcBloomsBytes, dstBloomsBytes) + + }) + } +} diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index 203d15684502..838866e1dee8 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -94,15 +94,15 @@ func loadBlockDirectories(root string, logger log.Logger) (keys []string, values return nil } - ref, err := resolver.ParseBlockKey(key(path)) + // The block file extension (.tar) needs to be added so the key can be parsed. + // This is because the extension is stripped off when the tar archive is extracted. 
+ ref, err := resolver.ParseBlockKey(key(path + blockExtension)) if err != nil { return nil } if ok, clean := isBlockDir(path, logger); ok { - // the cache key must not contain the directory prefix - // therefore we use the defaultKeyResolver to resolve the block's address - key := defaultKeyResolver{}.Block(ref).Addr() + key := cacheKey(ref) keys = append(keys, key) values = append(values, NewBlockDirectory(ref, path)) level.Debug(logger).Log("msg", "found block directory", "path", path, "key", key) diff --git a/pkg/storage/stores/shipper/bloomshipper/cache_test.go b/pkg/storage/stores/shipper/bloomshipper/cache_test.go index 941b7fa29e99..763036e56ac7 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache_test.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/log" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config" ) @@ -63,7 +64,8 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) { wd := t.TempDir() // plain file - fp, _ := os.Create(filepath.Join(wd, "regular-file.tar.gz")) + ext := blockExtension + compression.ExtGZIP + fp, _ := os.Create(filepath.Join(wd, "regular-file"+ext)) fp.Close() // invalid directory @@ -99,8 +101,8 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) { require.Equal(t, 1, len(c.entries)) - key := validDir + ".tar.gz" // cache key must not contain directory prefix - elem, found := c.entries[key] + // cache key does neither contain directory prefix nor file extension suffix + elem, found := c.entries[validDir] require.True(t, found) blockDir := elem.Value.(*Entry).Value require.Equal(t, filepath.Join(wd, validDir), blockDir.Path) diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index 2ce0e0a149ee..1390b0d9c52e 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -7,7 +7,6 @@ import ( "fmt" "hash" "io" - "strings" "sync" "time" @@ -18,6 +17,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/grafana/loki/v3/pkg/compression" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/client" "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" @@ -73,6 +73,7 @@ func (r Ref) Interval() Interval { type BlockRef struct { Ref + compression.Encoding } func (r BlockRef) String() string { @@ -208,29 +209,31 @@ func (c ClosableReadSeekerAdapter) Close() error { return nil } -func BlockRefFrom(tenant, table string, md v1.BlockMetadata) BlockRef { - return BlockRef{ - Ref: Ref{ - TenantID: tenant, - TableName: table, - Bounds: md.Series.Bounds, - StartTimestamp: md.Series.FromTs, - EndTimestamp: md.Series.ThroughTs, - Checksum: md.Checksum, - }, +func newRefFrom(tenant, table string, md v1.BlockMetadata) Ref { + return Ref{ + TenantID: tenant, + TableName: table, + Bounds: md.Series.Bounds, + StartTimestamp: md.Series.FromTs, + EndTimestamp: md.Series.ThroughTs, + Checksum: md.Checksum, } } -func BlockFrom(tenant, table string, blk *v1.Block) (Block, error) { +func newBlockRefWithEncoding(ref Ref, enc compression.Encoding) BlockRef { + return BlockRef{Ref: ref, Encoding: enc} +} + +func BlockFrom(enc compression.Encoding, tenant, table string, blk *v1.Block) (Block, error) { md, _ := blk.Metadata() - ref := BlockRefFrom(tenant, table, md) + ref 
:= newBlockRefWithEncoding(newRefFrom(tenant, table, md), enc) // TODO(owen-d): pool buf := bytes.NewBuffer(nil) - err := v1.TarGz(buf, blk.Reader()) + err := v1.TarCompress(ref.Encoding, buf, blk.Reader()) if err != nil { - return Block{}, errors.Wrap(err, "archiving+compressing block") + return Block{}, err } reader := bytes.NewReader(buf.Bytes()) @@ -320,15 +323,14 @@ func (b *BloomClient) GetBlock(ctx context.Context, ref BlockRef) (BlockDirector } defer rc.Close() - path := b.fsResolver.Block(ref).LocalPath() - // the block directory should not contain the .tar.gz extension - path = strings.TrimSuffix(path, ".tar.gz") + // the block directory must not contain the .tar(.compression) extension + path := localFilePathWithoutExtension(ref, b.fsResolver) err = util.EnsureDirectory(path) if err != nil { return BlockDirectory{}, fmt.Errorf("failed to create block directory %s: %w", path, err) } - err = v1.UnTarGz(path, rc) + err = v1.UnTarCompress(ref.Encoding, path, rc) if err != nil { return BlockDirectory{}, fmt.Errorf("failed to extract block file %s: %w", key, err) } diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go index dff01dcae50a..13ce7a7c97ae 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go @@ -14,12 +14,25 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/compression" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/client" "github.com/grafana/loki/v3/pkg/storage/chunk/client/testutils" "github.com/grafana/loki/v3/pkg/storage/config" ) +var supportedCompressions = []compression.Encoding{ + compression.EncNone, + compression.EncGZIP, + compression.EncSnappy, + compression.EncLZ4_64k, + compression.EncLZ4_256k, + compression.EncLZ4_1M, + compression.EncLZ4_4M, + compression.EncFlate, + compression.EncZstd, +} + func parseTime(s string) model.Time { t, err := time.Parse("2006-01-02 15:04", s) if err != nil { @@ -196,18 +209,18 @@ func TestBloomClient_DeleteMetas(t *testing.T) { }) } -func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Block, error) { +func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, minFp, maxFp model.Fingerprint, enc compression.Encoding) (Block, error) { step := int64((24 * time.Hour).Seconds()) day := start.Unix() / step tmpDir := t.TempDir() - fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz") + fp, _ := os.CreateTemp(t.TempDir(), "*"+blockExtension+compression.ToFileExtension(enc)) blockWriter := v1.NewDirectoryBlockWriter(tmpDir) err := blockWriter.Init() require.NoError(t, err) - err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir)) + err = v1.TarCompress(enc, fp, v1.NewDirectoryBlockReader(tmpDir)) require.NoError(t, err) _, _ = fp.Seek(0, 0) @@ -221,40 +234,48 @@ func putBlock(t *testing.T, c *BloomClient, tenant string, start model.Time, min StartTimestamp: start, EndTimestamp: start.Add(12 * time.Hour), }, + Encoding: enc, }, Data: fp, } - return block, c.client.PutObject(context.Background(), c.Block(block.BlockRef).Addr(), block.Data) + key := c.Block(block.BlockRef).Addr() + t.Logf("PUT block to storage: %s", key) + return block, c.client.PutObject(context.Background(), key, block.Data) } func TestBloomClient_GetBlock(t *testing.T) { - c, _ := newMockBloomClient(t) - ctx := context.Background() - - 
b, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff) - require.NoError(t, err) + for _, enc := range supportedCompressions { + c, _ := newMockBloomClient(t) + ctx := context.Background() - t.Run("exists", func(t *testing.T) { - blockDir, err := c.GetBlock(ctx, b.BlockRef) + b, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff, enc) require.NoError(t, err) - require.Equal(t, b.BlockRef, blockDir.BlockRef) - }) - t.Run("does not exist", func(t *testing.T) { - blockDir, err := c.GetBlock(ctx, BlockRef{}) - require.Error(t, err) - require.True(t, c.client.IsObjectNotFoundErr(err)) - require.Equal(t, blockDir, BlockDirectory{}) - }) + t.Run(enc.String(), func(t *testing.T) { + + t.Run("exists", func(t *testing.T) { + blockDir, err := c.GetBlock(ctx, b.BlockRef) + require.NoError(t, err) + require.Equal(t, b.BlockRef, blockDir.BlockRef) + }) + + t.Run("does not exist", func(t *testing.T) { + blockDir, err := c.GetBlock(ctx, BlockRef{}) + require.Error(t, err) + require.True(t, c.client.IsObjectNotFoundErr(err)) + require.Equal(t, blockDir, BlockDirectory{}) + }) + }) + } } func TestBloomClient_GetBlocks(t *testing.T) { c, _ := newMockBloomClient(t) ctx := context.Background() - b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff) + b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0x0fff, compression.EncGZIP) require.NoError(t, err) - b2, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff) + b2, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x1000, 0xffff, compression.EncNone) require.NoError(t, err) t.Run("exists", func(t *testing.T) { @@ -271,57 +292,62 @@ func TestBloomClient_GetBlocks(t *testing.T) { } func TestBloomClient_PutBlock(t *testing.T) { - c, _ := newMockBloomClient(t) - ctx := context.Background() - - start := parseTime("2024-02-05 12:00") - - tmpDir := t.TempDir() - fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz") - - blockWriter := v1.NewDirectoryBlockWriter(tmpDir) - err := blockWriter.Init() - require.NoError(t, err) - - err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir)) - require.NoError(t, err) - - block := Block{ - BlockRef: BlockRef{ - Ref: Ref{ - TenantID: "tenant", - Bounds: v1.NewBounds(0x0000, 0xffff), - TableName: "table_1234", - StartTimestamp: start, - EndTimestamp: start.Add(12 * time.Hour), - }, - }, - Data: fp, + for _, enc := range supportedCompressions { + t.Run(enc.String(), func(t *testing.T) { + c, _ := newMockBloomClient(t) + ctx := context.Background() + + start := parseTime("2024-02-05 12:00") + + tmpDir := t.TempDir() + fp, _ := os.CreateTemp(t.TempDir(), "*"+blockExtension+compression.ToFileExtension(enc)) + + blockWriter := v1.NewDirectoryBlockWriter(tmpDir) + err := blockWriter.Init() + require.NoError(t, err) + + err = v1.TarCompress(enc, fp, v1.NewDirectoryBlockReader(tmpDir)) + require.NoError(t, err) + + block := Block{ + BlockRef: BlockRef{ + Ref: Ref{ + TenantID: "tenant", + Bounds: v1.NewBounds(0x0000, 0xffff), + TableName: "table_1234", + StartTimestamp: start, + EndTimestamp: start.Add(12 * time.Hour), + }, + Encoding: enc, + }, + Data: fp, + } + + err = c.PutBlock(ctx, block) + require.NoError(t, err) + + oc := c.client.(*testutils.InMemoryObjectClient) + stored := oc.Internals() + _, found := stored[c.Block(block.BlockRef).Addr()] + require.True(t, found) + + blockDir, err := c.GetBlock(ctx, block.BlockRef) + require.NoError(t, err) + + require.Equal(t, block.BlockRef, 
blockDir.BlockRef) + }) } - - err = c.PutBlock(ctx, block) - require.NoError(t, err) - - oc := c.client.(*testutils.InMemoryObjectClient) - stored := oc.Internals() - _, found := stored[c.Block(block.BlockRef).Addr()] - require.True(t, found) - - blockDir, err := c.GetBlock(ctx, block.BlockRef) - require.NoError(t, err) - - require.Equal(t, block.BlockRef, blockDir.BlockRef) } func TestBloomClient_DeleteBlocks(t *testing.T) { c, _ := newMockBloomClient(t) ctx := context.Background() - b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff) + b1, err := putBlock(t, c, "tenant", parseTime("2024-02-05 00:00"), 0x0000, 0xffff, compression.EncNone) require.NoError(t, err) - b2, err := putBlock(t, c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff) + b2, err := putBlock(t, c, "tenant", parseTime("2024-02-06 00:00"), 0x0000, 0xffff, compression.EncGZIP) require.NoError(t, err) - b3, err := putBlock(t, c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff) + b3, err := putBlock(t, c, "tenant", parseTime("2024-02-07 00:00"), 0x0000, 0xffff, compression.EncSnappy) require.NoError(t, err) oc := c.client.(*testutils.InMemoryObjectClient) diff --git a/pkg/storage/stores/shipper/bloomshipper/compress_utils.go b/pkg/storage/stores/shipper/bloomshipper/compress_utils.go deleted file mode 100644 index 52de4a4da582..000000000000 --- a/pkg/storage/stores/shipper/bloomshipper/compress_utils.go +++ /dev/null @@ -1,29 +0,0 @@ -package bloomshipper - -import ( - "os" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - - v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" -) - -func CompressBloomBlock(ref BlockRef, archivePath, localDst string, logger log.Logger) (Block, error) { - blockToUpload := Block{} - archiveFile, err := os.Create(archivePath) - if err != nil { - return blockToUpload, err - } - - err = v1.TarGz(archiveFile, v1.NewDirectoryBlockReader(localDst)) - if err != nil { - level.Error(logger).Log("msg", "creating bloom block archive file", "err", err) - return blockToUpload, err - } - - blockToUpload.BlockRef = ref - blockToUpload.Data = archiveFile - - return blockToUpload, nil -} diff --git a/pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go b/pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go deleted file mode 100644 index f0b1598dadf9..000000000000 --- a/pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package bloomshipper - -import ( - "bytes" - "io" - "os" - "path/filepath" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" - - v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" -) - -func directoryDoesNotExist(path string) bool { - _, err := os.Lstat(path) - return err != nil -} - -const testArchiveFileName = "test-block-archive" - -func createBlockArchive(t *testing.T) (string, io.Reader, string, string) { - dir := t.TempDir() - mockBlockDir := filepath.Join(dir, "mock-block-dir") - err := os.MkdirAll(mockBlockDir, 0777) - require.NoError(t, err) - bloomFile, err := os.Create(filepath.Join(mockBlockDir, v1.BloomFileName)) - require.NoError(t, err) - bloomFileContent := uuid.NewString() - _, err = io.Copy(bloomFile, bytes.NewReader([]byte(bloomFileContent))) - require.NoError(t, err) - - seriesFile, err := os.Create(filepath.Join(mockBlockDir, v1.SeriesFileName)) - require.NoError(t, err) - seriesFileContent := uuid.NewString() - _, err = io.Copy(seriesFile, bytes.NewReader([]byte(seriesFileContent))) - require.NoError(t, err) - - 
blockFilePath := filepath.Join(dir, testArchiveFileName) - file, err := os.OpenFile(blockFilePath, os.O_CREATE|os.O_RDWR, 0700) - require.NoError(t, err) - err = v1.TarGz(file, v1.NewDirectoryBlockReader(mockBlockDir)) - require.NoError(t, err) - - _, _ = file.Seek(0, 0) - - return blockFilePath, file, bloomFileContent, seriesFileContent -} diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher.go b/pkg/storage/stores/shipper/bloomshipper/fetcher.go index 42d8d116b64a..053078180547 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher.go @@ -5,7 +5,6 @@ import ( "encoding/json" "os" "path/filepath" - "strings" "sync" "time" @@ -240,7 +239,7 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc var enqueueTime time.Duration for i := 0; i < n; i++ { - key := f.client.Block(refs[i]).Addr() + key := cacheKey(refs[i]) dir, isFound, err := f.fromCache(ctx, key) if err != nil { return results, err @@ -346,7 +345,7 @@ func (f *Fetcher) processTask(ctx context.Context, task downloadRequest[BlockRef return } - key := f.client.Block(result.BlockRef).Addr() + key := cacheKey(result.BlockRef) if task.async { // put item into cache err = f.blocksCache.Put(ctx, key, result) @@ -407,10 +406,9 @@ func (f *Fetcher) loadBlocksFromFS(_ context.Context, refs []BlockRef) ([]BlockD missing := make([]BlockRef, 0, len(refs)) for _, ref := range refs { - path := f.localFSResolver.Block(ref).LocalPath() - // the block directory does not contain the .tar.gz extension + // the block directory does not contain the .tar(.compression) extension // since it is stripped when the archive is extracted into a folder - path = strings.TrimSuffix(path, ".tar.gz") + path := localFilePathWithoutExtension(ref, f.localFSResolver) if ok, clean := f.isBlockDir(path); ok { blockDirs = append(blockDirs, NewBlockDirectory(ref, path)) } else { diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go index e7723b6d2653..9361c35e90eb 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "path/filepath" - "strings" "testing" "time" @@ -15,6 +14,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/compression" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" "github.com/grafana/loki/v3/pkg/storage/chunk/client/local" @@ -329,16 +329,16 @@ func TestFetcher_LoadBlocksFromFS(t *testing.T) { refs := []BlockRef{ // no directory for block - {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x0000, 0x0fff)}}, + {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x0000, 0x0fff)}, Encoding: compression.EncNone}, // invalid directory for block - {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x1000, 0x1fff)}}, + {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x1000, 0x1fff)}, Encoding: compression.EncSnappy}, // valid directory for block - {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x2000, 0x2fff)}}, + {Ref: Ref{TenantID: "tenant", TableName: "12345", Bounds: v1.NewBounds(0x2000, 0x2fff)}, Encoding: compression.EncGZIP}, } dirs := []string{ - strings.TrimSuffix(resolver.Block(refs[0]).LocalPath(), ".tar.gz"), - 
strings.TrimSuffix(resolver.Block(refs[1]).LocalPath(), ".tar.gz"), - strings.TrimSuffix(resolver.Block(refs[2]).LocalPath(), ".tar.gz"), + localFilePathWithoutExtension(refs[0], resolver), + localFilePathWithoutExtension(refs[1], resolver), + localFilePathWithoutExtension(refs[2], resolver), } createBlockDir(t, dirs[1]) @@ -360,7 +360,7 @@ func TestFetcher_LoadBlocksFromFS(t *testing.T) { require.Len(t, found, 1) require.Len(t, missing, 2) - require.Equal(t, refs[2], found[0].BlockRef) + require.Equal(t, refs[2].Ref, found[0].Ref) require.ElementsMatch(t, refs[0:2], missing) } diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver.go b/pkg/storage/stores/shipper/bloomshipper/resolver.go index 8f86ce7cb09e..3115f731fe13 100644 --- a/pkg/storage/stores/shipper/bloomshipper/resolver.go +++ b/pkg/storage/stores/shipper/bloomshipper/resolver.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" + "github.com/grafana/loki/v3/pkg/compression" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" ) @@ -17,8 +18,8 @@ const ( MetasPrefix = "metas" BlocksPrefix = "blocks" - extTarGz = ".tar.gz" - extJSON = ".json" + metaExtension = ".json" + blockExtension = v1.ExtTar ) // KeyResolver is an interface for resolving keys to locations. @@ -44,7 +45,7 @@ func (defaultKeyResolver) Meta(ref MetaRef) Location { fmt.Sprintf("%v", ref.TableName), ref.TenantID, MetasPrefix, - fmt.Sprintf("%v-%x%s", ref.Bounds, ref.Checksum, extJSON), + fmt.Sprintf("%v-%x%s", ref.Bounds, ref.Checksum, metaExtension), } } @@ -58,7 +59,7 @@ func (defaultKeyResolver) ParseMetaKey(loc Location) (MetaRef, error) { if err != nil { return MetaRef{}, fmt.Errorf("failed to parse bounds of meta key %s : %w", loc, err) } - withoutExt := strings.TrimSuffix(fnParts[2], extJSON) + withoutExt := strings.TrimSuffix(fnParts[2], metaExtension) checksum, err := strconv.ParseUint(withoutExt, 16, 64) if err != nil { return MetaRef{}, fmt.Errorf("failed to parse checksum of meta key %s : %w", loc, err) @@ -80,28 +81,44 @@ func (defaultKeyResolver) ParseMetaKey(loc Location) (MetaRef, error) { } func (defaultKeyResolver) Block(ref BlockRef) Location { + ext := blockExtension + compression.ToFileExtension(ref.Encoding) return simpleLocation{ BloomPrefix, fmt.Sprintf("%v", ref.TableName), ref.TenantID, BlocksPrefix, ref.Bounds.String(), - fmt.Sprintf("%d-%d-%x%s", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum, extTarGz), + fmt.Sprintf("%d-%d-%x%s", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum, ext), } } func (defaultKeyResolver) ParseBlockKey(loc Location) (BlockRef, error) { dir, fn := path.Split(loc.Addr()) + + ext, enc := path.Ext(fn), compression.EncNone + if ext != "" && ext != blockExtension { + // trim compression extension + fn = strings.TrimSuffix(fn, ext) + enc = compression.FromFileExtension(ext) + ext = path.Ext(fn) + if ext != "" && ext != blockExtension { + return BlockRef{}, fmt.Errorf("failed to parse block. 
invalid block extension: %s, expected %s", ext, blockExtension) + } + } + // trim tar extension + fn = strings.TrimSuffix(fn, ext) + fnParts := strings.Split(fn, "-") if len(fnParts) != 3 { return BlockRef{}, fmt.Errorf("failed to split filename parts of block key %s : len must be 3, but was %d", loc, len(fnParts)) } + interval, err := ParseIntervalFromParts(fnParts[0], fnParts[1]) if err != nil { return BlockRef{}, fmt.Errorf("failed to parse bounds of meta key %s : %w", loc, err) } - withoutExt := strings.TrimSuffix(fnParts[2], extTarGz) - checksum, err := strconv.ParseUint(withoutExt, 16, 64) + + checksum, err := strconv.ParseUint(fnParts[2], 16, 64) if err != nil { return BlockRef{}, fmt.Errorf("failed to parse checksum of meta key %s : %w", loc, err) } @@ -125,6 +142,7 @@ func (defaultKeyResolver) ParseBlockKey(loc Location) (BlockRef, error) { EndTimestamp: interval.End, Checksum: uint32(checksum), }, + Encoding: enc, }, nil } @@ -266,3 +284,11 @@ func (ls locations) LocalPath() string { return filepath.Join(xs...) } + +func cacheKey(ref BlockRef) string { + return strings.TrimSuffix(defaultKeyResolver{}.Block(ref).Addr(), blockExtension+compression.ToFileExtension(ref.Encoding)) +} + +func localFilePathWithoutExtension(ref BlockRef, res KeyResolver) string { + return strings.TrimSuffix(res.Block(ref).LocalPath(), blockExtension+compression.ToFileExtension(ref.Encoding)) +} diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver_test.go b/pkg/storage/stores/shipper/bloomshipper/resolver_test.go index ba45845ea9ba..259bf7b2db3a 100644 --- a/pkg/storage/stores/shipper/bloomshipper/resolver_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/resolver_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/compression" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" ) @@ -31,27 +32,50 @@ func TestResolver_ParseMetaKey(t *testing.T) { } func TestResolver_ParseBlockKey(t *testing.T) { - r := defaultKeyResolver{} - ref := BlockRef{ - Ref: Ref{ - TenantID: "tenant", - TableName: "table_1", - Bounds: v1.NewBounds(0x0000, 0xffff), - StartTimestamp: 0, - EndTimestamp: 3600000, - Checksum: 43981, - }, - } + for _, tc := range []struct { + srcEnc, dstEnc compression.Encoding + }{ + {compression.EncNone, compression.EncNone}, + {compression.EncGZIP, compression.EncGZIP}, + {compression.EncSnappy, compression.EncSnappy}, + {compression.EncLZ4_64k, compression.EncLZ4_4M}, + {compression.EncLZ4_256k, compression.EncLZ4_4M}, + {compression.EncLZ4_1M, compression.EncLZ4_4M}, + {compression.EncLZ4_4M, compression.EncLZ4_4M}, + {compression.EncFlate, compression.EncFlate}, + {compression.EncZstd, compression.EncZstd}, + } { + t.Run(tc.srcEnc.String(), func(t *testing.T) { + r := defaultKeyResolver{} + ref := BlockRef{ + Ref: Ref{ + TenantID: "tenant", + TableName: "table_1", + Bounds: v1.NewBounds(0x0000, 0xffff), + StartTimestamp: 0, + EndTimestamp: 3600000, + Checksum: 43981, + }, + Encoding: tc.srcEnc, + } - // encode block ref as string - loc := r.Block(ref) - path := loc.LocalPath() - require.Equal(t, "bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-abcd.tar.gz", path) + // encode block ref as string + loc := r.Block(ref) + path := loc.LocalPath() + fn := "bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-abcd" + require.Equal(t, fn+blockExtension+compression.ToFileExtension(tc.srcEnc), path) + + // parse encoded string into block ref + parsed, err := r.ParseBlockKey(key(path)) + 
require.NoError(t, err) + expected := BlockRef{ + Ref: ref.Ref, + Encoding: tc.dstEnc, + } + require.Equal(t, expected, parsed) + }) + } - // parse encoded string into block ref - parsed, err := r.ParseBlockKey(key(path)) - require.NoError(t, err) - require.Equal(t, ref, parsed) } func TestResolver_ShardedPrefixedResolver(t *testing.T) { @@ -87,7 +111,7 @@ func TestResolver_ShardedPrefixedResolver(t *testing.T) { loc := r.Meta(metaRef) require.Equal(t, "prefix/bloom/table_1/tenant/metas/0000000000000000-000000000000ffff-abcd.json", loc.LocalPath()) loc = r.Block(blockRef) - require.Equal(t, "prefix/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar.gz", loc.LocalPath()) + require.Equal(t, "prefix/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar", loc.LocalPath()) }) t.Run("multiple prefixes", func(t *testing.T) { @@ -96,6 +120,6 @@ func TestResolver_ShardedPrefixedResolver(t *testing.T) { loc := r.Meta(metaRef) require.Equal(t, "b/bloom/table_1/tenant/metas/0000000000000000-000000000000ffff-abcd.json", loc.LocalPath()) loc = r.Block(blockRef) - require.Equal(t, "d/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar.gz", loc.LocalPath()) + require.Equal(t, "d/bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-bcde.tar", loc.LocalPath()) }) } diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go b/pkg/storage/stores/shipper/bloomshipper/store_test.go index 6a6705f8f0be..674e0c02a506 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/store_test.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/storage" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" @@ -109,13 +110,14 @@ func createMetaInStorage(store *BloomStore, tenant string, start model.Time, min func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start model.Time, minFp, maxFp model.Fingerprint) (Block, error) { tmpDir := t.TempDir() - fp, _ := os.CreateTemp(t.TempDir(), "*.tar.gz") + fp, _ := os.CreateTemp(t.TempDir(), "*.tar") blockWriter := v1.NewDirectoryBlockWriter(tmpDir) err := blockWriter.Init() require.NoError(t, err) - err = v1.TarGz(fp, v1.NewDirectoryBlockReader(tmpDir)) + enc := compression.EncGZIP + err = v1.TarCompress(enc, fp, v1.NewDirectoryBlockReader(tmpDir)) require.NoError(t, err) _, _ = fp.Seek(0, 0) @@ -128,6 +130,7 @@ func createBlockInStorage(t *testing.T, store *BloomStore, tenant string, start StartTimestamp: start, EndTimestamp: start.Add(12 * time.Hour), }, + Encoding: enc, }, Data: fp, }
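
[Editor's note] For reference, an end-to-end sketch of the encoding-aware archive round trip introduced by this diff. The directories and the chosen encoding are illustrative; v1.TarCompress, v1.UnTarCompress, v1.ExtTar, and the compression extension helpers are the APIs added above:

    package blocksketch // illustrative only, not part of this patch

    import (
    	"bytes"

    	"github.com/grafana/loki/v3/pkg/compression"
    	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
    )

    func roundTrip(srcDir, dstDir string) error {
    	enc := compression.EncSnappy

    	// Archive the block directory through the pooled compressor
    	// for the chosen encoding.
    	buf := bytes.NewBuffer(nil)
    	if err := v1.TarCompress(enc, buf, v1.NewDirectoryBlockReader(srcDir)); err != nil {
    		return err
    	}

    	// The object key carries ".tar" plus the compression extension,
    	// e.g. ".tar.sz" for Snappy; readers recover the encoding from it
    	// via compression.FromFileExtension.
    	_ = v1.ExtTar + compression.ToFileExtension(enc)

    	// Extraction must use the same encoding that was used to compress.
    	return v1.UnTarCompress(enc, dstDir, buf)
    }

Note that FromFileExtension maps every LZ4 extension back to EncLZ4_4M (the extension does not record the block size variant), which is why TestResolver_ParseBlockKey above expects dstEnc == EncLZ4_4M for all four LZ4 source encodings.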