Source code for pertpy.tools._enrichment

from collections import ChainMap
from collections.abc import Sequence
from typing import Any, Literal

import blitzgsea
import numpy as np
import pandas as pd
import scanpy as sc
from anndata import AnnData
from matplotlib.axes import Axes
from scanpy.plotting import DotPlot
from scanpy.tools._score_genes import _sparse_nanmean
from scipy.sparse import issparse
from scipy.stats import hypergeom
from statsmodels.stats.multitest import multipletests

from pertpy.metadata import Drug


def _prepare_targets(
    targets: dict[str, list[str]] | dict[str, dict[str, list[str]]] = None,
    nested: bool = False,
    categories: str | Sequence[str] = None,
) -> ChainMap | dict:
    if categories is not None:
        if isinstance(categories, str):
            categories = [categories]
        else:
            categories = list(categories)

    if targets is None:
        pt_drug = Drug()
        pt_drug.chembl.set()
        targets = pt_drug.chembl.dictionary
        nested = True
    else:
        targets = targets.copy()
    if categories is not None:
        targets = {k: targets[k] for k in categories}  # type: ignore
    if nested:
        targets = dict(ChainMap(*[targets[cat] for cat in targets]))  # type: ignore

    return targets


def _mean(X, names, axis):
    """Helper function to compute a mean of X across an axis, respecting names and possible nans."""
    if issparse(X):
        obs_avg = pd.Series(
            np.array(_sparse_nanmean(X, axis=axis)).flatten(),
            index=names,
        )
    else:
        obs_avg = pd.Series(np.nanmean(X, axis=axis), index=names)
    return obs_avg


[docs] class Enrichment:
[docs] def score( self, adata: AnnData, layer: str = None, targets: dict[str, list[str]] | dict[str, dict[str, list[str]]] = None, nested: bool = False, categories: Sequence[str] = None, method: Literal["mean", "seurat"] = "mean", n_bins: int = 25, ctrl_size: int = 50, key_added: str = "pertpy_enrichment", ) -> None: """Obtain per-cell scoring of gene groups of interest. Inspired by drug2cell score: https://github.com/Teichlab/drug2cell. Ensure that the gene nomenclature in your target sets is compatible with your `.var_names`. The ChEMBL drug targets use HGNC. Args: adata: An AnnData object. It is recommended to use log-normalised data. targets: Gene groups to evaluate, which can be targets of known drugs, GO terms, pathway memberships, etc. Accepts two forms: - A dictionary with group names as keys and corresponding gene lists as entries. - A dictionary of dictionaries with group categories as keys. Use `nested=True` in this case. If not provided, ChEMBL-derived drug target sets are used. nested: Indicates if `targets` is a dictionary of dictionaries with group categories as keys. Defaults to False. categories: To subset the gene groups to specific categories, especially when `targets=None` or `nested=True`. For ChEMBL drug targets, these are ATC level 1/level 2 category codes. method: Method for scoring gene groups. `"mean"` calculates the mean over all genes, while `"seurat"` uses a background profile subtraction approach. Defaults to 'mean'. layer: Specifies which `.layers` of AnnData to use for expression values. Defaults to `.X` if None. n_bins: The number of expression bins for the `'seurat'` method. ctrl_size: The number of genes to randomly sample from each expression bin for the `"seurat"` method. key_added: Prefix key that adds the results to `uns`. Note that the actual values are `key_added_score`, `key_added_variables`, `key_added_genes`, `key_added_all_genes`. Defaults to `pertpy_enrichment`. Returns: An AnnData object with scores. """ if layer is not None: mtx = adata.layers[layer] else: mtx = adata.X targets = _prepare_targets(targets=targets, nested=nested, categories=categories) # type: ignore full_targets = targets.copy() for drug in targets: targets[drug] = np.isin(adata.var_names, targets[drug]) # Scoring is done via matrix multiplication of the original cell by gene matrix by a new gene by drug matrix # with the entries in the new matrix being the weights of each gene for that group (such as drug) # The mean across targets is constant -> prepare weights for that weights = pd.DataFrame(targets, index=adata.var_names) weights = weights.loc[:, weights.sum() > 0] weights = weights / weights.sum() if issparse(mtx): scores = mtx.dot(weights) else: scores = np.dot(mtx, weights) if method == "seurat": obs_avg = _mean(mtx, names=adata.var_names, axis=0) n_items = int(np.round(len(obs_avg) / (n_bins - 1))) obs_cut = obs_avg.rank(method="min") // n_items obs_cut = obs_cut.values control_groups = {} for cut in np.unique(obs_cut): mask = obs_cut == cut r_genes = np.nonzero(mask)[0] rng = np.random.default_rng() rng.shuffle(r_genes) mask[r_genes[ctrl_size:]] = False control_groups[cut] = mask control_gene_weights = pd.DataFrame(control_groups, index=adata.var_names) control_gene_weights = control_gene_weights / control_gene_weights.sum() if issparse(mtx): control_profiles = mtx.dot(control_gene_weights) else: control_profiles = np.dot(mtx, control_gene_weights) drug_bins = {} for drug in weights.columns: bins = np.unique(obs_cut[targets[drug]]) drug_bins[drug] = np.isin(control_gene_weights.columns, bins) drug_weights = pd.DataFrame(drug_bins, index=control_gene_weights.columns) drug_weights = drug_weights / drug_weights.sum() seurat = np.dot(control_profiles, drug_weights) scores = scores - seurat adata.uns[f"{key_added}_score"] = scores adata.uns[f"{key_added}_variables"] = weights.columns adata.uns[f"{key_added}_genes"] = {"var": pd.DataFrame(columns=["genes"]).astype(object)} adata.uns[f"{key_added}_all_genes"] = {"var": pd.DataFrame(columns=["all_genes"]).astype(object)} for drug in weights.columns: adata.uns[f"{key_added}_genes"]["var"].loc[drug, "genes"] = "|".join(adata.var_names[targets[drug]]) adata.uns[f"{key_added}_all_genes"]["var"].loc[drug, "all_genes"] = "|".join(full_targets[drug])
[docs] def hypergeometric( self, adata: AnnData, targets: dict[str, list[str] | dict[str, list[str]]] | None = None, nested: bool = False, categories: str | list[str] | None = None, pvals_adj_thresh: float = 0.05, direction: str = "both", corr_method: Literal["benjamini-hochberg", "bonferroni"] = "benjamini-hochberg", ): """Perform a hypergeometric test to assess the overrepresentation of gene group members. Args: adata: With marker genes computed via `sc.tl.rank_genes_groups()` in the original expression space. targets: The gene groups to evaluate. Can be targets of known drugs, GO terms, pathway memberships, anything you can assign genes to. If `None`, will use `d2c.score()` output if present, and if not present load the ChEMBL-derived drug target sets distributed with the package. Accepts two forms: - A dictionary with the names of the groups as keys, and the entries being the corresponding gene lists. - A dictionary of dictionaries defined like above, with names of gene group categories as keys. If passing one of those, specify `nested=True`. nested: Whether `targets` is a dictionary of dictionaries with group categories as keys. categories: If `targets=None` or `nested=True`, this argument can be used to subset the gene groups to one or more categories (keys of the original dictionary). In case of the ChEMBL drug targets, these are ATC level 1/level 2 category codes. pvals_adj_thresh: The `pvals_adj` cutoff to use on the `sc.tl.rank_genes_groups()` output to identify markers. direction: Whether to seek out up/down-regulated genes for the groups, based on the values from `scores`. Can be `up`, `down`, or `both` (for no selection). corr_method: Which FDR correction to apply to the p-values of the hypergeometric test. Can be `benjamini-hochberg` or `bonferroni`. Returns: Dictionary with clusters for which the original object markers were computed as the keys, and data frames of test results sorted on q-value as the items. """ universe = set(adata.var_names) targets = _prepare_targets(targets=targets, nested=nested, categories=categories) # type: ignore for group in targets: targets[group] = set(targets[group]).intersection(universe) # type: ignore # We remove empty keys since we don't need them targets = {k: v for k, v in targets.items() if v} overrepresentation = {} for cluster in adata.uns["rank_genes_groups"]["names"].dtype.names: results = pd.DataFrame( 1, index=list(targets.keys()), columns=[ "intersection", "gene_group", "markers", "universe", "pvals", "pvals_adj", ], ) mask = adata.uns["rank_genes_groups"]["pvals_adj"][cluster] < pvals_adj_thresh if direction == "up": mask = mask & (adata.uns["rank_genes_groups"]["scores"][cluster] > 0) elif direction == "down": mask = mask & (adata.uns["rank_genes_groups"]["scores"][cluster] < 0) markers = set(adata.uns["rank_genes_groups"]["names"][cluster][mask]) results["markers"] = len(markers) results["universe"] = len(universe) results["pvals"] = results["pvals"].astype(float) for ind in results.index: gene_group = targets[ind] common = gene_group.intersection(markers) # type: ignore results.loc[ind, "intersection"] = len(common) results.loc[ind, "gene_group"] = len(gene_group) # need to subtract 1 from the intersection length # https://alexlenail.medium.com/understanding-and-implementing-the-hypergeometric-test-in-python-a7db688a7458 pval = hypergeom.sf(len(common) - 1, len(universe), len(markers), len(gene_group)) results.loc[ind, "pvals"] = pval # Just in case any NaNs popped up somehow, fill them to 1 so FDR works results = results.fillna(1) if corr_method == "benjamini-hochberg": results["pvals_adj"] = multipletests(results["pvals"], method="fdr_bh")[1] elif corr_method == "bonferroni": results["pvals_adj"] = np.minimum(results["pvals"] * results.shape[0], 1.0) overrepresentation[cluster] = results.sort_values("pvals_adj") return overrepresentation
[docs] def gsea( self, adata: "AnnData", targets: dict[str, list[str] | dict[str, list[str]]] | None = None, nested: bool = False, categories: str | list[str] | None = None, absolute: bool = False, key_added: str = "pertpy_enrichment_gsea", ) -> dict[str, pd.DataFrame] | tuple[dict[str, pd.DataFrame], dict[str, dict]]: # pragma: no cover """Perform gene set enrichment analysis on the marker gene scores using blitzgsea. Args: adata: AnnData object with marker genes computed via `sc.tl.rank_genes_groups()` in the original expression space. targets: The gene groups to evaluate, either as a dictionary with names of the groups as keys and gene lists as values, or a dictionary of dictionaries with names of gene group categories as keys. Defaults to None, in which case it uses `d2c.score()` output or loads ChEMBL-derived drug target sets. nested: Indicates if `targets` is a dictionary of dictionaries with group categories as keys. Defaults to False. categories: Used to subset the gene groups to one or more categories, applicable if `targets=None` or `nested=True`. Defaults to None. absolute: If True, passes the absolute values of scores to GSEA, improving statistical power. Defaults to False. key_added: Prefix key that adds the results to `uns`. Defaults to `pertpy_enrichment_gsea`. Returns: A dictionary with clusters as keys and data frames of test results sorted on q-value as the items. """ targets = _prepare_targets(targets=targets, nested=nested, categories=categories) # type: ignore enrichment = {} plot_gsea_args: dict[str, Any] = {"targets": targets, "scores": {}} for cluster in adata.uns["rank_genes_groups"]["names"].dtype.names: df = pd.DataFrame( { "0": adata.uns["rank_genes_groups"]["names"][cluster], "1": adata.uns["rank_genes_groups"]["scores"][cluster], } ) if absolute: df["1"] = np.absolute(df["1"]) df = df.sort_values("1", ascending=False) enrichment[cluster] = blitzgsea.gsea(df, targets) plot_gsea_args["scores"][cluster] = df adata.uns[key_added] = plot_gsea_args return enrichment
[docs] def plot_dotplot( self, adata: AnnData, targets: dict[str, dict[str, list[str]]] = None, source: Literal["chembl", "dgidb", "pharmgkb"] = "chembl", category_name: str = "interaction_type", categories: Sequence[str] = None, groupby: str = None, key: str = "pertpy_enrichment", ax: Axes | None = None, save: bool | str | None = None, show: bool | None = None, **kwargs, ) -> DotPlot | dict | None: """Plots a dotplot by groupby and categories. Wraps scanpy's dotplot but formats it nicely by categories. Args: adata: An AnnData object with enrichment results stored in `.uns["pertpy_enrichment_score"]`. targets: Gene groups to evaluate, which can be targets of known drugs, GO terms, pathway memberships, etc. Accepts a dictionary of dictionaries with group categories as keys. If not provided, ChEMBL-derived or dgbidb drug target sets are used, given by `source`. source: Source of drug target sets when `targets=None`, `chembl`, `dgidb` or `pharmgkb`. Defaults to `chembl`. categories: To subset the gene groups to specific categories, especially when `targets=None`. For ChEMBL drug targets, these are ATC level 1/level 2 category codes. category_name: The name of category used to generate a nested drug target set when `targets=None` and `source=dgidb|pharmgkb`. Defaults to `interaction_type`. groupby: dotplot groupby such as clusters or cell types. key: Prefix key of enrichment results in `uns`. Defaults to `pertpy_enrichment`. kwargs: Passed to scanpy dotplot. Returns: If `return_fig` is `True`, returns a :class:`~scanpy.pl.DotPlot` object, else if `show` is false, return axes dict. Examples: >>> import pertpy as pt >>> import scanpy as sc >>> pt_enrichment = pt.tl.Enrichment() >>> adata = sc.datasets.pbmc3k_processed() >>> pt_enrichment.score(adata) >>> sc.tl.rank_genes_groups(adata, method="wilcoxon", groupby="louvain") >>> pt_enrichment.plot_dotplot(adata, categories=["B01", "B02", "B03"], groupby="louvain") Preview: .. image:: /_static/docstring_previews/enrichment_dotplot.png """ if categories is not None: if isinstance(categories, str): categories = [categories] else: categories = list(categories) if targets is None: pt_drug = Drug() if source == "chembl": pt_drug.chembl.set() targets = pt_drug.chembl.dictionary elif source == "dgidb": pt_drug.dgidb.set() interaction = pt_drug.dgidb.data if category_name not in interaction.columns: raise ValueError("The category name is not available in dgidb drug target data.") interaction[category_name] = interaction[category_name].fillna("Unknown/Other") targets = ( interaction.groupby(category_name) .apply(lambda x: x.groupby("drug_claim_name")["gene_claim_name"].apply(list).to_dict()) .to_dict() ) else: pt_drug.pharmgkb.set() interaction = pt_drug.pharmgkb.data if category_name not in interaction.columns: raise ValueError("The category name is not available in pharmgkb drug target data.") interaction[category_name] = interaction[category_name].fillna("Unknown/Other") targets = ( interaction.groupby(category_name) .apply(lambda x: x.groupby("Compound|Disease")["Gene"].apply(list).to_dict()) .to_dict() ) else: targets = targets.copy() if categories is not None: targets = {k: targets[k] for k in categories} # type: ignore for group in targets: targets[group] = list(targets[group].keys()) # type: ignore var_names: list[str] = [] var_group_positions: list[tuple[int, int]] = [] var_group_labels: list[str] = [] start = 0 enrichment_score_adata = AnnData(adata.uns[f"{key}_score"], obs=adata.obs) enrichment_score_adata.var_names = adata.uns[f"{key}_variables"] for group in targets: targets[group] = list( # type: ignore enrichment_score_adata.var_names[np.isin(enrichment_score_adata.var_names, targets[group])] ) if len(targets[group]) == 0: continue var_names = var_names + targets[group] # type: ignore var_group_positions = var_group_positions + [(start, len(var_names) - 1)] var_group_labels = var_group_labels + [group] start = len(var_names) plot_args = { "var_names": var_names, "var_group_positions": var_group_positions, "var_group_labels": var_group_labels, } return sc.pl.dotplot( enrichment_score_adata, groupby=groupby, swap_axes=True, ax=ax, save=save, show=show, **plot_args, **kwargs, )
[docs] def plot_gsea( self, adata: AnnData, enrichment: dict[str, pd.DataFrame], n: int = 10, key: str = "pertpy_enrichment_gsea", interactive_plot: bool = False, ) -> None: """Generates a blitzgsea top_table plot. This function is designed to visualize the results from a Gene Set Enrichment Analysis (GSEA). It uses the output from the `gsea()` method, which provides the enrichment data, and displays the top results using blitzgsea's `top_table()` plot. Args: adata: AnnData object to plot. enrichment: Cluster names as keys, blitzgsea's ``gsea()`` output as values. n: How many top scores to show for each group. Defaults to 10. key: GSEA results key in `uns`. Defaults to "pertpy_enrichment_gsea". interactive_plot: Whether to plot interactively or not. Defaults to False. Examples: >>> import pertpy as pt >>> import scanpy as sc >>> pt_enrichment = pt.tl.Enrichment() >>> adata = sc.datasets.pbmc3k_processed() >>> pt_enrichment.score(adata) >>> sc.tl.rank_genes_groups(adata, method="wilcoxon", groupby="louvain") >>> enrichment = pt_enrichment.gsea(adata) >>> pt_enrichment.plot_gsea(adata, enrichment, interactive_plot=True) Preview: .. image:: /_static/docstring_previews/enrichment_gsea.png """ for cluster in enrichment: fig = blitzgsea.plot.top_table( adata.uns[key]["scores"][cluster], adata.uns[key]["targets"], enrichment[cluster], n=n, interactive_plot=interactive_plot, ) fig.suptitle(cluster) fig.show()