From e3c47c633d033b76ed8e9f8cfb64036d242003fb Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 6 Aug 2018 19:21:56 -0400 Subject: [PATCH 1/6] clarify --- api/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/cluster.go b/api/cluster.go index 3dafd970e0..2cf3559e8b 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -306,7 +306,7 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa // across the cluster, except to the local peer. If any peer fails requests to // other peers are aborted. If enough peers have been heard from (based on // speculation-threshold configuration), and we are missing the others, try to -// speculatively query other members of the shard group. +// speculatively query each other member of each shard group. // ctx: request context // data: request to be submitted // name: name to be used in logging & tracing From af8923b1a0251eb53376b1188260169b389ad25d Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 6 Aug 2018 19:24:03 -0400 Subject: [PATCH 2/6] implement speculation-threshold by its spec spec says: ``` ratio of peer responses after which speculation is used. Set to 1 to disable. speculation-threshold = 1 ``` However, code would only trigger after *more* than the threshold is met. example: 4 shardgroups. threshold = 0.75 imagine after 3 shardgroups have returned, you would expect the speculation to kick in. However, the logic would flow like this: percentReceived = 1 - (1/4) = 3/4 = 0.75 if percentReceived > speculationThreshold then speculate if 0.75 > 0.75 -> no speculation!. It would wait until another shard returns (in this case, until all shard groups return) also if threshold=1, bypass speculation logic more explicitly --- api/cluster.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/api/cluster.go b/api/cluster.go index 2cf3559e8b..13b0dce56d 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -363,7 +363,12 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl result := make(map[string]PeerResponse) - specCheckTicker := time.NewTicker(5 * time.Millisecond) + var ticker *time.Ticker + var tickChan <-chan time.Time + if speculationThreshold != 1 { + ticker = time.NewTicker(5 * time.Millisecond) + tickChan = ticker.C + } for len(pendingResponses) > 0 { select { @@ -382,12 +387,12 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl delete(pendingResponses, resp.shardGroup) delete(originalPeers, resp.data.peer.GetName()) - case <-specCheckTicker.C: + case <-tickChan: // Check if it's time to speculate! percentReceived := 1 - (float64(len(pendingResponses)) / float64(len(peerGroups))) - if percentReceived > speculationThreshold { + if percentReceived >= speculationThreshold { // kick off speculative queries to other members now - specCheckTicker.Stop() + ticker.Stop() speculativeAttempts.Inc() for shardGroup := range pendingResponses { eligiblePeers := peerGroups[shardGroup][1:] From 8d06e781625d626d514c836ac5751a44790c1776 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 6 Aug 2018 20:46:01 -0400 Subject: [PATCH 3/6] assure ticker is always cleaned up --- api/cluster.go | 1 + 1 file changed, 1 insertion(+) diff --git a/api/cluster.go b/api/cluster.go index 13b0dce56d..b39d1c7638 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -368,6 +368,7 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl if speculationThreshold != 1 { ticker = time.NewTicker(5 * time.Millisecond) tickChan = ticker.C + defer ticker.Stop() } for len(pendingResponses) > 0 { From dbb9bf6623e1b6897f732bc92a7b3611216d91c4 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 6 Aug 2018 20:46:22 -0400 Subject: [PATCH 4/6] simpler way to compute percentReceived --- api/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/cluster.go b/api/cluster.go index b39d1c7638..f4337385bd 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -390,7 +390,7 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl case <-tickChan: // Check if it's time to speculate! - percentReceived := 1 - (float64(len(pendingResponses)) / float64(len(peerGroups))) + percentReceived := float64(len(receivedResponses)) / float64(len(peerGroups)) if percentReceived >= speculationThreshold { // kick off speculative queries to other members now ticker.Stop() From 6690299a960bda14b319cc582186f8610c13613b Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Mon, 6 Aug 2018 20:46:52 -0400 Subject: [PATCH 5/6] doc fixes --- cluster/cluster.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 37fe333639..c03447fab1 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -71,7 +71,7 @@ type partitionCandidates struct { nodes []Node } -// return the list of nodes to broadcast requests to +// MembersForQuery returns the list of nodes to broadcast requests to // If partitions are assinged to nodes in groups // (a[0,1], b[0,1], c[2,3], d[2,3] as opposed to a[0,1], b[0,2], c[1,3], d[2,3]), // only 1 member per partition is returned. @@ -133,7 +133,10 @@ func MembersForQuery() ([]Node, error) { count := int(atomic.AddUint32(&counter, 1)) LOOP: + // for every partition... for _, candidates := range membersMap { + + // prefer the local node if it serves this partition if candidates.nodes[0].GetName() == thisNode.GetName() { if _, ok := selectedMembers[thisNode.GetName()]; !ok { selectedMembers[thisNode.GetName()] = struct{}{} @@ -142,14 +145,18 @@ LOOP: continue LOOP } + // for remote nodes, try to pick one we've already included + for _, n := range candidates.nodes { if _, ok := selectedMembers[n.GetName()]; ok { continue LOOP } } + // if no nodes have been selected yet then grab a node from // the set of available nodes in such a way that nodes are // weighted fairly across MembersForQuery calls + selected := candidates.nodes[count%len(candidates.nodes)] selectedMembers[selected.GetName()] = struct{}{} answer = append(answer, selected) @@ -159,7 +166,7 @@ LOOP: } // MembersForSpeculativeQuery returns a prioritized list of nodes for each shard group -// TODO - this assumes that the partition set for each node is perfectly aligned +// keyed by the first (lowest) partition of their shard group func MembersForSpeculativeQuery() (map[int32][]Node, error) { thisNode := Manager.ThisNode() allNodes := Manager.MemberList() @@ -192,7 +199,7 @@ func MembersForSpeculativeQuery() (map[int32][]Node, error) { } for _, shard := range membersMap { - // Shuffle to avoid always choosing the same peer firsts + // Shuffle to avoid always choosing the same peer first for i := len(shard) - 1; i > 0; i-- { j := rand.Intn(i + 1) shard[i], shard[j] = shard[j], shard[i] From 3428869a7ee3ed8b8408d67be9dc9df673d59616 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Tue, 7 Aug 2018 11:17:51 -0400 Subject: [PATCH 6/6] no need for pendingResponses --- api/cluster.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/api/cluster.go b/api/cluster.go index f4337385bd..a4ff36e591 100644 --- a/api/cluster.go +++ b/api/cluster.go @@ -323,7 +323,6 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl defer cancel() originalPeers := make(map[string]struct{}, len(peerGroups)) - pendingResponses := make(map[int32]struct{}, len(peerGroups)) receivedResponses := make(map[int32]struct{}, len(peerGroups)) responses := make(chan struct { @@ -357,7 +356,6 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl for group, peers := range peerGroups { peer := peers[0] originalPeers[peer.GetName()] = struct{}{} - pendingResponses[group] = struct{}{} go askPeer(group, peer) } @@ -371,7 +369,7 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl defer ticker.Stop() } - for len(pendingResponses) > 0 { + for len(receivedResponses) < len(peerGroups) { select { case resp := <-responses: if _, ok := receivedResponses[resp.shardGroup]; ok { @@ -385,7 +383,6 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl result[resp.data.peer.GetName()] = resp.data receivedResponses[resp.shardGroup] = struct{}{} - delete(pendingResponses, resp.shardGroup) delete(originalPeers, resp.data.peer.GetName()) case <-tickChan: @@ -395,8 +392,11 @@ func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceabl // kick off speculative queries to other members now ticker.Stop() speculativeAttempts.Inc() - for shardGroup := range pendingResponses { - eligiblePeers := peerGroups[shardGroup][1:] + for shardGroup, peers := range peerGroups { + if _, ok := receivedResponses[shardGroup]; ok { + continue + } + eligiblePeers := peers[1:] for _, peer := range eligiblePeers { speculativeRequests.Inc() go askPeer(shardGroup, peer)