diff --git a/ddl/column.go b/ddl/column.go index 843a4c4a0b72a..a7f9dee08c24f 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1385,6 +1385,20 @@ func (w *updateColumnWorker) BackfillData(handleRange reorgBackfillTask) (taskCt errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error { taskCtx.addedCount = 0 taskCtx.scanCount = 0 + + // Because TiCDC do not want this kind of change, + // so we set the lossy DDL reorg txn source to 1 to + // avoid TiCDC to replicate this kind of change. + var txnSource uint64 + if val := txn.GetOption(kv.TxnSource); val != nil { + txnSource, _ = val.(uint64) + } + err := kv.SetLossyDDLReorgSource(&txnSource, kv.LossyDDLColumnReorgSource) + if err != nil { + return errors.Trace(err) + } + txn.SetOption(kv.TxnSource, txnSource) + txn.SetOption(kv.Priority, handleRange.priority) if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(handleRange.getJobID()); tagger != nil { txn.SetOption(kv.ResourceGroupTagger, tagger) diff --git a/kv/BUILD.bazel b/kv/BUILD.bazel index ebfe5252110b8..a34e7aa9bae52 100644 --- a/kv/BUILD.bazel +++ b/kv/BUILD.bazel @@ -67,13 +67,14 @@ go_test( "key_test.go", "main_test.go", "mock_test.go", + "option_test.go", "txn_test.go", "utils_test.go", "version_test.go", ], embed = [":kv"], flaky = True, - shard_count = 19, + shard_count = 21, deps = [ "//parser/model", "//parser/mysql", diff --git a/kv/option.go b/kv/option.go index 84911be7cafb2..62c0be765b995 100644 --- a/kv/option.go +++ b/kv/option.go @@ -17,6 +17,7 @@ package kv import ( "context" + "github.com/pingcap/errors" "github.com/tikv/client-go/v2/util" ) @@ -96,6 +97,10 @@ const ( // ScanBatchSize set the iter scan batch size. ScanBatchSize // TxnSource set the source of this transaction. + // We use an uint64 to represent the source of a transaction. + // The first 8 bits are reserved for TiCDC to implement BDR synchronization, + // and the next 8 bits are reserved for Lossy DDL reorg Backfill job. + // The remaining 48 bits are reserved for extendability. TxnSource // ResourceGroupName set the bind resource group name. ResourceGroupName @@ -193,3 +198,59 @@ const ( // InternalDistTask is the type of distributed task. InternalDistTask = "DistTask" ) + +// The bitmap: +// |RESERVED|LOSSY_DDL_REORG_SOURCE_BITS|CDC_WRITE_SOURCE_BITS| +// | 48 | 8 | 4(RESERVED) | 4 | +const ( + // TiCDC uses 1 - 255 to indicate the source of TiDB. + // For now, 1 - 15 are reserved for TiCDC to implement BDR synchronization. + // 16 - 255 are reserved for extendability. + cdcWriteSourceBits = 8 + cdcWriteSourceMax = (1 << cdcWriteSourceBits) - 1 + + // TiCDC uses 1-255 to indicate the change from a lossy DDL reorg Backfill job. + // For now, we only use 1 for column reorg backfill job. + lossyDDLReorgSourceBits = 8 + LossyDDLColumnReorgSource = 1 + lossyDDLReorgSourceMax = (1 << lossyDDLReorgSourceBits) - 1 + lossyDDLReorgSourceShift = cdcWriteSourceBits +) + +// SetCDCWriteSource sets the TiCDC write source in the txnSource. +func SetCDCWriteSource(txnSource *uint64, value uint64) error { + if value > cdcWriteSourceBits { + return errors.Errorf("value %d is out of TiCDC write source range, should be in [1, 15]", + value) + } + *txnSource |= value + + return nil +} + +func getCDCWriteSource(txnSource uint64) uint64 { + return txnSource & cdcWriteSourceMax +} + +func isCDCWriteSourceSet(txnSource uint64) bool { + return (txnSource & cdcWriteSourceMax) != 0 +} + +// SetLossyDDLReorgSource sets the lossy DDL reorg source in the txnSource. +func SetLossyDDLReorgSource(txnSource *uint64, value uint64) error { + if value > lossyDDLReorgSourceMax { + return errors.Errorf("value %d is out of lossy DDL reorg source range, should be in [1, %d]", + value, lossyDDLReorgSourceMax) + } + *txnSource |= value << lossyDDLReorgSourceShift + + return nil +} + +func getLossyDDLReorgSource(txnSource uint64) uint64 { + return (txnSource >> lossyDDLReorgSourceShift) & lossyDDLReorgSourceMax +} + +func isLossyDDLReorgSourceSet(txnSource uint64) bool { + return (txnSource >> lossyDDLReorgSourceShift) != 0 +} diff --git a/kv/option_test.go b/kv/option_test.go new file mode 100644 index 0000000000000..922a1bc00f348 --- /dev/null +++ b/kv/option_test.go @@ -0,0 +1,111 @@ +// Copyright 2023 PingCAP, Inc. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSetCDCWriteSource(t *testing.T) { + for _, tc := range []struct { + name string + cdcWriteSource uint64 + expectedSet bool + expectedCdcWriteSource uint64 + expectedError string + }{ + { + name: "cdc write source is set", + cdcWriteSource: 1, + expectedSet: true, + expectedCdcWriteSource: 1, + }, + { + name: "cdc write source is not set", + cdcWriteSource: 0, + expectedSet: false, + expectedCdcWriteSource: 0, + }, + { + name: "cdc write source is not valid", + cdcWriteSource: 16, + expectedError: ".*out of TiCDC write source range.*", + }, + } { + t.Run(tc.name, func(t *testing.T) { + var txnOption uint64 + err := SetCDCWriteSource(&txnOption, tc.cdcWriteSource) + if tc.expectedError != "" { + require.Regexp(t, tc.expectedError, err.Error()) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedSet, isCDCWriteSourceSet(txnOption)) + require.Equal(t, tc.expectedCdcWriteSource, getCDCWriteSource(txnOption)) + }) + } +} + +func TestSetLossyDDLReorgSource(t *testing.T) { + for _, tc := range []struct { + name string + currentSource uint64 + lossyDDLReorgSource uint64 + expectedSet bool + expectedLossyDDLReorgSource uint64 + expectedError string + }{ + { + name: "lossy ddl reorg source is set", + currentSource: 0, + lossyDDLReorgSource: 1, + expectedSet: true, + expectedLossyDDLReorgSource: 1, + }, + { + name: "lossy ddl reorg source is set", + currentSource: 12, // SetCDCWriteSource + lossyDDLReorgSource: 1, + expectedSet: true, + expectedLossyDDLReorgSource: 1, + }, + { + name: "lossy ddl reorg source is not set", + currentSource: 12, // SetCDCWriteSource + lossyDDLReorgSource: 0, + expectedSet: false, + expectedLossyDDLReorgSource: 0, + }, + { + name: "lossy ddl reorg source is not valid", + currentSource: 12, // SetCDCWriteSource + lossyDDLReorgSource: 256, + expectedError: ".*out of lossy DDL reorg source range.*", + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := SetLossyDDLReorgSource(&tc.currentSource, tc.lossyDDLReorgSource) + if tc.expectedError != "" { + require.Regexp(t, tc.expectedError, err.Error()) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedSet, isLossyDDLReorgSourceSet(tc.currentSource)) + require.Equal(t, tc.expectedLossyDDLReorgSource, getLossyDDLReorgSource(tc.currentSource)) + }) + } +} diff --git a/session/session.go b/session/session.go index d0374afe9605d..4bb1f2525c955 100644 --- a/session/session.go +++ b/session/session.go @@ -658,7 +658,21 @@ func (s *session) doCommit(ctx context.Context) error { if tables := sessVars.TxnCtx.TemporaryTables; len(tables) > 0 { s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables)) } - s.txn.SetOption(kv.TxnSource, sessVars.CDCWriteSource) + + var txnSource uint64 + if val := s.txn.GetOption(kv.TxnSource); val != nil { + txnSource, _ = val.(uint64) + } + // If the transaction is started by CDC, we need to set the CDCWriteSource option. + if sessVars.CDCWriteSource != 0 { + err := kv.SetCDCWriteSource(&txnSource, sessVars.CDCWriteSource) + if err != nil { + return errors.Trace(err) + } + + s.txn.SetOption(kv.TxnSource, txnSource) + } + if tables := sessVars.TxnCtx.CachedTables; len(tables) > 0 { c := cachedTableRenewLease{tables: tables} now := time.Now()