Skip to content

Commit

Permalink
*: support writing rows with checksum values (#43163) (#43542)
Browse files Browse the repository at this point in the history
ref #43626
  • Loading branch information
ti-chi-bot authored May 9, 2023
1 parent d062cdd commit c8220bb
Show file tree
Hide file tree
Showing 15 changed files with 1,306 additions and 30 deletions.
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ go_library(
"//util/ranger",
"//util/resourcegrouptag",
"//util/rowDecoder",
"//util/rowcodec",
"//util/set",
"//util/slice",
"//util/sqlexec",
Expand Down
72 changes: 65 additions & 7 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/hex"
"fmt"
"math/bits"
"sort"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -49,6 +50,7 @@ import (
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -1180,6 +1182,9 @@ type updateColumnWorker struct {
rowDecoder *decoder.RowDecoder

rowMap map[int64]types.Datum

checksumBuffer rowcodec.RowData
checksumNeeded bool
}

func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker {
Expand All @@ -1197,12 +1202,33 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
}
}
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
checksumNeeded := false
failpoint.Inject("forceRowLevelChecksumOnUpdateColumnBackfill", func() {
orig := variable.EnableRowLevelChecksum.Load()
defer variable.EnableRowLevelChecksum.Store(orig)
variable.EnableRowLevelChecksum.Store(true)
})
// We use global `EnableRowLevelChecksum` to detect whether checksum is enabled in ddl backfill worker because
// `SessionVars.IsRowLevelChecksumEnabled` will filter out internal sessions.
if variable.EnableRowLevelChecksum.Load() {
if numNonPubCols := len(t.DeletableCols()) - len(t.Cols()); numNonPubCols > 1 {
cols := make([]*model.ColumnInfo, len(t.DeletableCols()))
for i, col := range t.DeletableCols() {
cols[i] = col.ToInfo()
}
logutil.BgLogger().Warn("skip checksum in update-column backfill since the number of non-public columns is greater than 1",
zap.String("jobQuery", reorgInfo.Query), zap.String("reorgInfo", reorgInfo.String()), zap.Any("cols", cols))
} else {
checksumNeeded = true
}
}
return &updateColumnWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.SchemaName, t, jc, "update_col_rate", false),
oldColInfo: oldCol,
newColInfo: newCol,
rowDecoder: rowDecoder,
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.SchemaName, t, jc, "update_col_rate", false),
oldColInfo: oldCol,
newColInfo: newCol,
rowDecoder: rowDecoder,
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
checksumNeeded: checksumNeeded,
}
}

Expand Down Expand Up @@ -1332,15 +1358,15 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
if err != nil {
return errors.Trace(err)
}

newColumnIDs := make([]int64, 0, len(w.rowMap))
newRow := make([]types.Datum, 0, len(w.rowMap))
for colID, val := range w.rowMap {
newColumnIDs = append(newColumnIDs, colID)
newRow = append(newRow, val)
}
checksums := w.calcChecksums()
sctx, rd := w.sessCtx.GetSessionVars().StmtCtx, &w.sessCtx.GetSessionVars().RowEncoder
newRowVal, err := tablecodec.EncodeRow(sctx, newRow, newColumnIDs, nil, nil, rd)
newRowVal, err := tablecodec.EncodeRow(sctx, newRow, newColumnIDs, nil, nil, rd, checksums...)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1350,6 +1376,38 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
return nil
}

func (w *updateColumnWorker) calcChecksums() []uint32 {
if !w.checksumNeeded {
return nil
}
// when w.checksumNeeded is true, it indicates that there is only one write-reorg column (the new column) and other
// columns are public, thus we have to calculate two checksums that one of which only contains the old column and
// the other only contains the new column.
var checksums [2]uint32
for i, id := range []int64{w.newColInfo.ID, w.oldColInfo.ID} {
if len(w.checksumBuffer.Cols) > 0 {
w.checksumBuffer.Cols = w.checksumBuffer.Cols[:0]
}
for _, col := range w.table.DeletableCols() {
if col.ID == id || (col.IsGenerated() && !col.GeneratedStored) {
continue
}
d := w.rowMap[col.ID]
w.checksumBuffer.Cols = append(w.checksumBuffer.Cols, rowcodec.ColData{ColumnInfo: col.ToInfo(), Datum: &d})
}
if !sort.IsSorted(w.checksumBuffer) {
sort.Sort(w.checksumBuffer)
}
checksum, err := w.checksumBuffer.Checksum()
if err != nil {
logutil.BgLogger().Warn("skip checksum in update-column backfill due to encode error", zap.Error(err))
return nil
}
checksums[i] = checksum
}
return checksums[:]
}

// reformatErrors casted error because `convertTo` function couldn't package column name and datum value for some errors.
func (w *updateColumnWorker) reformatErrors(err error) error {
// Since row count is not precious in concurrent reorganization, here we substitute row count with datum value.
Expand Down
71 changes: 71 additions & 0 deletions ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
func TestColumnTypeChangeBetweenInteger(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// Modify column from null to not null.
Expand Down Expand Up @@ -108,6 +109,7 @@ func TestColumnTypeChangeBetweenInteger(t *testing.T) {
func TestColumnTypeChangeStateBetweenInteger(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (c1 int, c2 int)")
Expand Down Expand Up @@ -176,6 +178,7 @@ func TestColumnTypeChangeStateBetweenInteger(t *testing.T) {
func TestRollbackColumnTypeChangeBetweenInteger(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (c1 bigint, c2 bigint)")
Expand Down Expand Up @@ -255,6 +258,7 @@ func init() {
func TestColumnTypeChangeFromIntegerToOthers(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

prepare := func(tk *testkit.TestKit) {
Expand Down Expand Up @@ -406,6 +410,7 @@ func TestColumnTypeChangeFromIntegerToOthers(t *testing.T) {
func TestColumnTypeChangeBetweenVarcharAndNonVarchar(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")
tk.MustExec("drop database if exists col_type_change_char;")
tk.MustExec("create database col_type_change_char;")
Expand Down Expand Up @@ -434,6 +439,7 @@ func TestColumnTypeChangeBetweenVarcharAndNonVarchar(t *testing.T) {
func TestColumnTypeChangeFromStringToOthers(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// Set time zone to UTC.
Expand Down Expand Up @@ -665,6 +671,7 @@ func TestColumnTypeChangeFromStringToOthers(t *testing.T) {
func TestColumnTypeChangeFromNumericToOthers(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// Set time zone to UTC.
Expand Down Expand Up @@ -929,6 +936,7 @@ func TestColumnTypeChangeFromNumericToOthers(t *testing.T) {
func TestColumnTypeChangeIgnoreDisplayLength(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

var assertResult bool
Expand Down Expand Up @@ -970,6 +978,7 @@ func TestColumnTypeChangeIgnoreDisplayLength(t *testing.T) {
func TestColumnTypeChangeFromDateTimeTypeToOthers(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// Set time zone to UTC.
Expand Down Expand Up @@ -1148,6 +1157,7 @@ func TestColumnTypeChangeFromDateTimeTypeToOthers(t *testing.T) {
func TestColumnTypeChangeFromJsonToOthers(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// Set time zone to UTC.
Expand Down Expand Up @@ -1541,6 +1551,7 @@ func TestColumnTypeChangeFromJsonToOthers(t *testing.T) {
func TestUpdateDataAfterChangeTimestampToDate(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t, t1")
tk.MustExec("create table t (col timestamp default '1971-06-09' not null, col1 int default 1, unique key(col1));")
Expand Down Expand Up @@ -1580,6 +1591,50 @@ func TestRowFormat(t *testing.T) {
tk.MustExec("drop table if exists t")
}

func TestRowFormatWithChecksums(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int primary key, v varchar(10))")
tk.MustExec("insert into t values (1, \"123\");")
tk.MustExec("alter table t modify column v varchar(5);")

tbl := external.GetTableByName(t, tk, "test", "t")
encodedKey := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1))

h := helper.NewHelper(store.(helper.Storage))
data, err := h.GetMvccByEncodedKey(encodedKey)
require.NoError(t, err)
// row value with checksums
require.Equal(t, []byte{0x80, 0x2, 0x3, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x8, 0x52, 0x78, 0xc9, 0x28, 0x52, 0x78, 0xc9, 0x28}, data.Info.Writes[0].ShortValue)
tk.MustExec("drop table if exists t")
}

func TestRowLevelChecksumWithMultiSchemaChange(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int primary key, v varchar(10))")
tk.MustExec("insert into t values (1, \"123\")")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/forceRowLevelChecksumOnUpdateColumnBackfill", "return"))
defer failpoint.Disable("github.com/pingcap/tidb/ddl/forceRowLevelChecksumOnUpdateColumnBackfill")
tk.MustExec("alter table t add column vv int, modify column v varchar(5)")

tbl := external.GetTableByName(t, tk, "test", "t")
encodedKey := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1))

h := helper.NewHelper(store.(helper.Storage))
data, err := h.GetMvccByEncodedKey(encodedKey)
require.NoError(t, err)
// checksum skipped and with a null col vv
require.Equal(t, []byte{0x80, 0x0, 0x3, 0x0, 0x1, 0x0, 0x1, 0x2, 0x4, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33}, data.Info.Writes[0].ShortValue)
tk.MustExec("drop table if exists t")
}

// Close issue #22395
// Background:
// Since the changing column is implemented as adding a new column and substitute the old one when it finished.
Expand All @@ -1589,6 +1644,7 @@ func TestRowFormat(t *testing.T) {
func TestChangingColOriginDefaultValue(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk1 := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1666,6 +1722,7 @@ func TestChangingColOriginDefaultValue(t *testing.T) {
func TestChangingColOriginDefaultValueAfterAddColAndCastSucc(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk1 := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1753,6 +1810,7 @@ func TestChangingColOriginDefaultValueAfterAddColAndCastSucc(t *testing.T) {
func TestChangingColOriginDefaultValueAfterAddColAndCastFail(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk1 := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1810,6 +1868,7 @@ func TestChangingColOriginDefaultValueAfterAddColAndCastFail(t *testing.T) {
func TestChangingAttributeOfColumnWithFK(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

prepare := func() {
Expand Down Expand Up @@ -1845,6 +1904,7 @@ func TestChangingAttributeOfColumnWithFK(t *testing.T) {
func TestAlterPrimaryKeyToNull(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk.MustExec("drop table if exists t, t1")
Expand Down Expand Up @@ -1881,6 +1941,7 @@ func TestChangeUnsignedIntToDatetime(t *testing.T) {
func TestDDLExitWhenCancelMeetPanic(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
Expand Down Expand Up @@ -1923,6 +1984,7 @@ func TestDDLExitWhenCancelMeetPanic(t *testing.T) {
func TestChangeIntToBitWillPanicInBackfillIndexes(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
Expand Down Expand Up @@ -1953,6 +2015,7 @@ func TestChangeIntToBitWillPanicInBackfillIndexes(t *testing.T) {
func TestCancelCTCInReorgStateWillCauseGoroutineLeak(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockInfiniteReorgLogic", `return(true)`))
Expand Down Expand Up @@ -2005,6 +2068,7 @@ func TestCancelCTCInReorgStateWillCauseGoroutineLeak(t *testing.T) {
func TestCTCShouldCastTheDefaultValue(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
Expand Down Expand Up @@ -2036,6 +2100,7 @@ func TestCTCShouldCastTheDefaultValue(t *testing.T) {
func TestCTCCastBitToBinary(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// For point 1:
Expand Down Expand Up @@ -2200,6 +2265,7 @@ func TestChangeFromTimeToYear(t *testing.T) {
func TestCastDateToTimestampInReorgAttribute(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 600*time.Millisecond)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk.MustExec("CREATE TABLE `t` (`a` DATE NULL DEFAULT '8497-01-06')")
Expand Down Expand Up @@ -2263,6 +2329,7 @@ func TestForIssue24621(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a char(250));")
Expand All @@ -2274,6 +2341,7 @@ func TestForIssue24621(t *testing.T) {
func TestChangeNullValueFromOtherTypeToTimestamp(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// Some ddl cases.
Expand Down Expand Up @@ -2316,6 +2384,7 @@ func TestColumnTypeChangeBetweenFloatAndDouble(t *testing.T) {
store := testkit.CreateMockStore(t)
// issue #31372
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

prepare := func(createTableStmt string) {
Expand All @@ -2342,6 +2411,7 @@ func TestColumnTypeChangeBetweenFloatAndDouble(t *testing.T) {
func TestColumnTypeChangeTimestampToInt(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// 1. modify a timestamp column to bigint
Expand Down Expand Up @@ -2430,6 +2500,7 @@ func TestColumnTypeChangeTimestampToInt(t *testing.T) {
func TestFixDDLTxnWillConflictWithReorgTxn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk.MustExec("create table t (a int)")
Expand Down
Loading

0 comments on commit c8220bb

Please sign in to comment.