Skip to content

Commit

Permalink
Merge pull request #40 from RTradeLtd/crdt
Browse files Browse the repository at this point in the history
use crdt as an optional database
  • Loading branch information
RT-nilPointer authored Apr 17, 2020
2 parents 7dae66e + 7005a4b commit a85f85c
Show file tree
Hide file tree
Showing 14 changed files with 509 additions and 165 deletions.
67 changes: 67 additions & 0 deletions cmd/gateway/s3x/crdt_broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package s3x

import (
"context"

pb "github.com/RTradeLtd/TxPB/v3/go"
)

//crdtBroadcaster implements crdt.Broadcaster using a pb.PubSubAPIClient
type crdtBroadcaster struct {
topic string
client pb.PubSubAPI_PubSubClient
next chan []byte
err error //only read from if next is closed
}

// newCrdtBroadcaster builds a crdtBroadcaster, ctx must be closed after use to release resources.
func newCrdtBroadcaster(ctx context.Context, api pb.PubSubAPIClient, topic string) (*crdtBroadcaster, error) {
client, err := api.PubSub(ctx)
if err != nil {
return nil, err
}
if err := client.Send(&pb.PubSubRequest{
RequestType: pb.PSREQTYPE_PS_SUBSCRIBE,
Topics: []string{topic},
}); err != nil {
return nil, err
}
next := make(chan []byte)
b := &crdtBroadcaster{
topic: topic,
client: client,
next: next,
}
go func() {
for {
resp, err := client.Recv()
if err != nil {
b.err = err
close(next)
return
}
for _, m := range resp.GetMessage() {
next <- m.GetData()
}
}
}()
return b, nil
}

// Broadcast sends payload to other replicas.
func (b *crdtBroadcaster) Broadcast(data []byte) error {
return b.client.Send(&pb.PubSubRequest{
RequestType: pb.PSREQTYPE_PS_PUBLISH,
Topics: []string{b.topic},
Data: data,
})
}

// Next obtains the next payload received from the network.
func (b *crdtBroadcaster) Next() ([]byte, error) {
data, ok := <-b.next
if !ok {
return nil, b.err
}
return data, nil
}
104 changes: 104 additions & 0 deletions cmd/gateway/s3x/crdt_dagsyncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package s3x

import (
"context"

pb "github.com/RTradeLtd/TxPB/v3/go"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
ipld "github.com/ipfs/go-ipld-format"
"go.uber.org/multierr"
)

//crdtDAGSyncer implements crdt.DAGSyncer using a remote DAGService and a local datastore to account for HasBlock
type crdtDAGSyncer struct {
dag ipld.DAGService
ds datastore.Batching
}

//newCrdtDAGSyncer creates a crdt.DAGSyncer using a NodeAPIClient and local datastore
func newCrdtDAGSyncer(client pb.NodeAPIClient, ds datastore.Batching) *crdtDAGSyncer {
return &crdtDAGSyncer{
dag: pb.NewDAGService(client),
ds: ds,
}
}

// Get retrieves nodes by CID. Depending on the NodeGetter
// implementation, this may involve fetching the Node from a remote
// machine; consider setting a deadline in the context.
func (d *crdtDAGSyncer) Get(ctx context.Context, c cid.Cid) (ipld.Node, error) {
n, err := d.dag.Get(ctx, c)
return n, d.setBlock(c, err)
}

// GetMany returns a channel of NodeOptions given a set of CIDs.
func (d *crdtDAGSyncer) GetMany(ctx context.Context, cs []cid.Cid) <-chan *ipld.NodeOption {
out := make(chan *ipld.NodeOption, len(cs))
go func() {
for _, c := range cs {
n, err := d.Get(ctx, c)
out <- &ipld.NodeOption{
Node: n,
Err: err,
}
}
close(out)
}()
return out
}

// Add adds a node to this DAG.
func (d *crdtDAGSyncer) Add(ctx context.Context, n ipld.Node) error {
return d.AddMany(ctx, []ipld.Node{n})
}

// AddMany adds many nodes to this DAG.
//
// Consider using the Batch NodeAdder (`NewBatch`) if you make
// extensive use of this function.
func (d *crdtDAGSyncer) AddMany(ctx context.Context, ns []ipld.Node) error {
if err := d.dag.AddMany(ctx, ns); err != nil {
return err
}
for _, n := range ns {
if err := d.setBlock(n.Cid()); err != nil {
return err
}
}
return nil
}

// Remove removes a node from this DAG.
//
// Remove returns no error if the requested node is not present in this DAG.
func (d *crdtDAGSyncer) Remove(ctx context.Context, c cid.Cid) error {
return d.RemoveMany(ctx, []cid.Cid{c})
}

// RemoveMany removes many nodes from this DAG.
//
// It returns success even if the nodes were not present in the DAG.
func (d *crdtDAGSyncer) RemoveMany(ctx context.Context, cs []cid.Cid) error {
for _, c := range cs {
if err := d.ds.Delete(datastore.NewKey(c.KeyString())); err != nil {
return err
}
}
return d.dag.RemoveMany(ctx, cs)
}

// HasBlock returns true if the block is locally available (therefore, it
// is considered processed).
func (d *crdtDAGSyncer) HasBlock(c cid.Cid) (bool, error) {
return d.ds.Has(datastore.NewKey(c.KeyString()))
}

//setBlock saves this block as true for HasBlock, the optional input error is returned with
//functionality bypassed to pipe errors through.
func (d *crdtDAGSyncer) setBlock(c cid.Cid, errs ...error) error {
if err := multierr.Combine(errs...); err != nil {
return err
}
return d.ds.Put(datastore.NewKey(c.KeyString()), nil)
}
10 changes: 8 additions & 2 deletions cmd/gateway/s3x/gateway-s3x-bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@ const (
testBucket1, testBucket2 = "bucket1", "testbucket2"
)

func TestS3XGateway_Bucket(t *testing.T) {
func TestS3X_Bucket_Badger(t *testing.T) {
testS3XBucket(t, DSTypeBadger)
}
func TestS3X_Bucket_Crdt(t *testing.T) {
testS3XBucket(t, DSTypeCrdt)
}
func testS3XBucket(t *testing.T, dsType DSType) {
ctx := context.Background()
gateway := getTestGateway(t)
gateway := newTestGateway(t, dsType)
defer func() {
if err := gateway.Shutdown(ctx); err != nil {
t.Fatal(err)
Expand Down
2 changes: 2 additions & 0 deletions cmd/gateway/s3x/gateway-s3x-ledger-store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type ledgerStore struct {
plocker bucketLocker //a locker to protect MultipartUploads from concurrent access (per upload ID)
mapLocker sync.Mutex //a lock to protect the l.Buckets map from concurrent access
pmapLocker sync.Mutex //a lock to protect the l.MultipartUploads map from concurrent access

cleanup []func() error //a list of functions to call before we close the backing database.
}

func newLedgerStore(ds datastore.Batching, dag pb.NodeAPIClient) (*ledgerStore, error) {
Expand Down
8 changes: 6 additions & 2 deletions cmd/gateway/s3x/gateway-s3x-ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"go.uber.org/multierr"
)

/* Design Notes
Expand All @@ -19,8 +20,11 @@ The reason for this is so that we can enable easy reuse of internal code.

// Close shuts down the ledger datastore
func (ls *ledgerStore) Close() error {
//todo: clean up caches
return ls.ds.Close()
var err error
for _, f := range ls.cleanup {
err = multierr.Append(err, f())
}
return multierr.Append(err, ls.ds.Close())
}

/////////////////////
Expand Down
10 changes: 8 additions & 2 deletions cmd/gateway/s3x/gateway-s3x-ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@ import (
dssync "github.com/ipfs/go-datastore/sync"
)

func TestS3XLedgerStore(t *testing.T) {
func TestS3X_LedgerStore_Badger(t *testing.T) {
testS3XLedgerStore(t, DSTypeBadger)
}
func TestS3X_LedgerStore_Crdt(t *testing.T) {
testS3XLedgerStore(t, DSTypeCrdt)
}
func testS3XLedgerStore(t *testing.T, dsType DSType) {
ctx := context.Background()
gateway := getTestGateway(t)
gateway := newTestGateway(t, dsType)
defer func() {
if err := gateway.Shutdown(ctx); err != nil {
t.Fatal(err)
Expand Down
10 changes: 8 additions & 2 deletions cmd/gateway/s3x/gateway-s3x-multipart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@ import (
minio "github.com/RTradeLtd/s3x/cmd"
)

func TestS3XGateway_Multipart(t *testing.T) {
func TestS3X_Multipart_Badger(t *testing.T) {
testS3XMultipart(t, DSTypeBadger)
}
func TestS3X_Multipart_Crdt(t *testing.T) {
testS3XMultipart(t, DSTypeCrdt)
}
func testS3XMultipart(t *testing.T, dsType DSType) {
bucket := "my multipart bucket"
object := "my multipart object"
objectETag := "bafybeibzfoslocl3zs4fngsqminlpikibos7u664circ6mw7kjwkwa6y54"
ctx := context.Background()
gateway := getTestGateway(t)
gateway := newTestGateway(t, dsType)
defer func() {
if err := gateway.Shutdown(ctx); err != nil {
t.Fatal(err)
Expand Down
10 changes: 8 additions & 2 deletions cmd/gateway/s3x/gateway-s3x-object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,15 @@ func testGetObject(t *testing.T, g *testGateway) {
}
}

func TestS3XGateway_Object(t *testing.T) {
func TestS3XG_Object_Badger(t *testing.T) {
testS3XGObject(t, DSTypeBadger)
}
func TestS3XG_Object_Crdt(t *testing.T) {
testS3XGObject(t, DSTypeCrdt)
}
func testS3XGObject(t *testing.T, dsType DSType) {
ctx := context.Background()
gateway := getTestGateway(t)
gateway := newTestGateway(t, dsType)
defer func() {
if err := gateway.Shutdown(ctx); err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit a85f85c

Please sign in to comment.