跳到主要内容

从站事件

通过 slave.events 访问 SlaveEvents 对象。使用 add_* 方法注册回调,remove_* 方法移除。

事件自动路由

主站级回调会自动路由到对应从站的 slave.events。订阅从站事件无需 master_index/slave_index 参数,系统已自动过滤到当前从站。

功能概览

类别方法说明
状态事件slave.events.add_state_changed()从站 EtherCAT 状态变化
紧急消息slave.events.add_emergency()CoE Emergency 紧急消息
热插拔slave.events.add_offline() / add_online()从站离线/上线
DC 同步slave.events.add_dc_sync_lost()DC 同步丢失
FSoEslave.events.add_fsoe_state_changed() 等FSoE 安全事件

从站状态事件

add_state_changed()

def add_state_changed(self, callback: Callable[[int, int], None]) -> None

注册从站状态变化监听。回调参数: (old_state, new_state)

示例:

slave = master[1]

def on_state_changed(old_state, new_state):
print(f"状态变化: 0x{old_state:02X} -> 0x{new_state:02X}")

slave.events.add_state_changed(on_state_changed)

add_emergency()

def add_emergency(self, callback: Callable[[int, int, int, int, int], None]) -> None

注册紧急事件监听。回调参数: (error_code, error_reg, b1, w1, w2)

示例:

slave.events.add_emergency(lambda ec, er, b1, w1, w2:
print(f"紧急消息: 错误码=0x{ec:04X}"))

add_offline() / add_online()

def add_offline(self, callback: Callable[[], None]) -> None
def add_online(self, callback: Callable[[], None]) -> None

注册从站离线/上线监听。无参数回调。

示例:

slave.events.add_offline(lambda: print("从站离线"))
slave.events.add_online(lambda: print("从站上线"))

add_dc_sync_lost()

def add_dc_sync_lost(self, callback: Callable[[int], None]) -> None

注册 DC 同步丢失监听。回调参数: (diff_ns)

示例:

slave.events.add_dc_sync_lost(lambda diff_ns:
print(f"DC 同步丢失: 偏差 {diff_ns}ns"))

输入数据变化(已移除,改用轮询)

已移除(2026-05-21)

slave.events.add_input_changed() 已删除。旧机制靠底层每周期 memcmp 检测输入 PDO 变化并回调;PDO 数据通路改为纯内核共享内存指针轮询(零拷贝)后,该机制不再驱动。

PDO 数据访问

PDO 过程数据已是纯内核 RT 收发 + 内核共享内存指针零拷贝。应用层直接读 slave.pdo.inputs 即得最新输入过程映像,无需事件回调。需感知"输入变了",在 master.on_pdo_cycle() 同步回调或自建轮询线程里读取并与上一周期快照比对。

零拷贝结构体读取用 slave.pdo.bind_pdo_struct(struct_type, is_input=True),返回一个直接映射到 IOmap 内存的 ctypes 结构体引用——绑定一次后字段始终反映最新 PDO 值。

示例:

import ctypes

class ServoInput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("status_word", ctypes.c_uint16),
("actual_position", ctypes.c_int32),
("actual_velocity", ctypes.c_int32),
]

# 绑定一次, 之后字段始终反映最新 PDO 值 (零拷贝)
input_ref = slave.pdo.bind_pdo_struct(ServoInput, is_input=True)

# 在 PDO 周期同步回调里轮询比对
_last_pos = None

def on_pdo_cycle(mi):
global _last_pos
if input_ref and input_ref.actual_position != _last_pos:
_last_pos = input_ref.actual_position
print(f"当前位置: {input_ref.actual_position}")

master.on_pdo_cycle(on_pdo_cycle)

FSoE 安全事件

# FSoE 状态变化 (old_state, new_state)
slave.events.add_fsoe_state_changed(lambda old, new:
print(f"FSoE: {old} -> {new}"))

# FSoE 错误 (error_code)
slave.events.add_fsoe_error(lambda code:
print(f"FSoE 错误: 0x{code:04X}"))

# FSoE 进入失效安全
slave.events.add_fsoe_failsafe_triggered(lambda:
print("进入失效安全模式!"))

# FSoE 安全数据更新 (data: bytes)
slave.events.add_fsoe_safe_data_updated(lambda data:
print(f"安全数据: {data.hex()}"))

# FSoE 数据交换 (input_data, output_data)
slave.events.add_fsoe_data_exchange(lambda inp, out:
print(f"输入: {inp.hex()}, 输出: {out.hex()}"))

移除监听器

所有 add_* 方法都有对应的 remove_* 方法:

slave.events.remove_state_changed(my_callback)
slave.events.remove_emergency(my_callback)
slave.events.remove_offline(my_callback)
slave.events.remove_online(my_callback)
slave.events.remove_dc_sync_lost(my_callback)

线程安全

事件回调在非 UI 线程上触发。更新 UI 时需要线程同步:

import queue
data_queue = queue.Queue()

def on_offline():
data_queue.put(("offline", slave.index))

slave.events.add_offline(on_offline)
注意

回调中避免执行耗时操作。如需处理大量数据,将数据放入队列异步处理。PDO 周期同步回调里读 slave.pdo 过程映像后只做快速入队,耗时处理交给后台线程。

完整示例

import ctypes
from darra_ethercat import EtherCATMaster, EcState

class ServoInput(ctypes.Structure):
_pack_ = 1
_fields_ = [
("status_word", ctypes.c_uint16),
("actual_position", ctypes.c_int32),
("actual_velocity", ctypes.c_int32),
]

with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
master.set_state(EcState.OP)
master.start()

slave = master[1]

# ===== 从站状态事件 =====
slave.events.add_state_changed(lambda old, new:
print(f"状态: 0x{old:02X} -> 0x{new:02X}"))

slave.events.add_emergency(lambda ec, er, b1, w1, w2:
print(f"紧急消息: 0x{ec:04X}"))

slave.events.add_offline(lambda: print("从站离线"))
slave.events.add_online(lambda: print("从站上线"))

slave.events.add_dc_sync_lost(lambda diff:
print(f"DC 同步丢失: {diff}ns"))

# ===== 输入数据访问(零拷贝结构体 + PDO 周期轮询比对) =====
input_ref = slave.pdo.bind_pdo_struct(ServoInput, is_input=True)
_last_pos = [None]

def on_pdo_cycle(mi):
if input_ref and input_ref.actual_position != _last_pos[0]:
_last_pos[0] = input_ref.actual_position
print(f"位置: {input_ref.actual_position}")

master.on_pdo_cycle(on_pdo_cycle)

# ===== FSoE 安全事件(协议专属) =====
if slave.fsoe is not None:
slave.events.add_fsoe_state_changed(lambda old, new:
print(f"FSoE: {old} -> {new}"))

slave.events.add_fsoe_error(lambda code:
print(f"FSoE 错误: 0x{code:04X}"))

slave.events.add_fsoe_failsafe_triggered(lambda:
print("进入失效安全模式!"))

slave.events.add_fsoe_safe_data_updated(lambda data:
print(f"安全数据: {data.hex()}"))

slave.events.add_fsoe_data_exchange(lambda inp, out:
print(f"输入: {inp.hex()}, 输出: {out.hex()}"))

import time
time.sleep(30)
master.stop()