From 0bf8cf45e8bd5d21ce6c8871009e1565582828f3 Mon Sep 17 00:00:00 2001 From: zcxlsm Date: Sat, 6 Sep 2025 13:16:40 +0800 Subject: [PATCH] =?UTF-8?q?feat(property):=20=E7=94=A8=20Server-Sent=20Eve?= =?UTF-8?q?nts=20(SSE)=20=E6=9B=BF=E4=BB=A3=20WebSocket=20=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E6=B6=88=E6=81=AF=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resource/api/RemoteMessageService.java | 9 +++ .../api/RemoteMessageServiceStub.java | 16 +++++ .../common/sse/core/SseEmitterManager.java | 28 +++++++++ .../common/sse/utils/SseMessageUtils.java | 14 +++++ ruoyi-modules/Property/pom.xml | 5 ++ .../bo/residentBo/ResidentPersonBo.java | 2 +- .../TbMeterInfoServiceImpl.java | 63 ++++++++----------- .../dubbo/RemoteMessageServiceImpl.java | 12 ++++ 8 files changed, 111 insertions(+), 38 deletions(-) diff --git a/ruoyi-api/ruoyi-api-resource/src/main/java/org/dromara/resource/api/RemoteMessageService.java b/ruoyi-api/ruoyi-api-resource/src/main/java/org/dromara/resource/api/RemoteMessageService.java index d674c81f..7a8533e9 100644 --- a/ruoyi-api/ruoyi-api-resource/src/main/java/org/dromara/resource/api/RemoteMessageService.java +++ b/ruoyi-api/ruoyi-api-resource/src/main/java/org/dromara/resource/api/RemoteMessageService.java @@ -23,4 +23,13 @@ public interface RemoteMessageService { * @param message 消息内容 */ void publishAll(String message); + + /** + * 向指定的用户的指定会话发送消息 + * + * @param userId 要发送消息的用户id + * @param token 用户的会话令牌 + * @param message 要发送的消息内容 + */ + void sendMessage(Long userId, String token, String message); } diff --git a/ruoyi-api/ruoyi-api-resource/src/main/java/org/dromara/resource/api/RemoteMessageServiceStub.java b/ruoyi-api/ruoyi-api-resource/src/main/java/org/dromara/resource/api/RemoteMessageServiceStub.java index 0ee2791e..a980af95 100644 --- a/ruoyi-api/ruoyi-api-resource/src/main/java/org/dromara/resource/api/RemoteMessageServiceStub.java +++ b/ruoyi-api/ruoyi-api-resource/src/main/java/org/dromara/resource/api/RemoteMessageServiceStub.java @@ -44,4 +44,20 @@ public class RemoteMessageServiceStub implements RemoteMessageService { log.warn("推送功能未开启或服务未找到"); } } + + /** + * 向指定的用户的指定会话发送消息 + * + * @param userId 要发送消息的用户id + * @param token 用户的会话令牌 + * @param message 要发送的消息内容 + */ + @Override + public void sendMessage(Long userId, String token, String message) { + try { + remoteMessageService.sendMessage(userId, token, message); + } catch (Exception e) { + log.warn("推送功能未开启或服务未找到"); + } + } } diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java index cb944285..fec8fc6f 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java @@ -132,6 +132,34 @@ public class SseEmitterManager { } } + + /** + * 向指定的用户的指定会话发送消息 + * + * @param userId 要发送消息的用户id + * @param token 用户的会话令牌 + * @param message 要发送的消息内容 + */ + public void sendMessage(Long userId, String token, String message) { + Map emitters = USER_TOKEN_EMITTERS.get(userId); + if (MapUtil.isNotEmpty(emitters)) { + for (Map.Entry entry : emitters.entrySet()){ + if (entry.getKey().equals(token)) { + try { + entry.getValue().send(SseEmitter.event() + .name("message") + .data(message)); + } catch (Exception e) { + SseEmitter remove = emitters.remove(entry.getKey()); + if (remove != null) { + remove.complete(); + } + } + } + } + } + } + /** * 本机全用户会话发送消息 * diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java index ce3aad47..84867370 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java @@ -38,6 +38,20 @@ public class SseMessageUtils { MANAGER.sendMessage(userId, message); } + /** + * 向指定的用户的指定会话发送消息 + * + * @param userId 要发送消息的用户id + * @param token 用户的会话令牌 + * @param message 要发送的消息内容 + */ + public static void sendMessage(Long userId, String token, String message) { + if (!isEnable()) { + return; + } + MANAGER.sendMessage(userId, token, message); + } + /** * 本机全用户会话发送消息 * diff --git a/ruoyi-modules/Property/pom.xml b/ruoyi-modules/Property/pom.xml index 4135f974..d3af6ed2 100644 --- a/ruoyi-modules/Property/pom.xml +++ b/ruoyi-modules/Property/pom.xml @@ -125,6 +125,11 @@ ruoyi-common-websocket + + org.dromara + ruoyi-common-sse + + org.apache.rocketmq rocketmq-spring-boot-starter diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/bo/residentBo/ResidentPersonBo.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/bo/residentBo/ResidentPersonBo.java index 15eca760..d8eed187 100644 --- a/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/bo/residentBo/ResidentPersonBo.java +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/bo/residentBo/ResidentPersonBo.java @@ -114,7 +114,7 @@ public class ResidentPersonBo extends BaseEntity { /** * 状态 */ - private Integer state = 1; + private Integer state; /** * 备注 diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbMeterInfoServiceImpl.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbMeterInfoServiceImpl.java index 3c7ea77b..2a83acc9 100644 --- a/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbMeterInfoServiceImpl.java +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbMeterInfoServiceImpl.java @@ -1,8 +1,10 @@ package org.dromara.property.service.impl.smartDevicesImpl; +import cn.dev33.satoken.stp.StpUtil; import cn.hutool.core.date.DateUtil; import cn.hutool.core.lang.Assert; import com.alibaba.fastjson.JSONObject; +import org.apache.dubbo.config.annotation.DubboReference; import org.dromara.common.core.domain.TreeNode; import org.dromara.common.core.utils.MapstructUtils; import org.dromara.common.core.utils.StringUtils; @@ -15,8 +17,6 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.dromara.common.satoken.utils.LoginHelper; -import org.dromara.common.websocket.dto.WebSocketMessageDto; -import org.dromara.common.websocket.utils.WebSocketUtils; import org.dromara.property.domain.bo.TbFloorBo; import org.dromara.property.domain.vo.TbBuildingVo; import org.dromara.property.domain.vo.TbCommunityVo; @@ -27,6 +27,7 @@ import org.dromara.property.service.ITbCommunityService; import org.dromara.property.service.ITbFloorService; import org.dromara.property.tasks.HeartbeatTasks; import org.dromara.property.utils.MeterRecordUtil; +import org.dromara.resource.api.RemoteMessageService; import org.dromara.system.api.model.LoginUser; import org.springframework.stereotype.Service; import org.dromara.property.domain.bo.smartDevicesBo.TbMeterInfoBo; @@ -61,6 +62,9 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { private final MeterRecordUtil meterRecordUtil; private final HeartbeatTasks heartbeatTasks; + @DubboReference + private RemoteMessageService remoteMessageService; + /** * 查询水电气 * @@ -85,8 +89,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { Page result = baseMapper.selectVoPage(pageQuery.build(), lqw); // 创建楼层ID到楼层名称的映射,避免重复查询 List floorList = floorService.queryList(new TbFloorBo()); - Map floorMap = floorList.stream() - .collect(Collectors.toMap(TbFloorVo::getId, TbFloorVo::getFloorName, (key1, key2) -> key1)); + Map floorMap = floorList.stream().collect(Collectors.toMap(TbFloorVo::getId, TbFloorVo::getFloorName, (key1, key2) -> key1)); // 为每个灯控信息设置楼层名称 result.getRecords().forEach(record -> { @@ -270,13 +273,13 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { // 获取当前登录用户 LoginUser user = LoginHelper.getLoginUser(); + String tokenValue = StpUtil.getTokenValue(); if (user == null) { heartbeatTasks.stopTask("Meter_Status_Reading"); return; } // 初始化WebSocket消息 - WebSocketMessageDto webSocketMessage = new WebSocketMessageDto(); JSONObject jsonObject = new JSONObject(); jsonObject.put("type", "meter"); @@ -291,23 +294,17 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { heartbeatTasks.stopTask("Meter_Status_Reading"); jsonObject.put("readingTime", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")); jsonObject.put("data", new ArrayList<>()); - webSocketMessage.setMessage(jsonObject.toString()); - webSocketMessage.setSessionKeys(List.of(user.getUserId())); - WebSocketUtils.publishMessage(webSocketMessage); + remoteMessageService.sendMessage(user.getUserId(), tokenValue, jsonObject.toString()); return; } // 获取唯一的主机IP列表 - String[] hostIpArr = meterInfoVoList.stream() - .map(TbMeterInfoVo::getHostIp) - .distinct() - .toArray(String[]::new); + String[] hostIpArr = meterInfoVoList.stream().map(TbMeterInfoVo::getHostIp).distinct().toArray(String[]::new); // 统计每个IP对应的仪表数量 Map ipCountMap = new HashMap<>(); for (String ip : hostIpArr) { - ipCountMap.put(ip, baseMapper.selectCount(new LambdaQueryWrapper() - .eq(TbMeterInfo::getHostIp, ip))); + ipCountMap.put(ip, baseMapper.selectCount(new LambdaQueryWrapper().eq(TbMeterInfo::getHostIp, ip))); } // 启动定时任务 @@ -319,44 +316,39 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { meterResults = meterRecordUtil.getMeterStatus(ipCountMap, hostIpArr); } catch (Exception e) { // 获取数据失败,设置所有仪表通信状态为0并发送消息 - handleMeterCommunicationFailure(user, jsonObject, meterInfoVoList, webSocketMessage); + handleMeterCommunicationFailure(user, jsonObject, meterInfoVoList, tokenValue); return; } // 处理仪表读数和状态 - processMeterResults(user, jsonObject, meterResults, meterInfoVoList, webSocketMessage); + processMeterResults(user, jsonObject, meterResults, meterInfoVoList, tokenValue); }, 30000); } /** * 处理仪表通信失败的情况 * - * @param meterInfoVoList 仪表列表 - * @param jsonObject 消息对象 - * @param webSocketMessage WebSocket消息 - * @param user 当前用户 + * @param meterInfoVoList 仪表列表 + * @param jsonObject 消息对象 + * @param user 当前用户 */ - private void handleMeterCommunicationFailure(LoginUser user, JSONObject jsonObject, List meterInfoVoList, WebSocketMessageDto webSocketMessage) { + private void handleMeterCommunicationFailure(LoginUser user, JSONObject jsonObject, List meterInfoVoList, String tokenValue) { meterInfoVoList.forEach(item -> item.setCommunicationState(0L)); jsonObject.put("data", meterInfoVoList); - webSocketMessage.setMessage(jsonObject.toString()); - webSocketMessage.setSessionKeys(List.of(user.getUserId())); - WebSocketUtils.publishMessage(webSocketMessage); + remoteMessageService.sendMessage(user.getUserId(), tokenValue, jsonObject.toString()); } /** * 处理仪表读数结果 * - * @param meterInfoVoList 仪表列表 - * @param meterResults 读数结果 - * @param jsonObject 消息对象 - * @param webSocketMessage WebSocket消息 - * @param user 当前用户 + * @param meterInfoVoList 仪表列表 + * @param meterResults 读数结果 + * @param jsonObject 消息对象 + * @param user 当前用户 */ - private void processMeterResults(LoginUser user, JSONObject jsonObject, List meterResults, List meterInfoVoList, WebSocketMessageDto webSocketMessage) { + private void processMeterResults(LoginUser user, JSONObject jsonObject, List meterResults, List meterInfoVoList, String tokenValue) { // 创建IP到结果的映射,提高查找效率 - Map meterResultMap = meterResults.stream() - .collect(Collectors.toMap(MeterResult::getIp, Function.identity(), (v1, v2) -> v1)); + Map meterResultMap = meterResults.stream().collect(Collectors.toMap(MeterResult::getIp, Function.identity(), (v1, v2) -> v1)); for (TbMeterInfoVo item : meterInfoVoList) { MeterResult meterResult = meterResultMap.get(item.getHostIp()); @@ -373,8 +365,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { try { int codeIndex = Integer.parseInt(meterCode); if (codeIndex >= 0 && codeIndex < collectionValue.size()) { - BigDecimal initReading = BigDecimal.valueOf(collectionValue.get(codeIndex)) - .setScale(2, RoundingMode.HALF_UP); + BigDecimal initReading = BigDecimal.valueOf(collectionValue.get(codeIndex)).setScale(2, RoundingMode.HALF_UP); item.setInitReading(initReading); item.setCommunicationState(initReading.compareTo(BigDecimal.ZERO) == 0 ? 0L : 1L); } else { @@ -386,8 +377,6 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { } jsonObject.put("data", meterInfoVoList); - webSocketMessage.setMessage(jsonObject.toString()); - webSocketMessage.setSessionKeys(List.of(user.getUserId())); - WebSocketUtils.publishMessage(webSocketMessage); + remoteMessageService.sendMessage(user.getUserId(), tokenValue, jsonObject.toString()); } } diff --git a/ruoyi-modules/ruoyi-resource/src/main/java/org/dromara/resource/dubbo/RemoteMessageServiceImpl.java b/ruoyi-modules/ruoyi-resource/src/main/java/org/dromara/resource/dubbo/RemoteMessageServiceImpl.java index 887cd5fb..5a3dfbd7 100644 --- a/ruoyi-modules/ruoyi-resource/src/main/java/org/dromara/resource/dubbo/RemoteMessageServiceImpl.java +++ b/ruoyi-modules/ruoyi-resource/src/main/java/org/dromara/resource/dubbo/RemoteMessageServiceImpl.java @@ -45,4 +45,16 @@ public class RemoteMessageServiceImpl implements RemoteMessageService { SseMessageUtils.publishAll(message); } + /** + * 向指定的用户的指定会话发送消息 + * + * @param userId 要发送消息的用户id + * @param token 用户的会话令牌 + * @param message 要发送的消息内容 + */ + @Override + public void sendMessage(Long userId, String token, String message) { + SseMessageUtils.sendMessage(userId, token, message); + } + }