Java 特有语法糖
com.darra.ethercat.sugar 包提供一组只增不改的辅助类, 把主线 SDK 与
Java 8/11 标准库 (Stream / Optional / CompletableFuture / BlockingQueue / record-style)
桥接起来, 让常见模式写起来更短、错误处理更轻、与现代 Java 生态融合更顺.
- 主线 API 完全不变 — 你已有的代码继续 work, 不需要迁移
- 所有 sugar 类都在
com.darra.ethercat.sugar.*包下, 静态工厂方法风格 - 对
null/ 已释放的 master / 越界索引一律返回空 Stream/Optional/Map, 不抛异常 - 与 GC、原生回调、PDO 周期解耦, 不引入新的内存泄漏风险
| Sugar 类 | 主要语法糖 | 何时用 |
|---|---|---|
EtherCATMaster (主线) | try-with-resources, Iterable<Slave> | 主流程必用, 自动释放 |
MasterStreams | Stream<Slave>, Optional<Slave>, Map<Integer, Slave> | 过滤/索引/分组 |
SlaveListExt | 数组、排序、计数、布尔聚合 | 集合代数 |
SlaveIdentity | record 等价物, 不可变身份 | 跨线程传递、缓存、HashMap key |
MasterAsync | CompletableFuture<Boolean> | 异步组合状态切换/扫描/DC |
DiagnosticStream | EMCY/0x10F3 诊断的 Stream API | 历史查询/筛选 |
DiagnosticQueue | EMCY 回调 → BlockingQueue | 解耦回调线程, 慢消费 |
SlaveStateQueue | 状态变化 → BlockingQueue + Stream | 等待整网到 OP / 监控状态降级 |
MasterScope | 扩展版 try-with-resources | 退出前回 INIT, 关联子资源 |
MasterBuilder | 流式构造 | 一句话声明 master |
try-with-resources (RAII)
标准 API
主线 EtherCATMaster 已实现 AutoCloseable, 退出 try 块自动 close():
try (EtherCATMaster master = EtherCATMaster.create()) {
master.setNetwork("\\Device\\NPF_{xxx}");
master.setState(EcState.OP);
master.start();
// ... 业务代码 ...
} // 自动调 close() — stop PDO + dispose 原生资源 + 释放回调引用
close() 是幂等的, double close 安全 — 内部用 disposed 标志位保护.
语法糖: MasterScope
主线 close() 不会:
- 退出前把硬件切回 INIT (避免下次启动撞 PDI watchdog)
- 释放业务挂载的子资源 (例如
DiagnosticQueue)
MasterScope 包装一层 try-with-resources, 多做这两件事:
import com.darra.ethercat.sugar.MasterScope;
import com.darra.ethercat.sugar.DiagnosticQueue;
EtherCATMaster master = EtherCATMaster.create();
master.setNetwork(nic);
try (MasterScope scope = MasterScope.of(master).resetToInitOnClose()) {
// 子资源也挂在 scope 上, 退出时 LIFO 释放
DiagnosticQueue dq = scope.attach(DiagnosticQueue.attach(master, 1024));
master.setState(EcState.OP);
master.start();
// ... 业务代码 ...
}
// 退出顺序: 先 close DiagnosticQueue → 切 master 回 INIT → stop PDO → master.close()
何时用
| 场景 | 推荐 |
|---|---|
| 简单 demo / 单元测试 | 直接 try (EtherCATMaster m = ...) |
| 长时间运行的 Service | MasterScope + resetToInitOnClose() |
| 业务挂多个事件队列 | MasterScope.attach() 统一管理 |
| 多个 master 实例 | 嵌套 try-with-resources, JVM 保证逆序释放 |
Stream API
标准 API
EtherCATMaster 实现 Iterable<Slave> + Slaves() 返回 List<Slave>,
直接 .stream() 即可:
master.Slaves().stream()
.filter(s -> s.State() == EcState.OP)
.forEach(s -> System.out.println(s.Name()));
语法糖: MasterStreams
MasterStreams 在 master.Slaves().stream() 基础上加了防御和常用过滤器:
import com.darra.ethercat.sugar.MasterStreams;
// 1. 空安全: master 还没 build / 已 dispose / Slaves 为 null 都返回空 Stream
MasterStreams.slaves(master).forEach(s -> ...);
// 2. 按状态过滤
MasterStreams.slavesInState(master, EcState.PRE_OP)
.forEach(s -> System.err.println("Stuck: " + s.Name()));
// 3. 按厂商
MasterStreams.slavesByVendor(master, 0x00000002) // Beckhoff
.map(Slave::Name)
.forEach(System.out::println);
// 4. 按名称包含过滤
MasterStreams.slavesByName(master, "EL2008")
.forEach(s -> s.PDO().writeOutputByte(0, (byte) 0xFF));
// 5. 已掉线
MasterStreams.lostSlaves(master).count();
// 6. 支持 DC 的从站
MasterStreams.dcCapableSlaves(master).count();
集合式视图
// 1. 1-based 序号 → Slave 字典
Map<Integer, Slave> idx = MasterStreams.slavesAsMap(master);
Slave s3 = idx.get(3); // 没第 3 号返回 null
// 2. 支持游标 (firstKey/lastKey/floor/ceiling)
NavigableMap<Integer, Slave> nav = MasterStreams.slavesAsNavigableMap(master);
Map.Entry<Integer, Slave> next = nav.ceilingEntry(currentSlave + 1);
// 3. 按 Group 字节分组
Map<Byte, List<Slave>> byGrp = MasterStreams.slavesByGroup(master);
List<Slave> group0 = byGrp.getOrDefault((byte) 0, Collections.emptyList());
// 4. 按状态分组 (诊断仪表盘核心)
Map<EcState, List<Slave>> byState = MasterStreams.slavesByState(master);
Map<EcState, Long> stateCounts = MasterStreams.stateCounts(master);
// stateCounts → {OP=8, SAFE_OP=0, PRE_OP=1, INIT=0}
// 5. 按 VendorId 分组
Map<Integer, List<Slave>> byVendor = MasterStreams.slavesByVendorId(master);
SlaveListExt — 集合代数
import com.darra.ethercat.sugar.SlaveListExt;
// 拍快照成数组 (跨线程安全)
Slave[] all = SlaveListExt.toArray(master);
// 排序
List<Slave> ordered = SlaveListExt.sortedByConfigAddr(master);
List<Slave> mixed = SlaveListExt.sortedByVendorThenIndex(master);
// 计数
long opCount = SlaveListExt.countByState(master, EcState.OP);
long lost = SlaveListExt.lostCount(master);
long dc = SlaveListExt.dcCapableCount(master);
// 全量布尔
if (SlaveListExt.allInState(master, EcState.OP)) {
enableMotion();
}
if (SlaveListExt.anyInState(master, EcState.PRE_OP)) {
raiseAlarm("发现 PreOp 从站");
}
何时用
| 场景 | 选择 |
|---|---|
| 简单 forEach | 直接 for (Slave s : master) 或 master.Slaves().stream() |
| 多重过滤 + 收集 | MasterStreams.slavesXxx().filter(...).collect(...) |
| O(1) 随机访问 | MasterStreams.slavesAsMap() |
| 跨线程冻结 | SlaveListExt.toArray() 或 SlaveListExt.identities() |
| 集合代数 (all/any/count) | SlaveListExt.* |
Optional<Slave>
标准 API
主线 master.getSlave(idx) 越界时抛 IndexOutOfBoundsException:
try {
Slave s = master.getSlave(99); // 可能抛
} catch (IndexOutOfBoundsException e) { ... }
语法糖: slaveOpt
import com.darra.ethercat.sugar.MasterStreams;
MasterStreams.slaveOpt(master, 99).ifPresent(s -> {
System.out.println(s.Name());
});
// 带默认值
String name = MasterStreams.slaveOpt(master, 99)
.map(Slave::Name)
.orElse("(missing)");
// 链式查询
Optional<Slave> el2008 = MasterStreams.findByName(master, "EL2008");
Optional<Slave> ek1100 = MasterStreams.findByConfigAddr(master, (short) 0x1001);
Optional<Slave> match = MasterStreams.findByIdentity(master, 0x00000002, 0x07D03052);
何时用
| 场景 | 推荐 |
|---|---|
| 索引一定有效 | master.getSlave(idx) (主线), 简单直接 |
| 索引可能越界 / 还没 build | slaveOpt |
| 按名/地址/身份查 | findByName / findByConfigAddr / findByIdentity |
| 不存在时给默认值 | orElse / orElseGet |
Iterable for-each
主线 EtherCATMaster 已实现 Iterable<Slave>, for-each 直接可用:
for (Slave s : master) {
System.out.println(s.Name());
}
iterator() 返回的是 Collections.unmodifiableList(slaves).iterator(),
迭代期间禁止结构性修改, 但读所有属性 (Name/State/...) 安全.
不需要额外 sugar — Java 标准 Iterable 足够好.
CompletableFuture 异步
标准 API
主线已有:
CompletableFuture<Boolean> f = master.setStateAsync(EcState.OP);
boolean ok = f.join();
但只覆盖一个方法, 其他长耗时操作 (setNetwork / scan / configureDC / applyStartupParameters) 都需要业务方自己包.
语法糖: MasterAsync
import com.darra.ethercat.sugar.MasterAsync;
// 1. 串行链
MasterAsync.setNetworkAsync(master, primary, secondary)
.thenCompose(ok -> MasterAsync.setStateAsync(master, EcState.OP, Duration.ofSeconds(30)))
.thenAccept(ok -> System.out.println("OP=" + ok));
// 2. 一键 boot
boolean ok = MasterAsync.bootToOpAsync(master, primary, secondary).join();
// 3. 并行扫描多张网卡
CompletableFuture<Integer> a = MasterAsync.scanAsync(nicA);
CompletableFuture<Integer> b = MasterAsync.scanAsync(nicB);
CompletableFuture<Integer> c = MasterAsync.scanAsync(nicC);
CompletableFuture.allOf(a, b, c)
.thenRun(() -> {
System.out.printf("A=%d B=%d C=%d%n", a.join(), b.join(), c.join());
});
// 4. 异步 DC 配置
MasterAsync.configureDCAllAsync(master, 1_000_000, 0)
.whenComplete((rc, ex) -> {
if (ex != null) log.error("DC failed", ex);
else log.info("DC rc={}", rc);
});
// 5. 暴露内部线程池, 后续回调走同一池
master.setStateAsync(EcState.SAFE_OP)
.thenApplyAsync(ok -> save(ok), MasterAsync.executor());
内部线程池
MasterAsync 维护一个 4 线程的守护池 (darra-ethercat-async-N),
比 ForkJoinPool.commonPool() 安全:
- 线程都是 daemon, 不会阻止 JVM 退出
- 线程数固定, 不会暴涨吃 CPU
- 与应用层
commonPool任务隔离, 测试更确定
何时用
| 场景 | 推荐 |
|---|---|
| 单次状态切换, 等结果 | 主线 master.setState() (同步, 简单) |
| 多步串行 | MasterAsync.bootToOpAsync 或 thenCompose |
| 并行多 NIC 扫描 | MasterAsync.scanAsync + allOf |
| Spring/Reactor 集成 | setStateAsync().toCompletionStage() |
| 严格超时 | MasterAsync.setStateAsync(m, s, Duration.ofSeconds(30)) |
SDK 内部 SetStateSequence 自带 5s × 3 次重试, 总耗时最长 ~22s. 业务超时
建议 ≥ 25s, 否则会"看起来失败"但 SDK 仍在重试.
BlockingQueue (诊断流)
标准 API
主线用回调订阅事件:
master.Events().addEmergencyEventListener((mi, si, ec, er, b1, w1, w2) -> {
saveToDb(ec); // 慢操作 — 在 PDO 线程上执行, 阻塞 EtherCAT 周期!
});
回调直接在原生 PDO 线程触发, 业务方做 IO/数据库/锁会拖慢整个网络.
语法糖: DiagnosticQueue
把回调一次注册, 转成线程安全的 BlockingQueue<EmergencyEvent>:
import com.darra.ethercat.sugar.DiagnosticQueue;
try (DiagnosticQueue dq = DiagnosticQueue.attach(master, 1024)) {
// 后台消费线程
Thread consumer = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
DiagnosticQueue.EmergencyEvent ev = dq.take();
saveToDb(ev); // 慢操作 — 不再阻塞 PDO
} catch (InterruptedException e) {
break;
}
}
});
consumer.setDaemon(true);
consumer.start();
master.start();
Thread.sleep(60_000);
} // close 自动反注册回调 + 清空队列
容量满策略
DiagnosticQueue 提供两种模式:
| 工厂方法 | 满时行为 | 用途 |
|---|---|---|
attach(master, cap) | 丢弃最早事件, 不阻塞 | 推荐 — 保护 PDO 线程 |
attachStrict(master, cap) | 阻塞回调线程 | 必须保证不丢事件时, 慎用 |
SlaveStateQueue — 状态变化版
import com.darra.ethercat.sugar.SlaveStateQueue;
// 等待整网到 OP, 30s 超时
try (SlaveStateQueue q = SlaveStateQueue.attach(master, 256)) {
master.setState(EcState.OP);
long deadline = System.currentTimeMillis() + 30_000;
while (System.currentTimeMillis() < deadline) {
SlaveStateQueue.StateChangeEvent ev = q.poll(500, TimeUnit.MILLISECONDS);
if (ev != null && ev.newState == EcState.OP) {
if (SlaveListExt.allInState(master, EcState.OP)) break;
}
}
}
// 或 Stream 风格 (在独立线程跑, 阻塞至 close)
SlaveStateQueue q = SlaveStateQueue.attach(master, 256);
new Thread(() -> q.streamUntilClosed()
.filter(ev -> ev.newState == EcState.PRE_OP)
.forEach(ev -> log.warn("从站 {} 跌回 PreOp", ev.slaveIndex))
).start();
何时用
| 场景 | 推荐 |
|---|---|
| 单条事件实时记一笔日志 | 直接 addXxxListener, 短小快 |
| 慢消费 (写数据库, 推 MQ) | DiagnosticQueue |
| 状态机判断 (等 OP) | SlaveStateQueue.poll |
| 流式订阅 (forEach 风格) | SlaveStateQueue.streamUntilClosed |
record (Java 14+) 等价物
标准 API
Java 14+ 的 record 关键字:
public record SlaveIdentity(
int slaveNum, long vendorId, long productId, long revisionId,
long serialNumber, String name, String deviceName) {}
但本 SDK 编译目标 Java 11, 无法用 record.
语法糖: SlaveIdentity
com.darra.ethercat.sugar.SlaveIdentity 是手写的 record 等价物 — 不可变 +
accessor + equals/hashCode/toString:
import com.darra.ethercat.sugar.SlaveIdentity;
// 1. 从活的 Slave 抓快照
SlaveIdentity id = SlaveIdentity.snapshot(master.getSlave(3));
// 2. record 风格的 accessor (字段名直接当方法名, 无 get 前缀)
System.out.println("V=0x" + Long.toHexString(id.vendorId()));
System.out.println("P=0x" + Long.toHexString(id.productId()));
// 3. 跨线程传递 — 不锁定 master 生命周期
executor.submit(() -> report(id));
// 4. 作为 HashMap key
Map<SlaveIdentity, Long> lastSeen = new HashMap<>();
lastSeen.put(id, System.currentTimeMillis());
// 5. 配置校验 (热插拔)
SlaveIdentity expected = new SlaveIdentity(
3, 0x00000002L, 0x044C2C52L, 0x00100000L, 0L, "EL2008", "");
if (!actual.matchesIdentity(expected)) {
raiseAlarm("从站身份不符");
}
// 6. 严格匹配 (含 Serial)
if (!actual.equalsExact(expected)) {
rebindCalibration();
}
全网快照
// 一次性抓取所有从站的不可变身份
List<SlaveIdentity> all = SlaveListExt.identities(master);
// 持久化到磁盘 (跨进程比对)
saveToJson("snapshot.json", all);
升级到 Java 14+
如果将来项目升 Java 14+, 替换 SlaveIdentity 为真正的 record:
public record SlaveIdentity(int slaveNum, long vendorId, long productId,
long revisionId, long serialNumber, String name, String deviceName) {
public static SlaveIdentity snapshot(Slave s) { ... }
public boolean matchesIdentity(SlaveIdentity expected) { ... }
}
调用代码完全不变 — accessor 名一样, 构造参数顺序一样.
何时用
| 场景 | 推荐 |
|---|---|
| 实时读最新状态 | 直接用 Slave 对象 (有 master 上下文) |
| 跨线程传递 | SlaveIdentity.snapshot(slave) |
| 缓存 / HashMap key | SlaveIdentity (主线 Slave 没有合适 equals) |
| 序列化 (JSON / DB) | SlaveIdentity 字段固定, 易序列化 |
| 配置校验 | SlaveIdentity.matchesIdentity() |
Functional listener (lambda)
主线 MasterEvents 内的所有 listener 接口都已标 @FunctionalInterface,
可以直接传 lambda — 不需要额外 sugar:
master.Events().addEmergencyEventListener(
(mi, si, errorCode, errorReg, b1, w1, w2) ->
System.out.printf("Slave %d Emcy 0x%04X%n", si, errorCode & 0xFFFF));
master.Events().addSlaveStateChangedListener(
(mi, si, oldState, newState) ->
log.info("Slave {} {}->{}", si, oldState, newState));
master.Events().addPDOFrameLossListener(
(mi, group, consec, total) ->
alarm.raise("Group %d 连丢 %d, 累计 %d", group, consec, total));
注意 EmergencyEventListener 一个回调有 7 个参数, lambda 写法虽然短, 但参数
含义需要查文档. 推荐结合 DiagnosticQueue 把回调转成事件对象 (字段命名清晰).
Builder 模式
标准 API
主线已有链式构造:
EtherCATMaster m = EtherCATMaster.create();
m.SetNetwork(primary)
.setENI("config.deni")
.enableAutoStartup()
.build();
但需要先 create(), 再链式 — 顺序不那么自然, 而且 create() 已分配主站编号
(失败时需要 close() 才能释放).
语法糖: MasterBuilder
import com.darra.ethercat.sugar.MasterBuilder;
EtherCATMaster m = MasterBuilder.create()
.network(primary)
.redundant(secondary)
.esiFolder("C:/Esi")
.eni("C:/cfg/ec.deni")
.autoStartup()
.targetState(EcState.OP)
.startPdoAfterOp()
.masterNumber((short) 0)
.build();
失败回滚
任意步骤失败抛 IllegalStateException, 内部自动 close() 半初始化的
master, 不留遗物:
try {
EtherCATMaster m = MasterBuilder.create()
.network("\\Device\\NPF_BOGUS") // 不存在的网卡
.build();
} catch (IllegalStateException e) {
// master 已被 close, 无需手动清理
log.error("Build failed: {}", e.getMessage());
}
何时用
| 场景 | 推荐 |
|---|---|
| 程序启动一次性配置 | MasterBuilder (声明式, 错失自动回滚) |
| 中途调整 (动态加 ESI) | 主线链式 (master.setEsiFile().build()) |
| 单元测试 mock master | 直接 EtherCATMaster.create() |
完整示例: 一键 OP + 诊断流
把所有 sugar 串起来:
import com.darra.ethercat.master.EtherCATMaster;
import com.darra.ethercat.data.EcState;
import com.darra.ethercat.slave.Slave;
import com.darra.ethercat.sugar.*;
public class Demo {
public static void main(String[] args) throws Exception {
// 1. Builder 一句话构造
EtherCATMaster master = MasterBuilder.create()
.network("\\Device\\NPF_{xxx}")
.esiFolder("C:/Esi")
.eni("C:/cfg/ec.deni")
.autoStartup()
.build();
// 2. MasterScope + 子资源
try (MasterScope scope = MasterScope.of(master).resetToInitOnClose()) {
DiagnosticQueue dq = scope.attach(DiagnosticQueue.attach(master, 1024));
SlaveStateQueue sq = scope.attach(SlaveStateQueue.attach(master, 256));
// 3. 后台消费线程: 把 EMCY 落库
Thread emcyWorker = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
DiagnosticQueue.EmergencyEvent ev = dq.take();
saveToDb(ev);
} catch (InterruptedException e) { break; }
}
}, "emcy-worker");
emcyWorker.setDaemon(true);
emcyWorker.start();
// 4. 异步 boot: 网卡 -> OP -> PDO
boolean ok = MasterAsync.bootToOpAsync(master,
"\\Device\\NPF_{xxx}", null).get(30, TimeUnit.SECONDS);
if (!ok) throw new IllegalStateException("Boot failed");
// 5. 用 Stream API 报告网络状态
Map<EcState, Long> counts = MasterStreams.stateCounts(master);
System.out.println("State distribution: " + counts);
// 6. 用 SlaveIdentity 抓全网快照, 跨线程持久化
List<SlaveIdentity> ids = SlaveListExt.identities(master);
CompletableFuture.runAsync(() -> saveSnapshot(ids));
// 7. 主循环 — 等待整网都到 OP, 否则 30s 超时
long deadline = System.currentTimeMillis() + 30_000;
while (System.currentTimeMillis() < deadline) {
if (SlaveListExt.allInState(master, EcState.OP)) {
System.out.println("全网 OP, 可以启动运动控制");
break;
}
Thread.sleep(100);
}
// 8. 业务运行 60s
Thread.sleep(60_000);
} // 自动: close DiagnosticQueue/SlaveStateQueue → master 切 INIT → stop PDO → master.close()
}
}
设计决策记录
为什么不直接修改主线 SDK?
- 向后兼容: 已有用户代码
master.Slaves().stream()必须继续 work - 可选依赖: sugar 类是"可选的", 不用就不引用, 不增加 jar 体积
- 测试隔离: sugar 单元测试不会污染主线 SDK 的 native 测试
为什么没有 Reactive (Mono / Flux) 包装?
MasterAsync.scanAsync(...).toCompletionStage() 已经能直接转给 Reactor:
import reactor.core.publisher.Mono;
Mono<Integer> count = Mono.fromCompletionStage(
MasterAsync.scanAsync(nic).toCompletionStage());
把 Reactor 依赖塞进 SDK 包会强制 5MB+ 体积膨胀, 不划算.
为什么 SlaveIdentity 用手写 final class 而非 record?
项目编译目标 Java 11, record 是 Java 14+. 手写等价物保证现在能用, 升级时 平滑替换 (调用代码不变).
为什么 DiagnosticQueue 默认丢弃最早?
EtherCAT PDO 周期可低至 250 µs. 一次回调阻塞 1 ms 就能让网络丢帧. 默认行为
牺牲少量历史事件保证实时网络稳定 — 这是工业现场的硬约束. 业务方如果
需要"绝对不丢", 用 attachStrict 并自己接受性能影响.
索引
| 类 | 文件 |
|---|---|
MasterStreams | com/darra/ethercat/sugar/MasterStreams.java |
SlaveListExt | com/darra/ethercat/sugar/SlaveListExt.java |
SlaveIdentity | com/darra/ethercat/sugar/SlaveIdentity.java |
MasterAsync | com/darra/ethercat/sugar/MasterAsync.java |
DiagnosticStream | com/darra/ethercat/sugar/DiagnosticStream.java |
DiagnosticQueue | com/darra/ethercat/sugar/DiagnosticQueue.java |
SlaveStateQueue | com/darra/ethercat/sugar/SlaveStateQueue.java |
MasterScope | com/darra/ethercat/sugar/MasterScope.java |
MasterBuilder | com/darra/ethercat/sugar/MasterBuilder.java |
所有 sugar 类都是静态工厂 + 不可变值对象 — 不持有原生资源, 多次调用 开销可忽略. 放心用!