Skip to content

Commit

Permalink
ddl: set lossy DDL change source for column worker (#43232)
Browse files Browse the repository at this point in the history
close #43227
  • Loading branch information
Rustin170506 authored Apr 26, 2023
1 parent 0b40301 commit a64b149
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 2 deletions.
14 changes: 14 additions & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,20 @@ func (w *updateColumnWorker) BackfillData(handleRange reorgBackfillTask) (taskCt
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0

// Because TiCDC do not want this kind of change,
// so we set the lossy DDL reorg txn source to 1 to
// avoid TiCDC to replicate this kind of change.
var txnSource uint64
if val := txn.GetOption(kv.TxnSource); val != nil {
txnSource, _ = val.(uint64)
}
err := kv.SetLossyDDLReorgSource(&txnSource, kv.LossyDDLColumnReorgSource)
if err != nil {
return errors.Trace(err)
}
txn.SetOption(kv.TxnSource, txnSource)

txn.SetOption(kv.Priority, handleRange.priority)
if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(handleRange.getJobID()); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
Expand Down
3 changes: 2 additions & 1 deletion kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,14 @@ go_test(
"key_test.go",
"main_test.go",
"mock_test.go",
"option_test.go",
"txn_test.go",
"utils_test.go",
"version_test.go",
],
embed = [":kv"],
flaky = True,
shard_count = 19,
shard_count = 21,
deps = [
"//parser/model",
"//parser/mysql",
Expand Down
61 changes: 61 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kv
import (
"context"

"github.com/pingcap/errors"
"github.com/tikv/client-go/v2/util"
)

Expand Down Expand Up @@ -96,6 +97,10 @@ const (
// ScanBatchSize set the iter scan batch size.
ScanBatchSize
// TxnSource set the source of this transaction.
// We use an uint64 to represent the source of a transaction.
// The first 8 bits are reserved for TiCDC to implement BDR synchronization,
// and the next 8 bits are reserved for Lossy DDL reorg Backfill job.
// The remaining 48 bits are reserved for extendability.
TxnSource
// ResourceGroupName set the bind resource group name.
ResourceGroupName
Expand Down Expand Up @@ -193,3 +198,59 @@ const (
// InternalDistTask is the type of distributed task.
InternalDistTask = "DistTask"
)

// The bitmap:
// |RESERVED|LOSSY_DDL_REORG_SOURCE_BITS|CDC_WRITE_SOURCE_BITS|
// | 48 | 8 | 4(RESERVED) | 4 |
const (
// TiCDC uses 1 - 255 to indicate the source of TiDB.
// For now, 1 - 15 are reserved for TiCDC to implement BDR synchronization.
// 16 - 255 are reserved for extendability.
cdcWriteSourceBits = 8
cdcWriteSourceMax = (1 << cdcWriteSourceBits) - 1

// TiCDC uses 1-255 to indicate the change from a lossy DDL reorg Backfill job.
// For now, we only use 1 for column reorg backfill job.
lossyDDLReorgSourceBits = 8
LossyDDLColumnReorgSource = 1
lossyDDLReorgSourceMax = (1 << lossyDDLReorgSourceBits) - 1
lossyDDLReorgSourceShift = cdcWriteSourceBits
)

// SetCDCWriteSource sets the TiCDC write source in the txnSource.
func SetCDCWriteSource(txnSource *uint64, value uint64) error {
if value > cdcWriteSourceBits {
return errors.Errorf("value %d is out of TiCDC write source range, should be in [1, 15]",
value)
}
*txnSource |= value

return nil
}

func getCDCWriteSource(txnSource uint64) uint64 {
return txnSource & cdcWriteSourceMax
}

func isCDCWriteSourceSet(txnSource uint64) bool {
return (txnSource & cdcWriteSourceMax) != 0
}

// SetLossyDDLReorgSource sets the lossy DDL reorg source in the txnSource.
func SetLossyDDLReorgSource(txnSource *uint64, value uint64) error {
if value > lossyDDLReorgSourceMax {
return errors.Errorf("value %d is out of lossy DDL reorg source range, should be in [1, %d]",
value, lossyDDLReorgSourceMax)
}
*txnSource |= value << lossyDDLReorgSourceShift

return nil
}

func getLossyDDLReorgSource(txnSource uint64) uint64 {
return (txnSource >> lossyDDLReorgSourceShift) & lossyDDLReorgSourceMax
}

func isLossyDDLReorgSourceSet(txnSource uint64) bool {
return (txnSource >> lossyDDLReorgSourceShift) != 0
}
111 changes: 111 additions & 0 deletions kv/option_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2023 PingCAP, Inc.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestSetCDCWriteSource(t *testing.T) {
for _, tc := range []struct {
name string
cdcWriteSource uint64
expectedSet bool
expectedCdcWriteSource uint64
expectedError string
}{
{
name: "cdc write source is set",
cdcWriteSource: 1,
expectedSet: true,
expectedCdcWriteSource: 1,
},
{
name: "cdc write source is not set",
cdcWriteSource: 0,
expectedSet: false,
expectedCdcWriteSource: 0,
},
{
name: "cdc write source is not valid",
cdcWriteSource: 16,
expectedError: ".*out of TiCDC write source range.*",
},
} {
t.Run(tc.name, func(t *testing.T) {
var txnOption uint64
err := SetCDCWriteSource(&txnOption, tc.cdcWriteSource)
if tc.expectedError != "" {
require.Regexp(t, tc.expectedError, err.Error())
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedSet, isCDCWriteSourceSet(txnOption))
require.Equal(t, tc.expectedCdcWriteSource, getCDCWriteSource(txnOption))
})
}
}

func TestSetLossyDDLReorgSource(t *testing.T) {
for _, tc := range []struct {
name string
currentSource uint64
lossyDDLReorgSource uint64
expectedSet bool
expectedLossyDDLReorgSource uint64
expectedError string
}{
{
name: "lossy ddl reorg source is set",
currentSource: 0,
lossyDDLReorgSource: 1,
expectedSet: true,
expectedLossyDDLReorgSource: 1,
},
{
name: "lossy ddl reorg source is set",
currentSource: 12, // SetCDCWriteSource
lossyDDLReorgSource: 1,
expectedSet: true,
expectedLossyDDLReorgSource: 1,
},
{
name: "lossy ddl reorg source is not set",
currentSource: 12, // SetCDCWriteSource
lossyDDLReorgSource: 0,
expectedSet: false,
expectedLossyDDLReorgSource: 0,
},
{
name: "lossy ddl reorg source is not valid",
currentSource: 12, // SetCDCWriteSource
lossyDDLReorgSource: 256,
expectedError: ".*out of lossy DDL reorg source range.*",
},
} {
t.Run(tc.name, func(t *testing.T) {
err := SetLossyDDLReorgSource(&tc.currentSource, tc.lossyDDLReorgSource)
if tc.expectedError != "" {
require.Regexp(t, tc.expectedError, err.Error())
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedSet, isLossyDDLReorgSourceSet(tc.currentSource))
require.Equal(t, tc.expectedLossyDDLReorgSource, getLossyDDLReorgSource(tc.currentSource))
})
}
}
16 changes: 15 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,21 @@ func (s *session) doCommit(ctx context.Context) error {
if tables := sessVars.TxnCtx.TemporaryTables; len(tables) > 0 {
s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables))
}
s.txn.SetOption(kv.TxnSource, sessVars.CDCWriteSource)

var txnSource uint64
if val := s.txn.GetOption(kv.TxnSource); val != nil {
txnSource, _ = val.(uint64)
}
// If the transaction is started by CDC, we need to set the CDCWriteSource option.
if sessVars.CDCWriteSource != 0 {
err := kv.SetCDCWriteSource(&txnSource, sessVars.CDCWriteSource)
if err != nil {
return errors.Trace(err)
}

s.txn.SetOption(kv.TxnSource, txnSource)
}

if tables := sessVars.TxnCtx.CachedTables; len(tables) > 0 {
c := cachedTableRenewLease{tables: tables}
now := time.Now()
Expand Down

0 comments on commit a64b149

Please sign in to comment.