Skip to content

Commit

Permalink
Merge pull request #7966 from hashicorp/pool_improvements
Browse files Browse the repository at this point in the history
Agent connection pool cleanup
  • Loading branch information
hanshasselberg authored Jun 4, 2020
2 parents 891a002 + 1fbc1d4 commit 0f34333
Show file tree
Hide file tree
Showing 15 changed files with 65 additions and 115 deletions.
2 changes: 1 addition & 1 deletion agent/consul/auto_encrypt.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (c *Client) RequestAutoEncryptCerts(servers []string, port int, token strin
for _, ip := range ips {
addr := net.TCPAddr{IP: ip, Port: port}

if err = c.connPool.RPC(c.config.Datacenter, c.config.NodeName, &addr, 0, "AutoEncrypt.Sign", &args, &reply); err == nil {
if err = c.connPool.RPC(c.config.Datacenter, c.config.NodeName, &addr, "AutoEncrypt.Sign", &args, &reply); err == nil {
return &reply, pkPEM, nil
} else {
c.logger.Warn("AutoEncrypt failed", "error", err)
Expand Down
5 changes: 2 additions & 3 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurat
MaxTime: clientRPCConnMaxIdle,
MaxStreams: clientMaxStreams,
TLSConfigurator: tlsConfigurator,
ForceTLS: config.VerifyOutgoing,
Datacenter: config.Datacenter,
}

Expand Down Expand Up @@ -310,7 +309,7 @@ TRY:
}

// Make the request.
rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, server.Version, method, args, reply)
rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, method, args, reply)
if rpcErr == nil {
return nil
}
Expand Down Expand Up @@ -358,7 +357,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io

// Request the operation.
var reply structs.SnapshotResponse
snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.ShortName, server.Addr, server.UseTLS, args, in, &reply)
snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.ShortName, server.Addr, args, in, &reply)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
for range servers {
time.Sleep(200 * time.Millisecond)
s := c.routers.FindServer()
ok, err := c.connPool.Ping(s.Datacenter, s.ShortName, s.Addr, s.Version)
ok, err := c.connPool.Ping(s.Datacenter, s.ShortName, s.Addr)
if !ok {
t.Errorf("Unable to ping server %v: %s", s.String(), err)
}
Expand Down
4 changes: 2 additions & 2 deletions agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ CHECK_LEADER:
rpcErr := structs.ErrNoLeader
if leader != nil {
rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr,
leader.Version, method, args, reply)
method, args, reply)
if rpcErr != nil && canRetry(info, rpcErr) {
goto RETRY
}
Expand Down Expand Up @@ -617,7 +617,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{

metrics.IncrCounterWithLabels([]string{"rpc", "cross-dc"}, 1,
[]metrics.Label{{Name: "datacenter", Value: dc}})
if err := s.connPool.RPC(dc, server.ShortName, server.Addr, server.Version, method, args, reply); err != nil {
if err := s.connPool.RPC(dc, server.ShortName, server.Addr, method, args, reply); err != nil {
manager.NotifyFailedServer(server)
s.rpcLogger().Error("RPC failed to server in DC",
"server", server.Addr,
Expand Down
1 change: 0 additions & 1 deletion agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,6 @@ func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token
MaxTime: serverRPCCache,
MaxStreams: serverMaxStreams,
TLSConfigurator: tlsConfigurator,
ForceTLS: config.VerifyOutgoing,
Datacenter: config.Datacenter,
}

Expand Down
2 changes: 1 addition & 1 deletion agent/consul/server_serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (s *Server) maybeBootstrap() {

// Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
if err := s.connPool.RPC(s.config.Datacenter, server.ShortName, server.Addr, server.Version,
if err := s.connPool.RPC(s.config.Datacenter, server.ShortName, server.Addr,
"Status.Peers", &structs.DCSpecificRequest{Datacenter: s.config.Datacenter}, &peers); err != nil {
nextRetry := (1 << attempt) * time.Second
s.logger.Error("Failed to confirm peer status for server (will retry).",
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1276,7 +1276,7 @@ func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) {
if leader == nil {
t.Fatal("no leader")
}
return s2.connPool.Ping(leader.Datacenter, leader.ShortName, leader.Addr, leader.Version)
return s2.connPool.Ping(leader.Datacenter, leader.ShortName, leader.Addr)
}

func TestServer_TLSToNoTLS(t *testing.T) {
Expand Down
7 changes: 3 additions & 4 deletions agent/consul/snapshot_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
return nil, structs.ErrNoDCPath
}

snap, err := SnapshotRPC(s.connPool, dc, server.ShortName, server.Addr, server.UseTLS, args, in, reply)
snap, err := SnapshotRPC(s.connPool, dc, server.ShortName, server.Addr, args, in, reply)
if err != nil {
manager.NotifyFailedServer(server)
return nil, err
Expand All @@ -52,7 +52,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
if server == nil {
return nil, structs.ErrNoLeader
}
return SnapshotRPC(s.connPool, args.Datacenter, server.ShortName, server.Addr, server.UseTLS, args, in, reply)
return SnapshotRPC(s.connPool, args.Datacenter, server.ShortName, server.Addr, args, in, reply)
}
}

Expand Down Expand Up @@ -194,14 +194,13 @@ func SnapshotRPC(
dc string,
nodeName string,
addr net.Addr,
useTLS bool,
args *structs.SnapshotRequest,
in io.Reader,
reply *structs.SnapshotResponse,
) (io.ReadCloser, error) {
// Write the snapshot RPC byte to set the mode, then perform the
// request.
conn, hc, err := connPool.DialTimeout(dc, nodeName, addr, 10*time.Second, useTLS, pool.RPCSnapshot)
conn, hc, err := connPool.DialTimeout(dc, nodeName, addr, pool.RPCSnapshot)
if err != nil {
return nil, err
}
Expand Down
16 changes: 8 additions & 8 deletions agent/consul/snapshot_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func verifySnapshot(t *testing.T, s *Server, dc, token string) {
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr, false,
snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -121,7 +121,7 @@ func verifySnapshot(t *testing.T, s *Server, dc, token string) {

// Restore the snapshot.
args.Op = structs.SnapshotRestore
restore, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr, false,
restore, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr,
&args, snap, &reply)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestSnapshot_LeaderState(t *testing.T) {
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
snap, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr, false,
snap, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestSnapshot_LeaderState(t *testing.T) {

// Restore the snapshot.
args.Op = structs.SnapshotRestore
restore, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr, false,
restore, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr,
&args, snap, &reply)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestSnapshot_ACLDeny(t *testing.T) {
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr, false,
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
Expand All @@ -282,7 +282,7 @@ func TestSnapshot_ACLDeny(t *testing.T) {
Op: structs.SnapshotRestore,
}
var reply structs.SnapshotResponse
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr, false,
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -391,7 +391,7 @@ func TestSnapshot_AllowStale(t *testing.T) {
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr, false,
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), structs.ErrNoLeader.Error()) {
t.Fatalf("err: %v", err)
Expand All @@ -408,7 +408,7 @@ func TestSnapshot_AllowStale(t *testing.T) {
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr, false,
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), "Raft error when taking snapshot") {
t.Fatalf("err: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/stats_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewStatsFetcher(logger hclog.Logger, pool *pool.ConnPool, datacenter string
func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.ServerStats) {
var args struct{}
var reply autopilot.ServerStats
err := f.pool.RPC(f.datacenter, server.ShortName, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
err := f.pool.RPC(f.datacenter, server.ShortName, server.Addr, "Status.RaftStats", &args, &reply)
if err != nil {
f.logger.Warn("error getting server health from server",
"server", server.Name,
Expand Down
27 changes: 16 additions & 11 deletions agent/consul/status_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,25 @@ func insecureRPCClient(s *Server, c tlsutil.Config) (rpc.ClientCodec, error) {
if wrapper == nil {
return nil, err
}
conn, _, err := pool.DialTimeoutWithRPCTypeDirectly(
s.config.Datacenter,
s.config.NodeName,
addr,
nil,
time.Second,
true,
wrapper,
pool.RPCTLSInsecure,
pool.RPCTLSInsecure,
)
d := &net.Dialer{Timeout: time.Second}
conn, err := d.Dial("tcp", addr.String())
if err != nil {
return nil, err
}
// Switch the connection into TLS mode
if _, err = conn.Write([]byte{byte(pool.RPCTLSInsecure)}); err != nil {
conn.Close()
return nil, err
}

// Wrap the connection in a TLS client
tlsConn, err := wrapper(s.config.Datacenter, conn)
if err != nil {
conn.Close()
return nil, err
}
conn = tlsConn

return msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle), nil
}

Expand Down
Loading

0 comments on commit 0f34333

Please sign in to comment.