分布式事务实战:RocketMQ事务消息的完整解决方案
·
目录
一、基础消息发送与消费
1.1 同步发送与基础消费
发送端:同步发送
@Service
@Slf4j
public class SyncMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 同步发送消息
* 特点:发送后等待Broker响应,适合重要业务消息
*/
public SendResult sendSyncMessage() {
// 构建消息
Message<String> message = MessageBuilder
.withPayload("Hello RocketMQ") // 消息内容
.setHeader(MessageConst.PROPERTY_KEYS, "msg-key-001") // 消息键
.setHeader(MessageConst.PROPERTY_TAGS, "TagA") // 消息标签
.build();
// 发送消息
SendResult result = rocketMQTemplate.syncSend(
"test-topic:TagA", // topic:tag
message, // 消息
3000 // 超时时间(ms)
);
log.info("同步发送成功,消息ID: {}", result.getMsgId());
return result;
}
}
消费端:基础消费
@Component
@Slf4j
@RocketMQMessageListener(
topic = "test-topic", // 监听的Topic
consumerGroup = "test-consumer", // 消费者组
selectorExpression = "TagA", // 只消费TagA的消息
consumeMode = ConsumeMode.CONCURRENTLY // 并发消费
)
public class SyncMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("接收到消息: {}", message);
// 业务处理逻辑
processMessage(message);
}
private void processMessage(String message) {
// 模拟业务处理
log.info("处理消息: {}", message);
}
}
1.2 异步发送与异常处理消费
发送端:异步发送
@Service
@Slf4j
public class AsyncMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 异步发送消息
* 特点:发送后立即返回,通过回调处理结果,适合高吞吐场景
*/
public void sendAsyncMessage() {
rocketMQTemplate.asyncSend("async-topic", "异步消息内容",
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步发送成功: {}", sendResult.getMsgId());
}
@Override
public void onException(Throwable throwable) {
log.error("异步发送失败", throwable);
// 可以在这里实现重试逻辑
retrySend();
}
}
);
}
}
消费端:异常处理消费
@Component
@Slf4j
@RocketMQMessageListener(
topic = "async-topic",
consumerGroup = "async-consumer",
maxReconsumeTimes = 3, // 最大重试次数
suspendCurrentQueueTimeMillis = 1000 // 失败后暂停1秒
)
public class AsyncMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
try {
log.info("开始处理异步消息: {}", message);
// 业务处理
boolean success = processBusiness(message);
if (!success) {
// 业务处理失败,抛出异常触发重试
throw new RuntimeException("业务处理失败");
}
log.info("消息处理完成");
} catch (Exception e) {
log.error("消息处理异常", e);
// 根据异常类型决定是否重试
if (e instanceof BusinessException) {
// 业务异常,不重试
log.warn("业务异常,消息丢弃: {}", message);
} else {
// 系统异常,需要重试
throw e;
}
}
}
}
二、顺序消息发送与消费
2.1 订单场景示例
发送端:顺序发送订单消息
@Service
@Slf4j
public class OrderMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送订单相关消息
* 特点:相同订单ID的消息会按顺序发送到同一队列
*/
public void sendOrderMessages(String orderId) {
// 订单创建
sendOrderEvent(orderId, "CREATE", "订单创建");
// 订单支付
sendOrderEvent(orderId, "PAY", "订单支付");
// 订单发货
sendOrderEvent(orderId, "SHIP", "订单发货");
// 订单完成
sendOrderEvent(orderId, "COMPLETE", "订单完成");
}
/**
* 发送订单事件(顺序消息)
*/
private void sendOrderEvent(String orderId, String eventType, String eventDesc) {
OrderEvent event = new OrderEvent(orderId, eventType, eventDesc);
rocketMQTemplate.syncSendOrderly(
"order-topic", // Topic
event, // 消息内容
orderId, // 选择器参数(决定发送到哪个队列)
3000 // 超时时间
);
log.info("发送订单事件: 订单[{}] - {}", orderId, eventType);
}
}
@Data
@AllArgsConstructor
class OrderEvent {
private String orderId;
private String eventType;
private String eventDesc;
}
消费端:顺序消费订单消息
@Component
@Slf4j
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer",
consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式
selectorExpression = "*"
)
public class OrderMessageConsumer implements RocketMQListener<OrderEvent> {
// 用于跟踪每个订单的处理状态
private ConcurrentHashMap<String, OrderProcessor> orderProcessors =
new ConcurrentHashMap<>();
@Override
public void onMessage(OrderEvent event) {
String orderId = event.getOrderId();
log.info("开始处理订单事件: 订单[{}] - {}", orderId, event.getEventType());
// 获取或创建订单处理器
OrderProcessor processor = orderProcessors.computeIfAbsent(
orderId, k -> new OrderProcessor(orderId)
);
try {
// 按顺序处理订单事件
processor.processEvent(event);
log.info("订单事件处理完成: 订单[{}] - {}", orderId, event.getEventType());
} catch (Exception e) {
log.error("订单事件处理失败", e);
// 顺序消费中,抛出异常会触发重试
throw new RuntimeException("处理失败,需要重试", e);
} finally {
// 如果订单已完成,清理处理器
if (event.getEventType().equals("COMPLETE")) {
orderProcessors.remove(orderId);
log.info("订单处理完成,清理处理器: {}", orderId);
}
}
}
/**
* 订单处理器(内部类)
*/
private class OrderProcessor {
private String orderId;
private String currentStatus = "CREATED";
public OrderProcessor(String orderId) {
this.orderId = orderId;
}
public void processEvent(OrderEvent event) {
// 验证状态流转的合法性
validateStatusTransition(event.getEventType());
// 处理具体业务
switch (event.getEventType()) {
case "CREATE":
createOrder();
break;
case "PAY":
processPayment();
break;
case "SHIP":
shipOrder();
break;
case "COMPLETE":
completeOrder();
break;
default:
throw new IllegalArgumentException("未知事件类型");
}
// 更新状态
currentStatus = event.getEventType();
}
private void validateStatusTransition(String nextStatus) {
// 实现状态机验证
log.debug("状态流转: {} -> {}", currentStatus, nextStatus);
}
private void createOrder() { /* 创建订单逻辑 */ }
private void processPayment() { /* 支付逻辑 */ }
private void shipOrder() { /* 发货逻辑 */ }
private void completeOrder() { /* 完成逻辑 */ }
}
}
三、延迟消息发送与消费
3.1 订单超时检查场景
发送端:发送延迟消息
@Service
@Slf4j
public class DelayMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送订单超时检查的延迟消息
* @param orderId 订单ID
* @param timeoutMinutes 超时时间(分钟)
*/
public void sendOrderTimeoutCheck(String orderId, int timeoutMinutes) {
// 将分钟转换为RocketMQ的延迟级别
int delayLevel = convertMinutesToDelayLevel(timeoutMinutes);
Message<String> message = MessageBuilder
.withPayload(orderId) // 消息体为订单ID
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(delayLevel))
.setHeader("orderId", orderId)
.build();
rocketMQTemplate.syncSend("order-timeout-topic", message);
log.info("发送订单超时检查消息: 订单[{}], {}分钟后检查", orderId, timeoutMinutes);
}
/**
* 将分钟转换为RocketMQ延迟级别
* RocketMQ延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
private int convertMinutesToDelayLevel(int minutes) {
if (minutes <= 1) return 5; // 1分钟
else if (minutes <= 2) return 6; // 2分钟
else if (minutes <= 3) return 7; // 3分钟
else if (minutes <= 4) return 8; // 4分钟
else if (minutes <= 5) return 9; // 5分钟
else if (minutes <= 10) return 10; // 10分钟
else if (minutes <= 20) return 11; // 20分钟
else if (minutes <= 30) return 12; // 30分钟
else if (minutes <= 60) return 13; // 1小时
else if (minutes <= 120) return 14; // 2小时
else return 14; // 超过2小时按2小时处理
}
}
消费端:消费延迟消息进行检查
@Component
@Slf4j
@RocketMQMessageListener(
topic = "order-timeout-topic",
consumerGroup = "order-timeout-consumer",
selectorExpression = "*"
)
public class OrderTimeoutConsumer implements RocketMQListener<MessageExt> {
@Autowired
private OrderService orderService;
@Override
public void onMessage(MessageExt message) {
// 从消息体中获取订单ID
String orderId = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("检查订单超时: {}", orderId);
try {
// 查询订单当前状态
OrderStatus status = orderService.getOrderStatus(orderId);
if (status == OrderStatus.UNPAID) {
// 订单未支付,执行超时处理
log.warn("订单超时未支付: {}", orderId);
handleOrderTimeout(orderId);
} else if (status == OrderStatus.PAID) {
// 订单已支付,无需处理
log.info("订单已支付,无需处理: {}", orderId);
} else if (status == OrderStatus.CANCELLED) {
// 订单已取消
log.info("订单已取消: {}", orderId);
}
} catch (Exception e) {
log.error("订单超时检查失败", e);
// 可以记录到失败队列,人工处理
saveToErrorQueue(orderId, e.getMessage());
}
}
private void handleOrderTimeout(String orderId) {
// 1. 取消订单
orderService.cancelOrder(orderId);
// 2. 释放库存
inventoryService.releaseStock(orderId);
// 3. 发送通知
notificationService.sendOrderTimeoutAlert(orderId);
log.info("订单超时处理完成: {}", orderId);
}
private void saveToErrorQueue(String orderId, String errorMsg) {
// 将失败记录保存到数据库或Redis,供人工处理
ErrorLog errorLog = new ErrorLog();
errorLog.setOrderId(orderId);
errorLog.setErrorMsg(errorMsg);
errorLog.setCreateTime(new Date());
errorLogRepository.save(errorLog);
}
}
四、事务消息发送与消费
4.1 分布式事务场景
发送端:发送事务消息
@Service
@Slf4j
public class TransactionMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 创建订单(分布式事务场景)
*/
@Transactional
public void createOrderWithTransaction(OrderCreateRequest request) {
// 1. 生成订单ID
String orderId = generateOrderId();
String txId = UUID.randomUUID().toString();
// 2. 准备消息
OrderCreatedEvent event = new OrderCreatedEvent(orderId, request);
Message<OrderCreatedEvent> message = MessageBuilder
.withPayload(event)
.setHeader("txId", txId) // 事务ID
.setHeader("orderId", orderId)
.build();
// 3. 发送事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-created-topic",
message,
request // 传递给事务监听器的业务参数
);
log.info("事务消息发送结果: {}", result.getLocalTransactionState());
}
}
@Data
@AllArgsConstructor
class OrderCreatedEvent {
private String orderId;
private OrderCreateRequest request;
}
消费端:事务消息监听器
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "order-producer-group")
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
@Autowired
private InventoryService inventoryService;
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(
Message msg, Object arg) {
try {
String txId = (String) msg.getHeaders().get("txId");
String orderId = (String) msg.getHeaders().get("orderId");
log.info("开始执行本地事务,事务ID: {}, 订单ID: {}", txId, orderId);
OrderCreateRequest request = (OrderCreateRequest) arg;
// 1. 创建订单(本地数据库事务)
boolean orderCreated = orderService.createOrderInTransaction(
orderId, request, txId);
if (!orderCreated) {
log.error("创建订单失败,回滚");
return RocketMQLocalTransactionState.ROLLBACK;
}
// 2. 扣减库存(本地数据库事务)
boolean inventoryDeducted = inventoryService.deductInTransaction(
request.getItems(), txId);
if (!inventoryDeducted) {
log.error("扣减库存失败,回滚");
return RocketMQLocalTransactionState.ROLLBACK;
}
log.info("本地事务执行成功,提交");
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事务执行异常", e);
return RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* 检查本地事务状态
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String txId = (String) msg.getHeaders().get("txId");
String orderId = (String) msg.getHeaders().get("orderId");
log.info("检查本地事务状态,事务ID: {}, 订单ID: {}", txId, orderId);
// 查询事务状态
TransactionStatus status = transactionService.queryStatus(txId);
switch (status) {
case COMMITTED:
return RocketMQLocalTransactionState.COMMIT;
case ROLLBACKED:
return RocketMQLocalTransactionState.ROLLBACK;
default:
// 继续等待或返回UNKNOWN让RocketMQ继续检查
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
消费端:事务消息的最终消费者
@Component
@Slf4j
@RocketMQMessageListener(
topic = "order-created-topic",
consumerGroup = "order-created-consumer",
selectorExpression = "*"
)
public class OrderCreatedConsumer implements RocketMQListener<OrderCreatedEvent> {
@Override
public void onMessage(OrderCreatedEvent event) {
log.info("接收到订单创建完成消息: {}", event.getOrderId());
// 这里的消息已经是事务提交后的最终消息
// 可以安全地执行后续操作
try {
// 1. 发送订单创建通知
notificationService.sendOrderCreated(event.getOrderId());
// 2. 更新搜索索引
searchService.indexOrder(event.getOrderId());
// 3. 发送到数据分析系统
analyticsService.trackOrderCreated(event);
log.info("订单创建后续处理完成: {}", event.getOrderId());
} catch (Exception e) {
log.error("订单创建后续处理失败", e);
// 记录错误,但不抛异常(避免重复消费)
errorService.recordError(event.getOrderId(), e);
}
}
}
五、批量消息发送与消费
5.1 日志收集场景
发送端:批量发送日志
@Service
@Slf4j
public class BatchLogProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 批量收集日志并发送
*/
public void sendBatchLogs(List<LogEntry> logs) {
if (logs.isEmpty()) {
return;
}
// 将日志列表转换为消息列表
List<Message<LogEntry>> messages = logs.stream()
.map(log -> MessageBuilder
.withPayload(log)
.setHeader("logType", log.getType())
.setHeader("timestamp", System.currentTimeMillis())
.build())
.collect(Collectors.toList());
// 批量发送
SendResult result = rocketMQTemplate.syncSend(
"log-collect-topic",
messages,
5000 // 超时时间
);
log.info("批量发送日志完成,数量: {}", logs.size());
}
/**
* 异步批量发送(更高吞吐)
*/
public void sendBatchLogsAsync(List<LogEntry> logs) {
CompletableFuture<SendResult> future = rocketMQTemplate.asyncSend(
"log-collect-topic",
logs,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.debug("日志批量发送成功,数量: {}", logs.size());
}
@Override
public void onException(Throwable throwable) {
log.error("日志批量发送失败", throwable);
// 可以在这里实现重试或降级逻辑
}
}
);
}
}
消费端:批量消费处理
@Component
@Slf4j
@RocketMQMessageListener(
topic = "log-collect-topic",
consumerGroup = "log-consumer",
consumeMessageBatchMaxSize = 50, // 每次最多消费50条
pullBatchSize = 32 // 每次拉取32条
)
public class BatchLogConsumer implements RocketMQListener<List<MessageExt>> {
@Autowired
private LogStorageService logStorageService;
@Override
public void onMessage(List<MessageExt> messages) {
log.info("批量接收到 {} 条日志", messages.size());
// 批量处理消息
List<LogEntry> logs = new ArrayList<>();
for (MessageExt message : messages) {
try {
// 反序列化日志对象
LogEntry logEntry = deserializeLogEntry(message);
logs.add(logEntry);
} catch (Exception e) {
log.error("日志反序列化失败", e);
// 记录错误,但不影响其他日志处理
recordDeserializeError(message);
}
}
if (!logs.isEmpty()) {
// 批量存储到数据库
boolean success = logStorageService.batchSave(logs);
if (!success) {
log.error("日志批量存储失败");
// 可以在这里实现补偿逻辑
} else {
log.debug("日志批量存储成功,数量: {}", logs.size());
}
}
}
private LogEntry deserializeLogEntry(MessageExt message) {
// 从消息体中反序列化LogEntry
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(message.getBody(), LogEntry.class);
}
}
六、过滤消息发送与消费
6.1 多租户消息过滤
发送端:发送带属性的消息
@Service
@Slf4j
public class FilterMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送带属性的消息(支持SQL92过滤)
*/
public void sendMessageWithProperties() {
// 发送用户注册消息
UserRegisterEvent event = new UserRegisterEvent("user123", "tenantA");
Message<UserRegisterEvent> message = MessageBuilder
.withPayload(event)
.setHeader(MessageConst.PROPERTY_TAGS, "user.register") // 标签
.setHeader("tenantId", "tenantA") // 租户ID
.setHeader("userType", "VIP") // 用户类型
.setHeader("region", "CN") // 区域
.build();
rocketMQTemplate.syncSend("user-events", message);
log.info("发送用户注册消息: {}", event.getUserId());
}
/**
* 发送不同条件的消息
*/
public void sendVariousMessages() {
// 消息1:租户A的VIP用户
sendFilteredMessage("tenantA", "VIP", "user.event");
// 消息2:租户B的普通用户
sendFilteredMessage("tenantB", "NORMAL", "user.event");
// 消息3:租户A的普通用户
sendFilteredMessage("tenantA", "NORMAL", "user.event");
}
private void sendFilteredMessage(String tenantId, String userType, String tag) {
Map<String, Object> headers = new HashMap<>();
headers.put("tenantId", tenantId);
headers.put("userType", userType);
headers.put(MessageConst.PROPERTY_TAGS, tag);
Message<String> message = MessageBuilder
.withPayload("消息内容")
.copyHeaders(headers)
.build();
rocketMQTemplate.syncSend("filter-topic", message);
}
}
消费端:使用SQL92过滤消费
@Component
@Slf4j
public class FilterMessageConsumers {
/**
* 消费者1:只消费租户A的VIP用户消息
*/
@RocketMQMessageListener(
topic = "filter-topic",
consumerGroup = "tenantA-vip-consumer",
selectorType = SelectorType.SQL92,
selectorExpression = "tenantId = 'tenantA' and userType = 'VIP'"
)
public class TenantAVipConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
log.info("租户A-VIP消费者收到消息: {}", message.getMsgId());
// 处理租户A的VIP用户逻辑
}
}
/**
* 消费者2:消费所有租户的VIP用户消息
*/
@RocketMQMessageListener(
topic = "filter-topic",
consumerGroup = "all-vip-consumer",
selectorType = SelectorType.SQL92,
selectorExpression = "userType = 'VIP'"
)
public class AllVipConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
String tenantId = message.getProperties().get("tenantId");
log.info("全租户-VIP消费者收到消息,租户: {}, 消息ID: {}",
tenantId, message.getMsgId());
// 处理所有VIP用户逻辑
}
}
/**
* 消费者3:消费租户B的所有消息
*/
@RocketMQMessageListener(
topic = "filter-topic",
consumerGroup = "tenantB-consumer",
selectorType = SelectorType.SQL92,
selectorExpression = "tenantId = 'tenantB'"
)
public class TenantBConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
String userType = message.getProperties().get("userType");
log.info("租户B消费者收到消息,用户类型: {}, 消息ID: {}",
userType, message.getMsgId());
// 处理租户B的所有消息
}
}
}
七、广播消息发送与消费
7.1 配置更新广播场景
发送端:发送广播消息
@Service
@Slf4j
public class BroadcastMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送配置更新广播消息
* 特点:所有订阅的消费者都会收到消息
*/
public void broadcastConfigUpdate(ConfigUpdateEvent event) {
Message<ConfigUpdateEvent> message = MessageBuilder
.withPayload(event)
.setHeader("configType", event.getConfigType())
.setHeader("version", event.getVersion())
.build();
// 广播消息通常使用异步发送
rocketMQTemplate.asyncSend("config-update-topic", message,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("配置更新广播发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("配置更新广播发送失败", throwable);
// 配置更新很重要,需要重试
retryBroadcast(event);
}
}
);
}
/**
* 发送全量配置同步广播
*/
public void broadcastFullConfigSync() {
Map<String, Object> fullConfig = configService.getAllConfigs();
// 对于大数据量的广播,可以考虑分片发送
int chunkSize = 100; // 每片100条配置
List<Map<String, Object>> chunks = splitConfigs(fullConfig, chunkSize);
for (int i = 0; i < chunks.size(); i++) {
ConfigChunkEvent chunk = new ConfigChunkEvent(i, chunks.size(), chunks.get(i));
sendConfigChunk(chunk);
}
}
private void sendConfigChunk(ConfigChunkEvent chunk) {
rocketMQTemplate.syncSend("config-sync-topic", chunk);
}
}
消费端:广播模式消费
@Component
@Slf4j
@RocketMQMessageListener(
topic = "config-update-topic",
consumerGroup = "config-update-consumer",
messageModel = MessageModel.BROADCASTING, // 广播模式
selectorExpression = "*"
)
public class ConfigUpdateConsumer implements RocketMQListener<ConfigUpdateEvent> {
@Value("${spring.application.name}")
private String applicationName;
@Value("${server.port}")
private String serverPort;
@Override
public void onMessage(ConfigUpdateEvent event) {
log.info("应用[{}:{}]接收到配置更新: {}",
applicationName, serverPort, event.getConfigType());
try {
// 1. 更新本地缓存
configCache.refresh(event.getConfigType(), event.getConfigData());
// 2. 记录更新日志
logConfigUpdate(event);
// 3. 通知相关组件
notifyComponents(event);
log.info("配置更新处理完成: {}", event.getConfigType());
} catch (Exception e) {
log.error("配置更新处理失败", e);
// 广播模式下,一个实例失败不影响其他实例
// 可以记录错误,供后续分析
recordError(event, e);
}
}
/**
* 全量配置同步消费者
*/
@RocketMQMessageListener(
topic = "config-sync-topic",
consumerGroup = "config-sync-consumer",
messageModel = MessageModel.BROADCASTING,
selectorExpression = "*"
)
public class ConfigSyncConsumer implements RocketMQListener<ConfigChunkEvent> {
// 用于收集配置分片
private Map<Integer, ConfigChunkEvent> chunkMap = new ConcurrentHashMap<>();
@Override
public void onMessage(ConfigChunkEvent chunk) {
log.info("接收到配置分片: {}/{}", chunk.getChunkIndex() + 1, chunk.getTotalChunks());
// 保存分片
chunkMap.put(chunk.getChunkIndex(), chunk);
// 检查是否收集完所有分片
if (chunkMap.size() == chunk.getTotalChunks()) {
try {
// 合并所有分片
Map<String, Object> fullConfig = mergeChunks(chunkMap);
// 应用全量配置
applyFullConfig(fullConfig);
log.info("全量配置同步完成,共{}条配置", fullConfig.size());
// 清理分片缓存
chunkMap.clear();
} catch (Exception e) {
log.error("配置分片合并失败", e);
}
}
}
private Map<String, Object> mergeChunks(Map<Integer, ConfigChunkEvent> chunks) {
Map<String, Object> fullConfig = new HashMap<>();
for (int i = 0; i < chunks.size(); i++) {
ConfigChunkEvent chunk = chunks.get(i);
if (chunk != null) {
fullConfig.putAll(chunk.getConfigData());
}
}
return fullConfig;
}
}
}
八、配套配置与最佳实践
8.1 统一配置类
rocketmq:
name-server: 127.0.0.1:9876 # NameServer地址
producer:
group: ${spring.application.name}-producer-group # 生产者组
send-message-timeout: 3000 # 发送超时时间(ms)
max-message-size: 5242880 # 最大消息大小(5MB)
compress-message-body-threshold: 8192 # 消息压缩阈值
retry-times-when-send-failed: 2 # 同步发送失败重试次数
retry-times-when-send-async-failed: 2 # 异步发送失败重试次数
retry-next-server: true # 发送失败时是否重试其他Broker
consumer:
# 消费端优化
pull-batch-size: 32 # 每次拉取消息数
consume-thread-min: 10
consume-thread-max: 64
adjust-threadpool-nums-threshold: 100000 # 调整线程池阈值
pull-threshold-for-queue: 1000 # 队列拉取阈值
pull-interval: 0 # 拉取间隔,0表示不等待
@Configuration
@Slf4j
public class RocketMQConfig {
/**
* 生产环境配置
*/
@Profile("prod")
@Configuration
public static class ProdConfig {
@Bean
public RocketMQTemplate rocketMQTemplate(
@Value("${rocketmq.name-server}") String nameServer) {
RocketMQTemplate template = new RocketMQTemplate();
// 配置生产者
DefaultMQProducer producer = new DefaultMQProducer("prod-producer-group");
producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(5000);
producer.setRetryTimesWhenSendFailed(3);
producer.setMaxMessageSize(1024 * 1024 * 4); // 4MB
template.setProducer(producer);
template.setMessageConverter(jacksonMessageConverter());
return template;
}
}
/**
* 开发环境配置
*/
@Profile("dev")
@Configuration
public static class DevConfig {
@Bean
public RocketMQTemplate rocketMQTemplate(
@Value("${rocketmq.name-server:127.0.0.1:9876}") String nameServer) {
RocketMQTemplate template = new RocketMQTemplate();
DefaultMQProducer producer = new DefaultMQProducer("dev-producer-group");
producer.setNamesrvAddr(nameServer);
producer.setSendMsgTimeout(3000);
template.setProducer(producer);
return template;
}
}
/**
* 消息转换器配置
*/
@Bean
public MessageConverter jacksonMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
converter.setObjectMapper(objectMapper);
converter.setSerializedPayloadClass(String.class);
return converter;
}
/**
* 监控配置
*/
@Bean
public MQMonitor mqMonitor(RocketMQTemplate rocketMQTemplate) {
return new MQMonitor(rocketMQTemplate);
}
}
8.2 监控和告警
@Component
@Slf4j
public class MQMonitor {
private final RocketMQTemplate rocketMQTemplate;
private final MeterRegistry meterRegistry;
public MQMonitor(RocketMQTemplate rocketMQTemplate, MeterRegistry meterRegistry) {
this.rocketMQTemplate = rocketMQTemplate;
this.meterRegistry = meterRegistry;
// 启动监控
startMonitoring();
}
/**
* 监控消息发送
*/
public void monitorSend(String topic, long duration, boolean success) {
// 记录指标
meterRegistry.timer("rocketmq.send.duration", "topic", topic)
.record(duration, TimeUnit.MILLISECONDS);
meterRegistry.counter("rocketmq.send.total", "topic", topic)
.increment();
if (!success) {
meterRegistry.counter("rocketmq.send.error", "topic", topic)
.increment();
}
}
/**
* 监控消息消费
*/
public void monitorConsume(String consumerGroup, String topic, long duration, boolean success) {
meterRegistry.timer("rocketmq.consume.duration",
"consumerGroup", consumerGroup, "topic", topic)
.record(duration, TimeUnit.MILLISECONDS);
if (!success) {
meterRegistry.counter("rocketmq.consume.error",
"consumerGroup", consumerGroup, "topic", topic)
.increment();
}
}
/**
* 检查消息堆积
*/
@Scheduled(fixedDelay = 60000) // 每分钟检查一次
public void checkMessageBacklog() {
// 实现消息堆积检查逻辑
// 可以通过RocketMQ管理API或直接查询Broker获取堆积情况
// 如果堆积超过阈值,发送告警
if (backlogExceedsThreshold()) {
sendAlert("消息堆积告警", getBacklogInfo());
}
}
}
总结
核心配套原则:
-
一对一匹配:发送和消费的Topic、Tag要匹配
-
序列化一致:发送端和消费端的消息序列化方式要一致
-
异常处理配套:发送失败的重试策略和消费失败的异常处理要协调
-
监控配套:发送和消费的监控指标要统一收集和分析
使用建议:
-
普通消息:使用同步发送 + 并发消费,适合大多数业务场景
-
顺序消息:使用同步顺序发送 + 顺序消费,保证消息处理顺序
-
延迟消息:使用延迟发送 + 普通消费,实现定时任务
-
事务消息:使用事务发送 + 事务监听器 + 最终消费者,保证分布式事务一致性
-
批量消息:使用批量发送 + 批量消费,提高吞吐量
-
过滤消息:使用属性发送 + SQL92过滤消费,实现精细化的消息路由
-
广播消息:使用异步发送 + 广播消费,实现配置同步等场景
更多推荐

所有评论(0)