Source code for pertpy.tools._mixscape

from __future__ import annotations

import copy
from collections import OrderedDict
from typing import TYPE_CHECKING, Literal

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scanpy as sc
import seaborn as sns
from scanpy import get
from scanpy._settings import settings
from scanpy._utils import _check_use_raw, sanitize_anndata
from scanpy.plotting import _utils
from scanpy.tools._utils import _choose_representation
from scipy.sparse import csr_matrix, issparse, spmatrix
from sklearn.mixture import GaussianMixture

import pertpy as pt

if TYPE_CHECKING:
    from collections.abc import Sequence

    from anndata import AnnData
    from matplotlib.axes import Axes
    from matplotlib.colors import Colormap
    from scipy import sparse


[docs] class Mixscape: """Python implementation of Mixscape.""" def __init__(self): pass
[docs] def perturbation_signature( self, adata: AnnData, pert_key: str, control: str, split_by: str | None = None, n_neighbors: int = 20, use_rep: str | None = None, n_pcs: int | None = None, batch_size: int | None = None, copy: bool = False, **kwargs, ): """Calculate perturbation signature. For each cell, we identify `n_neighbors` cells from the control pool with the most similar mRNA expression profiles. The perturbation signature is calculated by subtracting the averaged mRNA expression profile of the control neighbors from the mRNA expression profile of each cell. Args: adata: The annotated data object. pert_key: The column of `.obs` with perturbation categories, should also contain `control`. control: Control category from the `pert_key` column. split_by: Provide the column `.obs` if multiple biological replicates exist to calculate the perturbation signature for every replicate separately. n_neighbors: Number of neighbors from the control to use for the perturbation signature. use_rep: Use the indicated representation. `'X'` or any key for `.obsm` is valid. If `None`, the representation is chosen automatically: For `.n_vars` < 50, `.X` is used, otherwise 'X_pca' is used. If 'X_pca' is not present, it’s computed with default parameters. n_pcs: Use this many PCs. If `n_pcs==0` use `.X` if `use_rep is None`. batch_size: Size of batch to calculate the perturbation signature. If 'None', the perturbation signature is calcuated in the full mode, requiring more memory. The batched mode is very inefficient for sparse data. copy: Determines whether a copy of the `adata` is returned. **kwargs: Additional arguments for the `NNDescent` class from `pynndescent`. Returns: If `copy=True`, returns the copy of `adata` with the perturbation signature in `.layers["X_pert"]`. Otherwise, writes the perturbation signature directly to `.layers["X_pert"]` of the provided `adata`. Examples: Calcutate perturbation signature for each cell in the dataset: >>> import pertpy as pt >>> mdata = pt.dt.papalexi_2021() >>> ms_pt = pt.tl.Mixscape() >>> ms_pt.perturbation_signature(mdata["rna"], "perturbation", "NT", "replicate") """ if copy: adata = adata.copy() adata.layers["X_pert"] = adata.X.copy() control_mask = adata.obs[pert_key] == control if split_by is None: split_masks = [np.full(adata.n_obs, True, dtype=bool)] else: split_obs = adata.obs[split_by] split_masks = [split_obs == cat for cat in split_obs.unique()] representation = _choose_representation(adata, use_rep=use_rep, n_pcs=n_pcs) for split_mask in split_masks: control_mask_split = control_mask & split_mask R_split = representation[split_mask] R_control = representation[control_mask_split] from pynndescent import NNDescent eps = kwargs.pop("epsilon", 0.1) nn_index = NNDescent(R_control, **kwargs) indices, _ = nn_index.query(R_split, k=n_neighbors, epsilon=eps) X_control = np.expm1(adata.X[control_mask_split]) n_split = split_mask.sum() n_control = X_control.shape[0] if batch_size is None: col_indices = np.ravel(indices) row_indices = np.repeat(np.arange(n_split), n_neighbors) neigh_matrix = csr_matrix( (np.ones_like(col_indices, dtype=np.float64), (row_indices, col_indices)), shape=(n_split, n_control), ) neigh_matrix /= n_neighbors adata.layers["X_pert"][split_mask] -= np.log1p(neigh_matrix @ X_control) else: is_sparse = issparse(X_control) split_indices = np.where(split_mask)[0] for i in range(0, n_split, batch_size): size = min(i + batch_size, n_split) select = slice(i, size) batch = np.ravel(indices[select]) split_batch = split_indices[select] size = size - i # sparse is very slow means_batch = X_control[batch] means_batch = means_batch.toarray() if is_sparse else means_batch means_batch = means_batch.reshape(size, n_neighbors, -1).mean(1) adata.layers["X_pert"][split_batch] -= np.log1p(means_batch) if copy: return adata
[docs] def mixscape( self, adata: AnnData, labels: str, control: str, new_class_name: str | None = "mixscape_class", min_de_genes: int | None = 5, layer: str | None = None, logfc_threshold: float | None = 0.25, iter_num: int | None = 10, split_by: str | None = None, pval_cutoff: float | None = 5e-2, perturbation_type: str | None = "KO", copy: bool | None = False, ): """Identify perturbed and non-perturbed gRNA expressing cells that accounts for multiple treatments/conditions/chemical perturbations. The implementation resembles https://satijalab.org/seurat/reference/runmixscape Args: adata: The annotated data object. labels: The column of `.obs` with target gene labels. control: Control category from the `pert_key` column. new_class_name: Name of mixscape classification to be stored in `.obs`. min_de_genes: Required number of genes that are differentially expressed for method to separate perturbed and non-perturbed cells. layer: Key from adata.layers whose value will be used to perform tests on. Default is using `.layers["X_pert"]`. logfc_threshold: Limit testing to genes which show, on average, at least X-fold difference (log-scale) between the two groups of cells (default: 0.25). iter_num: Number of normalmixEM iterations to run if convergence does not occur. split_by: Provide the column `.obs` if multiple biological replicates exist to calculate the perturbation signature for every replicate separately. pval_cutoff: P-value cut-off for selection of significantly DE genes. perturbation_type: specify type of CRISPR perturbation expected for labeling mixscape classifications. Defaults to KO. copy: Determines whether a copy of the `adata` is returned. Returns: If `copy=True`, returns the copy of `adata` with the classification result in `.obs`. Otherwise, writes the results directly to `.obs` of the provided `adata`. - mixscape_class: pandas.Series (`adata.obs['mixscape_class']`). Classification result with cells being either classified as perturbed (KO, by default) or non-perturbed (NP) based on their target gene class. - mixscape_class_global: pandas.Series (`adata.obs['mixscape_class_global']`). Global classification result (perturbed, NP or NT). - mixscape_class_p_ko: pandas.Series (`adata.obs['mixscape_class_p_ko']`). Posterior probabilities used to determine if a cell is KO (default). Name of this item will change to match perturbation_type parameter setting. (>0.5) or NP. Examples: Calcutate perturbation signature for each cell in the dataset: >>> import pertpy as pt >>> mdata = pt.dt.papalexi_2021() >>> ms_pt = pt.tl.Mixscape() >>> ms_pt.perturbation_signature(mdata["rna"], "perturbation", "NT", "replicate") >>> ms_pt.mixscape(adata=mdata["rna"], control="NT", labels="gene_target", layer="X_pert") """ if copy: adata = adata.copy() if split_by is None: split_masks = [np.full(adata.n_obs, True, dtype=bool)] categories = ["all"] else: split_obs = adata.obs[split_by] categories = split_obs.unique() split_masks = [split_obs == category for category in categories] perturbation_markers = self._get_perturbation_markers( adata, split_masks, categories, labels, control, layer, pval_cutoff, min_de_genes, logfc_threshold ) adata_comp = adata if layer is not None: X = adata_comp.layers[layer] else: try: X = adata_comp.layers["X_pert"] except KeyError: raise KeyError( "No 'X_pert' found in .layers! Please run pert_sign first to calculate perturbation signature!" ) from None # initialize return variables adata.obs[f"{new_class_name}_p_{perturbation_type.lower()}"] = 0 adata.obs[new_class_name] = adata.obs[labels].astype(str) adata.obs[f"{new_class_name}_global"] = np.empty( [ adata.n_obs, ], dtype=np.object_, ) gv_list: dict[str, dict] = {} for split, split_mask in enumerate(split_masks): category = categories[split] genes = list(set(adata[split_mask].obs[labels]).difference([control])) for gene in genes: post_prob = 0 orig_guide_cells = (adata.obs[labels] == gene) & split_mask orig_guide_cells_index = list(orig_guide_cells.index[orig_guide_cells]) nt_cells = (adata.obs[labels] == control) & split_mask all_cells = orig_guide_cells | nt_cells if len(perturbation_markers[(category, gene)]) == 0: adata.obs.loc[orig_guide_cells, new_class_name] = f"{gene} NP" else: de_genes = perturbation_markers[(category, gene)] de_genes_indices = self._get_column_indices(adata, list(de_genes)) dat = X[all_cells][:, de_genes_indices] converged = False n_iter = 0 old_classes = adata.obs[labels][all_cells] while not converged and n_iter < iter_num: # Get all cells in current split&Gene guide_cells = (adata.obs[labels] == gene) & split_mask # get average value for each gene over all selected cells # all cells in current split&Gene minus all NT cells in current split # Each row is for each cell, each column is for each gene, get mean for each column vec = np.mean(X[guide_cells][:, de_genes_indices], axis=0) - np.mean( X[nt_cells][:, de_genes_indices], axis=0 ) # project cells onto the perturbation vector if isinstance(dat, spmatrix): pvec = np.sum(np.multiply(dat.toarray(), vec), axis=1) / np.sum(np.multiply(vec, vec)) else: pvec = np.sum(np.multiply(dat, vec), axis=1) / np.sum(np.multiply(vec, vec)) pvec = pd.Series(np.asarray(pvec).flatten(), index=list(all_cells.index[all_cells])) if n_iter == 0: gv = pd.DataFrame(columns=["pvec", labels]) gv["pvec"] = pvec gv[labels] = control gv.loc[guide_cells, labels] = gene if gene not in gv_list.keys(): gv_list[gene] = {} gv_list[gene][category] = gv guide_norm = self._define_normal_mixscape(pvec[guide_cells]) nt_norm = self._define_normal_mixscape(pvec[nt_cells]) means_init = np.array([[nt_norm[0]], [guide_norm[0]]]) precisions_init = np.array([nt_norm[1], guide_norm[1]]) mm = GaussianMixture( n_components=2, covariance_type="spherical", means_init=means_init, precisions_init=precisions_init, ).fit(np.asarray(pvec).reshape(-1, 1)) probabilities = mm.predict_proba(np.array(pvec[orig_guide_cells_index]).reshape(-1, 1)) lik_ratio = probabilities[:, 0] / probabilities[:, 1] post_prob = 1 / (1 + lik_ratio) # based on the posterior probability, assign cells to the two classes adata.obs.loc[ [orig_guide_cells_index[cell] for cell in np.where(post_prob > 0.5)[0]], new_class_name ] = gene adata.obs.loc[ [orig_guide_cells_index[cell] for cell in np.where(post_prob <= 0.5)[0]], new_class_name ] = f"{gene} NP" if sum(adata.obs[new_class_name][split_mask] == gene) < min_de_genes: adata.obs.loc[guide_cells, new_class_name] = "NP" converged = True if adata.obs[new_class_name][all_cells].equals(old_classes): converged = True old_classes = adata.obs[new_class_name][all_cells] n_iter += 1 adata.obs.loc[(adata.obs[new_class_name] == gene) & split_mask, new_class_name] = ( f"{gene} {perturbation_type}" ) adata.obs[f"{new_class_name}_global"] = [a.split(" ")[-1] for a in adata.obs[new_class_name]] adata.obs.loc[orig_guide_cells_index, f"{new_class_name}_p_{perturbation_type.lower()}"] = post_prob adata.uns["mixscape"] = gv_list if copy: return adata
[docs] def lda( self, adata: AnnData, labels: str, control: str, mixscape_class_global: str | None = "mixscape_class_global", layer: str | None = None, n_comps: int | None = 10, min_de_genes: int | None = 5, logfc_threshold: float | None = 0.25, split_by: str | None = None, pval_cutoff: float | None = 5e-2, perturbation_type: str | None = "KO", copy: bool | None = False, ): """Linear Discriminant Analysis on pooled CRISPR screen data. Requires `pt.tl.mixscape()` to be run first. Args: adata: The annotated data object. labels: The column of `.obs` with target gene labels. control: Control category from the `pert_key` column. mixscape_class_global: The column of `.obs` with mixscape global classification result (perturbed, NP or NT). layer: Key from `adata.layers` whose value will be used to perform tests on. control: Control category from the `pert_key` column. Defaults to 'NT'. n_comps: Number of principal components to use. Defaults to 10. min_de_genes: Required number of genes that are differentially expressed for method to separate perturbed and non-perturbed cells. logfc_threshold: Limit testing to genes which show, on average, at least X-fold difference (log-scale) between the two groups of cells. Defaults to 0.25. split_by: Provide the column `.obs` if multiple biological replicates exist to calculate pval_cutoff: P-value cut-off for selection of significantly DE genes. perturbation_type: Specify type of CRISPR perturbation expected for labeling mixscape classifications. Defaults to KO. copy: Determines whether a copy of the `adata` is returned. Returns: If `copy=True`, returns the copy of `adata` with the LDA result in `.uns`. Otherwise, writes the results directly to `.uns` of the provided `adata`. mixscape_lda: numpy.ndarray (`adata.uns['mixscape_lda']`). LDA result. Examples: Use LDA dimensionality reduction to visualize the perturbation effects: >>> import pertpy as pt >>> mdata = pt.dt.papalexi_2021() >>> ms_pt = pt.tl.Mixscape() >>> ms_pt.perturbation_signature(mdata["rna"], "perturbation", "NT", "replicate") >>> ms_pt.mixscape(adata=mdata["rna"], control="NT", labels="gene_target", layer="X_pert") >>> ms_pt.lda(adata=mdata["rna"], control="NT", labels="gene_target", layer="X_pert") """ if copy: adata = adata.copy() from sklearn.discriminant_analysis import LinearDiscriminantAnalysis if mixscape_class_global not in adata.obs: raise ValueError("Please run `pt.tl.mixscape` first.") if split_by is None: split_masks = [np.full(adata.n_obs, True, dtype=bool)] categories = ["all"] else: split_obs = adata.obs[split_by] categories = split_obs.unique() split_masks = [split_obs == category for category in categories] mixscape_identifier = pt.tl.Mixscape() # determine gene sets across all splits/groups through differential gene expression perturbation_markers = mixscape_identifier._get_perturbation_markers( adata=adata, split_masks=split_masks, categories=categories, labels=labels, control=control, layer=layer, pval_cutoff=pval_cutoff, min_de_genes=min_de_genes, logfc_threshold=logfc_threshold, ) adata_subset = adata[ (adata.obs[mixscape_class_global] == perturbation_type) | (adata.obs[mixscape_class_global] == control) ].copy() projected_pcs: dict[str, np.ndarray] = {} # performs PCA on each mixscape class separately and projects each subspace onto all cells in the data. for _, (key, value) in enumerate(perturbation_markers.items()): if len(value) == 0: continue else: gene_subset = adata_subset[ (adata_subset.obs[labels] == key[1]) | (adata_subset.obs[labels] == control) ].copy() sc.pp.scale(gene_subset) sc.tl.pca(gene_subset, n_comps=n_comps) sc.pp.neighbors(gene_subset) # projects each subspace onto all cells in the data. sc.tl.ingest(adata=adata_subset, adata_ref=gene_subset, embedding_method="pca") projected_pcs[key[1]] = adata_subset.obsm["X_pca"] # concatenate all pcs into a single matrix. for index, (_, value) in enumerate(projected_pcs.items()): if index == 0: projected_pcs_array = value else: projected_pcs_array = np.concatenate((projected_pcs_array, value), axis=1) clf = LinearDiscriminantAnalysis(n_components=len(np.unique(adata_subset.obs[labels])) - 1) clf.fit(projected_pcs_array, adata_subset.obs[labels]) cell_embeddings = clf.transform(projected_pcs_array) adata.uns["mixscape_lda"] = cell_embeddings if copy: return adata
def _get_perturbation_markers( self, adata: AnnData, split_masks: list[np.ndarray], categories: list[str], labels: str, control: str, layer: str, pval_cutoff: float, min_de_genes: float, logfc_threshold: float, ) -> dict[tuple, np.ndarray]: """Determine gene sets across all splits/groups through differential gene expression Args: adata: :class:`~anndata.AnnData` object col_names: Column names to extract the indices for Returns: Set of column indices. """ perturbation_markers: dict[tuple, np.ndarray] = {} # type: ignore for split, split_mask in enumerate(split_masks): category = categories[split] # get gene sets for each split genes = list(set(adata[split_mask].obs[labels]).difference([control])) adata_split = adata[split_mask].copy() # find top DE genes between cells with targeting and non-targeting gRNAs sc.tl.rank_genes_groups( adata_split, layer=layer, groupby=labels, groups=genes, reference=control, method="t-test" ) # get DE genes for each gene for gene in genes: logfc_threshold_mask = adata_split.uns["rank_genes_groups"]["logfoldchanges"][gene] >= logfc_threshold de_genes = adata_split.uns["rank_genes_groups"]["names"][gene][logfc_threshold_mask] pvals_adj = adata_split.uns["rank_genes_groups"]["pvals_adj"][gene][logfc_threshold_mask] de_genes = de_genes[pvals_adj < pval_cutoff] if len(de_genes) < min_de_genes: de_genes = np.array([]) perturbation_markers[(category, gene)] = de_genes return perturbation_markers def _get_column_indices(self, adata, col_names): if isinstance(col_names, str): # pragma: no cover col_names = [col_names] indices = [] for idx, col in enumerate(adata.var_names): if col in col_names: indices.append(idx) return indices def _define_normal_mixscape(self, X: np.ndarray | sparse.spmatrix | pd.DataFrame | None) -> list[float]: """Calculates the mean and standard deviation of a matrix. Args: X: The matrix to calculate the properties for. Returns: Mean and standard deviation of the matrix. """ mu = X.mean() sd = X.std() return [mu, sd]
[docs] def plot_barplot( # pragma: no cover self, adata: AnnData, guide_rna_column: str, mixscape_class_global: str = "mixscape_class_global", axis_text_x_size: int = 8, axis_text_y_size: int = 6, axis_title_size: int = 8, legend_title_size: int = 8, legend_text_size: int = 8, return_fig: bool | None = None, ax: Axes | None = None, show: bool | None = None, save: bool | str | None = None, ): """Barplot to visualize perturbation scores calculated by the `mixscape` function. Args: adata: The annotated data object. guide_rna_column: The column of `.obs` with guide RNA labels. The target gene labels. The format must be <gene_target>g<#>. Examples are 'STAT2g1' and 'ATF2g1'. mixscape_class_global: The column of `.obs` with mixscape global classification result (perturbed, NP or NT). show: Show the plot, do not return axis. save: If True or a str, save the figure. A string is appended to the default filename. Infer the filetype if ending on {'.pdf', '.png', '.svg'}. Returns: If `show==False`, return a :class:`~matplotlib.axes.Axes. Examples: >>> import pertpy as pt >>> mdata = pt.dt.papalexi_2021() >>> ms_pt = pt.tl.Mixscape() >>> ms_pt.perturbation_signature(mdata["rna"], "perturbation", "NT", "replicate") >>> ms_pt.mixscape(adata=mdata["rna"], control="NT", labels="gene_target", layer="X_pert") >>> ms_pt.plot_barplot(mdata["rna"], guide_rna_column="NT") Preview: .. image:: /_static/docstring_previews/mixscape_barplot.png """ if mixscape_class_global not in adata.obs: raise ValueError("Please run the `mixscape` function first.") count = pd.crosstab(index=adata.obs[mixscape_class_global], columns=adata.obs[guide_rna_column]) all_cells_percentage = pd.melt(count / count.sum(), ignore_index=False).reset_index() KO_cells_percentage = all_cells_percentage[all_cells_percentage[mixscape_class_global] == "KO"] KO_cells_percentage = KO_cells_percentage.sort_values("value", ascending=False) new_levels = KO_cells_percentage[guide_rna_column] all_cells_percentage[guide_rna_column] = pd.Categorical( all_cells_percentage[guide_rna_column], categories=new_levels, ordered=False ) all_cells_percentage[mixscape_class_global] = pd.Categorical( all_cells_percentage[mixscape_class_global], categories=["NT", "NP", "KO"], ordered=False ) all_cells_percentage["gene"] = all_cells_percentage[guide_rna_column].str.rsplit("g", expand=True)[0] all_cells_percentage["guide_number"] = all_cells_percentage[guide_rna_column].str.rsplit("g", expand=True)[1] all_cells_percentage["guide_number"] = "g" + all_cells_percentage["guide_number"] NP_KO_cells = all_cells_percentage[all_cells_percentage["gene"] != "NT"] if show: color_mapping = {"KO": "salmon", "NP": "lightgray", "NT": "grey"} unique_genes = NP_KO_cells["gene"].unique() fig, axs = plt.subplots(int(len(unique_genes) / 5), 5, figsize=(25, 25), sharey=True) for i, gene in enumerate(unique_genes): ax = axs[int(i / 5), i % 5] grouped_df = ( NP_KO_cells[NP_KO_cells["gene"] == gene] .groupby(["guide_number", "mixscape_class_global"], observed=False)["value"] .sum() .unstack() ) grouped_df.plot( kind="bar", stacked=True, color=[color_mapping[col] for col in grouped_df.columns], ax=ax, width=0.8, legend=False, ) ax.set_title( gene, bbox={"facecolor": "white", "edgecolor": "black", "pad": 1}, fontsize=axis_title_size ) ax.set(xlabel="sgRNA", ylabel="% of cells") sns.despine(ax=ax, top=True, right=True, left=False, bottom=False) ax.set_xticklabels(ax.get_xticklabels(), rotation=0, ha="right", fontsize=axis_text_x_size) ax.set_yticklabels(ax.get_yticklabels(), rotation=0, fontsize=axis_text_y_size) fig.subplots_adjust(right=0.8) fig.subplots_adjust(hspace=0.5, wspace=0.5) ax.legend( title="mixscape_class_global", loc="center right", bbox_to_anchor=(2.2, 3.5), frameon=True, fontsize=legend_text_size, title_fontsize=legend_title_size, ) plt.tight_layout() _utils.savefig_or_show("mixscape_barplot", show=show, save=save)
[docs] def plot_heatmap( # pragma: no cover self, adata: AnnData, labels: str, target_gene: str, control: str, layer: str | None = None, method: str | None = "wilcoxon", subsample_number: int | None = 900, vmin: float | None = -2, vmax: float | None = 2, return_fig: bool | None = None, show: bool | None = None, save: bool | str | None = None, **kwds, ) -> Axes | None: """Heatmap plot using mixscape results. Requires `pt.tl.mixscape()` to be run first. Args: adata: The annotated data object. labels: The column of `.obs` with target gene labels. target_gene: Target gene name to visualize heatmap for. control: Control category from the `pert_key` column. layer: Key from `adata.layers` whose value will be used to perform tests on. method: The default method is 'wilcoxon', see `method` parameter in `scanpy.tl.rank_genes_groups` for more options. subsample_number: Subsample to this number of observations. vmin: The value representing the lower limit of the color scale. Values smaller than vmin are plotted with the same color as vmin. vmax: The value representing the upper limit of the color scale. Values larger than vmax are plotted with the same color as vmax. show: Show the plot, do not return axis. save: If `True` or a `str`, save the figure. A string is appended to the default filename. Infer the filetype if ending on {`'.pdf'`, `'.png'`, `'.svg'`}. ax: A matplotlib axes object. Only works if plotting a single component. **kwds: Additional arguments to `scanpy.pl.rank_genes_groups_heatmap`. Returns: If `show==False`, return a :class:`~matplotlib.axes.Axes`. Examples: >>> import pertpy as pt >>> mdata = pt.dt.papalexi_2021() >>> ms_pt = pt.tl.Mixscape() >>> ms_pt.perturbation_signature(mdata["rna"], "perturbation", "NT", "replicate") >>> ms_pt.mixscape(adata=mdata["rna"], control="NT", labels="gene_target", layer="X_pert") >>> ms_pt.plot_heatmap( ... adata=mdata["rna"], labels="gene_target", target_gene="IFNGR2", layer="X_pert", control="NT" ... ) Preview: .. image:: /_static/docstring_previews/mixscape_heatmap.png """ if "mixscape_class" not in adata.obs: raise ValueError("Please run `pt.tl.mixscape` first.") adata_subset = adata[(adata.obs[labels] == target_gene) | (adata.obs[labels] == control)].copy() sc.tl.rank_genes_groups(adata_subset, layer=layer, groupby=labels, method=method) sc.pp.scale(adata_subset, max_value=vmax) sc.pp.subsample(adata_subset, n_obs=subsample_number) return sc.pl.rank_genes_groups_heatmap( adata_subset, groupby="mixscape_class", vmin=vmin, vmax=vmax, n_genes=20, groups=["NT"], return_fig=return_fig, show=show, save=save, **kwds, )
[docs] def plot_perturbscore( # pragma: no cover self, adata: AnnData, labels: str, target_gene: str, mixscape_class: str = "mixscape_class", color: str = "orange", palette: dict[str, str] = None, split_by: str = None, before_mixscape: bool = False, perturbation_type: str = "KO", return_fig: bool | None = None, ax: Axes | None = None, show: bool | None = None, save: bool | str | None = None, ) -> None: """Density plots to visualize perturbation scores calculated by the `pt.tl.mixscape` function. Requires `pt.tl.mixscape` to be run first. https://satijalab.org/seurat/reference/plotperturbscore Args: adata: The annotated data object. labels: The column of `.obs` with target gene labels. target_gene: Target gene name to visualize perturbation scores for. mixscape_class: The column of `.obs` with mixscape classifications. color: Specify color of target gene class or knockout cell class. For control non-targeting and non-perturbed cells, colors are set to different shades of grey. palette: Optional full color palette to overwrite all colors. split_by: Provide the column `.obs` if multiple biological replicates exist to calculate the perturbation signature for every replicate separately. before_mixscape: Option to split densities based on mixscape classification (default) or original target gene classification. Default is set to NULL and plots cells by original class ID. perturbation_type: Specify type of CRISPR perturbation expected for labeling mixscape classifications. Defaults to `KO`. Examples: Visualizing the perturbation scores for the cells in a dataset: >>> import pertpy as pt >>> mdata = pt.dt.papalexi_2021() >>> ms_pt = pt.tl.Mixscape() >>> ms_pt.perturbation_signature(mdata["rna"], "perturbation", "NT", "replicate") >>> ms_pt.mixscape(adata=mdata["rna"], control="NT", labels="gene_target", layer="X_pert") >>> ms_pt.plot_perturbscore(adata=mdata["rna"], labels="gene_target", target_gene="IFNGR2", color="orange") Preview: .. image:: /_static/docstring_previews/mixscape_perturbscore.png """ if "mixscape" not in adata.uns: raise ValueError("Please run the `mixscape` function first.") perturbation_score = None for key in adata.uns["mixscape"][target_gene].keys(): perturbation_score_temp = adata.uns["mixscape"][target_gene][key] perturbation_score_temp["name"] = key if perturbation_score is None: perturbation_score = copy.deepcopy(perturbation_score_temp) else: perturbation_score = pd.concat([perturbation_score, perturbation_score_temp]) perturbation_score["mix"] = adata.obs[mixscape_class][perturbation_score.index] gd = list(set(perturbation_score[labels]).difference({target_gene}))[0] # If before_mixscape is True, split densities based on original target gene classification if before_mixscape is True: palette = {gd: "#7d7d7d", target_gene: color} plot_dens = sns.kdeplot(data=perturbation_score, x="pvec", hue=labels, fill=False, common_norm=False) top_r = max(plot_dens.get_lines()[cond].get_data()[1].max() for cond in range(len(plot_dens.get_lines()))) plt.close() perturbation_score["y_jitter"] = perturbation_score["pvec"] rng = np.random.default_rng() perturbation_score.loc[perturbation_score[labels] == gd, "y_jitter"] = rng.uniform( low=0.001, high=top_r / 10, size=sum(perturbation_score[labels] == gd) ) perturbation_score.loc[perturbation_score[labels] == target_gene, "y_jitter"] = rng.uniform( low=-top_r / 10, high=0, size=sum(perturbation_score[labels] == target_gene) ) # If split_by is provided, split densities based on the split_by if split_by is not None: sns.set_theme(style="whitegrid") g = sns.FacetGrid( data=perturbation_score, col=split_by, hue=split_by, palette=palette, height=5, sharey=False ) g.map(sns.kdeplot, "pvec", fill=True, common_norm=False, palette=palette) g.map(sns.scatterplot, "pvec", "y_jitter", s=10, alpha=0.5, palette=palette) g.set_axis_labels("Perturbation score", "Cell density") g.add_legend(title=split_by, fontsize=14, title_fontsize=16) g.despine(left=True) # If split_by is not provided, create a single plot else: sns.set_theme(style="whitegrid") sns.kdeplot( data=perturbation_score, x="pvec", hue="gene_target", fill=True, common_norm=False, palette=palette ) sns.scatterplot( data=perturbation_score, x="pvec", y="y_jitter", hue="gene_target", palette=palette, s=10, alpha=0.5 ) plt.xlabel("Perturbation score", fontsize=16) plt.ylabel("Cell density", fontsize=16) plt.title("Density Plot", fontsize=18) plt.legend(title="gene_target", title_fontsize=14, fontsize=12) sns.despine() if save: plt.savefig(save, bbox_inches="tight") if show: plt.show() if return_fig: return plt.gcf() if not (show or save): return plt.gca() # If before_mixscape is False, split densities based on mixscape classifications else: if palette is None: palette = {gd: "#7d7d7d", f"{target_gene} NP": "#c9c9c9", f"{target_gene} {perturbation_type}": color} plot_dens = sns.kdeplot(data=perturbation_score, x="pvec", hue=labels, fill=False, common_norm=False) top_r = max(plot_dens.get_lines()[i].get_data()[1].max() for i in range(len(plot_dens.get_lines()))) plt.close() perturbation_score["y_jitter"] = perturbation_score["pvec"] rng = np.random.default_rng() gd2 = list( set(perturbation_score["mix"]).difference([f"{target_gene} NP", f"{target_gene} {perturbation_type}"]) )[0] perturbation_score.loc[perturbation_score["mix"] == gd2, "y_jitter"] = rng.uniform( low=0.001, high=top_r / 10, size=sum(perturbation_score["mix"] == gd2) ).astype(np.float32) perturbation_score.loc[perturbation_score["mix"] == f"{target_gene} {perturbation_type}", "y_jitter"] = ( rng.uniform( low=-top_r / 10, high=0, size=sum(perturbation_score["mix"] == f"{target_gene} {perturbation_type}") ) ) perturbation_score.loc[perturbation_score["mix"] == f"{target_gene} NP", "y_jitter"] = rng.uniform( low=-top_r / 10, high=0, size=sum(perturbation_score["mix"] == f"{target_gene} NP") ) # If split_by is provided, split densities based on the split_by if split_by is not None: sns.set_theme(style="whitegrid") g = sns.FacetGrid( data=perturbation_score, col=split_by, hue="mix", palette=palette, height=5, sharey=False ) g.map(sns.kdeplot, "pvec", fill=True, common_norm=False, alpha=0.7) g.map(sns.scatterplot, "pvec", "y_jitter", s=10, alpha=0.5) g.set_axis_labels("Perturbation score", "Cell density") g.add_legend(title="mix", fontsize=14, title_fontsize=16) g.despine(left=True) # If split_by is not provided, create a single plot else: sns.set_theme(style="whitegrid") sns.kdeplot( data=perturbation_score, x="pvec", hue="mix", fill=True, common_norm=False, palette=palette, alpha=0.7, ) sns.scatterplot( data=perturbation_score, x="pvec", y="y_jitter", hue="mix", palette=palette, s=10, alpha=0.5 ) plt.xlabel("Perturbation score", fontsize=16) plt.ylabel("Cell density", fontsize=16) plt.title("Density", fontsize=18) plt.legend(title="mixscape class", title_fontsize=14, fontsize=12) sns.despine() if save: plt.savefig(save, bbox_inches="tight") if show: plt.show() if return_fig: return plt.gcf() if not (show or save): return plt.gca()
[docs] def plot_violin( # pragma: no cover self, adata: AnnData, target_gene_idents: str | list[str], keys: str | Sequence[str] = "mixscape_class_p_ko", groupby: str | None = "mixscape_class", log: bool = False, use_raw: bool | None = None, stripplot: bool = True, hue: str | None = None, jitter: float | bool = True, size: int = 1, layer: str | None = None, scale: Literal["area", "count", "width"] = "width", order: Sequence[str] | None = None, multi_panel: bool | None = None, xlabel: str = "", ylabel: str | Sequence[str] | None = None, rotation: float | None = None, ax: Axes | None = None, show: bool | None = None, save: bool | str | None = None, **kwargs, ): """Violin plot using mixscape results. Requires `pt.tl.mixscape` to be run first. Args: adata: The annotated data object. target_gene_idents: Target gene name to plot. keys: Keys for accessing variables of `.var_names` or fields of `.obs`. Default is 'mixscape_class_p_ko'. groupby: The key of the observation grouping to consider. Default is 'mixscape_class'. log: Plot on logarithmic axis. use_raw: Whether to use `raw` attribute of `adata`. Defaults to `True` if `.raw` is present. stripplot: Add a stripplot on top of the violin plot. order: Order in which to show the categories. xlabel: Label of the x-axis. Defaults to `groupby` if `rotation` is `None`, otherwise, no label is shown. ylabel: Label of the y-axis. If `None` and `groupby` is `None`, defaults to `'value'`. If `None` and `groubpy` is not `None`, defaults to `keys`. show: Show the plot, do not return axis. save: If `True` or a `str`, save the figure. A string is appended to the default filename. Infer the filetype if ending on {`'.pdf'`, `'.png'`, `'.svg'`}. ax: A matplotlib axes object. Only works if plotting a single component. **kwargs: Additional arguments to `seaborn.violinplot`. Returns: A :class:`~matplotlib.axes.Axes` object if `ax` is `None` else `None`. Examples: >>> import pertpy as pt >>> mdata = pt.dt.papalexi_2021() >>> ms_pt = pt.tl.Mixscape() >>> ms_pt.perturbation_signature(mdata["rna"], "perturbation", "NT", "replicate") >>> ms_pt.mixscape(adata=mdata["rna"], control="NT", labels="gene_target", layer="X_pert") >>> ms_pt.plot_violin( ... adata=mdata["rna"], target_gene_idents=["NT", "IFNGR2 NP", "IFNGR2 KO"], groupby="mixscape_class" ... ) Preview: .. image:: /_static/docstring_previews/mixscape_violin.png """ if isinstance(target_gene_idents, str): mixscape_class_mask = adata.obs[groupby] == target_gene_idents elif isinstance(target_gene_idents, list): mixscape_class_mask = np.full_like(adata.obs[groupby], False, dtype=bool) for ident in target_gene_idents: mixscape_class_mask |= adata.obs[groupby] == ident adata = adata[mixscape_class_mask] sanitize_anndata(adata) use_raw = _check_use_raw(adata, use_raw) if isinstance(keys, str): keys = [keys] keys = list(OrderedDict.fromkeys(keys)) # remove duplicates, preserving the order if isinstance(ylabel, str | type(None)): ylabel = [ylabel] * (1 if groupby is None else len(keys)) if groupby is None: if len(ylabel) != 1: raise ValueError(f"Expected number of y-labels to be `1`, found `{len(ylabel)}`.") elif len(ylabel) != len(keys): raise ValueError(f"Expected number of y-labels to be `{len(keys)}`, " f"found `{len(ylabel)}`.") if groupby is not None: if hue is not None: obs_df = get.obs_df(adata, keys=[groupby] + keys + [hue], layer=layer, use_raw=use_raw) else: obs_df = get.obs_df(adata, keys=[groupby] + keys, layer=layer, use_raw=use_raw) else: obs_df = get.obs_df(adata, keys=keys, layer=layer, use_raw=use_raw) if groupby is None: obs_tidy = pd.melt(obs_df, value_vars=keys) x = "variable" ys = ["value"] else: obs_tidy = obs_df x = groupby ys = keys if multi_panel and groupby is None and len(ys) == 1: # This is a quick and dirty way for adapting scales across several # keys if groupby is None. y = ys[0] g = sns.catplot( y=y, data=obs_tidy, kind="violin", scale=scale, col=x, col_order=keys, sharey=False, order=keys, cut=0, inner=None, **kwargs, ) if stripplot: grouped_df = obs_tidy.groupby(x) for ax_id, key in zip(range(g.axes.shape[1]), keys, strict=False): sns.stripplot( y=y, data=grouped_df.get_group(key), jitter=jitter, size=size, color="black", ax=g.axes[0, ax_id], ) if log: g.set(yscale="log") g.set_titles(col_template="{col_name}").set_xlabels("") if rotation is not None: for ax in g.axes[0]: ax.tick_params(axis="x", labelrotation=rotation) else: # set by default the violin plot cut=0 to limit the extend # of the violin plot (see stacked_violin code) for more info. kwargs.setdefault("cut", 0) kwargs.setdefault("inner") if ax is None: axs, _, _, _ = _utils.setup_axes( ax=ax, panels=["x"] if groupby is None else keys, show_ticks=True, right_margin=0.3, ) else: axs = [ax] for ax, y, ylab in zip(axs, ys, ylabel, strict=False): ax = sns.violinplot( x=x, y=y, data=obs_tidy, order=order, orient="vertical", scale=scale, ax=ax, hue=hue, **kwargs, ) # Get the handles and labels. handles, labels = ax.get_legend_handles_labels() if stripplot: ax = sns.stripplot( x=x, y=y, data=obs_tidy, order=order, jitter=jitter, color="black", size=size, ax=ax, hue=hue, dodge=True, ) if xlabel == "" and groupby is not None and rotation is None: xlabel = groupby.replace("_", " ") ax.set_xlabel(xlabel) if ylab is not None: ax.set_ylabel(ylab) if log: ax.set_yscale("log") if rotation is not None: ax.tick_params(axis="x", labelrotation=rotation) show = settings.autoshow if show is None else show if hue is not None and stripplot is True: plt.legend(handles, labels) _utils.savefig_or_show("mixscape_violin", show=show, save=save) if not show: if multi_panel and groupby is None and len(ys) == 1: return g elif len(axs) == 1: return axs[0] else: return axs
[docs] def plot_lda( # pragma: no cover self, adata: AnnData, control: str, mixscape_class: str = "mixscape_class", mixscape_class_global: str = "mixscape_class_global", perturbation_type: str | None = "KO", lda_key: str | None = "mixscape_lda", n_components: int | None = None, color_map: Colormap | str | None = None, palette: str | Sequence[str] | None = None, return_fig: bool | None = None, ax: Axes | None = None, show: bool | None = None, save: bool | str | None = None, **kwds, ) -> None: """Visualizing perturbation responses with Linear Discriminant Analysis. Requires `pt.tl.mixscape()` to be run first. Args: adata: The annotated data object. control: Control category from the `pert_key` column. mixscape_class: The column of `.obs` with the mixscape classification result. mixscape_class_global: The column of `.obs` with mixscape global classification result (perturbed, NP or NT). perturbation_type: Specify type of CRISPR perturbation expected for labeling mixscape classifications. Defaults to 'KO'. lda_key: If not specified, lda looks .uns["mixscape_lda"] for the LDA results. n_components: The number of dimensions of the embedding. show: Show the plot, do not return axis. save: If `True` or a `str`, save the figure. A string is appended to the default filename. Infer the filetype if ending on {`'.pdf'`, `'.png'`, `'.svg'`}. **kwds: Additional arguments to `scanpy.pl.umap`. Examples: >>> import pertpy as pt >>> mdata = pt.dt.papalexi_2021() >>> ms_pt = pt.tl.Mixscape() >>> ms_pt.perturbation_signature(mdata["rna"], "perturbation", "NT", "replicate") >>> ms_pt.mixscape(adata=mdata["rna"], control="NT", labels="gene_target", layer="X_pert") >>> ms_pt.lda(adata=mdata["rna"], control="NT", labels="gene_target", layer="X_pert") >>> ms_pt.plot_lda(adata=mdata["rna"], control="NT") Preview: .. image:: /_static/docstring_previews/mixscape_lda.png """ if mixscape_class not in adata.obs: raise ValueError(f'Did not find `.obs["{mixscape_class!r}"]`. Please run the `mixscape` function first.') if lda_key not in adata.uns: raise ValueError(f'Did not find `.uns["{lda_key!r}"]`. Please run the `lda` function first.') adata_subset = adata[ (adata.obs[mixscape_class_global] == perturbation_type) | (adata.obs[mixscape_class_global] == control) ].copy() adata_subset.obsm[lda_key] = adata_subset.uns[lda_key] if n_components is None: n_components = adata_subset.uns[lda_key].shape[1] sc.pp.neighbors(adata_subset, use_rep=lda_key) sc.tl.umap(adata_subset, n_components=n_components) sc.pl.umap( adata_subset, color=mixscape_class, palette=palette, color_map=color_map, return_fig=return_fig, show=show, save=save, ax=ax, **kwds, )