@@ -528,6 +528,7 @@ type shardResponse struct {
528
528
type shardState struct {
529
529
shard int32 // shard ID
530
530
remainingPeers []cluster.Node // peers that we have not sent a query to yet
531
+ inflight int // number of requests in flight
531
532
}
532
533
533
534
// AskPeer issues the query on the next peer, if available, and returns it
@@ -537,6 +538,7 @@ func (state *shardState) AskPeer(ctx context.Context, fn fetchFunc, responses ch
537
538
}
538
539
peer := state .remainingPeers [0 ]
539
540
state .remainingPeers = state .remainingPeers [1 :]
541
+ state .inflight ++
540
542
go state .askPeer (ctx , peer , fn , responses )
541
543
return peer , true
542
544
}
@@ -611,6 +613,8 @@ func queryPeers(ctx context.Context, peerGroups map[int32][]cluster.Node, name s
611
613
//request canceled
612
614
return
613
615
case resp := <- responses :
616
+ states [resp .shardGroup ].inflight --
617
+
614
618
if _ , ok := receivedResponses [resp .shardGroup ]; ok {
615
619
// already received this response (possibly speculatively)
616
620
continue
@@ -623,8 +627,11 @@ func queryPeers(ctx context.Context, peerGroups map[int32][]cluster.Node, name s
623
627
speculativeRequests .Inc ()
624
628
continue
625
629
}
626
- // No more peers to try. Cancel the reqCtx, which will cancel all in-flight
627
- // requests.
630
+ // if there is another request in-flight for this shardGroup, then we can wait for that
631
+ if states [resp .shardGroup ].inflight > 0 {
632
+ continue
633
+ }
634
+ // we're out of options. Cancel the reqCtx, which will cancel all in-flight requests.
628
635
cancel ()
629
636
errorChan <- resp .err
630
637
return
0 commit comments