Skip to content
This repository has been archived by the owner on Sep 10, 2022. It is now read-only.

Commit

Permalink
call the connection gater when accepting connections and after crypto…
Browse files Browse the repository at this point in the history
… handshake (#55)
  • Loading branch information
aarshkshah1992 authored May 15, 2020
1 parent 1a7cd59 commit 0c577bb
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 113 deletions.
60 changes: 60 additions & 0 deletions gater_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package stream_test

import (
"sync"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/control"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

ma "github.com/multiformats/go-multiaddr"
)

type testGater struct {
sync.Mutex

blockAccept, blockSecured bool
}

var _ connmgr.ConnectionGater = (*testGater)(nil)

func (t *testGater) BlockAccept(block bool) {
t.Lock()
defer t.Unlock()

t.blockAccept = block
}

func (t *testGater) BlockSecured(block bool) {
t.Lock()
defer t.Unlock()

t.blockSecured = block
}

func (t *testGater) InterceptPeerDial(p peer.ID) (allow bool) {
panic("not implemented")
}

func (t *testGater) InterceptAddrDial(id peer.ID, multiaddr ma.Multiaddr) (allow bool) {
panic("not implemented")
}

func (t *testGater) InterceptAccept(multiaddrs network.ConnMultiaddrs) (allow bool) {
t.Lock()
defer t.Unlock()

return !t.blockAccept
}

func (t *testGater) InterceptSecured(direction network.Direction, id peer.ID, multiaddrs network.ConnMultiaddrs) (allow bool) {
t.Lock()
defer t.Unlock()

return !t.blockSecured
}

func (t *testGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) {
panic("not implemented")
}
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ go 1.12
require (
github.com/ipfs/go-log v1.0.4
github.com/jbenet/go-temp-err-catcher v0.1.0
github.com/libp2p/go-libp2p-core v0.5.3
github.com/libp2p/go-libp2p-core v0.5.5
github.com/libp2p/go-libp2p-mplex v0.2.3
github.com/libp2p/go-libp2p-pnet v0.2.0
github.com/libp2p/go-maddr-filter v0.0.5
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multiaddr-net v0.1.5
github.com/stretchr/testify v1.4.0
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,22 @@ github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS
github.com/libp2p/go-libp2p-core v0.2.0/go.mod h1:X0eyB0Gy93v0DZtSYbEM7RnMChm9Uv3j7yRXjO77xSI=
github.com/libp2p/go-libp2p-core v0.3.1/go.mod h1:thvWy0hvaSBhnVBaW37BvzgVV68OUhgJJLAa6almrII=
github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0=
github.com/libp2p/go-libp2p-core v0.5.3 h1:b9W3w7AZR2n/YJhG8d0qPFGhGhCWKIvPuJgp4hhc4MM=
github.com/libp2p/go-libp2p-core v0.5.3/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-core v0.5.5 h1:/yiFUZDoBWqvpWeHHJ1iA8SOs5obT1/+UdNfckwD57M=
github.com/libp2p/go-libp2p-core v0.5.5/go.mod h1:vj3awlOr9+GMZJFH9s4mpt9RHHgGqeHCopzbYKZdRjM=
github.com/libp2p/go-libp2p-mplex v0.2.3 h1:2zijwaJvpdesST2MXpI5w9wWFRgYtMcpRX7rrw0jmOo=
github.com/libp2p/go-libp2p-mplex v0.2.3/go.mod h1:CK3p2+9qH9x+7ER/gWWDYJ3QW5ZxWDkm+dVvjfuG3ek=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-testing v0.1.1 h1:U03z3HnGI7Ni8Xx6ONVZvUFOAzWYmolWf5W5jAOPNmU=
github.com/libp2p/go-libp2p-testing v0.1.1/go.mod h1:xaZWMJrPUM5GlDBxCeGUi7kI4eqnjVyavGroI2nxEM0=
github.com/libp2p/go-maddr-filter v0.0.5 h1:CW3AgbMO6vUvT4kf87y4N+0P8KUl2aqLYhrGyDUbLSg=
github.com/libp2p/go-maddr-filter v0.0.5/go.mod h1:Jk+36PMfIqCJhAnaASRH83bdAvfDRp/w6ENFaC9bG+M=
github.com/libp2p/go-mplex v0.1.2 h1:qOg1s+WdGLlpkrczDqmhYzyk3vCfsQ8+RxRTQjOZWwI=
github.com/libp2p/go-mplex v0.1.2/go.mod h1:Xgz2RDCi3co0LeZfgjm4OgUF15+sVR8SRcu3SFXI1lk=
github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA=
github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-openssl v0.0.4 h1:d27YZvLoTyMhIN4njrkr8zMDOM4lfpHIp6A+TK9fovg=
github.com/libp2p/go-openssl v0.0.4/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc=
github.com/libp2p/go-openssl v0.0.5 h1:pQkejVhF0xp08D4CQUcw8t+BFJeXowja6RVcb5p++EA=
github.com/libp2p/go-openssl v0.0.5/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc=
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
Expand All @@ -108,7 +108,6 @@ github.com/mr-tron/base58 v1.1.3 h1:v+sk57XuaCKGXpWtVBX8YJzO7hMGx4Aajh4TQbdEFdc=
github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.0.4/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4=
github.com/multiformats/go-multiaddr v0.2.1 h1:SgG/cw5vqyB5QQe5FPe2TqggU9WtrA9X4nZw7LlVqOI=
Expand Down Expand Up @@ -172,7 +171,6 @@ golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8 h1:1wopBVtVdWnn03fZelqdXTqk7U7zPQCb+T4rbU9ZEoU=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 h1:IcSOAf4PyMp3U3XbIEj1/xJ2BjNN2jWv7JoyOsMxXUU=
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down
11 changes: 11 additions & 0 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ func (l *listener) handleIncoming() {
return
}

// gate the connection if applicable
if l.upgrader.ConnGater != nil && !l.upgrader.ConnGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())

if err := maconn.Close(); err != nil {
log.Warnf("failed to incoming connection rejected by gater; err: %s", err)
}
continue
}

// The go routine below calls Release when the context is
// canceled so there's no need to wait on it here.
l.threshold.Wait()
Expand Down
135 changes: 46 additions & 89 deletions listener_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
package stream_test

import (
"context"
"errors"
"io"
"net"
"sync"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec/insecure"
"github.com/libp2p/go-libp2p-core/transport"

mplex "github.com/libp2p/go-libp2p-mplex"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"

Expand All @@ -23,94 +18,10 @@ import (
"github.com/stretchr/testify/require"
)

// negotiatingMuxer sets up a new mplex connection
// It makes sure that this happens at the same time for client and server.
type negotiatingMuxer struct{}

func (m *negotiatingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
var err error
// run a fake muxer negotiation
if isServer {
_, err = c.Write([]byte("setup"))
} else {
_, err = c.Read(make([]byte, 5))
}
if err != nil {
return nil, err
}
return mplex.DefaultTransport.NewConn(c, isServer)
}

// blockingMuxer blocks the muxer negotiation until the contain chan is closed
type blockingMuxer struct {
unblock chan struct{}
}

var _ mux.Multiplexer = &blockingMuxer{}

func newBlockingMuxer() *blockingMuxer {
return &blockingMuxer{unblock: make(chan struct{})}
}

func (m *blockingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
<-m.unblock
return (&negotiatingMuxer{}).NewConn(c, isServer)
}

func (m *blockingMuxer) Unblock() {
close(m.unblock)
}

// errorMuxer is a muxer that errors while setting up
type errorMuxer struct{}

var _ mux.Multiplexer = &errorMuxer{}

func (m *errorMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
return nil, errors.New("mux error")
}

var (
defaultUpgrader = &st.Upgrader{
Secure: insecure.New(peer.ID(1)),
Muxer: &negotiatingMuxer{},
}
)

func init() {
transport.AcceptTimeout = 1 * time.Hour
}

func testConn(t *testing.T, clientConn, serverConn transport.CapableConn) {
t.Helper()
require := require.New(t)

cstr, err := clientConn.OpenStream()
require.NoError(err)

_, err = cstr.Write([]byte("foobar"))
require.NoError(err)

sstr, err := serverConn.AcceptStream()
require.NoError(err)

b := make([]byte, 6)
_, err = sstr.Read(b)
require.NoError(err)
require.Equal([]byte("foobar"), b)
}

func dial(t *testing.T, upgrader *st.Upgrader, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
t.Helper()

macon, err := manet.Dial(raddr)
if err != nil {
return nil, err
}

return upgrader.UpgradeOutbound(context.Background(), nil, macon, p)
}

func createListener(t *testing.T, upgrader *st.Upgrader) transport.Listener {
t.Helper()
require := require.New(t)
Expand Down Expand Up @@ -385,3 +296,49 @@ func TestAcceptQueueBacklogged(t *testing.T) {

require.Eventually(func() bool { return len(errCh) == st.AcceptQueueLength+1 }, 2*time.Second, 100*time.Millisecond)
}

func TestListenerConnectionGater(t *testing.T) {
require := require.New(t)

testGater := &testGater{}
upgrader := *defaultUpgrader
upgrader.ConnGater = testGater

ln := createListener(t, &upgrader)
defer ln.Close()

// no gating.
conn, err := dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()

// rejecting after handshake.
testGater.BlockSecured(true)
testGater.BlockAccept(false)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.Error(err)
require.Nil(conn)

// rejecting on accept will trigger first.
testGater.BlockSecured(true)
testGater.BlockAccept(true)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.Error(err)
require.Nil(conn)

// rejecting only on acceptance.
testGater.BlockSecured(false)
testGater.BlockAccept(true)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.Error(err)
require.Nil(conn)

// back to normal
testGater.BlockSecured(false)
testGater.BlockAccept(false)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()
}
38 changes: 22 additions & 16 deletions upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import (
"context"
"errors"
"fmt"
"github.com/libp2p/go-libp2p-core/network"
"net"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
ipnet "github.com/libp2p/go-libp2p-core/pnet"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p-core/transport"
"github.com/libp2p/go-libp2p-pnet"

filter "github.com/libp2p/go-maddr-filter"
manet "github.com/multiformats/go-multiaddr-net"
)

Expand All @@ -27,10 +28,10 @@ var AcceptQueueLength = 16
// Upgrader is a multistream upgrader that can upgrade an underlying connection
// to a full transport connection (secure and multiplexed).
type Upgrader struct {
PSK ipnet.PSK
Secure sec.SecureTransport
Muxer mux.Multiplexer
Filters *filter.Filters
PSK ipnet.PSK
Secure sec.SecureTransport
Muxer mux.Multiplexer
ConnGater connmgr.ConnectionGater
}

// UpgradeListener upgrades the passed multiaddr-net listener into a full libp2p-transport listener.
Expand All @@ -55,22 +56,16 @@ func (u *Upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, m
if p == "" {
return nil, ErrNilPeer
}
return u.upgrade(ctx, t, maconn, p)
return u.upgrade(ctx, t, maconn, p, network.DirOutbound)
}

// UpgradeInbound upgrades the given inbound multiaddr-net connection into a
// full libp2p-transport connection.
func (u *Upgrader) UpgradeInbound(ctx context.Context, t transport.Transport, maconn manet.Conn) (transport.CapableConn, error) {
return u.upgrade(ctx, t, maconn, "")
return u.upgrade(ctx, t, maconn, "", network.DirInbound)
}

func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID) (transport.CapableConn, error) {
if u.Filters != nil && u.Filters.AddrBlocked(maconn.RemoteMultiaddr()) {
log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr())
maconn.Close()
return nil, fmt.Errorf("blocked connection from %s", maconn.RemoteMultiaddr())
}

func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID, dir network.Direction) (transport.CapableConn, error) {
var conn net.Conn = maconn
if u.PSK != nil {
pconn, err := pnet.NewProtectedConn(u.PSK, conn)
Expand All @@ -91,18 +86,29 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
return nil, fmt.Errorf("failed to negotiate security protocol: %s", err)
}

// call the connection gater, if one is registered.
if u.ConnGater != nil && !u.ConnGater.InterceptSecured(dir, sconn.RemotePeer(), maconn) {
if err := maconn.Close(); err != nil {
log.Errorf("failed to close connection with peer %s and addr %s; err: %s",
p.Pretty(), maconn.RemoteMultiaddr(), err)
}
return nil, fmt.Errorf("gater rejected connection with peer %s and addr %s with direction %d",
sconn.RemotePeer().Pretty(), maconn.RemoteMultiaddr(), dir)
}

smconn, err := u.setupMuxer(ctx, sconn, p)
if err != nil {
sconn.Close()
return nil, fmt.Errorf("failed to negotiate stream multiplexer: %s", err)
}

return &transportConn{
tc := &transportConn{
MuxedConn: smconn,
ConnMultiaddrs: maconn,
ConnSecurity: sconn,
transport: t,
}, nil
}
return tc, nil
}

func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID) (sec.SecureConn, error) {
Expand Down
Loading

0 comments on commit 0c577bb

Please sign in to comment.