*: stale reads compatible with prepare #25156

Merged · 17 commits · Jun 11, 2021
12 changes: 5 additions & 7 deletions executor/adapter.go
@@ -245,7 +245,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
}
}
if a.PsStmt.Executor == nil {
b := newExecutorBuilder(a.Ctx, is, a.Ti)
b := newExecutorBuilder(a.Ctx, is, a.Ti, a.SnapshotTS, a.ExplicitStaleness)
Contributor Author:

a.ExplicitStaleness is added as a parameter; I found that #25206 missed the point-get case, just as I had.

newExecutor := b.build(a.Plan)
if b.err != nil {
return nil, b.err
@@ -289,7 +289,7 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
return 0, err
}
a.InfoSchema = ret.InfoSchema
a.SnapshotTS = ret.SnapshotTS
a.SnapshotTS = ret.LastSnapshotTS
a.ExplicitStaleness = ret.ExplicitStaleness
p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, a.InfoSchema)
if err != nil {
@@ -336,7 +336,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
if n, ok := val.(int); ok {
startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000
if n != int(startTS) {
panic("different tso")
panic(fmt.Sprintf("different tso %d != %d", n, startTS))
}
failpoint.Return()
}
@@ -346,7 +346,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
// Convert to seconds
startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000
if int(startTS) <= n-1 || n+1 <= int(startTS) {
panic("tso violate tolerance")
panic(fmt.Sprintf("different tso %d != %d", n, startTS))
}
failpoint.Return()
}
@@ -792,9 +792,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}

b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti)
b.snapshotTS = a.SnapshotTS
b.explicitStaleness = a.ExplicitStaleness
b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.ExplicitStaleness)
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
10 changes: 5 additions & 5 deletions executor/benchmark_test.go
@@ -290,7 +290,7 @@ func buildHashAggExecutor(ctx sessionctx.Context, src Executor, schema *expressi
plan.SetSchema(schema)
plan.Init(ctx, nil, 0)
plan.SetChildren(nil)
b := newExecutorBuilder(ctx, nil, nil)
b := newExecutorBuilder(ctx, nil, nil, 0, false)
exec := b.build(plan)
hashAgg := exec.(*HashAggExec)
hashAgg.children[0] = src
@@ -342,7 +342,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec Executor, schema *ex
plan = sg
}

b := newExecutorBuilder(ctx, nil, nil)
b := newExecutorBuilder(ctx, nil, nil, 0, false)
return b.build(plan)
}

@@ -575,7 +575,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
plan = win
}

b := newExecutorBuilder(ctx, nil, nil)
b := newExecutorBuilder(ctx, nil, nil, 0, false)
exec := b.build(plan)
return exec
}
@@ -1322,7 +1322,7 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
hashCols: tc.outerHashKeyIdx,
},
innerCtx: innerCtx{
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil)},
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil, 0, false)},
rowTypes: rightTypes,
colLens: colLens,
keyCols: tc.innerJoinKeyIdx,
@@ -1388,7 +1388,7 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
compareFuncs: outerCompareFuncs,
},
innerMergeCtx: innerMergeCtx{
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil)},
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil, 0, false)},
rowTypes: rightTypes,
joinKeys: innerJoinKeys,
colLens: colLens,
14 changes: 10 additions & 4 deletions executor/builder.go
@@ -95,11 +95,13 @@ type CTEStorages struct {
IterInTbl cteutil.Storage
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo) *executorBuilder {
func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, explicitStaleness bool) *executorBuilder {
return &executorBuilder{
ctx: ctx,
is: is,
Ti: ti,
ctx: ctx,
is: is,
Ti: ti,
snapshotTS: snapshotTS,
explicitStaleness: explicitStaleness,
}
}

@@ -663,6 +665,10 @@ func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
}

func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
b.snapshotTS = v.SnapshotTS
if b.snapshotTS != 0 {
b.is, b.err = domain.GetDomain(b.ctx).GetSnapshotInfoSchema(b.snapshotTS)
}
e := &ExecuteExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
is: b.is,
2 changes: 1 addition & 1 deletion executor/compiler.go
@@ -70,7 +70,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
}
return &ExecStmt{
GoCtx: ctx,
SnapshotTS: ret.SnapshotTS,
SnapshotTS: ret.LastSnapshotTS,
ExplicitStaleness: ret.ExplicitStaleness,
InfoSchema: ret.InfoSchema,
Plan: finalPlan,
2 changes: 1 addition & 1 deletion executor/coprocessor.go
@@ -168,7 +168,7 @@ func (h *CoprocessorDAGHandler) buildDAGExecutor(req *coprocessor.Request) (Exec
}
plan = core.InjectExtraProjection(plan)
// Build executor.
b := newExecutorBuilder(h.sctx, is, nil)
b := newExecutorBuilder(h.sctx, is, nil, 0, false)
return b.build(plan), nil
}

2 changes: 1 addition & 1 deletion executor/executor_required_rows_test.go
@@ -843,7 +843,7 @@ func buildMergeJoinExec(ctx sessionctx.Context, joinType plannercore.JoinType, i
j.CompareFuncs = append(j.CompareFuncs, expression.GetCmpFunction(nil, j.LeftJoinKeys[i], j.RightJoinKeys[i]))
}

b := newExecutorBuilder(ctx, nil, nil)
b := newExecutorBuilder(ctx, nil, nil, 0, false)
return b.build(j)
}

15 changes: 8 additions & 7 deletions executor/prepared.go
@@ -220,11 +220,12 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

preparedObj := &plannercore.CachedPrepareStmt{
PreparedAst: prepared,
VisitInfos: destBuilder.GetVisitInfo(),
NormalizedSQL: normalizedSQL,
SQLDigest: digest,
ForUpdateRead: destBuilder.GetIsForUpdateRead(),
PreparedAst: prepared,
VisitInfos: destBuilder.GetVisitInfo(),
NormalizedSQL: normalizedSQL,
SQLDigest: digest,
ForUpdateRead: destBuilder.GetIsForUpdateRead(),
SnapshotTSEvaluator: ret.SnapshotTSEvaluator,
}
return vars.AddPreparedStmt(e.ID, preparedObj)
}
@@ -314,7 +315,7 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {

// CompileExecutePreparedStmt compiles a session Execute command to a stmt.Statement.
func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
ID uint32, args []types.Datum) (sqlexec.Statement, bool, bool, error) {
ID uint32, is infoschema.InfoSchema, snapshotTS uint64, args []types.Datum) (sqlexec.Statement, bool, bool, error) {
startTime := time.Now()
defer func() {
sctx.GetSessionVars().DurationCompile = time.Since(startTime)
@@ -324,7 +325,6 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
return nil, false, false, err
}
execStmt.BinaryArgs = args
is := sctx.GetInfoSchema().(infoschema.InfoSchema)
execPlan, names, err := planner.Optimize(ctx, sctx, execStmt, is)
if err != nil {
return nil, false, false, err
@@ -338,6 +338,7 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
Ctx: sctx,
OutputNames: names,
Ti: &TelemetryInfo{},
SnapshotTS: snapshotTS,
}
if preparedPointer, ok := sctx.GetSessionVars().PreparedStmts[ID]; ok {
preparedObj, ok := preparedPointer.(*plannercore.CachedPrepareStmt)
4 changes: 3 additions & 1 deletion executor/seqtest/prepared_test.go
@@ -22,6 +22,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
@@ -143,7 +144,8 @@ func (s *seqTestSuite) TestPrepared(c *C) {
tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows())

// Check that ast.Statement created by executor.CompileExecutePreparedStmt has query text.
stmt, _, _, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Se, stmtID, []types.Datum{types.NewDatum(1)})
stmt, _, _, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Se, stmtID,
tk.Se.GetInfoSchema().(infoschema.InfoSchema), 0, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)
c.Assert(stmt.OriginText(), Equals, query)

77 changes: 77 additions & 0 deletions executor/stale_txn_test.go
@@ -772,3 +772,80 @@ func (s *testStaleTxnSuite) TestSetTransactionInfoSchema(c *C) {
tk.MustExec("commit")
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer3)
}

func (s *testStaleTxnSuite) TestStaleSelect(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int)")

tolerance := 50 * time.Millisecond

tk.MustExec("insert into t values (1)")
time.Sleep(tolerance)
time1 := time.Now()

tk.MustExec("insert into t values (2)")
time.Sleep(tolerance)
time2 := time.Now()

tk.MustExec("insert into t values (3)")
time.Sleep(tolerance)

staleRows := testkit.Rows("1")
staleSQL := fmt.Sprintf(`select * from t as of timestamp '%s'`, time1.Format("2006-1-2 15:04:05.000"))
Member @nolouch (Jun 4, 2021):

What will happen in this situation?

create table t (id int)
insert into t values (1)                                            ---- 1s
prepare s from  select * from t as of timestamp Now() - INTERVAL 2 Second  -----2s
execute s ;                                                         ----- 4s
alter table t add column age int;                                ----- 5s
insert into t values (2, 2)                                      --------5s
execute s;                                                         ----- 8s

Maybe we need to store the expression rather than the SnapshotTS.

Contributor Author:

I wanted to prepare the handle of the infoschema in #25080, but that is no longer true; the handle of the infoschema is now added.

Member @nolouch (Jun 4, 2021):

I want to get the result (2, 2) at the 8s time point, but I guess your implementation will always return the result 1.

Contributor Author @xhebox (Jun 4, 2021):

Yeah, the timestamp is fixed once it is preprocessed now.

Your expectation requires re-preprocessing, which discards all the prepared work; that is not simply a consequence of my implementation. If the timestamp changes, the schema and snapshot change too, so the result of the previous prepared statement is meaningless.

The semantics may be intuitive, but they do not make much sense under our current framework for prepared statements.

Full support, I guess, would need to store the AS OF expression in the plan, and the optimizer would have to consider the infoschema and snapshot during optimization. Then it would be much easier to implement.
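
For reference, the direction the merged diff takes (the SnapshotTSEvaluator field added to plannercore.CachedPrepareStmt below) can be sketched as a closure captured at PREPARE time and re-evaluated on EXECUTE. This is a minimal, self-contained sketch with simplified types; only the evaluator-field idea comes from the PR, everything else is illustrative:

package main

import "fmt"

// sessionCtx stands in for sessionctx.Context; the real evaluator takes the
// session context so an expression like NOW() - INTERVAL 2 SECOND can read
// the current time at execution.
type sessionCtx struct{ nowTS uint64 }

// cachedStmt mirrors the idea behind CachedPrepareStmt.SnapshotTSEvaluator:
// store a function re-evaluated per EXECUTE, not a uint64 fixed at PREPARE.
type cachedStmt struct {
	snapshotTSEvaluator func(*sessionCtx) (uint64, error)
}

// prepareStaleRead captures the AS OF expression (here, a fixed offset)
// without evaluating it.
func prepareStaleRead(offsetTS uint64) *cachedStmt {
	return &cachedStmt{
		snapshotTSEvaluator: func(ctx *sessionCtx) (uint64, error) {
			return ctx.nowTS - offsetTS, nil
		},
	}
}

func main() {
	stmt := prepareStaleRead(2)
	for _, now := range []uint64{4, 8} { // EXECUTE at 4s and again at 8s
		ts, _ := stmt.snapshotTSEvaluator(&sessionCtx{nowTS: now})
		fmt.Printf("execute at %ds reads the snapshot at %ds\n", now, ts)
	}
}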

Contributor Author:

I could provide a hack solution, if you really want this behavior.

Contributor Author @xhebox (Jun 4, 2021):

No, it must be re-preprocessed. The infoschema may change, and even if it has not changed, a check for a stale infoschema is needed, which costs one more TiKV metadata access. Otherwise, the result is inconsistent.

Agree with you.

Glad to hear that, but it cannot work that way for now; I can only hack around it, and the solution will be tricky.

Member @nolouch (Jun 7, 2021):

Yes, I mean I can accept re-preprocessing; what I cannot accept is the result being fixed at one point in time, as in my example. BTW, even if we have the infoschema cache, do we still need to access TiKV to get the schema version for every request?

Contributor Author @xhebox (Jun 7, 2021):

Not for the current solution, because it is already ensured at prepare time. If you want to ensure consistency for NOW() - 2 seconds, however, then yes, you need an extra request: you don't know the schema version from 2 seconds ago, and it cannot be prepared ahead of time because of NOW(). If you don't check it, it may be outdated.

FYI, see tidb/domain/domain.go, lines 103 to 107 at 5c95062, for the code that fetches the schema version:

m := meta.NewSnapshotMeta(snapshot)
neededSchemaVersion, err := m.GetSchemaVersion()
if err != nil {
	return nil, false, 0, nil, err
}

Member:

I have checked it; the implementation is different from what I understood. Since the schema version is guaranteed to increase, it could be optimized to avoid the extra request by checking that the schema version has not changed between the older request and the up-to-date schema version.

Member @nolouch (Jun 7, 2021):

Otherwise, the current implementation can easily become a single-point bottleneck, because the meta information is always read from one particular TiKV node.
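
A minimal sketch of the shortcut suggested above (hypothetical names, not TiDB's actual API): schema versions only increase, so after the single GetSchemaVersion read, a cached infoschema with a matching version can be reused instead of reloading every table's metadata from TiKV.

// InfoSchema stands in for infoschema.InfoSchema.
type InfoSchema interface{ SchemaMetaVersion() int64 }

// snapshotInfoSchema returns a schema for snapVer, preferring the cache;
// only a cache miss pays the full metadata reload.
func snapshotInfoSchema(snapVer int64, cache []InfoSchema,
	loadAll func(ver int64) (InfoSchema, error)) (InfoSchema, error) {
	for _, is := range cache {
		if is.SchemaMetaVersion() == snapVer {
			return is, nil // cache hit: no full reload from TiKV
		}
	}
	return loadAll(snapVer) // cache miss: rebuild the schema at that version
}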


// test normal stale select
tk.MustQuery(staleSQL).Check(staleRows)

// test stale select in txn
tk.MustExec("begin")
c.Assert(tk.ExecToErr(staleSQL), NotNil)
tk.MustExec("commit")

// test prepared stale select
tk.MustExec(fmt.Sprintf(`prepare s from "%s"`, staleSQL))
tk.MustQuery("execute s")

// test prepared stale select in txn
tk.MustExec("begin")
tk.MustQuery("execute s").Check(staleRows)
tk.MustExec("commit")

// test stale select in stale txn
tk.MustExec(fmt.Sprintf(`start transaction read only as of timestamp '%s'`, time2.Format("2006-1-2 15:04:05.000")))
c.Assert(tk.ExecToErr(staleSQL), NotNil)
tk.MustExec("commit")

// test prepared stale select in stale txn
tk.MustExec(fmt.Sprintf(`start transaction read only as of timestamp '%s'`, time2.Format("2006-1-2 15:04:05.000")))
tk.MustQuery("execute s").Check(staleRows)
tk.MustExec("commit")
Comment on lines +817 to +824

Contributor:

Is execute s expected to execute successfully in a staleness transaction while the staleSQL is not?

Member:

How about when it is not in a prepare?

Contributor Author @xhebox (Jun 11, 2021):

Yes, it is expected. And it should be possible to throw an error in OptimizePreparedPlan.

Contributor:

IMO this is not expected. We should avoid multiple snapshotTS values in a transaction, as users may mistake them for a data-inconsistency error during the transaction.
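
Such a guard in OptimizePreparedPlan might look like the sketch below; this is purely hypothetical and not part of this PR. A prepared stale read executed inside an already-active transaction would be rejected up front instead of silently introducing a second snapshotTS.

// staleReadAllowed reports whether an explicit AS OF snapshot may be used,
// given whether a transaction is already active in the session.
func staleReadAllowed(inTxn bool, snapshotTS uint64) bool {
	return !(inTxn && snapshotTS != 0)
}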


// test prepared stale select with schema change
tk.MustExec("alter table t add column c int")
tk.MustExec("insert into t values (4, 5)")
time.Sleep(10 * time.Millisecond)
tk.MustQuery("execute s").Check(staleRows)

// test dynamic timestamp stale select
time3 := time.Now()
tk.MustExec("alter table t add column d int")
tk.MustExec("insert into t values (4, 4, 4)")
time.Sleep(tolerance)
time4 := time.Now()
staleRows = testkit.Rows("1 <nil>", "2 <nil>", "3 <nil>", "4 5")
tk.MustQuery(fmt.Sprintf("select * from t as of timestamp CURRENT_TIMESTAMP(3) - INTERVAL %d MICROSECOND", time4.Sub(time3).Microseconds())).Check(staleRows)

// test prepared dynamic timestamp stale select
time5 := time.Now()
tk.MustExec(fmt.Sprintf(`prepare v from "select * from t as of timestamp CURRENT_TIMESTAMP(3) - INTERVAL %d MICROSECOND"`, time5.Sub(time3).Microseconds()))
tk.MustQuery("execute v").Check(staleRows)

// test point get
time6 := time.Now()
tk.MustExec("insert into t values (5, 5, 5)")
time.Sleep(tolerance)
tk.MustQuery(fmt.Sprintf("select * from t as of timestamp '%s' where c=5", time6.Format("2006-1-2 15:04:05.000"))).Check(testkit.Rows("4 5 <nil>"))
}
20 changes: 11 additions & 9 deletions planner/core/cache.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
@@ -194,13 +195,14 @@ func NewPSTMTPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*mod

// CachedPrepareStmt store prepared ast from PrepareExec and other related fields
type CachedPrepareStmt struct {
PreparedAst *ast.Prepared
VisitInfos []visitInfo
ColumnInfos interface{}
Executor interface{}
NormalizedSQL string
NormalizedPlan string
SQLDigest *parser.Digest
PlanDigest *parser.Digest
ForUpdateRead bool
PreparedAst *ast.Prepared
VisitInfos []visitInfo
ColumnInfos interface{}
Executor interface{}
NormalizedSQL string
NormalizedPlan string
SQLDigest *parser.Digest
PlanDigest *parser.Digest
ForUpdateRead bool
SnapshotTSEvaluator func(sessionctx.Context) (uint64, error)
}
18 changes: 17 additions & 1 deletion planner/core/common_plans.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
@@ -182,6 +183,7 @@ type Execute struct {
UsingVars []expression.Expression
PrepareParams []types.Datum
ExecID uint32
SnapshotTS uint64
Stmt ast.StmtNode
StmtType string
Plan Plan
@@ -256,6 +258,20 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
}
}

var snapshotTS uint64
if preparedObj.SnapshotTSEvaluator != nil {
// if preparedObj.SnapshotTSEvaluator != nil, it is a stale read SQL:
// which means its infoschema is specified by the SQL, not the current/latest infoschema
var err error
snapshotTS, err = preparedObj.SnapshotTSEvaluator(sctx)
if err != nil {
return errors.Trace(err)
}
is, err = domain.GetDomain(sctx).GetSnapshotInfoSchema(snapshotTS)
if err != nil {
return errors.Trace(err)
}
}
if prepared.SchemaVersion != is.SchemaMetaVersion() {
// In order to avoid some correctness issues, we have to clear the
// cached plan once the schema version is changed.
@@ -265,7 +281,6 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
preparedObj.Executor = nil
// If the schema version has changed we need to preprocess it again,
// if this time it failed, the real reason for the error is schema changed.
// FIXME: compatible with prepare https://github.com/pingcap/tidb/issues/24932
ret := &PreprocessorReturn{InfoSchema: is}
err := Preprocess(sctx, prepared.Stmt, InPrepare, WithPreprocessorReturn(ret))
if err != nil {
@@ -277,6 +292,7 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
if err != nil {
return err
}
e.SnapshotTS = snapshotTS
e.Stmt = prepared.Stmt
return nil
}