Python 特有语法糖
DarraEtherCAT Python SDK 在跨语言对齐 (C# / Java / C++ / Rust) 的"标准 API"之上, 额外提供 Python 特有语法糖, 让代码更 Pythonic, 同时不破坏跨语言兼容性.
启用方式 (一行):
import darra_ethercat
import darra_ethercat.sugar # 自动注入全部语法糖
按需导入:
from darra_ethercat.sugar import SlaveIdentityKey
from darra_ethercat.sugar.async_streams import EmergencyEvent
from darra_ethercat.sugar.pattern_match import classify_state, is_running
- 零侵入: sugar 只是 monkey-patch 主 SDK 的类, 不修改任何主 .py 文件
- 幂等: 重复
import darra_ethercat.sugar不会重复注入 - 可选: 不导入 sugar, 主 SDK 行为不变, 跨语言一致
- 向后兼容: 主 SDK 已有的 dunder (
__iter__/__len__/__getitem__...) sugar 不覆盖
一. 上下文管理 (Context Manager)
主 SDK 已支持 (无需 sugar)
from darra_ethercat import EtherCATMaster, EcState
# 同步 with: 自动 close (Stop + Dispose)
with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
master.set_state(EcState.OP)
master.start()
# ... 业务
# 退出 with 自动 master.close()
sugar 增强
1) master.session() — 一行启动+清理
import darra_ethercat.sugar # 激活
from darra_ethercat import EtherCATMaster, EcState
with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
with master.session(state=EcState.OP):
# 此时已 OP, 退出自动 stop()
for slave in master:
print(slave.summary())
2) EtherCATMaster.opening() — 工厂式上下文
import darra_ethercat.sugar
from darra_ethercat import EtherCATMaster, EcState
with EtherCATMaster.opening(r"\\Device\\NPF_{GUID}", state=EcState.OP) as master:
print(f"已 OP, 共 {len(master)} 个从站")
opening() 在一行里完成: 创建主站 + set_network + set_state + start. 退出自动 stop+close.
3) async with master: — asyncio 友好
import asyncio
import darra_ethercat.sugar
from darra_ethercat import EtherCATMaster
async def main():
async with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
master.start()
# ...
# 退出走 executor, 不阻塞事件循环
asyncio.run(main())
| 何时用 | 推荐写法 |
|---|---|
| 同步脚本, 一次性会话 | with EtherCATMaster.opening(nic, state=OP) as m: |
| 同步脚本, 已有主站想临时启动 | with master.session(state=OP): |
| 异步应用 (FastAPI / aiohttp) | async with EtherCATMaster() as m: |
| 跨语言对齐写法 | with EtherCATMaster() as m: (主 SDK 内置) |
二. 迭代器 / 索引 / 包含 (Iter / Index / Contain)
主 SDK 已支持
# for 迭代 (主 SDK 已支持)
for slave in master:
print(slave.name)
# 下标索引 (主 SDK 已支持)
slave = master[1] # 1-based, 与 EtherCAT 规范一致
# 长度 (主 SDK 已支持)
n = len(master) # 从站数
# 列表化 (主 SDK 已支持)
slaves = master.slaves # List[Slave]
sugar 增强: slave in master
import darra_ethercat.sugar
# 索引存在性
if 5 in master:
print(master[5].name)
# Slave 实例存在性 (基于 identity 比较)
if some_slave in another_master:
print("从站迁移到了另一个主站")
# 通过 SlaveIdentityKey 查询全网
from darra_ethercat.sugar import SlaveIdentityKey
beckhoff_ek1100 = SlaveIdentityKey(
vendor_id=0x00000002, product_code=0x044C2C52,
revision_no=0, serial_no=0,
)
if beckhoff_ek1100 in master:
print("发现 EK1100")
sugar 增强: LINQ 风格过滤
主 SDK 走 [s for s in master if ...] 列表推导. sugar 提供更短的链式写法:
# 过滤
running = master.where(lambda s: s.state == 0x08)
# 投影
names = master.select(lambda s: s.name)
# 首个匹配
first_drive = master.first(lambda s: s.coe is not None and s.cia402 is not None)
# 找不到抛错
master.first_or_raise(lambda s: s.name == "EL1008", message="缺 EL1008")
# 分组
by_vendor = master.group_by(lambda s: s.vendor_id)
# {0x2: [Slave[1], Slave[2]], 0x66f: [Slave[3]]}
# 计数
n_servo = master.count_where(lambda s: s.cia402 is not None)
| 写法 | 等价标准写法 | 何时用 |
|---|---|---|
master.where(p) | [s for s in master if p(s)] | 链式 + 短表达式 |
master.select(f) | [f(s) for s in master] | 链式投影 |
master.first(p) | next((s for s in master if p(s)), None) | 找首个 |
master.first_or_raise(p) | next + KeyError | 必定存在场景 |
master.group_by(k) | itertools.groupby + sort | 不要求排序 |
sugar 增强: Slave 可作 set / dict 元素
import darra_ethercat.sugar
s1 = master[1]
s1_again = master[1]
assert s1 == s1_again # __eq__ 走 (master_index, slave_index)
assert hash(s1) == hash(s1_again)
ignored = {master[3], master[5]} # set 去重
if some_slave not in ignored:
...
三. 异步生成器 (Async Generators)
把"回调注册"模式翻成 async for ...: 流式 API. 内部用 asyncio.Queue 做线程桥, 已注册的回调在 stream 退出时让队列填充安静失败.
主站状态变化流
import asyncio
import darra_ethercat.sugar
from darra_ethercat import EtherCATMaster, EcState
async def watch_state():
async with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
master.start()
async for change in master.state_changes():
print(f"主站: {change.old_state} -> {change.new_state}")
if change.new_state == EcState.OP:
break
asyncio.run(watch_state())
change 是 StateChange(old_state: int, new_state: int, timestamp: float) frozen dataclass.
从站状态变化流
async def watch_slaves():
async with EtherCATMaster() as master:
# ...
async for change in master.slave_state_changes():
print(f"slave[{change.slave_index}]: "
f"0x{change.old_state:02X} -> 0x{change.new_state:02X}")
紧急消息流
async def watch_emergency():
async with EtherCATMaster() as master:
# ...
async for ev in master.emergencies():
print(f"slave[{ev.slave_index}] EMCY 0x{ev.error_code:04X} "
f"reg=0x{ev.error_register:02X} @ {ev.timestamp:.3f}s")
if ev.error_code == 0x0000:
# 错误清除, 继续
continue
CoE 0x10F3 诊断历史流
ETG.1020 标准定义了 0x10F3 诊断消息环形缓冲. sugar 把"轮询 → 读 meta → 读 message → ack"四步封装成 async stream:
async def watch_diagnostics(slave):
async for raw in slave.coe.diagnostic_stream(poll_ms=50):
# raw 是原始 Octet 字节, 跳过 8 字节头部得到文本
text = raw[8:].decode("utf-8", errors="replace")
print(f"[diag] {text}")
| 参数 | 说明 |
|---|---|
poll_ms | 轮询周期, 默认 50 ms. 太小会压 CoE 邮箱, 建议 20-200 |
buf_size | 单条消息缓冲, 默认 1024 字节 |
退出: break 即可, 内部 sleep 自动结束, 无后台线程残留.
何时用 async stream
| 场景 | 推荐 |
|---|---|
| FastAPI / aiohttp 后端实时推送 EtherCAT 事件 | async stream |
| Jupyter notebook 持续监控 | async stream + nest_asyncio |
| 老式同步脚本, 一次拍快照 | 主 SDK 回调 (master.on_emergency(cb)) |
四. dataclass 与不可变身份 (SlaveIdentityKey)
主 SDK 中 slave.identity 返回 dict:
{'vendor_id': 0x2, 'product_code': 0x044C2C52, 'revision_no': 0, 'serial_no': 0}
dict 不能 hash, 不能元组解包, 不能作为 dict key. sugar 提供不可变包装:
from darra_ethercat.sugar import SlaveIdentityKey
key = SlaveIdentityKey.from_slave(master[1])
# key = SlaveIdentityKey(vendor=0x00000002, product=0x044C2C52, rev=0x0, serial=0x0)
# 元组解包
vendor, product, rev, serial = key
# 可作 dict key / set 元素
seen = {key}
catalog: dict[SlaveIdentityKey, str] = {key: "EK1100"}
# 等值比较
assert SlaveIdentityKey.from_slave(master[1]) == key
# 兼容主 SDK identity dict
key2 = SlaveIdentityKey.from_dict(master[1].identity)
assert key == key2
# 标准验证逻辑 (revision: actual >= configured 视为兼容)
assert actual_key.matches(configured_key, check_revision=True, check_serial=False)
为什么用 frozen dataclass 而不是 NamedTuple
- frozen dataclass 自动生成
__eq__/__hash__/__repr__, 字段类型注解保留 - NamedTuple 在 isinstance 检查上有歧义 (子类化 tuple)
- dataclass 字段名和顺序一致, 跨 SDK 文档一致更易维护
五. Pattern Matching (Python 3.10+)
EcState 是 IntEnum, match/case 直接支持:
from darra_ethercat import EcState
match slave.state:
case EcState.OP:
print("运行中")
case EcState.SAFE_OP:
print("安全运行")
case EcState.PRE_OP | EcState.INIT:
print("启动中")
case s if s & 0x10:
print(f"错误 ack 待处理: 0x{s:02X}")
case _:
print(f"未知 0x{s:02X}")
sugar 提供更高层的语义函数:
from darra_ethercat.sugar import classify_state, is_running, is_error
s = slave.state
print(classify_state(s)) # "operational" / "pre-operational" / "operational/error" ...
if is_running(s): # SafeOp 或 Op
...
if is_error(s): # ACK 位 = 1
slave.set_error_ack()
classify_state 输出表 (主 SDK 整型 → sugar 字符串):
| state (hex) | classify_state |
|---|---|
0x01 | init |
0x02 | pre-operational |
0x04 | safe-operational |
0x08 | operational |
0x12 | pre-operational/error |
0x14 | safe-operational/error |
0x18 | operational/ack-pending |
六. 调试摘要 (summary())
主 SDK 的 __str__/__repr__ 是单行, 适合日志:
>>> print(repr(slave))
Slave(index=3)
>>> print(slave)
Slave[3] EL2004 (0x00000002:0x07d43052)
sugar 提供多行 summary():
>>> print(slave.summary())
Slave[3] @ master 0
name : EL2004
identity : Vendor=0x00000002 Product=0x07D43052 Rev=0x00100000
state : 8
has_dc : True
group : 0
>>> print(master.summary())
EtherCATMaster[0] 7 slaves
state : 8
loop_cycle : 1000000 ns
slaves : 7
summary() 不替换 __str__, 是显式调用. 适合 REPL / 故障日志.
七. 静态类型 (Type Hints + Protocol)
sugar 子包带 py.typed marker, 所有公开符号有完整类型注解, 给 mypy / pyright 用:
from darra_ethercat.sugar.protocols import MasterLike, SlaveLike, CoELike
def configure_drive(slave: SlaveLike) -> None:
"""任何具备 .coe 等属性的对象都满足 SlaveLike."""
slave.coe.sdo_write_value(0x6060, 0, 8, dtype="u8")
slave.coe.sdo_write_value(0x6040, 0, 0x06, dtype="u16")
# 主 SDK Slave 自动满足 SlaveLike, 无需继承
configure_drive(master[1])
# 测试 mock 也能满足 (鸭子类型 + Protocol 静态可检)
class MockSlave:
coe: CoELike = ...
state: int = 0x08
name: str = "mock"
identity: dict | None = None
configure_drive(MockSlave()) # mypy 通过
@runtime_checkable 装饰过, 也支持 isinstance(obj, SlaveLike) 运行时检查 (建议仅做调试用, 不要走业务热路径 — Protocol isinstance 比 nominal 慢).
八. 主 SDK 已有的 Pythonic API (sugar 不重复)
| 能力 | 来源 |
|---|---|
with EtherCATMaster() as master: | EtherCATMaster.__enter__/__exit__ |
for slave in master: | EtherCATMaster.__iter__ |
master[1] | EtherCATMaster.__getitem__ |
len(master) | EtherCATMaster.__len__ |
master.slaves (List[Slave]) | EtherCATMaster.slaves |
str(slave) / repr(slave) | Slave.__str__/__repr__ |
str(master) / repr(master) | EtherCATMaster.__str__/__repr__ |
master.from_json(cfg) 一步初始化 | EtherCATMaster.from_json (classmethod) |
master.on_state_change(cb) 回调 | EtherCATMaster.on_state_change |
master.on_emergency(cb) 回调 | EtherCATMaster.on_emergency |
master.groups[g][i] 组访问 | _GroupAccessor / _GroupView |
调用上述时不需要 import darra_ethercat.sugar.
九. 速查表 (Cheat Sheet)
import darra_ethercat
import darra_ethercat.sugar
from darra_ethercat import EtherCATMaster, EcState
from darra_ethercat.sugar import SlaveIdentityKey, classify_state
# 1) 一行启动 OP + 清理
with EtherCATMaster.opening(r"\\Device\\NPF_{...}", state=EcState.OP) as master:
# 2) 迭代 / 索引 / 包含
for slave in master: # __iter__
if 5 in master and slave._si == 5: # __contains__
print(master[5].summary()) # __getitem__ + summary
# 3) LINQ 风格
drives = master.where(lambda s: s.cia402 is not None)
by_vendor = master.group_by(lambda s: s.vendor_id)
n_op = master.count_where(lambda s: classify_state(s.state) == "operational")
# 4) 不可变身份元组
keys = {SlaveIdentityKey.from_slave(s) for s in master}
# 5) pattern matching
match master[1].state:
case EcState.OP: ...
case EcState.SAFE_OP | EcState.PRE_OP: ...
# 6) async stream (在 async 函数里)
# async for ev in master.emergencies(): ...
# async for raw in master[1].coe.diagnostic_stream(): ...
十. 关闭 sugar
不导入 darra_ethercat.sugar 即可 — 主 SDK 行为完全不变, 与其他语言 SDK 对齐. sugar 只是可选增强, 不影响生产部署.