Skip to content

Commit

Permalink
Merge branch 'release-4.0' into cherry-pick-2429-to-release-4.0
Browse files · Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored Aug 5, 2021
2 parents 5890d93 + d2277db commit 28d1eac
Show file tree
Hide file tree
Showing 11 changed files with 20 additions and 350 deletions.
6 changes: 6 additions & 0 deletions cdc/owner/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type ddlPullerImpl struct {
mu sync.Mutex
resolvedTS uint64
pendingDDLJobs []*timodel.Job
lastDDLJobID int64
cancel context.CancelFunc
}

Expand Down Expand Up @@ -117,9 +118,14 @@ func (h *ddlPullerImpl) Run(ctx cdcContext.Context) error {
log.Info("discard the ddl job", zap.Int64("jobID", job.ID), zap.String("query", job.Query))
return nil
}
if job.ID == h.lastDDLJobID {
log.Warn("ignore duplicated DDL job", zap.Any("job", job))
return nil
}
h.mu.Lock()
defer h.mu.Unlock()
h.pendingDDLJobs = append(h.pendingDDLJobs, job)
h.lastDDLJobID = job.ID
return nil
}

Expand Down
5 changes: 2 additions & 3 deletions cdc/owner/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,8 @@ func (s *ddlPullerSuite) TestPuller(c *check.C) {
resolvedTs, ddl = p.PopFrontDDL()
c.Assert(resolvedTs, check.Equals, uint64(25))
c.Assert(ddl.ID, check.Equals, int64(3))
resolvedTs, ddl = p.PopFrontDDL()
c.Assert(resolvedTs, check.Equals, uint64(25))
c.Assert(ddl.ID, check.Equals, int64(3))
_, ddl = p.PopFrontDDL()
c.Assert(ddl, check.IsNil)

waitResolvedTsGrowing(c, p, 30)
resolvedTs, ddl = p.PopFrontDDL()
Expand Down
6 changes: 0 additions & 6 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ import (
)

const (
// defaultMemBufferCapacity is the default memory buffer per change feed.
defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G

defaultSyncResolvedBatch = 1024

schemaStorageGCLag = time.Minute * 20
Expand All @@ -71,7 +68,6 @@ type oldProcessor struct {
captureInfo model.CaptureInfo
changefeedID string
changefeed model.ChangeFeedInfo
limitter *puller.BlurResourceLimitter
stopped int32

pdCli pd.Client
Expand Down Expand Up @@ -163,7 +159,6 @@ func newProcessor(
) (*oldProcessor, error) {
etcdCli := session.Client()
cdcEtcdCli := kv.NewCDCEtcdClient(ctx, etcdCli)
limitter := puller.NewBlurResourceLimmter(defaultMemBufferCapacity)

log.Info("start processor with startts",
zap.Uint64("startts", checkpointTs), util.ZapFieldChangefeed(ctx))
Expand Down Expand Up @@ -193,7 +188,6 @@ func newProcessor(

p := &oldProcessor{
id: uuid.New().String(),
limitter: limitter,
captureInfo: captureInfo,
changefeedID: changefeedID,
changefeed: changefeed,
Expand Down
4 changes: 0 additions & 4 deletions cdc/processor/pipeline/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
)

type pullerNode struct {
limitter *puller.BlurResourceLimitter

tableName string // quoted schema and table, used in metircs only

tableID model.TableID
Expand All @@ -40,10 +38,8 @@ type pullerNode struct {
}

func newPullerNode(
limitter *puller.BlurResourceLimitter,
tableID model.TableID, replicaInfo *model.TableReplicaInfo, tableName string) pipeline.Node {
return &pullerNode{
limitter: limitter,
tableID: tableID,
replicaInfo: replicaInfo,
tableName: tableName,
Expand Down
4 changes: 1 addition & 3 deletions cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/puller"
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/cdc/sink/common"
serverConfig "github.com/pingcap/ticdc/pkg/config"
Expand Down Expand Up @@ -162,7 +161,6 @@ const defaultRunnersSize = 5
// NewTablePipeline creates a table pipeline
// TODO(leoppro): implement a mock kvclient to test the table pipeline
func NewTablePipeline(ctx cdcContext.Context,
limitter *puller.BlurResourceLimitter,
mounter entry.Mounter,
tableID model.TableID,
tableName string,
Expand Down Expand Up @@ -191,7 +189,7 @@ func NewTablePipeline(ctx cdcContext.Context,
runnerSize++
}
p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize)
p.AppendNode(ctx, "puller", newPullerNode(limitter, tableID, replicaInfo, tableName))
p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName))
p.AppendNode(ctx, "sorter", newSorterNode(tableName, tableID, flowController))
p.AppendNode(ctx, "mounter", newMounterNode(mounter))
if cyclicEnabled {
Expand Down
10 changes: 3 additions & 7 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ import (
)

const (
// defaultMemBufferCapacity is the default memory buffer per change feed.
defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G

schemaStorageGCLag = time.Minute * 20
)

Expand All @@ -59,7 +56,6 @@ type processor struct {

tables map[model.TableID]tablepipeline.TablePipeline

limitter *puller.BlurResourceLimitter
schemaStorage entry.SchemaStorage
filter *filter.Filter
mounter entry.Mounter
Expand All @@ -86,7 +82,6 @@ func newProcessor(ctx cdcContext.Context) *processor {
changefeedID := ctx.ChangefeedVars().ID
advertiseAddr := ctx.GlobalVars().CaptureInfo.AdvertiseAddr
p := &processor{
limitter: puller.NewBlurResourceLimmter(defaultMemBufferCapacity),
tables: make(map[model.TableID]tablepipeline.TablePipeline),
errCh: make(chan error, 1),
changefeedID: changefeedID,
Expand Down Expand Up @@ -505,7 +500,9 @@ func (p *processor) sendError(err error) {
select {
case p.errCh <- err:
default:
log.Error("processor receives redundant error", zap.Error(err))
if errors.Cause(err) != context.Canceled {
log.Error("processor receives redundant error", zap.Error(err))
}
}
}

Expand Down Expand Up @@ -720,7 +717,6 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
sink := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs)
table := tablepipeline.NewTablePipeline(
ctx,
p.limitter,
p.mounter,
tableID,
tableNameStr,
Expand Down
188 changes: 0 additions & 188 deletions cdc/puller/buffer.go

This file was deleted.

Loading

0 comments on commit 28d1eac

Please sign in to comment.