跳到主要内容

Python 特有语法糖

DarraEtherCAT Python SDK 在跨语言对齐 (C# / Java / C++ / Rust) 的"标准 API"之上, 额外提供 Python 特有语法糖, 让代码更 Pythonic, 同时不破坏跨语言兼容性.

启用方式 (一行):

import darra_ethercat
import darra_ethercat.sugar # 自动注入全部语法糖

按需导入:

from darra_ethercat.sugar import SlaveIdentityKey
from darra_ethercat.sugar.async_streams import EmergencyEvent
from darra_ethercat.sugar.pattern_match import classify_state, is_running
设计原则
  • 零侵入: sugar 只是 monkey-patch 主 SDK 的类, 不修改任何主 .py 文件
  • 幂等: 重复 import darra_ethercat.sugar 不会重复注入
  • 可选: 不导入 sugar, 主 SDK 行为不变, 跨语言一致
  • 向后兼容: 主 SDK 已有的 dunder (__iter__/__len__/__getitem__ ...) sugar 不覆盖

一. 上下文管理 (Context Manager)

主 SDK 已支持 (无需 sugar)

from darra_ethercat import EtherCATMaster, EcState

# 同步 with: 自动 close (Stop + Dispose)
with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
master.set_state(EcState.OP)
master.start()
# ... 业务
# 退出 with 自动 master.close()

sugar 增强

1) master.session() — 一行启动+清理

import darra_ethercat.sugar  # 激活
from darra_ethercat import EtherCATMaster, EcState

with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
with master.session(state=EcState.OP):
# 此时已 OP, 退出自动 stop()
for slave in master:
print(slave.summary())

2) EtherCATMaster.opening() — 工厂式上下文

import darra_ethercat.sugar
from darra_ethercat import EtherCATMaster, EcState

with EtherCATMaster.opening(r"\\Device\\NPF_{GUID}", state=EcState.OP) as master:
print(f"已 OP, 共 {len(master)} 个从站")

opening() 在一行里完成: 创建主站 + set_network + set_state + start. 退出自动 stop+close.

3) async with master: — asyncio 友好

import asyncio
import darra_ethercat.sugar
from darra_ethercat import EtherCATMaster

async def main():
async with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
master.start()
# ...
# 退出走 executor, 不阻塞事件循环

asyncio.run(main())
何时用推荐写法
同步脚本, 一次性会话with EtherCATMaster.opening(nic, state=OP) as m:
同步脚本, 已有主站想临时启动with master.session(state=OP):
异步应用 (FastAPI / aiohttp)async with EtherCATMaster() as m:
跨语言对齐写法with EtherCATMaster() as m: (主 SDK 内置)

二. 迭代器 / 索引 / 包含 (Iter / Index / Contain)

主 SDK 已支持

# for 迭代 (主 SDK 已支持)
for slave in master:
print(slave.name)

# 下标索引 (主 SDK 已支持)
slave = master[1] # 1-based, 与 EtherCAT 规范一致

# 长度 (主 SDK 已支持)
n = len(master) # 从站数

# 列表化 (主 SDK 已支持)
slaves = master.slaves # List[Slave]

sugar 增强: slave in master

import darra_ethercat.sugar

# 索引存在性
if 5 in master:
print(master[5].name)

# Slave 实例存在性 (基于 identity 比较)
if some_slave in another_master:
print("从站迁移到了另一个主站")

# 通过 SlaveIdentityKey 查询全网
from darra_ethercat.sugar import SlaveIdentityKey

beckhoff_ek1100 = SlaveIdentityKey(
vendor_id=0x00000002, product_code=0x044C2C52,
revision_no=0, serial_no=0,
)
if beckhoff_ek1100 in master:
print("发现 EK1100")

sugar 增强: LINQ 风格过滤

主 SDK 走 [s for s in master if ...] 列表推导. sugar 提供更短的链式写法:

# 过滤
running = master.where(lambda s: s.state == 0x08)

# 投影
names = master.select(lambda s: s.name)

# 首个匹配
first_drive = master.first(lambda s: s.coe is not None and s.cia402 is not None)

# 找不到抛错
master.first_or_raise(lambda s: s.name == "EL1008", message="缺 EL1008")

# 分组
by_vendor = master.group_by(lambda s: s.vendor_id)
# {0x2: [Slave[1], Slave[2]], 0x66f: [Slave[3]]}

# 计数
n_servo = master.count_where(lambda s: s.cia402 is not None)
写法等价标准写法何时用
master.where(p)[s for s in master if p(s)]链式 + 短表达式
master.select(f)[f(s) for s in master]链式投影
master.first(p)next((s for s in master if p(s)), None)找首个
master.first_or_raise(p)next + KeyError必定存在场景
master.group_by(k)itertools.groupby + sort不要求排序

sugar 增强: Slave 可作 set / dict 元素

import darra_ethercat.sugar

s1 = master[1]
s1_again = master[1]

assert s1 == s1_again # __eq__ 走 (master_index, slave_index)
assert hash(s1) == hash(s1_again)

ignored = {master[3], master[5]} # set 去重
if some_slave not in ignored:
...

三. 异步生成器 (Async Generators)

把"回调注册"模式翻成 async for ...: 流式 API. 内部用 asyncio.Queue 做线程桥, 已注册的回调在 stream 退出时让队列填充安静失败.

主站状态变化流

import asyncio
import darra_ethercat.sugar
from darra_ethercat import EtherCATMaster, EcState

async def watch_state():
async with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
master.start()

async for change in master.state_changes():
print(f"主站: {change.old_state} -> {change.new_state}")
if change.new_state == EcState.OP:
break

asyncio.run(watch_state())

changeStateChange(old_state: int, new_state: int, timestamp: float) frozen dataclass.

从站状态变化流

async def watch_slaves():
async with EtherCATMaster() as master:
# ...
async for change in master.slave_state_changes():
print(f"slave[{change.slave_index}]: "
f"0x{change.old_state:02X} -> 0x{change.new_state:02X}")

紧急消息流

async def watch_emergency():
async with EtherCATMaster() as master:
# ...
async for ev in master.emergencies():
print(f"slave[{ev.slave_index}] EMCY 0x{ev.error_code:04X} "
f"reg=0x{ev.error_register:02X} @ {ev.timestamp:.3f}s")
if ev.error_code == 0x0000:
# 错误清除, 继续
continue

CoE 0x10F3 诊断历史流

ETG.1020 标准定义了 0x10F3 诊断消息环形缓冲. sugar 把"轮询 → 读 meta → 读 message → ack"四步封装成 async stream:

async def watch_diagnostics(slave):
async for raw in slave.coe.diagnostic_stream(poll_ms=50):
# raw 是原始 Octet 字节, 跳过 8 字节头部得到文本
text = raw[8:].decode("utf-8", errors="replace")
print(f"[diag] {text}")
参数说明
poll_ms轮询周期, 默认 50 ms. 太小会压 CoE 邮箱, 建议 20-200
buf_size单条消息缓冲, 默认 1024 字节

退出: break 即可, 内部 sleep 自动结束, 无后台线程残留.

何时用 async stream

场景推荐
FastAPI / aiohttp 后端实时推送 EtherCAT 事件async stream
Jupyter notebook 持续监控async stream + nest_asyncio
老式同步脚本, 一次拍快照主 SDK 回调 (master.on_emergency(cb))

四. dataclass 与不可变身份 (SlaveIdentityKey)

主 SDK 中 slave.identity 返回 dict:

{'vendor_id': 0x2, 'product_code': 0x044C2C52, 'revision_no': 0, 'serial_no': 0}

dict 不能 hash, 不能元组解包, 不能作为 dict key. sugar 提供不可变包装:

from darra_ethercat.sugar import SlaveIdentityKey

key = SlaveIdentityKey.from_slave(master[1])
# key = SlaveIdentityKey(vendor=0x00000002, product=0x044C2C52, rev=0x0, serial=0x0)

# 元组解包
vendor, product, rev, serial = key

# 可作 dict key / set 元素
seen = {key}
catalog: dict[SlaveIdentityKey, str] = {key: "EK1100"}

# 等值比较
assert SlaveIdentityKey.from_slave(master[1]) == key

# 兼容主 SDK identity dict
key2 = SlaveIdentityKey.from_dict(master[1].identity)
assert key == key2

# 标准验证逻辑 (revision: actual >= configured 视为兼容)
assert actual_key.matches(configured_key, check_revision=True, check_serial=False)

为什么用 frozen dataclass 而不是 NamedTuple

  • frozen dataclass 自动生成 __eq__/__hash__/__repr__, 字段类型注解保留
  • NamedTuple 在 isinstance 检查上有歧义 (子类化 tuple)
  • dataclass 字段名和顺序一致, 跨 SDK 文档一致更易维护

五. Pattern Matching (Python 3.10+)

EcStateIntEnum, match/case 直接支持:

from darra_ethercat import EcState

match slave.state:
case EcState.OP:
print("运行中")
case EcState.SAFE_OP:
print("安全运行")
case EcState.PRE_OP | EcState.INIT:
print("启动中")
case s if s & 0x10:
print(f"错误 ack 待处理: 0x{s:02X}")
case _:
print(f"未知 0x{s:02X}")

sugar 提供更高层的语义函数:

from darra_ethercat.sugar import classify_state, is_running, is_error

s = slave.state
print(classify_state(s)) # "operational" / "pre-operational" / "operational/error" ...

if is_running(s): # SafeOp 或 Op
...
if is_error(s): # ACK 位 = 1
slave.set_error_ack()

classify_state 输出表 (主 SDK 整型 → sugar 字符串):

state (hex)classify_state
0x01init
0x02pre-operational
0x04safe-operational
0x08operational
0x12pre-operational/error
0x14safe-operational/error
0x18operational/ack-pending

六. 调试摘要 (summary())

主 SDK 的 __str__/__repr__ 是单行, 适合日志:

>>> print(repr(slave))
Slave(index=3)

>>> print(slave)
Slave[3] EL2004 (0x00000002:0x07d43052)

sugar 提供多行 summary():

>>> print(slave.summary())
Slave[3] @ master 0
name : EL2004
identity : Vendor=0x00000002 Product=0x07D43052 Rev=0x00100000
state : 8
has_dc : True
group : 0

>>> print(master.summary())
EtherCATMaster[0] 7 slaves
state : 8
loop_cycle : 1000000 ns
slaves : 7

summary() 不替换 __str__, 是显式调用. 适合 REPL / 故障日志.


七. 静态类型 (Type Hints + Protocol)

sugar 子包带 py.typed marker, 所有公开符号有完整类型注解, 给 mypy / pyright 用:

from darra_ethercat.sugar.protocols import MasterLike, SlaveLike, CoELike

def configure_drive(slave: SlaveLike) -> None:
"""任何具备 .coe 等属性的对象都满足 SlaveLike."""
slave.coe.sdo_write_value(0x6060, 0, 8, dtype="u8")
slave.coe.sdo_write_value(0x6040, 0, 0x06, dtype="u16")

# 主 SDK Slave 自动满足 SlaveLike, 无需继承
configure_drive(master[1])

# 测试 mock 也能满足 (鸭子类型 + Protocol 静态可检)
class MockSlave:
coe: CoELike = ...
state: int = 0x08
name: str = "mock"
identity: dict | None = None
configure_drive(MockSlave()) # mypy 通过

@runtime_checkable 装饰过, 也支持 isinstance(obj, SlaveLike) 运行时检查 (建议仅做调试用, 不要走业务热路径 — Protocol isinstance 比 nominal 慢).


八. 主 SDK 已有的 Pythonic API (sugar 不重复)

能力来源
with EtherCATMaster() as master:EtherCATMaster.__enter__/__exit__
for slave in master:EtherCATMaster.__iter__
master[1]EtherCATMaster.__getitem__
len(master)EtherCATMaster.__len__
master.slaves (List[Slave])EtherCATMaster.slaves
str(slave) / repr(slave)Slave.__str__/__repr__
str(master) / repr(master)EtherCATMaster.__str__/__repr__
master.from_json(cfg) 一步初始化EtherCATMaster.from_json (classmethod)
master.on_state_change(cb) 回调EtherCATMaster.on_state_change
master.on_emergency(cb) 回调EtherCATMaster.on_emergency
master.groups[g][i] 组访问_GroupAccessor / _GroupView

调用上述时不需要 import darra_ethercat.sugar.


九. 速查表 (Cheat Sheet)

import darra_ethercat
import darra_ethercat.sugar
from darra_ethercat import EtherCATMaster, EcState
from darra_ethercat.sugar import SlaveIdentityKey, classify_state

# 1) 一行启动 OP + 清理
with EtherCATMaster.opening(r"\\Device\\NPF_{...}", state=EcState.OP) as master:

# 2) 迭代 / 索引 / 包含
for slave in master: # __iter__
if 5 in master and slave._si == 5: # __contains__
print(master[5].summary()) # __getitem__ + summary

# 3) LINQ 风格
drives = master.where(lambda s: s.cia402 is not None)
by_vendor = master.group_by(lambda s: s.vendor_id)
n_op = master.count_where(lambda s: classify_state(s.state) == "operational")

# 4) 不可变身份元组
keys = {SlaveIdentityKey.from_slave(s) for s in master}

# 5) pattern matching
match master[1].state:
case EcState.OP: ...
case EcState.SAFE_OP | EcState.PRE_OP: ...

# 6) async stream (在 async 函数里)
# async for ev in master.emergencies(): ...
# async for raw in master[1].coe.diagnostic_stream(): ...

十. 关闭 sugar

不导入 darra_ethercat.sugar 即可 — 主 SDK 行为完全不变, 与其他语言 SDK 对齐. sugar 只是可选增强, 不影响生产部署.