From b20828a800c7172e78526137e5ee3f9c2bb6b9f1 Mon Sep 17 00:00:00 2001 From: zcxlsm Date: Tue, 26 Aug 2025 20:32:51 +0800 Subject: [PATCH] =?UTF-8?q?feat(rocketmq):=20=E6=B7=BB=E5=8A=A0=E4=BB=AA?= =?UTF-8?q?=E8=A1=A8=E8=AE=B0=E5=BD=95=E6=B6=88=E8=B4=B9=E8=80=85=E5=92=8C?= =?UTF-8?q?=E7=94=9F=E4=BA=A7=E8=80=85=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ruoyi-modules/Property/pom.xml | 5 ++ .../bo/smartDevicesBo/TbLightInfoBo.java | 2 +- .../bo/smartDevicesBo/TbMeterInfoBo.java | 39 ++++++++-- .../entity/smartDevices/TbMeterInfo.java | 30 +++++++- .../vo/smartDevicesVo/TbMeterInfoVo.java | 44 ++++++++++- .../smartDevicesMapper/TbMeterInfoMapper.java | 4 +- .../property/rocketmq/RocketMqConstants.java | 18 +++++ .../consumer/MeterRecordConsumer.java | 40 ++++++++++ .../TbLightInfoServiceImpl.java | 18 ++++- .../TbMeterInfoServiceImpl.java | 38 ++++++++-- .../sis/config/RocketMQClusterConfig.java | 75 +++++++++++++++++++ .../sis/rocketmq/RocketMqConstants.java | 4 +- .../consumer/FaceCaptureConsumer.java | 6 +- .../consumer/FaceCompareConsumer.java | 5 +- .../consumer/MeterRecordConsumer.java | 38 ++++++++++ .../rocketmq/producer/ProducerService.java | 69 +++++++++++++++++ 16 files changed, 407 insertions(+), 28 deletions(-) create mode 100644 ruoyi-modules/Property/src/main/java/org/dromara/property/rocketmq/RocketMqConstants.java create mode 100644 ruoyi-modules/Property/src/main/java/org/dromara/property/rocketmq/consumer/MeterRecordConsumer.java create mode 100644 ruoyi-modules/Sis/src/main/java/org/dromara/sis/config/RocketMQClusterConfig.java create mode 100644 ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/MeterRecordConsumer.java create mode 100644 ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/producer/ProducerService.java diff --git a/ruoyi-modules/Property/pom.xml b/ruoyi-modules/Property/pom.xml index 4164e16b..4135f974 100644 --- a/ruoyi-modules/Property/pom.xml +++ b/ruoyi-modules/Property/pom.xml @@ -125,6 +125,11 @@ ruoyi-common-websocket + + org.apache.rocketmq + rocketmq-spring-boot-starter + + diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/bo/smartDevicesBo/TbLightInfoBo.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/bo/smartDevicesBo/TbLightInfoBo.java index 6a16cd7f..cbec0af7 100644 --- a/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/bo/smartDevicesBo/TbLightInfoBo.java +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/bo/smartDevicesBo/TbLightInfoBo.java @@ -47,7 +47,7 @@ public class TbLightInfoBo extends BaseEntity { /** * 楼层ID */ - @NotNull(message = "层ID不能为空", groups = {AddGroup.class, EditGroup.class}) + @NotNull(message = "楼层ID不能为空", groups = {AddGroup.class, EditGroup.class}) private Long floorId; /** diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/bo/smartDevicesBo/TbMeterInfoBo.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/bo/smartDevicesBo/TbMeterInfoBo.java index aceea63f..73db1b9f 100644 --- a/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/bo/smartDevicesBo/TbMeterInfoBo.java +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/bo/smartDevicesBo/TbMeterInfoBo.java @@ -9,6 +9,8 @@ import lombok.Data; import lombok.EqualsAndHashCode; import jakarta.validation.constraints.*; +import java.math.BigDecimal; + /** * 水电气业务对象 tb_meter_info * @@ -23,7 +25,7 @@ public class TbMeterInfoBo extends BaseEntity { /** * 主键id */ - @NotNull(message = "主键id不能为空", groups = { EditGroup.class }) + @NotNull(message = "主键id不能为空", groups = {EditGroup.class}) private Long id; /** @@ -44,15 +46,42 @@ public class TbMeterInfoBo extends BaseEntity { /** * 设备类型(1-电表,2-水表,3-气表) */ - @NotNull(message = "设备类型(1-电表,2-水表,3-气表)不能为空", groups = { AddGroup.class, EditGroup.class }) + @NotNull(message = "设备类型(1-电表,2-水表,3-气表)不能为空", groups = {AddGroup.class, EditGroup.class}) private Long meterType; /** * 计量单位(1-度,2-吨,3-立方米) */ - @NotNull(message = "计量单位(1-度,2-吨,3-立方米)不能为空", groups = { AddGroup.class, EditGroup.class }) + @NotNull(message = "计量单位(1-度,2-吨,3-立方米)不能为空", groups = {AddGroup.class, EditGroup.class}) private Long meterUnit; + /** + * 采集器IP + */ + @NotNull(message = "采集器IP不能为空", groups = {AddGroup.class, EditGroup.class}) + private String hostIp; + + /** + * 楼层ID + */ + @NotNull(message = "楼层ID不能为空", groups = {AddGroup.class, EditGroup.class}) + private Long floorId; + + /** + * 园区编码 + */ + private Long communityId; + + /** + * 建筑名称 + */ + private Long buildingId; + + /** + * 单元编码 + */ + private Long unitId; + /** * 安装位置 */ @@ -61,12 +90,12 @@ public class TbMeterInfoBo extends BaseEntity { /** * 初始读数 */ - private Long initReading; + private BigDecimal initReading; /** * 最大量程 */ - private Long maxRang; + private BigDecimal maxRang; /** * 通信状态 diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/entity/smartDevices/TbMeterInfo.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/entity/smartDevices/TbMeterInfo.java index cfdc1ab8..f363203d 100644 --- a/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/entity/smartDevices/TbMeterInfo.java +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/entity/smartDevices/TbMeterInfo.java @@ -6,6 +6,7 @@ import lombok.Data; import lombok.EqualsAndHashCode; import java.io.Serial; +import java.math.BigDecimal; /** * 水电气对象 tb_meter_info @@ -60,12 +61,12 @@ public class TbMeterInfo extends TenantEntity { /** * 初始读数 */ - private Long initReading; + private BigDecimal initReading; /** * 最大量程 */ - private Long maxRang; + private BigDecimal maxRang; /** * 通信状态 @@ -82,5 +83,30 @@ public class TbMeterInfo extends TenantEntity { */ private String remark; + /** + * 园区编码 + */ + private Long communityId; + + /** + * 建筑名称 + */ + private Long buildingId; + + /** + * 单元编码 + */ + private Long unitId; + + /** + * 所属楼层ID + */ + private Long floorId; + + /** + * 采集器IP + */ + private String hostIp; + } diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/vo/smartDevicesVo/TbMeterInfoVo.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/vo/smartDevicesVo/TbMeterInfoVo.java index 6e26f2f5..467f85c3 100644 --- a/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/vo/smartDevicesVo/TbMeterInfoVo.java +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/domain/vo/smartDevicesVo/TbMeterInfoVo.java @@ -10,7 +10,7 @@ import lombok.Data; import java.io.Serial; import java.io.Serializable; - +import java.math.BigDecimal; /** @@ -54,7 +54,7 @@ public class TbMeterInfoVo implements Serializable { /** * 设备类型(1-电表,2-水表,3-气表) */ - @ExcelProperty(value = "设备类型(1-电表,2-水表,3-气表)" ,converter = ExcelDictConvert.class) + @ExcelProperty(value = "设备类型(1-电表,2-水表,3-气表)", converter = ExcelDictConvert.class) @ExcelDictFormat(dictType = "meter_type") private Long meterType; @@ -75,13 +75,13 @@ public class TbMeterInfoVo implements Serializable { * 初始读数 */ @ExcelProperty(value = "初始读数") - private Long initReading; + private BigDecimal initReading; /** * 最大量程 */ @ExcelProperty(value = "最大量程") - private Long maxRang; + private BigDecimal maxRang; /** * 通信状态 @@ -97,6 +97,42 @@ public class TbMeterInfoVo implements Serializable { @ExcelDictFormat(dictType = "sis_device_status") private Long runningState; + /** + * 园区编码 + */ + @ExcelProperty(value = "园区编码") + private Long communityId; + + /** + * 建筑名称 + */ + @ExcelProperty(value = "建筑名称") + private Long buildingId; + + /** + * 单元编码 + */ + @ExcelProperty(value = "单元编码") + private Long unitId; + + /** + * 楼层ID + */ + @ExcelProperty(value = "楼层ID") + private Long floorId; + + /** + * 楼层 + */ + @ExcelProperty(value = "楼层") + private String floorName; + + /** + * 采集器IP + */ + @ExcelProperty(value = "采集器IP") + private String hostIp; + /** * 备注 */ diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/mapper/smartDevicesMapper/TbMeterInfoMapper.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/mapper/smartDevicesMapper/TbMeterInfoMapper.java index 9088dabd..a9401847 100644 --- a/ruoyi-modules/Property/src/main/java/org/dromara/property/mapper/smartDevicesMapper/TbMeterInfoMapper.java +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/mapper/smartDevicesMapper/TbMeterInfoMapper.java @@ -1,5 +1,6 @@ package org.dromara.property.mapper.smartDevicesMapper; +import org.apache.ibatis.annotations.Mapper; import org.dromara.property.domain.entity.smartDevices.TbMeterInfo; import org.dromara.property.domain.vo.smartDevicesVo.TbMeterInfoVo; import org.dromara.common.mybatis.core.mapper.BaseMapperPlus; @@ -8,8 +9,9 @@ import org.dromara.common.mybatis.core.mapper.BaseMapperPlus; * 水电气Mapper接口 * * @author lsm - * @date 2025-07-19 + * @since 2025-07-19 */ +@Mapper public interface TbMeterInfoMapper extends BaseMapperPlus { } diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/rocketmq/RocketMqConstants.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/rocketmq/RocketMqConstants.java new file mode 100644 index 00000000..ab77d746 --- /dev/null +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/rocketmq/RocketMqConstants.java @@ -0,0 +1,18 @@ +package org.dromara.property.rocketmq; + +/** + * @author lsm + * @apiNote RocketMqConstants + * @since 2025/8/25 + */ +public interface RocketMqConstants { + // mq topic + String TOPIC = "SmartParks"; + + // mq GROUP + String GROUP = "Meter-Group"; + + /*-----------------------------------消息tag------------------------------------*/ + String METER_RECORD = "METER_RECORD_TAG"; + +} diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/rocketmq/consumer/MeterRecordConsumer.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/rocketmq/consumer/MeterRecordConsumer.java new file mode 100644 index 00000000..75aa5189 --- /dev/null +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/rocketmq/consumer/MeterRecordConsumer.java @@ -0,0 +1,40 @@ +package org.dromara.property.rocketmq.consumer; + +import com.alibaba.fastjson.JSONObject; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.dromara.property.rocketmq.RocketMqConstants; +import org.springframework.stereotype.Component; + +import java.util.Arrays; + +/** + * @author lsm + * @apiNote MeterRecordConsumer + * @since 2025/8/25 + */ +@Slf4j +@Component +@RequiredArgsConstructor +@RocketMQMessageListener( + topic = RocketMqConstants.TOPIC, + consumerGroup = RocketMqConstants.GROUP, + selectorExpression = RocketMqConstants.METER_RECORD +) +public class MeterRecordConsumer implements RocketMQListener { + + @Override + public void onMessage(MessageExt ext) { + log.info("消费仪表上报数据,数据长度={}", ext.getBody().length); + try { + String message = new String(ext.getBody()); + log.info("物业仪表上报数据,数据={}", message); + } catch (Exception e) { + log.error("消费仪表上报数据处理失败,", e); + } + + } +} diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbLightInfoServiceImpl.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbLightInfoServiceImpl.java index 27b80192..12ccb6cc 100644 --- a/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbLightInfoServiceImpl.java +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbLightInfoServiceImpl.java @@ -10,6 +10,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.dromara.property.domain.bo.TbFloorBo; import org.dromara.property.domain.vo.TbFloorVo; import org.dromara.property.service.ITbFloorService; import org.springframework.stereotype.Service; @@ -23,6 +24,7 @@ import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.Map; import java.util.Collection; +import java.util.stream.Collectors; /** * 灯控开关信息Service业务层处理 @@ -60,7 +62,19 @@ public class TbLightInfoServiceImpl implements ITbLightInfoService { public TableDataInfo queryPageList(TbLightInfoBo bo, PageQuery pageQuery) { LambdaQueryWrapper lqw = buildQueryWrapper(bo); Page result = baseMapper.selectVoPage(pageQuery.build(), lqw); - result.getRecords().forEach(r -> r.setFloorName(floorService.queryById(r.getFloorId()).getFloorName())); + + // 创建楼层ID到楼层名称的映射,避免重复查询 + List floorVo = floorService.queryList(new TbFloorBo()); + Map floorMap = floorVo.stream() + .collect(Collectors.toMap(TbFloorVo::getId, TbFloorVo::getFloorName, (key1, key2) -> key1)); + + // 为每个灯控信息设置楼层名称 + result.getRecords().forEach(record -> { + String floorName = floorMap.get(record.getFloorId()); + if (floorName != null) { + record.setFloorName(floorName); + } + }); return TableDataInfo.build(result); } @@ -151,7 +165,7 @@ public class TbLightInfoServiceImpl implements ITbLightInfoService { */ @Override @Transactional(rollbackFor = Exception.class) - public Boolean switchSingleLight(TbLightInfoBo bo){ + public Boolean switchSingleLight(TbLightInfoBo bo) { TbLightInfo update = MapstructUtils.convert(bo, TbLightInfo.class); boolean flag = baseMapper.updateById(update) > 0; Assert.isTrue(flag, "修改灯开关失败"); diff --git a/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbMeterInfoServiceImpl.java b/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbMeterInfoServiceImpl.java index 95ad1150..35ebee2e 100644 --- a/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbMeterInfoServiceImpl.java +++ b/ruoyi-modules/Property/src/main/java/org/dromara/property/service/impl/smartDevicesImpl/TbMeterInfoServiceImpl.java @@ -1,5 +1,6 @@ package org.dromara.property.service.impl.smartDevicesImpl; +import cn.hutool.core.lang.Assert; import org.dromara.common.core.utils.MapstructUtils; import org.dromara.common.core.utils.StringUtils; import org.dromara.common.mybatis.core.page.TableDataInfo; @@ -9,16 +10,21 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.dromara.property.domain.bo.TbFloorBo; +import org.dromara.property.domain.vo.TbFloorVo; +import org.dromara.property.service.ITbFloorService; import org.springframework.stereotype.Service; import org.dromara.property.domain.bo.smartDevicesBo.TbMeterInfoBo; import org.dromara.property.domain.vo.smartDevicesVo.TbMeterInfoVo; import org.dromara.property.domain.entity.smartDevices.TbMeterInfo; import org.dromara.property.mapper.smartDevicesMapper.TbMeterInfoMapper; import org.dromara.property.service.smartDevicesService.ITbMeterInfoService; +import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.Map; import java.util.Collection; +import java.util.stream.Collectors; /** * 水电气Service业务层处理 @@ -27,11 +33,12 @@ import java.util.Collection; * @since 2025-07-19 */ @Slf4j -@RequiredArgsConstructor @Service +@RequiredArgsConstructor public class TbMeterInfoServiceImpl implements ITbMeterInfoService { private final TbMeterInfoMapper baseMapper; + private final ITbFloorService floorService; /** * 查询水电气 @@ -40,7 +47,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { * @return 水电气 */ @Override - public TbMeterInfoVo queryById(Long id){ + public TbMeterInfoVo queryById(Long id) { return baseMapper.selectVoById(id); } @@ -55,6 +62,19 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { public TableDataInfo queryPageList(TbMeterInfoBo bo, PageQuery pageQuery) { LambdaQueryWrapper lqw = buildQueryWrapper(bo); Page result = baseMapper.selectVoPage(pageQuery.build(), lqw); + // 创建楼层ID到楼层名称的映射,避免重复查询 + List floorList = floorService.queryList(new TbFloorBo()); + Map floorMap = floorList.stream() + .collect(Collectors.toMap(TbFloorVo::getId, TbFloorVo::getFloorName, (key1, key2) -> key1)); + + // 为每个灯控信息设置楼层名称 + result.getRecords().forEach(record -> { + String floorName = floorMap.get(record.getFloorId()); + if (floorName != null) { + record.setFloorName(floorName); + } + }); + return TableDataInfo.build(result); } @@ -94,13 +114,15 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { * @return 是否新增成功 */ @Override + @Transactional(rollbackFor = Exception.class) public Boolean insertByBo(TbMeterInfoBo bo) { TbMeterInfo add = MapstructUtils.convert(bo, TbMeterInfo.class); - validEntityBeforeSave(add); + assert add != null; + TbFloorVo floor = floorService.queryById(add.getFloorId()); + add.setBuildingId(floor.getBuildingId()); + add.setCommunityId(floor.getCommunityId()); boolean flag = baseMapper.insert(add) > 0; - if (flag) { - bo.setId(add.getId()); - } + Assert.isTrue(flag, "新增水电气信息失败"); return flag; } @@ -120,7 +142,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { /** * 保存前的数据校验 */ - private void validEntityBeforeSave(TbMeterInfo entity){ + private void validEntityBeforeSave(TbMeterInfo entity) { //TODO 做一些数据校验,如唯一约束 } @@ -133,7 +155,7 @@ public class TbMeterInfoServiceImpl implements ITbMeterInfoService { */ @Override public Boolean deleteWithValidByIds(Collection ids, Boolean isValid) { - if(isValid){ + if (isValid) { //TODO 做一些业务上的校验,判断是否需要校验 } return baseMapper.deleteByIds(ids) > 0; diff --git a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/config/RocketMQClusterConfig.java b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/config/RocketMQClusterConfig.java new file mode 100644 index 00000000..9e300b38 --- /dev/null +++ b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/config/RocketMQClusterConfig.java @@ -0,0 +1,75 @@ +package org.dromara.sis.config; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author lsm + * @apiNote RocketMQClusterConfig + * @since 2025/8/26 + */ +@Configuration +public class RocketMQClusterConfig { + + // 从配置文件中读取 cluster 的配置 + @Value("${rocketmq1.cluster.name-server}") + private String nameServer1; + + @Value("${rocketmq1.cluster.producer.group}") + private String producerGroup1; + + // 为第一个集群创建生产者实例 + @Bean({"clusterProducerOne"}) + public DefaultMQProducer clusterProducerOne() throws Exception { + DefaultMQProducer producer = new DefaultMQProducer(producerGroup1); + producer.setNamesrvAddr(nameServer1); + // 设置发送超时时间 + producer.setSendMsgTimeout(5000); + // 设置重试次数 + producer.setRetryTimesWhenSendFailed(2); + producer.setRetryTimesWhenSendAsyncFailed(2); + return producer; + } + + // 使用上面的生产者实例创建 RocketMQTemplate + @Bean("rocketMQTemplateClusterOne") + public RocketMQTemplate rocketMQTemplateClusterOne(@Qualifier("clusterProducerOne") DefaultMQProducer producer) { + RocketMQTemplate template = new RocketMQTemplate(); + template.setProducer(producer); + return template; + } + + + // 从配置文件中读取 cluster 的配置 + @Value("${rocketmq2.cluster.name-server}") + private String nameServer2; + + @Value("${rocketmq2.cluster.producer.group}") + private String producerGroup2; + + // 为第二个集群创建生产者实例 + @Bean({"clusterProducerTwo"}) + public DefaultMQProducer clusterProducerTwo() throws Exception { + DefaultMQProducer producer = new DefaultMQProducer(producerGroup2); + producer.setNamesrvAddr(nameServer2); + // 设置发送超时时间 + producer.setSendMsgTimeout(5000); + // 设置重试次数 + producer.setRetryTimesWhenSendFailed(2); + producer.setRetryTimesWhenSendAsyncFailed(2); + return producer; + } + + // 使用上面的生产者实例创建 RocketMQTemplate + @Bean("rocketMQTemplateClusterTwo") + public RocketMQTemplate rocketMQTemplateClusterTwo(@Qualifier("clusterProducerTwo") DefaultMQProducer producer) { + RocketMQTemplate template = new RocketMQTemplate(); + template.setProducer(producer); + return template; + } + +} diff --git a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/RocketMqConstants.java b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/RocketMqConstants.java index d40423a5..739b8e42 100644 --- a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/RocketMqConstants.java +++ b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/RocketMqConstants.java @@ -10,7 +10,7 @@ public interface RocketMqConstants { String TOPIC = "SmartParks"; // mq GROUP - String GROUP = "SmartParksEqp"; + String GROUP = "SmartParks"; /*-----------------------------------消息tag------------------------------------*/ String HIKADD = "ADD_HIK_DEVICE_TAG"; @@ -19,4 +19,6 @@ public interface RocketMqConstants { // 人脸比对 String FACECOMPARE = "FACE_COMPARE_REPORT"; + String METER_RECORD = "METER_RECORD_TAG"; + } diff --git a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCaptureConsumer.java b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCaptureConsumer.java index 13f68f6c..2779d4dd 100644 --- a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCaptureConsumer.java +++ b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCaptureConsumer.java @@ -9,6 +9,7 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import org.dromara.sis.rocketmq.RocketMqConstants; import org.dromara.sis.rocketmq.domain.FaceCapture; import org.dromara.sis.service.IZeroSensationPassageService; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** @@ -23,7 +24,8 @@ import org.springframework.stereotype.Component; @RocketMQMessageListener( topic = RocketMqConstants.TOPIC, consumerGroup = RocketMqConstants.GROUP, - selectorExpression = RocketMqConstants.FACECAPTURE + selectorExpression = RocketMqConstants.FACECAPTURE, + nameServer = "${rocketmq2.cluster.name-server}" ) public class FaceCaptureConsumer implements RocketMQListener { @@ -34,7 +36,7 @@ public class FaceCaptureConsumer implements RocketMQListener { log.info("消费人脸抓拍数据,数据长度={}", ext.getBody().length); try { FaceCapture capture = JSONObject.parseObject(ext.getBody(), FaceCapture.class); - zeroSensationPassageService.pass(capture.getDeviceIp(), capture.getSmallImg(), capture.getBigImg()); +// zeroSensationPassageService.pass(capture.getDeviceIp(), capture.getSmallImg(), capture.getBigImg()); } catch (Exception e) { log.error("消费人脸抓拍数据处理失败,", e); } diff --git a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCompareConsumer.java b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCompareConsumer.java index c67ce6ac..c9f50b2f 100644 --- a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCompareConsumer.java +++ b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCompareConsumer.java @@ -22,7 +22,8 @@ import org.springframework.stereotype.Component; @RocketMQMessageListener( topic = RocketMqConstants.TOPIC, consumerGroup = RocketMqConstants.GROUP, - selectorExpression = RocketMqConstants.FACECOMPARE + selectorExpression = RocketMqConstants.FACECOMPARE, + nameServer = "${rocketmq2.cluster.name-server}" ) public class FaceCompareConsumer implements RocketMQListener { @@ -32,7 +33,7 @@ public class FaceCompareConsumer implements RocketMQListener { log.info("消费人脸比对数据,数据长度={}", ext.getBody().length); try { FaceCompare compare = JSONObject.parseObject(ext.getBody(), FaceCompare.class); - + } catch (Exception e) { log.error("消费比对抓拍数据处理失败,", e); } diff --git a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/MeterRecordConsumer.java b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/MeterRecordConsumer.java new file mode 100644 index 00000000..734e71b5 --- /dev/null +++ b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/MeterRecordConsumer.java @@ -0,0 +1,38 @@ +package org.dromara.sis.rocketmq.consumer; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.dromara.sis.rocketmq.RocketMqConstants; +import org.springframework.stereotype.Component; + +/** + * @author lsm + * @apiNote MeterRecordConsumer + * @since 2025/8/25 + */ +@Slf4j +@Component +@RequiredArgsConstructor +@RocketMQMessageListener( + consumerGroup = "Meter_Group", + topic = RocketMqConstants.TOPIC, + selectorExpression = RocketMqConstants.METER_RECORD, + nameServer = "${rocketmq2.cluster.name-server}" +) +public class MeterRecordConsumer implements RocketMQListener { + + @Override + public void onMessage(MessageExt ext) { + log.info("消费仪表上报数据,数据长度={}", ext.getBody().length); + try { + String message = new String(ext.getBody()); + log.info("消费仪表上报数据,数据={}", message); + } catch (Exception e) { + log.error("消费仪表上报数据处理失败,", e); + } + + } +} diff --git a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/producer/ProducerService.java b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/producer/ProducerService.java new file mode 100644 index 00000000..1f37ec02 --- /dev/null +++ b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/producer/ProducerService.java @@ -0,0 +1,69 @@ +package org.dromara.sis.rocketmq.producer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +/** + * @author lsm + * @apiNote ProducerService + * @since 2025/8/26 + */ +@Slf4j +@Component +public class ProducerService { + + @Autowired + @Qualifier("rocketMQTemplateClusterOne") + private RocketMQTemplate rocketMQTemplateClusterOne; + + @Autowired +// @Qualifier("rocketMQTemplateClusterTwo") + private RocketMQTemplate rocketMQTemplateClusterTwo; + + /** + * 向mq写入消息 + * + * @param topic 消息topic + * @param tag 消息tag + * @param msg 消息 + */ + public void defaultSend(String topic, String tag, String msg) { + try { + String destination = topic + ":" + tag; + log.info("准备向默认RocketMQ发送消息,目的地:{}", destination); + + // 使用 RocketMQTemplate 的同步发送方法 + rocketMQTemplateClusterOne.syncSend(destination, msg); + + log.info("发送RocketMQ消息成功"); + } catch (Exception e) { + log.error("发送RocketMQ消息失败", e); + } + } + + + /** + * 向mq写入消息 + * + * @param topic 消息topic + * @param tag 消息tag + * @param msg 消息 + */ + public void clusterSend(String topic, String tag, String msg) { + try { + String destination = topic + ":" + tag; + log.info("准备向集群2 RocketMQ发送消息,目的地:{}", destination); + + // 使用 RocketMQTemplate 的同步发送方法 + rocketMQTemplateClusterTwo.syncSend(destination, msg); + + log.info("发送ClusterRocketMQ消息成功"); + } catch (Exception e) { + log.error("发送RocketMQ消息失败", e); + } + } +}