Source code for sctoolbox.tools.dim_reduction

"""Tools for dimensionality reduction with PCA/SVD."""
import numpy as np
import scanpy as sc
import scipy
from scipy.sparse.linalg import svds
from kneed import KneeLocator

import deprecation
from sctoolbox import __version__

from beartype.typing import Optional, Any, Literal, List, Union
from beartype import beartype

import sctoolbox.tools.embedding as scem
import sctoolbox.utils.decorator as deco
from sctoolbox._settings import settings
logger = settings.logger


############################################################################
#                             PCA / SVD                                    #
############################################################################

@deco.log_anndata
@beartype
def compute_PCA(anndata: sc.AnnData,
                use_highly_variable: bool = True,
                inplace: bool = False,
                **kwargs: Any) -> Optional[sc.AnnData]:
    """
    Compute a principal component analysis.

    Parameters
    ----------
    anndata : sc.AnnData
        Anndata object to add the PCA to.
    use_highly_variable : bool, default True
        If true, use highly variable genes to compute PCA.
    inplace : bool, default False
        Whether the anndata object is modified inplace.
    **kwargs : Any
        Additional parameters forwarded to scanpy.pp.pca().

    Returns
    -------
    Optional[sc.AnnData]
        Returns anndata object with PCA components. Or None if inplace = True.
    """
    adata_m = anndata if inplace else anndata.copy()

    # Computing PCA
    logger.info("Computing PCA")
    sc.pp.pca(adata_m, use_highly_variable=use_highly_variable, **kwargs)

    # Adding info in anndata.uns["infoprocess"]
    # cr.build_infor(adata_m, "Scanpy computed PCA", "use_highly_variable= " + str(use_highly_variable), inplace=True)

    if not inplace:
        return adata_m
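
# Usage sketch (illustrative addition, not part of the original module). Shows a
# minimal compute_PCA() call on a synthetic AnnData; the matrix shape and n_comps
# value are assumptions, and n_comps is forwarded to scanpy.pp.pca() via **kwargs.
#
# >>> import numpy as np
# >>> import scanpy as sc
# >>> from sctoolbox.tools.dim_reduction import compute_PCA
# >>> adata = sc.AnnData(np.random.rand(100, 20).astype(np.float32))
# >>> compute_PCA(adata, use_highly_variable=False, n_comps=10, inplace=True)
# >>> adata.obsm["X_pca"].shape
# (100, 10)
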

@beartype
def lsi(data: sc.AnnData,
        scale_embeddings: bool = True,
        n_comps: int = 50,
        use_highly_variable: bool = False) -> None:
    """
    Run Latent Semantic Indexing (LSI) for dimensionality reduction.

    Values represent the similarity of cells in the original space.

    doi: 10.3389/fphys.2013.00008

    Parameters
    ----------
    data : sc.AnnData
        AnnData object with peak counts.
    scale_embeddings : bool, default True
        Scale embeddings to zero mean and unit variance.
    n_comps : int, default 50
        Number of components to calculate with SVD.
    use_highly_variable : bool, default False
        If true, use highly variable genes to compute LSI.

    Raises
    ------
    ValueError
        If use_highly_variable is True but adata.var['highly_variable'] is missing.

    Notes
    -----
    Function is taken from the muon package.
    """
    adata = data

    # Subset adata to highly variable genes
    if use_highly_variable:
        if "highly_variable" not in adata.var:
            raise ValueError("Highly variable genes not found in adata.var['highly_variable'].")
        adata_comp = adata[:, adata.var['highly_variable']]
    else:
        adata_comp = adata

    # In the unlikely scenario that there are fewer than n_comps features, lower n_comps to that value
    n_comps = min(n_comps, adata_comp.X.shape[1])

    logger.info("Performing SVD")
    cell_embeddings, svalues, peaks_loadings = svds(adata_comp.X, k=n_comps)

    # Re-order components in descending order of the singular values
    cell_embeddings = cell_embeddings[:, ::-1]
    svalues = svalues[::-1]
    peaks_loadings = peaks_loadings[::-1, :]

    if scale_embeddings:
        cell_embeddings = (cell_embeddings - cell_embeddings.mean(axis=0)) / cell_embeddings.std(axis=0)

    var_explained = np.round(svalues ** 2 / np.sum(svalues ** 2), decimals=3)
    stdev = svalues / np.sqrt(adata_comp.X.shape[0] - 1)

    # Add results to adata
    adata.obsm["X_lsi"] = cell_embeddings

    # If highly variable genes are used, only store the loadings for those genes and set the rest to 0
    if use_highly_variable:
        adata.varm["LSI"] = np.zeros(shape=(adata.n_vars, n_comps))
        adata.varm["LSI"][adata.var['highly_variable']] = peaks_loadings.T
    else:
        adata.varm["LSI"] = peaks_loadings.T

    # Add variance explained to uns
    adata.uns["lsi"] = {"stdev": stdev, "variance": svalues, "variance_ratio": var_explained}

    # Save as PCA to make it compatible with scanpy
    adata.obsm["X_pca"] = adata.obsm["X_lsi"]
    adata.varm["PCs"] = adata.varm["LSI"]
    adata.uns["pca"] = adata.uns["lsi"]
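
# Usage sketch (illustrative addition, not part of the original module). Runs LSI
# on a sparse count matrix, as typical for scATAC-seq peak counts; the matrix
# shape and Poisson-sampled counts are assumptions.
#
# >>> import numpy as np
# >>> import scanpy as sc
# >>> from scipy.sparse import csr_matrix
# >>> from sctoolbox.tools.dim_reduction import lsi
# >>> counts = csr_matrix(np.random.poisson(1.0, size=(100, 80)).astype(np.float32))
# >>> adata = sc.AnnData(counts)
# >>> lsi(adata, n_comps=20)
# >>> adata.obsm["X_lsi"].shape
# (100, 20)
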

@beartype
def apply_svd(adata: sc.AnnData, layer: Optional[str] = None) -> sc.AnnData:
    """
    Singular value decomposition of an anndata object.

    Parameters
    ----------
    adata : sc.AnnData
        The anndata object to be decomposed.
    layer : Optional[str], default None
        The layer to be decomposed. If None, adata.X is used.

    Returns
    -------
    sc.AnnData
        The decomposed anndata object containing .obsm, .varm and .uns information.
    """
    if layer is None:
        mat = adata.X
    else:
        mat = adata.layers[layer]

    # SVD
    u, s, v = scipy.sparse.linalg.svds(mat, k=30, which="LM")  # find the largest singular values

    # u/s/v are returned in ascending order by scipy.sparse.linalg.svds; reverse to descending
    s = s[::-1]
    u = np.fliplr(u)
    v = np.flipud(v)

    # Compute explained variance
    var_explained = np.round(s**2 / np.sum(s**2), decimals=3)

    adata.obsm["X_svd"] = u
    adata.varm["SVs"] = v.T
    adata.uns["svd"] = {"variance": s, "variance_ratio": var_explained}

    # Hack to use the scanpy functions on SVD coordinates
    # adata.obsm["X_pca"] = adata.obsm["X_svd"]
    # adata.varm["PCs"] = adata.varm["SVs"]
    # adata.uns["pca"] = adata.uns["svd"]

    return adata
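
# Usage sketch (illustrative addition, not part of the original module). Note that
# apply_svd() extracts a fixed k=30 components, so the matrix must have more than
# 30 rows and columns; the shape below is an assumption.
#
# >>> import numpy as np
# >>> import scanpy as sc
# >>> from scipy.sparse import csr_matrix
# >>> from sctoolbox.tools.dim_reduction import apply_svd
# >>> adata = sc.AnnData(csr_matrix(np.random.rand(100, 200).astype(np.float32)))
# >>> adata = apply_svd(adata)
# >>> adata.obsm["X_svd"].shape
# (100, 30)
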

############################################################################
#                          Subset number of PCs                            #
############################################################################

@deprecation.deprecated(deprecated_in="0.5", removed_in="0.7", current_version=__version__,
                        details="Use the 'sctoolbox.tools.propose_pcs' function instead.")
@beartype
def define_PC(anndata: sc.AnnData) -> int:
    """
    Define a threshold for the most variable PCA components.

    Note: Function expects PCA to be computed beforehand.

    Parameters
    ----------
    anndata : sc.AnnData
        Anndata object with a PCA to get the significant PC threshold from.

    Returns
    -------
    int
        The number of PCs up to the elbow, i.e. the PCs with significant variance.

    Raises
    ------
    ValueError
        If PCA is not found in anndata.
    """
    # Check if PCA exists
    if "pca" not in anndata.uns or "variance_ratio" not in anndata.uns["pca"]:
        raise ValueError("PCA not found! Please make sure to compute PCA before running this function.")

    # Prepare values
    y = anndata.uns["pca"]["variance_ratio"]
    x = range(1, len(y) + 1)

    # Compute knee
    kn = KneeLocator(x, y, curve='convex', direction='decreasing')
    knee = int(kn.knee)  # cast from numpy.int64

    # Adding info in anndata.uns["infoprocess"]
    # cr.build_infor(anndata, "PCA_knee_threshold", knee)

    return knee
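
# Usage sketch (illustrative addition, not part of the original module). define_PC()
# needs a precomputed PCA whose variance ratios show an elbow; on unstructured
# random data KneeLocator may not find a knee. The example dataset is an assumption.
#
# >>> import scanpy as sc
# >>> from sctoolbox.tools.dim_reduction import define_PC
# >>> adata = sc.datasets.pbmc68k_reduced()  # ships with a precomputed PCA
# >>> n_pcs = define_PC(adata)  # number of PCs up to the elbow
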

@beartype
def propose_pcs(anndata: sc.AnnData,
                how: List[Literal["variance", "cumulative variance", "correlation"]] = ["variance", "correlation"],
                var_method: Literal["knee", "percent"] = "percent",
                perc_thresh: Union[int, float] = 30,
                corr_thresh: float = 0.3,
                corr_kwargs: Optional[dict] = None) -> List[int]:
    """
    Propose a selection of PCs that can be used for further analysis.

    Note: Function expects PCA to be computed beforehand.

    Parameters
    ----------
    anndata : sc.AnnData
        Anndata object with PCA to get PCs from.
    how : List[Literal["variance", "cumulative variance", "correlation"]], default ["variance", "correlation"]
        Values to use for the PC proposal. Filters are applied independently for each
        selected method and the intersection of the resulting PCs is returned.
    var_method : Literal["knee", "percent"], default "percent"
        Either define a threshold based on a knee algorithm or use a percentile.
    perc_thresh : Union[int, float], default 30
        Percentile threshold of the PCs that should be included. Only for
        var_method="percent"; expects a value from 0-100.
    corr_thresh : float, default 0.3
        Keep PCs with an absolute correlation below the given value.
    corr_kwargs : Optional[dict], default None
        Parameters forwarded to `sctoolbox.tools.correlation_matrix`.

    Returns
    -------
    List[int]
        List of PCs proposed for further use.

    Raises
    ------
    ValueError
        If PCA is not found in anndata.
    """
    # Check if PCA exists
    if "pca" not in anndata.uns or "variance_ratio" not in anndata.uns["pca"]:
        raise ValueError("PCA not found! Please make sure to compute PCA before running this function.")

    if corr_kwargs is None:
        corr_kwargs = {}

    # Setup PC names
    PC_names = range(1, len(anndata.uns["pca"]["variance_ratio"]) + 1)

    selected_pcs = []
    if "variance" in how:
        variance = anndata.uns["pca"]["variance_ratio"]

        if var_method == "knee":
            # Compute knee
            kn = KneeLocator(PC_names, variance, curve='convex', direction='decreasing')
            knee = int(kn.knee)  # cast from numpy.int64

            selected_pcs.append(set(pc for pc in PC_names if pc <= knee))
        elif var_method == "percent":
            # Compute percentile
            percentile = np.percentile(a=variance, q=100 - perc_thresh)

            selected_pcs.append(set(pc for pc, var in zip(PC_names, variance) if var > percentile))

    if "cumulative variance" in how:
        cumulative = np.cumsum(anndata.uns["pca"]["variance_ratio"])

        if var_method == "knee":
            # Compute knee
            kn = KneeLocator(PC_names, cumulative, curve='concave', direction='increasing')
            knee = int(kn.knee)  # cast from numpy.int64

            selected_pcs.append(set(pc for pc in PC_names if pc <= knee))
        elif var_method == "percent":
            # Compute percentile
            percentile = np.percentile(a=cumulative, q=perc_thresh)

            selected_pcs.append(set(pc for pc, cum in zip(PC_names, cumulative) if cum < percentile))

    if "correlation" in how:
        # Filter by highest absolute correlation
        corrcoefs, _ = scem.correlation_matrix(adata=anndata, **corr_kwargs)
        abs_corrcoefs = list(corrcoefs.abs().max(axis=0))

        selected_pcs.append(set(pc for pc, cc in zip(PC_names, abs_corrcoefs) if cc < corr_thresh))

    # Create intersection of selected PCs
    selected_pcs = list(set.intersection(*selected_pcs))

    return selected_pcs
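
# Usage sketch (illustrative addition, not part of the original module). Restricted
# to the "variance" filter so no observation metadata is needed for the correlation
# step; the dataset and the expected leading PCs are assumptions.
#
# >>> import scanpy as sc
# >>> from sctoolbox.tools.dim_reduction import propose_pcs
# >>> adata = sc.datasets.pbmc68k_reduced()
# >>> selected = propose_pcs(adata, how=["variance"], var_method="percent", perc_thresh=30)
# >>> sorted(selected)[:3]  # PC numbers are 1-based
# [1, 2, 3]
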

@deco.log_anndata
@beartype
def subset_PCA(adata: sc.AnnData,
               n_pcs: Optional[int] = None,
               start: int = 0,
               select: Optional[List[int]] = None,
               inplace: bool = True) -> Optional[sc.AnnData]:
    """
    Subset the PCA coordinates in adata.obsm["X_pca"] to the given number of PCs.

    Additionally, subset the PCs in adata.varm["PCs"] and the variance ratio in adata.uns["pca"]["variance_ratio"].

    Parameters
    ----------
    adata : sc.AnnData
        Anndata object containing the PCA coordinates.
    n_pcs : Optional[int], default None
        Number of PCs to keep.
    start : int, default 0
        Index (0-based) of the first PC to keep. E.g. if start = 1 and n_pcs = 10,
        the first PC is excluded and 9 PCs are kept.
    select : Optional[List[int]], default None
        List of PC numbers (1-based) to keep. E.g. [2, 3, 5] will select the second,
        third and fifth PC. Overwrites the n_pcs and start parameters.
    inplace : bool, default True
        Whether to work inplace on the anndata object.

    Returns
    -------
    Optional[sc.AnnData]
        Anndata object with the subsetted PCA coordinates. Or None if inplace = True.
    """
    if inplace is False:
        adata = adata.copy()

    if select:
        # Adjust selection to be 0-based
        select = [i - 1 for i in select]

        adata.obsm["X_pca"] = adata.obsm["X_pca"][:, select]
        adata.varm["PCs"] = adata.varm["PCs"][:, select]

        if "variance_ratio" in adata.uns.get("pca", {}):
            adata.uns["pca"]["variance_ratio"] = adata.uns["pca"]["variance_ratio"][select]
    else:
        adata.obsm["X_pca"] = adata.obsm["X_pca"][:, start:n_pcs]
        adata.varm["PCs"] = adata.varm["PCs"][:, start:n_pcs]

        if "variance_ratio" in adata.uns.get("pca", {}):
            adata.uns["pca"]["variance_ratio"] = adata.uns["pca"]["variance_ratio"][start:n_pcs]

    if inplace is False:
        return adata
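
# Usage sketch (illustrative addition, not part of the original module). Keeps the
# first ten PCs, then narrows down to a hand-picked 1-based selection; the dataset
# (700 cells with 50 precomputed PCs) is an assumption.
#
# >>> import scanpy as sc
# >>> from sctoolbox.tools.dim_reduction import subset_PCA
# >>> adata = sc.datasets.pbmc68k_reduced()
# >>> subset_PCA(adata, n_pcs=10)
# >>> adata.obsm["X_pca"].shape
# (700, 10)
# >>> subset_PCA(adata, select=[2, 3, 5])
# >>> adata.obsm["X_pca"].shape
# (700, 3)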