Skip to content

Commit

Permalink
Merge pull request #3 from ipfs/feat/configurable-options
Browse files Browse the repository at this point in the history
Add configuration options for ignoring freezing, listening for adds & removes
  • Loading branch information
hannahhoward authored May 22, 2019
2 parents 89a8189 + e97a7ce commit 41fab2d
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 27 deletions.
115 changes: 105 additions & 10 deletions peertaskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,115 @@ import (
peer "github.com/libp2p/go-libp2p-peer"
)

type peerTaskQueueEvent int

const (
peerAdded = peerTaskQueueEvent(1)
peerRemoved = peerTaskQueueEvent(2)
)

type hookFunc func(p peer.ID, event peerTaskQueueEvent)

// PeerTaskQueue is a prioritized list of tasks to be executed on peers.
// The queue puts tasks on in blocks, then alternates between peers (roughly)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
lock sync.Mutex
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
lock sync.Mutex
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
hooks []hookFunc
ignoreFreezing bool
}

// Option is a function that configures the peer task queue
type Option func(*PeerTaskQueue) Option

func chain(firstOption Option, secondOption Option) Option {
return func(ptq *PeerTaskQueue) Option {
firstReverse := firstOption(ptq)
secondReverse := secondOption(ptq)
return chain(secondReverse, firstReverse)
}
}

// IgnoreFreezing is an option that can make the task queue ignore freezing and unfreezing
func IgnoreFreezing(ignoreFreezing bool) Option {
return func(ptq *PeerTaskQueue) Option {
previous := ptq.ignoreFreezing
ptq.ignoreFreezing = ignoreFreezing
return IgnoreFreezing(previous)
}
}

func removeHook(hook hookFunc) Option {
return func(ptq *PeerTaskQueue) Option {
for i, testHook := range ptq.hooks {
if &hook == &testHook {
ptq.hooks = append(ptq.hooks[:i], ptq.hooks[i+1:]...)
break
}
}
return addHook(hook)
}
}

func addHook(hook hookFunc) Option {
return func(ptq *PeerTaskQueue) Option {
ptq.hooks = append(ptq.hooks, hook)
return removeHook(hook)
}
}

// OnPeerAddedHook adds a hook function that gets called whenever the ptq adds a new peer
func OnPeerAddedHook(onPeerAddedHook func(p peer.ID)) Option {
hook := func(p peer.ID, event peerTaskQueueEvent) {
if event == peerAdded {
onPeerAddedHook(p)
}
}
return addHook(hook)
}

// OnPeerRemovedHook adds a hook function that gets called whenever the ptq adds a new peer
func OnPeerRemovedHook(onPeerRemovedHook func(p peer.ID)) Option {
hook := func(p peer.ID, event peerTaskQueueEvent) {
if event == peerRemoved {
onPeerRemovedHook(p)
}
}
return addHook(hook)
}

// New creates a new PeerTaskQueue
func New() *PeerTaskQueue {
return &PeerTaskQueue{
func New(options ...Option) *PeerTaskQueue {
ptq := &PeerTaskQueue{
peerTrackers: make(map[peer.ID]*peertracker.PeerTracker),
frozenPeers: make(map[peer.ID]struct{}),
pQueue: pq.New(peertracker.PeerCompare),
}
ptq.Options(options...)
return ptq
}

// Options uses configuration functions to configure the peer task queue.
// It returns an Option that can be called to reverse the changes.
func (ptq *PeerTaskQueue) Options(options ...Option) Option {
if len(options) == 0 {
return nil
}
if len(options) == 1 {
return options[0](ptq)
}
reverse := options[0](ptq)
return chain(ptq.Options(options[1:]...), reverse)
}

func (ptq *PeerTaskQueue) callHooks(to peer.ID, event peerTaskQueueEvent) {
for _, hook := range ptq.hooks {
hook(to, event)
}
}

// PushBlock adds a new block of tasks for the given peer to the queue
Expand All @@ -38,6 +129,7 @@ func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
peerTracker = peertracker.New(to)
ptq.pQueue.Push(peerTracker)
ptq.peerTrackers[to] = peerTracker
ptq.callHooks(to, peerAdded)
}

peerTracker.PushBlock(to, tasks, func(e []peertask.Task) {
Expand Down Expand Up @@ -65,6 +157,7 @@ func (ptq *PeerTaskQueue) PopBlock() *peertask.TaskBlock {
target := peerTracker.Target()
delete(ptq.peerTrackers, target)
delete(ptq.frozenPeers, target)
ptq.callHooks(target, peerRemoved)
} else {
ptq.pQueue.Push(peerTracker)
}
Expand All @@ -81,11 +174,13 @@ func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) {
// block we were about to send them, we should wait a short period of time
// to make sure we receive any other in-flight cancels before sending
// them a block they already potentially have
if !peerTracker.IsFrozen() {
ptq.frozenPeers[p] = struct{}{}
}
if !ptq.ignoreFreezing {
if !peerTracker.IsFrozen() {
ptq.frozenPeers[p] = struct{}{}
}

peerTracker.Freeze()
peerTracker.Freeze()
}
ptq.pQueue.Update(peerTracker.Index())
}
ptq.lock.Unlock()
Expand Down
144 changes: 127 additions & 17 deletions peertaskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/testutil"
peer "github.com/libp2p/go-libp2p-peer"
)

func TestPushPop(t *testing.T) {
Expand Down Expand Up @@ -70,8 +71,7 @@ func TestPushPop(t *testing.T) {
}
}

// This test checks that peers wont starve out other peers
func TestPeerRepeats(t *testing.T) {
func TestFreezeUnfreeze(t *testing.T) {
ptq := New()
peers := testutil.GeneratePeers(4)
a := peers[0]
Expand All @@ -90,26 +90,69 @@ func TestPeerRepeats(t *testing.T) {
}

// now, pop off four tasks, there should be one from each
var targets []string
var tasks []*peertask.TaskBlock
for i := 0; i < 4; i++ {
t := ptq.PopBlock()
targets = append(targets, t.Target.Pretty())
tasks = append(tasks, t)
matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())

ptq.Remove(peertask.Task{Identifier: "1"}, b)

// b should be frozen, causing it to get skipped in the rotation
matchNTasks(t, ptq, 3, a.Pretty(), c.Pretty(), d.Pretty())

ptq.ThawRound()

matchNTasks(t, ptq, 1, b.Pretty())

}

func TestFreezeUnfreezeNoFreezingOption(t *testing.T) {
ptq := New(IgnoreFreezing(true))
peers := testutil.GeneratePeers(4)
a := peers[0]
b := peers[1]
c := peers[2]
d := peers[3]

// Have each push some blocks

for i := 0; i < 5; i++ {
is := fmt.Sprint(i)
ptq.PushBlock(a, peertask.Task{Identifier: is})
ptq.PushBlock(b, peertask.Task{Identifier: is})
ptq.PushBlock(c, peertask.Task{Identifier: is})
ptq.PushBlock(d, peertask.Task{Identifier: is})
}

expected := []string{a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty()}
sort.Strings(expected)
sort.Strings(targets)
// now, pop off four tasks, there should be one from each
matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())

t.Log(targets)
t.Log(expected)
for i, s := range targets {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
ptq.Remove(peertask.Task{Identifier: "1"}, b)

// b should be frozen, causing it to get skipped in the rotation
matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())

}

// This test checks that peers wont starve out other peers
func TestPeerRepeats(t *testing.T) {
ptq := New()
peers := testutil.GeneratePeers(4)
a := peers[0]
b := peers[1]
c := peers[2]
d := peers[3]

// Have each push some blocks

for i := 0; i < 5; i++ {
is := fmt.Sprint(i)
ptq.PushBlock(a, peertask.Task{Identifier: is})
ptq.PushBlock(b, peertask.Task{Identifier: is})
ptq.PushBlock(c, peertask.Task{Identifier: is})
ptq.PushBlock(d, peertask.Task{Identifier: is})
}

// now, pop off four tasks, there should be one from each
tasks := matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())

// Now, if one of the tasks gets finished, the next task off the queue should
// be for the same peer
for blockI := 0; blockI < 4; blockI++ {
Expand All @@ -125,6 +168,50 @@ func TestPeerRepeats(t *testing.T) {
}
}

func TestHooks(t *testing.T) {
var peersAdded []string
var peersRemoved []string
onPeerAdded := func(p peer.ID) {
peersAdded = append(peersAdded, p.Pretty())
}
onPeerRemoved := func(p peer.ID) {
peersRemoved = append(peersRemoved, p.Pretty())
}
ptq := New(OnPeerAddedHook(onPeerAdded), OnPeerRemovedHook(onPeerRemoved))
peers := testutil.GeneratePeers(2)
a := peers[0]
b := peers[1]
ptq.PushBlock(a, peertask.Task{Identifier: "1"})
ptq.PushBlock(b, peertask.Task{Identifier: "2"})
expected := []string{a.Pretty(), b.Pretty()}
sort.Strings(expected)
sort.Strings(peersAdded)
if len(peersAdded) != len(expected) {
t.Fatal("Incorrect number of peers added")
}
for i, s := range peersAdded {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
}

task := ptq.PopBlock()
task.Done(task.Tasks)
task = ptq.PopBlock()
task.Done(task.Tasks)
ptq.PopBlock()
ptq.PopBlock()

sort.Strings(peersRemoved)
if len(peersRemoved) != len(expected) {
t.Fatal("Incorrect number of peers removed")
}
for i, s := range peersRemoved {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
}
}
func TestCleaningUpQueues(t *testing.T) {
ptq := New()

Expand Down Expand Up @@ -157,3 +244,26 @@ func TestCleaningUpQueues(t *testing.T) {
}

}

func matchNTasks(t *testing.T, ptq *PeerTaskQueue, n int, expected ...string) []*peertask.TaskBlock {
var targets []string
var tasks []*peertask.TaskBlock
for i := 0; i < n; i++ {
t := ptq.PopBlock()
targets = append(targets, t.Target.Pretty())
tasks = append(tasks, t)
}

sort.Strings(expected)
sort.Strings(targets)

t.Log(targets)
t.Log(expected)
for i, s := range targets {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
}

return tasks
}

0 comments on commit 41fab2d

Please sign in to comment.