Skip to content

Commit

Permalink
coreapi swarm: rewire connect/disconnect
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
magik6k committed Sep 18, 2018
1 parent 272159a commit 6fa1289
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 95 deletions.
67 changes: 9 additions & 58 deletions core/commands/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit"
"gx/ipfs/QmYVqYJTVjetcf1guieEgWpK1PZtHPytP624vKzTF1P3r2/go-ipfs-config"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
inet "gx/ipfs/QmZNJyx9GGCX4GeuHnLB8fxaxMLs4MjTjHokxfQcCd6Nve/go-libp2p-net"
pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
"gx/ipfs/QmeDpqUwwdye8ABKVMPXKuWwPVURFdqTqssbTUB39E2Nwd/go-libp2p-swarm"
iaddr "gx/ipfs/QmePSRaGafvmURQwQkHPDBJsaGwKXC1WpBBHVCQxdr8FPn/go-ipfs-addr"
Expand Down Expand Up @@ -377,26 +376,14 @@ ipfs swarm connect /ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3
cmdkit.StringArg("address", true, true, "Address of peer to connect to.").EnableStdin(),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

addrs := req.Arguments

if n.PeerHost == nil {
res.SetError(ErrNotOnline, cmdkit.ErrClient)
return
}

// FIXME(steb): Nasty
swrm, ok := n.PeerHost.Network().(*swarm.Swarm)
if !ok {
res.SetError(fmt.Errorf("peerhost network was not swarm"), cmdkit.ErrNormal)
return
}

pis, err := peersWithAddresses(addrs)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
Expand All @@ -405,19 +392,17 @@ ipfs swarm connect /ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3

output := make([]string, len(pis))
for i, pi := range pis {
swrm.Backoff().Clear(pi.ID)

output[i] = "connect " + pi.ID.Pretty()

err := n.PeerHost.Connect(req.Context, pi)
err := api.Swarm().Connect(req.Context, pi)
if err != nil {
res.SetError(fmt.Errorf("%s failure: %s", output[i], err), cmdkit.ErrNormal)
return
}
output[i] += " success"
}

cmds.EmitOnce(res, &stringList{addrs})
cmds.EmitOnce(res, &stringList{output})
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(stringListEncoder),
Expand All @@ -442,60 +427,26 @@ it will reconnect.
cmdkit.StringArg("address", true, true, "Address of peer to disconnect from.").EnableStdin(),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

addrs := req.Arguments

if n.PeerHost == nil {
res.SetError(ErrNotOnline, cmdkit.ErrClient)
return
}

iaddrs, err := parseAddresses(addrs)
iaddrs, err := parseAddresses(req.Arguments)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

output := make([]string, len(iaddrs))
for i, addr := range iaddrs {
taddr := addr.Transport()
id := addr.ID()
output[i] = "disconnect " + id.Pretty()
output[i] = "disconnect " + addr.ID().Pretty()

net := n.PeerHost.Network()

if taddr == nil {
if net.Connectedness(id) != inet.Connected {
output[i] += " failure: not connected"
} else if err := net.ClosePeer(id); err != nil {
output[i] += " failure: " + err.Error()
} else {
output[i] += " success"
}
if err := api.Swarm().Disconnect(req.Context, addr.Multiaddr()); err != nil {
output[i] += " failure: " + err.Error()
} else {
found := false
for _, conn := range net.ConnsToPeer(id) {
if !conn.RemoteMultiaddr().Equal(taddr) {
continue
}

if err := conn.Close(); err != nil {
output[i] += " failure: " + err.Error()
} else {
output[i] += " success"
}
found = true
break
}

if !found {
output[i] += " failure: conn not found"
}
output[i] += " success"
}
}
cmds.EmitOnce(res, &stringList{output})
Expand Down
23 changes: 15 additions & 8 deletions core/coreapi/interface/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@ package iface

import (
"context"
"errors"
"time"

peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
"gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
"gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
)

// PeerInfo contains information about a peer
type PeerInfo interface {
var (
ErrNotConnected = errors.New("not connected")
ErrConnNotFound = errors.New("conn not found")
)

// ConnectionInfo contains information about a peer
type ConnectionInfo interface {
// ID returns PeerID
ID() peer.ID

Expand All @@ -20,18 +28,17 @@ type PeerInfo interface {
Latency(context.Context) (time.Duration, error)

// Streams returns list of streams established with the peer
// TODO: should this return multicodecs?
Streams(context.Context) ([]string, error)
Streams(context.Context) ([]protocol.ID, error)
}

// SwarmAPI specifies the interface to libp2p swarm
type SwarmAPI interface {
// Connect to a given address
Connect(context.Context, ma.Multiaddr) error
// Connect to a given peer
Connect(context.Context, pstore.PeerInfo) error

// Disconnect from a given address
Disconnect(context.Context, ma.Multiaddr) error

// Peers returns the list of peers we are connected to
Peers(context.Context) ([]PeerInfo, error)
Peers(context.Context) ([]ConnectionInfo, error)
}
51 changes: 22 additions & 29 deletions core/coreapi/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package coreapi

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -14,6 +13,8 @@ import (
pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
swarm "gx/ipfs/QmeDpqUwwdye8ABKVMPXKuWwPVURFdqTqssbTUB39E2Nwd/go-libp2p-swarm"
iaddr "gx/ipfs/QmePSRaGafvmURQwQkHPDBJsaGwKXC1WpBBHVCQxdr8FPn/go-ipfs-addr"
inet "gx/ipfs/QmZNJyx9GGCX4GeuHnLB8fxaxMLs4MjTjHokxfQcCd6Nve/go-libp2p-net"
protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
)

type SwarmAPI struct {
Expand All @@ -29,7 +30,7 @@ type connInfo struct {
muxer string
}

func (api *SwarmAPI) Connect(ctx context.Context, addr ma.Multiaddr) error {
func (api *SwarmAPI) Connect(ctx context.Context, pi pstore.PeerInfo) error {
if api.node.PeerHost == nil {
return coreiface.ErrOffline
}
Expand All @@ -39,16 +40,6 @@ func (api *SwarmAPI) Connect(ctx context.Context, addr ma.Multiaddr) error {
return fmt.Errorf("peerhost network was not swarm")
}

ia, err := iaddr.ParseMultiaddr(ma.Multiaddr(addr))
if err != nil {
return err
}

pi := pstore.PeerInfo{
ID: ia.ID(),
Addrs: []ma.Multiaddr{ia.Transport()},
}

swrm.Backoff().Clear(pi.ID)

return api.node.PeerHost.Connect(ctx, pi)
Expand All @@ -65,36 +56,38 @@ func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error {
}

taddr := ia.Transport()
id := ia.ID()
net := api.node.PeerHost.Network()

found := false
conns := api.node.PeerHost.Network().ConnsToPeer(ia.ID())
for _, conn := range conns {
if !conn.RemoteMultiaddr().Equal(taddr) {
continue
if taddr == nil {
if net.Connectedness(id) != inet.Connected {
return coreiface.ErrNotConnected
} else if err := net.ClosePeer(id); err != nil {
return err
}
} else {
for _, conn := range net.ConnsToPeer(id) {
if !conn.RemoteMultiaddr().Equal(taddr) {
continue
}

if err := conn.Close(); err != nil {
return err
return conn.Close()
}
found = true
break
}

if !found {
return errors.New("conn not found")
return coreiface.ErrConnNotFound
}

return nil
}

func (api *SwarmAPI) Peers(context.Context) ([]coreiface.PeerInfo, error) {
func (api *SwarmAPI) Peers(context.Context) ([]coreiface.ConnectionInfo, error) {
if api.node.PeerHost == nil {
return nil, coreiface.ErrOffline
}

conns := api.node.PeerHost.Network().Conns()

var out []coreiface.PeerInfo
var out []coreiface.ConnectionInfo
for _, c := range conns {
pid := c.RemotePeer()
addr := c.RemoteMultiaddr()
Expand Down Expand Up @@ -133,12 +126,12 @@ func (ci *connInfo) Latency(context.Context) (time.Duration, error) {
return ci.api.node.Peerstore.LatencyEWMA(peer.ID(ci.ID())), nil
}

func (ci *connInfo) Streams(context.Context) ([]string, error) {
func (ci *connInfo) Streams(context.Context) ([]protocol.ID, error) {
streams := ci.conn.GetStreams()

out := make([]string, len(streams))
out := make([]protocol.ID, len(streams))
for i, s := range streams {
out[i] = string(s.Protocol())
out[i] = s.Protocol()
}

return out, nil
Expand Down

0 comments on commit 6fa1289

Please sign in to comment.