跳到主要内容

Java 特有语法糖

com.darra.ethercat.sugar 包提供一组只增不改的辅助类, 把主线 SDK 与 Java 8/11 标准库 (Stream / Optional / CompletableFuture / BlockingQueue / record-style) 桥接起来, 让常见模式写起来更短、错误处理更轻、与现代 Java 生态融合更顺.

设计原则
  • 主线 API 完全不变 — 你已有的代码继续 work, 不需要迁移
  • 所有 sugar 类都在 com.darra.ethercat.sugar.* 包下, 静态工厂方法风格
  • null / 已释放的 master / 越界索引一律返回空 Stream/Optional/Map, 不抛异常
  • 与 GC、原生回调、PDO 周期解耦, 不引入新的内存泄漏风险
Sugar 类主要语法糖何时用
EtherCATMaster (主线)try-with-resources, Iterable<Slave>主流程必用, 自动释放
MasterStreamsStream<Slave>, Optional<Slave>, Map<Integer, Slave>过滤/索引/分组
SlaveListExt数组、排序、计数、布尔聚合集合代数
SlaveIdentityrecord 等价物, 不可变身份跨线程传递、缓存、HashMap key
MasterAsyncCompletableFuture<Boolean>异步组合状态切换/扫描/DC
DiagnosticStreamEMCY/0x10F3 诊断的 Stream API历史查询/筛选
DiagnosticQueueEMCY 回调 → BlockingQueue解耦回调线程, 慢消费
SlaveStateQueue状态变化 → BlockingQueue + Stream等待整网到 OP / 监控状态降级
MasterScope扩展版 try-with-resources退出前回 INIT, 关联子资源
MasterBuilder流式构造一句话声明 master

try-with-resources (RAII)

标准 API

主线 EtherCATMaster 已实现 AutoCloseable, 退出 try 块自动 close():

try (EtherCATMaster master = EtherCATMaster.create()) {
master.setNetwork("\\Device\\NPF_{xxx}");
master.setState(EcState.OP);
master.start();
// ... 业务代码 ...
} // 自动调 close() — stop PDO + dispose 原生资源 + 释放回调引用

close()幂等的, double close 安全 — 内部用 disposed 标志位保护.

语法糖: MasterScope

主线 close() 不会:

  • 退出前把硬件切回 INIT (避免下次启动撞 PDI watchdog)
  • 释放业务挂载的子资源 (例如 DiagnosticQueue)

MasterScope 包装一层 try-with-resources, 多做这两件事:

import com.darra.ethercat.sugar.MasterScope;
import com.darra.ethercat.sugar.DiagnosticQueue;

EtherCATMaster master = EtherCATMaster.create();
master.setNetwork(nic);

try (MasterScope scope = MasterScope.of(master).resetToInitOnClose()) {
// 子资源也挂在 scope 上, 退出时 LIFO 释放
DiagnosticQueue dq = scope.attach(DiagnosticQueue.attach(master, 1024));

master.setState(EcState.OP);
master.start();
// ... 业务代码 ...
}
// 退出顺序: 先 close DiagnosticQueue → 切 master 回 INIT → stop PDO → master.close()

何时用

场景推荐
简单 demo / 单元测试直接 try (EtherCATMaster m = ...)
长时间运行的 ServiceMasterScope + resetToInitOnClose()
业务挂多个事件队列MasterScope.attach() 统一管理
多个 master 实例嵌套 try-with-resources, JVM 保证逆序释放

Stream API

标准 API

EtherCATMaster 实现 Iterable<Slave> + Slaves() 返回 List<Slave>, 直接 .stream() 即可:

master.Slaves().stream()
.filter(s -> s.State() == EcState.OP)
.forEach(s -> System.out.println(s.Name()));

语法糖: MasterStreams

MasterStreamsmaster.Slaves().stream() 基础上加了防御常用过滤器:

import com.darra.ethercat.sugar.MasterStreams;

// 1. 空安全: master 还没 build / 已 dispose / Slaves 为 null 都返回空 Stream
MasterStreams.slaves(master).forEach(s -> ...);

// 2. 按状态过滤
MasterStreams.slavesInState(master, EcState.PRE_OP)
.forEach(s -> System.err.println("Stuck: " + s.Name()));

// 3. 按厂商
MasterStreams.slavesByVendor(master, 0x00000002) // Beckhoff
.map(Slave::Name)
.forEach(System.out::println);

// 4. 按名称包含过滤
MasterStreams.slavesByName(master, "EL2008")
.forEach(s -> s.PDO().writeOutputByte(0, (byte) 0xFF));

// 5. 已掉线
MasterStreams.lostSlaves(master).count();

// 6. 支持 DC 的从站
MasterStreams.dcCapableSlaves(master).count();

集合式视图

// 1. 1-based 序号 → Slave 字典
Map<Integer, Slave> idx = MasterStreams.slavesAsMap(master);
Slave s3 = idx.get(3); // 没第 3 号返回 null

// 2. 支持游标 (firstKey/lastKey/floor/ceiling)
NavigableMap<Integer, Slave> nav = MasterStreams.slavesAsNavigableMap(master);
Map.Entry<Integer, Slave> next = nav.ceilingEntry(currentSlave + 1);

// 3. 按 Group 字节分组
Map<Byte, List<Slave>> byGrp = MasterStreams.slavesByGroup(master);
List<Slave> group0 = byGrp.getOrDefault((byte) 0, Collections.emptyList());

// 4. 按状态分组 (诊断仪表盘核心)
Map<EcState, List<Slave>> byState = MasterStreams.slavesByState(master);
Map<EcState, Long> stateCounts = MasterStreams.stateCounts(master);
// stateCounts → {OP=8, SAFE_OP=0, PRE_OP=1, INIT=0}

// 5. 按 VendorId 分组
Map<Integer, List<Slave>> byVendor = MasterStreams.slavesByVendorId(master);

SlaveListExt — 集合代数

import com.darra.ethercat.sugar.SlaveListExt;

// 拍快照成数组 (跨线程安全)
Slave[] all = SlaveListExt.toArray(master);

// 排序
List<Slave> ordered = SlaveListExt.sortedByConfigAddr(master);
List<Slave> mixed = SlaveListExt.sortedByVendorThenIndex(master);

// 计数
long opCount = SlaveListExt.countByState(master, EcState.OP);
long lost = SlaveListExt.lostCount(master);
long dc = SlaveListExt.dcCapableCount(master);

// 全量布尔
if (SlaveListExt.allInState(master, EcState.OP)) {
enableMotion();
}
if (SlaveListExt.anyInState(master, EcState.PRE_OP)) {
raiseAlarm("发现 PreOp 从站");
}

何时用

场景选择
简单 forEach直接 for (Slave s : master)master.Slaves().stream()
多重过滤 + 收集MasterStreams.slavesXxx().filter(...).collect(...)
O(1) 随机访问MasterStreams.slavesAsMap()
跨线程冻结SlaveListExt.toArray()SlaveListExt.identities()
集合代数 (all/any/count)SlaveListExt.*

Optional<Slave>

标准 API

主线 master.getSlave(idx) 越界时抛 IndexOutOfBoundsException:

try {
Slave s = master.getSlave(99); // 可能抛
} catch (IndexOutOfBoundsException e) { ... }

语法糖: slaveOpt

import com.darra.ethercat.sugar.MasterStreams;

MasterStreams.slaveOpt(master, 99).ifPresent(s -> {
System.out.println(s.Name());
});

// 带默认值
String name = MasterStreams.slaveOpt(master, 99)
.map(Slave::Name)
.orElse("(missing)");

// 链式查询
Optional<Slave> el2008 = MasterStreams.findByName(master, "EL2008");
Optional<Slave> ek1100 = MasterStreams.findByConfigAddr(master, (short) 0x1001);
Optional<Slave> match = MasterStreams.findByIdentity(master, 0x00000002, 0x07D03052);

何时用

场景推荐
索引一定有效master.getSlave(idx) (主线), 简单直接
索引可能越界 / 还没 buildslaveOpt
按名/地址/身份查findByName / findByConfigAddr / findByIdentity
不存在时给默认值orElse / orElseGet

Iterable for-each

主线 EtherCATMaster 已实现 Iterable<Slave>, for-each 直接可用:

for (Slave s : master) {
System.out.println(s.Name());
}

iterator() 返回的是 Collections.unmodifiableList(slaves).iterator(), 迭代期间禁止结构性修改, 但读所有属性 (Name/State/...) 安全.

不需要额外 sugar — Java 标准 Iterable 足够好.


CompletableFuture 异步

标准 API

主线已有:

CompletableFuture<Boolean> f = master.setStateAsync(EcState.OP);
boolean ok = f.join();

但只覆盖一个方法, 其他长耗时操作 (setNetwork / scan / configureDC / applyStartupParameters) 都需要业务方自己包.

语法糖: MasterAsync

import com.darra.ethercat.sugar.MasterAsync;

// 1. 串行链
MasterAsync.setNetworkAsync(master, primary, secondary)
.thenCompose(ok -> MasterAsync.setStateAsync(master, EcState.OP, Duration.ofSeconds(30)))
.thenAccept(ok -> System.out.println("OP=" + ok));

// 2. 一键 boot
boolean ok = MasterAsync.bootToOpAsync(master, primary, secondary).join();

// 3. 并行扫描多张网卡
CompletableFuture<Integer> a = MasterAsync.scanAsync(nicA);
CompletableFuture<Integer> b = MasterAsync.scanAsync(nicB);
CompletableFuture<Integer> c = MasterAsync.scanAsync(nicC);
CompletableFuture.allOf(a, b, c)
.thenRun(() -> {
System.out.printf("A=%d B=%d C=%d%n", a.join(), b.join(), c.join());
});

// 4. 异步 DC 配置
MasterAsync.configureDCAllAsync(master, 1_000_000, 0)
.whenComplete((rc, ex) -> {
if (ex != null) log.error("DC failed", ex);
else log.info("DC rc={}", rc);
});

// 5. 暴露内部线程池, 后续回调走同一池
master.setStateAsync(EcState.SAFE_OP)
.thenApplyAsync(ok -> save(ok), MasterAsync.executor());

内部线程池

MasterAsync 维护一个 4 线程的守护池 (darra-ethercat-async-N), 比 ForkJoinPool.commonPool() 安全:

  • 线程都是 daemon, 不会阻止 JVM 退出
  • 线程数固定, 不会暴涨吃 CPU
  • 与应用层 commonPool 任务隔离, 测试更确定

何时用

场景推荐
单次状态切换, 等结果主线 master.setState() (同步, 简单)
多步串行MasterAsync.bootToOpAsyncthenCompose
并行多 NIC 扫描MasterAsync.scanAsync + allOf
Spring/Reactor 集成setStateAsync().toCompletionStage()
严格超时MasterAsync.setStateAsync(m, s, Duration.ofSeconds(30))
注意

SDK 内部 SetStateSequence 自带 5s × 3 次重试, 总耗时最长 ~22s. 业务超时 建议 ≥ 25s, 否则会"看起来失败"但 SDK 仍在重试.


BlockingQueue (诊断流)

标准 API

主线用回调订阅事件:

master.Events().addEmergencyEventListener((mi, si, ec, er, b1, w1, w2) -> {
saveToDb(ec); // 慢操作 — 在 PDO 线程上执行, 阻塞 EtherCAT 周期!
});

回调直接在原生 PDO 线程触发, 业务方做 IO/数据库/锁会拖慢整个网络.

语法糖: DiagnosticQueue

把回调一次注册, 转成线程安全的 BlockingQueue<EmergencyEvent>:

import com.darra.ethercat.sugar.DiagnosticQueue;

try (DiagnosticQueue dq = DiagnosticQueue.attach(master, 1024)) {
// 后台消费线程
Thread consumer = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
DiagnosticQueue.EmergencyEvent ev = dq.take();
saveToDb(ev); // 慢操作 — 不再阻塞 PDO
} catch (InterruptedException e) {
break;
}
}
});
consumer.setDaemon(true);
consumer.start();

master.start();
Thread.sleep(60_000);
} // close 自动反注册回调 + 清空队列

容量满策略

DiagnosticQueue 提供两种模式:

工厂方法满时行为用途
attach(master, cap)丢弃最早事件, 不阻塞推荐 — 保护 PDO 线程
attachStrict(master, cap)阻塞回调线程必须保证不丢事件时, 慎用

SlaveStateQueue — 状态变化版

import com.darra.ethercat.sugar.SlaveStateQueue;

// 等待整网到 OP, 30s 超时
try (SlaveStateQueue q = SlaveStateQueue.attach(master, 256)) {
master.setState(EcState.OP);
long deadline = System.currentTimeMillis() + 30_000;
while (System.currentTimeMillis() < deadline) {
SlaveStateQueue.StateChangeEvent ev = q.poll(500, TimeUnit.MILLISECONDS);
if (ev != null && ev.newState == EcState.OP) {
if (SlaveListExt.allInState(master, EcState.OP)) break;
}
}
}

// 或 Stream 风格 (在独立线程跑, 阻塞至 close)
SlaveStateQueue q = SlaveStateQueue.attach(master, 256);
new Thread(() -> q.streamUntilClosed()
.filter(ev -> ev.newState == EcState.PRE_OP)
.forEach(ev -> log.warn("从站 {} 跌回 PreOp", ev.slaveIndex))
).start();

何时用

场景推荐
单条事件实时记一笔日志直接 addXxxListener, 短小快
慢消费 (写数据库, 推 MQ)DiagnosticQueue
状态机判断 (等 OP)SlaveStateQueue.poll
流式订阅 (forEach 风格)SlaveStateQueue.streamUntilClosed

record (Java 14+) 等价物

标准 API

Java 14+ 的 record 关键字:

public record SlaveIdentity(
int slaveNum, long vendorId, long productId, long revisionId,
long serialNumber, String name, String deviceName) {}

但本 SDK 编译目标 Java 11, 无法用 record.

语法糖: SlaveIdentity

com.darra.ethercat.sugar.SlaveIdentity 是手写的 record 等价物 — 不可变 + accessor + equals/hashCode/toString:

import com.darra.ethercat.sugar.SlaveIdentity;

// 1. 从活的 Slave 抓快照
SlaveIdentity id = SlaveIdentity.snapshot(master.getSlave(3));

// 2. record 风格的 accessor (字段名直接当方法名, 无 get 前缀)
System.out.println("V=0x" + Long.toHexString(id.vendorId()));
System.out.println("P=0x" + Long.toHexString(id.productId()));

// 3. 跨线程传递 — 不锁定 master 生命周期
executor.submit(() -> report(id));

// 4. 作为 HashMap key
Map<SlaveIdentity, Long> lastSeen = new HashMap<>();
lastSeen.put(id, System.currentTimeMillis());

// 5. 配置校验 (热插拔)
SlaveIdentity expected = new SlaveIdentity(
3, 0x00000002L, 0x044C2C52L, 0x00100000L, 0L, "EL2008", "");
if (!actual.matchesIdentity(expected)) {
raiseAlarm("从站身份不符");
}

// 6. 严格匹配 (含 Serial)
if (!actual.equalsExact(expected)) {
rebindCalibration();
}

全网快照

// 一次性抓取所有从站的不可变身份
List<SlaveIdentity> all = SlaveListExt.identities(master);

// 持久化到磁盘 (跨进程比对)
saveToJson("snapshot.json", all);

升级到 Java 14+

如果将来项目升 Java 14+, 替换 SlaveIdentity 为真正的 record:

public record SlaveIdentity(int slaveNum, long vendorId, long productId,
long revisionId, long serialNumber, String name, String deviceName) {

public static SlaveIdentity snapshot(Slave s) { ... }
public boolean matchesIdentity(SlaveIdentity expected) { ... }
}

调用代码完全不变 — accessor 名一样, 构造参数顺序一样.

何时用

场景推荐
实时读最新状态直接用 Slave 对象 (有 master 上下文)
跨线程传递SlaveIdentity.snapshot(slave)
缓存 / HashMap keySlaveIdentity (主线 Slave 没有合适 equals)
序列化 (JSON / DB)SlaveIdentity 字段固定, 易序列化
配置校验SlaveIdentity.matchesIdentity()

Functional listener (lambda)

主线 MasterEvents 内的所有 listener 接口都已标 @FunctionalInterface, 可以直接传 lambda — 不需要额外 sugar:

master.Events().addEmergencyEventListener(
(mi, si, errorCode, errorReg, b1, w1, w2) ->
System.out.printf("Slave %d Emcy 0x%04X%n", si, errorCode & 0xFFFF));

master.Events().addSlaveStateChangedListener(
(mi, si, oldState, newState) ->
log.info("Slave {} {}->{}", si, oldState, newState));

master.Events().addPDOFrameLossListener(
(mi, group, consec, total) ->
alarm.raise("Group %d 连丢 %d, 累计 %d", group, consec, total));

注意 EmergencyEventListener 一个回调有 7 个参数, lambda 写法虽然短, 但参数 含义需要查文档. 推荐结合 DiagnosticQueue 把回调转成事件对象 (字段命名清晰).


Builder 模式

标准 API

主线已有链式构造:

EtherCATMaster m = EtherCATMaster.create();
m.SetNetwork(primary)
.setENI("config.deni")
.enableAutoStartup()
.build();

但需要先 create(), 再链式 — 顺序不那么自然, 而且 create() 已分配主站编号 (失败时需要 close() 才能释放).

语法糖: MasterBuilder

import com.darra.ethercat.sugar.MasterBuilder;

EtherCATMaster m = MasterBuilder.create()
.network(primary)
.redundant(secondary)
.esiFolder("C:/Esi")
.eni("C:/cfg/ec.deni")
.autoStartup()
.targetState(EcState.OP)
.startPdoAfterOp()
.masterNumber((short) 0)
.build();

失败回滚

任意步骤失败抛 IllegalStateException, 内部自动 close() 半初始化的 master, 不留遗物:

try {
EtherCATMaster m = MasterBuilder.create()
.network("\\Device\\NPF_BOGUS") // 不存在的网卡
.build();
} catch (IllegalStateException e) {
// master 已被 close, 无需手动清理
log.error("Build failed: {}", e.getMessage());
}

何时用

场景推荐
程序启动一次性配置MasterBuilder (声明式, 错失自动回滚)
中途调整 (动态加 ESI)主线链式 (master.setEsiFile().build())
单元测试 mock master直接 EtherCATMaster.create()

完整示例: 一键 OP + 诊断流

把所有 sugar 串起来:

import com.darra.ethercat.master.EtherCATMaster;
import com.darra.ethercat.data.EcState;
import com.darra.ethercat.slave.Slave;
import com.darra.ethercat.sugar.*;

public class Demo {
public static void main(String[] args) throws Exception {
// 1. Builder 一句话构造
EtherCATMaster master = MasterBuilder.create()
.network("\\Device\\NPF_{xxx}")
.esiFolder("C:/Esi")
.eni("C:/cfg/ec.deni")
.autoStartup()
.build();

// 2. MasterScope + 子资源
try (MasterScope scope = MasterScope.of(master).resetToInitOnClose()) {
DiagnosticQueue dq = scope.attach(DiagnosticQueue.attach(master, 1024));
SlaveStateQueue sq = scope.attach(SlaveStateQueue.attach(master, 256));

// 3. 后台消费线程: 把 EMCY 落库
Thread emcyWorker = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
DiagnosticQueue.EmergencyEvent ev = dq.take();
saveToDb(ev);
} catch (InterruptedException e) { break; }
}
}, "emcy-worker");
emcyWorker.setDaemon(true);
emcyWorker.start();

// 4. 异步 boot: 网卡 -> OP -> PDO
boolean ok = MasterAsync.bootToOpAsync(master,
"\\Device\\NPF_{xxx}", null).get(30, TimeUnit.SECONDS);
if (!ok) throw new IllegalStateException("Boot failed");

// 5. 用 Stream API 报告网络状态
Map<EcState, Long> counts = MasterStreams.stateCounts(master);
System.out.println("State distribution: " + counts);

// 6. 用 SlaveIdentity 抓全网快照, 跨线程持久化
List<SlaveIdentity> ids = SlaveListExt.identities(master);
CompletableFuture.runAsync(() -> saveSnapshot(ids));

// 7. 主循环 — 等待整网都到 OP, 否则 30s 超时
long deadline = System.currentTimeMillis() + 30_000;
while (System.currentTimeMillis() < deadline) {
if (SlaveListExt.allInState(master, EcState.OP)) {
System.out.println("全网 OP, 可以启动运动控制");
break;
}
Thread.sleep(100);
}

// 8. 业务运行 60s
Thread.sleep(60_000);

} // 自动: close DiagnosticQueue/SlaveStateQueue → master 切 INIT → stop PDO → master.close()
}
}

设计决策记录

为什么不直接修改主线 SDK?

  1. 向后兼容: 已有用户代码 master.Slaves().stream() 必须继续 work
  2. 可选依赖: sugar 类是"可选的", 不用就不引用, 不增加 jar 体积
  3. 测试隔离: sugar 单元测试不会污染主线 SDK 的 native 测试

为什么没有 Reactive (Mono / Flux) 包装?

MasterAsync.scanAsync(...).toCompletionStage() 已经能直接转给 Reactor:

import reactor.core.publisher.Mono;

Mono<Integer> count = Mono.fromCompletionStage(
MasterAsync.scanAsync(nic).toCompletionStage());

把 Reactor 依赖塞进 SDK 包会强制 5MB+ 体积膨胀, 不划算.

为什么 SlaveIdentity 用手写 final class 而非 record?

项目编译目标 Java 11, record 是 Java 14+. 手写等价物保证现在能用, 升级时 平滑替换 (调用代码不变).

为什么 DiagnosticQueue 默认丢弃最早?

EtherCAT PDO 周期可低至 250 µs. 一次回调阻塞 1 ms 就能让网络丢帧. 默认行为 牺牲少量历史事件保证实时网络稳定 — 这是工业现场的硬约束. 业务方如果 需要"绝对不丢", 用 attachStrict 并自己接受性能影响.


索引

文件
MasterStreamscom/darra/ethercat/sugar/MasterStreams.java
SlaveListExtcom/darra/ethercat/sugar/SlaveListExt.java
SlaveIdentitycom/darra/ethercat/sugar/SlaveIdentity.java
MasterAsynccom/darra/ethercat/sugar/MasterAsync.java
DiagnosticStreamcom/darra/ethercat/sugar/DiagnosticStream.java
DiagnosticQueuecom/darra/ethercat/sugar/DiagnosticQueue.java
SlaveStateQueuecom/darra/ethercat/sugar/SlaveStateQueue.java
MasterScopecom/darra/ethercat/sugar/MasterScope.java
MasterBuildercom/darra/ethercat/sugar/MasterBuilder.java
提示

所有 sugar 类都是静态工厂 + 不可变值对象 — 不持有原生资源, 多次调用 开销可忽略. 放心用!