Commit

Merge branch 'fix-issue-34766' of https://github.com/e11jah/tidb into fix-issue-34766
e1ijah1 committed May 30, 2022
2 parents 16dff71 + 3f1ca4b commit 96441d7
Showing 54 changed files with 638 additions and 196 deletions.
1 change: 1 addition & 0 deletions br/cmd/br/restore.go
@@ -160,5 +160,6 @@ func newStreamRestoreCommand() *cobra.Command {
}
task.DefineFilterFlags(command, filterOutSysAndMemTables, true)
task.DefineStreamRestoreFlags(command)
command.Hidden = true
return command
}
1 change: 1 addition & 0 deletions br/cmd/br/stream.go
@@ -55,6 +55,7 @@ func NewStreamCommand() *cobra.Command {
command.Root().HelpFunc()(command, strings)
})

command.Hidden = true
return command
}

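Both hunks above hide the stream-related subcommands from BR's help output via cobra's Hidden field: a hidden command is omitted from help and completion but can still be invoked. A minimal, self-contained sketch of that behavior (the command names here are illustrative, not taken from BR):

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	root := &cobra.Command{Use: "br"}
	stream := &cobra.Command{
		Use:    "stream",
		Hidden: true, // not listed by `br --help`, but still runnable
		Run: func(cmd *cobra.Command, args []string) {
			fmt.Println("stream command invoked")
		},
	}
	root.AddCommand(stream)

	root.SetArgs([]string{"stream"})
	_ = root.Execute() // prints "stream command invoked"
}
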
7 changes: 7 additions & 0 deletions br/pkg/backup/push.go
@@ -127,6 +127,13 @@ func (push *pushDown) pushBackup(
// Finished.
return res, nil
}
failpoint.Inject("backup-timeout-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint backup-timeout-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("backup-storage-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
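The new backup-timeout-error failpoint mirrors the existing backup-storage-error one: while it is active, the injected string overwrites resp.Error, so the caller sees a server-side failure without a real TiKV being involved. A rough sketch of how such a failpoint is driven from a test, assuming the usual github.com/pingcap/failpoint workflow in which the failpoint.Inject marker is rewritten into a failpoint.Eval call by failpoint-ctl (the short name below is illustrative; the real one carries the package path, as in the test script):

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	const name = "backup-timeout-error"
	// Equivalent to exporting GO_FAILPOINTS="...=return(\"...\")" before starting br.
	if err := failpoint.Enable(name, `return("not read from or written to within the timeout period")`); err != nil {
		panic(err)
	}
	defer func() { _ = failpoint.Disable(name) }()

	// In instrumented builds, failpoint.Inject("backup-timeout-error", ...) becomes roughly this Eval call.
	if val, err := failpoint.Eval(name); err == nil {
		fmt.Println("would set resp.Error.Msg to:", val.(string))
	}
}
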
1 change: 1 addition & 0 deletions br/pkg/utils/retry.go
@@ -24,6 +24,7 @@ var retryableServerError = []string{
"error during dispatch",
"put object timeout",
"internalerror",
"not read from or written to within the timeout period",
}

// RetryableFunc presents a retryable operation.
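The added entry, "not read from or written to within the timeout period", is the message external storage services return when an upload stream stalls, so BR now retries it instead of failing the backup. A sketch of how such a list is typically consulted; the helper below is hypothetical (the real check lives alongside the slice in br/pkg/utils/retry.go) but shows why the entries are kept lower-case: the match is a case-insensitive substring test.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// Illustrative excerpt of the list; the authoritative slice is in br/pkg/utils/retry.go.
var retryableServerError = []string{
	"error during dispatch",
	"put object timeout",
	"internalerror",
	"not read from or written to within the timeout period",
}

// isRetryableServerError is a hypothetical helper: lower-case the error text and
// look for any known retryable fragment.
func isRetryableServerError(err error) bool {
	if err == nil {
		return false
	}
	msg := strings.ToLower(err.Error())
	for _, fragment := range retryableServerError {
		if strings.Contains(msg, fragment) {
			return true
		}
	}
	return false
}

func main() {
	err := errors.New("RequestError: the stream was not read from or written to within the timeout period")
	fmt.Println(isRetryableServerError(err)) // true
}
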
15 changes: 12 additions & 3 deletions br/tests/br_full/run.sh
@@ -46,14 +46,23 @@ if ps -q $pid ; then
exit 1
fi


# backup full
echo "backup with lz4 start..."
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\")"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4
test_log="${TEST_DIR}/${DB}_test.log"
error_str="not read from or written to within the timeout period"
unset BR_LOG_TO_TERM

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"not read from or written to within the timeout period\")"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 --log-file $test_log
export GO_FAILPOINTS=""
size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')

if ! grep -i "$error_str" $test_log; then
echo "${error_str} not found in log"
echo "TEST: [$TEST_NAME] test restore failed!"
exit 1
fi

echo "backup with zstd start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-zstd" --concurrency 4 --compression zstd --compression-level 6
size_zstd=$(du -d 0 $TEST_DIR/$DB-zstd | awk '{print $1}')
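The GO_FAILPOINTS value chains terms with ->, so successive evaluations of the same failpoint yield different injected errors: the first backup attempt sees "connection refused", the next sees "InternalError", and the new backup-timeout-error failpoint fires once with the storage timeout message that the grep check above expects in the log. A small sketch of that term semantics, assuming the standard failpoint term grammar (the failpoint name is illustrative):

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	const name = "demo/backup-storage-error"
	// Same chained term the test script exports via GO_FAILPOINTS.
	if err := failpoint.Enable(name, `1*return("connection refused")->1*return("InternalError")`); err != nil {
		panic(err)
	}
	defer func() { _ = failpoint.Disable(name) }()

	for attempt := 1; attempt <= 3; attempt++ {
		if val, err := failpoint.Eval(name); err == nil {
			fmt.Printf("attempt %d injected error: %v\n", attempt, val)
		} else {
			fmt.Printf("attempt %d: failpoint exhausted, backup proceeds normally\n", attempt)
		}
	}
}
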
18 changes: 14 additions & 4 deletions config/config.go
@@ -115,8 +115,6 @@ var (
map[string]string{
"check-mb4-value-in-utf8": "tidb_check_mb4_value_in_utf8",
"enable-collect-execution-info": "tidb_enable_collect_execution_info",
"plugin.load": "plugin_load",
"plugin.dir": "plugin_dir",
},
},
{
@@ -134,6 +132,13 @@ var (
"memory-usage-alarm-ratio": "tidb_memory_usage_alarm_ratio",
},
},
{
"plugin",
map[string]string{
"load": "plugin_load",
"dir": "plugin_dir",
},
},
}

// ConflictOptions indicates the conflict config options existing in both [instance] and other sections in config file.
@@ -761,6 +766,8 @@ var defaultConf = Config{
OOMUseTmpStorage: true,
TempStorageQuota: -1,
TempStoragePath: tempStorageDirName,
MemQuotaQuery: 1 << 30,
OOMAction: "cancel",
EnableBatchDML: false,
CheckMb4ValueInUTF8: *NewAtomicBool(true),
MaxIndexLength: 3072,
@@ -791,6 +798,7 @@ var defaultConf = Config{
EnableErrorStack: nbUnset, // If both options are nbUnset, getDisableErrorStack() returns true
EnableTimestamp: nbUnset,
DisableTimestamp: nbUnset, // If both options are nbUnset, getDisableTimestamp() returns false
QueryLogMaxLen: logutil.DefaultQueryLogMaxLen,
RecordPlanInSlowLog: logutil.DefaultRecordPlanInSlowLog,
EnableSlowLog: *NewAtomicBool(logutil.DefaultTiDBEnableSlowLog),
},
@@ -839,6 +847,7 @@ var defaultConf = Config{
TxnTotalSizeLimit: DefTxnTotalSizeLimit,
DistinctAggPushDown: false,
ProjectionPushDown: false,
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
@@ -848,14 +857,15 @@ var defaultConf = Config{
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
},
ProxyProtocol: ProxyProtocol{
Networks: "",
HeaderTimeout: 5,
},
PreparedPlanCache: PreparedPlanCache{
Enabled: false,
Capacity: 1000,
Enabled: true,
Capacity: 100,
MemoryGuardRatio: 0.1,
},
OpenTracing: OpenTracing{
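Moving load and dir under a dedicated "plugin" entry means the deprecation table is keyed by config section, with each entry mapping an old config key to the name that supersedes it. A toy sketch of how such a table can be walked to report deprecated keys (the type and field names below are hypothetical stand-ins for the real InstanceConfigSection in the config package):

package main

import "fmt"

// Hypothetical stand-in: a section name plus a mapping from old config keys
// to the names that supersede them.
type instanceConfigSection struct {
	sectionName  string
	nameMappings map[string]string
}

var deprecatedOptions = []instanceConfigSection{
	{"", map[string]string{"enable-collect-execution-info": "tidb_enable_collect_execution_info"}},
	{"log", map[string]string{"slow-threshold": "tidb_slow_log_threshold"}},
	{"performance", map[string]string{"memory-usage-alarm-ratio": "tidb_memory_usage_alarm_ratio"}},
	{"plugin", map[string]string{"load": "plugin_load", "dir": "plugin_dir"}},
}

func main() {
	for _, sec := range deprecatedOptions {
		for oldKey, newName := range sec.nameMappings {
			key := oldKey
			if sec.sectionName != "" {
				key = sec.sectionName + "." + oldKey
			}
			fmt.Printf("config %q is deprecated; it is superseded by %q\n", key, newName)
		}
	}
}
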
28 changes: 22 additions & 6 deletions config/config_test.go
@@ -496,8 +496,6 @@ func TestConflictInstanceConfig(t *testing.T) {
var expectedNewName string
conf := new(Config)
configFile := "config.toml"
_, localFile, _, _ := runtime.Caller(0)
configFile = filepath.Join(filepath.Dir(localFile), configFile)

f, err := os.Create(configFile)
require.NoError(t, err)
@@ -543,26 +541,44 @@ require.Equal(t, expectedNewName, newName)
require.Equal(t, expectedNewName, newName)
}
}
}

err = f.Truncate(0)
require.NoError(t, err)
_, err = f.Seek(0, 0)
func TestDeprecatedConfig(t *testing.T) {
var expectedNewName string
conf := new(Config)
configFile := "config.toml"
_, localFile, _, _ := runtime.Caller(0)
configFile = filepath.Join(filepath.Dir(localFile), configFile)

f, err := os.Create(configFile)
require.NoError(t, err)
defer func(configFile string) {
require.NoError(t, os.Remove(configFile))
}(configFile)

// DeprecatedOptions indicates the options that should be moved to [instance] section.
// The value in conf.Instance.* would be overwritten by the other sections.
expectedDeprecatedOptions := map[string]InstanceConfigSection{
"": {
"", map[string]string{"enable-collect-execution-info": "tidb_enable_collect_execution_info"},
"", map[string]string{
"enable-collect-execution-info": "tidb_enable_collect_execution_info",
},
},
"log": {
"log", map[string]string{"slow-threshold": "tidb_slow_log_threshold"},
},
"performance": {
"performance", map[string]string{"memory-usage-alarm-ratio": "tidb_memory_usage_alarm_ratio"},
},
"plugin": {
"plugin", map[string]string{
"load": "plugin_load",
"dir": "plugin_dir",
},
},
}
_, err = f.WriteString("enable-collect-execution-info = false \n" +
"[plugin] \ndir=\"/plugin-path\" \nload=\"audit-1,whitelist-1\" \n" +
"[log] \nslow-threshold = 100 \n" +
"[performance] \nmemory-usage-alarm-ratio = 0.5")
require.NoError(t, err)
27 changes: 17 additions & 10 deletions ddl/backfilling.go
@@ -146,6 +146,7 @@ type backfillTaskContext struct {
type backfillWorker struct {
id int
ddlWorker *worker
reorgInfo *reorgInfo
batchCnt int
sessCtx sessionctx.Context
taskCh chan *reorgBackfillTask
@@ -155,11 +156,12 @@ type backfillWorker struct {
priority int
}

func newBackfillWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable) *backfillWorker {
func newBackfillWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, reorgInfo *reorgInfo) *backfillWorker {
return &backfillWorker{
id: id,
table: t,
ddlWorker: worker,
reorgInfo: reorgInfo,
batchCnt: int(variable.GetDDLReorgBatchSize()),
sessCtx: sessCtx,
taskCh: make(chan *reorgBackfillTask, 1),
@@ -234,13 +236,14 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
lastLogCount := 0
lastLogTime := time.Now()
startTime := lastLogTime
rc := d.getReorgCtx(w.reorgInfo.Job)

for {
// Give job chance to be canceled, if we not check it here,
// if there is panic in bf.BackfillDataInTxn we will never cancel the job.
// Because reorgRecordTask may run a long time,
// we should check whether this ddl job is still runnable.
err := w.ddlWorker.isReorgRunnable(d)
err := w.ddlWorker.isReorgRunnable(w.reorgInfo.Job)
if err != nil {
result.err = err
return result
@@ -263,8 +266,8 @@
// small ranges. This will cause the `redo` action in reorganization.
// So for added count and warnings collection, it is recommended to collect the statistics in every
// successfully committed small ranges rather than fetching it in the total result.
w.ddlWorker.reorgCtx.increaseRowCount(int64(taskCtx.addedCount))
w.ddlWorker.reorgCtx.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount)
rc.increaseRowCount(int64(taskCtx.addedCount))
rc.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount)

if num := result.scanCount - lastLogCount; num >= 30000 {
lastLogCount = result.scanCount
@@ -318,6 +321,10 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
topsql.MockHighCPULoad(job.Query, sqlPrefixes, 5)
})

failpoint.Inject("mockBackfillSlow", func() {
time.Sleep(30 * time.Millisecond)
})

// Dynamic change batch size.
w.batchCnt = int(variable.GetDDLReorgBatchSize())
result := w.handleBackfillTask(d, task, bf)
@@ -399,7 +406,7 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64,
nextKey, taskAddedCount, err := w.waitTaskResults(workers, taskCnt, totalAddedCount, startKey)
elapsedTime := time.Since(startTime)
if err == nil {
err = w.isReorgRunnable(reorgInfo.d)
err = w.isReorgRunnable(reorgInfo.Job)
}

if err != nil {
Expand All @@ -420,7 +427,7 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64,
}

// nextHandle will be updated periodically in runReorgJob, so no need to update it here.
w.reorgCtx.setNextKey(nextKey)
w.getReorgCtx(reorgInfo.Job).setNextKey(nextKey)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds())
logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
@@ -583,7 +590,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
return errors.Trace(err)
}

if err := w.isReorgRunnable(reorgInfo.d); err != nil {
if err := w.isReorgRunnable(reorgInfo.Job); err != nil {
return errors.Trace(err)
}
if startKey == nil && endKey == nil {
@@ -642,19 +649,19 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba

switch bfWorkerType {
case typeAddIndexWorker:
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
case typeUpdateColumnWorker:
// Setting InCreateOrAlterStmt tells the difference between SELECT casting and ALTER COLUMN casting.
sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true
updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo)
updateWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, updateWorker.backfillWorker)
go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker, job)
case typeCleanUpIndexWorker:
idxWorker := newCleanUpIndexWorker(sessCtx, w, i, t, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
idxWorker := newCleanUpIndexWorker(sessCtx, w, i, t, decodeColMap, reorgInfo)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
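The backfill worker no longer reaches for a single shared w.ddlWorker.reorgCtx; each worker now carries its reorgInfo and resolves the reorg context for its own job via d.getReorgCtx(w.reorgInfo.Job), which keeps row counts and warnings separate when more than one reorg job is in flight. A toy model of that per-job lookup (types and names below are simplified stand-ins, not the real ddl package API):

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins: one reorg context per job ID instead of a single shared one.
type reorgCtx struct {
	rowCount int64
}

type ddlCtx struct {
	mu       sync.Mutex
	reorgCtx map[int64]*reorgCtx
}

func (d *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
	d.mu.Lock()
	defer d.mu.Unlock()
	rc, ok := d.reorgCtx[jobID]
	if !ok {
		rc = &reorgCtx{}
		d.reorgCtx[jobID] = rc
	}
	return rc
}

func main() {
	d := &ddlCtx{reorgCtx: map[int64]*reorgCtx{}}
	d.getReorgCtx(1).rowCount += 100 // backfill progress for job 1
	d.getReorgCtx(2).rowCount += 7   // a concurrent job keeps its own counter
	fmt.Println(d.getReorgCtx(1).rowCount, d.getReorgCtx(2).rowCount) // 100 7
}
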
17 changes: 13 additions & 4 deletions ddl/cancel_test.go
@@ -20,11 +20,13 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
atomicutil "go.uber.org/atomic"
)

type testCancelJob struct {
@@ -227,17 +229,24 @@ func TestCancel(t *testing.T) {
}

// Change some configurations.
ddl.ReorgWaitTimeout = 10 * time.Microsecond
ddl.ReorgWaitTimeout = 10 * time.Millisecond
tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 8")
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1")
tk = testkit.NewTestKit(t, store)
tk.MustExec("use test")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockBackfillSlow", "return"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockBackfillSlow"))
}()

hook := &ddl.TestDDLCallback{Do: dom}
i := 0
i := atomicutil.NewInt64(0)
cancel := false
cancelResult := false
cancelWhenReorgNotStart := false

hookFunc := func(job *model.Job) {
if job.SchemaState == allTestCase[i].cancelState && !cancel {
if job.SchemaState == allTestCase[i.Load()].cancelState && !cancel {
if !cancelWhenReorgNotStart && job.SchemaState == model.StateWriteReorganization && job.MayNeedReorg() && job.RowCount == 0 {
return
}
@@ -261,7 +270,7 @@
}

for j, tc := range allTestCase {
i = j
i.Store(int64(j))
msg := fmt.Sprintf("sql: %s, state: %s", tc.sql, tc.cancelState)
if tc.onJobBefore {
restHook(hook)
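Switching the test's case index from a plain int to atomicutil.NewInt64 matters because the DDL callback runs on the DDL worker's goroutine while the test loop advances the index on the main goroutine; go.uber.org/atomic gives both sides race-free Load/Store access. A minimal illustration of the pattern:

package main

import (
	"fmt"
	"sync"

	atomicutil "go.uber.org/atomic"
)

func main() {
	i := atomicutil.NewInt64(0)
	var wg sync.WaitGroup

	// Stands in for the DDL hook: it reads the current case index on another goroutine.
	hook := func() {
		defer wg.Done()
		fmt.Println("hook sees case", i.Load())
	}

	for j := 0; j < 3; j++ {
		i.Store(int64(j)) // the test loop advances the index
		wg.Add(1)
		go hook()
		wg.Wait()
	}
}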