Skip to content

Commit

Permalink
sorter: cherry-pick Unified Sorter changes to release-4.0 (#1730)
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix authored May 8, 2021
1 parent 753073a commit fa5d48f
Show file tree
Hide file tree
Showing 22 changed files with 795 additions and 131 deletions.
1 change: 1 addition & 0 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func (o *Owner) newChangeFeed(
return nil, errors.Trace(err)
}

// TODO delete
if info.Engine == model.SortInFile {
err = os.MkdirAll(info.SortDir, 0o755)
if err != nil {
Expand Down
14 changes: 8 additions & 6 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo
switch p.changefeed.Engine {
case model.SortInMemory:
sorter = puller.NewEntrySorter()
case model.SortInFile, model.SortUnified:
case model.SortInFile:
err := util.IsDirAndWritable(p.changefeed.SortDir)
if err != nil {
if os.IsNotExist(errors.Cause(err)) {
Expand All @@ -838,12 +838,14 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo
}
}

if p.changefeed.Engine == model.SortInFile {
sorter = puller.NewFileSorter(p.changefeed.SortDir)
} else {
// Unified Sorter
sorter = psorter.NewUnifiedSorter(p.changefeed.SortDir, tableName, util.CaptureAddrFromCtx(ctx))
sorter = puller.NewFileSorter(p.changefeed.SortDir)
case model.SortUnified:
err := psorter.UnifiedSorterCheckDir(p.changefeed.SortDir)
if err != nil {
p.sendError(errors.Trace(err))
return nil
}
sorter = psorter.NewUnifiedSorter(p.changefeed.SortDir, p.changefeedID, tableName, tableID, util.CaptureAddrFromCtx(ctx))
default:
p.sendError(cerror.ErrUnknownSortEngine.GenWithStackByArgs(p.changefeed.Engine))
return nil
Expand Down
34 changes: 23 additions & 11 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,27 @@ type sorterNode struct {
sortEngine model.SortEngine
sortDir string
sorter puller.EventSorter
tableName string // quoted schema and table, used in metircs only
wg errgroup.Group
cancel context.CancelFunc

changeFeedID model.ChangeFeedID
tableID model.TableID
tableName string // quoted schema and table, used in metircs only

wg errgroup.Group
cancel context.CancelFunc
}

func newSorterNode(sortEngine model.SortEngine, sortDir string, tableName string) pipeline.Node {
func newSorterNode(
sortEngine model.SortEngine,
sortDir string,
changeFeedID model.ChangeFeedID,
tableName string, tableID model.TableID) pipeline.Node {
return &sorterNode{
sortEngine: sortEngine,
sortDir: sortDir,
tableName: tableName,

changeFeedID: changeFeedID,
tableID: tableID,
tableName: tableName,
}
}

Expand All @@ -51,7 +62,7 @@ func (n *sorterNode) Init(ctx pipeline.NodeContext) error {
switch n.sortEngine {
case model.SortInMemory:
sorter = puller.NewEntrySorter()
case model.SortInFile, model.SortUnified:
case model.SortInFile:
err := util.IsDirAndWritable(n.sortDir)
if err != nil {
if os.IsNotExist(errors.Cause(err)) {
Expand All @@ -64,12 +75,13 @@ func (n *sorterNode) Init(ctx pipeline.NodeContext) error {
}
}

if n.sortEngine == model.SortInFile {
sorter = puller.NewFileSorter(n.sortDir)
} else {
// Unified Sorter
sorter = psorter.NewUnifiedSorter(n.sortDir, n.tableName, ctx.Vars().CaptureAddr)
sorter = puller.NewFileSorter(n.sortDir)
case model.SortUnified:
err := psorter.UnifiedSorterCheckDir(n.sortDir)
if err != nil {
return errors.Trace(err)
}
sorter = psorter.NewUnifiedSorter(n.sortDir, n.changeFeedID, n.tableName, n.tableID, ctx.Vars().CaptureAddr)
default:
return cerror.ErrUnknownSortEngine.GenWithStackByArgs(n.sortEngine)
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func NewTablePipeline(ctx context.Context,

p := pipeline.NewPipeline(ctx, 500*time.Millisecond)
p.AppendNode(ctx, "puller", newPullerNode(changefeedID, credential, kvStorage, limitter, tableID, replicaInfo, tableName))
p.AppendNode(ctx, "sorter", newSorterNode(sortEngine, sortDir, tableName))
p.AppendNode(ctx, "sorter", newSorterNode(sortEngine, sortDir, changefeedID, tableName, tableID))
p.AppendNode(ctx, "mounter", newMounterNode(mounter))
config := ctx.Vars().Config
if config.Cyclic != nil && config.Cyclic.IsEnabled() {
Expand Down
17 changes: 13 additions & 4 deletions cdc/puller/sorter/backend_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"time"
"unsafe"

"github.com/pingcap/ticdc/pkg/util"

"github.com/mackerelio/go-osstat/memory"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -36,7 +38,7 @@ import (
)

const (
backgroundJobInterval = time.Second * 5
backgroundJobInterval = time.Second * 15
)

var (
Expand Down Expand Up @@ -107,11 +109,11 @@ func newBackEndPool(dir string, captureAddr string) *backEndPool {
zap.Int64("usedBySorter", ret.sorterMemoryUsage()))
// Increase GC frequency to avoid unnecessary OOMs
debug.SetGCPercent(10)
if memPressure > 95 {
if memPressure > 80 {
runtime.GC()
}
} else {
debug.SetGCPercent(100)
debug.SetGCPercent(50)
}

// garbage collect temporary files in batches
Expand Down Expand Up @@ -166,9 +168,11 @@ func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) {
}

fname := fmt.Sprintf("%s%d.tmp", p.filePrefix, atomic.AddUint64(&p.fileNameCounter, 1))
tableID, tableName := util.TableIDFromCtx(ctx)
log.Debug("Unified Sorter: trying to create file backEnd",
zap.String("filename", fname),
zap.String("table", tableNameFromCtx(ctx)))
zap.Int64("table-id", tableID),
zap.String("table-name", tableName))

ret, err := newFileBackEnd(fname, &msgPackGenSerde{})
if err != nil {
Expand Down Expand Up @@ -228,10 +232,12 @@ func (p *backEndPool) terminate() {
defer close(p.cancelCh)
// the background goroutine can be considered terminated here

log.Debug("Unified Sorter terminating...")
p.cancelRWLock.Lock()
defer p.cancelRWLock.Unlock()
p.isTerminating = true

log.Debug("Unified Sorter cleaning up before exiting")
// any new allocs and deallocs will not succeed from this point
// accessing p.cache without atomics is safe from now

Expand All @@ -254,11 +260,14 @@ func (p *backEndPool) terminate() {
log.Warn("Unified Sorter clean-up failed", zap.Error(err))
}
for _, file := range files {
log.Debug("Unified Sorter backEnd removing file", zap.String("file", file))
err = os.RemoveAll(file)
if err != nil {
log.Warn("Unified Sorter clean-up failed: failed to remove", zap.String("file-name", file), zap.Error(err))
}
}

log.Debug("Unified Sorter backEnd terminated")
}

func (p *backEndPool) sorterMemoryUsage() int64 {
Expand Down
5 changes: 1 addition & 4 deletions cdc/puller/sorter/backend_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"os"
"strconv"
"testing"
"time"

"github.com/pingcap/check"
Expand All @@ -26,11 +25,9 @@ import (
"github.com/pingcap/ticdc/pkg/util/testleak"
)

func TestSuite(t *testing.T) { check.TestingT(t) }

type backendPoolSuite struct{}

var _ = check.Suite(&backendPoolSuite{})
var _ = check.SerialSuites(&backendPoolSuite{})

func (s *backendPoolSuite) TestBasicFunction(c *check.C) {
defer testleak.AfterTest(c)()
Expand Down
50 changes: 41 additions & 9 deletions cdc/puller/sorter/heap_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,35 @@ const (
type flushTask struct {
taskID int
heapSorterID int
backend backEnd
reader backEndReader
tsLowerBound uint64
maxResolvedTs uint64
finished chan error
dealloc func() error
isDeallocated int32
dataSize int64
lastTs uint64 // for debugging TODO remove
canceller *asyncCanceller

isEmpty bool // read only field

deallocLock sync.RWMutex
isDeallocated bool // do not access directly
backend backEnd // do not access directly
}

func (t *flushTask) markDeallocated() {
t.deallocLock.Lock()
defer t.deallocLock.Unlock()

t.backend = nil
t.isDeallocated = true
}

func (t *flushTask) GetBackEnd() backEnd {
t.deallocLock.RLock()
defer t.deallocLock.RUnlock()

return t.backend
}

type heapSorter struct {
Expand Down Expand Up @@ -104,6 +123,10 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
isEmptyFlush := h.heap.Len() == 0
var finishCh chan error
if !isEmptyFlush {
failpoint.Inject("InjectErrorBackEndAlloc", func() {
failpoint.Return(cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected alloc error")).FastGenWithCause())
})

var err error
backEnd, err = pool.alloc(ctx)
if err != nil {
Expand All @@ -121,17 +144,16 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
maxResolvedTs: maxResolvedTs,
finished: finishCh,
canceller: h.canceller,
isEmpty: isEmptyFlush,
}
h.taskCounter++

var oldHeap sortHeap
if !isEmptyFlush {
task.dealloc = func() error {
if atomic.SwapInt32(&task.isDeallocated, 1) == 1 {
return nil
}
if task.backend != nil {
task.backend = nil
backEnd := task.GetBackEnd()
if backEnd != nil {
defer task.markDeallocated()
return pool.dealloc(backEnd)
}
return nil
Expand All @@ -140,12 +162,15 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
h.heap = make(sortHeap, 0, 65536)
} else {
task.dealloc = func() error {
task.markDeallocated()
return nil
}
}
failpoint.Inject("sorterDebug", func() {
tableID, tableName := util.TableIDFromCtx(ctx)
log.Debug("Unified Sorter new flushTask",
zap.String("table", tableNameFromCtx(ctx)),
zap.Int64("table-id", tableID),
zap.String("table-name", tableName),
zap.Int("heap-id", task.heapSorterID),
zap.Uint64("resolvedTs", task.maxResolvedTs))
})
Expand Down Expand Up @@ -188,6 +213,11 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
close(task.finished)
}()

failpoint.Inject("InjectErrorBackEndWrite", func() {
task.finished <- cerrors.ErrUnifiedSorterIOError.Wrap(errors.New("injected write error")).FastGenWithCause()
failpoint.Return()
})

counter := 0
for oldHeap.Len() > 0 {
failpoint.Inject("asyncFlushInProcessDelay", func() {
Expand Down Expand Up @@ -223,9 +253,11 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
backEndFinal = nil

failpoint.Inject("sorterDebug", func() {
tableID, tableName := util.TableIDFromCtx(ctx)
log.Debug("Unified Sorter flushTask finished",
zap.Int("heap-id", task.heapSorterID),
zap.String("table", tableNameFromCtx(ctx)),
zap.Int64("table-id", tableID),
zap.String("table-name", tableName),
zap.Uint64("resolvedTs", task.maxResolvedTs),
zap.Uint64("data-size", dataSize),
zap.Int("size", eventCount))
Expand Down
Loading

0 comments on commit fa5d48f

Please sign in to comment.