Source code for pertpy.tools._perturbation_space._simple

from __future__ import annotations

import decoupler as dc
import numpy as np
from anndata import AnnData
from sklearn.cluster import DBSCAN, KMeans

from pertpy.tools._perturbation_space._clustering import ClusteringSpace
from pertpy.tools._perturbation_space._perturbation_space import PerturbationSpace


[docs] class CentroidSpace(PerturbationSpace): """Computes the centroids per perturbation of a pre-computed embedding."""
[docs] def compute( self, adata: AnnData, target_col: str = "perturbation", layer_key: str = None, embedding_key: str = "X_umap", keep_obs: bool = True, ) -> AnnData: # type: ignore """Computes the centroids of a pre-computed embedding such as UMAP. Args: adata: Anndata object of size cells x genes target_col: .obs column that stores the label of the perturbation applied to each cell. layer_key: If specified pseudobulk computation is done by using the specified layer. Otherwise, computation is done with .X embedding_key: `obsm` key of the AnnData embedding to use for computation. Defaults to the 'X' matrix otherwise. keep_obs: Whether .obs columns in the input AnnData should be kept in the output pseudobulk AnnData. Only .obs columns with the same value for each cell of one perturbation are kept. Defaults to True. Returns: AnnData object with one observation per perturbation, storing the embedding data of the centroid of the respective perturbation. Examples: Compute the centroids of a UMAP embedding of the papalexi_2021 dataset: >>> import pertpy as pt >>> import scanpy as sc >>> mdata = pt.dt.papalexi_2021() >>> sc.pp.pca(mdata["rna"]) >>> sc.pp.neighbors(mdata["rna"]) >>> sc.tl.umap(mdata["rna"]) >>> cs = pt.tl.CentroidSpace() >>> cs_adata = cs.compute(mdata["rna"], target_col="gene_target") """ X = None if layer_key is not None and embedding_key is not None: raise ValueError("Please, select just either layer or embedding for computation.") if embedding_key is not None: if embedding_key not in adata.obsm_keys(): raise ValueError(f"Embedding {embedding_key!r} does not exist in the .obsm attribute.") else: X = np.empty((len(adata.obs[target_col].unique()), adata.obsm[embedding_key].shape[1])) if layer_key is not None: if layer_key not in adata.layers.keys(): raise ValueError(f"Layer {layer_key!r} does not exist in the .layers attribute.") else: X = np.empty((len(adata.obs[target_col].unique()), adata.layers[layer_key].shape[1])) if target_col not in adata.obs: raise ValueError(f"Obs {target_col!r} does not exist in the .obs attribute.") grouped = adata.obs.groupby(target_col) if X is None: X = np.empty((len(adata.obs[target_col].unique()), adata.obsm[embedding_key].shape[1])) index = [] pert_index = 0 for group_name, group_data in grouped: indices = group_data.index if layer_key is not None: points = adata[indices].layers[layer_key] elif embedding_key is not None: points = adata[indices].obsm[embedding_key] else: points = adata[indices].X index.append(group_name) centroid = np.mean(points, axis=0) # find centroid of cloud of points closest_point = min( points, key=lambda point: np.linalg.norm(point - centroid) ) # Find the point in the array closest to the centroid X[pert_index, :] = closest_point pert_index += 1 ps_adata = AnnData(X=X) ps_adata.obs_names = index ps_adata.obs[target_col] = index if embedding_key is not None: ps_adata.obsm[embedding_key] = X if keep_obs: # Save the values of the obs columns of interest in the ps_adata object obs_df = adata.obs obs_df = obs_df.groupby(target_col).agg( lambda pert_group: np.nan if len(set(pert_group)) != 1 else list(set(pert_group))[0] ) for obs_name in obs_df.columns: if not obs_df[obs_name].isnull().values.any(): mapping = {pert: obs_df.loc[pert][obs_name] for pert in index} ps_adata.obs[obs_name] = ps_adata.obs[target_col].map(mapping) ps_adata.obs[target_col] = ps_adata.obs[target_col].astype("category") return ps_adata
[docs] class PseudobulkSpace(PerturbationSpace): """Determines pseudobulks using decoupler."""
[docs] def compute( self, adata: AnnData, target_col: str = "perturbation", groups_col: str = None, layer_key: str = None, embedding_key: str = None, **kwargs, ) -> AnnData: # type: ignore """Determines pseudobulks of an AnnData object. It uses Decoupler implementation. Args: adata: Anndata object of size cells x genes target_col: .obs column that stores the label of the perturbation applied to each cell. groups_col: Optional .obs column that stores a grouping label to consider for pseudobulk computation. The summarized expression per perturbation (target_col) and group (groups_col) is computed. Defaults to None. layer_key: If specified pseudobulk computation is done by using the specified layer. Otherwise, computation is done with .X embedding_key: `obsm` key of the AnnData embedding to use for computation. Defaults to the 'X' matrix otherwise. **kwargs: Are passed to decoupler's get_pseuobulk. Returns: AnnData object with one observation per perturbation. Examples: >>> import pertpy as pt >>> mdata = pt.dt.papalexi_2021() >>> ps = pt.tl.PseudobulkSpace() >>> ps_adata = ps.compute(mdata["rna"], target_col="gene_target") """ if layer_key is not None and embedding_key is not None: raise ValueError("Please, select just either layer or embedding for computation.") if layer_key is not None and layer_key not in adata.layers.keys(): raise ValueError(f"Layer {layer_key!r} does not exist in the .layers attribute.") if target_col not in adata.obs: raise ValueError(f"Obs {target_col!r} does not exist in the .obs attribute.") if embedding_key is not None: if embedding_key not in adata.obsm_keys(): raise ValueError(f"Embedding {embedding_key!r} does not exist in the .obsm attribute.") else: adata_emb = AnnData(X=adata.obsm[embedding_key]) adata_emb.obs_names = adata.obs_names adata_emb.obs = adata.obs adata = adata_emb adata.obs[target_col] = adata.obs[target_col].astype("category") ps_adata = dc.get_pseudobulk(adata, sample_col=target_col, layer=layer_key, groups_col=groups_col, **kwargs) # type: ignore ps_adata.obs[target_col] = ps_adata.obs[target_col].astype("category") return ps_adata
[docs] class KMeansSpace(ClusteringSpace): """Computes K-Means clustering of the expression values."""
[docs] def compute( # type: ignore self, adata: AnnData, layer_key: str = None, embedding_key: str = None, cluster_key: str = "k-means", copy: bool = False, return_object: bool = False, **kwargs, ) -> tuple[AnnData, object] | AnnData: """Computes K-Means clustering of the expression values. Args: adata: Anndata object of size cells x genes layer_key: if specified and exists in the adata, the clustering is done by using it. Otherwise, clustering is done with .X embedding_key: if specified and exists in the adata, the clustering is done with that embedding. Otherwise, clustering is done with .X cluster_key: name of the .obs column to store the cluster labels. Default 'k-means' copy: if True returns a new Anndata of same size with the new column; otherwise it updates the initial adata return_object: if True returns the clustering object **kwargs: Are passed to sklearn's KMeans. Returns: If return_object is True, the adata and the clustering object is returned. Otherwise, only the adata is returned. The adata is updated with a new .obs column as specified in cluster_key, that stores the cluster labels. Examples: >>> import pertpy as pt >>> mdata = pt.dt.papalexi_2021() >>> kmeans = pt.tl.KMeansSpace() >>> kmeans_adata = kmeans.compute(mdata["rna"], n_clusters=26) """ if copy: adata = adata.copy() if layer_key is not None and embedding_key is not None: raise ValueError("Please, select just either layer or embedding for computation.") if embedding_key is not None: if embedding_key not in adata.obsm_keys(): raise ValueError(f"Embedding {embedding_key!r} does not exist in the .obsm attribute.") else: self.X = adata.obsm[embedding_key] elif layer_key is not None: if layer_key not in adata.layers.keys(): raise ValueError(f"Layer {layer_key!r} does not exist in the anndata.") else: self.X = adata.layers[layer_key] else: self.X = adata.X clustering = KMeans(**kwargs).fit(self.X) adata.obs[cluster_key] = clustering.labels_ adata.obs[cluster_key] = adata.obs[cluster_key].astype("category") if return_object: return adata, clustering return adata
[docs] class DBSCANSpace(ClusteringSpace): """Cluster the given data using DBSCAN"""
[docs] def compute( # type: ignore self, adata: AnnData, layer_key: str = None, embedding_key: str = None, cluster_key: str = "dbscan", copy: bool = True, return_object: bool = False, **kwargs, ) -> tuple[AnnData, object] | AnnData: """Computes a clustering using Density-based spatial clustering of applications (DBSCAN). Args: adata: Anndata object of size cells x genes layer_key: If specified and exists in the adata, the clustering is done by using it. Otherwise, clustering is done with .X embedding_key: if specified and exists in the adata, the clustering is done with that embedding. Otherwise, clustering is done with .X cluster_key: name of the .obs column to store the cluster labels. Defaults to 'dbscan' copy: if True returns a new Anndata of same size with the new column; otherwise it updates the initial adata return_object: if True returns the clustering object **kwargs: Are passed to sklearn's DBSCAN. Returns: If return_object is True, the adata and the clustering object is returned. Otherwise, only the adata is returned. The adata is updated with a new .obs column as specified in cluster_key, that stores the cluster labels. Examples: >>> import pertpy as pt >>> mdata = pt.dt.papalexi_2021() >>> dbscan = pt.tl.DBSCANSpace() >>> dbscan_adata = dbscan.compute(mdata["rna"]) """ if copy: adata = adata.copy() if embedding_key is not None: if embedding_key not in adata.obsm_keys(): raise ValueError(f"Embedding {embedding_key!r} does not exist in the .obsm attribute.") else: self.X = adata.obsm[embedding_key] elif layer_key is not None: if layer_key not in adata.layers.keys(): raise ValueError(f"Layer {layer_key!r} does not exist in the anndata.") else: self.X = adata.layers[layer_key] else: self.X = adata.X clustering = DBSCAN(**kwargs).fit(self.X) adata.obs[cluster_key] = clustering.labels_ adata.obs[cluster_key] = adata.obs[cluster_key].astype("category") if return_object: return adata, clustering return adata