diff --git a/lightning/backend/local.go b/lightning/backend/local.go index 1819a9491..2369b3d27 100644 --- a/lightning/backend/local.go +++ b/lightning/backend/local.go @@ -106,10 +106,24 @@ type localFileMeta struct { TotalSize int64 `json:"total_size"` } +type importMutexState int32 + +const ( + importMutexStateNoLock importMutexState = iota + importMutexStateImport + importMutexStateFlush + importMutexStateClose +) + type LocalFile struct { localFileMeta db *pebble.DB Uuid uuid.UUID + + // isImportingAtomic is an atomic variable indicating whether the importMutex has been locked. + // This should not be used as a "spin lock" indicator. + isImportingAtomic int32 + mutex sync.Mutex } func (e *LocalFile) Close() error { @@ -154,6 +168,17 @@ func (e *LocalFile) getSizeProperties() (*sizeProperties, error) { return sizeProps, nil } +// lock locks the local file for importing. +func (e *LocalFile) lock(state importMutexState) { + e.mutex.Lock() + atomic.StoreInt32(&e.isImportingAtomic, int32(state)) +} + +func (e *LocalFile) unlock() { + atomic.StoreInt32(&e.isImportingAtomic, int32(importMutexStateNoLock)) + e.mutex.Unlock() +} + type gRPCConns struct { mu sync.Mutex conns map[uint64]*connPool @@ -305,6 +330,26 @@ func NewLocalBackend( return MakeBackend(local), nil } +// lock locks the local file. +func (local *local) lockEngine(engineId uuid.UUID, state importMutexState) bool { + if e, ok := local.engines.Load(engineId); ok { + engine := e.(*LocalFile) + engine.lock(state) + return true + } + return false +} + +// unlock unlocks the local file from importing. +func (local *local) unlockEngine(engineId uuid.UUID) bool { + if e, ok := local.engines.Load(engineId); ok { + engine := e.(*LocalFile) + engine.unlock() + return true + } + return false +} + func (local *local) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) { store, err := local.splitCli.GetStore(ctx, storeID) if err != nil { @@ -438,6 +483,7 @@ func (local *local) LoadEngineMeta(engineUUID uuid.UUID) (localFileMeta, error) return meta, err } +// This method must be called with holding mutex of LocalFile func (local *local) OpenEngine(ctx context.Context, engineUUID uuid.UUID) error { meta, err := local.LoadEngineMeta(engineUUID) if err != nil { @@ -447,7 +493,13 @@ func (local *local) OpenEngine(ctx context.Context, engineUUID uuid.UUID) error if err != nil { return err } - local.engines.Store(engineUUID, &LocalFile{localFileMeta: meta, db: db, Uuid: engineUUID}) + if e, ok := local.engines.Load(engineUUID); ok { + engine := e.(*LocalFile) + engine.db = db + engine.localFileMeta = meta + } else { + local.engines.Store(engineUUID, &LocalFile{localFileMeta: meta, db: db, Uuid: engineUUID, isImportingAtomic: int32(importMutexStateNoLock)}) + } return nil } @@ -473,9 +525,10 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error return err } engineFile := &LocalFile{ - localFileMeta: meta, - Uuid: engineUUID, - db: db, + localFileMeta: meta, + Uuid: engineUUID, + db: db, + isImportingAtomic: int32(importMutexStateNoLock), } local.engines.Store(engineUUID, engineFile) return nil @@ -1162,6 +1215,34 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro return nil } +func (local *local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error { + // the only way to reset the engine + reclaim the space is to delete and reopen it 🤷 + engineFile, ok := local.engines.Load(engineUUID) + if !ok { + log.L().Warn("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID)) + return nil + } + localEngine := engineFile.(*LocalFile) + localEngine.lock(importMutexStateClose) + defer localEngine.unlock() + if err := localEngine.Close(); err != nil { + return err + } + if err := localEngine.Cleanup(local.localStoreDir); err != nil { + return err + } + meta, err := local.LoadEngineMeta(engineUUID) + if err != nil { + meta = localFileMeta{} + } + db, err := local.openEngineDB(engineUUID, false) + if err == nil { + localEngine.db = db + localEngine.localFileMeta = meta + } + return err +} + func (local *local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error { // release this engine after import success engineFile, ok := local.engines.Load(engineUUID) @@ -1599,7 +1680,6 @@ func (w *LocalWriter) writeRowsLoop() { totalCount += int64(len(kvs)) } - atomic.AddInt64(&w.local.Length, totalCount) if batchSize > 0 { if err := w.flushKVs(); err != nil { w.writeErr.Set(err) @@ -1608,18 +1688,23 @@ func (w *LocalWriter) writeRowsLoop() { totalSize += batchSize log.L().Info("write data by sort index", zap.Int64("bytes", totalSize)) } + w.local.lock(importMutexStateNoLock) if writer != nil { - if err := writer.Close(); err != nil { - w.writeErr.Set(err) - return - } - if err := w.local.db.Ingest([]string{filePath}); err != nil { - w.writeErr.Set(err) - return + err := writer.Close() + if err == nil { + err = w.local.db.Ingest([]string{filePath}) + // The following two variable should be changed with holding mutex, + // because there may be another thread change localFileMeta object. See it in `local::OpenEngine` + atomic.AddInt64(&w.local.TotalSize, totalSize) + atomic.AddInt64(&w.local.Length, totalCount) } + w.writeErr.Set(err) log.L().Info("write data by sst writer", zap.Int64("bytes", totalSize)) + } else { + atomic.AddInt64(&w.local.TotalSize, totalSize) + atomic.AddInt64(&w.local.Length, totalCount) } - atomic.AddInt64(&w.local.TotalSize, totalSize) + w.local.unlock() } func (w *LocalWriter) flushKVs() error { @@ -1639,7 +1724,10 @@ func (w *LocalWriter) flushKVs() error { return err } w.writeBatch = w.writeBatch[:0] - return w.local.db.Ingest([]string{filePath}) + w.local.lock(importMutexStateNoLock) + err = w.local.db.Ingest([]string{filePath}) + w.local.unlock() + return err } func (w *LocalWriter) createWriter() (*sstable.Writer, string, error) {