From 0f34c4ff83f2f024f3cef130a7271cedd5ace803 Mon Sep 17 00:00:00 2001 From: Mohamed Sameh Date: Fri, 19 Jan 2024 16:40:38 +0100 Subject: [PATCH] feat: optimized the process of flattening a specLib --- alphabase/peptide/fragment.py | 225 +++++++++++++++++++++++----------- 1 file changed, 156 insertions(+), 69 deletions(-) diff --git a/alphabase/peptide/fragment.py b/alphabase/peptide/fragment.py index cd9a4908..94ee9521 100644 --- a/alphabase/peptide/fragment.py +++ b/alphabase/peptide/fragment.py @@ -504,50 +504,136 @@ def mask_fragments_for_charge_greater_than_precursor_charge( ] = 0 return fragment_df -@nb.njit -def parse_fragment_positions(frag_directions, frag_start_idxes, frag_stop_idxes): - frag_positions = np.zeros_like(frag_directions, dtype=np.uint32) - for frag_start, frag_end in zip(frag_start_idxes, frag_stop_idxes): - frag_positions[frag_start:frag_end] = np.arange(0,frag_end-frag_start).reshape(-1,1) - return frag_positions - -@nb.njit -def parse_fragment_numbers(frag_directions, frag_start_idxes, frag_stop_idxes): - frag_numbers = np.zeros_like(frag_directions, dtype=np.uint32) - for frag_start, frag_end in zip(frag_start_idxes, frag_stop_idxes): - frag_numbers[frag_start:frag_end] = _parse_fragment_number_of_one_peptide( - frag_directions[frag_start:frag_end] - ) - return frag_numbers - -@nb.njit -def _parse_fragment_number_of_one_peptide(frag_directions): - frag_number = np.zeros_like(frag_directions, dtype=np.uint32) - max_index = len(frag_number) - for (i,j), frag_direct in np.ndenumerate(frag_directions): - if frag_direct == 1: - frag_number[i,j] = i+1 - elif frag_direct == -1: - frag_number[i,j] = max_index-i - else: - pass - return frag_number -@nb.njit -def exclude_not_top_k( - fragment_intensities, top_k, - frag_start_idxes, frag_stop_idxes -)->np.ndarray: - excluded = np.zeros_like(fragment_intensities, dtype=np.bool_) - for frag_start, frag_end in zip(frag_start_idxes, frag_stop_idxes): - if top_k >= frag_end-frag_start: continue - idxes = np.argsort(fragment_intensities[frag_start:frag_end]) + +@nb.njit(parallel=True) +def fill_in_indices( + frag_start_idxes:np.ndarray, + frag_stop_idxes: np.ndarray, + indices:np.ndarray, + max_indices:np.ndarray, + excluded_indices:np.ndarray, + top_k: int, + flattened_intensity:np.ndarray, + number_of_fragment_types:int, + max_frag_per_peptide:int = 300) -> None: + """ + Fill in indices, max indices and excluded indices for each peptide. + indices: index of fragment per peptide (from 0 to max_index-1) + max_indices: max index of fragments per peptide (number of fragments per peptide) + excluded_indices: not top k excluded indices per peptide + + Parameters + ---------- + frag_start_idxes : np.ndarray + start indices of fragments for each peptide + frag_stop_idxes : np.ndarray + stop indices of fragments for each peptide + indices : np.ndarray + index of fragment per peptide (from 0 to max_index-1) it will be filled in this function + max_indices : np.ndarray + max index of fragments per peptide (number of fragments per peptide) it will be filled in this function + excluded_indices : np.ndarray + not top k excluded indices per peptide it will be filled in this function + top_k : int + top k highest peaks to keep + flattened_intensity : np.ndarray + Flattened fragment intensities + number_of_fragment_types : int + number of types of fragments (e.g. b,y,b_modloss,y_modloss, ...) equals to the number of columns in fragment mz dataframe + max_frag_per_peptide : int, optional + maximum number of fragments per peptide, Defaults to 300 + + """ + array = np.arange(0,max_frag_per_peptide).reshape(-1,1) + ones = np.ones(max_frag_per_peptide).reshape(-1,1) + length = len(frag_start_idxes) + + for i in nb.prange(length): + frag_start = frag_start_idxes[i] + frag_end = frag_stop_idxes[i] + max_index = frag_end-frag_start + indices[frag_start:frag_end] = array[:max_index] + max_indices[frag_start:frag_end] = ones[:max_index]*max_index + if top_k >= max_index*number_of_fragment_types: continue + idxes = np.argsort(flattened_intensity[frag_start*number_of_fragment_types:frag_end*number_of_fragment_types]) _excl = np.ones_like(idxes, dtype=np.bool_) _excl[idxes[-top_k:]] = False - excluded[frag_start:frag_end] = _excl - return excluded + excluded_indices[frag_start*number_of_fragment_types:frag_end*number_of_fragment_types] = _excl + + + +@nb.vectorize([nb.uint32(nb.int8, nb.uint32, nb.uint32, nb.uint32)],target='parallel') +def calculate_fragment_numbers(frag_direction:np.int8, frag_number:np.uint32, index:np.uint32, max_index:np.uint32): + """ + Calculate fragment numbers for each fragment based on the fragment direction. + + Parameters + ---------- + frag_direction : np.int8 + directions of fragments for each peptide + frag_number : np.uint32 + fragment numbers for each peptide + index : np.uint32 + index of fragment per peptide (from 0 to max_index-1) + max_index : np.uint32 + max index of fragments per peptide (number of fragments per peptide) + """ + if frag_direction == 1: + frag_number = index + 1 + elif frag_direction == -1: + frag_number = max_index - index + return frag_number + +def parse_fragment( + frag_directions:np.ndarray, + frag_start_idxes:np.ndarray, + frag_stop_idxes: np.ndarray, + top_k: int, + intensities:np.ndarray, + number_of_fragment_types:int + ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + """ + Parse fragments to get fragment numbers, fragment positions and not top k excluded indices in one hit + faster than doing each operation individually, and makes the most of the operations that are done in parallel. + + Parameters + ---------- + frag_directions : np.ndarray + directions of fragments for each peptide + frag_start_idxes : np.ndarray + start indices of fragments for each peptide + frag_stop_idxes : np.ndarray + stop indices of fragments for each peptide + top_k : int + top k highest peaks to keep + intensities : np.ndarray + Flattened fragment intensities + number_of_fragment_types : int + number of types of fragments (e.g. b,y,b_modloss,y_modloss, ...) equals to the number of columns in fragment mz dataframe + + Returns + ------- + Tuple[np.ndarray, np.ndarray, np.ndarray] + Tuple of fragment numbers, fragment positions and not top k excluded indices + + """ + # Allocate memory for fragment numbers, indices, max indices and excluded indices + frag_numbers = np.empty_like(frag_directions, dtype=np.uint32) + indices = np.empty_like(frag_directions, dtype=np.uint32) + max_indices = np.empty_like(frag_directions, dtype=np.uint32) + excluded_indices = np.zeros(frag_directions.shape[0]*frag_directions.shape[1], dtype=np.bool_) + + # Fill in indices, max indices and excluded indices + fill_in_indices(frag_start_idxes, frag_stop_idxes,indices,max_indices, excluded_indices,top_k,intensities, number_of_fragment_types) + + # Calculate fragment numbers + frag_numbers = calculate_fragment_numbers(frag_directions, frag_numbers, indices, max_indices) + return frag_numbers, indices, excluded_indices + + def flatten_fragments( precursor_df: pd.DataFrame, fragment_mz_df: pd.DataFrame, @@ -621,11 +707,10 @@ def flatten_fragments( - charge: int8, fragment charge - loss_type: int16, fragment loss type, 0=noloss, 17=NH3, 18=H2O, 98=H3PO4 (phos), ... """ - if len(precursor_df) == 0: return precursor_df, pd.DataFrame() # new dataframes for fragments and precursors are created - frag_df = pd.DataFrame() + frag_df = {} frag_df['mz'] = fragment_mz_df.values.reshape(-1) if len(fragment_intensity_df) > 0: frag_df['intensity'] = fragment_intensity_df.values.astype( @@ -634,12 +719,11 @@ def flatten_fragments( use_intensity = True else: use_intensity = False - # add additional columns to the fragment dataframe # each column in the flat fragment dataframe is a whole pandas dataframe in the dense representation for col_name, df in custom_df.items(): frag_df[col_name] = df.values.reshape(-1) - + frag_types = [] frag_loss_types = [] frag_charges = [] @@ -658,60 +742,63 @@ def flatten_fragments( frag_loss_types.append(18) else: frag_loss_types.append(98) - if _types[0] in 'abc': frag_directions.append(1) elif _types[0] in 'xyz': frag_directions.append(-1) else: frag_directions.append(0) - + if 'type' in custom_columns: - frag_df['type'] = np.array(frag_types*len(fragment_mz_df), dtype=np.int8) + frag_df['type'] = np.array(np.tile(frag_types, len(fragment_mz_df)), dtype=np.int8) if 'loss_type' in custom_columns: - frag_df['loss_type'] = np.array(frag_loss_types*len(fragment_mz_df), dtype=np.int16) + frag_df['loss_type'] = np.array(np.tile(frag_loss_types, len(fragment_mz_df)), dtype=np.int16) if 'charge' in custom_columns: - frag_df['charge'] = np.array(frag_charges*len(fragment_mz_df), dtype=np.int8) + frag_df['charge'] = np.array(np.tile(frag_charges, len(fragment_mz_df)), dtype=np.int8) - frag_directions = np.array([frag_directions]*len(fragment_mz_df), dtype=np.int8) - if 'number' in custom_columns: - frag_df['number'] = parse_fragment_numbers( + frag_directions = np.array(np.tile(frag_directions,(len(fragment_mz_df),1)), dtype=np.int8) + + numbers, positions,excluded_indices = parse_fragment( frag_directions, precursor_df.frag_start_idx.values, - precursor_df.frag_stop_idx.values - ).reshape(-1) + precursor_df.frag_stop_idx.values, + keep_top_k_fragments, + frag_df['intensity'], + len(fragment_mz_df.columns) + ) + + if 'number' in custom_columns: + + frag_df['number'] = numbers.reshape(-1) + if 'position' in custom_columns: - frag_df['position'] = parse_fragment_positions( - frag_directions, - precursor_df.frag_start_idx.values, - precursor_df.frag_stop_idx.values - ).reshape(-1) + frag_df['position'] = positions.reshape(-1) + precursor_df['flat_frag_start_idx'] = precursor_df.frag_start_idx precursor_df['flat_frag_stop_idx'] = precursor_df.frag_stop_idx precursor_df[['flat_frag_start_idx','flat_frag_stop_idx']] *= len(fragment_mz_df.columns) - + if use_intensity: - frag_df.intensity.mask(frag_df.mz == 0.0, 0.0, inplace=True) + frag_df['intensity'][frag_df['mz'] == 0.0] = 0.0 + + excluded = ( - frag_df.mz.values == 0 if not use_intensity else + frag_df['mz'] == 0 if not use_intensity else ( - frag_df.intensity.values < min_fragment_intensity + frag_df['intensity'] < min_fragment_intensity ) | ( - frag_df.mz.values == 0 + frag_df['mz'] == 0 ) | ( - exclude_not_top_k( - frag_df.intensity.values, keep_top_k_fragments, - precursor_df.flat_frag_start_idx.values, - precursor_df.flat_frag_stop_idx.values, + excluded_indices ) ) - ) + + frag_df = pd.DataFrame(frag_df) frag_df = frag_df[~excluded] frag_df = frag_df.reset_index(drop=True) - # cumulative sum counts the number of fragments before the given fragment which were removed. # This sum does not include the fragment at the index position and has therefore len N +1 cum_sum_tresh = np.zeros(shape=len(excluded)+1, dtype=np.int64)