Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Add tenant_folders option to filesystem store #4333

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ storage_config:
shared_store: filesystem
filesystem:
directory: /tmp/loki/chunks
tenant_folders: true

compactor:
working_directory: /tmp/loki/boltdb-shipper-compactor
Expand Down
3 changes: 3 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,9 @@ filesystem:
# Directory to store chunks in.
# CLI flag: -local.chunk-directory
directory: <string>
# Store chunks in per-tenant folders.
# CLI flag: -local.chunk-tenant-folders
[tenant_folders: <boolean> | default = false]

# Configures storing index in an Object Store(GCS/S3/Azure/Swift/Filesystem) in the form of boltdb files.
# Required fields only required when boltdb-shipper is defined in config.
Expand Down
5 changes: 3 additions & 2 deletions docs/sources/operations/storage/filesystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ storage_config:
directory: /tmp/loki/
```

A folder is created for every tenant all the chunks for one tenant are stored in that directory.
By default all chunks are stored in the given directory.
Enable the `tenant_folders` option to separate chunks of tenants into their own folders.
jfolz marked this conversation as resolved.
Show resolved Hide resolved

If loki is run in single-tenant mode, all the chunks are put in a folder named `fake` which is the synthesized tenant name used for single tenant mode.
If loki is run in single-tenant mode, the synthesized tenant name `fake` is used for all chunks.
jfolz marked this conversation as resolved.
Show resolved Hide resolved

See [multi-tenancy](../../multi-tenancy/) for more information.

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/aws/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var Fixtures = []testutils.Fixture{
schemaCfg: schemaConfig,
metrics: newMetrics(nil),
}
object := objectclient.NewClient(&S3ObjectClient{S3: newMockS3()}, nil)
object := objectclient.NewClient(&S3ObjectClient{S3: newMockS3()})
return index, object, table, schemaConfig, testutils.CloserFunc(func() error {
table.Stop()
index.Stop()
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/gcp/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (f *fixture) Clients() (
}

if f.gcsObjectClient {
cClient = objectclient.NewClient(newGCSObjectClient(GCSConfig{BucketName: "chunks"}, f.gcssrv.Client()), nil)
cClient = objectclient.NewClient(newGCSObjectClient(GCSConfig{BucketName: "chunks"}, f.gcssrv.Client()))
} else {
cClient = newBigtableObjectClient(Config{}, schemaConfig, client)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/local/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (f *fixture) Clients() (
return
}

chunkClient = objectclient.NewClient(oClient, objectclient.Base64Encoder)
chunkClient = objectclient.NewClient(oClient)

tableClient, err = NewTableClient(f.dirname)
if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/chunk/local/fs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ import (
util_log "github.com/cortexproject/cortex/pkg/util/log"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/objectclient"
"github.com/grafana/loki/pkg/storage/chunk/util"
)

// FSConfig is the config for a FSObjectClient.
type FSConfig struct {
Directory string `yaml:"directory"`
Directory string `yaml:"directory"`
TenantFolders bool `yaml:"tenant_folders"`
}

// RegisterFlags registers flags.
Expand All @@ -32,6 +34,7 @@ func (cfg *FSConfig) RegisterFlags(f *flag.FlagSet) {
// RegisterFlags registers flags with prefix.
func (cfg *FSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Directory, prefix+"local.chunk-directory", "", "Directory to store chunks in.")
f.BoolVar(&cfg.TenantFolders, prefix+"local.chunk-tenant-folders", false, "Store chunks in per-tenant folders")
}

// FSObjectClient holds config for filesystem as object store
Expand Down Expand Up @@ -59,6 +62,13 @@ func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) {
// Stop implements ObjectClient
func (FSObjectClient) Stop() {}

func (f *FSObjectClient) KeyEncoder() objectclient.KeyEncoder {
if f.cfg.TenantFolders {
return objectclient.TenantBase64Encoder
}
return objectclient.Base64Encoder
}

// GetObject from the store
func (f *FSObjectClient) GetObject(_ context.Context, objectKey string) (io.ReadCloser, error) {
fl, err := os.Open(filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey)))
Expand Down
30 changes: 24 additions & 6 deletions pkg/storage/chunk/local/fs_object_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/util"
)

func TestFSObjectClient_DeleteChunksBefore(t *testing.T) {
func deleteChunksBefore(t *testing.T, tenantFolders bool) {
deleteFilesOlderThan := 10 * time.Minute

fsChunksDir, err := ioutil.TempDir(os.TempDir(), "fs-chunks")
require.NoError(t, err)

bucketClient, err := NewFSObjectClient(FSConfig{
Directory: fsChunksDir,
Directory: fsChunksDir,
TenantFolders: tenantFolders,
})
require.NoError(t, err)

Expand Down Expand Up @@ -63,12 +64,18 @@ func TestFSObjectClient_DeleteChunksBefore(t *testing.T) {
require.Equal(t, 1, len(files), "Number of files should be 1 after enforcing retention")
}

func TestFSObjectClient_List(t *testing.T) {
func TestFSObjectClient_DeleteChunksBefore(t *testing.T) {
deleteChunksBefore(t, false)
deleteChunksBefore(t, true)
}

func list(t *testing.T, tenantFolders bool) {
fsObjectsDir, err := ioutil.TempDir(os.TempDir(), "fs-objects")
require.NoError(t, err)

bucketClient, err := NewFSObjectClient(FSConfig{
Directory: fsObjectsDir,
Directory: fsObjectsDir,
TenantFolders: tenantFolders,
})
require.NoError(t, err)

Expand Down Expand Up @@ -165,12 +172,18 @@ func TestFSObjectClient_List(t *testing.T) {
require.Empty(t, commonPrefixes)
}

func TestFSObjectClient_DeleteObject(t *testing.T) {
func TestFSObjectClient_List(t *testing.T) {
list(t, false)
list(t, true)
}

func deleteObject(t *testing.T, tenantFolders bool) {
fsObjectsDir, err := ioutil.TempDir(os.TempDir(), "fs-delete-object")
require.NoError(t, err)

bucketClient, err := NewFSObjectClient(FSConfig{
Directory: fsObjectsDir,
Directory: fsObjectsDir,
TenantFolders: tenantFolders,
})
require.NoError(t, err)

Expand Down Expand Up @@ -215,3 +228,8 @@ func TestFSObjectClient_DeleteObject(t *testing.T) {
require.Len(t, commonPrefixes, 0)
require.Len(t, files, len(foldersWithFiles["folder2/"]))*/
}

func TestFSObjectClient_DeleteObject(t *testing.T) {
deleteObject(t, false)
deleteObject(t, true)
}
29 changes: 28 additions & 1 deletion pkg/storage/chunk/objectclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"encoding/base64"
"io/ioutil"
"reflect"
"strings"

"github.com/pkg/errors"

Expand All @@ -22,14 +24,39 @@ var Base64Encoder = func(key string) string {
return base64.StdEncoding.EncodeToString([]byte(key))
}

// TenantBase64Encoder is a variation of Base64Encoder that encodes tenant
// and remainder of the key separately and returns the results joined by "/"
var TenantBase64Encoder = func(key string) string {
data := []byte(key)
if i := bytes.LastIndex(data, []byte("/")); i > 0 {
jfolz marked this conversation as resolved.
Show resolved Hide resolved
return strings.Join([]string{
base64.URLEncoding.EncodeToString(data[:i]),
base64.URLEncoding.EncodeToString(data[i+1:]),
}, "/")
}
return base64.URLEncoding.EncodeToString(data)
}

// Client is used to store chunks in object store backends
type Client struct {
store chunk.ObjectClient
keyEncoder KeyEncoder
}

// NewClient wraps the provided ObjectClient with a chunk.Client implementation
func NewClient(store chunk.ObjectClient, encoder KeyEncoder) *Client {
func NewClient(store chunk.ObjectClient) *Client {
var encoder KeyEncoder = nil
jfolz marked this conversation as resolved.
Show resolved Hide resolved
// check if store provides a KeyEncoder
var ok bool
method := reflect.ValueOf(&store).MethodByName("KeyEncoder")
jfolz marked this conversation as resolved.
Show resolved Hide resolved
if method.IsValid() {
for _, v := range method.Call([]reflect.Value{}) {
if encoder, ok = v.Interface().(KeyEncoder); !ok {
encoder = nil
}
}
}

return &Client{
store: store,
keyEncoder: encoder,
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis
if err != nil {
return nil, err
}
return objectclient.NewClient(store, objectclient.Base64Encoder), nil
return objectclient.NewClient(store), nil
case StorageTypeGrpc:
return grpc.NewStorageClient(cfg.GrpcConfig, schemaCfg)
default:
Expand All @@ -310,7 +310,7 @@ func newChunkClientFromStore(store chunk.ObjectClient, err error) (chunk.Client,
if err != nil {
return nil, err
}
return objectclient.NewClient(store, nil), nil
return objectclient.NewClient(store), nil
}

// NewTableClient makes a new table client based on the configuration.
Expand Down
8 changes: 1 addition & 7 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/prometheus/common/model"

loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/objectclient"
"github.com/grafana/loki/pkg/storage/chunk/storage"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
Expand Down Expand Up @@ -109,12 +108,7 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage
c.metrics = newMetrics(r)

if c.cfg.RetentionEnabled {
var encoder objectclient.KeyEncoder
if _, ok := objectClient.(*local.FSObjectClient); ok {
encoder = objectclient.Base64Encoder
}

chunkClient := objectclient.NewClient(objectClient, encoder)
chunkClient := objectclient.NewClient(objectClient)

retentionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "retention")
c.sweeper, err = retention.NewSweeper(retentionWorkDir, chunkClient, c.cfg.RetentionDeleteWorkCount, c.cfg.RetentionDeleteDelay, r)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func TestChunkRewriter(t *testing.T) {
require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{tt.chunk}))
store.Stop()

chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder)
chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir))
for _, indexTable := range store.indexTables() {
err := indexTable.DB.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
Expand Down Expand Up @@ -618,7 +618,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
tables := store.indexTables()
require.Len(t, tables, len(tc.expectedDeletedSeries))

chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder)
chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir))

for i, table := range tables {
seriesCleanRecorder := newSeriesCleanRecorder()
Expand Down