Skip to content

Commit

Permalink
ddl: close lightning writers after the import is complete (#39879) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 14, 2022
1 parent c23cd80 commit 5cdc753
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 5 deletions.
14 changes: 13 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,13 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
return pebble.Open(dbPath, opts)
}

var (
// RunInTest indicates whether the current process is running in test.
RunInTest bool
// LastAlloc is the last ID allocator.
LastAlloc manual.Allocator
)

// NewLocalBackend creates new connections to tikv.
func NewLocalBackend(
ctx context.Context,
Expand Down Expand Up @@ -461,6 +468,11 @@ func NewLocalBackend(
} else {
writeLimiter = noopStoreWriteLimiter{}
}
alloc := manual.Allocator{}
if RunInTest {
alloc.RefCnt = new(atomic.Int64)
LastAlloc = alloc
}
local := &local{
engines: sync.Map{},
pdCtl: pdCtl,
Expand All @@ -486,7 +498,7 @@ func NewLocalBackend(
keyAdapter: keyAdapter,
errorMgr: errorMgr,
importClientFactory: importClientFactory,
bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
bufferPool: membuf.NewPool(membuf.WithAllocator(alloc)),
writeLimiter: writeLimiter,
logger: log.FromContext(ctx),
encBuilder: NewEncodingBuilder(ctx),
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/manual/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ go_library(
cgo = True,
importpath = "github.com/pingcap/tidb/br/pkg/lightning/manual",
visibility = ["//visibility:public"],
deps = ["@org_uber_go_atomic//:atomic"],
)
31 changes: 28 additions & 3 deletions br/pkg/lightning/manual/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,33 @@

package manual

type Allocator struct{}
import (
"fmt"

func (Allocator) Alloc(n int) []byte { return New(n) }
"go.uber.org/atomic"
)

func (Allocator) Free(b []byte) { Free(b) }
type Allocator struct {
RefCnt *atomic.Int64
}

func (a Allocator) Alloc(n int) []byte {
if a.RefCnt != nil {
a.RefCnt.Add(1)
}
return New(n)
}

func (a Allocator) Free(b []byte) {
if a.RefCnt != nil {
a.RefCnt.Add(-1)
}
Free(b)
}

func (a Allocator) CheckRefCnt() error {
if a.RefCnt != nil && a.RefCnt.Load() != 0 {
return fmt.Errorf("memory leak detected, refCnt: %d", a.RefCnt.Load())
}
return nil
}
29 changes: 28 additions & 1 deletion ddl/ingest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func (ei *engineInfo) Clean() {
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
}
ei.openedEngine = nil
err = ei.closeWriters()
if err != nil {
logutil.BgLogger().Error(LitErrCloseWriterErr, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
}
// Here the local intermediate files will be removed.
err = closedEngine.Cleanup(ei.ctx)
if err != nil {
Expand All @@ -102,8 +107,14 @@ func (ei *engineInfo) ImportAndClean() error {
return errors.New(LitErrCloseEngineErr)
}
ei.openedEngine = nil
err := ei.closeWriters()
if err != nil {
logutil.BgLogger().Error(LitErrCloseWriterErr, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return err
}

err := ei.diskRoot.UpdateUsageAndQuota()
err = ei.diskRoot.UpdateUsageAndQuota()
if err != nil {
logutil.BgLogger().Error(LitErrUpdateDiskStats, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
Expand Down Expand Up @@ -182,6 +193,22 @@ func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) {
}, nil
}

func (ei *engineInfo) closeWriters() error {
var firstErr error
for wid := range ei.writerCache.Keys() {
if w, ok := ei.writerCache.Load(wid); ok {
_, err := w.Close(ei.ctx)
if err != nil {
if firstErr == nil {
firstErr = err
}
}
}
ei.writerCache.Delete(wid)
}
return firstErr
}

// WriteRow Write one row into local writer buffer.
func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error {
kvs := make([]common.KvPair, 1)
Expand Down
1 change: 1 addition & 0 deletions ddl/ingest/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
LitInfoChgMemSetting string = "[ddl-ingest] change memory setting for lightning"
LitInfoInitMemSetting string = "[ddl-ingest] initial memory setting for lightning"
LitInfoUnsafeImport string = "[ddl-ingest] do a partial import data into the storage"
LitErrCloseWriterErr string = "[ddl-ingest] close writer error"
)

func genBackendAllocMemFailedErr(memRoot MemRoot, jobID int64) error {
Expand Down
1 change: 1 addition & 0 deletions tests/realtikvtest/addindextest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_test(
],
embed = [":addindextest"],
deps = [
"//br/pkg/lightning/backend/local",
"//config",
"//ddl",
"//ddl/ingest",
Expand Down
4 changes: 4 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/ddl/testutil"
Expand All @@ -38,6 +39,8 @@ func TestAddIndexIngestMemoryUsage(t *testing.T) {
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)

local.RunInTest = true

tk.MustExec("create table t (a int, b int, c int);")
var sb strings.Builder
sb.WriteString("insert into t values ")
Expand All @@ -55,6 +58,7 @@ func TestAddIndexIngestMemoryUsage(t *testing.T) {
tk.MustExec("alter table t add unique index idx1(b);")
tk.MustExec("admin check table t;")
require.Equal(t, int64(0), ingest.LitMemRoot.CurrentUsage())
require.NoError(t, local.LastAlloc.CheckRefCnt())
}

func TestAddIndexIngestLimitOneBackend(t *testing.T) {
Expand Down

0 comments on commit 5cdc753

Please sign in to comment.