Skip to content

Commit

Permalink
lightning: refactor to reuse in load data, part 3 (#42689)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
D3Hunter committed Mar 31, 2023
1 parent 2d4df7f commit 4e30a64
Show file tree
Hide file tree
Showing 10 changed files with 394 additions and 310 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "kv",
srcs = [
"allocator.go",
"base.go",
"kv2sql.go",
"session.go",
"sql2kv.go",
Expand Down
330 changes: 330 additions & 0 deletions br/pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"context"
"math/rand"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// ExtraHandleColumnInfo is the column info of extra handle column.
// It is a package-level value initialized once from model.NewExtraHandleColInfo.
var ExtraHandleColumnInfo = model.NewExtraHandleColInfo()

// GeneratedCol holds the information needed to evaluate one generated column:
// where the column sits in the table and the expression that produces its value.
type GeneratedCol struct {
	// Index is the index of the column in the table.
	Index int
	// Expr is the expression evaluated against the row to compute the column value.
	Expr expression.Expression
}

// AutoIDConverterFn is a function to convert auto id.
// It receives an int64 ID and returns the converted int64 ID.
type AutoIDConverterFn func(int64) int64

// RowArrayMarshaller wraps a slice of types.Datum for logging the content into zap.
// Converting a row to this type allows it to be passed to zap.Array.
type RowArrayMarshaller []types.Datum

// kindStr maps a datum kind, used as the array index, to the human-readable
// name that is written to the "kind" field when a row is logged.
var kindStr = [...]string{
	types.KindNull:          "null",
	types.KindInt64:         "int64",
	types.KindUint64:        "uint64",
	types.KindFloat32:       "float32",
	types.KindFloat64:       "float64",
	types.KindString:        "string",
	types.KindBytes:         "bytes",
	types.KindBinaryLiteral: "binary",
	types.KindMysqlDecimal:  "decimal",
	types.KindMysqlDuration: "duration",
	types.KindMysqlEnum:     "enum",
	types.KindMysqlBit:      "bit",
	types.KindMysqlSet:      "set",
	types.KindMysqlTime:     "time",
	types.KindInterface:     "interface",
	types.KindMinNotNull:    "min",
	types.KindMaxValue:      "max",
	types.KindRaw:           "raw",
	types.KindMysqlJSON:     "json",
}

// MarshalLogArray implements the zapcore.ArrayMarshaler interface.
//
// Every datum of the row is appended to encoder as an object with two fields:
//   - "kind": the name of the datum kind taken from kindStr, or "unknown" when
//     the kind has no entry in that table;
//   - "val": the datum rendered as a string and passed through redact.String.
//
// Null, min-not-null and max-value datums are rendered as "NULL", "-inf" and
// "+inf" respectively; all other kinds use Datum.ToString. The first error
// returned by ToString or by the encoder aborts the marshalling and is returned.
func (row RowArrayMarshaller) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
	for _, datum := range row {
		kind := datum.Kind()
		var str string
		var err error
		switch kind {
		case types.KindNull:
			str = "NULL"
		case types.KindMinNotNull:
			str = "-inf"
		case types.KindMaxValue:
			str = "+inf"
		default:
			str, err = datum.ToString()
			if err != nil {
				return err
			}
		}
		// kindStr is indexed directly by the kind byte. Guard the lookup so a
		// kind that is not covered by the table cannot trigger an
		// index-out-of-range panic while we are only trying to log a row.
		kindName := "unknown"
		if int(kind) < len(kindStr) {
			kindName = kindStr[kind]
		}
		if err := encoder.AppendObject(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error {
			enc.AddString("kind", kindName)
			enc.AddString("val", redact.String(str))
			return nil
		})); err != nil {
			return err
		}
	}
	return nil
}

// BaseKVEncoder encodes a row into a KV pair.
type BaseKVEncoder struct {
	// GenCols holds the generated columns evaluated by EvalGeneratedColumns.
	GenCols []GeneratedCol
	// SessionCtx is the session handed to table operations such as AddRecord and CastValue.
	SessionCtx *Session
	// Table is the table that rows are encoded for.
	Table table.Table
	// Columns are the columns of the table (Table.Cols() when built by NewBaseKVEncoder).
	Columns []*table.Column
	// AutoRandomColID is the ID of the auto random column. NewBaseKVEncoder
	// leaves it at zero when the table meta contains no auto random bits.
	AutoRandomColID int64
	// AutoIDFn converts the row ID generated by lightning into the ID used
	// for a shard row ID or an auto random ID.
	AutoIDFn AutoIDConverterFn

	// logger receives the error logs emitted when encoding or converting fails.
	logger *zap.Logger
	// recordCache is the emptied record slice stored by Record2KV and handed
	// out again by GetOrCreateRecord.
	recordCache []types.Datum
}

// NewBaseKVEncoder creates a new BaseKVEncoder.
//
// A new session is built from config.SessionOptions and config.Logger, and a
// CommonAddRecordCtx sized to the table's column count is attached to it.
// The encoder's AutoIDFn is the identity function unless:
//   - the table meta contains auto random bits: IDs are composed with a shard
//     value drawn once from a source seeded with config.AutoRandomSeed; or
//   - the table meta has ShardRowIDBits > 0: masked random bits, taken from a
//     generator that is re-seeded with the ID on every call, are OR-ed into the ID.
//
// An annotated error is returned when the generated column expressions cannot
// be collected.
func NewBaseKVEncoder(config *encode.EncodingConfig) (*BaseKVEncoder, error) {
	tblInfo := config.Table.Meta()
	columns := config.Table.Cols()
	session := NewSession(&config.SessionOptions, config.Logger)
	// Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord
	tables.SetAddRecordCtx(session, tables.NewCommonAddRecordCtx(len(columns)))

	var autoRandomColID int64
	convertID := func(id int64) int64 { return id }
	switch {
	case tblInfo.ContainsAutoRandomBits():
		autoRandomCol := common.GetAutoRandomColumn(tblInfo)
		autoRandomColID = autoRandomCol.ID

		format := autoid.NewShardIDFormat(&autoRandomCol.FieldType, tblInfo.AutoRandomBits, tblInfo.AutoRandomRangeBits)
		// The shard is drawn a single time and shared by every converted ID.
		shardValue := rand.New(rand.NewSource(config.AutoRandomSeed)).Int63()
		convertID = func(id int64) int64 {
			return format.Compose(shardValue, id)
		}
	case tblInfo.ShardRowIDBits > 0:
		rng := rand.New(rand.NewSource(config.AutoRandomSeed)) // nolint:gosec
		shardMask := int64(1)<<tblInfo.ShardRowIDBits - 1
		shardShift := autoid.RowIDBitLength - tblInfo.ShardRowIDBits - 1
		convertID = func(id int64) int64 {
			// Re-seeding with the ID makes the shard bits a function of the ID itself.
			rng.Seed(id)
			shard := (int64(rng.Uint32()) & shardMask) << shardShift
			return shard | id
		}
	}

	// collect expressions for evaluating stored generated columns
	generated, err := CollectGeneratedColumns(session, tblInfo, columns)
	if err != nil {
		return nil, errors.Annotate(err, "failed to parse generated column expressions")
	}

	encoder := &BaseKVEncoder{
		GenCols:         generated,
		SessionCtx:      session,
		Table:           config.Table,
		Columns:         columns,
		AutoRandomColID: autoRandomColID,
		AutoIDFn:        convertID,
		logger:          config.Logger.Logger,
	}
	return encoder, nil
}

// GetOrCreateRecord returns the cached record slice when one is available.
// Otherwise it allocates an empty slice with capacity for every column plus one.
func (e *BaseKVEncoder) GetOrCreateRecord() []types.Datum {
	if e.recordCache == nil {
		return make([]types.Datum, 0, len(e.Columns)+1)
	}
	return e.recordCache
}

// Record2KV converts a row into a KV pair.
//
// record is added to the table through the encoder's session. On failure both
// originalRow and record are logged and the traced error is returned. On
// success the KV pairs are taken from the session, each pair is tagged with the
// encoded rowID, and the emptied record slice is cached for reuse.
func (e *BaseKVEncoder) Record2KV(record, originalRow []types.Datum, rowID int64) (*KvPairs, error) {
	if _, err := e.Table.AddRecord(e.SessionCtx, record); err != nil {
		e.logger.Error("kv encode failed",
			zap.Array("originalRow", RowArrayMarshaller(originalRow)),
			zap.Array("convertedRow", RowArrayMarshaller(record)),
			log.ShortError(err),
		)
		return nil, errors.Trace(err)
	}

	pairs := e.SessionCtx.TakeKvPairs()
	for idx := range pairs.Pairs {
		// The max length of encoded int64 is 9. Every pair gets its own buffer.
		var buf [9]byte
		pairs.Pairs[idx].RowID = common.EncodeIntRowIDToBuf(buf[:0], rowID)
	}
	e.recordCache = record[:0]
	return pairs, nil
}

// ProcessColDatum processes the datum of a column.
//
// The datum to store is resolved with getActualDatum. For an auto random or an
// auto increment column, the matching allocator of the table is then rebased
// with the resolved value. The resolved datum is returned together with any
// error from the resolution or from the rebase.
func (e *BaseKVEncoder) ProcessColDatum(col *table.Column, rowID int64, inputDatum *types.Datum) (types.Datum, error) {
	datum, err := e.getActualDatum(col, rowID, inputDatum)
	if err != nil {
		return datum, err
	}

	if e.IsAutoRandomCol(col.ToInfo()) {
		tblInfo := e.Table.Meta()
		format := autoid.NewShardIDFormat(&col.FieldType, tblInfo.AutoRandomBits, tblInfo.AutoRandomRangeBits)
		// this allocator is the same as the allocator in table importer, i.e. PanickingAllocators. below too.
		randomAlloc := e.Table.Allocators(e.SessionCtx).Get(autoid.AutoRandomType)
		err = randomAlloc.Rebase(context.Background(), datum.GetInt64()&format.IncrementalMask(), false)
		if err != nil {
			return datum, errors.Trace(err)
		}
	}
	if IsAutoIncCol(col.ToInfo()) {
		incAlloc := e.Table.Allocators(e.SessionCtx).Get(autoid.AutoIncrementType)
		err = incAlloc.Rebase(context.Background(), GetAutoRecordID(datum, &col.FieldType), false)
		if err != nil {
			return datum, errors.Trace(err)
		}
	}
	return datum, nil
}

// getActualDatum resolves the datum to store for col.
//
// When inputDatum is provided it is cast to the column type; a cast error is
// returned as is, and a cast value that passes the column's CheckNotNull is
// returned directly. Otherwise (no input, or the not-null check failed) the
// value is chosen by the first matching rule:
//  1. auto increment column: rowID cast to the column type;
//  2. auto random column: AutoIDFn(rowID), as unsigned or signed depending on
//     the column flag, cast to the column type;
//  3. generated column: a placeholder min value for the column type;
//  4. input failed the not-null check: whatever HandleBadNull decides;
//  5. anything else: the column's default value.
func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatum *types.Datum) (types.Datum, error) {
	var (
		result      types.Datum
		err         error
		failedNotNull bool
	)

	if inputDatum != nil {
		if result, err = table.CastValue(e.SessionCtx, *inputDatum, col.ToInfo(), false, false); err != nil {
			return result, err
		}
		if col.CheckNotNull(&result, 0) == nil {
			return result, nil // the most normal case
		}
		failedNotNull = true
	}

	// handle special values
	switch {
	case IsAutoIncCol(col.ToInfo()):
		// we still need a conversion, e.g. to catch overflow with a TINYINT column.
		result, err = table.CastValue(e.SessionCtx, types.NewIntDatum(rowID), col.ToInfo(), false, false)
	case e.IsAutoRandomCol(col.ToInfo()):
		convertedID := e.AutoIDFn(rowID)
		var idDatum types.Datum
		if mysql.HasUnsignedFlag(col.GetFlag()) {
			idDatum = types.NewUintDatum(uint64(convertedID))
		} else {
			idDatum = types.NewIntDatum(convertedID)
		}
		result, err = table.CastValue(e.SessionCtx, idDatum, col.ToInfo(), false, false)
	case col.IsGenerated():
		// inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil.
		// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
		result = types.GetMinValue(&col.FieldType)
	case failedNotNull:
		err = col.HandleBadNull(&result, e.SessionCtx.Vars.StmtCtx, 0)
	default:
		result, err = table.GetColDefaultValue(e.SessionCtx, col.ToInfo())
	}
	return result, err
}

// IsAutoRandomCol reports whether col is the auto random column of the table:
// the table meta must contain auto random bits and col's ID must equal the
// encoder's AutoRandomColID.
func (e *BaseKVEncoder) IsAutoRandomCol(col *model.ColumnInfo) bool {
	if !e.Table.Meta().ContainsAutoRandomBits() {
		return false
	}
	return col.ID == e.AutoRandomColID
}

// EvalGeneratedColumns evaluates the generated columns of the encoder against
// record, using the encoder's session. On failure it returns the column that
// could not be evaluated or cast together with the error.
func (e *BaseKVEncoder) EvalGeneratedColumns(record []types.Datum, cols []*table.Column) (errCol *model.ColumnInfo, err error) {
	errCol, err = evalGeneratedColumns(e.SessionCtx, record, cols, e.GenCols)
	return errCol, err
}

// LogKVConvertFailed logs the error when converting a row to KV pair failed.
//
// j is the index in row of the value that failed to convert. When j is within
// range only that datum is logged; otherwise the whole row is logged and the
// original value is reported as a zero datum. The returned error is err
// annotated with the column type, the column name and the 1-based position j+1.
func (e *BaseKVEncoder) LogKVConvertFailed(row []types.Datum, j int, colInfo *model.ColumnInfo, err error) error {
	var failedDatum types.Datum
	loggedRow := row
	if j >= 0 && j < len(row) {
		failedDatum = row[j]
		loggedRow = row[j : j+1]
	}

	colName := colInfo.Name.O
	colType := &colInfo.FieldType
	position := j + 1

	e.logger.Error("kv convert failed",
		zap.Array("original", RowArrayMarshaller(loggedRow)),
		zap.Int("originalCol", j),
		zap.String("colName", colName),
		zap.Stringer("colType", colType),
		log.ShortError(err),
	)

	e.logger.Error("failed to convert kv value",
		logutil.RedactAny("origVal", failedDatum.GetValue()),
		zap.Stringer("fieldType", colType),
		zap.String("column", colName),
		zap.Int("columnID", position),
	)

	return errors.Annotatef(
		err,
		"failed to cast value as %s for column `%s` (#%d)", colType, colName, position,
	)
}

// LogEvalGenExprFailed logs the error when evaluating the generated column
// expression failed. The whole row and the column name are logged, and err is
// returned annotated with the name of the column.
func (e *BaseKVEncoder) LogEvalGenExprFailed(row []types.Datum, colInfo *model.ColumnInfo, err error) error {
	colName := colInfo.Name.O

	e.logger.Error("kv convert failed: cannot evaluate generated column expression",
		zap.Array("original", RowArrayMarshaller(row)),
		zap.String("colName", colName),
		log.ShortError(err),
	)

	return errors.Annotatef(err, "failed to evaluate generated column expression for column `%s`", colName)
}

// evalGeneratedColumns computes every generated column in genCols, in order,
// and stores the results in record.
//
// Each expression is evaluated against a mutable row built from record, the
// result is cast to the column type, and the cast value is written both to the
// mutable row and to record at the column's index, so later expressions see
// the values computed by earlier ones. On the first evaluation or cast failure
// the offending column info and the error are returned; on success both
// results are nil.
func evalGeneratedColumns(se *Session, record []types.Datum, cols []*table.Column, genCols []GeneratedCol) (errCol *model.ColumnInfo, err error) {
	row := chunk.MutRowFromDatums(record)
	for i := range genCols {
		idx := genCols[i].Index
		colInfo := cols[idx].ToInfo()

		raw, evalErr := genCols[i].Expr.Eval(row.ToRow())
		if evalErr != nil {
			return colInfo, evalErr
		}
		casted, castErr := table.CastValue(se, raw, colInfo, false, false)
		if castErr != nil {
			return colInfo, castErr
		}

		row.SetDatum(idx, casted)
		record[idx] = casted
	}
	return nil, nil
}
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/kv/kv2sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type TableKVDecoder struct {
se *Session
// tableName is the unique table name in the form "`db`.`tbl`".
tableName string
genCols []genCol
genCols []GeneratedCol
}

func (t *TableKVDecoder) Name() string {
Expand Down Expand Up @@ -77,7 +77,7 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]
row[i] = types.GetMinValue(&col.FieldType)
}
}
if _, err := evaluateGeneratedColumns(t.se, row, t.tbl.Cols(), t.genCols); err != nil {
if _, err := evalGeneratedColumns(t.se, row, t.tbl.Cols(), t.genCols); err != nil {
return err
}
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func NewTableKVDecoder(
recordCtx := tables.NewCommonAddRecordCtx(len(cols))
tables.SetAddRecordCtx(se, recordCtx)

genCols, err := collectGeneratedColumns(se, tbl.Meta(), cols)
genCols, err := CollectGeneratedColumns(se, tbl.Meta(), cols)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 4e30a64

Please sign in to comment.