From 23afa3478678553e74f4c6b6b8e3d511eba9fca8 Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 3 Feb 2023 13:58:50 +0800 Subject: [PATCH] ddl/ingest: add mutex to disk root --- ddl/ingest/disk_root.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/ddl/ingest/disk_root.go b/ddl/ingest/disk_root.go index 445115333edd1..d71dc65433a62 100644 --- a/ddl/ingest/disk_root.go +++ b/ddl/ingest/disk_root.go @@ -15,6 +15,8 @@ package ingest import ( + "sync" + lcom "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/logutil" @@ -37,6 +39,7 @@ type diskRootImpl struct { currentUsage uint64 maxQuota uint64 bcCtx *backendCtxManager + mu sync.RWMutex } // NewDiskRootImpl creates a new DiskRoot. @@ -49,22 +52,32 @@ func NewDiskRootImpl(path string, bcCtx *backendCtxManager) DiskRoot { // CurrentUsage implements DiskRoot interface. func (d *diskRootImpl) CurrentUsage() uint64 { - return d.currentUsage + d.mu.RUnlock() + usage := d.currentUsage + d.mu.RUnlock() + return usage } // MaxQuota implements DiskRoot interface. func (d *diskRootImpl) MaxQuota() uint64 { - return d.maxQuota + d.mu.RUnlock() + quota := d.maxQuota + d.mu.RUnlock() + return quota } // UpdateUsageAndQuota implements DiskRoot interface. func (d *diskRootImpl) UpdateUsageAndQuota() error { - d.currentUsage = d.bcCtx.TotalDiskUsage() + totalDiskUsage := d.bcCtx.TotalDiskUsage() sz, err := lcom.GetStorageSize(d.path) if err != nil { logutil.BgLogger().Error(LitErrGetStorageQuota, zap.Error(err)) return err } - d.maxQuota = mathutil.Min(variable.DDLDiskQuota.Load(), uint64(capacityThreshold*float64(sz.Capacity))) + maxQuota := mathutil.Min(variable.DDLDiskQuota.Load(), uint64(capacityThreshold*float64(sz.Capacity))) + d.mu.Lock() + d.currentUsage = totalDiskUsage + d.maxQuota = maxQuota + d.mu.Unlock() return nil }