Skip to content

Commit

Permalink
Merge branch 'master' into move-inside-ddl-2
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Jun 27, 2022
2 parents a46f0e3 + ab27d49 commit ba70a3d
Show file tree
Hide file tree
Showing 24 changed files with 913 additions and 330 deletions.
4 changes: 2 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ var (

// DDL is responsible for updating schema in data store and maintaining in-memory InfoSchema cache.
type DDL interface {
CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetInfo *ast.CharsetOpt, placementPolicyRef *model.PolicyRefInfo) error
CreateSchema(ctx sessionctx.Context, stmt *ast.CreateDatabaseStmt) error
AlterSchema(sctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) error
DropSchema(ctx sessionctx.Context, schema model.CIStr) error
DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
DropTable(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error)
Expand Down
98 changes: 84 additions & 14 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,76 @@ const (
tiflashCheckPendingTablesRetry = 7
)

func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetInfo *ast.CharsetOpt, placementPolicyRef *model.PolicyRefInfo) (err error) {
dbInfo := &model.DBInfo{Name: schema}
if charsetInfo != nil {
chs, coll, err := ResolveCharsetCollation(ast.CharsetOpt{Chs: charsetInfo.Chs, Col: charsetInfo.Col})
func (d *ddl) CreateSchema(ctx sessionctx.Context, stmt *ast.CreateDatabaseStmt) (err error) {
var placementPolicyRef *model.PolicyRefInfo
sessionVars := ctx.GetSessionVars()

// If no charset and/or collation is specified use collation_server and character_set_server
charsetOpt := &ast.CharsetOpt{}
if sessionVars.GlobalVarsAccessor != nil {
charsetOpt.Col, err = variable.GetSessionOrGlobalSystemVar(sessionVars, variable.CollationServer)
if err != nil {
return errors.Trace(err)
return err
}
charsetOpt.Chs, err = variable.GetSessionOrGlobalSystemVar(sessionVars, variable.CharacterSetServer)
if err != nil {
return err
}
}

explicitCharset := false
explicitCollation := false
if len(stmt.Options) != 0 {
for _, val := range stmt.Options {
switch val.Tp {
case ast.DatabaseOptionCharset:
charsetOpt.Chs = val.Value
explicitCharset = true
case ast.DatabaseOptionCollate:
charsetOpt.Col = val.Value
explicitCollation = true
case ast.DatabaseOptionPlacementPolicy:
placementPolicyRef = &model.PolicyRefInfo{
Name: model.NewCIStr(val.Value),
}
}
}
dbInfo.Charset = chs
dbInfo.Collate = coll
} else {
dbInfo.Charset, dbInfo.Collate = charset.GetDefaultCharsetAndCollate()
}

if charsetOpt.Col != "" {
coll, err := collate.GetCollationByName(charsetOpt.Col)
if err != nil {
return err
}

// The collation is not valid for the specified character set.
// Try to remove any of them, but not if they are explicitly defined.
if coll.CharsetName != charsetOpt.Chs {
if explicitCollation && !explicitCharset {
// Use the explicitly set collation, not the implicit charset.
charsetOpt.Chs = ""
}
if !explicitCollation && explicitCharset {
// Use the explicitly set charset, not the (session) collation.
charsetOpt.Col = ""
}
}

}
dbInfo := &model.DBInfo{Name: stmt.Name}
chs, coll, err := ResolveCharsetCollation(ast.CharsetOpt{Chs: charsetOpt.Chs, Col: charsetOpt.Col})
if err != nil {
return errors.Trace(err)
}
dbInfo.Charset = chs
dbInfo.Collate = coll
dbInfo.PlacementPolicyRef = placementPolicyRef
return d.CreateSchemaWithInfo(ctx, dbInfo, OnExistError)

onExist := OnExistError
if stmt.IfNotExists {
onExist = OnExistIgnore
}
return d.CreateSchemaWithInfo(ctx, dbInfo, onExist)
}

func (d *ddl) CreateSchemaWithInfo(
Expand Down Expand Up @@ -147,6 +202,12 @@ func (d *ddl) CreateSchemaWithInfo(

err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)

if infoschema.ErrDatabaseExists.Equal(err) && onExist == OnExistIgnore {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}

return errors.Trace(err)
}

Expand Down Expand Up @@ -520,11 +581,14 @@ func (d *ddl) AlterSchema(sctx sessionctx.Context, stmt *ast.AlterDatabaseStmt)
return nil
}

func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error) {
func (d *ddl) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
old, ok := is.SchemaByName(schema)
old, ok := is.SchemaByName(stmt.Name)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists)
if stmt.IfExists {
return nil
}
return infoschema.ErrDatabaseDropExists.GenWithStackByArgs(stmt.Name)
}
job := &model.Job{
SchemaID: old.ID,
Expand All @@ -537,13 +601,19 @@ func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error)
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
if err != nil {
if infoschema.ErrDatabaseNotExists.Equal(err) {
if stmt.IfExists {
return nil
}
return infoschema.ErrDatabaseDropExists.GenWithStackByArgs(stmt.Name)
}
return errors.Trace(err)
}
if !config.TableLockEnabled() {
return nil
}
// Clear table locks hold by the session.
tbs := is.SchemaTables(schema)
tbs := is.SchemaTables(stmt.Name)
lockTableIDs := make([]int64, 0)
for _, tb := range tbs {
if ok, _ := ctx.CheckTableLocked(tb.Meta().ID); ok {
Expand Down
19 changes: 14 additions & 5 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,22 @@ func TestInfo(t *testing.T) {
}
require.True(t, syncerStarted)

// Make sure loading schema is normal.
cs := &ast.CharsetOpt{
Chs: "utf8",
Col: "utf8_bin",
stmt := &ast.CreateDatabaseStmt{
Name: model.NewCIStr("aaa"),
// Make sure loading schema is normal.
Options: []*ast.DatabaseOption{
{
Tp: ast.DatabaseOptionCharset,
Value: "utf8",
},
{
Tp: ast.DatabaseOptionCollate,
Value: "utf8_bin",
},
},
}
ctx := mock.NewContext()
require.NoError(t, dom.ddl.CreateSchema(ctx, model.NewCIStr("aaa"), cs, nil))
require.NoError(t, dom.ddl.CreateSchema(ctx, stmt))
require.NoError(t, dom.Reload())
require.Equal(t, int64(1), dom.InfoSchema().SchemaMetaVersion())

Expand Down
58 changes: 22 additions & 36 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func IsFastPlan(p plannercore.Plan) bool {
}

// Exec builds an Executor from a plan. If the Executor doesn't return result,
// like the INSERT, UPDATE statements, it executes in this function, if the Executor returns
// like the INSERT, UPDATE statements, it executes in this function. If the Executor returns
// result, execution is done after this function returns, in the returned sqlexec.RecordSet Next method.
func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
defer func() {
Expand Down Expand Up @@ -708,7 +708,10 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
keys = filterTemporaryTableKeys(sctx.GetSessionVars(), keys)
seVars := sctx.GetSessionVars()
keys = filterLockTableKeys(seVars.StmtCtx, keys)
lockCtx := newLockCtx(seVars, seVars.LockWaitTimeout, len(keys))
lockCtx, err := newLockCtx(sctx, seVars.LockWaitTimeout, len(keys))
if err != nil {
return err
}
var lockKeyStats *util.LockKeysDetails
ctx = context.WithValue(ctx, util.LockKeysDetailCtxKey, &lockKeyStats)
startLocking := time.Now()
Expand All @@ -730,51 +733,27 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
}
}

// UpdateForUpdateTS updates the ForUpdateTS, if newForUpdateTS is 0, it obtain a new TS from PD.
func UpdateForUpdateTS(seCtx sessionctx.Context, newForUpdateTS uint64) error {
txn, err := seCtx.Txn(false)
if err != nil {
return err
}
if !txn.Valid() {
return errors.Trace(kv.ErrInvalidTxn)
}

// The Oracle serializable isolation is actually SI in pessimistic mode.
// Do not update ForUpdateTS when the user is using the Serializable isolation level.
// It can be used temporarily on the few occasions when an Oracle-like isolation level is needed.
// Support for this does not mean that TiDB supports serializable isolation of MySQL.
// tidb_skip_isolation_level_check should still be disabled by default.
if seCtx.GetSessionVars().IsIsolation(ast.Serializable) {
return nil
}
if newForUpdateTS == 0 {
// Because the ForUpdateTS is used for the snapshot for reading data in DML.
// We can avoid allocating a global TSO here to speed it up by using the local TSO.
version, err := seCtx.GetStore().CurrentVersion(seCtx.GetSessionVars().TxnCtx.TxnScope)
if err != nil {
return err
}
newForUpdateTS = version.Ver
}
seCtx.GetSessionVars().TxnCtx.SetForUpdateTS(newForUpdateTS)
txn.SetOption(kv.SnapshotTS, seCtx.GetSessionVars().TxnCtx.GetForUpdateTS())
return nil
}

// handlePessimisticLockError updates TS and rebuild executor if the err is write conflict.
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error) (_ Executor, err error) {
if lockErr == nil {
return nil, nil
}
failpoint.Inject("assertPessimisticLockErr", func() {
if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) {
sessiontxn.AddAssertEntranceForLockError(a.Ctx, "errWriteConflict")
} else if terror.ErrorEqual(kv.ErrKeyExists, lockErr) {
sessiontxn.AddAssertEntranceForLockError(a.Ctx, "errDuplicateKey")
}
})

defer func() {
if _, ok := errors.Cause(err).(*tikverr.ErrDeadlock); ok {
err = ErrDeadlock
}
}()

action, err := sessiontxn.GetTxnManager(a.Ctx).OnStmtErrorForNextAction(sessiontxn.StmtErrAfterPessimisticLock, lockErr)
txnManager := sessiontxn.GetTxnManager(a.Ctx)
action, err := txnManager.OnStmtErrorForNextAction(sessiontxn.StmtErrAfterPessimisticLock, lockErr)
if err != nil {
return nil, err
}
Expand All @@ -789,10 +768,17 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
a.retryCount++
a.retryStartTime = time.Now()

err = sessiontxn.GetTxnManager(a.Ctx).OnStmtRetry(ctx)
err = txnManager.OnStmtRetry(ctx)
if err != nil {
return nil, err
}

// Without this line of code, the result will still be correct. But it can ensure that the update time of for update read
// is determined which is beneficial for testing.
if _, err = txnManager.GetStmtForUpdateTS(); err != nil {
return nil, err
}

breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError)

e, err := a.buildExecutor()
Expand Down
20 changes: 9 additions & 11 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ type BatchPointGetExec struct {
singlePart bool
partTblID int64
idxVals [][]types.Datum
startTS uint64
readReplicaScope string
isStaleness bool
snapshotTS uint64
Expand Down Expand Up @@ -97,22 +96,18 @@ func (e *BatchPointGetExec) buildVirtualColumnInfo() {

// Open implements the Executor interface.
func (e *BatchPointGetExec) Open(context.Context) error {
e.snapshotTS = e.startTS
sessVars := e.ctx.GetSessionVars()
txnCtx := sessVars.TxnCtx
stmtCtx := sessVars.StmtCtx
if e.lock {
e.snapshotTS = txnCtx.GetForUpdateTS()
}
txn, err := e.ctx.Txn(false)
if err != nil {
return err
}
e.txn = txn
var snapshot kv.Snapshot
if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() && txnCtx.StartTS == e.snapshotTS {
// We can safely reuse the transaction snapshot if startTS is equal to forUpdateTS.
// The snapshot may contains cache that can reduce RPC call.
// We can safely reuse the transaction snapshot if snapshotTS is equal to forUpdateTS.
// The snapshot may contain cache that can reduce RPC call.
snapshot = txn.GetSnapshot()
} else {
snapshot = e.ctx.GetSnapshotWithTS(e.snapshotTS)
Expand Down Expand Up @@ -540,13 +535,16 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
}

// LockKeys locks the keys for pessimistic transaction.
func LockKeys(ctx context.Context, seCtx sessionctx.Context, lockWaitTime int64, keys ...kv.Key) error {
txnCtx := seCtx.GetSessionVars().TxnCtx
lctx := newLockCtx(seCtx.GetSessionVars(), lockWaitTime, len(keys))
func LockKeys(ctx context.Context, sctx sessionctx.Context, lockWaitTime int64, keys ...kv.Key) error {
txnCtx := sctx.GetSessionVars().TxnCtx
lctx, err := newLockCtx(sctx, lockWaitTime, len(keys))
if err != nil {
return err
}
if txnCtx.IsPessimistic {
lctx.InitReturnValues(len(keys))
}
err := doLockKeys(ctx, seCtx, lctx, keys...)
err = doLockKeys(ctx, sctx, lctx, keys...)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit ba70a3d

Please sign in to comment.