Source code for pertpy.tools._milo

from __future__ import annotations

import logging
import random
import re
from typing import TYPE_CHECKING, Literal

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scanpy as sc
import seaborn as sns
from anndata import AnnData
from lamin_utils import logger
from mudata import MuData

if TYPE_CHECKING:
    from collections.abc import Sequence

    from matplotlib.axes import Axes
    from matplotlib.colors import Colormap

try:
    from rpy2.robjects import conversion, numpy2ri, pandas2ri
    from rpy2.robjects.packages import STAP, PackageNotInstalledError, importr
except ModuleNotFoundError:
    logger.warn("ryp2 is not installed. Install with pip install rpy2 [yellow]to run tools with R support.")
from scipy.sparse import csr_matrix
from sklearn.metrics.pairwise import euclidean_distances


[docs] class Milo: """Python implementation of Milo.""" def __init__(self): pass
[docs] def load( self, input: AnnData, feature_key: str | None = "rna", ) -> MuData: """Prepare a MuData object for subsequent processing. Args: input: AnnData feature_key: Key to store the cell-level AnnData object in the MuData object Returns: MuData: MuData object with original AnnData. Defaults to`mudata[feature_key]`. Examples: >>> import pertpy as pt >>> adata = pt.dt.bhattacherjee() >>> milo = pt.tl.Milo() >>> mdata = milo.load(adata) """ mdata = MuData({feature_key: input, "milo": AnnData()}) return mdata
[docs] def make_nhoods( self, data: AnnData | MuData, neighbors_key: str | None = None, feature_key: str | None = "rna", prop: float = 0.1, seed: int = 0, copy: bool = False, ): """Randomly sample vertices on a KNN graph to define neighbourhoods of cells. The set of neighborhoods get refined by computing the median profile for the neighbourhood in reduced dimensional space and by selecting the nearest vertex to this position. Thus, multiple neighbourhoods may be collapsed to prevent over-sampling the graph space. Args: data: AnnData object with KNN graph defined in `obsp` or MuData object with a modality with KNN graph defined in `obsp` neighbors_key: The key in `adata.obsp` or `mdata[feature_key].obsp` to use as KNN graph. If not specified, `make_nhoods` looks .obsp[‘connectivities’] for connectivities (default storage places for `scanpy.pp.neighbors`). If specified, it looks at .obsp[.uns[neighbors_key][‘connectivities_key’]] for connectivities. Defaults to None. feature_key: If input data is MuData, specify key to cell-level AnnData object. Defaults to 'rna'. prop: Fraction of cells to sample for neighbourhood index search. Defaults to 0.1. seed: Random seed for cell sampling. Defaults to 0. copy: Determines whether a copy of the `adata` is returned. Defaults to False. Returns: If `copy=True`, returns the copy of `adata` with the result in `.obs`, `.obsm`, and `.uns`. Otherwise: nhoods: scipy.sparse._csr.csr_matrix in `adata.obsm['nhoods']`. A binary matrix of cell to neighbourhood assignments. Neighbourhoods in the columns are ordered by the order of the index cell in adata.obs_names nhood_ixs_refined: pandas.Series in `adata.obs['nhood_ixs_refined']`. A boolean indicating whether a cell is an index for a neighbourhood nhood_kth_distance: pandas.Series in `adata.obs['nhood_kth_distance']`. The distance to the kth nearest neighbour for each index cell (used for SpatialFDR correction) nhood_neighbors_key: `adata.uns["nhood_neighbors_key"]` KNN graph key, used for neighbourhood construction Examples: >>> import pertpy as pt >>> import scanpy as sc >>> adata = pt.dt.bhattacherjee() >>> milo = pt.tl.Milo() >>> mdata = milo.load(adata) >>> sc.pp.neighbors(mdata["rna"]) >>> milo.make_nhoods(mdata["rna"]) """ if isinstance(data, MuData): adata = data[feature_key] if isinstance(data, AnnData): adata = data if copy: adata = adata.copy() # Get reduced dim used for KNN graph if neighbors_key is None: try: use_rep = adata.uns["neighbors"]["params"]["use_rep"] except KeyError: logging.warning("Using X_pca as default embedding") use_rep = "X_pca" try: knn_graph = adata.obsp["connectivities"].copy() except KeyError: logger.error('No "connectivities" slot in adata.obsp -- please run scanpy.pp.neighbors(adata) first') raise else: try: use_rep = adata.uns[neighbors_key]["params"]["use_rep"] except KeyError: logging.warning("Using X_pca as default embedding") use_rep = "X_pca" knn_graph = adata.obsp[neighbors_key + "_connectivities"].copy() X_dimred = adata.obsm[use_rep] n_ixs = int(np.round(adata.n_obs * prop)) knn_graph[knn_graph != 0] = 1 random.seed(seed) random_vertices = random.sample(range(adata.n_obs), k=n_ixs) random_vertices.sort() ixs_nn = knn_graph[random_vertices, :] non_zero_rows = ixs_nn.nonzero()[0] non_zero_cols = ixs_nn.nonzero()[1] refined_vertices = np.empty( shape=[ len(random_vertices), ] ) for i in range(len(random_vertices)): nh_pos = np.median(X_dimred[non_zero_cols[non_zero_rows == i], :], 0).reshape(-1, 1) nn_ixs = non_zero_cols[non_zero_rows == i] # Find closest real point (amongst nearest neighbors) dists = euclidean_distances(X_dimred[non_zero_cols[non_zero_rows == i], :], nh_pos.T) # Update vertex index refined_vertices[i] = nn_ixs[dists.argmin()] refined_vertices = np.unique(refined_vertices.astype("int")) refined_vertices.sort() nhoods = knn_graph[:, refined_vertices] adata.obsm["nhoods"] = nhoods # Add ixs to adata adata.obs["nhood_ixs_random"] = adata.obs_names.isin(adata.obs_names[random_vertices]) adata.obs["nhood_ixs_refined"] = adata.obs_names.isin(adata.obs_names[refined_vertices]) adata.obs["nhood_ixs_refined"] = adata.obs["nhood_ixs_refined"].astype("int") adata.obs["nhood_ixs_random"] = adata.obs["nhood_ixs_random"].astype("int") adata.uns["nhood_neighbors_key"] = neighbors_key # Store distance to K-th nearest neighbor (used for spatial FDR correction) if neighbors_key is None: knn_dists = adata.obsp["distances"] else: knn_dists = adata.obsp[neighbors_key + "_distances"] nhood_ixs = adata.obs["nhood_ixs_refined"] == 1 dist_mat = knn_dists[nhood_ixs, :] k_distances = dist_mat.max(1).toarray().ravel() adata.obs["nhood_kth_distance"] = 0 adata.obs.loc[adata.obs["nhood_ixs_refined"] == 1, "nhood_kth_distance"] = k_distances if copy: return adata
[docs] def count_nhoods( self, data: AnnData | MuData, sample_col: str, feature_key: str | None = "rna", ): """Builds a sample-level AnnData object storing the matrix of cell counts per sample per neighbourhood. Args: data: AnnData object with neighbourhoods defined in `obsm['nhoods']` or MuData object with a modality with neighbourhoods defined in `obsm['nhoods']` sample_col: Column in adata.obs that contains sample information feature_key: If input data is MuData, specify key to cell-level AnnData object. Defaults to 'rna'. Returns: MuData object storing the original (i.e. rna) AnnData in `mudata[feature_key]` and the compositional anndata storing the neighbourhood cell counts in `mudata['milo']`. Here: - `mudata['milo'].obs_names` are samples (defined from `adata.obs['sample_col']`) - `mudata['milo'].var_names` are neighbourhoods - `mudata['milo'].X` is the matrix counting the number of cells from each sample in each neighbourhood Examples: >>> import pertpy as pt >>> import scanpy as sc >>> adata = pt.dt.bhattacherjee() >>> milo = pt.tl.Milo() >>> mdata = milo.load(adata) >>> sc.pp.neighbors(mdata["rna"]) >>> milo.make_nhoods(mdata["rna"]) >>> mdata = milo.count_nhoods(mdata, sample_col="orig.ident") """ if isinstance(data, MuData): adata = data[feature_key] is_MuData = True if isinstance(data, AnnData): adata = data is_MuData = False if isinstance(adata, AnnData): try: nhoods = adata.obsm["nhoods"] except KeyError: logger.error('Cannot find "nhoods" slot in adata.obsm -- please run milopy.make_nhoods(adata)') raise # Make nhood abundance matrix sample_dummies = pd.get_dummies(adata.obs[sample_col]) all_samples = sample_dummies.columns sample_dummies = csr_matrix(sample_dummies.values) nhood_count_mat = nhoods.T.dot(sample_dummies) sample_obs = pd.DataFrame(index=all_samples) sample_adata = AnnData(X=nhood_count_mat.T, obs=sample_obs, dtype=np.float32) sample_adata.uns["sample_col"] = sample_col # Save nhood index info sample_adata.var["index_cell"] = adata.obs_names[adata.obs["nhood_ixs_refined"] == 1] sample_adata.var["kth_distance"] = adata.obs.loc[ adata.obs["nhood_ixs_refined"] == 1, "nhood_kth_distance" ].values if is_MuData is True: data.mod["milo"] = sample_adata return data else: milo_mdata = MuData({feature_key: adata, "milo": sample_adata}) return milo_mdata
[docs] def da_nhoods( self, mdata: MuData, design: str, model_contrasts: str | None = None, subset_samples: list[str] | None = None, add_intercept: bool = True, feature_key: str | None = "rna", solver: Literal["edger", "batchglm"] = "edger", ): """Performs differential abundance testing on neighbourhoods using QLF test implementation as implemented in edgeR. Args: mdata: MuData object design: Formula for the test, following glm syntax from R (e.g. '~ condition'). Terms should be columns in `milo_mdata[feature_key].obs`. model_contrasts: A string vector that defines the contrasts used to perform DA testing, following glm syntax from R (e.g. "conditionDisease - conditionControl"). If no contrast is specified (default), then the last categorical level in condition of interest is used as the test group. Defaults to None. subset_samples: subset of samples (obs in `milo_mdata['milo']`) to use for the test. Defaults to None. add_intercept: whether to include an intercept in the model. If False, this is equivalent to adding + 0 in the design formula. When model_contrasts is specified, this is set to False by default. Defaults to True. feature_key: If input data is MuData, specify key to cell-level AnnData object. Defaults to 'rna'. solver: The solver to fit the model to. One of "edger" (requires R, rpy2 and edgeR to be installed) or "batchglm" Returns: None, modifies `milo_mdata['milo']` in place, adding the results of the DA test to `.var`: - `logFC` stores the log fold change in cell abundance (coefficient from the GLM) - `PValue` stores the p-value for the QLF test before multiple testing correction - `SpatialFDR` stores the p-value adjusted for multiple testing to limit the false discovery rate, calculated with weighted Benjamini-Hochberg procedure Examples: >>> import pertpy as pt >>> import scanpy as sc >>> adata = pt.dt.bhattacherjee() >>> milo = pt.tl.Milo() >>> mdata = milo.load(adata) >>> sc.pp.neighbors(mdata["rna"]) >>> milo.make_nhoods(mdata["rna"]) >>> mdata = milo.count_nhoods(mdata, sample_col="orig.ident") >>> milo.da_nhoods(mdata, design="~label") """ try: sample_adata = mdata["milo"] except KeyError: logger.error( "milo_mdata should be a MuData object with two slots:" " feature_key and 'milo' - please run milopy.count_nhoods() first" ) raise adata = mdata[feature_key] covariates = [x.strip(" ") for x in set(re.split("\\+|\\*", design.lstrip("~ ")))] # Add covariates used for testing to sample_adata.var sample_col = sample_adata.uns["sample_col"] try: sample_obs = adata.obs[covariates + [sample_col]].drop_duplicates() except KeyError: missing_cov = [x for x in covariates if x not in sample_adata.obs.columns] logger.warning("Covariates {c} are not columns in adata.obs".format(c=" ".join(missing_cov))) raise sample_obs = sample_obs[covariates + [sample_col]] sample_obs.index = sample_obs[sample_col].astype("str") try: assert sample_obs.loc[sample_adata.obs_names].shape[0] == len(sample_adata.obs_names) except AssertionError: logger.warning( f"Values in mdata[{feature_key}].obs[{covariates}] cannot be unambiguously assigned to each sample" f" -- each sample value should match a single covariate value" ) raise sample_adata.obs = sample_obs.loc[sample_adata.obs_names] # Get design dataframe try: design_df = sample_adata.obs[covariates] except KeyError: missing_cov = [x for x in covariates if x not in sample_adata.obs.columns] logger.error( 'Covariates {c} are not columns in adata.uns["sample_adata"].obs'.format(c=" ".join(missing_cov)) ) raise # Get count matrix count_mat = sample_adata.X.T.toarray() lib_size = count_mat.sum(0) # Filter out samples with zero counts keep_smp = lib_size > 0 # Subset samples if subset_samples is not None: keep_smp = keep_smp & sample_adata.obs_names.isin(subset_samples) design_df = design_df[keep_smp] for i, e in enumerate(design_df.columns): if design_df.dtypes[i].name == "category": design_df[e] = design_df[e].cat.remove_unused_categories() # Filter out nhoods with zero counts (they can appear after sample filtering) keep_nhoods = count_mat[:, keep_smp].sum(1) > 0 if solver == "edger": # Set up rpy2 to run edgeR edgeR, limma, stats, base = self._setup_rpy2() # Define model matrix if not add_intercept or model_contrasts is not None: design = design + " + 0" model = stats.model_matrix(object=stats.formula(design), data=design_df) # Fit NB-GLM dge = edgeR.DGEList(counts=count_mat[keep_nhoods, :][:, keep_smp], lib_size=lib_size[keep_smp]) dge = edgeR.calcNormFactors(dge, method="TMM") dge = edgeR.estimateDisp(dge, model) fit = edgeR.glmQLFit(dge, model, robust=True) # Test n_coef = model.shape[1] if model_contrasts is not None: r_str = """ get_model_cols <- function(design_df, design){ m = model.matrix(object=formula(design), data=design_df) return(colnames(m)) } """ get_model_cols = STAP(r_str, "get_model_cols") model_mat_cols = get_model_cols.get_model_cols(design_df, design) model_df = pd.DataFrame(model) model_df.columns = model_mat_cols try: mod_contrast = limma.makeContrasts(contrasts=model_contrasts, levels=model_df) except ValueError: logger.error("Model contrasts must be in the form 'A-B' or 'A+B'") raise res = base.as_data_frame( edgeR.topTags(edgeR.glmQLFTest(fit, contrast=mod_contrast), sort_by="none", n=np.inf) ) else: res = base.as_data_frame(edgeR.topTags(edgeR.glmQLFTest(fit, coef=n_coef), sort_by="none", n=np.inf)) res = conversion.rpy2py(res) if not isinstance(res, pd.DataFrame): res = pd.DataFrame(res) # Save outputs res.index = sample_adata.var_names[keep_nhoods] # type: ignore if any(col in sample_adata.var.columns for col in res.columns): sample_adata.var = sample_adata.var.drop(res.columns, axis=1) sample_adata.var = pd.concat([sample_adata.var, res], axis=1) # Run Graph spatial FDR correction self._graph_spatial_fdr(sample_adata, neighbors_key=adata.uns["nhood_neighbors_key"])
[docs] def annotate_nhoods( self, mdata: MuData, anno_col: str, feature_key: str | None = "rna", ): """Assigns a categorical label to neighbourhoods, based on the most frequent label among cells in each neighbourhood. This can be useful to stratify DA testing results by cell types or samples. Args: mdata: MuData object anno_col: Column in adata.obs containing the cell annotations to use for nhood labelling feature_key: If input data is MuData, specify key to cell-level AnnData object. Defaults to 'rna'. Returns: None. Adds in place: - `milo_mdata['milo'].var["nhood_annotation"]`: assigning a label to each nhood - `milo_mdata['milo'].var["nhood_annotation_frac"]` stores the fraciton of cells in the neighbourhood with the assigned label - `milo_mdata['milo'].varm['frac_annotation']`: stores the fraction of cells from each label in each nhood - `milo_mdata['milo'].uns["annotation_labels"]`: stores the column names for `milo_mdata['milo'].varm['frac_annotation']` Examples: >>> import pertpy as pt >>> import scanpy as sc >>> adata = pt.dt.bhattacherjee() >>> milo = pt.tl.Milo() >>> mdata = milo.load(adata) >>> sc.pp.neighbors(mdata["rna"]) >>> milo.make_nhoods(mdata["rna"]) >>> mdata = milo.count_nhoods(mdata, sample_col="orig.ident") >>> milo.annotate_nhoods(mdata, anno_col="cell_type") """ try: sample_adata = mdata["milo"] except KeyError: logger.error( "milo_mdata should be a MuData object with two slots: feature_key and 'milo' - please run milopy.count_nhoods(adata) first" ) raise adata = mdata[feature_key] # Check value is not numeric if pd.api.types.is_numeric_dtype(adata.obs[anno_col]): raise ValueError( "adata.obs[anno_col] is not of categorical type - please use milopy.utils.annotate_nhoods_continuous for continuous variables" ) anno_dummies = pd.get_dummies(adata.obs[anno_col]) anno_count = adata.obsm["nhoods"].T.dot(csr_matrix(anno_dummies.values)) anno_count_dense = anno_count.toarray() anno_sum = anno_count_dense.sum(1) anno_frac = np.divide(anno_count_dense, anno_sum[:, np.newaxis]) anno_frac_dataframe = pd.DataFrame(anno_frac, columns=anno_dummies.columns, index=sample_adata.var_names) sample_adata.varm["frac_annotation"] = anno_frac_dataframe.values sample_adata.uns["annotation_labels"] = anno_frac_dataframe.columns sample_adata.uns["annotation_obs"] = anno_col sample_adata.var["nhood_annotation"] = anno_frac_dataframe.idxmax(1) sample_adata.var["nhood_annotation_frac"] = anno_frac_dataframe.max(1)
[docs] def annotate_nhoods_continuous(self, mdata: MuData, anno_col: str, feature_key: str | None = "rna"): """Assigns a continuous value to neighbourhoods, based on mean cell level covariate stored in adata.obs. This can be useful to correlate DA log-foldChanges with continuous covariates such as pseudotime, gene expression scores etc... Args: mdata: MuData object anno_col: Column in adata.obs containing the cell annotations to use for nhood labelling feature_key: If input data is MuData, specify key to cell-level AnnData object. Defaults to 'rna'. Returns: None. Adds in place: - `milo_mdata['milo'].var["nhood_{anno_col}"]`: assigning a continuous value to each nhood Examples: >>> import pertpy as pt >>> import scanpy as sc >>> adata = pt.dt.bhattacherjee() >>> milo = pt.tl.Milo() >>> mdata = milo.load(adata) >>> sc.pp.neighbors(mdata["rna"]) >>> milo.make_nhoods(mdata["rna"]) >>> mdata = milo.count_nhoods(mdata, sample_col="orig.ident") >>> milo.annotate_nhoods_continuous(mdata, anno_col="nUMI") """ if "milo" not in mdata.mod: raise ValueError( "milo_mdata should be a MuData object with two slots: feature_key and 'milo' - please run milopy.count_nhoods(adata) first" ) adata = mdata[feature_key] # Check value is not categorical if not pd.api.types.is_numeric_dtype(adata.obs[anno_col]): raise ValueError( "adata.obs[anno_col] is not of continuous type - please use milopy.utils.annotate_nhoods for categorical variables" ) anno_val = adata.obsm["nhoods"].T.dot(csr_matrix(adata.obs[anno_col]).T) mean_anno_val = anno_val.toarray() / np.array(adata.obsm["nhoods"].T.sum(1)) mdata["milo"].var[f"nhood_{anno_col}"] = mean_anno_val
[docs] def add_covariate_to_nhoods_var(self, mdata: MuData, new_covariates: list[str], feature_key: str | None = "rna"): """Add covariate from cell-level obs to sample-level obs. These should be covariates for which a single value can be assigned to each sample. Args: mdata: MuData object new_covariates: columns in `milo_mdata[feature_key].obs` to add to `milo_mdata['milo'].obs`. feature_key: If input data is MuData, specify key to cell-level AnnData object. Defaults to 'rna'. Returns: None, adds columns to `milo_mdata['milo']` in place Examples: >>> import pertpy as pt >>> import scanpy as sc >>> adata = pt.dt.bhattacherjee() >>> milo = pt.tl.Milo() >>> mdata = milo.load(adata) >>> sc.pp.neighbors(mdata["rna"]) >>> milo.make_nhoods(mdata["rna"]) >>> mdata = milo.count_nhoods(mdata, sample_col="orig.ident") >>> milo.add_covariate_to_nhoods_var(mdata, new_covariates=["label"]) """ try: sample_adata = mdata["milo"] except KeyError: logger.error( "milo_mdata should be a MuData object with two slots: feature_key and 'milo' - please run milopy.count_nhoods(adata) first" ) raise adata = mdata[feature_key] sample_col = sample_adata.uns["sample_col"] covariates = list( set(sample_adata.obs.columns[sample_adata.obs.columns != sample_col].tolist() + new_covariates) ) try: sample_obs = adata.obs[covariates + [sample_col]].drop_duplicates() except KeyError: missing_cov = [covar for covar in covariates if covar not in sample_adata.obs.columns] logger.error("Covariates {c} are not columns in adata.obs".format(c=" ".join(missing_cov))) raise sample_obs = sample_obs[covariates + [sample_col]].astype("str") sample_obs.index = sample_obs[sample_col] try: assert sample_obs.loc[sample_adata.obs_names].shape[0] == len(sample_adata.obs_names) except ValueError: logger.error( "Covariates cannot be unambiguously assigned to each sample -- each sample value should match a single covariate value" ) raise sample_adata.obs = sample_obs.loc[sample_adata.obs_names]
[docs] def build_nhood_graph(self, mdata: MuData, basis: str = "X_umap", feature_key: str | None = "rna"): """Build graph of neighbourhoods used for visualization of DA results Args: mdata: MuData object basis: Name of the obsm basis to use for layout of neighbourhoods (key in `adata.obsm`). Defaults to "X_umap". feature_key: If input data is MuData, specify key to cell-level AnnData object. Defaults to 'rna'. Returns: - `milo_mdata['milo'].varp['nhood_connectivities']`: graph of overlap between neighbourhoods (i.e. no of shared cells) - `milo_mdata['milo'].var["Nhood_size"]`: number of cells in neighbourhoods Examples: >>> import pertpy as pt >>> import scanpy as sc >>> adata = pt.dt.bhattacherjee() >>> milo = pt.tl.Milo() >>> mdata = milo.load(adata) >>> sc.pp.neighbors(mdata["rna"]) >>> sc.tl.umap(mdata["rna"]) >>> milo.make_nhoods(mdata["rna"]) >>> mdata = milo.count_nhoods(mdata, sample_col="orig.ident") >>> milo.build_nhood_graph(mdata) """ adata = mdata[feature_key] # # Add embedding positions mdata["milo"].varm["X_milo_graph"] = adata[adata.obs["nhood_ixs_refined"] == 1].obsm[basis] # Add nhood size mdata["milo"].var["Nhood_size"] = np.array(adata.obsm["nhoods"].sum(0)).flatten() # Add adjacency graph mdata["milo"].varp["nhood_connectivities"] = adata.obsm["nhoods"].T.dot(adata.obsm["nhoods"]) mdata["milo"].varp["nhood_connectivities"].setdiag(0) mdata["milo"].varp["nhood_connectivities"].eliminate_zeros() mdata["milo"].uns["nhood"] = { "connectivities_key": "nhood_connectivities", "distances_key": "", }
[docs] def add_nhood_expression(self, mdata: MuData, layer: str | None = None, feature_key: str | None = "rna"): """Calculates the mean expression in neighbourhoods of each feature. Args: mdata: MuData object layer: If provided, use `milo_mdata[feature_key][layer]` as expression matrix instead of `milo_mdata[feature_key].X`. Defaults to None. feature_key: If input data is MuData, specify key to cell-level AnnData object. Defaults to 'rna'. Returns: Updates adata in place to store the matrix of average expression in each neighbourhood in `milo_mdata['milo'].varm['expr']` Examples: >>> import pertpy as pt >>> import scanpy as sc >>> adata = pt.dt.bhattacherjee() >>> milo = pt.tl.Milo() >>> mdata = milo.load(adata) >>> sc.pp.neighbors(mdata["rna"]) >>> milo.make_nhoods(mdata["rna"]) >>> mdata = milo.count_nhoods(mdata, sample_col="orig.ident") >>> milo.add_nhood_expression(mdata) """ try: sample_adata = mdata["milo"] except KeyError: logger.error( "milo_mdata should be a MuData object with two slots:" " feature_key and 'milo' - please run milopy.count_nhoods(adata) first" ) raise adata = mdata[feature_key] # Get gene expression matrix if layer is None: X = adata.X expr_id = "expr" else: X = adata.layers[layer] expr_id = "expr_" + layer # Aggregate over nhoods -- taking the mean nhoods_X = X.T.dot(adata.obsm["nhoods"]) nhoods_X = csr_matrix(nhoods_X / adata.obsm["nhoods"].toarray().sum(0)) sample_adata.varm[expr_id] = nhoods_X.T
def _setup_rpy2( self, ): """Set up rpy2 to run edgeR""" numpy2ri.activate() pandas2ri.activate() edgeR = self._try_import_bioc_library("edgeR") limma = self._try_import_bioc_library("limma") stats = importr("stats") base = importr("base") return edgeR, limma, stats, base def _try_import_bioc_library( self, name: str, ): """Import R packages. Args: name (str): R packages name """ try: _r_lib = importr(name) return _r_lib except PackageNotInstalledError: logger.error(f"Install Bioconductor library `{name!r}` first as `BiocManager::install({name!r}).`") raise def _graph_spatial_fdr( self, sample_adata: AnnData, neighbors_key: str | None = None, ): """FDR correction weighted on inverse of connectivity of neighbourhoods. The distance to the k-th nearest neighbor is used as a measure of connectivity. Args: sample_adata: Sample-level AnnData. neighbors_key: The key in `adata.obsp` to use as KNN graph. Defaults to None. """ # use 1/connectivity as the weighting for the weighted BH adjustment from Cydar w = 1 / sample_adata.var["kth_distance"] w[np.isinf(w)] = 0 # Computing a density-weighted q-value. pvalues = sample_adata.var["PValue"] keep_nhoods = ~pvalues.isna() # Filtering in case of test on subset of nhoods o = pvalues[keep_nhoods].argsort() pvalues = pvalues[keep_nhoods][o] w = w[keep_nhoods][o] adjp = np.zeros(shape=len(o)) adjp[o] = (sum(w) * pvalues / np.cumsum(w))[::-1].cummin()[::-1] adjp = np.array([x if x < 1 else 1 for x in adjp]) sample_adata.var["SpatialFDR"] = np.nan sample_adata.var.loc[keep_nhoods, "SpatialFDR"] = adjp
[docs] def plot_nhood_graph( self, mdata: MuData, alpha: float = 0.1, min_logFC: float = 0, min_size: int = 10, plot_edges: bool = False, title: str = "DA log-Fold Change", color_map: Colormap | str | None = None, palette: str | Sequence[str] | None = None, ax: Axes | None = None, show: bool | None = None, save: bool | str | None = None, **kwargs, ) -> None: """Visualize DA results on abstracted graph (wrapper around sc.pl.embedding) Args: mdata: MuData object alpha: Significance threshold. (default: 0.1) min_logFC: Minimum absolute log-Fold Change to show results. If is 0, show all significant neighbourhoods. Defaults to 0. min_size: Minimum size of nodes in visualization. (default: 10) plot_edges: If edges for neighbourhood overlaps whould be plotted. Defaults to False. title: Plot title. Defaults to "DA log-Fold Change". show: Show the plot, do not return axis. save: If `True` or a `str`, save the figure. A string is appended to the default filename. Infer the filetype if ending on {`'.pdf'`, `'.png'`, `'.svg'`}. **kwargs: Additional arguments to `scanpy.pl.embedding`. Examples: >>> import pertpy as pt >>> import scanpy as sc >>> adata = pt.dt.bhattacherjee() >>> milo = pt.tl.Milo() >>> mdata = milo.load(adata) >>> sc.pp.neighbors(mdata["rna"]) >>> sc.tl.umap(mdata["rna"]) >>> milo.make_nhoods(mdata["rna"]) >>> mdata = milo.count_nhoods(mdata, sample_col="orig.ident") >>> milo.da_nhoods(mdata, >>> design='~label', >>> model_contrasts='labelwithdraw_15d_Cocaine-labelwithdraw_48h_Cocaine') >>> milo.build_nhood_graph(mdata) >>> milo.plot_nhood_graph(mdata) Preview: .. image:: /_static/docstring_previews/milo_nhood_graph.png """ nhood_adata = mdata["milo"].T.copy() if "Nhood_size" not in nhood_adata.obs.columns: raise KeyError( 'Cannot find "Nhood_size" column in adata.uns["nhood_adata"].obs -- \ please run milopy.utils.build_nhood_graph(adata)' ) nhood_adata.obs["graph_color"] = nhood_adata.obs["logFC"] nhood_adata.obs.loc[nhood_adata.obs["SpatialFDR"] > alpha, "graph_color"] = np.nan nhood_adata.obs["abs_logFC"] = abs(nhood_adata.obs["logFC"]) nhood_adata.obs.loc[nhood_adata.obs["abs_logFC"] < min_logFC, "graph_color"] = np.nan # Plotting order - extreme logFC on top nhood_adata.obs.loc[nhood_adata.obs["graph_color"].isna(), "abs_logFC"] = np.nan ordered = nhood_adata.obs.sort_values("abs_logFC", na_position="first").index nhood_adata = nhood_adata[ordered] vmax = np.max([nhood_adata.obs["graph_color"].max(), abs(nhood_adata.obs["graph_color"].min())]) vmin = -vmax sc.pl.embedding( nhood_adata, "X_milo_graph", color="graph_color", cmap="RdBu_r", size=nhood_adata.obs["Nhood_size"] * min_size, edges=plot_edges, neighbors_key="nhood", sort_order=False, frameon=False, vmax=vmax, vmin=vmin, title=title, color_map=color_map, palette=palette, ax=ax, show=show, save=save, **kwargs, )
[docs] def plot_nhood( self, mdata: MuData, ix: int, feature_key: str | None = "rna", basis: str = "X_umap", color_map: Colormap | str | None = None, palette: str | Sequence[str] | None = None, return_fig: bool | None = None, ax: Axes | None = None, show: bool | None = None, save: bool | str | None = None, **kwargs, ) -> None: """Visualize cells in a neighbourhood. Args: mdata: MuData object with feature_key slot, storing neighbourhood assignments in `mdata[feature_key].obsm['nhoods']` ix: index of neighbourhood to visualize basis: Embedding to use for visualization. Defaults to "X_umap". show: Show the plot, do not return axis. save: If True or a str, save the figure. A string is appended to the default filename. Infer the filetype if ending on {'.pdf', '.png', '.svg'}. **kwargs: Additional arguments to `scanpy.pl.embedding`. Examples: >>> import pertpy as pt >>> import scanpy as sc >>> adata = pt.dt.bhattacherjee() >>> milo = pt.tl.Milo() >>> mdata = milo.load(adata) >>> sc.pp.neighbors(mdata["rna"]) >>> sc.tl.umap(mdata["rna"]) >>> milo.make_nhoods(mdata["rna"]) >>> milo.plot_nhood(mdata, ix=0) Preview: .. image:: /_static/docstring_previews/milo_nhood.png """ mdata[feature_key].obs["Nhood"] = mdata[feature_key].obsm["nhoods"][:, ix].toarray().ravel() sc.pl.embedding( mdata[feature_key], basis, color="Nhood", size=30, title="Nhood" + str(ix), color_map=color_map, palette=palette, return_fig=return_fig, ax=ax, show=show, save=save, **kwargs, )
[docs] def plot_da_beeswarm( self, mdata: MuData, feature_key: str | None = "rna", anno_col: str = "nhood_annotation", alpha: float = 0.1, subset_nhoods: list[str] = None, palette: str | Sequence[str] | dict[str, str] | None = None, return_fig: bool | None = None, save: bool | str | None = None, show: bool | None = None, ) -> None: """Plot beeswarm plot of logFC against nhood labels Args: mdata: MuData object anno_col: Column in adata.uns['nhood_adata'].obs to use as annotation. (default: 'nhood_annotation'.) alpha: Significance threshold. (default: 0.1) subset_nhoods: List of nhoods to plot. If None, plot all nhoods. Defaults to None. palette: Name of Seaborn color palette for violinplots. Defaults to pre-defined category colors for violinplots. Examples: >>> import pertpy as pt >>> import scanpy as sc >>> adata = pt.dt.bhattacherjee() >>> milo = pt.tl.Milo() >>> mdata = milo.load(adata) >>> sc.pp.neighbors(mdata["rna"]) >>> milo.make_nhoods(mdata["rna"]) >>> mdata = milo.count_nhoods(mdata, sample_col="orig.ident") >>> milo.da_nhoods(mdata, design="~label") >>> milo.annotate_nhoods(mdata, anno_col="cell_type") >>> milo.plot_da_beeswarm(mdata) Preview: .. image:: /_static/docstring_previews/milo_da_beeswarm.png """ try: nhood_adata = mdata["milo"].T.copy() except KeyError: raise RuntimeError( "mdata should be a MuData object with two slots: feature_key and 'milo'. Run 'milopy.count_nhoods(adata)' first." ) from None try: nhood_adata.obs[anno_col] except KeyError: raise RuntimeError( f"Unable to find {anno_col} in mdata['milo'].var. Run 'milopy.utils.annotate_nhoods(adata, anno_col)' first" ) from None if subset_nhoods is not None: nhood_adata = nhood_adata[nhood_adata.obs[anno_col].isin(subset_nhoods)] try: nhood_adata.obs["logFC"] except KeyError: raise RuntimeError( "Unable to find 'logFC' in mdata.uns['nhood_adata'].obs. Run 'core.da_nhoods(adata)' first." ) from None sorted_annos = ( nhood_adata.obs[[anno_col, "logFC"]].groupby(anno_col).median().sort_values("logFC", ascending=True).index ) anno_df = nhood_adata.obs[[anno_col, "logFC", "SpatialFDR"]].copy() anno_df["is_signif"] = anno_df["SpatialFDR"] < alpha anno_df = anno_df[anno_df[anno_col] != "nan"] try: obs_col = nhood_adata.uns["annotation_obs"] if palette is None: palette = dict( zip( mdata[feature_key].obs[obs_col].cat.categories, mdata[feature_key].uns[f"{obs_col}_colors"], strict=False, ) ) sns.violinplot( data=anno_df, y=anno_col, x="logFC", order=sorted_annos, inner=None, orient="h", palette=palette, linewidth=0, scale="width", ) except BaseException: # noqa: BLE001 sns.violinplot( data=anno_df, y=anno_col, x="logFC", order=sorted_annos, inner=None, orient="h", linewidth=0, scale="width", ) sns.stripplot( data=anno_df, y=anno_col, x="logFC", order=sorted_annos, size=2, hue="is_signif", palette=["grey", "black"], orient="h", alpha=0.5, ) plt.legend(loc="upper left", title=f"< {int(alpha * 100)}% SpatialFDR", bbox_to_anchor=(1, 1), frameon=False) plt.axvline(x=0, ymin=0, ymax=1, color="black", linestyle="--") if save: plt.savefig(save, bbox_inches="tight") if show: plt.show() if return_fig: return plt.gcf() if (not show and not save) or (show is None and save is None): return plt.gca()
[docs] def plot_nhood_counts_by_cond( self, mdata: MuData, test_var: str, subset_nhoods: list[str] = None, log_counts: bool = False, return_fig: bool | None = None, save: bool | str | None = None, show: bool | None = None, ) -> None: """Plot boxplot of cell numbers vs condition of interest. Args: mdata: MuData object storing cell level and nhood level information test_var: Name of column in adata.obs storing condition of interest (y-axis for boxplot) subset_nhoods: List of obs_names for neighbourhoods to include in plot. If None, plot all nhoods. Defaults to None. log_counts: Whether to plot log1p of cell counts. Defaults to False. """ try: nhood_adata = mdata["milo"].T.copy() except KeyError: raise RuntimeError( "mdata should be a MuData object with two slots: feature_key and 'milo'. Run milopy.count_nhoods(mdata) first" ) from None if subset_nhoods is None: subset_nhoods = nhood_adata.obs_names pl_df = pd.DataFrame(nhood_adata[subset_nhoods].X.A, columns=nhood_adata.var_names).melt( var_name=nhood_adata.uns["sample_col"], value_name="n_cells" ) pl_df = pd.merge(pl_df, nhood_adata.var) pl_df["log_n_cells"] = np.log1p(pl_df["n_cells"]) if not log_counts: sns.boxplot(data=pl_df, x=test_var, y="n_cells", color="lightblue") sns.stripplot(data=pl_df, x=test_var, y="n_cells", color="black", s=3) plt.ylabel("# cells") else: sns.boxplot(data=pl_df, x=test_var, y="log_n_cells", color="lightblue") sns.stripplot(data=pl_df, x=test_var, y="log_n_cells", color="black", s=3) plt.ylabel("log(# cells + 1)") plt.xticks(rotation=90) plt.xlabel(test_var) if save: plt.savefig(save, bbox_inches="tight") if show: plt.show() if return_fig: return plt.gcf() if not (show or save): return plt.gca()