从站事件
通过 slave.events 访问 SlaveEvents 对象。使用 add_* 方法注册回调,remove_* 方法移除。
事件自动路由
主站级回调会自动路由到对应从站的 slave.events。订阅从站事件无需 master_index/slave_index 参数,系统已自动过滤到当前从站。
功能概览
| 类别 | 方法 | 说明 |
|---|---|---|
| 状态事件 | slave.events.add_state_changed() | 从站 EtherCAT 状态变化 |
| 紧急消息 | slave.events.add_emergency() | CoE Emergency 紧急消息 |
| 热插拔 | slave.events.add_offline() / add_online() | 从站离线/上线 |
| DC 同步 | slave.events.add_dc_sync_lost() | DC 同步丢失 |
| FSoE | slave.events.add_fsoe_state_changed() 等 | FSoE 安全事件 |
从站状态事件
add_state_changed()
def add_state_changed(self, callback: Callable[[int, int], None]) -> None
注册从站状态变化监听。回调参数: (old_state, new_state)。
示例:
slave = master[1]
def on_state_changed(old_state, new_state):
print(f"状态变化: 0x{old_state:02X} -> 0x{new_state:02X}")
slave.events.add_state_changed(on_state_changed)
add_emergency()
def add_emergency(self, callback: Callable[[int, int, int, int, int], None]) -> None
注册紧急事件监听。回调参数: (error_code, error_reg, b1, w1, w2)。
示例:
slave.events.add_emergency(lambda ec, er, b1, w1, w2:
print(f"紧急消息: 错误码=0x{ec:04X}"))
add_offline() / add_online()
def add_offline(self, callback: Callable[[], None]) -> None
def add_online(self, callback: Callable[[], None]) -> None
注册从站离线/上线监听。无参数回调。
示例:
slave.events.add_offline(lambda: print("从站离线"))
slave.events.add_online(lambda: print("从站上线"))
add_dc_sync_lost()
def add_dc_sync_lost(self, callback: Callable[[int], None]) -> None
注册 DC 同步丢失监听。回调参数: (diff_ns)。
示例:
slave.events.add_dc_sync_lost(lambda diff_ns:
print(f"DC 同步丢失: 偏差 {diff_ns}ns"))
输入数据变化(已移除,改用轮询)
已移除(2026-05-21)
slave.events.add_input_changed() 已删除。旧机制靠底层每周期 memcmp 检测输入 PDO 变化并回调;PDO 数据通路改为纯内核共享内存指针轮询(零拷贝)后,该机制不再驱动。
PDO 数据访问
PDO 过程数据已是纯内核 RT 收发 + 内核共享内存指针零拷贝。应用层直接读 slave.pdo.inputs 即得最新输入过程映像,无需事件回调。需感知"输入变了",在 master.on_pdo_cycle() 同步回调或自建轮询线程里读取并与上一周期快照比对。
零拷贝结构体读取用 slave.pdo.bind_pdo_struct(struct_type, is_input=True),返回一个直接映射到 IOmap 内存的 ctypes 结构体引用——绑定一次后字段始终反映最新 PDO 值。
示例:
import ctypes
class ServoInput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("status_word", ctypes.c_uint16),
("actual_position", ctypes.c_int32),
("actual_velocity", ctypes.c_int32),
]
# 绑定一次, 之后字段始终反映最新 PDO 值 (零拷贝)
input_ref = slave.pdo.bind_pdo_struct(ServoInput, is_input=True)
# 在 PDO 周期同步回调里轮询比对
_last_pos = None
def on_pdo_cycle(mi):
global _last_pos
if input_ref and input_ref.actual_position != _last_pos:
_last_pos = input_ref.actual_position
print(f"当前位置: {input_ref.actual_position}")
master.on_pdo_cycle(on_pdo_cycle)
FSoE 安全事件
# FSoE 状态变化 (old_state, new_state)
slave.events.add_fsoe_state_changed(lambda old, new:
print(f"FSoE: {old} -> {new}"))
# FSoE 错误 (error_code)
slave.events.add_fsoe_error(lambda code:
print(f"FSoE 错误: 0x{code:04X}"))
# FSoE 进入失效安全
slave.events.add_fsoe_failsafe_triggered(lambda:
print("进入失效安全模式!"))
# FSoE 安全数据更新 (data: bytes)
slave.events.add_fsoe_safe_data_updated(lambda data:
print(f"安全数据: {data.hex()}"))
# FSoE 数据交换 (input_data, output_data)
slave.events.add_fsoe_data_exchange(lambda inp, out:
print(f"输入: {inp.hex()}, 输出: {out.hex()}"))
移除监听器
所有 add_* 方法都有对应的 remove_* 方法:
slave.events.remove_state_changed(my_callback)
slave.events.remove_emergency(my_callback)
slave.events.remove_offline(my_callback)
slave.events.remove_online(my_callback)
slave.events.remove_dc_sync_lost(my_callback)
线程安全
事件回调在非 UI 线程上触发。更新 UI 时需要线程同步:
import queue
data_queue = queue.Queue()
def on_offline():
data_queue.put(("offline", slave.index))
slave.events.add_offline(on_offline)
注意
回调中避免执行耗时操作。如需处理大量数据,将数据放入队列异步处理。PDO 周期同步回调里读 slave.pdo 过程映像后只做快速入队,耗时处理交给后台线程。
完整示例
import ctypes
from darra_ethercat import EtherCATMaster, EcState
class ServoInput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("status_word", ctypes.c_uint16),
("actual_position", ctypes.c_int32),
("actual_velocity", ctypes.c_int32),
]
with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
master.set_state(EcState.OP)
master.start()
slave = master[1]
# ===== 从站状态事件 =====
slave.events.add_state_changed(lambda old, new:
print(f"状态: 0x{old:02X} -> 0x{new:02X}"))
slave.events.add_emergency(lambda ec, er, b1, w1, w2:
print(f"紧急消息: 0x{ec:04X}"))
slave.events.add_offline(lambda: print("从站离线"))
slave.events.add_online(lambda: print("从站上线"))
slave.events.add_dc_sync_lost(lambda diff:
print(f"DC 同步丢失: {diff}ns"))
# ===== 输入数据访问(零拷贝结构体 + PDO 周期轮询比对) =====
input_ref = slave.pdo.bind_pdo_struct(ServoInput, is_input=True)
_last_pos = [None]
def on_pdo_cycle(mi):
if input_ref and input_ref.actual_position != _last_pos[0]:
_last_pos[0] = input_ref.actual_position
print(f"位置: {input_ref.actual_position}")
master.on_pdo_cycle(on_pdo_cycle)
# ===== FSoE 安全事件(协议专属) =====
if slave.fsoe is not None:
slave.events.add_fsoe_state_changed(lambda old, new:
print(f"FSoE: {old} -> {new}"))
slave.events.add_fsoe_error(lambda code:
print(f"FSoE 错误: 0x{code:04X}"))
slave.events.add_fsoe_failsafe_triggered(lambda:
print("进入失效安全模式!"))
slave.events.add_fsoe_safe_data_updated(lambda data:
print(f"安全数据: {data.hex()}"))
slave.events.add_fsoe_data_exchange(lambda inp, out:
print(f"输入: {inp.hex()}, 输出: {out.hex()}"))
import time
time.sleep(30)
master.stop()