Source code for dimspy.portals.paths

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright © 2017-2020 Ralf Weber, Albert Zhou.
#
# This file is part of DIMSpy.
#
# DIMSpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# DIMSpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with DIMSpy.  If not, see <https://www.gnu.org/licenses/>.
#


import os

import h5py
import numpy as np
from datetime import datetime
import platform

from ..models.peaklist import PeakList
from ..portals import hdf5_portal
from ..portals.mzml_portal import Mzml
from ..portals.thermo_raw_portal import ThermoRaw


def sort_ms_files_by_timestamp(ps):
    """
    Sort a set of .mzml or .raw files by the timestamp reported by their reader.

    Paths with any other extension are ignored. Each file is opened with
    ``ThermoRaw`` (.raw) or ``Mzml`` (.mzml), its ``timestamp`` attribute is
    read and converted to ``str``, and the reader is closed again.

    :param ps: List of paths
    :return: List of ``(path, timestamp_string)`` tuples in ascending
        timestamp order. Empty if ``ps`` contains no .raw or .mzml paths.
    :raises ValueError: If a timestamp matches none of the known formats
        for its file type.
    """
    # Candidate timestamp layouts per file type, tried in order.
    mzml_patterns = ("%Y-%m-%dT%H:%M:%SZ",)
    # .raw timestamps are either day-first with a 24-hour clock or
    # month-first with a 12-hour clock and an AM/PM marker.
    raw_patterns = ("%d/%m/%Y %H:%M:%S", "%m/%d/%Y %I:%M:%S %p")

    def parse(stamp, patterns):
        for pattern in patterns:
            try:
                return datetime.strptime(stamp, pattern)
            except ValueError:
                continue
        raise ValueError(
            "Timestamp {!r} does not match any of the expected formats: {}".format(
                stamp, ", ".join(patterns)))

    s_files = {}
    patterns_for = {}
    for fn in ps:
        lowered = fn.lower()
        if lowered.endswith(".raw"):
            run = ThermoRaw(fn)
            patterns_for[fn] = raw_patterns
        elif lowered.endswith(".mzml"):
            run = Mzml(fn)
            patterns_for[fn] = mzml_patterns
        else:
            continue
        try:
            s_files[fn] = str(run.timestamp)
        finally:
            # Always release the reader, even if the timestamp cannot be read.
            run.close()

    # Nothing to sort: avoid indexing into an empty collection.
    if not s_files:
        return []

    # Parse once per file so the sort key is a plain dictionary lookup.
    parsed = {fn: parse(stamp, patterns_for[fn]) for fn, stamp in s_files.items()}
    return sorted(s_files.items(), key=lambda item: parsed[item[0]])
def _is_ms_file(path):
    """Return True if *path* ends with .raw or .mzml (case-insensitive)."""
    return path.lower().endswith((".raw", ".mzml"))


def _paths_without_filelist(source):
    """Resolve *source* to a list of paths when no tab-separated filelist is given."""
    if isinstance(source, str):
        if os.path.isdir(source):
            filenames = [os.path.join(source, fn) for fn in os.listdir(source) if _is_ms_file(fn)]
            # Only sort when there is something to sort; a directory without
            # any .raw/.mzml file yields an empty list.
            if filenames:
                filenames = [fd[0] for fd in sort_ms_files_by_timestamp(filenames)]
            return filenames
        if h5py.is_hdf5(source):
            peaklists = hdf5_portal.load_peaklists_from_hdf5(source)
            # Peaklist IDs are placed next to the .hdf5 file.
            base_dir = os.path.abspath(os.path.dirname(source))
            return [os.path.join(base_dir, pl.ID) for pl in peaklists]
        if os.path.isfile(source):
            if _is_ms_file(source):
                return [source]
            raise IOError("Incorrect file format, provide .mzml or .raw files: {}".format(source))
        raise IOError("[Errno 2] No such file or directory: {}".format(source))

    if isinstance(source, (list, tuple)):
        # The type of the first element decides how the sequence is read;
        # an empty sequence falls through to the (empty) path loop.
        if source and isinstance(source[0], PeakList):
            return [pl.ID for pl in source]
        filenames = []
        for fn in source:
            # Report the offending path rather than the whole sequence.
            if not os.path.isfile(fn):
                raise IOError("[Errno 2] No such file or directory: {}".format(fn))
            if not _is_ms_file(fn):
                raise IOError("Incorrect file format, provide .mzml or .raw files: {}".format(fn))
            filenames.append(fn)
        return filenames

    raise IOError("[Errno 2] No such file or directory: {}".format(source))


def _paths_from_filelist(source, tsv):
    """Resolve *source* to a list of paths using the first column of the filelist *tsv*."""
    fm = np.genfromtxt(tsv, dtype=None, delimiter="\t", names=True, encoding=None)
    # A filelist with a single data row is read as a 0-d array; make it 1-d.
    if len(fm.shape) == 0:
        fm = np.array([fm])

    first_column = fm.dtype.names[0]
    if first_column not in ("filename", "sample_id"):
        raise IOError("Incorrect header for first column. Use filename or sample_id")
    listed = fm[first_column]

    filenames = []
    if isinstance(source, (list, tuple)):
        if source and isinstance(source[0], PeakList):
            peaklist_ids = [pl.ID for pl in source]
            for filename in listed:
                if filename not in peaklist_ids:
                    raise IOError("{} does not exist in list with Peaklist objects".format(filename))
                filenames.append(filename)
        else:
            for fn in source:
                if not os.path.isfile(fn):
                    raise IOError("[Errno 2] No such file or directory: {}".format(fn))
            basenames = [os.path.basename(fn) for fn in source]
            for row, filename in enumerate(listed, start=1):
                if filename not in basenames:
                    raise IOError("{} (row {}) does not exist in source provided".format(filename, row))
                filenames.append(source[basenames.index(filename)])
    elif isinstance(source, str):
        if os.path.isdir(source):
            dir_entries = os.listdir(source)
            for fn in listed:
                if os.path.basename(fn) not in dir_entries:
                    raise IOError("{} does not exist in directory provided".format(os.path.basename(fn)))
                filenames.append(os.path.join(source, fn))
        elif h5py.is_hdf5(source):
            # NOTE(review): the filelist order is not applied to peaklists
            # loaded from an .hdf5 file - confirm this is intended.
            peaklists = hdf5_portal.load_peaklists_from_hdf5(source)
            filenames = [pl.ID for pl in peaklists]
        elif os.path.isfile(source):
            if _is_ms_file(source):
                filenames.append(source)
            else:
                raise IOError("Incorrect file format, provide .mzml or .raw files: {}".format(source))
        else:
            raise IOError("[Errno 2] No such file or directory: {} or {}".format(source, tsv))
    # Any other source type yields an empty list here.
    return filenames


def validate_and_sort_paths(source, tsv):
    """
    Validate and sort a set (i.e. directory, hdf5 file or list) of .mzml or .raw files.

    Without a filelist, a directory is searched for .raw/.mzml files which
    are ordered with ``sort_ms_files_by_timestamp``; an .hdf5 file yields
    the IDs of its peaklists joined to the directory of that file; a single
    .raw/.mzml file or a list/tuple of such files is checked for existence
    and extension; a list/tuple of PeakList objects yields their IDs.

    With a filelist, the first column (``filename`` or ``sample_id``)
    determines which entries of ``source`` are returned, in filelist order,
    except for .hdf5 and single-file sources which are returned as they are.

    :param source: Path to a directory, an .hdf5 file or a .raw/.mzml file,
        or a list/tuple of paths or PeakList objects.
    :param tsv: Path to tab-separated file, or None
    :return: List of paths (or peaklist IDs). An empty list or tuple, or a
        directory without .raw/.mzml files, gives an empty list when no
        filelist is provided.
    :raises IOError: If a path does not exist, a file has an unsupported
        extension, the filelist header is incorrect, or a filelist entry
        is missing from ``source``.
    """
    if tsv is None:
        return _paths_without_filelist(source)
    if os.path.isfile(tsv):
        return _paths_from_filelist(source, tsv)
    raise IOError("[Errno 2] No such file or directory: {}".format(tsv))