88 "os"
99 "path/filepath"
1010 "strconv"
11+ "strings"
1112 "sync"
1213 "time"
1314
@@ -230,36 +231,82 @@ func makeEmptyFlow() flow {
230231 return f
231232}
232233
234+ func getUntaggedLine (reader * bufio.Reader ) (string , error ) {
235+ // TODO: read bytes?
236+ line , err := reader .ReadString ('\n' )
237+ if err != nil {
238+ return "" , err
239+ }
240+ // Remove [ASSURED]/[UNREPLIED] tags inplace
241+ // TODO: replace in-place?
242+ line = strings .Replace (line , "[ASSURED] " , "" , - 1 )
243+ line = strings .Replace (line , "[UNREPLIED] " , "" , - 1 )
244+ return line , nil
245+ }
246+
233247func decodeStreamedFlow (reader * bufio.Reader ) (flow , error ) {
234248 var (
235249 // TODO: use ints where possible?
236250 omit [10 ]string
237251 f = makeEmptyFlow ()
238252 )
239- l , _ := reader .ReadString ('\n' )
240- // " [NEW] tcp 6 120 SYN_SENT src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776\n"
241- n , err := fmt .Sscanf (l , " %s tcp %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d %s id=%x\n " ,
242- & omit [0 ],
243- & omit [1 ],
244- & omit [2 ],
245- & f .Independent .State ,
246- & f .Original .Layer3 .SrcIP ,
247- & f .Original .Layer3 .DstIP ,
248- & f .Original .Layer4 .SrcPort ,
249- & f .Original .Layer4 .DstPort ,
250- & f .Reply .Layer3 .SrcIP ,
251- & f .Reply .Layer3 .DstIP ,
252- & f .Reply .Layer4 .SrcPort ,
253- & f .Reply .Layer4 .DstPort ,
254- & omit [3 ],
255- & f .Independent .ID ,
256- )
257253
254+ // Examples:
255+ // " [UPDATE] udp 17 29 src=192.168.2.100 dst=192.168.2.1 sport=57767 dport=53 src=192.168.2.1 dst=192.168.2.100 sport=53 dport=57767"
256+ // " [NEW] tcp 6 120 SYN_SENT src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 [UNREPLIED] src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776"
257+ // " [UPDATE] tcp 6 120 TIME_WAIT src=10.0.2.15 dst=10.0.2.15 sport=51154 dport=4040 src=10.0.2.15 dst=10.0.2.15 sport=4040 dport=51154 [ASSURED] id=3663628160"
258+ // " [DESTROY] tcp 6 src=172.17.0.1 dst=172.17.0.1 sport=34078 dport=53 src=172.17.0.1 dst=10.0.2.15 sport=53 dport=34078 id=3668417984" (note how the timeout and state field is missing)
259+
260+ // Remove tags since they are optional and make parsing harder
261+ line , err := getUntaggedLine (reader )
258262 if err != nil {
259- log .Infof ("Streamed Error: %#v, n=%d, line = %#q" , err , n , l )
260263 return flow {}, err
261264 }
262- log .Infof ("Streamed flow: %v" , f )
265+
266+ // TODO: refactor and probably create a fully-fledged parser, this is just good enough for performance testing
267+ if strings .Contains (line , "[DESTROY]" ) {
268+ // Destroy events don't have a timeout or state field
269+ _ , err = fmt .Sscanf (line , "%s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%x\n " ,
270+ & f .Type ,
271+ & f .Original .Layer4 .Proto ,
272+ & omit [0 ],
273+ & f .Original .Layer3 .SrcIP ,
274+ & f .Original .Layer3 .DstIP ,
275+ & f .Original .Layer4 .SrcPort ,
276+ & f .Original .Layer4 .DstPort ,
277+ & f .Reply .Layer3 .SrcIP ,
278+ & f .Reply .Layer3 .DstIP ,
279+ & f .Reply .Layer4 .SrcPort ,
280+ & f .Reply .Layer4 .DstPort ,
281+ & f .Independent .ID ,
282+ )
283+ } else {
284+ _ , err = fmt .Sscanf (line , "%s %s %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%x\n " ,
285+ & f .Type ,
286+ & f .Original .Layer4 .Proto ,
287+ & omit [0 ],
288+ & omit [1 ],
289+ & f .Independent .State ,
290+ & f .Original .Layer3 .SrcIP ,
291+ & f .Original .Layer3 .DstIP ,
292+ & f .Original .Layer4 .SrcPort ,
293+ & f .Original .Layer4 .DstPort ,
294+ & f .Reply .Layer3 .SrcIP ,
295+ & f .Reply .Layer3 .DstIP ,
296+ & f .Reply .Layer4 .SrcPort ,
297+ & f .Reply .Layer4 .DstPort ,
298+ & f .Independent .ID ,
299+ )
300+ }
301+
302+ if err != nil {
303+ return flow {}, fmt .Errorf ("Error parsing streamed flow %q: %v " , line , err )
304+ }
305+ if len (f .Type ) < 3 || f .Type [0 ] != '[' || f .Type [len (f .Type )- 1 ] != ']' {
306+ return flow {}, fmt .Errorf ("Unexpected type format: %q" , f .Type )
307+ }
308+ f .Type = strings .ToLower (f .Type [1 : len (f .Type )- 1 ])
309+ f .Reply .Layer4 .Proto = f .Original .Layer4 .Proto
263310 return f , nil
264311}
265312
@@ -301,7 +348,16 @@ func readDumpedFlow(reader *bufio.Reader) (flow, error) {
301348 omit [10 ]string
302349 f = makeEmptyFlow ()
303350 )
304- n , err := fmt .Fscanf (reader , "%s %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d %s %s %s id=%x\n " ,
351+
352+ // Example:
353+ // " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c"
354+ // remove tags since they are optional and make parsing harder
355+ line , err := getUntaggedLine (reader )
356+ if err != nil {
357+ return flow {}, err
358+ }
359+
360+ _ , err = fmt .Sscanf (line , "%s %s %s %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d %s %s id=%x\n " ,
305361 & f .Original .Layer4 .Proto ,
306362 & omit [0 ],
307363 & omit [1 ],
@@ -316,12 +372,11 @@ func readDumpedFlow(reader *bufio.Reader) (flow, error) {
316372 & f .Reply .Layer4 .DstPort ,
317373 & omit [2 ],
318374 & omit [3 ],
319- & omit [4 ],
320375 & f .Independent .ID ,
321376 )
322377
323378 if err != nil {
324- return flow {}, err
379+ return flow {}, fmt . Errorf ( "Error parsing dumped flow %q: %v " , line , err )
325380 }
326381
327382 f .Reply .Layer4 .Proto = f .Original .Layer4 .Proto
0 commit comments