diff --git a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/config/RocketMQClusterConfig.java b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/config/RocketMQClusterConfig.java index 5c115981..8096dfc1 100644 --- a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/config/RocketMQClusterConfig.java +++ b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/config/RocketMQClusterConfig.java @@ -45,31 +45,31 @@ public class RocketMQClusterConfig { // 从配置文件中读取 cluster 的配置 - @Value("${rocketmq.cluster2.name-server}") - private String nameServer2; - - @Value("${rocketmq.cluster2.producer.group}") - private String producerGroup2; - - // 为第二个集群创建生产者实例 - @Bean({"clusterProducerTwo"}) - public DefaultMQProducer clusterProducerTwo() throws Exception { - DefaultMQProducer producer = new DefaultMQProducer(producerGroup2); - producer.setNamesrvAddr(nameServer2); - // 设置发送超时时间 - producer.setSendMsgTimeout(5000); - // 设置重试次数 - producer.setRetryTimesWhenSendFailed(2); - producer.setRetryTimesWhenSendAsyncFailed(2); - return producer; - } - - // 使用上面的生产者实例创建 RocketMQTemplate - @Bean("rocketMQTemplateClusterTwo") - public RocketMQTemplate rocketMQTemplateClusterTwo(@Qualifier("clusterProducerTwo") DefaultMQProducer producer) { - RocketMQTemplate template = new RocketMQTemplate(); - template.setProducer(producer); - return template; - } +// @Value("${rocketmq.cluster2.name-server}") +// private String nameServer2; +// +// @Value("${rocketmq.cluster2.producer.group}") +// private String producerGroup2; +// +// // 为第二个集群创建生产者实例 +// @Bean({"clusterProducerTwo"}) +// public DefaultMQProducer clusterProducerTwo() throws Exception { +// DefaultMQProducer producer = new DefaultMQProducer(producerGroup2); +// producer.setNamesrvAddr(nameServer2); +// // 设置发送超时时间 +// producer.setSendMsgTimeout(5000); +// // 设置重试次数 +// producer.setRetryTimesWhenSendFailed(2); +// producer.setRetryTimesWhenSendAsyncFailed(2); +// return producer; +// } +// +// // 使用上面的生产者实例创建 RocketMQTemplate +// @Bean("rocketMQTemplateClusterTwo") +// public RocketMQTemplate rocketMQTemplateClusterTwo(@Qualifier("clusterProducerTwo") DefaultMQProducer producer) { +// RocketMQTemplate template = new RocketMQTemplate(); +// template.setProducer(producer); +// return template; +// } } diff --git a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCaptureConsumer.java b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCaptureConsumer.java index 94330117..af0e3f47 100644 --- a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCaptureConsumer.java +++ b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCaptureConsumer.java @@ -25,7 +25,7 @@ import org.springframework.stereotype.Component; topic = RocketMqConstants.TOPIC, consumerGroup = RocketMqConstants.GROUP, selectorExpression = RocketMqConstants.FACECAPTURE, - nameServer = "${rocketmq.cluster2.name-server}" + nameServer = "${rocketmq.cluster1.name-server}" ) public class FaceCaptureConsumer implements RocketMQListener { @@ -36,7 +36,7 @@ public class FaceCaptureConsumer implements RocketMQListener { log.info("消费人脸抓拍数据,数据长度={}", ext.getBody().length); try { FaceCapture capture = JSONObject.parseObject(ext.getBody(), FaceCapture.class); -// zeroSensationPassageService.pass(capture.getDeviceIp(), capture.getSmallImg(), capture.getBigImg()); + zeroSensationPassageService.pass(capture.getDeviceIp(), capture.getSmallImg(), capture.getBigImg()); } catch (Exception e) { log.error("消费人脸抓拍数据处理失败,", e); } diff --git a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCompareConsumer.java b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCompareConsumer.java index e4b0faa7..e97e731f 100644 --- a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCompareConsumer.java +++ b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/FaceCompareConsumer.java @@ -23,7 +23,7 @@ import org.springframework.stereotype.Component; topic = RocketMqConstants.TOPIC, consumerGroup = RocketMqConstants.GROUP, selectorExpression = RocketMqConstants.FACECOMPARE, - nameServer = "${rocketmq.cluster2.name-server}" + nameServer = "${rocketmq.cluster1.name-server}" ) public class FaceCompareConsumer implements RocketMQListener { diff --git a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/MeterRecordConsumer.java b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/MeterRecordConsumer.java index aa1a0fd2..8360613a 100644 --- a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/MeterRecordConsumer.java +++ b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/consumer/MeterRecordConsumer.java @@ -6,6 +6,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.dromara.sis.rocketmq.RocketMqConstants; +import org.dromara.sis.rocketmq.producer.ProducerService; import org.springframework.stereotype.Component; /** @@ -17,21 +18,26 @@ import org.springframework.stereotype.Component; @Component @RequiredArgsConstructor @RocketMQMessageListener( - consumerGroup = "Meter_Group", topic = RocketMqConstants.TOPIC, + consumerGroup = RocketMqConstants.METER_GROUP, selectorExpression = RocketMqConstants.METER_RECORD, - nameServer = "${rocketmq.cluster2.name-server}" + nameServer = "${rocketmq.cluster1.name-server}" ) public class MeterRecordConsumer implements RocketMQListener { + private final ProducerService producerService; + @Override public void onMessage(MessageExt ext) { - log.info("消费仪表上报数据,数据长度={}", ext.getBody().length); try { - String message = new String(ext.getBody()); - log.info("消费仪表上报数据,数据={}", message); + if (ext.getBody() == null) { + log.info("仪表上报消息数据,不转发!"); + } else { + producerService.defaultSend(RocketMqConstants.TOPIC, RocketMqConstants.METER_RECORD, new String(ext.getBody())); + log.info("转发仪表上报数据处理成功"); + } } catch (Exception e) { - log.error("消费仪表上报数据处理失败,", e); + log.error("转发仪表上报数据处理失败,", e); } } diff --git a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/producer/ProducerService.java b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/producer/ProducerService.java index 1f37ec02..7c40a8de 100644 --- a/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/producer/ProducerService.java +++ b/ruoyi-modules/Sis/src/main/java/org/dromara/sis/rocketmq/producer/ProducerService.java @@ -20,9 +20,9 @@ public class ProducerService { @Qualifier("rocketMQTemplateClusterOne") private RocketMQTemplate rocketMQTemplateClusterOne; - @Autowired +// @Autowired // @Qualifier("rocketMQTemplateClusterTwo") - private RocketMQTemplate rocketMQTemplateClusterTwo; +// private RocketMQTemplate rocketMQTemplateClusterTwo; /** * 向mq写入消息 @@ -34,14 +34,12 @@ public class ProducerService { public void defaultSend(String topic, String tag, String msg) { try { String destination = topic + ":" + tag; - log.info("准备向默认RocketMQ发送消息,目的地:{}", destination); - // 使用 RocketMQTemplate 的同步发送方法 rocketMQTemplateClusterOne.syncSend(destination, msg); - log.info("发送RocketMQ消息成功"); + log.info("发送RocketMQOne消息成功, nameServer:{}", rocketMQTemplateClusterOne.getProducer().getNamesrvAddr()); } catch (Exception e) { - log.error("发送RocketMQ消息失败", e); + log.error("发送RocketMQOne消息失败", e); } } @@ -53,17 +51,15 @@ public class ProducerService { * @param tag 消息tag * @param msg 消息 */ - public void clusterSend(String topic, String tag, String msg) { - try { - String destination = topic + ":" + tag; - log.info("准备向集群2 RocketMQ发送消息,目的地:{}", destination); - - // 使用 RocketMQTemplate 的同步发送方法 - rocketMQTemplateClusterTwo.syncSend(destination, msg); - - log.info("发送ClusterRocketMQ消息成功"); - } catch (Exception e) { - log.error("发送RocketMQ消息失败", e); - } - } +// public void clusterSend(String topic, String tag, String msg) { +// try { +// String destination = topic + ":" + tag; +// // 使用 RocketMQTemplate 的同步发送方法 +// rocketMQTemplateClusterTwo.syncSend(destination, msg); +// +// log.info("发送RocketMQTwo消息成功, nameServer:{}", rocketMQTemplateClusterTwo.getProducer().getNamesrvAddr()); +// } catch (Exception e) { +// log.error("发送RocketMQTwo消息失败", e); +// } +// } }