Source code for sctoolbox.utils.assemblers

"""Module to assemble anndata objects."""

import scanpy as sc
import pandas as pd
import os
import glob
from scipy import sparse
from scipy.io import mmread

from beartype.typing import Optional, Union, Literal, Any
from beartype import beartype

import sctoolbox.utils as utils
from sctoolbox._settings import settings
logger = settings.logger


#####################################################################
#                   Assemble from multiple h5ad files               #
#####################################################################

@beartype
def prepare_atac_anndata(adata: sc.AnnData,
                         set_index: bool = True,
                         index_from: Optional[str] = None,
                         coordinate_cols: Optional[list[str]] = None,
                         h5ad_path: Optional[str] = None) -> sc.AnnData:
    """
    Prepare an AnnData object of ATAC-seq data to be in the correct format for the subsequent pipeline.

    This includes formatting the index, formatting the coordinate columns, and setting the barcode as the index.

    Parameters
    ----------
    adata : sc.AnnData
        The AnnData object to be prepared.
    set_index : bool, default True
        If True, the index will be formatted and can be set from a given column.
    index_from : Optional[str], default None
        Column to build the index from.
    coordinate_cols : Optional[list[str]], default None
        Location information of the peaks.
    h5ad_path : Optional[str], default None
        Path to the h5ad file.

    Returns
    -------
    sc.AnnData
        The prepared AnnData object.
    """

    if set_index:
        logger.info("formatting index")
        utils.var_index_from(adata, index_from)

    # Establish columns for coordinates
    if coordinate_cols is None:
        coordinate_cols = adata.var.columns[:3]  # first three columns are coordinates
    else:
        utils.check_columns(adata.var, coordinate_cols, name="adata.var")  # check that coordinate_cols are in adata.var

    # Format coordinate columns
    logger.info("formatting coordinate columns")
    utils.format_adata_var(adata, coordinate_cols, coordinate_cols)

    # Check if the barcode is the index, otherwise set it
    utils.barcode_index(adata)

    if h5ad_path is not None:
        adata.obs = adata.obs.assign(file=h5ad_path)

    return adata
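
# Usage sketch (illustrative only, not executed as part of the module). Assuming an
# ATAC-seq AnnData whose .var holds peak coordinates in columns "chr", "start" and
# "stop" (hypothetical names and file path), the function could be called as:
#
#     adata = sc.read_h5ad("atac_peaks.h5ad")  # hypothetical input file
#     adata = prepare_atac_anndata(adata,
#                                  coordinate_cols=["chr", "start", "stop"],
#                                  h5ad_path="atac_peaks.h5ad")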


#####################################################################
#          ASSEMBLING ANNDATA FROM STARSOLO OUTPUT FOLDERS          #
#####################################################################


@beartype
def from_single_starsolo(path: str,
                         dtype: Literal['filtered', 'raw'] = "filtered",
                         header: Union[int, list[int], Literal['infer'], None] = 'infer') -> sc.AnnData:
    """
    Assemble an anndata object from a starsolo folder.

    Parameters
    ----------
    path : str
        Path to the "solo" folder from starsolo.
    dtype : Literal['filtered', 'raw'], default "filtered"
        The type of solo data to choose.
    header : Union[int, list[int], Literal['infer'], None], default "infer"
        Set header parameter for reading metadata tables using pandas.read_csv.

    Returns
    -------
    sc.AnnData
        An anndata object based on the provided starsolo folder.

    Raises
    ------
    FileNotFoundError
        If path does not exist or files are missing.
    """

    # Establish which directories to look for data in
    genedir = os.path.join(path, "Gene", dtype)
    velodir = os.path.join(path, "Velocyto", dtype)
    for dirpath in [genedir, velodir]:  # do not shadow the 'path' argument
        if not os.path.exists(dirpath):
            raise FileNotFoundError(f"The path to the data does not exist: {dirpath}")

    # File paths for different elements
    matrix_f = os.path.join(genedir, 'matrix.mtx')
    barcodes_f = os.path.join(genedir, "barcodes.tsv")
    genes_f = os.path.join(genedir, "genes.tsv")
    spliced_f = os.path.join(velodir, 'spliced.mtx')
    unspliced_f = os.path.join(velodir, 'unspliced.mtx')
    ambiguous_f = os.path.join(velodir, 'ambiguous.mtx')

    # Check whether files are present
    for f in [matrix_f, barcodes_f, genes_f, spliced_f, unspliced_f, ambiguous_f]:
        if not os.path.exists(f):
            raise FileNotFoundError(f"File '{f}' was not found. Please check that path contains the full output of starsolo.")

    # Setup main adata object from matrix/barcodes/genes
    logger.info("Setting up adata from solo files")
    adata = from_single_mtx(matrix_f, barcodes_f, genes_f, header=header)
    adata.var.columns = ["gene", "type"]  # specific to the starsolo format

    # Add in velocity information
    logger.info("Adding velocity information from spliced/unspliced/ambiguous")
    spliced = sparse.csr_matrix(mmread(spliced_f).transpose())
    unspliced = sparse.csr_matrix(mmread(unspliced_f).transpose())
    ambiguous = sparse.csr_matrix(mmread(ambiguous_f).transpose())

    adata.layers["spliced"] = spliced
    adata.layers["unspliced"] = unspliced
    adata.layers["ambiguous"] = ambiguous

    return adata
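
# Usage sketch (illustrative only): assuming "sample1/solo" (hypothetical path) is a
# starsolo output folder containing Gene/ and Velocyto/ subfolders, the filtered
# counts plus velocity layers can be loaded as:
#
#     adata = from_single_starsolo("sample1/solo", dtype="filtered")
#     adata.layers["spliced"]  # spliced/unspliced/ambiguous layers are added above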


@beartype
def from_quant(path: str,
               configuration: list = [],
               use_samples: Optional[list] = None,
               dtype: Literal["raw", "filtered"] = "filtered") -> sc.AnnData:
    """
    Assemble an adata object from data in the 'quant' folder of the snakemake pipeline.

    Parameters
    ----------
    path : str
        The directory where the quant folder from snakemake preprocessing is located.
    configuration : list
        Configurations to set up the samples for anndata assembling.
        Each entry must contain the sample, the word used in snakemake to assign the condition,
        and the condition, e.g. sample1:condition:room_air
    use_samples : Optional[list], default None
        List of samples to use. If None, all samples will be used.
    dtype : Literal["raw", "filtered"], default "filtered"
        The type of Solo data to choose.

    Returns
    -------
    sc.AnnData
        The assembled anndata object.

    Raises
    ------
    ValueError
        If `use_samples` contains names that do not exist.
    """

    # TODO: test that the quant folder exists

    # Collect configuration into a dictionary
    config_dict = {}
    if configuration is not None:
        for string in configuration:
            sample, condition, condition_name = string.split(":")

            if sample not in config_dict:
                config_dict[sample] = {}
            config_dict[sample][condition] = condition_name

    # Establish which samples to use for assembly
    sample_dirs = glob.glob(os.path.join(path, "*"))
    sample_names = [os.path.basename(x) for x in sample_dirs]
    logger.info(f"Found samples: {sample_names}")

    # Subset to use_samples if they are provided
    if use_samples is not None:
        idx = [i for i, sample in enumerate(sample_names) if sample in use_samples]
        sample_names = [sample_names[i] for i in idx]
        sample_dirs = [sample_dirs[i] for i in idx]

        logger.info(f"Using samples: {sample_names}")
        if len(sample_names) == 0:
            raise ValueError("None of the given 'use_samples' match the samples found in the directory.")

    # Assembling from different samples
    adata_list = []
    for sample_name, sample_dir in zip(sample_names, sample_dirs):

        logger.info(f"Assembling sample '{sample_name}'")
        solo_dir = os.path.join(sample_dir, "solo")
        adata = from_single_starsolo(solo_dir, dtype=dtype)

        # Make barcode index unique
        adata.obs.index = adata.obs.index + "-" + sample_name
        adata.obs["sample"] = sample_name

        # Add additional information from configuration
        if sample_name in config_dict:
            for key in config_dict[sample_name]:
                adata.obs[key] = config_dict[sample_name][key]
                adata.obs[key] = adata.obs[key].astype("category")

        adata_list.append(adata)

    # Concatenating the adata objects
    logger.info("Concatenating anndata objects")
    adata = adata_list[0].concatenate(adata_list[1:], join="outer")

    # Add information to uns
    utils.add_uns_info(adata, ["sctoolbox", "source"], os.path.abspath(path))

    return adata
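
# Usage sketch (illustrative only): assuming a quant folder (hypothetical path) with
# per-sample starsolo output in <sample>/solo, two samples could be assembled and
# annotated with their condition as:
#
#     adata = from_quant("preprocessing/quant",
#                        configuration=["sample1:condition:room_air",
#                                       "sample2:condition:hypoxia"])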


#####################################################################
#           CONVERTING FROM MTX+TSV/CSV TO ANNDATA OBJECT           #
#####################################################################


@beartype
def from_single_mtx(mtx: str,
                    barcodes: str,
                    genes: str,
                    transpose: bool = True,
                    header: Union[int, list[int], Literal['infer'], None] = 'infer',
                    barcode_index: int = 0,
                    genes_index: int = 0,
                    delimiter: str = "\t",
                    **kwargs: Any) -> sc.AnnData:
    r"""
    Build an adata object from a single mtx and two tsv/csv files.

    Parameters
    ----------
    mtx : str
        Path to the mtx file (.mtx).
    barcodes : str
        Path to the cell label file (.obs).
    genes : str
        Path to the gene label file (.var).
    transpose : bool, default True
        Set True to transpose the mtx matrix.
    header : Union[int, list[int], Literal['infer'], None], default 'infer'
        Set header parameter for reading metadata tables using pandas.read_csv.
    barcode_index : int, default 0
        Column which contains the cell barcodes.
    genes_index : int, default 0
        Column which contains the gene IDs.
    delimiter : str, default '\t'
        Delimiter of the genes and barcodes tables.
    **kwargs : Any
        Additional arguments passed to the scanpy.read_mtx method.

    Returns
    -------
    sc.AnnData
        Anndata object containing the mtx matrix, gene and cell labels.

    Raises
    ------
    ValueError
        If barcode or gene files contain duplicates.
    """

    # Read mtx file
    adata = sc.read_mtx(filename=mtx, dtype='float32', **kwargs)

    # Transpose matrix if necessary
    if transpose:
        adata = adata.transpose()

    # Read in gene and cell annotation
    barcode_csv = pd.read_csv(barcodes, header=header, index_col=barcode_index, delimiter=delimiter)
    barcode_csv.index.names = ['index']
    barcode_csv.columns = [str(c) for c in barcode_csv.columns]  # convert to string
    genes_csv = pd.read_csv(genes, header=header, index_col=genes_index, delimiter=delimiter)
    genes_csv.index.names = ['index']
    genes_csv.columns = [str(c) for c in genes_csv.columns]  # convert to string

    # Test that the indices are unique
    if not barcode_csv.index.is_unique:
        raise ValueError("Barcode index column does not contain unique values")
    if not genes_csv.index.is_unique:
        raise ValueError("Genes index column does not contain unique values")

    # Add tables to the anndata object
    adata.obs = barcode_csv
    adata.var = genes_csv

    # Add filename to .obs
    adata.obs["filename"] = os.path.basename(mtx)
    adata.obs["filename"] = adata.obs["filename"].astype("category")

    return adata
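
# Usage sketch (illustrative only): assuming tab-separated barcode and gene tables
# without a header row (hypothetical file names), a single matrix can be read as:
#
#     adata = from_single_mtx("matrix.mtx", "barcodes.tsv", "genes.tsv", header=None)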


@beartype
def from_mtx(path: str,
             mtx: str = "*_matrix.mtx*",
             barcodes: str = "*_barcodes.tsv*",
             genes: str = "*_genes.tsv*",
             **kwargs: Any) -> sc.AnnData:
    """
    Build an adata object from a list of mtx, barcodes and genes files.

    Parameters
    ----------
    path : str
        Path to the data files.
    mtx : str, default '*_matrix.mtx*'
        String for glob to find matrix files.
    barcodes : str, default '*_barcodes.tsv*'
        String for glob to find barcode files.
    genes : str, default '*_genes.tsv*'
        String for glob to find gene label files.
    **kwargs : Any
        Additional arguments passed to the scanpy.read_mtx method.

    Returns
    -------
    sc.AnnData
        Merged anndata object containing the mtx matrix, gene and cell labels.

    Raises
    ------
    ValueError
        If files are not found.
    """

    mtx = glob.glob(os.path.join(path, mtx))
    barcodes = glob.glob(os.path.join(path, barcodes))
    genes = glob.glob(os.path.join(path, genes))

    # Check that all file lists have the same length
    # https://stackoverflow.com/questions/35791051/better-way-to-check-if-all-lists-in-a-list-are-the-same-length
    it = iter([mtx, barcodes, genes])
    the_len = len(next(it))
    if not all(len(file_list) == the_len for file_list in it):
        raise ValueError("Found different quantities of mtx, genes and barcode files.\nPlease check the given suffixes or filenames.")

    if not mtx:
        raise ValueError('No files were found with the given directory and suffixes')

    adata_objects = []
    for i, m in enumerate(mtx):
        print(f"Reading files: {i+1} of {len(mtx)}")
        adata_objects.append(from_single_mtx(m, barcodes[i], genes[i], **kwargs))

    if len(adata_objects) > 1:
        adata = adata_objects[0].concatenate(*adata_objects[1:], join="outer")
    else:
        adata = adata_objects[0]

    return adata
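
# Usage sketch (illustrative only): assuming a folder of matrix/barcode/gene triplets
# matching the default glob patterns (hypothetical directory), all samples can be read
# and merged into one object as:
#
#     adata = from_mtx("data/mtx_files")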


@beartype
def convertToAdata(file: str,
                   output: Optional[str] = None,
                   r_home: Optional[str] = None,
                   layer: Optional[str] = None) -> Optional[sc.AnnData]:
    """
    Convert .rds files containing Seurat or SingleCellExperiment objects to scanpy anndata.

    An R installation with Seurat & SingleCellExperiment is required for the conversion to work.

    Parameters
    ----------
    file : str
        Path to the .rds or .robj file.
    output : Optional[str], default None
        Path to the output .h5ad file. Won't save if None.
    r_home : Optional[str], default None
        Path to the R home directory. If None, the path is constructed based on the location of the
        python executable, e.g. for ".conda/scanpy/bin/python" the path ".conda/scanpy/lib/R" is used.
    layer : Optional[str], default None
        Name of the layer to be stored in the anndata object. By default the main layer is stored.
        In case of multiome data multiple layers are present, e.g. RNA and ATAC, but anndata can only
        store a single layer.

    Returns
    -------
    Optional[sc.AnnData]
        Returns the converted anndata object if output is None.
    """

    # Setup R
    utils.setup_R(r_home)

    # Initialize R <-> python interface
    utils.check_module("anndata2ri")
    import anndata2ri
    utils.check_module("rpy2")
    from rpy2.robjects import r, default_converter, conversion, globalenv
    anndata2ri.activate()

    # Create rpy2 None to NULL converter
    # https://stackoverflow.com/questions/65783033/how-to-convert-none-to-r-null
    none_converter = conversion.Converter("None converter")
    none_converter.py2rpy.register(type(None), utils._none2null)

    # Check if Seurat and SingleCellExperiment are installed
    r("""
        if (!suppressPackageStartupMessages(require(Seurat))) {
            stop("R dependency Seurat not found.")
        }
        if (!suppressPackageStartupMessages(require(SingleCellExperiment))) {
            stop("R dependency SingleCellExperiment not found.")
        }
    """)

    # Add variables into R
    with conversion.localconverter(default_converter + none_converter):
        globalenv["file"] = file
        globalenv["layer"] = layer

    # ----- convert to anndata ----- #
    r("""
        # ----- load object ----- #
        # try loading .robj
        object <- try({
            # load file; returns a vector of created variables
            new_vars <- load(file)
            # store the new variable into another variable to work on
            get(new_vars[1])
        }, silent = TRUE)

        # if .robj failed try .rds
        if (class(object) == "try-error") {
            # load object
            object <- try(readRDS(file), silent = TRUE)
        }

        # if both .robj and .rds failed throw an error
        if (class(object) == "try-error") {
            stop("Unknown file. Expected '.robj' or '.rds' got ", file)
        }

        # ----- convert to SingleCellExperiment ----- #
        # can only convert Seurat -> SingleCellExperiment -> anndata
        if (class(object) == "Seurat") {
            object <- as.SingleCellExperiment(object)
        } else if (class(object) == "SingleCellExperiment") {
            object <- object
        } else {
            stop("Unknown object! Expected class 'Seurat' or 'SingleCellExperiment' got ", class(object))
        }

        # ----- change layer ----- #
        # adata can only store a single layer
        if (!is.null(layer)) {
            layers <- c(mainExpName(object), altExpNames(object))

            # check if the layer is valid
            if (!layer %in% layers) {
                stop("Invalid layer! Expected one of ", paste(layers, collapse = ", "), " got ", layer)
            }

            # select layer
            if (layer != mainExpName(object)) {
                object <- swapAltExp(object, layer, saved = mainExpName(object), withColData = TRUE)
            }
        }
    """)

    # Pull the SingleCellExperiment into python
    # this also converts it to anndata
    adata = globalenv["object"]

    # fixes https://gitlab.gwdg.de/loosolab/software/sc_framework/-/issues/205
    adata.obs.index = adata.obs.index.astype('object')
    adata.var.index = adata.var.index.astype('object')

    # Add information to uns
    utils.add_uns_info(adata, ["sctoolbox", "source"], os.path.abspath(file))

    if output:
        # Saving adata.h5ad
        adata.write(filename=output, compression='gzip')
    else:
        return adata
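
# Usage sketch (illustrative only): assuming a Seurat object saved as "object.rds"
# (hypothetical file) and an R installation with Seurat/SingleCellExperiment available,
# the object can be converted and written to h5ad, or returned directly, as:
#
#     convertToAdata("object.rds", output="object.h5ad")
#     adata = convertToAdata("object.rds")  # returns the anndata object instead of saving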