From 65e7dc07806c150fdcad402b9e95415d8032c499 Mon Sep 17 00:00:00 2001 From: Tjomsaas <mat@monil.com> Date: Wed, 18 Dec 2024 10:07:25 +0100 Subject: [PATCH] ruff run and added github action to check formatting --- .github/workflows/ruff.yaml | 9 ++ example.py | 26 +++-- example_mp.py | 35 +++++-- setup.py | 2 +- src/power_profiler.py | 87 ++++++++++------ src/ppk2_api/ppk2_api.py | 193 ++++++++++++++++++++++-------------- 6 files changed, 224 insertions(+), 128 deletions(-) create mode 100644 .github/workflows/ruff.yaml diff --git a/.github/workflows/ruff.yaml b/.github/workflows/ruff.yaml new file mode 100644 index 0000000..a6da596 --- /dev/null +++ b/.github/workflows/ruff.yaml @@ -0,0 +1,9 @@ +name: Ruff +on: [ push, pull_request ] +jobs: + ruff: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/ruff-action@v3 + \ No newline at end of file diff --git a/example.py b/example.py index e89ef7b..5e0b031 100644 --- a/example.py +++ b/example.py @@ -1,4 +1,3 @@ - """ Basic usage of PPK2 Python API. The basic ampere mode sequence is: @@ -6,6 +5,7 @@ 2. set ampere mode 3. read stream of data """ + import time from ppk2_api.ppk2_api import PPK2_API @@ -22,17 +22,20 @@ ppk2_test.get_modifiers() ppk2_test.set_source_voltage(3300) -ppk2_test.use_source_meter() # set source meter mode -ppk2_test.toggle_DUT_power("ON") # enable DUT power +# set source meter mode +ppk2_test.use_source_meter() +# enable DUT power +ppk2_test.toggle_DUT_power("ON") +# start measuring +ppk2_test.start_measuring() -ppk2_test.start_measuring() # start measuring # measurements are a constant stream of bytes # the number of measurements in one sampling period depends on the wait between serial reads # it appears the maximum number of bytes received is 1024 # the sampling rate of the PPK2 is 100 samples per millisecond for i in range(0, 1000): read_data = ppk2_test.get_data() - if read_data != b'': + if read_data != b"": samples, raw_digital = ppk2_test.get_samples(read_data) print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") @@ -46,14 +49,15 @@ print() time.sleep(0.01) -ppk2_test.toggle_DUT_power("OFF") # disable DUT power - -ppk2_test.use_ampere_meter() # set ampere meter mode +# disable DUT power +ppk2_test.toggle_DUT_power("OFF") +# set ampere meter mode +ppk2_test.use_ampere_meter() ppk2_test.start_measuring() for i in range(0, 1000): read_data = ppk2_test.get_data() - if read_data != b'': + if read_data != b"": samples, raw_digital = ppk2_test.get_samples(read_data) print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") @@ -65,6 +69,8 @@ # Print last 10 values of each channel print(ch[-10:]) print() - time.sleep(0.01) # lower time between sampling -> less samples read in one sampling period + + # lower time between sampling -> less samples read in one sampling period + time.sleep(0.01) ppk2_test.stop_measuring() diff --git a/example_mp.py b/example_mp.py index 523dcd9..e0600c9 100644 --- a/example_mp.py +++ b/example_mp.py @@ -1,4 +1,3 @@ - """ Basic usage of PPK2 Python API - multiprocessing version. The basic ampere mode sequence is: @@ -6,6 +5,7 @@ 2. set ampere mode 3. read stream of data """ + import time from ppk2_api.ppk2_api import PPK2_MP as PPK2_API @@ -18,24 +18,34 @@ print(f"Too many connected PPK2's: {ppk2s_connected}") exit() -ppk2_test = PPK2_API(ppk2_port, buffer_max_size_seconds=1, buffer_chunk_seconds=0.01, timeout=1, write_timeout=1, exclusive=True) +ppk2_test = PPK2_API( + ppk2_port, + buffer_max_size_seconds=1, + buffer_chunk_seconds=0.01, + timeout=1, + write_timeout=1, + exclusive=True, +) ppk2_test.get_modifiers() ppk2_test.set_source_voltage(3300) """ Source mode example """ -ppk2_test.use_source_meter() # set source meter mode -ppk2_test.toggle_DUT_power("ON") # enable DUT power +# set source meter mode +ppk2_test.use_source_meter() +# enable DUT power +ppk2_test.toggle_DUT_power("ON") +# start measuring +ppk2_test.start_measuring() -ppk2_test.start_measuring() # start measuring # measurements are a constant stream of bytes # the number of measurements in one sampling period depends on the wait between serial reads # it appears the maximum number of bytes received is 1024 # the sampling rate of the PPK2 is 100 samples per millisecond while True: read_data = ppk2_test.get_data() - if read_data != b'': + if read_data != b"": samples, raw_digital = ppk2_test.get_samples(read_data) print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") @@ -50,18 +60,21 @@ time.sleep(0.001) -ppk2_test.toggle_DUT_power("OFF") # disable DUT power + +# disable DUT power +ppk2_test.toggle_DUT_power("OFF") ppk2_test.stop_measuring() """ Ampere mode example """ -ppk2_test.use_ampere_meter() # set ampere meter mode +# set ampere meter mode +ppk2_test.use_ampere_meter() ppk2_test.start_measuring() while True: read_data = ppk2_test.get_data() - if read_data != b'': + if read_data != b"": samples, raw_digital = ppk2_test.get_samples(read_data) print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA") @@ -73,6 +86,8 @@ # Print last 10 values of each channel print(ch[-10:]) print() - time.sleep(0.001) # lower time between sampling -> less samples read in one sampling period + + # lower time between sampling -> less samples read in one sampling period + time.sleep(0.001) ppk2_test.stop_measuring() diff --git a/setup.py b/setup.py index bab7e24..ec1fc0d 100644 --- a/setup.py +++ b/setup.py @@ -37,4 +37,4 @@ def read(*names, **kwargs): "License :: OSI Approved :: GNU General Public License v2 (GPLv2)", "Operating System :: OS Independent", ], -) \ No newline at end of file +) diff --git a/src/power_profiler.py b/src/power_profiler.py index 7af0f6f..478290c 100644 --- a/src/power_profiler.py +++ b/src/power_profiler.py @@ -2,19 +2,21 @@ import csv import datetime from threading import Thread + # import numpy as np # import matplotlib.pyplot as plt # import matplotlib from ppk2_api.ppk2_api import PPK2_MP as PPK2_API -class PowerProfiler(): + +class PowerProfiler: def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None): """Initialize PPK2 power profiler with serial""" self.measuring = None self.measurement_thread = None self.ppk2 = None - print(f"Initing power profiler") + print("Initing power profiler") # try: if serial_port: @@ -26,7 +28,8 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None): self.ppk2 = PPK2_API(serial_port) try: - ret = self.ppk2.get_modifiers() # try to read modifiers, if it fails serial port is probably not correct + # try to read modifiers, if it fails serial port is probably not correct + ret = self.ppk2.get_modifiers() print(f"Initialized ppk2 api: {ret}") except Exception as e: print(f"Error initializing power profiler: {e}") @@ -35,13 +38,16 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None): if not ret: self.ppk2 = None - raise Exception(f"Error when initing PowerProfiler with serial port {serial_port}") + raise Exception( + f"Error when initing PowerProfiler with serial port {serial_port}" + ) else: self.ppk2.use_source_meter() self.source_voltage_mV = source_voltage_mV - self.ppk2.set_source_voltage(self.source_voltage_mV) # set to 3.3V + # set to 3.3V + self.ppk2.set_source_voltage(self.source_voltage_mV) print(f"Set power profiler source voltage: {self.source_voltage_mV}") @@ -62,7 +68,7 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None): # write to csv self.filename = filename if self.filename is not None: - with open(self.filename, 'w', newline='') as file: + with open(self.filename, "w", newline="") as file: writer = csv.writer(file) row = [] for key in ["ts", "avg1000"]: @@ -71,10 +77,10 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None): def write_csv_rows(self, samples): """Write csv row""" - with open(self.filename, 'a', newline='') as file: + with open(self.filename, "a", newline="") as file: writer = csv.writer(file) for sample in samples: - row = [datetime.datetime.now().strftime('%d-%m-%Y %H:%M:%S.%f'), sample] + row = [datetime.datetime.now().strftime("%d-%m-%Y %H:%M:%S.%f"), sample] writer.writerow(row) def delete_power_profiler(self): @@ -85,26 +91,26 @@ def delete_power_profiler(self): print("Deleting power profiler") if self.measurement_thread: - print(f"Joining measurement thread") + print("Joining measurement thread") self.measurement_thread.join() self.measurement_thread = None if self.ppk2: - print(f"Disabling ppk2 power") + print("Disabling ppk2 power") self.disable_power() del self.ppk2 - print(f"Deleted power profiler") + print("Deleted power profiler") def discover_port(self): """Discovers ppk2 serial port""" ppk2s_connected = PPK2_API.list_devices() - if(len(ppk2s_connected) == 1): + if len(ppk2s_connected) == 1: ppk2_port = ppk2s_connected[0] - print(f'Found PPK2 at {ppk2_port}') + print(f"Found PPK2 at {ppk2_port}") return ppk2_port else: - print(f'Too many connected PPK2\'s: {ppk2s_connected}') + print(f"Too many connected PPK2's: {ppk2s_connected}") return None def enable_power(self): @@ -124,16 +130,21 @@ def disable_power(self): def measurement_loop(self): """Endless measurement loop will run in a thread""" while True and not self.stop: - if self.measuring: # read data if currently measuring + # read data if currently measuring + if self.measuring: read_data = self.ppk2.get_data() - if read_data != b'': + if read_data != b"": samples = self.ppk2.get_samples(read_data) - self.current_measurements += samples # can easily sum lists, will append individual data - time.sleep(0.001) # TODO figure out correct sleep duration + # can easily sum lists, will append individual data + self.current_measurements += samples + # TODO figure out correct sleep duration + time.sleep(0.001) def _average_samples(self, list, window_size): """Average samples based on window size""" - chunks = [list[val:val + window_size] for val in range(0, len(list), window_size)] + chunks = [ + list[val : val + window_size] for val in range(0, len(list), window_size) + ] avgs = [] for chunk in chunks: avgs.append(sum(chunk) / len(chunk)) @@ -142,19 +153,24 @@ def _average_samples(self, list, window_size): def start_measuring(self): """Start measuring""" - if not self.measuring: # toggle measuring flag only if currently not measuring - self.current_measurements = [] # reset current measurements - self.measuring = True # set internal flag - self.ppk2.start_measuring() # send command to ppk2 + # toggle measuring flag only if currently not measuring + if not self.measuring: + # reset current measurements + self.current_measurements = [] + # set internal flag + self.measuring = True + # send command to ppk2 + self.ppk2.start_measuring() self.measurement_start_time = time.time() def stop_measuring(self): """Stop measuring and return average of period""" self.measurement_stop_time = time.time() self.measuring = False - self.ppk2.stop_measuring() # send command to ppk2 + # send command to ppk2 + self.ppk2.stop_measuring() - #samples_average = self._average_samples(self.current_measurements, 1000) + # samples_average = self._average_samples(self.current_measurements, 1000) if self.filename is not None: self.write_csv_rows(self.current_measurements) @@ -172,24 +188,33 @@ def get_average_current_mA(self): if len(self.current_measurements) == 0: return 0 - average_current_mA = (sum(self.current_measurements) / len(self.current_measurements)) / 1000 # measurements are in microamperes, divide by 1000 + # measurements are in microamperes, divide by 1000 + average_current_mA = ( + sum(self.current_measurements) / len(self.current_measurements) + ) / 1000 return average_current_mA def get_average_power_consumption_mWh(self): """Return average power consumption of last measurement in mWh""" average_current_mA = self.get_average_current_mA() - average_power_mW = (self.source_voltage_mV / 1000) * average_current_mA # divide by 1000 as source voltage is in millivolts - this gives us milliwatts - measurement_duration_h = self.get_measurement_duration_s() / 3600 # duration in seconds, divide by 3600 to get hours + # divide by 1000 as source voltage is in millivolts - this gives us milliwatts + average_power_mW = (self.source_voltage_mV / 1000) * average_current_mA + # duration in seconds, divide by 3600 to get hours + measurement_duration_h = self.get_measurement_duration_s() / 3600 average_consumption_mWh = average_power_mW * measurement_duration_h return average_consumption_mWh def get_average_charge_mC(self): """Returns average charge in milli coulomb""" average_current_mA = self.get_average_current_mA() - measurement_duration_s = self.get_measurement_duration_s() # in seconds + # in seconds + measurement_duration_s = self.get_measurement_duration_s() return average_current_mA * measurement_duration_s def get_measurement_duration_s(self): """Returns duration of measurement""" - measurement_duration_s = (self.measurement_stop_time - self.measurement_start_time) # measurement duration in seconds - return measurement_duration_s \ No newline at end of file + # measurement duration in seconds + measurement_duration_s = ( + self.measurement_stop_time - self.measurement_start_time + ) + return measurement_duration_s diff --git a/src/ppk2_api/ppk2_api.py b/src/ppk2_api/ppk2_api.py index 4b39a69..decdc7f 100644 --- a/src/ppk2_api/ppk2_api.py +++ b/src/ppk2_api/ppk2_api.py @@ -12,8 +12,10 @@ import queue import threading -class PPK2_Command(): + +class PPK2_Command: """Serial command opcodes""" + NO_OP = 0x00 TRIGGER_SET = 0x01 AVG_NUM_SET = 0x02 # no-firmware @@ -24,11 +26,11 @@ class PPK2_Command(): AVERAGE_STOP = 0x07 RANGE_SET = 0x08 LCD_SET = 0x09 - TRIGGER_STOP = 0x0a - DEVICE_RUNNING_SET = 0x0c - REGULATOR_SET = 0x0d - SWITCH_POINT_DOWN = 0x0e - SWITCH_POINT_UP = 0x0f + TRIGGER_STOP = 0x0A + DEVICE_RUNNING_SET = 0x0C + REGULATOR_SET = 0x0D + SWITCH_POINT_DOWN = 0x0E + SWITCH_POINT_UP = 0x0F TRIGGER_EXT_TOGGLE = 0x11 SET_POWER_MODE = 0x11 RES_USER_SET = 0x12 @@ -39,18 +41,19 @@ class PPK2_Command(): SET_USER_GAINS = 0x25 -class PPK2_Modes(): +class PPK2_Modes: """PPK2 measurement modes""" + AMPERE_MODE = "AMPERE_MODE" SOURCE_MODE = "SOURCE_MODE" -class PPK2_API(): +class PPK2_API: def __init__(self, port: str, **kwargs): - ''' + """ port - port where PPK2 is connected **kwargs - keyword arguments to pass to the pySerial constructor - ''' + """ self.ser = None self.ser = serial.Serial(port, **kwargs) @@ -66,7 +69,7 @@ def __init__(self, port: str, **kwargs): "I": {"0": 0, "1": 0, "2": 0, "3": 0, "4": 0}, "UG": {"0": 1, "1": 1, "2": 1, "3": 1, "4": 1}, "HW": None, - "IA": None + "IA": None, } self.vdd_low = 800 @@ -93,7 +96,7 @@ def __init__(self, port: str, **kwargs): self.after_spike = 0 # adc measurement buffer remainder and len of remainder - self.remainder = {"sequence": b'', "len": 0} + self.remainder = {"sequence": b"", "len": 0} def __del__(self): """Destructor""" @@ -121,7 +124,8 @@ def _write_serial(self, cmd_tuple): def _twos_comp(self, val): """Compute the 2's complement of int32 value""" if (val & (1 << (32 - 1))) != 0: - val = val - (1 << 32) # compute negative value + # compute negative value + val = val - (1 << 32) return val def _convert_source_voltage(self, mV): @@ -138,11 +142,13 @@ def _convert_source_voltage(self, mV): # get difference to baseline (the baseline is 800mV but the initial offset is 32) diff_to_baseline = mV - self.vdd_low + offset base_b_1 = 3 - base_b_2 = 0 # is actually 32 - compensated with above offset + # is actually 32 - compensated with above offset + base_b_2 = 0 # get the number of times we have to increase the first byte of the command ratio = int(diff_to_baseline / 256) - remainder = diff_to_baseline % 256 # get the remainder for byte 2 + # get the remainder for byte 2 + remainder = diff_to_baseline % 256 set_b_1 = base_b_1 + ratio set_b_2 = base_b_2 + remainder @@ -158,7 +164,7 @@ def _read_metadata(self): time.sleep(0.1) # TODO add a read_until serial read function with a timeout - if read != b'' and "END" in read.decode("utf-8"): + if read != b"" and "END" in read.decode("utf-8"): return read.decode("utf-8") def _parse_metadata(self, metadata): @@ -172,23 +178,22 @@ def _parse_metadata(self, metadata): if key == data_pair[0]: self.modifiers[key] = data_pair[1] for ind in range(0, 5): - if key+str(ind) == data_pair[0]: + if key + str(ind) == data_pair[0]: if "R" in data_pair[0]: # problem on some PPK2s with wrong calibration values - this doesn't fix it if float(data_pair[1]) != 0: - self.modifiers[key][str(ind)] = float( - data_pair[1]) + self.modifiers[key][str(ind)] = float(data_pair[1]) else: - self.modifiers[key][str(ind)] = float( - data_pair[1]) + self.modifiers[key][str(ind)] = float(data_pair[1]) return True except Exception as e: # if exception triggers serial port is probably not correct + print(f"Failde to parse metadata: {e}") return None def _generate_mask(self, bits, pos): pos = pos - mask = ((2**bits-1) << pos) + mask = (2**bits - 1) << pos mask = self._twos_comp(mask) return {"mask": mask, "pos": pos} @@ -199,15 +204,18 @@ def _get_masked_value(self, value, meas, is_bits=False): def _handle_raw_data(self, adc_value): """Convert raw value to analog value""" try: - current_measurement_range = min(self._get_masked_value( - adc_value, self.MEAS_RANGE), 4) # 5 is the number of parameters + # 5 is the number of parameters + current_measurement_range = min( + self._get_masked_value(adc_value, self.MEAS_RANGE), 4 + ) adc_result = self._get_masked_value(adc_value, self.MEAS_ADC) * 4 bits = self._get_masked_value(adc_value, self.MEAS_LOGIC) - analog_value = self.get_adc_result( - current_measurement_range, adc_result) * 10**6 + analog_value = ( + self.get_adc_result(current_measurement_range, adc_result) * 10**6 + ) return analog_value, bits except Exception as e: - print("Measurement outside of range!") + print(f"Measurement outside of range! {e}") return None, None @staticmethod @@ -219,7 +227,8 @@ def list_devices(): devices = [ (port.device, port.serial_number[:8]) for port in ports - if port.description.startswith("nRF Connect USB CDC ACM") and port.location.endswith("1") + if port.description.startswith("nRF Connect USB CDC ACM") + and port.location.endswith("1") ] else: devices = [ @@ -236,7 +245,7 @@ def get_data(self): def get_modifiers(self): """Gets and sets modifiers from device memory""" - self._write_serial((PPK2_Command.GET_META_DATA, )) + self._write_serial((PPK2_Command.GET_META_DATA,)) metadata = self._read_metadata() ret = self._parse_metadata(metadata) return ret @@ -249,14 +258,14 @@ def start_measuring(self): if self.mode == PPK2_Modes.AMPERE_MODE: raise Exception("Input voltage not set!") - self._write_serial((PPK2_Command.AVERAGE_START, )) + self._write_serial((PPK2_Command.AVERAGE_START,)) def stop_measuring(self): """Stop continuous measurement""" - self._write_serial((PPK2_Command.AVERAGE_STOP, )) + self._write_serial((PPK2_Command.AVERAGE_STOP,)) def set_source_voltage(self, mV): - """Inits device - based on observation only REGULATOR_SET is the command. + """Inits device - based on observation only REGULATOR_SET is the command. The other two values correspond to the voltage level. 800mV is the lowest setting - [3,32] - the values then increase linearly @@ -268,32 +277,44 @@ def set_source_voltage(self, mV): def toggle_DUT_power(self, state): """Toggle DUT power based on parameter""" if state == "ON": + # 12,1 self._write_serial( - (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.TRIGGER_SET)) # 12,1 + (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.TRIGGER_SET) + ) if state == "OFF": - self._write_serial( - (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.NO_OP)) # 12,0 + # 12,0 + self._write_serial((PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.NO_OP)) def use_ampere_meter(self): """Configure device to use ampere meter""" self.mode = PPK2_Modes.AMPERE_MODE - self._write_serial((PPK2_Command.SET_POWER_MODE, - PPK2_Command.TRIGGER_SET)) # 17,1 + # 17,1 + self._write_serial((PPK2_Command.SET_POWER_MODE, PPK2_Command.TRIGGER_SET)) def use_source_meter(self): """Configure device to use source meter""" self.mode = PPK2_Modes.SOURCE_MODE - self._write_serial((PPK2_Command.SET_POWER_MODE, - PPK2_Command.AVG_NUM_SET)) # 17,2 + # 17,2 + self._write_serial((PPK2_Command.SET_POWER_MODE, PPK2_Command.AVG_NUM_SET)) def get_adc_result(self, current_range, adc_value): """Get result of adc conversion""" current_range = str(current_range) result_without_gain = (adc_value - self.modifiers["O"][current_range]) * ( - self.adc_mult / self.modifiers["R"][current_range]) - adc = self.modifiers["UG"][current_range] * (result_without_gain * (self.modifiers["GS"][current_range] * result_without_gain + self.modifiers["GI"][current_range]) + ( - self.modifiers["S"][current_range] * (self.current_vdd / 1000) + self.modifiers["I"][current_range])) + self.adc_mult / self.modifiers["R"][current_range] + ) + adc = self.modifiers["UG"][current_range] * ( + result_without_gain + * ( + self.modifiers["GS"][current_range] * result_without_gain + + self.modifiers["GI"][current_range] + ) + + ( + self.modifiers["S"][current_range] * (self.current_vdd / 1000) + + self.modifiers["I"][current_range] + ) + ) prev_rolling_avg = self.rolling_avg prev_rolling_avg4 = self.rolling_avg4 @@ -302,12 +323,18 @@ def get_adc_result(self, current_range, adc_value): if self.rolling_avg is None: self.rolling_avg = adc else: - self.rolling_avg = self.spike_filter_alpha * adc + (1 - self.spike_filter_alpha) * self.rolling_avg - + self.rolling_avg = ( + self.spike_filter_alpha * adc + + (1 - self.spike_filter_alpha) * self.rolling_avg + ) + if self.rolling_avg4 is None: self.rolling_avg4 = adc else: - self.rolling_avg4 = self.spike_filter_alpha5 * adc + (1 - self.spike_filter_alpha5) * self.rolling_avg4 + self.rolling_avg4 = ( + self.spike_filter_alpha5 * adc + + (1 - self.spike_filter_alpha5) * self.rolling_avg4 + ) if self.prev_range is None: self.prev_range = current_range @@ -326,7 +353,7 @@ def get_adc_result(self, current_range, adc_value): adc = self.rolling_avg4 else: adc = self.rolling_avg - + self.after_spike -= 1 self.prev_range = current_range @@ -334,7 +361,8 @@ def get_adc_result(self, current_range, adc_value): def _digital_to_analog(self, adc_value): """Convert discrete value to analog value""" - return int.from_bytes(adc_value, byteorder="little", signed=False) # convert reading to analog value + # convert reading to analog value + return int.from_bytes(adc_value, byteorder="little", signed=False) def digital_channels(self, bits): """ @@ -363,14 +391,13 @@ def get_samples(self, buf): Manipulation of samples is left to the user. See example for more info. """ - - sample_size = 4 # one analog value is 4 bytes in size + # one analog value is 4 bytes in size + sample_size = 4 offset = self.remainder["len"] samples = [] raw_digital_output = [] - first_reading = ( - self.remainder["sequence"] + buf[0:sample_size-offset])[:4] + first_reading = (self.remainder["sequence"] + buf[0 : sample_size - offset])[:4] adc_val = self._digital_to_analog(first_reading) measurement, bits = self._handle_raw_data(adc_val) if measurement is not None: @@ -381,7 +408,7 @@ def get_samples(self, buf): offset = sample_size - offset while offset <= len(buf) - sample_size: - next_val = buf[offset:offset + sample_size] + next_val = buf[offset : offset + sample_size] offset += sample_size adc_val = self._digital_to_analog(next_val) measurement, bits = self._handle_raw_data(adc_val) @@ -390,18 +417,19 @@ def get_samples(self, buf): if bits is not None: raw_digital_output.append(bits) - self.remainder["sequence"] = buf[offset:len(buf)] - self.remainder["len"] = len(buf)-offset + self.remainder["sequence"] = buf[offset : len(buf)] + self.remainder["len"] = len(buf) - offset # return list of samples and raw digital outputs # handle those lists in PPK2 API wrapper - return samples, raw_digital_output + return samples, raw_digital_output class PPK_Fetch(threading.Thread): - ''' + """ Background process for polling the data in multi-threaded variant - ''' + """ + def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): super().__init__() self._ppk2 = ppk2 @@ -411,8 +439,10 @@ def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): self._stats = (None, None) self._last_timestamp = 0 - self._buffer_max_len = int(buffer_len_s * 100000 * 4) # 100k 4-byte samples per second - self._buffer_chunk = int(buffer_chunk_s * 100000 * 4) # put in the queue in chunks of 0.5s + # 100k 4-byte samples per second + self._buffer_max_len = int(buffer_len_s * 100000 * 4) + # put in the queue in chunks of 0.5s + self._buffer_chunk = int(buffer_chunk_s * 100000 * 4) # round buffers to a whole sample if self._buffer_max_len % 4 != 0: @@ -425,17 +455,19 @@ def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): def run(self): s = 0 t = time.time() - local_buffer = b'' + local_buffer = b"" while not self._quit.is_set(): d = PPK2_API.get_data(self._ppk2) tm_now = time.time() local_buffer += d while len(local_buffer) >= self._buffer_chunk: # FIXME: check if lock might be needed when discarding old data - self._buffer_q.put(local_buffer[:self._buffer_chunk]) - while self._buffer_q.qsize()>self._buffer_max_len/self._buffer_chunk: + self._buffer_q.put(local_buffer[: self._buffer_chunk]) + while ( + self._buffer_q.qsize() > self._buffer_max_len / self._buffer_chunk + ): self._buffer_q.get() - local_buffer = local_buffer[self._buffer_chunk:] + local_buffer = local_buffer[self._buffer_chunk :] self._last_timestamp = tm_now # calculate stats @@ -458,11 +490,12 @@ def run(self): break def get_data(self): - ret = b'' + ret = b"" count = 0 while True: try: - ret += self._buffer_q.get(timeout=0.001) # get_nowait sometimes skips a chunk for some reason + # get_nowait sometimes skips a chunk for some reason + ret += self._buffer_q.get(timeout=0.001) count += 1 except queue.Empty: break @@ -470,17 +503,20 @@ def get_data(self): class PPK2_MP(PPK2_API): - ''' + """ Multiprocessing variant of the object. The interface is the same as for the regular one except it spawns a background process on start_measuring() - ''' - def __init__(self, port, buffer_max_size_seconds=10, buffer_chunk_seconds=0.1, **kwargs): - ''' + """ + + def __init__( + self, port, buffer_max_size_seconds=10, buffer_chunk_seconds=0.1, **kwargs + ): + """ port - port where PPK2 is connected buffer_max_size_seconds - how many seconds of data to keep in the buffer buffer_chunk_seconds - how many seconds of data to put in the queue at once **kwargs - keyword arguments to pass to the pySerial constructor - ''' + """ super().__init__(port, **kwargs) self._fetcher = None @@ -502,27 +538,32 @@ def __del__(self): def start_measuring(self): # discard the data in the buffer self.stop_measuring() - while self.get_data()!=b'': + while self.get_data() != b"": pass PPK2_API.start_measuring(self) self._quit_evt.clear() if self._fetcher is not None: return - - self._fetcher = PPK_Fetch(self, self._quit_evt, self._buffer_max_size_seconds, self._buffer_chunk_seconds) + + self._fetcher = PPK_Fetch( + self, + self._quit_evt, + self._buffer_max_size_seconds, + self._buffer_chunk_seconds, + ) self._fetcher.start() def stop_measuring(self): PPK2_API.stop_measuring(self) - self.get_data() # flush the serial buffer (to prevent unicode error on next command) + self.get_data() # flush the serial buffer (to prevent unicode error on next command) self._quit_evt.set() if self._fetcher is not None: - self._fetcher.join() # join() will block if the queue isn't empty + self._fetcher.join() # join() will block if the queue isn't empty self._fetcher = None def get_data(self): try: return self._fetcher.get_data() except (TypeError, AttributeError): - return b'' + return b""