Skip to content

Commit

Permalink
Support BESS protocol (for UML)
Browse files Browse the repository at this point in the history
BESS protocol transferrs L2 packets as AF_UNIX SOCK_SEQPACKET .
BESS protocol has been used by the vector network interfaces of User Mode Linux (UML).

```
(terminal 1) $ bin/gvproxy -debug -listen unix:///tmp/network.sock -listen-bess unixpacket:///tmp/bess.sock
(terminal 2) $ linux.uml vec0:transport=bess,dst=/tmp/bess.sock,depth=128,gro=1,mac=5a:94:ef:e4:0c:ee root=/dev/root rootfstype=hostfs init=/bin/bash mem=2G
(terminal 2: UML)$ ip addr add 192.168.127.2/24 dev vec0
(terminal 2: UML)$ ip link set vec0 up
(terminal 2: UML)$ ip route add default via 192.168.127.254
```

More docs about the User Mode Linux with BESS socket transport: https://www.kernel.org/doc/html/latest/virt/uml/user_mode_linux_howto_v2.html#bess-socket-transport

Signed-off-by: Akihiro Suda <akihiro.suda.cz@hco.ntt.co.jp>
  • Loading branch information
AkihiroSuda committed Jan 5, 2022
1 parent 467763f commit 826b1b6
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 51 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ With gvproxy and the VM discussing on a unix socket:

Made for Windows but also works for Linux and macOS with [HyperKit](https://github.com/moby/hyperkit).

## Run with User Mode Linux

```
(terminal 1) $ bin/gvproxy -debug -listen unix:///tmp/network.sock -listen-bess unixpacket:///tmp/bess.sock
(terminal 2) $ linux.uml vec0:transport=bess,dst=/tmp/bess.sock,depth=128,gro=1,mac=5a:94:ef:e4:0c:ee root=/dev/root rootfstype=hostfs init=/bin/bash mem=2G
(terminal 2: UML)$ ip addr add 192.168.127.2/24 dev vec0
(terminal 2: UML)$ ip link set vec0 up
(terminal 2: UML)$ ip route add default via 192.168.127.254
```

More docs about the User Mode Linux with BESS socket transport: https://www.kernel.org/doc/html/latest/virt/uml/user_mode_linux_howto_v2.html#bess-socket-transport

### Host

#### Windows prerequisites
Expand Down
49 changes: 49 additions & 0 deletions cmd/gvproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
endpoints arrayFlags
vpnkitSocket string
qemuSocket string
bessSocket string
forwardSocket arrayFlags
forwardDest arrayFlags
forwardUser arrayFlags
Expand All @@ -53,6 +54,7 @@ func main() {
flag.IntVar(&sshPort, "ssh-port", 2222, "Port to access the guest virtual machine. Must be between 1024 and 65535")
flag.StringVar(&vpnkitSocket, "listen-vpnkit", "", "VPNKit socket to be used by Hyperkit")
flag.StringVar(&qemuSocket, "listen-qemu", "", "Socket to be used by Qemu")
flag.StringVar(&bessSocket, "listen-bess", "", "unixpacket socket to be used by Bess-compatible applications")
flag.Var(&forwardSocket, "forward-sock", "Forwards a unix socket to the guest virtual machine over SSH")
flag.Var(&forwardDest, "forward-dest", "Forwards a unix socket to the guest virtual machine over SSH")
flag.Var(&forwardUser, "forward-user", "SSH user to use for unix socket forward")
Expand Down Expand Up @@ -82,9 +84,29 @@ func main() {
exitWithError(errors.Errorf("%q already exists", uri.Path))
}
}
if len(bessSocket) > 0 {
uri, err := url.Parse(bessSocket)
if err != nil || uri == nil {
exitWithError(errors.Wrapf(err, "invalid value for listen-bess"))
}
if uri.Scheme != "unixpacket" {
exitWithError(errors.New("listen-bess must be unixpacket:// address"))
}
if _, err := os.Stat(uri.Path); err == nil {
exitWithError(errors.Errorf("%q already exists", uri.Path))
}
}

if vpnkitSocket != "" && qemuSocket != "" {
exitWithError(errors.New("cannot use qemu and vpnkit protocol at the same time"))
}
if vpnkitSocket != "" && bessSocket != "" {
exitWithError(errors.New("cannot use bess and vpnkit protocol at the same time"))
}
if qemuSocket != "" && bessSocket != "" {
exitWithError(errors.New("cannot use qemu and bess protocol at the same time"))
}

// If the given port is not between the privileged ports
// and the oft considered maximum port, return an error.
if sshPort < 1024 || sshPort > 65535 {
Expand All @@ -94,6 +116,9 @@ func main() {
if qemuSocket != "" {
protocol = types.QemuProtocol
}
if bessSocket != "" {
protocol = types.BessProtocol
}

if c := len(forwardSocket); c != len(forwardDest) || c != len(forwardUser) || c != len(forwardIdentify) {
exitWithError(errors.New("-forward-sock, --forward-dest, --forward-user, and --forward-identity must all be specified together, " +
Expand Down Expand Up @@ -309,6 +334,30 @@ func run(ctx context.Context, g *errgroup.Group, configuration *types.Configurat
})
}

if bessSocket != "" {
bessListener, err := transport.Listen(bessSocket)
if err != nil {
return err
}

g.Go(func() error {
<-ctx.Done()
if err := bessListener.Close(); err != nil {
log.Errorf("error closing %s: %q", bessSocket, err)
}
return os.Remove(bessSocket)
})

g.Go(func() error {
conn, err := bessListener.Accept()
if err != nil {
return errors.Wrap(err, "bess accept error")

}
return vn.AcceptBess(ctx, conn)
})
}

for i := 0; i < len(forwardSocket); i++ {
dest := url.URL{
Scheme: "ssh",
Expand Down
20 changes: 20 additions & 0 deletions pkg/tap/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package tap
import "encoding/binary"

type protocol interface {
Stream() bool
}

type streamProtocol interface {
protocol
Buf() []byte
Write(buf []byte, size int)
Read(buf []byte) int
Expand All @@ -11,6 +16,10 @@ type protocol interface {
type hyperkitProtocol struct {
}

func (s *hyperkitProtocol) Stream() bool {
return true
}

func (s *hyperkitProtocol) Buf() []byte {
return make([]byte, 2)
}
Expand All @@ -26,6 +35,10 @@ func (s *hyperkitProtocol) Read(buf []byte) int {
type qemuProtocol struct {
}

func (s *qemuProtocol) Stream() bool {
return true
}

func (s *qemuProtocol) Buf() []byte {
return make([]byte, 4)
}
Expand All @@ -37,3 +50,10 @@ func (s *qemuProtocol) Write(buf []byte, size int) {
func (s *qemuProtocol) Read(buf []byte) int {
return int(binary.BigEndian.Uint32(buf[0:4]))
}

type bessProtocol struct {
}

func (s *bessProtocol) Stream() bool {
return false
}
168 changes: 119 additions & 49 deletions pkg/tap/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,23 @@ func (e *Switch) connect(conn net.Conn) (int, bool) {
}

func (e *Switch) tx(src, dst tcpip.LinkAddress, pkt *stack.PacketBuffer) error {
size := e.protocol.Buf()
e.protocol.Write(size, pkt.Size())
if e.protocol.Stream() {
return e.txStream(src, dst, pkt, e.protocol.(streamProtocol))
}
return e.txNonStream(src, dst, pkt)
}

func (e *Switch) txNonStream(src, dst tcpip.LinkAddress, pkt *stack.PacketBuffer) error {
return e.txBuf(src, dst, pkt, nil)
}

func (e *Switch) txStream(src, dst tcpip.LinkAddress, pkt *stack.PacketBuffer, sProtocol streamProtocol) error {
size := sProtocol.Buf()
sProtocol.Write(size, pkt.Size())
return e.txBuf(src, dst, pkt, size)
}

func (e *Switch) txBuf(src, dst tcpip.LinkAddress, pkt *stack.PacketBuffer, size []byte) error {
e.writeLock.Lock()
defer e.writeLock.Unlock()

Expand All @@ -132,12 +146,23 @@ func (e *Switch) tx(src, dst tcpip.LinkAddress, pkt *stack.PacketBuffer) error {
if id == srcID {
continue
}
if _, err := conn.Write(size); err != nil {
e.disconnect(id, conn)
return err
}
for _, view := range pkt.Views() {
if _, err := conn.Write(view); err != nil {
if len(size) > 0 {
if _, err := conn.Write(size); err != nil {
e.disconnect(id, conn)
return err
}
for _, view := range pkt.Views() {
if _, err := conn.Write(view); err != nil {
e.disconnect(id, conn)
return err
}
}
} else {
var b []byte
for _, view := range pkt.Views() {
b = append(b, []byte(view)...)
}
if _, err := conn.Write(b); err != nil {
e.disconnect(id, conn)
return err
}
Expand All @@ -154,12 +179,23 @@ func (e *Switch) tx(src, dst tcpip.LinkAddress, pkt *stack.PacketBuffer) error {
}
e.camLock.RUnlock()
conn := e.conns[id]
if _, err := conn.Write(size); err != nil {
e.disconnect(id, conn)
return err
}
for _, view := range pkt.Views() {
if _, err := conn.Write(view); err != nil {
if len(size) > 0 {
if _, err := conn.Write(size); err != nil {
e.disconnect(id, conn)
return err
}
for _, view := range pkt.Views() {
if _, err := conn.Write(view); err != nil {
e.disconnect(id, conn)
return err
}
}
} else {
var b []byte
for _, view := range pkt.Views() {
b = append(b, []byte(view)...)
}
if _, err := conn.Write(b); err != nil {
e.disconnect(id, conn)
return err
}
Expand All @@ -183,7 +219,34 @@ func (e *Switch) disconnect(id int, conn net.Conn) {
}

func (e *Switch) rx(ctx context.Context, id int, conn net.Conn) error {
sizeBuf := e.protocol.Buf()
if e.protocol.Stream() {
return e.rxStream(ctx, id, conn, e.protocol.(streamProtocol))
}
return e.rxNonStream(ctx, id, conn)
}

func (e *Switch) rxNonStream(ctx context.Context, id int, conn net.Conn) error {
bufSize := 1024 * 128
buf := make([]byte, bufSize)
loop:
for {
select {
case <-ctx.Done():
break loop
default:
// passthrough
}
n, err := conn.Read(buf)
if err != nil {
return errors.Wrap(err, "cannot read size from socket")
}
e.rxBuf(ctx, id, buf[:n])
}
return nil
}

func (e *Switch) rxStream(ctx context.Context, id int, conn net.Conn, sProtocol streamProtocol) error {
sizeBuf := sProtocol.Buf()
loop:
for {
select {
Expand All @@ -196,54 +259,61 @@ loop:
if err != nil {
return errors.Wrap(err, "cannot read size from socket")
}
size := e.protocol.Read(sizeBuf)
size := sProtocol.Read(sizeBuf)

buf := make([]byte, size)
_, err = io.ReadFull(conn, buf)
if err != nil {
return errors.Wrap(err, "cannot read packet from socket")
}
e.rxBuf(ctx, id, buf)
}
return nil
}

if e.debug {
packet := gopacket.NewPacket(buf, layers.LayerTypeEthernet, gopacket.Default)
log.Info(packet.String())
}

view := buffer.View(buf)
eth := header.Ethernet(view)
vv := buffer.NewVectorisedView(len(view), []buffer.View{view})
func (e *Switch) rxBuf(ctx context.Context, id int, buf []byte) {
if e.debug {
packet := gopacket.NewPacket(buf, layers.LayerTypeEthernet, gopacket.Default)
log.Info(packet.String())
}

e.camLock.Lock()
e.cam[eth.SourceAddress()] = id
e.camLock.Unlock()
view := buffer.View(buf)
eth := header.Ethernet(view)
vv := buffer.NewVectorisedView(len(view), []buffer.View{view})

if eth.DestinationAddress() != e.gateway.LinkAddress() {
if err := e.tx(eth.SourceAddress(), eth.DestinationAddress(), stack.NewPacketBuffer(stack.PacketBufferOptions{
Data: vv,
})); err != nil {
log.Error(err)
}
}
if eth.DestinationAddress() == e.gateway.LinkAddress() || eth.DestinationAddress() == header.EthernetBroadcastAddress {
vv.TrimFront(header.EthernetMinimumSize)
e.gateway.DeliverNetworkPacket(
eth.SourceAddress(),
eth.DestinationAddress(),
eth.Type(),
stack.NewPacketBuffer(stack.PacketBufferOptions{
Data: vv,
}),
)
e.camLock.Lock()
e.cam[eth.SourceAddress()] = id
e.camLock.Unlock()

if eth.DestinationAddress() != e.gateway.LinkAddress() {
if err := e.tx(eth.SourceAddress(), eth.DestinationAddress(), stack.NewPacketBuffer(stack.PacketBufferOptions{
Data: vv,
})); err != nil {
log.Error(err)
}

atomic.AddUint64(&e.Received, uint64(size))
}
return nil
if eth.DestinationAddress() == e.gateway.LinkAddress() || eth.DestinationAddress() == header.EthernetBroadcastAddress {
vv.TrimFront(header.EthernetMinimumSize)
e.gateway.DeliverNetworkPacket(
eth.SourceAddress(),
eth.DestinationAddress(),
eth.Type(),
stack.NewPacketBuffer(stack.PacketBufferOptions{
Data: vv,
}),
)
}

atomic.AddUint64(&e.Received, uint64(len(buf)))
}

func protocolImplementation(protocol types.Protocol) protocol {
if protocol == types.QemuProtocol {
switch protocol {
case types.QemuProtocol:
return &qemuProtocol{}
case types.BessProtocol:
return &bessProtocol{}
default:
return &hyperkitProtocol{}
}
return &hyperkitProtocol{}
}
4 changes: 2 additions & 2 deletions pkg/transport/listen_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func Listen(endpoint string) (net.Listener, error) {
return nil, err
}
return mdlayhervsock.Listen(uint32(port))
case "unix":
return net.Listen("unix", parsed.Path)
case "unix", "unixpacket":
return net.Listen(parsed.Scheme, parsed.Path)
case "tcp":
return net.Listen("tcp", parsed.Host)
default:
Expand Down
2 changes: 2 additions & 0 deletions pkg/types/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Configuration struct {
// Qemu or Hyperkit protocol
// Qemu protocol is 32bits big endian size of the packet, then the packet.
// Hyperkit protocol is handshake, then 16bits little endian size of packet, then the packet.
// Bess protocol transfers bare L2 packets as SOCK_SEQPACKET.
Protocol Protocol
}

Expand All @@ -59,6 +60,7 @@ type Protocol string
const (
HyperKitProtocol Protocol = "hyperkit"
QemuProtocol Protocol = "qemu"
BessProtocol Protocol = "bess"
)

type Zone struct {
Expand Down
Loading

0 comments on commit 826b1b6

Please sign in to comment.