diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/controller/smartDevicesController/TbMeterInfoController.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/controller/smartDevicesController/TbMeterInfoController.java index 203a4850..967996d2 100644 --- a/ruoyi-modules/Property/src/main/java/org/dromara/property/controller/smartDevicesController/TbMeterInfoController.java +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/controller/smartDevicesController/TbMeterInfoController.java @@ -124,8 +124,9 @@ public class TbMeterInfoController extends BaseController { * @param floorId 楼栋id */ @GetMapping("/currentReading") - public R> currentReading(@RequestParam Long meterType, @RequestParam Long floorId) { - return R.ok(tbMeterInfoService.getMeterStatus(meterType, floorId)); + public R currentReading(@RequestParam Long meterType, @RequestParam Long floorId) { + tbMeterInfoService.getMeterStatus(meterType, floorId); + return R.ok(); } } diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbMeterInfoServiceImpl.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbMeterInfoServiceImpl.java index 697bc188..3c7ea77b 100644 --- a/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbMeterInfoServiceImpl.java +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbMeterInfoServiceImpl.java @@ -1,6 +1,8 @@ package org.dromara.property.service.impl.smartDevicesImpl; +import cn.hutool.core.date.DateUtil; import cn.hutool.core.lang.Assert; +import com.alibaba.fastjson.JSONObject; import org.dromara.common.core.domain.TreeNode; import org.dromara.common.core.utils.MapstructUtils; import org.dromara.common.core.utils.StringUtils; @@ -12,6 +14,9 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.dromara.common.satoken.utils.LoginHelper; +import org.dromara.common.websocket.dto.WebSocketMessageDto; +import org.dromara.common.websocket.utils.WebSocketUtils; import org.dromara.property.domain.bo.TbFloorBo; import org.dromara.property.domain.vo.TbBuildingVo; import org.dromara.property.domain.vo.TbCommunityVo; @@ -20,7 +25,9 @@ import org.dromara.property.rocketmq.domain.MeterResult; import org.dromara.property.service.ITbBuildingService; import org.dromara.property.service.ITbCommunityService; import org.dromara.property.service.ITbFloorService; +import org.dromara.property.tasks.HeartbeatTasks; import org.dromara.property.utils.MeterRecordUtil; +import org.dromara.system.api.model.LoginUser; import org.springframework.stereotype.Service; import org.dromara.property.domain.bo.smartDevicesBo.TbMeterInfoBo; import org.dromara.property.domain.vo.smartDevicesVo.TbMeterInfoVo; @@ -32,6 +39,7 @@ import org.springframework.transaction.annotation.Transactional; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -51,6 +59,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { private final ITbCommunityService communityService; private final MeterRecordUtil meterRecordUtil; + private final HeartbeatTasks heartbeatTasks; /** * 查询水电气 @@ -226,7 +235,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { }).toList(); treeList.addAll(l3); - if (isMeter){ + if (isMeter) { TbMeterInfoBo bo = new TbMeterInfoBo(); bo.setMeterType(meterType); List meterInfoVos = this.queryList(bo); @@ -252,42 +261,133 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { * @param floorId 楼栋id */ @Override - public List getMeterStatus(Long meterType, Long floorId) { + public void getMeterStatus(Long meterType, Long floorId) { + // 参数校验 + if (meterType == 0L || floorId == 0L) { + heartbeatTasks.stopTask("Meter_Status_Reading"); + return; + } + + // 获取当前登录用户 + LoginUser user = LoginHelper.getLoginUser(); + if (user == null) { + heartbeatTasks.stopTask("Meter_Status_Reading"); + return; + } + + // 初始化WebSocket消息 + WebSocketMessageDto webSocketMessage = new WebSocketMessageDto(); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("type", "meter"); + + // 查询仪表信息 TbMeterInfoBo meterInfoBo = new TbMeterInfoBo(); meterInfoBo.setFloorId(floorId); meterInfoBo.setMeterType(meterType); List meterInfoVoList = this.queryList(meterInfoBo); - if (meterInfoVoList.isEmpty()) return null; - String[] hostIpArr = meterInfoVoList.stream().map(TbMeterInfoVo::getHostIp).distinct().toArray(String[]::new); + // 如果没有仪表信息,直接返回 + if (meterInfoVoList.isEmpty()) { + heartbeatTasks.stopTask("Meter_Status_Reading"); + jsonObject.put("readingTime", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")); + jsonObject.put("data", new ArrayList<>()); + webSocketMessage.setMessage(jsonObject.toString()); + webSocketMessage.setSessionKeys(List.of(user.getUserId())); + WebSocketUtils.publishMessage(webSocketMessage); + return; + } + + // 获取唯一的主机IP列表 + String[] hostIpArr = meterInfoVoList.stream() + .map(TbMeterInfoVo::getHostIp) + .distinct() + .toArray(String[]::new); + + // 统计每个IP对应的仪表数量 Map ipCountMap = new HashMap<>(); for (String ip : hostIpArr) { - ipCountMap.put(ip, baseMapper.selectCount(new LambdaQueryWrapper().eq(TbMeterInfo::getHostIp, ip))); + ipCountMap.put(ip, baseMapper.selectCount(new LambdaQueryWrapper() + .eq(TbMeterInfo::getHostIp, ip))); } - List meterResults; - try{ - meterResults = meterRecordUtil.getMeterStatus(ipCountMap, hostIpArr); - } catch (Exception e) { - // 获取数据失败,返回所有数据,设置通信状态为0 - for (TbMeterInfoVo item : meterInfoVoList) { - item.setCommunicationState(0L); + + // 启动定时任务 + heartbeatTasks.startHeartbeatTask("Meter_Status_Reading", () -> { + jsonObject.put("readingTime", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")); + + List meterResults; + try { + meterResults = meterRecordUtil.getMeterStatus(ipCountMap, hostIpArr); + } catch (Exception e) { + // 获取数据失败,设置所有仪表通信状态为0并发送消息 + handleMeterCommunicationFailure(user, jsonObject, meterInfoVoList, webSocketMessage); + return; } - return meterInfoVoList; - } + + // 处理仪表读数和状态 + processMeterResults(user, jsonObject, meterResults, meterInfoVoList, webSocketMessage); + }, 30000); + } + + /** + * 处理仪表通信失败的情况 + * + * @param meterInfoVoList 仪表列表 + * @param jsonObject 消息对象 + * @param webSocketMessage WebSocket消息 + * @param user 当前用户 + */ + private void handleMeterCommunicationFailure(LoginUser user, JSONObject jsonObject, List meterInfoVoList, WebSocketMessageDto webSocketMessage) { + meterInfoVoList.forEach(item -> item.setCommunicationState(0L)); + jsonObject.put("data", meterInfoVoList); + webSocketMessage.setMessage(jsonObject.toString()); + webSocketMessage.setSessionKeys(List.of(user.getUserId())); + WebSocketUtils.publishMessage(webSocketMessage); + } + + /** + * 处理仪表读数结果 + * + * @param meterInfoVoList 仪表列表 + * @param meterResults 读数结果 + * @param jsonObject 消息对象 + * @param webSocketMessage WebSocket消息 + * @param user 当前用户 + */ + private void processMeterResults(LoginUser user, JSONObject jsonObject, List meterResults, List meterInfoVoList, WebSocketMessageDto webSocketMessage) { + // 创建IP到结果的映射,提高查找效率 + Map meterResultMap = meterResults.stream() + .collect(Collectors.toMap(MeterResult::getIp, Function.identity(), (v1, v2) -> v1)); for (TbMeterInfoVo item : meterInfoVoList) { - MeterResult meterResult = meterResults.stream().filter(o -> o.getIp().equals(item.getHostIp())).findFirst().orElse(null); - if (meterResult == null) continue; - - BigDecimal initReading = BigDecimal.valueOf(meterResult.getCollectionValue().get(Integer.parseInt(item.getMeterCode()))) - .setScale(2, RoundingMode.HALF_UP); - if (initReading.compareTo(BigDecimal.ZERO) == 0) { + MeterResult meterResult = meterResultMap.get(item.getHostIp()); + if (meterResult == null) { + // 如果没有找到对应的结果,设置通信状态为0 + item.setCommunicationState(0L); + continue; + } + + // 解析读数 + List collectionValue = meterResult.getCollectionValue(); + String meterCode = item.getMeterCode(); + + try { + int codeIndex = Integer.parseInt(meterCode); + if (codeIndex >= 0 && codeIndex < collectionValue.size()) { + BigDecimal initReading = BigDecimal.valueOf(collectionValue.get(codeIndex)) + .setScale(2, RoundingMode.HALF_UP); + item.setInitReading(initReading); + item.setCommunicationState(initReading.compareTo(BigDecimal.ZERO) == 0 ? 0L : 1L); + } else { + item.setCommunicationState(0L); + } + } catch (NumberFormatException e) { item.setCommunicationState(0L); - } else { - item.setCommunicationState(1L); } - item.setInitReading(initReading); } - return meterInfoVoList; + + jsonObject.put("data", meterInfoVoList); + webSocketMessage.setMessage(jsonObject.toString()); + webSocketMessage.setSessionKeys(List.of(user.getUserId())); + WebSocketUtils.publishMessage(webSocketMessage); } } diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/service/smartDevicesService/ITbMeterInfoService.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/service/smartDevicesService/ITbMeterInfoService.java index d71fb417..7430854c 100644 --- a/ruoyi-modules/Property/src/main/java/org/dromara/property/service/smartDevicesService/ITbMeterInfoService.java +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/service/smartDevicesService/ITbMeterInfoService.java @@ -82,5 +82,5 @@ public interface ITbMeterInfoService { * @param meterType 水电气类型 * @param floorId 楼栋id */ - List getMeterStatus(Long meterType, Long floorId); + void getMeterStatus(Long meterType, Long floorId); } diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/tasks/HeartbeatTasks.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/tasks/HeartbeatTasks.java new file mode 100644 index 00000000..3ea0917e --- /dev/null +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/tasks/HeartbeatTasks.java @@ -0,0 +1,42 @@ +package org.dromara.property.tasks; + +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.*; + +/** + * @author lsm + * @apiNote HeartbeatTasks + * @since 2025/9/1 + */ +@Component +public class HeartbeatTasks { + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final Map> tasks = new ConcurrentHashMap<>(); + + public void startHeartbeatTask(String taskId, Runnable task, long intervalMs) { + // 先停止同名任务(如果存在) + stopTask(taskId); + + // 创建新任务 + ScheduledFuture future = scheduler.scheduleAtFixedRate( + task, 0, intervalMs, TimeUnit.MILLISECONDS); + + tasks.put(taskId, future); + } + + public void stopHeartbeatTask() { + // 停止所有心跳任务 + tasks.values().forEach(future -> future.cancel(true)); + tasks.clear(); + } + + public void stopTask(String taskId) { + ScheduledFuture future = tasks.get(taskId); + if (future != null) { + future.cancel(true); + tasks.remove(taskId); + } + } +}