Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support pessimistic transaction (experimental feature) #10297

Merged
merged 13 commits into from
May 11, 2019
19 changes: 19 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type Config struct {
Binlog Binlog `toml:"binlog" json:"binlog"`
CompatibleKillQuery bool `toml:"compatible-kill-query" json:"compatible-kill-query"`
Plugin Plugin `toml:"plugin" json:"plugin"`
PessimisticTxn PessimisticTxn `toml:"pessimistic-txn" json:"pessimistic_txn"`
CheckMb4ValueInUTF8 bool `toml:"check-mb4-value-in-utf8" json:"check-mb4-value-in-utf8"`
// TreatOldVersionUTF8AsUTF8MB4 is use to treat old version table/column UTF8 charset as UTF8MB4. This is for compatibility.
// Currently not support dynamic modify, because this need to reload all old version schema.
Expand Down Expand Up @@ -291,6 +292,18 @@ type Plugin struct {
Load string `toml:"load" json:"load"`
}

// PessimisticTxn is the config for pessimistic transaction.
type PessimisticTxn struct {
// Enable must be true for 'begin lock' or session variable to start a pessimistic transaction.
Enable bool `toml:"enable" json:"enable"`
// Starts a pessimistic transaction by default when Enable is true.
Default bool `toml:"default" json:"default"`
// The max count of retry for a single statement in a pessimistic transaction.
MaxRetryCount uint `toml:"max-retry-count" json:"max-retry-count"`
// The pessimistic lock ttl in milliseconds.
TTL uint64 `toml:"ttl" json:"ttl"`
}

var defaultConf = Config{
Host: "0.0.0.0",
AdvertiseAddress: "",
Expand Down Expand Up @@ -374,6 +387,12 @@ var defaultConf = Config{
WriteTimeout: "15s",
Strategy: "range",
},
PessimisticTxn: PessimisticTxn{
Enable: false,
Default: false,
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
MaxRetryCount: 256,
TTL: 60 * 1000,
},
}

var (
Expand Down
15 changes: 14 additions & 1 deletion config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,17 @@ ignore-error = false
binlog-socket = ""

# the strategy for sending binlog to pump, value can be "range" or "hash" now.
strategy = "range"
strategy = "range"

[pessimistic-txn]
# enable pessimistic transaction.
enable = false

# start pessimistic transaction by default.
default = false

# max retry count for a statement in a pessimistic transaction.
max-retry-count = 256

# default TTL in milliseconds for pessimistic lock.
ttl = 60000
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx

// Lock the row key to notify us that someone delete or update the row,
// then we should not backfill the index of it, otherwise the adding index is redundant.
err := txn.LockKeys(idxRecord.key)
err := txn.LockKeys(context.Background(), 0, idxRecord.key)
if err != nil {
return errors.Trace(err)
}
Expand Down
213 changes: 202 additions & 11 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"math"
"strconv"
"strings"
"sync/atomic"
"time"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -151,8 +153,10 @@ type ExecStmt struct {

Ctx sessionctx.Context
// StartTime stands for the starting time when executing the statement.
StartTime time.Time
isPreparedStmt bool
StartTime time.Time
isPreparedStmt bool
isSelectForUpdate bool
retryCount uint
}

// OriginText returns original statement as a string.
Expand Down Expand Up @@ -219,7 +223,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}()
}

e, err := a.buildExecutor(sctx)
e, err := a.buildExecutor()
if err != nil {
return nil, err
}
Expand All @@ -246,19 +250,29 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode)
}

isPessimistic := sctx.GetSessionVars().TxnCtx.IsPessimistic

// Special handle for "select for update statement" in pessimistic transaction.
if isPessimistic && a.isSelectForUpdate {
return a.handlePessimisticSelectForUpdate(ctx, e)
}

// If the executor doesn't return any result to the client, we execute it without delay.
if e.Schema().Len() == 0 {
return a.handleNoDelayExecutor(ctx, sctx, e)
if isPessimistic {
return nil, a.handlePessimisticDML(ctx, e)
}
return a.handleNoDelayExecutor(ctx, e)
} else if proj, ok := e.(*ProjectionExec); ok && proj.calculateNoDelay {
// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
// the Projection has two expressions and two columns in the schema, but we should
// not return the result of the two expressions.
return a.handleNoDelayExecutor(ctx, sctx, e)
return a.handleNoDelayExecutor(ctx, e)
}

var txnStartTS uint64
txn, err1 := sctx.Txn(false)
if err1 != nil {
txn, err := sctx.Txn(false)
if err != nil {
return nil, err
}
if txn.Valid() {
Expand All @@ -271,7 +285,81 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}, nil
}

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) {
type chunkRowRecordSet struct {
rows []chunk.Row
idx int
fields []*ast.ResultField
e Executor
}

func (c *chunkRowRecordSet) Fields() []*ast.ResultField {
return c.fields
}

func (c *chunkRowRecordSet) Next(ctx context.Context, req *chunk.RecordBatch) error {
chk := req.Chunk
chk.Reset()
for !chk.IsFull() && c.idx < len(c.rows) {
chk.AppendRow(c.rows[c.idx])
c.idx++
}
return nil
}

func (c *chunkRowRecordSet) NewRecordBatch() *chunk.RecordBatch {
return chunk.NewRecordBatch(c.e.newFirstChunk())
}

func (c *chunkRowRecordSet) Close() error {
return nil
}

func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e Executor) (sqlexec.RecordSet, error) {
for {
rs, err := a.runPessimisticSelectForUpdate(ctx, e)
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
return nil, err
}
if e == nil {
return rs, nil
}
}
}

func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor) (sqlexec.RecordSet, error) {
rs := &recordSet{
executor: e,
stmt: a,
}
defer func() {
terror.Log(rs.Close())
}()

var rows []chunk.Row
var err error
fields := rs.Fields()
req := rs.NewRecordBatch()
for {
err = rs.Next(ctx, req)
if err != nil {
// Handle 'write conflict' error.
break
}
if req.NumRows() == 0 {
return &chunkRowRecordSet{rows: rows, fields: fields, e: e}, nil
lysu marked this conversation as resolved.
Show resolved Hide resolved
}
iter := chunk.NewIterator4Chunk(req.Chunk)
for r := iter.Begin(); r != iter.End(); r = iter.Next() {
rows = append(rows, r)
}
req.Chunk = chunk.Renew(req.Chunk, a.Ctx.GetSessionVars().MaxChunkSize)
}
return nil, err
}

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlexec.RecordSet, error) {
sctx := a.Ctx
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context()))
defer span1.Finish()
Expand All @@ -297,12 +385,114 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
if err != nil {
return nil, err
}
return nil, err
}

func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
sctx := a.Ctx
txn, err := sctx.Txn(true)
if err != nil {
return err
}
txnCtx := sctx.GetSessionVars().TxnCtx
for {
_, err = a.handleNoDelayExecutor(ctx, e)
if err != nil {
return err
}
keys, err1 := txn.(pessimisticTxn).KeysNeedToLock()
if err1 != nil {
return err1
}
if len(keys) == 0 {
return nil
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, forUpdateTS, keys...)
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
return err
}
if e == nil {
return nil
}
}
}

// handlePessimisticLockError updates TS and rebuild executor if the err is write conflict.
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) {
if err == nil {
return nil, nil
}
errStr := err.Error()
if !strings.Contains(errStr, util.WriteConflictMarker) {
return nil, err
}
if a.retryCount >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount {
return nil, errors.New("pessimistic lock retry limit reached")
}
a.retryCount++
conflictTS := extractConflictTS(errStr)
if conflictTS == 0 {
logutil.Logger(ctx).Warn("failed to extract conflictTS from a conflict error")
}
sctx := a.Ctx
txnCtx := sctx.GetSessionVars().TxnCtx
forUpdateTS := txnCtx.GetForUpdateTS()
logutil.Logger(ctx).Info("pessimistic write conflict, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("forUpdateTS", forUpdateTS),
zap.Uint64("conflictTS", conflictTS))
if conflictTS > txnCtx.GetForUpdateTS() {
txnCtx.SetForUpdateTS(conflictTS)
} else {
ts, err1 := sctx.GetStore().GetOracle().GetTimestamp(ctx)
if err1 != nil {
return nil, err1
}
txnCtx.SetForUpdateTS(ts)
}
e, err := a.buildExecutor()
if err != nil {
return nil, err
}
// Rollback the statement change before retry it.
sctx.StmtRollback()
sctx.GetSessionVars().StmtCtx.ResetForRetry()

if err = e.Open(ctx); err != nil {
return nil, err
}
return e, nil
}

func extractConflictTS(errStr string) uint64 {
strs := strings.Split(errStr, "conflictTS=")
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
if len(strs) != 2 {
return 0
}
tsPart := strs[1]
length := strings.IndexByte(tsPart, ',')
if length < 0 {
return 0
}
tsStr := tsPart[:length]
ts, err := strconv.ParseUint(tsStr, 10, 64)
if err != nil {
return 0
}
return ts
}

return nil, nil
type pessimisticTxn interface {
kv.Transaction
// KeysNeedToLock returns the keys need to be locked.
KeysNeedToLock() ([]kv.Key, error)
}

// buildExecutor build a executor from plan, prepared statement may need additional procedure.
func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) {
func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx := a.Ctx
if _, ok := a.Plan.(*plannercore.Execute); !ok {
// Do not sync transaction for Execute statement, because the real optimization work is done in
// "ExecuteExec.Build".
Expand Down Expand Up @@ -344,14 +534,15 @@ func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) {

// ExecuteExec is not a real Executor, we only use it to build another Executor from a prepared statement.
if executorExec, ok := e.(*ExecuteExec); ok {
err := executorExec.Build()
err := executorExec.Build(b)
if err != nil {
return nil, err
}
a.isPreparedStmt = true
a.Plan = executorExec.plan
e = executorExec.stmtExec
}
a.isSelectForUpdate = b.isSelectForUpdate
return e, nil
}

Expand Down
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

recordKey := e.table.RecordKey(row.handle)
err := txn.LockKeys(recordKey)
err := txn.LockKeys(ctx, 0, recordKey)
if err != nil {
return result, err
}
Expand Down
Loading