Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a write-through block service #3294

Merged
merged 5 commits into from
Oct 12, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions blocks/blocksutil/block_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ type BlockGenerator struct {
seq int
}

func (bg *BlockGenerator) Next() blocks.Block {
func (bg *BlockGenerator) Next() *blocks.BasicBlock {
bg.seq++
return blocks.NewBlock([]byte(string(bg.seq)))
}

func (bg *BlockGenerator) Blocks(n int) []blocks.Block {
blocks := make([]blocks.Block, 0)
func (bg *BlockGenerator) Blocks(n int) []*blocks.BasicBlock {
blocks := make([]*blocks.BasicBlock, 0)
for i := 0; i < n; i++ {
b := bg.Next()
blocks = append(blocks, b)
Expand Down
129 changes: 82 additions & 47 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
package blockservice

import (
"context"
"errors"
"fmt"

blocks "github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange"

context "context"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid"
)
Expand All @@ -23,77 +23,112 @@ var ErrNotFound = errors.New("blockservice: key not found")
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
// It uses an internal `datastore.Datastore` instance to store values.
type BlockService struct {
// TODO don't expose underlying impl details
Blockstore blockstore.Blockstore
Exchange exchange.Interface
type BlockService interface {
Blockstore() blockstore.Blockstore
Exchange() exchange.Interface
AddBlock(o blocks.Block) (*cid.Cid, error)
AddBlocks(bs []blocks.Block) ([]*cid.Cid, error)
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
DeleteBlock(o blocks.Block) error
Close() error
}

type blockService struct {
blockstore blockstore.Blockstore
exchange exchange.Interface
// If checkFirst is true then first check that a block doesn't
// already exist to avoid republishing the block on the exchange.
checkFirst bool
}

// NewBlockService creates a BlockService with given datastore instance.
func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
if rem == nil {
log.Warning("blockservice running in local (offline) mode.")
}

return &blockService{
blockstore: bs,
exchange: rem,
checkFirst: true,
}
}

// NewWriteThrough ceates a BlockService that guarantees writes will go
// through to the blockstore and are not skipped by cache checks.
func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
if rem == nil {
log.Warning("blockservice running in local (offline) mode.")
}

return &BlockService{
Blockstore: bs,
Exchange: rem,
return &blockService{
blockstore: bs,
exchange: rem,
checkFirst: false,
}
}

func (bs *blockService) Blockstore() blockstore.Blockstore {
return bs.blockstore
}

func (bs *blockService) Exchange() exchange.Interface {
return bs.exchange
}

// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *BlockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
// TODO: while this is a great optimization, we should think about the
// possibility of streaming writes directly to disk. If we can pass this object
// all the way down to the datastore without having to 'buffer' its data,
// we could implement a `WriteTo` method on it that could do a streaming write
// of the content, saving us (probably) considerable memory.
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
c := o.Cid()
has, err := s.Blockstore.Has(c)
if err != nil {
return nil, err
}
if s.checkFirst {
has, err := s.blockstore.Has(c)
if err != nil {
return nil, err
}

if has {
return c, nil
if has {
return c, nil
}
}

err = s.Blockstore.Put(o)
err := s.blockstore.Put(o)
if err != nil {
return nil, err
}

if err := s.Exchange.HasBlock(o); err != nil {
if err := s.exchange.HasBlock(o); err != nil {
return nil, errors.New("blockservice is closed")
}

return c, nil
}

func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
var toput []blocks.Block
for _, b := range bs {
has, err := s.Blockstore.Has(b.Cid())
if err != nil {
return nil, err
}

if has {
continue
if s.checkFirst {
for _, b := range bs {
has, err := s.blockstore.Has(b.Cid())
if err != nil {
return nil, err
}
if has {
continue
}
toput = append(toput, b)
}

toput = append(toput, b)
} else {
toput = bs;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

semi colons??? noooo

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran a gofmt and pushed it

}

err := s.Blockstore.PutMany(toput)
err := s.blockstore.PutMany(toput)
if err != nil {
return nil, err
}

var ks []*cid.Cid
for _, o := range toput {
if err := s.Exchange.HasBlock(o); err != nil {
if err := s.exchange.HasBlock(o); err != nil {
return nil, fmt.Errorf("blockservice is closed (%s)", err)
}

Expand All @@ -104,19 +139,19 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {

// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", c)

block, err := s.Blockstore.Get(c)
block, err := s.blockstore.Get(c)
if err == nil {
return block, nil
}

if err == blockstore.ErrNotFound && s.Exchange != nil {
if err == blockstore.ErrNotFound && s.exchange != nil {
// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
log.Debug("Blockservice: Searching bitswap")
blk, err := s.Exchange.GetBlock(ctx, c)
blk, err := s.exchange.GetBlock(ctx, c)
if err != nil {
if err == blockstore.ErrNotFound {
return nil, ErrNotFound
Expand All @@ -137,13 +172,13 @@ func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block,
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
out := make(chan blocks.Block, 0)
go func() {
defer close(out)
var misses []*cid.Cid
for _, c := range ks {
hit, err := s.Blockstore.Get(c)
hit, err := s.blockstore.Get(c)
if err != nil {
misses = append(misses, c)
continue
Expand All @@ -160,7 +195,7 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
return
}

rblocks, err := s.Exchange.GetBlocks(ctx, misses)
rblocks, err := s.exchange.GetBlocks(ctx, misses)
if err != nil {
log.Debugf("Error with GetBlocks: %s", err)
return
Expand All @@ -178,11 +213,11 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
}

// DeleteBlock deletes a block in the blockservice from the datastore
func (s *BlockService) DeleteBlock(o blocks.Block) error {
return s.Blockstore.DeleteBlock(o.Cid())
func (s *blockService) DeleteBlock(o blocks.Block) error {
return s.blockstore.DeleteBlock(o.Cid())
}

func (s *BlockService) Close() error {
func (s *blockService) Close() error {
log.Debug("blockservice is shutting down...")
return s.Exchange.Close()
return s.exchange.Close()
}
49 changes: 49 additions & 0 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package blockservice

import (
"testing"

"github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
butil "github.com/ipfs/go-ipfs/blocks/blocksutil"
offline "github.com/ipfs/go-ipfs/exchange/offline"

ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
dssync "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
)

func TestWriteThroughWorks(t *testing.T) {
bstore := &PutCountingBlockstore{
blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())),
0,
}
bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
exch := offline.Exchange(bstore2)
bserv := NewWriteThrough(bstore, exch)
bgen := butil.NewBlockGenerator()

block := bgen.Next()

t.Logf("PutCounter: %d", bstore.PutCounter)
bserv.AddBlock(block)
if bstore.PutCounter != 1 {
t.Fatalf("expected just one Put call, have: %d", bstore.PutCounter)
}

bserv.AddBlock(block)
if bstore.PutCounter != 2 {
t.Fatal("Put should have called again, should be 2 is: %d", bstore.PutCounter)
}
}

var _ blockstore.GCBlockstore = (*PutCountingBlockstore)(nil)

type PutCountingBlockstore struct {
blockstore.GCBlockstore
PutCounter int
}

func (bs *PutCountingBlockstore) Put(block blocks.Block) error {
bs.PutCounter++
return bs.GCBlockstore.Put(block)
}
4 changes: 2 additions & 2 deletions blockservice/test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
)

// Mocks returns |n| connected mock Blockservices
func Mocks(n int) []*BlockService {
func Mocks(n int) []BlockService {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
sg := bitswap.NewTestSessionGenerator(net)

instances := sg.Instances(n)

var servs []*BlockService
var servs []BlockService
for _, i := range instances {
servs = append(servs, New(i.Blockstore(), i.Exchange))
}
Expand Down
2 changes: 1 addition & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type IpfsNode struct {
// Services
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks.
Blocks bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Expand Down
8 changes: 4 additions & 4 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type LinkService interface {
GetOfflineLinkService() LinkService
}

func NewDAGService(bs *bserv.BlockService) *dagService {
func NewDAGService(bs bserv.BlockService) *dagService {
return &dagService{Blocks: bs}
}

Expand All @@ -51,7 +51,7 @@ func NewDAGService(bs *bserv.BlockService) *dagService {
// TODO: should cache Nodes that are in memory, and be
// able to free some of them when vm pressure is high
type dagService struct {
Blocks *bserv.BlockService
Blocks bserv.BlockService
}

// Add adds a node to the dagService, storing the block in the BlockService
Expand Down Expand Up @@ -113,8 +113,8 @@ func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error)
}

func (n *dagService) GetOfflineLinkService() LinkService {
if n.Blocks.Exchange.IsOnline() {
bsrv := bserv.New(n.Blocks.Blockstore, offline.Exchange(n.Blocks.Blockstore))
if n.Blocks.Exchange().IsOnline() {
bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
return NewDAGService(bsrv)
} else {
return n
Expand Down
2 changes: 1 addition & 1 deletion merkledag/merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestFetchGraph(t *testing.T) {
}

// create an offline dagstore and ensure all blocks were fetched
bs := bserv.New(bsis[1].Blockstore, offline.Exchange(bsis[1].Blockstore))
bs := bserv.New(bsis[1].Blockstore(), offline.Exchange(bsis[1].Blockstore()))

offline_ds := NewDAGService(bs)

Expand Down