Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jan 23, 2024
1 parent b27b1e9 commit cdc4a1e
Show file tree
Hide file tree
Showing 12 changed files with 47 additions and 9 deletions.
3 changes: 3 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,9 @@ func createClientWithKeyspace(
nil, keyspaceID, c.svrUrls, c.tlsCfg, c.option)
if err := c.setup(); err != nil {
c.cancel()
if c.pdSvcDiscovery != nil {
c.pdSvcDiscovery.Close()
}
return nil, err
}

Expand Down
7 changes: 7 additions & 0 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ type client struct {
callerID string
respHandler respHandleFunc
bo *retry.Backoffer
// defaultSD indicates whether the client is created with the default service discovery.
defaultSD bool
}

// ClientOption configures the HTTP client.
Expand Down Expand Up @@ -303,12 +305,17 @@ func NewClient(
return nil
}
c.inner.init(sd)
c.defaultSD = true
return c
}

// Close gracefully closes the HTTP client.
func (c *client) Close() {
c.inner.close()
// only close the service discovery if it's created by the client.
if c.defaultSD && c.inner.sd != nil {
c.inner.sd.Close()
}
log.Info("[pd] http client closed", zap.String("source", c.inner.source))
}

Expand Down
2 changes: 2 additions & 0 deletions client/pd_service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ func (suite *serviceClientTestSuite) TearDownTest() {
func (suite *serviceClientTestSuite) TearDownSuite() {
suite.leaderServer.grpcServer.GracefulStop()
suite.followerServer.grpcServer.GracefulStop()
suite.leaderClient.GetClientConn().Close()
suite.followerClient.GetClientConn().Close()
suite.clean()
}

Expand Down
5 changes: 0 additions & 5 deletions client/testutil/leak.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,4 @@ var LeakOptions = []goleak.Option{
goleak.IgnoreTopFunction("google.golang.org/grpc.(*addrConn).createTransport"),
goleak.IgnoreTopFunction("google.golang.org/grpc.(*addrConn).resetTransport"),
goleak.IgnoreTopFunction("google.golang.org/grpc.(*Server).handleRawConn"),
// TODO: remove the below options once we fixed the http connection leak problems
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*http2Server).keepalive"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
}
19 changes: 17 additions & 2 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,21 @@ func TestClientClusterIDCheck(t *testing.T) {
defer cluster2.Destroy()
endpoints2 := runServer(re, cluster2)
// Try to create a client with the mixed endpoints.
_, err = pd.NewClientWithContext(
cli, err := pd.NewClientWithContext(
ctx, append(endpoints1, endpoints2...),
pd.SecurityOption{}, pd.WithMaxErrorRetry(1),
)
re.Error(err)
defer cli.Close()
re.Contains(err.Error(), "unmatched cluster id")
// updateMember should fail due to unmatched cluster ID found.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipClusterIDCheck", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/client/skipFirstUpdateMember", `return(true)`))
_, err = pd.NewClientWithContext(ctx, []string{endpoints1[0], endpoints2[0]},
cli, err = pd.NewClientWithContext(ctx, []string{endpoints1[0], endpoints2[0]},
pd.SecurityOption{}, pd.WithMaxErrorRetry(1),
)
re.Error(err)
defer cli.Close()
re.Contains(err.Error(), "ErrClientGetMember")
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipFirstUpdateMember"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipClusterIDCheck"))
Expand All @@ -105,6 +107,7 @@ func TestClientLeaderChange(t *testing.T) {

endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)
defer cli.Close()
innerCli, ok := cli.(interface{ GetServiceDiscovery() pd.ServiceDiscovery })
re.True(ok)

Expand Down Expand Up @@ -165,6 +168,7 @@ func TestLeaderTransfer(t *testing.T) {

endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)
defer cli.Close()

var lastTS uint64
testutil.Eventually(re, func() bool {
Expand Down Expand Up @@ -254,6 +258,7 @@ func TestTSOAllocatorLeader(t *testing.T) {
allocatorLeaderMap[dcLocation] = pdName
}
cli := setupCli(re, ctx, endpoints)
defer cli.Close()
innerCli, ok := cli.(interface{ GetServiceDiscovery() pd.ServiceDiscovery })
re.True(ok)

Expand Down Expand Up @@ -287,7 +292,9 @@ func TestTSOFollowerProxy(t *testing.T) {

endpoints := runServer(re, cluster)
cli1 := setupCli(re, ctx, endpoints)
defer cli1.Close()
cli2 := setupCli(re, ctx, endpoints)
defer cli2.Close()
cli2.UpdateOption(pd.EnableTSOFollowerProxy, true)

var wg sync.WaitGroup
Expand Down Expand Up @@ -325,6 +332,7 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {

endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)
defer cli.Close()

var wg sync.WaitGroup
var maxUnavailableTime, leaderReadyTime time.Time
Expand Down Expand Up @@ -397,6 +405,7 @@ func TestGlobalAndLocalTSO(t *testing.T) {

endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)
defer cli.Close()

// Wait for all nodes becoming healthy.
time.Sleep(time.Second * 5)
Expand Down Expand Up @@ -508,6 +517,7 @@ func TestCustomTimeout(t *testing.T) {

endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints, pd.WithCustomTimeoutOption(time.Second))
defer cli.Close()

start := time.Now()
re.NoError(failpoint.Enable("github.com/tikv/pd/server/customTimeout", "return(true)"))
Expand Down Expand Up @@ -581,6 +591,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionByFollowerForwardin
defer cancel()

cli := setupCli(re, ctx, suite.endpoints, pd.WithForwardingOption(true))
defer cli.Close()
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", "return(true)"))
time.Sleep(200 * time.Millisecond)
r, err := cli.GetRegion(context.Background(), []byte("a"))
Expand All @@ -600,6 +611,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding1(
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()
cli := setupCli(re, ctx, suite.endpoints, pd.WithForwardingOption(true))
defer cli.Close()

re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
var lastTS uint64
Expand Down Expand Up @@ -634,6 +646,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding2(
ctx, cancel := context.WithCancel(suite.ctx)
defer cancel()
cli := setupCli(re, ctx, suite.endpoints, pd.WithForwardingOption(true))
defer cli.Close()

re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"))
var lastTS uint64
Expand Down Expand Up @@ -670,6 +683,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoAndRegionByFollowerFor
re.NoError(failpoint.Enable("github.com/tikv/pd/client/grpcutil/unreachableNetwork2", fmt.Sprintf("return(\"%s\")", follower.GetAddr())))

cli := setupCli(re, ctx, suite.endpoints, pd.WithForwardingOption(true))
defer cli.Close()
var lastTS uint64
testutil.Eventually(re, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
Expand Down Expand Up @@ -732,6 +746,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() {

cluster := suite.cluster
cli := setupCli(re, ctx, suite.endpoints)
defer cli.Close()
cli.UpdateOption(pd.EnableFollowerHandle, true)
re.NotEmpty(cluster.WaitLeader())
leader := cluster.GetLeaderServer()
Expand Down
5 changes: 3 additions & 2 deletions tests/integrations/client/client_tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,13 @@ func testTLSReload(
CertPath: testClientTLSInfo.CertFile,
KeyPath: testClientTLSInfo.KeyFile,
}, pd.WithGRPCDialOptions(grpc.WithBlock()))
cli.Close()
if err != nil {
errc <- err
dcancel()
return
}
dcancel()
cli.Close()
}
}()

Expand Down Expand Up @@ -212,12 +212,13 @@ func testTLSReload(
caData, certData, keyData := loadTLSContent(re,
testClientTLSInfo.TrustedCAFile, testClientTLSInfo.CertFile, testClientTLSInfo.KeyFile)
ctx1, cancel1 := context.WithTimeout(ctx, 2*time.Second)
_, err = pd.NewClientWithContext(ctx1, endpoints, pd.SecurityOption{
cli, err = pd.NewClientWithContext(ctx1, endpoints, pd.SecurityOption{
SSLCABytes: caData,
SSLCertBytes: certData,
SSLKEYBytes: keyData,
}, pd.WithGRPCDialOptions(grpc.WithBlock()))
re.NoError(err)
defer cli.Close()
cancel1()
}

Expand Down
1 change: 1 addition & 0 deletions tests/integrations/client/gc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (suite *gcClientTestSuite) TearDownSuite() {
re := suite.Require()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/gc/checkKeyspace"))
suite.cleanup()
suite.client.Close()
}

func (suite *gcClientTestSuite) TearDownTest() {
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/client/global_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (suite *globalConfigTestSuite) SetupSuite() {
func (suite *globalConfigTestSuite) TearDownSuite() {
suite.client.Close()
suite.cleanup()
suite.client.Close()
}

func (suite *globalConfigTestSuite) GetEtcdPath(configPath string) string {
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/client/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ func (suite *httpClientTestSuite) TestRedirectWithMetrics() {
env := suite.env[defaultServiceDiscovery]

cli := setupCli(suite.Require(), env.ctx, env.endpoints)
defer cli.Close()
sd := cli.GetServiceDiscovery()

metricCnt := prometheus.NewCounterVec(
Expand Down
6 changes: 6 additions & 0 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp
mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() {
time.Sleep(3 * time.Second)
})
for _, client := range clients {
client.Close()
}
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKeyspaceGroups() {
Expand Down Expand Up @@ -232,6 +235,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe
mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() {
time.Sleep(3 * time.Second)
})
for _, client := range clients {
client.Close()
}
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) {
defer cleanup()

cli := mcs.SetupClientWithAPIContext(ctx, re, pd.NewAPIContextV2(""), []string{backendEndpoints})
defer cli.Close()
physical, logical, err := cli.GetTS(ctx)
re.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
Expand Down
5 changes: 5 additions & 0 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ func (suite *tsoClientTestSuite) TearDownSuite() {
suite.tsoCluster.Destroy()
}
suite.cluster.Destroy()
for _, client := range suite.clients {
client.Close()
}
}

func (suite *tsoClientTestSuite) TestGetTS() {
Expand Down Expand Up @@ -252,6 +255,7 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() {
defer cancel()
client := mcs.SetupClientWithKeyspaceID(
ctx, re, keyspaceID, strings.Split(suite.backendEndpoints, ","))
defer client.Close()
var lastTS uint64
for j := 0; j < tsoRequestRound; j++ {
physical, logical, err := client.GetTS(ctx)
Expand Down Expand Up @@ -491,6 +495,7 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) {
pdClient, err := pd.NewClientWithContext(context.Background(),
[]string{backendEndpoints}, pd.SecurityOption{}, pd.WithMaxErrorRetry(1))
re.NoError(err)
defer pdClient.Close()

// Create a TSO cluster which has 2 servers
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, backendEndpoints)
Expand Down

0 comments on commit cdc4a1e

Please sign in to comment.