Skip to content

Commit

Permalink
provider: refactor to only maintain one batched implementation and ad…
Browse files Browse the repository at this point in the history
…d throughput callback
  • Loading branch information
Jorropo committed Apr 6, 2023
1 parent 999d939 commit f450d31
Show file tree
Hide file tree
Showing 12 changed files with 397 additions and 1,128 deletions.
30 changes: 0 additions & 30 deletions provider/README.md

This file was deleted.

119 changes: 0 additions & 119 deletions provider/batched/system_test.go

This file was deleted.

11 changes: 5 additions & 6 deletions provider/queue/queue.go → provider/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queue
import (
"context"
"fmt"

cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
namespace "github.com/ipfs/go-datastore/namespace"
Expand All @@ -20,7 +21,6 @@ var log = logging.Logger("provider.queue")
type Queue struct {
// used to differentiate queues in datastore
// e.g. provider vs reprovider
name string
ctx context.Context
ds datastore.Datastore // Must be threadsafe
dequeue chan cid.Cid
Expand All @@ -32,11 +32,10 @@ type Queue struct {
}

// NewQueue creates a queue for cids
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
cancelCtx, cancel := context.WithCancel(ctx)
func NewQueue(ds datastore.Datastore) *Queue {
namespaced := namespace.Wrap(ds, datastore.NewKey("/queue"))
cancelCtx, cancel := context.WithCancel(context.Background())
q := &Queue{
name: name,
ctx: cancelCtx,
ds: namespaced,
dequeue: make(chan cid.Cid),
Expand All @@ -45,7 +44,7 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue,
closed: make(chan struct{}, 1),
}
q.work()
return q, nil
return q
}

// Close stops the queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ func TestBasicOperation(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)

Expand All @@ -63,10 +61,8 @@ func TestMangledData(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)
for _, c := range cids {
Expand All @@ -75,7 +71,7 @@ func TestMangledData(t *testing.T) {

// put bad data in the queue
queueKey := datastore.NewKey("/test/0")
err = queue.ds.Put(ctx, queueKey, []byte("borked"))
err := queue.ds.Put(ctx, queueKey, []byte("borked"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -91,10 +87,8 @@ func TestInitialization(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)
for _, c := range cids {
Expand All @@ -104,10 +98,8 @@ func TestInitialization(t *testing.T) {
assertOrdered(cids[:5], queue, t)

// make a new queue, same data
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue = NewQueue(ds)
defer queue.Close()

assertOrdered(cids[5:], queue, t)
}
Expand All @@ -118,21 +110,18 @@ func TestInitializationWithManyCids(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)

cids := makeCids(25)
for _, c := range cids {
queue.Enqueue(c)
}

queue.Close()

// make a new queue, same data
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue = NewQueue(ds)
defer queue.Close()

assertOrdered(cids, queue, t)
}
Loading

0 comments on commit f450d31

Please sign in to comment.