Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing performance issues and out-of-order packets #916

Merged
merged 11 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ profile_test:

# Used mainly for debugging, because docker container do not have access to parent machine ports
run:
$(RUN) go run $(LDFLAGS) $(SOURCE) --input-dummy=0 --output-http="http://localhost:9000" --input-raw-track-response --input-raw 127.0.0.1:9000 --verbose --debug --middleware "./examples/middleware/echo.sh" --output-file requests.gor
$(RUN) go run $(LDFLAGS) $(SOURCE) --input-dummy=0 --output-http="http://localhost:9000" --input-raw-track-response --input-raw 127.0.0.1:9000 --verbose 0 --middleware "./examples/middleware/echo.sh" --output-file requests.gor

run-2:
$(RUN) go run $(LDFLAGS) $(SOURCE) --input-raw :8000 --input-raw-bpf-filter "dst port 8000" --output-stdout --output-http "http://localhost:8000" --input-dummy=0
Expand Down
104 changes: 73 additions & 31 deletions capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ import (
"time"

"github.com/buger/goreplay/size"
"github.com/buger/goreplay/tcp"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"golang.org/x/sys/unix"
)

// Handler is a function that is used to handle packets
type Handler func(*Packet)
// PacketHandler is a function that is used to handle packets
type PacketHandler func(*tcp.Packet)

// PcapOptions options that can be set on a pcap capture handle,
// these options take effect on inactive pcap handles
Expand All @@ -39,7 +41,7 @@ type Listener struct {
sync.Mutex
Transport string // transport layer default to tcp
Activate func() error // function is used to activate the engine. it must be called before reading packets
Handles map[string]gopacket.PacketDataSource
Handles map[string]gopacket.ZeroCopyPacketDataSource
Interfaces []net.Interface
loopIndex int
Reading chan bool // this channel is closed when the listener has started reading packets
Expand Down Expand Up @@ -105,7 +107,7 @@ func NewListener(host string, port uint16, transport string, engine EngineType,
if transport != "" {
l.Transport = transport
}
l.Handles = make(map[string]gopacket.PacketDataSource)
l.Handles = make(map[string]gopacket.ZeroCopyPacketDataSource)
l.trackResponse = trackResponse
l.closeDone = make(chan struct{})
l.quit = make(chan struct{})
Expand Down Expand Up @@ -138,7 +140,7 @@ func (l *Listener) SetPcapOptions(opts PcapOptions) {
// Listen listens for packets from the handles, and call handler on every packet received
// until the context done signal is sent or there is unrecoverable error on all handles.
// this function must be called after activating pcap handles
func (l *Listener) Listen(ctx context.Context, handler Handler) (err error) {
func (l *Listener) Listen(ctx context.Context, handler PacketHandler) (err error) {
l.read(handler)
done := ctx.Done()
select {
Expand All @@ -152,7 +154,7 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) (err error) {
}

// ListenBackground is like listen but can run concurrently and signal error through channel
func (l *Listener) ListenBackground(ctx context.Context, handler Handler) chan error {
func (l *Listener) ListenBackground(ctx context.Context, handler PacketHandler) chan error {
err := make(chan error, 1)
go func() {
defer close(err)
Expand All @@ -172,34 +174,36 @@ func (l *Listener) Filter(ifi net.Interface) (filter string) {
if l.port != 0 {
port = fmt.Sprintf("port %d", l.port)
}
dir := " dst " // direction
filter = fmt.Sprintf("%s dst %s", l.Transport, port)
if l.trackResponse {
dir = " "
filter = fmt.Sprintf("%s %s", l.Transport, port)
}
filter = fmt.Sprintf("(%s%s%s)", l.Transport, dir, port)

if listenAll(l.host) || isDevice(l.host, ifi) {
return
return "(" + filter + ")"
}
filter = fmt.Sprintf("(%s%s%s and host %s)", l.Transport, dir, port, l.host)
filter = fmt.Sprintf("(host %s and (%s))", l.host, filter)

log.Println("BPF filter: " + filter)
return
}

// PcapDumpHandler returns a handler to write packet data in PCAP
// format, See http://wiki.wireshark.org/Development/LibpcapFileFormathandler.
// if link layer is invalid Ethernet is assumed
func PcapDumpHandler(file *os.File, link layers.LinkType) (handler func(packet *Packet) error, err error) {
if link.String() == "" {
link = layers.LinkTypeEthernet
}
w := NewWriterNanos(file)
err = w.WriteFileHeader(64<<10, link)
if err != nil {
return nil, err
}
return func(packet *Packet) error {
return w.WritePacket(*packet.Info, packet.Data)
}, nil
}
// func PcapDumpHandler(file *os.File, link layers.LinkType) (handler func(packet *tcp.Packet) error, err error) {
// if link.String() == "" {
// link = layers.LinkTypeEthernet
// }
// w := NewWriterNanos(file)
// err = w.WriteFileHeader(64<<10, link)
// if err != nil {
// return nil, err
// }
// return func(packet *tcp.Packet) error {
// return w.WritePacket(*packet.Info, packet.Data)
// }, nil
// }

// PcapHandle returns new pcap Handle from dev on success.
// this function should be called after setting all necessary options for this listener
Expand Down Expand Up @@ -262,6 +266,7 @@ func (l *Listener) PcapHandle(ifi net.Interface) (handle *pcap.Handle, err error
} else {
l.BPFFilter = l.Filter(ifi)
}
fmt.Println("Interface:", ifi.Name, ". BPF Filter:", l.BPFFilter)
err = handle.SetBPFFilter(l.BPFFilter)
if err != nil {
handle.Close()
Expand All @@ -286,6 +291,7 @@ func (l *Listener) SocketHandle(ifi net.Interface) (handle Socket, err error) {
} else {
l.BPFFilter = l.Filter(ifi)
}
fmt.Println("BPF Filter: ", l.BPFFilter)
if err = handle.SetBPFFilter(l.BPFFilter); err != nil {
handle.Close()
return nil, fmt.Errorf("BPF filter error: %q%s, interface: %q", err, l.BPFFilter, ifi.Name)
Expand All @@ -294,11 +300,11 @@ func (l *Listener) SocketHandle(ifi net.Interface) (handle Socket, err error) {
return
}

func (l *Listener) read(handler Handler) {
func (l *Listener) read(handler PacketHandler) {
l.Lock()
defer l.Unlock()
for key, handle := range l.Handles {
go func(key string, hndl gopacket.PacketDataSource) {
go func(key string, hndl gopacket.ZeroCopyPacketDataSource) {
defer l.closeHandles(key)
linkSize := 14
linkType := int(layers.LinkTypeEthernet)
Expand All @@ -312,14 +318,27 @@ func (l *Listener) read(handler Handler) {
return // can't find the linktype size
}
}

ticker := time.NewTicker(time.Second)

for {
select {
case <-l.quit:
return
case <-ticker.C:
usedMemory, totalMemory := memUsage()
if 100*float64(usedMemory)/float64(totalMemory) >= 95 {
fmt.Println("Using more memory then allowed. Enabling throttling.")
time.Sleep(time.Second)
}
continue
default:
data, ci, err := hndl.ReadPacketData()
data, ci, err := hndl.ZeroCopyReadPacketData()
if err == nil {
handler(NewPacket(data, linkType, linkSize, &ci))
pckt, err := tcp.ParsePacket(data, linkType, linkSize, &ci)
if err == nil {
handler(pckt)
}
continue
}
if enext, ok := err.(pcap.NextError); ok && enext == pcap.NextErrorTimeoutExpired {
Expand All @@ -332,11 +351,11 @@ func (l *Listener) read(handler Handler) {
continue
}
if err == io.EOF || err == io.ErrClosedPipe {
return
}
if os.Getenv("GORDEBUG") != "0" {
log.Printf("stopped reading from %s interface with error %s\n", key, err)
return
}

log.Printf("stopped reading from %s interface with error %s\n", key, err)
return
}
}
Expand Down Expand Up @@ -478,3 +497,26 @@ func listenAll(addr string) bool {
}
return false
}

func pcapLinkTypeLength(lType int) (int, bool) {
switch layers.LinkType(lType) {
case layers.LinkTypeEthernet:
return 14, true
case layers.LinkTypeNull, layers.LinkTypeLoop:
return 4, true
case layers.LinkTypeRaw, 12, 14:
return 0, true
case layers.LinkTypeIPv4, layers.LinkTypeIPv6:
// (TODO:) look out for IP encapsulation?
return 0, true
case layers.LinkTypeLinuxSLL:
return 16, true
case layers.LinkTypeFDDI:
return 13, true
case 226 /*DLT_IPNET*/ :
// https://www.tcpdump.org/linktypes/LINKTYPE_IPNET.html
return 24, true
default:
return 0, false
}
}
40 changes: 20 additions & 20 deletions capture/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,25 +133,25 @@ func TestPcapHandler(t *testing.T) {
}
}

func TestSocketHandler(t *testing.T) {
l, err := NewListener(LoopBack.Name, 8000, "", EngineRawSocket, true)
err = l.Activate()
if err != nil {
return
}
defer l.Handles[LoopBack.Name].(*SockRaw).Close()
if err != nil {
t.Errorf("expected error to be nil, got %v", err)
return
}
for i := 0; i < 5; i++ {
_, _ = net.Dial("tcp", "127.0.0.1:8000")
}
sts, _ := l.Handles[LoopBack.Name].(*SockRaw).Stats()
if sts.Packets < 5 {
t.Errorf("expected >=5 packets got %d", sts.Packets)
}
}
// func TestSocketHandler(t *testing.T) {
// l, err := NewListener(LoopBack.Name, 8000, "", EngineRawSocket, true)
// err = l.Activate()
// if err != nil {
// return
// }
// defer l.Handles[LoopBack.Name].(*SockRaw).Close()
// if err != nil {
// t.Errorf("expected error to be nil, got %v", err)
// return
// }
// for i := 0; i < 5; i++ {
// _, _ = net.Dial("tcp", "127.0.0.1:8000")
// }
// sts, _ := l.Handles[LoopBack.Name].(*SockRaw).Stats()
// if sts.Packets < 5 {
// t.Errorf("expected >=5 packets got %d", sts.Packets)
// }
// }

func BenchmarkPcapDump(b *testing.B) {
f, err := ioutil.TempFile("", "pcap_file")
Expand Down Expand Up @@ -220,7 +220,7 @@ func init() {
}
}

func handler(n, counter *int32) Handler {
func handler(n, counter *int32) PacketHandler {
return func(p *Packet) {
nn := int32(len(p.Data))
atomic.AddInt32(n, nn)
Expand Down
Loading