Skip to content

Commit

Permalink
Merge branch 'master' into bugfix/window-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
LittleFall authored Jul 8, 2022
2 parents 006815e + eb3de65 commit ea05e16
Show file tree
Hide file tree
Showing 34 changed files with 573 additions and 295 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/license-checker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Check License Header
uses: apache/skywalking-eyes@main
uses: apache/skywalking-eyes@v0.3.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare

bazel_build: bazel_ci_prepare
mkdir -p bin
bazel --output_user_root=/home/jenkins/.tidb/tmp build -k --config=ci //tidb-server/... //br/cmd/... //cmd/... --//build:with_nogo_flag=true
bazel --output_user_root=/home/jenkins/.tidb/tmp build -k --config=ci //tidb-server/... //br/cmd/... //cmd/... //util/... //dumpling/cmd/... //tidb-binlog/... --//build:with_nogo_flag=true
cp bazel-out/k8-fastbuild/bin/tidb-server/tidb-server_/tidb-server ./bin
cp bazel-out/k8-fastbuild/bin/cmd/importer/importer_/importer ./bin
cp bazel-out/k8-fastbuild/bin/tidb-server/tidb-server-check_/tidb-server-check ./bin
Expand Down
19 changes: 12 additions & 7 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,11 @@ func (bc *Client) BackupRanges(
metaWriter *metautil.MetaWriter,
progressCallBack func(ProgressUnit),
) error {
log.Info("Backup Ranges Started", rtree.ZapRanges(ranges))
init := time.Now()

defer func() {
log.Info("Backup Ranges", zap.Duration("take", time.Since(init)))
log.Info("Backup Ranges Completed", zap.Duration("take", time.Since(init)))
}()

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
Expand Down Expand Up @@ -580,13 +582,15 @@ func (bc *Client) BackupRange(
start := time.Now()
defer func() {
elapsed := time.Since(start)
logutil.CL(ctx).Info("backup range finished", zap.Duration("take", elapsed))
logutil.CL(ctx).Info("backup range completed",
logutil.Key("startKey", req.StartKey), logutil.Key("endKey", req.EndKey),
zap.Duration("take", elapsed))
key := "range start:" + hex.EncodeToString(req.StartKey) + " end:" + hex.EncodeToString(req.EndKey)
if err != nil {
summary.CollectFailureUnit(key, err)
}
}()
logutil.CL(ctx).Info("backup started",
logutil.CL(ctx).Info("backup range started",
logutil.Key("startKey", req.StartKey), logutil.Key("endKey", req.EndKey),
zap.Uint64("rateLimit", req.RateLimit),
zap.Uint32("concurrency", req.Concurrency))
Expand All @@ -597,12 +601,13 @@ func (bc *Client) BackupRange(
return errors.Trace(err)
}

logutil.CL(ctx).Info("backup push down started")
push := newPushDown(bc.mgr, len(allStores))
results, err := push.pushBackup(ctx, req, allStores, progressCallBack)
if err != nil {
return errors.Trace(err)
}
logutil.CL(ctx).Info("finish backup push down", zap.Int("small-range-count", results.Len()))
logutil.CL(ctx).Info("backup push down completed", zap.Int("small-range-count", results.Len()))

// Find and backup remaining ranges.
// TODO: test fine grained backup.
Expand All @@ -619,9 +624,9 @@ func (bc *Client) BackupRange(
logutil.Key("endKey", req.EndKey),
zap.String("cf", req.Cf))
} else {
logutil.CL(ctx).Info("time range backed up",
zap.Reflect("StartVersion", req.StartVersion),
zap.Reflect("EndVersion", req.EndVersion))
logutil.CL(ctx).Info("transactional range backup completed",
zap.Reflect("StartTS", req.StartVersion),
zap.Reflect("EndTS", req.EndVersion))
}

var ascendErr error
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ func (ss *Schemas) BackupSchemas(
)

if !skipChecksum {
logger.Info("table checksum start")
logger.Info("Calculate table checksum start")
start := time.Now()
err := schema.calculateChecksum(ectx, store.GetClient(), backupTS, copConcurrency)
if err != nil {
return errors.Trace(err)
}
logger.Info("table checksum finished",
logger.Info("Calculate table checksum completed",
zap.Uint64("Crc64Xor", schema.crc64xor),
zap.Uint64("TotalKvs", schema.totalKvs),
zap.Uint64("TotalBytes", schema.totalBytes),
Expand Down Expand Up @@ -143,7 +143,7 @@ func (ss *Schemas) BackupSchemas(
if err := errg.Wait(); err != nil {
return errors.Trace(err)
}
log.Info("backup checksum", zap.Duration("take", time.Since(startAll)))
log.Info("Backup calculated table checksum into metas", zap.Duration("take", time.Since(startAll)))
summary.CollectDuration("backup checksum", time.Since(startAll))
return metaWriter.FinishWriteMetas(ctx, op)
}
Expand Down
20 changes: 2 additions & 18 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3078,7 +3078,8 @@ func checkMultiSpecs(sctx sessionctx.Context, specs []*ast.AlterTableSpec) error
func allSupported(specs []*ast.AlterTableSpec) bool {
for _, s := range specs {
switch s.Tp {
case ast.AlterTableAddColumns, ast.AlterTableDropColumn, ast.AlterTableDropIndex, ast.AlterTableDropPrimaryKey:
case ast.AlterTableAddColumns, ast.AlterTableDropColumn, ast.AlterTableDropIndex, ast.AlterTableDropPrimaryKey,
ast.AlterTableAddConstraint:
default:
return false
}
Expand Down Expand Up @@ -3115,23 +3116,6 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
return err
}

if len(validSpecs) > 1 {
useMultiSchemaChange := false
switch validSpecs[0].Tp {
case ast.AlterTableAddColumns, ast.AlterTableDropColumn,
ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
useMultiSchemaChange = true
default:
return dbterror.ErrRunMultiSchemaChanges
}
if err != nil {
return errors.Trace(err)
}
if !useMultiSchemaChange {
return nil
}
}

if len(validSpecs) > 1 {
sctx.GetSessionVars().StmtCtx.MultiSchemaInfo = model.NewMultiSchemaInfo()
}
Expand Down
18 changes: 13 additions & 5 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,17 +414,25 @@ func (w *worker) updateDDLJob(t *meta.Meta, job *model.Job, meetErr bool) error
failpoint.Return(kv.ErrEntryTooLarge)
}
})
updateRawArgs := true
// If there is an error when running job and the RawArgs hasn't been decoded by DecodeArgs,
// so we shouldn't replace RawArgs with the marshaling Args.
if meetErr && (job.RawArgs != nil && job.Args == nil) {
updateRawArgs := needUpdateRawArgs(job, meetErr)
if !updateRawArgs {
logutil.Logger(w.logCtx).Info("[ddl] meet something wrong before update DDL job, shouldn't update raw args",
zap.String("job", job.String()))
updateRawArgs = false
}
return errors.Trace(t.UpdateDDLJob(0, job, updateRawArgs))
}

func needUpdateRawArgs(job *model.Job, meetErr bool) bool {
// If there is an error when running job and the RawArgs hasn't been decoded by DecodeArgs,
// we shouldn't replace RawArgs with the marshaling Args.
if meetErr && job.RawArgs != nil && job.Args == nil {
// However, for multi-schema change, the args of the parent job is always nil.
// Since Job.Encode() can handle the sub-jobs properly, we can safely update the raw args.
return job.MultiSchemaInfo != nil
}
return true
}

func (w *worker) deleteRange(ctx context.Context, job *model.Job) error {
var err error
if job.Version <= currentVersion {
Expand Down
55 changes: 44 additions & 11 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,31 @@ func checkPrimaryKeyNotNull(d *ddlCtx, w *worker, sqlMode mysql.SQLMode, t *meta
return nil, err
}

func updateHiddenColumns(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, state model.SchemaState) {
// moveAndUpdateHiddenColumnsToPublic updates the hidden columns to public, and
// moves the hidden columns to proper offsets, so that Table.Columns' states meet the assumption of
// [public, public, ..., public, non-public, non-public, ..., non-public].
func moveAndUpdateHiddenColumnsToPublic(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) {
hiddenColOffset := make(map[int]struct{}, 0)
for _, col := range idxInfo.Columns {
if tblInfo.Columns[col.Offset].Hidden {
tblInfo.Columns[col.Offset].State = state
hiddenColOffset[col.Offset] = struct{}{}
}
}
if len(hiddenColOffset) == 0 {
return
}
// Find the first non-public column.
firstNonPublicPos := len(tblInfo.Columns) - 1
for i, c := range tblInfo.Columns {
if c.State != model.StatePublic {
firstNonPublicPos = i
break
}
}
for _, col := range idxInfo.Columns {
tblInfo.Columns[col.Offset].State = model.StatePublic
if _, needMove := hiddenColOffset[col.Offset]; needMove {
tblInfo.MoveColumnInfo(col.Offset, firstNonPublicPos)
}
}
}
Expand Down Expand Up @@ -469,13 +490,8 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo

if indexInfo == nil {
if len(hiddenCols) > 0 {
pos := &ast.ColumnPosition{Tp: ast.ColumnPositionNone}
for _, hiddenCol := range hiddenCols {
_, _, _, err = createColumnInfoWithPosCheck(tblInfo, hiddenCol, pos)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
initAndAddColumnToTable(tblInfo, hiddenCol)
}
}
if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil {
Expand Down Expand Up @@ -532,7 +548,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
case model.StateNone:
// none -> delete only
indexInfo.State = model.StateDeleteOnly
updateHiddenColumns(tblInfo, indexInfo, model.StatePublic)
moveAndUpdateHiddenColumnsToPublic(tblInfo, indexInfo)
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, err
Expand Down Expand Up @@ -573,19 +589,23 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
}

var done bool
done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
if job.MultiSchemaInfo != nil {
done, ver, err = doReorgWorkForCreateIndexMultiSchema(w, d, t, job, tbl, indexInfo)
} else {
done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
}
if !done {
return ver, err
}

indexInfo.State = model.StatePublic
// Set column index flag.
addIndexColumnFlag(tblInfo, indexInfo)
if isPK {
if err = updateColsNull2NotNull(tblInfo, indexInfo); err != nil {
return ver, errors.Trace(err)
}
}
indexInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -599,6 +619,19 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
return ver, errors.Trace(err)
}

func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
if job.MultiSchemaInfo.Revertible {
done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
if done {
job.MarkNonRevertible()
}
// We need another round to wait for all the others sub-jobs to finish.
return false, ver, err
}
return true, ver, err
}

func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
Expand Down
36 changes: 35 additions & 1 deletion ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := sub.ToProxyJob(job)
ver, err = w.runDDLJob(d, t, &proxyJob)
err = handleRollbackException(err, proxyJob.Error)
if err != nil {
return ver, err
}
sub.FromProxyJob(&proxyJob)
return ver, err
return ver, nil
}
// The last rollback/cancelling sub-job is done.
job.State = model.JobStateRollbackDone
Expand Down Expand Up @@ -154,6 +158,22 @@ func handleRevertibleException(job *model.Job, subJob *model.SubJob, err *terror
}
}

func handleRollbackException(runJobErr error, proxyJobErr *terror.Error) error {
if runJobErr != nil {
// The physical errors are not recoverable during rolling back.
// We keep retrying it.
return runJobErr
}
if proxyJobErr != nil {
if proxyJobErr.Equal(dbterror.ErrCancelledDDLJob) {
// A cancelled DDL error is normal during rolling back.
return nil
}
return proxyJobErr
}
return nil
}

func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error {
err := fillMultiSchemaInfo(m, job)
if err != nil {
Expand Down Expand Up @@ -189,6 +209,20 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error
case model.ActionDropIndex, model.ActionDropPrimaryKey:
indexName := job.Args[0].(model.CIStr)
info.DropIndexes = append(info.DropIndexes, indexName)
case model.ActionAddIndex, model.ActionAddPrimaryKey:
indexName := job.Args[1].(model.CIStr)
indexPartSpecifications := job.Args[2].([]*ast.IndexPartSpecification)
info.AddIndexes = append(info.AddIndexes, indexName)
for _, indexPartSpecification := range indexPartSpecifications {
info.RelativeColumns = append(info.RelativeColumns, indexPartSpecification.Column.Name)
}
if hiddenCols, ok := job.Args[4].([]*model.ColumnInfo); ok {
for _, c := range hiddenCols {
for depColName := range c.Dependences {
info.RelativeColumns = append(info.RelativeColumns, model.NewCIStr(depColName))
}
}
}
default:
return dbterror.ErrRunMultiSchemaChanges
}
Expand Down
Loading

0 comments on commit ea05e16

Please sign in to comment.