diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml index 529644b304cfb..f1cf3d00734b5 100644 --- a/cmd/loki/loki-local-config.yaml +++ b/cmd/loki/loki-local-config.yaml @@ -39,6 +39,7 @@ storage_config: shared_store: filesystem filesystem: directory: /tmp/loki/chunks + tenant_folders: true compactor: working_directory: /tmp/loki/boltdb-shipper-compactor diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index e9351ba10f9ff..a71ae3e1a94eb 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -1429,6 +1429,9 @@ filesystem: # Directory to store chunks in. # CLI flag: -local.chunk-directory directory: <string> + # Store chunks in per-tenant folders. + # CLI flag: -local.chunk-tenant-folders + [tenant_folders: <boolean> | default = false] # Configures storing index in an Object Store(GCS/S3/Azure/Swift/Filesystem) in the form of boltdb files. # Required fields only required when boltdb-shipper is defined in config. diff --git a/docs/sources/operations/storage/filesystem.md b/docs/sources/operations/storage/filesystem.md index 95d0cda2c5222..a6b939c6f119e 100644 --- a/docs/sources/operations/storage/filesystem.md +++ b/docs/sources/operations/storage/filesystem.md @@ -13,9 +13,10 @@ storage_config: directory: /tmp/loki/ ``` -A folder is created for every tenant all the chunks for one tenant are stored in that directory. +By default all chunks are stored in the given directory. +Set `tenant_folders` to true to separate chunks of tenants into their own folders. -If loki is run in single-tenant mode, all the chunks are put in a folder named `fake` which is the synthesized tenant name used for single tenant mode. +If Loki is run in single-tenant mode, the synthesized tenant name `fake` is used for all chunks. See [multi-tenancy](../../multi-tenancy/) for more information. 
diff --git a/pkg/storage/chunk/aws/fixtures.go b/pkg/storage/chunk/aws/fixtures.go index 36aa4eefe395e..76b687c0a5e49 100644 --- a/pkg/storage/chunk/aws/fixtures.go +++ b/pkg/storage/chunk/aws/fixtures.go @@ -44,7 +44,7 @@ var Fixtures = []testutils.Fixture{ schemaCfg: schemaConfig, metrics: newMetrics(nil), } - object := objectclient.NewClient(&S3ObjectClient{S3: newMockS3()}, nil) + object := objectclient.NewClient(&S3ObjectClient{S3: newMockS3()}) return index, object, table, schemaConfig, testutils.CloserFunc(func() error { table.Stop() index.Stop() diff --git a/pkg/storage/chunk/gcp/fixtures.go b/pkg/storage/chunk/gcp/fixtures.go index 9b525490c386c..7393ae6762be8 100644 --- a/pkg/storage/chunk/gcp/fixtures.go +++ b/pkg/storage/chunk/gcp/fixtures.go @@ -78,7 +78,7 @@ func (f *fixture) Clients() ( } if f.gcsObjectClient { - cClient = objectclient.NewClient(newGCSObjectClient(GCSConfig{BucketName: "chunks"}, f.gcssrv.Client()), nil) + cClient = objectclient.NewClient(newGCSObjectClient(GCSConfig{BucketName: "chunks"}, f.gcssrv.Client())) } else { cClient = newBigtableObjectClient(Config{}, schemaConfig, client) } diff --git a/pkg/storage/chunk/local/fixtures.go b/pkg/storage/chunk/local/fixtures.go index d2703808c2797..98e84e75a5c24 100644 --- a/pkg/storage/chunk/local/fixtures.go +++ b/pkg/storage/chunk/local/fixtures.go @@ -43,7 +43,7 @@ func (f *fixture) Clients() ( return } - chunkClient = objectclient.NewClient(oClient, objectclient.Base64Encoder) + chunkClient = objectclient.NewClient(oClient) tableClient, err = NewTableClient(f.dirname) if err != nil { diff --git a/pkg/storage/chunk/local/fs_object_client.go b/pkg/storage/chunk/local/fs_object_client.go index 6f733110c72ad..16e99a881fddd 100644 --- a/pkg/storage/chunk/local/fs_object_client.go +++ b/pkg/storage/chunk/local/fs_object_client.go @@ -16,12 +16,14 @@ import ( util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/grafana/loki/pkg/storage/chunk" + 
"github.com/grafana/loki/pkg/storage/chunk/objectclient" "github.com/grafana/loki/pkg/storage/chunk/util" ) // FSConfig is the config for a FSObjectClient. type FSConfig struct { - Directory string `yaml:"directory"` + Directory string `yaml:"directory"` + TenantFolders bool `yaml:"tenant_folders"` } // RegisterFlags registers flags. @@ -32,6 +34,7 @@ func (cfg *FSConfig) RegisterFlags(f *flag.FlagSet) { // RegisterFlags registers flags with prefix. func (cfg *FSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.Directory, prefix+"local.chunk-directory", "", "Directory to store chunks in.") + f.BoolVar(&cfg.TenantFolders, prefix+"local.chunk-tenant-folders", false, "Store chunks in per-tenant folders") } // FSObjectClient holds config for filesystem as object store @@ -59,6 +62,13 @@ func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) { // Stop implements ObjectClient func (FSObjectClient) Stop() {} +func (f *FSObjectClient) KeyEncoder() chunk.KeyEncoder { + if f.cfg.TenantFolders { + return objectclient.SlashBase64Encoder + } + return objectclient.Base64Encoder +} + // GetObject from the store func (f *FSObjectClient) GetObject(_ context.Context, objectKey string) (io.ReadCloser, error) { fl, err := os.Open(filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))) diff --git a/pkg/storage/chunk/local/fs_object_client_test.go b/pkg/storage/chunk/local/fs_object_client_test.go index 1e3537377141a..6a3d4b69c18f2 100644 --- a/pkg/storage/chunk/local/fs_object_client_test.go +++ b/pkg/storage/chunk/local/fs_object_client_test.go @@ -16,14 +16,15 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/util" ) -func TestFSObjectClient_DeleteChunksBefore(t *testing.T) { +func deleteChunksBefore(t *testing.T, tenantFolders bool) { deleteFilesOlderThan := 10 * time.Minute fsChunksDir, err := ioutil.TempDir(os.TempDir(), "fs-chunks") require.NoError(t, err) bucketClient, err := NewFSObjectClient(FSConfig{ - Directory: 
fsChunksDir, + Directory: fsChunksDir, + TenantFolders: tenantFolders, }) require.NoError(t, err) @@ -63,12 +64,18 @@ func TestFSObjectClient_DeleteChunksBefore(t *testing.T) { require.Equal(t, 1, len(files), "Number of files should be 1 after enforcing retention") } -func TestFSObjectClient_List(t *testing.T) { +func TestFSObjectClient_DeleteChunksBefore(t *testing.T) { + deleteChunksBefore(t, false) + deleteChunksBefore(t, true) +} + +func list(t *testing.T, tenantFolders bool) { fsObjectsDir, err := ioutil.TempDir(os.TempDir(), "fs-objects") require.NoError(t, err) bucketClient, err := NewFSObjectClient(FSConfig{ - Directory: fsObjectsDir, + Directory: fsObjectsDir, + TenantFolders: tenantFolders, }) require.NoError(t, err) @@ -165,12 +172,18 @@ func TestFSObjectClient_List(t *testing.T) { require.Empty(t, commonPrefixes) } -func TestFSObjectClient_DeleteObject(t *testing.T) { +func TestFSObjectClient_List(t *testing.T) { + list(t, false) + list(t, true) +} + +func deleteObject(t *testing.T, tenantFolders bool) { fsObjectsDir, err := ioutil.TempDir(os.TempDir(), "fs-delete-object") require.NoError(t, err) bucketClient, err := NewFSObjectClient(FSConfig{ - Directory: fsObjectsDir, + Directory: fsObjectsDir, + TenantFolders: tenantFolders, }) require.NoError(t, err) @@ -215,3 +228,8 @@ func TestFSObjectClient_DeleteObject(t *testing.T) { require.Len(t, commonPrefixes, 0) require.Len(t, files, len(foldersWithFiles["folder2/"]))*/ } + +func TestFSObjectClient_DeleteObject(t *testing.T) { + deleteObject(t, false) + deleteObject(t, true) +} diff --git a/pkg/storage/chunk/objectclient/client.go b/pkg/storage/chunk/objectclient/client.go index 395b5f41cbad6..592541c787878 100644 --- a/pkg/storage/chunk/objectclient/client.go +++ b/pkg/storage/chunk/objectclient/client.go @@ -5,6 +5,7 @@ import ( "context" "encoding/base64" "io/ioutil" + "strings" "github.com/pkg/errors" @@ -12,24 +13,37 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/util" ) -// KeyEncoder is 
used to encode chunk keys before writing/retrieving chunks -// from the underlying ObjectClient -type KeyEncoder func(string) string - // Base64Encoder is used to encode chunk keys in base64 before storing/retrieving // them from the ObjectClient var Base64Encoder = func(key string) string { return base64.StdEncoding.EncodeToString([]byte(key)) } +// SlashBase64Encoder is a variation of Base64Encoder that preserves all "/" +// in the key by base64 encoding the strings separately +var SlashBase64Encoder = func(key string) string { + parts := bytes.Split([]byte(key), []byte("/")) + ret := make([]string, len(parts)) + for i, part := range parts { + ret[i] = base64.URLEncoding.EncodeToString(part) + } + return strings.Join(ret, "/") + +} + // Client is used to store chunks in object store backends type Client struct { store chunk.ObjectClient - keyEncoder KeyEncoder + keyEncoder chunk.KeyEncoder } // NewClient wraps the provided ObjectClient with a chunk.Client implementation -func NewClient(store chunk.ObjectClient, encoder KeyEncoder) *Client { +func NewClient(store chunk.ObjectClient) *Client { + var encoder chunk.KeyEncoder + // check if store provides a KeyEncoder + if encoderClient, ok := store.(chunk.EncoderObjectClient); ok { + encoder = encoderClient.KeyEncoder() + } return &Client{ store: store, keyEncoder: encoder, diff --git a/pkg/storage/chunk/storage/factory.go b/pkg/storage/chunk/storage/factory.go index 7c90a64ca1e25..afc664e47d2f4 100644 --- a/pkg/storage/chunk/storage/factory.go +++ b/pkg/storage/chunk/storage/factory.go @@ -298,7 +298,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis if err != nil { return nil, err } - return objectclient.NewClient(store, objectclient.Base64Encoder), nil + return objectclient.NewClient(store), nil case StorageTypeGrpc: return grpc.NewStorageClient(cfg.GrpcConfig, schemaCfg) default: @@ -310,7 +310,7 @@ func newChunkClientFromStore(store chunk.ObjectClient, err error) (chunk.Client, 
if err != nil { return nil, err } - return objectclient.NewClient(store, nil), nil + return objectclient.NewClient(store), nil } // NewTableClient makes a new table client based on the configuration. diff --git a/pkg/storage/chunk/storage_client.go b/pkg/storage/chunk/storage_client.go index 7314e5a29aa90..0e8578b1adebb 100644 --- a/pkg/storage/chunk/storage_client.go +++ b/pkg/storage/chunk/storage_client.go @@ -79,6 +79,17 @@ type ObjectClient interface { Stop() } +// KeyEncoder is used to encode chunk keys before writing/retrieving chunks +// from the underlying ObjectClient +type KeyEncoder func(string) string + +// EncoderObjectClient is a specialization of the ObjectClient interface that implements encoding schemes for object keys +type EncoderObjectClient interface { + ObjectClient + // Returns the KeyEncoder function as defined/required by the ObjectClient + KeyEncoder() KeyEncoder +} + // StorageObject represents an object being stored in an Object Store type StorageObject struct { Key string diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 801f0b3940431..35abf14355bf9 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -16,7 +16,6 @@ import ( "github.com/prometheus/common/model" loki_storage "github.com/grafana/loki/pkg/storage" - "github.com/grafana/loki/pkg/storage/chunk/local" "github.com/grafana/loki/pkg/storage/chunk/objectclient" "github.com/grafana/loki/pkg/storage/chunk/storage" chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" @@ -109,12 +108,7 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage c.metrics = newMetrics(r) if c.cfg.RetentionEnabled { - var encoder objectclient.KeyEncoder - if _, ok := objectClient.(*local.FSObjectClient); ok { - encoder = objectclient.Base64Encoder - } - - chunkClient := objectclient.NewClient(objectClient, encoder) + chunkClient := 
objectclient.NewClient(objectClient) retentionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "retention") c.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, r) diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index a0741ad6a051e..68b175f8a1aca 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -363,7 +363,7 @@ func TestChunkRewriter(t *testing.T) { require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{tt.chunk})) store.Stop() - chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder) + chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir)) for _, indexTable := range store.indexTables() { err := indexTable.DB.Update(func(tx *bbolt.Tx) error { bucket := tx.Bucket(bucketName) @@ -618,7 +618,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { tables := store.indexTables() require.Len(t, tables, len(tc.expectedDeletedSeries)) - chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder) + chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir)) for i, table := range tables { seriesCleanRecorder := newSeriesCleanRecorder() diff --git a/pkg/storage/stores/shipper/compactor/retention/util_test.go b/pkg/storage/stores/shipper/compactor/retention/util_test.go index 9ba7d2bfec682..38d971f519db3 100644 --- a/pkg/storage/stores/shipper/compactor/retention/util_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/util_test.go @@ -125,6 +125,14 @@ type testObjectClient struct { path string } +func (c *testObjectClient) KeyEncoder() chunk.KeyEncoder { + var encoder chunk.KeyEncoder + if encoderClient, ok := c.ObjectClient.(chunk.EncoderObjectClient); ok { + encoder 
= encoderClient.KeyEncoder() + } + return encoder +} + func newTestObjectClient(path string) chunk.ObjectClient { c, err := chunk_storage.NewObjectClient("filesystem", chunk_storage.Config{ FSConfig: local.FSConfig{