Skip to content
This repository was archived by the owner on Apr 20, 2021. It is now read-only.

Commit f1e2b5d

Browse files
committed
probe: conntrack: fix output parsing
With net.netfilter.nf_conntrack_acct = 1, conntrack adds the following fields in the output: packets=3 bytes=164 And with SELinux (e.g. Fedora), conntrack adds: secctx=... The parsing with fmt.Sscanf introduced in weaveworks#2095 was unfortunately rejecting lines with those fields. This patch fixes that by adding more complicated parsing in decodeFlowKeyValues() with FieldsFunc and SplitN. Fixes weaveworks#2117 Regression from weaveworks#2095
1 parent 75ed573 commit f1e2b5d

File tree

2 files changed

+136
-40
lines changed

2 files changed

+136
-40
lines changed

probe/endpoint/conntrack.go

Lines changed: 75 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88
"io/ioutil"
99
"path/filepath"
1010
"strconv"
11+
"strings"
1112
"sync"
1213
"time"
14+
"unicode"
1315

1416
log "github.com/Sirupsen/logrus"
1517

@@ -250,6 +252,58 @@ func removeInplace(s, sep []byte) []byte {
250252
return s[:len(s)-len(sep)]
251253
}
252254

255+
// decodeFlowKeyValues parses the key-values from a conntrack line and updates the flow
256+
// It only considers the following key-values:
257+
// src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776
258+
// Keys can be present twice, so the order is important.
259+
// Conntrack could add other key-values such as secctx=, packets=, bytes=. Those are ignored.
260+
func decodeFlowKeyValues(line []byte, f *flow) error {
261+
var err error
262+
for _, field := range strings.FieldsFunc(string(line), func(c rune) bool { return unicode.IsSpace(c) }) {
263+
kv := strings.SplitN(field, "=", 2)
264+
if len(kv) != 2 {
265+
continue
266+
}
267+
key := kv[0]
268+
value := kv[1]
269+
firstTupleSet := f.Original.Layer4.DstPort != 0
270+
switch {
271+
case key == "src":
272+
if !firstTupleSet {
273+
f.Original.Layer3.SrcIP = value
274+
} else {
275+
f.Reply.Layer3.SrcIP = value
276+
}
277+
278+
case key == "dst":
279+
if !firstTupleSet {
280+
f.Original.Layer3.DstIP = value
281+
} else {
282+
f.Reply.Layer3.DstIP = value
283+
}
284+
285+
case key == "sport":
286+
if !firstTupleSet {
287+
f.Original.Layer4.SrcPort, err = strconv.Atoi(value)
288+
} else {
289+
f.Reply.Layer4.SrcPort, err = strconv.Atoi(value)
290+
}
291+
292+
case key == "dport":
293+
if !firstTupleSet {
294+
f.Original.Layer4.DstPort, err = strconv.Atoi(value)
295+
} else {
296+
f.Reply.Layer4.DstPort, err = strconv.Atoi(value)
297+
}
298+
299+
case key == "id":
300+
f.Independent.ID, err = strconv.ParseInt(value, 10, 64)
301+
}
302+
}
303+
304+
return err
305+
}
306+
253307
func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
254308
var (
255309
// Use ints for parsing unused fields since their allocations
@@ -273,42 +327,29 @@ func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
273327
line = bytes.TrimLeft(line, " ")
274328
if bytes.HasPrefix(line, destroyTypeB) {
275329
// Destroy events don't have a timeout or state field
276-
_, err = fmt.Sscanf(string(line), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
330+
_, err = fmt.Sscanf(string(line), "%s %s %d",
277331
&f.Type,
278332
&f.Original.Layer4.Proto,
279333
&unused[0],
280-
&f.Original.Layer3.SrcIP,
281-
&f.Original.Layer3.DstIP,
282-
&f.Original.Layer4.SrcPort,
283-
&f.Original.Layer4.DstPort,
284-
&f.Reply.Layer3.SrcIP,
285-
&f.Reply.Layer3.DstIP,
286-
&f.Reply.Layer4.SrcPort,
287-
&f.Reply.Layer4.DstPort,
288-
&f.Independent.ID,
289334
)
290335
} else {
291-
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
336+
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s",
292337
&f.Type,
293338
&f.Original.Layer4.Proto,
294339
&unused[0],
295340
&unused[1],
296341
&f.Independent.State,
297-
&f.Original.Layer3.SrcIP,
298-
&f.Original.Layer3.DstIP,
299-
&f.Original.Layer4.SrcPort,
300-
&f.Original.Layer4.DstPort,
301-
&f.Reply.Layer3.SrcIP,
302-
&f.Reply.Layer3.DstIP,
303-
&f.Reply.Layer4.SrcPort,
304-
&f.Reply.Layer4.DstPort,
305-
&f.Independent.ID,
306342
)
307343
}
344+
if err != nil {
345+
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
346+
}
308347

348+
err = decodeFlowKeyValues(line, &f)
309349
if err != nil {
310350
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
311351
}
352+
312353
f.Reply.Layer4.Proto = f.Original.Layer4.Proto
313354
return f, nil
314355
}
@@ -353,32 +394,27 @@ func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
353394
f flow
354395
)
355396

356-
// Example:
357-
// " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c"
397+
// Examples with different formats:
398+
// With SELinux, there is a "secctx="
399+
// After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes="
400+
//
401+
// "tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088"
402+
// "tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208"
403+
// "tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880"
404+
// "tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840"
405+
358406
// remove tags since they are optional and make parsing harder
359407
line, err := getUntaggedLine(scanner)
360408
if err != nil {
361409
return flow{}, err
362410
}
363411

364-
_, err = fmt.Sscanf(string(line), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d",
365-
&f.Original.Layer4.Proto,
366-
&unused[0],
367-
&unused[1],
368-
&f.Independent.State,
369-
&f.Original.Layer3.SrcIP,
370-
&f.Original.Layer3.DstIP,
371-
&f.Original.Layer4.SrcPort,
372-
&f.Original.Layer4.DstPort,
373-
&f.Reply.Layer3.SrcIP,
374-
&f.Reply.Layer3.DstIP,
375-
&f.Reply.Layer4.SrcPort,
376-
&f.Reply.Layer4.DstPort,
377-
&unused[2],
378-
&unused[3],
379-
&f.Independent.ID,
380-
)
412+
_, err = fmt.Sscanf(string(line), "%s %d %d %s", &f.Original.Layer4.Proto, &unused[0], &unused[1], &f.Independent.State)
413+
if err != nil {
414+
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
415+
}
381416

417+
err = decodeFlowKeyValues(line, &f)
382418
if err != nil {
383419
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
384420
}

probe/endpoint/conntrack_internal_test.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,11 @@ func TestStreamedFlowDecoding(t *testing.T) {
155155
}
156156

157157
// Obtained through conntrack -L -p tcp -o id
158-
const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208`
158+
// With SELinux, there is a "secctx="
159+
// After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes="
160+
const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208
161+
tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880
162+
tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840`
159163

160164
var wantDumpedFlows = []flow{
161165
{
@@ -186,6 +190,62 @@ var wantDumpedFlows = []flow{
186190
State: "ESTABLISHED",
187191
},
188192
},
193+
{
194+
Original: meta{
195+
Layer3: layer3{
196+
SrcIP: "172.17.0.5",
197+
DstIP: "172.17.0.2",
198+
},
199+
Layer4: layer4{
200+
SrcPort: 47010,
201+
DstPort: 80,
202+
Proto: "tcp",
203+
},
204+
},
205+
Reply: meta{
206+
Layer3: layer3{
207+
SrcIP: "172.17.0.2",
208+
DstIP: "172.17.0.5",
209+
},
210+
Layer4: layer4{
211+
SrcPort: 80,
212+
DstPort: 47010,
213+
Proto: "tcp",
214+
},
215+
},
216+
Independent: meta{
217+
ID: 4001098880,
218+
State: "ESTABLISHED",
219+
},
220+
},
221+
{
222+
Original: meta{
223+
Layer3: layer3{
224+
SrcIP: "192.168.35.116",
225+
DstIP: "216.58.213.227",
226+
},
227+
Layer4: layer4{
228+
SrcPort: 49862,
229+
DstPort: 443,
230+
Proto: "tcp",
231+
},
232+
},
233+
Reply: meta{
234+
Layer3: layer3{
235+
SrcIP: "216.58.213.227",
236+
DstIP: "192.168.35.116",
237+
},
238+
Layer4: layer4{
239+
SrcPort: 443,
240+
DstPort: 49862,
241+
Proto: "tcp",
242+
},
243+
},
244+
Independent: meta{
245+
ID: 943643840,
246+
State: "ESTABLISHED",
247+
},
248+
},
189249
}
190250

191251
func TestDumpedFlowDecoding(t *testing.T) {

0 commit comments

Comments
 (0)