Skip to content

Commit

Permalink
handle timeouts using i/o deadlines.
Browse files Browse the repository at this point in the history
Both initiator and responder now time out if reads
and/or writes are blocked for 30 seconds (default
negotiation timeout).

Introduces two tests to validate the new behaviour.

Fixes #47.
  • Loading branch information
raulk committed Nov 1, 2019
1 parent af073b1 commit 21fd290
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 21 deletions.
58 changes: 37 additions & 21 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,12 @@ var ErrNoProtocols = errors.New("no protocols specified")
// on this ReadWriteCloser. It returns an error if, for example,
// the muxer does not know how to handle this protocol.
func SelectProtoOrFail(proto string, rwc io.ReadWriteCloser) error {
errCh := make(chan error, 1)
go func() {
var buf bytes.Buffer
delimWrite(&buf, []byte(ProtocolID))
delimWrite(&buf, []byte(proto))
_, err := io.Copy(rwc, &buf)
errCh <- err
}()
// We have to read *both* errors.
err1 := readMultistreamHeader(rwc)
err2 := readProto(proto, rwc)
if werr := <-errCh; werr != nil {
return werr
}
if err1 != nil {
return err1
}
if err2 != nil {
return err2
if clearFn, err := setDeadline(rwc); err != nil {
return err
} else {
defer clearFn()
}
return nil
return selectSingleProtocol(proto, rwc)
}

// SelectOneOf will perform handshakes with the protocols on the given slice
Expand All @@ -49,12 +34,18 @@ func SelectOneOf(protos []string, rwc io.ReadWriteCloser) (string, error) {
return "", ErrNoProtocols
}

if clearFn, err := setDeadline(rwc); err != nil {
return "", err
} else {
defer clearFn()
}

// Use SelectProtoOrFail to pipeline the /multistream/1.0.0 handshake
// with an attempt to negotiate the first protocol. If that fails, we
// can continue negotiating the rest of the protocols normally.
//
// This saves us a round trip.
switch err := SelectProtoOrFail(protos[0], rwc); err {
switch err := selectSingleProtocol(protos[0], rwc); err {
case nil:
return protos[0], nil
case ErrNotSupported: // try others
Expand All @@ -74,6 +65,31 @@ func SelectOneOf(protos []string, rwc io.ReadWriteCloser) (string, error) {
return "", ErrNotSupported
}

// selectSingleProtocol attempts to select a single protocol.
func selectSingleProtocol(proto string, rwc io.ReadWriteCloser) error {
errCh := make(chan error, 1)
go func() {
var buf bytes.Buffer
delimWrite(&buf, []byte(ProtocolID))
delimWrite(&buf, []byte(proto))
_, err := io.Copy(rwc, &buf)
errCh <- err
}()
// We have to read *both* errors.
err1 := readMultistreamHeader(rwc)
err2 := readProto(proto, rwc)
if werr := <-errCh; werr != nil {
return werr
}
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
return nil
}

func handshake(rw io.ReadWriter) error {
errCh := make(chan error, 1)
go func() {
Expand Down
12 changes: 12 additions & 0 deletions multistream.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ func (msm *MultistreamMuxer) NegotiateLazy(rwc io.ReadWriteCloser) (io.ReadWrite
writeErr := make(chan error, 1)
defer close(pval)

if clearFn, err := setDeadline(rwc); err != nil {
return nil, "", nil, err
} else {
defer clearFn()
}

lzc := &lazyServerConn{
con: rwc,
}
Expand Down Expand Up @@ -292,6 +298,12 @@ loop:
// Negotiate performs protocol selection and returns the protocol name and
// the matching handler function for it (or an error).
func (msm *MultistreamMuxer) Negotiate(rwc io.ReadWriteCloser) (string, HandlerFunc, error) {
if clearFn, err := setDeadline(rwc); err != nil {
return "", nil, err
} else {
defer clearFn()
}

// Send our protocol ID
err := delimWriteBuffered(rwc, []byte(ProtocolID))
if err != nil {
Expand Down
43 changes: 43 additions & 0 deletions multistream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net"
"sort"
"strings"
"testing"
"time"
)
Expand Down Expand Up @@ -684,3 +685,45 @@ func TestNegotiateFail(t *testing.T) {
t.Fatal("got wrong protocol")
}
}

func TestInitiatorTimeout(t *testing.T) {
a, _ := newPipe(t)

var old time.Duration
old, NegotiationTimeout = NegotiationTimeout, 1*time.Second
defer func() { NegotiationTimeout = old }()

mux := NewMultistreamMuxer()
mux.AddHandler("/a", func(p string, rwc io.ReadWriteCloser) error {
t.Error("shouldnt execute this handler")
return nil
})

ch := make(chan error)
go func() {
defer close(ch)
err := SelectProtoOrFail("/a", a)
ch <- err
}()

// nothing is reading from b.

if err := <-ch; !strings.Contains(err.Error(), "i/o timeout") {
t.Fatal("expected a timeout error")
}
}

func TestResponderTimeout(t *testing.T) {
_, b := newPipe(t)

var old time.Duration
old, NegotiationTimeout = NegotiationTimeout, 1*time.Second
defer func() { NegotiationTimeout = old }()

mux := NewMultistreamMuxer()
// nothing is sending from a.
err := mux.Handle(b)
if !strings.Contains(err.Error(), "i/o timeout") {
t.Fatal("expected a timeout error")
}
}
32 changes: 32 additions & 0 deletions timeout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package multistream

import (
"fmt"
"io"
"time"
)

// NegotiationTimeout is the maximum time a protocol negotiation atempt is
// allowed to be inflight before it fails.
var NegotiationTimeout = 30 * time.Second

// setDeadline attempts to set a read and write deadline on the underlying IO
// object, if it supports it.
func setDeadline(rwc io.ReadWriteCloser) (func(), error) {
// rwc could be:
// - a net.Conn or a libp2p Stream, both of which satisfy this interface.
// - something else (e.g. testing), in which case we skip over setting
// a deadline.
type deadline interface {
SetDeadline(time.Time) error
}
if d, ok := rwc.(deadline); ok {
if err := d.SetDeadline(time.Now().Add(NegotiationTimeout)); err != nil {
// this should not happen; if it does, something is broken and we
// should fail immediately.
return nil, fmt.Errorf("failed while setting a deadline: %w", err)
}
return func() { d.SetDeadline(time.Time{}) }, nil
}
return func() {}, nil
}

0 comments on commit 21fd290

Please sign in to comment.