-
Notifications
You must be signed in to change notification settings - Fork 0
/
wifi_plot.py
104 lines (78 loc) · 2.62 KB
/
wifi_plot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env python3
"""
plots data collected by wifi_scan.py
Typically run this on your laptop unless you have a graphical display or VNC setup on Raspberry Pi.
"""
from __future__ import annotations
import argparse
from pathlib import Path
import json
from datetime import datetime
import numpy as np
import pandas
import xarray
from matplotlib.pyplot import show
MAX_SSID = 100
FMT = "%Y-%m-%dT%H_%M_%S"
def load(filename) -> xarray.DataArray:
"""
Loads data collected by wifi_scan.py
"""
filename = Path(filename).expanduser()
ddata = json.loads(filename.read_text())
data = xarray.DataArray(
dims=["time", "cell", "params"],
coords=[
(
"time",
pandas.to_datetime(list(ddata.keys()), format=FMT),
),
("cell", range(MAX_SSID)),
("params", ["essid", "bssid", "dbm"]),
],
data=np.empty((len(ddata), MAX_SSID, 3), dtype="<U32"),
)
for time, bssids in ddata.items():
for i, (bssid, v) in enumerate(bssids.items()):
t = datetime.strptime(time, FMT)
data.loc[t][i] = [v["essid"], bssid, v["signal_level_dBm"]]
return data
def select(data: xarray.DataArray, max_seen_frac) -> xarray.DataArray:
"""
Removes stationary stations from data, based on histogram of SSID.
1. get a list of all the BSSIDs seen across time
2. get SSIDs that are not seen persistently vs. time
The code below is not necessarily the most efficient way to select the data.
"""
df = data.loc[:, :, "bssid"].to_dataframe("ssid")
bssid_counts = df.value_counts()
Ntime = len(data.time)
bssid_seen_frac = bssid_counts / Ntime
i_mobile = (bssid_seen_frac < max_seen_frac).droplevel(0)
mobile_bssids = i_mobile[i_mobile].index.values
# %% iterate over time
Nmobile = xarray.DataArray(
dims=["time"],
coords=[("time", data.time.data)],
data=np.zeros(len(data.time), dtype=int),
)
for dt in data:
Nmobile.loc[dt.time] = dt.loc[:, "bssid"].isin(mobile_bssids).sum()
return Nmobile
if __name__ == "__main__":
p = argparse.ArgumentParser(
description="Plots data collected by wifi_scan.py"
)
p.add_argument("filename", help="JSON file containing wifi_scan.py data")
p.add_argument(
"fixed_frac",
help="fraction of time device must be detected to be declared stationary.",
type=float,
default=0.5,
nargs="?",
)
args = p.parse_args()
data = load(args.filename)
Nmobile = select(data, max_seen_frac=args.fixed_frac)
Nmobile.plot()
show()