swarm/network: measure time of messages in priority queue #19250

Merged Mar 20, 2019 (18 commits)
7 changes: 7 additions & 0 deletions cmd/swarm/swarm-smoke/main.go
@@ -40,6 +40,7 @@ var (
allhosts string
hosts []string
filesize int
inputSeed int
syncDelay int
httpPort int
wsPort int
@@ -74,6 +75,12 @@ func main() {
Usage: "ws port",
Destination: &wsPort,
},
cli.IntFlag{
Name: "seed",
Value: 0,
Usage: "input seed in case we need deterministic upload",
Destination: &inputSeed,
},
cli.IntFlag{
Name: "filesize",
Value: 1024,
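The new --seed flag makes the uploaded payload reproducible: a failed run can be replayed with identical test data. A minimal sketch of the idea, assuming testutil.RandomBytes derives its output from a PRNG seeded with the given value (the helper name deterministicBytes below is hypothetical):

```go
package main

import (
	"bytes"
	"fmt"
	"math/rand"
)

// deterministicBytes returns n pseudo-random bytes derived from seed.
// Equal seeds yield equal payloads, which is what lets --seed replay a
// failed smoke-test upload byte-for-byte.
func deterministicBytes(seed, n int) []byte {
	r := rand.New(rand.NewSource(int64(seed)))
	buf := make([]byte, n)
	r.Read(buf) // (*rand.Rand).Read always fills buf and never errors
	return buf
}

func main() {
	a := deterministicBytes(42, 8)
	b := deterministicBytes(42, 8)
	fmt.Println(bytes.Equal(a, b)) // true: identical for identical seeds
}
```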
35 changes: 15 additions & 20 deletions cmd/swarm/swarm-smoke/upload_and_sync.go
@@ -39,6 +39,11 @@ import (
)

func uploadAndSyncCmd(ctx *cli.Context, tuid string) error {
// use input seed if it has been set
if inputSeed != 0 {
seed = inputSeed
}

randomBytes := testutil.RandomBytes(seed, filesize*1000)

errc := make(chan error)
@@ -47,37 +52,28 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error {
errc <- uploadAndSync(ctx, randomBytes, tuid)
}()

var err error
select {
case err := <-errc:
case err = <-errc:
if err != nil {
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
}
return err
case <-time.After(time.Duration(timeout) * time.Second):
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)

e := fmt.Errorf("timeout after %v sec", timeout)
// trigger debug functionality on randomBytes
err := trackChunks(randomBytes[:])
if err != nil {
e = fmt.Errorf("%v; triggerChunkDebug failed: %v", e, err)
}

return e
err = fmt.Errorf("timeout after %v sec", timeout)
}

// trigger debug functionality on randomBytes even on successful runs
err := trackChunks(randomBytes[:])
if err != nil {
log.Error(err.Error())
// trigger debug functionality on randomBytes
Review comment (Contributor): do we need to trigger this even if err is nil?

Reply (Member, author): yes, so that we compare success runs vs failed runs.
e := trackChunks(randomBytes[:])
if e != nil {
log.Error(e.Error())
}

return nil
return err
}

func trackChunks(testData []byte) error {
log.Warn("Test timed out, running chunk debug sequence")

addrs, err := getAllRefs(testData)
if err != nil {
return err
@@ -94,14 +90,14 @@ func trackChunks(testData []byte) error {

rpcClient, err := rpc.Dial(httpHost)
if err != nil {
log.Error("Error dialing host", "err", err)
log.Error("error dialing host", "err", err, "host", httpHost)
continue
}

var hasInfo []api.HasInfo
err = rpcClient.Call(&hasInfo, "bzz_has", addrs)
if err != nil {
log.Error("Error calling host", "err", err)
log.Error("error calling rpc client", "err", err, "host", httpHost)
continue
}

Expand All @@ -125,7 +121,6 @@ func trackChunks(testData []byte) error {
}

func getAllRefs(testData []byte) (storage.AddressCollection, error) {
log.Trace("Getting all references for given root hash")
datadir, err := ioutil.TempDir("", "chunk-debug")
if err != nil {
return nil, fmt.Errorf("unable to create temp dir: %v", err)
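The refactor above collapses the two exit paths of uploadAndSyncCmd into one: both the completion branch and the timeout branch now only assign err, and trackChunks runs unconditionally before the return, so chunk statistics from successful runs can be compared against failed ones. A self-contained sketch of the control-flow pattern, with hypothetical names (runWithTimeout, work, debug):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// runWithTimeout shows the control flow the refactor adopts: both the
// completion branch and the timeout branch only assign err, and the
// debug hook runs unconditionally before the function returns.
func runWithTimeout(work func() error, timeout time.Duration, debug func()) error {
	errc := make(chan error)
	go func() { errc <- work() }()

	var err error
	select {
	case err = <-errc:
	case <-time.After(timeout):
		err = fmt.Errorf("timeout after %v", timeout)
	}

	debug() // runs on success, failure and timeout alike

	return err
}

func main() {
	err := runWithTimeout(
		func() error { return errors.New("upload failed") },
		time.Second,
		func() { fmt.Println("debug hook: track chunks here") },
	)
	fmt.Println(err)
}
```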
1 change: 1 addition & 0 deletions metrics/influxdb/influxdb.go
@@ -91,6 +91,7 @@ func (r *reporter) makeClient() (err error) {
URL: r.url,
Username: r.username,
Password: r.password,
Timeout: 10 * time.Second,
})

return
7 changes: 4 additions & 3 deletions swarm/network/fetcher.go
@@ -204,24 +204,24 @@ func (f *Fetcher) run(peers *sync.Map) {

// incoming request
case hopCount = <-f.requestC:
log.Trace("new request", "request addr", f.addr)
// 2) chunk is requested, set requested flag
// launch a request iff none been launched yet
doRequest = !requested
log.Trace("new request", "request addr", f.addr, "doRequest", doRequest)
requested = true

// peer we requested from is gone. fall back to another
// and remove the peer from the peers map
case id := <-gone:
log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr)
peers.Delete(id.String())
doRequest = requested
log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr, "doRequest", doRequest)

// search timeout: too much time passed since the last request,
// extend the search to a new peer if we can find one
case <-waitC:
log.Trace("search timed out: requesting", "request addr", f.addr)
doRequest = requested
log.Trace("search timed out: requesting", "request addr", f.addr, "doRequest", doRequest)

// all Fetcher context closed, can quit
case <-f.ctx.Done():
@@ -288,6 +288,7 @@ func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources
for i = 0; i < len(sources); i++ {
req.Source = sources[i]
var err error
log.Trace("fetcher.doRequest", "request addr", f.addr, "peer", req.Source.String())
sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err == nil {
// remove the peer from known sources
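The trace lines added here expose the doRequest bookkeeping in Fetcher.run. The rule is compact enough to sketch in isolation, assuming requested records whether any request has been launched so far (fetchState and onEvent are hypothetical names):

```go
package main

import "fmt"

// fetchState mirrors the bookkeeping in Fetcher.run that the new trace
// lines expose: requested records whether any request was ever launched.
type fetchState struct{ requested bool }

// onEvent reports whether a (re-)request should be launched for an event.
func (s *fetchState) onEvent(event string) (doRequest bool) {
	switch event {
	case "request": // chunk requested: launch iff none launched yet
		doRequest = !s.requested
		s.requested = true
	case "gone", "timeout": // peer left or search timed out:
		doRequest = s.requested // re-request only if one was in flight
	}
	return doRequest
}

func main() {
	var s fetchState
	for _, ev := range []string{"timeout", "request", "request", "gone"} {
		fmt.Println(ev, "->", s.onEvent(ev)) // false, true, false, true
	}
}
```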
23 changes: 16 additions & 7 deletions swarm/network/priorityqueue/priorityqueue.go
@@ -28,8 +28,9 @@ package priorityqueue
import (
"context"
"errors"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

var (
@@ -69,21 +70,23 @@ READ:
case <-ctx.Done():
return
case x := <-q:
log.Trace("priority.queue f(x)", "p", p, "len(Queues[p])", len(pq.Queues[p]))
f(x)
val := x.(struct {
v interface{}
t time.Time
})
f(val.v)
metrics.GetOrRegisterResettingTimer("pq.run", nil).UpdateSince(val.t)
p = top
default:
if p > 0 {
p--
log.Trace("priority.queue p > 0", "p", p)
continue READ
}
p = top
select {
case <-ctx.Done():
return
case <-pq.wakeup:
log.Trace("priority.queue wakeup", "p", p)
}
}
}
@@ -95,9 +98,15 @@ func (pq *PriorityQueue) Push(x interface{}, p int) error {
if p < 0 || p >= len(pq.Queues) {
return errBadPriority
}
log.Trace("priority.queue push", "p", p, "len(Queues[p])", len(pq.Queues[p]))
val := struct {
v interface{}
t time.Time
}{
x,
time.Now(),
}
select {
case pq.Queues[p] <- x:
case pq.Queues[p] <- val:
default:
return ErrContention
}
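This hunk is the change the PR title describes: Push wraps each item in a small struct together with time.Now(), and Run calls UpdateSince on the "pq.run" resetting timer when the item is popped, so the metric captures how long a message sat in the priority queue. A runnable sketch of that measurement with the metrics registry stripped out (timedItem is a hypothetical stand-in for the anonymous struct):

```go
package main

import (
	"fmt"
	"time"
)

// timedItem pairs a queued value with its enqueue time, mirroring the
// anonymous struct the PR pushes into pq.Queues[p].
type timedItem struct {
	v interface{}
	t time.Time
}

func main() {
	q := make(chan timedItem, 1)

	// Push side: stamp the value as it is enqueued.
	q <- timedItem{v: "msg", t: time.Now()}

	time.Sleep(10 * time.Millisecond) // simulate queueing delay

	// Pop side: the elapsed time is what UpdateSince(val.t) records
	// into the "pq.run" resetting timer in the PR.
	item := <-q
	fmt.Printf("%v spent %v in the queue\n", item.v, time.Since(item.t))
}
```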
12 changes: 12 additions & 0 deletions swarm/network/stream/delivery.go
@@ -185,6 +185,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
if err != nil {
log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
osp.LogFields(olog.Bool("delivered", true))
return
}
osp.LogFields(olog.Bool("skipCheck", false))
@@ -216,20 +217,29 @@ type ChunkDeliveryMsgSyncing ChunkDeliveryMsg

// chunk delivery msg is response to retrieverequest msg
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
ctx,
"handle.chunk.delivery")

processReceivedChunksCount.Inc(1)

// retrieve the span for the originating retrieverequest
spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr)
span := tracing.ShiftSpanByKey(spanId)

log.Trace("handle.chunk.delivery", "ref", req.Addr, "from peer", sp.ID())

go func() {
defer osp.Finish()

if span != nil {
span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg"))
defer span.Finish()
}

req.peer = sp
log.Trace("handle.chunk.delivery", "put", req.Addr)
err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
if err != nil {
if err == storage.ErrChunkInvalid {
@@ -239,6 +249,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
req.peer.Drop(err)
}
}
log.Trace("handle.chunk.delivery", "done put", req.Addr, "err", err)
}()
return nil
}
@@ -284,6 +295,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
// this span will finish only when delivery is handled (or times out)
ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request")
ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr))
log.Trace("request.from.peers", "peer", sp.ID(), "ref", req.Addr)
err := sp.SendPriority(ctx, &RetrieveRequestMsg{
Addr: req.Addr,
SkipCheck: req.SkipCheck,
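handleChunkDeliveryMsg retrieves the span that the originating retrieve request parked under a string key and finishes it once delivery is handled. The sketch below shows what a helper like tracing.ShiftSpanByKey is assumed to do: store a span under a key, then remove and return it on lookup. The spanStore type is hypothetical and holds plain values where the real helper would hold opentracing spans:

```go
package main

import (
	"fmt"
	"sync"
)

// spanStore sketches the assumed behavior of tracing.ShiftSpanByKey:
// the sender parks a span under a string key; whoever handles the
// response shifts it (removes and returns it) and finishes it.
type spanStore struct {
	mu    sync.Mutex
	spans map[string]interface{} // a real store would hold opentracing.Span
}

func (s *spanStore) put(key string, span interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.spans[key] = span
}

func (s *spanStore) shift(key string) interface{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	span := s.spans[key]
	delete(s.spans, key)
	return span
}

func main() {
	store := &spanStore{spans: make(map[string]interface{})}
	// sender side: park a span under "stream.send.request.<peer>.<ref>"
	store.put("stream.send.request.peer1.abcd", "span#1")
	// receiver side: pick it up in the delivery handler and finish it
	fmt.Println(store.shift("stream.send.request.peer1.abcd")) // span#1
	fmt.Println(store.shift("stream.send.request.peer1.abcd")) // <nil>, single use
}
```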
2 changes: 1 addition & 1 deletion swarm/network/stream/stream.go
@@ -910,7 +910,7 @@ func (r *Registry) APIs() []rpc.API {
Namespace: "stream",
Version: "3.0",
Service: r.api,
Public: true,
Public: false,
},
}
}
5 changes: 2 additions & 3 deletions swarm/storage/chunker.go
@@ -536,7 +536,6 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in
chunkData, err := r.getter.Get(ctx, Reference(childAddress))
if err != nil {
metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime)
log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err)
select {
case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)):
case <-quitC:
@@ -561,12 +560,12 @@

// Read keeps a cursor so it cannot be called simultaneously, see ReadAt
func (r *LazyChunkReader) Read(b []byte) (read int, err error) {
log.Debug("lazychunkreader.read", "key", r.addr)
log.Trace("lazychunkreader.read", "key", r.addr)
metrics.GetOrRegisterCounter("lazychunkreader.read", nil).Inc(1)

read, err = r.ReadAt(b, r.off)
if err != nil && err != io.EOF {
log.Debug("lazychunkreader.readat", "read", read, "err", err)
log.Trace("lazychunkreader.readat", "read", read, "err", err)
metrics.GetOrRegisterCounter("lazychunkreader.read.err", nil).Inc(1)
}

3 changes: 3 additions & 0 deletions swarm/storage/netstore.go
@@ -87,7 +87,9 @@ func (n *NetStore) Put(ctx context.Context, ch Chunk) error {

// if chunk is now put in the store, check if there was an active fetcher and call deliver on it
// (this delivers the chunk to requestors via the fetcher)
log.Trace("n.getFetcher", "ref", ch.Address())
if f := n.getFetcher(ch.Address()); f != nil {
log.Trace("n.getFetcher deliver", "ref", ch.Address())
f.deliver(ctx, ch)
}
return nil
@@ -341,5 +343,6 @@ func (f *fetcher) deliver(ctx context.Context, ch Chunk) {
f.chunk = ch
// closing the deliveredC channel will terminate ongoing requests
close(f.deliveredC)
log.Trace("n.getFetcher close deliveredC", "ref", ch.Address())
})
}
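The closing "})" in the hunk above suggests deliver runs inside a sync.Once, so the chunk field is set and deliveredC is closed exactly once; closing a channel releases every goroutine blocked on it. A sketch under those assumptions (deliverOnce is a hypothetical stand-in for the fetcher):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// deliverOnce sketches the fetcher's delivery signal: the chunk is stored
// and deliveredC is closed exactly once; every goroutine blocked on the
// channel is released by that single close.
type deliverOnce struct {
	once       sync.Once
	chunk      string // stands in for storage.Chunk
	deliveredC chan struct{}
}

func (f *deliverOnce) deliver(ch string) {
	f.once.Do(func() {
		f.chunk = ch
		close(f.deliveredC) // closing broadcasts to all waiters
	})
}

func main() {
	f := &deliverOnce{deliveredC: make(chan struct{})}
	for i := 0; i < 3; i++ {
		go func(i int) {
			<-f.deliveredC // unblocks once deliver closes the channel
			fmt.Println("waiter", i, "got", f.chunk)
		}(i)
	}
	f.deliver("chunk-1234")
	f.deliver("chunk-1234") // no-op: sync.Once guards the close
	time.Sleep(100 * time.Millisecond) // let the waiters print
}
```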
2 changes: 2 additions & 0 deletions swarm/swarm.go
@@ -522,6 +522,8 @@ func (s *Swarm) APIs() []rpc.API {

apis = append(apis, s.bzz.APIs()...)

apis = append(apis, s.streamer.APIs()...)

if s.ps != nil {
apis = append(apis, s.ps.APIs()...)
}
2 changes: 1 addition & 1 deletion vendor/github.com/opentracing/opentracing-go/CHANGELOG.md

18 changes: 3 additions & 15 deletions vendor/github.com/opentracing/opentracing-go/Makefile

4 changes: 2 additions & 2 deletions vendor/github.com/opentracing/opentracing-go/README.md