Skip to content

Commit

Permalink
Rework retrieval strategy.
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Kevin Atkinson <k@kevina.org>
  • Loading branch information
kevina committed Apr 30, 2018
1 parent 3626771 commit 7537e29
Showing 1 changed file with 33 additions and 24 deletions.
57 changes: 33 additions & 24 deletions unixfs/hamt/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type fetcher struct {
ctx context.Context
dserv ipld.DAGService

newJob chan *Shard
reqRes chan *Shard
result chan result

Expand Down Expand Up @@ -63,7 +62,6 @@ func startFetcher(ctx context.Context, dserv ipld.DAGService) *fetcher {
f := &fetcher{
ctx: ctx,
dserv: dserv,
newJob: make(chan *Shard, 16),
reqRes: make(chan *Shard),
result: make(chan result),
idle: true,
Expand All @@ -80,14 +78,13 @@ type result struct {
errs []error
}

// get recursively gets the missing child shards for the hamt object.
// get gets the missing child shards for the hamt object.
// The missing children for the passed in shard is returned. The
// children of the children are recursively retrieved in the
// background. The result is the result of the batch request and not
// just the single job. In particular, if the 'errs' field is empty
// the 'vals' of the result is guaranteed to contain the all the
// missing child shards, but the map may also contain child shards of
// other jobs in the batch
// children are then also retrieved in the background. The result is
// the result of the batch request and not just the single job. In
// particular, if the 'errs' field is empty the 'vals' of the result
// is guaranteed to contain the all the missing child shards, but the
// map may also contain child shards of other jobs in the batch
func (f *fetcher) get(hamt *Shard) result {
f.reqRes <- hamt
res := <-f.result
Expand All @@ -114,8 +111,6 @@ func (f *fetcher) mainLoop() {
var want *Shard
for {
select {
case j := <-f.newJob:
f.mainLoopAddJob(j)
case id := <-f.reqRes:
if want != nil {
// programmer error
Expand All @@ -128,12 +123,13 @@ func (f *fetcher) mainLoop() {
// no children that need to be retrieved
f.result <- result{vals: make(map[string]*Shard)}
}
if f.idle {
f.launch()
}
}
if j.res.vals != nil {
f.hits++
delete(f.jobs, id)
f.doneCnt--
f.result <- j.res
f.mainLoopSendResult(j)
} else {
if j.idx != -1 {
f.misses++
Expand All @@ -149,28 +145,25 @@ func (f *fetcher) mainLoop() {
case cnt := <-f.done:
f.doneCnt += cnt
f.launch()
log.Infof("fetcher: batch job done")
log.Infof("fetcher stats (done, hits, nearMisses, misses): %d %d %d %d", f.doneCnt, f.hits, f.nearMisses, f.misses)
if want != nil {
j := f.jobs[want]
if j.res.vals != nil {
delete(f.jobs, want)
f.doneCnt--
f.result <- j.res
f.mainLoopSendResult(j)
want = nil
}
}
log.Infof("fetcher: batch job done")
log.Infof("fetcher stats (done, hits, nearMisses, misses): %d %d %d %d", f.doneCnt, f.hits, f.nearMisses, f.misses)
case <-f.ctx.Done():
log.Infof("fetcher: exiting")
log.Infof("fetcher stats (done, hits, nearMisses, misses): %d %d %d %d", f.doneCnt, f.hits, f.nearMisses, f.misses)
log.Infof("fetcher unretrived jobs %d", len(f.jobs))
log.Infof("fetcher total number of CIDs retrieved: %d", f.cidCnt)
return
}
}
}

// addJob adds a job to retrive the missing child shards for the
// provided shard
func (f *fetcher) mainLoopAddJob(hamt *Shard) *job {
children := hamt.missingChildShards()
if len(children) == 0 {
Expand All @@ -184,10 +177,25 @@ func (f *fetcher) mainLoopAddJob(hamt *Shard) *job {
f.cidCnt += len(j.cids)
f.todo.push(j)
f.jobs[j.id] = j
return j
}

func (f *fetcher) mainLoopSendResult(j *job) {
f.result <- j.res
delete(f.jobs, j.id)
f.doneCnt--
if len(j.res.errs) != 0 {
return
}
// we want the first child to be the first to run not the last so
// add the jobs in the reverse order
for i := len(j.cids) - 1; i >= 0; i-- {
hamt := j.res.vals[string(j.cids[i].Bytes())]
f.mainLoopAddJob(hamt)
}
if f.idle {
f.launch()
}
return j
}

type batchJob struct {
Expand Down Expand Up @@ -217,7 +225,9 @@ func (f *fetcher) launch() {
}

if len(bj.cids) == 0 {
log.Infof("fetcher: entering idle state: no more jobs")
if !f.idle {
log.Infof("fetcher: entering idle state: no more jobs")
}
f.idle = true
return
}
Expand All @@ -238,7 +248,6 @@ func (f *fetcher) launch() {
fetched.errs = append(fetched.errs, err)
continue
}
f.newJob <- hamt
fetched.vals[string(no.Node.Cid().Bytes())] = hamt
}
for _, job := range bj.jobs {
Expand Down

0 comments on commit 7537e29

Please sign in to comment.