Skip to content

Commit cd14b62

Browse files
author
Alfonso Acosta
committed
Cleanup
* Remove XML traces * Improve performance * Fix tests
1 parent f19889f commit cd14b62

File tree

3 files changed

+294
-220
lines changed

3 files changed

+294
-220
lines changed

probe/endpoint/conntrack.go

Lines changed: 77 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@ package endpoint
22

33
import (
44
"bufio"
5-
"encoding/xml"
5+
"bytes"
66
"fmt"
77
"io"
88
"os"
99
"path/filepath"
1010
"strconv"
11-
"strings"
1211
"sync"
1312
"time"
1413

@@ -20,48 +19,45 @@ import (
2019
const (
2120
// From https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
2221
// Check a tcp-related file for existence since we need tcp tracking
23-
procFileToCheck = "sys/net/netfilter/nf_conntrack_tcp_timeout_close"
24-
xmlHeader = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n"
25-
conntrackOpenTag = "<conntrack>\n"
26-
timeWait = "TIME_WAIT"
27-
tcpProto = "tcp"
28-
newType = "new"
29-
updateType = "update"
30-
destroyType = "destroy"
22+
procFileToCheck = "sys/net/netfilter/nf_conntrack_tcp_timeout_close"
23+
timeWait = "TIME_WAIT"
24+
tcpProto = "tcp"
25+
newType = "[NEW]"
26+
updateType = "[UPDATE]"
27+
destroyType = "[DESTROY]"
28+
)
29+
30+
var (
31+
destroyTypeB = []byte(destroyType)
32+
assured = []byte("[ASSURED] ")
33+
unreplied = []byte("[UNREPLIED] ")
3134
)
3235

3336
type layer3 struct {
34-
XMLName xml.Name `xml:"layer3"`
35-
SrcIP string `xml:"src"`
36-
DstIP string `xml:"dst"`
37+
SrcIP string
38+
DstIP string
3739
}
3840

3941
type layer4 struct {
40-
XMLName xml.Name `xml:"layer4"`
41-
SrcPort int `xml:"sport"`
42-
DstPort int `xml:"dport"`
43-
Proto string `xml:"protoname,attr"`
42+
SrcPort int
43+
DstPort int
44+
Proto string
4445
}
4546

4647
type meta struct {
47-
XMLName xml.Name `xml:"meta"`
48-
Direction string `xml:"direction,attr"`
49-
Layer3 layer3 `xml:"layer3"`
50-
Layer4 layer4 `xml:"layer4"`
51-
ID int64 `xml:"id"`
52-
State string `xml:"state"`
48+
Layer3 layer3
49+
Layer4 layer4
50+
ID int64
51+
State string
5352
}
5453

5554
type flow struct {
56-
XMLName xml.Name `xml:"flow"`
57-
Type string `xml:"type,attr"`
58-
59-
Original, Reply, Independent meta `xml:"-"`
55+
Type string
56+
Original, Reply, Independent meta
6057
}
6158

6259
type conntrack struct {
63-
XMLName xml.Name `xml:"conntrack"`
64-
Flows []flow `xml:"flow"`
60+
Flows []flow
6561
}
6662

6763
// flowWalker is something that maintains flows, and provides an accessor
@@ -205,12 +201,12 @@ func (c *conntrackWalker) run() {
205201
c.cmd = cmd
206202
c.Unlock()
207203

208-
reader := bufio.NewReader(stdout)
204+
scanner := bufio.NewScanner(bufio.NewReader(stdout))
209205
defer log.Infof("conntrack exiting")
210206

211207
// Lop on the output stream
212208
for {
213-
f, err := decodeStreamedFlow(reader)
209+
f, err := decodeStreamedFlow(scanner)
214210
if err != nil {
215211
log.Errorf("conntrack error: %v", err)
216212
return
@@ -219,24 +215,40 @@ func (c *conntrackWalker) run() {
219215
}
220216
}
221217

222-
func getUntaggedLine(reader *bufio.Reader) (string, error) {
223-
// TODO: read bytes?
224-
line, err := reader.ReadString('\n')
225-
if err != nil {
226-
return "", err
218+
// Get a line without [ASSURED]/[UNREPLIED] tags (it simplifies parsing)
219+
func getUntaggedLine(scanner *bufio.Scanner) ([]byte, error) {
220+
success := scanner.Scan()
221+
if !success {
222+
if err := scanner.Err(); err != nil {
223+
return nil, err
224+
}
225+
return nil, io.EOF
227226
}
228-
// Remove [ASSURED]/[UNREPLIED] tags inplace
229-
// TODO: replace in-place?
230-
line = strings.Replace(line, "[ASSURED] ", "", -1)
231-
line = strings.Replace(line, "[UNREPLIED] ", "", -1)
227+
line := scanner.Bytes()
228+
// Remove [ASSURED]/[UNREPLIED] tags
229+
line = removeInplace(line, assured)
230+
line = removeInplace(line, unreplied)
232231
return line, nil
233232
}
234233

235-
func decodeStreamedFlow(reader *bufio.Reader) (flow, error) {
234+
func removeInplace(s, sep []byte) []byte {
235+
// TODO: See if we can get better performance
236+
// removing multiple substrings at once (with index/suffixarray New()+Lookup())
237+
// Probably not worth it for only two substrings occurring once.
238+
index := bytes.Index(s, sep)
239+
if index < 0 {
240+
return s
241+
}
242+
copy(s[index:], s[index+len(sep):])
243+
return s[:len(s)-len(sep)]
244+
}
245+
246+
func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
236247
var (
237-
// TODO: use []byte/int where possible?
238-
omit [4]string
239-
f flow
248+
// Use ints for parsing unused fields since their allocations
249+
// are almost for free
250+
unused [2]int
251+
f flow
240252
)
241253

242254
// Examples:
@@ -246,18 +258,18 @@ func decodeStreamedFlow(reader *bufio.Reader) (flow, error) {
246258
// " [DESTROY] tcp 6 src=172.17.0.1 dst=172.17.0.1 sport=34078 dport=53 src=172.17.0.1 dst=10.0.2.15 sport=53 dport=34078 id=3668417984" (note how the timeout and state field is missing)
247259

248260
// Remove tags since they are optional and make parsing harder
249-
line, err := getUntaggedLine(reader)
261+
line, err := getUntaggedLine(scanner)
250262
if err != nil {
251263
return flow{}, err
252264
}
253265

254-
// TODO: refactor and probably create a fully-fledged parser, this is just good enough for performance testing
255-
if strings.Contains(line, "[DESTROY]") {
266+
line = bytes.TrimLeft(line, " ")
267+
if bytes.HasPrefix(line, destroyTypeB) {
256268
// Destroy events don't have a timeout or state field
257-
_, err = fmt.Sscanf(line, "%s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%x\n",
269+
_, err = fmt.Sscanf(string(line), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
258270
&f.Type,
259271
&f.Original.Layer4.Proto,
260-
&omit[0],
272+
&unused[0],
261273
&f.Original.Layer3.SrcIP,
262274
&f.Original.Layer3.DstIP,
263275
&f.Original.Layer4.SrcPort,
@@ -269,11 +281,11 @@ func decodeStreamedFlow(reader *bufio.Reader) (flow, error) {
269281
&f.Independent.ID,
270282
)
271283
} else {
272-
_, err = fmt.Sscanf(line, "%s %s %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%x\n",
284+
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
273285
&f.Type,
274286
&f.Original.Layer4.Proto,
275-
&omit[0],
276-
&omit[1],
287+
&unused[0],
288+
&unused[1],
277289
&f.Independent.State,
278290
&f.Original.Layer3.SrcIP,
279291
&f.Original.Layer3.DstIP,
@@ -290,10 +302,6 @@ func decodeStreamedFlow(reader *bufio.Reader) (flow, error) {
290302
if err != nil {
291303
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
292304
}
293-
if len(f.Type) < 3 || f.Type[0] != '[' || f.Type[len(f.Type)-1] != ']' {
294-
return flow{}, fmt.Errorf("Unexpected type format: %q", f.Type)
295-
}
296-
f.Type = strings.ToLower(f.Type[1 : len(f.Type)-1])
297305
f.Reply.Layer4.Proto = f.Original.Layer4.Proto
298306
return f, nil
299307
}
@@ -314,10 +322,10 @@ func (c *conntrackWalker) existingConnections() ([]flow, error) {
314322
}
315323
}()
316324

317-
reader := bufio.NewReader(stdout)
325+
scanner := bufio.NewScanner(bufio.NewReader(stdout))
318326
var result []flow
319327
for {
320-
f, err := decodeDumpedFlow(reader)
328+
f, err := decodeDumpedFlow(scanner)
321329
if err != nil {
322330
if err == io.EOF {
323331
break
@@ -330,25 +338,26 @@ func (c *conntrackWalker) existingConnections() ([]flow, error) {
330338
return result, nil
331339
}
332340

333-
func decodeDumpedFlow(reader *bufio.Reader) (flow, error) {
341+
func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
334342
var (
335-
// TODO: use int/[]byte where possible?
336-
omit [4]string
337-
f flow
343+
// Use ints for parsing unused fields since allocations
344+
// are almost for free
345+
unused [4]int
346+
f flow
338347
)
339348

340349
// Example:
341350
// " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c"
342351
// remove tags since they are optional and make parsing harder
343-
line, err := getUntaggedLine(reader)
352+
line, err := getUntaggedLine(scanner)
344353
if err != nil {
345354
return flow{}, err
346355
}
347356

348-
_, err = fmt.Sscanf(line, "%s %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d %s %s id=%x\n",
357+
_, err = fmt.Sscanf(string(line), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d",
349358
&f.Original.Layer4.Proto,
350-
&omit[0],
351-
&omit[1],
359+
&unused[0],
360+
&unused[1],
352361
&f.Independent.State,
353362
&f.Original.Layer3.SrcIP,
354363
&f.Original.Layer3.DstIP,
@@ -358,8 +367,8 @@ func decodeDumpedFlow(reader *bufio.Reader) (flow, error) {
358367
&f.Reply.Layer3.DstIP,
359368
&f.Reply.Layer4.SrcPort,
360369
&f.Reply.Layer4.DstPort,
361-
&omit[2],
362-
&omit[3],
370+
&unused[2],
371+
&unused[3],
363372
&f.Independent.ID,
364373
)
365374

0 commit comments

Comments
 (0)