Skip to content

Commit

Permalink
fix conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
ieivanov committed Apr 20, 2022
2 parents f684030 + 4796ea9 commit c6a50a9
Show file tree
Hide file tree
Showing 2 changed files with 421 additions and 34 deletions.
256 changes: 222 additions & 34 deletions recOrder/calib/Calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from recOrder.calib.Optimization import BrentOptimizer, MinScalarOptimizer
from mpl_toolkits.axes_grid1.axes_divider import make_axes_locatable
from scipy.interpolate import interp1d
from scipy.optimize import least_squares
import json
import os
import logging
Expand Down Expand Up @@ -43,8 +44,9 @@ def __init__(self, mmc, mm, group='Channel', optimization='min_scalar', mode='re

#Set Mode
self.mode = mode
self.LC_DAC_conversion = 4 # convert between the input range of LCs (0-20V) and the output range of the DAC (0-5V)
dir_path = os.path.dirname(os.path.realpath(__file__))
self.curves = CalibrationCurves(os.path.join(dir_path, './Meadowlark_Curves.npy')) if self.mode != 'retardance' else None
self.curves = CalibrationData(os.path.join(dir_path, './calibration_data.csv')) if self.mode != 'retardance' else None

# Optimizer
if optimization == 'min_scalar':
Expand Down Expand Up @@ -107,19 +109,43 @@ def set_wavelength(self, wavelength):
self.curves.set_wavelength(wavelength)

def set_lc(self, val, device_property):
"""
Parameters
----------
val : float
Retardance in waves or voltage in volts, depending on the LC control mode
device_property
Returns
-------
"""

if self.mode == 'retardance':
set_lc_waves(self.mmc, val, self.PROPERTIES[device_property])
else:
volt = self.curves.get_voltage(val)/4000
set_lc_volts(self.mmc, volt, self.PROPERTIES[f'{device_property}-DAC'])
volts = self.curves.get_voltage(val)
dac_volts = volts / self.LC_DAC_conversion
set_lc_volts(self.mmc, dac_volts, self.PROPERTIES[f'{device_property}-DAC'])

def get_lc(self, device_property):
"""
Parameters
----------
device_property
Returns
-------
LC retardance in waves or voltage in volts, depending on the control mode
"""

if self.mode == 'retardance':
return get_lc_waves(self.mmc, self.PROPERTIES[device_property])
else:
volts = get_lc_volts(self.mmc, self.PROPERTIES[f'{device_property}-DAC'])*4000
dac_volts = get_lc_volts(self.mmc, self.PROPERTIES[f'{device_property}-DAC'])
volts = dac_volts * self.LC_DAC_conversion
return self.curves.get_retardance(volts)

def define_lc_state(self, state, lca, lcb):
Expand Down Expand Up @@ -910,22 +936,121 @@ def capture_bg(self, n_avg, directory):
return np.asarray(imgs)


class CalibrationCurves:
class CalibrationData:
"""
Interpolates LC calibration data between retardance (in waves), voltage (in mV), and wavelength (in nm)
"""

def __init__(self, path, wavelength = None):
def __init__(self, path, wavelength=532, interp_method='linear'):
"""
self.raw_curves = np.load(path)
Parameters
----------
path : str
path to .csv calibration data file
wavelength : int
usage wavelength, in nanometers
interp_method : str
interpolation method, either "linear" or "schnoor_fit"
"""

# 0V to 20V step size 1 mV
self.x_range = np.arange(0, 20000, 1)
header, raw_data = self.read_data(path)
calib_wavelengths = np.array([i[:3] for i in header[1::3]]).astype('double')

# interpolate curves
self.spline490 = interp1d(self.raw_curves[0, 0], self.raw_curves[0, 1])
self.spline546 = interp1d(self.raw_curves[1, 0], self.raw_curves[1, 1])
self.spline630 = interp1d(self.raw_curves[2, 0], self.raw_curves[2, 1])
self.wavelength = None
self.set_wavelength(wavelength)

if interp_method == 'linear':
self.interp_method = interp_method
self.interpolate_data(raw_data, calib_wavelengths)
elif interp_method == 'schnoor_fit':
self.interp_method = interp_method
self.fit_params = self.fit_data(raw_data, calib_wavelengths)
else:
raise ValueError('Unknown interpolation method.')

self.wavelength = wavelength
self.curve = None
@staticmethod
def read_data(path):
"""
Read raw calibration data
The first row of the CSV file is a header row, structured as [Voltage (mV), XXX-A, XXX-B,
Voltage (nm), XXX-A, XXX-B, ...] where XXX is the calibration wavelength in nanometers. For example 532-A would
contain measurements of the retardance of LCA as a function of applied voltage at 532 nm. Retardance is
recorded in nanometers and voltage is recorded in millivolts. Measurements are repeated for a number
of wavelengths in subsequent columns.
Parameters
----------
path : str
path to .csv calibration data file
Returns
-------
header : list
Calibration data file header line. Contains information on calibration wavelength
raw_data : ndarray
Calibration data. Voltage is in millivolts and retardance is in nanometers
"""
with open(path, 'r') as f:
header = f.readline().strip().split(',')

raw_data = np.loadtxt(path, delimiter=',', skiprows=1)
return header, raw_data

@staticmethod
def schnoor_fit(V, a, b1, b2, c, d, e, wavelength):
"""
Parameters
----------
V : float
Voltage in volts
a, b1, b2, c, d, e : float
Fit parameters
wavelength : float
Wavelength in nanometers
Returns
-------
retardance : float
Retardance in nanometers
"""
retardance = a + (b1 + b2 / wavelength ** 2) / (1 + (V / c) ** d) ** e

return retardance

@staticmethod
def schnoor_fit_inv(retardance, a, b1, b2, c, d, e, wavelength):
"""
Parameters
----------
retardance : float
Retardance in nanometers
a, b1, b2, c, d, e : float
Fit parameters
wavelength : float
Wavelength in nanometers
Returns
-------
voltage : float
Voltage in volts
"""

voltage = c * (((b1 + b2 / wavelength ** 2) / (retardance - a)) ** (1 / e) - 1) ** (1 / d)

return voltage

@staticmethod
def _fun(x, wavelengths, xdata, ydata):
fval = CalibrationData.schnoor_fit(xdata, *x, wavelengths)
res = ydata - fval
return res.flatten()

def set_wavelength(self, wavelength):
self.wavelength = wavelength
Expand All @@ -936,9 +1061,29 @@ def set_wavelength(self, wavelength):
if self.wavelength > 720:
self.wavelength = 720

self.create_wavelength_curve()
def fit_data(self, raw_data, calib_wavelengths):
# TODO: add checks that the fit has worked properly
# TODO: check that the fit works when only a single wavelength is provided
# TODO: implement boundaries on wavelength extrapolation based on wavelengths used in calibration data

xdata = raw_data[:, 0::3] / 1000 # convert to volts
ydata = raw_data[:, 1::3] # in nanometers

x0 = [10, 1000, 1e7, 1, 10, 0.1]
p = least_squares(self._fun, x0, method='trf', args=(calib_wavelengths, xdata, ydata),
bounds=((-np.inf, 0, 0, 0, 0, 0), (np.inf,)*6),
x_scale=[10, 1000, 1e7, 1, 10, 0.1])

return p.x

def interpolate_data(self, raw_data, calib_wavelengths):
# 0V to 20V step size 1 mV
self.x_range = np.arange(0, np.max(raw_data[:, ::3]), 1)

def create_wavelength_curve(self):
# interpolate curves - only LCA data is used
self.spline490 = interp1d(raw_data[:, 0], raw_data[:, 1])
self.spline546 = interp1d(raw_data[:, 3], raw_data[:, 4])
self.spline630 = interp1d(raw_data[:, 5], raw_data[:, 7])

if self.wavelength < 490:
new_a1_y = np.interp(self.x_range, self.x_range, self.spline490(self.x_range))
Expand All @@ -950,8 +1095,8 @@ def create_wavelength_curve(self):

temp_curve = np.asarray([[i, 2 * new_a1_y[i] - (fact1 * new_a1_y[i] + fact2 * new_a2_y[i])]
for i in range(len(new_a1_y))])
self.spline = interp1d(temp_curve[:, 0], temp_curve[:, 1])
self.curve = self.spline(self.x_range)
self.voltage_to_retardance = interp1d(temp_curve[:, 0], temp_curve[:, 1])
self.curve = self.voltage_to_retardance(self.x_range)

elif self.wavelength > 630:

Expand All @@ -964,8 +1109,8 @@ def create_wavelength_curve(self):

temp_curve = np.asarray([[i, 2 * new_a1_y[i] - (fact1 * new_a1_y[i] + fact2 * new_a2_y[i])]
for i in range(len(new_a1_y))])
self.spline = interp1d(temp_curve[:, 0], temp_curve[:, 1])
self.curve = self.spline(self.x_range)
self.voltage_to_retardance = interp1d(temp_curve[:, 0], temp_curve[:, 1])
self.curve = self.voltage_to_retardance(self.x_range)


elif 490 < self.wavelength < 546:
Expand All @@ -977,8 +1122,8 @@ def create_wavelength_curve(self):
fact2 = np.abs(546 - self.wavelength) / (546 - 490)

temp_curve = np.asarray([[i, fact1 * new_a1_y[i] + fact2 * new_a2_y[i]] for i in range(len(new_a1_y))])
self.spline = interp1d(temp_curve[:, 0], temp_curve[:, 1])
self.curve = self.spline(self.x_range)
self.voltage_to_retardance = interp1d(temp_curve[:, 0], temp_curve[:, 1])
self.curve = self.voltage_to_retardance(self.x_range)

elif 546 < self.wavelength < 630:

Expand All @@ -989,32 +1134,75 @@ def create_wavelength_curve(self):
fact2 = np.abs(630 - self.wavelength) / (630 - 546)

temp_curve = np.asarray([[i, fact1 * new_a1_y[i] + fact2 * new_a2_y[i]] for i in range(len(new_a1_y))])
self.spline = interp1d(temp_curve[:, 0], temp_curve[:, 1])
self.curve = self.spline(self.x_range)
self.voltage_to_retardance = interp1d(temp_curve[:, 0], temp_curve[:, 1])
self.curve = self.voltage_to_retardance(self.x_range)

elif self.wavelength == 490:
self.curve = self.spline490(self.x_range)
self.spline = self.spline490
self.voltage_to_retardance = self.spline490

elif self.wavelength == 546:
self.curve = self.spline546(self.x_range)
self.spline = self.spline546
self.voltage_to_retardance = self.spline546

elif self.wavelength == 630:
self.curve = self.spline630(self.x_range)
self.spline = self.spline630
self.voltage_to_retardance = self.spline630

else:
raise ValueError(f'Wavelength {self.wavelength} not understood')

def get_voltage(self, retardance):
"""
Parameters
----------
retardance : float, list, or ndarray
retardance in waves
ret_abs = retardance*self.wavelength
Returns
-------
voltage
voltage in volts
"""

retardance = np.asarray(retardance, dtype='double')
voltage = None
ret_nanometers = retardance*self.wavelength

if self.interp_method == 'linear':
if ret_nanometers.ndim > 0:
ret_nanometers = ret_nanometers[:,np.newaxis]
voltage = np.abs(self.curve - ret_nanometers).argmin(axis=1) / 1000
else:
voltage = np.abs(self.curve - ret_nanometers).argmin() / 1000
elif self.interp_method == 'schnoor_fit':
voltage = self.schnoor_fit_inv(ret_nanometers, *self.fit_params, self.wavelength)

# Since x-step is 1mV starting at 0, returned index = voltage (mV)
index = np.abs(self.curve - ret_abs).argmin()
return voltage

def get_retardance(self, volts):
"""
Parameters
----------
volts : float, list, or ndarray
voltage in volts
Returns
-------
retardance
retardance in waves
"""

return index
volts = np.asarray(volts, dtype='double')
ret_nanometers = None
if self.interp_method == 'linear':
ret_nanometers = self.voltage_to_retardance(volts*1000)
elif self.interp_method == 'schnoor_fit':
ret_nanometers = self.schnoor_fit(volts, *self.fit_params, self.wavelength)
retardance = ret_nanometers / self.wavelength

def get_retardance(self, volt):
return self.spline(volt) / self.wavelength
return retardance
Loading

0 comments on commit c6a50a9

Please sign in to comment.