Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic resource manager integration tests #1296

Merged
merged 2 commits into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 274 additions & 0 deletions itest/echo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
package itest

import (
"context"
"fmt"
"io"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

logging "github.com/ipfs/go-log/v2"
)

const (
EchoService = "test.echo"
EchoProtoID = "/test/echo"
)

var (
echoLog = logging.Logger("echo")
)

type Echo struct {
Host host.Host

mx sync.Mutex
status EchoStatus

beforeReserve, beforeRead, beforeWrite, beforeDone func() error
}

type EchoStatus struct {
StreamsIn int
EchosIn, EchosOut int
IOErrors int
ResourceServiceErrors int
ResourceReservationErrors int
}

func NewEcho(h host.Host) *Echo {
e := &Echo{Host: h}
h.SetStreamHandler(EchoProtoID, e.handleStream)
return e
}

func (e *Echo) Status() EchoStatus {
e.mx.Lock()
defer e.mx.Unlock()

return e.status
}

func (e *Echo) BeforeReserve(f func() error) {
e.mx.Lock()
defer e.mx.Unlock()

e.beforeReserve = f
}

func (e *Echo) BeforeRead(f func() error) {
e.mx.Lock()
defer e.mx.Unlock()

e.beforeRead = f
}

func (e *Echo) BeforeWrite(f func() error) {
e.mx.Lock()
defer e.mx.Unlock()

e.beforeWrite = f
}

func (e *Echo) BeforeDone(f func() error) {
e.mx.Lock()
defer e.mx.Unlock()

e.beforeDone = f
}

func (e *Echo) getBeforeReserve() func() error {
e.mx.Lock()
defer e.mx.Unlock()

return e.beforeReserve
}

func (e *Echo) getBeforeRead() func() error {
e.mx.Lock()
defer e.mx.Unlock()

return e.beforeRead
}

func (e *Echo) getBeforeWrite() func() error {
e.mx.Lock()
defer e.mx.Unlock()

return e.beforeWrite
}

func (e *Echo) getBeforeDone() func() error {
e.mx.Lock()
defer e.mx.Unlock()

return e.beforeDone
}

func (e *Echo) handleStream(s network.Stream) {
defer s.Close()

e.mx.Lock()
e.status.StreamsIn++
e.mx.Unlock()

if beforeReserve := e.getBeforeReserve(); beforeReserve != nil {
if err := beforeReserve(); err != nil {
echoLog.Debugf("error syncing before reserve: %s", err)

s.Reset()
return
}
}

if err := s.Scope().SetService(EchoService); err != nil {
echoLog.Debugf("error attaching stream to echo service: %s", err)

e.mx.Lock()
e.status.ResourceServiceErrors++
e.mx.Unlock()

s.Reset()
return
}

if err := s.Scope().ReserveMemory(4096, network.ReservationPriorityAlways); err != nil {
echoLog.Debugf("error reserving memory: %s", err)

e.mx.Lock()
e.status.ResourceReservationErrors++
e.mx.Unlock()

s.Reset()
return
}

if beforeRead := e.getBeforeRead(); beforeRead != nil {
if err := beforeRead(); err != nil {
echoLog.Debugf("error syncing before read: %s", err)

s.Reset()
return
}
}

buf := make([]byte, 4096)

s.SetReadDeadline(time.Now().Add(5 * time.Second))
n, err := s.Read(buf)
switch {
case err == io.EOF:
if n == 0 {
return
}

case err != nil:
echoLog.Debugf("I/O error : %s", err)

e.mx.Lock()
e.status.IOErrors++
e.mx.Unlock()

s.Reset()
return
}

e.mx.Lock()
e.status.EchosIn++
e.mx.Unlock()

if beforeWrite := e.getBeforeWrite(); beforeWrite != nil {
if err := beforeWrite(); err != nil {
echoLog.Debugf("error syncing before write: %s", err)

s.Reset()
return
}
}

s.SetWriteDeadline(time.Now().Add(5 * time.Second))
_, err = s.Write(buf[:n])
if err != nil {
echoLog.Debugf("I/O error: %s", err)

e.mx.Lock()
e.status.IOErrors++
e.mx.Unlock()

s.Reset()
return
}

e.mx.Lock()
e.status.EchosOut++
e.mx.Unlock()

s.CloseWrite()

if beforeDone := e.getBeforeDone(); beforeDone != nil {
if err := beforeDone(); err != nil {
echoLog.Debugf("error syncing before done: %s", err)

s.Reset()
}
}
}

func (e *Echo) Echo(p peer.ID, what string) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

s, err := e.Host.NewStream(ctx, p, EchoProtoID)
if err != nil {
return err
}
defer s.Close()

if err := s.Scope().SetService(EchoService); err != nil {
echoLog.Debugf("error attaching stream to echo service: %s", err)

s.Reset()
return err
}

if err := s.Scope().ReserveMemory(4096, network.ReservationPriorityAlways); err != nil {
echoLog.Debugf("error reserving memory: %s", err)

s.Reset()
return err
}

s.SetWriteDeadline(time.Now().Add(5 * time.Second))
_, err = s.Write([]byte(what))
if err != nil {
return err
}
s.CloseWrite()

buf := make([]byte, 4096)

s.SetReadDeadline(time.Now().Add(5 * time.Second))
n, err := s.Read(buf)
switch {
case err == io.EOF:
if n == 0 {
return err
}

case err != nil:
echoLog.Debugf("I/O error : %s", err)

s.Reset()
return err
}

if what != string(buf[:n]) {
return fmt.Errorf("echo output doesn't match input")
}

return nil
}
73 changes: 73 additions & 0 deletions itest/echo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package itest

import (
"context"
"testing"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"

"github.com/stretchr/testify/require"
)

func createEchos(t *testing.T, count int, makeOpts ...func(int) libp2p.Option) []*Echo {
result := make([]*Echo, 0, count)

for i := 0; i < count; i++ {
opts := make([]libp2p.Option, 0, len(makeOpts))
for _, makeOpt := range makeOpts {
opts = append(opts, makeOpt(i))
}

h, err := libp2p.New(opts...)
if err != nil {
t.Fatal(err)
}

e := NewEcho(h)
result = append(result, e)
}

for i := 0; i < count; i++ {
for j := 0; j < count; j++ {
if i == j {
continue
}

result[i].Host.Peerstore().AddAddrs(result[j].Host.ID(), result[j].Host.Addrs(), peerstore.PermanentAddrTTL)
}
}

return result
}

func closeEchos(echos []*Echo) {
for _, e := range echos {
e.Host.Close()
}
}

func checkEchoStatus(t *testing.T, e *Echo, expected EchoStatus) {
vyzo marked this conversation as resolved.
Show resolved Hide resolved
t.Helper()
require.Equal(t, expected, e.Status())
}

func TestEcho(t *testing.T) {
echos := createEchos(t, 2)
defer closeEchos(echos)

if err := echos[0].Host.Connect(context.TODO(), peer.AddrInfo{ID: echos[1].Host.ID()}); err != nil {
t.Fatal(err)
}

if err := echos[0].Echo(echos[1].Host.ID(), "hello libp2p"); err != nil {
t.Fatal(err)
}

checkEchoStatus(t, echos[1], EchoStatus{
StreamsIn: 1,
EchosIn: 1,
EchosOut: 1,
})
}
Loading