
Commit

Merge branch 'master' into fix7493
ti-chi-bot authored Nov 1, 2022
2 parents c084457 + d80cbd2 commit d3b3787
Showing 71 changed files with 3,119 additions and 599 deletions.
17 changes: 3 additions & 14 deletions Makefile
@@ -227,7 +227,7 @@ clean_integration_test_containers: ## Clean MySQL and Kafka integration test con
docker-compose -f $(TICDC_DOCKER_DEPLOYMENTS_DIR)/docker-compose-mysql-integration.yml down -v
docker-compose -f $(TICDC_DOCKER_DEPLOYMENTS_DIR)/docker-compose-kafka-integration.yml down -v

fmt: tools/bin/gofumports tools/bin/shfmt tools/bin/gci generate_mock generate-msgp-code tiflow-generate-mock
fmt: tools/bin/gofumports tools/bin/shfmt tools/bin/gci generate_mock generate-msgp-code
@echo "run gci (format imports)"
tools/bin/gci write $(FILES) 2>&1 | $(FAIL_ON_STDOUT)
@echo "run gofumports"
@@ -316,14 +316,9 @@ data-flow-diagram: docs/data-flow.dot
swagger-spec: tools/bin/swag
tools/bin/swag init --exclude dm,engine --parseVendor -generalInfo cdc/api/v1/api.go --output docs/swagger

generate_mock: ## Generate mock code.
generate_mock: tools/bin/mockgen
tools/bin/mockgen -source cdc/owner/owner.go -destination cdc/owner/mock/owner_mock.go
tools/bin/mockgen -source cdc/owner/status_provider.go -destination cdc/owner/mock/status_provider_mock.go
tools/bin/mockgen -source cdc/api/v2/api_helpers.go -destination cdc/api/v2/api_helpers_mock.go -package v2
tools/bin/mockgen -source pkg/etcd/etcd.go -destination pkg/etcd/mock/etcd_client_mock.go
tools/bin/mockgen -source cdc/processor/manager.go -destination cdc/processor/mock/manager_mock.go
tools/bin/mockgen -source cdc/capture/capture.go -destination cdc/capture/mock/capture_mock.go
tools/bin/mockgen -source pkg/cmd/factory/factory.go -destination pkg/cmd/factory/mock/factory_mock.go -package mock_factory
scripts/generate-mock.sh

clean:
go clean -i ./...
@@ -357,9 +352,6 @@ dm-chaos-case:
dm_debug-tools:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/binlog-event-blackhole ./dm/debug-tools/binlog-event-blackhole

dm_generate_mock: tools/bin/mockgen
./dm/tests/generate-mock.sh

dm_generate_openapi: tools/bin/oapi-codegen
@echo "generate_openapi"
cd dm && ../tools/bin/oapi-codegen --config=openapi/spec/server-gen-cfg.yaml openapi/spec/dm.yaml
@@ -495,9 +487,6 @@ tiflow-demo:
tiflow-chaos-case:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/tiflow-chaos-case ./engine/chaos/cases

tiflow-generate-mock: tools/bin/mockgen
scripts/generate-engine-mock.sh

engine_unit_test: check_failpoint_ctl
$(call run_engine_unit_test,$(ENGINE_PACKAGES))

8 changes: 8 additions & 0 deletions cdc/entry/metrics.go
@@ -32,10 +32,18 @@ var (
Name: "ignored_dml_event_count",
Help: "The total count of dml events that are ignored in mounter.",
}, []string{"namespace", "changefeed"})
mounterGroupInputChanSizeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "mounter",
Name: "group_input_chan_size",
Help: "The size of input channel of mounter group",
}, []string{"namespace", "changefeed", "index"})
)

// InitMetrics registers all metrics in this file
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(totalRowsCountGauge)
registry.MustRegister(ignoredDMLEventCounter)
registry.MustRegister(mounterGroupInputChanSizeGauge)
}
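
The new gauge follows the standard client_golang pattern: a GaugeVec keyed by namespace, changefeed, and worker index, registered once and then sampled periodically by each worker. Below is a minimal, self-contained sketch of that pattern; the label values, the main function, and the sampled numbers are illustrative only and do not come from tiflow.

package main

import (
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

// inputChanSizeGauge mirrors the label layout of mounterGroupInputChanSizeGauge:
// one time series per (namespace, changefeed, worker index).
var inputChanSizeGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "mounter",
		Name:      "group_input_chan_size",
		Help:      "The size of input channel of mounter group",
	}, []string{"namespace", "changefeed", "index"})

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(inputChanSizeGauge)

	// Each worker would periodically sample len(rawCh) and publish it under its index.
	workerIndex, chanLen := 3, 42
	inputChanSizeGauge.
		WithLabelValues("default", "changefeed-1", strconv.Itoa(workerIndex)).
		Set(float64(chanLen))

	// When the worker shuts down, its series can be dropped again.
	inputChanSizeGauge.DeleteLabelValues("default", "changefeed-1", strconv.Itoa(workerIndex))
}
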
35 changes: 15 additions & 20 deletions cdc/entry/mounter.go
@@ -64,10 +64,10 @@ type Mounter interface {
// DecodeEvent accepts `model.PolymorphicEvent` with `RawKVEntry` filled and
// decodes `RawKVEntry` into `RowChangedEvent`.
// If a `model.PolymorphicEvent` should be ignored, it returns (false, nil).
DecodeEvent(ctx context.Context, event *model.PolymorphicEvent) (bool, error)
DecodeEvent(ctx context.Context, event *model.PolymorphicEvent) error
}

type mounterImpl struct {
type mounter struct {
schemaStorage SchemaStorage
tz *time.Location
enableOldValue bool
@@ -84,7 +84,7 @@ func NewMounter(schemaStorage SchemaStorage,
filter pfilter.Filter,
enableOldValue bool,
) Mounter {
return &mounterImpl{
return &mounter{
schemaStorage: schemaStorage,
changefeedID: changefeedID,
enableOldValue: enableOldValue,
@@ -99,28 +99,23 @@ func NewMounter(schemaStorage SchemaStorage,

// DecodeEvent decodes kv events using the ddl puller's schemaStorage.
// This method could block indefinitely if the DDL puller is lagging.
// Note: If pEvent.Row is nil after decode, it means this event should be ignored.
func (m *mounterImpl) DecodeEvent(ctx context.Context, pEvent *model.PolymorphicEvent) (bool, error) {
func (m *mounter) DecodeEvent(ctx context.Context, event *model.PolymorphicEvent) error {
m.metricTotalRows.Inc()
if pEvent.IsResolved() {
return true, nil
if event.IsResolved() {
return nil
}
row, err := m.unmarshalAndMountRowChanged(ctx, pEvent.RawKV)
row, err := m.unmarshalAndMountRowChanged(ctx, event.RawKV)
if err != nil {
return false, errors.Trace(err)
}
if row == nil {
log.Debug("message's row changed event is nil, it should be ignored", zap.Uint64("startTs", pEvent.StartTs))
return true, nil
return errors.Trace(err)
}

pEvent.Row = row
pEvent.RawKV.Value = nil
pEvent.RawKV.OldValue = nil
return false, nil
event.Row = row
event.RawKV.Value = nil
event.RawKV.OldValue = nil
return nil
}

func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *model.RawKVEntry) (*model.RowChangedEvent, error) {
func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.RawKVEntry) (*model.RowChangedEvent, error) {
if !bytes.HasPrefix(raw.Key, tablePrefix) {
return nil, nil
}
@@ -193,7 +188,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
return row, err
}

func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, rawKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*rowKVEntry, error) {
func (m *mounter) unmarshalRowKVEntry(tableInfo *model.TableInfo, rawKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*rowKVEntry, error) {
recordID, err := tablecodec.DecodeRowKey(rawKey)
if err != nil {
return nil, errors.Trace(err)
@@ -323,7 +318,7 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
return cols, rawCols, nil
}

func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
var err error
// Decode previous columns.
var preCols []*model.Column
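
For callers of the mounter, the practical effect of this diff is that DecodeEvent no longer reports the "ignored" case through a boolean return: a decoded event carries a non-nil Row, and an event that should be skipped simply leaves Row nil (the test change further below makes the same adjustment). The sketch below illustrates that caller-side contract with simplified stand-in types; Event, Row, Decoder, and fakeDecoder are hypothetical and are not the tiflow model types.

package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for model.PolymorphicEvent and entry.Mounter, used only
// to show the new "Row == nil means ignore" contract.
type Row struct{ Table string }

type Event struct {
	Resolved  bool
	Ignorable bool
	Row       *Row // stays nil when the event should be ignored
}

type Decoder interface {
	DecodeEvent(ctx context.Context, e *Event) error
}

type fakeDecoder struct{}

func (fakeDecoder) DecodeEvent(ctx context.Context, e *Event) error {
	if e.Resolved || e.Ignorable {
		return nil // nothing to mount; Row is left nil
	}
	e.Row = &Row{Table: "t1"}
	return nil
}

func main() {
	ctx := context.Background()
	var d Decoder = fakeDecoder{}

	for _, e := range []*Event{{Resolved: true}, {Ignorable: true}, {}} {
		// New contract: a single error return; "ignored" is signalled by Row == nil.
		if err := d.DecodeEvent(ctx, e); err != nil {
			panic(err)
		}
		if e.Row == nil {
			continue // the old API reported this case via its bool return value
		}
		fmt.Println("decoded a row for table", e.Row.Table)
	}
}
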
131 changes: 131 additions & 0 deletions cdc/entry/mounter_group.go
@@ -0,0 +1,131 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package entry

import (
"context"
"strconv"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/filter"
"golang.org/x/sync/errgroup"
)

// MounterGroup is a group of mounter workers
type MounterGroup interface {
Run(ctx context.Context) error
AddEvent(ctx context.Context, event *model.PolymorphicEvent) error
}

type mounterGroup struct {
schemaStorage SchemaStorage
inputCh []chan *model.PolymorphicEvent
tz *time.Location
filter filter.Filter
enableOldValue bool

workerNum int
index uint64

changefeedID model.ChangeFeedID
}

const (
defaultMounterWorkerNum = 16
defaultInputChanSize = 256
defaultMetricInterval = 15 * time.Second
)

// NewMounterGroup returns a group of mounters.
func NewMounterGroup(
schemaStorage SchemaStorage,
workerNum int,
enableOldValue bool,
filter filter.Filter,
tz *time.Location,
changefeedID model.ChangeFeedID,
) *mounterGroup {
if workerNum <= 0 {
workerNum = defaultMounterWorkerNum
}
inputCh := make([]chan *model.PolymorphicEvent, workerNum)
for i := 0; i < workerNum; i++ {
inputCh[i] = make(chan *model.PolymorphicEvent, defaultInputChanSize)
}
return &mounterGroup{
schemaStorage: schemaStorage,
inputCh: inputCh,
enableOldValue: enableOldValue,
filter: filter,
tz: tz,

workerNum: workerNum,

changefeedID: changefeedID,
}
}

func (m *mounterGroup) Run(ctx context.Context) error {
defer func() {
mounterGroupInputChanSizeGauge.DeleteLabelValues(m.changefeedID.Namespace, m.changefeedID.ID)
}()
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < m.workerNum; i++ {
idx := i
g.Go(func() error {
return m.runWorker(ctx, idx)
})
}
return g.Wait()
}

func (m *mounterGroup) runWorker(ctx context.Context, index int) error {
mounter := NewMounter(m.schemaStorage, m.changefeedID, m.tz, m.filter, m.enableOldValue)
rawCh := m.inputCh[index]
metrics := mounterGroupInputChanSizeGauge.
WithLabelValues(m.changefeedID.Namespace, m.changefeedID.ID, strconv.Itoa(index))
ticker := time.NewTicker(defaultMetricInterval)
for {
var pEvent *model.PolymorphicEvent
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
metrics.Set(float64(len(rawCh)))
case pEvent = <-rawCh:
if pEvent.RawKV.OpType == model.OpTypeResolved {
pEvent.MarkFinished()
continue
}
err := mounter.DecodeEvent(ctx, pEvent)
if err != nil {
return errors.Trace(err)
}
pEvent.MarkFinished()
}
}
}

func (m *mounterGroup) AddEvent(ctx context.Context, event *model.PolymorphicEvent) error {
index := atomic.AddUint64(&m.index, 1) % uint64(m.workerNum)
select {
case <-ctx.Done():
return ctx.Err()
case m.inputCh[index] <- event:
return nil
}
}
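
mounter_group.go introduces a small worker pool: events are dispatched round-robin (an atomic counter modulo the worker count) onto per-worker buffered channels, and each worker runs its own mounter inside an errgroup. Below is a stripped-down, self-contained sketch of the same fan-out shape; the event type, worker body, and channel sizes are illustrative and not the tiflow implementation.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"golang.org/x/sync/errgroup"
)

type event struct{ id int }

type group struct {
	inputCh   []chan *event
	workerNum int
	index     uint64
}

func newGroup(workerNum, chanSize int) *group {
	g := &group{inputCh: make([]chan *event, workerNum), workerNum: workerNum}
	for i := range g.inputCh {
		g.inputCh[i] = make(chan *event, chanSize)
	}
	return g
}

// AddEvent has the same shape as mounterGroup.AddEvent: round-robin dispatch
// that also respects context cancellation while the target channel is full.
func (g *group) AddEvent(ctx context.Context, e *event) error {
	idx := atomic.AddUint64(&g.index, 1) % uint64(g.workerNum)
	select {
	case <-ctx.Done():
		return ctx.Err()
	case g.inputCh[idx] <- e:
		return nil
	}
}

// Run starts one goroutine per worker and returns the first error, mirroring
// the errgroup usage in mounterGroup.Run.
func (g *group) Run(ctx context.Context) error {
	eg, ctx := errgroup.WithContext(ctx)
	for i := 0; i < g.workerNum; i++ {
		idx := i
		eg.Go(func() error {
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case e := <-g.inputCh[idx]:
					fmt.Printf("worker %d handled event %d\n", idx, e.id)
				}
			}
		})
	}
	return eg.Wait()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	g := newGroup(4, 16)
	go func() {
		for i := 0; i < 8; i++ {
			_ = g.AddEvent(ctx, &event{id: i})
		}
	}()
	_ = g.Run(ctx) // returns once the context times out
}

In the diff above the worker calls pEvent.MarkFinished() after decoding, which is presumably how callers re-establish ordering after the round-robin split; the sketch omits that synchronization.
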
8 changes: 4 additions & 4 deletions cdc/entry/mounter_test.go
@@ -297,7 +297,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct {
require.Nil(t, err)
mounter := NewMounter(scheamStorage,
model.DefaultChangeFeedID("c1"),
time.UTC, filter, false).(*mounterImpl)
time.UTC, filter, false).(*mounter)
mounter.tz = time.Local
ctx := context.Background()

@@ -1004,7 +1004,7 @@ func TestDecodeEventIgnoreRow(t *testing.T) {

ts := schemaStorage.GetLastSnapshot().CurrentTs()
schemaStorage.AdvanceResolvedTs(ver.Ver)
mounter := NewMounter(schemaStorage, cfID, time.Local, filter, true).(*mounterImpl)
mounter := NewMounter(schemaStorage, cfID, time.Local, filter, true).(*mounter)

type testCase struct {
schema string
@@ -1056,9 +1056,9 @@ func TestDecodeEventIgnoreRow(t *testing.T) {
walkTableSpanInStore(t, helper.Storage(), tableID, func(key []byte, value []byte) {
rawKV := f(key, value)
pEvent := model.NewPolymorphicEvent(rawKV)
ignored, err := mounter.DecodeEvent(ctx, pEvent)
err := mounter.DecodeEvent(ctx, pEvent)
require.Nil(t, err)
if ignored {
if pEvent.Row == nil {
return
}
row := pEvent.Row
16 changes: 3 additions & 13 deletions cdc/kv/client.go
@@ -1205,7 +1205,7 @@ func (s *eventFeedSession) receiveFromStream(
}
if cevent.ResolvedTs != nil {
metricSendEventBatchResolvedSize.Observe(float64(len(cevent.ResolvedTs.Regions)))
err = s.sendResolvedTs(ctx, cevent.ResolvedTs, worker, addr)
err = s.sendResolvedTs(ctx, cevent.ResolvedTs, worker)
if err != nil {
return err
}
@@ -1233,7 +1233,7 @@ func (s *eventFeedSession) sendRegionChangeEvents(
) error {
statefulEvents := make([][]*regionStatefulEvent, worker.concurrency)
for i := 0; i < worker.concurrency; i++ {
// Allocate a buffer with 1.5x length than average to reduce reallocate.
// Allocate a buffer with 2x length than average to reduce reallocate.
buffLen := len(events) / worker.concurrency * 3 / 2
statefulEvents[i] = make([]*regionStatefulEvent, 0, buffLen)
}
@@ -1314,13 +1314,12 @@ func (s *eventFeedSession) sendResolvedTs(
ctx context.Context,
resolvedTs *cdcpb.ResolvedTs,
worker *regionWorker,
addr string,
) error {
statefulEvents := make([]*regionStatefulEvent, worker.concurrency)
// split resolved ts
for i := 0; i < worker.concurrency; i++ {
// Allocate a buffer with 1.5x length than average to reduce reallocate.
buffLen := len(resolvedTs.Regions) / worker.concurrency * 3 / 2
buffLen := len(resolvedTs.Regions) / worker.concurrency * 2
statefulEvents[i] = &regionStatefulEvent{
resolvedTsEvent: &resolvedTsEvent{
resolvedTs: resolvedTs.Ts,
@@ -1332,15 +1331,6 @@ for _, regionID := range resolvedTs.Regions {
for _, regionID := range resolvedTs.Regions {
state, ok := worker.getRegionState(regionID)
if ok {
if state.isStopped() {
log.Debug("drop resolved ts due to region feed stopped",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", state.requestID),
zap.String("addr", addr))
continue
}
slot := worker.inputCalcSlot(regionID)
statefulEvents[slot].resolvedTsEvent.regions = append(
statefulEvents[slot].resolvedTsEvent.regions, state,
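
The sendResolvedTs change drops the addr parameter and the per-region "stopped" check, and grows the pre-allocated per-slot buffer from 1.5x to 2x the average share per worker. The sketch below isolates just the buffer-sizing and slot-splitting arithmetic; splitRegionsBySlot and calcSlot are hypothetical helpers standing in for the real regionWorker.inputCalcSlot, whose implementation is not part of this diff.

package main

import "fmt"

// splitRegionsBySlot shows the buffer pre-sizing and slot routing in isolation:
// each worker slot gets a slice whose capacity is twice the average share, so
// an uneven split rarely forces a re-allocation.
func splitRegionsBySlot(regions []uint64, concurrency int) [][]uint64 {
	buckets := make([][]uint64, concurrency)
	buffLen := len(regions) / concurrency * 2
	for i := range buckets {
		buckets[i] = make([]uint64, 0, buffLen)
	}
	for _, regionID := range regions {
		slot := calcSlot(regionID, concurrency)
		buckets[slot] = append(buckets[slot], regionID)
	}
	return buckets
}

// calcSlot is a hypothetical stand-in for regionWorker.inputCalcSlot.
func calcSlot(regionID uint64, concurrency int) int {
	return int(regionID % uint64(concurrency))
}

func main() {
	regions := []uint64{101, 102, 103, 104, 105, 106, 107}
	for slot, rs := range splitRegionsBySlot(regions, 3) {
		fmt.Println("slot", slot, "regions", rs)
	}
}
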
(The diffs for the remaining changed files are not shown here.)
