Skip to content

Commit

Permalink
use unbounded chan in storage sink
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Nov 9, 2023
1 parent c2e9d09 commit 1a9d2e9
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type DMLSink struct {
alive struct {
sync.RWMutex
// msgCh is a channel to hold eventFragment.
msgCh chan eventFragment
msgCh *chann.DrainableChann[eventFragment]
isDead bool
}

Expand Down Expand Up @@ -142,15 +142,15 @@ func NewDMLSink(ctx context.Context,
cancel: wgCancel,
dead: make(chan struct{}),
}
s.alive.msgCh = make(chan eventFragment, defaultChannelSize)
s.alive.msgCh = chann.NewAutoDrainChann[eventFragment]()

encodedCh := make(chan eventFragment, defaultChannelSize)
workerChannels := make([]*chann.DrainableChann[eventFragment], cfg.WorkerCount)

// create a group of encoding workers.
for i := 0; i < defaultEncodingConcurrency; i++ {
encoder := encoderBuilder.Build()
s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.alive.msgCh, encodedCh)
s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.alive.msgCh.Out(), encodedCh)
}
// create defragmenter.
s.defragmenter = newDefragmenter(encodedCh, workerChannels)
Expand All @@ -170,7 +170,7 @@ func NewDMLSink(ctx context.Context,

s.alive.Lock()
s.alive.isDead = true
close(s.alive.msgCh)
s.alive.msgCh.CloseAndDrain()
s.alive.Unlock()
close(s.dead)

Expand Down Expand Up @@ -236,7 +236,7 @@ func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa

s.statistics.ObserveRows(txn.Event.Rows...)
// emit a TxnCallbackableEvent encoupled with a sequence number starting from one.
s.alive.msgCh <- eventFragment{
s.alive.msgCh.In() <- eventFragment{
seqNumber: seq,
versionedTable: tbl,
event: txn,
Expand Down

0 comments on commit 1a9d2e9

Please sign in to comment.