Skip to content

Commit

Permalink
Compatible for disk quota (pingcap#543)
Browse files Browse the repository at this point in the history
* compatible for disk quota

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* unlock when err occur

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* fix delete

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* fix method name

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

Co-authored-by: kennytm <kennytm@gmail.com>
Co-authored-by: glorv <glorvs@163.com>
  • Loading branch information
3 people authored Jan 7, 2021
1 parent a401ad0 commit 0529577
Showing 1 changed file with 102 additions and 14 deletions.
116 changes: 102 additions & 14 deletions lightning/backend/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,24 @@ type localFileMeta struct {
TotalSize int64 `json:"total_size"`
}

type importMutexState int32

const (
importMutexStateNoLock importMutexState = iota
importMutexStateImport
importMutexStateFlush
importMutexStateClose
)

type LocalFile struct {
localFileMeta
db *pebble.DB
Uuid uuid.UUID

// isImportingAtomic is an atomic variable indicating whether the importMutex has been locked.
// This should not be used as a "spin lock" indicator.
isImportingAtomic int32
mutex sync.Mutex
}

func (e *LocalFile) Close() error {
Expand Down Expand Up @@ -154,6 +168,17 @@ func (e *LocalFile) getSizeProperties() (*sizeProperties, error) {
return sizeProps, nil
}

// lock locks the local file for importing.
func (e *LocalFile) lock(state importMutexState) {
e.mutex.Lock()
atomic.StoreInt32(&e.isImportingAtomic, int32(state))
}

func (e *LocalFile) unlock() {
atomic.StoreInt32(&e.isImportingAtomic, int32(importMutexStateNoLock))
e.mutex.Unlock()
}

type gRPCConns struct {
mu sync.Mutex
conns map[uint64]*connPool
Expand Down Expand Up @@ -305,6 +330,26 @@ func NewLocalBackend(
return MakeBackend(local), nil
}

// lock locks the local file.
func (local *local) lockEngine(engineId uuid.UUID, state importMutexState) bool {
if e, ok := local.engines.Load(engineId); ok {
engine := e.(*LocalFile)
engine.lock(state)
return true
}
return false
}

// unlock unlocks the local file from importing.
func (local *local) unlockEngine(engineId uuid.UUID) bool {
if e, ok := local.engines.Load(engineId); ok {
engine := e.(*LocalFile)
engine.unlock()
return true
}
return false
}

func (local *local) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
store, err := local.splitCli.GetStore(ctx, storeID)
if err != nil {
Expand Down Expand Up @@ -438,6 +483,7 @@ func (local *local) LoadEngineMeta(engineUUID uuid.UUID) (localFileMeta, error)
return meta, err
}

// This method must be called with holding mutex of LocalFile
func (local *local) OpenEngine(ctx context.Context, engineUUID uuid.UUID) error {
meta, err := local.LoadEngineMeta(engineUUID)
if err != nil {
Expand All @@ -447,7 +493,13 @@ func (local *local) OpenEngine(ctx context.Context, engineUUID uuid.UUID) error
if err != nil {
return err
}
local.engines.Store(engineUUID, &LocalFile{localFileMeta: meta, db: db, Uuid: engineUUID})
if e, ok := local.engines.Load(engineUUID); ok {
engine := e.(*LocalFile)
engine.db = db
engine.localFileMeta = meta
} else {
local.engines.Store(engineUUID, &LocalFile{localFileMeta: meta, db: db, Uuid: engineUUID, isImportingAtomic: int32(importMutexStateNoLock)})
}
return nil
}

Expand All @@ -473,9 +525,10 @@ func (local *local) CloseEngine(ctx context.Context, engineUUID uuid.UUID) error
return err
}
engineFile := &LocalFile{
localFileMeta: meta,
Uuid: engineUUID,
db: db,
localFileMeta: meta,
Uuid: engineUUID,
db: db,
isImportingAtomic: int32(importMutexStateNoLock),
}
local.engines.Store(engineUUID, engineFile)
return nil
Expand Down Expand Up @@ -1162,6 +1215,34 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro
return nil
}

func (local *local) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error {
// the only way to reset the engine + reclaim the space is to delete and reopen it 🤷
engineFile, ok := local.engines.Load(engineUUID)
if !ok {
log.L().Warn("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID))
return nil
}
localEngine := engineFile.(*LocalFile)
localEngine.lock(importMutexStateClose)
defer localEngine.unlock()
if err := localEngine.Close(); err != nil {
return err
}
if err := localEngine.Cleanup(local.localStoreDir); err != nil {
return err
}
meta, err := local.LoadEngineMeta(engineUUID)
if err != nil {
meta = localFileMeta{}
}
db, err := local.openEngineDB(engineUUID, false)
if err == nil {
localEngine.db = db
localEngine.localFileMeta = meta
}
return err
}

func (local *local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error {
// release this engine after import success
engineFile, ok := local.engines.Load(engineUUID)
Expand Down Expand Up @@ -1599,7 +1680,6 @@ func (w *LocalWriter) writeRowsLoop() {
totalCount += int64(len(kvs))
}

atomic.AddInt64(&w.local.Length, totalCount)
if batchSize > 0 {
if err := w.flushKVs(); err != nil {
w.writeErr.Set(err)
Expand All @@ -1608,18 +1688,23 @@ func (w *LocalWriter) writeRowsLoop() {
totalSize += batchSize
log.L().Info("write data by sort index", zap.Int64("bytes", totalSize))
}
w.local.lock(importMutexStateNoLock)
if writer != nil {
if err := writer.Close(); err != nil {
w.writeErr.Set(err)
return
}
if err := w.local.db.Ingest([]string{filePath}); err != nil {
w.writeErr.Set(err)
return
err := writer.Close()
if err == nil {
err = w.local.db.Ingest([]string{filePath})
// The following two variable should be changed with holding mutex,
// because there may be another thread change localFileMeta object. See it in `local::OpenEngine`
atomic.AddInt64(&w.local.TotalSize, totalSize)
atomic.AddInt64(&w.local.Length, totalCount)
}
w.writeErr.Set(err)
log.L().Info("write data by sst writer", zap.Int64("bytes", totalSize))
} else {
atomic.AddInt64(&w.local.TotalSize, totalSize)
atomic.AddInt64(&w.local.Length, totalCount)
}
atomic.AddInt64(&w.local.TotalSize, totalSize)
w.local.unlock()
}

func (w *LocalWriter) flushKVs() error {
Expand All @@ -1639,7 +1724,10 @@ func (w *LocalWriter) flushKVs() error {
return err
}
w.writeBatch = w.writeBatch[:0]
return w.local.db.Ingest([]string{filePath})
w.local.lock(importMutexStateNoLock)
err = w.local.db.Ingest([]string{filePath})
w.local.unlock()
return err
}

func (w *LocalWriter) createWriter() (*sstable.Writer, string, error) {
Expand Down

0 comments on commit 0529577

Please sign in to comment.