跳到主要内容

主站诊断

通过 master.diagnostics_info 访问高级诊断功能(帧计数、抖动、丢包率等 MasterDiagnosticsInfo 对象)。

通过 master.diagnostics 访问 ETG.1510 基础诊断数据(返回 dict,包含 cyclic_lost_framesacyclic_lost_framescyclic_fpsacyclic_fpsstate)。

配合事件使用

建议通过 事件 驱动异常处理,而非自行轮询。 直接读取诊断属性适用于 UI 显示等场景。

从站诊断

单个从站的状态诊断、链路质量请参考 从站诊断

功能概览

功能说明
通信与性能统计帧计数、丢包、抖动、PDO 丢帧、网口状态、拓扑
DC 同步同步窗口阈值、dc_sync_lost 事件
冗余状态冗余激活、故障点检测
诊断控制启停数据采集、重置统计

通信与性能统计

类别属性类型读写需启用说明
性能统计rt_cntint只读每秒帧数(Hz),5秒平均
error_cntint只读每秒错误数,5秒平均
packet_loss_ratefloat只读丢包率(0.0~1.0)— TX vs RX 5 秒滑窗, pipeline 在途不算丢
late_frame_ratefloat只读过慢帧率(0.0~1.0)— idx 出 8 帧窗 stale, 不计入丢包
cycle_time_spanint只读实际周期时间(微秒)
avg_jitter_usfloat只读最近5秒平均抖动(微秒),总线抖动
mailbox_latency_usfloat只读邮箱收发延迟 - 最大(微秒),见下方说明
mailbox_latency_avg_usfloat只读邮箱收发延迟 - 平均(微秒),见下方说明
丢帧统计pdo.total_lostint只读累计丢帧数(所有组合计)
pdo.consecutive_lostint只读当前连续丢帧数
pdo.get_frame_loss_stats(group)PDOFrameLossStats只读指定组的丢帧统计
从站健康worst_slave_indexint只读异常率最高的从站索引
worst_link_qualityint只读最差从站的通信健康度(%)
WKC 镜像wkc_actual_mirrorint只读总线实测工作计数器镜像(actual WKC)— 反映此刻哪些从站真的在响应。R1:如实值永不篡改。薄读零帧,实时
wkc_expected_mirrorint只读期望工作计数器镜像(expected WKC)— 配置期/进 OP 确定的固定真值。R1:拓扑固定它就固定,永不动态下调迁就劣化总线。薄读零帧,实时
wc_deficitint只读WKC 缺额 = expected − actual(>0 表示有映射从站此刻没贡献 WKC,疑似掉站/热插拔)。从站恢复后自动归 0。薄读零帧,实时
mapped_slave_countint只读已映射(参与 WKC 计算)的从站数。薄读零帧,实时
wc_state_seqint只读内核 WcState 缓存序列号(每次刷新自增,64bit)。判断 per-slave 镜像是否在两次读取间更新过。薄读零帧,实时
冗余端口primary_wkcint只读主网口工作计数器(冗余模式下独立跟踪)
secondary_wkcint只读副网口工作计数器(冗余模式下独立跟踪)
primary_port_okbool只读主端口是否正常
secondary_port_okbool只读副端口是否正常
primary_port_errorsint只读主端口最近5秒错误数
secondary_port_errorsint只读副端口最近5秒错误数
拓扑与定时topology_descriptionstr只读拓扑模式描述
timing_modestr只读定时模式("硬件定时器" / "RT就绪" / "降级" / "RT错误")。WDK 驱动必须就绪

PDOFrameLossStats:

@dataclass
class PDOFrameLossStats:
total_lost: int # 累计丢帧数
consecutive_lost: int # 当前连续丢帧数
max_consecutive_lost: int # 历史最大连续丢帧数
邮箱收发延迟(mailbox_latency_us / mailbox_latency_avg_us,2026-05-21)

max_jitter_us("应用抖动")测的是异步通知线程被 PDO 每周期唤醒的间隔抖动。PDO 过程数据已是纯内核 RT 收发 + 内核共享内存指针零拷贝,应用层直接读写过程映像,不存在"用户态每周期通知"环节,该指标失去意义,已移除。

同一底层字段(layout 不变)改为承载邮箱事务收发往返时延——邮箱(CoE/SoE/FoE/EoE/AoE/VoE 等非周期、请求-响应)从请求发出到响应返回的耗时(最近 1 秒结算)。mailbox_latency_us 为窗口最大值,mailbox_latency_avg_us 为平均值。

实时性抖动监控请改用 bus_max_jitter_us / bus_avg_jitter_us(RT 内核线程帧发送时序偏差),即旧 max_jitter_us 在 WDK 模式下的实际返回值。

WKC 镜像(掉站可观测)

wkc_actual_mirror / wkc_expected_mirror / wc_deficit 是内核 per-slave wc_state 诊断缓存的主站级聚合镜像,均为 master.diagnostics_info 上的属性,全部薄读零帧(不缓存、不需刷新——内核每周期、WKC 异常时立即维护,DLL 保证读到即此刻真实总线现实)。配合每个从站的 slave.wc_contributed 可定位到具体掉了哪个从站。

R1 可观测性约定(不可违反):

  • wkc_expected_mirror = 配置期 / 进 OP 时确定的固定真值,拓扑固定它就固定,概念上不可变,永不动态下调迁就劣化总线
  • wkc_actual_mirror = 总线实测值,如实反映此刻哪些从站真的在响应,永不篡改
  • wc_deficit = wkc_expected_mirror − wkc_actual_mirrorwc_deficit > 0 不是 master 故障,而是有映射从站此刻没贡献 WKC(疑似掉站 / 热插拔恢复中);从站恢复后 wc_deficit 自动归 0,无需任何重置调用。

示例:

diag = master.diagnostics_info

# 主站级 WKC 镜像(薄读零帧,实时)
print(f"WKC: {diag.wkc_actual_mirror} / {diag.wkc_expected_mirror} "
f"(映射从站 {diag.mapped_slave_count} 个)")
if diag.wc_deficit > 0:
print(f"⚠️ WKC 缺额 {diag.wc_deficit} — 有从站没在响应,逐个从站定位:")
from darra_ethercat import WcContribution
for slave in master.slaves:
if slave.wc_contributed == WcContribution.NOT_CONTRIBUTED:
print(f" 从站 {slave.slave_num} 未贡献 WKC")

# 序列号判断镜像是否更新过(无需轮询帧)
seq = diag.wc_state_seq
per-slave wc_state

单个从站对 WKC 的贡献状态请用 slave.wc_contributedWcContribution 枚举)。wc_deficit > 0 时遍历从站即可定位掉站点。

诊断快照

get_snapshot()

def get_snapshot(self) -> DiagnosticsSnapshot

获取诊断数据的一致快照。返回当前时刻所有诊断指标的冻结快照(不可变 dataclass),适用于日志记录、UI 刷新、跨线程传递等场景。

DiagnosticsSnapshot 数据类:

@dataclass(frozen=True)
class DiagnosticsSnapshot:
frequency: int # 每秒帧数 (Hz)
error_count: int # 每秒错误数
packet_loss_rate: float # 丢包率 (0.0-1.0) — TX vs RX 5s 滑窗
late_frame_rate: float # 过慢帧率 (0.0-1.0) — idx 出 8 帧窗 stale, 不计丢
avg_jitter_us: float # 平均抖动 (µs)
mailbox_latency_us: float # 邮箱收发延迟 - 最大 (µs)
mailbox_latency_avg_us: float # 邮箱收发延迟 - 平均 (µs)
cycle_time_us: int # 实际周期时间 (µs)
wkc_actual: int # 当前 WKC
wkc_expected: int # 期望 WKC
bus_cycle_hz: int # 总线频率 (Hz)
bus_max_jitter_us: float # 总线最大抖动 (µs)
bus_avg_jitter_us: float # 总线平均抖动 (µs)
bus_roundtrip_us: float # 总线往返延迟 (µs)
bus_load_percent: float # 通讯负载 (%) — RTT/周期×100
smi_count: int # SMI 次数
smi_peak_us: float # SMI 峰值 (µs)
primary_port_ok: bool # 主端口正常
secondary_port_ok: bool # 副端口正常
redundancy_active: bool # 冗余激活

示例:

diag = master.diagnostics_info
diag.enabled = True

snap = diag.get_snapshot()
print(f"频率: {snap.frequency} Hz, 丢包率: {snap.packet_loss_rate:.2%}")
print(f"WKC: {snap.wkc_actual}/{snap.wkc_expected}")
print(f"总线抖动: 平均 {snap.bus_avg_jitter_us:.2f} us, 最大 {snap.bus_max_jitter_us:.2f} us")
print(f"邮箱延迟: 平均 {snap.mailbox_latency_avg_us:.2f} us, 最大 {snap.mailbox_latency_us:.2f} us")

print(f"冗余: {'激活' if snap.redundancy_active else '未激活'}")

计算公式:

  • rt_cnt = 采样周期帧数 / 窗口秒数 — 滑动窗口平均帧频
  • error_cnt = 采样周期错误数 / 窗口秒数 — 滑动窗口平均错误率
  • packet_loss_rate = (TX - RX - pipeline) / TX — 5 秒滑窗, pipeline 在途不算丢
  • late_frame_rate = LateDrop / TX — idx 出 8 帧窗 stale, 不计入丢包

enabled = True

示例:

diag = master.diagnostics_info
diag.enabled = True # 启用诊断数据采集

print(f"帧频: {diag.rt_cnt} Hz")
print(f"丢包率: {diag.packet_loss_rate:.2%}")
print(f"错误数: {diag.error_cnt}")
print(f"周期时间: {diag.cycle_time_span} us")
print(f"总线抖动: 平均 {diag.bus_avg_jitter_us:.2f} us, 最大 {diag.bus_max_jitter_us:.2f} us")
print(f"邮箱延迟: 平均 {diag.mailbox_latency_avg_us:.2f} us, 最大 {diag.mailbox_latency_us:.2f} us")

# PDO 丢帧
pdo = diag.pdo
print(f"PDO 丢帧: 累计={pdo.total_lost}, 连续={pdo.consecutive_lost}")

# 按组查询
stats0 = pdo.get_frame_loss_stats(0)
stats1 = pdo.get_frame_loss_stats(1)
print(f"组0丢帧: {stats0.total_lost}, 组1丢帧: {stats1.total_lost}")

# 从站异常
print(f"最差从站: #{diag.worst_slave_index} ({diag.worst_link_quality}%)")

# 网口状态
print(f"主端口: {'正常' if diag.primary_port_ok else '异常'}")
print(f"副端口: {'正常' if diag.secondary_port_ok else '未连接'}")

# 拓扑信息
print(f"拓扑: {diag.topology_description}")
print(f"定时: {diag.timing_mode}")
从站通信诊断

每个从站的 ESC 端口错误通过 slave.diagnostics.read_port_errors() 获取。详见 从站诊断 - 通信诊断

热插拔重建

hot_swap_rebuild()

def hot_swap_rebuild(self) -> int

运行中一次性热插拔重建拓扑:任意状态下调用一次,在线重扫总线 + 重建拓扑图 + 重配 PDO + 恢复到运行态(OP)。适用于现场拔掉 / 换上一个模块后,不停总线地把网络重新带回运行态。它不是 build() 那种"释放全网从头扫"的重建——是面向热插拔的在线增量恢复。

返回值(错误码语义):

返回码名称含义与处理
0OK成功:从站全部回到 OP
-20BUSY另一操作正在进行——稍后重试
-21RESCAN_0重扫到 0 个从站——已停在 PreOp 安全态,检查物理链路 / 供电
-22SDO_ABORT重配被从站 SDO abort 拒绝——检查 PDO 映射 / Startup 参数
-23NO_OP部分从站未达 OP——已回滚停安全态,查各从站 AL Status Code
-24TIMEOUT重建超时
-25IDX_FULL帧索引池饱和
-1(通用失败)调用层异常,不抛异常直接返通用失败码

行为 / 约束(R1 如实可观测):

  • OP 状态调用会短暂中断 PDO(内部降 PreOp 重配再升回 OP),调用方需容忍这一周期的过程数据空窗。
  • 失败时返错误码不掩盖——绝不靠篡改内部计数让结果"看起来对"。
  • 即使最终 0 个从站在 OP,PDO 循环也照常运行(周期不掉、帧不停),expected_wkc 不动态下调迁就劣化总线。

示例:

rc = master.diagnostics_info.hot_swap_rebuild()
if rc == 0:
print("热插拔重建成功,从站已回到 OP")
elif rc == -20:
print("有操作进行中,稍后重试")
elif rc == -21:
print("重扫到 0 站,已停 PreOp 安全态——检查链路/供电")
elif rc == -23:
print("部分从站未达 OP,已回滚安全态——逐个查 AL Status Code:")
for slave in master.slaves:
if slave.state != EcState.OP:
print(f" 从站 {slave.slave_num}: AL=0x{slave.error_code.value:04X}")
else:
print(f"热插拔重建失败: {rc}")

DC 同步

自动监控(ETG.1500 5.13.3),每秒检查各从站时间偏差。超出 sync_window_threshold 阈值时触发 dc_sync_lost 事件。

单个从站

单个从站的同步状态请使用 slave.diagnostics.dc.is_in_syncslave.diagnostics.dc.sync_time_difference

sync_window_threshold

@property
def sync_window_threshold(self) -> int

@sync_window_threshold.setter
def sync_window_threshold(self, value: int) -> None

同步窗口阈值(纳秒),默认 1000ns。超出阈值触发 dc_sync_lost 事件。

冗余状态

类别属性类型读写说明
冗余状态redundancy_activebool只读冗余是否激活
break_pointOptional[dict]只读当前故障点(count, slave_index, port, type)

RingMode 枚举:

class RingMode(IntEnum):
INACTIVE = 0 # 未激活
DUAL = 1 # 双向冗余
DEGRADED = 2 # 降级模式

break_point 返回字典,包含以下字段:

{
'count': int, # 故障点数量
'slave_index': int, # 故障从站索引 (1-based)
'port': int, # 故障端口号 (0-3)
'type': int, # 故障类型:0=断线,1=CRC 故障
}

break_point 统一检测两类物理故障:

类型fault_type检测方式典型场景
断线0DL Status 端口物理链路丢失拔线、线缆断裂
CRC 故障1端口级 RxError + InvalidFrame 持续增长接触不良、线缆老化、连接器氧化

故障线缆段定位: 当相邻从站的对向端口(如从站 N 的 P1 和从站 N+1 的 P0)同时报故障,说明连接线缆有问题。仅单侧报故障则定位到该端口连接器。

从站冗余诊断

单个从站的冗余状态请参考 从站诊断 - 冗余诊断

示例:

diag = master.diagnostics_info

print(f"冗余模式: {master.ring_mode.name}")
if master.ring_mode == RingMode.DEGRADED:
print("警告: secondary链路不可用")

if diag.redundancy_active:
print("冗余已激活")

bp = diag.break_point
if bp is not None:
print(f"故障: 从站{bp['slave_index']} P{bp['port']} "
f"{'断线' if bp['type'] == 0 else 'CRC故障'}")
if bp['type'] == 1:
print("建议检查线缆/连接器")

诊断控制

Reset (诊断计数器)

@property
def enabled(self) -> bool # 诊断数据采集开关 (默认关闭)
def reset(self) -> None # 一次性重置所有诊断统计

enabled 控制诊断数据采集开关 (默认关闭)。启用后周期性采样, 记录标记为"需启用"的统计数据; 其他功能 (PDO 丢帧、从站异常、网口状态、拓扑等) 始终活跃。reset() 一次性清零所有诊断计数器。

AL 错误分类

对 AL Status Code 进行分类,帮助快速判断错误性质和处理策略。

classify_al_error()

def classify_al_error(al_status_code: int) -> ALErrorCategory

对 AL Status Code(从站返回的错误码)进行分类,帮助快速判断错误性质和处理策略。

参数:

  • al_status_code (int) — AL Status Code,从 slave.error_code 或状态转换失败时获取

返回值:

  • ALErrorCategory — 错误分类枚举

ALErrorCategory 枚举:

class ALErrorCategory(IntEnum):
NONE = 0 # 无错误
TRANSIENT = 1 # 瞬态错误,可重试状态转换,通常自动恢复
CONFIGURATION = 2 # 配置错误,检查 PDO 映射、SM 配置、Startup 参数等
HARDWARE = 3 # 硬件错误,检查从站硬件、线缆、电源
UNKNOWN = 4 # 未知错误,查阅 ETG.1000 或从站手册

示例:

from darra_ethercat import classify_al_error, ALErrorCategory

slave = master[1]
if slave.error_code != 0:
category = classify_al_error(slave.error_code)
print(f"从站 {slave.slave_num} 错误 0x{slave.error_code:04X}: {category.name}")

if category == ALErrorCategory.TRANSIENT:
print("瞬态错误,尝试重新切换状态...")
elif category == ALErrorCategory.CONFIGURATION:
print("配置错误,请检查 PDO/SM 配置")
elif category == ALErrorCategory.HARDWARE:
print("硬件错误,请检查从站设备")
常见 AL Status Code
  • 0x001E 无效输入映射 — CONFIGURATION
  • 0x001D 无效输出映射 — CONFIGURATION
  • 0x0011 无效邮箱配置 — CONFIGURATION
  • 0x002D 同步错误 — TRANSIENT
  • 0x0032 DC 同步超时 — TRANSIENT
  • 0x0050 EEPROM 错误 — HARDWARE

从站错误计数器

read_slave_error_counters()

def read_slave_error_counters(self, slave_index: int) -> dict

读取指定从站的错误计数器。

参数:

  • slave_index (int) — 从站索引(1-based)

返回值:

  • dict — 错误计数器字典,包含以下字段:
{
'slave_index': int, # 从站编号
'rx_error': List[int], # 各端口 RX 错误计数 [Port0-3]
'invalid_frame': List[int], # 各端口无效帧计数 [Port0-3]
'lost_link': List[int], # 各端口链路丢失计数 [Port0-3]
}

示例:

counters = master.diagnostics_info.read_slave_error_counters(1)
if any(v > 0 for v in counters['rx_error']):
print(f"从站 1 错误计数:")
print(f" RX 错误: {counters['rx_error']}")
print(f" 无效帧: {counters['invalid_frame']}")
print(f" 链路丢失: {counters['lost_link']}")

诊断消息

read_diagnostic_messages()

from darra_ethercat.slave.coe import read_diagnostic_messages

def read_diagnostic_messages(coe: CoE) -> List[DiagnosticMessage]

通过 CoE 读取从站对象 0x10F3(诊断历史对象,ETG.1020)中的诊断消息。返回从站记录的诊断事件列表,包含时间戳、错误码和描述信息。

参数:

  • coe (CoE) — 从站的 CoE 接口(slave.coe

返回值:

  • List[DiagnosticMessage] — 诊断消息列表,无消息时返回空列表

示例:

from darra_ethercat.slave.coe import read_diagnostic_messages

for slave in master.slaves:
if slave.coe is None:
continue
messages = read_diagnostic_messages(slave.coe)
for msg in messages:
print(f"[从站 {slave.slave_num}] 代码=0x{msg.diag_code:08X}, {msg}")
备注

并非所有从站都支持 0x10F3 诊断历史对象。不支持的从站调用时返回空列表。此方法通过 SDO 读取,不建议在实时路径中高频调用。


按问题分块的诊断字段指南

本章按现场症状分块, 帮你定位问题在哪。每个字段都有真实数据来源, 没有永远 0 的虚假数据。

现场问题 → 字段映射

字段所属对象: D = SlaveDiagnosticsData (经 get_slave_diagnostics()); M = master.diagnostics_info (MasterDiagnosticsInfo)。

现场症状看哪几个字段
是不是丢包?lost_frames (D) + packet_loss_rate (M)
是偶发还是持续?consecutive_wkc_errors (D, ≥5 = 持续)
哪个从站链路质量差?link_quality_percent[i] (D) / worst_slave_index (M)
网线接触不稳?lost_link_count[i] (D)
哪根网线断了?break_point (M)
主副口错误?primary_port_errors / secondary_port_errors (D 或 M)
冗余真在工作吗?redundancy_active (M)
总线负载高吗?bus_load_percent (M)

🟦 1. 帧错误 (SlaveDiagnosticsData)

字段检测什么
lost_frames帧根本没回来
timeout_framesPDO 周期未收到响应
working_counter_errorsWKC 不匹配
consecutive_wkc_errors连续 WKC 错误 (区分偶发/持续)
frame_errors派生总和
checksum_errors物理层 CRC (从 ESC 派生)

🟦 2. 端口错误计数

字段检测什么
primary_port_errors / secondary_port_errors主/副网卡 5 秒滑动窗口错误 (SlaveDiagnosticsDataMasterDiagnosticsInfo 均有此属性)

🟦 3. per-slave 物理层 (ESC 寄存器)

SlaveDiagnosticsData 由模块级函数 get_slave_diagnostics(dll, master_index, slave_count) 获取 (位于 darra_ethercat.master.other), EtherCATMaster 上没有 get_diagnostics() 方法。该数据类提供下列逐从站列表字段 (索引 = 从站编号 1-based):

字段含义
link_quality_percent[i]从站 i 链路质量百分比 (0-100)
port_error_count[i]从站 i 端口错误合计
lost_link_count[i]从站 i 链路丢失计数

🟦 4. SlaveDiagnosticsData 全局统计字段

SlaveDiagnosticsData 的标量字段 (5 秒滑动窗口):

字段检测什么
frame_errors全局帧错误合计
lost_frames全局丢帧合计
checksum_errors物理层 CRC 错误
timeout_frames超时帧数
working_counter_errorsWKC 错误合计
consecutive_wkc_errors连续 WKC 错误 (≥5 = 持续)
primary_port_errors / secondary_port_errors主/副网口 5 秒窗口错误

🟦 5. RT 性能 + 内核稳定性

下列字段是 master.diagnostics_info (MasterDiagnosticsInfo) 的属性, 不在 SlaveDiagnosticsData 上:

字段检测什么
bus_cycle_hzRT 内核帧频率
bus_max_jitter_us / bus_avg_jitter_us总线抖动
bus_roundtrip_usRTT
bus_load_percent通讯负载 (>70% 降频)

完整诊断流程示例

from darra_ethercat.master.other import get_slave_diagnostics

# MasterDiagnosticsInfo 对象 (帧率/抖动/丢包/故障点/总线负载)
diag = master.diagnostics_info
diag.enabled = True

# SlaveDiagnosticsData 快照 (逐从站链路 + WKC 统计) — 经模块级函数获取
slave_diag = get_slave_diagnostics(master._dll, master.master_index,
master.slave_count)

# 1. 持续 vs 偶发 (consecutive_wkc_errors 在 SlaveDiagnosticsData 上)
if slave_diag and slave_diag.consecutive_wkc_errors >= 5:
print(f"⚠️ 持续 WKC 错误 ({slave_diag.consecutive_wkc_errors})")
elif slave_diag and slave_diag.consecutive_wkc_errors > 0:
print("偶发 WKC 错误")

# 2. 丢包率 (MasterDiagnosticsInfo)
if diag.packet_loss_rate > 0.01:
print(f"⚠️ 丢包率 {diag.packet_loss_rate:.2%} 严重")

# 3. 故障定位 (break_point 返回 dict)
bp = diag.break_point
if bp is not None:
print(f"故障点: 从站 {bp['slave_index']} P{bp['port']}")

# 4. 链路质量最差从站 (link_quality_percent 列表, 索引 1-based)
if slave_diag and slave_diag.link_quality_percent:
lq = slave_diag.link_quality_percent
worst = min(range(1, len(lq)), key=lambda i: lq[i], default=None)
if worst is not None and lq[worst] < 100:
print(f"链路质量最差从站: #{worst} ({lq[worst]}%)")

# 5. 网线接触诊断 (lost_link_count 列表)
for i, n in enumerate(slave_diag.lost_link_count if slave_diag else []):
if i > 0 and n > 0:
print(f"从站 {i} 链路丢失: {n}")

# 6. 总线负载 (MasterDiagnosticsInfo)
if diag.bus_load_percent > 70:
print(f"⚠️ 总线负载 {diag.bus_load_percent:.1f}%, 必须降频")