Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration options for ignoring freezing, listening for adds & removes #3

Merged
merged 2 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 105 additions & 10 deletions peertaskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,115 @@ import (
peer "github.com/libp2p/go-libp2p-peer"
)

type peerTaskQueueEvent int

const (
peerAdded = peerTaskQueueEvent(1)
peerRemoved = peerTaskQueueEvent(2)
)

type hookFunc func(p peer.ID, event peerTaskQueueEvent)

// PeerTaskQueue is a prioritized list of tasks to be executed on peers.
// The queue puts tasks on in blocks, then alternates between peers (roughly)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
lock sync.Mutex
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
lock sync.Mutex
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
hooks []hookFunc
ignoreFreezing bool
}

// Option is a function that configures the peer task queue
type Option func(*PeerTaskQueue) Option

func chain(firstOption Option, secondOption Option) Option {
return func(ptq *PeerTaskQueue) Option {
firstReverse := firstOption(ptq)
secondReverse := secondOption(ptq)
return chain(secondReverse, firstReverse)
}
}

// IgnoreFreezing is an option that can make the task queue ignore freezing and unfreezing
func IgnoreFreezing(ignoreFreezing bool) Option {
return func(ptq *PeerTaskQueue) Option {
previous := ptq.ignoreFreezing
ptq.ignoreFreezing = ignoreFreezing
return IgnoreFreezing(previous)
}
}

func removeHook(hook hookFunc) Option {
return func(ptq *PeerTaskQueue) Option {
for i, testHook := range ptq.hooks {
if &hook == &testHook {
ptq.hooks = append(ptq.hooks[:i], ptq.hooks[i+1:]...)
break
}
}
return addHook(hook)
}
}

func addHook(hook hookFunc) Option {
return func(ptq *PeerTaskQueue) Option {
ptq.hooks = append(ptq.hooks, hook)
return removeHook(hook)
}
}

// OnPeerAddedHook adds a hook function that gets called whenever the ptq adds a new peer
func OnPeerAddedHook(onPeerAddedHook func(p peer.ID)) Option {
hook := func(p peer.ID, event peerTaskQueueEvent) {
if event == peerAdded {
onPeerAddedHook(p)
}
}
return addHook(hook)
}

// OnPeerRemovedHook adds a hook function that gets called whenever the ptq adds a new peer
func OnPeerRemovedHook(onPeerRemovedHook func(p peer.ID)) Option {
hook := func(p peer.ID, event peerTaskQueueEvent) {
if event == peerRemoved {
onPeerRemovedHook(p)
}
}
return addHook(hook)
}

// New creates a new PeerTaskQueue
func New() *PeerTaskQueue {
return &PeerTaskQueue{
func New(options ...Option) *PeerTaskQueue {
ptq := &PeerTaskQueue{
peerTrackers: make(map[peer.ID]*peertracker.PeerTracker),
frozenPeers: make(map[peer.ID]struct{}),
pQueue: pq.New(peertracker.PeerCompare),
}
ptq.Options(options...)
return ptq
}

// Options uses configuration functions to configure the peer task queue.
// It returns an Option that can be called to reverse the changes.
func (ptq *PeerTaskQueue) Options(options ...Option) Option {
if len(options) == 0 {
return nil
}
if len(options) == 1 {
return options[0](ptq)
}
reverse := options[0](ptq)
return chain(ptq.Options(options[1:]...), reverse)
}

func (ptq *PeerTaskQueue) callHooks(to peer.ID, event peerTaskQueueEvent) {
for _, hook := range ptq.hooks {
hook(to, event)
}
}

// PushBlock adds a new block of tasks for the given peer to the queue
Expand All @@ -38,6 +129,7 @@ func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
peerTracker = peertracker.New(to)
ptq.pQueue.Push(peerTracker)
ptq.peerTrackers[to] = peerTracker
ptq.callHooks(to, peerAdded)
}

peerTracker.PushBlock(to, tasks, func(e []peertask.Task) {
Expand Down Expand Up @@ -65,6 +157,7 @@ func (ptq *PeerTaskQueue) PopBlock() *peertask.TaskBlock {
target := peerTracker.Target()
delete(ptq.peerTrackers, target)
delete(ptq.frozenPeers, target)
ptq.callHooks(target, peerRemoved)
} else {
ptq.pQueue.Push(peerTracker)
}
Expand All @@ -81,11 +174,13 @@ func (ptq *PeerTaskQueue) Remove(identifier peertask.Identifier, p peer.ID) {
// block we were about to send them, we should wait a short period of time
// to make sure we receive any other in-flight cancels before sending
// them a block they already potentially have
if !peerTracker.IsFrozen() {
ptq.frozenPeers[p] = struct{}{}
}
if !ptq.ignoreFreezing {
if !peerTracker.IsFrozen() {
ptq.frozenPeers[p] = struct{}{}
}

peerTracker.Freeze()
peerTracker.Freeze()
}
ptq.pQueue.Update(peerTracker.Index())
}
ptq.lock.Unlock()
Expand Down
144 changes: 127 additions & 17 deletions peertaskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/testutil"
peer "github.com/libp2p/go-libp2p-peer"
)

func TestPushPop(t *testing.T) {
Expand Down Expand Up @@ -70,8 +71,7 @@ func TestPushPop(t *testing.T) {
}
}

// This test checks that peers wont starve out other peers
func TestPeerRepeats(t *testing.T) {
func TestFreezeUnfreeze(t *testing.T) {
ptq := New()
peers := testutil.GeneratePeers(4)
a := peers[0]
Expand All @@ -90,26 +90,69 @@ func TestPeerRepeats(t *testing.T) {
}

// now, pop off four tasks, there should be one from each
var targets []string
var tasks []*peertask.TaskBlock
for i := 0; i < 4; i++ {
t := ptq.PopBlock()
targets = append(targets, t.Target.Pretty())
tasks = append(tasks, t)
matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())

ptq.Remove(peertask.Task{Identifier: "1"}, b)

// b should be frozen, causing it to get skipped in the rotation
matchNTasks(t, ptq, 3, a.Pretty(), c.Pretty(), d.Pretty())

ptq.ThawRound()

matchNTasks(t, ptq, 1, b.Pretty())

}

func TestFreezeUnfreezeNoFreezingOption(t *testing.T) {
ptq := New(IgnoreFreezing(true))
peers := testutil.GeneratePeers(4)
a := peers[0]
b := peers[1]
c := peers[2]
d := peers[3]

// Have each push some blocks

for i := 0; i < 5; i++ {
is := fmt.Sprint(i)
ptq.PushBlock(a, peertask.Task{Identifier: is})
ptq.PushBlock(b, peertask.Task{Identifier: is})
ptq.PushBlock(c, peertask.Task{Identifier: is})
ptq.PushBlock(d, peertask.Task{Identifier: is})
}

expected := []string{a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty()}
sort.Strings(expected)
sort.Strings(targets)
// now, pop off four tasks, there should be one from each
matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())

t.Log(targets)
t.Log(expected)
for i, s := range targets {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
ptq.Remove(peertask.Task{Identifier: "1"}, b)

// b should be frozen, causing it to get skipped in the rotation
matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())

}

// This test checks that peers wont starve out other peers
func TestPeerRepeats(t *testing.T) {
ptq := New()
peers := testutil.GeneratePeers(4)
a := peers[0]
b := peers[1]
c := peers[2]
d := peers[3]

// Have each push some blocks

for i := 0; i < 5; i++ {
is := fmt.Sprint(i)
ptq.PushBlock(a, peertask.Task{Identifier: is})
ptq.PushBlock(b, peertask.Task{Identifier: is})
ptq.PushBlock(c, peertask.Task{Identifier: is})
ptq.PushBlock(d, peertask.Task{Identifier: is})
}

// now, pop off four tasks, there should be one from each
tasks := matchNTasks(t, ptq, 4, a.Pretty(), b.Pretty(), c.Pretty(), d.Pretty())

// Now, if one of the tasks gets finished, the next task off the queue should
// be for the same peer
for blockI := 0; blockI < 4; blockI++ {
Expand All @@ -125,6 +168,50 @@ func TestPeerRepeats(t *testing.T) {
}
}

func TestHooks(t *testing.T) {
var peersAdded []string
var peersRemoved []string
onPeerAdded := func(p peer.ID) {
peersAdded = append(peersAdded, p.Pretty())
}
onPeerRemoved := func(p peer.ID) {
peersRemoved = append(peersRemoved, p.Pretty())
}
ptq := New(OnPeerAddedHook(onPeerAdded), OnPeerRemovedHook(onPeerRemoved))
peers := testutil.GeneratePeers(2)
a := peers[0]
b := peers[1]
ptq.PushBlock(a, peertask.Task{Identifier: "1"})
ptq.PushBlock(b, peertask.Task{Identifier: "2"})
expected := []string{a.Pretty(), b.Pretty()}
sort.Strings(expected)
sort.Strings(peersAdded)
if len(peersAdded) != len(expected) {
t.Fatal("Incorrect number of peers added")
}
for i, s := range peersAdded {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
}

task := ptq.PopBlock()
task.Done(task.Tasks)
task = ptq.PopBlock()
task.Done(task.Tasks)
ptq.PopBlock()
ptq.PopBlock()

sort.Strings(peersRemoved)
if len(peersRemoved) != len(expected) {
t.Fatal("Incorrect number of peers removed")
}
for i, s := range peersRemoved {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
}
}
func TestCleaningUpQueues(t *testing.T) {
ptq := New()

Expand Down Expand Up @@ -157,3 +244,26 @@ func TestCleaningUpQueues(t *testing.T) {
}

}

func matchNTasks(t *testing.T, ptq *PeerTaskQueue, n int, expected ...string) []*peertask.TaskBlock {
var targets []string
var tasks []*peertask.TaskBlock
for i := 0; i < n; i++ {
t := ptq.PopBlock()
targets = append(targets, t.Target.Pretty())
tasks = append(tasks, t)
}

sort.Strings(expected)
sort.Strings(targets)

t.Log(targets)
t.Log(expected)
for i, s := range targets {
if expected[i] != s {
t.Fatal("unexpected peer", s, expected[i])
}
}

return tasks
}