Skip to content

Commit

Permalink
provider: refactor to only maintain one batched implementation and ad…
Browse files Browse the repository at this point in the history
…d throughput callback
  • Loading branch information
Jorropo committed May 11, 2023
1 parent 33e3f0c commit fb36b39
Show file tree
Hide file tree
Showing 16 changed files with 529 additions and 1,168 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.19
require (
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a
github.com/benbjohnson/clock v1.3.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cespare/xxhash/v2 v2.2.0
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3
github.com/cskr/pubsub v1.0.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4=
github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down
30 changes: 0 additions & 30 deletions provider/README.md

This file was deleted.

119 changes: 0 additions & 119 deletions provider/batched/system_test.go

This file was deleted.

11 changes: 5 additions & 6 deletions provider/queue/queue.go → provider/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queue
import (
"context"
"fmt"

cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
namespace "github.com/ipfs/go-datastore/namespace"
Expand All @@ -20,7 +21,6 @@ var log = logging.Logger("provider.queue")
type Queue struct {
// used to differentiate queues in datastore
// e.g. provider vs reprovider
name string
ctx context.Context
ds datastore.Datastore // Must be threadsafe
dequeue chan cid.Cid
Expand All @@ -32,11 +32,10 @@ type Queue struct {
}

// NewQueue creates a queue for cids
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
cancelCtx, cancel := context.WithCancel(ctx)
func NewQueue(ds datastore.Datastore) *Queue {
namespaced := namespace.Wrap(ds, datastore.NewKey("/queue"))
cancelCtx, cancel := context.WithCancel(context.Background())
q := &Queue{
name: name,
ctx: cancelCtx,
ds: namespaced,
dequeue: make(chan cid.Cid),
Expand All @@ -45,7 +44,7 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue,
closed: make(chan struct{}, 1),
}
q.work()
return q, nil
return q
}

// Close stops the queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ func TestBasicOperation(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)

Expand All @@ -63,10 +61,8 @@ func TestMangledData(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)
for _, c := range cids {
Expand All @@ -75,7 +71,7 @@ func TestMangledData(t *testing.T) {

// put bad data in the queue
queueKey := datastore.NewKey("/test/0")
err = queue.ds.Put(ctx, queueKey, []byte("borked"))
err := queue.ds.Put(ctx, queueKey, []byte("borked"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -91,10 +87,8 @@ func TestInitialization(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)
for _, c := range cids {
Expand All @@ -104,10 +98,8 @@ func TestInitialization(t *testing.T) {
assertOrdered(cids[:5], queue, t)

// make a new queue, same data
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue = NewQueue(ds)
defer queue.Close()

assertOrdered(cids[5:], queue, t)
}
Expand All @@ -118,21 +110,18 @@ func TestInitializationWithManyCids(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)

cids := makeCids(25)
for _, c := range cids {
queue.Enqueue(c)
}

queue.Close()

// make a new queue, same data
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue = NewQueue(ds)
defer queue.Close()

assertOrdered(cids, queue, t)
}
35 changes: 35 additions & 0 deletions provider/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package provider

import (
"context"

"github.com/ipfs/go-cid"
)

type noopProvider struct{}

var _ System = (*noopProvider)(nil)

// NewNoopProvider creates a ProviderSystem that does nothing.
func NewNoopProvider() System {
return &noopProvider{}
}

func (op *noopProvider) Run() {
}

func (op *noopProvider) Close() error {
return nil
}

func (op *noopProvider) Provide(cid.Cid) error {
return nil
}

func (op *noopProvider) Reprovide(context.Context) error {
return nil
}

func (op *noopProvider) Stat() (ReproviderStats, error) {
return ReproviderStats{}, nil
}
29 changes: 0 additions & 29 deletions provider/offline.go

This file was deleted.

Loading

0 comments on commit fb36b39

Please sign in to comment.