Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clear out memory after reads from the dagreader #4525

Merged
merged 2 commits into from
Jan 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions unixfs/io/dagreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"io"
"io/ioutil"
"math/rand"
"strings"
"testing"

Expand Down Expand Up @@ -72,6 +73,55 @@ func TestSeekAndRead(t *testing.T) {
}
}

// TestSeekAndReadLarge seeks into the middle of a multi-block file, reads a
// span across it, and then verifies that only the current preload window of
// node promises remains populated — everything already consumed or not yet
// preloaded must have been cleared to nil.
func TestSeekAndReadLarge(t *testing.T) {
	dserv := testu.GetDAGServ()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Build a file large enough to span many leaf blocks.
	data := make([]byte, 20000)
	rand.Read(data)
	nd := testu.GetNode(t, dserv, data, testu.UseProtoBufLeaves)

	reader, err := NewDagReader(ctx, nd, dserv)
	if err != nil {
		t.Fatal(err)
	}

	if _, err := reader.Seek(10000, io.SeekStart); err != nil {
		t.Fatal(err)
	}

	out := make([]byte, 100)
	if _, err := io.ReadFull(reader, out); err != nil {
		t.Fatal(err)
	}

	if !bytes.Equal(out, data[10000:10100]) {
		t.Fatal("seeked read failed")
	}

	// Only the promises inside the preload window (indices 21..29) should be
	// live; the rest must have been released.
	pbdr := reader.(*pbDagReader)
	var count int
	for i, p := range pbdr.promises {
		if i > 20 && i < 30 {
			if p == nil {
				t.Fatal("expected index to be not nil: ", i)
			}
			count++
		} else {
			if p != nil {
				t.Fatal("expected index to be nil: ", i)
			}
		}
	}
	// -1 because we read some and it cleared one
	if count != preloadSize-1 {
		t.Fatalf("expected %d preloaded promises, got %d", preloadSize-1, count)
	}
}

func TestRelativeSeek(t *testing.T) {
dserv := testu.GetDAGServ()
ctx, closer := context.WithCancel(context.Background())
Expand Down
60 changes: 56 additions & 4 deletions unixfs/io/pbdagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
ft "github.com/ipfs/go-ipfs/unixfs"
ftpb "github.com/ipfs/go-ipfs/unixfs/pb"

node "gx/ipfs/QmNwUEK7QbwSqyKBu3mMtToo8SUc6wQJ7gdZq4gGGJqfnf/go-ipld-format"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
cid "gx/ipfs/QmeSrf6pzut73u6zLQkRFQ3ygt3k6XFT2kjdYP8Tnkwwyg/go-cid"
)

// DagReader provides a way to easily read the data contained in a dag.
Expand All @@ -30,6 +32,9 @@ type pbDagReader struct {
// NodeGetters for each of 'nodes' child links
promises []mdag.NodeGetter

// the cid of each child of the current node
links []*cid.Cid

// the index of the child link currently being read from
linkPosition int

Expand All @@ -47,30 +52,54 @@ var _ DagReader = (*pbDagReader)(nil)

// NewPBFileReader constructs a pbDagReader over the given protobuf unixfs
// node. Child-node promises are allocated lazily: the promises slice starts
// out all-nil and entries are filled in on demand as reading progresses.
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *pbDagReader {
	fctx, cancel := context.WithCancel(ctx)
	links := getLinkCids(n)
	dr := &pbDagReader{
		node:     n,
		serv:     serv,
		buf:      NewBufDagReader(pb.GetData()),
		promises: make([]mdag.NodeGetter, len(links)),
		links:    links,
		ctx:      fctx,
		cancel:   cancel,
		pbdata:   pb,
	}
	return dr
}

const preloadSize = 10

// preloadNextNodes kicks off fetches for the next window of child nodes,
// beginning at the current link position, and records the resulting
// promises so later reads can resolve them.
func (dr *pbDagReader) preloadNextNodes(ctx context.Context) {
	start := dr.linkPosition
	stop := start + preloadSize
	// Clamp the window to the number of remaining links.
	if stop > len(dr.links) {
		stop = len(dr.links)
	}

	getters := mdag.GetNodes(ctx, dr.serv, dr.links[start:stop])
	for i, ng := range getters {
		dr.promises[start+i] = ng
	}
}

// precalcNextBuf follows the next link in line and loads it from the
// DAGService, setting the next buffer to read from
func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
dr.buf.Close() // Just to make sure
if dr.buf != nil {
dr.buf.Close() // Just to make sure
dr.buf = nil
}

if dr.linkPosition >= len(dr.promises) {
return io.EOF
}

if dr.promises[dr.linkPosition] == nil {
dr.preloadNextNodes(ctx)
}

nxt, err := dr.promises[dr.linkPosition].Get(ctx)
if err != nil {
return err
}
dr.promises[dr.linkPosition] = nil
dr.linkPosition++

switch nxt := nxt.(type) {
Expand Down Expand Up @@ -105,6 +134,15 @@ func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error {
}
}

// getLinkCids collects the cid of every child link of n, in link order.
func getLinkCids(n node.Node) []*cid.Cid {
	links := n.Links()
	// Pre-size so the result is a non-nil slice even with zero links.
	cids := make([]*cid.Cid, len(links))
	for i, l := range links {
		cids[i] = l.Cid
	}
	return cids
}

// Size return the total length of the data from the DAG structured file.
func (dr *pbDagReader) Size() uint64 {
return dr.pbdata.GetFilesize()
Expand All @@ -117,6 +155,12 @@ func (dr *pbDagReader) Read(b []byte) (int, error) {

// CtxReadFull reads data from the DAG structured file
func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
if dr.buf == nil {
if err := dr.precalcNextBuf(ctx); err != nil {
return 0, err
}
}

// If no cached buffer, load one
total := 0
for {
Expand Down Expand Up @@ -145,6 +189,12 @@ func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
}

func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) {
if dr.buf == nil {
if err := dr.precalcNextBuf(dr.ctx); err != nil {
return 0, err
}
}

// If no cached buffer, load one
total := int64(0)
for {
Expand Down Expand Up @@ -199,7 +249,9 @@ func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) {
left := offset
if int64(len(pb.Data)) >= offset {
// Close current buf to close potential child dagreader
dr.buf.Close()
if dr.buf != nil {
dr.buf.Close()
}
dr.buf = NewBufDagReader(pb.GetData()[offset:])

// start reading links from the beginning
Expand Down