Skip to content

Commit

Permalink
Merge pull request #1296 from libp2p/rcmgr-itest
Browse files Browse the repository at this point in the history
Basic resource manager integration tests
  • Loading branch information
marten-seemann authored Jan 18, 2022
2 parents 15d7dfb + 699fdde commit 7e3f22f
Show file tree
Hide file tree
Showing 3 changed files with 593 additions and 0 deletions.
274 changes: 274 additions & 0 deletions itest/echo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
package itest

import (
"context"
"fmt"
"io"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

logging "github.com/ipfs/go-log/v2"
)

const (
EchoService = "test.echo"
EchoProtoID = "/test/echo"
)

var (
echoLog = logging.Logger("echo")
)

type Echo struct {
Host host.Host

mx sync.Mutex
status EchoStatus

beforeReserve, beforeRead, beforeWrite, beforeDone func() error
}

type EchoStatus struct {
StreamsIn int
EchosIn, EchosOut int
IOErrors int
ResourceServiceErrors int
ResourceReservationErrors int
}

func NewEcho(h host.Host) *Echo {
e := &Echo{Host: h}
h.SetStreamHandler(EchoProtoID, e.handleStream)
return e
}

func (e *Echo) Status() EchoStatus {
e.mx.Lock()
defer e.mx.Unlock()

return e.status
}

func (e *Echo) BeforeReserve(f func() error) {
e.mx.Lock()
defer e.mx.Unlock()

e.beforeReserve = f
}

func (e *Echo) BeforeRead(f func() error) {
e.mx.Lock()
defer e.mx.Unlock()

e.beforeRead = f
}

func (e *Echo) BeforeWrite(f func() error) {
e.mx.Lock()
defer e.mx.Unlock()

e.beforeWrite = f
}

func (e *Echo) BeforeDone(f func() error) {
e.mx.Lock()
defer e.mx.Unlock()

e.beforeDone = f
}

func (e *Echo) getBeforeReserve() func() error {
e.mx.Lock()
defer e.mx.Unlock()

return e.beforeReserve
}

func (e *Echo) getBeforeRead() func() error {
e.mx.Lock()
defer e.mx.Unlock()

return e.beforeRead
}

func (e *Echo) getBeforeWrite() func() error {
e.mx.Lock()
defer e.mx.Unlock()

return e.beforeWrite
}

func (e *Echo) getBeforeDone() func() error {
e.mx.Lock()
defer e.mx.Unlock()

return e.beforeDone
}

func (e *Echo) handleStream(s network.Stream) {
defer s.Close()

e.mx.Lock()
e.status.StreamsIn++
e.mx.Unlock()

if beforeReserve := e.getBeforeReserve(); beforeReserve != nil {
if err := beforeReserve(); err != nil {
echoLog.Debugf("error syncing before reserve: %s", err)

s.Reset()
return
}
}

if err := s.Scope().SetService(EchoService); err != nil {
echoLog.Debugf("error attaching stream to echo service: %s", err)

e.mx.Lock()
e.status.ResourceServiceErrors++
e.mx.Unlock()

s.Reset()
return
}

if err := s.Scope().ReserveMemory(4096, network.ReservationPriorityAlways); err != nil {
echoLog.Debugf("error reserving memory: %s", err)

e.mx.Lock()
e.status.ResourceReservationErrors++
e.mx.Unlock()

s.Reset()
return
}

if beforeRead := e.getBeforeRead(); beforeRead != nil {
if err := beforeRead(); err != nil {
echoLog.Debugf("error syncing before read: %s", err)

s.Reset()
return
}
}

buf := make([]byte, 4096)

s.SetReadDeadline(time.Now().Add(5 * time.Second))
n, err := s.Read(buf)
switch {
case err == io.EOF:
if n == 0 {
return
}

case err != nil:
echoLog.Debugf("I/O error : %s", err)

e.mx.Lock()
e.status.IOErrors++
e.mx.Unlock()

s.Reset()
return
}

e.mx.Lock()
e.status.EchosIn++
e.mx.Unlock()

if beforeWrite := e.getBeforeWrite(); beforeWrite != nil {
if err := beforeWrite(); err != nil {
echoLog.Debugf("error syncing before write: %s", err)

s.Reset()
return
}
}

s.SetWriteDeadline(time.Now().Add(5 * time.Second))
_, err = s.Write(buf[:n])
if err != nil {
echoLog.Debugf("I/O error: %s", err)

e.mx.Lock()
e.status.IOErrors++
e.mx.Unlock()

s.Reset()
return
}

e.mx.Lock()
e.status.EchosOut++
e.mx.Unlock()

s.CloseWrite()

if beforeDone := e.getBeforeDone(); beforeDone != nil {
if err := beforeDone(); err != nil {
echoLog.Debugf("error syncing before done: %s", err)

s.Reset()
}
}
}

func (e *Echo) Echo(p peer.ID, what string) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

s, err := e.Host.NewStream(ctx, p, EchoProtoID)
if err != nil {
return err
}
defer s.Close()

if err := s.Scope().SetService(EchoService); err != nil {
echoLog.Debugf("error attaching stream to echo service: %s", err)

s.Reset()
return err
}

if err := s.Scope().ReserveMemory(4096, network.ReservationPriorityAlways); err != nil {
echoLog.Debugf("error reserving memory: %s", err)

s.Reset()
return err
}

s.SetWriteDeadline(time.Now().Add(5 * time.Second))
_, err = s.Write([]byte(what))
if err != nil {
return err
}
s.CloseWrite()

buf := make([]byte, 4096)

s.SetReadDeadline(time.Now().Add(5 * time.Second))
n, err := s.Read(buf)
switch {
case err == io.EOF:
if n == 0 {
return err
}

case err != nil:
echoLog.Debugf("I/O error : %s", err)

s.Reset()
return err
}

if what != string(buf[:n]) {
return fmt.Errorf("echo output doesn't match input")
}

return nil
}
73 changes: 73 additions & 0 deletions itest/echo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package itest

import (
"context"
"testing"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"

"github.com/stretchr/testify/require"
)

func createEchos(t *testing.T, count int, makeOpts ...func(int) libp2p.Option) []*Echo {
result := make([]*Echo, 0, count)

for i := 0; i < count; i++ {
opts := make([]libp2p.Option, 0, len(makeOpts))
for _, makeOpt := range makeOpts {
opts = append(opts, makeOpt(i))
}

h, err := libp2p.New(opts...)
if err != nil {
t.Fatal(err)
}

e := NewEcho(h)
result = append(result, e)
}

for i := 0; i < count; i++ {
for j := 0; j < count; j++ {
if i == j {
continue
}

result[i].Host.Peerstore().AddAddrs(result[j].Host.ID(), result[j].Host.Addrs(), peerstore.PermanentAddrTTL)
}
}

return result
}

func closeEchos(echos []*Echo) {
for _, e := range echos {
e.Host.Close()
}
}

func checkEchoStatus(t *testing.T, e *Echo, expected EchoStatus) {
t.Helper()
require.Equal(t, expected, e.Status())
}

func TestEcho(t *testing.T) {
echos := createEchos(t, 2)
defer closeEchos(echos)

if err := echos[0].Host.Connect(context.TODO(), peer.AddrInfo{ID: echos[1].Host.ID()}); err != nil {
t.Fatal(err)
}

if err := echos[0].Echo(echos[1].Host.ID(), "hello libp2p"); err != nil {
t.Fatal(err)
}

checkEchoStatus(t, echos[1], EchoStatus{
StreamsIn: 1,
EchosIn: 1,
EchosOut: 1,
})
}
Loading

0 comments on commit 7e3f22f

Please sign in to comment.