
ddl: fix runnable ingest job checking (#52503) #53578

Merged
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
@@ -63,6 +63,7 @@ go_library(
"//pkg/util/ranger",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//sstable",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_coreos_go_semver//semver",
"@com_github_docker_go_units//:go-units",
"@com_github_google_btree//:btree",
20 changes: 20 additions & 0 deletions br/pkg/lightning/backend/local/local.go
@@ -29,6 +29,7 @@ import (
"time"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/vfs"
"github.com/coreos/go-semver/semver"
"github.com/docker/go-units"
"github.com/google/uuid"
@@ -496,6 +497,25 @@ type Backend struct {
var _ DiskUsage = (*Backend)(nil)
var _ backend.Backend = (*Backend)(nil)

// slowCreateFS wraps a vfs.FS to slow down file creation. It is only used in tests.
type slowCreateFS struct {
vfs.FS
}

// WaitRMFolderChForTest is a channel for testing.
var WaitRMFolderChForTest = make(chan struct{})

func (s slowCreateFS) Create(name string) (vfs.File, error) {
if strings.Contains(name, "temporary") {
select {
case <-WaitRMFolderChForTest:
case <-time.After(1 * time.Second):
log.FromContext(context.Background()).Info("no one removes folder")
}
}
return s.FS.Create(name)
}

func openDuplicateDB(storeDir string) (*pebble.DB, error) {
dbPath := filepath.Join(storeDir, duplicateDBName)
// TODO: Optimize the opts for better write.
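For context, a minimal self-contained sketch of the pattern slowCreateFS uses: wrap a vfs.FS and hand the wrapper to Pebble through pebble.Options.FS so tests can intercept file creation. The delayedFS type, sleep duration, and demo path are illustrative assumptions, not code from this PR, and the sketch assumes the Pebble version pinned here, where vfs.FS.Create takes only a file name.

```go
package main

import (
	"log"
	"time"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

// delayedFS embeds a vfs.FS and pauses before every Create call, the same
// interception point slowCreateFS uses in local.go above.
type delayedFS struct {
	vfs.FS
}

func (d delayedFS) Create(name string) (vfs.File, error) {
	time.Sleep(100 * time.Millisecond) // simulate a slow filesystem
	return d.FS.Create(name)
}

func main() {
	// Hand the wrapper to Pebble through Options.FS; the path is a placeholder.
	db, err := pebble.Open("/tmp/pebble-demo", &pebble.Options{FS: delayedFS{FS: vfs.Default}})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```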
8 changes: 1 addition & 7 deletions pkg/ddl/ddl.go
@@ -752,6 +752,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
if err != nil {
logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
}
d.runningJobs.clear()
})

d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)
@@ -790,13 +791,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
defer d.sessPool.Put(ctx)

ingest.InitGlobalLightningEnv()
d.ownerManager.SetRetireOwnerHook(func() {
// Since this instance is not DDL owner anymore, we clean up the processing job info.
if ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.MarkJobFinish()
}
d.runningJobs = newRunningJobs()
})

return nil
}
37 changes: 37 additions & 0 deletions pkg/ddl/ddl_running_jobs.go
@@ -22,8 +22,11 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

type runningJobs struct {
@@ -36,6 +39,11 @@ type runningJobs struct {
// It is not necessarily being processed by a worker.
unfinishedIDs map[int64]struct{}
unfinishedSchema map[string]map[string]struct{} // database -> table -> struct{}

// processingIngestJobID records the ID of the ingest job that is being processed by a worker.
// TODO(tangenta): remove this when we support running multiple concurrent ingest jobs.
processingIngestJobID int64
lastLoggingTime time.Time
}

func newRunningJobs() *runningJobs {
@@ -46,11 +54,21 @@ func newRunningJobs() *runningJobs {
}
}

func (j *runningJobs) clear() {
j.Lock()
defer j.Unlock()
j.unfinishedIDs = make(map[int64]struct{})
j.unfinishedSchema = make(map[string]map[string]struct{})
}

func (j *runningJobs) add(job *model.Job) {
j.Lock()
defer j.Unlock()
j.processingIDs[job.ID] = struct{}{}
j.updateInternalRunningJobIDs()
if isIngestJob(job) {
j.processingIngestJobID = job.ID
}

if _, ok := j.unfinishedIDs[job.ID]; ok {
// Already exists, no need to add it again.
@@ -70,6 +88,9 @@ func (j *runningJobs) remove(job *model.Job) {
defer j.Unlock()
delete(j.processingIDs, job.ID)
j.updateInternalRunningJobIDs()
if isIngestJob(job) && job.ID == j.processingIngestJobID {
j.processingIngestJobID = 0
}

if job.IsFinished() || job.IsSynced() {
delete(j.unfinishedIDs, job.ID)
@@ -110,6 +131,16 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
// Already processing by a worker. Skip running it again.
return false
}
if isIngestJob(job) && j.processingIngestJobID != 0 {
// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
if time.Since(j.lastLoggingTime) > 1*time.Minute {
logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job",
zap.String("category", "ddl-ingest"),
zap.Int64("processing job ID", j.processingIngestJobID))
j.lastLoggingTime = time.Now()
}
return false
}
for _, info := range job.GetInvolvingSchemaInfo() {
if _, ok := j.unfinishedSchema[model.InvolvingAll]; ok {
return false
@@ -131,3 +162,9 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
}
return true
}

func isIngestJob(job *model.Job) bool {
return (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
job.ReorgMeta != nil &&
job.ReorgMeta.IsFastReorg
}
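The checkRunnable change above enforces that at most one ingest (fast reorg) job runs at a time, keyed by processingIngestJobID. Below is a self-contained sketch of that single-slot gating pattern; the names ingestGate, tryClaim, and release are illustrative, not TiDB identifiers.

```go
package main

import (
	"fmt"
	"sync"
)

// ingestGate is a simplified stand-in for the processingIngestJobID bookkeeping:
// the first ingest job claims the slot, later ones are rejected until release.
type ingestGate struct {
	mu           sync.Mutex
	processingID int64
}

func (g *ingestGate) tryClaim(jobID int64) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.processingID != 0 && g.processingID != jobID {
		return false // another ingest job already holds the slot
	}
	g.processingID = jobID
	return true
}

func (g *ingestGate) release(jobID int64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.processingID == jobID {
		g.processingID = 0
	}
}

func main() {
	var g ingestGate
	fmt.Println(g.tryClaim(1)) // true: job 1 takes the slot
	fmt.Println(g.tryClaim(2)) // false: job 2 must wait
	g.release(1)
	fmt.Println(g.tryClaim(2)) // true: the slot is free again
}
```

In the PR itself the claim and the check are split: add/remove update processingIngestJobID, while checkRunnable performs the rejection and rate-limited logging.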
11 changes: 0 additions & 11 deletions pkg/ddl/ddl_worker.go
@@ -27,7 +27,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/kv"
@@ -576,7 +575,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
startTime := time.Now()
defer func() {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
markJobFinish(job)
}()

if jobNeedGC(job) {
@@ -622,15 +620,6 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
return errors.Trace(err)
}

func markJobFinish(job *model.Job) {
if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
job.ReorgMeta != nil &&
job.ReorgMeta.IsFastReorg &&
ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.MarkJobFinish()
}
}

func (w *worker) writeDDLSeqNum(job *model.Job) {
w.ddlSeqNumMu.Lock()
w.ddlSeqNumMu.seqNum++
4 changes: 4 additions & 0 deletions pkg/ddl/index.go
@@ -31,6 +31,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/copr"
@@ -843,6 +844,9 @@ func cleanupSortPath(ctx context.Context, currentJobID int64) error {
logutil.Logger(ctx).Warn(ingest.LitErrCleanSortPath, zap.Error(err))
return nil
}
failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
close(local.WaitRMFolderChForTest)
})
}
}
return nil
39 changes: 6 additions & 33 deletions pkg/ddl/ingest/backend_mgr.go
@@ -19,7 +19,6 @@ import (
"fmt"
"math"
"strconv"
"sync"
"time"

"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
@@ -28,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil"
kvutil "github.com/tikv/client-go/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/zap"
)

@@ -37,18 +37,12 @@ type BackendCtxMgr interface {
Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error)
Unregister(jobID int64)
Load(jobID int64) (BackendCtx, bool)

MarkJobProcessing(jobID int64) (ok bool)
MarkJobFinish()
}

type litBackendCtxMgr struct {
generic.SyncMap[int64, *litBackendCtx]
memRoot MemRoot
diskRoot DiskRoot
processingJobID int64
lastLoggingTime time.Time
mu sync.Mutex
memRoot MemRoot
diskRoot DiskRoot
}

func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
@@ -69,30 +63,6 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
return mgr
}

// MarkJobProcessing marks ingest backfill is processing.
func (m *litBackendCtxMgr) MarkJobProcessing(jobID int64) bool {
m.mu.Lock()
defer m.mu.Unlock()
if m.processingJobID == 0 || m.processingJobID == jobID {
m.processingJobID = jobID
return true
}
if time.Since(m.lastLoggingTime) > 1*time.Minute {
logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job",
zap.String("category", "ddl-ingest"),
zap.Int64("processing job ID", m.processingJobID))
m.lastLoggingTime = time.Now()
}
return false
}

// MarkJobFinish marks ingest backfill is finished.
func (m *litBackendCtxMgr) MarkJobFinish() {
m.mu.Lock()
m.processingJobID = 0
m.mu.Unlock()
}

// CheckAvailable checks if the ingest backfill is available.
func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
if err := m.diskRoot.PreCheckUsage(); err != nil {
@@ -102,6 +72,9 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
return true, nil
}

// ResignOwnerForTest is only used for test.
var ResignOwnerForTest = atomic.NewBool(false)

// Register creates a new backend and registers it to the backend context.
func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client, pdAddr string, resourceGroupName string) (BackendCtx, error) {
bc, exist := m.Load(jobID)
9 changes: 0 additions & 9 deletions pkg/ddl/ingest/mock.go
@@ -45,15 +45,6 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBacken
}
}

// MarkJobProcessing implements BackendCtxMgr.MarkJobProcessing interface.
func (*MockBackendCtxMgr) MarkJobProcessing(_ int64) bool {
return true
}

// MarkJobFinish implements BackendCtxMgr.MarkJobFinish interface.
func (*MockBackendCtxMgr) MarkJobFinish() {
}

// CheckAvailable implements BackendCtxMgr.Available interface.
func (m *MockBackendCtxMgr) CheckAvailable() (bool, error) {
return len(m.runningJobs) == 0, nil
20 changes: 9 additions & 11 deletions pkg/ddl/job_table.go
@@ -224,17 +224,6 @@ func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
if !d.runningJobs.checkRunnable(job) {
return false, nil
}
if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
job.State == model.JobStateQueueing &&
job.ReorgMeta != nil &&
job.ReorgMeta.IsFastReorg &&
ingest.LitBackCtxMgr != nil {
succeed := ingest.LitBackCtxMgr.MarkJobProcessing(job.ID)
if !succeed {
// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
return false, nil
}
}
// Check if there is any block ddl running, like drop schema and flashback cluster.
sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job where "+
"(CONCAT(',', schema_ids, ',') REGEXP CONCAT(',', %s, ',') != 0 and type = %d and processing) "+
@@ -273,6 +262,15 @@ func (d *ddl) startDispatchLoop() {
time.Sleep(dispatchLoopWaitingDuration)
continue
}
failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
if ingest.ResignOwnerForTest.Load() {
err2 := d.ownerManager.ResignOwner(context.Background())
if err2 != nil {
logutil.BgLogger().Info("resign meet error", zap.Error(err2))
}
ingest.ResignOwnerForTest.Store(false)
}
})
select {
case <-d.ddlJobCh:
case <-ticker.C:
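The new ownerResignAfterDispatchLoopCheck failpoint plus ingest.ResignOwnerForTest let a test force a DDL owner change right after the dispatch-loop check. A hedged sketch of how a test might drive them, assuming the failpoint path follows the usual package-path convention and that failpoints are compiled in (e.g. via make failpoint-enable); the test name and the omitted workload are illustrative.

```go
package ddl_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/ddl/ingest"
)

func TestOwnerResignDuringIngest(t *testing.T) {
	fp := "github.com/pingcap/tidb/pkg/ddl/ownerResignAfterDispatchLoopCheck"
	if err := failpoint.Enable(fp, "return(true)"); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable(fp); err != nil {
			t.Fatal(err)
		}
	}()

	// Ask the dispatch loop to resign DDL ownership at the injected point.
	ingest.ResignOwnerForTest.Store(true)

	// ... run an ADD INDEX with fast reorg enabled and assert it still
	// finishes after the owner change (workload omitted in this sketch).
}
```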
12 changes: 1 addition & 11 deletions pkg/owner/manager.go
@@ -65,8 +65,6 @@ type Manager interface {

// SetBeOwnerHook sets a hook. The hook is called before becoming an owner.
SetBeOwnerHook(hook func())
// SetRetireOwnerHook will be called after retiring the owner.
SetRetireOwnerHook(hook func())
}

const (
@@ -118,8 +116,7 @@ type ownerManager struct {
wg sync.WaitGroup
campaignCancel context.CancelFunc

beOwnerHook func()
retireOwnerHook func()
beOwnerHook func()
}

// NewOwnerManager creates a new Manager.
@@ -164,10 +161,6 @@ func (m *ownerManager) SetBeOwnerHook(hook func()) {
m.beOwnerHook = hook
}

func (m *ownerManager) SetRetireOwnerHook(hook func()) {
m.retireOwnerHook = hook
}

// ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
var ManagerSessionTTL = 60

@@ -230,9 +223,6 @@ func (m *ownerManager) toBeOwner(elec *concurrency.Election) {

// RetireOwner make the manager to be a not owner.
func (m *ownerManager) RetireOwner() {
if m.retireOwnerHook != nil {
m.retireOwnerHook()
}
atomic.StorePointer(&m.elec, nil)
}

5 changes: 0 additions & 5 deletions pkg/owner/mock.go
@@ -174,11 +174,6 @@ func (m *mockManager) SetBeOwnerHook(hook func()) {
m.beOwnerHook = hook
}

// SetRetireOwnerHook implements Manager.SetRetireOwnerHook interface.
func (m *mockManager) SetRetireOwnerHook(hook func()) {
m.retireHook = hook
}

// CampaignCancel implements Manager.CampaignCancel interface
func (m *mockManager) CampaignCancel() {
m.campaignDone <- struct{}{}