Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Parallelize requesting from peers in ordersync (#848)
Browse files Browse the repository at this point in the history
* Parallelized part of `GetOrders`

* Added an inner context to cut `getOrdersFromPeer` short

* Refactored ordersync to request from a new peer as soon as possible

* Improved ordersync `GetOrders` slightly

* Adjusted `maxPeersInParallel`

* Addressed review feedback from @albrow

* Switched to `sync.RWMutex`

* Addressed lingering review feedback from @albrow
  • Loading branch information
jalextowle authored Jul 1, 2020
1 parent a161d3a commit 75e90bf
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ lint: lint-go lint-ts lint-prettier

.PHONY: lint-go
lint-go:
golangci-lint run
golangci-lint run --timeout 2m


.PHONY: lint-ts
Expand Down
85 changes: 59 additions & 26 deletions core/ordersync/ordersync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"math/rand"
"sync"
"time"

"github.com/0xProject/0x-mesh/p2p"
Expand Down Expand Up @@ -44,6 +45,9 @@ const (
// approxDelay * (1 - jitter) <= actualDelay < approxDelay * (1 + jitter)
//
ordersyncJitterAmount = 0.1
// maxRequestPeersInParallel is the largest number of peers that `GetOrders`
// will try to pull orders from at once.
maxRequestPeersInParallel = 10
)

var (
Expand Down Expand Up @@ -285,50 +289,79 @@ func (s *Service) GetOrders(ctx context.Context, minPeers int) error {
default:
}

// TODO(albrow): As a performance optimization, do this for loop
// partly in parallel.
m := &sync.RWMutex{}
wg := &sync.WaitGroup{}
semaphore := make(chan struct{}, maxRequestPeersInParallel)
currentNeighbors := s.node.Neighbors()
shufflePeers(currentNeighbors)
innerCtx, cancel := context.WithCancel(ctx)
defer cancel()
for _, peerID := range currentNeighbors {
if len(successfullySyncedPeers) >= minPeers {
m.RLock()
successfullySyncedPeerLength := len(successfullySyncedPeers)
successfullySynced := successfullySyncedPeers.Contains(peerID.Pretty())
m.RUnlock()
if successfullySyncedPeerLength >= minPeers {
return nil
}
if successfullySyncedPeers.Contains(peerID.Pretty()) {
if successfullySynced {
continue
}

log.WithFields(log.Fields{
"provider": peerID.Pretty(),
}).Trace("requesting orders from neighbor via ordersync")
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if err := s.getOrdersFromPeer(ctx, peerID); err != nil {
log.WithFields(log.Fields{
"error": err.Error(),
"provider": peerID.Pretty(),
}).Warn("could not get orders from peer via ordersync")
continue
} else {
// TODO(albrow): Handle case where no orders were returned from this
// peer. This could be considered a valid response, depending on the implementation
// details of the subprotocol. We need to not try them again, but also not count
// them toward the number of peers we have successfully synced with.
log.WithFields(log.Fields{
"provider": peerID.Pretty(),
}).Trace("succesfully got orders from peer via ordersync")
successfullySyncedPeers.Add(peerID.Pretty())
}
wg.Add(1)
go func(id peer.ID) {
defer wg.Done()
select {
case <-innerCtx.Done():
// NOTE(jalextowle): In this case, we haven't written to the semaphore
// so we shouldn't read from it.
return
case semaphore <- struct{}{}:
defer func() { <-semaphore }()
}
if err := s.getOrdersFromPeer(innerCtx, id); err != nil {
log.WithFields(log.Fields{
"error": err.Error(),
"provider": id.Pretty(),
}).Warn("could not get orders from peer via ordersync")
} else {
// TODO(albrow): Handle case where no orders were returned from this
// peer. This could be considered a valid response, depending on the implementation
// details of the subprotocol. We need to not try them again, but also not count
// them toward the number of peers we have successfully synced with.
log.WithFields(log.Fields{
"provider": id.Pretty(),
}).Trace("succesfully got orders from peer via ordersync")
m.Lock()
successfullySyncedPeers.Add(id.Pretty())
successfullySyncedPeerLength := len(successfullySyncedPeers)
m.Unlock()
if successfullySyncedPeerLength >= minPeers {
cancel()
}
}
}(peerID)
}

// Wait for all goroutines to exit. If the inner context has been
// cancelled, then we have successfully completed ordersync.
wg.Wait()
if innerCtx.Err() == context.Canceled {
return nil
}

delayBeforeNextRetry := retryBackoff.Duration()
m.RLock()
successfullySyncedPeerLength := len(successfullySyncedPeers)
m.RUnlock()
log.WithFields(log.Fields{
"delayBeforeNextRetry": delayBeforeNextRetry.String(),
"minPeers": minPeers,
"successfullySyncedPeers": len(successfullySyncedPeers),
"successfullySyncedPeers": successfullySyncedPeerLength,
}).Debug("ordersync could not get orders from enough peers (trying again soon)")
select {
case <-ctx.Done():
Expand Down

0 comments on commit 75e90bf

Please sign in to comment.