Skip to content

Commit

Permalink
Merge pull request #6245 from planetscale/mysqlctl-kill-connection
Browse files Browse the repository at this point in the history
mysqlctl: Fix connection leak in edge case of killConnection().
  • Loading branch information
sougou authored May 31, 2020
2 parents 8635ab8 + a8fa863 commit b8479b0
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 19 deletions.
8 changes: 4 additions & 4 deletions go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,13 @@ func (fmd *FakeMysqlDaemon) GetAppConnection(ctx context.Context) (*dbconnpool.P
}

// GetDbaConnection is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) GetDbaConnection() (*dbconnpool.DBConnection, error) {
return dbconnpool.NewDBConnection(context.Background(), fmd.db.ConnParams())
func (fmd *FakeMysqlDaemon) GetDbaConnection(ctx context.Context) (*dbconnpool.DBConnection, error) {
return dbconnpool.NewDBConnection(ctx, fmd.db.ConnParams())
}

// GetAllPrivsConnection is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) GetAllPrivsConnection() (*dbconnpool.DBConnection, error) {
return dbconnpool.NewDBConnection(context.Background(), fmd.db.ConnParams())
func (fmd *FakeMysqlDaemon) GetAllPrivsConnection(ctx context.Context) (*dbconnpool.DBConnection, error) {
return dbconnpool.NewDBConnection(ctx, fmd.db.ConnParams())
}

// SetSemiSyncEnabled is part of the MysqlDaemon interface.
Expand Down
3 changes: 2 additions & 1 deletion go/vt/mysqlctl/metadata_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"

"golang.org/x/net/context"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -70,7 +71,7 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string,
log.Infof("Populating _vt.local_metadata table...")

// Get a non-pooled DBA connection.
conn, err := mysqld.GetDbaConnection()
conn, err := mysqld.GetDbaConnection(context.TODO())
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ type MysqlDaemon interface {
// GetAppConnection returns a app connection to be able to talk to the database.
GetAppConnection(ctx context.Context) (*dbconnpool.PooledDBConnection, error)
// GetDbaConnection returns a dba connection.
GetDbaConnection() (*dbconnpool.DBConnection, error)
GetDbaConnection(ctx context.Context) (*dbconnpool.DBConnection, error)
// GetAllPrivsConnection returns an allprivs connection (for user with all privileges except SUPER).
GetAllPrivsConnection() (*dbconnpool.DBConnection, error)
GetAllPrivsConnection(ctx context.Context) (*dbconnpool.DBConnection, error)

// ExecuteSuperQueryList executes a list of queries, no result
ExecuteSuperQueryList(ctx context.Context, queryList []string) error
Expand Down
8 changes: 4 additions & 4 deletions go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,13 +1096,13 @@ func (mysqld *Mysqld) GetAppConnection(ctx context.Context) (*dbconnpool.PooledD
}

// GetDbaConnection creates a new DBConnection.
func (mysqld *Mysqld) GetDbaConnection() (*dbconnpool.DBConnection, error) {
return dbconnpool.NewDBConnection(context.TODO(), mysqld.dbcfgs.DbaConnector())
func (mysqld *Mysqld) GetDbaConnection(ctx context.Context) (*dbconnpool.DBConnection, error) {
return dbconnpool.NewDBConnection(ctx, mysqld.dbcfgs.DbaConnector())
}

// GetAllPrivsConnection creates a new DBConnection.
func (mysqld *Mysqld) GetAllPrivsConnection() (*dbconnpool.DBConnection, error) {
return dbconnpool.NewDBConnection(context.TODO(), mysqld.dbcfgs.AllPrivsWithDB())
func (mysqld *Mysqld) GetAllPrivsConnection(ctx context.Context) (*dbconnpool.DBConnection, error) {
return dbconnpool.NewDBConnection(ctx, mysqld.dbcfgs.AllPrivsWithDB())
}

// Close will close this instance of Mysqld. It will wait for all dba
Expand Down
6 changes: 4 additions & 2 deletions go/vt/mysqlctl/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (mysqld *Mysqld) killConnection(connID int64) error {
// Get another connection with which to kill.
// Use background context because the caller's context is likely expired,
// which is the reason we're being asked to kill the connection.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if poolConn, connErr := getPoolReconnect(ctx, mysqld.dbaPool); connErr == nil {
// We got a pool connection.
Expand All @@ -171,10 +171,12 @@ func (mysqld *Mysqld) killConnection(connID int64) error {
// It might be because the connection pool is exhausted,
// because some connections need to be killed!
// Try to open a new connection without the pool.
killConn, connErr = mysqld.GetDbaConnection()
conn, connErr := mysqld.GetDbaConnection(ctx)
if connErr != nil {
return connErr
}
defer conn.Close()
killConn = conn
}

_, err := killConn.ExecuteFetch(fmt.Sprintf("kill %d", connID), 10000, false)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/xtrabackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara
return false, vterrors.New(vtrpc.Code_INVALID_ARGUMENT, "xtrabackupUser must be specified.")
}
// use a mysql connection to detect flavor at runtime
conn, err := params.Mysqld.GetDbaConnection()
conn, err := params.Mysqld.GetDbaConnection(ctx)
if conn != nil && err == nil {
defer conn.Close()
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/rpc_lock_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (agent *ActionAgent) LockTables(ctx context.Context) error {
return errors.New("tables already locked on this tablet")
}

conn, err := agent.MysqlDaemon.GetDbaConnection()
conn, err := agent.MysqlDaemon.GetDbaConnection(ctx)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/rpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// ExecuteFetchAsDba will execute the given query, possibly disabling binlogs and reload schema.
func (agent *ActionAgent) ExecuteFetchAsDba(ctx context.Context, query []byte, dbName string, maxrows int, disableBinlogs bool, reloadSchema bool) (*querypb.QueryResult, error) {
// get a connection
conn, err := agent.MysqlDaemon.GetDbaConnection()
conn, err := agent.MysqlDaemon.GetDbaConnection(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -72,7 +72,7 @@ func (agent *ActionAgent) ExecuteFetchAsDba(ctx context.Context, query []byte, d
// ExecuteFetchAsAllPrivs will execute the given query, possibly reloading schema.
func (agent *ActionAgent) ExecuteFetchAsAllPrivs(ctx context.Context, query []byte, dbName string, maxrows int, reloadSchema bool) (*querypb.QueryResult, error) {
// get a connection
conn, err := agent.MysqlDaemon.GetAllPrivsConnection()
conn, err := agent.MysqlDaemon.GetAllPrivsConnection(ctx)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func TestUnicode(t *testing.T) {
}}

// We need a latin1 connection.
conn, err := env.Mysqld.GetDbaConnection()
conn, err := env.Mysqld.GetDbaConnection(context.Background())
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestStreamRowsUnicode(t *testing.T) {
defer engine.Close()

// We need a latin1 connection.
conn, err := env.Mysqld.GetDbaConnection()
conn, err := env.Mysqld.GetDbaConnection(context.Background())
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit b8479b0

Please sign in to comment.