主动异步隔离 (Async Isolation)
本页是"主动异步隔离层"的参考。 它把
build/set_state/ SDO 读写 / 静态扫描这些会阻塞总线/邮箱的同步操作,包装成async def+await的协程版本:await即在后台串行执行、可取消、带进度,避免在 asyncio 事件循环 / GUI 线程上卡死,并保证同一主站任意时刻只有一个操作打总线。语义对齐 C# —— 行为与 C# 主动异步隔离 逐点一致:每主站串行闸(单线程后台串行执行 + await 入口可取消排队,等价
SemaphoreSlim(1,1))、关闭守门、取消时安全中断并复位、独立静态扫描闸。命名遵循 Python 惯例(snake_case+_async后缀 +await)。旧同步 API 全部保留。
异步隔离层经 ctypes 调底层 native 库。import darra_ethercat 即把 *_async 协程注入到 EtherCATMaster / Slave,无需额外导入。
一、设计要点
| 要点 | 实现 |
|---|---|
| 串行 (Serial) | 每主站一把串行闸 = 单线程后台串行执行(物理串行)+ await 入口可取消排队。同一主站的 build / set_state / SDO 共用同一把闸,绝不并发打总线/邮箱。 |
| 后台隔离 (Isolation) | *_async 协程 await 时把阻塞调用丢后台线程执行,不堵事件循环。 |
| 守门 (Shutdown Guard) | close_async 先置关闭标志;拿闸后与执行体内各查一次,命中即抛 CancelledError 不进 native,防释放后访问。 |
| 取消 (Cancel) | CancelToken(threading.Event + 回调)。取消时在后台对正在执行的阻塞调用发起安全中断并复位,不堵 cancel() 调用方。静态扫描走独立的扫描中断通道,与主站互不干扰。 |
| 进度 (Progress) | report_init_progress fanout 到 on_init_progress 多播 + 单参 progress 回调。回调在后台 worker 线程触发,调用方需自己 marshal。 |
所有进度回调都在后台 worker 线程触发,不是事件循环线程。回事件循环用 loop.call_soon_threadsafe、Qt 用信号、tkinter 用 widget.after,否则跨线程改 UI 会出问题。
loop = asyncio.get_running_loop()
await master.build_async(
progress=lambda msg: loop.call_soon_threadsafe(update_label, msg)
)
二、API 总览
| 类别 | 方法 | 返回类型 | 读写 | 说明 |
|---|---|---|---|---|
| 主站一条龙 | build_async(progress, cancel) | await → dict | 异步 | 异步执行 连接→扫描→配置→进 OP,带进度与取消 |
| 状态切换 | set_state_async(state, progress, cancel) | await → bool | 异步 | 异步状态切换,经串行闸排队 |
| 网络 | set_network_async(adapter, redundant_adapter, progress, cancel) | await → int | 异步 | 异步设主/冗余网卡(链式串行用) |
| 从站 SDO | sdo_read_async(slave_index, index, subindex, complete_access, cancel) | await → bytes | None | 异步 | 异步读 SDO,转调父主站同一把闸 |
| 从站 SDO | sdo_write_async(slave_index, index, subindex, data, complete_access, cancel) | await → bool | 异步 | 异步写 SDO |
| 静态扫描 | scan_async(adapter_name, secondary_adapter, dll_path, cancel) | await → list | 异步 | 异步静态扫描,走独立扫描闸 + 独立扫描中断通道 |
| 生命周期 | close_async(cancel) / dispose_async(cancel) | await → None | 异步 | 异步软停车 + 释放,先置关闭守门标志再释放 native |
| 进度 | on_init_progress | 多播订阅属性 | 读写 | 初始化进度多播事件(后台线程触发) |
| 状态查询 | is_async_operation_in_progress | bool | 只读 | 当前是否有异步操作在串行闸中执行 |
| 取消令牌 | CancelToken | class | — | 取消令牌(threading.Event + 回调注册) |
Slave上也注入了sdo_read_async(index, subindex, ...)/sdo_write_async(...),内部转调父主站串行闸;找不到父主站时退化为后台执行。
三、方法详解
build_async(progress=None, cancel=None)
async def build_async(self, progress=None, cancel=None) -> dict
异步执行"连接 → 扫描 → 配置 → 进 OP"一条龙。await 时在单线程池后台串行执行。
参数:
progress(Callable[[str], None]) — 进度回调,在后台线程触发,调用方自行 marshal(可选)cancel(CancelToken) — 取消令牌;取消时对正在执行的阻塞调用发起安全中断并自动复位(可选)
返回值:
dict— 构建结果(成功标志 / 从站数等)
示例:
token = CancelToken()
loop = asyncio.get_running_loop()
result = await master.build_async(
progress=lambda m: loop.call_soon_threadsafe(print, m),
cancel=token)
set_state_async(state, progress=None, cancel=None)
async def set_state_async(self, state, progress=None, cancel=None) -> bool
异步状态切换,经主站串行闸排队,与同主站的 build / SDO 互斥串行。
示例:
ok = await master.set_state_async(EcState.OP)
sdo_read_async / sdo_write_async
async def sdo_read_async(self, slave_index, index, subindex=0, complete_access=False, cancel=None) -> bytes | None
async def sdo_write_async(self, slave_index, index, subindex, data, complete_access=False, cancel=None) -> bool
异步 SDO 读写。转调父主站的同一把串行闸,绝不与同主站 PDO / 状态切换并发打邮箱。Slave 上的同名方法省略 slave_index。
示例:
data = await master.sdo_read_async(1, 0x6041, 0x00)
ok = await master.slave(1).sdo_write_async(0x6040, 0x00, b'\x06\x00')
scan_async(adapter_name, secondary_adapter=None, dll_path=None, cancel=None)
async def scan_async(adapter_name, secondary_adapter=None, dll_path=None, cancel=None) -> list
异步静态扫描。走进程级独立扫描闸,取消时走独立的扫描中断通道(与主站的中断通道严格区分,互不干扰)。
close_async / dispose_async(cancel=None)
async def close_async(self, cancel=None)
async def dispose_async(self, cancel=None) # close_async 别名
异步软停车 + 释放。先置关闭守门标志,再释放 native —— 此后入队的异步操作会被守门以 CancelledError 拒绝,不再打 native。
四、取消语义
CancelToken 触发后,隔离层对正在执行的阻塞调用发起安全中断并立即自动复位中断状态:
| 操作类型 | 取消行为 | 中断通道 |
|---|---|---|
| 主站 build / set_state / SDO | 中断当前阻塞调用并自动复位 | 主站中断通道 |
| 静态扫描 scan_async | 中断当前扫描并自动复位 | 独立扫描中断通道 |
中断后隔离层已内置复位,确保后续 build / set_state 不会被残留的中断状态误判为"被中断"而失败。应用层只需调 token.cancel(),无需手动复位。
token = CancelToken()
task = asyncio.create_task(master.build_async(cancel=token))
# 用户点取消
token.cancel()
try:
await task
except asyncio.CancelledError:
print("已取消")
向后兼容
import darra_ethercat 时异步隔离层(串行闸 + 守门 + 取消)自动接线注入,公开 *_async 协程即可用。原同步 build / set_network / set_state / slave / CoE.sdo_read / sdo_write 全部保留,向后兼容。