Source code for dimspy.process.peak_filters

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright © 2017-2020 Ralf Weber, Albert Zhou.
# This file is part of DIMSpy.
# DIMSpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# DIMSpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with DIMSpy.  If not, see <>.

import logging
from functools import reduce
from typing import Union, Sequence, Tuple, Any

import numpy as np
from ..models.peak_matrix import PeakMatrix, mask_peakmatrix, unmask_peakmatrix
from ..models.peaklist import PeakList

# peaklist filters
[docs]def filter_attr(pl: PeakList, attr_name: str, max_threshold: Union[int, float, None] = None, min_threshold: [int, float, None] = None, flag_name: Union[str, None] = None, flag_index: Union[int, None] = None): """ Peaklist attribute values filter. :param pl: The target peaklist :param attr_name: Name of the target attribute :param max_threshold: Maximum threshold. A peak will be unflagged if the value of it's attr_name is larger than the threshold. Default = None, indicating no threshold :param min_threshold: Minimum threshold. A peak will be unflagged if the value of it's attr_name is smaller than the threshold. Default = None, indicating no threshold :param flag_name: Name of the new flag attribute. Default = None, indicating using attr_name + '_flag' :param flag_index: Index of the new flag to be inserted into the peaklist. Default = None :rtype: PeakList object This filter accepts real value attributes only. """ if min_threshold is None and max_threshold is None: raise ValueError('must specify minimum or maximum threshold value') flt = lambda x: np.logical_and((min_threshold <= x) if min_threshold is not None else True, (x <= max_threshold) if max_threshold is not None else True) if flag_name is None: flag_name = attr_name + '_flag' return pl.add_attribute(flag_name, flt(pl[attr_name]), is_flag=True, on_index=flag_index)
[docs]def filter_ringing(pl: PeakList, threshold: float, bin_size: Union[int, float] = 1.0, flag_name: str = 'ringing_flag', flag_index: Union[int, None] = None): """ Peaklist ringing filter. :param pl: The target peaklist :param threshold: Intensity threshold ratio :param bin_size: size of the mz chunk for intensity filtering. Default = 1.0 ppm :param flag_name: Name of the new flag attribute. Default = 'ringing_flag' :param flag_index: Index of the new flag to be inserted into the peaklist. Default = None :rtype: PeakList object This filter will split the mz values into bin_size chunks, and search the highest intensity value for each chunk. All other peaks, if it's intensity is smaller than threshold x the highest intensity in that chunk, will be unflagged. """ if not 0 <= threshold <= 1: raise ValueError('mzr_remove: Provide a value in the range [0.0, 1.0]') inds = np.digitize(, np.arange(np.floor(np.min(, np.ceil(np.max( + bin_size, bin_size) - 0.5) blks = [(inds == i) for i in np.unique(inds)] mask = np.array(reduce(lambda x, y: x + y, [[np.max(pl.intensity[c])] * np.sum(c) for c in blks])) return pl.add_attribute(flag_name, pl.intensity > (mask * threshold), is_flag=True, on_index=flag_index)
[docs]def filter_mz_ranges(pl: PeakList, mz_ranges: Sequence[Tuple[float, float]], flag_name: str = 'mz_ranges_flag', flagged_only: bool = False, flag_index: Union[int, None] = None): """ Peaklist mz range filter. :param pl: The target peaklist :param mz_ranges: The mz ranges to remove. Must be in the format of [(mz_min1, mz_max2), (mz_min2, mz_max2), ...] :param flag_name: Name of the new flag attribute. Default = 'mz_range_remove_flag' :param flag_index: Index of the new flag to be inserted into the peaklist. Default = None :rtype: PeakList This filter will remove all the peaks whose mz values are within any of the ranges in the mz_remove_rngs. """ if flagged_only: flags = np.ones(pl.shape[0], dtype=bool) else: flags = np.ones(pl.full_size, dtype=bool) for mzr in mz_ranges: if len(mzr) != 2: raise ValueError( 'mzr_remove: Provide a list of "start" and "end" values for each m/z range that needs to be removed.') if mzr[0] >= mzr[1]: raise ValueError('mzr_remove: Start value cannot be larger then end value.') flags[ (pl.get_attribute("mz", flagged_only) >= mzr[0]) & (pl.get_attribute("mz", flagged_only) <= mzr[1])] = False pl.add_attribute(flag_name, flags, flagged_only=flagged_only, is_flag=True, on_index=flag_index) return pl
# PeakMatrix filters
[docs]def filter_rsd(pm: PeakMatrix, rsd_threshold: Union[int, float], qc_tag: Any, on_attr: str = 'intensity', flag_name: str = 'rsd_flag'): """ PeakMatrix RSD filter. :param pm: The target peak matrix :param rsd_threshold: Threshold of the RSD of the QC samples :param qc_tag: Tag (label) to unmask qc samples :param on_attr: Calculate RSD on given attribute. Default = "intensity" :param flag_name: Name of the new flag. Default = 'rsd_flag' :rtype: PeakMatrix This filter will calculate the RSD values of the QC samples. A peak with a QC RSD value larger than the threshold will be unflagged. """ rsd_values = pm.rsd(qc_tag, on_attr=on_attr) if np.any(np.isnan(rsd_values)): logging.warning('nan found in QC rsd values, filter might not work properly') pm.add_flag(flag_name, [not (np.isnan(v) or v > rsd_threshold) for v in rsd_values]) return pm
[docs]def filter_fraction(pm: PeakMatrix, fraction_threshold: float, within_classes: bool = False, class_tag_type: Any = None, flag_name: str = 'fraction_flag'): """ PeakMatrix fraction filter. :param pm: The target peak matrix :param fraction_threshold: Threshold of the sample fractions :param within_classes: Whether to calculate the fraction array within each class. Default = False :param class_tag_type: Tag type to unmask samples within the same class (e.g. "classLabel"). Default = None :param flag_name: Name of the new flag. Default = 'fraction_flag' :rtype: PeakMatrix object This filter will calculate the fraction array over all samples or within each class (based on class_tag_type). The peaks with a fraction value smaller than the threshold will be unflagged. """ if not within_classes: pm.add_flag(flag_name, pm.fraction >= fraction_threshold) else: if class_tag_type is None: raise KeyError('must provide class tag type for within classes filtering') if not all([t.has_tag_type(class_tag_type) for t in pm.peaklist_tags]): raise AttributeError('not all tags have tag type [%s]' % class_tag_type) flg = np.zeros(pm.shape[1]) for tag in pm.tags_of(class_tag_type): with unmask_peakmatrix(pm, tag) as m: flg = np.logical_or(flg, (m.fraction >= fraction_threshold)) pm.add_flag(flag_name, flg) return pm
[docs]def filter_blank_peaks(pm: PeakMatrix, blank_tag: Any, fraction_threshold: Union[int, float] = 1, fold_threshold: Union[int, float] = 1, method: str = 'mean', rm_blanks: bool = True, flag_name: str = 'blank_flag'): """ PeakMatrix blank filter. :param pm: The target peak matrix :param blank_tag: Tag (label) to mask blank samples. e.g Tag("blank", "classLabel") :param fraction_threshold: Threshold of the sample fractions. Default = 1 :param fold_threshold: Threshold of the blank sample intensity folds. Default = 1 :param method: Method to calculate blank sample intensity array. Valid values include 'mean', 'median', and 'max'. Default = 'mean' :param rm_blanks: Whether to remove (not mask) blank samples after filtering :param flag_name: Name of the new flag. Default = 'blank_flag' :rtype: PeakMatrix object This filter will calculate the intensity array of the blanks using the "method", and compare with the intensities of the other samples. If fraction_threshold% of the intensity values of a peak are smaller than the blank intensities x fold_threshold, this peak will be unflagged. """ if not any([blank_tag in x for x in pm.peaklist_tags]): raise ValueError('blank tag [%s] does not exist' % blank_tag) if method not in ('mean', 'median', 'max'): raise ValueError('filter method must be mean, median or max') with unmask_peakmatrix(pm, blank_tag) as m: mm =, mask = ~(m.intensity_matrix > 0)) ints = mm[0] if mm.shape[0] == 1 else getattr(np, method)(mm, axis = 0) imsk = ints.mask ints = np.array(ints) * fold_threshold with mask_peakmatrix(pm, blank_tag) as m: faild_int = np.sum(m.intensity_matrix >= ints, axis=0) < (fraction_threshold * m.shape[0]) m.add_flag(flag_name, ~(~imsk & faild_int)) if rm_blanks: pm = pm.remove_samples(np.where([x.has_tag(blank_tag) for x in pm.peaklist_tags])[0]) return pm