"""Tools for dimensionality reduction with PCA/SVD."""
import numpy as np
import scanpy as sc
import scipy
from scipy.sparse.linalg import svds
from kneed import KneeLocator
import deprecation
from sctoolbox import __version__
from beartype.typing import Optional, Any, Literal, List, Union
from beartype import beartype
import sctoolbox.tools.embedding as scem
import sctoolbox.utils.decorator as deco
from sctoolbox._settings import settings
logger = settings.logger
############################################################################
# PCA / SVD #
############################################################################
@deco.log_anndata
@beartype
def compute_PCA(anndata: sc.AnnData,
use_highly_variable: bool = True,
inplace: bool = False,
**kwargs: Any) -> Optional[sc.AnnData]:
"""
Compute a principal component analysis.
Parameters
----------
anndata : sc.AnnData
Anndata object to add the PCA to.
use_highly_variable : bool, default True
        If True, use highly variable genes to compute the PCA.
inplace : bool, default False
Whether the anndata object is modified inplace.
**kwargs : Any
Additional parameters forwarded to scanpy.pp.pca().
Returns
-------
Optional[sc.AnnData]
        The anndata object with PCA components, or None if inplace = True.
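
    Examples
    --------
    A minimal sketch, assuming `adata` is a normalized AnnData object with
    highly variable genes annotated (e.g. via `scanpy.pp.highly_variable_genes`):

    >>> adata_pca = compute_PCA(adata, n_comps=30)  # kwargs are forwarded to scanpy.pp.pca
    >>> # adata_pca.obsm["X_pca"] now holds 30 components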
"""
adata_m = anndata if inplace else anndata.copy()
# Computing PCA
logger.info("Computing PCA")
sc.pp.pca(adata_m, use_highly_variable=use_highly_variable, **kwargs)
if not inplace:
return adata_m
@beartype
def lsi(data: sc.AnnData,
scale_embeddings: bool = True,
n_comps: int = 50,
use_highly_variable: bool = False) -> None:
"""
Run Latent Semantic Indexing for dimensionality reduction.
Values represent the similarity of cells in the original space.
doi: 10.3389/fphys.2013.00008
Parameters
----------
data : sc.AnnData
AnnData object with peak counts.
scale_embeddings : bool, default True
Scale embeddings to zero mean and unit variance.
n_comps : int, default 50
Number of components to calculate with SVD.
    use_highly_variable : bool, default False
        If True, use highly variable genes to compute LSI.
Raises
------
ValueError:
If highly variable genes are used and not found in adata.var['highly_variable'].
Notes
-----
    This function is adapted from the muon package.
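
    Examples
    --------
    A minimal sketch, assuming `adata.X` holds a float-typed (e.g. TF-IDF
    normalized) cell-by-peak matrix with more than `n_comps` features:

    >>> lsi(adata, n_comps=30)  # modifies adata in place
    >>> # results are stored in adata.obsm["X_lsi"], adata.varm["LSI"] and adata.uns["lsi"]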
"""
adata = data
# Subset adata to highly variable genes
if use_highly_variable:
if "highly_variable" not in adata.var:
raise ValueError("Highly variable genes not found in adata.var['highly_variable'].")
adata_comp = adata[:, adata.var['highly_variable']]
else:
adata_comp = adata
    # In the unlikely scenario that there are fewer features than n_comps, lower n_comps to that value
n_comps = min(n_comps, adata_comp.X.shape[1])
    logger.info("Performing SVD")
cell_embeddings, svalues, peaks_loadings = svds(adata_comp.X, k=n_comps)
# Re-order components in the descending order
cell_embeddings = cell_embeddings[:, ::-1]
svalues = svalues[::-1]
peaks_loadings = peaks_loadings[::-1, :]
if scale_embeddings:
cell_embeddings = (cell_embeddings - cell_embeddings.mean(axis=0)) / cell_embeddings.std(
axis=0
)
var_explained = np.round(svalues ** 2 / np.sum(svalues ** 2), decimals=3)
stdev = svalues / np.sqrt(adata_comp.X.shape[0] - 1)
# Add results to adata
adata.obsm["X_lsi"] = cell_embeddings
# if highly variable genes are used, only store the loadings for those genes and set the rest to 0
if use_highly_variable:
adata.varm["LSI"] = np.zeros(shape=(adata.n_vars, n_comps))
adata.varm["LSI"][adata.var['highly_variable']] = peaks_loadings.T
else:
adata.varm["LSI"] = peaks_loadings.T
# Add variance explained to uns
adata.uns["lsi"] = {"stdev": stdev,
"variance": svalues,
"variance_ratio": var_explained}
    # Save as PCA to make the results compatible with scanpy functions
adata.obsm["X_pca"] = adata.obsm["X_lsi"]
adata.varm["PCs"] = adata.varm["LSI"]
adata.uns["pca"] = adata.uns["lsi"]
@beartype
def apply_svd(adata: sc.AnnData,
layer: Optional[str] = None) -> sc.AnnData:
"""
    Compute a singular value decomposition of the anndata object.
Parameters
----------
adata : sc.AnnData
The anndata object to be decomposed.
layer : Optional[str], default None
        The layer to be decomposed. If None, adata.X is used.
Returns
-------
sc.AnnData
        The anndata object with SVD results added to .obsm["X_svd"], .varm["SVs"] and .uns["svd"].
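
    Examples
    --------
    A minimal sketch, assuming `adata.X` is a float-typed sparse matrix with
    more than 30 cells and features (the decomposition is fixed to k=30):

    >>> adata = apply_svd(adata)
    >>> # results are stored in adata.obsm["X_svd"], adata.varm["SVs"] and adata.uns["svd"]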
"""
if layer is None:
mat = adata.X
else:
mat = adata.layers[layer]
# SVD
    u, s, v = svds(mat, k=30, which="LM")  # "LM": largest singular values
    # svds returns components in ascending order; reverse to descending:
s = s[::-1]
u = np.fliplr(u)
v = np.flipud(v)
    # Compute explained variance ratio
var_explained = np.round(s**2 / np.sum(s**2), decimals=3)
adata.obsm["X_svd"] = u
adata.varm["SVs"] = v.T
adata.uns["svd"] = {"variance": s,
"variance_ratio": var_explained}
# Hack to use the scanpy functions on SVD coordinates
# adata.obsm["X_pca"] = adata.obsm["X_svd"]
# adata.varm["PCs"] = adata.varm["SVs"]
# adata.uns["pca"] = adata.uns["svd"]
return adata
############################################################################
# Subset number of PCs #
############################################################################
@deprecation.deprecated(deprecated_in="0.5", removed_in="0.7",
current_version=__version__,
details="Use the 'sctoolbox.tools.propose_pcs' function instead.")
@beartype
def define_PC(anndata: sc.AnnData) -> int:
"""
Define threshold for most variable PCA components.
Note: Function expects PCA to be computed beforehand.
Parameters
----------
anndata : sc.AnnData
Anndata object with PCA to get significant PCs threshold from.
Returns
-------
int
        The number of PCs up to the elbow point, i.e. the PCs with significant variance.
Raises
------
ValueError:
If PCA is not found in anndata.
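
    Examples
    --------
    A minimal sketch, assuming PCA was computed beforehand (note that this
    function is deprecated in favor of `propose_pcs`):

    >>> n_pcs = define_PC(adata)  # number of PCs up to the elbow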
"""
# check if pca exists
if "pca" not in anndata.uns or "variance_ratio" not in anndata.uns["pca"]:
raise ValueError("PCA not found! Please make sure to compute PCA before running this function.")
# prepare values
y = anndata.uns["pca"]["variance_ratio"]
x = range(1, len(y) + 1)
# compute knee
kn = KneeLocator(x, y, curve='convex', direction='decreasing')
knee = int(kn.knee) # cast from numpy.int64
return knee
@beartype
def propose_pcs(anndata: sc.AnnData,
how: List[Literal["variance", "cumulative variance", "correlation"]] = ["variance", "correlation"],
var_method: Literal["knee", "percent"] = "percent",
perc_thresh: Union[int, float] = 30,
corr_thresh: float = 0.3,
                corr_kwargs: Optional[dict] = None) -> List[int]:
"""
Propose a selection of PCs that can be used for further analysis.
Note: Function expects PCA to be computed beforehand.
Parameters
----------
    anndata : sc.AnnData
        Anndata object with PCA to get PCs from.
    how : List[Literal["variance", "cumulative variance", "correlation"]], default ["variance", "correlation"]
        Values to use for the PC proposal. Filters are applied independently for each selected method and the intersection of PCs passing all filters is returned.
    var_method : Literal["knee", "percent"], default "percent"
        Either define a threshold based on a knee detection algorithm or use a percentile.
    perc_thresh : Union[int, float], default 30
        Percentile threshold of the PCs that should be included. Only used for var_method="percent"; expects a value from 0-100.
    corr_thresh : float, default 0.3
        Exclude PCs with an absolute correlation greater than or equal to the given value.
    corr_kwargs : Optional[dict], default None
        Parameters forwarded to `sctoolbox.tools.correlation_matrix`.
Returns
-------
List[int]
List of PCs proposed for further use.
Raises
------
ValueError:
If PCA is not found in anndata.
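
    Examples
    --------
    A minimal sketch, assuming PCA was computed beforehand (e.g. via
    `compute_PCA`); restricting `how` to "variance" skips the correlation step:

    >>> pcs = propose_pcs(adata, how=["variance"], var_method="percent", perc_thresh=30)
    >>> # pcs is a list of 1-based PC numbers, e.g. [1, 2, 3]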
"""
# check if pca exists
if "pca" not in anndata.uns or "variance_ratio" not in anndata.uns["pca"]:
raise ValueError("PCA not found! Please make sure to compute PCA before running this function.")
# setup PC names
PC_names = range(1, len(anndata.uns["pca"]["variance_ratio"]) + 1)
selected_pcs = []
if "variance" in how:
variance = anndata.uns["pca"]["variance_ratio"]
if var_method == "knee":
# compute knee
kn = KneeLocator(PC_names, variance, curve='convex', direction='decreasing')
knee = int(kn.knee) # cast from numpy.int64
selected_pcs.append(set(pc for pc in PC_names if pc <= knee))
elif var_method == "percent":
# compute percentile
percentile = np.percentile(a=variance, q=100 - perc_thresh)
selected_pcs.append(set(pc for pc, var in zip(PC_names, variance) if var > percentile))
if "cumulative variance" in how:
cumulative = np.cumsum(anndata.uns["pca"]["variance_ratio"])
if var_method == "knee":
# compute knee
kn = KneeLocator(PC_names, cumulative, curve='concave', direction='increasing')
knee = int(kn.knee) # cast from numpy.int64
selected_pcs.append(set(pc for pc in PC_names if pc <= knee))
elif var_method == "percent":
# compute percentile
percentile = np.percentile(a=cumulative, q=perc_thresh)
selected_pcs.append(set(pc for pc, cum in zip(PC_names, cumulative) if cum < percentile))
if "correlation" in how:
        # highest absolute correlation per PC
        corrcoefs, _ = scem.correlation_matrix(adata=anndata, **(corr_kwargs or {}))
abs_corrcoefs = list(corrcoefs.abs().max(axis=0))
selected_pcs.append(set(pc for pc, cc in zip(PC_names, abs_corrcoefs) if cc < corr_thresh))
# create overlap of selected PCs
selected_pcs = list(set.intersection(*selected_pcs))
return selected_pcs
@deco.log_anndata
@beartype
def subset_PCA(adata: sc.AnnData,
n_pcs: Optional[int] = None,
start: int = 0,
select: Optional[List[int]] = None,
inplace: bool = True) -> Optional[sc.AnnData]:
"""
    Subset the PCA coordinates in adata.obsm["X_pca"] to the given number of PCs.
Additionally, subset the PCs in adata.varm["PCs"] and the variance ratio in adata.uns["pca"]["variance_ratio"].
Parameters
----------
adata : sc.AnnData
Anndata object containing the PCA coordinates.
n_pcs : Optional[int], default None
Number of PCs to keep.
start : int, default 0
Index (0-based) of the first PC to keep. E.g. if start = 1 and n_pcs = 10, you will exclude the first PC to keep 9 PCs.
    select : Optional[List[int]], default None
        Provide a list of PC numbers to keep. E.g. [2, 3, 5] will select the second, third and fifth PC. Overrides the n_pcs and start parameters.
inplace : bool, default True
Whether to work inplace on the anndata object.
Returns
-------
Optional[sc.AnnData]
        Anndata object with the subsetted PCA coordinates, or None if inplace = True.
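
    Examples
    --------
    A minimal sketch, assuming PCA was computed beforehand:

    >>> subset_PCA(adata, n_pcs=10)          # keep the first 10 PCs
    >>> subset_PCA(adata, select=[2, 3, 5])  # keep only the 2nd, 3rd and 5th of those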
"""
if inplace is False:
adata = adata.copy()
if select:
# adjust selection to be 0-based
select = [i - 1 for i in select]
adata.obsm["X_pca"] = adata.obsm["X_pca"][:, select]
adata.varm["PCs"] = adata.varm["PCs"][:, select]
if "variance_ratio" in adata.uns.get("pca", {}):
adata.uns["pca"]["variance_ratio"] = adata.uns["pca"]["variance_ratio"][select]
else:
adata.obsm["X_pca"] = adata.obsm["X_pca"][:, start:n_pcs]
adata.varm["PCs"] = adata.varm["PCs"][:, start:n_pcs]
if "variance_ratio" in adata.uns.get("pca", {}):
adata.uns["pca"]["variance_ratio"] = adata.uns["pca"]["variance_ratio"][start:n_pcs]
if inplace is False:
return adata