Compare commits
2 Commits
9c0b0bad03
...
07b9e8b722
Author | SHA1 | Date | |
---|---|---|---|
07b9e8b722 | |||
b20828a800 |
@@ -125,6 +125,11 @@
|
||||
<artifactId>ruoyi-common-websocket</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@@ -47,7 +47,7 @@ public class TbLightInfoBo extends BaseEntity {
|
||||
/**
|
||||
* 楼层ID
|
||||
*/
|
||||
@NotNull(message = "层ID不能为空", groups = {AddGroup.class, EditGroup.class})
|
||||
@NotNull(message = "楼层ID不能为空", groups = {AddGroup.class, EditGroup.class})
|
||||
private Long floorId;
|
||||
|
||||
/**
|
||||
|
@@ -9,6 +9,8 @@ import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import jakarta.validation.constraints.*;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
/**
|
||||
* 水电气业务对象 tb_meter_info
|
||||
*
|
||||
@@ -23,7 +25,7 @@ public class TbMeterInfoBo extends BaseEntity {
|
||||
/**
|
||||
* 主键id
|
||||
*/
|
||||
@NotNull(message = "主键id不能为空", groups = { EditGroup.class })
|
||||
@NotNull(message = "主键id不能为空", groups = {EditGroup.class})
|
||||
private Long id;
|
||||
|
||||
/**
|
||||
@@ -44,15 +46,42 @@ public class TbMeterInfoBo extends BaseEntity {
|
||||
/**
|
||||
* 设备类型(1-电表,2-水表,3-气表)
|
||||
*/
|
||||
@NotNull(message = "设备类型(1-电表,2-水表,3-气表)不能为空", groups = { AddGroup.class, EditGroup.class })
|
||||
@NotNull(message = "设备类型(1-电表,2-水表,3-气表)不能为空", groups = {AddGroup.class, EditGroup.class})
|
||||
private Long meterType;
|
||||
|
||||
/**
|
||||
* 计量单位(1-度,2-吨,3-立方米)
|
||||
*/
|
||||
@NotNull(message = "计量单位(1-度,2-吨,3-立方米)不能为空", groups = { AddGroup.class, EditGroup.class })
|
||||
@NotNull(message = "计量单位(1-度,2-吨,3-立方米)不能为空", groups = {AddGroup.class, EditGroup.class})
|
||||
private Long meterUnit;
|
||||
|
||||
/**
|
||||
* 采集器IP
|
||||
*/
|
||||
@NotNull(message = "采集器IP不能为空", groups = {AddGroup.class, EditGroup.class})
|
||||
private String hostIp;
|
||||
|
||||
/**
|
||||
* 楼层ID
|
||||
*/
|
||||
@NotNull(message = "楼层ID不能为空", groups = {AddGroup.class, EditGroup.class})
|
||||
private Long floorId;
|
||||
|
||||
/**
|
||||
* 园区编码
|
||||
*/
|
||||
private Long communityId;
|
||||
|
||||
/**
|
||||
* 建筑名称
|
||||
*/
|
||||
private Long buildingId;
|
||||
|
||||
/**
|
||||
* 单元编码
|
||||
*/
|
||||
private Long unitId;
|
||||
|
||||
/**
|
||||
* 安装位置
|
||||
*/
|
||||
@@ -61,12 +90,12 @@ public class TbMeterInfoBo extends BaseEntity {
|
||||
/**
|
||||
* 初始读数
|
||||
*/
|
||||
private Long initReading;
|
||||
private BigDecimal initReading;
|
||||
|
||||
/**
|
||||
* 最大量程
|
||||
*/
|
||||
private Long maxRang;
|
||||
private BigDecimal maxRang;
|
||||
|
||||
/**
|
||||
* 通信状态
|
||||
|
@@ -6,6 +6,7 @@ import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.math.BigDecimal;
|
||||
|
||||
/**
|
||||
* 水电气对象 tb_meter_info
|
||||
@@ -60,12 +61,12 @@ public class TbMeterInfo extends TenantEntity {
|
||||
/**
|
||||
* 初始读数
|
||||
*/
|
||||
private Long initReading;
|
||||
private BigDecimal initReading;
|
||||
|
||||
/**
|
||||
* 最大量程
|
||||
*/
|
||||
private Long maxRang;
|
||||
private BigDecimal maxRang;
|
||||
|
||||
/**
|
||||
* 通信状态
|
||||
@@ -82,5 +83,30 @@ public class TbMeterInfo extends TenantEntity {
|
||||
*/
|
||||
private String remark;
|
||||
|
||||
/**
|
||||
* 园区编码
|
||||
*/
|
||||
private Long communityId;
|
||||
|
||||
/**
|
||||
* 建筑名称
|
||||
*/
|
||||
private Long buildingId;
|
||||
|
||||
/**
|
||||
* 单元编码
|
||||
*/
|
||||
private Long unitId;
|
||||
|
||||
/**
|
||||
* 所属楼层ID
|
||||
*/
|
||||
private Long floorId;
|
||||
|
||||
/**
|
||||
* 采集器IP
|
||||
*/
|
||||
private String hostIp;
|
||||
|
||||
|
||||
}
|
||||
|
@@ -10,7 +10,7 @@ import lombok.Data;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
|
||||
/**
|
||||
@@ -54,7 +54,7 @@ public class TbMeterInfoVo implements Serializable {
|
||||
/**
|
||||
* 设备类型(1-电表,2-水表,3-气表)
|
||||
*/
|
||||
@ExcelProperty(value = "设备类型(1-电表,2-水表,3-气表)" ,converter = ExcelDictConvert.class)
|
||||
@ExcelProperty(value = "设备类型(1-电表,2-水表,3-气表)", converter = ExcelDictConvert.class)
|
||||
@ExcelDictFormat(dictType = "meter_type")
|
||||
private Long meterType;
|
||||
|
||||
@@ -75,13 +75,13 @@ public class TbMeterInfoVo implements Serializable {
|
||||
* 初始读数
|
||||
*/
|
||||
@ExcelProperty(value = "初始读数")
|
||||
private Long initReading;
|
||||
private BigDecimal initReading;
|
||||
|
||||
/**
|
||||
* 最大量程
|
||||
*/
|
||||
@ExcelProperty(value = "最大量程")
|
||||
private Long maxRang;
|
||||
private BigDecimal maxRang;
|
||||
|
||||
/**
|
||||
* 通信状态
|
||||
@@ -97,6 +97,42 @@ public class TbMeterInfoVo implements Serializable {
|
||||
@ExcelDictFormat(dictType = "sis_device_status")
|
||||
private Long runningState;
|
||||
|
||||
/**
|
||||
* 园区编码
|
||||
*/
|
||||
@ExcelProperty(value = "园区编码")
|
||||
private Long communityId;
|
||||
|
||||
/**
|
||||
* 建筑名称
|
||||
*/
|
||||
@ExcelProperty(value = "建筑名称")
|
||||
private Long buildingId;
|
||||
|
||||
/**
|
||||
* 单元编码
|
||||
*/
|
||||
@ExcelProperty(value = "单元编码")
|
||||
private Long unitId;
|
||||
|
||||
/**
|
||||
* 楼层ID
|
||||
*/
|
||||
@ExcelProperty(value = "楼层ID")
|
||||
private Long floorId;
|
||||
|
||||
/**
|
||||
* 楼层
|
||||
*/
|
||||
@ExcelProperty(value = "楼层")
|
||||
private String floorName;
|
||||
|
||||
/**
|
||||
* 采集器IP
|
||||
*/
|
||||
@ExcelProperty(value = "采集器IP")
|
||||
private String hostIp;
|
||||
|
||||
/**
|
||||
* 备注
|
||||
*/
|
||||
|
@@ -1,5 +1,6 @@
|
||||
package org.dromara.property.mapper.smartDevicesMapper;
|
||||
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.dromara.property.domain.entity.smartDevices.TbMeterInfo;
|
||||
import org.dromara.property.domain.vo.smartDevicesVo.TbMeterInfoVo;
|
||||
import org.dromara.common.mybatis.core.mapper.BaseMapperPlus;
|
||||
@@ -8,8 +9,9 @@ import org.dromara.common.mybatis.core.mapper.BaseMapperPlus;
|
||||
* 水电气Mapper接口
|
||||
*
|
||||
* @author lsm
|
||||
* @date 2025-07-19
|
||||
* @since 2025-07-19
|
||||
*/
|
||||
@Mapper
|
||||
public interface TbMeterInfoMapper extends BaseMapperPlus<TbMeterInfo, TbMeterInfoVo> {
|
||||
|
||||
}
|
||||
|
@@ -0,0 +1,18 @@
|
||||
package org.dromara.property.rocketmq;
|
||||
|
||||
/**
|
||||
* @author lsm
|
||||
* @apiNote RocketMqConstants
|
||||
* @since 2025/8/25
|
||||
*/
|
||||
public interface RocketMqConstants {
|
||||
// mq topic
|
||||
String TOPIC = "SmartParks";
|
||||
|
||||
// mq GROUP
|
||||
String GROUP = "Meter-Group";
|
||||
|
||||
/*-----------------------------------消息tag------------------------------------*/
|
||||
String METER_RECORD = "METER_RECORD_TAG";
|
||||
|
||||
}
|
@@ -0,0 +1,40 @@
|
||||
package org.dromara.property.rocketmq.consumer;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.dromara.property.rocketmq.RocketMqConstants;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* @author lsm
|
||||
* @apiNote MeterRecordConsumer
|
||||
* @since 2025/8/25
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@RocketMQMessageListener(
|
||||
topic = RocketMqConstants.TOPIC,
|
||||
consumerGroup = RocketMqConstants.GROUP,
|
||||
selectorExpression = RocketMqConstants.METER_RECORD
|
||||
)
|
||||
public class MeterRecordConsumer implements RocketMQListener<MessageExt> {
|
||||
|
||||
@Override
|
||||
public void onMessage(MessageExt ext) {
|
||||
log.info("消费仪表上报数据,数据长度={}", ext.getBody().length);
|
||||
try {
|
||||
String message = new String(ext.getBody());
|
||||
log.info("物业仪表上报数据,数据={}", message);
|
||||
} catch (Exception e) {
|
||||
log.error("消费仪表上报数据处理失败,", e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@@ -10,6 +10,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.property.domain.bo.TbFloorBo;
|
||||
import org.dromara.property.domain.vo.TbFloorVo;
|
||||
import org.dromara.property.service.ITbFloorService;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -23,6 +24,7 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Collection;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 灯控开关信息Service业务层处理
|
||||
@@ -60,7 +62,19 @@ public class TbLightInfoServiceImpl implements ITbLightInfoService {
|
||||
public TableDataInfo<TbLightInfoVo> queryPageList(TbLightInfoBo bo, PageQuery pageQuery) {
|
||||
LambdaQueryWrapper<TbLightInfo> lqw = buildQueryWrapper(bo);
|
||||
Page<TbLightInfoVo> result = baseMapper.selectVoPage(pageQuery.build(), lqw);
|
||||
result.getRecords().forEach(r -> r.setFloorName(floorService.queryById(r.getFloorId()).getFloorName()));
|
||||
|
||||
// 创建楼层ID到楼层名称的映射,避免重复查询
|
||||
List<TbFloorVo> floorVo = floorService.queryList(new TbFloorBo());
|
||||
Map<Long, String> floorMap = floorVo.stream()
|
||||
.collect(Collectors.toMap(TbFloorVo::getId, TbFloorVo::getFloorName, (key1, key2) -> key1));
|
||||
|
||||
// 为每个灯控信息设置楼层名称
|
||||
result.getRecords().forEach(record -> {
|
||||
String floorName = floorMap.get(record.getFloorId());
|
||||
if (floorName != null) {
|
||||
record.setFloorName(floorName);
|
||||
}
|
||||
});
|
||||
return TableDataInfo.build(result);
|
||||
}
|
||||
|
||||
@@ -151,7 +165,7 @@ public class TbLightInfoServiceImpl implements ITbLightInfoService {
|
||||
*/
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public Boolean switchSingleLight(TbLightInfoBo bo){
|
||||
public Boolean switchSingleLight(TbLightInfoBo bo) {
|
||||
TbLightInfo update = MapstructUtils.convert(bo, TbLightInfo.class);
|
||||
boolean flag = baseMapper.updateById(update) > 0;
|
||||
Assert.isTrue(flag, "修改灯开关失败");
|
||||
|
@@ -1,5 +1,6 @@
|
||||
package org.dromara.property.service.impl.smartDevicesImpl;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import org.dromara.common.core.utils.MapstructUtils;
|
||||
import org.dromara.common.core.utils.StringUtils;
|
||||
import org.dromara.common.mybatis.core.page.TableDataInfo;
|
||||
@@ -9,16 +10,21 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.property.domain.bo.TbFloorBo;
|
||||
import org.dromara.property.domain.vo.TbFloorVo;
|
||||
import org.dromara.property.service.ITbFloorService;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.dromara.property.domain.bo.smartDevicesBo.TbMeterInfoBo;
|
||||
import org.dromara.property.domain.vo.smartDevicesVo.TbMeterInfoVo;
|
||||
import org.dromara.property.domain.entity.smartDevices.TbMeterInfo;
|
||||
import org.dromara.property.mapper.smartDevicesMapper.TbMeterInfoMapper;
|
||||
import org.dromara.property.service.smartDevicesService.ITbMeterInfoService;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Collection;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 水电气Service业务层处理
|
||||
@@ -27,11 +33,12 @@ import java.util.Collection;
|
||||
* @since 2025-07-19
|
||||
*/
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
|
||||
|
||||
private final TbMeterInfoMapper baseMapper;
|
||||
private final ITbFloorService floorService;
|
||||
|
||||
/**
|
||||
* 查询水电气
|
||||
@@ -40,7 +47,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
|
||||
* @return 水电气
|
||||
*/
|
||||
@Override
|
||||
public TbMeterInfoVo queryById(Long id){
|
||||
public TbMeterInfoVo queryById(Long id) {
|
||||
return baseMapper.selectVoById(id);
|
||||
}
|
||||
|
||||
@@ -55,6 +62,19 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
|
||||
public TableDataInfo<TbMeterInfoVo> queryPageList(TbMeterInfoBo bo, PageQuery pageQuery) {
|
||||
LambdaQueryWrapper<TbMeterInfo> lqw = buildQueryWrapper(bo);
|
||||
Page<TbMeterInfoVo> result = baseMapper.selectVoPage(pageQuery.build(), lqw);
|
||||
// 创建楼层ID到楼层名称的映射,避免重复查询
|
||||
List<TbFloorVo> floorList = floorService.queryList(new TbFloorBo());
|
||||
Map<Long, String> floorMap = floorList.stream()
|
||||
.collect(Collectors.toMap(TbFloorVo::getId, TbFloorVo::getFloorName, (key1, key2) -> key1));
|
||||
|
||||
// 为每个灯控信息设置楼层名称
|
||||
result.getRecords().forEach(record -> {
|
||||
String floorName = floorMap.get(record.getFloorId());
|
||||
if (floorName != null) {
|
||||
record.setFloorName(floorName);
|
||||
}
|
||||
});
|
||||
|
||||
return TableDataInfo.build(result);
|
||||
}
|
||||
|
||||
@@ -94,13 +114,15 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
|
||||
* @return 是否新增成功
|
||||
*/
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public Boolean insertByBo(TbMeterInfoBo bo) {
|
||||
TbMeterInfo add = MapstructUtils.convert(bo, TbMeterInfo.class);
|
||||
validEntityBeforeSave(add);
|
||||
assert add != null;
|
||||
TbFloorVo floor = floorService.queryById(add.getFloorId());
|
||||
add.setBuildingId(floor.getBuildingId());
|
||||
add.setCommunityId(floor.getCommunityId());
|
||||
boolean flag = baseMapper.insert(add) > 0;
|
||||
if (flag) {
|
||||
bo.setId(add.getId());
|
||||
}
|
||||
Assert.isTrue(flag, "新增水电气信息失败");
|
||||
return flag;
|
||||
}
|
||||
|
||||
@@ -120,7 +142,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
|
||||
/**
|
||||
* 保存前的数据校验
|
||||
*/
|
||||
private void validEntityBeforeSave(TbMeterInfo entity){
|
||||
private void validEntityBeforeSave(TbMeterInfo entity) {
|
||||
//TODO 做一些数据校验,如唯一约束
|
||||
}
|
||||
|
||||
@@ -133,7 +155,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService {
|
||||
*/
|
||||
@Override
|
||||
public Boolean deleteWithValidByIds(Collection<Long> ids, Boolean isValid) {
|
||||
if(isValid){
|
||||
if (isValid) {
|
||||
//TODO 做一些业务上的校验,判断是否需要校验
|
||||
}
|
||||
return baseMapper.deleteByIds(ids) > 0;
|
||||
|
@@ -0,0 +1,75 @@
|
||||
package org.dromara.sis.config;
|
||||
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author lsm
|
||||
* @apiNote RocketMQClusterConfig
|
||||
* @since 2025/8/26
|
||||
*/
|
||||
@Configuration
|
||||
public class RocketMQClusterConfig {
|
||||
|
||||
// 从配置文件中读取 cluster 的配置
|
||||
@Value("${rocketmq1.cluster.name-server}")
|
||||
private String nameServer1;
|
||||
|
||||
@Value("${rocketmq1.cluster.producer.group}")
|
||||
private String producerGroup1;
|
||||
|
||||
// 为第一个集群创建生产者实例
|
||||
@Bean({"clusterProducerOne"})
|
||||
public DefaultMQProducer clusterProducerOne() throws Exception {
|
||||
DefaultMQProducer producer = new DefaultMQProducer(producerGroup1);
|
||||
producer.setNamesrvAddr(nameServer1);
|
||||
// 设置发送超时时间
|
||||
producer.setSendMsgTimeout(5000);
|
||||
// 设置重试次数
|
||||
producer.setRetryTimesWhenSendFailed(2);
|
||||
producer.setRetryTimesWhenSendAsyncFailed(2);
|
||||
return producer;
|
||||
}
|
||||
|
||||
// 使用上面的生产者实例创建 RocketMQTemplate
|
||||
@Bean("rocketMQTemplateClusterOne")
|
||||
public RocketMQTemplate rocketMQTemplateClusterOne(@Qualifier("clusterProducerOne") DefaultMQProducer producer) {
|
||||
RocketMQTemplate template = new RocketMQTemplate();
|
||||
template.setProducer(producer);
|
||||
return template;
|
||||
}
|
||||
|
||||
|
||||
// 从配置文件中读取 cluster 的配置
|
||||
@Value("${rocketmq2.cluster.name-server}")
|
||||
private String nameServer2;
|
||||
|
||||
@Value("${rocketmq2.cluster.producer.group}")
|
||||
private String producerGroup2;
|
||||
|
||||
// 为第二个集群创建生产者实例
|
||||
@Bean({"clusterProducerTwo"})
|
||||
public DefaultMQProducer clusterProducerTwo() throws Exception {
|
||||
DefaultMQProducer producer = new DefaultMQProducer(producerGroup2);
|
||||
producer.setNamesrvAddr(nameServer2);
|
||||
// 设置发送超时时间
|
||||
producer.setSendMsgTimeout(5000);
|
||||
// 设置重试次数
|
||||
producer.setRetryTimesWhenSendFailed(2);
|
||||
producer.setRetryTimesWhenSendAsyncFailed(2);
|
||||
return producer;
|
||||
}
|
||||
|
||||
// 使用上面的生产者实例创建 RocketMQTemplate
|
||||
@Bean("rocketMQTemplateClusterTwo")
|
||||
public RocketMQTemplate rocketMQTemplateClusterTwo(@Qualifier("clusterProducerTwo") DefaultMQProducer producer) {
|
||||
RocketMQTemplate template = new RocketMQTemplate();
|
||||
template.setProducer(producer);
|
||||
return template;
|
||||
}
|
||||
|
||||
}
|
@@ -10,7 +10,7 @@ public interface RocketMqConstants {
|
||||
String TOPIC = "SmartParks";
|
||||
|
||||
// mq GROUP
|
||||
String GROUP = "SmartParksEqp";
|
||||
String GROUP = "SmartParks";
|
||||
|
||||
/*-----------------------------------消息tag------------------------------------*/
|
||||
String HIKADD = "ADD_HIK_DEVICE_TAG";
|
||||
@@ -19,4 +19,6 @@ public interface RocketMqConstants {
|
||||
// 人脸比对
|
||||
String FACECOMPARE = "FACE_COMPARE_REPORT";
|
||||
|
||||
String METER_RECORD = "METER_RECORD_TAG";
|
||||
|
||||
}
|
||||
|
@@ -9,6 +9,7 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.dromara.sis.rocketmq.RocketMqConstants;
|
||||
import org.dromara.sis.rocketmq.domain.FaceCapture;
|
||||
import org.dromara.sis.service.IZeroSensationPassageService;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
@@ -23,7 +24,8 @@ import org.springframework.stereotype.Component;
|
||||
@RocketMQMessageListener(
|
||||
topic = RocketMqConstants.TOPIC,
|
||||
consumerGroup = RocketMqConstants.GROUP,
|
||||
selectorExpression = RocketMqConstants.FACECAPTURE
|
||||
selectorExpression = RocketMqConstants.FACECAPTURE,
|
||||
nameServer = "${rocketmq2.cluster.name-server}"
|
||||
)
|
||||
public class FaceCaptureConsumer implements RocketMQListener<MessageExt> {
|
||||
|
||||
@@ -34,7 +36,7 @@ public class FaceCaptureConsumer implements RocketMQListener<MessageExt> {
|
||||
log.info("消费人脸抓拍数据,数据长度={}", ext.getBody().length);
|
||||
try {
|
||||
FaceCapture capture = JSONObject.parseObject(ext.getBody(), FaceCapture.class);
|
||||
zeroSensationPassageService.pass(capture.getDeviceIp(), capture.getSmallImg(), capture.getBigImg());
|
||||
// zeroSensationPassageService.pass(capture.getDeviceIp(), capture.getSmallImg(), capture.getBigImg());
|
||||
} catch (Exception e) {
|
||||
log.error("消费人脸抓拍数据处理失败,", e);
|
||||
}
|
||||
|
@@ -22,7 +22,8 @@ import org.springframework.stereotype.Component;
|
||||
@RocketMQMessageListener(
|
||||
topic = RocketMqConstants.TOPIC,
|
||||
consumerGroup = RocketMqConstants.GROUP,
|
||||
selectorExpression = RocketMqConstants.FACECOMPARE
|
||||
selectorExpression = RocketMqConstants.FACECOMPARE,
|
||||
nameServer = "${rocketmq2.cluster.name-server}"
|
||||
)
|
||||
public class FaceCompareConsumer implements RocketMQListener<MessageExt> {
|
||||
|
||||
|
@@ -0,0 +1,38 @@
|
||||
package org.dromara.sis.rocketmq.consumer;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.dromara.sis.rocketmq.RocketMqConstants;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author lsm
|
||||
* @apiNote MeterRecordConsumer
|
||||
* @since 2025/8/25
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@RocketMQMessageListener(
|
||||
consumerGroup = "Meter_Group",
|
||||
topic = RocketMqConstants.TOPIC,
|
||||
selectorExpression = RocketMqConstants.METER_RECORD,
|
||||
nameServer = "${rocketmq2.cluster.name-server}"
|
||||
)
|
||||
public class MeterRecordConsumer implements RocketMQListener<MessageExt> {
|
||||
|
||||
@Override
|
||||
public void onMessage(MessageExt ext) {
|
||||
log.info("消费仪表上报数据,数据长度={}", ext.getBody().length);
|
||||
try {
|
||||
String message = new String(ext.getBody());
|
||||
log.info("消费仪表上报数据,数据={}", message);
|
||||
} catch (Exception e) {
|
||||
log.error("消费仪表上报数据处理失败,", e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@@ -0,0 +1,69 @@
|
||||
package org.dromara.sis.rocketmq.producer;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author lsm
|
||||
* @apiNote ProducerService
|
||||
* @since 2025/8/26
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class ProducerService {
|
||||
|
||||
@Autowired
|
||||
@Qualifier("rocketMQTemplateClusterOne")
|
||||
private RocketMQTemplate rocketMQTemplateClusterOne;
|
||||
|
||||
@Autowired
|
||||
// @Qualifier("rocketMQTemplateClusterTwo")
|
||||
private RocketMQTemplate rocketMQTemplateClusterTwo;
|
||||
|
||||
/**
|
||||
* 向mq写入消息
|
||||
*
|
||||
* @param topic 消息topic
|
||||
* @param tag 消息tag
|
||||
* @param msg 消息
|
||||
*/
|
||||
public void defaultSend(String topic, String tag, String msg) {
|
||||
try {
|
||||
String destination = topic + ":" + tag;
|
||||
log.info("准备向默认RocketMQ发送消息,目的地:{}", destination);
|
||||
|
||||
// 使用 RocketMQTemplate 的同步发送方法
|
||||
rocketMQTemplateClusterOne.syncSend(destination, msg);
|
||||
|
||||
log.info("发送RocketMQ消息成功");
|
||||
} catch (Exception e) {
|
||||
log.error("发送RocketMQ消息失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 向mq写入消息
|
||||
*
|
||||
* @param topic 消息topic
|
||||
* @param tag 消息tag
|
||||
* @param msg 消息
|
||||
*/
|
||||
public void clusterSend(String topic, String tag, String msg) {
|
||||
try {
|
||||
String destination = topic + ":" + tag;
|
||||
log.info("准备向集群2 RocketMQ发送消息,目的地:{}", destination);
|
||||
|
||||
// 使用 RocketMQTemplate 的同步发送方法
|
||||
rocketMQTemplateClusterTwo.syncSend(destination, msg);
|
||||
|
||||
log.info("发送ClusterRocketMQ消息成功");
|
||||
} catch (Exception e) {
|
||||
log.error("发送RocketMQ消息失败", e);
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user