Skip to content

Commit

Permalink
sorter,mounter: row size based flow control (#3366)
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus authored Nov 30, 2021
1 parent 88984b5 commit 30d77ea
Show file tree
Hide file tree
Showing 13 changed files with 200 additions and 374 deletions.
99 changes: 71 additions & 28 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math"
"math/rand"
"time"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -211,7 +212,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
if rowKV == nil {
return nil, nil
}
return m.mountRowKVEntry(tableInfo, rowKV, raw.ApproximateSize())
return m.mountRowKVEntry(tableInfo, rowKV, raw.ApproximateDataSize())
}
return nil, nil
}()
Expand Down Expand Up @@ -305,6 +306,7 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) {
func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool) ([]*model.Column, error) {
cols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
for _, colInfo := range tableInfo.Columns {
colSize := 0
if !model.IsColCDCVisible(colInfo) {
continue
}
Expand All @@ -314,15 +316,19 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
if exist {
var err error
var warn string
colValue, warn, err = formatColVal(colDatums, colInfo.Tp)
var size int
colValue, size, warn, err = formatColVal(colDatums, colInfo.Tp)
if err != nil {
return nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
}
colSize += size
} else if fillWithDefaultValue {
colValue = getDefaultOrZeroValue(colInfo)
var size int
colValue, size = getDefaultOrZeroValue(colInfo)
colSize += size
} else {
continue
}
Expand All @@ -331,6 +337,8 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
Type: colInfo.Tp,
Value: colValue,
Flag: tableInfo.ColumnsFlag[colInfo.ID],
// ApproximateBytes = column data size + column struct size
ApproximateBytes: colSize + sizeOfEmptyColumn,
}
}
return cols, nil
Expand Down Expand Up @@ -399,84 +407,119 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr
TableID: row.PhysicalTableID,
IsPartition: tableInfo.GetPartitionInfo() != nil,
},
Columns: cols,
PreColumns: preCols,
IndexColumns: tableInfo.IndexColumnsOffset,
ApproximateSize: dataSize,
Columns: cols,
PreColumns: preCols,
IndexColumns: tableInfo.IndexColumnsOffset,
ApproximateDataSize: dataSize,
}, nil
}

var emptyBytes = make([]byte, 0)

func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, err error) {
// Fixed sizes, as reported by unsafe.Sizeof, that are added to the
// variable-length data size when estimating how many bytes a column takes.
// unsafe.Sizeof does not include memory referenced by the value, so these
// cover only the struct/descriptor itself.
const (
// sizeOfEmptyColumn is the size of a zero-valued model.Column struct.
sizeOfEmptyColumn = int(unsafe.Sizeof(model.Column{}))
// sizeOfEmptyBytes is the size of the []byte value emptyBytes itself,
// not counting any backing data.
sizeOfEmptyBytes = int(unsafe.Sizeof(emptyBytes))
// sizeOfEmptyString is the size of a string value itself, not counting
// its contents.
sizeOfEmptyString = int(unsafe.Sizeof(""))
)

// sizeOfDatum returns the estimated memory usage of a single datum. The datum
// is placed in a one-element array so it can be handed to
// types.EstimatedMemUsage as a slice, with 1 as the second argument.
func sizeOfDatum(d types.Datum) int {
	var holder [1]types.Datum
	holder[0] = d
	usage := types.EstimatedMemUsage(holder[:], 1)
	return int(usage)
}

// sizeOfString returns the approximate number of bytes taken by s: the fixed
// size of the string struct (sizeOfEmptyString) plus the length of its data.
func sizeOfString(s string) int {
	dataLen := len(s)
	return sizeOfEmptyString + dataLen
}

// sizeOfBytes returns the approximate number of bytes taken by b: the fixed
// size of the bytes struct (sizeOfEmptyBytes) plus the length of its data.
func sizeOfBytes(b []byte) int {
	dataLen := len(b)
	return sizeOfEmptyBytes + dataLen
}

func formatColVal(datum types.Datum, tp byte) (
value interface{}, size int, warn string, err error,
) {
if datum.IsNull() {
return nil, "", nil
return nil, 0, "", nil
}
switch tp {
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp:
return datum.GetMysqlTime().String(), "", nil
v := datum.GetMysqlTime().String()
return v, sizeOfString(v), "", nil
case mysql.TypeDuration:
return datum.GetMysqlDuration().String(), "", nil
v := datum.GetMysqlDuration().String()
return v, sizeOfString(v), "", nil
case mysql.TypeJSON:
return datum.GetMysqlJSON().String(), "", nil
v := datum.GetMysqlJSON().String()
return v, sizeOfString(v), "", nil
case mysql.TypeNewDecimal:
v := datum.GetMysqlDecimal()
if v == nil {
return nil, "", nil
d := datum.GetMysqlDecimal()
if d == nil {
// nil takes 0 bytes.
return nil, 0, "", nil
}
return v.String(), "", nil
v := d.String()
return v, sizeOfString(v), "", nil
case mysql.TypeEnum:
return datum.GetMysqlEnum().Value, "", nil
v := datum.GetMysqlEnum().Value
const sizeOfV = unsafe.Sizeof(v)
return v, int(sizeOfV), "", nil
case mysql.TypeSet:
return datum.GetMysqlSet().Value, "", nil
v := datum.GetMysqlSet().Value
const sizeOfV = unsafe.Sizeof(v)
return v, int(sizeOfV), "", nil
case mysql.TypeBit:
// Encode bits as integers to avoid pingcap/tidb#10988 (which also affects MySQL itself)
v, err := datum.GetBinaryLiteral().ToInt(nil)
return v, "", err
const sizeOfV = unsafe.Sizeof(v)
return v, int(sizeOfV), "", err
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar,
mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
b := datum.GetBytes()
if b == nil {
b = emptyBytes
}
return b, "", nil
return b, sizeOfBytes(b), "", nil
case mysql.TypeFloat, mysql.TypeDouble:
v := datum.GetFloat64()
if math.IsNaN(v) || math.IsInf(v, 1) || math.IsInf(v, -1) {
warn = fmt.Sprintf("the value is invalid in column: %f", v)
v = 0
}
return v, warn, nil
const sizeOfV = unsafe.Sizeof(v)
return v, int(sizeOfV), warn, nil
default:
return datum.GetValue(), "", nil
return datum.GetValue(), sizeOfDatum(datum), "", nil
}
}

func getDefaultOrZeroValue(col *timodel.ColumnInfo) interface{} {
func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, int) {
// see https://github.com/pingcap/tidb/issues/9304
// null must be used when TiDB does not write the column value because both
// the default value and the value itself are null
if !mysql.HasNotNullFlag(col.Flag) {
d := types.NewDatum(nil)
return d.GetValue()
const size = unsafe.Sizeof(d.GetValue())
return d.GetValue(), int(size)
}

if col.GetDefaultValue() != nil {
d := types.NewDatum(col.GetDefaultValue())
return d.GetValue()
return d.GetValue(), sizeOfDatum(d)
}
switch col.Tp {
case mysql.TypeEnum:
// For enum type, if no default value and not null is set,
// the default value is the first element of the enum list
d := types.NewDatum(col.FieldType.Elems[0])
return d.GetValue()
return d.GetValue(), sizeOfDatum(d)
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
return emptyBytes
return emptyBytes, sizeOfEmptyBytes
}

d := table.GetZeroValue(col)
return d.GetValue()
return d.GetValue(), sizeOfDatum(d)
}

// DecodeTableID decodes the raw key to a table ID
Expand Down
Loading

0 comments on commit 30d77ea

Please sign in to comment.