Skip to content

时间同步算法

状态: 🚧 Work In Progress

多传感器系统最大的挑战之一就是时间同步。本页面将详细解释时间同步的原理和实现。


🤔 为什么需要时间同步?

问题

三个传感器的采样率不同:

Marker: ●───●───●───●───●───  (128 Hz)
IMU:    ●●●●●●●●●●●●●●●●●●●  (240 Hz)
EMG:    ●●●●●●●●●●●●●●●●●●●●●●●●●●●●  (2048 Hz)

如果直接按采样点对齐:

时刻 0:
Marker[0] = ?
IMU[0]    = ?
EMG[0]    = ?

这三个"0"不是同一时刻! ❌

目标

找到一个"基准事件",让所有传感器以这个事件为 T=0:

Impact (基准事件)

Marker: ─────●─────  (T=0)
IMU:    ──────●────  (T=0)
EMG:    ─────●─────  (T=0)

🎯 基准事件: Impact

高尔夫挥杆的 Impact

Impact = 球杆击中球的瞬间

为什么选 Impact?

  1. 是整个挥杆最关键的时刻
  2. 所有传感器都能检测到
  3. 是一个"瞬时事件",时间点明确

跳跃的 Impact

Impact = 脚着地的瞬间

在三个传感器中的表现:

  • IMU: Z 轴加速度突然增大(着地冲击)
  • EMG: 肌肉激活达到峰值(着地瞬间用力最大)
  • Marker: CoM 垂直速度为 0(最低点)

🛠️ Impact 检测算法

从 IMU 检测

python
def detect_impact_from_imu(imu_data, imu_rate, threshold=2.0):
    """
    从 IMU Z 轴加速度检测 Impact

    threshold: 加速度阈值(单位: g)
    """
    # 提取 Z 轴加速度
    acc_z = imu_data[:, 5]  # 假设第 5 列是 acc_z

    # 找到加速度突变点(超过阈值)
    candidates = np.where(np.abs(acc_z) > threshold * 9.8)[0]

    if len(candidates) == 0:
        return None

    # 返回第一个候选点
    impact_frame = candidates[0]
    impact_time = impact_frame / imu_rate

    return impact_time, impact_frame

从 EMG 检测

python
def detect_impact_from_emg(emg_data, emg_rate):
    """
    从 EMG 肌肉激活峰值检测 Impact
    """
    # 计算所有通道的总激活强度
    total_activation = np.sum(np.abs(emg_data), axis=1)

    # 计算 RMS 包络
    window_size = int(emg_rate * 0.05)  # 50ms
    rms = np.sqrt(
        np.convolve(
            total_activation**2,
            np.ones(window_size) / window_size,
            mode='same'
        )
    )

    # 找到峰值
    impact_frame = np.argmax(rms)
    impact_time = impact_frame / emg_rate

    return impact_time, impact_frame

从 Marker 检测

python
def detect_impact_from_marker(marker_data, marker_rate):
    """
    从 CoM 垂直速度检测 Impact(速度为 0 的点)
    """
    # 计算 CoM
    com = np.nanmean(marker_data, axis=0)  # (3, samples)
    z_position = com[2, :]

    # 填充 NaN
    valid_mask = ~np.isnan(z_position)
    z_position = np.interp(
        np.arange(len(z_position)),
        np.where(valid_mask)[0],
        z_position[valid_mask]
    )

    # 计算垂直速度(差分)
    z_velocity = np.diff(z_position) * marker_rate

    # 找到速度最小的点(接近 0)
    impact_frame = np.argmin(np.abs(z_velocity))
    impact_time = impact_frame / marker_rate

    return impact_time, impact_frame

📐 时间对齐

步骤 1: 检测 Impact

python
imu_impact_t, imu_impact_f = detect_impact_from_imu(imu_data, imu_rate)
emg_impact_t, emg_impact_f = detect_impact_from_emg(emg_data, emg_rate)
marker_impact_t, marker_impact_f = detect_impact_from_marker(marker_data, marker_rate)

步骤 2: 计算时间差

python
# 转换为毫秒
imu_t_ms = imu_impact_t * 1000
emg_t_ms = emg_impact_t * 1000
marker_t_ms = marker_impact_t * 1000

# 计算最大时间差
diffs = [
    abs(imu_t_ms - emg_t_ms),
    abs(imu_t_ms - marker_t_ms),
    abs(emg_t_ms - marker_t_ms)
]
max_diff = max(diffs)

print(f"最大时间差: {max_diff:.1f} ms")

步骤 3: 对齐数据

python
def align_to_impact(data, data_rate, impact_frame):
    """将数据对齐到 Impact = 0"""
    # 创建新的时间轴,以 Impact 为 0
    num_samples = len(data)
    time_axis = (np.arange(num_samples) - impact_frame) / data_rate

    return time_axis, data

# 对齐所有传感器
imu_time, imu_aligned = align_to_impact(imu_data, imu_rate, imu_impact_f)
emg_time, emg_aligned = align_to_impact(emg_data, emg_rate, emg_impact_f)
marker_time, marker_aligned = align_to_impact(marker_data, marker_rate, marker_impact_f)

✅ 验证同步精度

目标精度

目标: <10ms

为什么 10ms?

  • 人类反应时间 ~200ms
  • 10ms 是运动科学中可接受的精度
  • 对于运动分析足够准确

验证方法

python
if max_diff < 10:
    print("✅ 同步精度 <10ms,符合要求")
elif max_diff < 20:
    print("⚠️ 同步精度 <20ms,可接受但需改进")
else:
    print(f"❌ 同步精度 {max_diff:.1f}ms,需要优化")

🎓 高级话题 (待完善)

插值和重采样

如果需要统一采样率:

python
from scipy.interpolate import interp1d

# 将 Marker 从 128Hz 重采样到 240Hz
f = interp1d(marker_time, marker_data, axis=-1)
new_time = np.linspace(marker_time[0], marker_time[-1], new_num_samples)
marker_resampled = f(new_time)

时钟漂移处理

长时间记录可能出现时钟漂移:

python
# 线性校正
drift_rate = (end_diff - start_diff) / duration
corrected_time = original_time - drift_rate * original_time

多个 Impact 检测

一次记录可能有多个动作:

python
# 找到所有峰值
from scipy.signal import find_peaks
peaks, _ = find_peaks(signal, height=threshold, distance=min_distance)

📊 数据集验证结果

我们的数据集时间同步很好:

IMU:    8956 / 240   = 37.316 秒
EMG:    76480 / 2048 = 37.344 秒
Marker: 4780 / 128   = 37.344 秒

最大时间差: ~28ms ✅

这说明数据集本身的时间同步做得很好!


📚 参考资料


当前状态: 理论和基础算法已完成,实战验证在 Phase 3

这个页面会随着 Phase 3 的进展持续更新。