From 61b3ade5ec6e3b1cc4d025ea4f5ed46039ea2457 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 20 May 2019 17:31:15 -0700 Subject: [PATCH 1/2] feat(options): add ignore freezing option Add an option to ignore freezing when removing tasks, as well as a configuration infrastructure --- peertaskqueue.go | 58 ++++++++++++++++++++----- peertaskqueue_test.go | 99 +++++++++++++++++++++++++++++++++++-------- 2 files changed, 130 insertions(+), 27 deletions(-) diff --git a/peertaskqueue.go b/peertaskqueue.go index 5597378..1d49ebf 100644 --- a/peertaskqueue.go +++ b/peertaskqueue.go @@ -14,19 +14,55 @@ import ( // to execute the block with the highest priority, or otherwise the one added // first if priorities are equal. type PeerTaskQueue struct { - lock sync.Mutex - pQueue pq.PQ - peerTrackers map[peer.ID]*peertracker.PeerTracker - frozenPeers map[peer.ID]struct{} + lock sync.Mutex + pQueue pq.PQ + peerTrackers map[peer.ID]*peertracker.PeerTracker + frozenPeers map[peer.ID]struct{} + ignoreFreezing bool +} + +// Option is a function that configures the peer task queue +type Option func(*PeerTaskQueue) Option + +func chain(firstOption Option, secondOption Option) Option { + return func(ptq *PeerTaskQueue) Option { + firstReverse := firstOption(ptq) + secondReverse := secondOption(ptq) + return chain(secondReverse, firstReverse) + } +} + +// IgnoreFreezing is an option that can make the task queue ignore freezing and unfreezing +func IgnoreFreezing(ignoreFreezing bool) Option { + return func(ptq *PeerTaskQueue) Option { + previous := ptq.ignoreFreezing + ptq.ignoreFreezing = ignoreFreezing + return IgnoreFreezing(previous) + } } // New creates a new PeerTaskQueue -func New() *PeerTaskQueue { - return &PeerTaskQueue{ +func New(options ...Option) *PeerTaskQueue { + ptq := &PeerTaskQueue{ peerTrackers: make(map[peer.ID]*peertracker.PeerTracker), frozenPeers: make(map[peer.ID]struct{}), pQueue: pq.New(peertracker.PeerCompare), } + ptq.Options(options...) + return ptq +} + +// Options uses configuration functions to configure the peer task queue. +// It returns an Option that can be called to reverse the changes. +func (ptq *PeerTaskQueue) Options(options ...Option) Option { + if len(options) == 0 { + return nil + } + if len(options) == 1 { + return options[0](ptq) + } + reverse := options[0](ptq) + return chain(ptq.Options(options[1:]...), reverse) } // PushBlock adds a new block of tasks for the given peer to the queue @@ -81,11 +117,13 @@ func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) { // block we were about to send them, we should wait a short period of time // to make sure we receive any other in-flight cancels before sending // them a block they already potentially have - if !peerTracker.IsFrozen() { - ptq.frozenPeers[p] = struct{}{} - } + if !ptq.ignoreFreezing { + if !peerTracker.IsFrozen() { + ptq.frozenPeers[p] = struct{}{} + } - peerTracker.Freeze() + peerTracker.Freeze() + } ptq.pQueue.Update(peerTracker.Index()) } ptq.lock.Unlock() diff --git a/peertaskqueue_test.go b/peertaskqueue_test.go index f73a339..02ad14b 100644 --- a/peertaskqueue_test.go +++ b/peertaskqueue_test.go @@ -70,8 +70,7 @@ func TestPushPop(t *testing.T) { } } -// This test checks that peers wont starve out other peers -func TestPeerRepeats(t *testing.T) { +func TestFreezeUnfreeze(t *testing.T) { ptq := New() peers := testutil.GeneratePeers(4) a := peers[0] @@ -90,26 +89,69 @@ func TestPeerRepeats(t *testing.T) { } // now, pop off four tasks, there should be one from each - var targets []string - var tasks []*peertask.TaskBlock - for i := 0; i < 4; i++ { - t := ptq.PopBlock() - targets = append(targets, t.Target.Pretty()) - tasks = append(tasks, t) + matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty()) + + ptq.Remove(peertask.Task{Identifier: "1"}, b) + + // b should be frozen, causing it to get skipped in the rotation + matchNTasks(t, ptq, 3, a.Pretty(), c.Pretty(), d.Pretty()) + + ptq.ThawRound() + + matchNTasks(t, ptq, 1, b.Pretty()) + +} + +func TestFreezeUnfreezeNoFreezingOption(t *testing.T) { + ptq := New(IgnoreFreezing(true)) + peers := testutil.GeneratePeers(4) + a := peers[0] + b := peers[1] + c := peers[2] + d := peers[3] + + // Have each push some blocks + + for i := 0; i < 5; i++ { + is := fmt.Sprint(i) + ptq.PushBlock(a, peertask.Task{Identifier: is}) + ptq.PushBlock(b, peertask.Task{Identifier: is}) + ptq.PushBlock(c, peertask.Task{Identifier: is}) + ptq.PushBlock(d, peertask.Task{Identifier: is}) } - expected := []string{a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty()} - sort.Strings(expected) - sort.Strings(targets) + // now, pop off four tasks, there should be one from each + matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty()) - t.Log(targets) - t.Log(expected) - for i, s := range targets { - if expected[i] != s { - t.Fatal("unexpected peer", s, expected[i]) - } + ptq.Remove(peertask.Task{Identifier: "1"}, b) + + // b should be frozen, causing it to get skipped in the rotation + matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty()) + +} + +// This test checks that peers wont starve out other peers +func TestPeerRepeats(t *testing.T) { + ptq := New() + peers := testutil.GeneratePeers(4) + a := peers[0] + b := peers[1] + c := peers[2] + d := peers[3] + + // Have each push some blocks + + for i := 0; i < 5; i++ { + is := fmt.Sprint(i) + ptq.PushBlock(a, peertask.Task{Identifier: is}) + ptq.PushBlock(b, peertask.Task{Identifier: is}) + ptq.PushBlock(c, peertask.Task{Identifier: is}) + ptq.PushBlock(d, peertask.Task{Identifier: is}) } + // now, pop off four tasks, there should be one from each + tasks := matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty()) + // Now, if one of the tasks gets finished, the next task off the queue should // be for the same peer for blockI := 0; blockI < 4; blockI++ { @@ -157,3 +199,26 @@ func TestCleaningUpQueues(t *testing.T) { } } + +func matchNTasks(t *testing.T, ptq *PeerTaskQueue, n int, expected ...string) []*peertask.TaskBlock { + var targets []string + var tasks []*peertask.TaskBlock + for i := 0; i < n; i++ { + t := ptq.PopBlock() + targets = append(targets, t.Target.Pretty()) + tasks = append(tasks, t) + } + + sort.Strings(expected) + sort.Strings(targets) + + t.Log(targets) + t.Log(expected) + for i, s := range targets { + if expected[i] != s { + t.Fatal("unexpected peer", s, expected[i]) + } + } + + return tasks +} From e97a7ce49cfdc3b435a60416fb8e3ebc47a89ebe Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 20 May 2019 18:20:14 -0700 Subject: [PATCH 2/2] feat(options): add options for hooks Add ability to listen for peers getting added and removed from peer request queue --- peertaskqueue.go | 57 +++++++++++++++++++++++++++++++++++++++++++ peertaskqueue_test.go | 45 ++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) diff --git a/peertaskqueue.go b/peertaskqueue.go index 1d49ebf..bb367c1 100644 --- a/peertaskqueue.go +++ b/peertaskqueue.go @@ -9,6 +9,15 @@ import ( peer "github.com/libp2p/go-libp2p-peer" ) +type peerTaskQueueEvent int + +const ( + peerAdded = peerTaskQueueEvent(1) + peerRemoved = peerTaskQueueEvent(2) +) + +type hookFunc func(p peer.ID, event peerTaskQueueEvent) + // PeerTaskQueue is a prioritized list of tasks to be executed on peers. // The queue puts tasks on in blocks, then alternates between peers (roughly) // to execute the block with the highest priority, or otherwise the one added @@ -18,6 +27,7 @@ type PeerTaskQueue struct { pQueue pq.PQ peerTrackers map[peer.ID]*peertracker.PeerTracker frozenPeers map[peer.ID]struct{} + hooks []hookFunc ignoreFreezing bool } @@ -41,6 +51,45 @@ func IgnoreFreezing(ignoreFreezing bool) Option { } } +func removeHook(hook hookFunc) Option { + return func(ptq *PeerTaskQueue) Option { + for i, testHook := range ptq.hooks { + if &hook == &testHook { + ptq.hooks = append(ptq.hooks[:i], ptq.hooks[i+1:]...) + break + } + } + return addHook(hook) + } +} + +func addHook(hook hookFunc) Option { + return func(ptq *PeerTaskQueue) Option { + ptq.hooks = append(ptq.hooks, hook) + return removeHook(hook) + } +} + +// OnPeerAddedHook adds a hook function that gets called whenever the ptq adds a new peer +func OnPeerAddedHook(onPeerAddedHook func(p peer.ID)) Option { + hook := func(p peer.ID, event peerTaskQueueEvent) { + if event == peerAdded { + onPeerAddedHook(p) + } + } + return addHook(hook) +} + +// OnPeerRemovedHook adds a hook function that gets called whenever the ptq adds a new peer +func OnPeerRemovedHook(onPeerRemovedHook func(p peer.ID)) Option { + hook := func(p peer.ID, event peerTaskQueueEvent) { + if event == peerRemoved { + onPeerRemovedHook(p) + } + } + return addHook(hook) +} + // New creates a new PeerTaskQueue func New(options ...Option) *PeerTaskQueue { ptq := &PeerTaskQueue{ @@ -65,6 +114,12 @@ func (ptq *PeerTaskQueue) Options(options ...Option) Option { return chain(ptq.Options(options[1:]...), reverse) } +func (ptq *PeerTaskQueue) callHooks(to peer.ID, event peerTaskQueueEvent) { + for _, hook := range ptq.hooks { + hook(to, event) + } +} + // PushBlock adds a new block of tasks for the given peer to the queue func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) { ptq.lock.Lock() @@ -74,6 +129,7 @@ func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) { peerTracker = peertracker.New(to) ptq.pQueue.Push(peerTracker) ptq.peerTrackers[to] = peerTracker + ptq.callHooks(to, peerAdded) } peerTracker.PushBlock(to, tasks, func(e []peertask.Task) { @@ -101,6 +157,7 @@ func (ptq *PeerTaskQueue) PopBlock() *peertask.TaskBlock { target := peerTracker.Target() delete(ptq.peerTrackers, target) delete(ptq.frozenPeers, target) + ptq.callHooks(target, peerRemoved) } else { ptq.pQueue.Push(peerTracker) } diff --git a/peertaskqueue_test.go b/peertaskqueue_test.go index 02ad14b..3babaca 100644 --- a/peertaskqueue_test.go +++ b/peertaskqueue_test.go @@ -10,6 +10,7 @@ import ( "github.com/ipfs/go-peertaskqueue/peertask" "github.com/ipfs/go-peertaskqueue/testutil" + peer "github.com/libp2p/go-libp2p-peer" ) func TestPushPop(t *testing.T) { @@ -167,6 +168,50 @@ func TestPeerRepeats(t *testing.T) { } } +func TestHooks(t *testing.T) { + var peersAdded []string + var peersRemoved []string + onPeerAdded := func(p peer.ID) { + peersAdded = append(peersAdded, p.Pretty()) + } + onPeerRemoved := func(p peer.ID) { + peersRemoved = append(peersRemoved, p.Pretty()) + } + ptq := New(OnPeerAddedHook(onPeerAdded), OnPeerRemovedHook(onPeerRemoved)) + peers := testutil.GeneratePeers(2) + a := peers[0] + b := peers[1] + ptq.PushBlock(a, peertask.Task{Identifier: "1"}) + ptq.PushBlock(b, peertask.Task{Identifier: "2"}) + expected := []string{a.Pretty(), b.Pretty()} + sort.Strings(expected) + sort.Strings(peersAdded) + if len(peersAdded) != len(expected) { + t.Fatal("Incorrect number of peers added") + } + for i, s := range peersAdded { + if expected[i] != s { + t.Fatal("unexpected peer", s, expected[i]) + } + } + + task := ptq.PopBlock() + task.Done(task.Tasks) + task = ptq.PopBlock() + task.Done(task.Tasks) + ptq.PopBlock() + ptq.PopBlock() + + sort.Strings(peersRemoved) + if len(peersRemoved) != len(expected) { + t.Fatal("Incorrect number of peers removed") + } + for i, s := range peersRemoved { + if expected[i] != s { + t.Fatal("unexpected peer", s, expected[i]) + } + } +} func TestCleaningUpQueues(t *testing.T) { ptq := New()