feat(Property): 优化定时任务执行逻辑并添加时间限制
This commit is contained in:
@@ -266,17 +266,17 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
|
||||
*/
|
||||
@Override
|
||||
public void getMeterStatus(Long meterType, Long floorId) {
|
||||
// 参数校验
|
||||
if (meterType == 0L || floorId == 0L) {
|
||||
heartbeatTasks.stopTask("Meter_Status_Reading");
|
||||
return;
|
||||
}
|
||||
|
||||
// 获取当前登录用户
|
||||
LoginUser user = LoginHelper.getLoginUser();
|
||||
String tokenValue = StpUtil.getTokenValue();
|
||||
if (user == null) {
|
||||
heartbeatTasks.stopTask("Meter_Status_Reading");
|
||||
heartbeatTasks.stopTask(tokenValue);
|
||||
return;
|
||||
}
|
||||
|
||||
// 参数校验
|
||||
if (meterType == 0L || floorId == 0L) {
|
||||
heartbeatTasks.stopTask(tokenValue);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -292,7 +292,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
|
||||
|
||||
// 如果没有仪表信息,直接返回
|
||||
if (meterInfoVoList.isEmpty()) {
|
||||
heartbeatTasks.stopTask("Meter_Status_Reading");
|
||||
heartbeatTasks.stopTask(tokenValue);
|
||||
jsonObject.put("readingTime", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
|
||||
jsonObject.put("data", new ArrayList<>());
|
||||
remoteMessageService.sendMessage(user.getUserId(), tokenValue, jsonObject.toString());
|
||||
@@ -309,7 +309,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
|
||||
}
|
||||
|
||||
// 启动定时任务
|
||||
heartbeatTasks.startHeartbeatTask("Meter_Status_Reading", () -> {
|
||||
heartbeatTasks.startHeartbeatTask(tokenValue, () -> {
|
||||
jsonObject.put("readingTime", DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
|
||||
|
||||
List<MeterResult> meterResults;
|
||||
@@ -325,7 +325,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
|
||||
|
||||
// 处理仪表读数和状态
|
||||
processMeterResults(user, jsonObject, meterResults, meterInfoVoList, tokenValue);
|
||||
}, 30000);
|
||||
}, 30000,300000);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@@ -212,12 +212,8 @@ public class TbMeterRecordServiceImpl implements ITbMeterRecordService {
|
||||
// 设置上次读数
|
||||
if (hasOldRecords) {
|
||||
TbMeterRecord oldRecord = oldRecordMap.get(info.getId());
|
||||
if (oldRecord != null) {
|
||||
record.setPreviousReading(oldRecord.getCurrentReading());
|
||||
} else {
|
||||
// 如果没有找到对应的旧记录,使用默认值
|
||||
record.setPreviousReading(record.getCurrentReading());
|
||||
}
|
||||
// 如果没有找到对应的旧记录,使用默认值
|
||||
record.setPreviousReading(Objects.requireNonNullElse(oldRecord, record).getCurrentReading());
|
||||
} else {
|
||||
record.setPreviousReading(record.getCurrentReading());
|
||||
}
|
||||
@@ -308,7 +304,6 @@ public class TbMeterRecordServiceImpl implements ITbMeterRecordService {
|
||||
private Map<String, Object> trendMonthData(String floorId, String meterId, Long meterType, String year) {
|
||||
Map<String, Object> resultMap = new HashMap<>();
|
||||
List<Map<String, Object>> monthList = baseMapper.getMonthTrend(StrUtil.isBlank(floorId) ? null : Long.parseLong(floorId), StrUtil.isBlank(meterId) ? null : Long.parseLong(meterId), meterType, year);
|
||||
log.info("year:{},monthList:{}", year, monthList);
|
||||
List<String[]> monthData = new ArrayList<>();
|
||||
monthList.forEach(item -> monthData.add(new String[]{item.get("month").toString(), item.get("total_consumption").toString()}));
|
||||
Float total = monthList.stream().map(map -> new BigDecimal(map.get("total_consumption").toString())).reduce(BigDecimal::add).orElse(BigDecimal.ZERO).floatValue();
|
||||
|
@@ -15,14 +15,36 @@ public class HeartbeatTasks {
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
private final Map<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();
|
||||
|
||||
public void startHeartbeatTask(String taskId, Runnable task, long intervalMs) {
|
||||
public void startHeartbeatTask(String taskId, Runnable task, long intervalMs, long durationMs) {
|
||||
// 先停止同名任务(如果存在)
|
||||
stopTask(taskId);
|
||||
|
||||
// 创建新任务
|
||||
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
|
||||
task, 0, intervalMs, TimeUnit.MILLISECONDS);
|
||||
// 如果 durationMs 为 0,表示任务无需时间限制
|
||||
if (durationMs <= 0) {
|
||||
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
|
||||
task,
|
||||
0,
|
||||
intervalMs,
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
tasks.put(taskId, future);
|
||||
return;
|
||||
}
|
||||
|
||||
// 创建一个包装任务,带时长检查
|
||||
long startTime = System.currentTimeMillis();
|
||||
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
|
||||
() -> {
|
||||
long elapsedTime = System.currentTimeMillis() - startTime;
|
||||
if (elapsedTime >= durationMs) {
|
||||
stopTask(taskId);
|
||||
}
|
||||
task.run();
|
||||
},
|
||||
0,
|
||||
intervalMs,
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
tasks.put(taskId, future);
|
||||
}
|
||||
|
||||
|
@@ -9,6 +9,8 @@ import java.io.IOException;
|
||||
import java.net.Socket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static com.ghgande.j2mod.modbus.Modbus.WRITE_SINGLE_REGISTER;
|
||||
|
||||
@@ -24,9 +26,8 @@ public class LightingUtil {
|
||||
private static final int MODBUS_PORT = 502;
|
||||
// 功能码03(读保持寄存器)
|
||||
private static final byte FUNCTION_CODE = 0x03;
|
||||
// 采集寄存器范围(协议地址)
|
||||
private static final int START_ADDRESS = 42; // 40043 - 40001 = 42
|
||||
private static final int REGISTER_COUNT = 4; // 40046 - 40043 + 1 = 4
|
||||
// Modbus协议一次最多读取的寄存器数量
|
||||
private static final int MAX_REGISTERS_PER_REQUEST = 63;
|
||||
|
||||
private Socket socket;
|
||||
private DataInputStream input;
|
||||
@@ -70,78 +71,6 @@ public class LightingUtil {
|
||||
return header;
|
||||
}
|
||||
|
||||
/**
|
||||
* 读取40043-40046寄存器数据
|
||||
*
|
||||
* @return 包含4个寄存器值的int数组
|
||||
*/
|
||||
public int[] readRegisters() throws IOException {
|
||||
// 发送读取请求
|
||||
sendRequest();
|
||||
|
||||
// 接收并解析响应
|
||||
return parseResponse();
|
||||
}
|
||||
|
||||
/**
|
||||
* 构造并发送Modbus TCP请求帧
|
||||
*/
|
||||
private void sendRequest() throws IOException {
|
||||
// 事务ID(递增)
|
||||
int currentTransactionId = transactionId++;
|
||||
|
||||
// 创建请求帧(12字节)
|
||||
ByteBuffer buffer = ByteBuffer.allocate(12)
|
||||
.order(ByteOrder.BIG_ENDIAN);
|
||||
|
||||
// Header(7字节)
|
||||
buffer.putShort((short) currentTransactionId); // 事务ID
|
||||
buffer.putShort((short) 0); // 协议ID(0=Modbus)
|
||||
buffer.putShort((short) 6); // 长度(后续字节数)
|
||||
buffer.put((byte) 1); // 单元ID
|
||||
|
||||
// PDU(协议数据单元)
|
||||
buffer.put(FUNCTION_CODE); // 功能码
|
||||
buffer.putShort((short) START_ADDRESS); // 起始地址
|
||||
buffer.putShort((short) REGISTER_COUNT); // 寄存器数量
|
||||
|
||||
output.write(buffer.array());
|
||||
output.flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析Modbus TCP响应
|
||||
*/
|
||||
private int[] parseResponse() throws IOException {
|
||||
// 读取头(7字节)
|
||||
byte[] header = initParse();
|
||||
|
||||
// 读取PDU(协议数据单元)
|
||||
int pduLength = ByteBuffer.wrap(header, 4, 2)
|
||||
.getShort() & 0xFFFF - 1; // 减去单元ID长度
|
||||
byte[] pdu = new byte[pduLength];
|
||||
input.readFully(pdu);
|
||||
|
||||
// 检查异常响应(功能码高位为1)
|
||||
if ((pdu[0] & 0xFF) == (FUNCTION_CODE | 0x80)) {
|
||||
throw new IOException("Modbus异常响应,错误码: " + (pdu[1] & 0xFF));
|
||||
}
|
||||
|
||||
// 验证功能码和字节数
|
||||
if (pdu[0] != FUNCTION_CODE || pdu[1] != REGISTER_COUNT * 2) {
|
||||
throw new IOException("无效响应格式");
|
||||
}
|
||||
|
||||
// 提取寄存器数据(每个寄存器2字节)
|
||||
int[] values = new int[REGISTER_COUNT];
|
||||
for (int i = 0; i < REGISTER_COUNT; i++) {
|
||||
int offset = 2 + i * 2;
|
||||
values[i] = ByteBuffer.wrap(pdu, offset, 2)
|
||||
.order(ByteOrder.BIG_ENDIAN).getShort() & 0xFFFF;
|
||||
}
|
||||
return values;
|
||||
}
|
||||
|
||||
/**
|
||||
* 写单个保持寄存器(功能码06)
|
||||
*
|
||||
@@ -214,5 +143,128 @@ public class LightingUtil {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 读取指定数量的开关状态
|
||||
*
|
||||
* @param startAddress 起始地址(协议地址)
|
||||
* @param switchCount 开关数量
|
||||
* @return 包含所有开关状态的int数组,每个元素代表一个开关的状态
|
||||
* @throws IOException 如果读取过程中发生IO错误
|
||||
*/
|
||||
public int[] readSwitchStatus(int startAddress, int switchCount) throws IOException {
|
||||
// 如果开关数量超过单次请求限制,分批读取
|
||||
if (switchCount > MAX_REGISTERS_PER_REQUEST) {
|
||||
return readSwitchStatusBatch(startAddress, switchCount);
|
||||
}
|
||||
|
||||
// 发送读取请求
|
||||
sendCustomRequest(startAddress, switchCount);
|
||||
|
||||
// 接收并解析响应
|
||||
return parseCustomResponse(switchCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* 分批读取大量开关状态
|
||||
*/
|
||||
private int[] readSwitchStatusBatch(int startAddress, int totalSwitchCount) throws IOException {
|
||||
List<Integer> allValues = new ArrayList<>();
|
||||
int remaining = totalSwitchCount;
|
||||
int currentAddress = startAddress;
|
||||
|
||||
while (remaining > 0) {
|
||||
int batchSize = Math.min(remaining, MAX_REGISTERS_PER_REQUEST);
|
||||
|
||||
// 发送读取请求
|
||||
sendCustomRequest(currentAddress, batchSize);
|
||||
|
||||
// 接收并解析响应
|
||||
int[] batchValues = parseCustomResponse(batchSize);
|
||||
|
||||
// 添加到结果列表
|
||||
for (int value : batchValues) {
|
||||
allValues.add(value);
|
||||
}
|
||||
|
||||
// 更新剩余数量和当前地址
|
||||
remaining -= batchSize;
|
||||
currentAddress += batchSize;
|
||||
|
||||
// 添加小延迟,避免请求过快
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IOException("读取过程被中断", e);
|
||||
}
|
||||
}
|
||||
|
||||
// 转换为数组返回
|
||||
int[] result = new int[allValues.size()];
|
||||
for (int i = 0; i < allValues.size(); i++) {
|
||||
result[i] = allValues.get(i);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构造并发送自定义Modbus TCP请求帧
|
||||
*/
|
||||
private void sendCustomRequest(int startAddress, int registerCount) throws IOException {
|
||||
// 事务ID(递增)
|
||||
int currentTransactionId = transactionId++;
|
||||
|
||||
// 创建请求帧(12字节)
|
||||
ByteBuffer buffer = ByteBuffer.allocate(12)
|
||||
.order(ByteOrder.BIG_ENDIAN);
|
||||
|
||||
// Header(7字节)
|
||||
buffer.putShort((short) currentTransactionId); // 事务ID
|
||||
buffer.putShort((short) 0); // 协议ID(0=Modbus)
|
||||
buffer.putShort((short) 6); // 长度(后续字节数)
|
||||
buffer.put((byte) 1); // 单元ID
|
||||
|
||||
// PDU(协议数据单元)
|
||||
buffer.put(FUNCTION_CODE); // 功能码
|
||||
buffer.putShort((short) startAddress); // 起始地址
|
||||
buffer.putShort((short) registerCount); // 寄存器数量
|
||||
|
||||
output.write(buffer.array());
|
||||
output.flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析自定义Modbus TCP响应
|
||||
*/
|
||||
private int[] parseCustomResponse(int expectedRegisterCount) throws IOException {
|
||||
// 读取头(7字节)
|
||||
byte[] header = initParse();
|
||||
|
||||
// 读取PDU(协议数据单元)
|
||||
int pduLength = ByteBuffer.wrap(header, 4, 2)
|
||||
.getShort() & 0xFFFF - 1; // 减去单元ID长度
|
||||
byte[] pdu = new byte[pduLength];
|
||||
input.readFully(pdu);
|
||||
|
||||
// 检查异常响应(功能码高位为1)
|
||||
if ((pdu[0] & 0xFF) == (FUNCTION_CODE | 0x80)) {
|
||||
throw new IOException("Modbus异常响应,错误码: " + (pdu[1] & 0xFF));
|
||||
}
|
||||
|
||||
// 验证功能码和字节数
|
||||
if (pdu[0] != FUNCTION_CODE || pdu[1] != expectedRegisterCount * 2) {
|
||||
throw new IOException("无效响应格式");
|
||||
}
|
||||
|
||||
// 提取寄存器数据(每个寄存器2字节)
|
||||
int[] values = new int[expectedRegisterCount];
|
||||
for (int i = 0; i < expectedRegisterCount; i++) {
|
||||
int offset = 2 + i * 2;
|
||||
values[i] = ByteBuffer.wrap(pdu, offset, 2)
|
||||
.order(ByteOrder.BIG_ENDIAN).getShort() & 0xFFFF;
|
||||
}
|
||||
return values;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user