'''
Copyright 2023 The Board of Trustees of The Leland Stanford Junior University
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import glob
import os
import pandas as pd
from multiprocessing import Process


def get_last_hops_from_paris_tr(file_path: str) -> pd.DataFrame:
"""
Extract the hop number and IPs for the second-to-last and last hop in
ICMP paris-traceroutes.
:param file_path: file path to the .json formatted scamper trace output
    :return: dataframe with the destination, stop reason, hop count, and the
    IP and hop number of the second-to-last hop for each traceroute.
"""
    def sort_hops(e):
        return e['probe_ttl']

    def get_sec_last_ip(hops):
        # If the hops field is not a list with at least two entries, return None
        if not isinstance(hops, list) or len(hops) < 2:
            print('not an instance of list or len < 2: ', hops)
            return None
        hops.sort(key=sort_hops)
        return hops[-2]['addr']

    def get_sec_last_probe_ttl(hops):
        # If the hops field is not a list with at least two entries, return None
        if not isinstance(hops, list) or len(hops) < 2:
            print('not an instance of list or len < 2: ', hops)
            return None
        hops.sort(key=sort_hops)
        return hops[-2]['probe_ttl']
df = pd.read_json(file_path, lines=True)
# filter for only 'trace' data
df = df[df["type"] == "trace"]
    df['sec_last_ip'] = df['hops'].apply(get_sec_last_ip)
    df['sec_last_hop'] = df['hops'].apply(get_sec_last_probe_ttl)
df = df[['dst', 'stop_reason', 'hop_count', 'sec_last_ip', 'sec_last_hop']]
return df
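

# Example usage (a sketch; the trace file path below is hypothetical):
#
#     last_hops = get_last_hops_from_paris_tr('traces/paris_trace.json')
#     print(last_hops[['dst', 'sec_last_ip', 'sec_last_hop']].head())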


def aggregate_data(files: dict) -> pd.DataFrame:
    """
    Aggregates data from scamper output files produced when running ttl_ping
    into a single dataframe.
    :param files: dict mapping sequence numbers to open file objects of
    scamper .json output
    :return: a single aggregated dataframe with a column for seq numbers
    """
    def get_date(start):
        try:
            return start['ftime'].split()[0]
        except Exception:
            # missing or malformed 'start' record
            return None

    def get_start_time(start):
        try:
            return start['ftime']
        except Exception:
            return None

    def get_start_sec(start):
        try:
            return start['sec']
        except Exception:
            return None

    def get_rtt(hops):
        try:
            return hops[0]['rtt']
        except Exception:
            return None

    def get_probe_ttl(hops):
        try:
            return hops[0]['probe_ttl']
        except Exception:
            return None

    def get_ip_at_ttl(hops):
        try:
            return hops[0]['addr']
        except Exception:
            return None
dfs = []
for seq, f in files.items():
f.flush()
df = pd.DataFrame()
try:
df = pd.read_json(f.name, lines=True)
except Exception as e:
print("Could not load json file with seq: " + str(seq))
print(e)
if df.empty:
continue
df = df[df['type'] == 'trace']
df['date'] = df['start'].apply(get_date)
df['seq'] = [seq] * len(df)
df['start_time'] = df['start'].apply(get_start_time)
df['start_sec'] = df['start'].apply(get_start_sec)
if 'hops' in df.columns:
df['ip_at_ttl'] = df['hops'].apply(get_ip_at_ttl)
df['probe_ttl'] = df['hops'].apply(get_probe_ttl)
df['rtt'] = df['hops'].apply(get_rtt)
else:
df['ip_at_ttl'] = [None] * len(df)
df['probe_ttl'] = [None] * len(df)
df['rtt'] = [None] * len(df)
df = df[['date', 'seq', 'dst', 'stop_reason', 'start_time', 'start_sec', 'hop_count', 'ip_at_ttl', 'probe_ttl', 'rtt']]
dfs.append(df)
if len(dfs) == 0:
return pd.DataFrame(columns=['date',
'seq',
'dst',
'stop_reason',
'start_time',
'start_sec',
'hop_count',
'ip_at_ttl',
'probe_ttl',
'rtt'])
    all_dfs = pd.concat(dfs)
    # cast columns to consistent dtypes before returning; nullable Int32 lets
    # rows with missing start_sec/hop_count survive the cast
    all_dfs = all_dfs.astype({
        'date': 'str',
        'seq': 'int32',
        'dst': 'str',
        'stop_reason': 'str',
        'start_time': 'str',
        'start_sec': 'Int32',
        'hop_count': 'Int32',
        'ip_at_ttl': 'str',
        'probe_ttl': 'float',
        'rtt': 'float'
    })
    return all_dfs
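

# Example usage (a sketch): aggregate_data expects already-open file handles
# keyed by sequence number; the file names below are hypothetical.
#
#     with open('pings/seq_0.json') as f0, open('pings/seq_1.json') as f1:
#         pings = aggregate_data({0: f0, 1: f1})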


def agg_for_dir(dir, table_id):
    # aggregate_data expects {seq: open file}; files are keyed by their position
    # in the sorted listing (an assumption); table_id is currently unused.
    agg_dfs = []
    for seq, path in enumerate(sorted(glob.glob(os.path.join(dir, "*")))):
        with open(path) as f:
            agg_dfs.append(aggregate_data({seq: f}))
    return agg_dfs
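

if __name__ == '__main__':
    # Minimal demo, assuming a directory of scamper .json outputs; both the
    # default directory name and the table id below are placeholders.
    import sys

    trace_dir = sys.argv[1] if len(sys.argv) > 1 else 'scamper_output'
    frames = agg_for_dir(trace_dir, 'example_table')
    if frames:
        print(pd.concat(frames).head())
    else:
        print('no scamper output files found in', trace_dir)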