feat(property): 用 Server-Sent Events (SSE) 替代 WebSocket 实现消息推送

This commit is contained in:
2025-09-06 13:16:40 +08:00
parent 22c6efea63
commit 0bf8cf45e8
8 changed files with 111 additions and 38 deletions

View File

@@ -125,6 +125,11 @@
<artifactId>ruoyi-common-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>ruoyi-common-sse</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>

View File

@@ -114,7 +114,7 @@ public class ResidentPersonBo extends BaseEntity {
/**
* 状态
*/
private Integer state = 1;
private Integer state;
/**
* 备注

View File

@@ -1,8 +1,10 @@
package org.dromara.property.service.impl.smartDevicesImpl;
import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Assert;
import com.alibaba.fastjson.JSONObject;
import org.apache.dubbo.config.annotation.DubboReference;
import org.dromara.common.core.domain.TreeNode;
import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.core.utils.StringUtils;
@@ -15,8 +17,6 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.satoken.utils.LoginHelper;
import org.dromara.common.websocket.dto.WebSocketMessageDto;
import org.dromara.common.websocket.utils.WebSocketUtils;
import org.dromara.property.domain.bo.TbFloorBo;
import org.dromara.property.domain.vo.TbBuildingVo;
import org.dromara.property.domain.vo.TbCommunityVo;
@@ -27,6 +27,7 @@ import org.dromara.property.service.ITbCommunityService;
import org.dromara.property.service.ITbFloorService;
import org.dromara.property.tasks.HeartbeatTasks;
import org.dromara.property.utils.MeterRecordUtil;
import org.dromara.resource.api.RemoteMessageService;
import org.dromara.system.api.model.LoginUser;
import org.springframework.stereotype.Service;
import org.dromara.property.domain.bo.smartDevicesBo.TbMeterInfoBo;
@@ -61,6 +62,9 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
private final MeterRecordUtil meterRecordUtil;
private final HeartbeatTasks heartbeatTasks;
@DubboReference
private RemoteMessageService remoteMessageService;
/**
* 查询水电气
*
@@ -85,8 +89,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
Page<TbMeterInfoVo> result = baseMapper.selectVoPage(pageQuery.build(), lqw);
// 创建楼层ID到楼层名称的映射避免重复查询
List<TbFloorVo> floorList = floorService.queryList(new TbFloorBo());
Map<Long, String> floorMap = floorList.stream()
.collect(Collectors.toMap(TbFloorVo::getId, TbFloorVo::getFloorName, (key1, key2) -> key1));
Map<Long, String> floorMap = floorList.stream().collect(Collectors.toMap(TbFloorVo::getId, TbFloorVo::getFloorName, (key1, key2) -> key1));
// 为每个灯控信息设置楼层名称
result.getRecords().forEach(record -> {
@@ -270,13 +273,13 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
// 获取当前登录用户
LoginUser user = LoginHelper.getLoginUser();
String tokenValue = StpUtil.getTokenValue();
if (user == null) {
heartbeatTasks.stopTask("Meter_Status_Reading");
return;
}
// 初始化WebSocket消息
WebSocketMessageDto webSocketMessage = new WebSocketMessageDto();
JSONObject jsonObject = new JSONObject();
jsonObject.put("type", "meter");
@@ -291,23 +294,17 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
heartbeatTasks.stopTask("Meter_Status_Reading");
jsonObject.put("readingTime", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
jsonObject.put("data", new ArrayList<>());
webSocketMessage.setMessage(jsonObject.toString());
webSocketMessage.setSessionKeys(List.of(user.getUserId()));
WebSocketUtils.publishMessage(webSocketMessage);
remoteMessageService.sendMessage(user.getUserId(), tokenValue, jsonObject.toString());
return;
}
// 获取唯一的主机IP列表
String[] hostIpArr = meterInfoVoList.stream()
.map(TbMeterInfoVo::getHostIp)
.distinct()
.toArray(String[]::new);
String[] hostIpArr = meterInfoVoList.stream().map(TbMeterInfoVo::getHostIp).distinct().toArray(String[]::new);
// 统计每个IP对应的仪表数量
Map<String, Long> ipCountMap = new HashMap<>();
for (String ip : hostIpArr) {
ipCountMap.put(ip, baseMapper.selectCount(new LambdaQueryWrapper<TbMeterInfo>()
.eq(TbMeterInfo::getHostIp, ip)));
ipCountMap.put(ip, baseMapper.selectCount(new LambdaQueryWrapper<TbMeterInfo>().eq(TbMeterInfo::getHostIp, ip)));
}
// 启动定时任务
@@ -319,44 +316,39 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
meterResults = meterRecordUtil.getMeterStatus(ipCountMap, hostIpArr);
} catch (Exception e) {
// 获取数据失败设置所有仪表通信状态为0并发送消息
handleMeterCommunicationFailure(user, jsonObject, meterInfoVoList, webSocketMessage);
handleMeterCommunicationFailure(user, jsonObject, meterInfoVoList, tokenValue);
return;
}
// 处理仪表读数和状态
processMeterResults(user, jsonObject, meterResults, meterInfoVoList, webSocketMessage);
processMeterResults(user, jsonObject, meterResults, meterInfoVoList, tokenValue);
}, 30000);
}
/**
* 处理仪表通信失败的情况
*
* @param meterInfoVoList 仪表列表
* @param jsonObject 消息对象
* @param webSocketMessage WebSocket消息
* @param user 当前用户
* @param meterInfoVoList 仪表列表
* @param jsonObject 消息对象
* @param user 当前用户
*/
private void handleMeterCommunicationFailure(LoginUser user, JSONObject jsonObject, List<TbMeterInfoVo> meterInfoVoList, WebSocketMessageDto webSocketMessage) {
private void handleMeterCommunicationFailure(LoginUser user, JSONObject jsonObject, List<TbMeterInfoVo> meterInfoVoList, String tokenValue) {
meterInfoVoList.forEach(item -> item.setCommunicationState(0L));
jsonObject.put("data", meterInfoVoList);
webSocketMessage.setMessage(jsonObject.toString());
webSocketMessage.setSessionKeys(List.of(user.getUserId()));
WebSocketUtils.publishMessage(webSocketMessage);
remoteMessageService.sendMessage(user.getUserId(), tokenValue, jsonObject.toString());
}
/**
* 处理仪表读数结果
*
* @param meterInfoVoList 仪表列表
* @param meterResults 读数结果
* @param jsonObject 消息对象
* @param webSocketMessage WebSocket消息
* @param user 当前用户
* @param meterInfoVoList 仪表列表
* @param meterResults 读数结果
* @param jsonObject 消息对象
* @param user 当前用户
*/
private void processMeterResults(LoginUser user, JSONObject jsonObject, List<MeterResult> meterResults, List<TbMeterInfoVo> meterInfoVoList, WebSocketMessageDto webSocketMessage) {
private void processMeterResults(LoginUser user, JSONObject jsonObject, List<MeterResult> meterResults, List<TbMeterInfoVo> meterInfoVoList, String tokenValue) {
// 创建IP到结果的映射提高查找效率
Map<String, MeterResult> meterResultMap = meterResults.stream()
.collect(Collectors.toMap(MeterResult::getIp, Function.identity(), (v1, v2) -> v1));
Map<String, MeterResult> meterResultMap = meterResults.stream().collect(Collectors.toMap(MeterResult::getIp, Function.identity(), (v1, v2) -> v1));
for (TbMeterInfoVo item : meterInfoVoList) {
MeterResult meterResult = meterResultMap.get(item.getHostIp());
@@ -373,8 +365,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
try {
int codeIndex = Integer.parseInt(meterCode);
if (codeIndex >= 0 && codeIndex < collectionValue.size()) {
BigDecimal initReading = BigDecimal.valueOf(collectionValue.get(codeIndex))
.setScale(2, RoundingMode.HALF_UP);
BigDecimal initReading = BigDecimal.valueOf(collectionValue.get(codeIndex)).setScale(2, RoundingMode.HALF_UP);
item.setInitReading(initReading);
item.setCommunicationState(initReading.compareTo(BigDecimal.ZERO) == 0 ? 0L : 1L);
} else {
@@ -386,8 +377,6 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
}
jsonObject.put("data", meterInfoVoList);
webSocketMessage.setMessage(jsonObject.toString());
webSocketMessage.setSessionKeys(List.of(user.getUserId()));
WebSocketUtils.publishMessage(webSocketMessage);
remoteMessageService.sendMessage(user.getUserId(), tokenValue, jsonObject.toString());
}
}