Skip to content

Commit

Permalink
Merge branch 'main' into fix-cache-tests-races
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss authored Oct 11, 2023
2 parents 27024ee + 937eb43 commit f3b5da1
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 43 deletions.
15 changes: 0 additions & 15 deletions share/ipld/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"

"github.com/celestiaorg/celestia-node/share"
)
Expand Down Expand Up @@ -100,9 +98,6 @@ func GetLeaves(ctx context.Context,
maxShares int,
put func(int, ipld.Node),
) {
ctx, span := tracer.Start(ctx, "get-leaves")
defer span.End()

// this buffer ensures writes to 'jobs' are never blocking (bin-tree-feat)
jobs := make(chan *job, (maxShares+1)/2) // +1 for the case where 'maxShares' is 1
jobs <- &job{cid: root, ctx: ctx}
Expand All @@ -120,29 +115,19 @@ func GetLeaves(ctx context.Context,
// work over each job concurrently, s.t. shares do not block
// processing of each other
pool.Submit(func() {
ctx, span := tracer.Start(j.ctx, "process-job")
defer span.End()
defer wg.Done()

span.SetAttributes(
attribute.String("cid", j.cid.String()),
attribute.Int("pos", j.sharePos),
)

nd, err := GetNode(ctx, bGetter, j.cid)
if err != nil {
// we don't really care about errors here
// just fetch as much as possible
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return
}
// check links to know what we should do with the node
lnks := nd.Links()
if len(lnks) == 0 {
// successfully fetched a share/leaf
// ladies and gentlemen, we got em!
span.SetStatus(codes.Ok, "")
put(j.sharePos, nd)
return
}
Expand Down
6 changes: 0 additions & 6 deletions share/ipld/get_shares.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ func GetShare(
// Does not return any error, and returns/unblocks only on success
// (got all shares) or on context cancellation.
func GetShares(ctx context.Context, bg blockservice.BlockGetter, root cid.Cid, shares int, put func(int, share.Share)) {
ctx, span := tracer.Start(ctx, "get-shares")
defer span.End()

putNode := func(i int, leaf format.Node) {
put(i, leafToShare(leaf))
}
Expand All @@ -51,9 +48,6 @@ func GetSharesByNamespace(
namespace share.Namespace,
maxShares int,
) ([]share.Share, *nmt.Proof, error) {
ctx, span := tracer.Start(ctx, "get-shares-by-namespace")
defer span.End()

data := NewNamespaceData(maxShares, namespace, WithLeaves(), WithProofs())
err := data.CollectLeavesByNamespace(ctx, bGetter, root)
if err != nil {
Expand Down
18 changes: 0 additions & 18 deletions share/ipld/namespace_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"

"github.com/celestiaorg/nmt"

Expand Down Expand Up @@ -193,13 +191,6 @@ func (n *NamespaceData) CollectLeavesByNamespace(
return err
}

ctx, span := tracer.Start(ctx, "get-leaves-by-namespace")
defer span.End()

span.SetAttributes(
attribute.String("namespace", n.namespace.String()),
)

// buffer the jobs to avoid blocking, we only need as many
// queued as the number of shares in the second-to-last layer
jobs := make(chan job, (n.maxShares+1)/2)
Expand Down Expand Up @@ -227,15 +218,8 @@ func (n *NamespaceData) CollectLeavesByNamespace(
return retrievalErr
}
pool.Submit(func() {
ctx, span := tracer.Start(j.ctx, "process-job")
defer span.End()
defer wg.done()

span.SetAttributes(
attribute.String("cid", j.cid.String()),
attribute.Int("pos", j.sharePos),
)

// if an error is likely to be returned or not depends on
// the underlying impl of the blockservice, currently it is not a realistic probability
nd, err := GetNode(ctx, bGetter, j.cid)
Expand All @@ -248,7 +232,6 @@ func (n *NamespaceData) CollectLeavesByNamespace(
"pos", j.sharePos,
"err", err,
)
span.SetStatus(codes.Error, err.Error())
// we still need to update the bounds
n.addLeaf(j.sharePos, nil)
return
Expand All @@ -257,7 +240,6 @@ func (n *NamespaceData) CollectLeavesByNamespace(
links := nd.Links()
if len(links) == 0 {
// successfully fetched a leaf belonging to the namespace
span.SetStatus(codes.Ok, "")
// we found a leaf, so we update the bounds
n.addLeaf(j.sharePos, nd)
return
Expand Down
4 changes: 1 addition & 3 deletions share/ipld/nmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
logging "github.com/ipfs/go-log/v2"
mh "github.com/multiformats/go-multihash"
mhcore "github.com/multiformats/go-multihash/core"
"go.opentelemetry.io/otel"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/da"
Expand All @@ -25,8 +24,7 @@ import (
)

var (
tracer = otel.Tracer("ipld")
log = logging.Logger("ipld")
log = logging.Logger("ipld")
)

const (
Expand Down
5 changes: 4 additions & 1 deletion share/p2p/peers/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ func (p *pool) next(ctx context.Context) <-chan peer.ID {
return
}

p.m.RLock()
hasPeerCh := p.hasPeerCh
p.m.RUnlock()
select {
case <-p.hasPeerCh:
case <-hasPeerCh:
case <-ctx.Done():
return
}
Expand Down

0 comments on commit f3b5da1

Please sign in to comment.