#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright © 2017-2020 Ralf Weber, Albert Zhou.
#
# This file is part of DIMSpy.
#
# DIMSpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# DIMSpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with DIMSpy. If not, see <https://www.gnu.org/licenses/>.
#
import logging
from collections import OrderedDict
from collections.abc import Iterable
from functools import reduce
from typing import Sequence, Tuple, Any, Union
import numpy as np
from .peaklist import PeakList
from .peaklist_tags import Tag, PeakList_Tags
[docs]class PeakMatrix(object):
"""
The PeakMatrix class.
Stores aligned mass spectrometry peaks matrix data. It requires IDs, tags, and attributes from the source peak
lists. It uses tags based mask to "hide" the unrelated samples for convenient processing. It utilises the
automatically managed flags to "remove" peaks without actually delete them. Therefore the filterings on the peaks
are traceable. Normally, PeakMatrix object is created by functions e.g. align_peaks() rather than manual.
:param peaklist_ids: The IDs of the source peak lists
:param peaklist_tags: The tags (PeakList_Tags) of the source peak lists
:param peaklist_attributes: The attributes of the source peak lists. Must be a list or tuple in the format of
[(attr_name, attr_matrix), ...], where attr_name is name of the attribute, and attr_matrix is the vertically
stacked arrtibute values in the shape of samples x peaks. The order of the attributes will be kept in the
PeakMatrix. The first two attributes must be "mz" and "intensity".
>>> pids = [pl.ID for pl in peaklists]
>>> tags = [pl.tags for pl in peaklists]
>>> attrs = [(attr_name, np.vstack([pl[attr_name] for pl in peaklists])) \
for attr_name in peaklists[0].attributes]
>>> pm = PeakMatrix(pids, tags, attrs)
Internally the attribute data is stored in OrderedDict as a list of matrix. An attribute matrix can be illustrated
as follows, in which the mask and flags are the same for all attributes. The final row "flags" is automatically
calculated based on the manually added flags. It decides which peaks are "removed" i.e. unflagged. Particularly,
the "--" indicates no peak in that sample can be aligned into the mz value.
**attribute: "mz"**
+------------+--------+--------+--------+-----+
| mask | peak_1 | peak_2 | peak_3 | ... |
+============+========+========+========+=====+
| False | 12.7 | 14.9 | 21.0 | |
+------------+--------+--------+--------+ +
| True | -- | 15.1 | 21.1 | ... |
+------------+--------+--------+--------+ +
| False | 12.1 | 14.7 | -- | |
+------------+--------+--------+--------+ +
| False | 12.9 | 14.8 | 20.9 | |
+------------+--------+--------+--------+-----+
| ... | | | | |
+------------+--------+--------+--------+-----+
| **flag_1** | True | False | True | |
+------------+--------+--------+--------+ +
| **flag_2** | True | True | False | ... |
+------------+--------+--------+--------+ +
| **flags*** | True | False | False | |
+------------+--------+--------+--------+-----+
.. warning::
Removing a flag may change the overall "flags", and cause the unflagged peaks to be flagged again. As
most the processes are applied only on the flagged peaks, these peaks, if the others have gone through such process,
may have incorrect values.
In principle, setting a flag attribute should be considered as an irreversible process.
Different from the flags, mask should be considered as a more temporary way to hide the unrelated samples. A masked
sample (row) will not be used for processing, but its data is still in the attribute matrix. For this reason,
the mask_peakmatrix, unmask_peakmatrix, and unmask_all_peakmatrix statements are provided as a more flexible way
to set / unset the mask.
"""
def __init__(self, peaklist_ids: Sequence[str], peaklist_tags: Sequence[PeakList_Tags],
peaklist_attributes: Sequence[Tuple[str, Any]]):
if len(set(peaklist_ids)) != len(peaklist_ids):
raise ValueError('duplicate peaklist IDs found')
self._pids = np.array(peaklist_ids)
self._tags = np.array(peaklist_tags)
self._attr_dict = OrderedDict((str(k), np.array(v)) for k, v in peaklist_attributes)
if not (list(self._attr_dict.keys())[0] == 'mz' and list(self._attr_dict.keys())[1] == 'intensity'):
raise ValueError('required attribute fields "mz" and "intensity" not available')
pnum, psize = list(self._attr_dict.values())[0].shape
if not len(peaklist_ids) == len(peaklist_tags) == pnum:
raise ValueError('input peaklists number not match')
if not all([x.shape == (pnum, psize) for x in list(self._attr_dict.values())]):
raise ValueError('input attribute matrix shape not match')
self._mask = np.zeros(pnum, dtype=bool)
self._flags_dict = OrderedDict()
# privates
# np.average accepts weights, set to 0 to exclude value
def _present_mask(self, x, _func, flagged_only):
pmx = self.property('present_matrix', flagged_only)
mskx = np.ma.masked_array(x, mask=~pmx)
vals = _func(mskx)
rval = np.array(vals)
rval[vals.mask] = np.nan
return rval
def _present_mean(self, x, axis, flagged_only):
return self._present_mask(x, lambda v: np.mean(v, axis=axis), flagged_only)
def _present_std(self, x, axis, flagged_only):
return self._present_mask(x, lambda v: np.std(v, axis=axis, ddof=1), flagged_only)
# built-ins
def __len__(self):
return self.shape[0]
def __str__(self):
return self.to_str(attr_name='intensity', delimiter=', ', samples_in_rows=True, comprehensive=False)
# properties
@property
def mask(self): # mask is different from flags in that they will not be masked by themselves, i.e., more temporary
"""
Property of the mask.
:getter: returns a deep copy of the mask array
:setter: sets the mask array. Provide None to unmask all samples
:type: numpy array
"""
return self._mask.copy()
@mask.setter
def mask(self, value):
self._mask = np.zeros(self.full_shape[0], dtype=bool)
if value is not None: self._mask[value] = True
@property
def flag_names(self):
"""
Property of the flag names.
:getter: returns a tuple including the names of the manually set flags
:type: tuple
"""
return tuple(self._flags_dict.keys())
@property
def flags(self):
"""
Property of the flags.
:getter: returns a deep copy of the flags array
:type: numpy array
"""
return np.ones(self.full_shape[1], dtype=bool) if len(self._flags_dict) == 0 else \
np.logical_and.reduce(list(self._flags_dict.values()))
@property
def attributes(self):
"""
Property of the attribute names.
:getter: returns a tuple including the names of the attribute matrix
:type: tuple
"""
return tuple(self._attr_dict.keys())
@property
def peaklist_ids(self):
"""
Property of the source peaklist IDs.
:getter: returns a tuple including the IDs of the source peaklists
:type: tuple
"""
return tuple(self._pids[~self._mask])
@property
def peaklist_tags(self):
"""
Property of the source peaklist tags.
:getter: returns a tuple including the Peaklist_Tags objects of the source peaklists
:type: tuple
"""
return tuple(self._tags[~self._mask])
@property
def peaklist_tag_types(self):
"""
Property of the source peaklist tag types.
:getter: returns a tuple including the types of the typed tags of the source peaklists
:type: set
"""
return reduce(lambda x, y: x.union(y), [x.tag_types for x in self.peaklist_tags])
@property
def peaklist_tag_values(self):
"""
Property of the source peaklist tag values.
:getter: returns a tuple including the values of the source peaklists tags, both typed and untyped
:type: set
"""
return reduce(lambda x, y: x.union(y), [x.tag_values for x in self.peaklist_tags])
@property
def shape(self):
"""
Property of the peak matrix shape.
:getter: returns the shape of the attribute matrix
:type: tuple
"""
return np.sum(~self._mask), np.sum(self.flags)
@property
def full_shape(self):
"""
Property of the peak matrix full shape.
:getter: returns the full shape of the attribute matrix, i.e., ignore mask and flags
:type: tuple
"""
return self._attr_dict['mz'].shape
@property
def present(self):
"""
Property of the present array.
:getter: returns the present array, indicating how many peaks are aligned in each mz value
:type: numpy array
"""
return self.property('present')
@property
def present_matrix(self):
"""
Property of the present matrix.
:getter: returns the present matrix, indicating whether a sample has peak(s) aligned in each mz value
:type: numpy array
>>> print pm.present_matrix
array([[ True, True, True, True, False],
[ True, True, False, False, True],
[ True, True, True, True, True],
[False, True, False, True, True],])
>>> print pm.present
array([3, 4, 2, 3, 3])
"""
return self.property('present_matrix')
@property
def fraction(self):
"""
Property of the fraction array.
:getter: returns the fraction array, indicating the ratio of present peaks on each mz value
:type: numpy array
>>> print pm.present
array([3, 4, 2, 3, 3])
>>> print pm.shape[0]
4
>>> print pm.fraction
array([0.75, 1.0, 0.5, 0.75, 0.75])
"""
return self.property('fraction')
@property
def missing_values(self):
"""
Property of the missing values array.
:getter: returns the missing values array, indicating the number of unaligned peaks on each sample
:type: numpy array
>>> print pm.present_matrix
array([[ True, True, True, True, False],
[ True, True, False, False, True],
[ True, True, True, True, True],
[False, True, False, True, True],])
>>> print pm.missing_values
array([1, 2, 0, 2])
"""
return self.property('missing_values')
@property
def occurrence(self):
"""
Property of the occurrence array.
:getter: returns the occurrence array, indicating the total number of peaks (including peaks in the same sample)
aliged in each mz value. This property is valid only when the *intra_count* attribute matrix is available
:type: numpy array
>>> print pm.attr_matrix('intra_count')
array([[ 2, 1, 1, 1, 0],
[ 1, 1, 0, 0, 1],
[ 1, 3, 1, 2, 1],
[ 0, 1, 0, 1, 1],])
>>> print pm.occurrence
array([ 4, 6, 2, 4, 3])
"""
return self.property('occurrence')
@property
def purity(self):
"""
Property of the purity level array.
:getter: returns the purity array, indicating the ratio of only one peak in each sample being aligned in each mz
value. This property is valid only when the *intra_count* attribute matrix is available
:type: numpy array
>>> print pm.attr_matrix('intra_count')
array([[ 2, 1, 1, 1, 0],
[ 1, 1, 0, 0, 1],
[ 1, 3, 1, 2, 1],
[ 0, 1, 0, 1, 1],])
>>> print pm.purity
array([ 0.667, 0.75, 1.0, 0.667, 1.0])
"""
return self.property('purity')
# attribute matrix
@property
def mz_matrix(self):
"""
Property of the mz matrix.
:getter: returns the mz attribute matrix, unmasked and flagged values only
:type: numpy array
"""
return self.attr_matrix('mz')
@property
def intensity_matrix(self):
"""
Property of the intensity matrix.
:getter: returns the intensity attribute matrix, unmasked and flagged values only
:type: numpy array
"""
return self.attr_matrix('intensity')
@property
def mz_mean_vector(self):
"""
Property of the mz mean values array.
:getter: returns the mean values array of the mz attribute matrix, unmasked and flagged values only
:type: numpy array
"""
return self.attr_mean_vector('mz')
@property
def intensity_mean_vector(self):
"""
Property of the intensity mean values array.
:getter: returns the mean values array of the intensity attribute matrix, unmasked and flagged values only
:type: numpy array
"""
return self.attr_mean_vector('intensity')
[docs] def rsd(self, *args, **kwargs):
"""
Calculates relative standard deviation (RSD) array.
:param args: tags or untyped tag values for RSD calculation, no value = calculate over all samples
:param kwargs: typed tags for RSD calculation, no value = calculate over all samples
:param on_attr: calculate RSD on given attribute. Default = "intensity"
:param flagged_only: whether to calculate on flagged peaks only. Default = True
:type: numpy array
The RSD is calculated as:
>>> rsd = std(pm.intensity_matrix, axis = 0, ddof = 1) / mean(pm.intensity_matrix, axis = 0) * 100
Noting that the means delta degrees of freedom (ddof) is set to 1 for standard deviation calculation.
Moreover, only the "present" peaks will be used for calculation. If a column has less than 2 peaks, the
corresponding rsd value will be set to np.nan.
"""
on_attr = kwargs.pop('on_attr') if 'on_attr' in kwargs else 'intensity'
flagged_only = kwargs.pop('flagged_only') if 'flagged_only' in kwargs else True
if self.shape[0] < 2:
# logging.warning('calculating RSD on less than 2 samples')
return np.ones(self.shape[1] if flagged_only else self.full_shape[1]) * np.nan
with (unmask_all_peakmatrix(self) if len(args) + len(kwargs) == 0 else
unmask_peakmatrix(self, *args, **kwargs)) as m:
if m.shape[0] == 0: raise AttributeError('peak matrix does not have label(s) [%s]' %
str.join(', ', [str(x)[1:-1] for x in (args, kwargs)]))
ints = m.attr_matrix(on_attr, flagged_only)
rsd = m._present_std(ints, 0, flagged_only) / m._present_mean(ints, 0, flagged_only) * 100
rsd[np.where([len(set(x[np.nonzero(x)])) == 1 for x in ints.T])] = np.nan # only one valid value
return rsd
# publics
# tags and mask
# flags
[docs] def add_flag(self, flag_name: str, flag_values: Sequence[bool], flagged_only: bool = True):
"""
Adds a flag to the peak matrix peaks.
:param flag_name: name of the flag, it must be unique and not equal to "flags"
:param flag_values: values of the flag. It must have a length of pm.shape[1] if flagged_only = True, or
pm.full_shape[1] if flagged_only = False
:param flagged_only: whether to set the flagged peaks only. Default = True, and the values of the unflagged peaks
are set to False
The overall flags property will be automatically recalculated.
"""
if flag_name == 'flags':
raise KeyError('reserved flag name [flags] cannot be added')
if flag_name in self._flags_dict:
raise KeyError('flag name [%s] already exists' % flag_name)
if flagged_only:
if not len(flag_values) == self.shape[1]: raise ValueError('flag values and peak matrix shape not match')
newf = np.zeros(self.full_shape[1], dtype=bool)
newf[self.flags] = flag_values
self._flags_dict[flag_name] = newf
else:
if not len(flag_values) == self.full_shape[1]: raise ValueError(
'flag values and peak matrix shape not match')
self._flags_dict[flag_name] = np.array(flag_values, dtype=bool)
[docs] def drop_flag(self, flag_name: str):
"""
Drops a existing flag from the peak matrix.
:param flag_name: name of the flag to drop. It must exist and not equal to "flags"
The overall flags property will be automatically recalculated.
"""
if flag_name == 'flags':
raise KeyError('reserved flag name [flags] cannot be droppped')
if flag_name not in self._flags_dict:
raise KeyError('flag name [%s] does not exist' % flag_name)
del self._flags_dict[flag_name]
[docs] def flag_values(self, flag_name: str):
"""
Obtains values of an existing flag.
:param flag_name: name of the target flag. It must exist and not equal to "flags"
:rtype: numpy array
"""
if flag_name == 'flags':
raise KeyError('use PeakMatrix.flags to access reserved flag name [flags]')
if flag_name not in self._flags_dict:
raise KeyError('flag name [%s] does not exist' % flag_name)
return self._flags_dict[flag_name]
# access
[docs] def property(self, prop_name: str, flagged_only: bool = True):
"""
Obtains an existing attribute matrix.
:param prop_name: name of the target property. Valid properties include 'present', 'present_matrix', 'fraction',
'missing_values', 'occurrence', and 'purity'
:param flagged_only: whether to return the flagged values only. Default = True
:rtype: numpy array
"""
if prop_name in ('occurrence', 'purity') and 'intra_count' not in self.attributes:
logging.warning("attribute matrix ['intra_count'] not available")
def _purity():
if 'intra_count' not in self.attributes: np.ones(self.shape[1] if flagged_only else self.full_shape[1])
if 0 in self.property('present'):
logging.warning("found empty peak when calculating property ['purity']")
with np.errstate(divide='ignore', invalid='ignore'):
ret = 1 - np.sum(self.attr_matrix('intra_count', flagged_only) > 1, axis=0) / self.property('present',
flagged_only)
ret[np.isinf(ret)] = np.nan
return ret
# use lambda to postpone the actual calculations
_prop = {
'present_matrix':
lambda: self.attr_matrix('intra_count' if 'intra_count' in self.attributes else 'mz', flagged_only) > 0,
'present':
lambda: np.sum(self.property('present_matrix', flagged_only), axis=0),
'fraction':
lambda: self.property('present', flagged_only) / self.shape[0],
'missing_values':
lambda: np.sum(~self.property('present_matrix', flagged_only), axis=1),
'occurrence':
lambda: np.ones(
self.shape[1] if flagged_only else self.full_shape[1]) if 'intra_count' not in self.attributes else
np.sum(self.attr_matrix('intra_count', flagged_only), axis=0),
'purity': _purity
}.get(prop_name, None)
if _prop is None: raise ValueError('unknown property name [%s]' % prop_name)
return _prop()
[docs] def attr_matrix(self, attr_name: str, flagged_only: bool = True):
"""
Obtains an existing attribute matrix.
:param attr_name: name of the target attribute
:param flagged_only: whether to return the flagged values only. Default = True
:rtype: numpy array
"""
if attr_name not in self._attr_dict:
raise KeyError('attribute matrix [%s] not available' % attr_name)
aM = self._attr_dict[attr_name][~self._mask]
return aM[:, self.flags] if flagged_only else aM
[docs] def attr_mean_vector(self, attr_name: str, flagged_only: bool = True):
"""
Obtains the mean array of an existing attribute matrix.
:param attr_name: name of the target attribute
:param flagged_only: whether to return the mean array of the flagged values only. Default = True
:rtype: numpy array
Noting that only the "present" peaks will be used for mean values calculation. If the attribute matrix has a
string / unicode data type, the values in each column will be concatenated.
"""
aM = self.attr_matrix(attr_name, flagged_only)
aV = self._present_mean(aM, 0, flagged_only) if aM.dtype.kind in ('i', 'u', 'f') else \
np.array([str.join(';', [str(v) for v, p in zip(ln, self.present_matrix[:, j]) if p]) for j, ln in
enumerate(zip(*aM))]) # for strings
return aV
[docs] def remove_samples(self, sample_ids, masked_only: bool = True):
"""
Removes samples from the peak matrix.
:param sample_ids: the indices of the samples to remove
:param masked_only: whether the indices are for unmasked samples or all samples. Default = True
:rtype: PeakMatrix object (self)
"""
if isinstance(sample_ids, tuple) and len(sample_ids) == 1 and isinstance(sample_ids[0], np.ndarray): sample_ids = sample_ids[0].tolist() # fix warning from np.where index
elif isinstance(sample_ids, (np.ndarray, Iterable)): sample_ids = list(sample_ids)
rmids = np.where(~self.mask)[0][sample_ids] if masked_only else sample_ids
self._pids = np.delete(self._pids, rmids, axis=0)
self._tags = np.delete(self._tags, rmids, axis=0)
self._attr_dict = OrderedDict((k, np.delete(v, rmids, axis=0)) for k, v in list(self._attr_dict.items()))
self._mask = np.delete(self._mask, rmids, axis=0)
self.remove_empty_peaks()
if self.is_empty(): logging.warning('matrix is empty after removal')
return self
[docs] def remove_peaks(self, peak_ids, flagged_only: bool = True):
"""
Removes peaks from the peak matrix.
:param peak_ids: the indices of the peaks to remove
:param flagged_only: whether the indices are for flagged peaks or all peaks. Default = True
:rtype: PeakMatrix object (self)
"""
if isinstance(peak_ids, Iterable): peak_ids = list(peak_ids)
rmids = np.where(self.flags)[0][peak_ids] if flagged_only else peak_ids
self._attr_dict = OrderedDict((k, np.delete(v, rmids, axis=1)) for k, v in list(self._attr_dict.items()))
self._flags_dict = OrderedDict((k, np.delete(v, rmids)) for k, v in list(self._flags_dict.items()))
if self.is_empty(): logging.warning('matrix is empty after removal')
return self
[docs] def remove_empty_peaks(self):
"""
Removes empty peaks from the peak matrix.
Empty peaks are peaks with not valid m/z or intensity value over the samples. They may occur after removing
an entire sample from the peak matrix, e.g., remove the blank samples in the blank filter.
:rtype: PeakMatrix object (self)
"""
prm = self.property('present_matrix', flagged_only=False)
self.remove_peaks(np.where(np.sum(prm, axis=0) == 0)[0], flagged_only=False)
return self
[docs] def is_empty(self):
"""
Checks whether the peak matrix is empty under the current mask and flags.
:rtype: bool
"""
return 0 in self.shape
# exports
[docs] def to_peaklist(self, ID: str):
"""
Averages the peak matrix into a single peaklist.
:param ID: ID of the merged peaklist
:rtype: PeakList object
Only the "present" peaks will be included in the result peaklist. The new peaklist will only contain the
following attributes: mz, intensity, present, fraction, rsd, occurence, and purity.
Use unmask statement to calculate the peaklist for a particular group of samples:
>>> with unmask_peakmatrix(pm, 'Sample') as m: pkl = m.to_peaklist('averaged_peaklist')
Or use mask statement to exclude a particular group of samples:
>>> with mask_peakmatrix(pm, 'QC') as m: pkl = m.to_peaklist('averaged_peaklist')
"""
presids = self.present > 0 # presented peaks only
if False in presids:
logging.warning('[%d] empty peaks removed when exporting PeakMatrix to PeakList' % np.sum(~presids))
pl = PeakList(ID, self.mz_mean_vector[presids], self.intensity_mean_vector[presids],
aligned_ids=self.peaklist_ids)
pl.add_attribute('present', self.present[presids])
pl.add_attribute('fraction', self.fraction[presids])
pl.add_attribute('occurrence', self.occurrence[presids])
pl.add_attribute('purity', self.purity[presids])
return pl
[docs] def to_str(self, attr_name: str = 'intensity', delimiter: str = '\t', samples_in_rows: bool = True,
comprehensive: bool = True, rsd_tags: Sequence = ()):
"""
Exports the peak matrix to a string.
:param attr_name: name of the attribute matrix for exporting. Default = 'intensity'
:param delimiter: delimiter to separate the matrix. Default = '\t', i.e., TSV format
:param samples_in_rows: whether or not the samples are stored in rows. Default = True
:param comprehensive: whether to include comprehensive info, e.g., mask, flags, present, rsd etc. Default = True
:param rsd_tags: peaklist tags for RSD calculation. Default = (), indicating only the overall RSD is included
:rtype: str
"""
self.remove_empty_peaks()
mz = self.attr_matrix('mz', flagged_only=not comprehensive)
hd = ['mz'] + list(map(str, np.average(mz, axis=0, weights=mz.astype(bool))))
dm = [list(map(str, self.peaklist_ids))] + \
[list(map(str, ln)) for ln in self.attr_matrix(attr_name, flagged_only=not comprehensive).T]
if comprehensive:
ttypes = self.peaklist_tag_types
if None in ttypes: ttypes.remove(None)
ttypes = sorted([str(t) for t in ttypes])
tnum = len(ttypes)
hd = [hd[0]] + ['missing values'] + ['tags_' + x for x in ttypes] + ['tags_untyped'] + hd[1:]
dm = [dm[0]] + \
[list(map(str, self.missing_values))] + \
[[(lambda v: str(v.value) if v else '')(x.tag_of(t)) for x in self.peaklist_tags] for t in ttypes] + \
[[str.join(';', (lambda v: map(str, v) if v else ())(x.tag_of(None))) for x in self.peaklist_tags]] + \
dm[1:]
rsd_tags = tuple(rsd_tags) if isinstance(rsd_tags, Iterable) else (rsd_tags,)
prelst = ['present'] + ([''] * (tnum + 2)) + list(map(str, self.property('present', flagged_only=False)))
ocrlst = ['occurrence'] + ([''] * (tnum + 2)) + list(
map(str, self.property('occurrence', flagged_only=False)))
puplst = ['purity'] + ([''] * (tnum + 2)) + list(map(str, self.property('purity', flagged_only=False)))
rsdmtx = [['rsd_' + str(rt.value if isinstance(rt, Tag) else rt)] + ([''] * (tnum + 2)) + list(
map(str, self.rsd(rt, flagged_only=False))) for rt in rsd_tags]
rsdlst = ['rsd_all'] + ([''] * (tnum + 2)) + list(map(str, self.rsd(flagged_only=False)))
flgmtx = [[fn] + ([''] * (tnum + 2)) + list(map(str, self.flag_values(fn).astype(int))) for fn in
self.flag_names]
flglst = ['flags'] + ([''] * (tnum + 2)) + list(map(str, self.flags.astype(int)))
dm = list(zip(*([prelst, ocrlst, puplst] + rsdmtx + [rsdlst] + flgmtx + [flglst] + list(zip(*dm)))))
lm = [hd] + list(zip(*dm))
return str.join('\n', map(lambda x: str.join(delimiter, x), lm if samples_in_rows else zip(*lm)))
# with statements
[docs]class mask_peakmatrix:
"""
The mask_peakmatrix statement.
Temporary mask the peak matrix with particular tags. Within the statement the samples can be motified or removed.
After leaving the statement the original mask will be recoverd.
:param pm: the target peak matrix
:param override: whether to override the current mask, default = True
:param args: target tag values, both typed and untyped
:param kwargs: target typed tag types and values
:rtype: PeakMatrix object
>>> print pm.peaklist_ids
('sample_1', 'sample_2', 'qc_1', 'sample_3', 'sample_4', 'qc_2')
>>> with mask_peakmatrix(pm., 'qc') as m: print m.peaklist_ids
('sample_1', 'sample_2', 'sample_3', 'sample_4')
>>> print pm.peaklist_ids
('sample_1', 'sample_2', 'qc_1', 'sample_3', 'sample_4', 'qc_2')
"""
def __init__(self, pm: PeakMatrix, *args, **kwargs):
self._pm = pm
self._args = args
self._kwargs = kwargs
if 'override' not in self._kwargs: self._kwargs['override'] = True # default for with statement
self._oldmask = dict(zip(pm._pids, pm._mask))
def __enter__(self):
self._pm.mask_tags(*self._args, **self._kwargs)
return self._pm
def __exit__(self, exc_type, exc_val, exc_tb):
self._pm.mask = [self._oldmask[i] for i in self._pm._pids]
[docs]class unmask_peakmatrix:
"""
The unmask_peakmatrix statement.
Temporary unmask the peak matrix with particular tags. Within the statement the samples can be motified or removed.
After leaving the statement the original mask will be recoverd.
:param pm: the target peak matrix
:param override: whether to override the current mask, default = True
:param args: target tag values, both typed and untyped
:param kwargs: target typed tag types and values
:rtype: PeakMatrix object
>>> print pm.peaklist_ids
('sample_1', 'sample_2', 'qc_1', 'sample_3', 'sample_4', 'qc_2')
>>> with unmask_peakmatrix(pm, 'qc') as m: print m.peaklist_ids
('qc_1', 'qc_2') # no need to set pm.mask to True
>>> print pm.peaklist_ids
('sample_1', 'sample_2', 'qc_1', 'sample_3', 'sample_4', 'qc_2')
"""
def __init__(self, pm: PeakMatrix, *args, **kwargs):
self._pm = pm
self._args = args
self._kwargs = kwargs
if 'override' not in self._kwargs: self._kwargs['override'] = True # default for with statement
self._oldmask = dict(zip(pm._pids, pm._mask))
def __enter__(self):
self._pm.unmask_tags(*self._args, **self._kwargs)
return self._pm
def __exit__(self, exc_type, exc_val, exc_tb):
self._pm.mask = [self._oldmask[i] for i in self._pm._pids]
[docs]class mask_all_peakmatrix:
"""
The mask_all_peakmatrix statement.
Temporary mask all the peak matrix samples. Within the statement the samples can be motified or removed.
After leaving the statement the original mask will be recoverd.
:param pm: the target peak matrix
:rtype: PeakMatrix object
>>> print pm.peaklist_ids
('sample_1', 'sample_2', 'qc_1', 'sample_3', 'sample_4', 'qc_2')
>>> with mask_all_peakmatrix(pm) as m: print m.peaklist_ids
()
>>> print pm.peaklist_ids
('sample_1', 'sample_2', 'qc_1', 'sample_3', 'sample_4', 'qc_2')
"""
def __init__(self, pm: PeakMatrix):
self._pm = pm
self._oldmask = dict(zip(pm._pids, pm._mask))
def __enter__(self):
self._pm.mask = [True] * self._pm.full_shape[0]
return self._pm
def __exit__(self, exc_type, exc_val, exc_tb):
self._pm.mask = [self._oldmask[i] for i in self._pm._pids]
[docs]class unmask_all_peakmatrix:
"""
The unmask_all_peakmatrix statement.
Temporary unmask all the peak matrix samples. Within the statement the samples can be motified or removed.
After leaving the statement the original mask will be recoverd.
:param pm: the target peak matrix
:rtype: PeakMatrix object
>>> print pm.peaklist_ids
('sample_1', 'sample_2', 'qc_1', 'sample_3', 'sample_4', 'qc_2')
>>> with unmask_all_peakmatrix(pm) as m: print m.peaklist_ids
('sample_1', 'sample_2', 'qc_1', 'sample_3', 'sample_4', 'qc_2')
>>> print pm.peaklist_ids
('sample_1', 'sample_2', 'qc_1', 'sample_3', 'sample_4', 'qc_2')
"""
def __init__(self, pm: PeakMatrix):
self._pm = pm
self._oldmask = dict(zip(pm._pids, pm._mask))
def __enter__(self):
self._pm.mask = None
return self._pm
def __exit__(self, exc_type, exc_val, exc_tb):
self._pm.mask = [self._oldmask[i] for i in self._pm._pids]