diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 15845e117a2..2d32d843eda 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -21,6 +21,7 @@ import ( "math" "math/rand" "time" + "unsafe" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -211,7 +212,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode if rowKV == nil { return nil, nil } - return m.mountRowKVEntry(tableInfo, rowKV, raw.ApproximateSize()) + return m.mountRowKVEntry(tableInfo, rowKV, raw.ApproximateDataSize()) } return nil, nil }() @@ -305,6 +306,7 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) { func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool) ([]*model.Column, error) { cols := make([]*model.Column, len(tableInfo.RowColumnsOffset)) for _, colInfo := range tableInfo.Columns { + colSize := 0 if !model.IsColCDCVisible(colInfo) { continue } @@ -314,15 +316,19 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill if exist { var err error var warn string - colValue, warn, err = formatColVal(colDatums, colInfo.Tp) + var size int + colValue, size, warn, err = formatColVal(colDatums, colInfo.Tp) if err != nil { return nil, errors.Trace(err) } if warn != "" { log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String())) } + colSize += size } else if fillWithDefaultValue { - colValue = getDefaultOrZeroValue(colInfo) + var size int + colValue, size = getDefaultOrZeroValue(colInfo) + colSize += size } else { continue } @@ -331,6 +337,8 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill Type: colInfo.Tp, Value: colValue, Flag: tableInfo.ColumnsFlag[colInfo.ID], + // ApproximateBytes = column data size + column struct size + ApproximateBytes: colSize + sizeOfEmptyColumn, } } return cols, nil @@ -399,84 +407,119 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr TableID: row.PhysicalTableID, IsPartition: tableInfo.GetPartitionInfo() != nil, }, - Columns: cols, - PreColumns: preCols, - IndexColumns: tableInfo.IndexColumnsOffset, - ApproximateSize: dataSize, + Columns: cols, + PreColumns: preCols, + IndexColumns: tableInfo.IndexColumnsOffset, + ApproximateDataSize: dataSize, }, nil } var emptyBytes = make([]byte, 0) -func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, err error) { +const ( + sizeOfEmptyColumn = int(unsafe.Sizeof(model.Column{})) + sizeOfEmptyBytes = int(unsafe.Sizeof(emptyBytes)) + sizeOfEmptyString = int(unsafe.Sizeof("")) +) + +func sizeOfDatum(d types.Datum) int { + array := [...]types.Datum{d} + return int(types.EstimatedMemUsage(array[:], 1)) +} + +func sizeOfString(s string) int { + // string data size + string struct size. + return len(s) + sizeOfEmptyString +} + +func sizeOfBytes(b []byte) int { + // bytes data size + bytes struct size. + return len(b) + sizeOfEmptyBytes +} + +func formatColVal(datum types.Datum, tp byte) ( + value interface{}, size int, warn string, err error, +) { if datum.IsNull() { - return nil, "", nil + return nil, 0, "", nil } switch tp { case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp: - return datum.GetMysqlTime().String(), "", nil + v := datum.GetMysqlTime().String() + return v, sizeOfString(v), "", nil case mysql.TypeDuration: - return datum.GetMysqlDuration().String(), "", nil + v := datum.GetMysqlDuration().String() + return v, sizeOfString(v), "", nil case mysql.TypeJSON: - return datum.GetMysqlJSON().String(), "", nil + v := datum.GetMysqlJSON().String() + return v, sizeOfString(v), "", nil case mysql.TypeNewDecimal: - v := datum.GetMysqlDecimal() - if v == nil { - return nil, "", nil + d := datum.GetMysqlDecimal() + if d == nil { + // nil takes 0 byte. + return nil, 0, "", nil } - return v.String(), "", nil + v := d.String() + return v, sizeOfString(v), "", nil case mysql.TypeEnum: - return datum.GetMysqlEnum().Value, "", nil + v := datum.GetMysqlEnum().Value + const sizeOfV = unsafe.Sizeof(v) + return v, int(sizeOfV), "", nil case mysql.TypeSet: - return datum.GetMysqlSet().Value, "", nil + v := datum.GetMysqlSet().Value + const sizeOfV = unsafe.Sizeof(v) + return v, int(sizeOfV), "", nil case mysql.TypeBit: // Encode bits as integers to avoid pingcap/tidb#10988 (which also affects MySQL itself) v, err := datum.GetBinaryLiteral().ToInt(nil) - return v, "", err + const sizeOfV = unsafe.Sizeof(v) + return v, int(sizeOfV), "", err case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: b := datum.GetBytes() if b == nil { b = emptyBytes } - return b, "", nil + return b, sizeOfBytes(b), "", nil case mysql.TypeFloat, mysql.TypeDouble: v := datum.GetFloat64() if math.IsNaN(v) || math.IsInf(v, 1) || math.IsInf(v, -1) { warn = fmt.Sprintf("the value is invalid in column: %f", v) v = 0 } - return v, warn, nil + const sizeOfV = unsafe.Sizeof(v) + return v, int(sizeOfV), warn, nil default: - return datum.GetValue(), "", nil + return datum.GetValue(), sizeOfDatum(datum), "", nil } } -func getDefaultOrZeroValue(col *timodel.ColumnInfo) interface{} { +func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, int) { // see https://github.com/pingcap/tidb/issues/9304 // must use null if TiDB not write the column value when default value is null // and the value is null if !mysql.HasNotNullFlag(col.Flag) { d := types.NewDatum(nil) - return d.GetValue() + const size = unsafe.Sizeof(d.GetValue()) + return d.GetValue(), int(size) } if col.GetDefaultValue() != nil { d := types.NewDatum(col.GetDefaultValue()) - return d.GetValue() + return d.GetValue(), sizeOfDatum(d) } switch col.Tp { case mysql.TypeEnum: // For enum type, if no default value and not null is set, // the default value is the first element of the enum list d := types.NewDatum(col.FieldType.Elems[0]) - return d.GetValue() + return d.GetValue(), sizeOfDatum(d) case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar: - return emptyBytes + return emptyBytes, sizeOfEmptyBytes } d := table.GetZeroValue(col) - return d.GetValue() + return d.GetValue(), sizeOfDatum(d) } // DecodeTableID decodes the raw key to a table ID diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index 0e326ce329f..0430c1ee4f0 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -37,23 +37,36 @@ func TestMounterDisableOldValue(t *testing.T) { testCases := []struct { tableName string createTableDDL string - values [][]interface{} + // [] for rows, []infterface{} for columns. + values [][]interface{} + // [] for table partition if there is any, + // []int for approximateBytes of rows. + putApproximateBytes [][]int + delApproximateBytes [][]int }{{ - tableName: "simple", - createTableDDL: "create table simple(id int primary key)", - values: [][]interface{}{{1}, {2}, {3}, {4}, {5}}, + tableName: "simple", + createTableDDL: "create table simple(id int primary key)", + values: [][]interface{}{{1}, {2}, {3}, {4}, {5}}, + putApproximateBytes: [][]int{{346, 346, 346, 346, 346}}, + delApproximateBytes: [][]int{{346, 346, 346, 346, 346}}, }, { - tableName: "no_pk", - createTableDDL: "create table no_pk(id int not null unique key)", - values: [][]interface{}{{1}, {2}, {3}, {4}, {5}}, + tableName: "no_pk", + createTableDDL: "create table no_pk(id int not null unique key)", + values: [][]interface{}{{1}, {2}, {3}, {4}, {5}}, + putApproximateBytes: [][]int{{345, 345, 345, 345, 345}}, + delApproximateBytes: [][]int{{217, 217, 217, 217, 217}}, }, { - tableName: "many_index", - createTableDDL: "create table many_index(id int not null unique key, c1 int unique key, c2 int, INDEX (c2))", - values: [][]interface{}{{1, 1, 1}, {2, 2, 2}, {3, 3, 3}, {4, 4, 4}, {5, 5, 5}}, + tableName: "many_index", + createTableDDL: "create table many_index(id int not null unique key, c1 int unique key, c2 int, INDEX (c2))", + values: [][]interface{}{{1, 1, 1}, {2, 2, 2}, {3, 3, 3}, {4, 4, 4}, {5, 5, 5}}, + putApproximateBytes: [][]int{{638, 638, 638, 638, 638}}, + delApproximateBytes: [][]int{{254, 254, 254, 254, 254}}, }, { - tableName: "default_value", - createTableDDL: "create table default_value(id int primary key, c1 int, c2 int not null default 5, c3 varchar(20), c4 varchar(20) not null default '666')", - values: [][]interface{}{{1}, {2}, {3}, {4}, {5}}, + tableName: "default_value", + createTableDDL: "create table default_value(id int primary key, c1 int, c2 int not null default 5, c3 varchar(20), c4 varchar(20) not null default '666')", + values: [][]interface{}{{1}, {2}, {3}, {4}, {5}}, + putApproximateBytes: [][]int{{708, 708, 708, 708, 708}}, + delApproximateBytes: [][]int{{353, 353, 353, 353, 353}}, }, { tableName: "partition_table", createTableDDL: `CREATE TABLE partition_table ( @@ -78,6 +91,8 @@ func TestMounterDisableOldValue(t *testing.T) { {18, "aae", "bbf", 21, 14}, {15, "afa", "bbc", 11, 12}, }, + putApproximateBytes: [][]int{{775}, {777}, {777}, {777, 777}}, + delApproximateBytes: [][]int{{227}, {227}, {227}, {227, 227}}, }, { tableName: "tp_int", createTableDDL: `create table tp_int @@ -98,6 +113,8 @@ func TestMounterDisableOldValue(t *testing.T) { {4, 127, 32767, 8388607, 2147483647, 9223372036854775807}, {5, -128, -32768, -8388608, -2147483648, -9223372036854775808}, }, + putApproximateBytes: [][]int{{986, 706, 986, 986, 986}}, + delApproximateBytes: [][]int{{346, 346, 346, 346, 346}}, }, { tableName: "tp_text", createTableDDL: `create table tp_text @@ -116,7 +133,7 @@ func TestMounterDisableOldValue(t *testing.T) { c_binary binary(16) null, c_varbinary varbinary(16) null, constraint pk - primary key (id) + primary key (id) );`, values: [][]interface{}{ {1}, @@ -138,6 +155,8 @@ func TestMounterDisableOldValue(t *testing.T) { {5, "你好", "我好", "大家好", "道路", "千万条", "安全", "第一条", "行车", "不规范", "亲人", "两行泪", "!"}, {6, "😀", "😃", "😄", "😁", "😆", "😅", "😂", "🤣", "☺️", "😊", "😇", "🙂"}, }, + putApproximateBytes: [][]int{{1211, 1459, 1411, 1323, 1398, 1369}}, + delApproximateBytes: [][]int{{347, 347, 347, 347, 347, 347}}, }, { tableName: "tp_time", createTableDDL: `create table tp_time @@ -149,12 +168,14 @@ func TestMounterDisableOldValue(t *testing.T) { c_time time null, c_year year null, constraint pk - primary key (id) + primary key (id) );`, values: [][]interface{}{ {1}, {2, "2020-02-20", "2020-02-20 02:20:20", "2020-02-20 02:20:20", "02:20:20", "2020"}, }, + putApproximateBytes: [][]int{{707, 819}}, + delApproximateBytes: [][]int{{347, 347}}, }, { tableName: "tp_real", createTableDDL: `create table tp_real @@ -164,12 +185,14 @@ func TestMounterDisableOldValue(t *testing.T) { c_double double null, c_decimal decimal null, constraint pk - primary key (id) + primary key (id) );`, values: [][]interface{}{ {1}, {2, "2020.0202", "2020.0303", "2020.0404"}, }, + putApproximateBytes: [][]int{{563, 551}}, + delApproximateBytes: [][]int{{347, 347}}, }, { tableName: "tp_other", createTableDDL: `create table tp_other @@ -180,12 +203,14 @@ func TestMounterDisableOldValue(t *testing.T) { c_bit bit(64) null, c_json json null, constraint pk - primary key (id) + primary key (id) );`, values: [][]interface{}{ {1}, {2, "a", "a,c", 888, `{"aa":"bb"}`}, }, + putApproximateBytes: [][]int{{636, 624}}, + delApproximateBytes: [][]int{{348, 348}}, }, { tableName: "clustered_index1", createTableDDL: "CREATE TABLE clustered_index1 (id VARCHAR(255) PRIMARY KEY, data INT);", @@ -194,6 +219,8 @@ func TestMounterDisableOldValue(t *testing.T) { {"你好😘", 666}, {"世界🤪", 888}, }, + putApproximateBytes: [][]int{{383, 446, 446}}, + delApproximateBytes: [][]int{{311, 318, 318}}, }, { tableName: "clustered_index2", createTableDDL: "CREATE TABLE clustered_index2 (id VARCHAR(255), data INT, ddaa date, PRIMARY KEY (id, data, ddaa), UNIQUE KEY (id, data, ddaa));", @@ -201,6 +228,8 @@ func TestMounterDisableOldValue(t *testing.T) { {"你好😘", 666, "2020-11-20"}, {"世界🤪", 888, "2020-05-12"}, }, + putApproximateBytes: [][]int{{592, 592}}, + delApproximateBytes: [][]int{{592, 592}}, }} for _, tc := range testCases { testMounterDisableOldValue(t, tc) @@ -208,9 +237,11 @@ func TestMounterDisableOldValue(t *testing.T) { } func testMounterDisableOldValue(t *testing.T, tc struct { - tableName string - createTableDDL string - values [][]interface{} + tableName string + createTableDDL string + values [][]interface{} + putApproximateBytes [][]int + delApproximateBytes [][]int }) { store, err := mockstore.NewMockStore() require.Nil(t, err) @@ -257,7 +288,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct { mounter.tz = time.Local ctx := context.Background() - mountAndCheckRowInTable := func(tableID int64, f func(key []byte, value []byte) *model.RawKVEntry) int { + mountAndCheckRowInTable := func(tableID int64, rowBytes []int, f func(key []byte, value []byte) *model.RawKVEntry) int { var rows int walkTableSpanInStore(t, store, tableID, func(key []byte, value []byte) { rawKV := f(key, value) @@ -269,6 +300,8 @@ func testMounterDisableOldValue(t *testing.T, tc struct { rows++ require.Equal(t, row.Table.Table, tc.tableName) require.Equal(t, row.Table.Schema, "test") + require.Equal(t, rowBytes[rows-1], row.ApproximateBytes(), row) + t.Log("ApproximateBytes", tc.tableName, rows-1, row.ApproximateBytes()) // TODO: test column flag, column type and index columns if len(row.Columns) != 0 { checkSQL, params := prepareCheckSQL(t, tc.tableName, row.Columns) @@ -284,19 +317,19 @@ func testMounterDisableOldValue(t *testing.T, tc struct { return rows } - mountAndCheckRow := func(f func(key []byte, value []byte) *model.RawKVEntry) int { + mountAndCheckRow := func(rowsBytes [][]int, f func(key []byte, value []byte) *model.RawKVEntry) int { partitionInfo := tableInfo.GetPartitionInfo() if partitionInfo == nil { - return mountAndCheckRowInTable(tableInfo.ID, f) + return mountAndCheckRowInTable(tableInfo.ID, rowsBytes[0], f) } var rows int - for _, p := range partitionInfo.Definitions { - rows += mountAndCheckRowInTable(p.ID, f) + for i, p := range partitionInfo.Definitions { + rows += mountAndCheckRowInTable(p.ID, rowsBytes[i], f) } return rows } - rows := mountAndCheckRow(func(key []byte, value []byte) *model.RawKVEntry { + rows := mountAndCheckRow(tc.putApproximateBytes, func(key []byte, value []byte) *model.RawKVEntry { return &model.RawKVEntry{ OpType: model.OpTypePut, Key: key, @@ -307,7 +340,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct { }) require.Equal(t, rows, len(tc.values)) - rows = mountAndCheckRow(func(key []byte, value []byte) *model.RawKVEntry { + rows = mountAndCheckRow(tc.delApproximateBytes, func(key []byte, value []byte) *model.RawKVEntry { return &model.RawKVEntry{ OpType: model.OpTypeDelete, Key: key, diff --git a/cdc/model/kv.go b/cdc/model/kv.go index 1e249248970..aad0da3c6cb 100644 --- a/cdc/model/kv.go +++ b/cdc/model/kv.go @@ -88,7 +88,8 @@ func (v *RawKVEntry) String() string { v.OpType, string(v.Key), string(v.Value), v.StartTs, v.CRTs, v.RegionID) } -// ApproximateSize calculate the approximate size of this event -func (v *RawKVEntry) ApproximateSize() int64 { +// ApproximateDataSize calculate the approximate size of protobuf binary +// representation of this event. +func (v *RawKVEntry) ApproximateDataSize() int64 { return int64(len(v.Key) + len(v.Value) + len(v.OldValue)) } diff --git a/cdc/model/kv_test.go b/cdc/model/kv_test.go index c1465240d1a..b566e23a5f4 100644 --- a/cdc/model/kv_test.go +++ b/cdc/model/kv_test.go @@ -56,5 +56,5 @@ func TestRawKVEntry(t *testing.T) { } require.Equal(t, "OpType: 1, Key: 123, Value: 345, StartTs: 100, CRTs: 101, RegionID: 0", raw.String()) - require.Equal(t, int64(6), raw.ApproximateSize()) + require.Equal(t, int64(6), raw.ApproximateDataSize()) } diff --git a/cdc/model/sink.go b/cdc/model/sink.go index e05447d3fd5..cccee1fae02 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -17,6 +17,7 @@ import ( "fmt" "strconv" "sync" + "unsafe" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/quotes" @@ -261,8 +262,9 @@ type RowChangedEvent struct { PreColumns []*Column `json:"pre-columns" msg:"-"` IndexColumns [][]int `json:"-" msg:"index-columns"` - // approximate size of this event, calculate by tikv proto bytes size - ApproximateSize int64 `json:"-" msg:"-"` + // ApproximateDataSize is the approximate size of protobuf binary + // representation of this event. + ApproximateDataSize int64 `json:"-" msg:"-"` } // IsDelete returns true if the row is a delete event @@ -316,12 +318,44 @@ func (r *RowChangedEvent) HandleKeyColumns() []*Column { return pkeyCols } +// ApproximateBytes returns approximate bytes in memory consumed by the event. +func (r *RowChangedEvent) ApproximateBytes() int { + const sizeOfRowEvent = int(unsafe.Sizeof(*r)) + const sizeOfTable = int(unsafe.Sizeof(*r.Table)) + const sizeOfIndexes = int(unsafe.Sizeof(r.IndexColumns[0])) + const sizeOfInt = int(unsafe.Sizeof(int(0))) + + // Size of table name + size := len(r.Table.Schema) + len(r.Table.Table) + sizeOfTable + // Size of cols + for i := range r.Columns { + size += r.Columns[i].ApproximateBytes + } + // Size of pre cols + for i := range r.PreColumns { + if r.PreColumns[i] != nil { + size += r.PreColumns[i].ApproximateBytes + } + } + // Size of index columns + for i := range r.IndexColumns { + size += len(r.IndexColumns[i]) * sizeOfInt + size += sizeOfIndexes + } + // Size of an empty row event + size += sizeOfRowEvent + return size +} + // Column represents a column value in row changed event type Column struct { Name string `json:"name" msg:"name"` Type byte `json:"type" msg:"type"` Flag ColumnFlagType `json:"flag" msg:"-"` Value interface{} `json:"value" msg:"value"` + + // ApproximateBytes is approximate bytes consumed by the column. + ApproximateBytes int `json:"-"` } // RedoColumn stores Column change diff --git a/cdc/processor/pipeline/mounter.go b/cdc/processor/pipeline/mounter.go deleted file mode 100644 index 1bfc94b43c4..00000000000 --- a/cdc/processor/pipeline/mounter.go +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipeline - -import ( - "context" - "log" - "sync" - "time" - - "github.com/edwingeng/deque" - "github.com/pingcap/failpoint" - "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/pkg/notify" - "github.com/pingcap/ticdc/pkg/pipeline" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" - "golang.org/x/time/rate" -) - -const ( - waitEventMountedBatchSize = 128 * 1024 // larger means less CPU overhead used by `select` but more memory overhead. - maxNotificationsPerSecond = 10.0 // larger means lower latency, but more CPU used by `select`. -) - -// mounterNode is now used to buffer unmounted events. -// TODO rename this node, or refactor the mounter to make it synchronous. -type mounterNode struct { - mu sync.Mutex - queue deque.Deque // we use Deque for better memory consumption and support for batching - - wg errgroup.Group - cancel context.CancelFunc - - // notifies new events pushed to the queue - notifier notify.Notifier - // limits the rate at which notifications are sent - rl *rate.Limiter -} - -func newMounterNode() pipeline.Node { - return &mounterNode{ - queue: deque.NewDeque(), - rl: rate.NewLimiter(maxNotificationsPerSecond, 1 /* burst */), - } -} - -func (n *mounterNode) Init(ctx pipeline.NodeContext) error { - stdCtx, cancel := context.WithCancel(ctx) - n.cancel = cancel - - receiver, err := n.notifier.NewReceiver(time.Millisecond * 100) - if err != nil { - log.Panic("unexpected error", zap.Error(err)) - } - - n.wg.Go(func() error { - defer receiver.Stop() - for { - select { - case <-stdCtx.Done(): - return nil - case <-receiver.C: - // handles writes to the queue - for { - n.mu.Lock() - msgs := n.queue.PopManyFront(waitEventMountedBatchSize) - n.mu.Unlock() - if len(msgs) == 0 { - break // inner loop - } - - for _, msg := range msgs { - msg := msg.(pipeline.Message) - if msg.Tp != pipeline.MessageTypePolymorphicEvent { - // sends the control message directly to the next node - ctx.SendToNextNode(msg) - continue // to handling the next message - } - - // handles PolymorphicEvents - event := msg.PolymorphicEvent - if event.RawKV.OpType != model.OpTypeResolved { - failpoint.Inject("MounterNodeWaitPrepare", func() {}) - // only RowChangedEvents need mounting - err := event.WaitPrepare(stdCtx) - if err != nil { - ctx.Throw(err) - return nil - } - } - - ctx.SendToNextNode(msg) - } - } - } - } - }) - - return nil -} - -// Receive receives the message from the previous node -func (n *mounterNode) Receive(ctx pipeline.NodeContext) error { - msg := ctx.Message() - n.mu.Lock() - n.queue.PushBack(msg) - n.mu.Unlock() - - if n.rl.Allow() { - // send notification under the rate limiter - n.notifier.Notify() - } - return nil -} - -func (n *mounterNode) Destroy(ctx pipeline.NodeContext) error { - defer n.notifier.Close() - n.cancel() - return n.wg.Wait() -} diff --git a/cdc/processor/pipeline/mounter_test.go b/cdc/processor/pipeline/mounter_test.go deleted file mode 100644 index dbb19ccee56..00000000000 --- a/cdc/processor/pipeline/mounter_test.go +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package pipeline - -import ( - "context" - "errors" - "sync" - "sync/atomic" - "time" - - "github.com/pingcap/check" - "github.com/pingcap/log" - "github.com/pingcap/ticdc/cdc/model" - cdcContext "github.com/pingcap/ticdc/pkg/context" - "github.com/pingcap/ticdc/pkg/pipeline" - "github.com/pingcap/ticdc/pkg/retry" - "github.com/pingcap/ticdc/pkg/util/testleak" - "go.uber.org/zap" -) - -type mounterNodeSuite struct{} - -var _ = check.Suite(&mounterNodeSuite{}) - -type checkNode struct { - c *check.C - count int - expected int -} - -func (n *checkNode) Init(ctx pipeline.NodeContext) error { - // do nothing - return nil -} - -func (n *checkNode) Receive(ctx pipeline.NodeContext) error { - message := ctx.Message() - if message.Tp == pipeline.MessageTypePolymorphicEvent { - if message.PolymorphicEvent.RawKV.OpType == model.OpTypeResolved { - n.c.Assert(n.count, check.Equals, n.expected) - return errors.New("finished") - } - n.c.Assert(message.PolymorphicEvent.Row, check.NotNil) - } - - if n.count%100 == 0 { - log.Info("message received", zap.Int("count", n.count)) - } - - if n.count == basicsTestMessageCount/2 { - log.Info("sleeping for 5 seconds to simulate blocking") - time.Sleep(time.Second * 5) - } - n.count++ - return nil -} - -func (n *checkNode) Destroy(ctx pipeline.NodeContext) error { - return nil -} - -const ( - basicsTestMessageCount = 10000 -) - -func generateMockRawKV(ts uint64) *model.RawKVEntry { - return &model.RawKVEntry{ - OpType: model.OpTypePut, - Key: []byte{}, - Value: []byte{}, - OldValue: nil, - StartTs: ts - 5, - CRTs: ts, - RegionID: 0, - } -} - -func (s *mounterNodeSuite) TestMounterNodeBasics(c *check.C) { - defer testleak.AfterTest(c)() - - ctx, cancel := cdcContext.WithCancel(cdcContext.NewBackendContext4Test(false)) - defer cancel() - - ctx = cdcContext.WithErrorHandler(ctx, func(err error) error { - return nil - }) - runnersSize, outputChannelSize := 2, 64 - p := pipeline.NewPipeline(ctx, 0, runnersSize, outputChannelSize) - mounterNode := newMounterNode() - p.AppendNode(ctx, "mounter", mounterNode) - - checkNode := &checkNode{ - c: c, - count: 0, - expected: basicsTestMessageCount, - } - p.AppendNode(ctx, "check", checkNode) - - var sentCount int64 - sendMsg := func(p *pipeline.Pipeline, msg pipeline.Message) { - err := retry.Do(context.Background(), func() error { - return p.SendToFirstNode(msg) - }, retry.WithBackoffBaseDelay(10), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(100)) - atomic.AddInt64(&sentCount, 1) - c.Assert(err, check.IsNil) - } - - mockMounterInput := make(chan *model.PolymorphicEvent, 10240) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < basicsTestMessageCount; i++ { - var msg pipeline.Message - if i%100 == 0 { - // generates a control message - msg = pipeline.TickMessage() - } else { - msg = pipeline.PolymorphicEventMessage(model.NewPolymorphicEvent(generateMockRawKV(uint64(i << 5)))) - msg.PolymorphicEvent.SetUpFinishedChan() - select { - case <-ctx.Done(): - return - case mockMounterInput <- msg.PolymorphicEvent: - } - } - sendMsg(p, msg) - } - msg := pipeline.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, (basicsTestMessageCount<<5)+1)) - sendMsg(p, msg) - c.Assert(atomic.LoadInt64(&sentCount), check.Equals, int64(basicsTestMessageCount+1)) - log.Info("finished sending") - }() - - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-ctx.Done(): - return - case event := <-mockMounterInput: - event.Row = &model.RowChangedEvent{} // mocked row - event.PrepareFinished() - } - } - }() - - p.Wait() - cancel() - wg.Wait() -} diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index 8692cf6bbcb..34099128a33 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -140,7 +140,16 @@ func (n *sorterNode) Init(ctx pipeline.NodeContext) error { log.Panic("unexpected empty msg", zap.Reflect("msg", msg)) } if msg.RawKV.OpType != model.OpTypeResolved { - size := uint64(msg.RawKV.ApproximateSize()) + // DESIGN NOTE: We send the messages to the mounter in + // this separate goroutine to prevent blocking + // the whole pipeline. + msg.SetUpFinishedChan() + select { + case <-ctx.Done(): + return nil + case n.mounter.Input() <- msg: + } + commitTs := msg.CRTs // We interpolate a resolved-ts if none has been sent for some time. if time.Since(lastSendResolvedTsTime) > resolvedTsInterpolateInterval { @@ -155,9 +164,21 @@ func (n *sorterNode) Init(ctx pipeline.NodeContext) error { ctx.SendToNextNode(pipeline.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, lastCRTs))) } } + + // Must wait before accessing msg.Row + err := msg.WaitPrepare(ctx) + if err != nil { + if errors.Cause(err) != context.Canceled { + ctx.Throw(err) + } + return errors.Trace(err) + } + // We calculate memory consumption by RowChangedEvent size. + // It's much larger than RawKVEntry. + size := uint64(msg.Row.ApproximateBytes()) // NOTE we allow the quota to be exceeded if blocking means interrupting a transaction. // Otherwise the pipeline would deadlock. - err := n.flowController.Consume(commitTs, size, func() error { + err = n.flowController.Consume(commitTs, size, func() error { if lastCRTs > lastSentResolvedTs { // If we are blocking, we send a Resolved Event here to elicit a sink-flush. // Not sending a Resolved Event here will very likely deadlock the pipeline. @@ -178,15 +199,6 @@ func (n *sorterNode) Init(ctx pipeline.NodeContext) error { return nil } lastCRTs = commitTs - - // DESIGN NOTE: We send the messages to the mounter in this separate goroutine to prevent - // blocking the whole pipeline. - msg.SetUpFinishedChan() - select { - case <-ctx.Done(): - return nil - case n.mounter.Input() <- msg: - } } else { // handle OpTypeResolved if msg.CRTs < lastSentResolvedTs { diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 6d496df39ef..ef234785277 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -164,9 +164,9 @@ func (t *tablePipelineImpl) Wait() { // replicating 1024 tables in the worst case. const defaultOutputChannelSize = 64 -// There are 5 or 6 runners in table pipeline: header, puller, sorter, mounter, +// There are 4 or 5 runners in table pipeline: header, puller, sorter, // sink, cyclic if cyclic replication is enabled -const defaultRunnersSize = 5 +const defaultRunnersSize = 4 // NewTablePipeline creates a table pipeline // TODO(leoppro): implement a mock kvclient to test the table pipeline @@ -208,7 +208,6 @@ func NewTablePipeline(ctx cdcContext.Context, p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName)) p.AppendNode(ctx, "sorter", sorterNode) - p.AppendNode(ctx, "mounter", newMounterNode()) if cyclicEnabled { p.AppendNode(ctx, "cyclic", newCyclicMarkNode(replicaInfo.MarkTableID)) } diff --git a/cdc/sink/cdclog/s3.go b/cdc/sink/cdclog/s3.go index 53db1e3fb6d..52726b60c9b 100644 --- a/cdc/sink/cdclog/s3.go +++ b/cdc/sink/cdclog/s3.go @@ -101,7 +101,7 @@ func (tb *tableBuffer) flush(ctx context.Context, sink *logSink) error { flushedSize := int64(0) for event := int64(0); event < sendEvents; event++ { row := <-tb.dataCh - flushedSize += row.ApproximateSize + flushedSize += row.ApproximateDataSize if event == sendEvents-1 { // if last event, we record ts as new rotate file name newFileName = makeTableFileObject(row.Table.TableID, row.CommitTs) diff --git a/cdc/sink/cdclog/utils.go b/cdc/sink/cdclog/utils.go index f2b351ca836..9dd3621b0af 100644 --- a/cdc/sink/cdclog/utils.go +++ b/cdc/sink/cdclog/utils.go @@ -165,7 +165,7 @@ func (l *logSink) emitRowChangedEvents(ctx context.Context, newUnit func(int64) case <-ctx.Done(): return ctx.Err() case l.units[hash].dataChan() <- row: - l.units[hash].Size().Add(row.ApproximateSize) + l.units[hash].Size().Add(row.ApproximateDataSize) l.units[hash].Events().Inc() } } diff --git a/cdc/sorter/unified/heap_sorter.go b/cdc/sorter/unified/heap_sorter.go index a03f88d78ce..b9d7f89affe 100644 --- a/cdc/sorter/unified/heap_sorter.go +++ b/cdc/sorter/unified/heap_sorter.go @@ -317,7 +317,7 @@ func (h *heapSorter) init(ctx context.Context, onError func(err error)) { } // 5 * 8 is for the 5 fields in PolymorphicEvent - state.heapSizeBytesEstimate += event.RawKV.ApproximateSize() + 40 + state.heapSizeBytesEstimate += event.RawKV.ApproximateDataSize() + 40 needFlush := state.heapSizeBytesEstimate >= int64(state.sorterConfig.ChunkSizeLimit) || (isResolvedEvent && state.rateCounter < flushRateLimitPerSecond) diff --git a/cdc/sorter/unified/memory_backend.go b/cdc/sorter/unified/memory_backend.go index 27ca7f511b6..b1b91bcce77 100644 --- a/cdc/sorter/unified/memory_backend.go +++ b/cdc/sorter/unified/memory_backend.go @@ -110,7 +110,7 @@ type memoryBackEndWriter struct { func (w *memoryBackEndWriter) writeNext(event *model.PolymorphicEvent) error { w.backEnd.events = append(w.backEnd.events, event) // 8 * 5 is for the 5 fields in PolymorphicEvent, each of which is thought of as a 64-bit pointer - w.bytesWritten += 8*5 + event.RawKV.ApproximateSize() + w.bytesWritten += 8*5 + event.RawKV.ApproximateDataSize() failpoint.Inject("sorterDebug", func() { if event.CRTs < w.maxTs {