Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Add tenant_folders option to filesystem store #4333

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ storage_config:
shared_store: filesystem
filesystem:
directory: /tmp/loki/chunks
tenant_folders: true

compactor:
working_directory: /tmp/loki/boltdb-shipper-compactor
Expand Down
3 changes: 3 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,9 @@ filesystem:
# Directory to store chunks in.
# CLI flag: -local.chunk-directory
directory: <string>
# Store chunks in per-tenant folders.
# CLI flag: -local.chunk-tenant-folders
[tenant_folders: <boolean> | default = false]

# Configures storing index in an Object Store(GCS/S3/Azure/Swift/Filesystem) in the form of boltdb files.
# Required fields only required when boltdb-shipper is defined in config.
Expand Down
5 changes: 3 additions & 2 deletions docs/sources/operations/storage/filesystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ storage_config:
directory: /tmp/loki/
```

A folder is created for every tenant all the chunks for one tenant are stored in that directory.
By default all chunks are stored in the given directory.
Set `tenant_folders` to true to separate chunks of tenants into their own folders.

If loki is run in single-tenant mode, all the chunks are put in a folder named `fake` which is the synthesized tenant name used for single tenant mode.
If Loki is run in single-tenant mode, the synthesized tenant name `fake` is used for all chunks.

See [multi-tenancy](../../multi-tenancy/) for more information.

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/aws/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var Fixtures = []testutils.Fixture{
schemaCfg: schemaConfig,
metrics: newMetrics(nil),
}
object := objectclient.NewClient(&S3ObjectClient{S3: newMockS3()}, nil)
object := objectclient.NewClient(&S3ObjectClient{S3: newMockS3()})
return index, object, table, schemaConfig, testutils.CloserFunc(func() error {
table.Stop()
index.Stop()
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/gcp/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (f *fixture) Clients() (
}

if f.gcsObjectClient {
cClient = objectclient.NewClient(newGCSObjectClient(GCSConfig{BucketName: "chunks"}, f.gcssrv.Client()), nil)
cClient = objectclient.NewClient(newGCSObjectClient(GCSConfig{BucketName: "chunks"}, f.gcssrv.Client()))
} else {
cClient = newBigtableObjectClient(Config{}, schemaConfig, client)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/local/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (f *fixture) Clients() (
return
}

chunkClient = objectclient.NewClient(oClient, objectclient.Base64Encoder)
chunkClient = objectclient.NewClient(oClient)

tableClient, err = NewTableClient(f.dirname)
if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/chunk/local/fs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
util_log "github.com/cortexproject/cortex/pkg/util/log"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/objectclient"
"github.com/grafana/loki/pkg/storage/chunk/util"
)

// FSConfig is the config for a FSObjectClient.
type FSConfig struct {
Directory string `yaml:"directory"`
Directory string `yaml:"directory"`
TenantFolders bool `yaml:"tenant_folders"`
}

// RegisterFlags registers flags.
Expand All @@ -32,6 +34,7 @@ func (cfg *FSConfig) RegisterFlags(f *flag.FlagSet) {
// RegisterFlags registers flags with prefix.
func (cfg *FSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Directory, prefix+"local.chunk-directory", "", "Directory to store chunks in.")
f.BoolVar(&cfg.TenantFolders, prefix+"local.chunk-tenant-folders", false, "Store chunks in per-tenant folders")
}

// FSObjectClient holds config for filesystem as object store
Expand Down Expand Up @@ -59,6 +62,13 @@ func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) {
// Stop implements ObjectClient
func (FSObjectClient) Stop() {}

func (f *FSObjectClient) KeyEncoder() chunk.KeyEncoder {
if f.cfg.TenantFolders {
return objectclient.SlashBase64Encoder
}
return objectclient.Base64Encoder
}

// GetObject from the store
func (f *FSObjectClient) GetObject(_ context.Context, objectKey string) (io.ReadCloser, error) {
fl, err := os.Open(filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey)))
Expand Down
30 changes: 24 additions & 6 deletions pkg/storage/chunk/local/fs_object_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/util"
)

func TestFSObjectClient_DeleteChunksBefore(t *testing.T) {
func deleteChunksBefore(t *testing.T, tenantFolders bool) {
deleteFilesOlderThan := 10 * time.Minute

fsChunksDir, err := ioutil.TempDir(os.TempDir(), "fs-chunks")
require.NoError(t, err)

bucketClient, err := NewFSObjectClient(FSConfig{
Directory: fsChunksDir,
Directory: fsChunksDir,
TenantFolders: tenantFolders,
})
require.NoError(t, err)

Expand Down Expand Up @@ -63,12 +64,18 @@ func TestFSObjectClient_DeleteChunksBefore(t *testing.T) {
require.Equal(t, 1, len(files), "Number of files should be 1 after enforcing retention")
}

func TestFSObjectClient_List(t *testing.T) {
func TestFSObjectClient_DeleteChunksBefore(t *testing.T) {
deleteChunksBefore(t, false)
deleteChunksBefore(t, true)
}

func list(t *testing.T, tenantFolders bool) {
fsObjectsDir, err := ioutil.TempDir(os.TempDir(), "fs-objects")
require.NoError(t, err)

bucketClient, err := NewFSObjectClient(FSConfig{
Directory: fsObjectsDir,
Directory: fsObjectsDir,
TenantFolders: tenantFolders,
})
require.NoError(t, err)

Expand Down Expand Up @@ -165,12 +172,18 @@ func TestFSObjectClient_List(t *testing.T) {
require.Empty(t, commonPrefixes)
}

func TestFSObjectClient_DeleteObject(t *testing.T) {
func TestFSObjectClient_List(t *testing.T) {
list(t, false)
list(t, true)
}

func deleteObject(t *testing.T, tenantFolders bool) {
fsObjectsDir, err := ioutil.TempDir(os.TempDir(), "fs-delete-object")
require.NoError(t, err)

bucketClient, err := NewFSObjectClient(FSConfig{
Directory: fsObjectsDir,
Directory: fsObjectsDir,
TenantFolders: tenantFolders,
})
require.NoError(t, err)

Expand Down Expand Up @@ -215,3 +228,8 @@ func TestFSObjectClient_DeleteObject(t *testing.T) {
require.Len(t, commonPrefixes, 0)
require.Len(t, files, len(foldersWithFiles["folder2/"]))*/
}

func TestFSObjectClient_DeleteObject(t *testing.T) {
deleteObject(t, false)
deleteObject(t, true)
}
26 changes: 20 additions & 6 deletions pkg/storage/chunk/objectclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,45 @@ import (
"context"
"encoding/base64"
"io/ioutil"
"strings"

"github.com/pkg/errors"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/util"
)

// KeyEncoder is used to encode chunk keys before writing/retrieving chunks
// from the underlying ObjectClient
type KeyEncoder func(string) string

// Base64Encoder is used to encode chunk keys in base64 before storing/retrieving
// them from the ObjectClient
var Base64Encoder = func(key string) string {
return base64.StdEncoding.EncodeToString([]byte(key))
}

// SlashBase64Encoder is a variation of Base64Encoder that preserves all "/"
// in the key by base64 encoding the strings separately
var SlashBase64Encoder = func(key string) string {
parts := bytes.Split([]byte(key), []byte("/"))
ret := make([]string, len(parts))
for i, part := range parts {
ret[i] = base64.URLEncoding.EncodeToString(part)
}
return strings.Join(ret, "/")

}

// Client is used to store chunks in object store backends
type Client struct {
store chunk.ObjectClient
keyEncoder KeyEncoder
keyEncoder chunk.KeyEncoder
}

// NewClient wraps the provided ObjectClient with a chunk.Client implementation
func NewClient(store chunk.ObjectClient, encoder KeyEncoder) *Client {
func NewClient(store chunk.ObjectClient) *Client {
var encoder chunk.KeyEncoder
// check if store provides a KeyEncoder
if encoderClient, ok := store.(chunk.EncoderObjectClient); ok {
encoder = encoderClient.KeyEncoder()
}
return &Client{
store: store,
keyEncoder: encoder,
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis
if err != nil {
return nil, err
}
return objectclient.NewClient(store, objectclient.Base64Encoder), nil
return objectclient.NewClient(store), nil
case StorageTypeGrpc:
return grpc.NewStorageClient(cfg.GrpcConfig, schemaCfg)
default:
Expand All @@ -310,7 +310,7 @@ func newChunkClientFromStore(store chunk.ObjectClient, err error) (chunk.Client,
if err != nil {
return nil, err
}
return objectclient.NewClient(store, nil), nil
return objectclient.NewClient(store), nil
}

// NewTableClient makes a new table client based on the configuration.
Expand Down
11 changes: 11 additions & 0 deletions pkg/storage/chunk/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ type ObjectClient interface {
Stop()
}

// KeyEncoder is used to encode chunk keys before writing/retrieving chunks
// from the underlying ObjectClient
type KeyEncoder func(string) string

// EncoderObjectClient is a specialization of the ObjectClient interface that implements encoding schemes for object keys
type EncoderObjectClient interface {
ObjectClient
// Returns the KeyEncoder function as defined/required by the ObjectClient
KeyEncoder() KeyEncoder
}

// StorageObject represents an object being stored in an Object Store
type StorageObject struct {
Key string
Expand Down
8 changes: 1 addition & 7 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/prometheus/common/model"

loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/objectclient"
"github.com/grafana/loki/pkg/storage/chunk/storage"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
Expand Down Expand Up @@ -109,12 +108,7 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage
c.metrics = newMetrics(r)

if c.cfg.RetentionEnabled {
var encoder objectclient.KeyEncoder
if _, ok := objectClient.(*local.FSObjectClient); ok {
encoder = objectclient.Base64Encoder
}

chunkClient := objectclient.NewClient(objectClient, encoder)
chunkClient := objectclient.NewClient(objectClient)

retentionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "retention")
c.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, r)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func TestChunkRewriter(t *testing.T) {
require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{tt.chunk}))
store.Stop()

chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder)
chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir))
for _, indexTable := range store.indexTables() {
err := indexTable.DB.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
Expand Down Expand Up @@ -618,7 +618,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
tables := store.indexTables()
require.Len(t, tables, len(tc.expectedDeletedSeries))

chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder)
chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir))

for i, table := range tables {
seriesCleanRecorder := newSeriesCleanRecorder()
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/stores/shipper/compactor/retention/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ type testObjectClient struct {
path string
}

func (c *testObjectClient) KeyEncoder() chunk.KeyEncoder {
var encoder chunk.KeyEncoder
if encoderClient, ok := c.ObjectClient.(chunk.EncoderObjectClient); ok {
encoder = encoderClient.KeyEncoder()
}
return encoder
}

func newTestObjectClient(path string) chunk.ObjectClient {
c, err := chunk_storage.NewObjectClient("filesystem", chunk_storage.Config{
FSConfig: local.FSConfig{
Expand Down