refactor(rocketmq): 1
This commit is contained in:
@@ -45,31 +45,31 @@ public class RocketMQClusterConfig {
|
|||||||
|
|
||||||
|
|
||||||
// 从配置文件中读取 cluster 的配置
|
// 从配置文件中读取 cluster 的配置
|
||||||
@Value("${rocketmq.cluster2.name-server}")
|
// @Value("${rocketmq.cluster2.name-server}")
|
||||||
private String nameServer2;
|
// private String nameServer2;
|
||||||
|
//
|
||||||
@Value("${rocketmq.cluster2.producer.group}")
|
// @Value("${rocketmq.cluster2.producer.group}")
|
||||||
private String producerGroup2;
|
// private String producerGroup2;
|
||||||
|
//
|
||||||
// 为第二个集群创建生产者实例
|
// // 为第二个集群创建生产者实例
|
||||||
@Bean({"clusterProducerTwo"})
|
// @Bean({"clusterProducerTwo"})
|
||||||
public DefaultMQProducer clusterProducerTwo() throws Exception {
|
// public DefaultMQProducer clusterProducerTwo() throws Exception {
|
||||||
DefaultMQProducer producer = new DefaultMQProducer(producerGroup2);
|
// DefaultMQProducer producer = new DefaultMQProducer(producerGroup2);
|
||||||
producer.setNamesrvAddr(nameServer2);
|
// producer.setNamesrvAddr(nameServer2);
|
||||||
// 设置发送超时时间
|
// // 设置发送超时时间
|
||||||
producer.setSendMsgTimeout(5000);
|
// producer.setSendMsgTimeout(5000);
|
||||||
// 设置重试次数
|
// // 设置重试次数
|
||||||
producer.setRetryTimesWhenSendFailed(2);
|
// producer.setRetryTimesWhenSendFailed(2);
|
||||||
producer.setRetryTimesWhenSendAsyncFailed(2);
|
// producer.setRetryTimesWhenSendAsyncFailed(2);
|
||||||
return producer;
|
// return producer;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
// 使用上面的生产者实例创建 RocketMQTemplate
|
// // 使用上面的生产者实例创建 RocketMQTemplate
|
||||||
@Bean("rocketMQTemplateClusterTwo")
|
// @Bean("rocketMQTemplateClusterTwo")
|
||||||
public RocketMQTemplate rocketMQTemplateClusterTwo(@Qualifier("clusterProducerTwo") DefaultMQProducer producer) {
|
// public RocketMQTemplate rocketMQTemplateClusterTwo(@Qualifier("clusterProducerTwo") DefaultMQProducer producer) {
|
||||||
RocketMQTemplate template = new RocketMQTemplate();
|
// RocketMQTemplate template = new RocketMQTemplate();
|
||||||
template.setProducer(producer);
|
// template.setProducer(producer);
|
||||||
return template;
|
// return template;
|
||||||
}
|
// }
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -25,7 +25,7 @@ import org.springframework.stereotype.Component;
|
|||||||
topic = RocketMqConstants.TOPIC,
|
topic = RocketMqConstants.TOPIC,
|
||||||
consumerGroup = RocketMqConstants.GROUP,
|
consumerGroup = RocketMqConstants.GROUP,
|
||||||
selectorExpression = RocketMqConstants.FACECAPTURE,
|
selectorExpression = RocketMqConstants.FACECAPTURE,
|
||||||
nameServer = "${rocketmq.cluster2.name-server}"
|
nameServer = "${rocketmq.cluster1.name-server}"
|
||||||
)
|
)
|
||||||
public class FaceCaptureConsumer implements RocketMQListener<MessageExt> {
|
public class FaceCaptureConsumer implements RocketMQListener<MessageExt> {
|
||||||
|
|
||||||
@@ -36,7 +36,7 @@ public class FaceCaptureConsumer implements RocketMQListener<MessageExt> {
|
|||||||
log.info("消费人脸抓拍数据,数据长度={}", ext.getBody().length);
|
log.info("消费人脸抓拍数据,数据长度={}", ext.getBody().length);
|
||||||
try {
|
try {
|
||||||
FaceCapture capture = JSONObject.parseObject(ext.getBody(), FaceCapture.class);
|
FaceCapture capture = JSONObject.parseObject(ext.getBody(), FaceCapture.class);
|
||||||
// zeroSensationPassageService.pass(capture.getDeviceIp(), capture.getSmallImg(), capture.getBigImg());
|
zeroSensationPassageService.pass(capture.getDeviceIp(), capture.getSmallImg(), capture.getBigImg());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("消费人脸抓拍数据处理失败,", e);
|
log.error("消费人脸抓拍数据处理失败,", e);
|
||||||
}
|
}
|
||||||
|
@@ -23,7 +23,7 @@ import org.springframework.stereotype.Component;
|
|||||||
topic = RocketMqConstants.TOPIC,
|
topic = RocketMqConstants.TOPIC,
|
||||||
consumerGroup = RocketMqConstants.GROUP,
|
consumerGroup = RocketMqConstants.GROUP,
|
||||||
selectorExpression = RocketMqConstants.FACECOMPARE,
|
selectorExpression = RocketMqConstants.FACECOMPARE,
|
||||||
nameServer = "${rocketmq.cluster2.name-server}"
|
nameServer = "${rocketmq.cluster1.name-server}"
|
||||||
)
|
)
|
||||||
public class FaceCompareConsumer implements RocketMQListener<MessageExt> {
|
public class FaceCompareConsumer implements RocketMQListener<MessageExt> {
|
||||||
|
|
||||||
|
@@ -6,6 +6,7 @@ import org.apache.rocketmq.common.message.MessageExt;
|
|||||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
import org.dromara.sis.rocketmq.RocketMqConstants;
|
import org.dromara.sis.rocketmq.RocketMqConstants;
|
||||||
|
import org.dromara.sis.rocketmq.producer.ProducerService;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -17,21 +18,26 @@ import org.springframework.stereotype.Component;
|
|||||||
@Component
|
@Component
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
@RocketMQMessageListener(
|
@RocketMQMessageListener(
|
||||||
consumerGroup = "Meter_Group",
|
|
||||||
topic = RocketMqConstants.TOPIC,
|
topic = RocketMqConstants.TOPIC,
|
||||||
|
consumerGroup = RocketMqConstants.METER_GROUP,
|
||||||
selectorExpression = RocketMqConstants.METER_RECORD,
|
selectorExpression = RocketMqConstants.METER_RECORD,
|
||||||
nameServer = "${rocketmq.cluster2.name-server}"
|
nameServer = "${rocketmq.cluster1.name-server}"
|
||||||
)
|
)
|
||||||
public class MeterRecordConsumer implements RocketMQListener<MessageExt> {
|
public class MeterRecordConsumer implements RocketMQListener<MessageExt> {
|
||||||
|
|
||||||
|
private final ProducerService producerService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(MessageExt ext) {
|
public void onMessage(MessageExt ext) {
|
||||||
log.info("消费仪表上报数据,数据长度={}", ext.getBody().length);
|
|
||||||
try {
|
try {
|
||||||
String message = new String(ext.getBody());
|
if (ext.getBody() == null) {
|
||||||
log.info("消费仪表上报数据,数据={}", message);
|
log.info("仪表上报消息数据,不转发!");
|
||||||
|
} else {
|
||||||
|
producerService.defaultSend(RocketMqConstants.TOPIC, RocketMqConstants.METER_RECORD, new String(ext.getBody()));
|
||||||
|
log.info("转发仪表上报数据处理成功");
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("消费仪表上报数据处理失败,", e);
|
log.error("转发仪表上报数据处理失败,", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@@ -20,9 +20,9 @@ public class ProducerService {
|
|||||||
@Qualifier("rocketMQTemplateClusterOne")
|
@Qualifier("rocketMQTemplateClusterOne")
|
||||||
private RocketMQTemplate rocketMQTemplateClusterOne;
|
private RocketMQTemplate rocketMQTemplateClusterOne;
|
||||||
|
|
||||||
@Autowired
|
// @Autowired
|
||||||
// @Qualifier("rocketMQTemplateClusterTwo")
|
// @Qualifier("rocketMQTemplateClusterTwo")
|
||||||
private RocketMQTemplate rocketMQTemplateClusterTwo;
|
// private RocketMQTemplate rocketMQTemplateClusterTwo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 向mq写入消息
|
* 向mq写入消息
|
||||||
@@ -34,14 +34,12 @@ public class ProducerService {
|
|||||||
public void defaultSend(String topic, String tag, String msg) {
|
public void defaultSend(String topic, String tag, String msg) {
|
||||||
try {
|
try {
|
||||||
String destination = topic + ":" + tag;
|
String destination = topic + ":" + tag;
|
||||||
log.info("准备向默认RocketMQ发送消息,目的地:{}", destination);
|
|
||||||
|
|
||||||
// 使用 RocketMQTemplate 的同步发送方法
|
// 使用 RocketMQTemplate 的同步发送方法
|
||||||
rocketMQTemplateClusterOne.syncSend(destination, msg);
|
rocketMQTemplateClusterOne.syncSend(destination, msg);
|
||||||
|
|
||||||
log.info("发送RocketMQ消息成功");
|
log.info("发送RocketMQOne消息成功, nameServer:{}", rocketMQTemplateClusterOne.getProducer().getNamesrvAddr());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("发送RocketMQ消息失败", e);
|
log.error("发送RocketMQOne消息失败", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -53,17 +51,15 @@ public class ProducerService {
|
|||||||
* @param tag 消息tag
|
* @param tag 消息tag
|
||||||
* @param msg 消息
|
* @param msg 消息
|
||||||
*/
|
*/
|
||||||
public void clusterSend(String topic, String tag, String msg) {
|
// public void clusterSend(String topic, String tag, String msg) {
|
||||||
try {
|
// try {
|
||||||
String destination = topic + ":" + tag;
|
// String destination = topic + ":" + tag;
|
||||||
log.info("准备向集群2 RocketMQ发送消息,目的地:{}", destination);
|
// // 使用 RocketMQTemplate 的同步发送方法
|
||||||
|
// rocketMQTemplateClusterTwo.syncSend(destination, msg);
|
||||||
// 使用 RocketMQTemplate 的同步发送方法
|
//
|
||||||
rocketMQTemplateClusterTwo.syncSend(destination, msg);
|
// log.info("发送RocketMQTwo消息成功, nameServer:{}", rocketMQTemplateClusterTwo.getProducer().getNamesrvAddr());
|
||||||
|
// } catch (Exception e) {
|
||||||
log.info("发送ClusterRocketMQ消息成功");
|
// log.error("发送RocketMQTwo消息失败", e);
|
||||||
} catch (Exception e) {
|
// }
|
||||||
log.error("发送RocketMQ消息失败", e);
|
// }
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user