diff --git a/pypulseq/make_extended_trapezoid_area.py b/pypulseq/make_extended_trapezoid_area.py index 7537578..cd42676 100644 --- a/pypulseq/make_extended_trapezoid_area.py +++ b/pypulseq/make_extended_trapezoid_area.py @@ -1,9 +1,7 @@ -import math from types import SimpleNamespace -from typing import Tuple +from typing import Tuple, Union import numpy as np -from scipy.optimize import minimize from pypulseq.make_extended_trapezoid import make_extended_trapezoid from pypulseq.opts import Opts @@ -11,23 +9,25 @@ def make_extended_trapezoid_area( - area: float, channel: str, grad_end: float, grad_start: float, system: Opts + area: float, + channel: str, + grad_start: float, + grad_end: float, + system: Union[Opts, None] = None, ) -> Tuple[SimpleNamespace, np.array, np.array]: - """ - Makes the shortest possible extended trapezoid with a given area which starts and ends (optionally) as non-zero - gradient values. + """Make the shortest possible extended trapezoid for given area and gradient start and end point. Parameters ---------- + area : float + Area of extended trapezoid. channel : str Orientation of extended trapezoidal gradient event. Must be one of 'x', 'y' or 'z'. grad_start : float Starting non-zero gradient value. grad_end : float Ending non-zero gradient value. - area : float - Area of extended trapezoid. - system: Opts + system: Opts, optional System limits. Returns @@ -35,100 +35,196 @@ def make_extended_trapezoid_area( grad : SimpleNamespace Extended trapezoid event. times : numpy.ndarray + Time points of the extended trapezoid. amplitude : numpy.ndarray - """ - SR = system.max_slew * 0.99 + Amplitude values of the extended trapezoid. - Tp = 0 - obj1 = ( - lambda x: ( - area - __testGA(x, 0, SR, system.grad_raster_time, grad_start, grad_end) - ) - ** 2 - ) - arr_res = [ - minimize(fun=obj1, x0=-system.max_grad, method="Nelder-Mead"), - minimize(fun=obj1, x0=0, method="Nelder-Mead"), - minimize(fun=obj1, x0=system.max_grad, method="Nelder-Mead"), - ] - arr_res = np.array([(*res.x, res.fun) for res in arr_res]) - Gp, obj1val = arr_res[:, 0], arr_res[:, 1] - i_min = np.argmin(obj1val) - Gp = Gp[i_min] - obj1val = obj1val[i_min] - - if obj1val > 1e-3 or abs(Gp) > system.max_grad: # Search did not converge - Gp = math.copysign(system.max_grad, Gp) - obj2 = ( - lambda x: ( - area - - __testGA(Gp, x, SR, system.grad_raster_time, grad_start, grad_end) - ) - ** 2 + Raises + ------ + ValueError if no solution was found that satisfies the constraints and the desired area. + """ + if not system: + system = Opts() + + max_slew = system.max_slew * 0.99 + max_grad = system.max_grad * 0.99 + raster_time = system.grad_raster_time + + def _to_raster(time: float) -> float: + return np.ceil(time / raster_time) * raster_time + + def _calc_ramp_time(grad_1: float, grad_2: float) -> float: + return _to_raster(abs(grad_1 - grad_2) / max_slew) + + def _find_solution(duration: int) -> Union[None, Tuple[int, int, int, float]]: + """Find extended trapezoid gradient waveform for given duration. + + The function performs a grid search over all possible ramp-up, ramp-down and flat times + for the given duration and returns the solution with the lowest slew rate. + + Parameters + ---------- + duration + duration of the gradient in integer multiples of raster_time + + Returns + ------- + Tuple of ramp-up time, flat time, ramp-down time, gradient amplitude or None if no solution was found + """ + + # Determine timings to check for possible solutions + ramp_up_times = [] + ramp_down_times = [] + + # First, consider solutions that use maximum slew rate: + # Analytically calculate calculate the point where: + # grad_start + ramp_up_time * max_slew == grad_end + ramp_down_time * max_slew + ramp_up_time = (duration * max_slew * raster_time - grad_start + grad_end) / ( + 2*max_slew*raster_time + ) + ramp_up_time = round(ramp_up_time) + + # Check if gradient amplitude exceeds max_grad, if so, adjust ramp + # times for a trapezoidal gradient with maximum slew rate. + if grad_start + ramp_up_time * max_slew * raster_time > max_grad: + ramp_up_time = round(_calc_ramp_time(grad_start, max_grad) / raster_time) + ramp_down_time = round(_calc_ramp_time(grad_end, max_grad) / raster_time) + else: + ramp_down_time = duration - ramp_up_time + + # Add possible solution if timing is valid + if (ramp_up_time > 0 + and ramp_down_time > 0 + and ramp_up_time + ramp_down_time <= duration): + ramp_up_times.append(ramp_up_time) + ramp_down_times.append(ramp_down_time) + + # Analytically calculate calculate the point where: + # grad_start - ramp_up_time * max_slew == grad_end - ramp_down_time * max_slew + ramp_up_time = (duration * max_slew * raster_time + grad_start - grad_end) / ( + 2*max_slew*raster_time + ) + ramp_up_time = round(ramp_up_time) + + # Check if gradient amplitude exceeds -max_grad, if so, adjust ramp + # times for a trapezoidal gradient with maximum slew rate. + if grad_start - ramp_up_time * max_slew * raster_time < -max_grad: + ramp_up_time = round(_calc_ramp_time(grad_start, -max_grad) / raster_time) + ramp_down_time = round(_calc_ramp_time(grad_end, -max_grad) / raster_time) + else: + ramp_down_time = duration - ramp_up_time + + # Add possible solution if timing is valid + if (ramp_up_time > 0 + and ramp_down_time > 0 + and ramp_up_time + ramp_down_time <= duration): + ramp_up_times.append(ramp_up_time) + ramp_down_times.append(ramp_down_time) + + # Second, try any solution with flat_time == 0 + # This appears to be necessary for many cases, but going through all + # timings here is probably too conservative still. + for ramp_up_time in range(1, duration): + ramp_up_times.append(ramp_up_time) + ramp_down_times.append(duration - ramp_up_time) + + time_ramp_up = np.array(ramp_up_times) + time_ramp_down = np.array(ramp_down_times) + + # Calculate corresponding flat times + flat_time = duration - time_ramp_up - time_ramp_down + + # Filter search space for valid timings (flat time >= 0) + valid_indices = flat_time >= 0 + time_ramp_up = time_ramp_up[valid_indices] + time_ramp_down = time_ramp_down[valid_indices] + flat_time = flat_time[valid_indices] + + # Calculate gradient strength for given timing using analytical solution + grad_amp = -(time_ramp_up * raster_time * grad_start + time_ramp_down * raster_time * grad_end - 2 * area) / ( + (time_ramp_up + 2 * flat_time + time_ramp_down) * raster_time ) - res2 = minimize(fun=obj2, x0=0, method="Nelder-Mead") - T, obj2val = *res2.x, res2.fun - assert obj2val < 1e-2 - Tp = math.ceil(T / system.grad_raster_time) * system.grad_raster_time + # Calculate slew rates for given timings + slew_rate1 = abs(grad_start - grad_amp) / (time_ramp_up * raster_time) + slew_rate2 = abs(grad_end - grad_amp) / (time_ramp_down * raster_time) - # Fix the ramps - Tru = ( - math.ceil(abs(Gp - grad_start) / SR / system.grad_raster_time) - * system.grad_raster_time - ) - Trd = ( - math.ceil(abs(Gp - grad_end) / SR / system.grad_raster_time) - * system.grad_raster_time + # Filter solutions that satisfy max_grad and max_slew constraints + valid_indices = ( + (abs(grad_amp) <= max_grad + 1e-8) & (slew_rate1 <= max_slew + 1e-8) & (slew_rate2 <= max_slew + 1e-8) ) - obj3 = lambda x: (area - __testGA1(x, Tru, Tp, Trd, grad_start, grad_end)) ** 2 + solutions = np.flatnonzero(valid_indices) + + # Check if any valid solutions were found + if solutions.shape[0] == 0: + return None + + # Find solution with lowest slew rate and return it + ind = np.argmin(slew_rate1[valid_indices] + slew_rate2[valid_indices]) + ind = solutions[ind] + return (int(time_ramp_up[ind]), int(flat_time[ind]), int(time_ramp_down[ind]), float(grad_amp[ind])) + + # Perform a linear search + # This is necessary because there can exist a dead space where solutions + # do not exist for some durations longer than the optimal duration. The + # binary search below fails to find the optimum in those cases. + # TODO: Check if range is sufficient, try to calculate the dead space. + min_duration = max(round(_calc_ramp_time(grad_end, grad_start) / raster_time), 2) + + # Calculate duration needed to ramp down gradient to zero. + # From this point onwards, solutions can always be found by extending + # the duration and doing a binary search. + max_duration = max( + round(_calc_ramp_time(0, grad_start) / raster_time), + round(_calc_ramp_time(0, grad_end) / raster_time), + min_duration, + ) + + # Linear search + solution = None + for duration in range(min_duration, max_duration + 1): + solution = _find_solution(duration) + if solution: + break + + # Perform a binary search for duration > max_duration if no solution was found + if not solution: + # First, find the upper limit on duration where a solution exists by + # exponentially expanding the duration. + while not solution: + max_duration *= 2 + solution = _find_solution(max_duration) + + def binary_search(fun, lower_limit, upper_limit): + if lower_limit == upper_limit - 1: + return fun(upper_limit) + + test_value = (upper_limit + lower_limit) // 2 + + if fun(test_value): + return binary_search(fun, lower_limit, test_value) + else: + return binary_search(fun, test_value, upper_limit) - res = minimize(fun=obj3, x0=Gp, method="Nelder-Mead") - Gp, obj3val = *res.x, res.fun - assert obj3val < 1e-3 # Did the final search converge? + solution = binary_search(_find_solution, max_duration // 2, max_duration) - assert Tp >= 0 + # Get timing and gradient amplitude from solution + time_ramp_up = solution[0] * raster_time + flat_time = solution[1] * raster_time + time_ramp_down = solution[2] * raster_time + grad_amp = solution[3] - if Tp > 0: - times = cumsum(0, Tru, Tp, Trd) - amplitudes = [grad_start, Gp, Gp, grad_end] + # Create extended trapezoid + if flat_time > 0: + times = cumsum(0, time_ramp_up, flat_time, time_ramp_down) + amplitudes = np.array([grad_start, grad_amp, grad_amp, grad_end]) else: - Tru = ( - math.ceil(abs(Gp - grad_start) / SR / system.grad_raster_time) - * system.grad_raster_time - ) - Trd = ( - math.ceil(abs(Gp - grad_end) / SR / system.grad_raster_time) - * system.grad_raster_time - ) - - if Trd > 0: - if Tru > 0: - times = cumsum(0, Tru, Trd) - amplitudes = np.array([grad_start, Gp, grad_end]) - else: - times = cumsum(0, Trd) - amplitudes = np.array([grad_start, grad_end]) - else: - times = cumsum(0, Tru) - amplitudes = np.array([grad_start, grad_end]) + times = cumsum(0, time_ramp_up, time_ramp_down) + amplitudes = np.array([grad_start, grad_amp, grad_end]) - grad = make_extended_trapezoid( - channel=channel, system=system, times=times, amplitudes=amplitudes - ) - grad.area = __testGA1(Gp, Tru, Tp, Trd, grad_start, grad_end) + grad = make_extended_trapezoid(channel=channel, system=system, times=times, amplitudes=amplitudes) - assert abs(grad.area - area) < 1e-3 + if not abs(grad.area - area) < 1e-8: + raise ValueError(f"Could not find a solution for area={area}.") return grad, np.array(times), amplitudes - - -def __testGA(Gp, Tp, SR, dT, Gs, Ge): - Tru = math.ceil(abs(Gp - Gs) / SR / dT) * dT - Trd = math.ceil(abs(Gp - Ge) / SR / dT) * dT - ga = __testGA1(Gp, Tru, Tp, Trd, Gs, Ge) - return ga - - -def __testGA1(Gp, Tru, Tp, Trd, Gs, Ge): - return 0.5 * Tru * (Gp + Gs) + Gp * Tp + 0.5 * (Gp + Ge) * Trd diff --git a/pypulseq/tests/test_make_extended_trapezoid_area.py b/pypulseq/tests/test_make_extended_trapezoid_area.py new file mode 100644 index 0000000..bb04fbe --- /dev/null +++ b/pypulseq/tests/test_make_extended_trapezoid_area.py @@ -0,0 +1,137 @@ +import pytest +import numpy as np +import random +import math + +from pypulseq import Opts, make_extended_trapezoid_area, make_extended_trapezoid, calc_duration +from pypulseq.utils.cumsum import cumsum + +system = Opts() + +test_zoo = [ + (0, 0, 1), + (0, 0, 10), + (0, 0, 100), + (0, 0, 10000), + (0, 1000, 100), + (-1000, 1000, 100), + (-1000, 0, 100), + (0, 0, -1), + (0, 0, -10), + (0, 0, -100), + (0, 0, -10000), + (0, 1000, -100), + (-1000, 1000, -100), + (-1000, 0, -100), + (0, system.max_grad*0.99, 10000), + (0, system.max_grad*0.99, -10000), + (0, -system.max_grad*0.99, 1000), + (0, -system.max_grad*0.99, -1000), + (system.max_grad*0.99, 0, 100), + (system.max_grad*0.99, 0, -100), + (-system.max_grad*0.99, 0, 1), + (-system.max_grad*0.99, 0, -1), + (0, 100000, 1), + (0, 100000, -1), + (0, -100000, 1), + (0, -100000, -1), + (0, 90000, 0.45), + (0, 90000, -0.45), + (0, -90000, 0.45), + (0, -90000, -0.45), + (0, 10000, 0.5 * (10000)**2 / (system.max_slew*0.99)), + (0, system.max_grad*0.99, 0.5 * (system.max_grad*0.99)**2 / (system.max_slew*0.99)), + (system.max_grad*0.99, system.max_grad*0.99, 1), + (system.max_grad*0.99, system.max_grad*0.99, -1), + ] + + +@pytest.mark.parametrize("grad_start,grad_end,area", test_zoo) +def test_make_extended_trapezoid_area(grad_start, grad_end, area): + g,_,_ = make_extended_trapezoid_area(channel='x', grad_start=grad_start, grad_end=grad_end, area=area, system=system) + + grad_ok = all(abs(g.waveform) <= system.max_grad) + slew_ok = all(abs(np.diff(g.waveform) / np.diff(g.tt)) <= system.max_slew) + + assert pytest.approx(g.area) == area, 'Result area is not correct' + assert grad_ok, 'Maximum gradient strength violated' + assert slew_ok, 'Maximum slew rate violated' + + +random.seed(0) +test_zoo_random = [((random.random()-0.5) * 2 * system.max_grad * 0.99, + (random.random()-0.5) * 2 * system.max_grad * 0.99, + (random.random()-0.5) * 10000) for _ in range(100)] + +@pytest.mark.parametrize("grad_start,grad_end,area", test_zoo_random) +def test_make_extended_trapezoid_area_random_cases(grad_start, grad_end, area): + g,_,_ = make_extended_trapezoid_area(channel='x', grad_start=grad_start, grad_end=grad_end, area=area, system=system) + + grad_ok = all(abs(g.waveform) <= system.max_grad) + slew_ok = all(abs(np.diff(g.waveform) / np.diff(g.tt)) <= system.max_slew) + + assert pytest.approx(g.area) == area, 'Result area is not correct' + assert grad_ok, 'Maximum gradient strength violated' + assert slew_ok, 'Maximum slew rate violated' + + +random.seed(0) +test_zoo_random = [((random.random()-0.5) * 3 * system.max_grad, + (random.random()-0.5) * 3 * system.max_grad, + (random.random()-0.5) * 10 * system.max_grad) for _ in range(100)] + +@pytest.mark.parametrize("grad_start,grad_end,grad_amp", test_zoo_random) +def test_make_extended_trapezoid_area_recreate(grad_start, grad_end, grad_amp): + def _to_raster(time: float) -> float: + return np.ceil(time / system.grad_raster_time) * system.grad_raster_time + + def _calc_ramp_time(grad_amp, slew_rate, grad_start): + return _to_raster(abs(grad_amp - grad_start) / slew_rate) + + grad_start = np.clip(grad_start, -system.max_grad, system.max_grad) + grad_end = np.clip(grad_end, -system.max_grad, system.max_grad) + + # Construct extended gradient based on start, intermediate, and end gradient + # strengths, assuming maximum slew rates. + + # If grad_amp > max_grad, convert to max_grad, and keep total area + # approximately equal + if abs(grad_amp) > system.max_grad*0.99: + grad_amp_new = math.copysign(system.max_grad*0.99, grad_amp) + + flat_time = _to_raster(_calc_ramp_time(grad_amp_new, system.max_slew*0.99, grad_amp) * abs(grad_amp_new + grad_amp) / abs(grad_amp_new)) + grad_amp = grad_amp_new + else: + flat_time = 0 + + # Construct extended gradient + if flat_time == 0: + times = cumsum(0, + _calc_ramp_time(grad_amp, system.max_slew*0.99, grad_start), + _calc_ramp_time(grad_amp, system.max_slew*0.99, grad_end), + ) + amplitudes = (grad_start, grad_amp, grad_end) + else: + times = cumsum(0, + _calc_ramp_time(grad_amp, system.max_slew*0.99, grad_start), + _to_raster(flat_time), + _calc_ramp_time(grad_amp, system.max_slew*0.99, grad_end), + ) + amplitudes = (grad_start, grad_amp, grad_amp, grad_end) + + g_true = make_extended_trapezoid(channel='x', amplitudes=amplitudes, times=times) + + # Recreate gradient using make_extended_trapezoid_area + g,_,_ = make_extended_trapezoid_area(channel='x', grad_start=grad_start, grad_end=grad_end, area=g_true.area, system=system) + grad_ok = all(abs(g.waveform) <= system.max_grad) + slew_ok = all(abs(np.diff(g.waveform) / np.diff(g.tt)) <= system.max_slew) + + assert pytest.approx(g.area) == g_true.area, 'Result area is not correct' + assert grad_ok, 'Maximum gradient strength violated' + assert slew_ok, 'Maximum slew rate violated' + + # Check that new gradient duration is smaller or equal to original gradient duration + d1 = calc_duration(g) + d2 = calc_duration(g_true) + + assert pytest.approx(d1) == d2 or d1 < d2