Skip to content

Commit

Permalink
executor, session: Add Detach() method to detach the record set (#54091)
Browse files Browse the repository at this point in the history
close #54090
  • Loading branch information
YangKeao authored Jun 24, 2024
1 parent bec113a commit e036274
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ go_library(
"//pkg/executor/lockstats",
"//pkg/executor/metrics",
"//pkg/executor/sortexec",
"//pkg/executor/staticrecordset",
"//pkg/executor/unionexec",
"//pkg/expression",
"//pkg/expression/aggregation",
Expand Down
12 changes: 12 additions & 0 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
executor_metrics "github.com/pingcap/tidb/pkg/executor/metrics"
"github.com/pingcap/tidb/pkg/executor/staticrecordset"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/keyspace"
Expand Down Expand Up @@ -241,6 +242,17 @@ func (a *recordSet) OnFetchReturned() {
a.stmt.LogSlowQuery(a.txnStartTS, len(a.lastErrs) == 0, true)
}

// Detach creates a new `RecordSet` which doesn't depend on the current session context.
func (a *recordSet) TryDetach() (sqlexec.RecordSet, bool, error) {
// TODO: also detach the executor. Currently, the executor inside may contain the session context. Once
// the executor itself supports detach, we should also detach it here.
e, ok := a.executor.(*TableReaderExecutor)
if !ok {
return nil, false, nil
}
return staticrecordset.New(a.Fields(), e, a.stmt.GetTextToLog(false)), true, nil
}

// ExecStmt implements the sqlexec.Statement interface, it builds a planner.Plan to an sqlexec.Statement.
type ExecStmt struct {
// GoCtx stores parent go context.Context for a stmt.
Expand Down
31 changes: 31 additions & 0 deletions pkg/executor/staticrecordset/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "staticrecordset",
srcs = ["recordset.go"],
importpath = "github.com/pingcap/tidb/pkg/executor/staticrecordset",
visibility = ["//visibility:public"],
deps = [
"//pkg/executor/internal/exec",
"//pkg/parser/ast",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/sqlexec",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "staticrecordset_test",
timeout = "short",
srcs = ["integration_test.go"],
flaky = True,
shard_count = 4,
deps = [
"//pkg/testkit",
"//pkg/util/sqlexec",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
],
)
150 changes: 150 additions & 0 deletions pkg/executor/staticrecordset/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package staticrecordset_test

import (
"context"
"testing"
"time"

"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
)

func TestStaticRecordSet(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(id int)")
tk.MustExec("insert into t values (1), (2), (3)")

rs, err := tk.Exec("select * from t")
require.NoError(t, err)
drs := rs.(sqlexec.DetachableRecordSet)
srs, ok, err := drs.TryDetach()
require.True(t, ok)
require.NoError(t, err)

// check schema
require.Len(t, srs.Fields(), 1)
require.Equal(t, "id", srs.Fields()[0].Column.Name.O)

// check data
chk := srs.NewChunk(nil)
err = srs.Next(context.Background(), chk)
require.NoError(t, err)
require.Equal(t, 3, chk.NumRows())
require.Equal(t, int64(1), chk.GetRow(0).GetInt64(0))
require.Equal(t, int64(2), chk.GetRow(1).GetInt64(0))
require.Equal(t, int64(3), chk.GetRow(2).GetInt64(0))

require.NoError(t, srs.Close())
}

func TestStaticRecordSetWithTxn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(id int)")
tk.MustExec("insert into t values (1), (2), (3)")

rs, err := tk.Exec("select * from t")
require.NoError(t, err)
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
drs := rs.(sqlexec.DetachableRecordSet)
srs, ok, err := drs.TryDetach()
require.True(t, ok)
require.NoError(t, err)

// The transaction should have been committed.
txn, err = tk.Session().Txn(false)
require.NoError(t, err)
require.False(t, txn.Valid())

// Now, it's fine to run another statement on the session
// remove all existing data in the table
tk.MustExec("truncate table t")

// check data
chk := srs.NewChunk(nil)
err = srs.Next(context.Background(), chk)
require.NoError(t, err)
require.Equal(t, 3, chk.NumRows())
require.Equal(t, int64(1), chk.GetRow(0).GetInt64(0))
require.Equal(t, int64(2), chk.GetRow(1).GetInt64(0))
require.Equal(t, int64(3), chk.GetRow(2).GetInt64(0))

require.NoError(t, srs.Close())
}

func TestStaticRecordSetExceedGCTime(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(id int)")
tk.MustExec("insert into t values (1), (2), (3)")

rs, err := tk.Exec("select * from t")
require.NoError(t, err)
// Get the startTS
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
startTS := txn.StartTS()

// Detach the record set
drs := rs.(sqlexec.DetachableRecordSet)
srs, ok, err := drs.TryDetach()
require.True(t, ok)
require.NoError(t, err)

// Now, it's fine to run another statement on the session
// remove all existing data in the table
tk.MustExec("truncate table t")

// Update the safe point
store.(tikv.Storage).UpdateSPCache(startTS+1, time.Now())

// Check data, it'll get an error
chk := srs.NewChunk(nil)
err = srs.Next(context.Background(), chk)
require.Error(t, err)
require.NoError(t, srs.Close())
}

func TestDetachError(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(id int)")
tk.MustExec("insert into t values (1), (2), (3)")

// explicit transaction is not allowed
tk.MustExec("begin")
rs, err := tk.Exec("select * from t")
require.NoError(t, err)
drs2 := rs.(sqlexec.DetachableRecordSet)
_, ok, err := drs2.TryDetach()
require.False(t, ok)
require.NoError(t, err)
tk.MustExec("commit")
}
79 changes: 79 additions & 0 deletions pkg/executor/staticrecordset/recordset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package staticrecordset

import (
"context"

"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
)

var _ sqlexec.RecordSet = &staticRecordSet{}

type staticRecordSet struct {
fields []*ast.ResultField
executor exec.Executor

sqlText string
}

// New creates a new staticRecordSet
func New(fields []*ast.ResultField, executor exec.Executor, sqlText string) sqlexec.RecordSet {
return &staticRecordSet{
fields: fields,
executor: executor,
sqlText: sqlText,
}
}

func (s *staticRecordSet) Fields() []*ast.ResultField {
return s.fields
}

func (s *staticRecordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) {
defer func() {
r := recover()
if r == nil {
return
}
err = util.GetRecoverError(r)
logutil.Logger(ctx).Error("execute sql panic", zap.String("sql", s.sqlText), zap.Stack("stack"))
}()

return exec.Next(ctx, s.executor, req)
}

// NewChunk create a chunk base on top-level executor's exec.NewFirstChunk().
func (s *staticRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
if alloc == nil {
return exec.NewFirstChunk(s.executor)
}

return alloc.Alloc(s.executor.RetFieldTypes(), s.executor.InitCap(), s.executor.MaxChunkSize())
}

// Close closes the executor.
func (s *staticRecordSet) Close() error {
err := exec.Close(s.executor)
s.executor = nil

return err
}
34 changes: 34 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2383,6 +2383,40 @@ func (rs *execStmtResult) Close() error {
return err2
}

func (rs *execStmtResult) TryDetach() (sqlexec.RecordSet, bool, error) {
if !rs.sql.IsReadOnly(rs.se.GetSessionVars()) {
return nil, false, nil
}
if !plannercore.IsAutoCommitTxn(rs.se.GetSessionVars()) {
return nil, false, nil
}

drs, ok := rs.RecordSet.(sqlexec.DetachableRecordSet)
if !ok {
return nil, false, nil
}
detachedRS, ok, err := drs.TryDetach()
if !ok || err != nil {
return nil, ok, err
}

// FIXME: block the min-start-ts. Now the `min-start-ts` will advance and exceed the `startTS` used by
// this detached record set, and may cause an error if `GC` runs.

// Now, a transaction is not needed for the detached record set, so we commit the transaction and cleanup
// the session state.
err = finishStmt(context.Background(), rs.se, nil, rs.sql)
if err != nil {
err2 := detachedRS.Close()
if err2 != nil {
logutil.BgLogger().Error("close detached record set failed", zap.Error(err2))
}
return nil, false, err
}

return detachedRS, true, nil
}

// rollbackOnError makes sure the next statement starts a new transaction with the latest InfoSchema.
func (s *session) rollbackOnError(ctx context.Context) {
if !s.sessionVars.InTxn() {
Expand Down
16 changes: 16 additions & 0 deletions pkg/util/sqlexec/restricted_sql_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,22 @@ type RecordSet interface {
Close() error
}

// DetachableRecordSet extends the `RecordSet` to support detaching from current session context
type DetachableRecordSet interface {
RecordSet

// TryDetach detaches the record set from the current session context.
//
// The last two return value indicates whether the record set is suitable for detaching, and whether
// it detaches successfully. If it faces any error during detaching (and there is no way to rollback),
// it will return the error. If an error is returned, the record set (and session) will be left at
// an unknown state.
//
// If the caller receives `_, false, _`, it means the original record set can still be used. If the caller
// receives an error, it means the original record set (and the session) is dirty.
TryDetach() (RecordSet, bool, error)
}

// MultiQueryNoDelayResult is an interface for one no-delay result for one statement in multi-queries.
type MultiQueryNoDelayResult interface {
// AffectedRows return affected row for one statement in multi-queries.
Expand Down

0 comments on commit e036274

Please sign in to comment.