Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

forward: Kademlia forwarder peer query service #2052

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
160 changes: 160 additions & 0 deletions network/forward/forward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package forward

import (
"errors"
"fmt"
"sync"

"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
)

var (
NoMorePeers = errors.New("no more peers")
)

// Session encapsulates one single peer iteration query
type Session struct {
kademlia *network.KademliaLoadBalancer // kademlia backend
base []byte // base address to use for iteration
id int // id of session
capabilityIndex string // kademlia capabilityIndex in use
nextC chan struct{} // triggered to output one single peer from iterator
getC chan *network.Peer // receives peer from iterator
}

// Id returns the session id
func (s *Session) Id() int {
return s.id
}

// Get returns up to numPeers peers from the current position of the iterator
// If no further peers are available a NoMorePeers error will be returned
func (s *Session) Get(numPeers int) ([]*network.Peer, error) {
var result []*network.Peer
select {
case <-s.getC:
return result, NoMorePeers
default:
}
for i := 0; i < numPeers; i++ {
s.nextC <- struct{}{}
p, ok := <-s.getC
if !ok {
break
}
result = append(result, p)
}
return result, nil
}

// starts the iterator and blocks for request for next peer through Get()
func (s *Session) load() error {
err := s.kademlia.EachBinFiltered(s.base, s.capabilityIndex, func(bin network.LBBin) bool {
for _, p := range bin.LBPeers {
_, ok := <-s.nextC
if !ok {
return false
}
s.getC <- p.Peer
}
return true
})
close(s.getC)
return err
}

// frees resources
func (s *Session) destroy() {
close(s.nextC)
}

// SessionManager is the Session object factory
type SessionManager struct {
kademlia *network.KademliaLoadBalancer // underlying kademlia backend
sessions map[int]*Session // index of active sessions, mapped by session id
lastId int // last assigned id for session, starts at 1 to make create from context easier
mu sync.Mutex // protects sessions map
}

// NewSessionManager is the SessionManager constructor
// Sessions created with the SessionManager will use the provided kademlia backend
// TODO: argument should be network.KademliaBackend, but needs KademliaLoadBalancer to implement this
func NewSessionManager(kademlia *network.KademliaLoadBalancer) *SessionManager {
return &SessionManager{
sessions: make(map[int]*Session),
kademlia: kademlia,
}
}

// New creates a new Session object with the given capabilityindex and base address
// if capabilityIndex is empty, the global kademlia database will be used
// if base is nil, the kademlia base address will be used as comparator for the iteration
func (m *SessionManager) New(capabilityIndex string, base []byte) *Session {
s := &Session{
capabilityIndex: capabilityIndex,
kademlia: m.kademlia,
nextC: make(chan struct{}),
getC: make(chan *network.Peer),
}
if base == nil {
s.base = m.kademlia.BaseAddr()
} else {
s.base = base
}
go s.load()
return m.add(s)
}

// Reap frees the Session object resources and removes it from the session index
func (m *SessionManager) Reap(sessionId int) {
s, ok := m.sessions[sessionId]
if !ok {
return
}
s.destroy()
}

// ToContext creates a SessionContext from the existing Session matching the provided id
// if the session does not exist an error is returned
func (m *SessionManager) ToContext(id int) (*SessionContext, error) {
s, ok := m.sessions[id]
if !ok {
return nil, fmt.Errorf("No such session %d", id)
}
return &SessionContext{
CapabilityIndex: s.capabilityIndex,
SessionId: s.id,
Address: s.base,
}, nil
}

// FromContext retrieves or creates a Session from a provided context
// If the context has the "id" value set, the corresponding Session is returned, or error if it does not exist
// Otherwise, a new Session is created and returned, optionally with the "address" and/or "capability" values provided in the context
func (m *SessionManager) FromContext(sctx *SessionContext) (*Session, error) {

sessionId, ok := sctx.Value("id").(int)
if ok {
s, ok := m.sessions[sessionId]
if !ok {
return nil, fmt.Errorf("No such session %d", sessionId)
}
return s, nil
}

addr, _ := sctx.Value("address").([]byte)
capabilityIndex, _ := sctx.Value("capability").(string)
return m.New(capabilityIndex, addr), nil
}

// adds a new session to the sessionmanager
func (m *SessionManager) add(s *Session) *Session {
m.mu.Lock()
defer m.mu.Unlock()
m.lastId++
log.Trace("adding session", "id", m.lastId)
s.id = m.lastId
m.sessions[m.lastId] = s
return s
}
153 changes: 153 additions & 0 deletions network/forward/forward_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package forward

import (
"bytes"
"testing"

"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/network/capability"
"github.com/ethersphere/swarm/pot"
"github.com/ethersphere/swarm/testutil"
)

func init() {
testutil.Init()
}

// TestNew tests that the SessionManager constructor creates Session object with expected values
func TestNew(t *testing.T) {
addr := make([]byte, 32)
addr[31] = 0x01
kadParams := network.NewKadParams()
kad := network.NewKademlia(addr, kadParams)
kadLB := network.NewKademliaLoadBalancer(kad, false)
defer kadLB.Stop()

mgr := NewSessionManager(kadLB)
fwdBase := mgr.New("", nil)
defer mgr.Reap(fwdBase.Id())
if !bytes.Equal(fwdBase.base, addr) {
t.Fatalf("base base; expected %x, got %x", addr, fwdBase.base)
}
if fwdBase.id != 1 {
t.Fatalf("sessionId; expected %d, got %d", 1, fwdBase.id)
}

bytesNear := pot.NewAddressFromString("00000001")
capabilityIndex := "foo"
fwdExplicit := mgr.New(capabilityIndex, bytesNear)
if !bytes.Equal(fwdExplicit.base, bytesNear) {
t.Fatalf("base explicit; expected %x, got %x", bytesNear, fwdExplicit.base)
}
if fwdExplicit.id != 2 {
t.Fatalf("sessionId; expected %d, got %d", 2, fwdExplicit.id)
}
if fwdExplicit.capabilityIndex != capabilityIndex {
t.Fatalf("capabilityindex, expected %s, got %s", capabilityIndex, fwdExplicit.capabilityIndex)
}
if len(mgr.sessions) != 2 {
t.Fatalf("sessions array; expected %d, got %d", 2, len(mgr.sessions))
}
}

// TestManagerContext tests that the SessionManager's context translations creates Session objects with expected values, and retrieves existing matching Session objects
func TestManagerContext(t *testing.T) {
addr := make([]byte, 32)
addr[31] = 0x01
kadParams := network.NewKadParams()
kad := network.NewKademlia(addr, kadParams)
kadLB := network.NewKademliaLoadBalancer(kad, false)
defer kadLB.Stop()

mgr := NewSessionManager(kadLB)
fwdVoid := mgr.New("", nil) // id 1
defer mgr.Reap(fwdVoid.Id())
fwdOne := mgr.New("", nil) // id 2
defer mgr.Reap(fwdOne.Id())
if len(mgr.sessions) != 2 {
t.Fatalf("mgr session length; expected 2, got %d", len(mgr.sessions))
}
if mgr.sessions[2] != fwdOne {
t.Fatalf("fromcontext; expected %p, got %p", fwdOne, mgr.sessions[2])
}

newAddr := make([]byte, 32)
newAddr[31] = 0x02
fwdTwo := mgr.New("foo", newAddr) // id 3
defer mgr.Reap(fwdTwo.Id())
sctx, err := mgr.ToContext(3)
if err != nil {
t.Fatal(err)
}
if fwdTwo.id != sctx.SessionId {
t.Fatalf("to context id; expected %d, got %d", fwdTwo.id, sctx.SessionId)
}
if fwdTwo.capabilityIndex != sctx.CapabilityIndex {
t.Fatalf("to context id; expected %s, got %s", fwdTwo.capabilityIndex, sctx.CapabilityIndex)
}
if !bytes.Equal(fwdTwo.base, sctx.Address) {
t.Fatalf("to context id; expected %x, got %x", fwdTwo.base, sctx.Address)
}

sctx = NewSessionContext("", nil)
sctx.SessionId = 3
fwdThree, err := mgr.FromContext(sctx)
if err != nil {
t.Fatal(err)
}
if fwdThree != fwdTwo {
t.Fatalf("from new context; expected %p, got %p", fwdTwo, fwdThree)
}
}

// TestGet verifies that the synchronous Get method retrieves peers in the correct order
func TestGet(t *testing.T) {
bytesOwn := pot.NewAddressFromString("00000000")
kadParams := network.NewKadParams()
kad := network.NewKademlia(bytesOwn, kadParams)
kadLB := network.NewKademliaLoadBalancer(kad, false)
defer kadLB.Stop()
cp := capability.NewCapability(4, 2)
kad.RegisterCapabilityIndex("foo", *cp)

bytesFar := pot.NewAddressFromString("10000000")
bytesNear := pot.NewAddressFromString("00000001")
addrFar := network.NewBzzAddr(bytesFar, []byte{})
addrNear := network.NewBzzAddr(bytesNear, []byte{})
addrFar.Capabilities.Add(cp)
addrNear.Capabilities.Add(cp)
peerFar := network.NewPeer(&network.BzzPeer{BzzAddr: addrFar}, kad)
peerNear := network.NewPeer(&network.BzzPeer{BzzAddr: addrNear}, kad)
kad.Register(addrFar)
kad.Register(addrNear)
kad.On(peerFar)
kad.On(peerNear)

mgr := NewSessionManager(kadLB)
fwd := mgr.New("foo", nil)
defer mgr.Reap(fwd.Id())
p, err := fwd.Get(1)
if err != nil {
t.Fatal(err)
}
if len(p) != 1 {
t.Fatalf("get first count; expected 1, got %d", len(p))
}
if !bytes.Equal(p[0].Address(), bytesNear) {
t.Fatalf("get first address; expected %x, got %x", bytesNear, p[0].Address())
}

p, err = fwd.Get(1)
if err != nil {
t.Fatal(err)
}
if len(p) != 1 {
t.Fatalf("get peers count; expected 1, got %d", len(p))
}
if !bytes.Equal(p[0].Address(), bytesFar) {
t.Fatalf("get second address; expected %x, got %x", bytesFar, p[0].Address())
}
log.Trace("peer", "peer", p)

}
70 changes: 70 additions & 0 deletions network/forward/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package forward

import (
"time"

"github.com/ethersphere/swarm/network"
)

var (
zeroTime = time.Unix(0, 0)
)

// SessionInterface provides an interface for an individual session object
type SessionInterface interface {
Subscribe() <-chan *network.Peer
Get(numberOfPeers int) ([]*network.Peer, error)
}

// SessionContext is a context.Context that can be used to reference existing sessions or create new sessions
type SessionContext struct {
CapabilityIndex string
SessionId int
Address []byte
}

// NewSessionContext creates a new SessionContext with the provided capabilityIndex and base address
func NewSessionContext(capabilityIndex string, base []byte) *SessionContext {
return &SessionContext{
CapabilityIndex: capabilityIndex,
Address: base,
}
}

// Deadline implements context.Context
func (c *SessionContext) Deadline() (time.Time, bool) {
return zeroTime, false
}

// Done implements context.Context
func (c *SessionContext) Done() <-chan struct{} {
return nil
}

// Err implements context.Context
func (c *SessionContext) Err() error {
return nil
}

// Value implements context.Context
func (c *SessionContext) Value(k interface{}) interface{} {
ks, ok := k.(string)
if !ok {
return nil
}
switch ks {
case "address":
if c.Address == nil {
return nil
}
return c.Address
case "capability":
if c.CapabilityIndex == "" {
return nil
}
return c.CapabilityIndex
case "id":
return c.SessionId
}
return nil
}
Loading