Skip to content

Commit

Permalink
Fix locking in getConnection
Browse files Browse the repository at this point in the history
Signed-off-by: Henry Robinson <hrobinson@slack-corp.com>
  • Loading branch information
henryr committed Apr 10, 2024
1 parent de9cbc1 commit ba5be77
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
24 changes: 16 additions & 8 deletions go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sync"

"google.golang.org/grpc"

"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/grpcclient"
Expand Down Expand Up @@ -59,32 +60,35 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt
// If the connection exists, return it
proxy.mu.RLock()
existingConn := proxy.targetConns[target]
proxy.mu.RUnlock()

if existingConn != nil {
proxy.mu.RUnlock()
log.V(100).Infof("Reused connection for %v\n", target)
return existingConn, nil
}

// No luck, need to create a new one. Serialize new additions so we don't create multiple
// for a given target.
log.V(100).Infof("Need to create connection for %v\n", target)
proxy.mu.RUnlock()

proxy.mu.Lock()
defer proxy.mu.Unlock()

// Otherwise create a new connection after dropping the lock, allowing multiple requests to
// race to create the conn for now.
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil
})
// Check again in case conn was made between lock acquisitions.
existingConn = proxy.targetConns[target]
if existingConn != nil {
log.V(100).Infof("Reused connection for %v\n", target)
return existingConn, nil
}

// Otherwise create a new connection.
conn, err := vtgateconn.DialProtocol(ctx, "grpc", target)
if err != nil {
return nil, err
}

log.V(100).Infof("Created new connection for %v\n", target)
proxy.targetConns[target] = conn
proxy.mu.Unlock()

return conn, nil
}
Expand Down Expand Up @@ -174,4 +178,8 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn
}

func Init() {
log.V(100).Infof("Registering GRPC dial options")
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil
})
}
6 changes: 3 additions & 3 deletions misc/git/pre-commit
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ if [ -z "$GIT_DIR" ]; then
GIT_DIR="${DIR}/.."
fi

for hook in $GIT_DIR/../misc/git/hooks/*; do
$hook
done
#for hook in $GIT_DIR/../misc/git/hooks/*; do
# $hook
#done

0 comments on commit ba5be77

Please sign in to comment.