Skip to content

Commit

Permalink
Merge branch 'master' into ttl-reschedule-after-finish
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKeao authored Dec 13, 2022
2 parents 0ace00e + d0d6955 commit a8a4ab8
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 44 deletions.
14 changes: 13 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,13 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
return pebble.Open(dbPath, opts)
}

var (
// RunInTest indicates whether the current process is running in test.
RunInTest bool
// LastAlloc is the last ID allocator.
LastAlloc manual.Allocator
)

// NewLocalBackend creates new connections to tikv.
func NewLocalBackend(
ctx context.Context,
Expand Down Expand Up @@ -461,6 +468,11 @@ func NewLocalBackend(
} else {
writeLimiter = noopStoreWriteLimiter{}
}
alloc := manual.Allocator{}
if RunInTest {
alloc.RefCnt = new(atomic.Int64)
LastAlloc = alloc
}
local := &local{
engines: sync.Map{},
pdCtl: pdCtl,
Expand All @@ -486,7 +498,7 @@ func NewLocalBackend(
keyAdapter: keyAdapter,
errorMgr: errorMgr,
importClientFactory: importClientFactory,
bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
bufferPool: membuf.NewPool(membuf.WithAllocator(alloc)),
writeLimiter: writeLimiter,
logger: log.FromContext(ctx),
encBuilder: NewEncodingBuilder(ctx),
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/manual/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ go_library(
cgo = True,
importpath = "github.com/pingcap/tidb/br/pkg/lightning/manual",
visibility = ["//visibility:public"],
deps = ["@org_uber_go_atomic//:atomic"],
)
31 changes: 28 additions & 3 deletions br/pkg/lightning/manual/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,33 @@

package manual

type Allocator struct{}
import (
"fmt"

func (Allocator) Alloc(n int) []byte { return New(n) }
"go.uber.org/atomic"
)

func (Allocator) Free(b []byte) { Free(b) }
type Allocator struct {
RefCnt *atomic.Int64
}

func (a Allocator) Alloc(n int) []byte {
if a.RefCnt != nil {
a.RefCnt.Add(1)
}
return New(n)
}

func (a Allocator) Free(b []byte) {
if a.RefCnt != nil {
a.RefCnt.Add(-1)
}
Free(b)
}

func (a Allocator) CheckRefCnt() error {
if a.RefCnt != nil && a.RefCnt.Load() != 0 {
return fmt.Errorf("memory leak detected, refCnt: %d", a.RefCnt.Load())
}
return nil
}
29 changes: 28 additions & 1 deletion ddl/ingest/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func (ei *engineInfo) Clean() {
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
}
ei.openedEngine = nil
err = ei.closeWriters()
if err != nil {
logutil.BgLogger().Error(LitErrCloseWriterErr, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
}
// Here the local intermediate files will be removed.
err = closedEngine.Cleanup(ei.ctx)
if err != nil {
Expand All @@ -101,8 +106,14 @@ func (ei *engineInfo) ImportAndClean() error {
return err1
}
ei.openedEngine = nil
err := ei.closeWriters()
if err != nil {
logutil.BgLogger().Error(LitErrCloseWriterErr, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
return err
}

err := ei.diskRoot.UpdateUsageAndQuota()
err = ei.diskRoot.UpdateUsageAndQuota()
if err != nil {
logutil.BgLogger().Error(LitErrUpdateDiskStats, zap.Error(err),
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
Expand Down Expand Up @@ -181,6 +192,22 @@ func (ei *engineInfo) newWriterContext(workerID int) (*WriterContext, error) {
}, nil
}

func (ei *engineInfo) closeWriters() error {
var firstErr error
for wid := range ei.writerCache.Keys() {
if w, ok := ei.writerCache.Load(wid); ok {
_, err := w.Close(ei.ctx)
if err != nil {
if firstErr == nil {
firstErr = err
}
}
}
ei.writerCache.Delete(wid)
}
return firstErr
}

// WriteRow Write one row into local writer buffer.
func (wCtx *WriterContext) WriteRow(key, idxVal []byte) error {
kvs := make([]common.KvPair, 1)
Expand Down
1 change: 1 addition & 0 deletions ddl/ingest/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
LitInfoChgMemSetting string = "[ddl-ingest] change memory setting for ingest"
LitInfoInitMemSetting string = "[ddl-ingest] initial memory setting for ingest"
LitInfoUnsafeImport string = "[ddl-ingest] do a partial import data into the storage"
LitErrCloseWriterErr string = "[ddl-ingest] close writer error"
)

func genBackendAllocMemFailedErr(memRoot MemRoot, jobID int64) error {
Expand Down
Loading

0 comments on commit a8a4ab8

Please sign in to comment.