Source code for

from __future__ import annotations

from typing import TYPE_CHECKING

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scanpy as sc
import scipy.stats as ss
import seaborn as sns
import sklearn.metrics
from ott.geometry import pointcloud
from ott.problems.linear import linear_problem
from ott.solvers.linear import sinkhorn, sinkhorn_lr
from scanpy.plotting import _utils
from scipy.sparse import issparse
from sklearn.decomposition import FastICA
from sklearn.linear_model import LinearRegression
from sklearn.neighbors import NearestNeighbors

    from anndata import AnnData
    from matplotlib.axes import Axes
    from import ArrayLike

[docs] class Cinemaot: """CINEMA-OT is a causal framework for perturbation effect analysis to identify individual treatment effects and synergy.""" def __init__(self): pass
[docs] def causaleffect( self, adata: AnnData, pert_key: str, control: str, return_matching: bool = False, cf_rep: str = "cf", use_rep: str = "X_pca", batch_size: int | None = None, dim: int | None = 20, thres: float = 0.15, smoothness: float = 1e-4, rank: int = 200, eps: float = 1e-3, solver: str = "Sinkhorn", preweight_label: str | None = None, ): """Calculate the confounding variation, optimal transport counterfactual pairs, and single-cell level treatment effects. Args: adata: The annotated data object. pert_key: The column of `.obs` with perturbation categories, should also contain `control`. control: Control category from the `pert_key` column. return_matching: Whether to return the matching matrix in the returned de.obsm['ot']. cf_rep: the place to put the confounder embedding in the original adata.obsm. use_rep: Use the indicated representation. `'X'` or any key for `.obsm` is valid. batch_size: Size of batch to calculate the optimal transport map. dim: Use the first dim components in use_rep. If none, use a biwhitening procedure on the raw count matrix to derive a reasonable rank. thres: the threshold for the rank-dependence metric. smoothness: the coefficient determining the smooth level in entropic optimal transport problem. rank: Only used if the solver "LRSinkhorn" is used. Specifies the rank number of the transport map. eps: Tolerate error of the optimal transport. solver: Either "Sinkhorn" or "LRSinkhorn". The ott-jax solver used. preweight_label: The annotated label (e.g. cell type) that is used to assign weights for treated and control cells to balance across the label. Helps overcome the differential abundance issue. Returns: Returns an AnnData object that contains the single-cell level treatment effect as de.X and the corresponding low dimensional embedding in de.obsm['X_embedding'], and optional matching matrix stored in the de.obsm['ot']. Also puts the confounding variation in adata.obsm[cf_rep]. Examples: >>> import pertpy as pt >>> adata = pt.dt.cinemaot_example() >>> model = >>> out_adata = model.causaleffect( >>> adata, pert_key="perturbation", control="No stimulation", return_matching=True, >>> thres=0.5, smoothness=1e-5, eps=1e-3, solver="Sinkhorn", preweight_label="cell_type0528") """ available_solvers = ["Sinkhorn", "LRSinkhorn"] if solver not in available_solvers: raise ValueError(f"solver = {solver} is not one of the supported solvers:" f" {available_solvers}") if dim is None: dim = self.get_dim(adata, use_rep=use_rep) transformer = FastICA(n_components=dim, random_state=0, whiten="arbitrary-variance") X_transformed = transformer.fit_transform(adata.obsm[use_rep][:, :dim]) groupvec = (adata.obs[pert_key] == control * 1).values # control xi = np.zeros(dim) j = 0 for source_row in X_transformed.T: xi_obj = Xi(source_row, groupvec * 1) xi[j] = xi_obj.correlation j = j + 1 cf = X_transformed[:, xi < thres] cf1 = cf[adata.obs[pert_key] == control, :] cf2 = cf[adata.obs[pert_key] != control, :] if sum(xi < thres) == 1: sklearn.metrics.pairwise_distances(cf1.reshape(-1, 1), cf2.reshape(-1, 1)) elif sum(xi < thres) == 0: raise ValueError("No confounder components identified. Please try a higher threshold.") else: sklearn.metrics.pairwise_distances(cf1, cf2) e = smoothness * sum(xi < thres) geom = pointcloud.PointCloud(cf1, cf2, epsilon=e, batch_size=batch_size) if preweight_label is None: ot_prob = linear_problem.LinearProblem(geom, a=None, b=None) else: # Implement a simple function here, taking adata.obs, output inverse prob weight. # For consistency, c is still the empirical distribution, while r is weighted. a = np.zeros(cf1.shape[0]) b = np.zeros(cf2.shape[0]) adata1 = adata[adata.obs[pert_key] == control, :].copy() adata2 = adata[adata.obs[pert_key] != control, :].copy() for label in adata1.obs[pert_key].unique(): for ct in adata1.obs[preweight_label].unique(): a[((adata1.obs[preweight_label] == ct) & (adata1.obs[pert_key] == label)).values] = np.sum( (adata2.obs[preweight_label] == ct).values ) / np.sum((adata1.obs[preweight_label] == ct).values) a[(adata1.obs[pert_key] == label).values] = a[(adata1.obs[pert_key] == label).values] / np.sum( a[(adata1.obs[pert_key] == label).values] ) a = a / np.sum(a) b[:] = 1 / cf2.shape[0] ot_prob = linear_problem.LinearProblem(geom, a=a, b=b) if solver == "LRSinkhorn": if rank is None: rank = int(min(cf1.shape[0], cf2.shape[0]) / 2) _solver = sinkhorn_lr.LRSinkhorn(rank=rank, threshold=eps) ot_sink = _solver(ot_prob) embedding = ( X_transformed[adata.obs[pert_key] != control, :] - ot_sink.apply(X_transformed[adata.obs[pert_key] == control, :].T).T / ot_sink.apply(np.ones_like(X_transformed[adata.obs[pert_key] == control, :].T)).T ) if issparse(adata.X): te2 = ( adata.X.toarray()[adata.obs[pert_key] != control, :] - ot_sink.apply(adata.X.toarray()[adata.obs[pert_key] == control, :].T).T / ot_sink.apply(np.ones_like(adata.X.toarray()[adata.obs[pert_key] == control, :].T)).T ) else: te2 = ( adata.X[adata.obs[pert_key] != control, :] - ot_sink.apply(adata.X[adata.obs[pert_key] == control, :].T).T / ot_sink.apply(np.ones_like(adata.X[adata.obs[pert_key] == control, :].T)).T ) adata.obsm[cf_rep] = cf adata.obsm[cf_rep][adata.obs[pert_key] != control, :] = ( ot_sink.apply(adata.obsm[cf_rep][adata.obs[pert_key] == control, :].T).T / ot_sink.apply(np.ones_like(adata.obsm[cf_rep][adata.obs[pert_key] == control, :].T)).T ) else: _solver = sinkhorn.Sinkhorn(threshold=eps) ot_sink = _solver(ot_prob) ot_matrix = ot_sink.matrix.T embedding = X_transformed[adata.obs[pert_key] != control, :] - np.matmul( ot_matrix / np.sum(ot_matrix, axis=1)[:, None], X_transformed[adata.obs[pert_key] == control, :] ) if issparse(adata.X): te2 = adata.X.toarray()[adata.obs[pert_key] != control, :] - np.matmul( ot_matrix / np.sum(ot_matrix, axis=1)[:, None], adata.X.toarray()[adata.obs[pert_key] == control, :] ) else: te2 = adata.X[adata.obs[pert_key] != control, :] - np.matmul( ot_matrix / np.sum(ot_matrix, axis=1)[:, None], adata.X[adata.obs[pert_key] == control, :] ) adata.obsm[cf_rep] = cf adata.obsm[cf_rep][adata.obs[pert_key] != control, :] = np.matmul( ot_matrix / np.sum(ot_matrix, axis=1)[:, None], adata.obsm[cf_rep][adata.obs[pert_key] == control, :] ) TE = sc.AnnData(np.array(te2), obs=adata[adata.obs[pert_key] != control, :].obs.copy(), var=adata.var.copy()) TE.obsm["X_embedding"] = embedding if return_matching: TE.obsm["ot"] = ot_sink.matrix.T return TE else: return TE
[docs] def causaleffect_weighted( self, adata: AnnData, pert_key: str, control: str, return_matching: bool = False, cf_rep: str = "cf", use_rep: str = "X_pca", batch_size: int | None = None, k: int = 20, dim: int | None = 20, thres: float = 0.15, smoothness: float = 1e-4, rank: int = 200, eps: float = 1e-3, solver: str = "Sinkhorn", resolution: float = 1.0, ): """The resampling CINEMA-OT algorithm that allows tackling the differential abundance in an unsupervised manner. Args: adata: The annotated data object. pert_key: The column of `.obs` with perturbation categories, should also contain `control`. control: Control category from the `pert_key` column. return_matching: Whether to return the matching matrix in the returned de.obsm['ot']. cf_rep: the place to put the confounder embedding in the original adata.obsm. use_rep: Use the indicated representation. `'X'` or any key for `.obsm` is valid. batch_size: Size of batch to calculate the optimal transport map. k: the number of neighbors used in the k-NN matching phase. dim: Use the first dim components in use_rep. If None, use a biwhitening procedure on the raw count matrix to derive a reasonable rank. thres: the threshold for the rank-dependence metric. smoothness: the coefficient determining the smooth level in entropic optimal transport problem. rank: Only used if the solver "LRSinkhorn" is used. Specifies the rank number of the transport map. eps: Tolerate error of the optimal transport. solver: Either "Sinkhorn" or "LRSinkhorn". The ott-jax solver used. resolution: the clustering resolution used in the sampling phase. Returns: Returns an anndata object that contains the single-cell level treatment effect as de.X and the corresponding low dimensional embedding in de.obsm['X_embedding'], and optional matching matrix stored in the de.obsm['ot']. Also puts the confounding variation in adata.obsm[cf_rep]. Examples: >>> import pertpy as pt >>> adata = pt.dt.cinemaot_example() >>> model = >>> ad, de = model.causaleffect_weighted( >>> adata, pert_key="perturbation", control="No stimulation", return_matching=True, >>> thres=0.5, smoothness=1e-5, eps=1e-3, solver="Sinkhorn") """ available_solvers = ["Sinkhorn", "LRSinkhorn"] assert solver in available_solvers, ( f"solver = {solver} is not one of the supported solvers:" f" {available_solvers}" ) if dim is None: dim = self.get_dim(adata, use_rep=use_rep) adata.obs_names_make_unique() idx = self.get_weightidx(adata, pert_key=pert_key, control=control, k=k, use_rep=use_rep, resolution=resolution) adata_ = adata[idx].copy() TE = self.causaleffect( adata_, pert_key, control, return_matching=return_matching, cf_rep=cf_rep, use_rep=use_rep, batch_size=batch_size, dim=dim, thres=thres, smoothness=smoothness, rank=rank, eps=eps, solver=solver, ) return adata_, TE
[docs] def generate_pseudobulk( self, adata: AnnData, de: AnnData, pert_key: str, control: str, label_list: list, cf_rep: str = "cf", de_rep: str = "X_embedding", cf_resolution: float = 0.5, de_resolution: float = 0.5, use_raw: bool = True, ): """Generating pseudobulk anndata considering the differential response behaviors revealed by CINEMA-OT. Requires running Cinemaot.causaleffect() or Cinemaot.causaleffect_weighted() first. Args: adata: The annotated data object. de: The anndata output from Cinemaot.causaleffect() or Cinemaot.causaleffect_weighted(). pert_key: The column of `.obs` with perturbation categories, should also contain `control`. control: Control category from the `pert_key` column. label_list: Additional covariate labels used to segragate pseudobulk. Should at least contain sample information (sample 1, sample 2,..., etc). cf_rep: the place to put the confounder embedding in the original adata.obsm. de_rep: Use the indicated representation in de.obsm. assign_cf: If a str is passed, a label in adata.obs instead of confounder Leiden label is used. cf_resolution: The leiden clustering resolution for the confounder. de_resolution: The leiden clustering resolution for the differential response. use_raw: If true, use adata.raw.X to aggregate the pseudobulk profiles. Otherwise use adata.X. Returns: Returns an anndata object that contains aggregated pseudobulk profiles and associated metadata. Examples: >>> import pertpy as pt >>> adata = pt.dt.cinemaot_example() >>> model = >>> de = model.causaleffect( >>> adata, pert_key="perturbation", control="No stimulation", return_matching=True, thres=0.5, >>> smoothness=1e-5, eps=1e-3, solver="Sinkhorn", preweight_label="cell_type0528") >>> adata_pb = model.generate_pseudobulk( >>> adata, de, pert_key="perturbation", control="No stimulation", label_list=None) """ sc.pp.neighbors(de, use_rep=de_rep), resolution=de_resolution) if use_raw: if issparse(adata.raw.X): df = pd.DataFrame(adata.raw.X.toarray(), columns=adata.raw.var_names, index=adata.raw.obs_names) else: df = pd.DataFrame(adata.raw.X, columns=adata.raw.var_names, index=adata.raw.obs_names) else: if issparse(adata.X): df = pd.DataFrame(adata.X.toarray(), columns=adata.var_names, index=adata.obs_names) else: df = pd.DataFrame(adata.X, columns=adata.var_names, index=adata.obs_names) if label_list is None: label_list = ["ct"] sc.pp.neighbors(adata, use_rep=cf_rep), resolution=cf_resolution) df["ct"] = adata.obs["leiden"].astype(str) df["ptb"] = "control" df["ptb"][adata.obs[pert_key] != control] = de.obs["leiden"].astype(str) label_list.append("ptb") df = df.groupby(label_list).sum() new_index = x: "_".join(map(str, x))) df_ = df.set_index(new_index) adata_pb = sc.AnnData(df_) adata_pb.obs = pd.DataFrame( df.index.to_frame().values, index=adata_pb.obs_names, columns=df.index.to_frame().columns, ) return adata_pb
[docs] def get_dim( self, adata: AnnData, c: float = 0.5, use_rep: str = "X_pca", ): """Estimating the rank of the count matrix. Always use adata.raw.X. Make sure it is the raw count matrix. Args: adata: The annotated data object. c: the parameter regarding the quadratic variance distribution. c=0 means Poisson count matrices. use_rep: the embedding used to give a upper bound for the estimated rank. Returns: Returns the estimated dimension number. Examples: >>> import pertpy as pt >>> adata = pt.dt.cinemaot_example() >>> model = >>> dim = model.get_dim(adata) """ sk = SinkhornKnopp() if issparse(adata.raw.X): data = adata.raw.X.toarray() else: data = adata.raw.X vm = (1e-3 + data + c * data * data) / (1 + c) wm =, vm), np.sqrt(sk._D2)) u, s, vt = np.linalg.svd(wm) dim = min(sum(s > (np.sqrt(data.shape[0]) + np.sqrt(data.shape[1]))), adata.obsm[use_rep].shape[1]) return dim
[docs] def get_weightidx( self, adata: AnnData, pert_key: str, control: str, use_rep: str = "X_pca", k: int = 20, resolution: float = 1.0, ): """Generating the resampled indices that balances covariates across treatment conditions. Args: adata: The annotated data object. c: the parameter regarding the quadratic variance distribution. c=0 means Poisson count matrices. use_rep: the embedding used to give a upper bound for the estimated rank. k: the number of neighbors used in the k-NN matching phase. resolution: the clustering resolution used in the sampling phase. Returns: Returns the indices. Examples: >>> import pertpy as pt >>> adata = pt.dt.cinemaot_example() >>> model = >>> idx = model.get_weightidx(adata, pert_key="perturbation", control="No stimulation") """ adata_ = adata.copy() X_pca1 = adata_.obsm[use_rep][adata_.obs[pert_key] == control, :] X_pca2 = adata_.obsm[use_rep][adata_.obs[pert_key] != control, :] nbrs = NearestNeighbors(n_neighbors=k, algorithm="ball_tree").fit(X_pca1) mixscape_pca = adata.obsm[use_rep].copy() mixscapematrix = nbrs.kneighbors_graph(X_pca2).toarray() mixscape_pca[adata_.obs[pert_key] != control, :] = (, mixscape_pca[adata_.obs[pert_key] == control, :]) / k ) adata_.obsm["X_mpca"] = mixscape_pca sc.pp.neighbors(adata_, use_rep="X_mpca"), resolution=resolution) j = 0 ref_label = "noncontrol" expr_label = "control" adata_.obs["ct"] = ref_label adata_.obs["ct"][adata_.obs[pert_key] == control] = expr_label pert_key = "ct" z = np.zeros(adata_.shape[0]) + 1 for i in adata_.obs["leiden"].cat.categories: if ( adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == ref_label)].shape[0] >= adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == expr_label)].shape[0] ): z[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == ref_label)] = ( adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == expr_label)].shape[0] / adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == ref_label)].shape[0] ) if j == 0: idx = sc.pp.subsample( adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == ref_label)], n_obs=adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == expr_label)].shape[0], copy=True, ).obs.index idx = idx.append( adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == expr_label)].obs.index ) j = j + 1 else: idx_tmp = sc.pp.subsample( adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == ref_label)], n_obs=adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == expr_label)].shape[0], copy=True, ).obs.index idx_tmp = idx_tmp.append( adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == expr_label)].obs.index ) idx = idx.append(idx_tmp) else: z[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == expr_label)] = ( adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == ref_label)].shape[0] / adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == expr_label)].shape[0] ) if j == 0: idx = sc.pp.subsample( adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == expr_label)], n_obs=adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == ref_label)].shape[0], copy=True, ).obs.index idx = idx.append( adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == ref_label)].obs.index ) j = j + 1 else: idx_tmp = sc.pp.subsample( adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == expr_label)], n_obs=adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == ref_label)].shape[0], copy=True, ).obs.index idx_tmp = idx_tmp.append( adata_[(adata_.obs["leiden"] == i) & (adata_.obs[pert_key] == ref_label)].obs.index ) idx = idx.append(idx_tmp) return idx
[docs] def synergy( self, adata: AnnData, pert_key: str, base: str, A: str, B: str, AB: str, dim: int | None = 20, thres: float = 0.15, smoothness: float = 1e-4, preweight_label: str | None = None, **kwargs, ): """A wrapper for computing the synergy matrices. Args: adata: The annotated data object. pert_key: The column of `.obs` with perturbation categories, should also contain `control`. base: Control category from the `pert_key` column. A: the category for perturbation A. B: the category for perturbation B. AB: the category for the combinatorial perturbation A+B. dim: Use the first dim components in use_rep. If none, use a biwhitening procedure on the raw count matrix to derive a reasonable rank. thres: the threshold for the rank-dependence metric. smoothness: the coefficient determining the smooth level in entropic optimal transport problem. eps: Tolerate error of the optimal transport. preweight_label: the annotated label (e.g. cell type) that is used to assign weights for treated and control cells to balance across the label. Helps overcome the differential abundance issue. **kwargs: other parameters that can be passed to Cinemaot.causaleffect() Returns: Returns an AnnData object that contains the single-cell level synergy matrix de.X and the embedding. Examples: >>> import pertpy as pt >>> adata = pt.dt.dong_2023() >>> sc.pp.pca(adata) >>> model = >>> combo = model.synergy(adata, pert_key='perturbation', base='No stimulation', A='IFNb', B='IFNg', >>> AB='IFNb+ IFNg', thres=0.5, smoothness=1e-5, eps=1e-3, solver='Sinkhorn') """ adata1 = adata[adata.obs[pert_key].isin([base, A]), :].copy() adata2 = adata[adata.obs[pert_key].isin([B, AB]), :].copy() adata_link = adata[adata.obs[pert_key].isin([base, B]), :].copy() de1 = self.causaleffect( adata1, pert_key=pert_key, control=A, return_matching=True, dim=dim, thres=thres, smoothness=smoothness, preweight_label=preweight_label, **kwargs, ) ot1 = de1.obsm["ot"] # noqa: F841 de2 = self.causaleffect( adata2, pert_key=pert_key, control=AB, return_matching=True, dim=dim, thres=thres, smoothness=smoothness, preweight_label=preweight_label, **kwargs, ) ot2 = de2.obsm["ot"] # noqa: F841 de0 = self.causaleffect( adata_link, pert_key=pert_key, control=B, return_matching=True, dim=dim, thres=thres, smoothness=smoothness, preweight_label=preweight_label, **kwargs, ) ot0 = de0.obsm["ot"] syn = sc.AnnData( np.array(-((ot0 / np.sum(ot0, axis=1)[:, None]) @ de2.X - de1.X)), obs=de1.obs.copy(), var=de1.var.copy() ) syn.obsm["X_embedding"] = (ot0 / np.sum(ot0, axis=1)[:, None]) @ de2.obsm["X_embedding"] - de1.obsm[ "X_embedding" ] return syn
[docs] def attribution_scatter( self, adata: AnnData, pert_key: str, control: str, cf_rep: str = "cf", use_raw: bool = False, ): """A simple function for computing confounder-specific genes. Args: adata: The annotated data object. pert_key: The column of `.obs` with perturbation categories, should also contain `control`. control: Control category from the `pert_key` column. cf_rep: the place to put the confounder embedding in the original adata.obsm. use_raw: If true, use adata.raw.X to aggregate the pseudobulk profiles. Otherwise use adata.X. Returns: Returns the confounder effect (c_effect) and the residual effect (s_effect). Examples: >>> import pertpy as pt >>> adata = pt.dt.cinemaot_example() >>> model = >>> c_effect, s_effect = model.attribution_scatter(adata, pert_key="perturbation", control="No stimulation") """ cf = adata.obsm[cf_rep] if use_raw: if issparse(adata.X): Y0 = adata.raw.X.toarray()[adata.obs[pert_key] == control, :] Y1 = adata.raw.X.toarray()[adata.obs[pert_key] != control, :] else: Y0 = adata.raw.X[adata.obs[pert_key] == control, :] Y1 = adata.raw.X[adata.obs[pert_key] != control, :] else: if issparse(adata.X): Y0 = adata.X.toarray()[adata.obs[pert_key] == control, :] Y1 = adata.X.toarray()[adata.obs[pert_key] != control, :] else: Y0 = adata.X[adata.obs[pert_key] == control, :] Y1 = adata.X[adata.obs[pert_key] != control, :] X0 = cf[adata.obs[pert_key] == control, :] X1 = cf[adata.obs[pert_key] != control, :] ols0 = LinearRegression(), Y0) ols1 = LinearRegression(), Y1) c0 = ols0.predict(X0) - np.mean(ols0.predict(X0), axis=0) c1 = ols1.predict(X1) - np.mean(ols1.predict(X1), axis=0) e0 = Y0 - ols0.predict(X0) e1 = Y1 - ols1.predict(X1) c_effect = (np.linalg.norm(c1, axis=0) + 1e-6) / (np.linalg.norm(c0, axis=0) + 1e-6) s_effect = (np.linalg.norm(e1, axis=0) + 1e-6) / (np.linalg.norm(e0, axis=0) + 1e-6) return c_effect, s_effect
[docs] def plot_vis_matching( self, adata: AnnData, de: AnnData, pert_key: str, control: str, de_label: str, source_label: str, matching_rep: str = "ot", resolution: float = 0.5, normalize: str = "col", title: str = "CINEMA-OT matching matrix", min_val: float = 0.01, show: bool = True, save: str | None = None, ax: Axes | None = None, **kwargs, ) -> None: """Visualize the CINEMA-OT matching matrix. Args: adata: the original anndata after running cinemaot.causaleffect or cinemaot.causaleffect_weighted. de: The anndata output from Cinemaot.causaleffect() or Cinemaot.causaleffect_weighted(). pert_key: The column of `.obs` with perturbation categories, should also contain `control`. control: Control category from the `pert_key` column. de_label: the label for differential response. If none, use leiden cluster labels at resolution 1.0. source_label: the confounder / cell type label. matching_rep: the place that stores the matching matrix. default de.obsm['ot']. normalize: normalize the coarse-grained matching matrix by row / column. title: the title for the figure. min_val: The min value to truncate the matching matrix. show: Show the plot, do not return axis. save: If `True` or a `str`, save the figure. A string is appended to the default filename. Infer the filetype if ending on {`'.pdf'`, `'.png'`, `'.svg'`}. **kwargs: Other parameters to input for seaborn.heatmap. Examples: >>> import pertpy as pt >>> adata = pt.dt.cinemaot_example() >>> cot = >>> de = cot.causaleffect( >>> adata, pert_key="perturbation", control="No stimulation", return_matching=True, >>> thres=0.5, smoothness=1e-5, eps=1e-3, solver="Sinkhorn", preweight_label="cell_type0528") >>> cot.plot_vis_matching( >>> adata, de, pert_key="perturbation",control="No stimulation", de_label=None, source_label="cell_type0528") """ adata_ = adata[adata.obs[pert_key] == control] df = pd.DataFrame(de.obsm[matching_rep]) if de_label is None: de_label = "leiden" sc.pp.neighbors(de, use_rep="X_embedding"), resolution=resolution) df["de_label"] = de.obs[de_label].astype(str).values df["de_label"] = "Response " + df["de_label"] df = df.groupby("de_label").sum().T df["source_label"] = adata_.obs[source_label].astype(str).values df = df.groupby("source_label").sum() if normalize == "col": df = df / df.sum(axis=0) else: df = (df.T / df.sum(axis=1)).T df = df.clip(lower=min_val) - min_val if normalize == "col": df = df / df.sum(axis=0) else: df = (df.T / df.sum(axis=1)).T g = sns.heatmap(df, annot=True, ax=ax, **kwargs) plt.title(title) _utils.savefig_or_show("matching_heatmap", show=show, save=save) if not show: if ax is not None: return ax else: return g
class Xi: """ A fast implementation of cross-rank dependence metric used in CINEMA-OT. """ def __init__(self, x, y): self.x = x self.y = y @property def sample_size(self): return len(self.x) @property def x_ordered_rank(self): # PI is the rank vector for x, with ties broken at random # Not mine: source ( # random shuffling of the data - reason to use random.choice is that # pd.sample(frac=1) uses the same randomizing algorithm len_x = len(self.x) rng = np.random.default_rng() randomized_indices = rng.choice(np.arange(len_x), len_x, replace=False) randomized = [self.x[idx] for idx in randomized_indices] # same as pandas rank method 'first' rankdata = ss.rankdata(randomized, method="ordinal") # Reindexing based on pairs of indices before and after unrandomized = [rankdata[j] for i, j in sorted(zip(randomized_indices, range(len_x), strict=False))] return unrandomized @property def y_rank_max(self): # f[i] is number of j s.t. y[j] <= y[i], divided by n. return ss.rankdata(self.y, method="max") / self.sample_size @property def g(self): # g[i] is number of j s.t. y[j] >= y[i], divided by n. return ss.rankdata([-i for i in self.y], method="max") / self.sample_size @property def x_ordered(self): # order of the x's, ties broken at random. return np.argsort(self.x_ordered_rank) @property def x_rank_max_ordered(self): x_ordered_result = self.x_ordered y_rank_max_result = self.y_rank_max # Rearrange f according to ord. return [y_rank_max_result[i] for i in x_ordered_result] @property def mean_absolute(self): x1 = self.x_rank_max_ordered[0 : (self.sample_size - 1)] x2 = self.x_rank_max_ordered[1 : self.sample_size] return ( np.mean( np.abs( [ x - y for x, y in zip( x1, x2, strict=False, ) ] ) ) * (self.sample_size - 1) / (2 * self.sample_size) ) @property def inverse_g_mean(self): gvalue = self.g return np.mean(gvalue * (1 - gvalue)) @property def correlation(self): """xi correlation""" return 1 - self.mean_absolute / self.inverse_g_mean @classmethod def xi(cls, x, y): return cls(x, y) def pval_asymptotic(self, ties: bool = False): """Returns p values of the correlation. Args: ties: boolean If ties is true, the algorithm assumes that the data has ties and employs the more elaborated theory for calculating the P-value. Otherwise, it uses the simpler theory. There is no harm in setting tiles True, even if there are no ties. Returns: p value """ # If there are no ties, return xi and theoretical P-value: if ties: return 1 - ss.norm.cdf(np.sqrt(self.sample_size) * self.correlation / np.sqrt(2 / 5)) # If there are ties, and the theoretical method is to be used for calculation P-values: # The following steps calculate the theoretical variance in the presence of ties: sorted_ordered_x_rank = sorted(self.x_rank_max_ordered) ind = [i + 1 for i in range(self.sample_size)] ind2 = [2 * self.sample_size - 2 * ind[i - 1] + 1 for i in ind] a = np.mean([i * j * j for i, j in zip(ind2, sorted_ordered_x_rank, strict=False)]) / self.sample_size c = np.mean([i * j for i, j in zip(ind2, sorted_ordered_x_rank, strict=False)]) / self.sample_size cq = np.cumsum(sorted_ordered_x_rank) m = [ (i + (self.sample_size - j) * k) / self.sample_size for i, j, k in zip(cq, ind, sorted_ordered_x_rank, strict=False) ] b = np.mean([np.square(i) for i in m]) v = (a - 2 * b + np.square(c)) / np.square(self.inverse_g_mean) return 1 - ss.norm.cdf(np.sqrt(self.sample_size) * self.correlation / np.sqrt(v)) class SinkhornKnopp: """ An simple implementation of Sinkhorn iteration used in the biwhitening approach. """ def __init__(self, max_iter: float = 1000, setr: int = 0, setc: float = 0, epsilon: float = 1e-3): if max_iter < 0: raise ValueError(f"max_iter is {max_iter} but must be greater than 0.") self._max_iter = int(max_iter) if not epsilon > 0 and epsilon < 1: raise ValueError(f"epsilon is {epsilon} but must be between 0 and 1 exclusive.") self._epsilon = epsilon self._setr = setr self._setc = setc self._stopping_condition: str | None = None self._iterations = 0 self._D1 = np.ones(1) self._D2 = np.ones(1) def fit(self, P: ArrayLike): """Fit the diagonal matrices in Sinkhorn Knopp's algorithm. Args: P: 2d array-like Must be a square non-negative 2d array-like object, that is convertible to a numpy array. The matrix must not be equal to 0 and it must have total support for the algorithm to converge. Returns: A double stochastic matrix. """ P = np.asarray(P) assert np.all(P >= 0) assert P.ndim == 2 N = P.shape[0] if np.sum(abs(self._setr)) == 0: rsum = P.shape[1] else: rsum = self._setr if np.sum(abs(self._setc)) == 0: csum = P.shape[0] else: csum = self._setc max_threshr = rsum + self._epsilon min_threshr = rsum - self._epsilon max_threshc = csum + self._epsilon min_threshc = csum - self._epsilon # Initialize r and c, the diagonals of D1 and D2 # and warn if the matrix does not have support. r = np.ones((N, 1)) pdotr = c = 1 / pdotr pdotc = r = 1 / pdotc del pdotr, pdotc P_eps = np.copy(P) while ( np.any(np.sum(P_eps, axis=1) < min_threshr) or np.any(np.sum(P_eps, axis=1) > max_threshr) or np.any(np.sum(P_eps, axis=0) < min_threshc) or np.any(np.sum(P_eps, axis=0) > max_threshc) ): c = csum / r = rsum / self._D1 = np.diag(np.squeeze(r)) self._D2 = np.diag(np.squeeze(c)) P_eps = np.diag(self._D1)[:, None] * P * np.diag(self._D2)[None, :] self._iterations += 1 if self._iterations >= self._max_iter: self._stopping_condition = "max_iter" break if not self._stopping_condition: self._stopping_condition = "epsilon" self._D1 = np.diag(np.squeeze(r)) self._D2 = np.diag(np.squeeze(c)) P_eps = np.diag(self._D1)[:, None] * P * np.diag(self._D2)[None, :] return P_eps