Skip to content

Commit

Permalink
Merge pull request #8 from hsanjuan/reset/streams
Browse files Browse the repository at this point in the history
Reset streams when no reading is going to be done anymore
  • Loading branch information
hsanjuan authored Jun 28, 2018
2 parents 28997d8 + d0d288b commit 2e9c129
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 18 deletions.
7 changes: 6 additions & 1 deletion p2p/net/gostream/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ func (c *conn) Write(b []byte) (n int, err error) {
// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (c *conn) Close() error {
return c.s.Close()
if err := c.s.Close(); err != nil {
c.s.Reset()
return err
}
go pnet.AwaitEOF(c.s)
return nil
}

// LocalAddr returns the local network address.
Expand Down
40 changes: 24 additions & 16 deletions p2p/net/gostream/gostream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package gostream
import (
"bufio"
"context"
"io/ioutil"
"testing"
"time"

Expand Down Expand Up @@ -39,8 +38,10 @@ func TestServerClient(t *testing.T) {
clientHost.Peerstore().AddAddrs(srvHost.ID(), srvHost.Addrs(), peerstore.PermanentAddrTTL)

var tag protocol.ID = "/testitytest"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
go func(ctx context.Context) {
listener, err := Listen(srvHost, tag)
if err != nil {
t.Fatal(err)
Expand All @@ -58,19 +59,25 @@ func TestServerClient(t *testing.T) {
defer servConn.Close()

reader := bufio.NewReader(servConn)
msg, err := reader.ReadString('\n')
if err != nil {
t.Fatal(err)
}
if string(msg) != "is libp2p awesome?\n" {
t.Fatalf("Bad incoming message: %s", msg)
}

_, err = servConn.Write([]byte("yes it is"))
if err != nil {
t.Fatal(err)
for {
msg, err := reader.ReadString('\n')
if err != nil {
t.Fatal(err)
}
if string(msg) != "is libp2p awesome?\n" {
t.Fatalf("Bad incoming message: %s", msg)
}

_, err = servConn.Write([]byte("yes it is\n"))
if err != nil {
t.Fatal(err)
}
select {
case <-ctx.Done():
return
}
}
}()
}(ctx)

clientConn, err := Dial(clientHost, srvHost.ID(), tag)
if err != nil {
Expand Down Expand Up @@ -109,12 +116,13 @@ func TestServerClient(t *testing.T) {
t.Fatal(err)
}

resp, err := ioutil.ReadAll(clientConn)
reader := bufio.NewReader(clientConn)
resp, err := reader.ReadString('\n')
if err != nil {
t.Fatal(err)
}

if string(resp) != "yes it is" {
if string(resp) != "yes it is\n" {
t.Errorf("Bad response: %s", resp)
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/net/gostream/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {
select {
case l.streamCh <- s:
case <-ctx.Done():
s.Close()
s.Reset()
}
})

Expand Down

0 comments on commit 2e9c129

Please sign in to comment.