Skip to content

Commit

Permalink
Revert "sorter: avoid too much memory alloc/gc in heap sorter"
Browse files Browse the repository at this point in the history
This reverts commit 0ef525f.
  • Loading branch information
amyangfei authored and ti-chi-bot committed May 27, 2021
1 parent 3b5f067 commit 9a1a01a
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions cdc/puller/sorter/heap_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

const (
flushRateLimitPerSecond = 10
sortHeapCapacity = 65536
sortHeapCapacity = 32
)

type flushTask struct {
Expand Down Expand Up @@ -149,6 +149,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
}
h.taskCounter++

var oldHeap sortHeap
if !isEmptyFlush {
task.dealloc = func() error {
backEnd := task.GetBackEnd()
Expand All @@ -158,6 +159,8 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
}
return nil
}
oldHeap = h.heap
h.heap = make(sortHeap, 0, sortHeapCapacity)
} else {
task.dealloc = func() error {
task.markDeallocated()
Expand Down Expand Up @@ -217,7 +220,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
})

counter := 0
for h.heap.Len() > 0 {
for oldHeap.Len() > 0 {
failpoint.Inject("asyncFlushInProcessDelay", func() {
log.Debug("asyncFlushInProcessDelay")
})
Expand All @@ -228,7 +231,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
}
counter++

event := heap.Pop(&h.heap).(*sortItem).entry
event := heap.Pop(&oldHeap).(*sortItem).entry
err := writer.writeNext(event)
if err != nil {
task.finished <- errors.Trace(err)
Expand Down Expand Up @@ -267,9 +270,6 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
close(task.finished)
return errors.Trace(err)
}
if cap(h.heap) > sortHeapCapacity {
h.heap = make(sortHeap, 0, sortHeapCapacity)
}
}

select {
Expand Down

0 comments on commit 9a1a01a

Please sign in to comment.