时间同步算法
状态: 🚧 Work In Progress
多传感器系统最大的挑战之一就是时间同步。本页面将详细解释时间同步的原理和实现。
🤔 为什么需要时间同步?
问题
三个传感器的采样率不同:
Marker: ●───●───●───●───●─── (128 Hz)
IMU: ●●●●●●●●●●●●●●●●●●● (240 Hz)
EMG: ●●●●●●●●●●●●●●●●●●●●●●●●●●●● (2048 Hz)如果直接按采样点对齐:
时刻 0:
Marker[0] = ?
IMU[0] = ?
EMG[0] = ?
这三个"0"不是同一时刻! ❌目标
找到一个"基准事件",让所有传感器以这个事件为 T=0:
Impact (基准事件)
↓
Marker: ─────●───── (T=0)
IMU: ──────●──── (T=0)
EMG: ─────●───── (T=0)🎯 基准事件: Impact
高尔夫挥杆的 Impact
Impact = 球杆击中球的瞬间
为什么选 Impact?
- 是整个挥杆最关键的时刻
- 所有传感器都能检测到
- 是一个"瞬时事件",时间点明确
跳跃的 Impact
Impact = 脚着地的瞬间
在三个传感器中的表现:
- IMU: Z 轴加速度突然增大(着地冲击)
- EMG: 肌肉激活达到峰值(着地瞬间用力最大)
- Marker: CoM 垂直速度为 0(最低点)
🛠️ Impact 检测算法
从 IMU 检测
python
def detect_impact_from_imu(imu_data, imu_rate, threshold=2.0):
"""
从 IMU Z 轴加速度检测 Impact
threshold: 加速度阈值(单位: g)
"""
# 提取 Z 轴加速度
acc_z = imu_data[:, 5] # 假设第 5 列是 acc_z
# 找到加速度突变点(超过阈值)
candidates = np.where(np.abs(acc_z) > threshold * 9.8)[0]
if len(candidates) == 0:
return None
# 返回第一个候选点
impact_frame = candidates[0]
impact_time = impact_frame / imu_rate
return impact_time, impact_frame从 EMG 检测
python
def detect_impact_from_emg(emg_data, emg_rate):
"""
从 EMG 肌肉激活峰值检测 Impact
"""
# 计算所有通道的总激活强度
total_activation = np.sum(np.abs(emg_data), axis=1)
# 计算 RMS 包络
window_size = int(emg_rate * 0.05) # 50ms
rms = np.sqrt(
np.convolve(
total_activation**2,
np.ones(window_size) / window_size,
mode='same'
)
)
# 找到峰值
impact_frame = np.argmax(rms)
impact_time = impact_frame / emg_rate
return impact_time, impact_frame从 Marker 检测
python
def detect_impact_from_marker(marker_data, marker_rate):
"""
从 CoM 垂直速度检测 Impact(速度为 0 的点)
"""
# 计算 CoM
com = np.nanmean(marker_data, axis=0) # (3, samples)
z_position = com[2, :]
# 填充 NaN
valid_mask = ~np.isnan(z_position)
z_position = np.interp(
np.arange(len(z_position)),
np.where(valid_mask)[0],
z_position[valid_mask]
)
# 计算垂直速度(差分)
z_velocity = np.diff(z_position) * marker_rate
# 找到速度最小的点(接近 0)
impact_frame = np.argmin(np.abs(z_velocity))
impact_time = impact_frame / marker_rate
return impact_time, impact_frame📐 时间对齐
步骤 1: 检测 Impact
python
imu_impact_t, imu_impact_f = detect_impact_from_imu(imu_data, imu_rate)
emg_impact_t, emg_impact_f = detect_impact_from_emg(emg_data, emg_rate)
marker_impact_t, marker_impact_f = detect_impact_from_marker(marker_data, marker_rate)步骤 2: 计算时间差
python
# 转换为毫秒
imu_t_ms = imu_impact_t * 1000
emg_t_ms = emg_impact_t * 1000
marker_t_ms = marker_impact_t * 1000
# 计算最大时间差
diffs = [
abs(imu_t_ms - emg_t_ms),
abs(imu_t_ms - marker_t_ms),
abs(emg_t_ms - marker_t_ms)
]
max_diff = max(diffs)
print(f"最大时间差: {max_diff:.1f} ms")步骤 3: 对齐数据
python
def align_to_impact(data, data_rate, impact_frame):
"""将数据对齐到 Impact = 0"""
# 创建新的时间轴,以 Impact 为 0
num_samples = len(data)
time_axis = (np.arange(num_samples) - impact_frame) / data_rate
return time_axis, data
# 对齐所有传感器
imu_time, imu_aligned = align_to_impact(imu_data, imu_rate, imu_impact_f)
emg_time, emg_aligned = align_to_impact(emg_data, emg_rate, emg_impact_f)
marker_time, marker_aligned = align_to_impact(marker_data, marker_rate, marker_impact_f)✅ 验证同步精度
目标精度
目标: <10ms
为什么 10ms?
- 人类反应时间 ~200ms
- 10ms 是运动科学中可接受的精度
- 对于运动分析足够准确
验证方法
python
if max_diff < 10:
print("✅ 同步精度 <10ms,符合要求")
elif max_diff < 20:
print("⚠️ 同步精度 <20ms,可接受但需改进")
else:
print(f"❌ 同步精度 {max_diff:.1f}ms,需要优化")🎓 高级话题 (待完善)
插值和重采样
如果需要统一采样率:
python
from scipy.interpolate import interp1d
# 将 Marker 从 128Hz 重采样到 240Hz
f = interp1d(marker_time, marker_data, axis=-1)
new_time = np.linspace(marker_time[0], marker_time[-1], new_num_samples)
marker_resampled = f(new_time)时钟漂移处理
长时间记录可能出现时钟漂移:
python
# 线性校正
drift_rate = (end_diff - start_diff) / duration
corrected_time = original_time - drift_rate * original_time多个 Impact 检测
一次记录可能有多个动作:
python
# 找到所有峰值
from scipy.signal import find_peaks
peaks, _ = find_peaks(signal, height=threshold, distance=min_distance)📊 数据集验证结果
我们的数据集时间同步很好:
IMU: 8956 / 240 = 37.316 秒
EMG: 76480 / 2048 = 37.344 秒
Marker: 4780 / 128 = 37.344 秒
最大时间差: ~28ms ✅这说明数据集本身的时间同步做得很好!
📚 参考资料
- Phase 3: 时间同步验证 - 实施计划
- Phase 1: 数据加载 - 数据规格
当前状态: 理论和基础算法已完成,实战验证在 Phase 3
这个页面会随着 Phase 3 的进展持续更新。