Compare commits

...

3 Commits

Author SHA1 Message Date
78d97e14ee refactor(rocketmq): 1 2025-08-27 15:46:17 +08:00
c2c1818ba6 refactor(rocketmq): 1 2025-08-27 15:43:27 +08:00
3ebc58f5e3 refactor(rocketmq): 手动装配rocketMQ 2025-08-26 21:02:38 +08:00
7 changed files with 62 additions and 57 deletions

View File

@@ -10,7 +10,7 @@ public interface RocketMqConstants {
String TOPIC = "SmartParks"; String TOPIC = "SmartParks";
// mq GROUP // mq GROUP
String GROUP = "Meter-Group"; String METER_GROUP = "METER_GROUP";
/*-----------------------------------消息tag------------------------------------*/ /*-----------------------------------消息tag------------------------------------*/
String METER_RECORD = "METER_RECORD_TAG"; String METER_RECORD = "METER_RECORD_TAG";

View File

@@ -16,10 +16,10 @@ import org.springframework.context.annotation.Configuration;
public class RocketMQClusterConfig { public class RocketMQClusterConfig {
// 从配置文件中读取 cluster 的配置 // 从配置文件中读取 cluster 的配置
@Value("${rocketmq1.cluster.name-server}") @Value("${rocketmq.cluster1.name-server}")
private String nameServer1; private String nameServer1;
@Value("${rocketmq1.cluster.producer.group}") @Value("${rocketmq.cluster1.producer.group}")
private String producerGroup1; private String producerGroup1;
// 为第一个集群创建生产者实例 // 为第一个集群创建生产者实例
@@ -45,31 +45,31 @@ public class RocketMQClusterConfig {
// 从配置文件中读取 cluster 的配置 // 从配置文件中读取 cluster 的配置
@Value("${rocketmq2.cluster.name-server}") // @Value("${rocketmq.cluster2.name-server}")
private String nameServer2; // private String nameServer2;
//
@Value("${rocketmq2.cluster.producer.group}") // @Value("${rocketmq.cluster2.producer.group}")
private String producerGroup2; // private String producerGroup2;
//
// 为第二个集群创建生产者实例 // // 为第二个集群创建生产者实例
@Bean({"clusterProducerTwo"}) // @Bean({"clusterProducerTwo"})
public DefaultMQProducer clusterProducerTwo() throws Exception { // public DefaultMQProducer clusterProducerTwo() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup2); // DefaultMQProducer producer = new DefaultMQProducer(producerGroup2);
producer.setNamesrvAddr(nameServer2); // producer.setNamesrvAddr(nameServer2);
// 设置发送超时时间 // // 设置发送超时时间
producer.setSendMsgTimeout(5000); // producer.setSendMsgTimeout(5000);
// 设置重试次数 // // 设置重试次数
producer.setRetryTimesWhenSendFailed(2); // producer.setRetryTimesWhenSendFailed(2);
producer.setRetryTimesWhenSendAsyncFailed(2); // producer.setRetryTimesWhenSendAsyncFailed(2);
return producer; // return producer;
} // }
//
// 使用上面的生产者实例创建 RocketMQTemplate // // 使用上面的生产者实例创建 RocketMQTemplate
@Bean("rocketMQTemplateClusterTwo") // @Bean("rocketMQTemplateClusterTwo")
public RocketMQTemplate rocketMQTemplateClusterTwo(@Qualifier("clusterProducerTwo") DefaultMQProducer producer) { // public RocketMQTemplate rocketMQTemplateClusterTwo(@Qualifier("clusterProducerTwo") DefaultMQProducer producer) {
RocketMQTemplate template = new RocketMQTemplate(); // RocketMQTemplate template = new RocketMQTemplate();
template.setProducer(producer); // template.setProducer(producer);
return template; // return template;
} // }
} }

View File

@@ -12,6 +12,9 @@ public interface RocketMqConstants {
// mq GROUP // mq GROUP
String GROUP = "SmartParks"; String GROUP = "SmartParks";
// meter Group
String METER_GROUP = "METER_GROUP";
/*-----------------------------------消息tag------------------------------------*/ /*-----------------------------------消息tag------------------------------------*/
String HIKADD = "ADD_HIK_DEVICE_TAG"; String HIKADD = "ADD_HIK_DEVICE_TAG";
// 人脸抓拍上报 // 人脸抓拍上报

View File

@@ -25,7 +25,7 @@ import org.springframework.stereotype.Component;
topic = RocketMqConstants.TOPIC, topic = RocketMqConstants.TOPIC,
consumerGroup = RocketMqConstants.GROUP, consumerGroup = RocketMqConstants.GROUP,
selectorExpression = RocketMqConstants.FACECAPTURE, selectorExpression = RocketMqConstants.FACECAPTURE,
nameServer = "${rocketmq2.cluster.name-server}" nameServer = "${rocketmq.cluster1.name-server}"
) )
public class FaceCaptureConsumer implements RocketMQListener<MessageExt> { public class FaceCaptureConsumer implements RocketMQListener<MessageExt> {
@@ -36,7 +36,7 @@ public class FaceCaptureConsumer implements RocketMQListener<MessageExt> {
log.info("消费人脸抓拍数据,数据长度={}", ext.getBody().length); log.info("消费人脸抓拍数据,数据长度={}", ext.getBody().length);
try { try {
FaceCapture capture = JSONObject.parseObject(ext.getBody(), FaceCapture.class); FaceCapture capture = JSONObject.parseObject(ext.getBody(), FaceCapture.class);
// zeroSensationPassageService.pass(capture.getDeviceIp(), capture.getSmallImg(), capture.getBigImg()); zeroSensationPassageService.pass(capture.getDeviceIp(), capture.getSmallImg(), capture.getBigImg());
} catch (Exception e) { } catch (Exception e) {
log.error("消费人脸抓拍数据处理失败,", e); log.error("消费人脸抓拍数据处理失败,", e);
} }

View File

@@ -23,7 +23,7 @@ import org.springframework.stereotype.Component;
topic = RocketMqConstants.TOPIC, topic = RocketMqConstants.TOPIC,
consumerGroup = RocketMqConstants.GROUP, consumerGroup = RocketMqConstants.GROUP,
selectorExpression = RocketMqConstants.FACECOMPARE, selectorExpression = RocketMqConstants.FACECOMPARE,
nameServer = "${rocketmq2.cluster.name-server}" nameServer = "${rocketmq.cluster1.name-server}"
) )
public class FaceCompareConsumer implements RocketMQListener<MessageExt> { public class FaceCompareConsumer implements RocketMQListener<MessageExt> {

View File

@@ -6,6 +6,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQListener;
import org.dromara.sis.rocketmq.RocketMqConstants; import org.dromara.sis.rocketmq.RocketMqConstants;
import org.dromara.sis.rocketmq.producer.ProducerService;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
@@ -17,21 +18,26 @@ import org.springframework.stereotype.Component;
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
@RocketMQMessageListener( @RocketMQMessageListener(
consumerGroup = "Meter_Group",
topic = RocketMqConstants.TOPIC, topic = RocketMqConstants.TOPIC,
consumerGroup = RocketMqConstants.METER_GROUP,
selectorExpression = RocketMqConstants.METER_RECORD, selectorExpression = RocketMqConstants.METER_RECORD,
nameServer = "${rocketmq2.cluster.name-server}" nameServer = "${rocketmq.cluster1.name-server}"
) )
public class MeterRecordConsumer implements RocketMQListener<MessageExt> { public class MeterRecordConsumer implements RocketMQListener<MessageExt> {
private final ProducerService producerService;
@Override @Override
public void onMessage(MessageExt ext) { public void onMessage(MessageExt ext) {
log.info("消费仪表上报数据,数据长度={}", ext.getBody().length);
try { try {
String message = new String(ext.getBody()); if (ext.getBody() == null) {
log.info("消费仪表上报数据,数据={}", message); log.info("仪表上报消息数据,不转发!");
} else {
producerService.defaultSend(RocketMqConstants.TOPIC, RocketMqConstants.METER_RECORD, new String(ext.getBody()));
log.info("转发仪表上报数据处理成功");
}
} catch (Exception e) { } catch (Exception e) {
log.error("消费仪表上报数据处理失败,", e); log.error("转发仪表上报数据处理失败,", e);
} }
} }

View File

@@ -20,9 +20,9 @@ public class ProducerService {
@Qualifier("rocketMQTemplateClusterOne") @Qualifier("rocketMQTemplateClusterOne")
private RocketMQTemplate rocketMQTemplateClusterOne; private RocketMQTemplate rocketMQTemplateClusterOne;
@Autowired // @Autowired
// @Qualifier("rocketMQTemplateClusterTwo") // @Qualifier("rocketMQTemplateClusterTwo")
private RocketMQTemplate rocketMQTemplateClusterTwo; // private RocketMQTemplate rocketMQTemplateClusterTwo;
/** /**
* 向mq写入消息 * 向mq写入消息
@@ -34,14 +34,12 @@ public class ProducerService {
public void defaultSend(String topic, String tag, String msg) { public void defaultSend(String topic, String tag, String msg) {
try { try {
String destination = topic + ":" + tag; String destination = topic + ":" + tag;
log.info("准备向默认RocketMQ发送消息目的地{}", destination);
// 使用 RocketMQTemplate 的同步发送方法 // 使用 RocketMQTemplate 的同步发送方法
rocketMQTemplateClusterOne.syncSend(destination, msg); rocketMQTemplateClusterOne.syncSend(destination, msg);
log.info("发送RocketMQ消息成功"); log.info("发送RocketMQOne消息成功, nameServer:{}", rocketMQTemplateClusterOne.getProducer().getNamesrvAddr());
} catch (Exception e) { } catch (Exception e) {
log.error("发送RocketMQ消息失败", e); log.error("发送RocketMQOne消息失败", e);
} }
} }
@@ -53,17 +51,15 @@ public class ProducerService {
* @param tag 消息tag * @param tag 消息tag
* @param msg 消息 * @param msg 消息
*/ */
public void clusterSend(String topic, String tag, String msg) { // public void clusterSend(String topic, String tag, String msg) {
try { // try {
String destination = topic + ":" + tag; // String destination = topic + ":" + tag;
log.info("准备向集群2 RocketMQ发送消息目的地{}", destination); // // 使用 RocketMQTemplate 的同步发送方法
// rocketMQTemplateClusterTwo.syncSend(destination, msg);
// 使用 RocketMQTemplate 的同步发送方法 //
rocketMQTemplateClusterTwo.syncSend(destination, msg); // log.info("发送RocketMQTwo消息成功, nameServer:{}", rocketMQTemplateClusterTwo.getProducer().getNamesrvAddr());
// } catch (Exception e) {
log.info("发送ClusterRocketMQ消息成功"); // log.error("发送RocketMQTwo消息失败", e);
} catch (Exception e) { // }
log.error("发送RocketMQ消息失败", e); // }
}
}
} }