Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SCTP port mapping #1825

Merged
merged 1 commit into from
Feb 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
371 changes: 218 additions & 153 deletions agent.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ message PortConfig {

TCP = 0 [(gogoproto.enumvalue_customname) = "ProtocolTCP"];
UDP = 1 [(gogoproto.enumvalue_customname) = "ProtocolUDP"];
SCTP = 2 [(gogoproto.enumvalue_customname) = "ProtocolSCTP"];
}

// Name for the port. If provided the port information can
Expand Down
7 changes: 6 additions & 1 deletion cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"os"
"os/signal"
"syscall"

"github.com/ishidawataru/sctp"
)

func main() {
Expand All @@ -28,7 +30,7 @@ func main() {
p.Run()
}

// parseHostContainerAddrs parses the flags passed on reexec to create the TCP or UDP
// parseHostContainerAddrs parses the flags passed on reexec to create the TCP/UDP/SCTP
// net.Addrs to map the host and container ports
func parseHostContainerAddrs() (host net.Addr, container net.Addr) {
var (
Expand All @@ -48,6 +50,9 @@ func parseHostContainerAddrs() (host net.Addr, container net.Addr) {
case "udp":
host = &net.UDPAddr{IP: net.ParseIP(*hostIP), Port: *hostPort}
container = &net.UDPAddr{IP: net.ParseIP(*containerIP), Port: *containerPort}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should probably update the comment at the start of the function to include SCTP.

case "sctp":
host = &sctp.SCTPAddr{IP: []net.IP{net.ParseIP(*hostIP)}, Port: *hostPort}
container = &sctp.SCTPAddr{IP: []net.IP{net.ParseIP(*containerIP)}, Port: *containerPort}
default:
log.Fatalf("unsupported protocol %s", *proto)
}
Expand Down
75 changes: 64 additions & 11 deletions cmd/proxy/network_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strings"
"testing"
"time"

"github.com/ishidawataru/sctp"
)

var _ = flag.Bool("incontainer", false, "Indicates if the test is running in a container")
Expand All @@ -27,7 +29,7 @@ type EchoServerOptions struct {
TCPHalfClose bool
}

type TCPEchoServer struct {
type StreamEchoServer struct {
listener net.Listener
testCtx *testing.T
opts EchoServerOptions
Expand All @@ -40,26 +42,40 @@ type UDPEchoServer struct {

func NewEchoServer(t *testing.T, proto, address string, opts EchoServerOptions) EchoServer {
var server EchoServer
if strings.HasPrefix(proto, "tcp") {
if !strings.HasPrefix(proto, "tcp") && opts.TCPHalfClose {
t.Fatalf("TCPHalfClose is not supported for %s", proto)
}

switch {
case strings.HasPrefix(proto, "tcp"):
listener, err := net.Listen(proto, address)
if err != nil {
t.Fatal(err)
}
server = &TCPEchoServer{listener: listener, testCtx: t, opts: opts}
} else {
if opts.TCPHalfClose {
t.Fatalf("TCPHalfClose is not supported for %s", proto)
}
server = &StreamEchoServer{listener: listener, testCtx: t, opts: opts}
case strings.HasPrefix(proto, "udp"):
socket, err := net.ListenPacket(proto, address)
if err != nil {
t.Fatal(err)
}
server = &UDPEchoServer{conn: socket, testCtx: t}
case strings.HasPrefix(proto, "sctp"):
addr, err := sctp.ResolveSCTPAddr(proto, address)
if err != nil {
t.Fatal(err)
}
listener, err := sctp.ListenSCTP(proto, addr)
if err != nil {
t.Fatal(err)
}
server = &StreamEchoServer{listener: listener, testCtx: t}
default:
t.Fatalf("unknown protocol: %s", proto)
}
return server
}

func (server *TCPEchoServer) Run() {
func (server *StreamEchoServer) Run() {
go func() {
for {
client, err := server.listener.Accept()
Expand Down Expand Up @@ -87,8 +103,8 @@ func (server *TCPEchoServer) Run() {
}()
}

func (server *TCPEchoServer) LocalAddr() net.Addr { return server.listener.Addr() }
func (server *TCPEchoServer) Close() { server.listener.Close() }
func (server *StreamEchoServer) LocalAddr() net.Addr { return server.listener.Addr() }
func (server *StreamEchoServer) Close() { server.listener.Close() }

func (server *UDPEchoServer) Run() {
go func() {
Expand All @@ -115,7 +131,19 @@ func (server *UDPEchoServer) Close() { server.conn.Close() }
func testProxyAt(t *testing.T, proto string, proxy Proxy, addr string, halfClose bool) {
defer proxy.Close()
go proxy.Run()
client, err := net.Dial(proto, addr)
var client net.Conn
var err error
if strings.HasPrefix(proto, "sctp") {
var a *sctp.SCTPAddr
a, err = sctp.ResolveSCTPAddr(proto, addr)
if err != nil {
t.Fatal(err)
}
client, err = sctp.DialSCTP(proto, nil, a)
} else {
client, err = net.Dial(proto, addr)
}

if err != nil {
t.Fatalf("Can't connect to the proxy: %v", err)
}
Expand Down Expand Up @@ -253,3 +281,28 @@ func TestUDPWriteError(t *testing.T) {
t.Fatal(fmt.Errorf("Expected [%v] but got [%v]", testBuf, recvBuf))
}
}

func TestSCTP4Proxy(t *testing.T) {
backend := NewEchoServer(t, "sctp", "127.0.0.1:0", EchoServerOptions{})
defer backend.Close()
backend.Run()
frontendAddr := &sctp.SCTPAddr{IP: []net.IP{net.IPv4(127, 0, 0, 1)}, Port: 0}
proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
if err != nil {
t.Fatal(err)
}
testProxy(t, "sctp", proxy, false)
}

func TestSCTP6Proxy(t *testing.T) {
t.Skip("Need to start CI docker with --ipv6")
backend := NewEchoServer(t, "sctp", "[::1]:0", EchoServerOptions{})
defer backend.Close()
backend.Run()
frontendAddr := &sctp.SCTPAddr{IP: []net.IP{net.IPv6loopback}, Port: 0}
proxy, err := NewProxy(frontendAddr, backend.LocalAddr())
if err != nil {
t.Fatal(err)
}
testProxy(t, "sctp", proxy, false)
}
4 changes: 4 additions & 0 deletions cmd/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package main

import (
"net"

"github.com/ishidawataru/sctp"
)

// Proxy defines the behavior of a proxy. It forwards traffic back and forth
Expand All @@ -30,6 +32,8 @@ func NewProxy(frontendAddr, backendAddr net.Addr) (Proxy, error) {
return NewUDPProxy(frontendAddr.(*net.UDPAddr), backendAddr.(*net.UDPAddr))
case *net.TCPAddr:
return NewTCPProxy(frontendAddr.(*net.TCPAddr), backendAddr.(*net.TCPAddr))
case *sctp.SCTPAddr:
return NewSCTPProxy(frontendAddr.(*sctp.SCTPAddr), backendAddr.(*sctp.SCTPAddr))
default:
panic("Unsupported protocol")
}
Expand Down
93 changes: 93 additions & 0 deletions cmd/proxy/sctp_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package main

import (
"io"
"log"
"net"
"sync"

"github.com/ishidawataru/sctp"
)

// SCTPProxy is a proxy for SCTP connections. It implements the Proxy interface to
// handle SCTP traffic forwarding between the frontend and backend addresses.
type SCTPProxy struct {
listener *sctp.SCTPListener
frontendAddr *sctp.SCTPAddr
backendAddr *sctp.SCTPAddr
}

// NewSCTPProxy creates a new SCTPProxy.
func NewSCTPProxy(frontendAddr, backendAddr *sctp.SCTPAddr) (*SCTPProxy, error) {
listener, err := sctp.ListenSCTP("sctp", frontendAddr)
if err != nil {
return nil, err
}
// If the port in frontendAddr was 0 then ListenSCTP will have a picked
// a port to listen on, hence the call to Addr to get that actual port:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean that if the port is not specified is ephemeral?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. the comment is taken from tcp_proxy.go.

return &SCTPProxy{
listener: listener,
frontendAddr: listener.Addr().(*sctp.SCTPAddr),
backendAddr: backendAddr,
}, nil
}

func (proxy *SCTPProxy) clientLoop(client *sctp.SCTPConn, quit chan bool) {
backend, err := sctp.DialSCTP("sctp", nil, proxy.backendAddr)
if err != nil {
log.Printf("Can't forward traffic to backend sctp/%v: %s\n", proxy.backendAddr, err)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please change this to use logrus package.
es: logrus.WithError(err).Errorf("[...]")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, cmd/proxy package does not use logrus.
Not sure why, probably for reducing dependency.

client.Close()
return
}
clientC := sctp.NewSCTPSndRcvInfoWrappedConn(client)
backendC := sctp.NewSCTPSndRcvInfoWrappedConn(backend)

var wg sync.WaitGroup
var broker = func(to, from net.Conn) {
io.Copy(to, from)
from.Close()
to.Close()
wg.Done()
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't appear that 'dir' is used here. Should it be removed and the invocations below adjusted?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, thought the placement would show more context. I mean the "dir" parameter of the broker function 7 lines up.

wg.Add(2)
go broker(clientC, backendC)
go broker(backendC, clientC)

finish := make(chan struct{})
go func() {
wg.Wait()
close(finish)
}()

select {
case <-quit:
case <-finish:
}
clientC.Close()
backendC.Close()
<-finish
}

// Run starts forwarding the traffic using SCTP.
func (proxy *SCTPProxy) Run() {
quit := make(chan bool)
defer close(quit)
for {
client, err := proxy.listener.Accept()
if err != nil {
log.Printf("Stopping proxy on sctp/%v for sctp/%v (%s)", proxy.frontendAddr, proxy.backendAddr, err)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here use logrus

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, cmd/proxy looks intentionally avoiding using logrus.

return
}
go proxy.clientLoop(client.(*sctp.SCTPConn), quit)
}
}

// Close stops forwarding the traffic.
func (proxy *SCTPProxy) Close() { proxy.listener.Close() }

// FrontendAddr returns the SCTP address on which the proxy is listening.
func (proxy *SCTPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }

// BackendAddr returns the SCTP proxied address.
func (proxy *SCTPProxy) BackendAddr() net.Addr { return proxy.backendAddr }
4 changes: 4 additions & 0 deletions drivers/bridge/port_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"

"github.com/docker/libnetwork/types"
"github.com/ishidawataru/sctp"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -92,6 +93,9 @@ func (n *bridgeNetwork) allocatePort(bnd *types.PortBinding, containerIP, defHos
case *net.UDPAddr:
bnd.HostPort = uint16(host.(*net.UDPAddr).Port)
return nil
case *sctp.SCTPAddr:
bnd.HostPort = uint16(host.(*sctp.SCTPAddr).Port)
return nil
default:
// For completeness
return ErrUnsupportedAddressType(fmt.Sprintf("%T", netAddr))
Expand Down
11 changes: 7 additions & 4 deletions drivers/bridge/port_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func TestPortMappingConfig(t *testing.T) {

binding1 := types.PortBinding{Proto: types.UDP, Port: uint16(400), HostPort: uint16(54000)}
binding2 := types.PortBinding{Proto: types.TCP, Port: uint16(500), HostPort: uint16(65000)}
portBindings := []types.PortBinding{binding1, binding2}
binding3 := types.PortBinding{Proto: types.SCTP, Port: uint16(300), HostPort: uint16(65000)}
portBindings := []types.PortBinding{binding1, binding2, binding3}

sbOptions := make(map[string]interface{})
sbOptions[netlabel.PortMap] = portBindings
Expand Down Expand Up @@ -69,15 +70,17 @@ func TestPortMappingConfig(t *testing.T) {
t.Fatalf("Cannot find network %s inside driver", "dummy")
}
ep, _ := network.endpoints["ep1"]
if len(ep.portMapping) != 2 {
if len(ep.portMapping) != 3 {
t.Fatalf("Failed to store the port bindings into the sandbox info. Found: %v", ep.portMapping)
}
if ep.portMapping[0].Proto != binding1.Proto || ep.portMapping[0].Port != binding1.Port ||
ep.portMapping[1].Proto != binding2.Proto || ep.portMapping[1].Port != binding2.Port {
ep.portMapping[1].Proto != binding2.Proto || ep.portMapping[1].Port != binding2.Port ||
ep.portMapping[2].Proto != binding3.Proto || ep.portMapping[2].Port != binding3.Port {
t.Fatal("bridgeEndpoint has incorrect port mapping values")
}
if ep.portMapping[0].HostIP == nil || ep.portMapping[0].HostPort == 0 ||
ep.portMapping[1].HostIP == nil || ep.portMapping[1].HostPort == 0 {
ep.portMapping[1].HostIP == nil || ep.portMapping[1].HostPort == 0 ||
ep.portMapping[2].HostIP == nil || ep.portMapping[2].HostPort == 0 {
t.Fatal("operational port mapping data not found on bridgeEndpoint")
}

Expand Down
26 changes: 25 additions & 1 deletion iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,31 @@ func (c *ChainInfo) Forward(action Action, ip net.IP, port int, proto, destAddr
"--dport", strconv.Itoa(destPort),
"-j", "MASQUERADE",
}
return ProgramRule(Nat, "POSTROUTING", action, args)

if err := ProgramRule(Nat, "POSTROUTING", action, args); err != nil {
return err
}

if proto == "sctp" {
// Linux kernel v4.9 and below enables NETIF_F_SCTP_CRC for veth by
// the following commit.
// This introduces a problem when conbined with a physical NIC without
// NETIF_F_SCTP_CRC. As for a workaround, here we add an iptables entry
// to fill the checksum.
//
// https://github.com/torvalds/linux/commit/c80fafbbb59ef9924962f83aac85531039395b18
args = []string{
"-p", proto,
"--sport", strconv.Itoa(destPort),
"-j", "CHECKSUM",
"--checksum-fill",
}
if err := ProgramRule(Mangle, "POSTROUTING", action, args); err != nil {
return err
}
}

return nil
}

// Link adds reciprocal ACCEPT rule for two supplied IP addresses.
Expand Down
7 changes: 4 additions & 3 deletions portallocator/portallocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (p *PortAllocator) RequestPortInRange(ip net.IP, proto string, portStart, p
p.mutex.Lock()
defer p.mutex.Unlock()

if proto != "tcp" && proto != "udp" {
if proto != "tcp" && proto != "udp" && proto != "sctp" {
return 0, ErrUnknownProtocol
}

Expand All @@ -131,8 +131,9 @@ func (p *PortAllocator) RequestPortInRange(ip net.IP, proto string, portStart, p
protomap, ok := p.ipMap[ipstr]
if !ok {
protomap = protoMap{
"tcp": p.newPortMap(),
"udp": p.newPortMap(),
"tcp": p.newPortMap(),
"udp": p.newPortMap(),
"sctp": p.newPortMap(),
}

p.ipMap[ipstr] = protomap
Expand Down
Loading