Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support writing rows with checksum values (#43163) #43542

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ go_library(
"//util/ranger",
"//util/resourcegrouptag",
"//util/rowDecoder",
"//util/rowcodec",
"//util/set",
"//util/slice",
"//util/sqlexec",
Expand Down
72 changes: 65 additions & 7 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/hex"
"fmt"
"math/bits"
"sort"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -49,6 +50,7 @@ import (
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -1180,6 +1182,9 @@ type updateColumnWorker struct {
rowDecoder *decoder.RowDecoder

rowMap map[int64]types.Datum

checksumBuffer rowcodec.RowData
checksumNeeded bool
}

func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker {
Expand All @@ -1197,12 +1202,33 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
}
}
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
checksumNeeded := false
failpoint.Inject("forceRowLevelChecksumOnUpdateColumnBackfill", func() {
orig := variable.EnableRowLevelChecksum.Load()
defer variable.EnableRowLevelChecksum.Store(orig)
variable.EnableRowLevelChecksum.Store(true)
})
// We use global `EnableRowLevelChecksum` to detect whether checksum is enabled in ddl backfill worker because
// `SessionVars.IsRowLevelChecksumEnabled` will filter out internal sessions.
if variable.EnableRowLevelChecksum.Load() {
if numNonPubCols := len(t.DeletableCols()) - len(t.Cols()); numNonPubCols > 1 {
cols := make([]*model.ColumnInfo, len(t.DeletableCols()))
for i, col := range t.DeletableCols() {
cols[i] = col.ToInfo()
}
logutil.BgLogger().Warn("skip checksum in update-column backfill since the number of non-public columns is greater than 1",
zap.String("jobQuery", reorgInfo.Query), zap.String("reorgInfo", reorgInfo.String()), zap.Any("cols", cols))
} else {
checksumNeeded = true
}
}
return &updateColumnWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.SchemaName, t, jc, "update_col_rate", false),
oldColInfo: oldCol,
newColInfo: newCol,
rowDecoder: rowDecoder,
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.SchemaName, t, jc, "update_col_rate", false),
oldColInfo: oldCol,
newColInfo: newCol,
rowDecoder: rowDecoder,
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
checksumNeeded: checksumNeeded,
}
}

Expand Down Expand Up @@ -1332,15 +1358,15 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
if err != nil {
return errors.Trace(err)
}

newColumnIDs := make([]int64, 0, len(w.rowMap))
newRow := make([]types.Datum, 0, len(w.rowMap))
for colID, val := range w.rowMap {
newColumnIDs = append(newColumnIDs, colID)
newRow = append(newRow, val)
}
checksums := w.calcChecksums()
sctx, rd := w.sessCtx.GetSessionVars().StmtCtx, &w.sessCtx.GetSessionVars().RowEncoder
newRowVal, err := tablecodec.EncodeRow(sctx, newRow, newColumnIDs, nil, nil, rd)
newRowVal, err := tablecodec.EncodeRow(sctx, newRow, newColumnIDs, nil, nil, rd, checksums...)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1350,6 +1376,38 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
return nil
}

// calcChecksums returns the checksums to attach to the re-encoded row, or nil
// when checksums are not needed for this worker or when computing one fails
// (a warning is logged in that case).
//
// Two checksums are produced. The first is computed over the deletable columns
// with the new column left out; the second with the old column left out.
// Generated columns that are not stored are left out of both.
func (w *updateColumnWorker) calcChecksums() []uint32 {
	if !w.checksumNeeded {
		return nil
	}
	// When w.checksumNeeded is true there is only one write-reorg column (the
	// new column) and every other column is public, so one checksum covers the
	// row with the old column and the other covers the row with the new column.
	excludedIDs := []int64{w.newColInfo.ID, w.oldColInfo.ID}
	var result [2]uint32
	for pos, skipID := range excludedIDs {
		// Reuse the buffer's backing storage between passes.
		w.checksumBuffer.Cols = w.checksumBuffer.Cols[:0]
		for _, c := range w.table.DeletableCols() {
			if c.ID == skipID {
				continue
			}
			if c.IsGenerated() && !c.GeneratedStored {
				continue
			}
			// Fresh variable per column so each entry points at its own datum.
			datum := w.rowMap[c.ID]
			entry := rowcodec.ColData{ColumnInfo: c.ToInfo(), Datum: &datum}
			w.checksumBuffer.Cols = append(w.checksumBuffer.Cols, entry)
		}
		// Only pay for a sort when the columns are not already in order.
		if !sort.IsSorted(w.checksumBuffer) {
			sort.Sort(w.checksumBuffer)
		}
		sum, err := w.checksumBuffer.Checksum()
		if err != nil {
			logutil.BgLogger().Warn("skip checksum in update-column backfill due to encode error", zap.Error(err))
			return nil
		}
		result[pos] = sum
	}
	return result[:]
}

// reformatErrors reformats the cast error, because the `convertTo` function couldn't package the column name and datum value for some errors.
func (w *updateColumnWorker) reformatErrors(err error) error {
// Since row count is not precise in concurrent reorganization, here we substitute row count with datum value.
Expand Down
71 changes: 71 additions & 0 deletions ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
func TestColumnTypeChangeBetweenInteger(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// Modify column from null to not null.
Expand Down Expand Up @@ -108,6 +109,7 @@ func TestColumnTypeChangeBetweenInteger(t *testing.T) {
func TestColumnTypeChangeStateBetweenInteger(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (c1 int, c2 int)")
Expand Down Expand Up @@ -176,6 +178,7 @@ func TestColumnTypeChangeStateBetweenInteger(t *testing.T) {
func TestRollbackColumnTypeChangeBetweenInteger(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (c1 bigint, c2 bigint)")
Expand Down Expand Up @@ -255,6 +258,7 @@ func init() {
func TestColumnTypeChangeFromIntegerToOthers(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

prepare := func(tk *testkit.TestKit) {
Expand Down Expand Up @@ -406,6 +410,7 @@ func TestColumnTypeChangeFromIntegerToOthers(t *testing.T) {
func TestColumnTypeChangeBetweenVarcharAndNonVarchar(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")
tk.MustExec("drop database if exists col_type_change_char;")
tk.MustExec("create database col_type_change_char;")
Expand Down Expand Up @@ -434,6 +439,7 @@ func TestColumnTypeChangeBetweenVarcharAndNonVarchar(t *testing.T) {
func TestColumnTypeChangeFromStringToOthers(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// Set time zone to UTC.
Expand Down Expand Up @@ -665,6 +671,7 @@ func TestColumnTypeChangeFromStringToOthers(t *testing.T) {
func TestColumnTypeChangeFromNumericToOthers(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// Set time zone to UTC.
Expand Down Expand Up @@ -929,6 +936,7 @@ func TestColumnTypeChangeFromNumericToOthers(t *testing.T) {
func TestColumnTypeChangeIgnoreDisplayLength(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

var assertResult bool
Expand Down Expand Up @@ -970,6 +978,7 @@ func TestColumnTypeChangeIgnoreDisplayLength(t *testing.T) {
func TestColumnTypeChangeFromDateTimeTypeToOthers(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// Set time zone to UTC.
Expand Down Expand Up @@ -1148,6 +1157,7 @@ func TestColumnTypeChangeFromDateTimeTypeToOthers(t *testing.T) {
func TestColumnTypeChangeFromJsonToOthers(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// Set time zone to UTC.
Expand Down Expand Up @@ -1541,6 +1551,7 @@ func TestColumnTypeChangeFromJsonToOthers(t *testing.T) {
func TestUpdateDataAfterChangeTimestampToDate(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t, t1")
tk.MustExec("create table t (col timestamp default '1971-06-09' not null, col1 int default 1, unique key(col1));")
Expand Down Expand Up @@ -1580,6 +1591,50 @@ func TestRowFormat(t *testing.T) {
tk.MustExec("drop table if exists t")
}

// TestRowFormatWithChecksums enables row-level checksums globally, modifies a
// column of a one-row table, and then compares the stored short value of that
// row against the expected encoded bytes (row value with checksums).
func TestRowFormatWithChecksums(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)

	setupStmts := []string{
		"set global tidb_enable_row_level_checksum = 1",
		"use test",
		"drop table if exists t",
		"create table t (id int primary key, v varchar(10))",
		"insert into t values (1, \"123\");",
		"alter table t modify column v varchar(5);",
	}
	for _, stmt := range setupStmts {
		tk.MustExec(stmt)
	}

	// Locate the row with handle 1 and fetch its MVCC info.
	tblInfo := external.GetTableByName(t, tk, "test", "t")
	rowKey := tablecodec.EncodeRowKeyWithHandle(tblInfo.Meta().ID, kv.IntHandle(1))
	mvccHelper := helper.NewHelper(store.(helper.Storage))
	mvccResp, err := mvccHelper.GetMvccByEncodedKey(rowKey)
	require.NoError(t, err)

	// row value with checksums
	wantRow := []byte{0x80, 0x2, 0x3, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x8, 0x52, 0x78, 0xc9, 0x28, 0x52, 0x78, 0xc9, 0x28}
	require.Equal(t, wantRow, mvccResp.Info.Writes[0].ShortValue)

	tk.MustExec("drop table if exists t")
}

// TestRowLevelChecksumWithMultiSchemaChange runs a multi-schema change (add a
// column and modify another in one statement) with the
// forceRowLevelChecksumOnUpdateColumnBackfill failpoint enabled, then checks
// that the stored row value carries no checksum and has a null `vv` column.
func TestRowLevelChecksumWithMultiSchemaChange(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	tk.MustExec("create table t (id int primary key, v varchar(10))")
	tk.MustExec("insert into t values (1, \"123\")")

	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/forceRowLevelChecksumOnUpdateColumnBackfill", "return"))
	// Check the Disable error instead of discarding it: a failpoint that stays
	// enabled would silently affect tests that run afterwards.
	defer func() {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/forceRowLevelChecksumOnUpdateColumnBackfill"))
	}()
	tk.MustExec("alter table t add column vv int, modify column v varchar(5)")

	tbl := external.GetTableByName(t, tk, "test", "t")
	encodedKey := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1))

	h := helper.NewHelper(store.(helper.Storage))
	data, err := h.GetMvccByEncodedKey(encodedKey)
	require.NoError(t, err)
	// checksum skipped and with a null col vv
	require.Equal(t, []byte{0x80, 0x0, 0x3, 0x0, 0x1, 0x0, 0x1, 0x2, 0x4, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33}, data.Info.Writes[0].ShortValue)
	tk.MustExec("drop table if exists t")
}

// Close issue #22395
// Background:
// Since the changing column is implemented as adding a new column and substitute the old one when it finished.
Expand All @@ -1589,6 +1644,7 @@ func TestRowFormat(t *testing.T) {
func TestChangingColOriginDefaultValue(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk1 := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1666,6 +1722,7 @@ func TestChangingColOriginDefaultValue(t *testing.T) {
func TestChangingColOriginDefaultValueAfterAddColAndCastSucc(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk1 := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1753,6 +1810,7 @@ func TestChangingColOriginDefaultValueAfterAddColAndCastSucc(t *testing.T) {
func TestChangingColOriginDefaultValueAfterAddColAndCastFail(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk1 := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1810,6 +1868,7 @@ func TestChangingColOriginDefaultValueAfterAddColAndCastFail(t *testing.T) {
func TestChangingAttributeOfColumnWithFK(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

prepare := func() {
Expand Down Expand Up @@ -1845,6 +1904,7 @@ func TestChangingAttributeOfColumnWithFK(t *testing.T) {
func TestAlterPrimaryKeyToNull(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk.MustExec("drop table if exists t, t1")
Expand Down Expand Up @@ -1881,6 +1941,7 @@ func TestChangeUnsignedIntToDatetime(t *testing.T) {
func TestDDLExitWhenCancelMeetPanic(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
Expand Down Expand Up @@ -1923,6 +1984,7 @@ func TestDDLExitWhenCancelMeetPanic(t *testing.T) {
func TestChangeIntToBitWillPanicInBackfillIndexes(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
Expand Down Expand Up @@ -1953,6 +2015,7 @@ func TestChangeIntToBitWillPanicInBackfillIndexes(t *testing.T) {
func TestCancelCTCInReorgStateWillCauseGoroutineLeak(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockInfiniteReorgLogic", `return(true)`))
Expand Down Expand Up @@ -2005,6 +2068,7 @@ func TestCancelCTCInReorgStateWillCauseGoroutineLeak(t *testing.T) {
func TestCTCShouldCastTheDefaultValue(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
Expand Down Expand Up @@ -2036,6 +2100,7 @@ func TestCTCShouldCastTheDefaultValue(t *testing.T) {
func TestCTCCastBitToBinary(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// For point 1:
Expand Down Expand Up @@ -2200,6 +2265,7 @@ func TestChangeFromTimeToYear(t *testing.T) {
func TestCastDateToTimestampInReorgAttribute(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 600*time.Millisecond)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk.MustExec("CREATE TABLE `t` (`a` DATE NULL DEFAULT '8497-01-06')")
Expand Down Expand Up @@ -2263,6 +2329,7 @@ func TestForIssue24621(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a char(250));")
Expand All @@ -2274,6 +2341,7 @@ func TestForIssue24621(t *testing.T) {
func TestChangeNullValueFromOtherTypeToTimestamp(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// Some ddl cases.
Expand Down Expand Up @@ -2316,6 +2384,7 @@ func TestColumnTypeChangeBetweenFloatAndDouble(t *testing.T) {
store := testkit.CreateMockStore(t)
// issue #31372
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

prepare := func(createTableStmt string) {
Expand All @@ -2342,6 +2411,7 @@ func TestColumnTypeChangeBetweenFloatAndDouble(t *testing.T) {
func TestColumnTypeChangeTimestampToInt(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

// 1. modify a timestamp column to bigint
Expand Down Expand Up @@ -2430,6 +2500,7 @@ func TestColumnTypeChangeTimestampToInt(t *testing.T) {
func TestFixDDLTxnWillConflictWithReorgTxn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_row_level_checksum = 1")
tk.MustExec("use test")

tk.MustExec("create table t (a int)")
Expand Down
Loading