*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables pingcap#48870 (p…
tiancaiamao committed Dec 6, 2023
1 parent 575187d commit 7382dc1
Showing 17 changed files with 148 additions and 636 deletions.
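Taken together, these changes route every AUTO_ID_CACHE=1 allocator through one shared autoid.ClientDiscover instead of letting each call site build its own etcd/gRPC client, which is where the leaked connections appear to have come from. The implementations added below (TableImporter, plus the asAutoIDRequirement types in ddl/column.go and executor/importer) all satisfy autoid.Requirement; a minimal sketch of the interface shape those implementations imply, assuming the definition in meta/autoid carries exactly these two methods:

	// Sketch only, shown with caller-side qualified names; inferred from the
	// var _ autoid.Requirement assertions and the methods added in this commit.
	type Requirement interface {
		// Store returns the kv.Storage the allocator bases are persisted in.
		Store() kv.Storage
		// AutoIDClient returns the shared client for the centralized
		// AUTO_ID_CACHE=1 allocation service, so callers reuse one gRPC connection.
		AutoIDClient() *autoid.ClientDiscover
	}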
3 changes: 1 addition & 2 deletions br/pkg/lightning/common/BUILD.bazel
@@ -23,7 +23,6 @@ go_library(
"//br/pkg/lightning/log",
"//br/pkg/utils",
"//errno",
"//kv",
"//meta/autoid",
"//parser/model",
"//sessionctx/variable",
@@ -102,7 +101,7 @@ go_test(
],
embed = [":common"],
flaky = True,
shard_count = 20,
shard_count = 21,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/log",
67 changes: 67 additions & 0 deletions br/pkg/lightning/common/common_test.go
@@ -159,3 +159,70 @@ func TestAllocGlobalAutoID(t *testing.T) {
require.Equal(t, c.expectAllocatorTypes, allocatorTypes, c.tableID)
}
}

type mockRequirement struct {
kv.Storage
}

func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover {
return nil
}

func TestRebaseTableAllocators(t *testing.T) {
storePath := t.TempDir()
kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath))
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, kvStore.Close())
})
ti := newTableInfo(t, 1, 42,
"create table t42 (a int primary key nonclustered auto_increment) AUTO_ID_CACHE 1", kvStore)
allocators, err := common.GetGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti)
require.NoError(t, err)
require.Len(t, allocators, 2)
for _, alloc := range allocators {
id, err := alloc.NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(1), id)
}
ctx := context.Background()
allocatorTypes := make([]autoid.AllocatorType, 0, len(allocators))
// rebase to 123
for _, alloc := range allocators {
require.NoError(t, alloc.Rebase(ctx, 123, false))
allocatorTypes = append(allocatorTypes, alloc.GetType())
}
require.Equal(t, []autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType}, allocatorTypes)
// this call does nothing
require.NoError(t, common.RebaseTableAllocators(ctx, nil, mockRequirement{kvStore}, 1, ti))
for _, alloc := range allocators {
nextID, err := alloc.NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(124), nextID)
}
// this call rebase AutoIncrementType allocator to 223
require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{
autoid.AutoIncrementType: 223,
}, mockRequirement{kvStore}, 1, ti))
next, err := allocators[0].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(224), next)
next, err = allocators[1].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(124), next)
// this call rebase AutoIncrementType allocator to 323, RowIDAllocType allocator to 423
require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{
autoid.AutoIncrementType: 323,
autoid.RowIDAllocType: 423,
}, mockRequirement{kvStore}, 1, ti))
next, err = allocators[0].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(324), next)
next, err = allocators[1].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(424), next)
}
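
The mockRequirement above satisfies only the two Requirement methods; outside of tests the importer passes a real implementation (see the TableImporter methods in the next file). A hedged usage sketch of the helper this test exercises, where ctx, tr (any autoid.Requirement), dbID, and tblInfo are assumed to come from the caller:

	// Hypothetical call site, not taken from this commit: rebase both allocators
	// of an AUTO_ID_CACHE=1 nonclustered table in one call.
	newBases := map[autoid.AllocatorType]int64{
		autoid.AutoIncrementType: 323,
		autoid.RowIDAllocType:    423,
	}
	if err := common.RebaseTableAllocators(ctx, newBases, tr, dbID, tblInfo); err != nil {
		return errors.Trace(err)
	}
	// After the call, NextGlobalAutoID on each allocator reports base+1
	// (324 and 424 in the test above).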
18 changes: 18 additions & 0 deletions br/pkg/lightning/importer/table_import.go
@@ -69,6 +69,8 @@ type TableImporter struct {
alloc autoid.Allocators
logger log.Logger
kvStore tidbkv.Storage
etcdCli *clientv3.Client
autoidCli *autoid.ClientDiscover

ignoreColumns map[string]struct{}
}
@@ -89,6 +91,7 @@ func NewTableImporter(
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName)
}
autoidCli := autoid.NewClientDiscover(etcdCli)

return &TableImporter{
tableName: tableName,
@@ -98,6 +101,8 @@ func NewTableImporter(
encTable: tbl,
alloc: idAlloc,
kvStore: kvStore,
etcdCli: etcdCli,
autoidCli: autoidCli,
logger: logger.With(zap.String("table", tableName)),
ignoreColumns: ignoreColumns,
}, nil
@@ -268,6 +273,19 @@ func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp
return err
}

// AutoIDRequirement implements autoid.Requirement.
var _ autoid.Requirement = &TableImporter{}

// Store implements the autoid.Requirement interface.
func (tr *TableImporter) Store() tidbkv.Storage {
return tr.kvStore
}

// AutoIDClient implements the autoid.Requirement interface.
func (tr *TableImporter) AutoIDClient() *autoid.ClientDiscover {
return tr.autoidCli
}

// RebaseChunkRowIDs rebase the row id of the chunks.
func (*TableImporter) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) {
if rowIDBase == 0 {
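With Store and AutoIDClient implemented above, the importer itself acts as the autoid.Requirement: NewTableImporter builds one ClientDiscover from the etcd client, and every allocator constructed through tr reuses it. A short sketch of how that is consumed, assuming dbID and tblInfo are available on the caller's side and mirroring the call shown later in ddl/column.go:

	// Allocators built through tr share tr.autoidCli, so AUTO_ID_CACHE=1 tables
	// no longer dial a fresh gRPC connection per allocation.
	allocs := autoid.NewAllocatorsFromTblInfo(tr, dbID, tblInfo)
	if alloc := allocs.Get(autoid.RowIDAllocType); alloc != nil {
		// use alloc.Rebase / alloc.NextGlobalAutoID as in the test above
	}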
16 changes: 14 additions & 2 deletions ddl/column.go
@@ -754,7 +754,7 @@ func (w *worker) doModifyColumnTypeWithData(
job.SnapshotVer = 0
job.SchemaState = model.StateWriteReorganization
case model.StateWriteReorganization:
tbl, err := getTable(d.store, dbInfo.ID, tblInfo)
tbl, err := getTable((*asAutoIDRequirement)(d), dbInfo.ID, tblInfo)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -1681,6 +1681,18 @@ func checkNewAutoRandomBits(idAccessors meta.AutoIDAccessors, oldCol *model.Colu
return nil
}

type asAutoIDRequirement ddlCtx

var _ autoid.Requirement = &asAutoIDRequirement{}

func (r *asAutoIDRequirement) Store() kv.Storage {
return r.store
}

func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover {
return r.autoidCli
}

// applyNewAutoRandomBits set auto_random bits to TableInfo and
// migrate auto_increment ID to auto_random ID if possible.
func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo,
Expand All @@ -1690,7 +1702,7 @@ func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo,
if !needMigrateFromAutoIncToAutoRand {
return nil
}
autoRandAlloc := autoid.NewAllocatorsFromTblInfo(d.store, dbInfo.ID, tblInfo).Get(autoid.AutoRandomType)
autoRandAlloc := autoid.NewAllocatorsFromTblInfo((*asAutoIDRequirement)(d), dbInfo.ID, tblInfo).Get(autoid.AutoRandomType)
if autoRandAlloc == nil {
errMsg := fmt.Sprintf(autoid.AutoRandomAllocatorNotFound, dbInfo.Name.O, tblInfo.Name.O)
return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg)
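asAutoIDRequirement is declared above as type asAutoIDRequirement ddlCtx, so the (*asAutoIDRequirement)(d) conversions in this file are plain pointer conversions between types with the same underlying struct: no copy and no allocation, only a different method set becomes visible. A self-contained illustration of the idiom, with names invented for the example:

	package main

	import "fmt"

	type base struct{ store string }

	// withAutoID has the same underlying type as base, so *base converts to
	// *withAutoID for free; the type exists only to attach extra methods.
	type withAutoID base

	func (w *withAutoID) Store() string { return w.store }

	func main() {
		b := &base{store: "tikv"}
		// b and the converted pointer alias the same struct value.
		fmt.Println((*withAutoID)(b).Store()) // prints: tikv
	}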
42 changes: 2 additions & 40 deletions ddl/ddl.go
@@ -33,7 +33,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
<<<<<<< HEAD:ddl/ddl.go
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/ingest"
sess "github.com/pingcap/tidb/ddl/internal/session"
@@ -46,6 +45,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/ast"
@@ -65,41 +65,6 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/syncutil"
=======
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/syncer"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/framework/dispatcher"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/owner"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/statistics/handle"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/table"
pumpcli "github.com/pingcap/tidb/pkg/tidb-binlog/pump_client"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/gcutil"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/syncutil"
>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/ddl/ddl.go
"github.com/tikv/client-go/v2/tikvrpc"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
@@ -391,12 +356,9 @@ type ddlCtx struct {
statsHandle *handle.Handle
tableLockCkr util.DeadTableLockChecker
etcdCli *clientv3.Client
<<<<<<< HEAD:ddl/ddl.go
autoidCli *autoid.ClientDiscover
// backfillJobCh gets notification if any backfill jobs coming.
backfillJobCh chan struct{}
=======
autoidCli *autoid.ClientDiscover
>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/ddl/ddl.go

*waitSchemaSyncedController
*schemaVersionManager
7 changes: 1 addition & 6 deletions ddl/options.go
@@ -17,14 +17,9 @@ package ddl
import (
"time"

<<<<<<< HEAD:ddl/options.go
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
=======
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/ddl/options.go
"github.com/pingcap/tidb/meta/autoid"
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand Down
1 change: 1 addition & 0 deletions domain/BUILD.bazel
@@ -40,6 +40,7 @@ go_library(
"//keyspace",
"//kv",
"//meta",
"//meta/autoid",
"//metrics",
"//owner",
"//parser/ast",
59 changes: 2 additions & 57 deletions domain/domain.go
@@ -36,7 +36,6 @@ import (
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/streamhelper/daemon"
<<<<<<< HEAD:domain/domain.go
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/placement"
@@ -53,6 +52,7 @@ import (
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/ast"
@@ -84,62 +84,6 @@ import (
"github.com/pingcap/tidb/util/servermemorylimit"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/syncutil"
=======
"github.com/pingcap/tidb/pkg/bindinfo"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/placement"
"github.com/pingcap/tidb/pkg/ddl/schematracker"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/framework/dispatcher"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/domain/globalconfigsync"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/domain/resourcegroup"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/infoschema"
infoschema_metrics "github.com/pingcap/tidb/pkg/infoschema/metrics"
"github.com/pingcap/tidb/pkg/infoschema/perfschema"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/owner"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/privilege/privileges"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/sessionstates"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/telemetry"
"github.com/pingcap/tidb/pkg/ttl/ttlworker"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
disttaskutil "github.com/pingcap/tidb/pkg/util/disttask"
"github.com/pingcap/tidb/pkg/util/domainutil"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/pingcap/tidb/pkg/util/etcd"
"github.com/pingcap/tidb/pkg/util/expensivequery"
"github.com/pingcap/tidb/pkg/util/gctuner"
"github.com/pingcap/tidb/pkg/util/globalconn"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/memoryusagealarm"
"github.com/pingcap/tidb/pkg/util/replayer"
"github.com/pingcap/tidb/pkg/util/servermemorylimit"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/pingcap/tidb/pkg/util/syncutil"
>>>>>>> 8eb191303ac (*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)):pkg/domain/domain.go
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/transaction"
pd "github.com/tikv/pd/client"
@@ -1150,6 +1094,7 @@ func (do *Domain) Init(
etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec()))

do.etcdClient = cli
do.autoidClient = autoid.NewClientDiscover(cli)

do.autoidClient = autoid.NewClientDiscover(cli)

11 changes: 6 additions & 5 deletions executor/importer/table_import.go
@@ -393,8 +393,9 @@ func (ti *TableImporter) PopulateChunks(ctx context.Context) (map[int32]*checkpo
ti.logger.Error("close etcd client error", zap.Error(err))
}
}()
autoidCli := autoid.NewClientDiscover(etcdCli)

r := &asAutoIDRequirement{ti.kvStore, etcdCli}
r := &asAutoIDRequirement{ti.kvStore, autoidCli}
// todo: the new base should be the max row id of the last Node if we support distributed import.
if err = common.RebaseGlobalAutoID(ctx, 0, r, ti.dbID, ti.tableInfo.Core); err != nil {
return nil, errors.Trace(err)
@@ -412,8 +413,8 @@ func (ti *TableImporter) PopulateChunks(ctx context.Context) (map[int32]*checkpo
}

type asAutoIDRequirement struct {
kvStore tidbkv.Storage
etcdCli *clientv3.Client
kvStore tidbkv.Storage
autoidCli *autoid.ClientDiscover
}

var _ autoid.Requirement = &asAutoIDRequirement{}
@@ -422,8 +423,8 @@ func (r *asAutoIDRequirement) Store() tidbkv.Storage {
return r.kvStore
}

func (r *asAutoIDRequirement) GetEtcdClient() *clientv3.Client {
return r.etcdCli
func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover {
return r.autoidCli
}

func (ti *TableImporter) rebaseChunkRowID(rowIDBase int64) {
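The hunk above is the executor-side half of the fix: asAutoIDRequirement previously held the raw etcd client and exposed it through GetEtcdClient; judging by the commit title, that path could leak gRPC clients for AUTO_ID_CACHE=1 tables. It now holds a single autoid.ClientDiscover created once via autoid.NewClientDiscover(etcdCli) and hands it out from AutoIDClient, the same pattern applied to the lightning importer and the DDL context earlier in this commit.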
1 change: 0 additions & 1 deletion infoschema/BUILD.bazel
@@ -123,7 +123,6 @@ go_test(
"@com_github_prometheus_prometheus//promql",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//testutils",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_uber_go_goleak//:goleak",
],