Skip to content

Commit

Permalink
Attempt to make Blockhole blocking on L7
Browse files Browse the repository at this point in the history
Based on Fu Wei's idea, we employ blocking on L7 but without using
external tools.

The main idea is to read out `X-PeerURLs` from the header, since this
is how peer traffic identifies itself to others. As we also know that
all node will create direct connections with its peers, so the traffic
blocking will actually happen at all nodes' proxy (contrary to the
current design, where only the proxy of the peer that's being blackholed.

However, this way of blocking (by `X-PeerURLs` from the header) will
miss certain types of traffic to certain endpoints,
e.g. /members, /version, and /raft/probing.

See below for the log extract,
as it's header doesn't contain X-PeerURLs information.
- map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /members
- map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /version
- map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /raft/probing

In order to read out `X-PeerURLs` from the header, we need to terminate
the SSL connection, as we can't drop cleartext traffic (ref [1]). Thus,
a new option `e2e.WithSSLTerminationProxy(true)` is introduced, which
will change the network flow into

A -- B's SSL termination proxy - B's transparent proxy - B
     ^ newly introduced          ^ in the original codebase

This prototype doesn't address
- blocking only RX or TX traffic
- slowness when performing test cleanup
, and the coding convention needs to be improved as it's still a PoC

References:
[1] etcd-io#15595
  • Loading branch information
henrybear327 committed Apr 23, 2024
1 parent 01c8097 commit acf10cf
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 58 deletions.
239 changes: 218 additions & 21 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package proxy

import (
"context"
"bufio"
"bytes"
"crypto/tls"
"fmt"
"io"
mrand "math/rand"
Expand All @@ -31,12 +33,14 @@ import (
"go.uber.org/zap"

"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)

var (
defaultDialTimeout = 3 * time.Second
defaultBufferSize = 48 * 1024
defaultRetryInterval = 10 * time.Millisecond
FixturesDir = testutils.MustAbsPath("../fixtures")
)

// Server defines proxy server layer that simulates common network faults:
Expand Down Expand Up @@ -137,6 +141,15 @@ type ServerConfig struct {
DialTimeout time.Duration
BufferSize int
RetryInterval time.Duration

IsSSLTerminatingProxy bool
TerminatingTLSInfo transport.TLSInfo
// for SSL termination proxy: put outgoing connection local addr - connection initiated by peerURLs
// for transparent proxy: read incoming connection remote addr - check BlackholeMap to see if peerURLs is blocked
ConnectionMap map[string]string
ConnectionMapMu *sync.RWMutex
BlackholeMap map[string]bool // peerURLs to be blackholed - true
BlackholeMapMu *sync.RWMutex
}

type server struct {
Expand All @@ -150,6 +163,13 @@ type server struct {
tlsInfo transport.TLSInfo
dialTimeout time.Duration

terminatingTLSInfo transport.TLSInfo
isSSLTerminatingProxy bool
connectionMap map[string]string
connectionMapMu *sync.RWMutex
blackholeMap map[string]bool
blackholeMapMu *sync.RWMutex

bufferSize int
retryInterval time.Duration

Expand Down Expand Up @@ -200,6 +220,13 @@ func NewServer(cfg ServerConfig) Server {
tlsInfo: cfg.TLSInfo,
dialTimeout: cfg.DialTimeout,

isSSLTerminatingProxy: cfg.IsSSLTerminatingProxy,
terminatingTLSInfo: cfg.TerminatingTLSInfo,
connectionMap: cfg.ConnectionMap,
connectionMapMu: cfg.ConnectionMapMu,
blackholeMap: cfg.BlackholeMap,
blackholeMapMu: cfg.BlackholeMapMu,

bufferSize: cfg.BufferSize,
retryInterval: cfg.RetryInterval,

Expand All @@ -216,6 +243,7 @@ func NewServer(cfg ServerConfig) Server {
if err == nil {
s.fromPort, _ = strconv.Atoi(fromPort)
}

var toPort string
_, toPort, err = net.SplitHostPort(cfg.To.Host)
if err == nil {
Expand Down Expand Up @@ -250,7 +278,11 @@ func NewServer(cfg ServerConfig) Server {

var ln net.Listener
if !s.tlsInfo.Empty() {
ln, err = transport.NewListener(addr, s.from.Scheme, &s.tlsInfo)
if s.isSSLTerminatingProxy {
ln, err = transport.NewListener(addr, "https", &s.tlsInfo)
} else {
ln, err = transport.NewListener(addr, s.from.Scheme, &s.tlsInfo)
}
} else {
ln, err = net.Listen(s.from.Scheme, addr)
}
Expand All @@ -265,6 +297,7 @@ func NewServer(cfg ServerConfig) Server {
go s.listenAndServe()

s.lg.Info("started proxying", zap.String("from", s.From()), zap.String("to", s.To()))

return s
}

Expand All @@ -284,7 +317,7 @@ func (s *server) To() string {
func (s *server) listenAndServe() {
defer s.closeWg.Done()

ctx := context.Background()
// ctx := context.Background()
s.lg.Info("proxy is listening on", zap.String("from", s.From()))
close(s.readyc)

Expand Down Expand Up @@ -355,22 +388,37 @@ func (s *server) listenAndServe() {

var out net.Conn
if !s.tlsInfo.Empty() {
var tp *http.Transport
tp, err = transport.NewTransport(s.tlsInfo, s.dialTimeout)
// Not sure why this is not working...
// var tp *http.Transport
// tp, err = transport.NewTransport(s.tlsInfo, s.dialTimeout)
// if err != nil {
// select {
// case s.errc <- err:
// select {
// case <-s.donec:
// return
// default:
// }
// case <-s.donec:
// return
// }
// continue
// }
// out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)

// using simple Cert loading for now
var cert tls.Certificate
CertPath := FixturesDir + "/server.crt"
PrivateKeyPath := FixturesDir + "/server.key.insecure"
cert, err = tls.LoadX509KeyPair(CertPath, PrivateKeyPath)
if err != nil {
select {
case s.errc <- err:
select {
case <-s.donec:
return
default:
}
case <-s.donec:
return
}
continue
panic(err)
}
out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
conf := &tls.Config{
InsecureSkipVerify: true,
Certificates: []tls.Certificate{cert},
}
out, err = tls.Dial(s.to.Scheme, s.to.Host, conf)
} else {
out, err = net.Dial(s.to.Scheme, s.to.Host)
}
Expand All @@ -393,36 +441,72 @@ func (s *server) listenAndServe() {
go func() {
defer s.closeWg.Done()
// read incoming bytes from listener, dispatch to outgoing connection

s.transmit(out, in)
out.Close()
in.Close()

if s.isSSLTerminatingProxy {
// when the connection is closed, we delete the entry from the map
s.connectionMapMu.Lock()
port, err := getPort(out.LocalAddr().String())
if err != nil {
panic("")
}
delete(s.connectionMap, port)
s.connectionMapMu.Unlock()
}
}()
go func() {
defer s.closeWg.Done()
// read response from outgoing connection, write back to listener
s.receive(in, out)
in.Close()
out.Close()

if s.isSSLTerminatingProxy {
// when the connection is closed, we delete the entry from the map
s.connectionMapMu.Lock()
port, err := getPort(out.LocalAddr().String())
if err != nil {
panic("")
}
delete(s.connectionMap, port)
s.connectionMapMu.Unlock()
}
}()
}
}

func (s *server) transmit(dst io.Writer, src io.Reader) {
func (s *server) transmit(dst, src net.Conn) {
s.ioCopy(dst, src, proxyTx)
}

func (s *server) receive(dst io.Writer, src io.Reader) {
func (s *server) receive(dst, src net.Conn) {
s.ioCopy(dst, src, proxyRx)
}

func getPort(host string) (string, error) {
if strings.HasPrefix(host, "https") {
return strings.Replace(host, "https://localhost:", "", 1), nil
}

_, port, err := net.SplitHostPort(host)
if err != nil {
return "", nil
}
// return strconv.Atoi(port)
return port, nil
}

type proxyType uint8

const (
proxyTx proxyType = iota
proxyRx
)

func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
func (s *server) ioCopy(dst, src net.Conn, ptype proxyType) {
buf := make([]byte, s.bufferSize)
for {
nr1, err := src.Read(buf)
Expand Down Expand Up @@ -455,6 +539,53 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
}
data := buf[:nr1]

// attempt to obtain the header information
// this approach won't work since we are looking at encrypted bytestream, and headers are also encrypted
// unless we are able to decrypt this
if s.isSSLTerminatingProxy && ptype == proxyTx {
/*
if we are isolating A from B and C
case 1 - Proxy of A
The incoming traffic and outgoing traffic are already handled by the original design
case 2 - Proxy of B and C
The outgoing traffic to A is already handled by the original design
The incoming traffic from A will have to be inspected at all SSL termination proxy, by extracting
X-PeerURLs field from the Header. Since the SSL termination proxy will initiate a new connection
to forward the traffic to the transparent proxy (the original proxy), we will record which port
that the new forwarding connection is using in a map shared by SSL termination proxy and the
transparent proxy, containing the key-value pair of port-peer mapping. So now at the transparent
proxy, we can identify which traffic is coming from which node!
Note A: failed to decode headers and no X-PeerURLs
After looking into the logs, we can see the following traffic that doesn't have the X-PeerURLs present
- map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /members
- map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /version
- map[Accept-Encoding:[gzip] User-Agent:[Go-http-client/1.1]] /raft/probing
*/
if req, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(data))); err != nil {
// check note A
} else {
peerURLs := req.Header.Get("X-PeerURLs")

if len(peerURLs) == 0 {
// check note A
} else {
s.connectionMapMu.Lock()
port, err := getPort(dst.LocalAddr().String())
if err != nil {
panic("")
}
blockingPort, err := getPort(peerURLs) // TODO: make this able to parse multiple peerURLs?
if err != nil {
panic("")
}
s.connectionMap[port] = blockingPort
s.connectionMapMu.Unlock()
}
}
}

// alters/corrupts/drops data
switch ptype {
case proxyTx:
Expand All @@ -472,6 +603,40 @@ func (s *server) ioCopy(dst io.Writer, src io.Reader, ptype proxyType) {
default:
panic("unknown proxy type")
}

// we also block off the traffic to targetted node
if !s.isSSLTerminatingProxy && data != nil {
var port string
switch ptype {
case proxyTx:
port, err = getPort(src.RemoteAddr().String())
if err != nil {
panic("")
}

case proxyRx:
port, err = getPort(dst.RemoteAddr().String())
if err != nil {
panic("")
}
default:
panic("unknown proxy type")
}

s.connectionMapMu.RLock()
if peerPort, ok := s.connectionMap[port]; ok {

s.blackholeMapMu.RLock()
if _, ok := s.blackholeMap[peerPort]; ok {
data = nil
}
s.blackholeMapMu.RUnlock()
} else {
// we might have non-peer messages on the network
}
s.connectionMapMu.RUnlock()
}

nr2 := len(data)
switch ptype {
case proxyTx:
Expand Down Expand Up @@ -867,6 +1032,21 @@ func (s *server) BlackholeTx() {
zap.String("from", s.From()),
zap.String("to", s.To()),
)

s.blackholeMapMu.Lock()

port, err := getPort(s.listener.Addr().String())
if err != nil {
panic("")
}
portInt, err := strconv.Atoi(port)
if err != nil {
panic("")
}
port = strconv.Itoa((portInt - 2))
s.blackholeMap[port] = true

s.blackholeMapMu.Unlock()
}

func (s *server) UnblackholeTx() {
Expand All @@ -876,6 +1056,19 @@ func (s *server) UnblackholeTx() {
zap.String("from", s.From()),
zap.String("to", s.To()),
)

s.blackholeMapMu.Lock()
port, err := getPort(s.listener.Addr().String())
if err != nil {
panic("")
}
portInt, err := strconv.Atoi(port)
if err != nil {
panic("")
}
port = strconv.Itoa((portInt - 2))
delete(s.blackholeMap, port)
s.blackholeMapMu.Unlock()
}

func (s *server) BlackholeRx() {
Expand Down Expand Up @@ -972,7 +1165,11 @@ func (s *server) ResetListener() error {
var ln net.Listener
var err error
if !s.tlsInfo.Empty() {
ln, err = transport.NewListener(s.from.Host, s.from.Scheme, &s.tlsInfo)
if s.isSSLTerminatingProxy {
ln, err = transport.NewListener(s.from.Host, "https", &s.tlsInfo)
} else {
ln, err = transport.NewListener(s.from.Host, s.from.Scheme, &s.tlsInfo)
}
} else {
ln, err = net.Listen(s.from.Scheme, s.from.Host)
}
Expand Down
Loading

0 comments on commit acf10cf

Please sign in to comment.