Source code for dimspy.metadata
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright © 2017-2020 Ralf Weber, Albert Zhou.
#
# This file is part of DIMSpy.
#
# DIMSpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# DIMSpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with DIMSpy. If not, see <https://www.gnu.org/licenses/>.
#
import collections
import csv
import os
import re
import warnings
from typing import Sequence, Dict
import numpy as np
from .models.peak_matrix import PeakMatrix
from .models.peaklist import PeakList
def mz_range_from_header(h: str) -> Sequence[float]:
    """
    Parse the m/z window out of a header or filter string.

    The first ``<low>-<high>`` pair matched by the pattern is used; an
    ``IndexError`` is raised when the text contains no such pair.

    :param h: Header or filter string
    :return: m/z range as a two-element list ``[low, high]`` of floats
    """
    matches = re.findall(r'([\w\.-]+)-([\w\.-]+)', h)
    low, high = matches[0]
    return [float(low), float(high)]
def ms_type_from_header(h: str) -> str:
    """
    Return the ms type, i.e. everything in front of the first space.

    :param h: header or filter string
    :return: ms type (e.g. FTMS and ITMS); the whole string when it holds no space
    """
    first_token, _, _ = h.partition(" ")
    return first_token
def scan_type_from_header(h: str) -> str:
    """
    Determine the scan type named in a header or filter string.

    The lookup is case-insensitive and requires the keyword to be surrounded
    by spaces. "Full" takes precedence when both keywords are present.

    :param h: header or filter string
    :return: "Full", "SIM", or None when neither keyword is found
    """
    lowered = h.lower()
    for marker, label in ((" full ", "Full"), (" sim ", "SIM")):
        if marker in lowered:
            return label
    return None
def mode_type_from_header(h: str) -> str:
    """
    Determine the scan mode flag in a header or filter string.

    The lookup is case-insensitive and requires the flag to be surrounded by
    spaces. "p" takes precedence when both flags are present.

    :param h: header or filter string
    :return: "p" (profile), "c" (centroid), or None when neither flag is found
    """
    lowered = h.lower()
    for flag in ("p", "c"):
        if " {} ".format(flag) in lowered:
            return flag
    return None
def count_scan_types(hs: list) -> int:
    """
    Count how many distinct scan types occur in a list of headers.

    Each header is classified with :func:`scan_type_from_header`; a header
    without a recognised scan type contributes ``None`` as its own category.

    :param hs: List of headers or filter strings
    :return: Count
    """
    distinct = {scan_type_from_header(header) for header in hs}
    return len(distinct)
def count_ms_types(hs: list) -> int:
    """
    Count how many distinct ms types occur in a list of headers.

    Each header is classified with :func:`ms_type_from_header`.

    :param hs: List of headers or filter strings
    :return: Count
    """
    distinct = {ms_type_from_header(header) for header in hs}
    return len(distinct)
def _partially_overlapping_windows(mzrs: list) -> list:
"""
Select all adjacent m/z windows that partially overlap
For example: [100-200] and [185-285] (Valid for SIM-stitch)
:param mzrs: Nested list of mz ranges / windows
:return: Nested list of m/z ranges / windows
"""
assert type(mzrs) == list, "List required"
temp = []
for i in range(0, len(mzrs) - 1):
if mzrs[i][0] < mzrs[i + 1][0] and mzrs[i][1] > mzrs[i + 1][0] and mzrs[i][1] < mzrs[i + 1][1]:
if mzrs[i] not in temp:
temp.append(mzrs[i])
if mzrs[i + 1] not in temp:
temp.append(mzrs[i + 1])
return temp
def _first_fully_overlapping_windows(mzrs: list) -> list:
"""
Select m/z windows that fall within another window and have a different mass ranges
For example: [100-200] and [125-175] (Invalid)
:param mzrs: Nested list of m/z ranges / windows
:return: Nested list of m/z ranges / windows
"""
assert type(mzrs) == list, "List required"
for i in range(0, len(mzrs) - 1):
if mzrs[i][0] <= mzrs[i + 1][0] and mzrs[i][1] >= mzrs[i + 1][1]:
return mzrs[i], mzrs[i + 1] # Temporary print
return []
def _non_overlapping_windows(mzrs: list) -> list:
"""
Select windows that do not overlap with other windows.
For example: [100-200] and [200-400] (Valid for merging)
:param mzrs: Nested list of m/z ranges / windows
:return: Nested list of m/z ranges / windows
"""
assert type(mzrs) == list, "List required"
temp = []
for i in range(0, len(mzrs)):
c = 0
for j in range(0, len(mzrs)):
if mzrs[i][0] <= mzrs[j][0] and mzrs[i][1] <= mzrs[j][0]:
c += 1
elif mzrs[i][0] >= mzrs[j][1] and mzrs[i][1] >= mzrs[j][1]:
c += 1
if c == len(mzrs) - 1:
temp.append(mzrs[i])
return temp
def interpret_method(mzrs: list):
    """
    Classify the acquisition method from its m/z windows.

    The list is sorted in place by the upper bound of each window before it
    is classified, and the detected method is also printed.

    :param mzrs: Nested list of m/z ranges / windows
    :return: Type of MS method: "single", "adjacent" or "overlapping"
    :raises IOError: when the windows fit none of the three layouts
    """
    mzrs.sort(key=lambda window: window[1])
    n_windows = len(mzrs)
    isolated = _non_overlapping_windows(mzrs)
    stitchable = _partially_overlapping_windows(mzrs)

    if n_windows == 1:
        print("Single m/z window.....")
        return "single"
    if len(isolated) == n_windows:
        print("Adjacent m/z windows.....")
        return "adjacent"
    if len(stitchable) == n_windows:
        print("SIM-Stitch method - Overlapping m/z windows.....")
        return "overlapping"
    raise IOError("SIM-Stitch cannot be applied; 'filter_scan_events' required or set 'skip_stitching' to False")
def to_int(x):
    """
    Convert a value to ``int`` without raising.

    Note that ``0`` is a valid result and is falsy just like the failure
    value, so callers must test the result with ``is False`` rather than by
    truthiness.

    :param x: Value to convert to int
    :return: Value as int (or False if conversion not possible)
    """
    try:
        return int(x)
    except (TypeError, ValueError, OverflowError):
        # TypeError covers None and other non-numeric objects, OverflowError
        # covers infinite floats; both are "conversion not possible" too.
        return False
def validate_metadata(fn_tsv: str) -> collections.OrderedDict:
    """
    Check and validate metadata within a tab-separated file

    The file is read column-wise into an ordered dictionary. The columns
    'replicate', 'batch', 'injectionOrder' and 'multilist' are converted to
    lists of int when present (0 is accepted as an integer). A short summary
    of replicates, batches and classes is printed.

    :param fn_tsv: Path to tab-separated file
    :return: Dictionary mapping each column name to the list of its values
    :raises AssertionError: if the file does not exist, an integer column holds
        a non-integer value, 'injectionOrder' is not sorted, or class labels
        differ within a group of replicates
    :raises IOError: if the 'filename' column is missing or a replicate number is 0
    :raises ValueError: if a filename occurs more than once
    """
    # NOTE(review): 'unicode_escape' presumably repairs Windows paths typed
    # without escaped backslashes; it would also alter non-ASCII characters
    # in a path - confirm before changing.
    path = fn_tsv.encode('unicode_escape')
    assert os.path.isfile(path), "{} does not exist".format(fn_tsv)

    fm_dict = collections.OrderedDict()
    with open(path) as tsv:
        for row in csv.DictReader(tsv, delimiter="\t"):
            for k, v in row.items():
                fm_dict.setdefault(k, []).append(v)

    if "filename" not in fm_dict:
        raise IOError("Column 'filename' missing.")

    unique, counts = np.unique(fm_dict["filename"], return_counts=True)
    if len(unique) != sum(counts):
        raise ValueError("Duplicate filename in list")

    # convert relevant columns to int
    for h in ['replicate', 'batch', 'injectionOrder', 'multilist']:
        if h in fm_dict:
            int_l = []
            for c, x in enumerate(fm_dict[h]):
                try:
                    int_l.append(int(x))
                except (TypeError, ValueError):
                    # Converted inline so that a legitimate 0 is not mistaken
                    # for a failed conversion. AssertionError is raised
                    # explicitly to keep the exception type of the former
                    # `assert` while still being enforced under `python -O`.
                    raise AssertionError(
                        "Column '{}' values should be integers, see row {}".format(h, c + 1)) from None
            fm_dict[h] = int_l

    if "replicate" in fm_dict:
        if 0 in fm_dict["replicate"]:
            raise IOError("Incorrect replicate number in list. Row {}".format(list(fm_dict["replicate"]).index(0)))
        idxs_replicates = idxs_reps_from_filelist(fm_dict["replicate"])
        # Number of samples per replicate-group size, in order of first appearance.
        group_sizes = collections.Counter(len(idxs) for idxs in idxs_replicates)
        for k, v in group_sizes.items():
            print("{} sample(s) with {} replicate(s)".format(v, k))
    else:
        print("Column for replicate numbers missing. Only required for replicate filter.")

    if "batch" in fm_dict:
        unique_batches, counts = np.unique(fm_dict["batch"], return_counts=True)
        print("Batch numbers:", unique_batches)
        print("Number of samples in each Batch:", dict(zip(unique_batches, counts)))
    else:
        print("Column for batch number missing. Not required.")

    if "injectionOrder" in fm_dict:
        assert np.array_equal(fm_dict["injectionOrder"], sorted(
            fm_dict["injectionOrder"])), "Check the injectionOrder column - samples not in order"
    else:
        print("Column for sample injection order missing. Not required.")

    if "classLabel" in fm_dict:
        if "replicate" in fm_dict:
            # Every run of consecutive replicates must share one class label.
            for idxs in idxs_replicates:
                labels = fm_dict["classLabel"][min(idxs):max(idxs) + 1]
                assert len(np.unique(labels)) == 1, "class names do not match with number of replicates"
        unique, counts = np.unique(fm_dict["classLabel"], return_counts=True)
        cls = dict(zip(unique, counts))
        print("Classes:", cls)
    else:
        warnings.warn("Column 'classLabel' for class labels missing. Not required.")

    if "multilist" not in fm_dict:
        print("Column 'multilist' for spliting peaklists is missing. Not required.")

    return fm_dict
def update_metadata_and_labels(peaklists: Sequence[PeakList], fl: Dict):
    """
    Update metadata

    Every column of ``fl`` is copied into the ``metadata`` of the matching
    peaklist, and the columns "replicate", "replicates", "batch",
    "injectionOrder" and "classLabel" (when present) replace the tags of the
    same tag type. Peaklists are matched on their ``ID`` against the first
    column of ``fl``; the peaklists are modified in place.

    :param peaklists: List of peaklist Objects
    :param fl: Dictionary with meta data (column name -> list of values)
    :return: List of peaklist objects
    :raises IOError: if the first element is not a PeakList, or a peaklist ID
        does not occur in the first column of ``fl``
    """
    if len(peaklists) == 0:
        # Nothing to update; avoids an IndexError on the type check below.
        return peaklists
    if not isinstance(peaklists[0], PeakList):
        raise IOError("PeakList object required")

    columns = list(fl.keys())
    if not columns:
        return peaklists
    ids = fl[columns[0]]
    tag_names = [name for name in ["replicate", "replicates", "batch", "injectionOrder", "classLabel"]
                 if name in fl]

    # Resolve every peaklist first so that a mismatch is reported before any
    # peaklist has been modified.
    rows = []
    for pl in peaklists:
        if pl.ID not in ids:
            raise IOError("filelist and peaklist do not match {}".format(pl.ID))
        rows.append(ids.index(pl.ID))

    for pl, index in zip(peaklists, rows):
        for k in columns:
            pl.metadata[k] = fl[k][index]
        # Tags are replaced once per peaklist rather than once per column.
        for tag_name in tag_names:
            if pl.tags.has_tag_type(tag_name):
                pl.tags.drop_tag_type(tag_name)
            pl.tags.add_tag(fl[tag_name][index], tag_name)
    return peaklists
def update_labels(pm: PeakMatrix, fn_tsv: str) -> PeakMatrix:
    """
    Update Sample labels PeakMatrix object

    The tab-separated file is read column-wise. Its first column must be
    'sample_id' or 'filename' and must equal ``pm.peaklist_ids`` in content
    and order. The columns "replicate", "replicates", "batch",
    "injectionOrder" and "classLabel" (when present) replace the tags of the
    same tag type on the corresponding entry of ``pm.peaklist_tags``. Values
    are stored as read from the file, i.e. as strings.

    :param pm: peakMatrix Object
    :param fn_tsv: Path to tab-separated file
    :return: peakMatrix Object
    :raises AssertionError: if the file does not exist, the id or 'classLabel'
        column is missing, or the sample ids do not match the peak matrix
    """
    # NOTE(review): 'unicode_escape' presumably repairs Windows paths typed
    # without escaped backslashes; it would also alter non-ASCII characters
    # in a path - confirm before changing.
    path = fn_tsv.encode('unicode_escape')
    assert os.path.isfile(path), "{} does not exist".format(fn_tsv)

    fm_dict = collections.OrderedDict()
    with open(path) as tsv:
        for row in csv.DictReader(tsv, delimiter="\t"):
            for k, v in row.items():
                fm_dict.setdefault(k, []).append(v)

    # None when the file holds no data rows, which then fails the assertion
    # below instead of raising an IndexError.
    id_column = next(iter(fm_dict), None)
    assert id_column in ("sample_id", "filename"), \
        "First column for sample ids ('sample_id' or 'filename') not available"
    assert "classLabel" in fm_dict, "Column for class label (classLabel) not available"

    sample_ids = fm_dict[id_column]
    # array_equal also copes with a differing number of ids, where an
    # element-wise `==` would not yield an array to call `.all()` on.
    assert np.array_equal(sample_ids, pm.peaklist_ids), "Sample ids do not match {}".format(
        np.setdiff1d(sample_ids, pm.peaklist_ids))

    for tag_name in ["replicate", "replicates", "batch", "injectionOrder", "classLabel"]:
        if tag_name in fm_dict:
            for i, value in enumerate(fm_dict[tag_name]):
                tags = pm.peaklist_tags[i]
                if tags.has_tag_type(tag_name):
                    tags.drop_tag_type(tag_name)
                tags.add_tag(value, tag_name)
    return pm
def idxs_reps_from_filelist(replicates: list):
    """
    Group row indices into runs of consecutive replicate numbers.

    A new group starts whenever the replicate number is 1; within a group
    each number must be exactly one higher than the previous one. For
    example ``[1, 2, 3, 1, 2]`` gives ``[[0, 1, 2], [3, 4]]``. The very first
    value is not checked, it always opens the first group.

    :param replicates: Replicate numbers (ints or values convertible with ``int``), in file order
    :return: Nested list with the row indices of each group; empty for empty input
    :raises ValueError: if a value is not convertible to int or the numbering is not consecutive
    """
    replicates = [int(r) for r in replicates]
    if not replicates:
        # Without this guard an empty input would yield [[0]], an index that
        # does not exist.
        return []

    idxs, current = [], [0]
    for i, (previous, value) in enumerate(zip(replicates, replicates[1:]), start=1):
        if value == 1 and previous >= value:
            # Numbering restarts: close the running group and open a new one.
            idxs.append(current)
            current = [i]
        elif value - previous == 1:
            current.append(i)
        else:
            raise ValueError("Incorrect numbering for replicates. Row {}".format(i))
    idxs.append(current)
    return idxs