Source code for dimspy.portals.hdf5_portal

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright © 2017-2020 Ralf Weber, Albert Zhou.
#
# This file is part of DIMSpy.
#
# DIMSpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# DIMSpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with DIMSpy.  If not, see <https://www.gnu.org/licenses/>.
#


import logging
import os
import pickle as cp
import warnings
import zlib
from ast import literal_eval
from typing import Sequence

import h5py
import numpy as np
import tables as ptb
from ..models.peak_matrix import PeakMatrix, unmask_all_peakmatrix
from ..models.peaklist import PeakList
from ..models.peaklist_tags import Tag, PeakList_Tags

warnings.filterwarnings('ignore', category=ptb.NaturalNameWarning)


def _eval(v):
    try:
        return literal_eval(v)
    except (ValueError, SyntaxError):
        return str(v)


_packMeta = lambda x: np.string_(zlib.compress(cp.dumps(x)) + b'\xFF')  # numpy truncates right-side \x00
_unpackMeta = lambda x: cp.loads(zlib.decompress(x[:-1]))

_convByteStr = lambda x: x if isinstance(x, str) else x.decode('utf-8')
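
# A quick round-trip sketch of the helpers above (illustrative, not part of the
# original module): metadata values are pickled, zlib-compressed and stored as
# fixed-length byte strings, with a trailing sentinel byte so numpy's
# truncation of trailing \x00 bytes cannot corrupt them.
#
#   >>> meta = {'polarity': 'positive', 'snr_threshold': 3.0}
#   >>> _unpackMeta(_packMeta(meta)) == meta
#   True
#   >>> _eval('3.14'), _eval('not_a_literal')
#   (3.14, 'not_a_literal')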


# peaklist portals
def save_peaklists_as_hdf5(pkls: Sequence[PeakList], filename: str, compatibility_mode: bool = False):
    """
    Saves multiple peaklists to an HDF5 file.

    :param pkls: The target list of peaklist objects
    :param filename: Path to a new HDF5 file
    :param compatibility_mode: Write the file in the previous, DIMSpy v1.*-based HDF5 format

    To accommodate the different dtypes in the attribute matrix, this portal converts all
    attribute values into fixed-length strings for HDF5 data table storage. The order of
    the peaklists will be retained.

    """
    if os.path.isfile(filename):
        logging.warning('HDF5 database [%s] already exists and will be overwritten', filename)
    if compatibility_mode:
        logging.warning('DeprecationWarning: exporting HDF file in the old format')
    f = h5py.File(filename, 'w') if compatibility_mode else ptb.open_file(filename, mode='w')

    def _old_savepkl(i, pkl):
        if pkl.ID in f.keys():
            raise IOError('peaklist [%s] already exists' % pkl.ID)

        dm = np.string_(pkl.to_list()[:-1])
        dset = f.create_dataset(pkl.ID, pkl.full_shape[::-1], dtype=dm.dtype)
        dset.attrs['class'] = 'PeakList'
        dset.attrs['order'] = i
        dset[...] = np.array(dm)  # skip flags

        dset.attrs['dtable_names'], dset.attrs['dtable_types'] = \
            list(map(np.string_, zip(*pkl.dtable.dtype.descr)))
        dset.attrs['flag_attrs'] = np.string_(pkl.flag_attributes)
        dset.attrs['tags'] = np.string_([(t or 'None', v) for v, t in pkl.tags.to_list()])
        for k, v in pkl.metadata.items():
            dset.attrs['metadata_' + k] = _packMeta(v)

    def _savepkl(i, pkl):
        dns, dts = zip(*pkl.dtable.dtype.descr)
        desc = type('_struct_array', (ptb.IsDescription,),
                    {dn: ptb.Col.from_dtype(np.dtype(dt)) for dn, dt in zip(dns, dts)})

        dset = ptb.Table(f.root, pkl.ID, desc)
        dset.append([pkl.get_attribute(n, flagged_only=False) for n in dset.colnames])

        dset.attrs.data_class = 'PeakList'
        dset.attrs.order = i
        dset.attrs.dtable_names = dns
        dset.attrs.dtable_types = dts
        dset.attrs.flag_attrs = np.array(pkl.flag_attributes)
        dset.attrs.tags = np.array([(t or 'None', v) for v, t in pkl.tags.to_list()])
        for k, v in pkl.metadata.items():
            setattr(dset.attrs, 'metadata_' + k, _packMeta(v))

    with warnings.catch_warnings():
        warnings.simplefilter('ignore', ptb.NaturalNameWarning)
        _save = _old_savepkl if compatibility_mode else _savepkl
        for pl in enumerate(pkls):
            _save(*pl)

    f.close()
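
# Illustrative usage sketch (the file name and peaklists below are hypothetical;
# real peaklists normally come from DIMSpy's raw-data processing):
#
#   >>> import numpy as np
#   >>> from dimspy.models.peaklist import PeakList
#   >>> pls = [PeakList('sample_%d' % i, np.array([100.0, 200.0]), np.array([1e4, 2e4]))
#   ...        for i in (1, 2)]
#   >>> save_peaklists_as_hdf5(pls, 'peaklists.hdf5')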
def load_peaklists_from_hdf5(filename: str, compatibility_mode: bool = False):
    """
    Loads a list of peaklist objects from an HDF5 file.

    :param filename: Path to an existing HDF5 file
    :param compatibility_mode: Read a previous, DIMSpy v1.*-based HDF5 file
    :rtype: Sequence[PeakList]

    The values in the HDF5 data tables are automatically converted back to their original
    dtypes before being loaded into the peaklists.

    """
    if not os.path.isfile(filename):
        raise IOError('HDF5 database [%s] does not exist' % filename)
    if not h5py.is_hdf5(filename):
        raise IOError('input file [%s] is not a valid HDF5 database' % filename)
    if compatibility_mode:
        logging.warning('DeprecationWarning: loading HDF file in the old format')
    f = h5py.File(filename, 'r') if compatibility_mode else ptb.open_file(filename, mode='r')

    def _old_loadpkl(ID):
        dset = f[ID]
        if _convByteStr(dset.attrs.get('class', '')) != 'PeakList':
            raise IOError('unknown object found in the database')

        dm = dset[:]
        dn = dset.attrs['dtable_names'].astype(str)
        dt = dset.attrs['dtable_types'].astype(str)
        if dn[0] != 'mz' or dn[1] != 'intensity':
            raise IOError('PANIC: HDF5 dataset matrix not in order')

        pkl = PeakList(ID, dm[0].astype(np.float64), dm[1].astype(np.float64),
                       **{k[9:]: _unpackMeta(v) for k, v in list(dset.attrs.items())
                          if k.startswith('metadata_')})
        for n, v, t in zip(dn[2:], dm[2:], dt[2:]):
            pkl.add_attribute(n, v, t, is_flag=(n in dset.attrs['flag_attrs'].astype(str)),
                              flagged_only=False)
        for t, v in map(lambda x: x.astype(str), dset.attrs['tags']):
            pkl.tags.add_tag(_eval(v), None if t == 'None' else t)
        return dset.attrs['order'], pkl

    def _loadpkl(dset):
        if dset.attrs.data_class != 'PeakList':
            raise IOError('unknown object found in the database')

        dn = dset.attrs.dtable_names
        dt = dset.attrs.dtable_types
        dm = [np.array(dset.colinstances[n]) for n in dn]
        if dn[0] != 'mz' or dn[1] != 'intensity':
            raise IOError('PANIC: HDF5 dataset matrix not in order')

        pkl = PeakList(dset.name, dm[0], dm[1],
                       **{k[9:]: _unpackMeta(getattr(dset.attrs, k))
                          for k in dset.attrs._f_list('user') if k.startswith('metadata_')})
        for n, v, t in zip(dn[2:], dm[2:], dt[2:]):
            pkl.add_attribute(n, v, t, is_flag=(n in dset.attrs.flag_attrs), flagged_only=False)
        for t, v in map(lambda x: x.astype(str), dset.attrs.tags):
            pkl.tags.add_tag(_eval(v), None if t == 'None' else t)
        return dset.attrs.order, pkl

    with warnings.catch_warnings():
        warnings.simplefilter('ignore', ptb.NaturalNameWarning)
        pkls = [_old_loadpkl(dname) for dname in f.keys()] if compatibility_mode else \
               [_loadpkl(dset) for dset in f.walk_nodes('/', 'Table')]
        pkls = list(zip(*sorted(pkls, key=lambda x: x[0])))[1]

    f.close()
    return pkls
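
# Reading the same (hypothetical) file back; the original order of the
# peaklists is preserved:
#
#   >>> pls = load_peaklists_from_hdf5('peaklists.hdf5')
#   >>> [pl.ID for pl in pls]
#   ['sample_1', 'sample_2']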
# peak matrix portals
def save_peak_matrix_as_hdf5(pm: PeakMatrix, filename: str, compatibility_mode: bool = False):
    """
    Saves a peak matrix object to an HDF5 file.

    :param pm: The target peak matrix object
    :param filename: Path to a new HDF5 file
    :param compatibility_mode: Write the file in the previous, DIMSpy v1.*-based HDF5 format

    The order of the attributes and flags will be retained.

    """
    if os.path.isfile(filename):
        logging.warning('HDF5 database [%s] already exists and will be overwritten', filename)
    if compatibility_mode:
        logging.warning('DeprecationWarning: exporting HDF file in the old format')
    f = h5py.File(filename, 'w') if compatibility_mode else ptb.open_file(filename, mode='w')

    def _old_savepm():
        def _saveattr(attr):
            with unmask_all_peakmatrix(pm) as m:
                dm = m.attr_matrix(attr, flagged_only=False)
                dt = np.float64 if dm.dtype.kind == 'f' else \
                     np.int64 if dm.dtype.kind in ('i', 'u') else \
                     ('S%d' % np.max([list(map(len, l)) for l in dm]))
                ds = f.create_dataset(attr, dm.shape, dtype=dt)
                ds[...] = dm.astype(dt)
                ds.attrs['dtype'] = dm.dtype.str
        for a in pm.attributes:
            _saveattr(a)

        dset = f['mz']  # 'mz' must exist in pm
        dset.attrs['class'] = 'PeakMatrix'
        dset.attrs['attributes'] = np.string_(pm.attributes)
        dset.attrs['mask'] = pm.mask

        with unmask_all_peakmatrix(pm):
            dset.attrs['peaklist_ids'] = np.string_(pm.peaklist_ids)
            for i, tags in enumerate(pm.peaklist_tags):
                dset.attrs['peaklist_tags_%d' % i] = \
                    np.string_([(t or 'None', v) for v, t in tags.to_list()])

        dset.attrs['flag_names'] = np.string_(pm.flag_names)
        for fn in pm.flag_names:
            dset.attrs[fn] = pm.flag_values(fn)

    def _savepm():
        def _saveattr(attr):
            with unmask_all_peakmatrix(pm) as m:
                dm = m.attr_matrix(attr, flagged_only=False)
                ds = f.create_array(f.root, attr, dm)
                ds.attrs.dtype = dm.dtype.str
        for a in pm.attributes:
            _saveattr(a)

        dset = f.root.mz  # 'mz' must exist in pm
        dset.attrs.data_class = 'PeakMatrix'
        dset.attrs.attributes = pm.attributes
        dset.attrs.mask = pm.mask

        with unmask_all_peakmatrix(pm):
            dset.attrs.peaklist_ids = _packMeta(pm.peaklist_ids)
            for i, tags in enumerate(pm.peaklist_tags):
                dset.attrs['peaklist_tags_%d' % i] = \
                    np.array([(t or 'None', v) for v, t in tags.to_list()])

        dset.attrs.flag_names = pm.flag_names
        for fn in pm.flag_names:
            vals = pm.flag_values(fn)
            # pack oversized flag arrays to stay under the HDF5 attribute size limit
            dset.attrs[fn] = vals if len(vals) < 65000 else _packMeta(vals)

    with warnings.catch_warnings():
        warnings.simplefilter('ignore', ptb.NaturalNameWarning)
        (_old_savepm if compatibility_mode else _savepm)()

    f.close()
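
# Illustrative usage sketch (assumes `pm` is a PeakMatrix produced elsewhere,
# e.g. by DIMSpy's peak alignment; the file name is hypothetical):
#
#   >>> save_peak_matrix_as_hdf5(pm, 'peak_matrix.hdf5')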
def load_peak_matrix_from_hdf5(filename: str, compatibility_mode: bool = False):
    """
    Loads a peak matrix from an HDF5 file.

    :param filename: Path to an existing HDF5 file
    :param compatibility_mode: Read a previous, DIMSpy v1.*-based HDF5 file
    :rtype: PeakMatrix object

    """
    if not os.path.isfile(filename):
        raise IOError('HDF5 database [%s] does not exist' % filename)
    if not h5py.is_hdf5(filename):
        raise IOError('input file [%s] is not a valid HDF5 database' % filename)
    if compatibility_mode:
        logging.warning('DeprecationWarning: loading HDF file in the old format')
    f = h5py.File(filename, 'r') if compatibility_mode else ptb.open_file(filename, mode='r')

    def _old_loadpm():
        dset = f['mz']
        if _convByteStr(dset.attrs.get('class', '')) != 'PeakMatrix':
            raise IOError('input database is not a valid PeakMatrix')

        attl = dset.attrs['attributes'].astype(str)
        pids = dset.attrs['peaklist_ids'].astype(str)
        mask = dset.attrs['mask']

        tatt = sorted([x for x in dset.attrs.keys() if x.startswith('peaklist_tags_')],
                      key=lambda x: int(x[14:]))
        ptgs = [PeakList_Tags(*[Tag(_eval(v), None if t == 'None' else t)
                                for t, v in map(lambda x: x.astype(str), tags)])
                for tags in [dset.attrs[x] for x in tatt]]

        flgs = [(fn, dset.attrs[fn]) for fn in dset.attrs['flag_names'].astype(str)]
        alst = [(attr, np.array(f[attr]).astype(f[attr].attrs['dtype'])) for attr in attl]
        return pids, ptgs, alst, mask, flgs

    def _loadpm():
        dset = f.root.mz
        if dset.attrs.data_class != 'PeakMatrix':
            raise IOError('input database is not a valid PeakMatrix')

        attl = dset.attrs.attributes
        pids = (lambda x: _unpackMeta(x) if isinstance(x, bytes) else x)(dset.attrs.peaklist_ids)
        mask = dset.attrs.mask

        tatt = sorted([x for x in dset.attrs._f_list('user') if x.startswith('peaklist_tags_')],
                      key=lambda x: int(x[14:]))
        ptgs = [PeakList_Tags(*[Tag(_eval(v), None if t == 'None' else t)
                                for t, v in map(lambda x: x.astype(str), tags)])
                for tags in [dset.attrs[x] for x in tatt]]

        flgs = [(flg, (lambda x: _unpackMeta(x) if isinstance(x, bytes) else x)(dset.attrs[flg]))
                for flg in dset.attrs.flag_names]
        alst = [(attr, f.root[attr].read().astype(f.root[attr].attrs.dtype)) for attr in attl]
        return pids, ptgs, alst, mask, flgs

    with warnings.catch_warnings():
        warnings.simplefilter('ignore', ptb.NaturalNameWarning)
        res = (_old_loadpm if compatibility_mode else _loadpm)()

    f.close()

    pm = PeakMatrix(*res[:3])
    pm.mask = res[3]
    for fn, fv in res[4]:
        pm.add_flag(fn, fv, flagged_only=False)
    return pm
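
# Reading the (hypothetical) file back restores the attribute matrices, mask,
# tags and flags:
#
#   >>> pm = load_peak_matrix_from_hdf5('peak_matrix.hdf5')
#   >>> 'mz' in pm.attributes
#   True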