tables: reduce GetSessionVars from MutateContext in tables (#54715)

close #54397
lcwangchao committed Jul 19, 2024
1 parent eab8700 commit 708cbfe
Showing 9 changed files with 59 additions and 55 deletions.
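The hunks below all make the same move: tables-layer code that only needs a time zone or an error context stops reaching into GetSessionVars().StmtCtx and asks the evaluation context instead. A minimal sketch of the new shape, assuming (as the call sites below suggest) that Location() returns a *time.Location and ErrCtx() an errctx.Context; the helper itself is hypothetical, since the commit simply inlines these two lines at each call site.

package tables

import (
	"time"

	"github.com/pingcap/tidb/pkg/errctx"
	"github.com/pingcap/tidb/pkg/table"
)

// evalCodecCtx pulls the two pieces of statement state the index code still
// needs (time zone and error-handling context) out of a MutateContext, without
// touching the full SessionVars.
func evalCodecCtx(sctx table.MutateContext) (*time.Location, errctx.Context) {
	evalCtx := sctx.GetExprCtx().GetEvalCtx()
	return evalCtx.Location(), evalCtx.ErrCtx()
}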
2 changes: 1 addition & 1 deletion pkg/ddl/column.go
@@ -1384,7 +1384,7 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
newColumnIDs = append(newColumnIDs, colID)
newRow = append(newRow, val)
}
- rd := &w.tblCtx.GetSessionVars().RowEncoder
+ rd := w.tblCtx.GetRowEncodingConfig().RowEncoder
ec := w.exprCtx.GetEvalCtx().ErrCtx()
var checksum rowcodec.Checksum
if w.checksumNeeded {
3 changes: 3 additions & 0 deletions pkg/errctx/context.go
@@ -94,6 +94,9 @@ func (ctx *Context) AppendWarning(err error) {
// It also allows using `errors.ErrorGroup`, in this case, it'll handle each error in order, and return the first error
// it founds.
func (ctx *Context) HandleError(err error) error {
+ if err == nil {
+ return nil
+ }
// The function of handling `errors.ErrorGroup` is placed in `HandleError` but not in `HandleErrorWithAlias`, because
// it's hard to give a proper error and warn alias for an error group.
if errs, ok := err.(errors.ErrorGroup); ok {
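The nil guard added above makes HandleError safe to call unconditionally on whatever an encoder returns, which is exactly how the index.go hunks further down use it. A small test-style sketch of that contract (not part of the commit; it relies only on the guard, so even a zero-value Context suffices for the nil case):

package errctx_test

import (
	"testing"

	"github.com/pingcap/tidb/pkg/errctx"
)

// HandleError(nil) now returns nil before any level or warning handling runs,
// so callers can pipe errors straight through without a preceding nil check.
func TestHandleErrorNilSketch(t *testing.T) {
	var ec errctx.Context
	if got := ec.HandleError(nil); got != nil {
		t.Fatalf("HandleError(nil) = %v, want nil", got)
	}
}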
4 changes: 2 additions & 2 deletions pkg/executor/distsql.go
@@ -1415,12 +1415,12 @@ func (w *tableWorker) compareData(ctx context.Context, task *lookupTableTask, ta
vals = append(vals, row.GetDatum(i, &col.FieldType))
}
tablecodec.TruncateIndexValues(tblInfo, w.idxLookup.index, vals)
- sctx := w.idxLookup.Ctx().GetSessionVars().StmtCtx
+ tc := w.idxLookup.Ctx().GetSessionVars().StmtCtx.TypeCtx()
for i := range vals {
col := w.idxTblCols[i]
idxVal := idxRow.GetDatum(i, w.idxColTps[i])
tablecodec.TruncateIndexValue(&idxVal, w.idxLookup.index.Columns[i], col.ColumnInfo)
- cmpRes, err := tables.CompareIndexAndVal(sctx, vals[i], idxVal, collators[i], col.FieldType.IsArray() && vals[i].Kind() == types.KindMysqlJSON)
+ cmpRes, err := tables.CompareIndexAndVal(tc, vals[i], idxVal, collators[i], col.FieldType.IsArray() && vals[i].Kind() == types.KindMysqlJSON)
if err != nil {
return ir().ReportAdminCheckInconsistentWithColInfo(ctx,
handle,
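Executors that still hold a full session context now derive the narrower types.Context once, outside the comparison loop, and pass only that down to the tables helpers. A hedged sketch of that adapter step, assuming the executor's context satisfies sessionctx.Context as the call above implies (package and function names are invented):

package distsqlsketch

import (
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/types"
)

// typeCtxOf mirrors the line added in compareData: the statement context still
// owns the type-conversion state, but only its types.Context view is handed on.
func typeCtxOf(ctx sessionctx.Context) types.Context {
	return ctx.GetSessionVars().StmtCtx.TypeCtx()
}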
3 changes: 1 addition & 2 deletions pkg/executor/test/admintest/admin_test.go
@@ -1666,13 +1666,12 @@ func TestAdminCheckTableErrorLocateForClusterIndex(t *testing.T) {
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, idxInfo)
- sc := ctx.GetSessionVars().StmtCtx

pattern := "handle:\\s(\\d+)"
r := regexp.MustCompile(pattern)

getCommonHandle := func(randomRow int) *kv.CommonHandle {
- h, err := codec.EncodeKey(sc.TimeZone(), nil, types.MakeDatums(randomRow)...)
+ h, err := codec.EncodeKey(ctx.GetExprCtx().GetEvalCtx().Location(), nil, types.MakeDatums(randomRow)...)
require.NoError(t, err)
ch, err := kv.NewCommonHandle(h)
require.NoError(t, err)
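With the SessionVars lookup gone, building the handle key only needs a *time.Location, so any location source works. A hedged sketch (helper and package names are invented; EncodeKey's shape is taken from the call above):

package admintestsketch

import (
	"time"

	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util/codec"
)

// encodeHandleKey rebuilds the clustered-index key the test compares against,
// parameterised by location rather than by a session.
func encodeHandleKey(loc *time.Location, row int) ([]byte, error) {
	return codec.EncodeKey(loc, nil, types.MakeDatums(row)...)
}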
2 changes: 0 additions & 2 deletions pkg/table/tables/BUILD.bazel
@@ -30,7 +30,6 @@ go_library(
"//pkg/parser/terror",
"//pkg/sessionctx",
"//pkg/sessionctx/binloginfo",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/table",
@@ -95,7 +94,6 @@ go_test(
"//pkg/session",
"//pkg/session/types",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/sessiontxn",
"//pkg/table",
14 changes: 8 additions & 6 deletions pkg/table/tables/index.go
@@ -179,9 +179,10 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu
vars := sctx.GetSessionVars()
writeBufs := vars.GetWriteStmtBufs()
skipCheck := vars.StmtCtx.BatchCheck
- sc := sctx.GetSessionVars().StmtCtx
+ evalCtx := sctx.GetExprCtx().GetEvalCtx()
+ loc, ec := evalCtx.Location(), evalCtx.ErrCtx()
for _, value := range indexedValues {
- key, distinct, err := c.GenIndexKey(sc.ErrCtx(), sc.TimeZone(), value, h, writeBufs.IndexKeyBuf)
+ key, distinct, err := c.GenIndexKey(ec, loc, value, h, writeBufs.IndexKeyBuf)
if err != nil {
return nil, err
}
@@ -238,9 +239,9 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu
c.initNeedRestoreData.Do(func() {
c.needRestoredData = NeedRestoredData(c.idxInfo.Columns, c.tblInfo.Columns)
})
- idxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx.TimeZone(), c.tblInfo, c.idxInfo,
+ idxVal, err := tablecodec.GenIndexValuePortal(loc, c.tblInfo, c.idxInfo,
c.needRestoredData, distinct, opt.Untouched, value, h, c.phyTblID, handleRestoreData, nil)
- err = sctx.GetSessionVars().StmtCtx.HandleError(err)
+ err = ec.HandleError(err)
if err != nil {
return nil, err
}
@@ -393,9 +394,10 @@ func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, t
// Delete removes the entry for handle h and indexedValues from KV index.
func (c *index) Delete(ctx table.MutateContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) error {
indexedValues := c.getIndexedValue(indexedValue)
- sc := ctx.GetSessionVars().StmtCtx
+ evalCtx := ctx.GetExprCtx().GetEvalCtx()
+ loc, ec := evalCtx.Location(), evalCtx.ErrCtx()
for _, value := range indexedValues {
- key, distinct, err := c.GenIndexKey(sc.ErrCtx(), sc.TimeZone(), value, h, nil)
+ key, distinct, err := c.GenIndexKey(ec, loc, value, h, nil)
if err != nil {
return err
}
24 changes: 12 additions & 12 deletions pkg/table/tables/mutation_checker.go
@@ -23,7 +23,6 @@ import (
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
@@ -77,7 +76,7 @@ type columnMaps struct {
// The time complexity is O(M * C + I)
// The space complexity is O(M + C + I)
func CheckDataConsistency(
- txn kv.Transaction, sessVars *variable.SessionVars, t *TableCommon,
+ txn kv.Transaction, tc types.Context, t *TableCommon,
rowToInsert, rowToRemove []types.Datum, memBuffer kv.MemBuffer, sh kv.StagingHandle,
) error {
if t.Meta().GetPartitionInfo() != nil {
@@ -115,7 +114,7 @@ func CheckDataConsistency(
}

if err := checkIndexKeys(
- sessVars, t, rowToInsert, rowToRemove, indexMutations, columnMaps.IndexIDToInfo, columnMaps.IndexIDToRowColInfos,
+ tc, t, rowToInsert, rowToRemove, indexMutations, columnMaps.IndexIDToInfo, columnMaps.IndexIDToRowColInfos,
); err != nil {
return errors.Trace(err)
}
@@ -205,7 +204,7 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in
// (a) {added indices} is a subset of {needed indices} => each index mutation is consistent with the input/row key/value
// (b) {needed indices} is a subset of {added indices}. The check process would be exactly the same with how we generate the mutations, thus ignored.
func checkIndexKeys(
- sessVars *variable.SessionVars, t *TableCommon, rowToInsert, rowToRemove []types.Datum,
+ tc types.Context, t *TableCommon, rowToInsert, rowToRemove []types.Datum,
indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo,
indexIDToRowColInfos map[int64][]rowcodec.ColInfo,
) error {
@@ -260,9 +259,10 @@ func checkIndexKeys(
indexData = indexData[:0]
}

+ loc := tc.Location()
for i, v := range decodedIndexValues {
fieldType := t.Columns[indexInfo.Columns[i].Offset].FieldType.ArrayType()
- datum, err := tablecodec.DecodeColumnValue(v, fieldType, sessVars.Location())
+ datum, err := tablecodec.DecodeColumnValue(v, fieldType, loc)
if err != nil {
return errors.Trace(err)
}
@@ -271,9 +271,9 @@

// When it is in add index new backfill state.
if len(value) == 0 || isTmpIdxValAndDeleted {
- err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo, t.Meta())
+ err = compareIndexData(tc, t.Columns, indexData, rowToRemove, indexInfo, t.Meta())
} else {
- err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo, t.Meta())
+ err = compareIndexData(tc, t.Columns, indexData, rowToInsert, indexInfo, t.Meta())
}
if err != nil {
return errors.Trace(err)
@@ -356,7 +356,7 @@ func collectTableMutationsFromBufferStage(t *TableCommon, memBuffer kv.MemBuffer
// compareIndexData compares the decoded index data with the input data.
// Returns error if the index data is not a subset of the input data.
func compareIndexData(
- sc *stmtctx.StatementContext, cols []*table.Column, indexData, input []types.Datum, indexInfo *model.IndexInfo,
+ tc types.Context, cols []*table.Column, indexData, input []types.Datum, indexInfo *model.IndexInfo,
tableInfo *model.TableInfo,
) error {
for i := range indexData {
@@ -372,7 +372,7 @@
cols[indexInfo.Columns[i].Offset].ColumnInfo,
)

- comparison, err := CompareIndexAndVal(sc, expectedDatum, decodedMutationDatum,
+ comparison, err := CompareIndexAndVal(tc, expectedDatum, decodedMutationDatum,
collate.GetCollator(decodedMutationDatum.Collation()),
cols[indexInfo.Columns[i].Offset].ColumnInfo.FieldType.IsArray() && expectedDatum.Kind() == types.KindMysqlJSON)
if err != nil {
@@ -392,7 +392,7 @@
}

// CompareIndexAndVal compare index valued and row value.
- func CompareIndexAndVal(sctx *stmtctx.StatementContext, rowVal types.Datum, idxVal types.Datum, collator collate.Collator, cmpMVIndex bool) (int, error) {
+ func CompareIndexAndVal(tc types.Context, rowVal types.Datum, idxVal types.Datum, collator collate.Collator, cmpMVIndex bool) (int, error) {
var cmpRes int
var err error
if cmpMVIndex {
@@ -401,7 +401,7 @@ func CompareIndexAndVal(sctx *stmtctx.StatementContext, rowVal types.Datum, idxV
count := bj.GetElemCount()
for elemIdx := 0; elemIdx < count; elemIdx++ {
jsonDatum := types.NewJSONDatum(bj.ArrayGetElem(elemIdx))
- cmpRes, err = jsonDatum.Compare(sctx.TypeCtx(), &idxVal, collate.GetBinaryCollator())
+ cmpRes, err = jsonDatum.Compare(tc, &idxVal, collate.GetBinaryCollator())
if err != nil {
return 0, errors.Trace(err)
}
@@ -410,7 +410,7 @@ func CompareIndexAndVal(sctx *stmtctx.StatementContext, rowVal types.Datum, idxV
}
}
} else {
- cmpRes, err = idxVal.Compare(sctx.TypeCtx(), &rowVal, collator)
+ cmpRes, err = idxVal.Compare(tc, &rowVal, collator)
}
return cmpRes, err
}
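CompareIndexAndVal is exported, so the signature change above is what callers such as distsql.go now see: any types.Context is enough. A hedged usage sketch with the default no-warning context (test name and values are invented):

package tables_test

import (
	"testing"

	"github.com/pingcap/tidb/pkg/table/tables"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util/collate"
	"github.com/stretchr/testify/require"
)

// A plain, non-multi-valued comparison needs only a types.Context and a collator.
func TestCompareIndexAndValSketch(t *testing.T) {
	tc := types.DefaultStmtNoWarningContext
	rowVal := types.NewStringDatum("abc")
	idxVal := types.NewStringDatum("abc")
	cmp, err := tables.CompareIndexAndVal(tc, rowVal, idxVal, collate.GetBinaryCollator(), false)
	require.NoError(t, err)
	require.Equal(t, 0, cmp)
}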
27 changes: 13 additions & 14 deletions pkg/table/tables/mutation_checker_test.go
@@ -22,7 +22,6 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
@@ -72,7 +71,7 @@ func TestCompareIndexData(t *testing.T) {
}

for caseID, data := range testData {
- sc := stmtctx.NewStmtCtx()
+ tc := types.DefaultStmtNoWarningContext
cols := make([]*table.Column, 0)
indexCols := make([]*model.IndexColumn, 0)
for i, ft := range data.fts {
@@ -81,7 +80,7 @@
}
indexInfo := &model.IndexInfo{Name: model.NewCIStr("i0"), Columns: indexCols}

- err := compareIndexData(sc, cols, data.indexData, data.inputData, indexInfo, &model.TableInfo{Name: model.NewCIStr("t")})
+ err := compareIndexData(tc, cols, data.indexData, data.inputData, indexInfo, &model.TableInfo{Name: model.NewCIStr("t")})
require.Equal(t, data.correct, err == nil, "case id = %v", caseID)
}
}
@@ -228,15 +227,15 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) {
{ID: 2, Offset: 1, FieldType: *types.NewFieldType(mysql.TypeDatetime)},
},
}
- sessVars := variable.NewSessionVars(nil)
+ tc := types.DefaultStmtNoWarningContext
rd := rowcodec.Encoder{Enable: true}

now := types.CurrentTime(mysql.TypeDatetime)
rowToInsert := []types.Datum{
types.NewStringDatum("some string"),
types.NewTimeDatum(now),
}
- anotherTime, err := now.Add(sessVars.StmtCtx.TypeCtx(), types.NewDuration(24, 0, 0, 0, 0))
+ anotherTime, err := now.Add(tc, types.NewDuration(24, 0, 0, 0, 0))
require.Nil(t, err)
rowToRemove := []types.Datum{
types.NewStringDatum("old string"),
@@ -254,7 +253,7 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) {
for _, isCommonHandle := range []bool{true, false} {
for _, lc := range locations {
for _, columnInfos := range columnInfoSets {
- sessVars.StmtCtx.SetTimeZone(lc)
+ tc = tc.WithLocation(lc)
tableInfo := model.TableInfo{
ID: 1,
Name: model.NewCIStr("t"),
@@ -266,7 +265,7 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) {
table := MockTableFromMeta(&tableInfo).(*TableCommon)
var handle, corruptedHandle kv.Handle
if isCommonHandle {
- encoded, err := codec.EncodeKey(sessVars.StmtCtx.TimeZone(), nil, rowToInsert[0])
+ encoded, err := codec.EncodeKey(tc.Location(), nil, rowToInsert[0])
require.Nil(t, err)
corrupted := make([]byte, len(encoded))
copy(corrupted, encoded)
@@ -285,26 +284,26 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) {
maps := getOrBuildColumnMaps(getter, setter, table)

// test checkIndexKeys
- insertionKey, insertionValue, err := buildIndexKeyValue(index, rowToInsert, sessVars, tableInfo,
+ insertionKey, insertionValue, err := buildIndexKeyValue(index, rowToInsert, tc.Location(), tableInfo,
indexInfo, table, handle)
require.Nil(t, err)
- deletionKey, _, err := buildIndexKeyValue(index, rowToRemove, sessVars, tableInfo, indexInfo, table,
+ deletionKey, _, err := buildIndexKeyValue(index, rowToRemove, tc.Location(), tableInfo, indexInfo, table,
handle)
require.Nil(t, err)
indexMutations := []mutation{
{key: insertionKey, value: insertionValue, indexID: indexInfo.ID},
{key: deletionKey, indexID: indexInfo.ID},
}
err = checkIndexKeys(
- sessVars, table, rowToInsert, rowToRemove, indexMutations, maps.IndexIDToInfo,
+ tc, table, rowToInsert, rowToRemove, indexMutations, maps.IndexIDToInfo,
maps.IndexIDToRowColInfos,
)
require.Nil(t, err)

// test checkHandleConsistency
rowKey := tablecodec.EncodeRowKeyWithHandle(table.tableID, handle)
corruptedRowKey := tablecodec.EncodeRowKeyWithHandle(table.tableID, corruptedHandle)
- rowValue, err := tablecodec.EncodeRow(sessVars.StmtCtx.TimeZone(), rowToInsert, []int64{1, 2}, nil, nil, nil, &rd)
+ rowValue, err := tablecodec.EncodeRow(tc.Location(), rowToInsert, []int64{1, 2}, nil, nil, nil, &rd)
require.Nil(t, err)
rowMutation := mutation{key: rowKey, value: rowValue}
corruptedRowMutation := mutation{key: corruptedRowKey, value: rowValue}
@@ -318,21 +317,21 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) {
}
}

- func buildIndexKeyValue(index table.Index, rowToInsert []types.Datum, sessVars *variable.SessionVars,
+ func buildIndexKeyValue(index table.Index, rowToInsert []types.Datum, loc *time.Location,
tableInfo model.TableInfo, indexInfo *model.IndexInfo, table *TableCommon, handle kv.Handle) ([]byte, []byte, error) {
indexedValues, err := index.FetchValues(rowToInsert, nil)
if err != nil {
return nil, nil, err
}
key, distinct, err := tablecodec.GenIndexKey(
- sessVars.StmtCtx.TimeZone(), &tableInfo, indexInfo, 1, indexedValues, handle, nil,
+ loc, &tableInfo, indexInfo, 1, indexedValues, handle, nil,
)
if err != nil {
return nil, nil, err
}
rsData := TryGetHandleRestoredDataWrapper(table.meta, rowToInsert, nil, indexInfo)
value, err := tablecodec.GenIndexValuePortal(
- sessVars.StmtCtx.TimeZone(), &tableInfo, indexInfo, NeedRestoredData(indexInfo.Columns, tableInfo.Columns),
+ loc, &tableInfo, indexInfo, NeedRestoredData(indexInfo.Columns, tableInfo.Columns),
distinct, false, indexedValues, handle, 0, rsData, nil,
)
if err != nil {
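The test now carries the time zone inside the types.Context value itself; the reassignment tc = tc.WithLocation(lc) above suggests WithLocation returns a fresh context, so the shared DefaultStmtNoWarningContext is left untouched between sub-cases. A small hedged sketch of that pattern (function and package names are invented):

package tablessketch

import (
	"time"

	"github.com/pingcap/tidb/pkg/types"
)

// contextsForZones builds one types.Context per time zone, each derived from
// the default no-warning context; the default itself is never modified.
func contextsForZones(zones ...*time.Location) []types.Context {
	ctxs := make([]types.Context, 0, len(zones))
	for _, lc := range zones {
		ctxs = append(ctxs, types.DefaultStmtNoWarningContext.WithLocation(lc))
	}
	return ctxs
}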