DHT Request pipelining #92
Changes from all commits
33d9ca1
db09dc6
cb20a6f
9f72b80
0a07fc9
b686575
d367bf9
9b38b31
b2c23ed
0490df7
e19692d
298641a
995da89
67c0356
d41c165
bc50216
bab37ad
a562c8f
a5c987e
e553dac
a6343b6
8d99629
b126c52
dbc46ae
ef3f144
fa1eee2
9061800
169b260
93fe9f7
5d7748f
5df8091
c40fd0e
79de04c
dda877c
cb5284f
314c5db
7578cd8
fc3cb0f
4737c8a

@@ -3,6 +3,7 @@ package dht
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"sync"

@@ -188,26 +189,34 @@ func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) {
}

type messageSender struct {
s inet.Stream
r ggio.ReadCloser
w bufferedWriteCloser
lk sync.Mutex
p peer.ID
dht *IpfsDHT

invalid bool
s inet.Stream
w ggio.WriteCloser
rch chan chan requestResult
rctl chan struct{}
lk sync.Mutex
p peer.ID
dht *IpfsDHT

invalid bool
// singleMes tracks the number of times a message or request has failed to
// send via this messageSender, triggering a stream reset if its limit is
// reached.
singleMes int
}

type requestResult struct {
mes *pb.Message
err error
}

const requestResultBuffer = 64

// invalidate is called before this messageSender is removed from the strmap.
// It prevents the messageSender from being reused/reinitialized and then
// forgotten (leaving the stream open).
func (ms *messageSender) invalidate() {
ms.invalid = true
if ms.s != nil {
ms.s.Reset()
ms.s = nil
}
ms.reset()
}

func (ms *messageSender) prepOrInvalidate() error {

@@ -224,6 +233,7 @@ func (ms *messageSender) prep() error {
if ms.invalid {
return fmt.Errorf("message sender has been invalidated")
}

if ms.s != nil {
return nil
}

@@ -233,33 +243,61 @@ func (ms *messageSender) prep() error {
return err
}

ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
ms.w = newBufferedDelimitedWriter(nstr)
r := ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
rch := make(chan chan requestResult, requestResultBuffer)
rctl := make(chan struct{}, 1)
go ms.messageReceiver(rch, rctl, r)

ms.rch = rch
ms.rctl = rctl
ms.w = ggio.NewDelimitedWriter(nstr)
ms.s = nstr

return nil
}

// Resets the stream and shuts down the goroutine pump
// Mutex must be locked.
func (ms *messageSender) reset() {
if ms.s != nil {
close(ms.rch)
ms.s.Reset()
ms.s = nil
}
}

func (ms *messageSender) resetStream(s inet.Stream) {
if ms.s == s {
ms.reset()
}
}

// streamReuseTries is the number of times we will try to reuse a stream to a
// given peer before giving up and reverting to the old one-message-per-stream
// behaviour.
const streamReuseTries = 3

func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
defer log.EventBegin(ctx, "SendMessage", ms.dht.self, ms.p, pmes).Done()
ms.lk.Lock()
defer ms.lk.Unlock()
retry := false
for {
if ms.singleMes > streamReuseTries {
ms.lk.Unlock()
return ms.sendMessageSingle(ctx, pmes)
}

if err := ms.prep(); err != nil {
ms.lk.Unlock()
return err
}

if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil
if err := ms.w.WriteMsg(pmes); err != nil {
ms.reset()

if retry {
log.Info("error writing message, bailing: ", err)
ms.lk.Unlock()
return err
} else {
log.Info("error writing message, trying again: ", err)

@@ -268,31 +306,37 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) error {
}
}

log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
go inet.FullClose(ms.s)
ms.s = nil
} else if retry {
if retry {
ms.singleMes++
if ms.singleMes > streamReuseTries {
ms.reset()
}
}

ms.lk.Unlock()
return nil
}
}

func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
ms.lk.Lock()
defer ms.lk.Unlock()
defer log.EventBegin(ctx, "SendRequest", ms.dht.self, ms.p, pmes).Done()
retry := false
for {
ms.lk.Lock()

if ms.singleMes > streamReuseTries {
ms.lk.Unlock()
return ms.sendRequestSingle(ctx, pmes)
}

if err := ms.prep(); err != nil {
ms.lk.Unlock()
return nil, err
}

if err := ms.writeMsg(pmes); err != nil {
ms.s.Reset()
ms.s = nil
if err := ms.w.WriteMsg(pmes); err != nil {
ms.reset()
ms.lk.Unlock()

if retry {
log.Info("error writing message, bailing: ", err)

@@ -304,56 +348,189 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
}
}

mes := new(pb.Message)
if err := ms.ctxReadMsg(ctx, mes); err != nil {
ms.s.Reset()
ms.s = nil
resch := make(chan requestResult, 1)
select {
case ms.rch <- resch:
default:
// pipeline stall, log it and time it
evt := log.EventBegin(ctx, "SendRequestStall", ms.dht.self, ms.p, pmes)
Review comment: this could use the updated

select {
case ms.rch <- resch:
evt.Done()
case <-ctx.Done():
evt.Done()
ms.lk.Unlock()
return nil, ctx.Err()
case <-ms.dht.ctx.Done():
evt.Done()
ms.lk.Unlock()
return nil, ms.dht.ctx.Err()
}
}

rctl := ms.rctl
s := ms.s
Review comment: wasn't considering the threaded context
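
The point under discussion: rctl and the stream are copied into locals while the mutex is still held, and only those locals are used once the lock is released. A minimal, self-contained sketch of that capture-under-lock pattern (the sender and stream types and names here are illustrative, not the DHT's):

package main

import (
	"fmt"
	"sync"
)

type stream struct{ id int }

type sender struct {
	mu sync.Mutex
	s  *stream
}

func (sn *sender) request() *stream {
	sn.mu.Lock()
	s := sn.s // capture the stream we will read from while the lock is held
	sn.mu.Unlock()

	// From here on only the local s is used; another goroutine may reset and
	// replace sn.s while this request blocks on the network.
	return s
}

func main() {
	sn := &sender{s: &stream{id: 1}}
	fmt.Println("request uses stream", sn.request().id)
}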

ms.lk.Unlock()

rctx, cancel := context.WithTimeout(ctx, dhtReadMessageTimeout)
defer cancel()

var res requestResult
select {
case res = <-resch:

case <-rctx.Done():
// A read timeout will cause the entire pipeline to time out.
// So signal for a stream reset to avoid clogging subsequent requests.
Review comment: Any reason to not just throw away the response in this case? Throwing everything away seems like bad manners.
Reply: the rationale is that if the first request has timed out, then the subsequent requests will likely also time out.
Reply: also, more requests might be added in the pipeline in the meantime (while the first one has timed out), potentially leading to an avalanche of timeouts; i think it's better to just flush the pipeline and start over.
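
To illustrate the flush signal discussed in this thread: rctl has a one-slot buffer, so any number of timed-out requesters can ask for a reset without blocking, and the receiver sees at most one pending signal. A small runnable sketch of that pattern (everything apart from rctl itself is illustrative):

package main

import "fmt"

func main() {
	rctl := make(chan struct{}, 1)

	// requestReset is what a timed-out requester does: a non-blocking send,
	// so the first signal wins and later ones collapse into it.
	requestReset := func() {
		select {
		case rctl <- struct{}{}:
		default:
		}
	}

	requestReset()
	requestReset() // already pending, collapses into the first signal

	// The receiver polls rctl; one flush covers all the timed-out requests.
	select {
	case <-rctl:
		fmt.Println("flush the pipeline and reset the stream")
	default:
		fmt.Println("no reset requested")
	}
}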

select {
case <-ctx.Done():
// not a read timeout
default:
select {
case rctl <- struct{}{}:
default:
}
}

return nil, rctx.Err()

case <-ms.dht.ctx.Done():
return nil, ms.dht.ctx.Err()
}

if res.err != nil {
if retry {
log.Info("error reading message, bailing: ", err)
return nil, err
log.Info("error reading message, bailing: ", res.err)
return nil, res.err
} else {
log.Info("error reading message, trying again: ", err)
log.Info("error reading message, trying again: ", res.err)
retry = true
continue
}
}

log.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
go inet.FullClose(ms.s)
ms.s = nil
} else if retry {
if retry {
ms.lk.Lock()
ms.singleMes++
if ms.singleMes > streamReuseTries {
ms.resetStream(s)
}
ms.lk.Unlock()
}

return mes, nil
return res.mes, nil
}
}

func (ms *messageSender) writeMsg(pmes *pb.Message) error {
if err := ms.w.WriteMsg(pmes); err != nil {
func (ms *messageSender) sendMessageSingle(ctx context.Context, pmes *pb.Message) error {
s, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
if err != nil {
return err
}
return ms.w.Flush()
defer s.Close()

w := ggio.NewDelimitedWriter(s)

err = w.WriteMsg(pmes)
if err != nil {
s.Reset()
}

return err
}

func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error {
func (ms *messageSender) sendRequestSingle(ctx context.Context, pmes *pb.Message) (*pb.Message, error) {
s, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
if err != nil {
return nil, err
}
defer s.Close()

r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)

if err := w.WriteMsg(pmes); err != nil {
s.Reset()
return nil, err
}

mes := new(pb.Message)

errc := make(chan error, 1)
go func(r ggio.ReadCloser) {
go func() {
errc <- r.ReadMsg(mes)
}(ms.r)
}()

t := time.NewTimer(dhtReadMessageTimeout)
defer t.Stop()
rctx, cancel := context.WithTimeout(ctx, dhtReadMessageTimeout)
defer cancel()

select {
case err := <-errc:
return err
case <-ctx.Done():
return ctx.Err()
case <-t.C:
return ErrReadTimeout
if err != nil {
return nil, err
}
case <-rctx.Done():
Review comment: Should reset the stream here so as not to send an EOF (which indicates success).
Reply: probably,
Reply: Added a reset.
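
The EOF point, sketched: Close ends the stream cleanly (the remote reads EOF, which looks like a successful exchange), while Reset aborts it. A minimal illustration of the resulting rule, using a stand-in stream type rather than the real inet.Stream:

package main

import "fmt"

type stream interface {
	Close() error
	Reset() error
}

// fakeStream stands in for inet.Stream; only Close and Reset matter here.
type fakeStream struct{}

func (fakeStream) Close() error { fmt.Println("close: remote reads EOF, looks like success"); return nil }
func (fakeStream) Reset() error { fmt.Println("reset: remote sees the stream aborted"); return nil }

// finish mirrors the rule in sendRequestSingle: reset on failure so the peer
// never mistakes a timed-out exchange for a cleanly finished one.
func finish(s stream, err error) {
	if err != nil {
		s.Reset()
		return
	}
	s.Close()
}

func main() {
	finish(fakeStream{}, nil)                   // success path
	finish(fakeStream{}, fmt.Errorf("timeout")) // failure path
}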

s.Reset()
return nil, rctx.Err()
}

return mes, nil
}

func (ms *messageSender) messageReceiver(rch chan chan requestResult, rctl chan struct{}, r ggio.ReadCloser) {
loop:
for {
select {
case <-rctl:
// poll for reset due to timeouts first, there might be requests queued
break loop

default:
select {
case next, ok := <-rch:
if !ok {
return
}

mes := new(pb.Message)
err := r.ReadMsg(mes)
if err != nil {
next <- requestResult{err: err}
break loop
} else {
next <- requestResult{mes: mes}
}

case <-rctl:
break loop

case <-ms.dht.ctx.Done():
return
}
}
}

// reset once; needs to happen in a goroutine to avoid deadlock
// in case of pipeline stalls
Review comment: I think there's a race condition here. What happens if another thread calls
Reply: i think you are right, we should track which stream we are resetting -- previously, this was protected by the rcount contraption.
Reply: I added a guarded reset version,
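
A sketch of the guarded reset that came out of this thread: reset only if the sender still holds the stream the caller decided to abandon, so a late reset cannot tear down a newer stream created by another goroutine. Types and names below are illustrative:

package main

import (
	"fmt"
	"sync"
)

type stream struct{ id int }

func (s *stream) Reset() { fmt.Println("reset stream", s.id) }

type sender struct {
	mu sync.Mutex
	s  *stream
}

// resetStream mirrors the diff's (ms *messageSender).resetStream: it is a
// no-op when the current stream is no longer the one the caller captured.
func (sn *sender) resetStream(old *stream) {
	sn.mu.Lock()
	defer sn.mu.Unlock()
	if sn.s == old {
		sn.s.Reset()
		sn.s = nil
	}
}

func main() {
	sn := &sender{s: &stream{id: 1}}
	stale := sn.s

	// Simulate another goroutine having already replaced the stream.
	sn.s = &stream{id: 2}

	// The late, guarded reset of the stale stream therefore does nothing.
	sn.resetStream(stale)
	fmt.Println("current stream:", sn.s.id)
}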

go func(s inet.Stream) {
ms.lk.Lock()
ms.resetStream(s)
ms.lk.Unlock()
}(ms.s)

// drain the pipeline
err := errors.New("Stream has been abandoned due to earlier errors")
for {
select {
case next, ok := <-rch:
if !ok {
return
}
next <- requestResult{err: err}

case <-ms.dht.ctx.Done():
return
}
}
}
Review comment: i think this could be renamed to retried
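
For readers skimming the diff, here is a minimal, self-contained sketch of the pipelining scheme itself: each request enqueues a result channel on the pipeline channel, and a single receiver goroutine reads replies from the stream in order and completes the queued requests first-in, first-out. The fake transport and names below are illustrative, not the DHT's API:

package main

import "fmt"

type result struct {
	mes string
	err error
}

func main() {
	replies := make(chan string)      // stands in for the stream's read side
	rch := make(chan chan result, 64) // pipeline of waiting requests

	// Receiver: one reply per queued request, delivered in FIFO order.
	go func() {
		for next := range rch {
			next <- result{mes: <-replies}
		}
	}()

	// Issue two pipelined requests without waiting for the first reply.
	res1 := make(chan result, 1)
	res2 := make(chan result, 1)
	rch <- res1
	rch <- res2

	// The remote answers in order.
	replies <- "reply to request 1"
	replies <- "reply to request 2"

	fmt.Println((<-res1).mes)
	fmt.Println((<-res2).mes)
}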