feat(property): webSocket定时推送仪表状态功能

- 新增 HeartbeatTasks 类用于管理心跳任务
This commit is contained in:
2025-09-01 18:07:33 +08:00
parent 046e9d925a
commit 863366de7e
4 changed files with 170 additions and 27 deletions

View File

@@ -124,8 +124,9 @@ public class TbMeterInfoController extends BaseController {
* @param floorId 楼栋id
*/
@GetMapping("/currentReading")
public R<List<TbMeterInfoVo>> currentReading(@RequestParam Long meterType, @RequestParam Long floorId) {
return R.ok(tbMeterInfoService.getMeterStatus(meterType, floorId));
public R<Void> currentReading(@RequestParam Long meterType, @RequestParam Long floorId) {
tbMeterInfoService.getMeterStatus(meterType, floorId);
return R.ok();
}
}

View File

@@ -1,6 +1,8 @@
package org.dromara.property.service.impl.smartDevicesImpl;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Assert;
import com.alibaba.fastjson.JSONObject;
import org.dromara.common.core.domain.TreeNode;
import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.core.utils.StringUtils;
@@ -12,6 +14,9 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.satoken.utils.LoginHelper;
import org.dromara.common.websocket.dto.WebSocketMessageDto;
import org.dromara.common.websocket.utils.WebSocketUtils;
import org.dromara.property.domain.bo.TbFloorBo;
import org.dromara.property.domain.vo.TbBuildingVo;
import org.dromara.property.domain.vo.TbCommunityVo;
@@ -20,7 +25,9 @@ import org.dromara.property.rocketmq.domain.MeterResult;
import org.dromara.property.service.ITbBuildingService;
import org.dromara.property.service.ITbCommunityService;
import org.dromara.property.service.ITbFloorService;
import org.dromara.property.tasks.HeartbeatTasks;
import org.dromara.property.utils.MeterRecordUtil;
import org.dromara.system.api.model.LoginUser;
import org.springframework.stereotype.Service;
import org.dromara.property.domain.bo.smartDevicesBo.TbMeterInfoBo;
import org.dromara.property.domain.vo.smartDevicesVo.TbMeterInfoVo;
@@ -32,6 +39,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -51,6 +59,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
private final ITbCommunityService communityService;
private final MeterRecordUtil meterRecordUtil;
private final HeartbeatTasks heartbeatTasks;
/**
* 查询水电气
@@ -226,7 +235,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
}).toList();
treeList.addAll(l3);
if (isMeter){
if (isMeter) {
TbMeterInfoBo bo = new TbMeterInfoBo();
bo.setMeterType(meterType);
List<TbMeterInfoVo> meterInfoVos = this.queryList(bo);
@@ -252,42 +261,133 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
* @param floorId 楼栋id
*/
@Override
public List<TbMeterInfoVo> getMeterStatus(Long meterType, Long floorId) {
public void getMeterStatus(Long meterType, Long floorId) {
// 参数校验
if (meterType == 0L || floorId == 0L) {
heartbeatTasks.stopTask("Meter_Status_Reading");
return;
}
// 获取当前登录用户
LoginUser user = LoginHelper.getLoginUser();
if (user == null) {
heartbeatTasks.stopTask("Meter_Status_Reading");
return;
}
// 初始化WebSocket消息
WebSocketMessageDto webSocketMessage = new WebSocketMessageDto();
JSONObject jsonObject = new JSONObject();
jsonObject.put("type", "meter");
// 查询仪表信息
TbMeterInfoBo meterInfoBo = new TbMeterInfoBo();
meterInfoBo.setFloorId(floorId);
meterInfoBo.setMeterType(meterType);
List<TbMeterInfoVo> meterInfoVoList = this.queryList(meterInfoBo);
if (meterInfoVoList.isEmpty()) return null;
String[] hostIpArr = meterInfoVoList.stream().map(TbMeterInfoVo::getHostIp).distinct().toArray(String[]::new);
// 如果没有仪表信息,直接返回
if (meterInfoVoList.isEmpty()) {
heartbeatTasks.stopTask("Meter_Status_Reading");
jsonObject.put("readingTime", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
jsonObject.put("data", new ArrayList<>());
webSocketMessage.setMessage(jsonObject.toString());
webSocketMessage.setSessionKeys(List.of(user.getUserId()));
WebSocketUtils.publishMessage(webSocketMessage);
return;
}
// 获取唯一的主机IP列表
String[] hostIpArr = meterInfoVoList.stream()
.map(TbMeterInfoVo::getHostIp)
.distinct()
.toArray(String[]::new);
// 统计每个IP对应的仪表数量
Map<String, Long> ipCountMap = new HashMap<>();
for (String ip : hostIpArr) {
ipCountMap.put(ip, baseMapper.selectCount(new LambdaQueryWrapper<TbMeterInfo>().eq(TbMeterInfo::getHostIp, ip)));
ipCountMap.put(ip, baseMapper.selectCount(new LambdaQueryWrapper<TbMeterInfo>()
.eq(TbMeterInfo::getHostIp, ip)));
}
List<MeterResult> meterResults;
try{
meterResults = meterRecordUtil.getMeterStatus(ipCountMap, hostIpArr);
} catch (Exception e) {
// 获取数据失败返回所有数据设置通信状态为0
for (TbMeterInfoVo item : meterInfoVoList) {
item.setCommunicationState(0L);
// 启动定时任务
heartbeatTasks.startHeartbeatTask("Meter_Status_Reading", () -> {
jsonObject.put("readingTime", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
List<MeterResult> meterResults;
try {
meterResults = meterRecordUtil.getMeterStatus(ipCountMap, hostIpArr);
} catch (Exception e) {
// 获取数据失败设置所有仪表通信状态为0并发送消息
handleMeterCommunicationFailure(user, jsonObject, meterInfoVoList, webSocketMessage);
return;
}
return meterInfoVoList;
}
// 处理仪表读数和状态
processMeterResults(user, jsonObject, meterResults, meterInfoVoList, webSocketMessage);
}, 30000);
}
/**
* 处理仪表通信失败的情况
*
* @param meterInfoVoList 仪表列表
* @param jsonObject 消息对象
* @param webSocketMessage WebSocket消息
* @param user 当前用户
*/
private void handleMeterCommunicationFailure(LoginUser user, JSONObject jsonObject, List<TbMeterInfoVo> meterInfoVoList, WebSocketMessageDto webSocketMessage) {
meterInfoVoList.forEach(item -> item.setCommunicationState(0L));
jsonObject.put("data", meterInfoVoList);
webSocketMessage.setMessage(jsonObject.toString());
webSocketMessage.setSessionKeys(List.of(user.getUserId()));
WebSocketUtils.publishMessage(webSocketMessage);
}
/**
* 处理仪表读数结果
*
* @param meterInfoVoList 仪表列表
* @param meterResults 读数结果
* @param jsonObject 消息对象
* @param webSocketMessage WebSocket消息
* @param user 当前用户
*/
private void processMeterResults(LoginUser user, JSONObject jsonObject, List<MeterResult> meterResults, List<TbMeterInfoVo> meterInfoVoList, WebSocketMessageDto webSocketMessage) {
// 创建IP到结果的映射提高查找效率
Map<String, MeterResult> meterResultMap = meterResults.stream()
.collect(Collectors.toMap(MeterResult::getIp, Function.identity(), (v1, v2) -> v1));
for (TbMeterInfoVo item : meterInfoVoList) {
MeterResult meterResult = meterResults.stream().filter(o -> o.getIp().equals(item.getHostIp())).findFirst().orElse(null);
if (meterResult == null) continue;
BigDecimal initReading = BigDecimal.valueOf(meterResult.getCollectionValue().get(Integer.parseInt(item.getMeterCode())))
.setScale(2, RoundingMode.HALF_UP);
if (initReading.compareTo(BigDecimal.ZERO) == 0) {
MeterResult meterResult = meterResultMap.get(item.getHostIp());
if (meterResult == null) {
// 如果没有找到对应的结果设置通信状态为0
item.setCommunicationState(0L);
continue;
}
// 解析读数
List<Float> collectionValue = meterResult.getCollectionValue();
String meterCode = item.getMeterCode();
try {
int codeIndex = Integer.parseInt(meterCode);
if (codeIndex >= 0 && codeIndex < collectionValue.size()) {
BigDecimal initReading = BigDecimal.valueOf(collectionValue.get(codeIndex))
.setScale(2, RoundingMode.HALF_UP);
item.setInitReading(initReading);
item.setCommunicationState(initReading.compareTo(BigDecimal.ZERO) == 0 ? 0L : 1L);
} else {
item.setCommunicationState(0L);
}
} catch (NumberFormatException e) {
item.setCommunicationState(0L);
} else {
item.setCommunicationState(1L);
}
item.setInitReading(initReading);
}
return meterInfoVoList;
jsonObject.put("data", meterInfoVoList);
webSocketMessage.setMessage(jsonObject.toString());
webSocketMessage.setSessionKeys(List.of(user.getUserId()));
WebSocketUtils.publishMessage(webSocketMessage);
}
}

View File

@@ -82,5 +82,5 @@ public interface ITbMeterInfoService {
* @param meterType 水电气类型
* @param floorId 楼栋id
*/
List<TbMeterInfoVo> getMeterStatus(Long meterType, Long floorId);
void getMeterStatus(Long meterType, Long floorId);
}

View File

@@ -0,0 +1,42 @@
package org.dromara.property.tasks;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.*;
/**
* @author lsm
* @apiNote HeartbeatTasks
* @since 2025/9/1
*/
@Component
public class HeartbeatTasks {
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();
public void startHeartbeatTask(String taskId, Runnable task, long intervalMs) {
// 先停止同名任务(如果存在)
stopTask(taskId);
// 创建新任务
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
task, 0, intervalMs, TimeUnit.MILLISECONDS);
tasks.put(taskId, future);
}
public void stopHeartbeatTask() {
// 停止所有心跳任务
tasks.values().forEach(future -> future.cancel(true));
tasks.clear();
}
public void stopTask(String taskId) {
ScheduledFuture<?> future = tasks.get(taskId);
if (future != null) {
future.cancel(true);
tasks.remove(taskId);
}
}
}