Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#48870
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
tiancaiamao authored and ti-chi-bot committed Nov 24, 2023
1 parent 5232881 commit 59d8dcd
Show file tree
Hide file tree
Showing 12 changed files with 819 additions and 17 deletions.
4 changes: 4 additions & 0 deletions br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
<<<<<<< HEAD
=======
"@com_github_tikv_pd_client//http",
>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870))
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_goleak//:goleak",
Expand Down
70 changes: 70 additions & 0 deletions br/pkg/lightning/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,73 @@ func TestAllocGlobalAutoID(t *testing.T) {
require.Equal(t, c.expectAllocatorTypes, allocatorTypes, c.tableID)
}
}
<<<<<<< HEAD
=======

type mockRequirement struct {
kv.Storage
}

func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover {
return nil
}

func TestRebaseTableAllocators(t *testing.T) {
storePath := t.TempDir()
kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath))
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, kvStore.Close())
})
ti := newTableInfo(t, 1, 42,
"create table t42 (a int primary key nonclustered auto_increment) AUTO_ID_CACHE 1", kvStore)
allocators, err := common.GetGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti)
require.NoError(t, err)
require.Len(t, allocators, 2)
for _, alloc := range allocators {
id, err := alloc.NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(1), id)
}
ctx := context.Background()
allocatorTypes := make([]autoid.AllocatorType, 0, len(allocators))
// rebase to 123
for _, alloc := range allocators {
require.NoError(t, alloc.Rebase(ctx, 123, false))
allocatorTypes = append(allocatorTypes, alloc.GetType())
}
require.Equal(t, []autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType}, allocatorTypes)
// this call does nothing
require.NoError(t, common.RebaseTableAllocators(ctx, nil, mockRequirement{kvStore}, 1, ti))
for _, alloc := range allocators {
nextID, err := alloc.NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(124), nextID)
}
// this call rebase AutoIncrementType allocator to 223
require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{
autoid.AutoIncrementType: 223,
}, mockRequirement{kvStore}, 1, ti))
next, err := allocators[0].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(224), next)
next, err = allocators[1].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(124), next)
// this call rebase AutoIncrementType allocator to 323, RowIDAllocType allocator to 423
require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{
autoid.AutoIncrementType: 323,
autoid.RowIDAllocType: 423,
}, mockRequirement{kvStore}, 1, ti))
next, err = allocators[0].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(324), next)
next, err = allocators[1].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(424), next)
}
>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870))
30 changes: 30 additions & 0 deletions br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ type TableImporter struct {
alloc autoid.Allocators
logger log.Logger
kvStore tidbkv.Storage
<<<<<<< HEAD
=======
etcdCli *clientv3.Client
autoidCli *autoid.ClientDiscover

// dupIgnoreRows tracks the rowIDs of rows that are duplicated and should be ignored.
dupIgnoreRows extsort.ExternalSorter
>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870))

ignoreColumns map[string]struct{}
}
Expand All @@ -89,6 +97,7 @@ func NewTableImporter(
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName)
}
autoidCli := autoid.NewClientDiscover(etcdCli)

return &TableImporter{
tableName: tableName,
Expand All @@ -98,6 +107,11 @@ func NewTableImporter(
encTable: tbl,
alloc: idAlloc,
kvStore: kvStore,
<<<<<<< HEAD
=======
etcdCli: etcdCli,
autoidCli: autoidCli,
>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870))
logger: logger.With(zap.String("table", tableName)),
ignoreColumns: ignoreColumns,
}, nil
Expand Down Expand Up @@ -268,6 +282,22 @@ func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp
return err
}

<<<<<<< HEAD
=======
// AutoIDRequirement implements autoid.Requirement.
var _ autoid.Requirement = &TableImporter{}

// Store implements the autoid.Requirement interface.
func (tr *TableImporter) Store() tidbkv.Storage {
return tr.kvStore
}

// AutoIDClient implements the autoid.Requirement interface.
func (tr *TableImporter) AutoIDClient() *autoid.ClientDiscover {
return tr.autoidCli
}

>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870))
// RebaseChunkRowIDs rebase the row id of the chunks.
func (*TableImporter) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) {
if rowIDBase == 0 {
Expand Down
44 changes: 44 additions & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
<<<<<<< HEAD:ddl/column.go
"github.com/pingcap/tidb/config"
sess "github.com/pingcap/tidb/ddl/internal/session"
ddlutil "github.com/pingcap/tidb/ddl/util"
Expand All @@ -52,6 +53,34 @@ import (
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/sqlexec"
=======
"github.com/pingcap/tidb/pkg/config"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/logutil"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
"github.com/pingcap/tidb/pkg/util/rowcodec"
"github.com/pingcap/tidb/pkg/util/sqlexec"
kvutil "github.com/tikv/client-go/v2/util"
>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/ddl/column.go
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1681,6 +1710,21 @@ func checkNewAutoRandomBits(idAccessors meta.AutoIDAccessors, oldCol *model.Colu
return nil
}

<<<<<<< HEAD:ddl/column.go
=======
type asAutoIDRequirement ddlCtx

var _ autoid.Requirement = &asAutoIDRequirement{}

func (r *asAutoIDRequirement) Store() kv.Storage {
return r.store
}

func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover {
return r.autoidCli
}

>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/ddl/column.go
// applyNewAutoRandomBits set auto_random bits to TableInfo and
// migrate auto_increment ID to auto_random ID if possible.
func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo,
Expand Down
41 changes: 41 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
<<<<<<< HEAD:ddl/ddl.go
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/ingest"
sess "github.com/pingcap/tidb/ddl/internal/session"
Expand Down Expand Up @@ -64,6 +65,41 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/syncutil"
=======
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/syncer"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/framework/dispatcher"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/owner"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/statistics/handle"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
pumpcli "github.com/pingcap/tidb/pkg/tidb-binlog/pump_client"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/gcutil"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/syncutil"
>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/ddl/ddl.go
"github.com/tikv/client-go/v2/tikvrpc"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
Expand Down Expand Up @@ -355,8 +391,12 @@ type ddlCtx struct {
statsHandle *handle.Handle
tableLockCkr util.DeadTableLockChecker
etcdCli *clientv3.Client
<<<<<<< HEAD:ddl/ddl.go
// backfillJobCh gets notification if any backfill jobs coming.
backfillJobCh chan struct{}
=======
autoidCli *autoid.ClientDiscover
>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/ddl/ddl.go

*waitSchemaSyncedController
*schemaVersionManager
Expand Down Expand Up @@ -679,6 +719,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
infoCache: opt.InfoCache,
tableLockCkr: deadLockCkr,
etcdCli: opt.EtcdCli,
autoidCli: opt.AutoIDClient,
schemaVersionManager: newSchemaVersionManager(),
waitSchemaSyncedController: newWaitSchemaSyncedController(),
runningJobIDs: make([]string, 0, jobRecordCapacity),
Expand Down
24 changes: 19 additions & 5 deletions ddl/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@ package ddl
import (
"time"

<<<<<<< HEAD:ddl/options.go
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
=======
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/ddl/options.go
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand All @@ -27,11 +33,12 @@ type Option func(*Options)

// Options represents all the options of the DDL module needs
type Options struct {
EtcdCli *clientv3.Client
Store kv.Storage
InfoCache *infoschema.InfoCache
Hook Callback
Lease time.Duration
EtcdCli *clientv3.Client
Store kv.Storage
AutoIDClient *autoid.ClientDiscover
InfoCache *infoschema.InfoCache
Hook Callback
Lease time.Duration
}

// WithEtcdClient specifies the `clientv3.Client` of DDL used to request the etcd service
Expand All @@ -55,6 +62,13 @@ func WithInfoCache(ic *infoschema.InfoCache) Option {
}
}

// WithAutoIDClient specifies the autoid client used by the autoid service for those AUTO_ID_CACHE=1 tables.
func WithAutoIDClient(cli *autoid.ClientDiscover) Option {
return func(options *Options) {
options.AutoIDClient = cli
}
}

// WithHook specifies the `Callback` of DDL used to notify the outer module when events are triggered
func WithHook(callback Callback) Option {
return func(options *Options) {
Expand Down
Loading

0 comments on commit 59d8dcd

Please sign in to comment.