From 33d9ca1b5b751999352113e610035d5b00ea1d85 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 17 Sep 2017 11:38:10 +0300 Subject: [PATCH 01/39] dht_net: pipelined requests with a pump goroutine for reading --- dht_net.go | 172 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 126 insertions(+), 46 deletions(-) diff --git a/dht_net.go b/dht_net.go index 03a7cfc43..bb1954c99 100644 --- a/dht_net.go +++ b/dht_net.go @@ -188,17 +188,25 @@ func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) { } type messageSender struct { - s inet.Stream - r ggio.ReadCloser - w bufferedWriteCloser - lk sync.Mutex - p peer.ID - dht *IpfsDHT + s inet.Stream + w ggio.WriteCloser + rch chan chan requestResult + rcount int + lk sync.Mutex + p peer.ID + dht *IpfsDHT invalid bool singleMes int } +type requestResult struct { + mes *pb.Message + err error +} + +const requestResultBuffer = 16 + // invalidate is called before this messageSender is removed from the strmap. // It prevents the messageSender from being reused/reinitialized and then // forgotten (leaving the stream open). @@ -233,8 +241,12 @@ func (ms *messageSender) prep() error { return err } - ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax) - ms.w = newBufferedDelimitedWriter(nstr) + r := ggio.NewDelimitedReader(nstr, inet.MessageSizeMax) + rch := make(chan chan requestResult, requestResultBuffer) + go messageReceiver(ms.dht.ctx, rch, r) + + ms.rch = rch + ms.w = ggio.NewDelimitedWriter(nstr) ms.s = nstr return nil @@ -250,13 +262,17 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro defer ms.lk.Unlock() retry := false for { + if ms.singleMes > streamReuseTries { + // TODO do this without holding the lock + return ms.sendMessageSingle(ctx, pmes) + } + if err := ms.prep(); err != nil { return err } - if err := ms.writeMsg(pmes); err != nil { - ms.s.Reset() - ms.s = nil + if err := ms.w.WriteMsg(pmes); err != nil { + ms.resetHard() if retry { log.Info("error writing message, bailing: ", err) @@ -271,8 +287,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes) if ms.singleMes > streamReuseTries { - go inet.FullClose(ms.s) - ms.s = nil + ms.resetHard() } else if retry { ms.singleMes++ } @@ -282,17 +297,24 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro } func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) { - ms.lk.Lock() - defer ms.lk.Unlock() + // XXX log total request time retry := false for { + ms.lk.Lock() + + if ms.singleMes > streamReuseTries { + ms.lk.Unlock() + return ms.sendRequestSingle(ctx, pmes) + } + if err := ms.prep(); err != nil { + ms.lk.Unlock() return nil, err } - if err := ms.writeMsg(pmes); err != nil { - ms.s.Reset() - ms.s = nil + if err := ms.w.WriteMsg(pmes); err != nil { + ms.resetHard() + ms.lk.Unlock() if retry { log.Info("error writing message, bailing: ", err) @@ -304,56 +326,114 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb } } - mes := new(pb.Message) - if err := ms.ctxReadMsg(ctx, mes); err != nil { - ms.s.Reset() - ms.s = nil + // XXX log + + resch := make(chan requestResult, 1) + select { + case ms.rch <- resch: + default: + // XXX Implement me -- pipeline stall + } + + rcount := ms.rcount + + ms.lk.Unlock() + + var res requestResult + select { + case res = <-resch: + + case <-ctx.Done(): + return nil, ctx.Err() + + // XXX read timeout 
+ } + + if res.err != nil { + ms.lk.Lock() + ms.resetSoft(rcount) + ms.lk.Unlock() if retry { - log.Info("error reading message, bailing: ", err) - return nil, err + log.Info("error reading message, bailing: ", res.err) + return nil, res.err } else { - log.Info("error reading message, trying again: ", err) + log.Info("error reading message, trying again: ", res.err) retry = true continue } } - log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes) + // XXX log + ms.lk.Lock() if ms.singleMes > streamReuseTries { - go inet.FullClose(ms.s) - ms.s = nil + ms.resetSoft(rcount) } else if retry { ms.singleMes++ } + ms.lk.Unlock() - return mes, nil + return res.mes, nil } } -func (ms *messageSender) writeMsg(pmes *pb.Message) error { - if err := ms.w.WriteMsg(pmes); err != nil { +func (ms *messageSender) sendMessageSingle(ctx context.Context, pmes *pb.Message) error { + s, err := ms.dht.host.NewStream(ctx, ms.p, ProtocolDHT, ProtocolDHTOld) + if err != nil { return err } - return ms.w.Flush() + defer s.Close() + + w := ggio.NewDelimitedWriter(s) + return w.WriteMsg(pmes) } -func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error { - errc := make(chan error, 1) - go func(r ggio.ReadCloser) { - errc <- r.ReadMsg(mes) - }(ms.r) +func (ms *messageSender) sendRequestSingle(ctx context.Context, pmes *pb.Message) (*pb.Message, error) { + s, err := ms.dht.host.NewStream(ctx, ms.p, ProtocolDHT, ProtocolDHTOld) + if err != nil { + return nil, err + } + defer s.Close() - t := time.NewTimer(dhtReadMessageTimeout) - defer t.Stop() + r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + w := ggio.NewDelimitedWriter(s) - select { - case err := <-errc: - return err - case <-ctx.Done(): - return ctx.Err() - case <-t.C: - return ErrReadTimeout + if err := w.WriteMsg(pmes); err != nil { + return nil, err + } + + mes := new(pb.Message) + + // XXX context, timeout + if err := r.ReadMsg(mes); err != nil { + return nil, err } + + return mes, nil +} + +// Resets the stream unconditionally; increments the reset count. +// Mutex must be locked. +func (ms *messageSender) resetHard() { + close(ms.rch) + ms.s.Reset() + ms.s = nil + ms.rcount++ +} + +// Resets the stream only if the reset count matches the argument +// Allows multiple read failures in batched concurrent requests with +// only a single reset between them. +// Mutex must be locked. 
+func (ms *messageSender) resetSoft(rcount int) { + if rcount != ms.rcount { + return + } + + ms.resetHard() +} + +func messageReceiver(ctx context.Context, rch chan chan requestResult, r ggio.ReadCloser) { + // XXX Implement me } From db09dc68e138fca8d3ba9669698fc1440a4fa05f Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 17 Sep 2017 12:04:53 +0300 Subject: [PATCH 02/39] dht_net: implement messageReceiver, address XXXs --- dht_net.go | 69 ++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 10 deletions(-) diff --git a/dht_net.go b/dht_net.go index bb1954c99..d43bbce51 100644 --- a/dht_net.go +++ b/dht_net.go @@ -297,7 +297,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro } func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) { - // XXX log total request time + defer log.EventBegin(ctx, "dhtSendRequest", ms.dht.self, ms.p, pmes).Done() retry := false for { ms.lk.Lock() @@ -326,27 +326,45 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb } } - // XXX log + log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes) resch := make(chan requestResult, 1) select { case ms.rch <- resch: default: - // XXX Implement me -- pipeline stall + // pipeline stall, log it and time it + evt := log.EventBegin(ctx, "dhtSendRequestStall", ms.dht.self, ms.p, pmes) + select { + case ms.rch <- resch: + evt.Done() + case <-ctx.Done(): + evt.Done() + return nil, ctx.Err() + case <-ms.dht.ctx.Done(): + evt.Done() + return nil, ms.dht.ctx.Err() + } } rcount := ms.rcount ms.lk.Unlock() + t := time.NewTimer(dhtReadMessageTimeout) + defer t.Stop() + var res requestResult select { case res = <-resch: + case <-t.C: + return nil, ErrReadTimeout + case <-ctx.Done(): return nil, ctx.Err() - // XXX read timeout + case <-ms.dht.ctx.Done(): + return nil, ms.dht.ctx.Err() } if res.err != nil { @@ -364,8 +382,6 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb } } - // XXX log - ms.lk.Lock() if ms.singleMes > streamReuseTries { ms.resetSoft(rcount) @@ -405,9 +421,23 @@ func (ms *messageSender) sendRequestSingle(ctx context.Context, pmes *pb.Message mes := new(pb.Message) - // XXX context, timeout - if err := r.ReadMsg(mes); err != nil { - return nil, err + errc := make(chan error, 1) + go func() { + errc <- r.ReadMsg(mes) + }() + + t := time.NewTimer(dhtReadMessageTimeout) + defer t.Stop() + + select { + case err := <-errc: + if err != nil { + return nil, err + } + case <-ctx.Done(): + return nil, ctx.Err() + case <-t.C: + return nil, ErrReadTimeout } return mes, nil @@ -435,5 +465,24 @@ func (ms *messageSender) resetSoft(rcount int) { } func messageReceiver(ctx context.Context, rch chan chan requestResult, r ggio.ReadCloser) { - // XXX Implement me + for { + var next chan requestResult + var ok bool + select { + case next, ok = <-rch: + if !ok { + return + } + case <-ctx.Done(): + return + } + + mes := new(pb.Message) + err := r.ReadMsg(mes) + if err != nil { + next <- requestResult{err: err} + } else { + next <- requestResult{mes: mes} + } + } } From cb20a6f2891c9eaebc8359aeca21fb212c616fca Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 17 Sep 2017 12:28:43 +0300 Subject: [PATCH 03/39] dht_net: limit the scope of the lock in fallback --- dht_net.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/dht_net.go b/dht_net.go index d43bbce51..b6a386624 100644 --- a/dht_net.go +++ b/dht_net.go @@ -382,13 +382,15 @@ func 
(ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb } } - ms.lk.Lock() - if ms.singleMes > streamReuseTries { - ms.resetSoft(rcount) - } else if retry { - ms.singleMes++ + if retry { + ms.lk.Lock() + if ms.singleMes > streamReuseTries { + ms.resetSoft(rcount) + } else { + ms.singleMes++ + } + ms.lk.Unlock() } - ms.lk.Unlock() return res.mes, nil } From 9f72b80e01d367ceb4eeacfd6ec90355681bc2fe Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 17 Sep 2017 12:34:53 +0300 Subject: [PATCH 04/39] dht_net: unlock in context completions when the pipeline stalls --- dht_net.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dht_net.go b/dht_net.go index b6a386624..0e0c3cc5b 100644 --- a/dht_net.go +++ b/dht_net.go @@ -339,9 +339,11 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb evt.Done() case <-ctx.Done(): evt.Done() + ms.lk.Unlock() return nil, ctx.Err() case <-ms.dht.ctx.Done(): evt.Done() + ms.lk.Unlock() return nil, ms.dht.ctx.Err() } } From 0a07fc9150680e8a140d65564f68ba7edc7c39a2 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 17 Sep 2017 13:00:57 +0300 Subject: [PATCH 05/39] dht_net: more consistent handling of retry resets; check for streamReuseTries after increment --- dht_net.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dht_net.go b/dht_net.go index 0e0c3cc5b..32a1494bd 100644 --- a/dht_net.go +++ b/dht_net.go @@ -286,10 +286,11 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes) - if ms.singleMes > streamReuseTries { - ms.resetHard() - } else if retry { + if retry { ms.singleMes++ + if ms.singleMes > streamReuseTries { + ms.resetHard() + } } return nil @@ -386,10 +387,9 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb if retry { ms.lk.Lock() + ms.singleMes++ if ms.singleMes > streamReuseTries { ms.resetSoft(rcount) - } else { - ms.singleMes++ } ms.lk.Unlock() } From b6865751b6bb3b58862860b66c1d8d929b02ce91 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 17 Sep 2017 13:48:39 +0300 Subject: [PATCH 06/39] dht_net: increase requestResultBuffer to 64 --- dht_net.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dht_net.go b/dht_net.go index 32a1494bd..4051c0231 100644 --- a/dht_net.go +++ b/dht_net.go @@ -205,7 +205,7 @@ type requestResult struct { err error } -const requestResultBuffer = 16 +const requestResultBuffer = 64 // invalidate is called before this messageSender is removed from the strmap. 
// It prevents the messageSender from being reused/reinitialized and then From d367bf939f36d8bafc1ab74ce582c168a568ac8a Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 17 Sep 2017 14:03:03 +0300 Subject: [PATCH 07/39] dht_net: more consistent event logging for dhtSendMessage; measure latency --- dht_net.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dht_net.go b/dht_net.go index 4051c0231..039b104d2 100644 --- a/dht_net.go +++ b/dht_net.go @@ -258,6 +258,7 @@ func (ms *messageSender) prep() error { const streamReuseTries = 3 func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error { + defer log.EventBegin(ctx, "dhtSendMessage", ms.dht.self, ms.p, pmes).Done() ms.lk.Lock() defer ms.lk.Unlock() retry := false @@ -284,8 +285,6 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro } } - log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes) - if retry { ms.singleMes++ if ms.singleMes > streamReuseTries { @@ -327,8 +326,6 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb } } - log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes) - resch := make(chan requestResult, 1) select { case ms.rch <- resch: From 9b38b31ddef4d6c0c3f5071b9c0767a23aff2181 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 20 Sep 2017 10:17:36 +0300 Subject: [PATCH 08/39] dht_net: resetHard on invalidate --- dht_net.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dht_net.go b/dht_net.go index 039b104d2..35b36e0f9 100644 --- a/dht_net.go +++ b/dht_net.go @@ -213,8 +213,7 @@ const requestResultBuffer = 64 func (ms *messageSender) invalidate() { ms.invalid = true if ms.s != nil { - ms.s.Reset() - ms.s = nil + ms.resetHard() } } @@ -232,6 +231,7 @@ func (ms *messageSender) prep() error { if ms.invalid { return fmt.Errorf("message sender has been invalidated") } + if ms.s != nil { return nil } From b2c23ed8e5b50873bab3ebb3b514a4854f3c5334 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 21 Sep 2017 10:21:59 +0300 Subject: [PATCH 09/39] dht_net: use context with timeout instead of timer for reads --- dht_net.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/dht_net.go b/dht_net.go index 35b36e0f9..8c8fb6f11 100644 --- a/dht_net.go +++ b/dht_net.go @@ -350,18 +350,15 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb ms.lk.Unlock() - t := time.NewTimer(dhtReadMessageTimeout) - defer t.Stop() + rctx, cancel := context.WithTimeout(ctx, dhtReadMessageTimeout) + defer cancel() var res requestResult select { case res = <-resch: - case <-t.C: - return nil, ErrReadTimeout - - case <-ctx.Done(): - return nil, ctx.Err() + case <-rctx.Done(): + return nil, rctx.Err() case <-ms.dht.ctx.Done(): return nil, ms.dht.ctx.Err() @@ -427,18 +424,16 @@ func (ms *messageSender) sendRequestSingle(ctx context.Context, pmes *pb.Message errc <- r.ReadMsg(mes) }() - t := time.NewTimer(dhtReadMessageTimeout) - defer t.Stop() + rctx, cancel := context.WithTimeout(ctx, dhtReadMessageTimeout) + defer cancel() select { case err := <-errc: if err != nil { return nil, err } - case <-ctx.Done(): - return nil, ctx.Err() - case <-t.C: - return nil, ErrReadTimeout + case <-rctx.Done(): + return nil, rctx.Err() } return mes, nil From 0490df765ea9a3b782b349b6e0cc7a8ac13e9c3d Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 21 Sep 2017 10:30:13 +0300 Subject: [PATCH 10/39] dht_net: reset stream on write errors in single message per stream fallback --- 
dht_net.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/dht_net.go b/dht_net.go index 8c8fb6f11..abfde8b80 100644 --- a/dht_net.go +++ b/dht_net.go @@ -400,7 +400,13 @@ func (ms *messageSender) sendMessageSingle(ctx context.Context, pmes *pb.Message defer s.Close() w := ggio.NewDelimitedWriter(s) - return w.WriteMsg(pmes) + + err = w.WriteMsg(pmes) + if err != nil { + s.Reset() + } + + return err } func (ms *messageSender) sendRequestSingle(ctx context.Context, pmes *pb.Message) (*pb.Message, error) { @@ -414,6 +420,7 @@ func (ms *messageSender) sendRequestSingle(ctx context.Context, pmes *pb.Message w := ggio.NewDelimitedWriter(s) if err := w.WriteMsg(pmes); err != nil { + s.Reset() return nil, err } From e19692de0dccff2158ff3d8619695dcccae060a6 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 27 Sep 2017 16:42:38 +0300 Subject: [PATCH 11/39] dht_net: denser code for messageReceiver --- dht_net.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/dht_net.go b/dht_net.go index abfde8b80..4f17975c9 100644 --- a/dht_net.go +++ b/dht_net.go @@ -469,23 +469,22 @@ func (ms *messageSender) resetSoft(rcount int) { func messageReceiver(ctx context.Context, rch chan chan requestResult, r ggio.ReadCloser) { for { - var next chan requestResult - var ok bool select { - case next, ok = <-rch: + case next, ok := <-rch: if !ok { return } + + mes := new(pb.Message) + err := r.ReadMsg(mes) + if err != nil { + next <- requestResult{err: err} + } else { + next <- requestResult{mes: mes} + } + case <-ctx.Done(): return } - - mes := new(pb.Message) - err := r.ReadMsg(mes) - if err != nil { - next <- requestResult{err: err} - } else { - next <- requestResult{mes: mes} - } } } From 298641affb217fbe3317ccff01ed5ec5239384e1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 27 Dec 2017 11:17:23 +0200 Subject: [PATCH 12/39] dht_test: adds tests for concurrent requests --- dht_test.go | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/dht_test.go b/dht_test.go index 97a48dc41..129c74825 100644 --- a/dht_test.go +++ b/dht_test.go @@ -1305,6 +1305,65 @@ func TestFindClosestPeers(t *testing.T) { } } +func testConcurrentRequests(t *testing.T, reqs int) { + ctx := context.Background() + + _, peers, dhts := setupDHTS(ctx, 10, t) + defer func() { + for i := 0; i < 10; i++ { + dhts[i].Close() + dhts[i].host.Close() + } + }() + + connect(t, ctx, dhts[0], dhts[1]) + for i := 2; i < 10; i++ { + connect(t, ctx, dhts[1], dhts[i]) + } + + ctxT, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + wg := &sync.WaitGroup{} + j := 2 + for i := 0; i < reqs; i++ { + wg.Add(1) + + go func(j int) { + defer wg.Done() + + p, err := dhts[0].FindPeer(ctxT, peers[j]) + if err != nil { + t.Fatal(err) + } + + if p.ID == "" { + t.Fatal("Failed to find peer.") + } + + if p.ID != peers[j] { + t.Fatal("Didnt find expected peer.") + } + + }(j) + + j++ + if j == 10 { + j = 2 + } + } + + wg.Wait() +} + +func TestConcurrentRequests(t *testing.T) { + testConcurrentRequests(t, requestResultBuffer/2) +} + +func TestConcurrentRequestsOverload(t *testing.T) { + testConcurrentRequests(t, 2*requestResultBuffer) +} + func TestGetSetPluggedProtocol(t *testing.T) { t.Run("PutValue/GetValue - same protocol", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) From 995da8994d6ce993cc12bbd4cdeff4df0fd9b29d Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 8 Jan 2018 11:28:14 +0200 Subject: [PATCH 13/39] 
dht_net: eliminate rcount contraption for stream resetting --- dht_net.go | 86 +++++++++++++++++++++++++++++------------------------- 1 file changed, 46 insertions(+), 40 deletions(-) diff --git a/dht_net.go b/dht_net.go index 4f17975c9..438de81f1 100644 --- a/dht_net.go +++ b/dht_net.go @@ -3,6 +3,7 @@ package dht import ( "bufio" "context" + "errors" "fmt" "io" "sync" @@ -188,13 +189,12 @@ func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) { } type messageSender struct { - s inet.Stream - w ggio.WriteCloser - rch chan chan requestResult - rcount int - lk sync.Mutex - p peer.ID - dht *IpfsDHT + s inet.Stream + w ggio.WriteCloser + rch chan chan requestResult + lk sync.Mutex + p peer.ID + dht *IpfsDHT invalid bool singleMes int @@ -212,9 +212,7 @@ const requestResultBuffer = 64 // forgotten (leaving the stream open). func (ms *messageSender) invalidate() { ms.invalid = true - if ms.s != nil { - ms.resetHard() - } + ms.reset() } func (ms *messageSender) prepOrInvalidate() error { @@ -243,7 +241,7 @@ func (ms *messageSender) prep() error { r := ggio.NewDelimitedReader(nstr, inet.MessageSizeMax) rch := make(chan chan requestResult, requestResultBuffer) - go messageReceiver(ms.dht.ctx, rch, r) + go ms.messageReceiver(rch, r) ms.rch = rch ms.w = ggio.NewDelimitedWriter(nstr) @@ -273,7 +271,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro } if err := ms.w.WriteMsg(pmes); err != nil { - ms.resetHard() + ms.reset() if retry { log.Info("error writing message, bailing: ", err) @@ -288,7 +286,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro if retry { ms.singleMes++ if ms.singleMes > streamReuseTries { - ms.resetHard() + ms.reset() } } @@ -313,7 +311,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb } if err := ms.w.WriteMsg(pmes); err != nil { - ms.resetHard() + ms.reset() ms.lk.Unlock() if retry { @@ -346,8 +344,6 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb } } - rcount := ms.rcount - ms.lk.Unlock() rctx, cancel := context.WithTimeout(ctx, dhtReadMessageTimeout) @@ -365,10 +361,6 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb } if res.err != nil { - ms.lk.Lock() - ms.resetSoft(rcount) - ms.lk.Unlock() - if retry { log.Info("error reading message, bailing: ", res.err) return nil, res.err @@ -383,7 +375,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb ms.lk.Lock() ms.singleMes++ if ms.singleMes > streamReuseTries { - ms.resetSoft(rcount) + ms.reset() } ms.lk.Unlock() } @@ -446,28 +438,18 @@ func (ms *messageSender) sendRequestSingle(ctx context.Context, pmes *pb.Message return mes, nil } -// Resets the stream unconditionally; increments the reset count. +// Resets the stream and shuts down the goroutine pump // Mutex must be locked. -func (ms *messageSender) resetHard() { - close(ms.rch) - ms.s.Reset() - ms.s = nil - ms.rcount++ -} - -// Resets the stream only if the reset count matches the argument -// Allows multiple read failures in batched concurrent requests with -// only a single reset between them. -// Mutex must be locked. 
-func (ms *messageSender) resetSoft(rcount int) { - if rcount != ms.rcount { - return +func (ms *messageSender) reset() { + if ms.s != nil { + close(ms.rch) + ms.s.Reset() + ms.s = nil } - - ms.resetHard() } -func messageReceiver(ctx context.Context, rch chan chan requestResult, r ggio.ReadCloser) { +func (ms *messageSender) messageReceiver(rch chan chan requestResult, r ggio.ReadCloser) { +loop: for { select { case next, ok := <-rch: @@ -479,11 +461,35 @@ func messageReceiver(ctx context.Context, rch chan chan requestResult, r ggio.Re err := r.ReadMsg(mes) if err != nil { next <- requestResult{err: err} + break loop } else { next <- requestResult{mes: mes} } - case <-ctx.Done(): + case <-ms.dht.ctx.Done(): + return + } + } + + // reset once; needs to happen in a goroutine to avoid deadlock + // in case of pipeline stalls + go func() { + ms.lk.Lock() + ms.reset() + ms.lk.Unlock() + }() + + // drain the pipeline + err := errors.New("Stream has been abandoned due to earlier errors") + for { + select { + case next, ok := <-rch: + if !ok { + return + } + next <- requestResult{err: err} + + case <-ms.dht.ctx.Done(): return } } From 67c0356e83ff5c9b0d0c4c64f6dc1063a9a6345c Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 8 Jan 2018 12:36:38 +0200 Subject: [PATCH 14/39] dht_net: move reset definition closer to prep for better readability --- dht_net.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/dht_net.go b/dht_net.go index 438de81f1..80884c607 100644 --- a/dht_net.go +++ b/dht_net.go @@ -250,6 +250,16 @@ func (ms *messageSender) prep() error { return nil } +// Resets the stream and shuts down the goroutine pump +// Mutex must be locked. +func (ms *messageSender) reset() { + if ms.s != nil { + close(ms.rch) + ms.s.Reset() + ms.s = nil + } +} + // streamReuseTries is the number of times we will try to reuse a stream to a // given peer before giving up and reverting to the old one-message-per-stream // behaviour. @@ -438,16 +448,6 @@ func (ms *messageSender) sendRequestSingle(ctx context.Context, pmes *pb.Message return mes, nil } -// Resets the stream and shuts down the goroutine pump -// Mutex must be locked. 
-func (ms *messageSender) reset() { - if ms.s != nil { - close(ms.rch) - ms.s.Reset() - ms.s = nil - } -} - func (ms *messageSender) messageReceiver(rch chan chan requestResult, r ggio.ReadCloser) { loop: for { From d41c165a2642ed0009cde5a0aacc02d768a58a33 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 8 Jan 2018 12:40:10 +0200 Subject: [PATCH 15/39] dht_net: address TODO about the lock in SendMessage --- dht_net.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dht_net.go b/dht_net.go index 80884c607..45ff4ca7b 100644 --- a/dht_net.go +++ b/dht_net.go @@ -268,15 +268,15 @@ const streamReuseTries = 3 func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error { defer log.EventBegin(ctx, "dhtSendMessage", ms.dht.self, ms.p, pmes).Done() ms.lk.Lock() - defer ms.lk.Unlock() retry := false for { if ms.singleMes > streamReuseTries { - // TODO do this without holding the lock + ms.lk.Unlock() return ms.sendMessageSingle(ctx, pmes) } if err := ms.prep(); err != nil { + ms.lk.Unlock() return err } @@ -285,6 +285,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro if retry { log.Info("error writing message, bailing: ", err) + ms.lk.Unlock() return err } else { log.Info("error writing message, trying again: ", err) @@ -300,6 +301,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro } } + ms.lk.Unlock() return nil } } From bc502161be0a19c9f1a090b3474d26bd9b43abcf Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 8 Jan 2018 12:42:55 +0200 Subject: [PATCH 16/39] dht_net: clean up the log event names; dht prefix not needed. --- dht_net.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dht_net.go b/dht_net.go index 45ff4ca7b..11f929d3f 100644 --- a/dht_net.go +++ b/dht_net.go @@ -266,7 +266,7 @@ func (ms *messageSender) reset() { const streamReuseTries = 3 func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error { - defer log.EventBegin(ctx, "dhtSendMessage", ms.dht.self, ms.p, pmes).Done() + defer log.EventBegin(ctx, "SendMessage", ms.dht.self, ms.p, pmes).Done() ms.lk.Lock() retry := false for { @@ -307,7 +307,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro } func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) { - defer log.EventBegin(ctx, "dhtSendRequest", ms.dht.self, ms.p, pmes).Done() + defer log.EventBegin(ctx, "SendRequest", ms.dht.self, ms.p, pmes).Done() retry := false for { ms.lk.Lock() @@ -341,7 +341,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb case ms.rch <- resch: default: // pipeline stall, log it and time it - evt := log.EventBegin(ctx, "dhtSendRequestStall", ms.dht.self, ms.p, pmes) + evt := log.EventBegin(ctx, "SendRequestStall", ms.dht.self, ms.p, pmes) select { case ms.rch <- resch: evt.Done() From bab37adf94f97b5f0a73dc3d854a2900e7c877e6 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 8 Jan 2018 13:22:10 +0200 Subject: [PATCH 17/39] dht_net: kill off the stream in case of read timeouts --- dht_net.go | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/dht_net.go b/dht_net.go index 11f929d3f..48b8b64fb 100644 --- a/dht_net.go +++ b/dht_net.go @@ -189,12 +189,13 @@ func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) { } type messageSender struct { - s inet.Stream - w ggio.WriteCloser - rch chan chan requestResult - lk sync.Mutex - p 
peer.ID - dht *IpfsDHT + s inet.Stream + w ggio.WriteCloser + rch chan chan requestResult + rctl chan struct{} + lk sync.Mutex + p peer.ID + dht *IpfsDHT invalid bool singleMes int @@ -241,9 +242,11 @@ func (ms *messageSender) prep() error { r := ggio.NewDelimitedReader(nstr, inet.MessageSizeMax) rch := make(chan chan requestResult, requestResultBuffer) - go ms.messageReceiver(rch, r) + rctl := make(chan struct{}, 1) + go ms.messageReceiver(rch, rctl, r) ms.rch = rch + ms.rctl = rctl ms.w = ggio.NewDelimitedWriter(nstr) ms.s = nstr @@ -356,6 +359,8 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb } } + rctl := ms.rctl + ms.lk.Unlock() rctx, cancel := context.WithTimeout(ctx, dhtReadMessageTimeout) @@ -366,6 +371,18 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb case res = <-resch: case <-rctx.Done(): + // A read timeout will cause the entire pipeline to time out. + // So signal for a stream reset to avoid clogging subsequent requests. + select { + case <-ctx.Done(): + // not a read timeout + default: + select { + case rctl <- struct{}{}: + default: + } + } + return nil, rctx.Err() case <-ms.dht.ctx.Done(): @@ -450,7 +467,7 @@ func (ms *messageSender) sendRequestSingle(ctx context.Context, pmes *pb.Message return mes, nil } -func (ms *messageSender) messageReceiver(rch chan chan requestResult, r ggio.ReadCloser) { +func (ms *messageSender) messageReceiver(rch chan chan requestResult, rctl chan struct{}, r ggio.ReadCloser) { loop: for { select { @@ -468,6 +485,9 @@ loop: next <- requestResult{mes: mes} } + case <-rctl: + break loop + case <-ms.dht.ctx.Done(): return } From a562c8f2729c3114bf3046385a2416133bde8edb Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 8 Jan 2018 13:35:25 +0200 Subject: [PATCH 18/39] dht_net: messageReceiver should poll for reset before unqueuing the next request --- dht_net.go | 39 +++++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/dht_net.go b/dht_net.go index 48b8b64fb..46e7c5fff 100644 --- a/dht_net.go +++ b/dht_net.go @@ -471,25 +471,32 @@ func (ms *messageSender) messageReceiver(rch chan chan requestResult, rctl chan loop: for { select { - case next, ok := <-rch: - if !ok { - return - } - - mes := new(pb.Message) - err := r.ReadMsg(mes) - if err != nil { - next <- requestResult{err: err} - break loop - } else { - next <- requestResult{mes: mes} - } - case <-rctl: + // poll for reset due to timeouts first, there might be requests queued break loop - case <-ms.dht.ctx.Done(): - return + default: + select { + case next, ok := <-rch: + if !ok { + return + } + + mes := new(pb.Message) + err := r.ReadMsg(mes) + if err != nil { + next <- requestResult{err: err} + break loop + } else { + next <- requestResult{mes: mes} + } + + case <-rctl: + break loop + + case <-ms.dht.ctx.Done(): + return + } } } From a5c987e930568c1dc869b0ba7d53eb3738e90e30 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 9 Feb 2018 11:27:23 +0200 Subject: [PATCH 19/39] dht_net: fix reset race --- dht_net.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/dht_net.go b/dht_net.go index 46e7c5fff..de59fc13c 100644 --- a/dht_net.go +++ b/dht_net.go @@ -263,6 +263,12 @@ func (ms *messageSender) reset() { } } +func (ms *messageSender) resetStream(s inet.Stream) { + if ms.s == s { + ms.reset() + } +} + // streamReuseTries is the number of times we will try to reuse a stream to a // given peer before giving up and reverting to the old 
one-message-per-stream // behaviour. @@ -360,6 +366,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb } rctl := ms.rctl + s := ms.s ms.lk.Unlock() @@ -404,7 +411,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb ms.lk.Lock() ms.singleMes++ if ms.singleMes > streamReuseTries { - ms.reset() + ms.resetStream(s) } ms.lk.Unlock() } @@ -502,11 +509,11 @@ loop: // reset once; needs to happen in a goroutine to avoid deadlock // in case of pipeline stalls - go func() { + go func(s inet.Stream) { ms.lk.Lock() - ms.reset() + ms.resetStream(s) ms.lk.Unlock() - }() + }(ms.s) // drain the pipeline err := errors.New("Stream has been abandoned due to earlier errors") From e553dac965fd081782260c9fd5a76fbaee8905bc Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 9 Feb 2018 11:29:07 +0200 Subject: [PATCH 20/39] dht_net: reset stream on single request context completion --- dht_net.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dht_net.go b/dht_net.go index de59fc13c..135993257 100644 --- a/dht_net.go +++ b/dht_net.go @@ -468,6 +468,7 @@ func (ms *messageSender) sendRequestSingle(ctx context.Context, pmes *pb.Message return nil, err } case <-rctx.Done(): + s.Reset() return nil, rctx.Err() } From a6343b67b5d865512c793677686f0eb06106b5ba Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Wed, 16 May 2018 17:01:35 -0700 Subject: [PATCH 21/39] use delayed datastore to test pipelined requests --- delayed/delayed.go | 48 +++++++++++++++++ dht_test.go | 131 ++++++++++++++++++++++++++++++++++++--------- notify_test.go | 8 +-- package.json | 7 ++- 4 files changed, 164 insertions(+), 30 deletions(-) create mode 100644 delayed/delayed.go diff --git a/delayed/delayed.go b/delayed/delayed.go new file mode 100644 index 000000000..77309c0ac --- /dev/null +++ b/delayed/delayed.go @@ -0,0 +1,48 @@ +// Package delayed wraps a datastore allowing to artificially +// delay all operations. +package delayed + +import ( + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + delay "github.com/ipfs/go-ipfs-delay" +) + +// New returns a new delayed datastore. 
+func New(ds ds.Datastore, delay delay.D) ds.Datastore { + return &delayed{ds: ds, delay: delay} +} + +type delayed struct { + ds ds.Datastore + delay delay.D +} + +func (dds *delayed) Put(key ds.Key, value interface{}) (err error) { + dds.delay.Wait() + return dds.ds.Put(key, value) +} + +func (dds *delayed) Get(key ds.Key) (value interface{}, err error) { + dds.delay.Wait() + return dds.ds.Get(key) +} + +func (dds *delayed) Has(key ds.Key) (exists bool, err error) { + dds.delay.Wait() + return dds.ds.Has(key) +} + +func (dds *delayed) Delete(key ds.Key) (err error) { + dds.delay.Wait() + return dds.ds.Delete(key) +} + +func (dds *delayed) Query(q dsq.Query) (dsq.Results, error) { + dds.delay.Wait() + return dds.ds.Query(q) +} + +func (dds *delayed) Batch() (ds.Batch, error) { + return ds.NewBasicBatch(dds), nil +} \ No newline at end of file diff --git a/dht_test.go b/dht_test.go index 129c74825..c94ad36c2 100644 --- a/dht_test.go +++ b/dht_test.go @@ -16,7 +16,9 @@ import ( pb "github.com/libp2p/go-libp2p-kad-dht/pb" cid "github.com/ipfs/go-cid" + delay "github.com/ipfs/go-ipfs-delay" u "github.com/ipfs/go-ipfs-util" + "github.com/libp2p/go-libp2p-kad-dht/delayed" kb "github.com/libp2p/go-libp2p-kbucket" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" @@ -64,6 +66,7 @@ func (testValidator) Select(_ string, bs [][]byte) (int, error) { } return index, nil } + func (testValidator) Validate(_ string, b []byte) error { if bytes.Compare(b, []byte("expired")) == 0 { return errors.New("expired") @@ -71,21 +74,27 @@ func (testValidator) Validate(_ string, b []byte) error { return nil } -func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT { +func setupDHT(ctx context.Context, t *testing.T, client bool, maxDelay time.Duration) *IpfsDHT { + var dss ds.Batching = dssync.MutexWrap(ds.NewMapDatastore()) + if maxDelay != 0 { + dss = delayed.New(dss, delay.VariableUniform(maxDelay/2, maxDelay/2, nil)).(ds.Batching) + } + d, err := New( ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), + opts.Datastore(dss), opts.Client(client), opts.NamespacedValidator("v", blankValidator{}), ) - if err != nil { t.Fatal(err) } + return d } -func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.ID, []*IpfsDHT) { +func setupDHTS(ctx context.Context, n int, t *testing.T, maxDelay time.Duration) ([]ma.Multiaddr, []peer.ID, []*IpfsDHT) { addrs := make([]ma.Multiaddr, n) dhts := make([]*IpfsDHT, n) peers := make([]peer.ID, n) @@ -94,7 +103,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer sanityPeersMap := make(map[string]struct{}) for i := 0; i < n; i++ { - dhts[i] = setupDHT(ctx, t, false) + dhts[i] = setupDHT(ctx, t, false, maxDelay) peers[i] = dhts[i].self addrs[i] = dhts[i].peerstore.Addrs(dhts[i].self)[0] @@ -456,7 +465,7 @@ func TestInvalidMessageSenderTracking(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dht := setupDHT(ctx, t, false) + dht := setupDHT(ctx, t, false, 0) defer dht.Close() foo := peer.ID("asdasd") @@ -479,7 +488,7 @@ func TestProvides(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, _, dhts := setupDHTS(ctx, 4, t) + _, _, dhts := setupDHTS(ctx, 4, t, 0) defer func() { for i := 0; i < 4; i++ { dhts[i].Close() @@ -529,7 +538,7 @@ func TestLocalProvides(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, _, dhts := setupDHTS(ctx, 4, t) + _, _, 
dhts := setupDHTS(ctx, 4, t, 0) defer func() { for i := 0; i < 4; i++ { dhts[i].Close() @@ -617,7 +626,7 @@ func TestBootstrap(t *testing.T) { defer cancel() nDHTs := 30 - _, _, dhts := setupDHTS(ctx, nDHTs, t) + _, _, dhts := setupDHTS(ctx, nDHTs, t, 0) defer func() { for i := 0; i < nDHTs; i++ { dhts[i].Close() @@ -671,7 +680,7 @@ func TestPeriodicBootstrap(t *testing.T) { defer cancel() nDHTs := 30 - _, _, dhts := setupDHTS(ctx, nDHTs, t) + _, _, dhts := setupDHTS(ctx, nDHTs, t, 0) defer func() { for i := 0; i < nDHTs; i++ { dhts[i].Close() @@ -742,7 +751,7 @@ func TestProvidesMany(t *testing.T) { defer cancel() nDHTs := 40 - _, _, dhts := setupDHTS(ctx, nDHTs, t) + _, _, dhts := setupDHTS(ctx, nDHTs, t, 0) defer func() { for i := 0; i < nDHTs; i++ { dhts[i].Close() @@ -843,7 +852,7 @@ func TestProvidesAsync(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, _, dhts := setupDHTS(ctx, 4, t) + _, _, dhts := setupDHTS(ctx, 4, t, 0) defer func() { for i := 0; i < 4; i++ { dhts[i].Close() @@ -885,7 +894,7 @@ func TestLayeredGet(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, _, dhts := setupDHTS(ctx, 4, t) + _, _, dhts := setupDHTS(ctx, 4, t, 0) defer func() { for i := 0; i < 4; i++ { dhts[i].Close() @@ -925,7 +934,7 @@ func TestFindPeer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, peers, dhts := setupDHTS(ctx, 4, t) + _, peers, dhts := setupDHTS(ctx, 4, t, 0) defer func() { for i := 0; i < 4; i++ { dhts[i].Close() @@ -963,7 +972,7 @@ func TestFindPeersConnectedToPeer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, peers, dhts := setupDHTS(ctx, 4, t) + _, peers, dhts := setupDHTS(ctx, 4, t, 0) defer func() { for i := 0; i < 4; i++ { dhts[i].Close() @@ -1051,8 +1060,8 @@ func TestConnectCollision(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - dhtA := setupDHT(ctx, t, false) - dhtB := setupDHT(ctx, t, false) + dhtA := setupDHT(ctx, t, false, 0) + dhtB := setupDHT(ctx, t, false, 0) addrA := dhtA.peerstore.Addrs(dhtA.self)[0] addrB := dhtB.peerstore.Addrs(dhtB.self)[0] @@ -1104,7 +1113,7 @@ func TestBadProtoMessages(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - d := setupDHT(ctx, t, false) + d := setupDHT(ctx, t, false, 0) nilrec := new(pb.Message) if _, err := d.handlePutValue(ctx, "testpeer", nilrec); err == nil { @@ -1116,8 +1125,8 @@ func TestClientModeConnect(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - a := setupDHT(ctx, t, false) - b := setupDHT(ctx, t, true) + a := setupDHT(ctx, t, false, 0) + b := setupDHT(ctx, t, true, 0) connectNoSync(t, ctx, a, b) @@ -1180,7 +1189,7 @@ func TestFindPeerQuery(t *testing.T) { defer cancel() nDHTs := 101 - _, allpeers, dhts := setupDHTS(ctx, nDHTs, t) + _, allpeers, dhts := setupDHTS(ctx, nDHTs, t, 0) defer func() { for i := 0; i < nDHTs; i++ { dhts[i].Close() @@ -1277,7 +1286,7 @@ func TestFindClosestPeers(t *testing.T) { defer cancel() nDHTs := 30 - _, _, dhts := setupDHTS(ctx, nDHTs, t) + _, _, dhts := setupDHTS(ctx, nDHTs, t, 0) defer func() { for i := 0; i < nDHTs; i++ { dhts[i].Close() @@ -1305,10 +1314,10 @@ func TestFindClosestPeers(t *testing.T) { } } -func testConcurrentRequests(t *testing.T, reqs int) { +func testConcurrentRequests(t *testing.T, reqs int, maxDelay time.Duration) { ctx := context.Background() - _, peers, dhts := setupDHTS(ctx, 10, t) + _, 
peers, dhts := setupDHTS(ctx, 10, t, maxDelay) defer func() { for i := 0; i < 10; i++ { dhts[i].Close() @@ -1357,11 +1366,83 @@ func testConcurrentRequests(t *testing.T, reqs int) { } func TestConcurrentRequests(t *testing.T) { - testConcurrentRequests(t, requestResultBuffer/2) + testConcurrentRequests(t, requestResultBuffer/2, 0) } func TestConcurrentRequestsOverload(t *testing.T) { - testConcurrentRequests(t, 2*requestResultBuffer) + testConcurrentRequests(t, 2*requestResultBuffer, 0) +} + +func TestDelayedConcurrentRequests(t *testing.T) { + reqs := 30 + ctx := context.Background() + + // create reqs-number of DHTs whose datastore operations + // are delayed (see delayed.go) by, at maximum, 800ms + _, _, dhts := setupDHTS(ctx, reqs, t, 800*time.Millisecond) + defer func() { + for i := 0; i < reqs; i++ { + dhts[i].Close() + dhts[i].host.Close() + } + }() + + // connect 0 with 1, and 1 with everything else + connect(t, ctx, dhts[0], dhts[1]) + for i := 2; i < reqs; i++ { + connect(t, ctx, dhts[1], dhts[i]) + } + + // ensure that our PutValue and GetValue calls complete + // within 30 seconds + ctxT, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + // PutValue everything into DHT 0 + putValueWg := &sync.WaitGroup{} + for i := 0; i < reqs; i++ { + putValueWg.Add(1) + + go func(j int) { + defer putValueWg.Done() + + cid := cid.NewCidV0(u.Hash([]byte(fmt.Sprintf("test%d", j)))) + key := fmt.Sprintf("/v/%s", string(cid.Bytes())) + val := []byte(key) + + err := dhts[0].PutValue(ctxT, key, val) + if err != nil { + t.Fatal(err) + } + }(i) + } + + putValueWg.Wait() + + // GetValue from each DHT and ensure that the key/values + // are correct + getValueWg := &sync.WaitGroup{} + for i := 0; i < reqs; i++ { + getValueWg.Add(1) + + go func(j int) { + defer getValueWg.Done() + + cid := cid.NewCidV0(u.Hash([]byte(fmt.Sprintf("test%d", j)))) + key := fmt.Sprintf("/v/%s", string(cid.Bytes())) + + val, err := dhts[j].GetValue(ctxT, key) + if err != nil { + t.Fatal(err) + } + + if string(val) != key { + t.Fatalf("Expected '%s' got '%s'", key, string(val)) + } + }(i) + } + + getValueWg.Wait() } func TestGetSetPluggedProtocol(t *testing.T) { diff --git a/notify_test.go b/notify_test.go index 855f7d71d..5f8c277f7 100644 --- a/notify_test.go +++ b/notify_test.go @@ -13,8 +13,8 @@ func TestNotifieeMultipleConn(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - d1 := setupDHT(ctx, t, false) - d2 := setupDHT(ctx, t, false) + d1 := setupDHT(ctx, t, false, 0) + d2 := setupDHT(ctx, t, false, 0) nn1 := (*netNotifiee)(d1) nn2 := (*netNotifiee)(d2) @@ -56,8 +56,8 @@ func TestNotifieeFuzz(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() - d1 := setupDHT(ctx, t, false) - d2 := setupDHT(ctx, t, false) + d1 := setupDHT(ctx, t, false, 0) + d2 := setupDHT(ctx, t, false, 0) for i := 0; i < 100; i++ { connectNoSync(t, ctx, d1, d2) diff --git a/package.json b/package.json index 7a837a77a..275ff7c3a 100644 --- a/package.json +++ b/package.json @@ -159,6 +159,12 @@ "hash": "Qmcc5CPuKyfDZNmqXNkk6j23CyZqZGypUv952NLHYGbeni", "name": "go-libp2p-swarm", "version": "3.0.10" + }, + { + "author": "hsanjuan", + "hash": "QmRJVNatYJwTAHgdSM1Xef9QVQ1Ch3XHdmcrykjP5Y4soL", + "name": "go-ipfs-delay", + "version": "0.0.1" } ], "gxVersion": "0.4.0", @@ -168,4 +174,3 @@ "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", "version": "4.3.8" } - From 8d996294848b0e6fe5da70bb76f89da3f0ac8db1 Mon Sep 17 00:00:00 2001 From: Erin 
Swenson-Healey Date: Thu, 17 May 2018 06:48:28 -0700 Subject: [PATCH 22/39] add comments to copypasted delayed package and fix imports --- delayed/delayed.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/delayed/delayed.go b/delayed/delayed.go index 77309c0ac..e7b469d80 100644 --- a/delayed/delayed.go +++ b/delayed/delayed.go @@ -1,3 +1,9 @@ +// This package was copied from go-datastore 2.2.0, as +// go-libp2p-kad-dht is stuck using go-datastore 1.4.1 until +// its autobatch dependency is updated. +// +// see: https://github.com/whyrusleeping/autobatch/pull/8 + // Package delayed wraps a datastore allowing to artificially // delay all operations. package delayed @@ -45,4 +51,9 @@ func (dds *delayed) Query(q dsq.Query) (dsq.Results, error) { func (dds *delayed) Batch() (ds.Batch, error) { return ds.NewBasicBatch(dds), nil +} + +func (dds *delayed) DiskUsage() (uint64, error) { + dds.delay.Wait() + return ds.DiskUsage(dds.ds) } \ No newline at end of file From b126c52f42610a14e08657f2ca1161b0b276b548 Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Thu, 17 May 2018 06:51:20 -0700 Subject: [PATCH 23/39] datastore.DiskUsage is not defined in go-datastore 1.4.1 commenting out for now. will not be an issue once go-datastore dependency is upgraded --- delayed/delayed.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/delayed/delayed.go b/delayed/delayed.go index e7b469d80..f067460d8 100644 --- a/delayed/delayed.go +++ b/delayed/delayed.go @@ -53,7 +53,9 @@ func (dds *delayed) Batch() (ds.Batch, error) { return ds.NewBasicBatch(dds), nil } -func (dds *delayed) DiskUsage() (uint64, error) { - dds.delay.Wait() - return ds.DiskUsage(dds.ds) -} \ No newline at end of file +// note: datastore.DiskUsage is not defined in go-datastore 1.4.1 +// +//func (dds *delayed) DiskUsage() (uint64, error) { +// dds.delay.Wait() +// return ds.DiskUsage(dds.ds) +//} From dbc46ae73434db9a5e4c45e7554b23ea737d8a22 Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Thu, 17 May 2018 08:16:33 -0700 Subject: [PATCH 24/39] add timings --- dht_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dht_test.go b/dht_test.go index c94ad36c2..33a8631ce 100644 --- a/dht_test.go +++ b/dht_test.go @@ -1400,6 +1400,7 @@ func TestDelayedConcurrentRequests(t *testing.T) { // PutValue everything into DHT 0 putValueWg := &sync.WaitGroup{} + now := time.Now() for i := 0; i < reqs; i++ { putValueWg.Add(1) @@ -1418,10 +1419,12 @@ func TestDelayedConcurrentRequests(t *testing.T) { } putValueWg.Wait() + fmt.Printf("PutValue calls completed in %s\n", time.Since(now)) // GetValue from each DHT and ensure that the key/values // are correct getValueWg := &sync.WaitGroup{} + now = time.Now() for i := 0; i < reqs; i++ { getValueWg.Add(1) @@ -1443,6 +1446,7 @@ func TestDelayedConcurrentRequests(t *testing.T) { } getValueWg.Wait() + fmt.Printf("GetValue calls completed in %s\n", time.Since(now)) } func TestGetSetPluggedProtocol(t *testing.T) { From ef3f144b4671b40d7c2ad79bb4397c00ef5ef803 Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Thu, 17 May 2018 08:21:26 -0700 Subject: [PATCH 25/39] add flag for opting in to slow tests --- dht_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dht_test.go b/dht_test.go index 33a8631ce..d0faeaab7 100644 --- a/dht_test.go +++ b/dht_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "flag" "fmt" "math/rand" "sort" @@ -33,6 +34,7 @@ import ( var testCaseValues = map[string][]byte{} var testCaseCids []*cid.Cid +var 
slowtests = flag.Bool("slowtests", false, "set to true to run the slower tests") func init() { for i := 0; i < 100; i++ { @@ -1374,6 +1376,11 @@ func TestConcurrentRequestsOverload(t *testing.T) { } func TestDelayedConcurrentRequests(t *testing.T) { + // must pass "-slowtests=true" flag to run + if !*slowtests { + t.SkipNow() + } + reqs := 30 ctx := context.Background() From fa1eee2af33a19da547a1a75b9d3b24170552e9d Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Thu, 17 May 2018 08:43:15 -0700 Subject: [PATCH 26/39] go fmt --- dht_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dht_test.go b/dht_test.go index d0faeaab7..b83ea62cf 100644 --- a/dht_test.go +++ b/dht_test.go @@ -17,6 +17,8 @@ import ( pb "github.com/libp2p/go-libp2p-kad-dht/pb" cid "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" delay "github.com/ipfs/go-ipfs-delay" u "github.com/ipfs/go-ipfs-util" "github.com/libp2p/go-libp2p-kad-dht/delayed" From 906180057c718d526924d7a3fd16964b0b7ab678 Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Thu, 17 May 2018 08:47:13 -0700 Subject: [PATCH 27/39] symbol rename to convey intent --- dht_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dht_test.go b/dht_test.go index b83ea62cf..030c54b49 100644 --- a/dht_test.go +++ b/dht_test.go @@ -1409,7 +1409,7 @@ func TestDelayedConcurrentRequests(t *testing.T) { // PutValue everything into DHT 0 putValueWg := &sync.WaitGroup{} - now := time.Now() + checkpoint := time.Now() for i := 0; i < reqs; i++ { putValueWg.Add(1) @@ -1428,12 +1428,12 @@ func TestDelayedConcurrentRequests(t *testing.T) { } putValueWg.Wait() - fmt.Printf("PutValue calls completed in %s\n", time.Since(now)) + fmt.Printf("PutValue calls completed in %s\n", time.Since(checkpoint)) // GetValue from each DHT and ensure that the key/values // are correct getValueWg := &sync.WaitGroup{} - now = time.Now() + checkpoint = time.Now() for i := 0; i < reqs; i++ { getValueWg.Add(1) @@ -1455,7 +1455,7 @@ func TestDelayedConcurrentRequests(t *testing.T) { } getValueWg.Wait() - fmt.Printf("GetValue calls completed in %s\n", time.Since(now)) + fmt.Printf("GetValue calls completed in %s\n", time.Since(checkpoint)) } func TestGetSetPluggedProtocol(t *testing.T) { From 169b2607cfef3f73900b09b67326e7695465b146 Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Thu, 17 May 2018 09:16:43 -0700 Subject: [PATCH 28/39] remove flag --- dht_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/dht_test.go b/dht_test.go index 030c54b49..33425cbd2 100644 --- a/dht_test.go +++ b/dht_test.go @@ -36,7 +36,6 @@ import ( var testCaseValues = map[string][]byte{} var testCaseCids []*cid.Cid -var slowtests = flag.Bool("slowtests", false, "set to true to run the slower tests") func init() { for i := 0; i < 100; i++ { @@ -1378,8 +1377,11 @@ func TestConcurrentRequestsOverload(t *testing.T) { } func TestDelayedConcurrentRequests(t *testing.T) { - // must pass "-slowtests=true" flag to run - if !*slowtests { + if ci.IsRunning() { + t.Skip("skip on CI - timing dependent") + } + + if testing.Short() { t.SkipNow() } From 93fe9f705ba9e44e905cd8b86f62cc9a24c2d4d6 Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Wed, 30 May 2018 09:56:51 -0700 Subject: [PATCH 29/39] update autobatch to 0.2.9 --- package.json | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index 275ff7c3a..b4cb6e4e5 100644 --- a/package.json +++ 
b/package.json @@ -66,7 +66,14 @@ }, { "author": "whyrusleeping", - "hash": "QmRNhSdqzMcuRxX9A1egBeQ3BhDTguDV5HPwi8wRykkPU8", + + "hash": "QmYP96pJMo5Hzc3UfsBaarQX1RimsaL5gzb1DT1tC2DJRB", + "name": "autobatch", + "version": "0.2.10" + }, + { + "author": "whyrusleeping", + "hash": "QmPdxCaVp4jZ9RbxqZADvKH6kiCR5jHvdR5f2ycjAY6T2a", "name": "go-testutil", "version": "1.2.7" }, From 5d7748fd6987b0615523d2421fe17f9c38e84714 Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Wed, 30 May 2018 10:18:29 -0700 Subject: [PATCH 30/39] upgrade go-datastore to 2.2.0 --- delayed/delayed.go | 61 ---------------------------------------------- dht_test.go | 2 +- 2 files changed, 1 insertion(+), 62 deletions(-) delete mode 100644 delayed/delayed.go diff --git a/delayed/delayed.go b/delayed/delayed.go deleted file mode 100644 index f067460d8..000000000 --- a/delayed/delayed.go +++ /dev/null @@ -1,61 +0,0 @@ -// This package was copied from go-datastore 2.2.0, as -// go-libp2p-kad-dht is stuck using go-datastore 1.4.1 until -// its autobatch dependency is updated. -// -// see: https://github.com/whyrusleeping/autobatch/pull/8 - -// Package delayed wraps a datastore allowing to artificially -// delay all operations. -package delayed - -import ( - ds "github.com/ipfs/go-datastore" - dsq "github.com/ipfs/go-datastore/query" - delay "github.com/ipfs/go-ipfs-delay" -) - -// New returns a new delayed datastore. -func New(ds ds.Datastore, delay delay.D) ds.Datastore { - return &delayed{ds: ds, delay: delay} -} - -type delayed struct { - ds ds.Datastore - delay delay.D -} - -func (dds *delayed) Put(key ds.Key, value interface{}) (err error) { - dds.delay.Wait() - return dds.ds.Put(key, value) -} - -func (dds *delayed) Get(key ds.Key) (value interface{}, err error) { - dds.delay.Wait() - return dds.ds.Get(key) -} - -func (dds *delayed) Has(key ds.Key) (exists bool, err error) { - dds.delay.Wait() - return dds.ds.Has(key) -} - -func (dds *delayed) Delete(key ds.Key) (err error) { - dds.delay.Wait() - return dds.ds.Delete(key) -} - -func (dds *delayed) Query(q dsq.Query) (dsq.Results, error) { - dds.delay.Wait() - return dds.ds.Query(q) -} - -func (dds *delayed) Batch() (ds.Batch, error) { - return ds.NewBasicBatch(dds), nil -} - -// note: datastore.DiskUsage is not defined in go-datastore 1.4.1 -// -//func (dds *delayed) DiskUsage() (uint64, error) { -// dds.delay.Wait() -// return ds.DiskUsage(dds.ds) -//} diff --git a/dht_test.go b/dht_test.go index 33425cbd2..3b5a02162 100644 --- a/dht_test.go +++ b/dht_test.go @@ -18,10 +18,10 @@ import ( cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" + delayed "github.com/ipfs/go-datastore/delayed" dssync "github.com/ipfs/go-datastore/sync" delay "github.com/ipfs/go-ipfs-delay" u "github.com/ipfs/go-ipfs-util" - "github.com/libp2p/go-libp2p-kad-dht/delayed" kb "github.com/libp2p/go-libp2p-kbucket" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" From 5df809178b24cd173ef597bcd3d097e040c72018 Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Thu, 31 May 2018 08:13:19 -0700 Subject: [PATCH 31/39] add delay --- dht_test.go | 13 ++++++------- records_test.go | 28 ++++++++++++++-------------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/dht_test.go b/dht_test.go index 3b5a02162..1a807449b 100644 --- a/dht_test.go +++ b/dht_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "errors" - "flag" "fmt" "math/rand" "sort" @@ -16,11 +15,11 @@ import ( opts "github.com/libp2p/go-libp2p-kad-dht/opts" pb 
"github.com/libp2p/go-libp2p-kad-dht/pb" - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" - delayed "github.com/ipfs/go-datastore/delayed" + "github.com/ipfs/go-datastore/delayed" dssync "github.com/ipfs/go-datastore/sync" - delay "github.com/ipfs/go-ipfs-delay" + "github.com/ipfs/go-ipfs-delay" u "github.com/ipfs/go-ipfs-util" kb "github.com/libp2p/go-libp2p-kbucket" peer "github.com/libp2p/go-libp2p-peer" @@ -29,7 +28,7 @@ import ( routing "github.com/libp2p/go-libp2p-routing" swarmt "github.com/libp2p/go-libp2p-swarm/testing" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" - ci "github.com/libp2p/go-testutil/ci" + "github.com/libp2p/go-testutil/ci" travisci "github.com/libp2p/go-testutil/ci/travis" ma "github.com/multiformats/go-multiaddr" ) @@ -264,8 +263,8 @@ func TestValueSetInvalid(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dhtA := setupDHT(ctx, t, false) - dhtB := setupDHT(ctx, t, false) + dhtA := setupDHT(ctx, t, false, 0) + dhtB := setupDHT(ctx, t, false, 0) defer dhtA.Close() defer dhtB.Close() diff --git a/records_test.go b/records_test.go index 40dd8a2df..de57d0dab 100644 --- a/records_test.go +++ b/records_test.go @@ -16,7 +16,7 @@ import ( // Check that GetPublicKey() correctly extracts a public key func TestPubkeyExtract(t *testing.T) { ctx := context.Background() - dht := setupDHT(ctx, t, false) + dht := setupDHT(ctx, t, false, 0) defer dht.Close() _, pk, err := ci.GenerateEd25519Key(rand.Reader) @@ -42,7 +42,7 @@ func TestPubkeyExtract(t *testing.T) { // Check that GetPublicKey() correctly retrieves a public key from the peerstore func TestPubkeyPeerstore(t *testing.T) { ctx := context.Background() - dht := setupDHT(ctx, t, false) + dht := setupDHT(ctx, t, false, 0) r := u.NewSeededRand(15) // generate deterministic keypair _, pubk, err := ci.GenerateKeyPairWithReader(ci.RSA, 1024, r) @@ -73,8 +73,8 @@ func TestPubkeyPeerstore(t *testing.T) { func TestPubkeyDirectFromNode(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false) - dhtB := setupDHT(ctx, t, false) + dhtA := setupDHT(ctx, t, false, 0) + dhtB := setupDHT(ctx, t, false, 0) defer dhtA.Close() defer dhtB.Close() @@ -103,8 +103,8 @@ func TestPubkeyDirectFromNode(t *testing.T) { func TestPubkeyFromDHT(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false) - dhtB := setupDHT(ctx, t, false) + dhtA := setupDHT(ctx, t, false, 0) + dhtB := setupDHT(ctx, t, false, 0) defer dhtA.Close() defer dhtB.Close() @@ -150,8 +150,8 @@ func TestPubkeyFromDHT(t *testing.T) { func TestPubkeyNotFound(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false) - dhtB := setupDHT(ctx, t, false) + dhtA := setupDHT(ctx, t, false, 0) + dhtB := setupDHT(ctx, t, false, 0) defer dhtA.Close() defer dhtB.Close() @@ -182,8 +182,8 @@ func TestPubkeyNotFound(t *testing.T) { func TestPubkeyBadKeyFromDHT(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false) - dhtB := setupDHT(ctx, t, false) + dhtA := setupDHT(ctx, t, false, 0) + dhtB := setupDHT(ctx, t, false, 0) defer dhtA.Close() defer dhtB.Close() @@ -236,8 +236,8 @@ func TestPubkeyBadKeyFromDHT(t *testing.T) { func TestPubkeyBadKeyFromDHTGoodKeyDirect(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false) - dhtB := setupDHT(ctx, t, false) + dhtA := setupDHT(ctx, t, false, 0) + dhtB := setupDHT(ctx, t, false, 0) defer dhtA.Close() defer dhtB.Close() @@ -291,8 +291,8 @@ func 
TestPubkeyBadKeyFromDHTGoodKeyDirect(t *testing.T) { func TestPubkeyGoodKeyFromDHTGoodKeyDirect(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false) - dhtB := setupDHT(ctx, t, false) + dhtA := setupDHT(ctx, t, false, 0) + dhtB := setupDHT(ctx, t, false, 0) defer dhtA.Close() defer dhtB.Close() From c40fd0e361706edc4c619021d3661e505c80a224 Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Thu, 31 May 2018 10:22:08 -0700 Subject: [PATCH 32/39] use autobatch from go-datastore --- providers/providers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/providers.go b/providers/providers.go index 14690409d..88369f21e 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -10,7 +10,7 @@ import ( lru "github.com/hashicorp/golang-lru" cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" - autobatch "github.com/ipfs/go-datastore/autobatch" + "github.com/ipfs/go-datastore/autobatch" dsq "github.com/ipfs/go-datastore/query" logging "github.com/ipfs/go-log" goprocess "github.com/jbenet/goprocess" From 79de04c571d047501ed44defc11596e677cad577 Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Thu, 31 May 2018 10:24:16 -0700 Subject: [PATCH 33/39] increase timeout threshhold to accommodate new PutValue perf --- dht_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dht_test.go b/dht_test.go index 1a807449b..8239aa1ef 100644 --- a/dht_test.go +++ b/dht_test.go @@ -1405,7 +1405,7 @@ func TestDelayedConcurrentRequests(t *testing.T) { // ensure that our PutValue and GetValue calls complete // within 30 seconds - ctxT, cancel := context.WithTimeout(ctx, 30*time.Second) + ctxT, cancel := context.WithTimeout(ctx, 45*time.Second) defer cancel() // PutValue everything into DHT 0 From dda877ca1d62338ea77dcbba07b89e5ae31056aa Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Mon, 4 Jun 2018 14:31:28 -0700 Subject: [PATCH 34/39] initialize stream with DHT protocols as per new API post-rebase --- dht_net.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dht_net.go b/dht_net.go index 135993257..87799110f 100644 --- a/dht_net.go +++ b/dht_net.go @@ -421,7 +421,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb } func (ms *messageSender) sendMessageSingle(ctx context.Context, pmes *pb.Message) error { - s, err := ms.dht.host.NewStream(ctx, ms.p, ProtocolDHT, ProtocolDHTOld) + s, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...) if err != nil { return err } @@ -438,7 +438,7 @@ func (ms *messageSender) sendMessageSingle(ctx context.Context, pmes *pb.Message } func (ms *messageSender) sendRequestSingle(ctx context.Context, pmes *pb.Message) (*pb.Message, error) { - s, err := ms.dht.host.NewStream(ctx, ms.p, ProtocolDHT, ProtocolDHTOld) + s, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...) 
if err != nil { return nil, err } From cb5284f37b98b7f93423d5b40d7202101f170542 Mon Sep 17 00:00:00 2001 From: Erin Swenson-Healey Date: Tue, 5 Jun 2018 14:56:53 -0700 Subject: [PATCH 35/39] replace delayed datastore with mocknet + latency --- dht_test.go | 116 +++++++++++++++++++++++++++++------------------- notify_test.go | 8 ++-- records_test.go | 28 ++++++------ 3 files changed, 89 insertions(+), 63 deletions(-) diff --git a/dht_test.go b/dht_test.go index 8239aa1ef..9cbb8184f 100644 --- a/dht_test.go +++ b/dht_test.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "math" "math/rand" "sort" "strings" @@ -12,15 +13,13 @@ import ( "testing" "time" - opts "github.com/libp2p/go-libp2p-kad-dht/opts" - pb "github.com/libp2p/go-libp2p-kad-dht/pb" - "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/delayed" dssync "github.com/ipfs/go-datastore/sync" - "github.com/ipfs/go-ipfs-delay" u "github.com/ipfs/go-ipfs-util" + "github.com/libp2p/go-libp2p-host" + opts "github.com/libp2p/go-libp2p-kad-dht/opts" + pb "github.com/libp2p/go-libp2p-kad-dht/pb" kb "github.com/libp2p/go-libp2p-kbucket" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" @@ -28,6 +27,7 @@ import ( routing "github.com/libp2p/go-libp2p-routing" swarmt "github.com/libp2p/go-libp2p-swarm/testing" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" + "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/libp2p/go-testutil/ci" travisci "github.com/libp2p/go-testutil/ci/travis" ma "github.com/multiformats/go-multiaddr" @@ -76,16 +76,15 @@ func (testValidator) Validate(_ string, b []byte) error { return nil } -func setupDHT(ctx context.Context, t *testing.T, client bool, maxDelay time.Duration) *IpfsDHT { - var dss ds.Batching = dssync.MutexWrap(ds.NewMapDatastore()) - if maxDelay != 0 { - dss = delayed.New(dss, delay.VariableUniform(maxDelay/2, maxDelay/2, nil)).(ds.Batching) - } +func setupDHTWithSwarm(ctx context.Context, t *testing.T, client bool) *IpfsDHT { + return setupDHT(ctx, t, client, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))) +} +func setupDHT(ctx context.Context, t *testing.T, client bool, host host.Host) *IpfsDHT { d, err := New( ctx, - bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), - opts.Datastore(dss), + host, + opts.Datastore(dssync.MutexWrap(ds.NewMapDatastore())), opts.Client(client), opts.NamespacedValidator("v", blankValidator{}), ) @@ -96,7 +95,7 @@ func setupDHT(ctx context.Context, t *testing.T, client bool, maxDelay time.Dura return d } -func setupDHTS(ctx context.Context, n int, t *testing.T, maxDelay time.Duration) ([]ma.Multiaddr, []peer.ID, []*IpfsDHT) { +func setupDHTsWithMocknet(ctx context.Context, n int, t *testing.T, latency time.Duration) ([]ma.Multiaddr, []peer.ID, []*IpfsDHT) { addrs := make([]ma.Multiaddr, n) dhts := make([]*IpfsDHT, n) peers := make([]peer.ID, n) @@ -104,8 +103,18 @@ func setupDHTS(ctx context.Context, n int, t *testing.T, maxDelay time.Duration) sanityAddrsMap := make(map[string]struct{}) sanityPeersMap := make(map[string]struct{}) + mn := mocknet.New(ctx) + mn.SetLinkDefaults(mocknet.LinkOptions{ + Latency: latency, + }) + for i := 0; i < n; i++ { - dhts[i] = setupDHT(ctx, t, false, maxDelay) + h, err := mn.GenPeer() + if err != nil { + t.Fatal(err) + } + + dhts[i] = setupDHT(ctx, t, false, h) peers[i] = dhts[i].self addrs[i] = dhts[i].peerstore.Addrs(dhts[i].self)[0] @@ -121,6 +130,8 @@ func setupDHTS(ctx context.Context, n int, t *testing.T, maxDelay 
time.Duration) } } + mn.LinkAll() + return addrs, peers, dhts } @@ -191,7 +202,7 @@ func TestValueGetSet(t *testing.T) { var dhts [5]*IpfsDHT for i := range dhts { - dhts[i] = setupDHT(ctx, t, false) + dhts[i] = setupDHTWithSwarm(ctx, t, false) defer dhts[i].Close() defer dhts[i].host.Close() } @@ -263,8 +274,8 @@ func TestValueSetInvalid(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dhtA := setupDHT(ctx, t, false, 0) - dhtB := setupDHT(ctx, t, false, 0) + dhtA := setupDHTWithSwarm(ctx, t, false) + dhtB := setupDHTWithSwarm(ctx, t, false) defer dhtA.Close() defer dhtB.Close() @@ -316,8 +327,8 @@ func TestSearchValue(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dhtA := setupDHT(ctx, t, false) - dhtB := setupDHT(ctx, t, false) + dhtA := setupDHTWithSwarm(ctx, t, false) + dhtB := setupDHTWithSwarm(ctx, t, false) defer dhtA.Close() defer dhtB.Close() @@ -467,7 +478,7 @@ func TestInvalidMessageSenderTracking(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dht := setupDHT(ctx, t, false, 0) + dht := setupDHTWithSwarm(ctx, t, false) defer dht.Close() foo := peer.ID("asdasd") @@ -490,7 +501,7 @@ func TestProvides(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, _, dhts := setupDHTS(ctx, 4, t, 0) + _, _, dhts := setupDHTsWithMocknet(ctx, 4, t, 0) defer func() { for i := 0; i < 4; i++ { dhts[i].Close() @@ -540,7 +551,7 @@ func TestLocalProvides(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, _, dhts := setupDHTS(ctx, 4, t, 0) + _, _, dhts := setupDHTsWithMocknet(ctx, 4, t, 0) defer func() { for i := 0; i < 4; i++ { dhts[i].Close() @@ -628,7 +639,7 @@ func TestBootstrap(t *testing.T) { defer cancel() nDHTs := 30 - _, _, dhts := setupDHTS(ctx, nDHTs, t, 0) + _, _, dhts := setupDHTsWithMocknet(ctx, nDHTs, t, 0) defer func() { for i := 0; i < nDHTs; i++ { dhts[i].Close() @@ -682,7 +693,7 @@ func TestPeriodicBootstrap(t *testing.T) { defer cancel() nDHTs := 30 - _, _, dhts := setupDHTS(ctx, nDHTs, t, 0) + _, _, dhts := setupDHTsWithMocknet(ctx, nDHTs, t, 0) defer func() { for i := 0; i < nDHTs; i++ { dhts[i].Close() @@ -753,7 +764,7 @@ func TestProvidesMany(t *testing.T) { defer cancel() nDHTs := 40 - _, _, dhts := setupDHTS(ctx, nDHTs, t, 0) + _, _, dhts := setupDHTsWithMocknet(ctx, nDHTs, t, 0) defer func() { for i := 0; i < nDHTs; i++ { dhts[i].Close() @@ -854,7 +865,7 @@ func TestProvidesAsync(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, _, dhts := setupDHTS(ctx, 4, t, 0) + _, _, dhts := setupDHTsWithMocknet(ctx, 4, t, 0) defer func() { for i := 0; i < 4; i++ { dhts[i].Close() @@ -896,7 +907,7 @@ func TestLayeredGet(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, _, dhts := setupDHTS(ctx, 4, t, 0) + _, _, dhts := setupDHTsWithMocknet(ctx, 4, t, 0) defer func() { for i := 0; i < 4; i++ { dhts[i].Close() @@ -936,7 +947,7 @@ func TestFindPeer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, peers, dhts := setupDHTS(ctx, 4, t, 0) + _, peers, dhts := setupDHTsWithMocknet(ctx, 4, t, 0) defer func() { for i := 0; i < 4; i++ { dhts[i].Close() @@ -974,7 +985,7 @@ func TestFindPeersConnectedToPeer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - _, peers, dhts := setupDHTS(ctx, 4, t, 0) + _, peers, dhts := 
setupDHTsWithMocknet(ctx, 4, t, 0) defer func() { for i := 0; i < 4; i++ { dhts[i].Close() @@ -1062,8 +1073,8 @@ func TestConnectCollision(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - dhtA := setupDHT(ctx, t, false, 0) - dhtB := setupDHT(ctx, t, false, 0) + dhtA := setupDHTWithSwarm(ctx, t, false) + dhtB := setupDHTWithSwarm(ctx, t, false) addrA := dhtA.peerstore.Addrs(dhtA.self)[0] addrB := dhtB.peerstore.Addrs(dhtB.self)[0] @@ -1115,7 +1126,7 @@ func TestBadProtoMessages(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - d := setupDHT(ctx, t, false, 0) + d := setupDHTWithSwarm(ctx, t, false) nilrec := new(pb.Message) if _, err := d.handlePutValue(ctx, "testpeer", nilrec); err == nil { @@ -1127,8 +1138,8 @@ func TestClientModeConnect(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - a := setupDHT(ctx, t, false, 0) - b := setupDHT(ctx, t, true, 0) + a := setupDHTWithSwarm(ctx, t, false) + b := setupDHTWithSwarm(ctx, t, true) connectNoSync(t, ctx, a, b) @@ -1191,7 +1202,7 @@ func TestFindPeerQuery(t *testing.T) { defer cancel() nDHTs := 101 - _, allpeers, dhts := setupDHTS(ctx, nDHTs, t, 0) + _, allpeers, dhts := setupDHTsWithMocknet(ctx, nDHTs, t, 0) defer func() { for i := 0; i < nDHTs; i++ { dhts[i].Close() @@ -1288,7 +1299,7 @@ func TestFindClosestPeers(t *testing.T) { defer cancel() nDHTs := 30 - _, _, dhts := setupDHTS(ctx, nDHTs, t, 0) + _, _, dhts := setupDHTsWithMocknet(ctx, nDHTs, t, 0) defer func() { for i := 0; i < nDHTs; i++ { dhts[i].Close() @@ -1319,7 +1330,7 @@ func TestFindClosestPeers(t *testing.T) { func testConcurrentRequests(t *testing.T, reqs int, maxDelay time.Duration) { ctx := context.Background() - _, peers, dhts := setupDHTS(ctx, 10, t, maxDelay) + _, peers, dhts := setupDHTsWithMocknet(ctx, 10, t, maxDelay) defer func() { for i := 0; i < 10; i++ { dhts[i].Close() @@ -1384,12 +1395,11 @@ func TestDelayedConcurrentRequests(t *testing.T) { t.SkipNow() } - reqs := 30 + reqs := 50 + latency := time.Millisecond * 100 ctx := context.Background() - // create reqs-number of DHTs whose datastore operations - // are delayed (see delayed.go) by, at maximum, 800ms - _, _, dhts := setupDHTS(ctx, reqs, t, 800*time.Millisecond) + _, _, dhts := setupDHTsWithMocknet(ctx, reqs, t, latency) defer func() { for i := 0; i < reqs; i++ { dhts[i].Close() @@ -1404,8 +1414,8 @@ func TestDelayedConcurrentRequests(t *testing.T) { } // ensure that our PutValue and GetValue calls complete - // within 30 seconds - ctxT, cancel := context.WithTimeout(ctx, 45*time.Second) + // within 15 seconds + ctxT, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() // PutValue everything into DHT 0 @@ -1429,7 +1439,10 @@ func TestDelayedConcurrentRequests(t *testing.T) { } putValueWg.Wait() - fmt.Printf("PutValue calls completed in %s\n", time.Since(checkpoint)) + elapsed := time.Since(checkpoint) + + mustBeWithin(t, elapsed, time.Duration(time.Second*7), time.Second*3) + fmt.Printf("PutValue calls completed in %s\n", elapsed) // GetValue from each DHT and ensure that the key/values // are correct @@ -1456,7 +1469,10 @@ func TestDelayedConcurrentRequests(t *testing.T) { } getValueWg.Wait() - fmt.Printf("GetValue calls completed in %s\n", time.Since(checkpoint)) + elapsed = time.Since(checkpoint) + + mustBeWithin(t, elapsed, time.Duration(time.Second*3), time.Second*2) + fmt.Printf("GetValue calls completed in %s\n", elapsed) } func TestGetSetPluggedProtocol(t *testing.T) { @@ -1538,3 
+1554,13 @@ func TestGetSetPluggedProtocol(t *testing.T) { } }) } + +func within(actual time.Duration, expected time.Duration, tolerance time.Duration) bool { + return math.Abs(float64(actual)-float64(expected)) < float64(tolerance) +} + +func mustBeWithin(t *testing.T, actual time.Duration, expected time.Duration, tolerance time.Duration) { + if !within(actual, expected, tolerance) { + t.Fatalf("should have been within %s of %s, was %s\n", tolerance.String(), expected.String(), actual.String()) + } +} diff --git a/notify_test.go b/notify_test.go index 5f8c277f7..a5a488ddb 100644 --- a/notify_test.go +++ b/notify_test.go @@ -13,8 +13,8 @@ func TestNotifieeMultipleConn(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - d1 := setupDHT(ctx, t, false, 0) - d2 := setupDHT(ctx, t, false, 0) + d1 := setupDHTWithSwarm(ctx, t, false) + d2 := setupDHTWithSwarm(ctx, t, false) nn1 := (*netNotifiee)(d1) nn2 := (*netNotifiee)(d2) @@ -56,8 +56,8 @@ func TestNotifieeFuzz(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() - d1 := setupDHT(ctx, t, false, 0) - d2 := setupDHT(ctx, t, false, 0) + d1 := setupDHTWithSwarm(ctx, t, false) + d2 := setupDHTWithSwarm(ctx, t, false) for i := 0; i < 100; i++ { connectNoSync(t, ctx, d1, d2) diff --git a/records_test.go b/records_test.go index de57d0dab..c7ead7d6f 100644 --- a/records_test.go +++ b/records_test.go @@ -16,7 +16,7 @@ import ( // Check that GetPublicKey() correctly extracts a public key func TestPubkeyExtract(t *testing.T) { ctx := context.Background() - dht := setupDHT(ctx, t, false, 0) + dht := setupDHTWithSwarm(ctx, t, false) defer dht.Close() _, pk, err := ci.GenerateEd25519Key(rand.Reader) @@ -42,7 +42,7 @@ func TestPubkeyExtract(t *testing.T) { // Check that GetPublicKey() correctly retrieves a public key from the peerstore func TestPubkeyPeerstore(t *testing.T) { ctx := context.Background() - dht := setupDHT(ctx, t, false, 0) + dht := setupDHTWithSwarm(ctx, t, false) r := u.NewSeededRand(15) // generate deterministic keypair _, pubk, err := ci.GenerateKeyPairWithReader(ci.RSA, 1024, r) @@ -73,8 +73,8 @@ func TestPubkeyPeerstore(t *testing.T) { func TestPubkeyDirectFromNode(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false, 0) - dhtB := setupDHT(ctx, t, false, 0) + dhtA := setupDHTWithSwarm(ctx, t, false) + dhtB := setupDHTWithSwarm(ctx, t, false) defer dhtA.Close() defer dhtB.Close() @@ -103,8 +103,8 @@ func TestPubkeyDirectFromNode(t *testing.T) { func TestPubkeyFromDHT(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false, 0) - dhtB := setupDHT(ctx, t, false, 0) + dhtA := setupDHTWithSwarm(ctx, t, false) + dhtB := setupDHTWithSwarm(ctx, t, false) defer dhtA.Close() defer dhtB.Close() @@ -150,8 +150,8 @@ func TestPubkeyFromDHT(t *testing.T) { func TestPubkeyNotFound(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false, 0) - dhtB := setupDHT(ctx, t, false, 0) + dhtA := setupDHTWithSwarm(ctx, t, false) + dhtB := setupDHTWithSwarm(ctx, t, false) defer dhtA.Close() defer dhtB.Close() @@ -182,8 +182,8 @@ func TestPubkeyNotFound(t *testing.T) { func TestPubkeyBadKeyFromDHT(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false, 0) - dhtB := setupDHT(ctx, t, false, 0) + dhtA := setupDHTWithSwarm(ctx, t, false) + dhtB := setupDHTWithSwarm(ctx, t, false) defer dhtA.Close() defer dhtB.Close() @@ -236,8 +236,8 @@ func TestPubkeyBadKeyFromDHT(t *testing.T) { func 
TestPubkeyBadKeyFromDHTGoodKeyDirect(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false, 0) - dhtB := setupDHT(ctx, t, false, 0) + dhtA := setupDHTWithSwarm(ctx, t, false) + dhtB := setupDHTWithSwarm(ctx, t, false) defer dhtA.Close() defer dhtB.Close() @@ -291,8 +291,8 @@ func TestPubkeyBadKeyFromDHTGoodKeyDirect(t *testing.T) { func TestPubkeyGoodKeyFromDHTGoodKeyDirect(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false, 0) - dhtB := setupDHT(ctx, t, false, 0) + dhtA := setupDHTWithSwarm(ctx, t, false) + dhtB := setupDHTWithSwarm(ctx, t, false) defer dhtA.Close() defer dhtB.Close() From 314c5db0337e74957f20e0fc28c118609c7567d9 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 20 Jun 2018 14:22:55 -0700 Subject: [PATCH 36/39] remove unused gx deps --- package.json | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/package.json b/package.json index b4cb6e4e5..bb1ded67f 100644 --- a/package.json +++ b/package.json @@ -64,13 +64,6 @@ "name": "base32", "version": "0.0.2" }, - { - "author": "whyrusleeping", - - "hash": "QmYP96pJMo5Hzc3UfsBaarQX1RimsaL5gzb1DT1tC2DJRB", - "name": "autobatch", - "version": "0.2.10" - }, { "author": "whyrusleeping", "hash": "QmPdxCaVp4jZ9RbxqZADvKH6kiCR5jHvdR5f2ycjAY6T2a", @@ -165,6 +158,7 @@ "author": "whyrusleeping", "hash": "Qmcc5CPuKyfDZNmqXNkk6j23CyZqZGypUv952NLHYGbeni", "name": "go-libp2p-swarm", +<<<<<<< HEAD "version": "3.0.10" }, { @@ -172,6 +166,9 @@ "hash": "QmRJVNatYJwTAHgdSM1Xef9QVQ1Ch3XHdmcrykjP5Y4soL", "name": "go-ipfs-delay", "version": "0.0.1" +======= + "version": "3.0.2" +>>>>>>> remove unused gx deps } ], "gxVersion": "0.4.0", From 7578cd86ffe7284c93342dca11ff465ce7d22c64 Mon Sep 17 00:00:00 2001 From: Cole Brown Date: Tue, 21 Aug 2018 13:16:48 -0400 Subject: [PATCH 37/39] Briefly comment singleMes --- dht_net.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dht_net.go b/dht_net.go index 87799110f..507778a04 100644 --- a/dht_net.go +++ b/dht_net.go @@ -197,7 +197,10 @@ type messageSender struct { p peer.ID dht *IpfsDHT - invalid bool + invalid bool + // singleMes tracks the number of times a message or request has failed to + // send via this messageSender, triggering a stream reset if its limit is + // reached. singleMes int } From fc3cb0feb9858c0566f031646986d57d2a412b96 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 6 Sep 2018 12:16:44 +0300 Subject: [PATCH 38/39] fix broken package.json rebase artifact... 
--- package.json | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/package.json b/package.json index bb1ded67f..fd2b3f2d9 100644 --- a/package.json +++ b/package.json @@ -158,17 +158,7 @@ "author": "whyrusleeping", "hash": "Qmcc5CPuKyfDZNmqXNkk6j23CyZqZGypUv952NLHYGbeni", "name": "go-libp2p-swarm", -<<<<<<< HEAD "version": "3.0.10" - }, - { - "author": "hsanjuan", - "hash": "QmRJVNatYJwTAHgdSM1Xef9QVQ1Ch3XHdmcrykjP5Y4soL", - "name": "go-ipfs-delay", - "version": "0.0.1" -======= - "version": "3.0.2" ->>>>>>> remove unused gx deps } ], "gxVersion": "0.4.0", From 4737c8ab9c702a79c8e72d1c3aa09c218124754a Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 6 Sep 2018 12:24:08 +0300 Subject: [PATCH 39/39] fix broken test --- dht_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/dht_test.go b/dht_test.go index 9cbb8184f..75174ce3f 100644 --- a/dht_test.go +++ b/dht_test.go @@ -383,8 +383,8 @@ func TestGetValues(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dhtA := setupDHT(ctx, t, false) - dhtB := setupDHT(ctx, t, false) + dhtA := setupDHTWithSwarm(ctx, t, false) + dhtB := setupDHTWithSwarm(ctx, t, false) defer dhtA.Close() defer dhtB.Close() @@ -431,8 +431,8 @@ func TestValueGetInvalid(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dhtA := setupDHT(ctx, t, false) - dhtB := setupDHT(ctx, t, false) + dhtA := setupDHTWithSwarm(ctx, t, false) + dhtB := setupDHTWithSwarm(ctx, t, false) defer dhtA.Close() defer dhtB.Close() @@ -1172,9 +1172,9 @@ func TestClientModeFindPeer(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - a := setupDHT(ctx, t, false) - b := setupDHT(ctx, t, true) - c := setupDHT(ctx, t, true) + a := setupDHTWithSwarm(ctx, t, false) + b := setupDHTWithSwarm(ctx, t, true) + c := setupDHTWithSwarm(ctx, t, true) connectNoSync(t, ctx, b, a) connectNoSync(t, ctx, c, a)
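Taken together, patches 35 and 39 move the slow-network simulation out of the datastore (the deleted delayed wrapper) and into the transport: tests now build their hosts on a mocknet whose links carry a configurable latency, and bound wall-clock times with the new within/mustBeWithin helpers. The sketch below distills that pattern into a standalone pair of helpers. It uses only the mocknet calls that appear in the diff (mocknet.New, SetLinkDefaults, GenPeer, LinkAll); the names newLatencyHosts and withinTolerance are illustrative and not part of the patch.

package dht

import (
	"context"
	"math"
	"testing"
	"time"

	host "github.com/libp2p/go-libp2p-host"
	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)

// newLatencyHosts (illustrative name, not in the patch) builds n in-memory
// hosts whose links all carry the given latency, mirroring the mocknet setup
// that setupDHTsWithMocknet performs in patch 35.
func newLatencyHosts(ctx context.Context, t *testing.T, n int, latency time.Duration) []host.Host {
	mn := mocknet.New(ctx)
	mn.SetLinkDefaults(mocknet.LinkOptions{
		Latency: latency, // every link between peers pays this delay
	})

	hosts := make([]host.Host, n)
	for i := range hosts {
		h, err := mn.GenPeer() // one in-memory peer per DHT under test
		if err != nil {
			t.Fatal(err)
		}
		hosts[i] = h
	}

	// Wire every pair of generated peers together so queries can route.
	if err := mn.LinkAll(); err != nil {
		t.Fatal(err)
	}
	return hosts
}

// withinTolerance mirrors the within helper added to dht_test.go: it reports
// whether an observed duration falls inside tolerance of the expected value.
func withinTolerance(actual, expected, tolerance time.Duration) bool {
	return math.Abs(float64(actual)-float64(expected)) < float64(tolerance)
}

Because the simulated delay is now charged per network round trip rather than per datastore operation, the timing assertions track RPC latency directly, which is presumably why TestDelayedConcurrentRequests could tighten its overall timeout from 45 back to 15 seconds while still checking PutValue and GetValue completion times.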