Skip to content

Commit 8c1e139

Browse files
author
Alfonso Acosta
committed
[WIP] Diable XML in conntrack parsing
Not working yet
1 parent 8e78c0c commit 8c1e139

File tree

1 file changed

+100
-47
lines changed

1 file changed

+100
-47
lines changed

probe/endpoint/conntrack.go

Lines changed: 100 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package endpoint
33
import (
44
"bufio"
55
"encoding/xml"
6+
"fmt"
67
"io"
78
"os"
89
"path/filepath"
@@ -165,7 +166,7 @@ func (c *conntrackWalker) run() {
165166

166167
args := append([]string{
167168
"--buffer-size", strconv.Itoa(c.bufferSize), "-E",
168-
"-o", "xml", "-p", "tcp"}, c.args...,
169+
"-o", "id", "-p", "tcp"}, c.args...,
169170
)
170171
cmd := exec.Command("conntrack", args...)
171172
stdout, err := cmd.StdoutPipe()
@@ -204,39 +205,66 @@ func (c *conntrackWalker) run() {
204205
c.cmd = cmd
205206
c.Unlock()
206207

207-
// Swallow the first two lines
208208
reader := bufio.NewReader(stdout)
209-
if line, err := reader.ReadString('\n'); err != nil {
210-
log.Errorf("conntrack error: %v", err)
211-
return
212-
} else if line != xmlHeader {
213-
log.Errorf("conntrack invalid output: '%s'", line)
214-
return
215-
}
216-
if line, err := reader.ReadString('\n'); err != nil {
217-
log.Errorf("conntrack error: %v", err)
218-
return
219-
} else if line != conntrackOpenTag {
220-
log.Errorf("conntrack invalid output: '%s'", line)
221-
return
222-
}
209+
defer log.Infof("conntrack exiting")
223210

224-
defer log.Infof("contrack exiting")
225-
226-
// Now loop on the output stream
227-
decoder := xml.NewDecoder(reader)
211+
// Lop on the output stream
228212
for {
229-
var f flow
230-
if err := decoder.Decode(&f); err != nil {
213+
f, err := decodeStreamedFlow(reader)
214+
if err != nil {
231215
log.Errorf("conntrack error: %v", err)
232216
return
233217
}
234218
c.handleFlow(f, false)
235219
}
236220
}
237221

222+
func makeEmptyFlow() flow {
223+
var f flow
224+
metas := make([]meta, 3)
225+
f.Metas = metas
226+
// TODO: do we really need the direction/protocol type when not using XML?
227+
f.Original = &metas[0]
228+
f.Reply = &metas[1]
229+
f.Independent = &metas[2]
230+
return f
231+
}
232+
233+
func decodeStreamedFlow(reader *bufio.Reader) (flow, error) {
234+
var (
235+
// TODO: use ints where possible?
236+
omit [10]string
237+
f = makeEmptyFlow()
238+
)
239+
l, _ := reader.ReadString('\n')
240+
// " [NEW] tcp 6 120 SYN_SENT src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776\n"
241+
n, err := fmt.Sscanf(l, " %s tcp %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d %s id=%x\n",
242+
&omit[0],
243+
&omit[1],
244+
&omit[2],
245+
&f.Independent.State,
246+
&f.Original.Layer3.SrcIP,
247+
&f.Original.Layer3.DstIP,
248+
&f.Original.Layer4.SrcPort,
249+
&f.Original.Layer4.DstPort,
250+
&f.Reply.Layer3.SrcIP,
251+
&f.Reply.Layer3.DstIP,
252+
&f.Reply.Layer4.SrcPort,
253+
&f.Reply.Layer4.DstPort,
254+
&omit[3],
255+
&f.Independent.ID,
256+
)
257+
258+
if err != nil {
259+
log.Infof("Streamed Error: %#v, n=%d, line = %#q", err, n, l)
260+
return flow{}, err
261+
}
262+
log.Infof("Streamed flow: %v", f)
263+
return f, nil
264+
}
265+
238266
func (c *conntrackWalker) existingConnections() ([]flow, error) {
239-
args := append([]string{"-L", "-o", "xml", "-p", "tcp"}, c.args...)
267+
args := append([]string{"-L", "-o", "id", "-p", "tcp"}, c.args...)
240268
cmd := exec.Command("conntrack", args...)
241269
stdout, err := cmd.StdoutPipe()
242270
if err != nil {
@@ -250,13 +278,54 @@ func (c *conntrackWalker) existingConnections() ([]flow, error) {
250278
log.Errorf("conntrack existingConnections exit error: %v", err)
251279
}
252280
}()
253-
var result conntrack
254-
if err := xml.NewDecoder(stdout).Decode(&result); err == io.EOF {
255-
return []flow{}, nil
256-
} else if err != nil {
257-
return []flow{}, err
281+
282+
reader := bufio.NewReader(stdout)
283+
var result []flow
284+
for {
285+
f, err := readDumpedFlow(reader)
286+
if err != nil {
287+
if err == io.EOF {
288+
break
289+
}
290+
log.Errorf("conntrack error: %v", err)
291+
return result, err
292+
}
293+
result = append(result, f)
258294
}
259-
return result.Flows, nil
295+
return result, nil
296+
}
297+
298+
func readDumpedFlow(reader *bufio.Reader) (flow, error) {
299+
var (
300+
// TODO: use byteslices where possible?
301+
omit [10]string
302+
f = makeEmptyFlow()
303+
)
304+
n, err := fmt.Fscanf(reader, "%s %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d %s %s %s id=%x\n",
305+
&f.Original.Layer4.Proto,
306+
&omit[0],
307+
&omit[1],
308+
&f.Independent.State,
309+
&f.Original.Layer3.SrcIP,
310+
&f.Original.Layer3.DstIP,
311+
&f.Original.Layer4.SrcPort,
312+
&f.Original.Layer4.DstPort,
313+
&f.Reply.Layer3.SrcIP,
314+
&f.Reply.Layer3.DstIP,
315+
&f.Reply.Layer4.SrcPort,
316+
&f.Reply.Layer4.DstPort,
317+
&omit[2],
318+
&omit[3],
319+
&omit[4],
320+
&f.Independent.ID,
321+
)
322+
323+
if err != nil {
324+
return flow{}, err
325+
}
326+
327+
f.Reply.Layer4.Proto = f.Original.Layer4.Proto
328+
return f, nil
260329
}
261330

262331
func (c *conntrackWalker) stop() {
@@ -269,21 +338,8 @@ func (c *conntrackWalker) stop() {
269338
}
270339

271340
func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) {
272-
// A flow consists of 3 'metas' - the 'original' 4 tuple (as seen by this
273-
// host) and the 'reply' 4 tuple, which is what it has been rewritten to.
274-
// This code finds those metas, which are identified by a Direction
275-
// attribute.
276-
for i := range f.Metas {
277-
meta := &f.Metas[i]
278-
switch meta.Direction {
279-
case "original":
280-
f.Original = meta
281-
case "reply":
282-
f.Reply = meta
283-
case "independent":
284-
f.Independent = meta
285-
}
286-
}
341+
c.Lock()
342+
defer c.Unlock()
287343

288344
// For not, I'm only interested in tcp connections - there is too much udp
289345
// traffic going on (every container talking to weave dns, for example) to
@@ -292,9 +348,6 @@ func (c *conntrackWalker) handleFlow(f flow, forceAdd bool) {
292348
return
293349
}
294350

295-
c.Lock()
296-
defer c.Unlock()
297-
298351
// Ignore flows for which we never saw an update; they are likely
299352
// incomplete or wrong. See #1462.
300353
switch {

0 commit comments

Comments
 (0)