diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index b3b89fd9f2a2d..01fc53802837a 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -121,6 +121,7 @@ go_library( "//pkg/executor/lockstats", "//pkg/executor/metrics", "//pkg/executor/sortexec", + "//pkg/executor/staticrecordset", "//pkg/executor/unionexec", "//pkg/expression", "//pkg/expression/aggregation", diff --git a/pkg/executor/adapter.go b/pkg/executor/adapter.go index b6f34209f2f44..28864542ded64 100644 --- a/pkg/executor/adapter.go +++ b/pkg/executor/adapter.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/executor/internal/exec" executor_metrics "github.com/pingcap/tidb/pkg/executor/metrics" + "github.com/pingcap/tidb/pkg/executor/staticrecordset" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/keyspace" @@ -241,6 +242,17 @@ func (a *recordSet) OnFetchReturned() { a.stmt.LogSlowQuery(a.txnStartTS, len(a.lastErrs) == 0, true) } +// Detach creates a new `RecordSet` which doesn't depend on the current session context. +func (a *recordSet) TryDetach() (sqlexec.RecordSet, bool, error) { + // TODO: also detach the executor. Currently, the executor inside may contain the session context. Once + // the executor itself supports detach, we should also detach it here. + e, ok := a.executor.(*TableReaderExecutor) + if !ok { + return nil, false, nil + } + return staticrecordset.New(a.Fields(), e, a.stmt.GetTextToLog(false)), true, nil +} + // ExecStmt implements the sqlexec.Statement interface, it builds a planner.Plan to an sqlexec.Statement. type ExecStmt struct { // GoCtx stores parent go context.Context for a stmt. diff --git a/pkg/executor/staticrecordset/BUILD.bazel b/pkg/executor/staticrecordset/BUILD.bazel new file mode 100644 index 0000000000000..161aee7d4d05e --- /dev/null +++ b/pkg/executor/staticrecordset/BUILD.bazel @@ -0,0 +1,31 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "staticrecordset", + srcs = ["recordset.go"], + importpath = "github.com/pingcap/tidb/pkg/executor/staticrecordset", + visibility = ["//visibility:public"], + deps = [ + "//pkg/executor/internal/exec", + "//pkg/parser/ast", + "//pkg/util", + "//pkg/util/chunk", + "//pkg/util/logutil", + "//pkg/util/sqlexec", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "staticrecordset_test", + timeout = "short", + srcs = ["integration_test.go"], + flaky = True, + shard_count = 4, + deps = [ + "//pkg/testkit", + "//pkg/util/sqlexec", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//tikv", + ], +) diff --git a/pkg/executor/staticrecordset/integration_test.go b/pkg/executor/staticrecordset/integration_test.go new file mode 100644 index 0000000000000..5d13ca2fe1286 --- /dev/null +++ b/pkg/executor/staticrecordset/integration_test.go @@ -0,0 +1,150 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package staticrecordset_test + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util/sqlexec" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" +) + +func TestStaticRecordSet(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("create table t(id int)") + tk.MustExec("insert into t values (1), (2), (3)") + + rs, err := tk.Exec("select * from t") + require.NoError(t, err) + drs := rs.(sqlexec.DetachableRecordSet) + srs, ok, err := drs.TryDetach() + require.True(t, ok) + require.NoError(t, err) + + // check schema + require.Len(t, srs.Fields(), 1) + require.Equal(t, "id", srs.Fields()[0].Column.Name.O) + + // check data + chk := srs.NewChunk(nil) + err = srs.Next(context.Background(), chk) + require.NoError(t, err) + require.Equal(t, 3, chk.NumRows()) + require.Equal(t, int64(1), chk.GetRow(0).GetInt64(0)) + require.Equal(t, int64(2), chk.GetRow(1).GetInt64(0)) + require.Equal(t, int64(3), chk.GetRow(2).GetInt64(0)) + + require.NoError(t, srs.Close()) +} + +func TestStaticRecordSetWithTxn(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("create table t(id int)") + tk.MustExec("insert into t values (1), (2), (3)") + + rs, err := tk.Exec("select * from t") + require.NoError(t, err) + txn, err := tk.Session().Txn(false) + require.NoError(t, err) + require.True(t, txn.Valid()) + drs := rs.(sqlexec.DetachableRecordSet) + srs, ok, err := drs.TryDetach() + require.True(t, ok) + require.NoError(t, err) + + // The transaction should have been committed. + txn, err = tk.Session().Txn(false) + require.NoError(t, err) + require.False(t, txn.Valid()) + + // Now, it's fine to run another statement on the session + // remove all existing data in the table + tk.MustExec("truncate table t") + + // check data + chk := srs.NewChunk(nil) + err = srs.Next(context.Background(), chk) + require.NoError(t, err) + require.Equal(t, 3, chk.NumRows()) + require.Equal(t, int64(1), chk.GetRow(0).GetInt64(0)) + require.Equal(t, int64(2), chk.GetRow(1).GetInt64(0)) + require.Equal(t, int64(3), chk.GetRow(2).GetInt64(0)) + + require.NoError(t, srs.Close()) +} + +func TestStaticRecordSetExceedGCTime(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("create table t(id int)") + tk.MustExec("insert into t values (1), (2), (3)") + + rs, err := tk.Exec("select * from t") + require.NoError(t, err) + // Get the startTS + txn, err := tk.Session().Txn(false) + require.NoError(t, err) + startTS := txn.StartTS() + + // Detach the record set + drs := rs.(sqlexec.DetachableRecordSet) + srs, ok, err := drs.TryDetach() + require.True(t, ok) + require.NoError(t, err) + + // Now, it's fine to run another statement on the session + // remove all existing data in the table + tk.MustExec("truncate table t") + + // Update the safe point + store.(tikv.Storage).UpdateSPCache(startTS+1, time.Now()) + + // Check data, it'll get an error + chk := srs.NewChunk(nil) + err = srs.Next(context.Background(), chk) + require.Error(t, err) + require.NoError(t, srs.Close()) +} + +func TestDetachError(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("create table t(id int)") + tk.MustExec("insert into t values (1), (2), (3)") + + // explicit transaction is not allowed + tk.MustExec("begin") + rs, err := tk.Exec("select * from t") + require.NoError(t, err) + drs2 := rs.(sqlexec.DetachableRecordSet) + _, ok, err := drs2.TryDetach() + require.False(t, ok) + require.NoError(t, err) + tk.MustExec("commit") +} diff --git a/pkg/executor/staticrecordset/recordset.go b/pkg/executor/staticrecordset/recordset.go new file mode 100644 index 0000000000000..973b868f79e7a --- /dev/null +++ b/pkg/executor/staticrecordset/recordset.go @@ -0,0 +1,79 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package staticrecordset + +import ( + "context" + + "github.com/pingcap/tidb/pkg/executor/internal/exec" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/sqlexec" + "go.uber.org/zap" +) + +var _ sqlexec.RecordSet = &staticRecordSet{} + +type staticRecordSet struct { + fields []*ast.ResultField + executor exec.Executor + + sqlText string +} + +// New creates a new staticRecordSet +func New(fields []*ast.ResultField, executor exec.Executor, sqlText string) sqlexec.RecordSet { + return &staticRecordSet{ + fields: fields, + executor: executor, + sqlText: sqlText, + } +} + +func (s *staticRecordSet) Fields() []*ast.ResultField { + return s.fields +} + +func (s *staticRecordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) { + defer func() { + r := recover() + if r == nil { + return + } + err = util.GetRecoverError(r) + logutil.Logger(ctx).Error("execute sql panic", zap.String("sql", s.sqlText), zap.Stack("stack")) + }() + + return exec.Next(ctx, s.executor, req) +} + +// NewChunk create a chunk base on top-level executor's exec.NewFirstChunk(). +func (s *staticRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk { + if alloc == nil { + return exec.NewFirstChunk(s.executor) + } + + return alloc.Alloc(s.executor.RetFieldTypes(), s.executor.InitCap(), s.executor.MaxChunkSize()) +} + +// Close closes the executor. +func (s *staticRecordSet) Close() error { + err := exec.Close(s.executor) + s.executor = nil + + return err +} diff --git a/pkg/session/session.go b/pkg/session/session.go index 58e41a7319362..02b53650381e3 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -2383,6 +2383,40 @@ func (rs *execStmtResult) Close() error { return err2 } +func (rs *execStmtResult) TryDetach() (sqlexec.RecordSet, bool, error) { + if !rs.sql.IsReadOnly(rs.se.GetSessionVars()) { + return nil, false, nil + } + if !plannercore.IsAutoCommitTxn(rs.se.GetSessionVars()) { + return nil, false, nil + } + + drs, ok := rs.RecordSet.(sqlexec.DetachableRecordSet) + if !ok { + return nil, false, nil + } + detachedRS, ok, err := drs.TryDetach() + if !ok || err != nil { + return nil, ok, err + } + + // FIXME: block the min-start-ts. Now the `min-start-ts` will advance and exceed the `startTS` used by + // this detached record set, and may cause an error if `GC` runs. + + // Now, a transaction is not needed for the detached record set, so we commit the transaction and cleanup + // the session state. + err = finishStmt(context.Background(), rs.se, nil, rs.sql) + if err != nil { + err2 := detachedRS.Close() + if err2 != nil { + logutil.BgLogger().Error("close detached record set failed", zap.Error(err2)) + } + return nil, false, err + } + + return detachedRS, true, nil +} + // rollbackOnError makes sure the next statement starts a new transaction with the latest InfoSchema. func (s *session) rollbackOnError(ctx context.Context) { if !s.sessionVars.InTxn() { diff --git a/pkg/util/sqlexec/restricted_sql_executor.go b/pkg/util/sqlexec/restricted_sql_executor.go index 0bc90cdb6766f..a1dd7142afa0c 100644 --- a/pkg/util/sqlexec/restricted_sql_executor.go +++ b/pkg/util/sqlexec/restricted_sql_executor.go @@ -199,6 +199,22 @@ type RecordSet interface { Close() error } +// DetachableRecordSet extends the `RecordSet` to support detaching from current session context +type DetachableRecordSet interface { + RecordSet + + // TryDetach detaches the record set from the current session context. + // + // The last two return value indicates whether the record set is suitable for detaching, and whether + // it detaches successfully. If it faces any error during detaching (and there is no way to rollback), + // it will return the error. If an error is returned, the record set (and session) will be left at + // an unknown state. + // + // If the caller receives `_, false, _`, it means the original record set can still be used. If the caller + // receives an error, it means the original record set (and the session) is dirty. + TryDetach() (RecordSet, bool, error) +} + // MultiQueryNoDelayResult is an interface for one no-delay result for one statement in multi-queries. type MultiQueryNoDelayResult interface { // AffectedRows return affected row for one statement in multi-queries.