Skip to content

Commit

Permalink
Merge branch 'master' into avro
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyangyu authored May 17, 2022
2 parents d3b9f09 + 5e858a4 commit 3c89d33
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 52 deletions.
13 changes: 11 additions & 2 deletions cdc/model/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package model

// PolymorphicEvent describes an event can be in multiple states
// PolymorphicEvent describes an event can be in multiple states.
type PolymorphicEvent struct {
StartTs uint64
// Commit or resolved TS
Expand All @@ -23,7 +23,16 @@ type PolymorphicEvent struct {
Row *RowChangedEvent
}

// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV
// NewEmptyPolymorphicEvent creates a new empty PolymorphicEvent.
func NewEmptyPolymorphicEvent(ts uint64) *PolymorphicEvent {
return &PolymorphicEvent{
CRTs: ts,
RawKV: &RawKVEntry{},
Row: &RowChangedEvent{},
}
}

// NewPolymorphicEvent creates a new PolymorphicEvent with a raw KV.
func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent {
if rawKV.OpType == OpTypeResolved {
return NewResolvedPolymorphicEvent(rawKV.RegionID, rawKV.CRTs)
Expand Down
3 changes: 3 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,9 @@ type RowChangedEvent struct {
// ApproximateDataSize is the approximate size of protobuf binary
// representation of this event.
ApproximateDataSize int64 `json:"-" msg:"-"`

// SplitTxn marks this RowChangedEvent as the first line of a new txn.
SplitTxn bool `json:"-" msg:"-"`
}

// IsDelete returns true if the row is a delete event
Expand Down
6 changes: 5 additions & 1 deletion cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ type mockSink struct {
// we are testing sinkNode by itself.
type mockFlowController struct{}

func (c *mockFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error {
func (c *mockFlowController) Consume(
msg *model.PolymorphicEvent,
size uint64,
blockCallBack func(bool) error,
) error {
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,10 @@ func (n *sorterNode) start(
size := uint64(msg.Row.ApproximateBytes())
// NOTE we allow the quota to be exceeded if blocking means interrupting a transaction.
// Otherwise the pipeline would deadlock.
err = n.flowController.Consume(commitTs, size, func() error {
if lastCRTs > lastSentResolvedTs {
err = n.flowController.Consume(msg, size, func(batch bool) error {
if batch {
log.Panic("cdc does not support the batch resolve mechanism at this time")
} else if lastCRTs > lastSentResolvedTs {
// If we are blocking, we send a Resolved Event here to elicit a sink-flush.
// Not sending a Resolved Event here will very likely deadlock the pipeline.
lastSentResolvedTs = lastCRTs
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type tablePipelineImpl struct {
// TODO find a better name or avoid using an interface
// We use an interface here for ease in unit testing.
type tableFlowController interface {
Consume(commitTs uint64, size uint64, blockCallBack func() error) error
Consume(msg *model.PolymorphicEvent, size uint64, blockCallBack func(batch bool) error) error
Release(resolvedTs uint64)
Abort()
GetConsumption() uint64
Expand Down
92 changes: 80 additions & 12 deletions cdc/sink/flowcontrol/flow_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@ import (
"github.com/edwingeng/deque"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"go.uber.org/zap"
)

const (
maxRowsPerTxn = 1024
maxSizePerTxn = 1024 * 1024 /* 1MB */
batchSize = 100
)

// TableFlowController provides a convenient interface to control the memory consumption of a per table event stream
type TableFlowController struct {
memoryQuota *tableMemoryQuota
Expand All @@ -31,13 +38,20 @@ type TableFlowController struct {
sync.Mutex
queue deque.Deque
}
// batchGroupCount is the number of txnSizeEntries with same commitTs, which could be:
// 1. Different txns with same commitTs but different startTs
// 2. TxnSizeEntry split from the same txns which exceeds max rows or max size
batchGroupCount uint

lastCommitTs uint64
}

type commitTsSizeEntry struct {
type txnSizeEntry struct {
// txn id
startTs uint64
commitTs uint64
size uint64
rowCount uint64
}

// NewTableFlowController creates a new TableFlowController
Expand All @@ -55,7 +69,12 @@ func NewTableFlowController(quota uint64) *TableFlowController {

// Consume is called when an event has arrived for being processed by the sink.
// It will handle transaction boundaries automatically, and will not block intra-transaction.
func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error {
func (c *TableFlowController) Consume(
msg *model.PolymorphicEvent,
size uint64,
callBack func(batch bool) error,
) error {
commitTs := msg.CRTs
lastCommitTs := atomic.LoadUint64(&c.lastCommitTs)

if commitTs < lastCommitTs {
Expand All @@ -65,8 +84,7 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac
}

if commitTs > lastCommitTs {
atomic.StoreUint64(&c.lastCommitTs, commitTs)
err := c.memoryQuota.consumeWithBlocking(size, blockCallBack)
err := c.memoryQuota.consumeWithBlocking(size, callBack)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -82,13 +100,7 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac
}
}

c.queueMu.Lock()
defer c.queueMu.Unlock()
c.queueMu.queue.PushBack(&commitTsSizeEntry{
commitTs: commitTs,
size: size,
})

c.enqueueSingleMsg(msg, size)
return nil
}

Expand All @@ -98,7 +110,7 @@ func (c *TableFlowController) Release(resolvedTs uint64) {

c.queueMu.Lock()
for c.queueMu.queue.Len() > 0 {
if peeked := c.queueMu.queue.Front().(*commitTsSizeEntry); peeked.commitTs <= resolvedTs {
if peeked := c.queueMu.queue.Front().(*txnSizeEntry); peeked.commitTs <= resolvedTs {
nBytesToRelease += peeked.size
c.queueMu.queue.PopFront()
} else {
Expand All @@ -110,6 +122,62 @@ func (c *TableFlowController) Release(resolvedTs uint64) {
c.memoryQuota.release(nBytesToRelease)
}

// Note that msgs received by enqueueSingleMsg must be sorted by commitTs_startTs order.
func (c *TableFlowController) enqueueSingleMsg(msg *model.PolymorphicEvent, size uint64) {
commitTs := msg.CRTs
lastCommitTs := atomic.LoadUint64(&c.lastCommitTs)

c.queueMu.Lock()
defer c.queueMu.Unlock()

var e deque.Elem
// 1. Processing a new txn with different commitTs.
if e = c.queueMu.queue.Back(); e == nil || lastCommitTs < commitTs {
atomic.StoreUint64(&c.lastCommitTs, commitTs)
c.queueMu.queue.PushBack(&txnSizeEntry{
startTs: msg.StartTs,
commitTs: commitTs,
size: size,
rowCount: 1,
})
c.batchGroupCount = 1
msg.Row.SplitTxn = true
return
}

// Processing txns with the same commitTs.
txnEntry := e.(*txnSizeEntry)
if txnEntry.commitTs != lastCommitTs {
log.Panic("got wrong commitTs from deque, report a bug",
zap.Uint64("lastCommitTs", c.lastCommitTs),
zap.Uint64("commitTsInDeque", txnEntry.commitTs))
}

// 2. Append row to current txn entry.
if txnEntry.startTs == msg.Row.StartTs &&
txnEntry.rowCount < maxRowsPerTxn && txnEntry.size < maxSizePerTxn {
txnEntry.size += size
txnEntry.rowCount++
return
}

// 3. Split the txn or handle a new txn with the same commitTs.
c.queueMu.queue.PushBack(&txnSizeEntry{
startTs: msg.StartTs,
commitTs: commitTs,
size: size,
rowCount: 1,
})
c.batchGroupCount++
msg.Row.SplitTxn = true

if c.batchGroupCount >= batchSize {
c.batchGroupCount = 0
// TODO(CharlesCheung): add batch resolve mechanism to mitigate oom problem
log.Debug("emit batch resolve event throw callback")
}
}

// Abort interrupts any ongoing Consume call
func (c *TableFlowController) Abort() {
c.memoryQuota.abort()
Expand Down
Loading

0 comments on commit 3c89d33

Please sign in to comment.