Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dagservice interface #183

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type IpfsNode struct {
Blocks *bserv.BlockService

// the merkle dag service, get/add objects.
DAG *merkledag.DAGService
DAG merkledag.DAGService

// the path resolution system
Resolver *path.Resolver
Expand Down Expand Up @@ -157,7 +157,7 @@ func NewIpfsNode(cfg *config.Config, online bool) (*IpfsNode, error) {
return nil, err
}

dag := &merkledag.DAGService{Blocks: bs}
dag := merkledag.NewDAGService(bs)
ns := namesys.NewNameSystem(route)

success = true
Expand Down
2 changes: 1 addition & 1 deletion core/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewMockNode() (*IpfsNode, error) {
return nil, err
}

nd.DAG = &mdag.DAGService{bserv}
nd.DAG = mdag.NewDAGService(bserv)

// Namespace resolver
nd.Namesys = nsys.NewNameSystem(dht)
Expand Down
138 changes: 79 additions & 59 deletions importer/chunk/rabin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package chunk
import (
"bufio"
"bytes"
"fmt"
"io"
"math"
)
Expand All @@ -13,6 +12,13 @@ type MaybeRabin struct {
windowSize int
MinBlockSize int
MaxBlockSize int
inbuf *bufio.Reader
buf bytes.Buffer
window []byte // Window is a circular buffer
wi int // window index
rollingHash int
an int
readers []io.Reader
}

func NewMaybeRabin(avgBlkSize int) *MaybeRabin {
Expand All @@ -22,73 +28,87 @@ func NewMaybeRabin(avgBlkSize int) *MaybeRabin {
rb.windowSize = 16 // probably a good number...
rb.MinBlockSize = avgBlkSize / 2
rb.MaxBlockSize = (avgBlkSize / 2) * 3
rb.window = make([]byte, rb.windowSize)
rb.an = 1
return rb
}

func (mr *MaybeRabin) Split(r io.Reader) chan []byte {
out := make(chan []byte, 16)
go func() {
inbuf := bufio.NewReader(r)
blkbuf := new(bytes.Buffer)
// push writes val over the oldest byte in the circular window and
// returns the evicted byte (as an int) so the caller can remove its
// contribution from the rolling hash.
func (mr *MaybeRabin) push(val byte) (outval int) {
	idx := mr.wi % len(mr.window)
	outval = int(mr.window[idx])
	mr.window[idx] = val
	return outval
}

// some bullshit numbers i made up
a := 10 // honestly, no idea what this is
MOD := 33554383 // randomly chosen (seriously)
an := 1
rollingHash := 0
// Duplicate byte slice
// dup returns an independent copy of b so the caller may retain it
// after the source buffer is reset or reused.
func dup(b []byte) []byte {
	return append(make([]byte, 0, len(b)), b...)
}

// Window is a circular buffer
window := make([]byte, mr.windowSize)
push := func(i int, val byte) (outval int) {
outval = int(window[i%len(window)])
window[i%len(window)] = val
return
}
// nextReader advances to the next queued io.Reader after the current one
// is exhausted. When no readers remain it hands back whatever is buffered
// and clears inbuf, so the following call to Next reports io.EOF.
func (mr *MaybeRabin) nextReader() ([]byte, error) {
	n := len(mr.readers)
	if n == 0 {
		mr.inbuf = nil
		return mr.buf.Bytes(), nil
	}
	// Pop the most recently pushed reader (LIFO) and resume splitting.
	last := mr.readers[n-1]
	mr.readers = mr.readers[:n-1]
	mr.inbuf = bufio.NewReader(last)
	return mr.Next()
}

// Duplicate byte slice
dup := func(b []byte) []byte {
d := make([]byte, len(b))
copy(d, b)
return d
}
func (mr *MaybeRabin) Next() ([]byte, error) {
if mr.inbuf == nil {
return nil, io.EOF
}

// Fill up the window
i := 0
for ; i < mr.windowSize; i++ {
b, err := inbuf.ReadByte()
if err != nil {
fmt.Println(err)
return
}
blkbuf.WriteByte(b)
push(i, b)
rollingHash = (rollingHash*a + int(b)) % MOD
an = (an * a) % MOD
}
// some bullshit numbers i made up
a := 10 // honestly, no idea what this is
MOD := 33554383 // randomly chosen (seriously)

for ; true; i++ {
b, err := inbuf.ReadByte()
if err != nil {
break
}
outval := push(i, b)
blkbuf.WriteByte(b)
rollingHash = (rollingHash*a + int(b) - an*outval) % MOD
if (rollingHash&mr.mask == mr.mask && blkbuf.Len() > mr.MinBlockSize) ||
blkbuf.Len() >= mr.MaxBlockSize {
out <- dup(blkbuf.Bytes())
blkbuf.Reset()
var b byte
var err error
// Fill up the window
for ; mr.wi < mr.windowSize; mr.wi++ {
b, err = mr.inbuf.ReadByte()
if err != nil {
if err == io.EOF {
return mr.nextReader()
}
return nil, err
}
mr.buf.WriteByte(b)
mr.push(b)
mr.rollingHash = (mr.rollingHash*a + int(b)) % MOD
mr.an = (mr.an * a) % MOD
}

// Check if there are enough remaining
peek, err := inbuf.Peek(mr.windowSize)
if err != nil || len(peek) != mr.windowSize {
break
}
for ; true; mr.wi++ {
b, err = mr.inbuf.ReadByte()
if err != nil {
break
}
outval := mr.push(b)
mr.buf.WriteByte(b)
mr.rollingHash = (mr.rollingHash*a + int(b) - mr.an*outval) % MOD
if (mr.rollingHash&mr.mask == mr.mask && mr.buf.Len() > mr.MinBlockSize) || mr.buf.Len() >= mr.MaxBlockSize {
block := dup(mr.buf.Bytes())
mr.buf.Reset()
return block, nil
}
io.Copy(blkbuf, inbuf)
out <- blkbuf.Bytes()
close(out)
}()
return out
}
if err == io.EOF {
return mr.nextReader()
}
return nil, err
}

// Size reports the maximum chunk length that Next may return.
func (mr *MaybeRabin) Size() int {
	return mr.MaxBlockSize
}

// Push queues r as an input source. The first pushed reader is consumed
// immediately; later ones are stacked and drained in LIFO order once the
// current reader hits EOF.
func (mr *MaybeRabin) Push(r io.Reader) {
	if mr.inbuf != nil {
		mr.readers = append(mr.readers, r)
		return
	}
	mr.inbuf = bufio.NewReader(r)
}
73 changes: 47 additions & 26 deletions importer/chunk/splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,59 @@ import (

var log = util.Logger("chunk")

var DefaultSplitter = &SizeSplitter{1024 * 512}
var DefaultSplitter = &SizeSplitter{size: 1024 * 512}

// BlockSplitter is the interface to a block splitting algorithm.
type BlockSplitter interface {
Split(r io.Reader) chan []byte

// Size returns the maximum block size that this BlockSplitter may produce,
// or the maximum amount of data the BlockSplitter may buffer,
// whichever is larger.
Size() int

// Next returns a block split from the underlying reader.
// io.EOF is returned when both the last Reader and any splitting buffers
// are exhausted.
Next() ([]byte, error)

// Push causes the Reader to start reading from a new io.Reader.
// When an EOF error is seen from the new io.Reader, it is popped
// and the Reader continues to read from the next most recent io.Reader.
Push(io.Reader)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this is a very different concurrency model. I haven't seen this in Go yet, do you have examples?

I think we should stick to channels.

Also, streaming data should be "read out" not "pushed". I.e. the Push function will cause all sorts of concurrency headaches because it's the provider telling the client when to read. Instead of push, it should work like this:

splitter := NewBlockSplitter(io.Reader) 
next := <-splitter.Split() // returns internal readable channel
// or
splitter := NewBlockSplitter()
split := splitter.Split(io.Reader) // create + return per-reader channel
next := <-split

Once the reader is made available, the client should read whenever the client can.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @whyrusleeping thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't know that I'm comfortable with switching away from channels...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although, I do like the idea of having an error returned if needed.

}

// SizeSplitter cuts its input into fixed-size chunks.
// (Reconstructed from diff residue: the stale old-side `Size int` field
// line is dropped; the new version exposes the size via the Size method.)
type SizeSplitter struct {
	size    int         // fixed chunk size in bytes
	readers []io.Reader // stack of pending input readers; most recent is read first
}

func (ss *SizeSplitter) Split(r io.Reader) chan []byte {
out := make(chan []byte)
go func() {
defer close(out)
for {
chunk := make([]byte, ss.Size)
nread, err := r.Read(chunk)
if err != nil {
if err == io.EOF {
if nread > 0 {
out <- chunk[:nread]
}
return
}
log.Error("Block split error: %s", err)
return
}
if nread < ss.Size {
chunk = chunk[:nread]
}
out <- chunk
// Size reports the fixed chunk length this splitter produces.
func (ss *SizeSplitter) Size() int {
	return ss.size
}

func (ss *SizeSplitter) Next() (b []byte, err error) {
b = make([]byte, ss.size)

var n, N, ri int
for len(ss.readers) > 0 {
ri = len(ss.readers) - 1
N, err = ss.readers[ri].Read(b[n:])
n += N
if err == io.EOF {
ss.readers = ss.readers[:ri]
err = nil
}
}()
return out
if n == ss.size {
return
}
}
if n == 0 {
return nil, io.EOF
}
b = b[:n]
return
}

// Push adds r to the stack of input readers consumed by Next; the most
// recently pushed reader is read first.
func (ss *SizeSplitter) Push(r io.Reader) {
	rs := append(ss.readers, r)
	ss.readers = rs
}

// NewSizeSplitter returns a SizeSplitter that produces chunks of the
// given size in bytes.
func NewSizeSplitter(size int) *SizeSplitter {
	ss := &SizeSplitter{size: size}
	return ss
}
22 changes: 18 additions & 4 deletions importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,29 @@ func NewDagFromReader(r io.Reader) (*dag.Node, error) {
}

func NewDagFromReaderWithSplitter(r io.Reader, spl chunk.BlockSplitter) (*dag.Node, error) {
blkChan := spl.Split(r)
first := <-blkChan
// get rid of the r argument and push the r into spl up in NewDagFromReader
spl.Push(r)

first, err := spl.Next()
if err != nil {
return nil, err
}

root := &dag.Node{}

mbf := new(ft.MultiBlock)
for blk := range blkChan {
for {
blk, err := spl.Next()
if err != nil {
if err == io.EOF {
break
}
return nil, err
}

mbf.AddBlockSize(uint64(len(blk)))
child := &dag.Node{Data: ft.WrapData(blk)}
err := root.AddNodeLink("", child)
err = root.AddNodeLink("", child)
if err != nil {
return nil, err
}
Expand Down
32 changes: 20 additions & 12 deletions importer/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func TestBuildDag(t *testing.T) {

//Test where calls to read are smaller than the chunk size
func TestSizeBasedSplit(t *testing.T) {
bs := &chunk.SizeSplitter{512}
bs := chunk.NewSizeSplitter(512)
testFileConsistency(t, bs, 32*512)
bs = &chunk.SizeSplitter{4096}
bs = chunk.NewSizeSplitter(4096)
testFileConsistency(t, bs, 32*4096)

// Uneven offset
Expand Down Expand Up @@ -94,17 +94,25 @@ func TestMaybeRabinConsistency(t *testing.T) {
}

func TestRabinBlockSize(t *testing.T) {
buf := new(bytes.Buffer)
nbytes := 1024 * 1024
io.CopyN(buf, rand.Reader, int64(nbytes))
rab := chunk.NewMaybeRabin(4096)
blkch := rab.Split(buf)

var blocks [][]byte
for b := range blkch {
blocks = append(blocks, b)
rab.Push(
&io.LimitedReader{
R: rand.Reader,
N: int64(nbytes),
},
)

var nblocks int
for {
_, err := rab.Next()
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err.Error())
}
nblocks = nblocks + 1
}

fmt.Printf("Avg block size: %d\n", nbytes/len(blocks))

fmt.Printf("Avg block size: %d\n", nbytes/nblocks)
}
Loading