Skip to content

Commit e52d8ab

Browse files
authored
planner: refactor some code related to binding (#59895)
ref #51347
1 parent b0efc0f commit e52d8ab

File tree

5 files changed

+302
-267
lines changed

5 files changed

+302
-267
lines changed

pkg/bindinfo/BUILD.bazel

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go_library(
55
srcs = [
66
"binding.go",
77
"binding_cache.go",
8+
"binding_operator.go",
89
"global_handle.go",
910
"session_handle.go",
1011
"utils.go",
@@ -46,7 +47,7 @@ go_test(
4647
timeout = "moderate",
4748
srcs = [
4849
"binding_cache_test.go",
49-
"global_handle_test.go",
50+
"binding_operator_test.go",
5051
"main_test.go",
5152
"session_handle_test.go",
5253
],

pkg/bindinfo/binding_operator.go

+233
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package bindinfo
16+
17+
import (
18+
"strings"
19+
"time"
20+
21+
"github.com/pingcap/errors"
22+
"github.com/pingcap/failpoint"
23+
"github.com/pingcap/tidb/pkg/parser/mysql"
24+
"github.com/pingcap/tidb/pkg/sessionctx"
25+
"github.com/pingcap/tidb/pkg/types"
26+
"github.com/pingcap/tidb/pkg/util"
27+
)
28+
29+
// BindingOperator is used to operate (create/drop/update/GC) bindings.
30+
type BindingOperator interface {
31+
// CreateGlobalBinding creates a Bindings to the storage and the cache.
32+
// It replaces all the exists bindings for the same normalized SQL.
33+
CreateGlobalBinding(sctx sessionctx.Context, bindings []*Binding) (err error)
34+
35+
// DropGlobalBinding drop Bindings to the storage and Bindings int the cache.
36+
DropGlobalBinding(sqlDigests []string) (deletedRows uint64, err error)
37+
38+
// SetGlobalBindingStatus set a Bindings's status to the storage and bind cache.
39+
SetGlobalBindingStatus(newStatus, sqlDigest string) (ok bool, err error)
40+
41+
// GCGlobalBinding physically removes the deleted bind records in mysql.bind_info.
42+
GCGlobalBinding() (err error)
43+
}
44+
45+
type bindingOperator struct {
46+
cache BindingCacheUpdater
47+
sPool util.DestroyableSessionPool
48+
}
49+
50+
func newBindingOperator(sPool util.DestroyableSessionPool, cache BindingCacheUpdater) BindingOperator {
51+
return &bindingOperator{
52+
sPool: sPool,
53+
cache: cache,
54+
}
55+
}
56+
57+
// CreateGlobalBinding creates a Bindings to the storage and the cache.
58+
// It replaces all the exists bindings for the same normalized SQL.
59+
func (op *bindingOperator) CreateGlobalBinding(sctx sessionctx.Context, bindings []*Binding) (err error) {
60+
for _, binding := range bindings {
61+
if err := prepareHints(sctx, binding); err != nil {
62+
return err
63+
}
64+
}
65+
defer func() {
66+
if err == nil {
67+
err = op.cache.LoadFromStorageToCache(false)
68+
}
69+
}()
70+
71+
return callWithSCtx(op.sPool, true, func(sctx sessionctx.Context) error {
72+
// Lock mysql.bind_info to synchronize with CreateBinding / AddBinding / DropBinding on other tidb instances.
73+
if err = lockBindInfoTable(sctx); err != nil {
74+
return err
75+
}
76+
77+
for i, binding := range bindings {
78+
now := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)
79+
80+
updateTs := now.String()
81+
_, err = exec(
82+
sctx,
83+
`UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %?`,
84+
StatusDeleted,
85+
updateTs,
86+
binding.OriginalSQL,
87+
updateTs,
88+
)
89+
if err != nil {
90+
return err
91+
}
92+
93+
binding.CreateTime = now
94+
binding.UpdateTime = now
95+
96+
// Insert the Bindings to the storage.
97+
_, err = exec(
98+
sctx,
99+
`INSERT INTO mysql.bind_info VALUES (%?,%?, %?, %?, %?, %?, %?, %?, %?, %?, %?)`,
100+
binding.OriginalSQL,
101+
binding.BindSQL,
102+
strings.ToLower(binding.Db),
103+
binding.Status,
104+
binding.CreateTime.String(),
105+
binding.UpdateTime.String(),
106+
binding.Charset,
107+
binding.Collation,
108+
binding.Source,
109+
binding.SQLDigest,
110+
binding.PlanDigest,
111+
)
112+
failpoint.Inject("CreateGlobalBindingNthFail", func(val failpoint.Value) {
113+
n := val.(int)
114+
if n == i {
115+
err = errors.NewNoStackError("An injected error")
116+
}
117+
})
118+
if err != nil {
119+
return err
120+
}
121+
}
122+
return nil
123+
})
124+
}
125+
126+
// DropGlobalBinding drop Bindings to the storage and Bindings int the cache.
127+
func (op *bindingOperator) DropGlobalBinding(sqlDigests []string) (deletedRows uint64, err error) {
128+
if len(sqlDigests) == 0 {
129+
return 0, errors.New("sql digest is empty")
130+
}
131+
defer func() {
132+
if err == nil {
133+
err = op.cache.LoadFromStorageToCache(false)
134+
}
135+
}()
136+
137+
err = callWithSCtx(op.sPool, true, func(sctx sessionctx.Context) error {
138+
// Lock mysql.bind_info to synchronize with CreateBinding / AddBinding / DropBinding on other tidb instances.
139+
if err = lockBindInfoTable(sctx); err != nil {
140+
return err
141+
}
142+
143+
for _, sqlDigest := range sqlDigests {
144+
updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3).String()
145+
_, err = exec(
146+
sctx,
147+
`UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE sql_digest = %? AND update_time < %? AND status != %?`,
148+
StatusDeleted,
149+
updateTs,
150+
sqlDigest,
151+
updateTs,
152+
StatusDeleted,
153+
)
154+
if err != nil {
155+
return err
156+
}
157+
deletedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
158+
}
159+
return nil
160+
})
161+
if err != nil {
162+
deletedRows = 0
163+
}
164+
return deletedRows, err
165+
}
166+
167+
// SetGlobalBindingStatus set a Bindings's status to the storage and bind cache.
168+
func (op *bindingOperator) SetGlobalBindingStatus(newStatus, sqlDigest string) (ok bool, err error) {
169+
var (
170+
updateTs types.Time
171+
oldStatus0, oldStatus1 string
172+
)
173+
if newStatus == StatusDisabled {
174+
// For compatibility reasons, when we need to 'set binding disabled for <stmt>',
175+
// we need to consider both the 'enabled' and 'using' status.
176+
oldStatus0 = StatusUsing
177+
oldStatus1 = StatusEnabled
178+
} else if newStatus == StatusEnabled {
179+
// In order to unify the code, two identical old statuses are set.
180+
oldStatus0 = StatusDisabled
181+
oldStatus1 = StatusDisabled
182+
}
183+
184+
defer func() {
185+
if err == nil {
186+
err = op.cache.LoadFromStorageToCache(false)
187+
}
188+
}()
189+
190+
err = callWithSCtx(op.sPool, true, func(sctx sessionctx.Context) error {
191+
// Lock mysql.bind_info to synchronize with SetBindingStatus on other tidb instances.
192+
if err = lockBindInfoTable(sctx); err != nil {
193+
return err
194+
}
195+
196+
updateTs = types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)
197+
updateTsStr := updateTs.String()
198+
199+
_, err = exec(sctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE sql_digest = %? AND update_time < %? AND status IN (%?, %?)`,
200+
newStatus, updateTsStr, sqlDigest, updateTsStr, oldStatus0, oldStatus1)
201+
return err
202+
})
203+
return
204+
}
205+
206+
// GCGlobalBinding physically removes the deleted bind records in mysql.bind_info.
207+
func (op *bindingOperator) GCGlobalBinding() (err error) {
208+
return callWithSCtx(op.sPool, true, func(sctx sessionctx.Context) error {
209+
// Lock mysql.bind_info to synchronize with CreateBinding / AddBinding / DropBinding on other tidb instances.
210+
if err = lockBindInfoTable(sctx); err != nil {
211+
return err
212+
}
213+
214+
// To make sure that all the deleted bind records have been acknowledged to all tidb,
215+
// we only garbage collect those records with update_time before 10 leases.
216+
updateTime := time.Now().Add(-(10 * Lease))
217+
updateTimeStr := types.NewTime(types.FromGoTime(updateTime), mysql.TypeTimestamp, 3).String()
218+
_, err = exec(sctx, `DELETE FROM mysql.bind_info WHERE status = 'deleted' and update_time < %?`, updateTimeStr)
219+
return err
220+
})
221+
}
222+
223+
// lockBindInfoTable simulates `LOCK TABLE mysql.bind_info WRITE` by acquiring a pessimistic lock on a
224+
// special builtin row of mysql.bind_info. Note that this function must be called with h.sctx.Lock() held.
225+
// We can replace this implementation to normal `LOCK TABLE mysql.bind_info WRITE` if that feature is
226+
// generally available later.
227+
// This lock would enforce the CREATE / DROP GLOBAL BINDING statements to be executed sequentially,
228+
// even if they come from different tidb instances.
229+
func lockBindInfoTable(sctx sessionctx.Context) error {
230+
// h.sctx already locked.
231+
_, err := exec(sctx, LockBindInfoSQL)
232+
return err
233+
}
File renamed without changes.

0 commit comments

Comments
 (0)