Skip to content

Commit

Permalink
Change queue to be timestamp based, rather than index
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelavila committed Jul 1, 2019
1 parent 8e5ea5f commit dc37657
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 129 deletions.
123 changes: 30 additions & 93 deletions provider/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package queue
import (
"context"
"fmt"
"strconv"
"strings"
"time"

cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
Expand All @@ -25,8 +24,6 @@ type Queue struct {
// e.g. provider vs reprovider
name string
ctx context.Context
tail uint64
head uint64
ds datastore.Datastore // Must be threadsafe
dequeue chan cid.Cid
enqueue chan cid.Cid
Expand All @@ -37,16 +34,10 @@ type Queue struct {
// NewQueue creates a queue for cids
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
head, tail, err := getQueueHeadTail(ctx, namespaced)
if err != nil {
return nil, err
}
cancelCtx, cancel := context.WithCancel(ctx)
q := &Queue{
name: name,
ctx: cancelCtx,
head: head,
tail: tail,
ds: namespaced,
dequeue: make(chan cid.Cid),
enqueue: make(chan cid.Cid),
Expand Down Expand Up @@ -77,41 +68,6 @@ func (q *Queue) Dequeue() <-chan cid.Cid {
return q.dequeue
}

// Look for next Cid in the queue and return it. Skip over gaps and mangled data
func (q *Queue) nextEntry() (datastore.Key, cid.Cid) {
for {
if q.head >= q.tail {
return datastore.Key{}, cid.Undef
}

key := q.queueKey(q.head)
value, err := q.ds.Get(key)

if err != nil {
if err == datastore.ErrNotFound {
log.Warningf("Error missing entry in queue: %s", key)
} else {
log.Errorf("Error fetching from queue: %s", err)
}
q.head++ // move on
continue
}

c, err := cid.Parse(value)
if err != nil {
log.Warningf("Error marshalling Cid from queue: ", err)
q.head++
err = q.ds.Delete(key)
if err != nil {
log.Warningf("Provider queue failed to delete: %s", key)
}
continue
}

return key, c
}
}

// Run dequeues and enqueues when available.
func (q *Queue) work() {
go func() {
Expand All @@ -124,7 +80,26 @@ func (q *Queue) work() {

for {
if c == cid.Undef {
k, c = q.nextEntry()
head, e := q.getQueueHead()

if e != nil {
log.Errorf("error querying for head of queue: %s, stopping provider", e)
return
} else if head != nil {
k = datastore.NewKey(head.Key)
c, e = cid.Parse(head.Value)
if e != nil {
log.Warningf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, e)
err := q.ds.Delete(k)
if err != nil {
log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err)
return
}
continue
}
} else {
c = cid.Undef
}
}

// If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue
Expand All @@ -135,14 +110,12 @@ func (q *Queue) work() {

select {
case toQueue := <-q.enqueue:
nextKey := q.queueKey(q.tail)
nextKey := datastore.NewKey(fmt.Sprintf("%d", time.Now().Nanosecond()))

if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil {
log.Errorf("Failed to enqueue cid: %s", err)
continue
}

q.tail++
case dequeue <- c:
err := q.ds.Delete(k)

Expand All @@ -151,61 +124,25 @@ func (q *Queue) work() {
continue
}
c = cid.Undef
q.head++
case <-q.ctx.Done():
return
}
}
}()
}

func (q *Queue) queueKey(id uint64) datastore.Key {
s := fmt.Sprintf("%016X", id)
return datastore.NewKey(s)
}

func getQueueHeadTail(ctx context.Context, datastore datastore.Datastore) (uint64, uint64, error) {
head, err := getQueueHead(datastore)
func (q *Queue) getQueueHead() (*query.Result, error) {
qry := query.Query{Orders: []query.Order{query.OrderByKey{}}}
results, err := q.ds.Query(qry)
if err != nil {
return 0, 0, err
}
tail, err := getQueueTail(datastore)
if err != nil {
return 0, 0, err
}
return head, tail, nil
}

func getQueueHead(ds datastore.Datastore) (uint64, error) {
return getFirstIDByOrder(ds, query.OrderByKey{})
}

func getQueueTail(ds datastore.Datastore) (uint64, error) {
tail, err := getFirstIDByOrder(ds, query.OrderByKeyDescending{})
if err != nil {
return 0, err
}
if tail > 0 {
tail++
}
return tail, nil
}

func getFirstIDByOrder(ds datastore.Datastore, order query.Order) (uint64, error) {
q := query.Query{Orders: []query.Order{order}}
results, err := ds.Query(q)
if err != nil {
return 0, err
return nil, err
}
defer results.Close()
r, ok := results.NextSync()
if !ok {
return 0, nil
return nil, nil
}
trimmed := strings.TrimPrefix(r.Key, "/")
id, err := strconv.ParseUint(trimmed, 16, 64)
if err != nil {
return 0, err
}
return id, nil

return &r, nil
}

44 changes: 8 additions & 36 deletions provider/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"testing"
"time"

cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
sync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-ipfs-blocksutil"
)

Expand Down Expand Up @@ -55,36 +55,6 @@ func TestBasicOperation(t *testing.T) {
assertOrdered(cids, queue, t)
}

func TestSparseDatastore(t *testing.T) {
ctx := context.Background()
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}

cids := makeCids(10)
for _, c := range cids {
queue.Enqueue(c)
}

// remove entries in the middle
err = queue.ds.Delete(queue.queueKey(5))
if err != nil {
t.Fatal(err)
}

err = queue.ds.Delete(queue.queueKey(6))
if err != nil {
t.Fatal(err)
}

expected := append(cids[:5], cids[7:]...)
assertOrdered(expected, queue, t)
}

func TestMangledData(t *testing.T) {
ctx := context.Background()
defer ctx.Done()
Expand All @@ -100,13 +70,15 @@ func TestMangledData(t *testing.T) {
queue.Enqueue(c)
}

// remove entries in the middle
err = queue.ds.Put(queue.queueKey(5), []byte("borked"))
// put bad data in the queue
queueKey := datastore.NewKey("/test/0")
err = queue.ds.Put(queueKey, []byte("borked"))
if err != nil {
t.Fatal(err)
}

expected := append(cids[:5], cids[6:]...)
// expect to only see the valid cids we entered
expected := cids
assertOrdered(expected, queue, t)
}

Expand Down

0 comments on commit dc37657

Please sign in to comment.