diff --git a/gossip/comm/comm_impl.go b/gossip/comm/comm_impl.go index 34cfc7ae12d..b9f3369cd0d 100644 --- a/gossip/comm/comm_impl.go +++ b/gossip/comm/comm_impl.go @@ -178,7 +178,9 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT cl := proto.NewGossipClient(cc) - if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defConnTimeout) + defer cancel() + if _, err = cl.Ping(ctx, &proto.Empty{}); err != nil { cc.Close() return nil, err } @@ -276,7 +278,9 @@ func (c *commImpl) Probe(remotePeer *RemotePeer) error { } defer cc.Close() cl := proto.NewGossipClient(cc) - _, err = cl.Ping(context.Background(), &proto.Empty{}) + ctx, cancel := context.WithTimeout(context.Background(), defConnTimeout) + defer cancel() + _, err = cl.Ping(ctx, &proto.Empty{}) c.logger.Debug("Returning", err) return err } @@ -294,7 +298,9 @@ func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, erro defer cc.Close() cl := proto.NewGossipClient(cc) - if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), defConnTimeout) + defer cancel() + if _, err = cl.Ping(ctx, &proto.Empty{}); err != nil { return nil, err } diff --git a/gossip/comm/comm_test.go b/gossip/comm/comm_test.go index 4102d4a8539..d9f9c849c7b 100644 --- a/gossip/comm/comm_test.go +++ b/gossip/comm/comm_test.go @@ -493,6 +493,60 @@ func TestParallelSend(t *testing.T) { assert.Equal(t, messages2Send, c) } +type nonResponsivePeer struct { + net.Listener + *grpc.Server + port int +} + +func newNonResponsivePeer() *nonResponsivePeer { + rand.Seed(time.Now().UnixNano()) + port := 50000 + rand.Intn(1000) + s, l, _, _ := createGRPCLayer(port) + nrp := &nonResponsivePeer{ + Listener: l, + Server: s, + port: port, + } + proto.RegisterGossipServer(s, nrp) + go s.Serve(l) + return nrp +} + +func (bp *nonResponsivePeer) Ping(context.Context, *proto.Empty) (*proto.Empty, error) { + time.Sleep(time.Second * 15) + return &proto.Empty{}, nil +} + +func (bp *nonResponsivePeer) GossipStream(stream proto.Gossip_GossipStreamServer) error { + return nil +} + +func (bp *nonResponsivePeer) stop() { + bp.Server.Stop() + bp.Listener.Close() +} + +func TestNonResponsivePing(t *testing.T) { + t.Parallel() + port := 50000 - rand.Intn(1000) + c, _ := newCommInstance(port, naiveSec) + defer c.Stop() + nonRespPeer := newNonResponsivePeer() + defer nonRespPeer.stop() + s := make(chan struct{}) + go func() { + c.Probe(remotePeer(nonRespPeer.port)) + s <- struct{}{} + }() + select { + case <-time.After(time.Second * 10): + assert.Fail(t, "Request wasn't cancelled on time") + case <-s: + } + +} + func TestResponses(t *testing.T) { t.Parallel() comm1, _ := newCommInstance(8611, naiveSec)