-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathdelayrecorder.py
executable file
·250 lines (209 loc) · 7.89 KB
/
delayrecorder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
#!/usr/bin/env python3
import sys
import argparse
import signal
import time
import serial, serial.tools.list_ports
import csv
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from dataclasses import dataclass
from typing import List
@dataclass
class Stats:
    """Summary statistics for one batch of glass-to-glass delay measurements.

    The field order matters: it is the column order of the statistics row in
    the CSV file and the positional order used when constructing instances.
    """

    # Number of samples the statistics were computed from.
    num_measurements: int
    # Smallest observed delay.
    min_delay: float
    # Largest observed delay.
    max_delay: float
    # Arithmetic mean of the delays.
    mean_delay: float
    # Median of the delays.
    median_delay: float
    # Standard deviation of the delays.
    std_dev: float
def sigint_handler(signal, frame):
    """Handle SIGINT by announcing the interrupt and exiting with status 0.

    Both parameters are the standard signal-handler arguments (signal number
    and current stack frame); neither is used.
    """
    print("Caught KeyboardInterrupt, exiting.")
    # Equivalent to sys.exit(0): unwinds the interpreter with a success status.
    raise SystemExit(0)
def parse_arguments():
    """Parse the command line and validate the CSV filename.

    Returns:
        The argparse namespace with ``filename`` (a ``Path``, default
        ``results.csv``), ``num_measurements`` (int, default 100) and the
        boolean flags ``quiet`` and ``readcsv``.

    Exits the process with status 1 when ``filename`` does not have a
    ``.csv`` suffix.
    """
    description = (
        "This script obtains delay measurements from a Glass-to-Glass device connected via USB, "
        "saves results and statistics to a CSV file, and displays the results in a histogram plot. "
        "You can also use it to display results from saved measurements in a previous test, in which case "
        "the CSV file is read from rather than written to."
    )
    cli = argparse.ArgumentParser(description=description)

    # Optional positionals: output/input CSV file, then the sample count.
    filename_help = (
        "The name of the CSV file where the data is saved. It is also the name for the saved plot. "
        "Default is 'results.csv' which will cause 'results.png' to be created as well. "
        "This is the filename used when using the '--readcsv' option as well."
    )
    cli.add_argument("filename", nargs="?", default="results.csv", type=Path, help=filename_help)
    cli.add_argument(
        "num_measurements",
        nargs="?",
        default=100,
        type=int,
        help="An integer for the number of measurements to save and use when generating statistics. Default is 100.",
    )

    # Boolean switches.
    cli.add_argument(
        "--quiet",
        "-q",
        action="store_true",
        help="The script won't print the measurements to the terminal (but will still save them into the CSV file).",
    )
    readcsv_help = (
        "Reads a previously generated CSV and plots it. "
        "Be sure to provide the name of the CSV file if it's not the default name."
    )
    cli.add_argument("--readcsv", "-r", action="store_true", help=readcsv_help)

    parsed = cli.parse_args()
    if parsed.filename.suffix == ".csv":
        return parsed

    # The PNG name is derived from the CSV name, so anything else is rejected.
    print("Error: Provided filename is invalid or does not have .csv extension")
    sys.exit(1)
def find_arduino_on_serial_port() -> serial.Serial:
    """Open the first serial port whose manufacturer string mentions "Arduino".

    Returns:
        A ``serial.Serial`` opened at 115200 baud with a 5 second read timeout.

    Raises:
        ConnectionRefusedError: No listed port reports an Arduino manufacturer.
    """
    for port in serial.tools.list_ports.comports():
        vendor = port.manufacturer
        # Ports without a manufacturer string, or from another vendor, are skipped.
        if vendor is None or "Arduino" not in vendor:
            continue
        port_name = port[0]
        print(f"Found Arduino at {port_name}")
        return serial.Serial(port_name, 115200, timeout=5)
    raise ConnectionRefusedError("Did not find Arduino on any serial port. Is it connected?")
def read_measurements_from_arduino(num_measurements: int, quiet_mode: bool) -> List[float]:
    """Collect delay measurements from the Arduino over its serial port.

    Blocks until ``num_measurements`` values have been received. A line that
    contains a ``.`` is treated as a measurement; any other line (including
    the empty result of a read timeout) triggers a troubleshooting hint.
    Lines that contain a ``.`` but are not a valid number are reported and
    skipped instead of aborting the run.

    Args:
        num_measurements: Number of delay values to collect.
        quiet_mode: When true, print only a progress counter instead of each
            measured value.

    Returns:
        The measured delays, in the order they were received.

    Raises:
        ConnectionRefusedError: Propagated from ``find_arduino_on_serial_port``
            when no Arduino is found.
    """
    port = find_arduino_on_serial_port()
    try:
        print(f"Collecting {num_measurements} measurements from the Arduino")
        if quiet_mode:
            print("Running in quiet mode, won't print the measurements to the terminal")

        # Discard incoming lines for a short moment (at least one line is
        # always read) - presumably to drop stale or partial output that was
        # buffered before the script started.
        flush_deadline = time.time() + 0.01
        while True:
            port.readline()
            if time.time() > flush_deadline:
                break

        measurements: List[float] = []
        hint_shown = False  # the long troubleshooting text is printed only once
        while len(measurements) < num_measurements:
            # errors="replace" keeps a single garbled byte from killing the run.
            line = port.readline().decode(errors="replace").strip()

            if "." not in line:
                if hint_shown:
                    print(
                        "Did not receive msmt data from the Arduino for another 5 seconds. "
                        "Is the phototransistor still sensing the LED?"
                    )
                else:
                    print(
                        "Did not receive msmt data from the Arduino for 5 seconds.\n"
                        "Is the phototransistor sensing the LED on the screen?\n"
                        "Is the correct side of the PT pointing towards the screen "
                        "(the flat side with the knob on it)?\n"
                        "Is the screen brightness high enough (max recommended)?"
                    )
                    hint_shown = True
                continue

            try:
                delay = float(line)
            except ValueError:
                print(f"Ignoring malformed line from the Arduino: {line!r}")
                continue

            # Once data has arrived, later gaps get the short "still sensing?" hint.
            hint_shown = True
            measurements.append(delay)
            count = len(measurements)
            if quiet_mode:
                print(f"G2G Delay trial {count}/{num_measurements}", end="\r")
            else:
                print(f"G2G Delay trial {count}/{num_measurements}: {line} ms")
    finally:
        # Release the port even on errors or when the SIGINT handler exits.
        port.close()
    return measurements
def write_measurements_to_csv(csv_file: Path, measurements: List[float], stats: Stats) -> None:
    """Write the statistics and the raw measurements to ``csv_file``.

    The file has three rows: a fixed header, the statistics values in the
    header's order, and all measurements on a single row. An existing file is
    overwritten.

    Args:
        csv_file: Destination path.
        measurements: Delay values to store on the third row.
        stats: Statistics whose fields fill the second row.
    """
    header = ["Samples", "Min", "Max", "Mean", "Median", "stdDev"]
    stats_row = [
        stats.num_measurements,
        stats.min_delay,
        stats.max_delay,
        stats.mean_delay,
        stats.median_delay,
        stats.std_dev,
    ]
    # newline="" hands line-ending control to the csv module; without it,
    # Windows text mode turns "\r\n" into "\r\r\n" and the file gains blank rows.
    with open(csv_file, "w", newline="") as f:
        csv.writer(f).writerows([header, stats_row, measurements])
    print(f"Saved results to {csv_file}")
def read_measurements_from_csv(csv_file: Path):
    """Load measurements and statistics from a CSV written by this script.

    Expected layout: a header row, a statistics row, then one row holding all
    measurements. Blank rows are skipped (they appear in files that were
    written on Windows without ``newline=""``) and anything after the
    measurements row is ignored.

    Args:
        csv_file: Path of the CSV file to read.

    Returns:
        A ``(measurements, stats)`` tuple: the list of delays as floats and
        the ``Stats`` rebuilt from the statistics row.

    Raises:
        ValueError: The file has fewer than three non-empty rows, or a value
            cannot be parsed as a number.
        TypeError: The statistics row does not have the number of fields that
            ``Stats`` expects.
    """
    rows = []
    with open(csv_file, "r", newline="") as f:
        for row in csv.reader(f):
            if not row:
                continue
            rows.append(row)
            if len(rows) == 3:
                break  # ignore further rows

    if len(rows) < 3:
        raise ValueError(
            f"{csv_file} does not contain a header, a statistics row and a measurements row"
        )

    _header, stats_row, samples_row = rows
    sample_count, *stat_values = stats_row
    # The sample count is an integer field; int(float(...)) accepts both
    # "100" and "100.0" while keeping the field's declared type.
    stats = Stats(int(float(sample_count)), *(float(value) for value in stat_values))
    measurements = [float(value) for value in samples_row]
    print(f"Obtained values from {csv_file}")
    return measurements, stats
def generate_stats(measurements: List[float]) -> Stats:
    """Compute summary statistics for the delays and print them.

    Args:
        measurements: The delay values to summarise.

    Returns:
        A ``Stats`` holding the sample count, minimum, maximum, mean, median
        and standard deviation as computed by NumPy.
    """
    samples = np.array(measurements)
    summary = Stats(
        num_measurements=len(samples),
        min_delay=np.min(samples),
        max_delay=np.max(samples),
        mean_delay=np.mean(samples),
        median_delay=np.median(samples),
        std_dev=np.std(samples),
    )
    print(
        f"\nmin: {summary.min_delay:.2f} ms | max: {summary.max_delay:.2f} ms "
        f"| median: {summary.median_delay:.2f} ms"
    )
    print(f"mean: {summary.mean_delay:.2f} ms | std_dev: {summary.std_dev:.2f} ms\n")
    return summary
def plot_results(measurements: List[float], stats: Stats, png_file: Path) -> None:
    """Plot a histogram of the delays, save it as a PNG and show it.

    The histogram uses 20 bins. Two text boxes are drawn inside the axes:
    min/max/median in the top-left corner and mean/standard deviation in the
    top-right corner.

    Args:
        measurements: Delay values to plot.
        stats: Statistics shown in the text boxes.
        png_file: Where the figure is saved; its name is also used as the
            window title.
    """
    box_style = dict(boxstyle="round", facecolor="wheat", alpha=0.5)

    def add_stats_box(axes, x_pos, alignment, lines):
        # Coordinates are axes fractions; every box is anchored at y=0.95 by its top edge.
        axes.text(
            x_pos,
            0.95,
            "\n".join(lines),
            transform=axes.transAxes,
            fontsize=14,
            verticalalignment="top",
            horizontalalignment=alignment,
            bbox=box_style,
        )

    plt.hist(measurements, bins=20)
    plt.gcf().canvas.manager.set_window_title(png_file.name)
    plt.title("Latency Histogram")
    plt.xlabel("Latency (ms)")
    plt.ylabel("Frequency")

    axes = plt.gca()
    # Top-left box: extremes and median.
    add_stats_box(
        axes,
        0.05,
        "left",
        (
            r"$\mathrm{min}=%.2f$" % (stats.min_delay,),
            r"$\mathrm{max}=%.2f$" % (stats.max_delay,),
            r"$\mathrm{median}=%.2f$" % (stats.median_delay,),
        ),
    )
    # Top-right box: mean and standard deviation.
    add_stats_box(
        axes,
        0.95,
        "right",
        (
            r"$\mu=%.2f$" % (stats.mean_delay,),
            r"$\sigma=%.2f$" % (stats.std_dev,),
        ),
    )

    plt.savefig(png_file)
    print(f"Saved histogram to {png_file}")
    plt.show()
def main() -> None:
    """Run the tool: obtain delays (from the Arduino or a CSV file) and plot them.

    In the default mode the measurements are read from the Arduino, summarised
    and written to the CSV file. With ``--readcsv`` they are loaded from that
    file instead. Either way the histogram is saved next to the CSV with a
    ``.png`` suffix.
    """
    # Exit cleanly on Ctrl+C instead of dumping a traceback.
    signal.signal(signal.SIGINT, sigint_handler)

    options = parse_arguments()
    if not options.readcsv:
        delays = read_measurements_from_arduino(options.num_measurements, options.quiet)
        summary = generate_stats(delays)
        write_measurements_to_csv(options.filename, delays, summary)
    else:
        delays, summary = read_measurements_from_csv(options.filename)

    plot_results(delays, summary, options.filename.with_suffix(".png"))


if __name__ == "__main__":
    main()