Fixing performance issues and out-of-order packets (#916)
### Reducing CPU context switching and the number of goroutines
Packet capture and packet processing now use only two goroutines, which minimizes CPU context switches. Spawning too many goroutines is harmful here.
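
For illustration, a minimal sketch of such a two-goroutine pipeline (hypothetical code, not the actual goreplay listener): one goroutine reads packets and hands them over a buffered channel to a single processing goroutine.

```go
// Minimal sketch of a two-goroutine capture/process pipeline (hypothetical
// code, not the goreplay listener): one goroutine reads packets, a single
// consumer processes them, so the scheduler rarely has to switch contexts.
package main

import "fmt"

type packet struct{ payload []byte }

func capture(out chan<- packet) {
	defer close(out)
	for i := 0; i < 3; i++ { // stand-in for the pcap read loop
		out <- packet{payload: []byte(fmt.Sprintf("pkt-%d", i))}
	}
}

func main() {
	packets := make(chan packet, 1024) // buffered to absorb bursts

	go capture(packets) // goroutine #1: capture

	for p := range packets { // goroutine #2 (main): processing
		fmt.Println("processing", string(p.payload))
	}
}
```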

### Optimized packet capture: memory is allocated only when required, and only for data that is used
We now use the ZeroCopy methods of the libpcap bindings to avoid unnecessary allocations. Memory is allocated ONLY for valid packets, and only for packets that carry data; for example, SYN/FIN packets are no longer used. Additionally, packet objects are recycled via `sync.Pool`, which reuses already allocated memory.
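
A rough sketch of the idea with simplified stand-in types (the real capture loop uses gopacket's `ZeroCopyPacketDataSource` and `tcp.Packet`): the buffer returned by a zero-copy read is only valid until the next read, so it is copied into a pooled object, and only when there is a payload worth keeping.

```go
// Sketch of the zero-copy + sync.Pool idea with simplified stand-in types.
package main

import "sync"

// zeroCopySource is a cut-down stand-in for gopacket.ZeroCopyPacketDataSource.
type zeroCopySource interface {
	ZeroCopyReadPacketData() (data []byte, err error)
}

type packet struct{ data []byte }

var packetPool = sync.Pool{
	New: func() interface{} { return new(packet) },
}

func readLoop(src zeroCopySource, handle func(*packet)) {
	for {
		data, err := src.ZeroCopyReadPacketData()
		if err != nil {
			return
		}
		if len(data) == 0 {
			// Skip empty reads; goreplay similarly skips payload-less
			// SYN/FIN packets after parsing, so no memory is allocated for them.
			continue
		}
		p := packetPool.Get().(*packet)
		p.data = append(p.data[:0], data...) // reuse the pooled buffer if large enough
		handle(p)
	}
}

// releasePacket returns a packet to the pool once its message was dispatched.
func releasePacket(p *packet) { packetPool.Put(p) }

func main() {}
```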

### Simplification and optimization of request/response detection
SYN/FIN packets (and the like) are no longer used; only the packet payload is used to detect the start and end of a message. Moreover, payload detection no longer requires building a combined “message” buffer and works with individual packet payloads.

Message payloads are now concatenated from packets only at the very end, when the message is dispatched. Also, before checking whether a message is complete, there is an additional check that all received packets are in the right order, i.e. that their SEQ numbers line up and no packets are missing.
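
A sketch of the SEQ-continuity check and the late concatenation, using a hypothetical `pkt` struct rather than the real `tcp.Packet`:

```go
// Sketch of the SEQ-continuity check described above: a message is only
// considered complete if every packet's SEQ follows directly from the
// previous packet's SEQ plus its payload length.
package main

import "fmt"

type pkt struct {
	seq     uint32
	payload []byte
}

// inOrder reports whether the packets are contiguous: no gaps, no reordering.
func inOrder(packets []pkt) bool {
	for i := 1; i < len(packets); i++ {
		expected := packets[i-1].seq + uint32(len(packets[i-1].payload))
		if packets[i].seq != expected {
			return false
		}
	}
	return true
}

// concatenate builds the message body only at dispatch time.
func concatenate(packets []pkt) []byte {
	size := 0
	for _, p := range packets {
		size += len(p.payload)
	}
	out := make([]byte, 0, size)
	for _, p := range packets {
		out = append(out, p.payload...)
	}
	return out
}

func main() {
	msg := []pkt{{seq: 100, payload: []byte("GET / HT")}, {seq: 108, payload: []byte("TP/1.1\r\n\r\n")}}
	fmt.Println(inOrder(msg), string(concatenate(msg)))
}
```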

Chunked-encoding validation was reworked and no longer needs the expensive step of re-parsing all the chunks. It now trusts that the client sends a valid chunk body, checks that packets are in the right order (i.e. their SEQ numbers match up), and checks that the message ends with the right suffix. All of this is done with zero allocations.
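
A minimal, allocation-free end-of-chunked-body check along those lines (assuming packets are already in SEQ order and there are no trailer headers, which makes the final bytes `0\r\n\r\n`):

```go
// Sketch of an allocation-free chunked-body end check: the last packet's
// payload just has to end with the final-chunk terminator.
package main

import (
	"bytes"
	"fmt"
)

var chunkedSuffix = []byte("0\r\n\r\n")

// chunkedComplete checks only the tail of the last payload; it never
// re-parses the individual chunk sizes and allocates nothing.
func chunkedComplete(lastPayload []byte) bool {
	return bytes.HasSuffix(lastPayload, chunkedSuffix)
}

func main() {
	fmt.Println(chunkedComplete([]byte("4\r\nWiki\r\n0\r\n\r\n"))) // true
	fmt.Println(chunkedComplete([]byte("4\r\nWiki\r\n")))          // false
}
```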

Parsing all headers using `proto.GetHeaders` proved to be very slow. Now we parse only the headers we need (and do it only once).
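
As a generic illustration (not the `proto` package itself), a single header value can be pulled out with one linear scan instead of materializing a full header map:

```go
// Generic sketch of extracting one header value with a single scan.
package main

import (
	"bytes"
	"fmt"
)

// headerValue returns the value of the first occurrence of name, scanning the
// raw payload once and allocating nothing beyond the returned sub-slice.
// Case sensitivity and folding are ignored for brevity.
func headerValue(payload, name []byte) []byte {
	idx := bytes.Index(payload, name)
	if idx == -1 {
		return nil
	}
	rest := payload[idx+len(name):]
	rest = bytes.TrimPrefix(rest, []byte(":"))
	if end := bytes.Index(rest, []byte("\r\n")); end != -1 {
		rest = rest[:end]
	}
	return bytes.TrimSpace(rest)
}

func main() {
	req := []byte("GET / HTTP/1.1\r\nHost: example.com\r\nContent-Length: 12\r\n\r\n")
	fmt.Printf("%s\n", headerValue(req, []byte("Content-Length"))) // 12
}
```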

Packets are matched together using ACK, which at high RPS removes the chance of duplicated IDs. Additionally, even if packets are received out of order, they are now properly sorted before the message is dispatched.
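
The reordering step could look roughly like this (same hypothetical `pkt` struct as in the earlier sketch; TCP sequence wrap-around is ignored for brevity):

```go
// Sketch of the reordering step: out-of-order packets are sorted by SEQ
// before the message body is assembled.
package main

import (
	"fmt"
	"sort"
)

type pkt struct {
	seq     uint32
	payload []byte
}

func sortBySeq(packets []pkt) {
	sort.Slice(packets, func(i, j int) bool { return packets[i].seq < packets[j].seq })
}

func main() {
	packets := []pkt{
		{seq: 108, payload: []byte("TP/1.1\r\n\r\n")},
		{seq: 100, payload: []byte("GET / HT")},
	}
	sortBySeq(packets)
	for _, p := range packets {
		fmt.Print(string(p.payload))
	}
	fmt.Println()
}
```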

### Changes in ID generation algorithm
Message ID generation and the relation between request and response IDs are fully rewritten. Responses no longer have to look up the request data in order to get the same ID; the ID now relies on the fact that the SEQ of the first packet of the response equals the ACK of the request. Where Message IDs previously contained random values, such as the current timestamp, they are now produced by a consistent algorithm based on the TCP stream id (SrcPort + DstPort + SrcIP/DstIP) and the current ACK/SEQ number (to distinguish multiple messages within the same stream).
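
A simplified sketch of such a scheme (not the exact goreplay algorithm): the stream identity is combined symmetrically, so the request, using its ACK, and the response, using the SEQ of its first packet, arrive at the same ID without any lookup. A real implementation would use a stronger mix to keep collisions unlikely.

```go
// Simplified sketch of a deterministic message ID derived from the TCP
// stream identity plus an ACK/SEQ value.
package main

import (
	"fmt"
	"net"
)

func messageID(srcIP, dstIP net.IP, srcPort, dstPort uint16, ackOrSeq uint32) uint64 {
	stream := uint64(srcPort) + uint64(dstPort) // symmetric: same value for both directions
	for _, b := range srcIP.To16() {
		stream += uint64(b)
	}
	for _, b := range dstIP.To16() {
		stream += uint64(b)
	}
	return stream<<32 | uint64(ackOrSeq) // stream id in the high bits, ACK/SEQ in the low bits
}

func main() {
	client, server := net.ParseIP("10.0.0.1"), net.ParseIP("10.0.0.2")
	reqID := messageID(client, server, 51000, 80, 4242)  // request: ACK = 4242
	respID := messageID(server, client, 80, 51000, 4242) // response: first SEQ = 4242
	fmt.Println(reqID == respID)                         // true
}
```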

### BPF filter optimizations
When tracking responses, it now uses a more accurate BPF rule to capture only the traffic that is needed.
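
For a sense of the resulting expressions, here is a condensed re-statement of the logic visible in the `capture/capture.go` diff below (a standalone illustration, not the exact `Filter()` method):

```go
// Condensed illustration of the filter strings the reworked Filter() builds.
package main

import "fmt"

func filter(transport, host string, port int, trackResponse, listenAll bool) string {
	f := fmt.Sprintf("%s dst port %d", transport, port) // inbound traffic only
	if trackResponse {
		f = fmt.Sprintf("%s port %d", transport, port) // both directions
	}
	if listenAll {
		return "(" + f + ")"
	}
	return fmt.Sprintf("(host %s and (%s))", host, f)
}

func main() {
	fmt.Println(filter("tcp", "", 80, false, true))             // (tcp dst port 80)
	fmt.Println(filter("tcp", "192.168.0.10", 80, true, false)) // (host 192.168.0.10 and (tcp port 80))
}
```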

### Misc
The packet code is now fully moved to tcp/Packet, so packet processing is done only once, in a single place.

TCP output now has a 5-second timeout and a proper Close method.
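
A sketch of what such an output could look like (hypothetical type, not the actual output_tcp code), with a 5-second budget for dialing and for each write, plus an explicit `Close`:

```go
// Sketch of a TCP output with a 5-second timeout and a Close method.
package main

import (
	"net"
	"time"
)

const sendTimeout = 5 * time.Second

type tcpOutput struct {
	conn net.Conn
}

func newTCPOutput(addr string) (*tcpOutput, error) {
	conn, err := net.DialTimeout("tcp", addr, sendTimeout)
	if err != nil {
		return nil, err
	}
	return &tcpOutput{conn: conn}, nil
}

func (o *tcpOutput) Write(data []byte) (int, error) {
	// Refuse to block forever on a stuck peer.
	if err := o.conn.SetWriteDeadline(time.Now().Add(sendTimeout)); err != nil {
		return 0, err
	}
	return o.conn.Write(data)
}

func (o *tcpOutput) Close() error {
	return o.conn.Close()
}

func main() {} // construction left out: dialing requires a reachable address
```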

Fully switched to Go modules and removed vendoring.
Urban Ishimwe authored May 19, 2021
1 parent 8f14d5b commit e74e945
Showing 1,184 changed files with 858 additions and 370,225 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -102,7 +102,7 @@ profile_test:

# Used mainly for debugging, because docker container do not have access to parent machine ports
run:
$(RUN) go run $(LDFLAGS) $(SOURCE) --input-dummy=0 --output-http="http://localhost:9000" --input-raw-track-response --input-raw 127.0.0.1:9000 --verbose --debug --middleware "./examples/middleware/echo.sh" --output-file requests.gor
$(RUN) go run $(LDFLAGS) $(SOURCE) --input-dummy=0 --output-http="http://localhost:9000" --input-raw-track-response --input-raw 127.0.0.1:9000 --verbose 0 --middleware "./examples/middleware/echo.sh" --output-file requests.gor

run-2:
$(RUN) go run $(LDFLAGS) $(SOURCE) --input-raw :8000 --input-raw-bpf-filter "dst port 8000" --output-stdout --output-http "http://localhost:8000" --input-dummy=0
95 changes: 64 additions & 31 deletions capture/capture.go
@@ -13,14 +13,16 @@ import (
"time"

"github.com/buger/goreplay/size"
"github.com/buger/goreplay/tcp"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"golang.org/x/sys/unix"
)

// Handler is a function that is used to handle packets
type Handler func(*Packet)
// PacketHandler is a function that is used to handle packets
type PacketHandler func(*tcp.Packet)

// PcapOptions options that can be set on a pcap capture handle,
// these options take effect on inactive pcap handles
@@ -39,7 +41,7 @@ type Listener struct {
sync.Mutex
Transport string // transport layer default to tcp
Activate func() error // function is used to activate the engine. it must be called before reading packets
Handles map[string]gopacket.PacketDataSource
Handles map[string]gopacket.ZeroCopyPacketDataSource
Interfaces []net.Interface
loopIndex int
Reading chan bool // this channel is closed when the listener has started reading packets
@@ -105,7 +107,7 @@ func NewListener(host string, port uint16, transport string, engine EngineType,
if transport != "" {
l.Transport = transport
}
l.Handles = make(map[string]gopacket.PacketDataSource)
l.Handles = make(map[string]gopacket.ZeroCopyPacketDataSource)
l.trackResponse = trackResponse
l.closeDone = make(chan struct{})
l.quit = make(chan struct{})
@@ -138,7 +140,7 @@ func (l *Listener) SetPcapOptions(opts PcapOptions) {
// Listen listens for packets from the handles, and call handler on every packet received
// until the context done signal is sent or there is unrecoverable error on all handles.
// this function must be called after activating pcap handles
func (l *Listener) Listen(ctx context.Context, handler Handler) (err error) {
func (l *Listener) Listen(ctx context.Context, handler PacketHandler) (err error) {
l.read(handler)
done := ctx.Done()
select {
@@ -152,7 +154,7 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) (err error) {
}

// ListenBackground is like listen but can run concurrently and signal error through channel
func (l *Listener) ListenBackground(ctx context.Context, handler Handler) chan error {
func (l *Listener) ListenBackground(ctx context.Context, handler PacketHandler) chan error {
err := make(chan error, 1)
go func() {
defer close(err)
@@ -172,34 +174,36 @@ func (l *Listener) Filter(ifi net.Interface) (filter string) {
if l.port != 0 {
port = fmt.Sprintf("port %d", l.port)
}
dir := " dst " // direction
filter = fmt.Sprintf("%s dst %s", l.Transport, port)
if l.trackResponse {
dir = " "
filter = fmt.Sprintf("%s %s", l.Transport, port)
}
filter = fmt.Sprintf("(%s%s%s)", l.Transport, dir, port)

if listenAll(l.host) || isDevice(l.host, ifi) {
return
return "(" + filter + ")"
}
filter = fmt.Sprintf("(%s%s%s and host %s)", l.Transport, dir, port, l.host)
filter = fmt.Sprintf("(host %s and (%s))", l.host, filter)

log.Println("BPF filter: " + filter)
return
}

// PcapDumpHandler returns a handler to write packet data in PCAP
// format, See http://wiki.wireshark.org/Development/LibpcapFileFormathandler.
// if link layer is invalid Ethernet is assumed
func PcapDumpHandler(file *os.File, link layers.LinkType) (handler func(packet *Packet) error, err error) {
if link.String() == "" {
link = layers.LinkTypeEthernet
}
w := NewWriterNanos(file)
err = w.WriteFileHeader(64<<10, link)
if err != nil {
return nil, err
}
return func(packet *Packet) error {
return w.WritePacket(*packet.Info, packet.Data)
}, nil
}
// func PcapDumpHandler(file *os.File, link layers.LinkType) (handler func(packet *tcp.Packet) error, err error) {
// if link.String() == "" {
// link = layers.LinkTypeEthernet
// }
// w := NewWriterNanos(file)
// err = w.WriteFileHeader(64<<10, link)
// if err != nil {
// return nil, err
// }
// return func(packet *tcp.Packet) error {
// return w.WritePacket(*packet.Info, packet.Data)
// }, nil
// }

// PcapHandle returns new pcap Handle from dev on success.
// this function should be called after setting all necessary options for this listener
@@ -262,6 +266,7 @@ func (l *Listener) PcapHandle(ifi net.Interface) (handle *pcap.Handle, err error
} else {
l.BPFFilter = l.Filter(ifi)
}
fmt.Println("Interface:", ifi.Name, ". BPF Filter:", l.BPFFilter)
err = handle.SetBPFFilter(l.BPFFilter)
if err != nil {
handle.Close()
@@ -286,6 +291,7 @@ func (l *Listener) SocketHandle(ifi net.Interface) (handle Socket, err error) {
} else {
l.BPFFilter = l.Filter(ifi)
}
fmt.Println("BPF Filter: ", l.BPFFilter)
if err = handle.SetBPFFilter(l.BPFFilter); err != nil {
handle.Close()
return nil, fmt.Errorf("BPF filter error: %q%s, interface: %q", err, l.BPFFilter, ifi.Name)
@@ -294,11 +300,11 @@ func (l *Listener) SocketHandle(ifi net.Interface) (handle Socket, err error) {
return
}

func (l *Listener) read(handler Handler) {
func (l *Listener) read(handler PacketHandler) {
l.Lock()
defer l.Unlock()
for key, handle := range l.Handles {
go func(key string, hndl gopacket.PacketDataSource) {
go func(key string, hndl gopacket.ZeroCopyPacketDataSource) {
defer l.closeHandles(key)
linkSize := 14
linkType := int(layers.LinkTypeEthernet)
@@ -312,14 +318,18 @@ func (l *Listener) read(handler Handler) {
return // can't find the linktype size
}
}

for {
select {
case <-l.quit:
return
default:
data, ci, err := hndl.ReadPacketData()
data, ci, err := hndl.ZeroCopyReadPacketData()
if err == nil {
handler(NewPacket(data, linkType, linkSize, &ci))
pckt, err := tcp.ParsePacket(data, linkType, linkSize, &ci)
if err == nil {
handler(pckt)
}
continue
}
if enext, ok := err.(pcap.NextError); ok && enext == pcap.NextErrorTimeoutExpired {
@@ -332,11 +342,11 @@ func (l *Listener) read(handler Handler) {
continue
}
if err == io.EOF || err == io.ErrClosedPipe {
return
}
if os.Getenv("GORDEBUG") != "0" {
log.Printf("stopped reading from %s interface with error %s\n", key, err)
return
}

log.Printf("stopped reading from %s interface with error %s\n", key, err)
return
}
}
@@ -478,3 +488,26 @@ func listenAll(addr string) bool {
}
return false
}

func pcapLinkTypeLength(lType int) (int, bool) {
switch layers.LinkType(lType) {
case layers.LinkTypeEthernet:
return 14, true
case layers.LinkTypeNull, layers.LinkTypeLoop:
return 4, true
case layers.LinkTypeRaw, 12, 14:
return 0, true
case layers.LinkTypeIPv4, layers.LinkTypeIPv6:
// (TODO:) look out for IP encapsulation?
return 0, true
case layers.LinkTypeLinuxSLL:
return 16, true
case layers.LinkTypeFDDI:
return 13, true
case 226 /*DLT_IPNET*/ :
// https://www.tcpdump.org/linktypes/LINKTYPE_IPNET.html
return 24, true
default:
return 0, false
}
}
40 changes: 20 additions & 20 deletions capture/capture_test.go
@@ -133,25 +133,25 @@ func TestPcapHandler(t *testing.T) {
}
}

func TestSocketHandler(t *testing.T) {
l, err := NewListener(LoopBack.Name, 8000, "", EngineRawSocket, true)
err = l.Activate()
if err != nil {
return
}
defer l.Handles[LoopBack.Name].(*SockRaw).Close()
if err != nil {
t.Errorf("expected error to be nil, got %v", err)
return
}
for i := 0; i < 5; i++ {
_, _ = net.Dial("tcp", "127.0.0.1:8000")
}
sts, _ := l.Handles[LoopBack.Name].(*SockRaw).Stats()
if sts.Packets < 5 {
t.Errorf("expected >=5 packets got %d", sts.Packets)
}
}
// func TestSocketHandler(t *testing.T) {
// l, err := NewListener(LoopBack.Name, 8000, "", EngineRawSocket, true)
// err = l.Activate()
// if err != nil {
// return
// }
// defer l.Handles[LoopBack.Name].(*SockRaw).Close()
// if err != nil {
// t.Errorf("expected error to be nil, got %v", err)
// return
// }
// for i := 0; i < 5; i++ {
// _, _ = net.Dial("tcp", "127.0.0.1:8000")
// }
// sts, _ := l.Handles[LoopBack.Name].(*SockRaw).Stats()
// if sts.Packets < 5 {
// t.Errorf("expected >=5 packets got %d", sts.Packets)
// }
// }

func BenchmarkPcapDump(b *testing.B) {
f, err := ioutil.TempFile("", "pcap_file")
@@ -220,7 +220,7 @@ func init() {
}
}

func handler(n, counter *int32) Handler {
func handler(n, counter *int32) PacketHandler {
return func(p *Packet) {
nn := int32(len(p.Data))
atomic.AddInt32(n, nn)