grpc: ensure that streaming gRPC requests work over mesh gateway based wan federation #10838
Conversation
@@ -4524,6 +4527,9 @@ LOOP:
}

// This is a mirror of a similar test in agent/consul/server_test.go
//
// TODO(rb): implement something similar to this as a full containerized test suite with proper
Calling out this TODO, since I could not find a way to do the "firewalling" of the servers from each other in-process.
@@ -24,8 +24,10 @@ type Deps struct {
type GRPCClientConner interface {
	ClientConn(datacenter string) (*grpc.ClientConn, error)
	ClientConnLeader() (*grpc.ClientConn, error)
	SetGatewayResolver(func(string) string)
This gRPC client pool is created in `agent/setup.go` universally for both servers and clients, but only servers will eventually create a GatewayResolver for themselves in `agent/consul/server.go`. For the regular RPC connection pool the gateway resolver field is likewise set after the struct is created (but before it's used), so I did the same here to avoid heavily refactoring the agent construction code.
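Roughly, the late-set pattern looks like this minimal sketch; the type and field names are assumptions for illustration (the real pool lives in `agent/grpc`):

```go
package grpc

import "sync"

// clientConnPool is a trimmed stand-in for the real pool type; only
// what's needed to show the late-set resolver is included.
type clientConnPool struct {
	mu              sync.Mutex
	gatewayResolver func(datacenter string) string
}

// SetGatewayResolver is called during server construction in
// agent/consul/server.go, after the pool was built in agent/setup.go
// but before any cross-datacenter connection is dialed.
func (p *clientConnPool) SetGatewayResolver(resolver func(string) string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.gatewayResolver = resolver
}
```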
@@ -293,7 +293,7 @@ func (s *Server) handleNativeTLS(conn net.Conn) {
		s.handleSnapshotConn(tlsConn)

	case pool.ALPN_RPCGRPC:
-		s.grpcHandler.Handle(conn)
+		s.grpcHandler.Handle(tlsConn)
This was an existing bug, likely a simple copy/paste gone awry. All of the other branches take `tlsConn` instead of `conn` so they're on the correct side of the multiplex envelope.
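For context, a hedged sketch of why the handler must receive the TLS side of the connection; the structure follows the diff, while the stand-in types, the config field, and the ALPN string are assumptions:

```go
package consul

import (
	"crypto/tls"
	"net"
)

// ALPN_RPCGRPC mirrors the constant from the agent/pool package
// (value assumed here for illustration).
const ALPN_RPCGRPC = "consul/rpc-grpc"

// grpcHandler is a trimmed interface stand-in for the real handler.
type grpcHandler interface{ Handle(conn net.Conn) }

// server is a stand-in; only what the sketch needs is present.
type server struct {
	tlsConfig *tls.Config
	grpc      grpcHandler
}

// handleNativeTLS sketch: after the handshake, every branch must use
// tlsConn. Reads on the raw conn would return encrypted TLS records,
// not the plaintext HTTP/2 frames the gRPC handler expects.
func (s *server) handleNativeTLS(conn net.Conn) {
	tlsConn := tls.Server(conn, s.tlsConfig)
	if err := tlsConn.Handshake(); err != nil {
		conn.Close()
		return
	}
	switch tlsConn.ConnectionState().NegotiatedProtocol {
	case ALPN_RPCGRPC:
		s.grpc.Handle(tlsConn) // the fix: tlsConn, not conn
	}
}
```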
@@ -390,6 +394,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
		s.config.PrimaryDatacenter,
	)
	s.connPool.GatewayResolver = s.gatewayLocator.PickGateway
+	s.grpcConnPool.SetGatewayResolver(s.gatewayLocator.PickGateway)
Here's the late-set resolver assignment mentioned earlier.
}

	return tlsConn, nil

func (t *Transport) dial(dc, nodeName, nextProto string) (net.Conn, error) {
Instead of ending up with three similar copies of the same function structure, I refactored things so there's now only one implementation of this dialing approach, shared by all three call sites.
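A rough self-contained sketch of the shared dial path, using the signature from the diff; the field names are assumed, and the TLS/ALPN wrapping and address resolution are elided:

```go
package pool

import (
	"errors"
	"net"
	"time"
)

// ErrDCNotAvailable stands in for structs.ErrDCNotAvailable.
var ErrDCNotAvailable = errors.New("datacenter not available")

const DefaultDialTimeout = 10 * time.Second

// Transport is a trimmed stand-in; only the fields needed to show the
// shared dial path are included.
type Transport struct {
	datacenter      string
	gatewayResolver func(dc string) string // nil when no mesh gateways are configured
}

// dial is the single shared implementation: dial directly when the
// target is local (or no resolver is set), via the mesh gateway
// otherwise. nextProto is the ALPN protocol that tells the far side
// how to route the tunneled stream (RPC, snapshot, or gRPC).
func (t *Transport) dial(dc, nodeName, nextProto string) (net.Conn, error) {
	d := net.Dialer{Timeout: DefaultDialTimeout}
	if t.gatewayResolver != nil && dc != t.datacenter {
		gwAddr := t.gatewayResolver(dc)
		if gwAddr == "" {
			return nil, ErrDCNotAvailable
		}
		// The real code wraps this connection in TLS, advertising
		// nextProto via ALPN so the gateway can route to nodeName in dc.
		return d.Dial("tcp", gwAddr)
	}
	// Direct path: resolve nodeName to an address and dial it (the
	// resolution step is elided here).
	return d.Dial("tcp", nodeName)
}
```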
	s.lock.RLock()
	defer s.lock.RUnlock()

	for _, server := range s.servers {
		if server.Addr.String() == addr {
Formerly `addr` was just the bare IP address of the server. When using wanfed over mesh gateways it's technically completely fine for a single server in each datacenter to have the same IP address, so it's no longer OK to look up a server based on the IP address alone.

Similar to how the gRPC resolver logic prefixes the server IDs with the datacenter, I'm just prefixing these addrs with the datacenter, since we also control the dialing side and can decode these as necessary before they actually get used to open sockets.
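A minimal sketch of the prefixing scheme; the separator and helper names are assumptions for illustration:

```go
package pool

import "strings"

// makeServerAddrKey builds the datacenter-qualified key, e.g.
// "dc2-10.0.0.1:8300" (separator assumed). Two servers in different
// datacenters may share an IP when dialed through mesh gateways, so
// the bare address is no longer unique.
func makeServerAddrKey(dc, addr string) string {
	return dc + "-" + addr
}

// splitServerAddrKey recovers the datacenter and the dialable address
// before a socket is actually opened.
func splitServerAddrKey(key string) (dc, addr string, ok bool) {
	i := strings.Index(key, "-")
	if i < 0 {
		return "", "", false
	}
	return key[:i], key[i+1:], true
}
```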
	RPCMultiplexV2 = 4
	RPCSnapshot    = 5
	RPCGossip      = 6
	RPCRaft RPCType = 1
Since an `iota` isn't used here, only `RPCConsul` had the type `RPCType`. Something I did in this PR surfaced this issue, so I just fixed it in place.
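To illustrate the Go const-block behavior at play (values taken from the diff where shown; the `byte` underlying type and `RPCConsul = 0` are assumed):

```go
package pool

type RPCType byte

// In a const block without iota, a declaration that has its own value
// but no explicit type produces an *untyped* constant. Before the fix,
// only the first entry carried RPCType:
//
//	RPCConsul RPCType = 0 // typed RPCType
//	RPCRaft           = 1 // untyped constant, NOT RPCType
//
// Giving each entry its own explicit type fixes that:
const (
	RPCConsul      RPCType = 0
	RPCRaft        RPCType = 1
	RPCMultiplexV2 RPCType = 4
	RPCSnapshot    RPCType = 5
	RPCGossip      RPCType = 6
)
```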
	if nextProto != ALPN_RPCGRPC {
		if tcp, ok := rawConn.(*net.TCPConn); ok {
			_ = tcp.SetKeepAlive(true)
			_ = tcp.SetNoDelay(true)
		}
	}
I thought I saw a reference in the gRPC setup code to keepalives being controlled elsewhere, and since this tuning wasn't being done in the existing gRPC pool code, I opted to keep this section off for gRPC.
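For context, gRPC-level keepalives are usually configured as dial options rather than on the TCP socket; a sketch with illustrative values (not Consul's actual settings):

```go
package grpc

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// dialOptions shows where HTTP/2-level keepalives would live for a
// gRPC client; these handle dead-peer detection for gRPC connections,
// superseding TCP-level SetKeepAlive.
func dialOptions() []grpc.DialOption {
	return []grpc.DialOption{
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:    30 * time.Second, // send a ping after this much idle time
			Timeout: 10 * time.Second, // wait this long for the ping ack
		}),
	}
}
```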
	return conn, err
}

d := net.Dialer{LocalAddr: cfg.SrcAddr, Timeout: pool.DefaultDialTimeout}
Made the non-gateway version of the gRPC dialer also use a non-infinite dial timeout, like all of the others.
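A sketch of how a bounded dialer like the one in the diff gets wired into a gRPC client; the wrapper function is an assumption for illustration:

```go
package grpc

import (
	"context"
	"net"
	"time"

	"google.golang.org/grpc"
)

const defaultDialTimeout = 10 * time.Second // mirrors pool.DefaultDialTimeout

// withBoundedDialer returns a DialOption whose custom dialer gives up
// after a fixed timeout instead of hanging forever on an unreachable
// server.
func withBoundedDialer(srcAddr net.Addr) grpc.DialOption {
	return grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
		d := net.Dialer{LocalAddr: srcAddr, Timeout: defaultDialTimeout}
		return d.DialContext(ctx, "tcp", addr)
	})
}
```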
)

-const defaultDialTimeout = 10 * time.Second
+const DefaultDialTimeout = 10 * time.Second
Exported this so it could be used from the `agent/grpc` package as well.
	gwAddr := gatewayResolver(dc)
	if gwAddr == "" {
		return nil, nil, structs.ErrDCNotAvailable
	}

-	dialer := &net.Dialer{LocalAddr: src, Timeout: defaultDialTimeout}
+	dialer := &net.Dialer{LocalAddr: srcAddr, Timeout: DefaultDialTimeout}
All RPC types get a 10s dial timeout now.
LGTM, thanks!
…lient-to-server so this does not make sense to do
@freddygv i added a test in …
🍒 If backport labels were added before merging, cherry-picking will start automatically. To retroactively trigger a backport after merging, add backport labels and re-run https://circleci.com/gh/hashicorp/consul/432909.
…eway based wan federation Backport of #10838 to 1.10.x
Fixes #10796