refactor #375
evan-forbes committed Jun 7, 2021
1 parent 46c2b4d commit 898f83d
Showing 1 changed file with 50 additions and 98 deletions.
p2p/ipld/write.go (148 changes: 50 additions & 98 deletions)
@@ -4,26 +4,21 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"sync/atomic"
-	"time"
+	stdsync "sync"
 
-	"github.com/ipfs/go-cid"
 	ipld "github.com/ipfs/go-ipld-format"
 	"github.com/lazyledger/nmt"
 	"github.com/lazyledger/rsmt2d"
 	"github.com/libp2p/go-libp2p-core/routing"
-	kbucket "github.com/libp2p/go-libp2p-kbucket"
 
 	"github.com/lazyledger/lazyledger-core/ipfs/plugin"
 	"github.com/lazyledger/lazyledger-core/libs/log"
-	"github.com/lazyledger/lazyledger-core/libs/sync"
 	"github.com/lazyledger/lazyledger-core/p2p/ipld/wrapper"
 	"github.com/lazyledger/lazyledger-core/types"
 )
 
 // PutBlock posts and pins erasured block data to IPFS using the provided
 // ipld.NodeAdder. Note: the erasured data is currently recomputed
-// TODO this craves for refactor
 func PutBlock(
 	ctx context.Context,
 	adder ipld.NodeAdder,
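
The import changes drop time, sync/atomic, go-cid, kbucket, and the project's libs/sync, and bring in the standard library sync under the alias stdsync (the bare name sync was taken by libs/sync before this commit). A minimal, standalone illustration of the aliasing pattern, with names not tied to this repository:

	package main

	import (
		"fmt"
		stdsync "sync"
	)

	func main() {
		// stdsync refers unambiguously to the standard library "sync",
		// leaving the bare identifier free for a project-local sync package.
		var wg stdsync.WaitGroup
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("worker ran")
		}()
		wg.Wait()
	}
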
@@ -52,113 +47,70 @@ func PutBlock(
 	if err != nil {
 		return fmt.Errorf("failure to recompute the extended data square: %w", err)
 	}
-	// get row and col roots to be provided
-	// this also triggers adding data to DAG
-	prov := newProvider(ctx, croute, int32(squareSize*4), logger.With("height", block.Height))
-	for _, root := range eds.RowRoots() {
-		prov.Provide(plugin.MustCidFromNamespacedSha256(root))
-	}
-	for _, root := range eds.ColumnRoots() {
-		prov.Provide(plugin.MustCidFromNamespacedSha256(root))
-	}
+
+	// setup provide workers
+	const workers = 32
+	var wg stdsync.WaitGroup
+	jobs := make(chan []byte, workers)
+	errc := make(chan error, 1)
+	workerCtx, workerCancel := context.WithCancel(ctx)
+	defer workerCancel() // ensure that this cancel is called
+
+	// start feeding the workers jobs
+	go feedJobs(workerCtx, jobs, eds.RowRoots(), eds.ColumnRoots())
+
+	// start each worker
+	for i := 0; i < workers; i++ {
+		wg.Add(1)
+		worker := provideWorker{croute, logger, workerCancel}
+		go worker.provide(workerCtx, &wg, jobs, errc)
+	}

-	// commit the batch to ipfs
-	err = batchAdder.Commit()
+	// wait for all jobs to finish, an error to occur, or a timeout to be reached
+	wg.Wait()
+	err = collectErrors(errc)
 	if err != nil {
 		return err
 	}
-	// wait until we provided all the roots if requested
-	<-prov.Done()
-	return prov.Err()
-}
+
+	// commit the batch to ipfs
+	return batchAdder.Commit()
+}
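
The new PutBlock body follows a common Go fan-out shape: a feeder goroutine, a fixed pool of workers joined through a WaitGroup, and a 1-buffered error channel. A standalone sketch of that shape, where runPool and provideFn are illustrative stand-ins rather than names from this commit; unlike the provide method below, the sketch sends errors non-blockingly so a late second failure cannot block on the full buffer:

	package main

	import (
		"context"
		"fmt"
		"sync"
	)

	func runPool(ctx context.Context, items [][]byte, provideFn func(context.Context, []byte) error) error {
		const workers = 4
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		jobs := make(chan []byte, workers)
		errc := make(chan error, 1)

		// feeder: enqueue every item unless the context is canceled first
		go func() {
			defer close(jobs)
			for _, item := range items {
				select {
				case <-ctx.Done():
					return
				default:
					jobs <- item
				}
			}
		}()

		var wg sync.WaitGroup
		for i := 0; i < workers; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for job := range jobs {
					if err := provideFn(ctx, job); err != nil {
						cancel() // stop the feeder
						select {
						case errc <- err: // keep the first error only
						default:
						}
					}
				}
			}()
		}

		wg.Wait()
		close(errc)
		return <-errc
	}

	func main() {
		items := [][]byte{[]byte("row root"), []byte("column root")}
		err := runPool(context.Background(), items, func(_ context.Context, b []byte) error {
			fmt.Printf("providing %q\n", b)
			return nil
		})
		fmt.Println("pool finished, err:", err)
	}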

-var provideWorkers = 32
-
-type provider struct {
-	ctx  context.Context
-	done chan struct{}
-
-	err   error
-	errLk sync.RWMutex
-
-	jobs  chan cid.Cid
-	total int32
-
-	croute    routing.ContentRouting
-	log       log.Logger
-	startTime time.Time
-}
-
-func newProvider(ctx context.Context, croute routing.ContentRouting, toProvide int32, logger log.Logger) *provider {
-	p := &provider{
-		ctx:    ctx,
-		done:   make(chan struct{}),
-		jobs:   make(chan cid.Cid, provideWorkers),
-		total:  toProvide,
-		croute: croute,
-		log:    logger,
-	}
-	for range make([]bool, provideWorkers) {
-		go p.worker()
-	}
-	logger.Info("Started Providing to DHT")
-	p.startTime = time.Now()
-	return p
-}
-
-func (p *provider) Provide(id cid.Cid) {
-	select {
-	case p.jobs <- id:
-	case <-p.ctx.Done():
-	}
-}
-
-func (p *provider) Done() <-chan struct{} {
-	return p.done
-}
+func collectErrors(errc chan error) error {
+	close(errc)
+	err := <-errc
+	return err
+}
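
collectErrors leans on two channel guarantees: closing a channel still lets receivers drain any buffered values, and a receive from a closed, empty channel yields the zero value (nil for error). This is only safe because wg.Wait() has already ensured no worker will send afterwards, since a send on a closed channel panics. A standalone demonstration, assuming nothing beyond the standard library:

	package main

	import (
		"errors"
		"fmt"
	)

	func collect(errc chan error) error {
		close(errc)
		return <-errc
	}

	func main() {
		empty := make(chan error, 1)
		fmt.Println(collect(empty)) // <nil>: closed and empty yields the zero value

		failed := make(chan error, 1)
		failed <- errors.New("provide failed")
		fmt.Println(collect(failed)) // the buffered error is still delivered
	}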

-func (p *provider) Err() error {
-	p.errLk.RLock()
-	defer p.errLk.RUnlock()
-	if p.err != nil {
-		return p.err
-	}
-	return p.ctx.Err()
-}

+type provideWorker struct {
+	croute routing.ContentRouting
+	logger log.Logger
+	cancel context.CancelFunc
+}

-func (p *provider) worker() {
-	for {
-		select {
-		case id := <-p.jobs:
-			err := p.croute.Provide(p.ctx, id, true)
-			if err != nil && err != kbucket.ErrLookupFailure { // Check for error to decrease test log spamming
-				if p.Err() == nil {
-					p.errLk.Lock()
-					p.err = err
-					p.errLk.Unlock()
-				}
-
-				p.log.Error("failed to provide to DHT", "err", err.Error())
-			}
-
-			p.provided()
-		case <-p.ctx.Done():
-			for {
-				select {
-				case <-p.jobs: // drain chan
-					p.provided() // ensure done is closed
-				default:
-					return
-				}
-			}
-		case <-p.done:
-			return
-		}
-	}
-}
-
-func (p *provider) provided() {
-	if atomic.AddInt32(&p.total, -1) == 0 {
-		p.log.Info("Finished providing to DHT", "took", time.Since(p.startTime).String())
-		close(p.done)
-	}
-}

+func feedJobs(ctx context.Context, jobs chan<- []byte, roots ...[][]byte) {
+	defer close(jobs)
+	for _, rootSet := range roots {
+		for _, root := range rootSet {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				jobs <- root
+			}
+		}
+	}
+}
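
feedJobs coordinates shutdown through channel closure: defer close(jobs) runs once every root is enqueued or the context is canceled, and that close is what terminates each worker's for ... range loop. The same mechanism in isolation, with illustrative values:

	package main

	import "fmt"

	func main() {
		jobs := make(chan []byte, 2)

		// feeder: mirrors feedJobs closing the channel when done
		go func() {
			defer close(jobs)
			for _, root := range [][]byte{[]byte("row root"), []byte("column root")} {
				jobs <- root
			}
		}()

		// worker: the range loop ends when the feeder closes the channel
		for job := range jobs {
			fmt.Printf("providing %s\n", job)
		}
		fmt.Println("worker done")
	}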

+func (w *provideWorker) provide(ctx context.Context, wg *stdsync.WaitGroup, jobs <-chan []byte, errc chan<- error) {
+	defer wg.Done()
+	for job := range jobs {
+		rootCid := plugin.MustCidFromNamespacedSha256(job)
+		err := w.croute.Provide(ctx, rootCid, true)
+		if err != nil {
+			w.cancel()
+			errc <- err
+		}
+	}
+}
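
Each failing worker calls w.cancel(), possibly after another worker already has. That is safe: the context package documents that calling a CancelFunc more than once has no effect after the first call. A minimal check:

	package main

	import (
		"context"
		"fmt"
	)

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		cancel()
		cancel() // a second call is a no-op, not a panic
		fmt.Println(ctx.Err()) // context.Canceled
	}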
