diff --git a/src/data_write.py b/src/data_write.py index 005af0fb..1f23b705 100644 --- a/src/data_write.py +++ b/src/data_write.py @@ -349,7 +349,7 @@ def output_data( log.info( "wrote record", write_time=write_time * 1e3, - time_units="ms", + time_unit="ms", dataset_name=self.timestamp, ) @@ -670,7 +670,7 @@ def main(): expected_type=int, ) log.debug( - "Received CFS sequence, increasing expected_sqn_num", + "received CFS sequence, increasing expected_sqn_num", cfs_sqn_num=cfs_sqn_num, ) cfs_nums.append(cfs_sqn_num) @@ -750,7 +750,7 @@ def main(): log.info( f"parsed sequence {pd.sequence_num}", parse_time=parse_time * 1e3, - time_units="ms", + time_unit="ms", slice_ids=[dset.slice_id for dset in pd.output_datasets], ) diff --git a/src/experiment_prototype/experiment_slice.py b/src/experiment_prototype/experiment_slice.py index 2044b433..e65be24d 100644 --- a/src/experiment_prototype/experiment_slice.py +++ b/src/experiment_prototype/experiment_slice.py @@ -624,15 +624,12 @@ def check_rx_antenna_pattern(cls, rx_antenna_pattern, values): rx_antenna_pattern( values["beam_angle"], values["freq"], - options.main_antenna_count, - options.main_antenna_spacing, + options.main_antenna_locations, ), rx_antenna_pattern( values["beam_angle"], values["freq"], - options.intf_antenna_count, - options.intf_antenna_spacing, - offset=-100, + options.intf_antenna_locations, ), ] for index in range(0, len(antenna_pattern)): diff --git a/src/experiment_prototype/interface_classes/averaging_periods.py b/src/experiment_prototype/interface_classes/averaging_periods.py index 15403d2e..ea158459 100644 --- a/src/experiment_prototype/interface_classes/averaging_periods.py +++ b/src/experiment_prototype/interface_classes/averaging_periods.py @@ -11,6 +11,8 @@ :author: Marci Detwiller """ +import datetime + # built-in import inspect from pathlib import Path @@ -50,7 +52,7 @@ class AveragingPeriod(InterfaceClassBase): """ Set up the AveragingPeriods. - An averagingperiod contains sequences and integrates one or multiple pulse sequences together in + An averaging period contains sequences and integrates one or multiple pulse sequences together in a given time frame or in a given number of averages, if that is the preferred limiter. **The unique members of the averagingperiod are (not a member of the interfaceclassbase):** @@ -61,10 +63,6 @@ class AveragingPeriod(InterfaceClassBase): slice_to_beamdir passed in by the scan that this AveragingPeriod instance is contained in. A dictionary of slice: beamdir(s) for all slices contained in this aveperiod. - cfs_flag - Boolean, True if clrfrqsearch should be performed. - cfs_range - The range of frequency to search if cfs_flag is True. Otherwise empty. intt The priority limitation. The time limit (ms) at which time the aveperiod will end. If None, we will use intn to end the aveperiod (a number of sequences). @@ -98,29 +96,6 @@ def __init__( self.slice_to_beamorder = slice_to_beamorder_dict self.slice_to_beamdir = slice_to_beamdir_dict - # Metadata for an AveragingPeriod: clear frequency search, integration time, number of averages goal - self.cfs_flag = False - self.cfs_always_run = False - self.cfs_sequence = None - self.cfs_slice_ids = [] - self.cfs_scan_order = [] - self.cfs_stable_time = 0 - self.cfs_pwr_threshold = 0 - self.cfs_fft_n = 0 - # there may be multiple slices in this averaging period at different frequencies so - # we may have to search multiple ranges. - self.cfs_range = [] - for slice_id in self.slice_ids: - if self.slice_dict[slice_id].cfs_flag: - self.cfs_stable_time = self.slice_dict[slice_id].cfs_stable_time - self.cfs_pwr_threshold = self.slice_dict[slice_id].cfs_pwr_threshold - self.cfs_fft_n = self.slice_dict[slice_id].cfs_fft_n - self.cfs_flag = True - self.cfs_slice_ids.append(slice_id) - self.cfs_range.append(self.slice_dict[slice_id].cfs_range) - if self.slice_dict[slice_id].cfs_always_run: - self.cfs_always_run = True - self.intt = self.slice_dict[self.slice_ids[0]].intt self.intn = self.slice_dict[self.slice_ids[0]].intn self.txctrfreq = self.slice_dict[self.slice_ids[0]].txctrfreq @@ -166,43 +141,16 @@ def __init__( " interfaced and do not have the same rxctrfreq" ) raise ExperimentException(errmsg) - for slice_id in self.cfs_slice_ids: - if self.slice_dict[slice_id].cfs_pwr_threshold != self.cfs_pwr_threshold: - errmsg = ( - f"Slices {self.slice_ids[0]} and {slice_id} are SEQUENCE or CONCURRENT" - " interfaced and do not have the same cfs_power_threshold" - ) - raise ExperimentException(errmsg) - if self.slice_dict[slice_id].cfs_fft_n != self.cfs_fft_n: - errmsg = ( - f"Slices {self.slice_ids[0]} and {slice_id} are SEQUENCE or CONCURRENT" - " interfaced and do not have the same cfs_fft_n" - ) - raise ExperimentException(errmsg) - if self.slice_dict[slice_id].cfs_stable_time != self.cfs_stable_time: - errmsg = ( - f"Slices {self.slice_ids[0]} and {slice_id} are SEQUENCE or CONCURRENT" - " interfaced and do not have the same cfs_stable_time" - ) - raise ExperimentException(errmsg) self.num_beams_in_scan = len(self.slice_dict[self.slice_ids[0]].rx_beam_order) # NOTE: Do not need beam information inside the AveragingPeriod, this is in Scan. - if self.cfs_flag: - self.build_cfs_sequence() - # Determine how this averaging period is made by separating out the SEQUENCE interfaced. self.nested_slice_list = self.get_nested_slice_ids() self.sequences = [] - - self.cfs_sequences = [] for params in self.prep_for_nested_interface_class(): - new_sequence = Sequence(*params) - if new_sequence.cfs_flag: - self.cfs_sequences.append(new_sequence) - self.sequences.append(new_sequence) + self.sequences.append(Sequence(*params)) self.one_pulse_only = False @@ -242,7 +190,111 @@ def set_beamdirdict(self, beamiter): return slice_to_beamdir_dict - def select_cfs_freqs(self, cfs_spectrum): + +class CFSAveragingPeriod(AveragingPeriod): + """ + A variation of AveragingPeriod that conducts a clear frequency search to determine the frequency to use + for some or all of the slices within the averaging period. + """ + + def __init__( + self, + ave_keys, + ave_slice_dict, + ave_interface, + transmit_metadata, + slice_to_beamorder_dict, + slice_to_beamdir_dict, + ): + super().__init__( + ave_keys, + ave_slice_dict, + ave_interface, + transmit_metadata, + slice_to_beamorder_dict, + slice_to_beamdir_dict, + ) + + # Metadata for an AveragingPeriod: clear frequency search, integration time, number of averages goal + self.cfs_always_run = False + self.cfs_sequence = None + self.cfs_slice_ids = [] + self.cfs_scan_order = [] + self.cfs_stable_time = 0 + self.cfs_pwr_threshold = 0 + self.cfs_fft_n = 0 + + # {slice_id : np.ndarray of shape [num_freqs]} + self.cfs_freq = dict() + + # {slice_id : [np.ndarray of shape [num_freqs]] * num_beams} + self.cfs_mags = dict() + + # {slice_id : list of [lower_freq, upper_freq]} + self.cfs_range = dict() + + # {slice_id : [np.ndarray of shape [num_freqs]] * num_beams} + self.cfs_masks = dict() + + # [datetime] * num_beams + self.last_cfs_set_time = list() + + # {slice_id : [float] * num_beams} + self.beam_frequency = dict() + + # {slice_id : [bool] * num_beams} + self.set_new_freq = dict() + + # there may be multiple slices in this averaging period at different frequencies so + # we may have to search multiple ranges. + for slice_id in self.slice_ids: + if self.slice_dict[slice_id].cfs_flag: + self.cfs_stable_time = self.slice_dict[slice_id].cfs_stable_time + self.cfs_pwr_threshold = self.slice_dict[slice_id].cfs_pwr_threshold + self.cfs_fft_n = self.slice_dict[slice_id].cfs_fft_n + self.cfs_flag = True + self.cfs_slice_ids.append(slice_id) + self.cfs_freq[slice_id] = None + self.cfs_mags[slice_id] = [None] * self.num_beams_in_scan + self.cfs_masks[slice_id] = [None] * self.num_beams_in_scan + self.beam_frequency[slice_id] = [None] * self.num_beams_in_scan + self.cfs_range[slice_id] = self.slice_dict[slice_id].cfs_range + self.set_new_freq[slice_id] = [True] * self.num_beams_in_scan + if self.slice_dict[slice_id].cfs_always_run: + self.cfs_always_run = True + + for slice_id in self.cfs_slice_ids: + if self.slice_dict[slice_id].cfs_pwr_threshold != self.cfs_pwr_threshold: + errmsg = ( + f"Slices {self.slice_ids[0]} and {slice_id} are SEQUENCE or CONCURRENT" + " interfaced and do not have the same cfs_power_threshold" + ) + raise ExperimentException(errmsg) + if self.slice_dict[slice_id].cfs_fft_n != self.cfs_fft_n: + errmsg = ( + f"Slices {self.slice_ids[0]} and {slice_id} are SEQUENCE or CONCURRENT" + " interfaced and do not have the same cfs_fft_n" + ) + raise ExperimentException(errmsg) + if self.slice_dict[slice_id].cfs_stable_time != self.cfs_stable_time: + errmsg = ( + f"Slices {self.slice_ids[0]} and {slice_id} are SEQUENCE or CONCURRENT" + " interfaced and do not have the same cfs_stable_time" + ) + raise ExperimentException(errmsg) + + # Set to a time in the past that is guaranteed to trigger a clear frequency search on the + # first averaging period run + self.last_cfs_set_time = [ + datetime.datetime.utcnow() + - datetime.timedelta(seconds=self.cfs_stable_time) + ] * len(self.slice_dict[self.slice_ids[0]].rx_beam_order) + + self.build_cfs_sequence() + + self.cfs_sequences = [sqn for sqn in self.sequences if sqn.cfs_flag] + + def select_cfs_freqs(self, cfs_packet): """ Accepts the analysis results of the clear frequency search and uses the passed frequencies and powers to determine what frequencies to set each clear frequency search slice to. @@ -263,11 +315,11 @@ def select_cfs_freqs(self, cfs_spectrum): * Builds each CFS sequence * Return the frequency masks - :param cfs_spectrum: Analyzed CFS sequence data - :type cfs_spectrum: ProcessedSequenceMessage + :param cfs_packet: Analyzed CFS sequence data + :type cfs_packet: ProcessedSequenceMessage """ - cfs_freq_hz = np.array(cfs_spectrum.cfs_freq) # at baseband - cfs_data = [dset.cfs_data for dset in cfs_spectrum.output_datasets] + cfs_freq_hz = np.array(cfs_packet.cfs_freq) # at baseband + cfs_data = [dset.cfs_data for dset in cfs_packet.output_datasets] # Sort measured frequencies based on measured power at each freq slice_masks = dict() slice_used_freqs = dict() @@ -356,6 +408,7 @@ def select_cfs_freqs(self, cfs_spectrum): slice_masks[slice_id] = mask ind = np.argmin(cfs_data[i][mask]) cfs_set_freq[slice_id] = int(np.round(shifted_cfs_khz[ind])) + self.beam_frequency[slice_id][self.beam_iter] = cfs_set_freq[slice_id] for sqn in self.cfs_sequences: if slice_id in sqn.slice_ids: @@ -370,14 +423,12 @@ def select_cfs_freqs(self, cfs_spectrum): ) # Set cfs slice frequency and add frequency to used_freqs for all other concurrent slices - self.update_cfs_freqs(cfs_set_freq) - return slice_masks, cfs_set_freq - def update_cfs_freqs(self, cfs_set_freq): + def update_cfs_freqs(self): for i, slice_id in enumerate(self.cfs_slice_ids): slice_obj = self.slice_dict[slice_id] - slice_obj.freq = cfs_set_freq[slice_id] + slice_obj.freq = self.beam_frequency[slice_id][self.beam_iter] log.verbose( "selecting cfs slice freq", slice_id=slice_obj.slice_id, @@ -386,6 +437,39 @@ def update_cfs_freqs(self, cfs_set_freq): for sequence in self.cfs_sequences: sequence.build_sequence_pulses() + def check_update_freq(self, cfs_spectrum, cfs_slices, threshold, beam_iter): + """ + Checks if any scanned frequencies have power levels that + exceed the current power of each cfs slice based on a threshold + + :params cfs_packet: Results of the CFS analysis + :type cfs_spectrum: OutputDataset dataclass from message_formats.py + :params cfs_slices: Slice ids of each cfs slice to be checked + :type cfs_slices: list + :params threshold: Power threshold (dB) used in check + :type threshold: float + :params beam_iter: current beam index + :type beam_iter: int + """ + cfs_data = [dset.cfs_data for dset in cfs_spectrum.output_datasets] + for i, slice_id in enumerate(cfs_slices): + # Shift the current frequency down to baseband and then use the + # result to determine the index in the measured frequency + # spectrum that the current frequency is from + shifted_frequency = self.beam_frequency[slice_id][beam_iter] - int( + sum(self.cfs_range[slice_id]) / 2 + ) + idx = ( + np.abs(np.asarray(self.cfs_freq) - shifted_frequency * 1000) + ).argmin() + + # calculate the ratio of the current freq power over all other freqs + pwr_ratio = cfs_data[i][idx] - np.asarray( + cfs_data[i][self.cfs_masks[slice_id][beam_iter]] + ) + if any(pwr_ratio > threshold): + self.set_new_freq[slice_id][beam_iter] = True + def build_cfs_sequence(self): """ Builds an empty rx only pulse sequence to collect clear frequency search data diff --git a/src/experiment_prototype/interface_classes/interface_class_base.py b/src/experiment_prototype/interface_classes/interface_class_base.py index d82cc131..079feeb9 100644 --- a/src/experiment_prototype/interface_classes/interface_class_base.py +++ b/src/experiment_prototype/interface_classes/interface_class_base.py @@ -238,6 +238,7 @@ def get_nested_slice_ids(self): "AveragingPeriod": [ "CONCURRENT" ], # Combine everything CONCURRENT interfaced + "CFSAveragingPeriod": ["CONCURRENT"], # Same as AveragingPeriod "Sequence": [], # All slices in a Sequence are already CONCURRENT and should be combined already } diff --git a/src/experiment_prototype/interface_classes/scans.py b/src/experiment_prototype/interface_classes/scans.py index 32fd7fb3..ff0ad37d 100644 --- a/src/experiment_prototype/interface_classes/scans.py +++ b/src/experiment_prototype/interface_classes/scans.py @@ -21,7 +21,10 @@ import structlog # local -from experiment_prototype.interface_classes.averaging_periods import AveragingPeriod +from experiment_prototype.interface_classes.averaging_periods import ( + AveragingPeriod, + CFSAveragingPeriod, +) from experiment_prototype.interface_classes.interface_class_base import ( InterfaceClassBase, ) @@ -76,7 +79,10 @@ def __init__(self, scan_keys, scan_slice_dict, scan_interface, transmit_metadata self.nested_slice_list = self.get_nested_slice_ids() for params in self.prep_for_nested_interface_class(): - self.aveperiods.append(AveragingPeriod(*params)) + if any([s.cfs_flag for s in params[1].values()]): + self.aveperiods.append(CFSAveragingPeriod(*params)) + else: + self.aveperiods.append(AveragingPeriod(*params)) # determine how many beams in scan: num_unique_aveperiods = 0 diff --git a/src/experiment_prototype/interface_classes/sequences.py b/src/experiment_prototype/interface_classes/sequences.py index 7645d845..6cb70e55 100644 --- a/src/experiment_prototype/interface_classes/sequences.py +++ b/src/experiment_prototype/interface_classes/sequences.py @@ -171,11 +171,6 @@ def build_sequence_pulses(self): txrate = self.transmit_metadata["txrate"] main_antenna_locations = self.transmit_metadata["main_antenna_locations"] intf_antenna_locations = self.transmit_metadata["intf_antenna_locations"] - main_antenna_count = self.transmit_metadata["main_antenna_count"] - main_antenna_spacing = self.transmit_metadata["main_antenna_spacing"] - intf_antenna_count = self.transmit_metadata["intf_antenna_count"] - intf_antenna_spacing = self.transmit_metadata["intf_antenna_spacing"] - max_usrp_dac_amplitude = self.transmit_metadata["max_usrp_dac_amplitude"] tr_window_time = self.transmit_metadata["tr_window_time"] intf_offset = self.transmit_metadata["intf_offset"] @@ -194,14 +189,12 @@ def build_sequence_pulses(self): rx_main_phase_shift = exp_slice.rx_antenna_pattern( exp_slice.beam_angle, freq_khz, - main_antenna_count, - main_antenna_spacing, + main_antenna_locations[self.rx_main_antennas], ) rx_intf_phase_shift = exp_slice.rx_antenna_pattern( exp_slice.beam_angle, freq_khz, - intf_antenna_count, - intf_antenna_spacing, + intf_antenna_locations[self.rx_intf_antennas], intf_offset, ) else: diff --git a/src/radar_control.py b/src/radar_control.py index eefb8089..b3021c5a 100644 --- a/src/radar_control.py +++ b/src/radar_control.py @@ -22,6 +22,7 @@ from functools import reduce from dataclasses import dataclass, field +from experiment_prototype.interface_classes.averaging_periods import CFSAveragingPeriod from utils.options import Options import utils.message_formats as messages from utils import socket_operations as so @@ -41,7 +42,6 @@ class CFSParameters: Parameters used to track clear frequency search data, each use of this class should be linked to a unique aveperiod. - :param total_beam_num: number of beams that will be scanned by slice :param cfs_freq: list of frequencies sampled by CFS :param cfs_mags: power measurements corresponding to cfs_freq, indexed by beam iterator then by slice order in an aveperiod @@ -56,54 +56,13 @@ class CFSParameters: slices on a beam. Indexed by beam iterator """ - total_beam_num: int = 0 - cfs_freq: list = field(default_factory=list) - cfs_mags: dict = field(default_factory=dict) - cfs_range: dict = field(default_factory=dict) - cfs_masks: dict = field(default_factory=dict) - last_cfs_set_time: dict = field(default_factory=dict) - beam_frequency: dict = field(default_factory=dict) - set_new_freq: dict = field(default_factory=dict) - - def __post_init__(self): - for beam_iter in range(self.total_beam_num): - self.last_cfs_set_time[beam_iter] = 0 - self.set_new_freq[beam_iter] = True - self.cfs_mags[beam_iter] = dict() - self.cfs_masks[beam_iter] = dict() - - def check_update_freq(self, cfs_spectrum, cfs_slices, threshold, beam_iter): - """ - Checks if any scanned frequencies have power levels that - exceed the current power of each cfs slice based on a threshold - - :params cfs_spectrum: Results of the CFS analysis - :type cfs_spectrum: OutputDataset dataclass from message_formats.py - :params cfs_slices: Slice ids of each cfs slice to be checked - :type cfs_slices: list - :params threshold: Power threshold (dB) used in check - :type threshold: float - :params beam_iter: current beam index - :type beam_iter: int - """ - cfs_data = [dset.cfs_data for dset in cfs_spectrum.output_datasets] - for i, slice_id in enumerate(cfs_slices): - # Shift the current frequency down to baseband and then use the - # result to determine the index in the measured frequency - # spectrum that the current frequency is from - shifted_frequency = self.beam_frequency[beam_iter][slice_id] - int( - sum(self.cfs_range[slice_id]) / 2 - ) - idx = ( - np.abs(np.asarray(self.cfs_freq) - shifted_frequency * 1000) - ).argmin() - - # calculate the ratio of the current freq power over all other freqs - pwr_ratio = cfs_data[i][idx] - np.asarray( - cfs_data[i][self.cfs_masks[beam_iter][slice_id]] - ) - if any(pwr_ratio > threshold): - self.set_new_freq[beam_iter] = True + cfs_freq: list + cfs_mags: dict + cfs_range: dict + cfs_masks: dict + last_cfs_set_time: dict + beam_frequency: dict + set_new_freq: dict @dataclass @@ -131,7 +90,6 @@ class RadctrlParameters: pulse_transmit_data_tracker: dict = field(default_factory=dict) slice_dict: dict = field(default_factory=dict, init=False) cfs_scan_flag: bool = False - cfs_params: ... = field(default_factory=CFSParameters, init=False) scan_flag: bool = False dsp_cfs_identity: str = "" router_address: str = "" @@ -141,7 +99,6 @@ class RadctrlParameters: def __post_init__(self): self.slice_dict = self.experiment.slice_dict - self.cfs_params = CFSParameters(self.num_beams) # Set slice_dict after an experiment has been assigned if self.sequence: self.decimation_scheme = self.sequence.decimation_scheme @@ -368,11 +325,7 @@ def create_dsp_message(radctrl_params): beam_num = slice_dict[slice_id].tx_beam_order[ radctrl_params.aveperiod.beam_iter ] - scan_id = int( - np.argwhere( - np.array(radctrl_params.aveperiod.cfs_scan_order) == slice_id - ) - ) + scan_id = radctrl_params.aveperiod.cfs_scan_order.index(slice_id) beam_dict[scan_id] = { "main": np.array([tx_phases[beam_num]]), "intf": np.array([intf_phases]), @@ -482,8 +435,8 @@ def make_next_samples(radctrl_params): new_sequence_time = datetime.utcnow() - radctrl_params.start_time log.verbose( "make new sequence time", - new_sequence_time=new_sequence_time, - new_sequence_time_units="s", + time=new_sequence_time, + time_unit="s", ) @@ -544,10 +497,21 @@ def create_dw_message(radctrl_params): message.experiment_name = radctrl_params.experiment.experiment_name message.experiment_comment = radctrl_params.experiment.comment_string message.rx_ctr_freq = radctrl_params.experiment.slice_dict[0].rxctrfreq - if radctrl_params.aveperiod.cfs_flag: + if isinstance(radctrl_params.aveperiod, CFSAveragingPeriod): message.num_sequences = ( radctrl_params.num_sequences - 1 ) # first sequence was CFS + message.cfs_freqs = radctrl_params.aveperiod.cfs_freq + message.cfs_noise = [ + x[radctrl_params.aveperiod.beam_iter] + for x in radctrl_params.aveperiod.cfs_mags.values() + ] + message.cfs_range = radctrl_params.aveperiod.cfs_range + message.cfs_masks = [ + x[radctrl_params.aveperiod.beam_iter] + for x in radctrl_params.aveperiod.cfs_masks.values() + ] + message.cfs_slice_ids = radctrl_params.aveperiod.cfs_slice_ids else: message.num_sequences = radctrl_params.num_sequences message.last_sqn_num = radctrl_params.last_sequence_num @@ -558,15 +522,6 @@ def create_dw_message(radctrl_params): lambda x, y: x * y, radctrl_params.decimation_scheme.filter_scaling_factors ) # multiply all message.scheduling_mode = radctrl_params.experiment.scheduling_mode - message.cfs_freqs = radctrl_params.cfs_params.cfs_freq - message.cfs_noise = radctrl_params.cfs_params.cfs_mags[ - radctrl_params.aveperiod.beam_iter - ] - message.cfs_range = radctrl_params.cfs_params.cfs_range - message.cfs_masks = radctrl_params.cfs_params.cfs_masks[ - radctrl_params.aveperiod.beam_iter - ] - message.cfs_slice_ids = radctrl_params.aveperiod.cfs_slice_ids for sequence_index, sequence in enumerate(radctrl_params.aveperiod.sequences): sequence_add = messages.Sequence() @@ -680,8 +635,9 @@ def run_cfs_scan(radctrl_params, sockets): aveperiod = radctrl_params.aveperiod sequence = aveperiod.cfs_sequence - sqn, dbg = sequence.make_sequence(aveperiod.beam_iter, 0) + sqn = sequence.make_sequence(aveperiod.beam_iter, 0) + # todo: modify the sqn_num to avoid dimension mismatch? cfs_sqn_num = radctrl_params.seqnum_start + radctrl_params.num_sequences so.send_pyobj(cfs_socket, radctrl_params.dw_cfs_identity, cfs_sqn_num) @@ -715,66 +671,58 @@ def run_cfs_scan(radctrl_params, sockets): return freq_data -def cfs_block(ave_params, cfs_params_dict, cfs_sockets): +def cfs_block(ave_params, cfs_sockets): aveperiod = ave_params.aveperiod - cfs_params = cfs_params_dict[aveperiod] beam = aveperiod.beam_iter if not ( - cfs_params.last_cfs_set_time[beam] < time.time() - aveperiod.cfs_stable_time + aveperiod.last_cfs_set_time[beam] + < datetime.utcnow() - timedelta(seconds=aveperiod.cfs_stable_time) or aveperiod.cfs_always_run ): - ave_params.cfs_params = cfs_params - return False + return + + aveperiod.last_cfs_set_time[beam] = datetime.utcnow() # Only let CFS run after the user set stable time has - # passed to prevent CFS from switching freqs to quickly + # passed to prevent CFS from switching freqs too quickly ave_params.sequence = aveperiod.cfs_sequence ave_params.decimation_scheme = aveperiod.cfs_sequence.decimation_scheme - freq_spectrum = run_cfs_scan(ave_params, cfs_sockets) + processed_cfs_packet = run_cfs_scan(ave_params, cfs_sockets) ave_params.num_sequences += 1 ave_params.cfs_scan_flag = False - cfs_params.cfs_freq = freq_spectrum.cfs_freq + aveperiod.cfs_freq = processed_cfs_packet.cfs_freq - for ind, dset in enumerate(freq_spectrum.output_datasets): - cfs_params.cfs_mags[beam][aveperiod.cfs_slice_ids[ind]] = dset.cfs_data + for ind, dset in enumerate(processed_cfs_packet.output_datasets): + aveperiod.cfs_mags[aveperiod.cfs_slice_ids[ind]][beam] = dset.cfs_data - if not ( - cfs_params.last_cfs_set_time[beam] < time.time() - aveperiod.cfs_stable_time + if ( + not any([x[beam] for x in aveperiod.set_new_freq.values()]) + and aveperiod.cfs_pwr_threshold is not None ): - ave_params.cfs_params = cfs_params - return False - - # Only attempt to update the cfs frequency of stable time has elapsed - cfs_params.last_cfs_set_time[beam] = time.time() - - if not cfs_params.set_new_freq[beam] and aveperiod.cfs_pwr_threshold is not None: - # If using a user set power threshold to trigger CFS freq setting, check - # if any power related condition are triggered and set the corresponding - # flag - cfs_params.check_update_freq( - freq_spectrum, + # If using a user set power threshold to trigger CFS freq setting, check if any + # power related conditions are triggered and set the corresponding flag + aveperiod.check_update_freq( + processed_cfs_packet, aveperiod.cfs_slice_ids, aveperiod.cfs_pwr_threshold, beam, ) - if not (cfs_params.set_new_freq[beam] or aveperiod.cfs_pwr_threshold is None): + if not ( + any([x[beam] for x in aveperiod.set_new_freq.values()]) + or aveperiod.cfs_pwr_threshold is None + ): # Return if the frequency does not need to be changed. - ave_params.cfs_params = cfs_params - return False + return # If using a power threshold and one of the power conditions were - # triggerd, or if not using a power threshold, set the CFS params - cfs_params.set_new_freq[beam] = False - cfs_params.cfs_masks[beam], last_set_cfs = aveperiod.select_cfs_freqs(freq_spectrum) - cfs_params.beam_frequency[beam] = last_set_cfs - - for ind in range(len(aveperiod.cfs_slice_ids)): - cfs_params.cfs_range[aveperiod.cfs_slice_ids[ind]] = aveperiod.cfs_range[ind] - - ave_params.cfs_params = cfs_params - return True + # triggered, or if not using a power threshold, set the CFS params + slice_masks, last_set_cfs = aveperiod.select_cfs_freqs(processed_cfs_packet) + for slice_id in aveperiod.cfs_slice_ids: + aveperiod.set_new_freq[slice_id][beam] = False + aveperiod.cfs_masks[slice_id][beam] = slice_masks[slice_id] + aveperiod.beam_frequency[slice_id][beam] = last_set_cfs[slice_id] def main(): @@ -894,7 +842,6 @@ def main(): first_aveperiod = True next_scan_start = None - cfs_params_dict = dict() while True: # This loops through all scans in an experiment, or restarts this loop if a new experiment occurs. # TODO : further documentation throughout in comments (high level) and in separate documentation. @@ -1002,10 +949,6 @@ def main(): ): # If there are multiple aveperiods in a scan they are alternated (AVEPERIOD interfaced) aveperiod = scan.aveperiods[scan.aveperiod_iter] - if aveperiod not in cfs_params_dict.keys() and aveperiod.cfs_flag: - cfs_params_dict[aveperiod] = CFSParameters( - aveperiod.num_beams_in_scan - ) if TIME_PROFILE: time_start_of_aveperiod = datetime.utcnow() @@ -1029,8 +972,8 @@ def main(): averaging_period_start_time = datetime.utcnow() # ms log.verbose( "averaging period start time", - averaging_period_start_time=averaging_period_start_time, - averaging_period_start_time_units="", + time=averaging_period_start_time, + time_unit="s", ) if aveperiod.intt is not None: intt_break = True @@ -1047,16 +990,16 @@ def main(): if first_aveperiod: log.verbose( "seconds to next avg period", - time_until_avg_period=time_diff.total_seconds(), - time_until_avg_period_units="s", + time=time_diff.total_seconds(), + time_unit="s", scan_iter=scan_iter, beam_scanbound=beam_scanbound, ) else: log.debug( "seconds to next avg period", - time_until_avg_period=time_diff.total_seconds(), - time_until_avg_period_units="s", + time=time_diff.total_seconds(), + time_unit="s", scan_iter=scan_iter, beam_scanbound=beam_scanbound, ) @@ -1075,8 +1018,8 @@ def main(): averaging_period_start_time = datetime.utcnow() log.verbose( "avg period start time", - avg_period_start_time=averaging_period_start_time, - avg_period_start_time_units="s", + time=averaging_period_start_time, + time_unit="s", scan_iter=scan_iter, beam_scanbound=beam_scanbound, ) @@ -1104,8 +1047,8 @@ def main(): log.verbose( "bound time remaining", - bound_time_remaining=bound_time_remaining, - bound_time_remaining_units="s", + time=bound_time_remaining, + time_unit="s", scan_num=scan_num, scan_iter=scan_iter, # scan_iter is averaging period number for some reason beam_scanbound=beam_scanbound, @@ -1144,8 +1087,8 @@ def main(): aveperiod_prep_time = datetime.utcnow() - time_start_of_aveperiod log.verbose( "time to prep aveperiod", - aveperiod_prep_time=aveperiod_prep_time, - aveperiod_prep_time_units="", + time=aveperiod_prep_time, + time_unit="", ) # Time to start averaging in the below loop @@ -1161,29 +1104,19 @@ def main(): time_remains = True while time_remains: - if ave_params.num_sequences == 0 and aveperiod.cfs_flag: + if ave_params.num_sequences == 0 and isinstance( + aveperiod, CFSAveragingPeriod + ): cfs_time = time.time() - updated = cfs_block( + cfs_block( ave_params, - cfs_params_dict, [ radctrl_inproc_socket, radctrl_brian_socket, driver_comms_socket, ], ) - if not updated: - # Need to update current beam frequency to pre-determined value - beam = aveperiod.beam_iter - cfs_set_freqs = dict() - for slice_id in cfs_params_dict[aveperiod].beam_frequency[ - beam - ]: - cfs_set_freqs[slice_id] = cfs_params_dict[ - aveperiod - ].beam_frequency[beam][slice_id] - aveperiod.update_cfs_freqs(cfs_set_freqs) - + aveperiod.update_cfs_freqs() # always update, to use correct freq for beam log.verbose("CFS block run time", time=time.time() - cfs_time) for sequence_index, sequence in enumerate(aveperiod.sequences): @@ -1272,8 +1205,8 @@ def main(): avg_period_end_time = datetime.utcnow() log.verbose( "avg period end time", - avg_period_end_time=avg_period_end_time, - avg_period_end_time_units="s", + time=avg_period_end_time, + time_unit="s", ) log.info( @@ -1292,7 +1225,6 @@ def main(): ave_params.last_sequence_num = ( ave_params.seqnum_start + ave_params.num_sequences - 1 ) - dw_message = create_dw_message(ave_params) dw_comms_socket.send_pyobj(dw_message) # Send metadata to dw_comms_thread, so it can package into a message for data write @@ -1306,8 +1238,8 @@ def main(): time_to_finish_aveperiod = datetime.utcnow() - avg_period_end_time log.verbose( "time to finish avg period", - avg_period_elapsed_time=time_to_finish_aveperiod, - avg_period_elapsed_time_units="s", + time=time_to_finish_aveperiod, + time_unit="s", ) aveperiod.beam_iter += 1 diff --git a/src/rx_signal_processing.py b/src/rx_signal_processing.py index 9c829912..752e8877 100644 --- a/src/rx_signal_processing.py +++ b/src/rx_signal_processing.py @@ -174,7 +174,7 @@ def sequence_worker(options, ringbuffer): processed_socket = sequence_worker_sockets[2] # Generate a timer dict for a uniform log - log_dict = {"time_units": "ms"} + log_dict = {"time_unit": "ms"} start_timer = time.perf_counter() # Copy samples from ring buffer @@ -330,7 +330,7 @@ def sequence_worker(options, ringbuffer): ) # Generate a new timer dict for a uniform log - log_dict = {"time_units": "ms"} + log_dict = {"time_unit": "ms"} start_timer = time.perf_counter() # Extract outputs from processing into groups that will be put into message fields. @@ -487,7 +487,7 @@ def debug_data_in_shm(holder, data_array, array_name): "done with sequence", sequence_num=rx_params.sequence_num, processing_time=total_processing_time, - time_units="ms", + time_unit="ms", slice_ids=[d["slice_id"] for d in rx_params.slice_details], ) log.verbose("sequence timing", sequence_num=rx_params.sequence_num, **log_dict)