Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agent connection pool cleanup #7966

Merged
merged 3 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/consul/auto_encrypt.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (c *Client) RequestAutoEncryptCerts(servers []string, port int, token strin
for _, ip := range ips {
addr := net.TCPAddr{IP: ip, Port: port}

if err = c.connPool.RPC(c.config.Datacenter, c.config.NodeName, &addr, 0, "AutoEncrypt.Sign", &args, &reply); err == nil {
if err = c.connPool.RPC(c.config.Datacenter, c.config.NodeName, &addr, "AutoEncrypt.Sign", &args, &reply); err == nil {
return &reply, pkPEM, nil
} else {
c.logger.Warn("AutoEncrypt failed", "error", err)
Expand Down
5 changes: 2 additions & 3 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func NewClientLogger(config *Config, logger hclog.InterceptLogger, tlsConfigurat
MaxTime: clientRPCConnMaxIdle,
MaxStreams: clientMaxStreams,
TLSConfigurator: tlsConfigurator,
ForceTLS: config.VerifyOutgoing,
Datacenter: config.Datacenter,
}

Expand Down Expand Up @@ -308,7 +307,7 @@ TRY:
}

// Make the request.
rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, server.Version, method, args, reply)
rpcErr := c.connPool.RPC(c.config.Datacenter, server.ShortName, server.Addr, method, args, reply)
if rpcErr == nil {
return nil
}
Expand Down Expand Up @@ -356,7 +355,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io

// Request the operation.
var reply structs.SnapshotResponse
snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.ShortName, server.Addr, server.UseTLS, args, in, &reply)
snap, err := SnapshotRPC(c.connPool, c.config.Datacenter, server.ShortName, server.Addr, args, in, &reply)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
for range servers {
time.Sleep(200 * time.Millisecond)
s := c.routers.FindServer()
ok, err := c.connPool.Ping(s.Datacenter, s.ShortName, s.Addr, s.Version)
ok, err := c.connPool.Ping(s.Datacenter, s.ShortName, s.Addr)
if !ok {
t.Errorf("Unable to ping server %v: %s", s.String(), err)
}
Expand Down
4 changes: 2 additions & 2 deletions agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ CHECK_LEADER:
rpcErr := structs.ErrNoLeader
if leader != nil {
rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr,
leader.Version, method, args, reply)
method, args, reply)
if rpcErr != nil && canRetry(info, rpcErr) {
goto RETRY
}
Expand Down Expand Up @@ -617,7 +617,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{

metrics.IncrCounterWithLabels([]string{"rpc", "cross-dc"}, 1,
[]metrics.Label{{Name: "datacenter", Value: dc}})
if err := s.connPool.RPC(dc, server.ShortName, server.Addr, server.Version, method, args, reply); err != nil {
if err := s.connPool.RPC(dc, server.ShortName, server.Addr, method, args, reply); err != nil {
manager.NotifyFailedServer(server)
s.rpcLogger().Error("RPC failed to server in DC",
"server", server.Addr,
Expand Down
1 change: 0 additions & 1 deletion agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,6 @@ func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token
MaxTime: serverRPCCache,
MaxStreams: serverMaxStreams,
TLSConfigurator: tlsConfigurator,
ForceTLS: config.VerifyOutgoing,
Datacenter: config.Datacenter,
}

Expand Down
2 changes: 1 addition & 1 deletion agent/consul/server_serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (s *Server) maybeBootstrap() {

// Retry with exponential backoff to get peer status from this server
for attempt := uint(0); attempt < maxPeerRetries; attempt++ {
if err := s.connPool.RPC(s.config.Datacenter, server.ShortName, server.Addr, server.Version,
if err := s.connPool.RPC(s.config.Datacenter, server.ShortName, server.Addr,
"Status.Peers", &structs.DCSpecificRequest{Datacenter: s.config.Datacenter}, &peers); err != nil {
nextRetry := (1 << attempt) * time.Second
s.logger.Error("Failed to confirm peer status for server (will retry).",
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,7 @@ func testVerifyRPC(s1, s2 *Server, t *testing.T) (bool, error) {
if leader == nil {
t.Fatal("no leader")
}
return s2.connPool.Ping(leader.Datacenter, leader.ShortName, leader.Addr, leader.Version)
return s2.connPool.Ping(leader.Datacenter, leader.ShortName, leader.Addr)
}

func TestServer_TLSToNoTLS(t *testing.T) {
Expand Down
7 changes: 3 additions & 4 deletions agent/consul/snapshot_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
return nil, structs.ErrNoDCPath
}

snap, err := SnapshotRPC(s.connPool, dc, server.ShortName, server.Addr, server.UseTLS, args, in, reply)
snap, err := SnapshotRPC(s.connPool, dc, server.ShortName, server.Addr, args, in, reply)
if err != nil {
manager.NotifyFailedServer(server)
return nil, err
Expand All @@ -52,7 +52,7 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
if server == nil {
return nil, structs.ErrNoLeader
}
return SnapshotRPC(s.connPool, args.Datacenter, server.ShortName, server.Addr, server.UseTLS, args, in, reply)
return SnapshotRPC(s.connPool, args.Datacenter, server.ShortName, server.Addr, args, in, reply)
}
}

Expand Down Expand Up @@ -194,14 +194,13 @@ func SnapshotRPC(
dc string,
nodeName string,
addr net.Addr,
useTLS bool,
args *structs.SnapshotRequest,
in io.Reader,
reply *structs.SnapshotResponse,
) (io.ReadCloser, error) {
// Write the snapshot RPC byte to set the mode, then perform the
// request.
conn, hc, err := connPool.DialTimeout(dc, nodeName, addr, 10*time.Second, useTLS, pool.RPCSnapshot)
conn, hc, err := connPool.DialTimeout(dc, nodeName, addr, 10*time.Second, pool.RPCSnapshot)
if err != nil {
return nil, err
}
Expand Down
16 changes: 8 additions & 8 deletions agent/consul/snapshot_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func verifySnapshot(t *testing.T, s *Server, dc, token string) {
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr, false,
snap, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -121,7 +121,7 @@ func verifySnapshot(t *testing.T, s *Server, dc, token string) {

// Restore the snapshot.
args.Op = structs.SnapshotRestore
restore, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr, false,
restore, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr,
&args, snap, &reply)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestSnapshot_LeaderState(t *testing.T) {
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
snap, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr, false,
snap, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestSnapshot_LeaderState(t *testing.T) {

// Restore the snapshot.
args.Op = structs.SnapshotRestore
restore, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr, false,
restore, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr,
&args, snap, &reply)
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestSnapshot_ACLDeny(t *testing.T) {
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr, false,
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
Expand All @@ -282,7 +282,7 @@ func TestSnapshot_ACLDeny(t *testing.T) {
Op: structs.SnapshotRestore,
}
var reply structs.SnapshotResponse
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr, false,
_, err := SnapshotRPC(s1.connPool, s1.config.Datacenter, s1.config.NodeName, s1.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -391,7 +391,7 @@ func TestSnapshot_AllowStale(t *testing.T) {
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr, false,
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), structs.ErrNoLeader.Error()) {
t.Fatalf("err: %v", err)
Expand All @@ -408,7 +408,7 @@ func TestSnapshot_AllowStale(t *testing.T) {
Op: structs.SnapshotSave,
}
var reply structs.SnapshotResponse
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr, false,
_, err := SnapshotRPC(s.connPool, s.config.Datacenter, s.config.NodeName, s.config.RPCAddr,
&args, bytes.NewReader([]byte("")), &reply)
if err == nil || !strings.Contains(err.Error(), "Raft error when taking snapshot") {
t.Fatalf("err: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/stats_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewStatsFetcher(logger hclog.Logger, pool *pool.ConnPool, datacenter string
func (f *StatsFetcher) fetch(server *metadata.Server, replyCh chan *autopilot.ServerStats) {
var args struct{}
var reply autopilot.ServerStats
err := f.pool.RPC(f.datacenter, server.ShortName, server.Addr, server.Version, "Status.RaftStats", &args, &reply)
err := f.pool.RPC(f.datacenter, server.ShortName, server.Addr, "Status.RaftStats", &args, &reply)
if err != nil {
f.logger.Warn("error getting server health from server",
"server", server.Name,
Expand Down
27 changes: 16 additions & 11 deletions agent/consul/status_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,25 @@ func insecureRPCClient(s *Server, c tlsutil.Config) (rpc.ClientCodec, error) {
if wrapper == nil {
return nil, err
}
conn, _, err := pool.DialTimeoutWithRPCTypeDirectly(
s.config.Datacenter,
s.config.NodeName,
addr,
nil,
time.Second,
true,
wrapper,
pool.RPCTLSInsecure,
pool.RPCTLSInsecure,
)
d := &net.Dialer{Timeout: time.Second}
conn, err := d.Dial("tcp", addr.String())
if err != nil {
return nil, err
}
// Switch the connection into TLS mode
if _, err = conn.Write([]byte{byte(pool.RPCTLSInsecure)}); err != nil {
conn.Close()
return nil, err
}

// Wrap the connection in a TLS client
tlsConn, err := wrapper(s.config.Datacenter, conn)
if err != nil {
conn.Close()
return nil, err
}
conn = tlsConn

return msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle), nil
}

Expand Down
Loading