forked from yongbinfeng/DQEMCalTests
-
Notifications
You must be signed in to change notification settings - Fork 0
/
applyCorrection.py
172 lines (137 loc) · 6 KB
/
applyCorrection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import numpy as np
import ROOT
import os
from modules.CNNModel import loadCNNModel
from modules.fitFunction import fitFunction, loadResults
from modules.utils import getChannelMap
from modules.runinfo import runinfo, GetFitRange, GetRunInfo, CheckRunExists
def applyCNNRegression(model, chans, run, verbose=False, applyCutForWeightPlots=True):
predictions, weights = model.predict(chans)
predictions = predictions.astype(np.float64)
raw = np.sum(chans, axis=(1, 2, 3))
selection = np.ones(len(raw), dtype=bool)
if applyCutForWeightPlots:
if CheckRunExists(run):
be, atten, ndf, _ = GetRunInfo(run)
try:
_, _, fitmin, fitmax = GetFitRange(be * 8.0, atten, ndf)
selection = (raw > fitmin) & (raw < fitmax)
except KeyError:
print(
f"Run {run} with {be*8.0} GeV and attenuation {atten} not found in FitRange")
else:
print(f"Run {run} not found in runinfo")
weights_sel = weights[selection]
weights_avg = np.mean(weights_sel, axis=0)
weights_std = np.std(weights_sel, axis=0)
if verbose:
print("weights_avg: ", weights_avg)
print("weights_std: ", weights_std)
print("predictions after trainng: ", predictions)
mu, std = np.mean(predictions), np.std(predictions)
print(f"mu = {mu}, std = {std}, relative std = {std/mu}")
return predictions, weights_avg, weights_std
def applyLinearRegression(chans, scales, verbose=False):
predictions = fitFunction(chans, scales)
if verbose:
mu, std = np.mean(predictions), np.std(predictions)
print(f"mu = {mu}, std = {std}, relative std = {std/mu}")
return predictions
def Evaluate(run, model=None, scales=None, mipcalibs=None, verbose=False):
fname = f"root/Run{run}_list.root"
if not os.path.exists(fname):
print(f"File {fname} does not exist")
return
print("Evaluating Run ", run)
f = ROOT.TFile(fname)
t = f.Get("save")
nentries = t.GetEntries()
print(f"Number of entries: {nentries}")
if nentries == 0:
f.Close()
print("No entries in the file ", fname)
return
chans = np.zeros((nentries, 4, 4, 1), dtype=np.float32)
for i in range(nentries):
t.GetEntry(i)
for j in range(16):
x, y = getChannelMap(j)
chans[i][x][y][0] = t.ch_lg[j]
if not os.path.exists("calibrated"):
os.makedirs("calibrated")
ofile = ROOT.TFile(f"calibrated/Run{run}_list.root", "RECREATE")
if model:
print("Applying CNN Regression")
predictions, weights_avg, weights_std = applyCNNRegression(
model, chans, run, verbose)
hweights = ROOT.TH2F("hweights", "Weights", 4, -0.5, 3.5, 4, -0.5, 3.5)
for i in range(4):
for j in range(4):
hweights.SetBinContent(i+1, j+1, weights_avg[i][j])
hweights.SetBinError(i+1, j+1, weights_std[i][j])
hweights.Write()
hcal = ROOT.TH1F("hcal", "Calibrated Energy with CNN", 4000, 0, 8000)
hcal.FillN(nentries, predictions, np.ones(nentries))
hcal.Write()
chans = chans.reshape(nentries, 16)
if type(scales) is np.ndarray:
print("Applying Linear Regression")
predictions_linear = applyLinearRegression(chans, scales, verbose)
hcal_linear = ROOT.TH1F(
"hcal_linear", "Calibrated Energy with Linear Regression", 4000, 0, 8000)
hcal_linear.FillN(nentries, predictions_linear, np.ones(nentries))
hcal_linear.Write()
if type(mipcalibs) is np.ndarray:
print("Applying MIP Calibration")
predictions_mip = applyLinearRegression(chans, mipcalibs, verbose)
hcal_mip = ROOT.TH1F(
"hcal_mip", "Calibrated Energy with MIP Calibration", 4000, 0, 8000)
hcal_mip.FillN(nentries, predictions_mip, np.ones(nentries))
hcal_mip.Write()
print("Simple sum (uncalibrated) of all channels")
predictions_unc = fitFunction(chans, np.ones(17))
hcal_unc = ROOT.TH1F("hcal_unc", "Uncalibrated Energy", 4000, 0, 8000)
hcal_unc.FillN(nentries, predictions_unc, np.ones(nentries))
hcal_unc.Write()
ofile.Close()
if __name__ == "__main__":
# model = loadCNNModel("results/best_model.keras")
# scales = loadResults("results/results_withattu.json")
# scales = np.array(scales)
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("-m", "--file_mipcalib", type=str,
default="", help="MIP calibration file")
parser.add_argument("-l", "--file_linear", type=str,
default="", help="Linear regression file")
parser.add_argument("-c", "--file_cnn", type=str,
default="", help="CNN model file")
parser.add_argument("-s", "--start", type=int,
default=369, help="Run number to start")
parser.add_argument("-e", "--end", type=int,
default=695, help="Run number to end")
args, unknown = parser.parse_known_args()
run_start, run_end = args.start, args.end
print(f"Selecting runs from {run_start} to {run_end}")
mipcalibs = None
if args.file_mipcalib != "":
fname = args.file_mipcalib
print(f"Using MIP calibration file: {fname}")
mipcalibs = loadResults(fname)
# linear regression has a bias term but not used.
# add a dummy value for the bias term, only place holder, not used
mipcalibs.append(1.0)
mipcalibs = np.array(mipcalibs)
linearcalibs = None
if args.file_linear != "":
fname = args.file_linear
print(f"Using Linear regression file: {fname}")
linearcalibs = loadResults(fname)
linearcalibs = np.array(linearcalibs)
cnnmodel = None
if args.file_cnn != "":
fname = args.file_cnn
print(f"Using CNN model file: {fname}")
cnnmodel = loadCNNModel(fname)
for i in range(run_start, run_end+1):
Evaluate(i, cnnmodel, linearcalibs, mipcalibs)