@@ -528,6 +528,7 @@ type shardResponse struct {
528
528
type shardState struct {
529
529
shard int32 // shard ID
530
530
remainingPeers []cluster.Node // peers that we have not sent a query to yet
531
+ inflight int // number of requests in flight
531
532
}
532
533
533
534
// AskPeer issues the query on the next peer, if available, and return it
@@ -537,6 +538,7 @@ func (state *shardState) AskPeer(ctx context.Context, fn fetchFunc, responses ch
537
538
}
538
539
peer := state .remainingPeers [0 ]
539
540
state .remainingPeers = state .remainingPeers [1 :]
541
+ state .inflight ++
540
542
go state .askPeer (ctx , peer , fn , responses )
541
543
return peer , true
542
544
}
@@ -612,6 +614,8 @@ func queryPeers(ctx context.Context, peerGroups map[int32][]cluster.Node, name s
612
614
//request canceled
613
615
return
614
616
case resp := <- responses :
617
+ states [resp .shardGroup ].inflight --
618
+
615
619
if _ , ok := receivedResponses [resp .shardGroup ]; ok {
616
620
// already received this response (possibly speculatively)
617
621
continue
@@ -624,8 +628,11 @@ func queryPeers(ctx context.Context, peerGroups map[int32][]cluster.Node, name s
624
628
speculativeRequests .Inc ()
625
629
continue
626
630
}
627
- // No more peers to try. Cancel the reqCtx, which will cancel all in-flight
628
- // requests.
631
+ // if there is another request in-flight for this shardGroup, then we can wait for that
632
+ if states [resp .shardGroup ].inflight > 0 {
633
+ continue
634
+ }
635
+ // we're out of options. Cancel the reqCtx, which will cancel all in-flight requests.
629
636
cancel ()
630
637
errorChan <- resp .err
631
638
return
0 commit comments