@@ -8,8 +8,10 @@ import (
88 "io/ioutil"
99 "path/filepath"
1010 "strconv"
11+ "strings"
1112 "sync"
1213 "time"
14+ "unicode"
1315
1416 log "github.com/Sirupsen/logrus"
1517
@@ -250,6 +252,58 @@ func removeInplace(s, sep []byte) []byte {
250252 return s [:len (s )- len (sep )]
251253}
252254
255+ // decodeFlowKeyValues parses the key-values from a conntrack line and updates the flow
256+ // It only considers the following key-values:
257+ // src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776
258+ // Keys can be present twice, so the order is important.
259+ // Conntrack could add other key-values such as secctx=, packets=, bytes=. Those are ignored.
260+ func decodeFlowKeyValues (line []byte , f * flow ) error {
261+ var err error
262+ for _ , field := range strings .FieldsFunc (string (line ), func (c rune ) bool { return unicode .IsSpace (c ) }) {
263+ kv := strings .SplitN (field , "=" , 2 )
264+ if len (kv ) != 2 {
265+ continue
266+ }
267+ key := kv [0 ]
268+ value := kv [1 ]
269+ firstTupleSet := f .Original .Layer4 .DstPort != 0
270+ switch {
271+ case key == "src" :
272+ if ! firstTupleSet {
273+ f .Original .Layer3 .SrcIP = value
274+ } else {
275+ f .Reply .Layer3 .SrcIP = value
276+ }
277+
278+ case key == "dst" :
279+ if ! firstTupleSet {
280+ f .Original .Layer3 .DstIP = value
281+ } else {
282+ f .Reply .Layer3 .DstIP = value
283+ }
284+
285+ case key == "sport" :
286+ if ! firstTupleSet {
287+ f .Original .Layer4 .SrcPort , err = strconv .Atoi (value )
288+ } else {
289+ f .Reply .Layer4 .SrcPort , err = strconv .Atoi (value )
290+ }
291+
292+ case key == "dport" :
293+ if ! firstTupleSet {
294+ f .Original .Layer4 .DstPort , err = strconv .Atoi (value )
295+ } else {
296+ f .Reply .Layer4 .DstPort , err = strconv .Atoi (value )
297+ }
298+
299+ case key == "id" :
300+ f .Independent .ID , err = strconv .ParseInt (value , 10 , 64 )
301+ }
302+ }
303+
304+ return err
305+ }
306+
253307func decodeStreamedFlow (scanner * bufio.Scanner ) (flow , error ) {
254308 var (
255309 // Use ints for parsing unused fields since their allocations
@@ -273,42 +327,29 @@ func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
273327 line = bytes .TrimLeft (line , " " )
274328 if bytes .HasPrefix (line , destroyTypeB ) {
275329 // Destroy events don't have a timeout or state field
276- _ , err = fmt .Sscanf (string (line ), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d " ,
330+ _ , err = fmt .Sscanf (string (line ), "%s %s %d" ,
277331 & f .Type ,
278332 & f .Original .Layer4 .Proto ,
279333 & unused [0 ],
280- & f .Original .Layer3 .SrcIP ,
281- & f .Original .Layer3 .DstIP ,
282- & f .Original .Layer4 .SrcPort ,
283- & f .Original .Layer4 .DstPort ,
284- & f .Reply .Layer3 .SrcIP ,
285- & f .Reply .Layer3 .DstIP ,
286- & f .Reply .Layer4 .SrcPort ,
287- & f .Reply .Layer4 .DstPort ,
288- & f .Independent .ID ,
289334 )
290335 } else {
291- _ , err = fmt .Sscanf (string (line ), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d " ,
336+ _ , err = fmt .Sscanf (string (line ), "%s %s %d %d %s" ,
292337 & f .Type ,
293338 & f .Original .Layer4 .Proto ,
294339 & unused [0 ],
295340 & unused [1 ],
296341 & f .Independent .State ,
297- & f .Original .Layer3 .SrcIP ,
298- & f .Original .Layer3 .DstIP ,
299- & f .Original .Layer4 .SrcPort ,
300- & f .Original .Layer4 .DstPort ,
301- & f .Reply .Layer3 .SrcIP ,
302- & f .Reply .Layer3 .DstIP ,
303- & f .Reply .Layer4 .SrcPort ,
304- & f .Reply .Layer4 .DstPort ,
305- & f .Independent .ID ,
306342 )
307343 }
344+ if err != nil {
345+ return flow {}, fmt .Errorf ("Error parsing streamed flow %q: %v " , line , err )
346+ }
308347
348+ err = decodeFlowKeyValues (line , & f )
309349 if err != nil {
310350 return flow {}, fmt .Errorf ("Error parsing streamed flow %q: %v " , line , err )
311351 }
352+
312353 f .Reply .Layer4 .Proto = f .Original .Layer4 .Proto
313354 return f , nil
314355}
@@ -353,32 +394,27 @@ func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
353394 f flow
354395 )
355396
356- // Example:
357- // " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c"
397+ // Examples with different formats:
398+ // With SELinux, there is a "secctx="
399+ // After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes="
400+ //
401+ // "tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088"
402+ // "tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208"
403+ // "tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880"
404+ // "tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840"
405+
358406 // remove tags since they are optional and make parsing harder
359407 line , err := getUntaggedLine (scanner )
360408 if err != nil {
361409 return flow {}, err
362410 }
363411
364- _ , err = fmt .Sscanf (string (line ), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d" ,
365- & f .Original .Layer4 .Proto ,
366- & unused [0 ],
367- & unused [1 ],
368- & f .Independent .State ,
369- & f .Original .Layer3 .SrcIP ,
370- & f .Original .Layer3 .DstIP ,
371- & f .Original .Layer4 .SrcPort ,
372- & f .Original .Layer4 .DstPort ,
373- & f .Reply .Layer3 .SrcIP ,
374- & f .Reply .Layer3 .DstIP ,
375- & f .Reply .Layer4 .SrcPort ,
376- & f .Reply .Layer4 .DstPort ,
377- & unused [2 ],
378- & unused [3 ],
379- & f .Independent .ID ,
380- )
412+ _ , err = fmt .Sscanf (string (line ), "%s %d %d %s" , & f .Original .Layer4 .Proto , & unused [0 ], & unused [1 ], & f .Independent .State )
413+ if err != nil {
414+ return flow {}, fmt .Errorf ("Error parsing dumped flow %q: %v " , line , err )
415+ }
381416
417+ err = decodeFlowKeyValues (line , & f )
382418 if err != nil {
383419 return flow {}, fmt .Errorf ("Error parsing dumped flow %q: %v " , line , err )
384420 }
0 commit comments