-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a64e847
commit c3337ec
Showing
5 changed files
with
316 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
#!/usr/bin/env python | ||
|
||
import rospy | ||
import numpy as np | ||
from nav_msgs.msg import Odometry | ||
from threading import Lock | ||
from scipy.optimize import curve_fit | ||
|
||
|
||
def sine_function(t, A, omega, phi, offset): | ||
return A * np.sin(omega * t + phi) + offset | ||
|
||
|
||
class LatencyEstimator: | ||
def __init__(self): | ||
# Parameters | ||
self.command_amplitude = 1.0 # Amplitude of the input sine wave command | ||
self.command_frequency = 0.2 # Frequency of the input sine wave command (Hz) | ||
self.buffer_size = 1000 # Number of data points to keep for analysis | ||
self.data_lock = Lock() | ||
|
||
# Data buffers | ||
self.times = [] | ||
self.velocities = [] | ||
|
||
# Subscriber | ||
self.odom_sub = rospy.Subscriber('/odom', Odometry, self.odom_callback) | ||
|
||
# Timer to perform latency estimation periodically | ||
self.timer_interval = 1.0 # Perform estimation every 1 second | ||
self.estimation_timer = rospy.Timer(rospy.Duration(self.timer_interval), self.perform_estimation) | ||
|
||
def odom_callback(self, msg): | ||
with self.data_lock: | ||
# Get the current time and linear x velocity | ||
current_time = msg.header.stamp.to_sec() | ||
linear_x = msg.twist.twist.linear.x | ||
|
||
# Append to buffers | ||
self.times.append(current_time) | ||
self.velocities.append(linear_x) | ||
|
||
# Ensure buffer does not exceed buffer_size | ||
if len(self.times) > self.buffer_size: | ||
self.times.pop(0) | ||
self.velocities.pop(0) | ||
|
||
def perform_estimation(self, event): | ||
with self.data_lock: | ||
if len(self.times) < 10: | ||
# Not enough data to perform estimation | ||
rospy.loginfo("Waiting for more data to perform latency estimation...") | ||
return | ||
|
||
# Convert lists to NumPy arrays | ||
times_array = np.array(self.times) | ||
velocities_array = np.array(self.velocities) | ||
|
||
# Normalize time to start from zero | ||
times_array -= times_array[0] | ||
|
||
# Perform latency estimation | ||
latency, phase_difference, fitted_params = self.fit_sine_wave( | ||
times_array, | ||
velocities_array, | ||
self.command_frequency, | ||
self.command_amplitude | ||
) | ||
|
||
if latency is not None: | ||
rospy.loginfo(f"Estimated Latency: {latency:.3f} seconds") | ||
# rospy.loginfo(f"Phase Difference: {phase_difference:.3f} radians") | ||
# rospy.loginfo(f"Fitted Parameters: {fitted_params}") | ||
else: | ||
rospy.loginfo("Could not estimate latency due to fitting error.") | ||
|
||
@staticmethod | ||
def fit_sine_wave(time, measured_velocity, command_frequency, command_amplitude): | ||
""" | ||
Fits a sine wave to the measured velocity data and calculates the phase difference | ||
and system latency compared to the input sine wave command. | ||
Parameters: | ||
- time: array_like, time stamps of the measurements. | ||
- measured_velocity: array_like, measured linear velocities from odometry. | ||
- command_frequency: float, frequency of the input sine wave command (Hz). | ||
- command_amplitude: float, amplitude of the input sine wave command (m/s). | ||
Returns: | ||
- latency: float, estimated system latency in seconds. | ||
- phase_difference: float, phase difference in radians between the measured and input sine waves. | ||
- fitted_params: dict, parameters of the fitted sine wave. | ||
""" | ||
# Define the sine wave function to fit | ||
def sine_function(t, phi): | ||
return command_amplitude * np.sin(2 * np.pi * command_frequency * t + phi) | ||
|
||
# Initial guesses for the parameters | ||
# guess_amplitude = np.std(measured_velocity) * np.sqrt(2) | ||
# guess_omega = 2 * np.pi * command_frequency | ||
guess_phi = 0 | ||
# guess_offset = np.mean(measured_velocity) | ||
p0 = [guess_phi] | ||
|
||
# Fit the sine wave to the measured data | ||
try: | ||
params, _ = curve_fit(sine_function, time, measured_velocity, p0=p0) | ||
except RuntimeError as e: | ||
print("Error in curve fitting:", e) | ||
return None, None, None | ||
|
||
# Extract fitted parameters | ||
phi_fit = params[0] | ||
|
||
# Calculate the phase difference | ||
phase_difference = phi_fit | ||
# Adjust phase difference to be within [-pi, pi] | ||
phase_difference = (phase_difference + np.pi) % (2 * np.pi) - np.pi | ||
|
||
# Compute the system latency | ||
latency = -phase_difference / (2 * np.pi * command_frequency) # Note the negative sign | ||
# latency = latency % (2 * np.pi / omega_fit) # Ensure latency is positive and within one period | ||
|
||
# Prepare fitted parameters for output | ||
fitted_params = { | ||
'phase': phi_fit | ||
} | ||
|
||
return latency, phase_difference, fitted_params | ||
|
||
|
||
if __name__ == '__main__': | ||
try: | ||
rospy.init_node('latency_estimator', anonymous=True) | ||
estimator = LatencyEstimator() | ||
rospy.spin() | ||
except rospy.ROSInterruptException: | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import rosbag | ||
import rospy | ||
import numpy as np | ||
import matplotlib.pyplot as plt | ||
from easydict import EasyDict as edict | ||
|
||
|
||
def extract_twist_velocities(rosbag_path): | ||
data = {'/ldos/cmd_vel': {}, '/spot/ldos/odometry': {}} | ||
|
||
# Open the rosbag file | ||
with rosbag.Bag(rosbag_path, 'r') as bag: | ||
for topic, msg, t in bag.read_messages(topics=['/ldos/cmd_vel', '/spot/ldos/odometry']): | ||
if topic == '/ldos/cmd_vel': | ||
x_velocity = msg.linear.x | ||
data['/ldos/cmd_vel'][msg.sys_nano_time/1e9] = x_velocity | ||
elif topic == '/spot/ldos/odometry': | ||
x_velocity = msg.twist.twist.linear.x | ||
data['/spot/ldos/odometry'][msg.sys_nano_time/1e9] = x_velocity | ||
return data | ||
|
||
|
||
if __name__ == "__main__": | ||
bagpath = "/home/dynamo/Music/analyze/1.bag" | ||
data = extract_twist_velocities(bagpath) | ||
# plot cmd_vel and odom vs time | ||
plt.figure() | ||
# plt.plot(data['/cmd_vel'].keys(), data['/cmd_vel'].values(), label='Commanded Velocity') | ||
# plot cmd_vel as just single points | ||
for key in data['/ldos/cmd_vel'].keys(): | ||
plt.plot([key, key], [0, data['/ldos/cmd_vel'][key]], color='blue') | ||
plt.plot(data['/spot/ldos/odometry'].keys(), data['/spot/ldos/odometry'].values(), label='Measured Velocity') | ||
plt.xlabel('Time [s]') | ||
plt.ylabel('Velocity [m/s]') | ||
plt.legend() | ||
plt.show() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
#!/usr/bin/env python | ||
|
||
import rospy | ||
import math | ||
from geometry_msgs.msg import Twist | ||
import roslib | ||
roslib.load_manifest('amrl_msgs') | ||
from amrl_msgs.msg import AckermannCurvatureDriveMsg | ||
from std_msgs.msg import Header | ||
from collections import deque | ||
import time | ||
|
||
|
||
def sine_wave_publisher(): | ||
rospy.init_node('sine_wave_command_publisher', anonymous=False) | ||
cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10) | ||
ackermann_pub = rospy.Publisher('/ackermann_curvature_drive', AckermannCurvatureDriveMsg, queue_size=10) | ||
latency_pub = rospy.Publisher('/latency_sim', Twist, queue_size=10) | ||
|
||
rate = rospy.Rate(15) | ||
|
||
amplitude = 1.0 # Peak velocity in m/s | ||
frequency = 0.2 # Frequency in Hz (one full cycle every 5 seconds) | ||
start_time = rospy.get_time() | ||
|
||
# Initialize latency queue | ||
latency_queue = deque() | ||
latency_duration = rospy.get_param("~latency_duration", 0.24) # Default 0.24 seconds | ||
|
||
prev_latency_duration = latency_duration | ||
|
||
while not rospy.is_shutdown(): | ||
current_time = rospy.get_time() | ||
elapsed_time = current_time - start_time | ||
velocity = amplitude * math.sin(2 * math.pi * frequency * elapsed_time) | ||
|
||
# Create and publish Twist message | ||
twist_msg = Twist() | ||
twist_msg.linear.x = velocity | ||
twist_msg.linear.y = 0.0 | ||
twist_msg.linear.z = 0.0 | ||
twist_msg.angular.x = 0.0 | ||
twist_msg.angular.y = 0.0 | ||
twist_msg.angular.z = 0.0 | ||
cmd_vel_pub.publish(twist_msg) | ||
|
||
# Create and publish AckermannCurvatureDriveMsg message | ||
ackermann_msg = AckermannCurvatureDriveMsg() | ||
ackermann_msg.header = Header() | ||
ackermann_msg.header.stamp = rospy.Time.now() | ||
ackermann_msg.velocity = velocity | ||
ackermann_msg.curvature = 0.0 # Straight line motion | ||
ackermann_pub.publish(ackermann_msg) | ||
|
||
# Update latency duration in case it was changed dynamically | ||
latency_duration = rospy.get_param("~latency_duration", None) | ||
if latency_duration is not None and latency_duration != prev_latency_duration: | ||
rospy.loginfo(f"Latency duration changed to {latency_duration} seconds") | ||
latency_queue.clear() | ||
prev_latency_duration = latency_duration | ||
continue | ||
|
||
# Add current twist message to the latency queue with timestamp | ||
latency_queue.append((current_time, twist_msg)) | ||
|
||
# Check if any message in queue has surpassed the latency duration | ||
if latency_queue and (current_time - latency_queue[0][0] >= latency_duration): | ||
_, delayed_msg = latency_queue.popleft() # Retrieve the delayed message | ||
latency_pub.publish(delayed_msg) # Publish the delayed message to latency_sim | ||
|
||
rate.sleep() | ||
|
||
|
||
if __name__ == '__main__': | ||
try: | ||
sine_wave_publisher() | ||
except rospy.ROSInterruptException: | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
import numpy as np | ||
import matplotlib.pyplot as plt | ||
from scipy.signal import firwin, lfilter | ||
|
||
# Parameters | ||
command_amplitude = 1.0 | ||
command_frequency = 0.2 | ||
known_latency = 0.5 | ||
noise_std = 0.05 | ||
|
||
# Time vector | ||
t_start = 0 | ||
t_end = 20 | ||
dt = 0.01 | ||
time = np.arange(t_start, t_end, dt) | ||
N = len(time) | ||
|
||
# Input signal | ||
omega = 2 * np.pi * command_frequency | ||
cmd_vel = command_amplitude * np.sin(omega * time) | ||
|
||
# Simulated measured signal with latency and noise | ||
odom_vel = command_amplitude * np.sin(omega * (time - known_latency)) | ||
noise = np.random.normal(0, noise_std, size=odom_vel.shape) | ||
odom_vel_noisy = odom_vel + noise | ||
|
||
# Smooth the noisy signal | ||
numtaps = 101 | ||
cutoff = command_frequency * 1.5 | ||
fs = 1 / dt | ||
b = firwin(numtaps, cutoff / (0.5 * fs)) | ||
odom_vel_smooth = lfilter(b, 1.0, odom_vel_noisy) | ||
|
||
# Use FFT to calculate phase difference | ||
period = 1 / command_frequency | ||
num_periods = int(t_end / period) | ||
fft_size = int(num_periods * period / dt) | ||
cmd_segment = cmd_vel[:fft_size] | ||
odom_segment = odom_vel_smooth[:fft_size] | ||
|
||
CMD = np.fft.fft(cmd_segment) | ||
ODOM = np.fft.fft(odom_segment) | ||
freqs = np.fft.fftfreq(fft_size, d=dt) | ||
idx = np.argmin(np.abs(freqs - command_frequency)) | ||
|
||
phase_cmd = np.angle(CMD[idx]) | ||
phase_odom = np.angle(ODOM[idx]) | ||
|
||
phase_difference = phase_odom - phase_cmd | ||
latency_estimated = -phase_difference / omega | ||
|
||
print(f"Known latency: {known_latency:.4f} seconds") | ||
print(f"Estimated latency: {latency_estimated:.4f} seconds") | ||
|
||
# Plotting | ||
plt.plot(time[:fft_size], cmd_segment, label='Commanded Velocity') | ||
plt.plot(time[:fft_size], odom_segment, label='Measured Velocity (Smoothed)') | ||
plt.xlabel('Time [s]') | ||
plt.ylabel('Velocity [m/s]') | ||
plt.legend() | ||
plt.show() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters