diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go
index f2dceafd870d2..104e0cbead318 100644
--- a/pkg/loki/loki.go
+++ b/pkg/loki/loki.go
@@ -140,7 +140,7 @@ func New(cfg Config) (*Loki, error) {
 	if err := loki.setupModuleManager(); err != nil {
 		return nil, err
 	}
-	storage.RegisterCustomIndexClients(cfg.StorageConfig, prometheus.DefaultRegisterer)
+	storage.RegisterCustomIndexClients(&loki.cfg.StorageConfig, prometheus.DefaultRegisterer)
 	return loki, nil
 }
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 08946de71d5a5..1fe4c0b50a5b3 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -37,7 +37,7 @@ import (
 	"github.com/grafana/loki/pkg/querier"
 	"github.com/grafana/loki/pkg/querier/queryrange"
 	loki_storage "github.com/grafana/loki/pkg/storage"
-	"github.com/grafana/loki/pkg/storage/stores/local"
+	"github.com/grafana/loki/pkg/storage/stores/shipper"
 	serverutil "github.com/grafana/loki/pkg/util/server"
 	"github.com/grafana/loki/pkg/util/validation"
 )
@@ -182,7 +182,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
 	// We want ingester to also query the store when using boltdb-shipper
 	pc := t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)]
-	if pc.IndexType == local.BoltDBShipperType {
+	if pc.IndexType == shipper.BoltDBShipperType {
 		t.cfg.Ingester.QueryStore = true
 		mlb, err := calculateMaxLookBack(pc, t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge)
 		if err != nil {
@@ -243,17 +243,17 @@ func (t *Loki) initTableManager() (services.Service, error) {
 }

 func (t *Loki) initStore() (_ services.Service, err error) {
-	if t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)].IndexType == local.BoltDBShipperType {
+	if t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)].IndexType == shipper.BoltDBShipperType {
 		t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
 		switch t.cfg.Target {
 		case Ingester:
 			// We do not want ingester to unnecessarily keep downloading files
-			t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeWriteOnly
+			t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
 		case Querier:
 			// We do not want query to do any updates to index
-			t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadOnly
+			t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
 		default:
-			t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadWrite
+			t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadWrite
 		}
 	}
@@ -370,8 +370,8 @@ func activePeriodConfig(cfg chunk.SchemaConfig) int {
 // usingBoltdbShipper check whether current or the next index type is boltdb-shipper, returns true if yes.
 func usingBoltdbShipper(cfg chunk.SchemaConfig) bool {
 	activePCIndex := activePeriodConfig(cfg)
-	if cfg.Configs[activePCIndex].IndexType == local.BoltDBShipperType ||
-		(len(cfg.Configs)-1 > activePCIndex && cfg.Configs[activePCIndex+1].IndexType == local.BoltDBShipperType) {
+	if cfg.Configs[activePCIndex].IndexType == shipper.BoltDBShipperType ||
+		(len(cfg.Configs)-1 > activePCIndex && cfg.Configs[activePCIndex+1].IndexType == shipper.BoltDBShipperType) {
 		return true
 	}
@@ -379,11 +379,11 @@ func usingBoltdbShipper(cfg chunk.SchemaConfig) bool {
 }

 func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge time.Duration) (time.Duration, error) {
-	if pc.ObjectType != local.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
+	if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
 		return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")
 	}
 	// When using shipper, limit max look back for query to MaxChunkAge + upload interval by shipper + 15 mins to query only data whose index is not pushed yet
-	defaultMaxLookBack := maxChunkAge + local.ShipperFileUploadInterval + (15 * time.Minute)
+	defaultMaxLookBack := maxChunkAge + shipper.UploadInterval + (15 * time.Minute)

 	if maxLookBackConfig == 0 {
 		// If the QueryStoreMaxLookBackPeriod is still it's default value of 0, set it to the default calculated value.
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index abe396ea2952f..22100aacd00e4 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -18,15 +18,15 @@ import (
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql"
 	"github.com/grafana/loki/pkg/logql/stats"
-	"github.com/grafana/loki/pkg/storage/stores/local"
+	"github.com/grafana/loki/pkg/storage/stores/shipper"
 	"github.com/grafana/loki/pkg/util"
 )

 // Config is the loki storage configuration
 type Config struct {
 	storage.Config      `yaml:",inline"`
-	MaxChunkBatchSize   int                 `yaml:"max_chunk_batch_size"`
-	BoltDBShipperConfig local.ShipperConfig `yaml:"boltdb_shipper"`
+	MaxChunkBatchSize   int            `yaml:"max_chunk_batch_size"`
+	BoltDBShipperConfig shipper.Config `yaml:"boltdb_shipper"`
 }

 // RegisterFlags adds the flags required to configure this flag set.
@@ -64,7 +64,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
 // NewTableClient creates a TableClient for managing tables for index/chunk store.
 // ToDo: Add support in Cortex for registering custom table client like index client.
 func NewTableClient(name string, cfg Config) (chunk.TableClient, error) {
-	if name == local.BoltDBShipperType {
+	if name == shipper.BoltDBShipperType {
 		name = "boltdb"
 		cfg.FSConfig = cortex_local.FSConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}
 	}
@@ -287,13 +287,13 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.
 	return filtered
 }

-func RegisterCustomIndexClients(cfg Config, registerer prometheus.Registerer) {
+func RegisterCustomIndexClients(cfg *Config, registerer prometheus.Registerer) {
 	// BoltDB Shipper is supposed to be run as a singleton.
 	// This could also be done in NewBoltDBIndexClientWithShipper factory method but we are doing it here because that method is used
 	// in tests for creating multiple instances of it at a time.
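// NOTE: RegisterCustomIndexClients now takes *Config (see the loki.go hunk above and
// the store_test.go hunk below): the factory closures capture a pointer to the live
// config because initStore in modules.go fills in BoltDBShipperConfig.IngesterName and
// .Mode only after New() has already registered the clients, and those values must
// still be visible when the index client is eventually constructed.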
 	var boltDBIndexClientWithShipper chunk.IndexClient

-	storage.RegisterIndexStore(local.BoltDBShipperType, func() (chunk.IndexClient, error) {
+	storage.RegisterIndexStore(shipper.BoltDBShipperType, func() (chunk.IndexClient, error) {
 		if boltDBIndexClientWithShipper != nil {
 			return boltDBIndexClientWithShipper, nil
 		}
@@ -303,9 +303,7 @@ func RegisterCustomIndexClients(cfg Config, registerer prometheus.Registerer) {
 			return nil, err
 		}

-		boltDBIndexClientWithShipper, err = local.NewBoltDBIndexClientWithShipper(
-			cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory},
-			objectClient, cfg.BoltDBShipperConfig, registerer)
+		boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, registerer)

 		return boltDBIndexClientWithShipper, err
 	}, func() (client chunk.TableClient, e error) {
@@ -314,6 +312,6 @@ func RegisterCustomIndexClients(cfg Config, registerer prometheus.Registerer) {
 			return nil, err
 		}

-		return local.NewBoltDBShipperTableClient(objectClient), nil
+		return shipper.NewBoltDBShipperTableClient(objectClient), nil
 	})
 }
diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go
index c668a7c4eca28..6060e7d0f2561 100644
--- a/pkg/storage/store_test.go
+++ b/pkg/storage/store_test.go
@@ -28,7 +28,7 @@ import (
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql"
 	"github.com/grafana/loki/pkg/logql/marshal"
-	"github.com/grafana/loki/pkg/storage/stores/local"
+	"github.com/grafana/loki/pkg/storage/stores/shipper"
 	"github.com/grafana/loki/pkg/util/validation"
 )
@@ -731,7 +731,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
 	require.NoError(t, err)

 	// config for BoltDB Shipper
-	boltdbShipperConfig := local.ShipperConfig{}
+	boltdbShipperConfig := shipper.Config{}
 	flagext.DefaultValues(&boltdbShipperConfig)
 	boltdbShipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index")
 	boltdbShipperConfig.SharedStoreType = "filesystem"
@@ -748,7 +748,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
 		BoltDBShipperConfig: boltdbShipperConfig,
 	}

-	RegisterCustomIndexClients(config, nil)
+	RegisterCustomIndexClients(&config, nil)

 	store, err := NewStore(config, chunk.StoreConfig{}, chunk.SchemaConfig{
 		Configs: []chunk.PeriodConfig{
diff --git a/pkg/storage/stores/local/boltdb_index_client.go b/pkg/storage/stores/local/boltdb_index_client.go
deleted file mode 100644
index 2fa21615fd46d..0000000000000
--- a/pkg/storage/stores/local/boltdb_index_client.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package local
-
-import (
-	"context"
-
-	"github.com/cortexproject/cortex/pkg/chunk"
-	"github.com/cortexproject/cortex/pkg/chunk/local"
-	chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/weaveworks/common/instrument"
-	"go.etcd.io/bbolt"
-)
-
-type BoltdbIndexClientWithShipper struct {
-	*local.BoltIndexClient
-	shipper *Shipper
-}
-
-// NewBoltDBIndexClientWithShipper creates a new IndexClient that used BoltDB.
-func NewBoltDBIndexClientWithShipper(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig, registerer prometheus.Registerer) (chunk.IndexClient, error) { - boltDBIndexClient, err := local.NewBoltDBIndexClient(cfg) - if err != nil { - return nil, err - } - - shipper, err := NewShipper(archiverCfg, archiveStoreClient, boltDBIndexClient, registerer) - if err != nil { - return nil, err - } - - indexClient := BoltdbIndexClientWithShipper{ - BoltIndexClient: boltDBIndexClient, - shipper: shipper, - } - - return &indexClient, nil -} - -func (b *BoltdbIndexClientWithShipper) Stop() { - b.shipper.Stop() - b.BoltIndexClient.Stop() -} - -func (b *BoltdbIndexClientWithShipper) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { - return chunk_util.DoParallelQueries(ctx, b.query, queries, callback) -} - -func (b *BoltdbIndexClientWithShipper) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { - db, err := b.GetDB(query.TableName, local.DBOperationRead) - if err != nil && err != local.ErrUnexistentBoltDB { - return err - } - - if db != nil { - if err := b.QueryDB(ctx, db, query, callback); err != nil { - return err - } - } - - return instrument.CollectedRequest(ctx, "QUERY", instrument.NewHistogramCollector(b.shipper.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error { - return b.shipper.forEach(ctx, query.TableName, func(db *bbolt.DB) error { - return b.QueryDB(ctx, db, query, callback) - }) - }) -} diff --git a/pkg/storage/stores/local/downloads.go b/pkg/storage/stores/local/downloads.go deleted file mode 100644 index 69eb55b23f7a9..0000000000000 --- a/pkg/storage/stores/local/downloads.go +++ /dev/null @@ -1,254 +0,0 @@ -package local - -import ( - "context" - "fmt" - "io" - "os" - "path" - "strings" - "time" - - "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/chunk/local" - chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" - "github.com/cortexproject/cortex/pkg/util" - "github.com/go-kit/kit/log/level" -) - -// checkStorageForUpdates compares files from cache with storage and builds the list of files to be downloaded from storage and to be deleted from cache -func (fc *FilesCollection) checkStorageForUpdates(ctx context.Context) (toDownload []chunk.StorageObject, toDelete []string, err error) { - // listing tables from store - var objects []chunk.StorageObject - objects, _, err = fc.storageClient.List(ctx, fc.period+"/") - if err != nil { - return - } - - listedUploaders := make(map[string]struct{}, len(objects)) - - fc.mtx.RLock() - for _, object := range objects { - uploader, err := getUploaderFromObjectKey(object.Key) - if err != nil { - return nil, nil, err - } - listedUploaders[uploader] = struct{}{} - - // Checking whether file was updated in the store after we downloaded it, if not, no need to include it in updates - downloadedFileDetails, ok := fc.files[uploader] - if !ok || downloadedFileDetails.mtime != object.ModifiedAt { - toDownload = append(toDownload, object) - } - } - fc.mtx.RUnlock() - - err = fc.ForEach(func(uploader string, df *downloadedFile) error { - if _, isOK := listedUploaders[uploader]; !isOK { - toDelete = append(toDelete, uploader) - } - return nil - }) - - return -} - -// Sync downloads updated and new files from for given period from all the uploaders and removes deleted ones -func (fc *FilesCollection) Sync(ctx context.Context) 
error { - level.Debug(util.Logger).Log("msg", fmt.Sprintf("syncing files for period %s", fc.period)) - - toDownload, toDelete, err := fc.checkStorageForUpdates(ctx) - if err != nil { - return err - } - - for _, storageObject := range toDownload { - err = fc.downloadFile(ctx, storageObject) - if err != nil { - return err - } - } - - fc.mtx.Lock() - defer fc.mtx.Unlock() - - for _, uploader := range toDelete { - err := fc.cleanupFile(uploader) - if err != nil { - return err - } - } - - return nil -} - -// It first downloads file to a temp location so that we close the existing file(if already exists), replace it with new one and then reopen it. -func (fc *FilesCollection) downloadFile(ctx context.Context, storageObject chunk.StorageObject) error { - uploader, err := getUploaderFromObjectKey(storageObject.Key) - if err != nil { - return err - } - folderPath, _ := fc.getFolderPathForPeriod(false) - filePath := path.Join(folderPath, uploader) - - // download the file temporarily with some other name to allow boltdb client to close the existing file first if it exists - tempFilePath := path.Join(folderPath, fmt.Sprintf("%s.%s", uploader, "temp")) - - err = fc.getFileFromStorage(ctx, storageObject.Key, tempFilePath) - if err != nil { - return err - } - - fc.mtx.Lock() - defer fc.mtx.Unlock() - - df, ok := fc.files[uploader] - if ok { - if err := df.boltdb.Close(); err != nil { - return err - } - } else { - df = &downloadedFile{} - } - - // move the file from temp location to actual location - err = os.Rename(tempFilePath, filePath) - if err != nil { - return err - } - - df.mtime = storageObject.ModifiedAt - df.boltdb, err = local.OpenBoltdbFile(filePath) - if err != nil { - return err - } - - fc.files[uploader] = df - - return nil -} - -// getFileFromStorage downloads a file from storage to given location. -func (fc *FilesCollection) getFileFromStorage(ctx context.Context, objectKey, destination string) error { - readCloser, err := fc.storageClient.GetObject(ctx, objectKey) - if err != nil { - return err - } - - defer func() { - if err := readCloser.Close(); err != nil { - level.Error(util.Logger) - } - }() - - f, err := os.Create(destination) - if err != nil { - return err - } - - _, err = io.Copy(f, readCloser) - if err != nil { - return err - } - - level.Info(util.Logger).Log("msg", fmt.Sprintf("downloaded file %s", objectKey)) - - return f.Sync() -} - -// downloadAllFilesForPeriod should be called when files for a period does not exist i.e they were never downloaded or got cleaned up later on by TTL -// While files are being downloaded it will block all reads/writes on FilesCollection by taking an exclusive lock -func (fc *FilesCollection) downloadAllFilesForPeriod(ctx context.Context) (err error) { - defer func() { - status := statusSuccess - if err != nil { - status = statusFailure - fc.setErr(err) - - // cleaning up files due to error to avoid returning invalid results. 
- for fileName := range fc.files { - if err := fc.cleanupFile(fileName); err != nil { - level.Error(util.Logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err) - } - } - } - fc.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc() - }() - - startTime := time.Now() - totalFilesSize := int64(0) - - objects, _, err := fc.storageClient.List(ctx, fc.period+"/") - if err != nil { - return - } - - level.Debug(util.Logger).Log("msg", fmt.Sprintf("list of files to download for period %s: %s", fc.period, objects)) - - folderPath, err := fc.getFolderPathForPeriod(true) - if err != nil { - return - } - - for _, object := range objects { - var uploader string - uploader, err = getUploaderFromObjectKey(object.Key) - if err != nil { - return - } - - filePath := path.Join(folderPath, uploader) - df := downloadedFile{} - - err = fc.getFileFromStorage(ctx, object.Key, filePath) - if err != nil { - return - } - - df.mtime = object.ModifiedAt - df.boltdb, err = local.OpenBoltdbFile(filePath) - if err != nil { - return - } - - var stat os.FileInfo - stat, err = os.Stat(filePath) - if err != nil { - return - } - - totalFilesSize += stat.Size() - - fc.files[uploader] = &df - } - - duration := time.Since(startTime).Seconds() - fc.metrics.filesDownloadDurationSeconds.add(fc.period, duration) - fc.metrics.filesDownloadSizeBytes.add(fc.period, totalFilesSize) - - return -} - -func (fc *FilesCollection) getFolderPathForPeriod(ensureExists bool) (string, error) { - folderPath := path.Join(fc.cacheLocation, fc.period) - - if ensureExists { - err := chunk_util.EnsureDirectory(folderPath) - if err != nil { - return "", err - } - } - - return folderPath, nil -} - -func getUploaderFromObjectKey(objectKey string) (string, error) { - uploaders := strings.Split(objectKey, "/") - if len(uploaders) != 2 { - return "", fmt.Errorf("invalid object key: %v", objectKey) - } - if uploaders[1] == "" { - return "", fmt.Errorf("empty uploader, object key: %v", objectKey) - } - return uploaders[1], nil -} diff --git a/pkg/storage/stores/local/downloads_test.go b/pkg/storage/stores/local/downloads_test.go deleted file mode 100644 index a53f471ca5a5d..0000000000000 --- a/pkg/storage/stores/local/downloads_test.go +++ /dev/null @@ -1,121 +0,0 @@ -package local - -// import ( -// "context" -// "io/ioutil" -// "os" -// "path/filepath" -// "strconv" -// "testing" -// "time" - -// "github.com/cortexproject/cortex/pkg/chunk/local" - -// "github.com/cortexproject/cortex/pkg/chunk" -// "github.com/stretchr/testify/require" -// ) - -// func queryTestBoltdb(t *testing.T, boltdbIndexClient *BoltdbIndexClientWithShipper, query chunk.IndexQuery) map[string]string { -// resp := map[string]string{} - -// require.NoError(t, boltdbIndexClient.query(context.Background(), query, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { -// itr := batch.Iterator() -// for itr.Next() { -// resp[string(itr.RangeValue())] = string(itr.Value()) -// } -// return true -// })) - -// return resp -// } - -// func writeTestData(t *testing.T, indexClient *BoltdbIndexClientWithShipper, tableName string, numRecords, startValue int) { -// time.Sleep(time.Second / 2) - -// batch := indexClient.NewWriteBatch() -// for i := 0; i < numRecords; i++ { -// value := []byte(strconv.Itoa(startValue + i)) -// batch.Add(tableName, "", value, value) -// } - -// require.NoError(t, indexClient.BatchWrite(context.Background(), batch)) - -// boltdb, err := indexClient.GetDB(tableName, local.DBOperationWrite) -// require.NoError(t, 
err) - -// require.NoError(t, boltdb.Sync()) -// } - -// func TestShipper_Downloads(t *testing.T) { -// tempDirForTests, err := ioutil.TempDir("", "test-dir") -// require.NoError(t, err) - -// defer func() { -// require.NoError(t, os.RemoveAll(tempDirForTests)) -// }() - -// localStoreLocation, err := ioutil.TempDir(tempDirForTests, "local-store") -// require.NoError(t, err) - -// boltDBWithShipper1 := createTestBoltDBWithShipper(t, tempDirForTests, "ingester1", localStoreLocation) -// boltDBWithShipper2 := createTestBoltDBWithShipper(t, tempDirForTests, "ingester2", localStoreLocation) - -// // add a file to boltDBWithShipper1 -// writeTestData(t, boltDBWithShipper1, "1", 10, 0) - -// // upload files from boltDBWithShipper1 -// require.NoError(t, boltDBWithShipper1.shipper.uploadFiles(context.Background())) - -// // query data for same table from boltDBWithShipper2 -// resp := queryTestBoltdb(t, boltDBWithShipper2, chunk.IndexQuery{ -// TableName: "1", -// }) - -// // make sure we got same data that was added from boltDBWithShipper1 -// checkExpectedKVsInBoltdbResp(t, resp, 10, 0) - -// // add more data to the previous file added to boltDBWithShipper1 and the upload it -// writeTestData(t, boltDBWithShipper1, "1", 10, 10) -// require.NoError(t, boltDBWithShipper1.shipper.uploadFiles(context.Background())) - -// // sync files in boltDBWithShipper2 -// require.NoError(t, boltDBWithShipper2.shipper.syncLocalWithStorage(context.Background())) - -// // query data for same table from boltDBWithShipper2 -// resp = queryTestBoltdb(t, boltDBWithShipper2, chunk.IndexQuery{ -// TableName: "1", -// }) - -// // make sure we also got new data that was added from boltDBWithShipper1 -// checkExpectedKVsInBoltdbResp(t, resp, 20, 0) - -// // add some data for same table in boltDBWithShipper2 -// writeTestData(t, boltDBWithShipper2, "1", 10, 20) - -// // query data for same table from boltDBWithShipper2 -// resp = queryTestBoltdb(t, boltDBWithShipper2, chunk.IndexQuery{ -// TableName: "1", -// }) - -// // make sure we data from boltDBWithShipper1 and boltDBWithShipper2 -// checkExpectedKVsInBoltdbResp(t, resp, 30, 0) - -// // stop boltDBWithShipper1 -// boltDBWithShipper1.Stop() - -// // delete the file from the store that was uploaded by boltDBWithShipper1 -// require.NoError(t, os.Remove(filepath.Join(localStoreLocation, storageKeyPrefix, "1", boltDBWithShipper1.shipper.uploader))) - -// // sync files in boltDBWithShipper2 -// require.NoError(t, boltDBWithShipper2.shipper.syncLocalWithStorage(context.Background())) - -// // query data for same table from boltDBWithShipper2 -// resp = queryTestBoltdb(t, boltDBWithShipper2, chunk.IndexQuery{ -// TableName: "1", -// }) - -// // make sure we got only data that was added to boltDBWithShipper2 -// checkExpectedKVsInBoltdbResp(t, resp, 10, 20) - -// boltDBWithShipper2.Stop() -// } diff --git a/pkg/storage/stores/local/filescollection.go b/pkg/storage/stores/local/filescollection.go deleted file mode 100644 index 64566a6244bb9..0000000000000 --- a/pkg/storage/stores/local/filescollection.go +++ /dev/null @@ -1,137 +0,0 @@ -package local - -import ( - "context" - "fmt" - "os" - "sync" - "time" - - "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/util" - "github.com/go-kit/kit/log/level" - "go.etcd.io/bbolt" -) - -// timeout for downloading initial files for a table to avoid leaking resources by allowing it to take all the time. 
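// NOTE: this timeout (and the comment above) carry over unchanged to the new
// pkg/storage/stores/shipper/downloads/table.go added further down in this diff.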
-const downloadTimeout = 5 * time.Minute - -type downloadedFile struct { - mtime time.Time - boltdb *bbolt.DB -} - -// FilesCollection holds info about shipped boltdb index files by other uploaders(ingesters). -// It is used to hold boltdb files created by all the ingesters for same period i.e with same name. -// In the object store files are uploaded as / to manage files with same name from different ingesters. -// Note: FilesCollection takes care of locking with all the exported/public methods. -// Note2: It has an err variable which is set when FilesCollection is in invalid state due to some issue. -// All operations which try to access/update the files except cleanup returns an error if set. -type FilesCollection struct { - mtx sync.RWMutex - - period string - cacheLocation string - metrics *downloaderMetrics - storageClient chunk.ObjectClient - - lastUsedAt time.Time - files map[string]*downloadedFile - err error - ready chan struct{} -} - -func NewFilesCollection(period, cacheLocation string, metrics *downloaderMetrics, storageClient chunk.ObjectClient) *FilesCollection { - fc := FilesCollection{ - period: period, - cacheLocation: cacheLocation, - metrics: metrics, - storageClient: storageClient, - files: map[string]*downloadedFile{}, - ready: make(chan struct{}), - } - - // keep the files collection locked until all the files are downloaded. - fc.mtx.Lock() - go func() { - defer fc.mtx.Unlock() - defer close(fc.ready) - - ctx, cancel := context.WithTimeout(context.Background(), downloadTimeout) - defer cancel() - - // Using background context to avoid cancellation of download when request times out. - // We would anyways need the files for serving next requests. - if err := fc.downloadAllFilesForPeriod(ctx); err != nil { - level.Error(util.Logger).Log("msg", "failed to download files", "period", fc.period) - } - }() - - return &fc -} - -func (fc *FilesCollection) cleanupFile(fileName string) error { - df, ok := fc.files[fileName] - if !ok { - return fmt.Errorf("file %s not found in files collection for cleaning up", fileName) - } - - filePath := df.boltdb.Path() - - if err := df.boltdb.Close(); err != nil { - return err - } - - delete(fc.files, fileName) - - return os.Remove(filePath) -} - -func (fc *FilesCollection) ForEach(callback func(fileName string, df *downloadedFile) error) error { - if fc.err != nil { - return fc.err - } - - fc.mtx.RLock() - defer fc.mtx.RUnlock() - - for fileName, df := range fc.files { - if err := callback(fileName, df); err != nil { - return err - } - } - - return nil -} - -func (fc *FilesCollection) CleanupAllFiles() error { - fc.mtx.Lock() - defer fc.mtx.Unlock() - - for fileName := range fc.files { - if err := fc.cleanupFile(fileName); err != nil { - return err - } - } - return nil -} - -func (fc *FilesCollection) UpdateLastUsedAt() { - fc.lastUsedAt = time.Now() -} - -func (fc *FilesCollection) LastUsedAt() time.Time { - return fc.lastUsedAt -} - -func (fc *FilesCollection) setErr(err error) { - fc.err = err -} - -func (fc *FilesCollection) Err() error { - return fc.err -} - -func (fc *FilesCollection) IsReady() chan struct{} { - return fc.ready -} diff --git a/pkg/storage/stores/local/metrics.go b/pkg/storage/stores/local/metrics.go deleted file mode 100644 index a748454ebb811..0000000000000 --- a/pkg/storage/stores/local/metrics.go +++ /dev/null @@ -1,108 +0,0 @@ -package local - -import ( - "sync" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - 
"github.com/weaveworks/common/instrument" -) - -const ( - statusFailure = "failure" - statusSuccess = "success" -) - -type downloadPeriodDurationMetric struct { - sync.RWMutex - gauge prometheus.Gauge - periods map[string]float64 -} - -func (m *downloadPeriodDurationMetric) add(period string, downloadDuration float64) { - m.Lock() - defer m.Unlock() - m.periods[period] = downloadDuration - - totalDuration := float64(0) - for _, dur := range m.periods { - totalDuration += dur - } - - m.gauge.Set(totalDuration) -} - -type downloadPeriodBytesMetric struct { - sync.RWMutex - gauge prometheus.Gauge - periods map[string]int64 -} - -func (m *downloadPeriodBytesMetric) add(period string, downloadedBytes int64) { - m.Lock() - defer m.Unlock() - m.periods[period] = downloadedBytes - - totalDownloadedBytes := int64(0) - for _, downloadedBytes := range m.periods { - totalDownloadedBytes += downloadedBytes - } - - m.gauge.Set(float64(totalDownloadedBytes)) -} - -type downloaderMetrics struct { - // metrics for measuring performance of downloading of files per period initially i.e for the first time - filesDownloadDurationSeconds *downloadPeriodDurationMetric - filesDownloadSizeBytes *downloadPeriodBytesMetric - - filesDownloadOperationTotal *prometheus.CounterVec -} - -type boltDBShipperMetrics struct { - downloaderMetrics *downloaderMetrics - - // duration in seconds spent in serving request on index managed by BoltDB Shipper - requestDurationSeconds *prometheus.HistogramVec - - filesUploadOperationTotal *prometheus.CounterVec -} - -func newBoltDBShipperMetrics(r prometheus.Registerer) *boltDBShipperMetrics { - m := &boltDBShipperMetrics{ - downloaderMetrics: &downloaderMetrics{ - filesDownloadDurationSeconds: &downloadPeriodDurationMetric{ - periods: map[string]float64{}, - gauge: promauto.With(r).NewGauge(prometheus.GaugeOpts{ - Namespace: "loki_boltdb_shipper", - Name: "initial_files_download_duration_seconds", - Help: "Time (in seconds) spent in downloading of files per period, initially i.e for the first time", - })}, - filesDownloadSizeBytes: &downloadPeriodBytesMetric{ - periods: map[string]int64{}, - gauge: promauto.With(r).NewGauge(prometheus.GaugeOpts{ - Namespace: "loki_boltdb_shipper", - Name: "initial_files_download_size_bytes", - Help: "Size of files (in bytes) downloaded per period, initially i.e for the first time", - })}, - filesDownloadOperationTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Namespace: "loki_boltdb_shipper", - Name: "files_download_operation_total", - Help: "Total number of download operations done by status", - }, []string{"status"}), - }, - requestDurationSeconds: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "loki_boltdb_shipper", - Name: "request_duration_seconds", - Help: "Time (in seconds) spent serving requests when using boltdb shipper", - Buckets: instrument.DefBuckets, - }, []string{"operation", "status_code"}), - filesUploadOperationTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Namespace: "loki_boltdb_shipper", - Name: "files_upload_operation_total", - Help: "Total number of upload operations done by status", - }, []string{"status"}), - } - - return m -} diff --git a/pkg/storage/stores/local/shipper.go b/pkg/storage/stores/local/shipper.go deleted file mode 100644 index 1cde8c5615497..0000000000000 --- a/pkg/storage/stores/local/shipper.go +++ /dev/null @@ -1,294 +0,0 @@ -package local - -import ( - "context" - "flag" - "fmt" - "io/ioutil" - "os" - "path" - "sync" - "time" - - 
"github.com/cortexproject/cortex/pkg/chunk" - chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" - pkg_util "github.com/cortexproject/cortex/pkg/util" - "github.com/go-kit/kit/log/level" - "github.com/prometheus/client_golang/prometheus" - "go.etcd.io/bbolt" - - "github.com/grafana/loki/pkg/storage/stores/util" -) - -const ( - // ShipperModeReadWrite is to allow both read and write - ShipperModeReadWrite = iota - // ShipperModeReadOnly is to allow only read operations - ShipperModeReadOnly - // ShipperModeWriteOnly is to allow only write operations - ShipperModeWriteOnly - - // ShipperFileUploadInterval defines interval for uploading active boltdb files from local which are being written to by ingesters. - ShipperFileUploadInterval = 15 * time.Minute - - // BoltDBShipperType holds the index type for using boltdb with shipper which keeps flushing them to a shared storage - BoltDBShipperType = "boltdb-shipper" - - // FilesystemObjectStoreType holds the periodic config type for the filesystem store - FilesystemObjectStoreType = "filesystem" - - cacheCleanupInterval = 24 * time.Hour - storageKeyPrefix = "index/" -) - -type BoltDBGetter interface { - GetDB(name string, operation int) (*bbolt.DB, error) -} - -type ShipperConfig struct { - ActiveIndexDirectory string `yaml:"active_index_directory"` - SharedStoreType string `yaml:"shared_store"` - CacheLocation string `yaml:"cache_location"` - CacheTTL time.Duration `yaml:"cache_ttl"` - ResyncInterval time.Duration `yaml:"resync_interval"` - IngesterName string `yaml:"-"` - Mode int `yaml:"-"` -} - -// RegisterFlags registers flags. -func (cfg *ShipperConfig) RegisterFlags(f *flag.FlagSet) { - f.StringVar(&cfg.ActiveIndexDirectory, "boltdb.shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage") - f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.shared-store", "", "Shared store for keeping boltdb files. Supported types: gcs, s3, azure, filesystem") - f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries") - f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries") - f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage") -} - -type Shipper struct { - cfg ShipperConfig - boltDBGetter BoltDBGetter - - // downloadedPeriods holds mapping for period -> FilesCollection. - // Here period is name of the file created by ingesters for a specific period. 
- downloadedPeriods map[string]*FilesCollection - downloadedPeriodsMtx sync.RWMutex - storageClient chunk.ObjectClient - - uploader string - uploadedFilesMtime map[string]time.Time - uploadedFilesMtimeMtx sync.RWMutex - - done chan struct{} - wait sync.WaitGroup - metrics *boltDBShipperMetrics -} - -// NewShipper creates a shipper for syncing local objects with a store -func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGetter BoltDBGetter, registerer prometheus.Registerer) (*Shipper, error) { - err := chunk_util.EnsureDirectory(cfg.CacheLocation) - if err != nil { - return nil, err - } - - shipper := Shipper{ - cfg: cfg, - boltDBGetter: boltDBGetter, - downloadedPeriods: map[string]*FilesCollection{}, - storageClient: util.NewPrefixedObjectClient(storageClient, storageKeyPrefix), - done: make(chan struct{}), - uploadedFilesMtime: map[string]time.Time{}, - metrics: newBoltDBShipperMetrics(registerer), - } - - shipper.uploader, err = shipper.getUploaderName() - if err != nil { - return nil, err - } - - level.Info(pkg_util.Logger).Log("msg", fmt.Sprintf("starting boltdb shipper in %d mode", cfg.Mode)) - - shipper.wait.Add(1) - go shipper.loop() - - return &shipper, nil -} - -// we would persist uploader name in /uploader/name file so that we use same name on subsequent restarts to -// avoid uploading same files again with different name. If the filed does not exist we would create one with uploader name set to -// ingester name and startup timestamp so that we randomise the name and do not override files from other ingesters. -func (s *Shipper) getUploaderName() (string, error) { - uploader := fmt.Sprintf("%s-%d", s.cfg.IngesterName, time.Now().UnixNano()) - - uploaderFilePath := path.Join(s.cfg.ActiveIndexDirectory, "uploader", "name") - if err := chunk_util.EnsureDirectory(path.Dir(uploaderFilePath)); err != nil { - return "", err - } - - _, err := os.Stat(uploaderFilePath) - if err != nil { - if !os.IsNotExist(err) { - return "", err - } - if err := ioutil.WriteFile(uploaderFilePath, []byte(uploader), 0666); err != nil { - return "", err - } - } else { - ub, err := ioutil.ReadFile(uploaderFilePath) - if err != nil { - return "", err - } - uploader = string(ub) - } - - return uploader, nil -} - -func (s *Shipper) loop() { - defer s.wait.Done() - - resyncTicker := time.NewTicker(s.cfg.ResyncInterval) - defer resyncTicker.Stop() - - uploadFilesTicker := time.NewTicker(ShipperFileUploadInterval) - defer uploadFilesTicker.Stop() - - cacheCleanupTicker := time.NewTicker(cacheCleanupInterval) - defer cacheCleanupTicker.Stop() - - for { - select { - case <-resyncTicker.C: - err := s.syncLocalWithStorage(context.Background()) - if err != nil { - level.Error(pkg_util.Logger).Log("msg", "error syncing local boltdb files with storage", "err", err) - } - case <-uploadFilesTicker.C: - err := s.uploadFiles(context.Background()) - if err != nil { - level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", err) - } - case <-cacheCleanupTicker.C: - err := s.cleanupCache() - if err != nil { - level.Error(pkg_util.Logger).Log("msg", "error cleaning up expired tables", "err", err) - } - case <-s.done: - return - } - } -} - -// Stop the shipper and push all the local files to the store -func (s *Shipper) Stop() { - close(s.done) - s.wait.Wait() - - // Push all boltdb files to storage before returning - err := s.uploadFiles(context.Background()) - if err != nil { - level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", 
err) - } - - s.downloadedPeriodsMtx.Lock() - defer s.downloadedPeriodsMtx.Unlock() - - for period, fc := range s.downloadedPeriods { - err := fc.ForEach(func(uploader string, df *downloadedFile) error { - return df.boltdb.Close() - }) - if err != nil { - level.Error(pkg_util.Logger).Log("msg", "failed to close boltdb files", "period", period, "err", err) - } - } -} - -// cleanupCache removes all the files for a period which has not be queried for using the configured TTL -func (s *Shipper) cleanupCache() error { - s.downloadedPeriodsMtx.Lock() - defer s.downloadedPeriodsMtx.Unlock() - - for period, fc := range s.downloadedPeriods { - lastUsedAt := fc.LastUsedAt() - if lastUsedAt.Add(s.cfg.CacheTTL).Before(time.Now()) { - err := fc.CleanupAllFiles() - if err != nil { - return err - } - - delete(s.downloadedPeriods, period) - } - } - - return nil -} - -// syncLocalWithStorage syncs all the periods that we have in the cache with the storage -// i.e download new and updated files and remove files which were delete from the storage. -func (s *Shipper) syncLocalWithStorage(ctx context.Context) (err error) { - if s.cfg.Mode == ShipperModeWriteOnly { - return - } - - s.downloadedPeriodsMtx.RLock() - defer s.downloadedPeriodsMtx.RUnlock() - - defer func() { - status := statusSuccess - if err != nil { - status = statusFailure - } - s.metrics.downloaderMetrics.filesDownloadOperationTotal.WithLabelValues(status).Inc() - }() - - for period := range s.downloadedPeriods { - if err := s.downloadedPeriods[period].Sync(ctx); err != nil { - return err - } - } - - return -} - -func (s *Shipper) forEach(ctx context.Context, period string, callback func(db *bbolt.DB) error) error { - // if filesCollection is already there, use it. - s.downloadedPeriodsMtx.RLock() - fc, ok := s.downloadedPeriods[period] - s.downloadedPeriodsMtx.RUnlock() - - if !ok { - s.downloadedPeriodsMtx.Lock() - // check if some other competing goroutine got the lock before us and created the filesCollection, use it if so. - fc, ok = s.downloadedPeriods[period] - if !ok { - // filesCollection not found, creating one. - level.Info(pkg_util.Logger).Log("msg", fmt.Sprintf("downloading all files for period %s", period)) - - fc = NewFilesCollection(period, s.cfg.CacheLocation, s.metrics.downloaderMetrics, s.storageClient) - s.downloadedPeriods[period] = fc - } - s.downloadedPeriodsMtx.Unlock() - } - - // let us check if FilesCollection is ready for use while also honoring the context timeout - select { - case <-ctx.Done(): - return ctx.Err() - case <-fc.IsReady(): - } - - fc.UpdateLastUsedAt() - err := fc.ForEach(func(uploader string, df *downloadedFile) error { - return callback(df.boltdb) - }) - - // the request which started the download could have timed out and returned so cleaning up the reference here - if err != nil && fc.Err() != nil { - s.downloadedPeriodsMtx.Lock() - defer s.downloadedPeriodsMtx.Unlock() - - // cleaning up fc since it is in invalid state anyways. 
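// NOTE: the replacement for this logic lives in downloads.TableManager.query further
// down in this diff: the period -> FilesCollection map here becomes tables
// map[string]*Table, and a table whose Err() is set is likewise dropped from the map
// so that the next query re-creates and re-downloads it.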
- delete(s.downloadedPeriods, period) - } - - return err -} diff --git a/pkg/storage/stores/local/uploads.go b/pkg/storage/stores/local/uploads.go deleted file mode 100644 index 1990f3dcf025e..0000000000000 --- a/pkg/storage/stores/local/uploads.go +++ /dev/null @@ -1,120 +0,0 @@ -package local - -import ( - "context" - "fmt" - "io/ioutil" - "os" - "path" - - "github.com/cortexproject/cortex/pkg/chunk/local" - chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" - "github.com/cortexproject/cortex/pkg/util" - "github.com/go-kit/kit/log/level" - "go.etcd.io/bbolt" -) - -// uploadFiles uploads all new and updated files to storage. -// It uploads the files from configured boltdb dir where ingester writes the index. -func (s *Shipper) uploadFiles(ctx context.Context) (err error) { - if s.cfg.Mode == ShipperModeReadOnly { - return - } - - defer func() { - status := statusSuccess - if err != nil { - status = statusFailure - } - s.metrics.filesUploadOperationTotal.WithLabelValues(status).Inc() - }() - - filesInfo, err := ioutil.ReadDir(s.cfg.ActiveIndexDirectory) - if err != nil { - return - } - - for _, fileInfo := range filesInfo { - if fileInfo.IsDir() { - continue - } - - s.uploadedFilesMtimeMtx.RLock() - // Checking whether file is updated after last push, if not skipping it - uploadedFileMtime, ok := s.uploadedFilesMtime[fileInfo.Name()] - s.uploadedFilesMtimeMtx.RUnlock() - - if ok && uploadedFileMtime.Equal(fileInfo.ModTime()) { - continue - } - - err = s.uploadFile(ctx, fileInfo.Name()) - if err != nil { - return - } - - s.uploadedFilesMtimeMtx.Lock() - s.uploadedFilesMtime[fileInfo.Name()] = fileInfo.ModTime() - s.uploadedFilesMtimeMtx.Unlock() - } - - return -} - -// uploadFile uploads one of the files locally written by ingesters to storage. 
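// NOTE: the snapshot-then-upload order below matters: the boltdb file is copied via a
// read transaction (tx.WriteTo) into a temp file under CacheLocation and fsynced, and
// only that snapshot is uploaded under the "<period>/<uploader>" object key, so a db
// that ingesters are actively writing to is never shipped in a torn state.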
-func (s *Shipper) uploadFile(ctx context.Context, period string) error { - if s.cfg.Mode == ShipperModeReadOnly { - return nil - } - - level.Debug(util.Logger).Log("msg", fmt.Sprintf("uploading file for period %s", period)) - - snapshotPath := path.Join(s.cfg.CacheLocation, period) - err := chunk_util.EnsureDirectory(snapshotPath) - if err != nil { - return err - } - - filePath := path.Join(snapshotPath, fmt.Sprintf("%s.%s", s.uploader, "temp")) - f, err := os.Create(filePath) - if err != nil { - return err - } - - defer func() { - if err := os.Remove(filePath); err != nil { - level.Error(util.Logger) - } - }() - - db, err := s.boltDBGetter.GetDB(period, local.DBOperationRead) - if err != nil { - return err - } - - err = db.View(func(tx *bbolt.Tx) error { - _, err := tx.WriteTo(f) - return err - }) - if err != nil { - return err - } - - if err := f.Sync(); err != nil { - return err - } - - if _, err := f.Seek(0, 0); err != nil { - return err - } - - defer func() { - if err := f.Close(); err != nil { - level.Error(util.Logger) - } - }() - - // Files are stored with / - objectKey := fmt.Sprintf("%s/%s", period, s.uploader) - return s.storageClient.PutObject(ctx, objectKey, f) -} diff --git a/pkg/storage/stores/local/uploads_test.go b/pkg/storage/stores/local/uploads_test.go deleted file mode 100644 index b21249dfbe776..0000000000000 --- a/pkg/storage/stores/local/uploads_test.go +++ /dev/null @@ -1,166 +0,0 @@ -package local - -import ( - "context" - "io/ioutil" - "os" - "path/filepath" - "strconv" - "testing" - "time" - - "github.com/cortexproject/cortex/pkg/chunk/local" - "github.com/cortexproject/cortex/pkg/chunk/util" - "github.com/stretchr/testify/require" - "go.etcd.io/bbolt" -) - -const testBucketName = "testBucket" - -func createTestBoltDBWithShipper(t *testing.T, parentTempDir, ingesterName, localStoreLocation string) *BoltdbIndexClientWithShipper { - cacheLocation := filepath.Join(parentTempDir, ingesterName, "cache") - boltdbFilesLocation := filepath.Join(parentTempDir, ingesterName, "boltdb") - - require.NoError(t, util.EnsureDirectory(cacheLocation)) - require.NoError(t, util.EnsureDirectory(boltdbFilesLocation)) - - shipperConfig := ShipperConfig{ - ActiveIndexDirectory: boltdbFilesLocation, - CacheLocation: cacheLocation, - CacheTTL: 1 * time.Hour, - ResyncInterval: 1 * time.Hour, - IngesterName: ingesterName, - Mode: ShipperModeReadWrite, - } - - archiveStoreClient, err := local.NewFSObjectClient(local.FSConfig{ - Directory: localStoreLocation, - }) - require.NoError(t, err) - - boltdbIndexClientWithShipper, err := NewBoltDBIndexClientWithShipper( - local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig, nil) - require.NoError(t, err) - - return boltdbIndexClientWithShipper.(*BoltdbIndexClientWithShipper) -} - -func addTestRecordsToBoltDBFile(t *testing.T, boltdb *bbolt.DB, numRecords int, start int) { - time.Sleep(time.Second / 2) - - err := boltdb.Update(func(tx *bbolt.Tx) error { - b, err := tx.CreateBucketIfNotExists([]byte(testBucketName)) - if err != nil { - return err - } - - for i := 0; i < numRecords; i++ { - kv := []byte(strconv.Itoa(start + i)) - - err = b.Put(kv, kv) - if err != nil { - return err - } - } - - return nil - }) - - require.NoError(t, err) - require.NoError(t, boltdb.Sync()) -} - -func readAllKVsFromBoltdbFile(t *testing.T, boltdb *bbolt.DB) map[string]string { - resp := map[string]string{} - - err := boltdb.View(func(tx *bbolt.Tx) error { - b := tx.Bucket([]byte(testBucketName)) - require.NotNil(t, b) - 
- return b.ForEach(func(k, v []byte) error { - resp[string(k)] = string(v) - return nil - }) - }) - - require.NoError(t, err) - - return resp -} - -func readAllKVsFromBoltdbFileAtPath(t *testing.T, path string) map[string]string { - boltDBFile, err := local.OpenBoltdbFile(path) - require.NoError(t, err) - - defer func() { - require.NoError(t, boltDBFile.Close()) - }() - - return readAllKVsFromBoltdbFile(t, boltDBFile) -} - -func checkExpectedKVsInBoltdbResp(t *testing.T, resp map[string]string, expectedNumRecords, start int) { - require.Equal(t, expectedNumRecords, len(resp), "responses", resp) - - for i := 0; i < expectedNumRecords; i++ { - expectedKV := strconv.Itoa(start + i) - - val, ok := resp[expectedKV] - require.Equal(t, true, ok) - require.Equal(t, expectedKV, val) - } -} - -func TestShipper_Uploads(t *testing.T) { - tempDirForTests, err := ioutil.TempDir("", "test-dir") - require.NoError(t, err) - - defer func() { - require.NoError(t, os.RemoveAll(tempDirForTests)) - }() - - localStoreLocation, err := ioutil.TempDir(tempDirForTests, "local-store") - require.NoError(t, err) - - boltDBWithShipper := createTestBoltDBWithShipper(t, tempDirForTests, "ingester", localStoreLocation) - - // create a boltdb file for boltDBWithShipper to test upload. - boltdbFile1, err := boltDBWithShipper.GetDB("file1", local.DBOperationWrite) - require.NoError(t, err) - file1PathInStorage := filepath.Join(localStoreLocation, storageKeyPrefix, filepath.Base(boltdbFile1.Path()), boltDBWithShipper.shipper.uploader) - - // add some test records to boltdbFile1 - addTestRecordsToBoltDBFile(t, boltdbFile1, 10, 1) - - // Upload files from boltDBWithShipper - err = boltDBWithShipper.shipper.uploadFiles(context.Background()) - require.NoError(t, err) - - // open boltdbFile1 and verify it has expected records - checkExpectedKVsInBoltdbResp(t, readAllKVsFromBoltdbFileAtPath(t, file1PathInStorage), 10, 1) - - // create another boltdb file for boltDBWithShipper to test upload. 
- boltdbFile2, err := boltDBWithShipper.GetDB("file2", local.DBOperationWrite) - require.NoError(t, err) - file2PathInStorage := filepath.Join(localStoreLocation, storageKeyPrefix, filepath.Base(boltdbFile2.Path()), boltDBWithShipper.shipper.uploader) - - // add some test records to boltdbFile2 and some more records to boltdbFile1 - addTestRecordsToBoltDBFile(t, boltdbFile2, 10, 1) - addTestRecordsToBoltDBFile(t, boltdbFile1, 5, 11) - - // Upload files from boltDBWithShipper - err = boltDBWithShipper.shipper.uploadFiles(context.Background()) - require.NoError(t, err) - - // open boltdbFile1 and boltdbFile2 and verify it has expected records - checkExpectedKVsInBoltdbResp(t, readAllKVsFromBoltdbFileAtPath(t, file2PathInStorage), 10, 1) - checkExpectedKVsInBoltdbResp(t, readAllKVsFromBoltdbFileAtPath(t, file1PathInStorage), 15, 1) - - // modify boltdbFile2 again - addTestRecordsToBoltDBFile(t, boltdbFile2, 10, 11) - - // stop boltDBWithShipper to make it upload all the new and changed to store - boltDBWithShipper.Stop() - - checkExpectedKVsInBoltdbResp(t, readAllKVsFromBoltdbFileAtPath(t, file2PathInStorage), 20, 1) -} diff --git a/pkg/storage/stores/shipper/downloads/metrics.go b/pkg/storage/stores/shipper/downloads/metrics.go new file mode 100644 index 0000000000000..0035f4675fa35 --- /dev/null +++ b/pkg/storage/stores/shipper/downloads/metrics.go @@ -0,0 +1,85 @@ +package downloads + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + statusFailure = "failure" + statusSuccess = "success" +) + +type downloadTableDurationMetric struct { + sync.RWMutex + gauge prometheus.Gauge + periods map[string]float64 +} + +func (m *downloadTableDurationMetric) add(period string, downloadDuration float64) { + m.Lock() + defer m.Unlock() + m.periods[period] = downloadDuration + + totalDuration := float64(0) + for _, dur := range m.periods { + totalDuration += dur + } + + m.gauge.Set(totalDuration) +} + +type downloadTableBytesMetric struct { + sync.RWMutex + gauge prometheus.Gauge + periods map[string]int64 +} + +func (m *downloadTableBytesMetric) add(period string, downloadedBytes int64) { + m.Lock() + defer m.Unlock() + m.periods[period] = downloadedBytes + + totalDownloadedBytes := int64(0) + for _, downloadedBytes := range m.periods { + totalDownloadedBytes += downloadedBytes + } + + m.gauge.Set(float64(totalDownloadedBytes)) +} + +type metrics struct { + // metrics for measuring performance of downloading of files per period initially i.e for the first time + filesDownloadDurationSeconds *downloadTableDurationMetric + filesDownloadSizeBytes *downloadTableBytesMetric + + filesDownloadOperationTotal *prometheus.CounterVec +} + +func newMetrics(r prometheus.Registerer) *metrics { + m := &metrics{ + filesDownloadDurationSeconds: &downloadTableDurationMetric{ + periods: map[string]float64{}, + gauge: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: "loki_boltdb_shipper", + Name: "initial_files_download_duration_seconds", + Help: "Time (in seconds) spent in downloading of files per table, initially i.e for the first time", + })}, + filesDownloadSizeBytes: &downloadTableBytesMetric{ + periods: map[string]int64{}, + gauge: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: "loki_boltdb_shipper", + Name: "initial_files_download_size_bytes", + Help: "Size of files (in bytes) downloaded per table, initially i.e for the first time", + })}, + filesDownloadOperationTotal: 
promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+			Namespace: "loki_boltdb_shipper",
+			Name:      "files_download_operation_total",
+			Help:      "Total number of download operations done by status",
+		}, []string{"status"}),
+	}
+
+	return m
+}
diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go
new file mode 100644
index 0000000000000..efca56fa7cc85
--- /dev/null
+++ b/pkg/storage/stores/shipper/downloads/table.go
@@ -0,0 +1,415 @@
+package downloads
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"path"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/chunk/local"
+	chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/go-kit/kit/log/level"
+	"go.etcd.io/bbolt"
+)
+
+// timeout for downloading initial files for a table to avoid leaking resources by allowing it to take all the time.
+const downloadTimeout = 5 * time.Minute
+
+type BoltDBIndexClient interface {
+	QueryDB(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error
+}
+
+type StorageClient interface {
+	GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error)
+	List(ctx context.Context, prefix string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error)
+}
+
+type downloadedFile struct {
+	mtime  time.Time
+	boltdb *bbolt.DB
+}
+
+// Table is a collection of multiple files created for a same table by various ingesters.
+// All the public methods are concurrency safe and take care of mutexes to avoid any data race.
+type Table struct {
+	name              string
+	cacheLocation     string
+	metrics           *metrics
+	storageClient     StorageClient
+	boltDBIndexClient BoltDBIndexClient
+
+	lastUsedAt time.Time
+	dbs        map[string]*downloadedFile
+	dbsMtx     sync.RWMutex
+	err        error
+
+	ready      chan struct{}      // helps with detecting initialization of table which downloads all the existing files.
+	cancelFunc context.CancelFunc // helps with cancellation of initialization if we are asked to stop.
+}
+
+func NewTable(name, cacheLocation string, storageClient StorageClient, boltDBIndexClient BoltDBIndexClient, metrics *metrics) *Table {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	table := Table{
+		name:              name,
+		cacheLocation:     cacheLocation,
+		metrics:           metrics,
+		storageClient:     storageClient,
+		boltDBIndexClient: boltDBIndexClient,
+		lastUsedAt:        time.Now(),
+		dbs:               map[string]*downloadedFile{},
+		ready:             make(chan struct{}),
+		cancelFunc:        cancel,
+	}
+
+	// keep the files collection locked until all the files are downloaded.
+	table.dbsMtx.Lock()
+	go func() {
+		defer table.dbsMtx.Unlock()
+		defer close(table.ready)
+
+		ctx, cancel := context.WithTimeout(ctx, downloadTimeout)
+		defer cancel()
+
+		// Using background context to avoid cancellation of download when request times out.
+		// We would anyways need the files for serving next requests.
+		if err := table.init(ctx); err != nil {
+			level.Error(util.Logger).Log("msg", "failed to download table", "name", table.name)
+		}
+	}()
+
+	return &table
+}
+
+// init downloads all the db files for the table from object storage.
+// it assumes the locking of mutex is taken care of by the caller.
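// NOTE: the caller is NewTable above: it holds dbsMtx for writing and keeps the ready
// channel open while init runs, so concurrent Query calls block on ready (or their
// context) until the initial download either finishes or sets t.err and cleans up.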
+func (t *Table) init(ctx context.Context) (err error) {
+	defer func() {
+		status := statusSuccess
+		if err != nil {
+			status = statusFailure
+			t.err = err
+
+			level.Error(util.Logger).Log("msg", fmt.Sprintf("failed to initialize table %s, cleaning it up", t.name), "err", err)
+
+			// cleaning up files due to error to avoid returning invalid results.
+			for fileName := range t.dbs {
+				if err := t.cleanupDB(fileName); err != nil {
+					level.Error(util.Logger).Log("msg", "failed to cleanup partially downloaded file", "filename", fileName, "err", err)
+				}
+			}
+		}
+		t.metrics.filesDownloadOperationTotal.WithLabelValues(status).Inc()
+	}()
+
+	startTime := time.Now()
+	totalFilesSize := int64(0)
+
+	objects, _, err := t.storageClient.List(ctx, t.name+"/")
+	if err != nil {
+		return
+	}
+
+	level.Debug(util.Logger).Log("msg", fmt.Sprintf("list of files to download for period %s: %s", t.name, objects))
+
+	folderPath, err := t.folderPathForTable(true)
+	if err != nil {
+		return
+	}
+
+	for _, object := range objects {
+		dbName, err := getDBNameFromObjectKey(object.Key)
+		if err != nil {
+			return err
+		}
+
+		filePath := path.Join(folderPath, dbName)
+		df := downloadedFile{}
+
+		err = t.getFileFromStorage(ctx, object.Key, filePath)
+		if err != nil {
+			return err
+		}
+
+		df.mtime = object.ModifiedAt
+		df.boltdb, err = local.OpenBoltdbFile(filePath)
+		if err != nil {
+			return err
+		}
+
+		var stat os.FileInfo
+		stat, err = os.Stat(filePath)
+		if err != nil {
+			return err
+		}
+
+		totalFilesSize += stat.Size()
+
+		t.dbs[dbName] = &df
+	}
+
+	duration := time.Since(startTime).Seconds()
+	t.metrics.filesDownloadDurationSeconds.add(t.name, duration)
+	t.metrics.filesDownloadSizeBytes.add(t.name, totalFilesSize)
+
+	return
+}
+
+// Closes references to all the dbs.
+func (t *Table) Close() {
+	// stop the initialization if it is still ongoing.
+	t.cancelFunc()
+
+	t.dbsMtx.Lock()
+	defer t.dbsMtx.Unlock()
+
+	for name, db := range t.dbs {
+		if err := db.boltdb.Close(); err != nil {
+			level.Error(util.Logger).Log("msg", fmt.Errorf("failed to close file %s for table %s", name, t.name))
+		}
+	}
+
+	t.dbs = map[string]*downloadedFile{}
+}
+
+// Queries all the dbs for index.
+func (t *Table) Query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error {
+	// let us check if table is ready for use while also honoring the context timeout
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-t.ready:
+	}
+
+	t.dbsMtx.RLock()
+	defer t.dbsMtx.RUnlock()
+
+	if t.err != nil {
+		return t.err
+	}
+
+	t.lastUsedAt = time.Now()
+
+	for _, db := range t.dbs {
+		if err := t.boltDBIndexClient.QueryDB(ctx, db.boltdb, query, callback); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// Closes reference to all the open dbs and removes the local file.
+func (t *Table) CleanupAllDBs() error {
+	t.dbsMtx.Lock()
+	defer t.dbsMtx.Unlock()
+
+	for fileName := range t.dbs {
+		if err := t.cleanupDB(fileName); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Err returns the err which is usually set when there was any issue in init.
+func (t *Table) Err() error {
+	return t.err
+}
+
+// LastUsedAt returns the time at which table was last used for querying.
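// NOTE: the table manager's cache-cleanup loop (cacheCleanupTicker below) is expected
// to compare this timestamp against Config.CacheTTL to evict idle tables, mirroring
// the old Shipper.cleanupCache removed above; cleanupCache's body is outside this
// hunk, so that mapping is an inference from the old code.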
+func (t *Table) LastUsedAt() time.Time {
+	return t.lastUsedAt
+}
+
+func (t *Table) cleanupDB(fileName string) error {
+	df, ok := t.dbs[fileName]
+	if !ok {
+		return fmt.Errorf("file %s not found in files collection for cleaning up", fileName)
+	}
+
+	filePath := df.boltdb.Path()
+
+	if err := df.boltdb.Close(); err != nil {
+		return err
+	}
+
+	delete(t.dbs, fileName)
+
+	return os.Remove(filePath)
+}
+
+// Sync downloads updated and new files from the storage relevant for the table and removes the deleted ones
+func (t *Table) Sync(ctx context.Context) error {
+	level.Debug(util.Logger).Log("msg", fmt.Sprintf("syncing files for table %s", t.name))
+
+	toDownload, toDelete, err := t.checkStorageForUpdates(ctx)
+	if err != nil {
+		return err
+	}
+
+	level.Debug(util.Logger).Log("msg", fmt.Sprintf("updates for table %s. toDownload: %s, toDelete: %s", t.name, toDownload, toDelete))
+
+	for _, storageObject := range toDownload {
+		err = t.downloadFile(ctx, storageObject)
+		if err != nil {
+			return err
+		}
+	}
+
+	t.dbsMtx.Lock()
+	defer t.dbsMtx.Unlock()
+
+	for _, db := range toDelete {
+		err := t.cleanupDB(db)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// checkStorageForUpdates compares files from cache with storage and builds the list of files to be downloaded from storage and to be deleted from cache
+func (t *Table) checkStorageForUpdates(ctx context.Context) (toDownload []chunk.StorageObject, toDelete []string, err error) {
+	// listing tables from store
+	var objects []chunk.StorageObject
+	objects, _, err = t.storageClient.List(ctx, t.name+"/")
+	if err != nil {
+		return
+	}
+
+	listedDBs := make(map[string]struct{}, len(objects))
+
+	t.dbsMtx.RLock()
+	defer t.dbsMtx.RUnlock()
+
+	for _, object := range objects {
+		dbName, err := getDBNameFromObjectKey(object.Key)
+		if err != nil {
+			return nil, nil, err
+		}
+		listedDBs[dbName] = struct{}{}
+
+		// Checking whether file was updated in the store after we downloaded it, if not, no need to include it in updates
+		downloadedFileDetails, ok := t.dbs[dbName]
+		if !ok || downloadedFileDetails.mtime != object.ModifiedAt {
+			toDownload = append(toDownload, object)
+		}
+	}
+
+	for db := range t.dbs {
+		if _, isOK := listedDBs[db]; !isOK {
+			toDelete = append(toDelete, db)
+		}
+	}
+
+	return
+}
+
+// downloadFile first downloads file to a temp location so that we can close the existing db(if already exists), replace it with new one and then reopen it.
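// NOTE: a rough sketch of the swap performed below, assuming temp and final paths
// share a filesystem so os.Rename stays atomic:
//
//	tempFilePath := filePath + ".temp"
//	_ = t.getFileFromStorage(ctx, key, tempFilePath) // fetch fresh copy
//	_ = oldDB.Close()                                // release open handle, if any
//	_ = os.Rename(tempFilePath, filePath)            // atomically swap file in place
//	db, _ := local.OpenBoltdbFile(filePath)          // reopen; published under dbsMtx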
+func (t *Table) downloadFile(ctx context.Context, storageObject chunk.StorageObject) error {
+	level.Info(util.Logger).Log("msg", fmt.Sprintf("downloading object from storage with key %s", storageObject.Key))
+
+	dbName, err := getDBNameFromObjectKey(storageObject.Key)
+	if err != nil {
+		return err
+	}
+	folderPath, _ := t.folderPathForTable(false)
+	filePath := path.Join(folderPath, dbName)
+
+	// download the file temporarily with some other name to allow the boltdb client to close the existing file first if it exists
+	tempFilePath := path.Join(folderPath, fmt.Sprintf("%s.%s", dbName, "temp"))
+
+	err = t.getFileFromStorage(ctx, storageObject.Key, tempFilePath)
+	if err != nil {
+		return err
+	}
+
+	t.dbsMtx.Lock()
+	defer t.dbsMtx.Unlock()
+
+	df, ok := t.dbs[dbName]
+	if ok {
+		if err := df.boltdb.Close(); err != nil {
+			return err
+		}
+	} else {
+		df = &downloadedFile{}
+	}
+
+	// move the file from the temp location to the actual location
+	err = os.Rename(tempFilePath, filePath)
+	if err != nil {
+		return err
+	}
+
+	df.mtime = storageObject.ModifiedAt
+	df.boltdb, err = local.OpenBoltdbFile(filePath)
+	if err != nil {
+		return err
+	}
+
+	t.dbs[dbName] = df
+
+	return nil
+}
+
+// getFileFromStorage downloads a file from storage to the given location.
+func (t *Table) getFileFromStorage(ctx context.Context, objectKey, destination string) error {
+	readCloser, err := t.storageClient.GetObject(ctx, objectKey)
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if err := readCloser.Close(); err != nil {
+			level.Error(util.Logger).Log("msg", "failed to close read closer", "err", err)
+		}
+	}()
+
+	f, err := os.Create(destination)
+	if err != nil {
+		return err
+	}
+
+	_, err = io.Copy(f, readCloser)
+	if err != nil {
+		return err
+	}
+
+	level.Info(util.Logger).Log("msg", fmt.Sprintf("downloaded file %s", objectKey))
+
+	return f.Sync()
+}
+
+func (t *Table) folderPathForTable(ensureExists bool) (string, error) {
+	folderPath := path.Join(t.cacheLocation, t.name)
+
+	if ensureExists {
+		err := chunk_util.EnsureDirectory(folderPath)
+		if err != nil {
+			return "", err
+		}
+	}
+
+	return folderPath, nil
+}
+
+func getDBNameFromObjectKey(objectKey string) (string, error) {
+	ss := strings.Split(objectKey, "/")
+
+	if len(ss) != 2 {
+		return "", fmt.Errorf("invalid object key: %v", objectKey)
+	}
+	if ss[1] == "" {
+		return "", fmt.Errorf("empty db name, object key: %v", objectKey)
+	}
+	return ss[1], nil
+}
diff --git a/pkg/storage/stores/shipper/downloads/table_manager.go b/pkg/storage/stores/shipper/downloads/table_manager.go
new file mode 100644
index 0000000000000..588f321869896
--- /dev/null
+++ b/pkg/storage/stores/shipper/downloads/table_manager.go
@@ -0,0 +1,176 @@
+package downloads
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/chunk"
+	chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
+	pkg_util "github.com/cortexproject/cortex/pkg/util"
+	"github.com/go-kit/kit/log/level"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const cacheCleanupInterval = time.Hour
+
+type Config struct {
+	CacheDir     string
+	SyncInterval time.Duration
+	CacheTTL     time.Duration
+}
+
+type TableManager struct {
+	cfg             Config
+	boltIndexClient BoltDBIndexClient
+	storageClient   StorageClient
+
+	tables    map[string]*Table
+	tablesMtx sync.RWMutex
+	metrics   *metrics
+
+	ctx    context.Context
+	cancel context.CancelFunc
+	wg     sync.WaitGroup
+}
+
+func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, storageClient StorageClient, registerer prometheus.Registerer) (*TableManager, error) {
+	ctx, cancel
:= context.WithCancel(context.Background()) + tm := &TableManager{ + cfg: cfg, + boltIndexClient: boltIndexClient, + storageClient: storageClient, + tables: make(map[string]*Table), + metrics: newMetrics(registerer), + ctx: ctx, + cancel: cancel, + } + go tm.loop() + return tm, nil +} + +func (tm *TableManager) loop() { + tm.wg.Add(1) + defer tm.wg.Done() + + syncTicker := time.NewTicker(tm.cfg.SyncInterval) + defer syncTicker.Stop() + + cacheCleanupTicker := time.NewTicker(cacheCleanupInterval) + defer cacheCleanupTicker.Stop() + + for { + select { + case <-syncTicker.C: + err := tm.syncTables(tm.ctx) + if err != nil { + level.Error(pkg_util.Logger).Log("msg", "error syncing local boltdb files with storage", "err", err) + } + case <-cacheCleanupTicker.C: + err := tm.cleanupCache() + if err != nil { + level.Error(pkg_util.Logger).Log("msg", "error cleaning up expired tables", "err", err) + } + case <-tm.ctx.Done(): + return + } + } +} + +func (tm *TableManager) Stop() { + tm.cancel() + tm.wg.Wait() + + tm.tablesMtx.Lock() + defer tm.tablesMtx.Unlock() + + for _, table := range tm.tables { + table.Close() + } +} + +func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { + return chunk_util.DoParallelQueries(ctx, tm.query, queries, callback) +} + +func (tm *TableManager) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { + table := tm.getOrCreateTable(query.TableName) + + err := table.Query(ctx, query, callback) + if err != nil { + if table.Err() != nil { + // table is in invalid state, remove the table so that next queries re-create it. + tm.tablesMtx.Lock() + defer tm.tablesMtx.Unlock() + + level.Error(pkg_util.Logger).Log("msg", fmt.Sprintf("table %s has some problem, cleaning it up", query.TableName), "err", table.Err()) + + delete(tm.tables, query.TableName) + return table.Err() + } + } + + return err +} + +func (tm *TableManager) getOrCreateTable(tableName string) *Table { + // if table is already there, use it. + tm.tablesMtx.RLock() + table, ok := tm.tables[tableName] + tm.tablesMtx.RUnlock() + + if !ok { + tm.tablesMtx.Lock() + // check if some other competing goroutine got the lock before us and created the table, use it if so. + table, ok = tm.tables[tableName] + if !ok { + // table not found, creating one. 
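+			// NewTable starts downloading all the files for the table asynchronously; Query waits on the table's ready channel, so table creation itself stays cheap under tablesMtx.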
+ level.Info(pkg_util.Logger).Log("msg", fmt.Sprintf("downloading all files for table %s", tableName)) + + table = NewTable(tableName, tm.cfg.CacheDir, tm.storageClient, tm.boltIndexClient, tm.metrics) + tm.tables[tableName] = table + } + tm.tablesMtx.Unlock() + } + + return table +} + +func (tm *TableManager) syncTables(ctx context.Context) error { + tm.tablesMtx.RLock() + defer tm.tablesMtx.RUnlock() + + level.Info(pkg_util.Logger).Log("msg", "syncing tables") + + for _, table := range tm.tables { + err := table.Sync(ctx) + if err != nil { + return err + } + } + + return nil +} + +func (tm *TableManager) cleanupCache() error { + tm.tablesMtx.Lock() + defer tm.tablesMtx.Unlock() + + level.Info(pkg_util.Logger).Log("msg", "cleaning tables cache") + + for name, table := range tm.tables { + lastUsedAt := table.LastUsedAt() + if lastUsedAt.Add(tm.cfg.CacheTTL).Before(time.Now()) { + level.Info(pkg_util.Logger).Log("msg", fmt.Sprintf("cleaning up expired table %s", name)) + err := table.CleanupAllDBs() + if err != nil { + return err + } + + delete(tm.tables, name) + } + } + + return nil +} diff --git a/pkg/storage/stores/shipper/downloads/table_manager_test.go b/pkg/storage/stores/shipper/downloads/table_manager_test.go new file mode 100644 index 0000000000000..33b59e79b2bb0 --- /dev/null +++ b/pkg/storage/stores/shipper/downloads/table_manager_test.go @@ -0,0 +1,133 @@ +package downloads + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/local" + "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" + "github.com/stretchr/testify/require" +) + +func buildTestTableManager(t *testing.T, path string) (*TableManager, *local.BoltIndexClient, stopFunc) { + boltDBIndexClient, fsObjectClient := buildTestClients(t, path) + cachePath := filepath.Join(path, cacheDirName) + + cfg := Config{ + CacheDir: cachePath, + SyncInterval: time.Hour, + CacheTTL: time.Hour, + } + tableManager, err := NewTableManager(cfg, boltDBIndexClient, fsObjectClient, nil) + require.NoError(t, err) + + return tableManager, boltDBIndexClient, func() { + tableManager.Stop() + boltDBIndexClient.Stop() + } +} + +func TestTableManager_QueryPages(t *testing.T) { + tempDir, err := ioutil.TempDir("", "table-manager-query-pages") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(tempDir)) + }() + + objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) + + tables := map[string]map[string]testutil.DBRecords{ + "table1": { + "db1": { + Start: 0, + NumRecords: 10, + }, + "db2": { + Start: 10, + NumRecords: 10, + }, + "db3": { + Start: 20, + NumRecords: 10, + }, + }, + "table2": { + "db1": { + Start: 30, + NumRecords: 10, + }, + "db2": { + Start: 40, + NumRecords: 10, + }, + "db3": { + Start: 50, + NumRecords: 10, + }, + }, + } + + var queries []chunk.IndexQuery + for name, dbs := range tables { + queries = append(queries, chunk.IndexQuery{TableName: name}) + testutil.SetupDBTablesAtPath(t, name, objectStoragePath, dbs) + } + + tableManager, _, stopFunc := buildTestTableManager(t, tempDir) + defer stopFunc() + + testutil.TestMultiTableQuery(t, queries, tableManager, 0, 60) +} + +func TestTableManager_cleanupCache(t *testing.T) { + tempDir, err := ioutil.TempDir("", "table-manager-cleanup-cache") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(tempDir)) + }() + + tableManager, _, stopFunc := buildTestTableManager(t, tempDir) + defer 
stopFunc() + + // one table that would expire and other one won't + expiredTableName := "expired-table" + nonExpiredTableName := "non-expired-table" + + // query for above 2 tables which should set them up in table manager + err = tableManager.QueryPages(context.Background(), []chunk.IndexQuery{ + {TableName: expiredTableName}, + {TableName: nonExpiredTableName}, + }, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { + return true + }) + + require.NoError(t, err) + // table manager should now have 2 tables. + require.Len(t, tableManager.tables, 2) + + // call cleanupCache and verify that no tables are cleaned up because they are not yet expired. + require.NoError(t, tableManager.cleanupCache()) + require.Len(t, tableManager.tables, 2) + + // change the last used at time of expiredTable to before the ttl. + expiredTable, ok := tableManager.tables[expiredTableName] + require.True(t, ok) + expiredTable.lastUsedAt = time.Now().Add(-(tableManager.cfg.CacheTTL + time.Minute)) + + // call the cleanupCache and verify that we still have nonExpiredTable and expiredTable is gone. + require.NoError(t, tableManager.cleanupCache()) + require.Len(t, tableManager.tables, 1) + + _, ok = tableManager.tables[expiredTableName] + require.False(t, ok) + + _, ok = tableManager.tables[nonExpiredTableName] + require.True(t, ok) +} diff --git a/pkg/storage/stores/shipper/downloads/table_test.go b/pkg/storage/stores/shipper/downloads/table_test.go new file mode 100644 index 0000000000000..4d37f55712242 --- /dev/null +++ b/pkg/storage/stores/shipper/downloads/table_test.go @@ -0,0 +1,196 @@ +package downloads + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/local" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" +) + +const ( + cacheDirName = "cache" + objectsStorageDirName = "objects" +) + +type stopFunc func() + +func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, *local.FSObjectClient) { + cachePath := filepath.Join(path, cacheDirName) + + boltDBIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: cachePath}) + require.NoError(t, err) + + objectStoragePath := filepath.Join(path, objectsStorageDirName) + fsObjectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) + require.NoError(t, err) + + return boltDBIndexClient, fsObjectClient +} + +func buildTestTable(t *testing.T, path string) (*Table, *local.BoltIndexClient, stopFunc) { + boltDBIndexClient, fsObjectClient := buildTestClients(t, path) + cachePath := filepath.Join(path, cacheDirName) + + table := NewTable("test", cachePath, fsObjectClient, boltDBIndexClient, newMetrics(nil)) + + // wait for either table to get ready or a timeout hits + select { + case <-table.ready: + case <-time.Tick(2 * time.Second): + t.Fatal("failed to initialize table in time") + } + + // there should be no error in initialization of the table + require.NoError(t, table.Err()) + + return table, boltDBIndexClient, func() { + table.Close() + boltDBIndexClient.Stop() + } +} + +func TestTable_Query(t *testing.T) { + tempDir, err := ioutil.TempDir("", "table-writes") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(tempDir)) + }() + + objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) + + testDBs := map[string]testutil.DBRecords{ + "db1": { + Start: 0, + NumRecords: 10, + }, + 
"db2": { + Start: 10, + NumRecords: 10, + }, + "db3": { + Start: 20, + NumRecords: 10, + }, + } + + testutil.SetupDBTablesAtPath(t, "test", objectStoragePath, testDBs) + + table, _, stopFunc := buildTestTable(t, tempDir) + defer func() { + stopFunc() + }() + + testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, 30) +} + +func TestTable_Sync(t *testing.T) { + tempDir, err := ioutil.TempDir("", "table-writes") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(tempDir)) + }() + + objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) + tableName := "test" + tablePathInStorage := filepath.Join(objectStoragePath, tableName) + + // list of dbs to create except newDB that would be added later as part of updates + deleteDB := "delete" + noUpdatesDB := "no-updates" + updateDB := "update" + newDB := "new" + + testDBs := map[string]testutil.DBRecords{ + deleteDB: { + Start: 0, + NumRecords: 10, + }, + noUpdatesDB: { + Start: 10, + NumRecords: 10, + }, + updateDB: { + Start: 20, + NumRecords: 10, + }, + } + + // setup the table in storage with some records + testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, testDBs) + + // create table instance + table, boltdbClient, stopFunc := buildTestTable(t, tempDir) + defer func() { + stopFunc() + }() + + // query table to see it has expected records setup + testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, 30) + + // add a sleep since we are updating a file and CI is sometimes too fast to create a difference in mtime of files + time.Sleep(time.Second) + + // remove deleteDB, update updateDB and add the newDB + require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, deleteDB))) + testutil.AddRecordsToDB(t, filepath.Join(tablePathInStorage, updateDB), boltdbClient, 30, 10) + testutil.AddRecordsToDB(t, filepath.Join(tablePathInStorage, newDB), boltdbClient, 40, 10) + + // sync the table + require.NoError(t, table.Sync(context.Background())) + + // query and verify table has expected records from new and updated db and the records from deleted db are gone + testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 10, 40) + + // verify files in cache where dbs for the table are synced to double check. + expectedFilesInDir := map[string]struct{}{ + updateDB: {}, + noUpdatesDB: {}, + newDB: {}, + } + filesInfo, err := ioutil.ReadDir(tablePathInStorage) + require.NoError(t, err) + require.Len(t, table.dbs, len(expectedFilesInDir)) + + for _, fileInfo := range filesInfo { + require.False(t, fileInfo.IsDir()) + _, ok := expectedFilesInDir[fileInfo.Name()] + require.True(t, ok) + } +} + +func TestTable_LastUsedAt(t *testing.T) { + tempDir, err := ioutil.TempDir("", "table-writes") + require.NoError(t, err) + + table, _, stopFunc := buildTestTable(t, tempDir) + defer func() { + stopFunc() + require.NoError(t, os.RemoveAll(tempDir)) + }() + + // a newly built table should have last used at close to now. + require.InDelta(t, time.Now().Unix(), table.LastUsedAt().Unix(), 1) + + // change the last used at to an hour before + table.lastUsedAt = time.Now().Add(-time.Hour) + require.InDelta(t, time.Now().Add(-time.Hour).Unix(), table.LastUsedAt().Unix(), 1) + + // query the table which should set the last used at to now. + err = table.Query(context.Background(), chunk.IndexQuery{}, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { + return true + }) + require.NoError(t, err) + + // check whether last used at got update to now. 
+ require.InDelta(t, time.Now().Unix(), table.LastUsedAt().Unix(), 1) +} diff --git a/pkg/storage/stores/shipper/metrics.go b/pkg/storage/stores/shipper/metrics.go new file mode 100644 index 0000000000000..dfdf63726e0f6 --- /dev/null +++ b/pkg/storage/stores/shipper/metrics.go @@ -0,0 +1,23 @@ +package shipper + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/instrument" +) + +type metrics struct { + // duration in seconds spent in serving request on index managed by BoltDB Shipper + requestDurationSeconds *prometheus.HistogramVec +} + +func newMetrics(r prometheus.Registerer) *metrics { + return &metrics{ + requestDurationSeconds: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "loki_boltdb_shipper", + Name: "request_duration_seconds", + Help: "Time (in seconds) spent serving requests when using boltdb shipper", + Buckets: instrument.DefBuckets, + }, []string{"operation", "status_code"}), + } +} diff --git a/pkg/storage/stores/shipper/shipper_index_client.go b/pkg/storage/stores/shipper/shipper_index_client.go new file mode 100644 index 0000000000000..d6515ef79b80b --- /dev/null +++ b/pkg/storage/stores/shipper/shipper_index_client.go @@ -0,0 +1,218 @@ +package shipper + +import ( + "context" + "flag" + "fmt" + "io/ioutil" + "os" + "path" + "time" + + "github.com/weaveworks/common/instrument" + + "github.com/grafana/loki/pkg/storage/stores/util" + + "github.com/grafana/loki/pkg/storage/stores/shipper/downloads" + "github.com/grafana/loki/pkg/storage/stores/shipper/uploads" + + "github.com/cortexproject/cortex/pkg/chunk/local" + + "github.com/cortexproject/cortex/pkg/chunk" + chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + pkg_util "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "go.etcd.io/bbolt" +) + +const ( + // ModeReadWrite is to allow both read and write + ModeReadWrite = iota + // ModeReadOnly is to allow only read operations + ModeReadOnly + // ModeWriteOnly is to allow only write operations + ModeWriteOnly + + // BoltDBShipperType holds the index type for using boltdb with shipper which keeps flushing them to a shared storage + BoltDBShipperType = "boltdb-shipper" + + // FilesystemObjectStoreType holds the periodic config type for the filesystem store + FilesystemObjectStoreType = "filesystem" + + storageKeyPrefix = "index/" + + // UploadInterval defines interval for uploading active boltdb files from local which are being written to by ingesters. + UploadInterval = 15 * time.Minute +) + +type boltDBIndexClient interface { + QueryDB(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error + NewWriteBatch() chunk.WriteBatch + WriteToDB(ctx context.Context, db *bbolt.DB, writes local.TableWrites) error + Stop() +} + +type Config struct { + ActiveIndexDirectory string `yaml:"active_index_directory"` + SharedStoreType string `yaml:"shared_store"` + CacheLocation string `yaml:"cache_location"` + CacheTTL time.Duration `yaml:"cache_ttl"` + ResyncInterval time.Duration `yaml:"resync_interval"` + IngesterName string `yaml:"-"` + Mode int `yaml:"-"` +} + +// RegisterFlags registers flags. 
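+// As an example (illustrative values, not defaults), a deployment using GCS could pass:
+//   -boltdb.shipper.active-index-directory=/loki/index
+//   -boltdb.shipper.shared-store=gcs
+//   -boltdb.shipper.cache-location=/loki/boltdb-cache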
+func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
+	f.StringVar(&cfg.ActiveIndexDirectory, "boltdb.shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage")
+	f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.shared-store", "", "Shared store for keeping boltdb files. Supported types: gcs, s3, azure, filesystem")
+	f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries")
+	f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries")
+	f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage")
+}
+
+type Shipper struct {
+	cfg               Config
+	boltDBIndexClient boltDBIndexClient
+	uploadsManager    *uploads.TableManager
+	downloadsManager  *downloads.TableManager
+
+	metrics *metrics
+}
+
+// NewShipper creates a shipper for syncing local objects with a store
+func NewShipper(cfg Config, storageClient chunk.ObjectClient, registerer prometheus.Registerer) (chunk.IndexClient, error) {
+	err := chunk_util.EnsureDirectory(cfg.CacheLocation)
+	if err != nil {
+		return nil, err
+	}
+
+	shipper := Shipper{
+		cfg:     cfg,
+		metrics: newMetrics(registerer),
+	}
+
+	err = shipper.init(storageClient, registerer)
+	if err != nil {
+		return nil, err
+	}
+
+	level.Info(pkg_util.Logger).Log("msg", fmt.Sprintf("starting boltdb shipper in %d mode", cfg.Mode))
+
+	return &shipper, nil
+}
+
+func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.Registerer) error {
+	uploader, err := s.getUploaderName()
+	if err != nil {
+		return err
+	}
+
+	s.boltDBIndexClient, err = local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: s.cfg.ActiveIndexDirectory})
+	if err != nil {
+		return err
+	}
+
+	prefixedObjectClient := util.NewPrefixedObjectClient(storageClient, storageKeyPrefix)
+
+	if s.cfg.Mode != ModeReadOnly {
+		cfg := uploads.Config{
+			Uploader:       uploader,
+			IndexDir:       s.cfg.ActiveIndexDirectory,
+			UploadInterval: UploadInterval,
+		}
+		uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, prefixedObjectClient, registerer)
+		if err != nil {
+			return err
+		}
+
+		s.uploadsManager = uploadsManager
+	}
+
+	if s.cfg.Mode != ModeWriteOnly {
+		cfg := downloads.Config{
+			CacheDir:     s.cfg.CacheLocation,
+			SyncInterval: s.cfg.ResyncInterval,
+			CacheTTL:     s.cfg.CacheTTL,
+		}
+		downloadsManager, err := downloads.NewTableManager(cfg, s.boltDBIndexClient, prefixedObjectClient, registerer)
+		if err != nil {
+			return err
+		}
+
+		s.downloadsManager = downloadsManager
+	}
+
+	return nil
+}
+
+// we persist the uploader name in the <active-index-directory>/uploader/name file so that we use the same name on subsequent restarts to
+// avoid uploading the same files again with a different name. If the file does not exist we create one with the uploader name set to the
+// ingester name and startup timestamp so that we randomise the name and do not overwrite files from other ingesters.
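+// A generated name looks like ingester-0-1589829410000000000, i.e. the ingester name plus its startup time in nanoseconds (values illustrative).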
+func (s *Shipper) getUploaderName() (string, error) { + uploader := fmt.Sprintf("%s-%d", s.cfg.IngesterName, time.Now().UnixNano()) + + uploaderFilePath := path.Join(s.cfg.ActiveIndexDirectory, "uploader", "name") + if err := chunk_util.EnsureDirectory(path.Dir(uploaderFilePath)); err != nil { + return "", err + } + + _, err := os.Stat(uploaderFilePath) + if err != nil { + if !os.IsNotExist(err) { + return "", err + } + if err := ioutil.WriteFile(uploaderFilePath, []byte(uploader), 0666); err != nil { + return "", err + } + } else { + ub, err := ioutil.ReadFile(uploaderFilePath) + if err != nil { + return "", err + } + uploader = string(ub) + } + + return uploader, nil +} + +func (s *Shipper) Stop() { + if s.uploadsManager != nil { + s.uploadsManager.Stop() + } + + if s.downloadsManager != nil { + s.downloadsManager.Stop() + } + + s.boltDBIndexClient.Stop() +} + +func (s *Shipper) NewWriteBatch() chunk.WriteBatch { + return s.boltDBIndexClient.NewWriteBatch() +} + +func (s *Shipper) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { + return s.uploadsManager.BatchWrite(ctx, batch) +} + +func (s *Shipper) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { + return instrument.CollectedRequest(ctx, "QUERY", instrument.NewHistogramCollector(s.metrics.requestDurationSeconds), instrument.ErrorCode, func(ctx context.Context) error { + if s.uploadsManager != nil { + err := s.uploadsManager.QueryPages(ctx, queries, callback) + if err != nil { + return err + } + } + + if s.downloadsManager != nil { + err := s.downloadsManager.QueryPages(ctx, queries, callback) + if err != nil { + return err + } + } + + return nil + }) +} diff --git a/pkg/storage/stores/local/boltdb_shipper_table_client.go b/pkg/storage/stores/shipper/table_client.go similarity index 99% rename from pkg/storage/stores/local/boltdb_shipper_table_client.go rename to pkg/storage/stores/shipper/table_client.go index fc1d9993dd787..ba0fdee8002c3 100644 --- a/pkg/storage/stores/local/boltdb_shipper_table_client.go +++ b/pkg/storage/stores/shipper/table_client.go @@ -1,4 +1,4 @@ -package local +package shipper import ( "context" diff --git a/pkg/storage/stores/local/boltdb_shipper_table_client_test.go b/pkg/storage/stores/shipper/table_client_test.go similarity index 99% rename from pkg/storage/stores/local/boltdb_shipper_table_client_test.go rename to pkg/storage/stores/shipper/table_client_test.go index 2e1310838f4c6..938b2c72db5f5 100644 --- a/pkg/storage/stores/local/boltdb_shipper_table_client_test.go +++ b/pkg/storage/stores/shipper/table_client_test.go @@ -1,4 +1,4 @@ -package local +package shipper import ( "bytes" diff --git a/pkg/storage/stores/shipper/testutil/testutil.go b/pkg/storage/stores/shipper/testutil/testutil.go new file mode 100644 index 0000000000000..19d155f7f058a --- /dev/null +++ b/pkg/storage/stores/shipper/testutil/testutil.go @@ -0,0 +1,148 @@ +package testutil + +import ( + "context" + "path/filepath" + "strconv" + "sync" + "testing" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/local" + chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" +) + +var boltBucketName = []byte("index") + +func AddRecordsToDB(t *testing.T, path string, dbClient *local.BoltIndexClient, start, numRecords int) { + db, err := local.OpenBoltdbFile(path) + require.NoError(t, err) + + batch := dbClient.NewWriteBatch() + 
AddRecordsToBatch(batch, "test", start, numRecords) + + require.NoError(t, dbClient.WriteToDB(context.Background(), db, batch.(*local.BoltWriteBatch).Writes["test"])) + + require.NoError(t, db.Sync()) + require.NoError(t, db.Close()) +} + +func AddRecordsToBatch(batch chunk.WriteBatch, tableName string, start, numRecords int) { + for i := 0; i < numRecords; i++ { + rec := []byte(strconv.Itoa(start + i)) + batch.Add(tableName, "", rec, rec) + } + + return +} + +type SingleTableQuerier interface { + Query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error +} + +func TestSingleQuery(t *testing.T, query chunk.IndexQuery, querier SingleTableQuerier, start, numRecords int) { + minValue := start + maxValue := start + numRecords + fetchedRecords := make(map[string]string) + + err := querier.Query(context.Background(), query, makeTestCallback(t, minValue, maxValue, fetchedRecords)) + + require.NoError(t, err) + require.Len(t, fetchedRecords, numRecords) +} + +type SingleDBQuerier interface { + QueryDB(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error +} + +func TestSingleDBQuery(t *testing.T, query chunk.IndexQuery, db *bbolt.DB, querier SingleDBQuerier, start, numRecords int) { + minValue := start + maxValue := start + numRecords + fetchedRecords := make(map[string]string) + + err := querier.QueryDB(context.Background(), db, query, makeTestCallback(t, minValue, maxValue, fetchedRecords)) + + require.NoError(t, err) + require.Len(t, fetchedRecords, numRecords) +} + +type MultiTableQuerier interface { + QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error +} + +func TestMultiTableQuery(t *testing.T, queries []chunk.IndexQuery, querier MultiTableQuerier, start, numRecords int) { + minValue := start + maxValue := start + numRecords + fetchedRecords := make(map[string]string) + + err := querier.QueryPages(context.Background(), queries, makeTestCallback(t, minValue, maxValue, fetchedRecords)) + + require.NoError(t, err) + require.Len(t, fetchedRecords, numRecords) +} + +func makeTestCallback(t *testing.T, minValue, maxValue int, records map[string]string) func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) { + recordsMtx := sync.Mutex{} + return func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) { + itr := batch.Iterator() + for itr.Next() { + require.Equal(t, itr.RangeValue(), itr.Value()) + rec, err := strconv.Atoi(string(itr.Value())) + + require.NoError(t, err) + require.GreaterOrEqual(t, rec, minValue) + require.LessOrEqual(t, rec, maxValue) + + recordsMtx.Lock() + records[string(itr.RangeValue())] = string(itr.Value()) + recordsMtx.Unlock() + } + return true + } +} + +func CompareDBs(t *testing.T, db1, db2 *bbolt.DB) { + db1Records := readDB(t, db1) + db2Records := readDB(t, db2) + + require.Equal(t, db1Records, db2Records) +} + +func readDB(t *testing.T, db *bbolt.DB) map[string]string { + dbRecords := map[string]string{} + + err := db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(boltBucketName) + require.NotNil(t, b) + + return b.ForEach(func(k, v []byte) error { + dbRecords[string(k)] = string(v) + return nil + }) + }) + + require.NoError(t, err) + return dbRecords +} + +type DBRecords struct { + Start, NumRecords int +} + +func SetupDBTablesAtPath(t *testing.T, tableName, path string, dbs map[string]DBRecords) string { + boltIndexClient, err := 
local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: path})
+	require.NoError(t, err)
+
+	defer boltIndexClient.Stop()
+
+	tablePath := filepath.Join(path, tableName)
+	require.NoError(t, chunk_util.EnsureDirectory(tablePath))
+
+	for name, dbRecords := range dbs {
+		AddRecordsToDB(t, filepath.Join(tablePath, name), boltIndexClient, dbRecords.Start, dbRecords.NumRecords)
+	}
+
+	return tablePath
+}
diff --git a/pkg/storage/stores/shipper/uploads/metrics.go b/pkg/storage/stores/shipper/uploads/metrics.go
new file mode 100644
index 0000000000000..09c4509b8d9d2
--- /dev/null
+++ b/pkg/storage/stores/shipper/uploads/metrics.go
@@ -0,0 +1,25 @@
+package uploads
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+const (
+	statusFailure = "failure"
+	statusSuccess = "success"
+)
+
+type metrics struct {
+	tablesUploadOperationTotal *prometheus.CounterVec
+}
+
+func newMetrics(r prometheus.Registerer) *metrics {
+	return &metrics{
+		tablesUploadOperationTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+			Namespace: "loki_boltdb_shipper",
+			Name:      "tables_upload_operation_total",
+			Help:      "Total number of upload operations done by status",
+		}, []string{"status"}),
+	}
+}
diff --git a/pkg/storage/stores/shipper/uploads/table.go b/pkg/storage/stores/shipper/uploads/table.go
new file mode 100644
index 0000000000000..365707a64a8c7
--- /dev/null
+++ b/pkg/storage/stores/shipper/uploads/table.go
@@ -0,0 +1,331 @@
+package uploads
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path"
+	"path/filepath"
+	"sync"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/chunk/local"
+	chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
+	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/go-kit/kit/log/level"
+	"go.etcd.io/bbolt"
+)
+
+const (
+	// create a new db sharded by time based on when the write request is received
+	shardDBsByDuration = 15 * time.Minute
+
+	// retain dbs for the specified duration after they are modified to avoid keeping them locally forever.
+	// this period should be sufficiently longer than shardDBsByDuration to avoid any conflicts
+	dbRetainPeriod = time.Hour
+)
+
+type BoltDBIndexClient interface {
+	QueryDB(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error
+	WriteToDB(ctx context.Context, db *bbolt.DB, writes local.TableWrites) error
+}
+
+type StorageClient interface {
+	PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error
+}
+
+// Table is a collection of multiple dbs created for the same table by the ingester.
+// All the public methods are concurrency safe and take care of mutexes to avoid any data race.
+type Table struct {
+	name              string
+	path              string
+	uploader          string
+	storageClient     StorageClient
+	boltdbIndexClient BoltDBIndexClient
+
+	dbs    map[string]*bbolt.DB
+	dbsMtx sync.RWMutex
+
+	uploadedDBsMtime    map[string]time.Time
+	uploadedDBsMtimeMtx sync.RWMutex
+}
+
+// NewTable creates a new Table without looking for any existing local dbs belonging to the table.
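+// dbs for individual time shards are created lazily by getOrAddDB on the first write that falls into the shard.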
+func NewTable(path, uploader string, storageClient StorageClient, boltdbIndexClient BoltDBIndexClient) (*Table, error) {
+	err := chunk_util.EnsureDirectory(path)
+	if err != nil {
+		return nil, err
+	}
+
+	return newTableWithDBs(map[string]*bbolt.DB{}, path, uploader, storageClient, boltdbIndexClient)
+}
+
+// LoadTable loads local dbs belonging to the table and creates a new Table with references to them; if there are no local dbs, it returns a nil Table.
+func LoadTable(path, uploader string, storageClient StorageClient, boltdbIndexClient BoltDBIndexClient) (*Table, error) {
+	dbs, err := loadBoltDBsFromDir(path)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(dbs) == 0 {
+		return nil, nil
+	}
+
+	return newTableWithDBs(dbs, path, uploader, storageClient, boltdbIndexClient)
+}
+
+func newTableWithDBs(dbs map[string]*bbolt.DB, path, uploader string, storageClient StorageClient, boltdbIndexClient BoltDBIndexClient) (*Table, error) {
+	return &Table{
+		name:              filepath.Base(path),
+		path:              path,
+		uploader:          uploader,
+		storageClient:     storageClient,
+		boltdbIndexClient: boltdbIndexClient,
+		dbs:               dbs,
+		uploadedDBsMtime:  map[string]time.Time{},
+	}, nil
+}
+
+// Query serves the index by querying all the open dbs.
+func (lt *Table) Query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error {
+	lt.dbsMtx.RLock()
+	defer lt.dbsMtx.RUnlock()
+
+	for _, db := range lt.dbs {
+		if err := lt.boltdbIndexClient.QueryDB(ctx, db, query, callback); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (lt *Table) getOrAddDB(name string) (*bbolt.DB, error) {
+	lt.dbsMtx.Lock()
+	defer lt.dbsMtx.Unlock()
+
+	var (
+		db  *bbolt.DB
+		err error
+		ok  bool
+	)
+
+	db, ok = lt.dbs[name]
+	if !ok {
+		db, err = local.OpenBoltdbFile(filepath.Join(lt.path, name))
+		if err != nil {
+			return nil, err
+		}
+
+		lt.dbs[name] = db
+		return db, nil
+	}
+
+	return db, nil
+}
+
+// Write writes to a db locally with the write time set to now.
+func (lt *Table) Write(ctx context.Context, writes local.TableWrites) error {
+	return lt.write(ctx, time.Now(), writes)
+}
+
+// write writes to a db locally. It shards the db files by truncating the passed time by shardDBsByDuration using https://golang.org/pkg/time/#Time.Truncate
+// db files are named after the time shard, i.e. the epoch of the truncated time.
+// If a db file does not exist for a shard it gets created.
+func (lt *Table) write(ctx context.Context, tm time.Time, writes local.TableWrites) error {
+	shard := fmt.Sprint(tm.Truncate(shardDBsByDuration).Unix())
+
+	db, err := lt.getOrAddDB(shard)
+	if err != nil {
+		return err
+	}
+
+	return lt.boltdbIndexClient.WriteToDB(ctx, db, writes)
+}
+
+// Stop closes all the open dbs.
+func (lt *Table) Stop() {
+	lt.dbsMtx.Lock()
+	defer lt.dbsMtx.Unlock()
+
+	for name, db := range lt.dbs {
+		if err := db.Close(); err != nil {
+			level.Error(util.Logger).Log("msg", fmt.Sprintf("failed to close file %s for table %s", name, lt.name), "err", err)
+		}
+	}
+
+	lt.dbs = map[string]*bbolt.DB{}
+}
+
+// RemoveDB closes the db and removes the file locally.
+func (lt *Table) RemoveDB(name string) error {
+	lt.dbsMtx.Lock()
+	defer lt.dbsMtx.Unlock()
+
+	db, ok := lt.dbs[name]
+	if !ok {
+		return nil
+	}
+
+	err := db.Close()
+	if err != nil {
+		return err
+	}
+
+	delete(lt.dbs, name)
+
+	return os.Remove(filepath.Join(lt.path, name))
+}
+
+// Upload uploads all the dbs which were never uploaded or have been modified since the last batch was uploaded.
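+// A db is re-uploaded when its current mtime is newer than the mtime recorded at its last successful upload.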
+func (lt *Table) Upload(ctx context.Context) error {
+	lt.dbsMtx.RLock()
+	defer lt.dbsMtx.RUnlock()
+
+	level.Info(util.Logger).Log("msg", fmt.Sprintf("uploading table %s", lt.name))
+
+	for name, db := range lt.dbs {
+		stat, err := os.Stat(db.Path())
+		if err != nil {
+			return err
+		}
+
+		lt.uploadedDBsMtimeMtx.RLock()
+		uploadedDBMtime, ok := lt.uploadedDBsMtime[name]
+		lt.uploadedDBsMtimeMtx.RUnlock()
+
+		if ok && !uploadedDBMtime.Before(stat.ModTime()) {
+			continue
+		}
+
+		err = lt.uploadDB(ctx, name, db)
+		if err != nil {
+			return err
+		}
+
+		lt.uploadedDBsMtimeMtx.Lock()
+		lt.uploadedDBsMtime[name] = stat.ModTime()
+		lt.uploadedDBsMtimeMtx.Unlock()
+	}
+
+	level.Info(util.Logger).Log("msg", fmt.Sprintf("finished uploading table %s", lt.name))
+
+	return nil
+}
+
+func (lt *Table) uploadDB(ctx context.Context, name string, db *bbolt.DB) error {
+	level.Debug(util.Logger).Log("msg", fmt.Sprintf("uploading db %s from table %s", name, lt.name))
+
+	filePath := path.Join(lt.path, fmt.Sprintf("%s.%s", name, "temp"))
+	f, err := os.Create(filePath)
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if err := f.Close(); err != nil {
+			level.Error(util.Logger).Log("msg", "failed to close temp file", "path", filePath, "err", err)
+		}
+
+		if err := os.Remove(filePath); err != nil {
+			level.Error(util.Logger).Log("msg", "failed to remove temp file", "path", filePath, "err", err)
+		}
+	}()
+
+	err = db.View(func(tx *bbolt.Tx) error {
+		_, err := tx.WriteTo(f)
+		return err
+	})
+	if err != nil {
+		return err
+	}
+
+	// flush the file to disk and seek the file to the beginning.
+	if err := f.Sync(); err != nil {
+		return err
+	}
+
+	if _, err := f.Seek(0, 0); err != nil {
+		return err
+	}
+
+	objectKey := lt.buildObjectKey(name)
+	return lt.storageClient.PutObject(ctx, objectKey, f)
+}
+
+// Cleanup removes dbs which are already uploaded and have not been modified for a period longer than dbRetainPeriod.
+// This is to avoid keeping all the files forever in the ingesters.
+func (lt *Table) Cleanup() error {
+	level.Info(util.Logger).Log("msg", fmt.Sprintf("cleaning up unwanted dbs from table %s", lt.name))
+
+	var filesToCleanup []string
+	cutoffTime := time.Now().Add(-dbRetainPeriod)
+
+	lt.dbsMtx.RLock()
+
+	for name, db := range lt.dbs {
+		stat, err := os.Stat(db.Path())
+		if err != nil {
+			lt.dbsMtx.RUnlock()
+			return err
+		}
+
+		lt.uploadedDBsMtimeMtx.RLock()
+		uploadedDBMtime, ok := lt.uploadedDBsMtime[name]
+		lt.uploadedDBsMtimeMtx.RUnlock()
+
+		// only retain files for cleanup which are already uploaded, have not been modified since the upload,
+		// and whose last modification is older than the cutoff time.
+		if ok && !uploadedDBMtime.Before(stat.ModTime()) && stat.ModTime().Before(cutoffTime) {
+			filesToCleanup = append(filesToCleanup, name)
+		}
+	}
+
+	lt.dbsMtx.RUnlock()
+
+	for i := range filesToCleanup {
+		level.Debug(util.Logger).Log("msg", fmt.Sprintf("removing db %s from table %s", filesToCleanup[i], lt.name))
+
+		if err := lt.RemoveDB(filesToCleanup[i]); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (lt *Table) buildObjectKey(dbName string) string {
+	// Files are stored with the structure <table-name>/<uploader>-<db-name>
+	objectKey := fmt.Sprintf("%s/%s-%s", lt.name, lt.uploader, dbName)
+
+	// if the file is a migrated one then don't add its name to the object key, otherwise we would re-upload it again here with a different name.
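+	// e.g. a regular db named db1 in table1 gets the key table1/uploader-db1, while a migrated db (named after the table itself) gets table1/uploader (names illustrative).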
+ if lt.name == dbName { + objectKey = fmt.Sprintf("%s/%s", lt.name, lt.uploader) + } + + return objectKey +} + +func loadBoltDBsFromDir(dir string) (map[string]*bbolt.DB, error) { + dbs := map[string]*bbolt.DB{} + filesInfo, err := ioutil.ReadDir(dir) + if err != nil { + return nil, err + } + + for _, fileInfo := range filesInfo { + if fileInfo.IsDir() { + continue + } + + db, err := local.OpenBoltdbFile(filepath.Join(dir, fileInfo.Name())) + if err != nil { + return nil, err + } + + dbs[fileInfo.Name()] = db + } + + return dbs, nil +} diff --git a/pkg/storage/stores/shipper/uploads/table_manager.go b/pkg/storage/stores/shipper/uploads/table_manager.go new file mode 100644 index 0000000000000..749f02ca45617 --- /dev/null +++ b/pkg/storage/stores/shipper/uploads/table_manager.go @@ -0,0 +1,242 @@ +package uploads + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "regexp" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/local" + chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + pkg_util "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" +) + +type Config struct { + Uploader string + IndexDir string + UploadInterval time.Duration +} + +type TableManager struct { + cfg Config + boltIndexClient BoltDBIndexClient + storageClient StorageClient + + metrics *metrics + tables map[string]*Table + tablesMtx sync.RWMutex + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, storageClient StorageClient, registerer prometheus.Registerer) (*TableManager, error) { + ctx, cancel := context.WithCancel(context.Background()) + tm := TableManager{ + cfg: cfg, + boltIndexClient: boltIndexClient, + storageClient: storageClient, + metrics: newMetrics(registerer), + ctx: ctx, + cancel: cancel, + } + + tables, err := tm.loadTables() + if err != nil { + return nil, err + } + + tm.tables = tables + go tm.loop() + return &tm, nil +} + +func (tm *TableManager) loop() { + tm.wg.Add(1) + defer tm.wg.Done() + + syncTicker := time.NewTicker(tm.cfg.UploadInterval) + defer syncTicker.Stop() + + for { + select { + case <-syncTicker.C: + err := tm.uploadTables(context.Background()) + if err != nil { + level.Error(pkg_util.Logger).Log("msg", "error uploading local boltdb files to the storage", "err", err) + } + case <-tm.ctx.Done(): + return + } + } +} + +func (tm *TableManager) Stop() { + level.Info(pkg_util.Logger).Log("msg", "stopping table manager") + + tm.cancel() + tm.wg.Wait() + + err := tm.uploadTables(context.Background()) + if err != nil { + level.Error(pkg_util.Logger).Log("msg", "error uploading local boltdb files to the storage before stopping", "err", err) + } +} + +func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error { + return chunk_util.DoParallelQueries(ctx, tm.query, queries, callback) +} + +func (tm *TableManager) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { + tm.tablesMtx.RLock() + defer tm.tablesMtx.RUnlock() + + table, ok := tm.tables[query.TableName] + if !ok { + return nil + } + + return table.Query(ctx, query, callback) +} + +func (tm *TableManager) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error { + boltWriteBatch, ok := batch.(*local.BoltWriteBatch) + if !ok { + return errors.New("invalid write batch") + 
}
+
+	for tableName, tableWrites := range boltWriteBatch.Writes {
+		table, err := tm.getOrCreateTable(tableName)
+		if err != nil {
+			return err
+		}
+
+		err = table.Write(ctx, tableWrites)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (tm *TableManager) getOrCreateTable(tableName string) (*Table, error) {
+	tm.tablesMtx.RLock()
+	table, ok := tm.tables[tableName]
+	tm.tablesMtx.RUnlock()
+
+	if !ok {
+		tm.tablesMtx.Lock()
+		defer tm.tablesMtx.Unlock()
+
+		table, ok = tm.tables[tableName]
+		if !ok {
+			var err error
+			table, err = NewTable(filepath.Join(tm.cfg.IndexDir, tableName), tm.cfg.Uploader, tm.storageClient, tm.boltIndexClient)
+			if err != nil {
+				return nil, err
+			}
+
+			tm.tables[tableName] = table
+		}
+	}
+
+	return table, nil
+}
+
+func (tm *TableManager) uploadTables(ctx context.Context) (err error) {
+	tm.tablesMtx.RLock()
+	defer tm.tablesMtx.RUnlock()
+
+	level.Info(pkg_util.Logger).Log("msg", "uploading tables")
+
+	defer func() {
+		status := statusSuccess
+		if err != nil {
+			status = statusFailure
+		}
+		tm.metrics.tablesUploadOperationTotal.WithLabelValues(status).Inc()
+	}()
+
+	for _, table := range tm.tables {
+		err := table.Upload(ctx)
+		if err != nil {
+			return err
+		}
+
+		// cleanup unwanted dbs from the table
+		err = table.Cleanup()
+		if err != nil {
+			// we do not want to stop the upload of dbs due to failures in cleaning them up, so just logging the error here.
+			level.Error(pkg_util.Logger).Log("msg", "failed to cleanup uploaded dbs past their retention period", "table", table.name, "err", err)
+		}
+	}
+
+	return
+}
+
+func (tm *TableManager) loadTables() (map[string]*Table, error) {
+	localTables := make(map[string]*Table)
+	filesInfo, err := ioutil.ReadDir(tm.cfg.IndexDir)
+	if err != nil {
+		return nil, err
+	}
+
+	// regex matching table name patterns, i.e. prefix+period_number
+	re, err := regexp.Compile(`.+[0-9]+$`)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, fileInfo := range filesInfo {
+		if !re.MatchString(fileInfo.Name()) {
+			continue
+		}
+
+		// since we are moving to keeping files for the same table in a folder, if the current element is a file we need to move it inside a directory with the same name,
+		// i.e. file index_123 would be moved to path index_123/index_123.
+		if !fileInfo.IsDir() {
+			level.Info(pkg_util.Logger).Log("msg", fmt.Sprintf("found a legacy file %s, moving it to folder with same name", fileInfo.Name()))
+			filePath := filepath.Join(tm.cfg.IndexDir, fileInfo.Name())
+
+			// create a folder with .temp suffix since we can't create a directory with same name as file.
+			tempDirPath := filePath + ".temp"
+			if err := chunk_util.EnsureDirectory(tempDirPath); err != nil {
+				return nil, err
+			}
+
+			// move the file to temp dir.
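+			// e.g. the legacy file index_123 first becomes index_123.temp/index_123, and the temp dir is then renamed to index_123.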
+			if err := os.Rename(filePath, filepath.Join(tempDirPath, fileInfo.Name())); err != nil {
+				return nil, err
+			}
+
+			// rename the directory to the name of the file
+			if err := os.Rename(tempDirPath, filePath); err != nil {
+				return nil, err
+			}
+		}
+
+		level.Info(pkg_util.Logger).Log("msg", fmt.Sprintf("loading table %s", fileInfo.Name()))
+		table, err := LoadTable(filepath.Join(tm.cfg.IndexDir, fileInfo.Name()), tm.cfg.Uploader, tm.storageClient, tm.boltIndexClient)
+		if err != nil {
+			return nil, err
+		}
+
+		if table == nil {
+			continue
+		}
+
+		localTables[fileInfo.Name()] = table
+	}
+
+	return localTables, nil
+}
diff --git a/pkg/storage/stores/shipper/uploads/table_manager_test.go b/pkg/storage/stores/shipper/uploads/table_manager_test.go
new file mode 100644
index 0000000000000..3bfed54f48680
--- /dev/null
+++ b/pkg/storage/stores/shipper/uploads/table_manager_test.go
@@ -0,0 +1,177 @@
+package uploads
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/chunk/local"
+	"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func buildTestTableManager(t *testing.T, testDir string) (*TableManager, *local.BoltIndexClient, stopFunc) {
+	boltDBIndexClient, storageClient := buildTestClients(t, testDir)
+	indexPath := filepath.Join(testDir, indexDirName)
+
+	cfg := Config{
+		Uploader:       "test-table-manager",
+		IndexDir:       indexPath,
+		UploadInterval: time.Hour,
+	}
+	tm, err := NewTableManager(cfg, boltDBIndexClient, storageClient, nil)
+	require.NoError(t, err)
+
+	return tm, boltDBIndexClient, func() {
+		tm.Stop()
+		boltDBIndexClient.Stop()
+	}
+}
+
+func TestLoadTables(t *testing.T) {
+	testDir, err := ioutil.TempDir("", "load-tables")
+	require.NoError(t, err)
+
+	defer func() {
+		require.NoError(t, os.RemoveAll(testDir))
+	}()
+
+	boltDBIndexClient, storageClient := buildTestClients(t, testDir)
+	indexPath := filepath.Join(testDir, indexDirName)
+
+	defer func() {
+		boltDBIndexClient.Stop()
+	}()
+
+	// add a legacy db which is outside of table specific folder
+	testutil.AddRecordsToDB(t, filepath.Join(indexPath, "table0"), boltDBIndexClient, 0, 10)
+
+	// table1 with 2 dbs
+	testutil.SetupDBTablesAtPath(t, "table1", indexPath, map[string]testutil.DBRecords{
+		"db1": {
+			Start:      10,
+			NumRecords: 10,
+		},
+		"db2": {
+			Start:      20,
+			NumRecords: 10,
+		},
+	})
+
+	// table2 with 2 dbs
+	testutil.SetupDBTablesAtPath(t, "table2", indexPath, map[string]testutil.DBRecords{
+		"db1": {
+			Start:      30,
+			NumRecords: 10,
+		},
+		"db2": {
+			Start:      40,
+			NumRecords: 10,
+		},
+	})
+
+	expectedTables := map[string]struct {
+		start, numRecords int
+	}{
+		"table0": {start: 0, numRecords: 10},
+		"table1": {start: 10, numRecords: 20},
+		"table2": {start: 30, numRecords: 20},
+	}
+
+	cfg := Config{
+		Uploader:       "test-table-manager",
+		IndexDir:       indexPath,
+		UploadInterval: time.Hour,
+	}
+
+	tm, err := NewTableManager(cfg, boltDBIndexClient, storageClient, nil)
+	require.NoError(t, err)
+	require.Len(t, tm.tables, len(expectedTables))
+
+	stat, err := os.Stat(filepath.Join(indexPath, "table0", "table0"))
+	require.NoError(t, err)
+	require.False(t, stat.IsDir())
+
+	for tableName, expectedIndex := range expectedTables {
+		testutil.TestSingleQuery(t, chunk.IndexQuery{TableName: tableName}, tm.tables[tableName],
expectedIndex.start, expectedIndex.numRecords) + } +} + +func TestTableManager_BatchWrite(t *testing.T) { + testDir, err := ioutil.TempDir("", "batch-write") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(testDir)) + }() + + tm, boltIndexClient, stopFunc := buildTestTableManager(t, testDir) + defer func() { + stopFunc() + }() + + tc := map[string]struct { + start, numRecords int + }{ + "table0": {start: 0, numRecords: 10}, + "table1": {start: 10, numRecords: 10}, + "table2": {start: 20, numRecords: 10}, + } + + writeBatch := boltIndexClient.NewWriteBatch() + for tableName, records := range tc { + testutil.AddRecordsToBatch(writeBatch, tableName, records.start, records.numRecords) + } + + require.NoError(t, tm.BatchWrite(context.Background(), writeBatch)) + + require.NoError(t, err) + require.Len(t, tm.tables, len(tc)) + + for tableName, expectedIndex := range tc { + testutil.TestSingleQuery(t, chunk.IndexQuery{TableName: tableName}, tm.tables[tableName], expectedIndex.start, expectedIndex.numRecords) + } +} + +func TestTableManager_QueryPages(t *testing.T) { + testDir, err := ioutil.TempDir("", "query-pages") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(testDir)) + }() + + tm, boltIndexClient, stopFunc := buildTestTableManager(t, testDir) + defer func() { + stopFunc() + }() + + tc := map[string]struct { + start, numRecords int + }{ + "table0": {start: 0, numRecords: 10}, + "table1": {start: 10, numRecords: 10}, + "table2": {start: 20, numRecords: 10}, + } + + var queries []chunk.IndexQuery + writeBatch := boltIndexClient.NewWriteBatch() + for tableName, records := range tc { + testutil.AddRecordsToBatch(writeBatch, tableName, records.start, records.numRecords) + queries = append(queries, chunk.IndexQuery{TableName: tableName}) + } + + queries = append(queries, chunk.IndexQuery{TableName: "non-existent"}) + + require.NoError(t, tm.BatchWrite(context.Background(), writeBatch)) + + testutil.TestMultiTableQuery(t, queries, tm, 0, 30) +} diff --git a/pkg/storage/stores/shipper/uploads/table_test.go b/pkg/storage/stores/shipper/uploads/table_test.go new file mode 100644 index 0000000000000..9a527a44e79db --- /dev/null +++ b/pkg/storage/stores/shipper/uploads/table_test.go @@ -0,0 +1,268 @@ +package uploads + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/local" + "github.com/stretchr/testify/require" +) + +const ( + indexDirName = "index" + objectsStorageDirName = "objects" +) + +func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, *local.FSObjectClient) { + indexPath := filepath.Join(path, indexDirName) + + boltDBIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: indexPath}) + require.NoError(t, err) + + objectStoragePath := filepath.Join(path, objectsStorageDirName) + fsObjectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) + require.NoError(t, err) + + return boltDBIndexClient, fsObjectClient +} + +type stopFunc func() + +func buildTestTable(t *testing.T, path string) (*Table, *local.BoltIndexClient, stopFunc) { + boltDBIndexClient, fsObjectClient := buildTestClients(t, path) + indexPath := filepath.Join(path, indexDirName) + + table, err := NewTable(indexPath, "test", fsObjectClient, boltDBIndexClient) + require.NoError(t, err) + + return table, 
boltDBIndexClient, func() { + table.Stop() + boltDBIndexClient.Stop() + } +} + +func TestLoadTable(t *testing.T) { + indexPath, err := ioutil.TempDir("", "table-load") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(indexPath)) + }() + + boltDBIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: indexPath}) + require.NoError(t, err) + + defer func() { + boltDBIndexClient.Stop() + }() + + // setup some dbs for a table at a path. + tablePath := testutil.SetupDBTablesAtPath(t, "test-table", indexPath, map[string]testutil.DBRecords{ + "db1": { + Start: 0, + NumRecords: 10, + }, + "db2": { + Start: 10, + NumRecords: 10, + }, + }) + + // try loading the table. + table, err := LoadTable(tablePath, "test", nil, boltDBIndexClient) + require.NoError(t, err) + require.NotNil(t, table) + + defer func() { + table.Stop() + }() + + // query the loaded table to see if it has right data. + testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, 20) +} + +func TestTable_Write(t *testing.T) { + tempDir, err := ioutil.TempDir("", "table-writes") + require.NoError(t, err) + + table, boltIndexClient, stopFunc := buildTestTable(t, tempDir) + + defer func() { + stopFunc() + require.NoError(t, os.RemoveAll(tempDir)) + }() + + now := time.Now() + // a couple of times for which we want to do writes to make the table create different shards + writeTimes := []time.Time{now, now.Add(-(shardDBsByDuration + 5*time.Minute)), now.Add(-(shardDBsByDuration*3 + 3*time.Minute))} + + numFiles := 0 + + // performing writes and checking whether the index gets written to right shard + for i, tm := range writeTimes { + t.Run(fmt.Sprint(i), func(t *testing.T) { + batch := boltIndexClient.NewWriteBatch() + testutil.AddRecordsToBatch(batch, "test", i*10, 10) + require.NoError(t, table.write(context.Background(), tm, batch.(*local.BoltWriteBatch).Writes["test"])) + + numFiles++ + require.Equal(t, numFiles, len(table.dbs)) + + expectedDBName := fmt.Sprint(tm.Truncate(shardDBsByDuration).Unix()) + db, ok := table.dbs[expectedDBName] + require.True(t, ok) + + // test that the table has current + previous records + testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, (i+1)*10) + testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, db, boltIndexClient, i*10, 10) + }) + } +} + +func TestTable_Upload(t *testing.T) { + tempDir, err := ioutil.TempDir("", "table-writes") + require.NoError(t, err) + + table, boltIndexClient, stopFunc := buildTestTable(t, tempDir) + require.NoError(t, err) + + defer func() { + stopFunc() + require.NoError(t, os.RemoveAll(tempDir)) + }() + + now := time.Now() + + // write a batch for now + batch := boltIndexClient.NewWriteBatch() + testutil.AddRecordsToBatch(batch, "test", 0, 10) + require.NoError(t, table.write(context.Background(), now, batch.(*local.BoltWriteBatch).Writes["test"])) + + // upload the table + require.NoError(t, table.Upload(context.Background())) + require.Len(t, table.dbs, 1) + + // compare the local dbs for the table with the dbs in remote storage after upload to ensure they have same data + objectStorageDir := filepath.Join(tempDir, objectsStorageDirName) + compareTableWithStorage(t, table, objectStorageDir) + + // add a sleep since we are updating a file and CI is sometimes too fast to create a difference in mtime of files + time.Sleep(time.Second) + + // write another batch to same shard + batch = boltIndexClient.NewWriteBatch() + testutil.AddRecordsToBatch(batch, "test", 10, 10) + require.NoError(t, table.write(context.Background(), 
+
+func TestTable_Upload(t *testing.T) {
+	tempDir, err := ioutil.TempDir("", "table-upload")
+	require.NoError(t, err)
+
+	table, boltIndexClient, stopFunc := buildTestTable(t, tempDir)
+
+	defer func() {
+		stopFunc()
+		require.NoError(t, os.RemoveAll(tempDir))
+	}()
+
+	now := time.Now()
+
+	// write a batch for the current time
+	batch := boltIndexClient.NewWriteBatch()
+	testutil.AddRecordsToBatch(batch, "test", 0, 10)
+	require.NoError(t, table.write(context.Background(), now, batch.(*local.BoltWriteBatch).Writes["test"]))
+
+	// upload the table
+	require.NoError(t, table.Upload(context.Background()))
+	require.Len(t, table.dbs, 1)
+
+	// compare the local dbs for the table with the dbs in remote storage after upload to ensure they have the same data
+	objectStorageDir := filepath.Join(tempDir, objectsStorageDirName)
+	compareTableWithStorage(t, table, objectStorageDir)
+
+	// sleep for a second: we are about to update a file, and CI is sometimes fast enough that the mtime would otherwise not change
+	time.Sleep(time.Second)
+
+	// write another batch to the same shard
+	batch = boltIndexClient.NewWriteBatch()
+	testutil.AddRecordsToBatch(batch, "test", 10, 10)
+	require.NoError(t, table.write(context.Background(), now, batch.(*local.BoltWriteBatch).Writes["test"]))
+
+	// write a batch to another shard
+	batch = boltIndexClient.NewWriteBatch()
+	testutil.AddRecordsToBatch(batch, "test", 20, 10)
+	require.NoError(t, table.write(context.Background(), now.Add(shardDBsByDuration), batch.(*local.BoltWriteBatch).Writes["test"]))
+
+	// upload the dbs to storage
+	require.NoError(t, table.Upload(context.Background()))
+	require.Len(t, table.dbs, 2)
+
+	// check local dbs against remote dbs to ensure they have the same data
+	compareTableWithStorage(t, table, objectStorageDir)
+}
+
+func compareTableWithStorage(t *testing.T, table *Table, storageDir string) {
+	for name, db := range table.dbs {
+		objectKey := table.buildObjectKey(name)
+		storageDB, err := local.OpenBoltdbFile(filepath.Join(storageDir, objectKey))
+		require.NoError(t, err)
+
+		testutil.CompareDBs(t, db, storageDB)
+		require.NoError(t, storageDB.Close())
+	}
+}
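
Aside for reviewers (not part of the patch): compareTableWithStorage leans on testutil.CompareDBs, which I read as a record-by-record comparison of two boltdb files. A sketch of that idea against bbolt directly; the bucket name "index" and every identifier here are assumptions for illustration, not the shipper's actual layout:

package dbcompare

import (
	"bytes"
	"fmt"

	bolt "go.etcd.io/bbolt"
)

// dumpDB flattens every key/value pair of a single assumed "index" bucket
// into a map keyed by the record key.
func dumpDB(db *bolt.DB) (map[string][]byte, error) {
	records := map[string][]byte{}
	err := db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte("index"))
		if b == nil {
			return fmt.Errorf("bucket %q not found", "index")
		}
		return b.ForEach(func(k, v []byte) error {
			// copy v: bbolt only guarantees the slice is valid inside the transaction
			records[string(k)] = append([]byte(nil), v...)
			return nil
		})
	})
	return records, err
}

// EqualDBs reports whether two boltdb files hold identical records.
func EqualDBs(a, b *bolt.DB) (bool, error) {
	ra, err := dumpDB(a)
	if err != nil {
		return false, err
	}
	rb, err := dumpDB(b)
	if err != nil {
		return false, err
	}
	if len(ra) != len(rb) {
		return false, nil
	}
	for k, v := range ra {
		if !bytes.Equal(rb[k], v) {
			return false, nil
		}
	}
	return true, nil
}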
+
+func TestTable_Cleanup(t *testing.T) {
+	testDir, err := ioutil.TempDir("", "cleanup")
+	require.NoError(t, err)
+
+	defer func() {
+		require.NoError(t, os.RemoveAll(testDir))
+	}()
+
+	boltDBIndexClient, storageClient := buildTestClients(t, testDir)
+	indexPath := filepath.Join(testDir, indexDirName)
+
+	defer func() {
+		boltDBIndexClient.Stop()
+	}()
+
+	// dbs for the various scenarios to test
+	outsideRetention := filepath.Join(indexPath, "outside-retention")
+	outsideRetentionButModified := filepath.Join(indexPath, "outside-retention-but-modified")
+	outsideRetentionButNeverUploaded := filepath.Join(indexPath, "outside-retention-but-never-uploaded")
+	inRetention := filepath.Join(indexPath, "in-retention")
+
+	// build all the test dbs except for outsideRetentionButNeverUploaded
+	testutil.AddRecordsToDB(t, outsideRetention, boltDBIndexClient, 0, 10)
+	testutil.AddRecordsToDB(t, outsideRetentionButModified, boltDBIndexClient, 10, 10)
+	testutil.AddRecordsToDB(t, inRetention, boltDBIndexClient, 20, 10)
+
+	// change the mtimes of dbs that should be outside retention
+	require.NoError(t, os.Chtimes(outsideRetention, time.Now().Add(-2*dbRetainPeriod), time.Now().Add(-2*dbRetainPeriod)))
+	require.NoError(t, os.Chtimes(outsideRetentionButModified, time.Now().Add(-2*dbRetainPeriod), time.Now().Add(-2*dbRetainPeriod)))
+
+	// load the existing dbs
+	table, err := LoadTable(indexPath, "test", storageClient, boltDBIndexClient)
+	require.NoError(t, err)
+	require.Len(t, table.dbs, 3)
+
+	defer func() {
+		table.Stop()
+	}()
+
+	// upload all the existing dbs
+	require.NoError(t, table.Upload(context.Background()))
+	require.Len(t, table.uploadedDBsMtime, 3)
+
+	// change the mtime of the outsideRetentionButModified db after the upload
+	require.NoError(t, os.Chtimes(outsideRetentionButModified, time.Now().Add(-dbRetainPeriod), time.Now().Add(-dbRetainPeriod)))
+
+	// build and add the outsideRetentionButNeverUploaded db
+	testutil.AddRecordsToDB(t, outsideRetentionButNeverUploaded, boltDBIndexClient, 30, 10)
+	require.NoError(t, os.Chtimes(outsideRetentionButNeverUploaded, time.Now().Add(-2*dbRetainPeriod), time.Now().Add(-2*dbRetainPeriod)))
+	_, err = table.getOrAddDB(filepath.Base(outsideRetentionButNeverUploaded))
+	require.NoError(t, err)
+
+	// there must be 4 dbs in the table now
+	require.Len(t, table.dbs, 4)
+
+	// cleanup the dbs
+	require.NoError(t, table.Cleanup())
+
+	// there must be 3 dbs left; only outsideRetention should have been cleaned up
+	require.Len(t, table.dbs, 3)
+
+	expectedDBs := []string{
+		outsideRetentionButModified,
+		outsideRetentionButNeverUploaded,
+		inRetention,
+	}
+
+	// verify that the dbs held open by the table and the actual db files in the index directory match the expected set
+	filesInfo, err := ioutil.ReadDir(indexPath)
+	require.NoError(t, err)
+	require.Len(t, filesInfo, len(expectedDBs))
+
+	for _, expectedDB := range expectedDBs {
+		_, ok := table.dbs[filepath.Base(expectedDB)]
+		require.True(t, ok)
+
+		_, err := os.Stat(expectedDB)
+		require.NoError(t, err)
+	}
+}
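
Aside for reviewers (not part of the patch): the four fixtures pin down the cleanup rule from three directions. As I read the assertions, a db may be deleted only when it is past the retain period, has been uploaded at least once, and has not been modified since that upload. A sketch of that rule; every identifier here is illustrative rather than the shipper's actual API:

package cleanuprule

import "time"

// shouldCleanup captures the behaviour TestTable_Cleanup asserts:
//   outsideRetention                 -> old mtime, uploaded, untouched since upload: deleted
//   outsideRetentionButModified      -> old mtime, but touched after upload: kept
//   outsideRetentionButNeverUploaded -> old mtime, never uploaded: kept
//   inRetention                      -> recent mtime: kept
func shouldCleanup(mtime, uploadedMtime time.Time, uploaded bool, retainPeriod time.Duration) bool {
	pastRetention := time.Since(mtime) > retainPeriod
	unmodifiedSinceUpload := uploaded && uploadedMtime.Equal(mtime)
	return pastRetention && unmodifiedSinceUpload
}

Requiring the recorded upload-time mtime to still equal the file's current mtime is what makes the test's os.Chtimes calls meaningful: touching a file after upload changes its mtime away from the recorded value and so protects it from deletion.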