# util.py
import os
import numpy as np
import scipy.sparse as sp
import pandas as pd
from sklearn.preprocessing import minmax_scale
import seaborn as sns
import matplotlib.pyplot as plt
import umap
from sklearn.manifold import TSNE
import torch
from torch.utils.data import Dataset


def graph_celltype_regu_handler(adj, cluster_labels):
    """Return the row-normalized dense adjacency matrix and the row-normalized
    cell-type regularization matrix built from the cluster assignments."""
    adjdense = adj.todense()
    adjdense = normalize_cell_cell_matrix(adjdense)
    celltypesample = generateCelltypeRegu(cluster_labels)
    celltypesample = normalize_cell_cell_matrix(celltypesample)
    return adjdense, celltypesample


def normalize_cell_cell_matrix(x):
    """Row-normalize a square cell-cell matrix; rows that sum to zero stay zero."""
    x = np.asarray(x)  # plain ndarray so `*` below is elementwise
    avg_factor = 1 / np.ma.masked_equal(x.sum(axis=1), 0).reshape((x.shape[0], -1))
    avg_factor = np.ma.filled(avg_factor, fill_value=0)
    avg_mtx = np.tile(avg_factor, [1, len(x)])
    return avg_mtx * x


def generateCelltypeRegu(listResult):
    """Build a binary cell-cell matrix with 1.0 wherever two cells share a cluster label."""
    celltypesample = np.zeros((len(listResult), len(listResult)))
    tdict = {}
    for count, item in enumerate(listResult):
        tdict.setdefault(item, []).append(count)
    for key in sorted(tdict):
        for item1 in tdict[key]:
            for item2 in tdict[key]:
                celltypesample[item1, item2] = 1.0
    return celltypesample
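

# Example (illustrative, not from the original source): for labels [0, 1, 0],
# generateCelltypeRegu links cells that share a cluster, and
# normalize_cell_cell_matrix rescales each row to sum to 1:
#
#     generateCelltypeRegu([0, 1, 0])
#     # -> [[1, 0, 1],
#     #     [0, 1, 0],
#     #     [1, 0, 1]]
#     normalize_cell_cell_matrix(generateCelltypeRegu([0, 1, 0]))
#     # -> [[0.5, 0. , 0.5],
#     #     [0. , 1. , 0. ],
#     #     [0.5, 0. , 0.5]]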


class ExpressionDataset(Dataset):
    def __init__(self, X=None, transform=None):
        """
        Args:
            X: dense ndarray or scipy sparse matrix, shape [cell, gene]
            transform (callable, optional): applied to each sample if given
        """
        self.X = X  # [cell, gene]
        self.transform = transform

    def __len__(self):
        return self.X.shape[0]  # number of cells

    def __getitem__(self, idx):
        # Fetch one cell's expression vector
        if torch.is_tensor(idx):
            idx = idx.tolist()
        sample = self.X[idx, :]
        # Convert to Tensor (densify first if the row is sparse)
        if sp.issparse(sample):
            sample = torch.from_numpy(sample.toarray())
        else:
            sample = torch.from_numpy(sample)
        if self.transform:
            sample = self.transform(sample)
        return sample, idx


class ClusterDataset(ExpressionDataset):
    def __init__(self, X=None, transform=None):
        super().__init__(X, transform)
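

# Example usage (illustrative sketch; the matrix and loader names below are
# hypothetical): wrap a dense cell-by-gene matrix and draw mini-batches with a
# standard PyTorch DataLoader. Each batch yields (expression, cell_indices).
#
#     from torch.utils.data import DataLoader
#     X_dense = np.random.rand(100, 2000).astype(np.float32)  # 100 cells x 2000 genes
#     loader = DataLoader(ExpressionDataset(X_dense), batch_size=16, shuffle=True)
#     for expr, cell_idx in loader:
#         pass  # expr: (16, 2000) float tensor, cell_idx: indices of the sampled cells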


def plot(y, xlabel='epochs', ylabel='', hline=None, output_dir='', suffix=''):
    """Plot a metric curve over epochs and save it as <ylabel><suffix>.png."""
    plt.plot(range(len(y)), y)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    if hline is not None:
        plt.axhline(y=hline, color='green', linestyle='-')
    plt.savefig(os.path.join(output_dir, f"{ylabel.replace(' ', '_')}{suffix}.png"), dpi=200)
    plt.clf()
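

# Example usage (illustrative; the loss list and directory are placeholders):
#
#     plot(loss_history, ylabel='training loss', output_dir='figures', suffix='_run1')
#     # writes figures/training_loss_run1.png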


def drawUMAP(z, listResult, output_dir, filename_suffix=None):
    """UMAP projection of the embedding z, colored by cluster assignment."""
    reducer = umap.UMAP(random_state=1)
    embedding = reducer.fit_transform(z)
    size = len(set(listResult)) + 1
    filename_suffix = f'_{filename_suffix}' if filename_suffix else ''
    plt.scatter(embedding[:, 0], embedding[:, 1], c=listResult, cmap='Spectral', s=5)
    plt.gca().set_aspect('equal', 'datalim')
    plt.colorbar(boundaries=np.arange(int(size)) - 0.5).set_ticks(np.arange(int(size)))
    plt.title('UMAP projection', fontsize=24)
    plt.savefig(os.path.join(output_dir, f"UMAP{filename_suffix}.png"), dpi=300)
    plt.clf()


def drawTSNE(z, listResult, output_dir):
    """t-SNE projection of the embedding z, colored by cluster assignment."""
    size = len(set(listResult))
    tsne = TSNE(n_components=2, verbose=1, perplexity=40, n_iter=300)
    tsne_results = tsne.fit_transform(z)
    df_subset = pd.DataFrame()
    df_subset['tsne-2d-one'] = tsne_results[:, 0]
    df_subset['tsne-2d-two'] = tsne_results[:, 1]
    df_subset['Cluster'] = listResult
    plt.figure(figsize=(16, 10))
    sns.scatterplot(
        x="tsne-2d-one",
        y="tsne-2d-two",
        hue="Cluster",
        palette=sns.color_palette("brg", int(size)),
        data=df_subset,
        legend="full",
        # alpha=0.3
    )
    plt.savefig(os.path.join(output_dir, "tSNE.png"), dpi=300)
    plt.clf()
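

# Example usage (illustrative; `z`, `cluster_labels`, and the output directory
# are placeholders): project a latent embedding colored by integer cluster
# assignments and write UMAP_epoch10.png / tSNE.png under "figures".
#
#     drawUMAP(z, cluster_labels, output_dir="figures", filename_suffix="epoch10")
#     drawTSNE(z, cluster_labels, output_dir="figures")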


def imputation_err_heatmap(X_sc, X_imputed, cluster_labels=None, args=None):
    # Placeholder: imputation-error heatmap not implemented.
    pass


def normalizer(X, base, axis=0):
    """Rescale X so each feature spans the 10th-90th percentile range of `base`,
    falling back to base's full min-max range if those quantiles coincide."""
    upper = np.quantile(base, q=0.9)
    lower = np.quantile(base, q=0.1)
    if upper != lower:
        normalized = minmax_scale(X, feature_range=(lower, upper), axis=axis)
    else:
        base_min = np.quantile(base, q=0)
        base_max = np.quantile(base, q=1)
        normalized = minmax_scale(X, feature_range=(base_min, base_max), axis=axis)
    return normalized
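

# Illustrative smoke test (not part of the original module): exercises the
# regularization helpers, the Dataset wrapper, and the quantile rescaler on
# random toy data. All names below are made up for demonstration.
if __name__ == "__main__":
    from torch.utils.data import DataLoader

    rng = np.random.default_rng(0)
    toy_labels = [0, 1, 0, 2, 1]

    # Cell-type regularization: after row normalization every row sums to 1.
    regu = normalize_cell_cell_matrix(generateCelltypeRegu(toy_labels))
    assert np.allclose(regu.sum(axis=1), 1.0)

    # Dataset wrapper: batches come back as (expression, cell_indices).
    toy_X = rng.random((5, 8)).astype(np.float32)  # 5 cells x 8 genes
    loader = DataLoader(ExpressionDataset(toy_X), batch_size=2, shuffle=False)
    batch, idx = next(iter(loader))
    print("batch shape:", tuple(batch.shape), "indices:", idx.tolist())

    # Quantile rescaling against the matrix's own 10th-90th percentile range.
    rescaled = normalizer(toy_X, base=toy_X, axis=0)
    print("rescaled range:", float(rescaled.min()), float(rescaled.max()))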