Skip to content

Commit 9ba0779

Browse files
D3Hunterhawkingrei
authored andcommitted
ddl: reuse backend for DXF subtasks of same step (pingcap#59165)
ref pingcap#57229, ref pingcap#57497
1 parent 145dcae commit 9ba0779

16 files changed

+197
-91
lines changed

pkg/ddl/backfilling.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
"github.com/pingcap/errors"
2929
"github.com/pingcap/failpoint"
30+
"github.com/pingcap/tidb/pkg/config"
3031
"github.com/pingcap/tidb/pkg/ddl/ingest"
3132
"github.com/pingcap/tidb/pkg/ddl/logutil"
3233
sess "github.com/pingcap/tidb/pkg/ddl/session"
@@ -36,6 +37,7 @@ import (
3637
"github.com/pingcap/tidb/pkg/expression/exprctx"
3738
"github.com/pingcap/tidb/pkg/expression/exprstatic"
3839
"github.com/pingcap/tidb/pkg/kv"
40+
"github.com/pingcap/tidb/pkg/lightning/backend/local"
3941
"github.com/pingcap/tidb/pkg/meta/model"
4042
"github.com/pingcap/tidb/pkg/metrics"
4143
"github.com/pingcap/tidb/pkg/parser/ast"
@@ -735,9 +737,21 @@ func (dc *ddlCtx) addIndexWithLocalIngest(
735737
hasUnique = hasUnique || indexInfo.Unique
736738
}
737739

740+
var (
741+
cfg *local.BackendConfig
742+
bd *local.Backend
743+
err error
744+
)
745+
if config.GetGlobalConfig().Store == config.StoreTypeTiKV {
746+
cfg, bd, err = ingest.CreateLocalBackend(ctx, dc.store, job, false)
747+
if err != nil {
748+
return errors.Trace(err)
749+
}
750+
defer bd.Close()
751+
}
738752
bcCtx, err := ingest.NewBackendCtxBuilder(ctx, dc.store, job).
739753
WithCheckpointManagerParam(sessPool, reorgInfo.PhysicalTableID).
740-
Build()
754+
Build(cfg, bd)
741755
if err != nil {
742756
return errors.Trace(err)
743757
}

pkg/ddl/backfilling_import_cloud.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/pingcap/tidb/pkg/kv"
2525
"github.com/pingcap/tidb/pkg/lightning/backend"
2626
"github.com/pingcap/tidb/pkg/lightning/backend/external"
27+
"github.com/pingcap/tidb/pkg/lightning/backend/local"
2728
"github.com/pingcap/tidb/pkg/lightning/common"
2829
"github.com/pingcap/tidb/pkg/lightning/config"
2930
"github.com/pingcap/tidb/pkg/meta/model"
@@ -40,6 +41,7 @@ type cloudImportExecutor struct {
4041
ptbl table.PhysicalTable
4142
cloudStoreURI string
4243
backendCtx ingest.BackendCtx
44+
backend *local.Backend
4345
}
4446

4547
func newCloudImportExecutor(
@@ -60,10 +62,16 @@ func newCloudImportExecutor(
6062

6163
func (m *cloudImportExecutor) Init(ctx context.Context) error {
6264
logutil.Logger(ctx).Info("cloud import executor init subtask exec env")
63-
bCtx, err := ingest.NewBackendCtxBuilder(ctx, m.store, m.job).Build()
65+
cfg, bd, err := ingest.CreateLocalBackend(ctx, m.store, m.job, false)
6466
if err != nil {
67+
return errors.Trace(err)
68+
}
69+
bCtx, err := ingest.NewBackendCtxBuilder(ctx, m.store, m.job).Build(cfg, bd)
70+
if err != nil {
71+
bd.Close()
6572
return err
6673
}
74+
m.backend = bd
6775
m.backendCtx = bCtx
6876
return nil
6977
}
@@ -159,5 +167,6 @@ func (m *cloudImportExecutor) Cleanup(ctx context.Context) error {
159167
if m.backendCtx != nil {
160168
m.backendCtx.Close()
161169
}
170+
m.backend.Close()
162171
return nil
163172
}

pkg/ddl/backfilling_read_index.go

+19-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"github.com/pingcap/errors"
2626
"github.com/pingcap/failpoint"
27+
"github.com/pingcap/tidb/pkg/config"
2728
"github.com/pingcap/tidb/pkg/ddl/ingest"
2829
"github.com/pingcap/tidb/pkg/ddl/logutil"
2930
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
@@ -32,6 +33,7 @@ import (
3233
"github.com/pingcap/tidb/pkg/disttask/operator"
3334
"github.com/pingcap/tidb/pkg/kv"
3435
"github.com/pingcap/tidb/pkg/lightning/backend/external"
36+
"github.com/pingcap/tidb/pkg/lightning/backend/local"
3537
"github.com/pingcap/tidb/pkg/meta/model"
3638
"github.com/pingcap/tidb/pkg/metrics"
3739
"github.com/pingcap/tidb/pkg/table"
@@ -54,6 +56,8 @@ type readIndexExecutor struct {
5456
curRowCount *atomic.Int64
5557

5658
subtaskSummary sync.Map // subtaskID => readIndexSummary
59+
backendCfg *local.BackendConfig
60+
backend *local.Backend
5761
}
5862

5963
type readIndexSummary struct {
@@ -82,8 +86,17 @@ func newReadIndexExecutor(
8286
}, nil
8387
}
8488

85-
func (*readIndexExecutor) Init(_ context.Context) error {
89+
func (r *readIndexExecutor) Init(ctx context.Context) error {
8690
logutil.DDLLogger().Info("read index executor init subtask exec env")
91+
cfg := config.GetGlobalConfig()
92+
if cfg.Store == config.StoreTypeTiKV {
93+
cfg, bd, err := ingest.CreateLocalBackend(ctx, r.d.store, r.job, false)
94+
if err != nil {
95+
return errors.Trace(err)
96+
}
97+
r.backendCfg = cfg
98+
r.backend = bd
99+
}
87100
return nil
88101
}
89102

@@ -119,7 +132,7 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
119132
// TODO(tangenta): support checkpoint manager that interact with subtask table.
120133
bCtx, err := ingest.NewBackendCtxBuilder(ctx, r.d.store, r.job).
121134
WithImportDistributedLock(r.d.etcdCli, sm.TS).
122-
Build()
135+
Build(r.backendCfg, r.backend)
123136
if err != nil {
124137
return err
125138
}
@@ -151,8 +164,11 @@ func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary {
151164
}
152165
}
153166

154-
func (*readIndexExecutor) Cleanup(ctx context.Context) error {
167+
func (r *readIndexExecutor) Cleanup(ctx context.Context) error {
155168
tidblogutil.Logger(ctx).Info("read index executor cleanup subtask exec env")
169+
if r.backend != nil {
170+
r.backend.Close()
171+
}
156172
return nil
157173
}
158174

pkg/ddl/index.go

+24-7
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/pingcap/tidb/pkg/infoschema"
4747
"github.com/pingcap/tidb/pkg/kv"
4848
"github.com/pingcap/tidb/pkg/lightning/backend"
49+
"github.com/pingcap/tidb/pkg/lightning/backend/local"
4950
litconfig "github.com/pingcap/tidb/pkg/lightning/config"
5051
"github.com/pingcap/tidb/pkg/meta"
5152
"github.com/pingcap/tidb/pkg/meta/metabuild"
@@ -2440,25 +2441,41 @@ func (w *worker) addTableIndex(
24402441
}
24412442

24422443
func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo *reorgInfo, store kv.Storage) (err error) {
2443-
var bc ingest.BackendCtx
2444+
var (
2445+
backendCtx ingest.BackendCtx
2446+
cfg *local.BackendConfig
2447+
backend *local.Backend
2448+
)
2449+
defer func() {
2450+
if backendCtx != nil {
2451+
backendCtx.Close()
2452+
}
2453+
if backend != nil {
2454+
backend.Close()
2455+
}
2456+
}()
24442457
for _, elem := range reorgInfo.elements {
24452458
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, elem.ID)
24462459
if indexInfo == nil {
24472460
return errors.New("unexpected error, can't find index info")
24482461
}
24492462
if indexInfo.Unique {
24502463
ctx := tidblogutil.WithCategory(ctx, "ddl-ingest")
2451-
if bc == nil {
2452-
bc, err = ingest.NewBackendCtxBuilder(ctx, store, reorgInfo.Job).
2464+
if backendCtx == nil {
2465+
if config.GetGlobalConfig().Store == config.StoreTypeTiKV {
2466+
cfg, backend, err = ingest.CreateLocalBackend(ctx, store, reorgInfo.Job, true)
2467+
if err != nil {
2468+
return errors.Trace(err)
2469+
}
2470+
}
2471+
backendCtx, err = ingest.NewBackendCtxBuilder(ctx, store, reorgInfo.Job).
24532472
ForDuplicateCheck().
2454-
Build()
2473+
Build(cfg, backend)
24552474
if err != nil {
24562475
return err
24572476
}
2458-
//nolint:revive,all_revive
2459-
defer bc.Close()
24602477
}
2461-
err = bc.CollectRemoteDuplicateRows(indexInfo.ID, t)
2478+
err = backendCtx.CollectRemoteDuplicateRows(indexInfo.ID, t)
24622479
if err != nil {
24632480
return err
24642481
}

pkg/ddl/ingest/BUILD.bazel

-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ go_library(
5555
"@com_github_tikv_client_go_v2//tikv",
5656
"@com_github_tikv_client_go_v2//util",
5757
"@com_github_tikv_pd_client//:client",
58-
"@com_github_tikv_pd_client//servicediscovery",
5958
"@io_etcd_go_etcd_client_v3//:client",
6059
"@org_golang_x_exp//maps",
6160
"@org_uber_go_atomic//:atomic",

pkg/ddl/ingest/backend.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ type litBackendCtx struct {
102102
memRoot MemRoot
103103
jobID int64
104104
tbl table.Table
105+
// litBackendCtx doesn't manage the lifecycle of backend, caller should do it.
105106
backend *local.Backend
106107
ctx context.Context
107108
cfg *local.BackendConfig
@@ -319,14 +320,14 @@ func (bc *litBackendCtx) unsafeImportAndReset(ctx context.Context, ei *engineInf
319320
}
320321

321322
// ForceSyncFlagForTest is a flag to force sync only for test.
322-
var ForceSyncFlagForTest = false
323+
var ForceSyncFlagForTest atomic.Bool
323324

324325
func (bc *litBackendCtx) checkFlush() (shouldFlush bool, shouldImport bool) {
325326
failpoint.Inject("forceSyncFlagForTest", func() {
326327
// used in a manual test
327-
ForceSyncFlagForTest = true
328+
ForceSyncFlagForTest.Store(true)
328329
})
329-
if ForceSyncFlagForTest {
330+
if ForceSyncFlagForTest.Load() {
330331
return true, true
331332
}
332333
LitDiskRoot.UpdateUsage()
@@ -359,7 +360,6 @@ func (bc *litBackendCtx) Close() {
359360
logutil.Logger(bc.ctx).Info(LitInfoCloseBackend, zap.Int64("jobID", bc.jobID),
360361
zap.Int64("current memory usage", LitMemRoot.CurrentUsage()),
361362
zap.Int64("max memory quota", LitMemRoot.MaxMemoryQuota()))
362-
bc.backend.Close()
363363
LitDiskRoot.Remove(bc.jobID)
364364
BackendCounterForTest.Dec()
365365
}

pkg/ddl/ingest/backend_mgr.go

+49-45
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import (
3434
"github.com/pingcap/tidb/pkg/util/intest"
3535
"github.com/pingcap/tidb/pkg/util/logutil"
3636
"github.com/tikv/client-go/v2/tikv"
37-
sd "github.com/tikv/pd/client/servicediscovery"
3837
clientv3 "go.etcd.io/etcd/client/v3"
3938
"go.uber.org/atomic"
4039
"go.uber.org/zap"
@@ -94,29 +93,16 @@ func (b *BackendCtxBuilder) ForDuplicateCheck() *BackendCtxBuilder {
9493
var BackendCounterForTest = atomic.Int64{}
9594

9695
// Build builds a BackendCtx.
97-
func (b *BackendCtxBuilder) Build() (BackendCtx, error) {
96+
func (b *BackendCtxBuilder) Build(cfg *local.BackendConfig, bd *local.Backend) (BackendCtx, error) {
9897
ctx, store, job := b.ctx, b.store, b.job
99-
sortPath, err := GenIngestTempDataDir()
98+
jobSortPath, err := genJobSortPath(job.ID, b.checkDup)
10099
if err != nil {
101100
return nil, err
102101
}
103-
jobSortPath := filepath.Join(sortPath, encodeBackendTag(job.ID, b.checkDup))
104102
intest.Assert(job.Type == model.ActionAddPrimaryKey ||
105103
job.Type == model.ActionAddIndex)
106104
intest.Assert(job.ReorgMeta != nil)
107105

108-
resGroupName := job.ReorgMeta.ResourceGroupName
109-
concurrency := job.ReorgMeta.GetConcurrency()
110-
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeed()
111-
hasUnique, err := hasUniqueIndex(job)
112-
if err != nil {
113-
return nil, err
114-
}
115-
cfg, err := genConfig(ctx, jobSortPath, LitMemRoot, hasUnique, resGroupName, concurrency, maxWriteSpeed)
116-
if err != nil {
117-
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", job.ID), zap.Error(err))
118-
return nil, err
119-
}
120106
failpoint.Inject("beforeCreateLocalBackend", func() {
121107
ResignOwnerForTest.Store(true)
122108
})
@@ -141,45 +127,41 @@ func (b *BackendCtxBuilder) Build() (BackendCtx, error) {
141127
return mockBackend, nil
142128
}
143129

144-
discovery := pdCli.GetServiceDiscovery()
145-
bd, err := createLocalBackend(ctx, cfg, discovery)
146-
if err != nil {
147-
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", job.ID), zap.Error(err))
148-
return nil, err
149-
}
150-
151130
bCtx := newBackendContext(ctx, job.ID, bd, cfg,
152131
defaultImportantVariables, LitMemRoot, b.etcdClient, job.RealStartTS, b.importTS, cpMgr)
153132

154-
logutil.Logger(ctx).Info(LitInfoCreateBackend, zap.Int64("job ID", job.ID),
155-
zap.Int64("current memory usage", LitMemRoot.CurrentUsage()),
156-
zap.Int64("max memory quota", LitMemRoot.MaxMemoryQuota()),
157-
zap.Bool("has unique index", hasUnique))
158-
159133
LitDiskRoot.Add(job.ID, bCtx)
160134
BackendCounterForTest.Add(1)
161135
return bCtx, nil
162136
}
163137

164-
func hasUniqueIndex(job *model.Job) (bool, error) {
165-
args, err := model.GetModifyIndexArgs(job)
138+
func genJobSortPath(jobID int64, checkDup bool) (string, error) {
139+
sortPath, err := GenIngestTempDataDir()
166140
if err != nil {
167-
return false, errors.Trace(err)
141+
return "", err
168142
}
143+
return filepath.Join(sortPath, encodeBackendTag(jobID, checkDup)), nil
144+
}
169145

170-
for _, a := range args.IndexArgs {
171-
if a.Unique {
172-
return true, nil
173-
}
146+
// CreateLocalBackend creates a local backend for adding index.
147+
func CreateLocalBackend(ctx context.Context, store kv.Storage, job *model.Job, checkDup bool) (*local.BackendConfig, *local.Backend, error) {
148+
jobSortPath, err := genJobSortPath(job.ID, checkDup)
149+
if err != nil {
150+
return nil, nil, err
174151
}
175-
return false, nil
176-
}
152+
intest.Assert(job.Type == model.ActionAddPrimaryKey ||
153+
job.Type == model.ActionAddIndex)
154+
intest.Assert(job.ReorgMeta != nil)
155+
156+
resGroupName := job.ReorgMeta.ResourceGroupName
157+
concurrency := job.ReorgMeta.GetConcurrency()
158+
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeed()
159+
hasUnique, err := hasUniqueIndex(job)
160+
if err != nil {
161+
return nil, nil, err
162+
}
163+
cfg := genConfig(ctx, jobSortPath, LitMemRoot, hasUnique, resGroupName, concurrency, maxWriteSpeed)
177164

178-
func createLocalBackend(
179-
ctx context.Context,
180-
cfg *local.BackendConfig,
181-
pdSvcDiscovery sd.ServiceDiscovery,
182-
) (*local.Backend, error) {
183165
tidbCfg := config.GetGlobalConfig()
184166
tls, err := common.NewTLS(
185167
tidbCfg.Security.ClusterSSLCA,
@@ -190,13 +172,35 @@ func createLocalBackend(
190172
)
191173
if err != nil {
192174
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Error(err))
193-
return nil, err
175+
return nil, nil, err
194176
}
195177

196178
ddllogutil.DDLIngestLogger().Info("create local backend for adding index",
197179
zap.String("sortDir", cfg.LocalStoreDir),
198-
zap.String("keyspaceName", cfg.KeyspaceName))
199-
return local.NewBackend(ctx, tls, *cfg, pdSvcDiscovery)
180+
zap.String("keyspaceName", cfg.KeyspaceName),
181+
zap.Int64("job ID", job.ID),
182+
zap.Int64("current memory usage", LitMemRoot.CurrentUsage()),
183+
zap.Int64("max memory quota", LitMemRoot.MaxMemoryQuota()),
184+
zap.Bool("has unique index", hasUnique))
185+
186+
//nolint: forcetypeassert
187+
pdCli := store.(tikv.Storage).GetRegionCache().PDClient()
188+
be, err := local.NewBackend(ctx, tls, *cfg, pdCli.GetServiceDiscovery())
189+
return cfg, be, err
190+
}
191+
192+
func hasUniqueIndex(job *model.Job) (bool, error) {
193+
args, err := model.GetModifyIndexArgs(job)
194+
if err != nil {
195+
return false, errors.Trace(err)
196+
}
197+
198+
for _, a := range args.IndexArgs {
199+
if a.Unique {
200+
return true, nil
201+
}
202+
}
203+
return false, nil
200204
}
201205

202206
const checkpointUpdateInterval = 10 * time.Minute

0 commit comments

Comments
 (0)