目录

一、基础消息发送与消费

二、顺序消息发送与消费

三、延迟消息发送与消费

四、事务消息发送与消费

五、批量消息发送与消费

六、过滤消息发送与消费

七、广播消息发送与消费

八、配套配置与最佳实践

总结

一、基础消息发送与消费

1.1 同步发送与基础消费

发送端:同步发送

@Service
@Slf4j
public class SyncMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 同步发送消息
     * 特点:发送后等待Broker响应,适合重要业务消息
     */
    public SendResult sendSyncMessage() {
        // 构建消息
        Message<String> message = MessageBuilder
            .withPayload("Hello RocketMQ")  // 消息内容
            .setHeader(MessageConst.PROPERTY_KEYS, "msg-key-001")  // 消息键
            .setHeader(MessageConst.PROPERTY_TAGS, "TagA")  // 消息标签
            .build();
        
        // 发送消息
        SendResult result = rocketMQTemplate.syncSend(
            "test-topic:TagA",  // topic:tag
            message,  // 消息
            3000  // 超时时间(ms)
        );
        
        log.info("同步发送成功,消息ID: {}", result.getMsgId());
        return result;
    }
}

消费端:基础消费

@Component
@Slf4j
@RocketMQMessageListener(
    topic = "test-topic",           // 监听的Topic
    consumerGroup = "test-consumer", // 消费者组
    selectorExpression = "TagA",     // 只消费TagA的消息
    consumeMode = ConsumeMode.CONCURRENTLY  // 并发消费
)
public class SyncMessageConsumer implements RocketMQListener<String> {
    
    @Override
    public void onMessage(String message) {
        log.info("接收到消息: {}", message);
        // 业务处理逻辑
        processMessage(message);
    }
    
    private void processMessage(String message) {
        // 模拟业务处理
        log.info("处理消息: {}", message);
    }
}

1.2 异步发送与异常处理消费

发送端:异步发送

@Service
@Slf4j
public class AsyncMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 异步发送消息
     * 特点:发送后立即返回,通过回调处理结果,适合高吞吐场景
     */
    public void sendAsyncMessage() {
        rocketMQTemplate.asyncSend("async-topic", "异步消息内容", 
            new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("异步发送成功: {}", sendResult.getMsgId());
                }
                
                @Override
                public void onException(Throwable throwable) {
                    log.error("异步发送失败", throwable);
                    // 可以在这里实现重试逻辑
                    retrySend();
                }
            }
        );
    }
}

消费端:异常处理消费

@Component
@Slf4j
@RocketMQMessageListener(
    topic = "async-topic",
    consumerGroup = "async-consumer",
    maxReconsumeTimes = 3,  // 最大重试次数
    suspendCurrentQueueTimeMillis = 1000  // 失败后暂停1秒
)
public class AsyncMessageConsumer implements RocketMQListener<String> {
    
    @Override
    public void onMessage(String message) {
        try {
            log.info("开始处理异步消息: {}", message);
            
            // 业务处理
            boolean success = processBusiness(message);
            
            if (!success) {
                // 业务处理失败,抛出异常触发重试
                throw new RuntimeException("业务处理失败");
            }
            
            log.info("消息处理完成");
            
        } catch (Exception e) {
            log.error("消息处理异常", e);
            // 根据异常类型决定是否重试
            if (e instanceof BusinessException) {
                // 业务异常,不重试
                log.warn("业务异常,消息丢弃: {}", message);
            } else {
                // 系统异常,需要重试
                throw e;
            }
        }
    }
}

二、顺序消息发送与消费

2.1 订单场景示例

发送端:顺序发送订单消息

@Service
@Slf4j
public class OrderMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发送订单相关消息
     * 特点:相同订单ID的消息会按顺序发送到同一队列
     */
    public void sendOrderMessages(String orderId) {
        // 订单创建
        sendOrderEvent(orderId, "CREATE", "订单创建");
        
        // 订单支付
        sendOrderEvent(orderId, "PAY", "订单支付");
        
        // 订单发货
        sendOrderEvent(orderId, "SHIP", "订单发货");
        
        // 订单完成
        sendOrderEvent(orderId, "COMPLETE", "订单完成");
    }
    
    /**
     * 发送订单事件(顺序消息)
     */
    private void sendOrderEvent(String orderId, String eventType, String eventDesc) {
        OrderEvent event = new OrderEvent(orderId, eventType, eventDesc);
        
        rocketMQTemplate.syncSendOrderly(
            "order-topic",  // Topic
            event,          // 消息内容
            orderId,        // 选择器参数(决定发送到哪个队列)
            3000            // 超时时间
        );
        
        log.info("发送订单事件: 订单[{}] - {}", orderId, eventType);
    }
}

@Data
@AllArgsConstructor
class OrderEvent {
    private String orderId;
    private String eventType;
    private String eventDesc;
}

消费端:顺序消费订单消息

@Component
@Slf4j
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer",
    consumeMode = ConsumeMode.ORDERLY,  // 顺序消费模式
    selectorExpression = "*"
)
public class OrderMessageConsumer implements RocketMQListener<OrderEvent> {
    
    // 用于跟踪每个订单的处理状态
    private ConcurrentHashMap<String, OrderProcessor> orderProcessors = 
        new ConcurrentHashMap<>();
    
    @Override
    public void onMessage(OrderEvent event) {
        String orderId = event.getOrderId();
        
        log.info("开始处理订单事件: 订单[{}] - {}", orderId, event.getEventType());
        
        // 获取或创建订单处理器
        OrderProcessor processor = orderProcessors.computeIfAbsent(
            orderId, k -> new OrderProcessor(orderId)
        );
        
        try {
            // 按顺序处理订单事件
            processor.processEvent(event);
            
            log.info("订单事件处理完成: 订单[{}] - {}", orderId, event.getEventType());
            
        } catch (Exception e) {
            log.error("订单事件处理失败", e);
            // 顺序消费中,抛出异常会触发重试
            throw new RuntimeException("处理失败,需要重试", e);
        } finally {
            // 如果订单已完成,清理处理器
            if (event.getEventType().equals("COMPLETE")) {
                orderProcessors.remove(orderId);
                log.info("订单处理完成,清理处理器: {}", orderId);
            }
        }
    }
    
    /**
     * 订单处理器(内部类)
     */
    private class OrderProcessor {
        private String orderId;
        private String currentStatus = "CREATED";
        
        public OrderProcessor(String orderId) {
            this.orderId = orderId;
        }
        
        public void processEvent(OrderEvent event) {
            // 验证状态流转的合法性
            validateStatusTransition(event.getEventType());
            
            // 处理具体业务
            switch (event.getEventType()) {
                case "CREATE":
                    createOrder();
                    break;
                case "PAY":
                    processPayment();
                    break;
                case "SHIP":
                    shipOrder();
                    break;
                case "COMPLETE":
                    completeOrder();
                    break;
                default:
                    throw new IllegalArgumentException("未知事件类型");
            }
            
            // 更新状态
            currentStatus = event.getEventType();
        }
        
        private void validateStatusTransition(String nextStatus) {
            // 实现状态机验证
            log.debug("状态流转: {} -> {}", currentStatus, nextStatus);
        }
        
        private void createOrder() { /* 创建订单逻辑 */ }
        private void processPayment() { /* 支付逻辑 */ }
        private void shipOrder() { /* 发货逻辑 */ }
        private void completeOrder() { /* 完成逻辑 */ }
    }
}

三、延迟消息发送与消费

3.1 订单超时检查场景

发送端:发送延迟消息

@Service
@Slf4j
public class DelayMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发送订单超时检查的延迟消息
     * @param orderId 订单ID
     * @param timeoutMinutes 超时时间(分钟)
     */
    public void sendOrderTimeoutCheck(String orderId, int timeoutMinutes) {
        // 将分钟转换为RocketMQ的延迟级别
        int delayLevel = convertMinutesToDelayLevel(timeoutMinutes);
        
        Message<String> message = MessageBuilder
            .withPayload(orderId)  // 消息体为订单ID
            .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(delayLevel))
            .setHeader("orderId", orderId)
            .build();
        
        rocketMQTemplate.syncSend("order-timeout-topic", message);
        
        log.info("发送订单超时检查消息: 订单[{}], {}分钟后检查", orderId, timeoutMinutes);
    }
    
    /**
     * 将分钟转换为RocketMQ延迟级别
     * RocketMQ延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     */
    private int convertMinutesToDelayLevel(int minutes) {
        if (minutes <= 1) return 5;      // 1分钟
        else if (minutes <= 2) return 6; // 2分钟
        else if (minutes <= 3) return 7; // 3分钟
        else if (minutes <= 4) return 8; // 4分钟
        else if (minutes <= 5) return 9; // 5分钟
        else if (minutes <= 10) return 10; // 10分钟
        else if (minutes <= 20) return 11; // 20分钟
        else if (minutes <= 30) return 12; // 30分钟
        else if (minutes <= 60) return 13; // 1小时
        else if (minutes <= 120) return 14; // 2小时
        else return 14; // 超过2小时按2小时处理
    }
}

消费端:消费延迟消息进行检查

@Component
@Slf4j
@RocketMQMessageListener(
    topic = "order-timeout-topic",
    consumerGroup = "order-timeout-consumer",
    selectorExpression = "*"
)
public class OrderTimeoutConsumer implements RocketMQListener<MessageExt> {
    
    @Autowired
    private OrderService orderService;
    
    @Override
    public void onMessage(MessageExt message) {
        // 从消息体中获取订单ID
        String orderId = new String(message.getBody(), StandardCharsets.UTF_8);
        
        log.info("检查订单超时: {}", orderId);
        
        try {
            // 查询订单当前状态
            OrderStatus status = orderService.getOrderStatus(orderId);
            
            if (status == OrderStatus.UNPAID) {
                // 订单未支付,执行超时处理
                log.warn("订单超时未支付: {}", orderId);
                handleOrderTimeout(orderId);
            } else if (status == OrderStatus.PAID) {
                // 订单已支付,无需处理
                log.info("订单已支付,无需处理: {}", orderId);
            } else if (status == OrderStatus.CANCELLED) {
                // 订单已取消
                log.info("订单已取消: {}", orderId);
            }
            
        } catch (Exception e) {
            log.error("订单超时检查失败", e);
            // 可以记录到失败队列,人工处理
            saveToErrorQueue(orderId, e.getMessage());
        }
    }
    
    private void handleOrderTimeout(String orderId) {
        // 1. 取消订单
        orderService.cancelOrder(orderId);
        
        // 2. 释放库存
        inventoryService.releaseStock(orderId);
        
        // 3. 发送通知
        notificationService.sendOrderTimeoutAlert(orderId);
        
        log.info("订单超时处理完成: {}", orderId);
    }
    
    private void saveToErrorQueue(String orderId, String errorMsg) {
        // 将失败记录保存到数据库或Redis,供人工处理
        ErrorLog errorLog = new ErrorLog();
        errorLog.setOrderId(orderId);
        errorLog.setErrorMsg(errorMsg);
        errorLog.setCreateTime(new Date());
        errorLogRepository.save(errorLog);
    }
}

四、事务消息发送与消费

4.1 分布式事务场景

发送端:发送事务消息

@Service
@Slf4j
public class TransactionMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 创建订单(分布式事务场景)
     */
    @Transactional
    public void createOrderWithTransaction(OrderCreateRequest request) {
        // 1. 生成订单ID
        String orderId = generateOrderId();
        String txId = UUID.randomUUID().toString();
        
        // 2. 准备消息
        OrderCreatedEvent event = new OrderCreatedEvent(orderId, request);
        
        Message<OrderCreatedEvent> message = MessageBuilder
            .withPayload(event)
            .setHeader("txId", txId)  // 事务ID
            .setHeader("orderId", orderId)
            .build();
        
        // 3. 发送事务消息
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            "order-created-topic",
            message,
            request  // 传递给事务监听器的业务参数
        );
        
        log.info("事务消息发送结果: {}", result.getLocalTransactionState());
    }
}

@Data
@AllArgsConstructor
class OrderCreatedEvent {
    private String orderId;
    private OrderCreateRequest request;
}

消费端:事务消息监听器

@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "order-producer-group")
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
    
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private InventoryService inventoryService;
    
    /**
     * 执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(
            Message msg, Object arg) {
        
        try {
            String txId = (String) msg.getHeaders().get("txId");
            String orderId = (String) msg.getHeaders().get("orderId");
            
            log.info("开始执行本地事务,事务ID: {}, 订单ID: {}", txId, orderId);
            
            OrderCreateRequest request = (OrderCreateRequest) arg;
            
            // 1. 创建订单(本地数据库事务)
            boolean orderCreated = orderService.createOrderInTransaction(
                orderId, request, txId);
            
            if (!orderCreated) {
                log.error("创建订单失败,回滚");
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            
            // 2. 扣减库存(本地数据库事务)
            boolean inventoryDeducted = inventoryService.deductInTransaction(
                request.getItems(), txId);
            
            if (!inventoryDeducted) {
                log.error("扣减库存失败,回滚");
                return RocketMQLocalTransactionState.ROLLBACK;
            }
            
            log.info("本地事务执行成功,提交");
            return RocketMQLocalTransactionState.COMMIT;
            
        } catch (Exception e) {
            log.error("本地事务执行异常", e);
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
    
    /**
     * 检查本地事务状态
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String txId = (String) msg.getHeaders().get("txId");
        String orderId = (String) msg.getHeaders().get("orderId");
        
        log.info("检查本地事务状态,事务ID: {}, 订单ID: {}", txId, orderId);
        
        // 查询事务状态
        TransactionStatus status = transactionService.queryStatus(txId);
        
        switch (status) {
            case COMMITTED:
                return RocketMQLocalTransactionState.COMMIT;
            case ROLLBACKED:
                return RocketMQLocalTransactionState.ROLLBACK;
            default:
                // 继续等待或返回UNKNOWN让RocketMQ继续检查
                return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

消费端:事务消息的最终消费者

@Component
@Slf4j
@RocketMQMessageListener(
    topic = "order-created-topic",
    consumerGroup = "order-created-consumer",
    selectorExpression = "*"
)
public class OrderCreatedConsumer implements RocketMQListener<OrderCreatedEvent> {
    
    @Override
    public void onMessage(OrderCreatedEvent event) {
        log.info("接收到订单创建完成消息: {}", event.getOrderId());
        
        // 这里的消息已经是事务提交后的最终消息
        // 可以安全地执行后续操作
        
        try {
            // 1. 发送订单创建通知
            notificationService.sendOrderCreated(event.getOrderId());
            
            // 2. 更新搜索索引
            searchService.indexOrder(event.getOrderId());
            
            // 3. 发送到数据分析系统
            analyticsService.trackOrderCreated(event);
            
            log.info("订单创建后续处理完成: {}", event.getOrderId());
            
        } catch (Exception e) {
            log.error("订单创建后续处理失败", e);
            // 记录错误,但不抛异常(避免重复消费)
            errorService.recordError(event.getOrderId(), e);
        }
    }
}

五、批量消息发送与消费

5.1 日志收集场景

发送端:批量发送日志

@Service
@Slf4j
public class BatchLogProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 批量收集日志并发送
     */
    public void sendBatchLogs(List<LogEntry> logs) {
        if (logs.isEmpty()) {
            return;
        }
        
        // 将日志列表转换为消息列表
        List<Message<LogEntry>> messages = logs.stream()
            .map(log -> MessageBuilder
                .withPayload(log)
                .setHeader("logType", log.getType())
                .setHeader("timestamp", System.currentTimeMillis())
                .build())
            .collect(Collectors.toList());
        
        // 批量发送
        SendResult result = rocketMQTemplate.syncSend(
            "log-collect-topic",
            messages,
            5000  // 超时时间
        );
        
        log.info("批量发送日志完成,数量: {}", logs.size());
    }
    
    /**
     * 异步批量发送(更高吞吐)
     */
    public void sendBatchLogsAsync(List<LogEntry> logs) {
        CompletableFuture<SendResult> future = rocketMQTemplate.asyncSend(
            "log-collect-topic",
            logs,
            new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.debug("日志批量发送成功,数量: {}", logs.size());
                }
                
                @Override
                public void onException(Throwable throwable) {
                    log.error("日志批量发送失败", throwable);
                    // 可以在这里实现重试或降级逻辑
                }
            }
        );
    }
}

消费端:批量消费处理

@Component
@Slf4j
@RocketMQMessageListener(
    topic = "log-collect-topic",
    consumerGroup = "log-consumer",
    consumeMessageBatchMaxSize = 50,  // 每次最多消费50条
    pullBatchSize = 32                // 每次拉取32条
)
public class BatchLogConsumer implements RocketMQListener<List<MessageExt>> {
    
    @Autowired
    private LogStorageService logStorageService;
    
    @Override
    public void onMessage(List<MessageExt> messages) {
        log.info("批量接收到 {} 条日志", messages.size());
        
        // 批量处理消息
        List<LogEntry> logs = new ArrayList<>();
        
        for (MessageExt message : messages) {
            try {
                // 反序列化日志对象
                LogEntry logEntry = deserializeLogEntry(message);
                logs.add(logEntry);
                
            } catch (Exception e) {
                log.error("日志反序列化失败", e);
                // 记录错误,但不影响其他日志处理
                recordDeserializeError(message);
            }
        }
        
        if (!logs.isEmpty()) {
            // 批量存储到数据库
            boolean success = logStorageService.batchSave(logs);
            
            if (!success) {
                log.error("日志批量存储失败");
                // 可以在这里实现补偿逻辑
            } else {
                log.debug("日志批量存储成功,数量: {}", logs.size());
            }
        }
    }
    
    private LogEntry deserializeLogEntry(MessageExt message) {
        // 从消息体中反序列化LogEntry
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(message.getBody(), LogEntry.class);
    }
}

六、过滤消息发送与消费

6.1 多租户消息过滤

发送端:发送带属性的消息

@Service
@Slf4j
public class FilterMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发送带属性的消息(支持SQL92过滤)
     */
    public void sendMessageWithProperties() {
        // 发送用户注册消息
        UserRegisterEvent event = new UserRegisterEvent("user123", "tenantA");
        
        Message<UserRegisterEvent> message = MessageBuilder
            .withPayload(event)
            .setHeader(MessageConst.PROPERTY_TAGS, "user.register")  // 标签
            .setHeader("tenantId", "tenantA")  // 租户ID
            .setHeader("userType", "VIP")      // 用户类型
            .setHeader("region", "CN")         // 区域
            .build();
        
        rocketMQTemplate.syncSend("user-events", message);
        
        log.info("发送用户注册消息: {}", event.getUserId());
    }
    
    /**
     * 发送不同条件的消息
     */
    public void sendVariousMessages() {
        // 消息1:租户A的VIP用户
        sendFilteredMessage("tenantA", "VIP", "user.event");
        
        // 消息2:租户B的普通用户
        sendFilteredMessage("tenantB", "NORMAL", "user.event");
        
        // 消息3:租户A的普通用户
        sendFilteredMessage("tenantA", "NORMAL", "user.event");
    }
    
    private void sendFilteredMessage(String tenantId, String userType, String tag) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("tenantId", tenantId);
        headers.put("userType", userType);
        headers.put(MessageConst.PROPERTY_TAGS, tag);
        
        Message<String> message = MessageBuilder
            .withPayload("消息内容")
            .copyHeaders(headers)
            .build();
        
        rocketMQTemplate.syncSend("filter-topic", message);
    }
}

消费端:使用SQL92过滤消费

@Component
@Slf4j
public class FilterMessageConsumers {
    
    /**
     * 消费者1:只消费租户A的VIP用户消息
     */
    @RocketMQMessageListener(
        topic = "filter-topic",
        consumerGroup = "tenantA-vip-consumer",
        selectorType = SelectorType.SQL92,
        selectorExpression = "tenantId = 'tenantA' and userType = 'VIP'"
    )
    public class TenantAVipConsumer implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt message) {
            log.info("租户A-VIP消费者收到消息: {}", message.getMsgId());
            // 处理租户A的VIP用户逻辑
        }
    }
    
    /**
     * 消费者2:消费所有租户的VIP用户消息
     */
    @RocketMQMessageListener(
        topic = "filter-topic",
        consumerGroup = "all-vip-consumer",
        selectorType = SelectorType.SQL92,
        selectorExpression = "userType = 'VIP'"
    )
    public class AllVipConsumer implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt message) {
            String tenantId = message.getProperties().get("tenantId");
            log.info("全租户-VIP消费者收到消息,租户: {}, 消息ID: {}", 
                tenantId, message.getMsgId());
            // 处理所有VIP用户逻辑
        }
    }
    
    /**
     * 消费者3:消费租户B的所有消息
     */
    @RocketMQMessageListener(
        topic = "filter-topic",
        consumerGroup = "tenantB-consumer",
        selectorType = SelectorType.SQL92,
        selectorExpression = "tenantId = 'tenantB'"
    )
    public class TenantBConsumer implements RocketMQListener<MessageExt> {
        @Override
        public void onMessage(MessageExt message) {
            String userType = message.getProperties().get("userType");
            log.info("租户B消费者收到消息,用户类型: {}, 消息ID: {}", 
                userType, message.getMsgId());
            // 处理租户B的所有消息
        }
    }
}

七、广播消息发送与消费

7.1 配置更新广播场景

发送端:发送广播消息

@Service
@Slf4j
public class BroadcastMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发送配置更新广播消息
     * 特点:所有订阅的消费者都会收到消息
     */
    public void broadcastConfigUpdate(ConfigUpdateEvent event) {
        Message<ConfigUpdateEvent> message = MessageBuilder
            .withPayload(event)
            .setHeader("configType", event.getConfigType())
            .setHeader("version", event.getVersion())
            .build();
        
        // 广播消息通常使用异步发送
        rocketMQTemplate.asyncSend("config-update-topic", message, 
            new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("配置更新广播发送成功");
                }
                
                @Override
                public void onException(Throwable throwable) {
                    log.error("配置更新广播发送失败", throwable);
                    // 配置更新很重要,需要重试
                    retryBroadcast(event);
                }
            }
        );
    }
    
    /**
     * 发送全量配置同步广播
     */
    public void broadcastFullConfigSync() {
        Map<String, Object> fullConfig = configService.getAllConfigs();
        
        // 对于大数据量的广播,可以考虑分片发送
        int chunkSize = 100; // 每片100条配置
        List<Map<String, Object>> chunks = splitConfigs(fullConfig, chunkSize);
        
        for (int i = 0; i < chunks.size(); i++) {
            ConfigChunkEvent chunk = new ConfigChunkEvent(i, chunks.size(), chunks.get(i));
            sendConfigChunk(chunk);
        }
    }
    
    private void sendConfigChunk(ConfigChunkEvent chunk) {
        rocketMQTemplate.syncSend("config-sync-topic", chunk);
    }
}

消费端:广播模式消费

@Component
@Slf4j
@RocketMQMessageListener(
    topic = "config-update-topic",
    consumerGroup = "config-update-consumer",
    messageModel = MessageModel.BROADCASTING,  // 广播模式
    selectorExpression = "*"
)
public class ConfigUpdateConsumer implements RocketMQListener<ConfigUpdateEvent> {
    
    @Value("${spring.application.name}")
    private String applicationName;
    
    @Value("${server.port}")
    private String serverPort;
    
    @Override
    public void onMessage(ConfigUpdateEvent event) {
        log.info("应用[{}:{}]接收到配置更新: {}", 
            applicationName, serverPort, event.getConfigType());
        
        try {
            // 1. 更新本地缓存
            configCache.refresh(event.getConfigType(), event.getConfigData());
            
            // 2. 记录更新日志
            logConfigUpdate(event);
            
            // 3. 通知相关组件
            notifyComponents(event);
            
            log.info("配置更新处理完成: {}", event.getConfigType());
            
        } catch (Exception e) {
            log.error("配置更新处理失败", e);
            // 广播模式下,一个实例失败不影响其他实例
            // 可以记录错误,供后续分析
            recordError(event, e);
        }
    }
    
    /**
     * 全量配置同步消费者
     */
    @RocketMQMessageListener(
        topic = "config-sync-topic",
        consumerGroup = "config-sync-consumer",
        messageModel = MessageModel.BROADCASTING,
        selectorExpression = "*"
    )
    public class ConfigSyncConsumer implements RocketMQListener<ConfigChunkEvent> {
        
        // 用于收集配置分片
        private Map<Integer, ConfigChunkEvent> chunkMap = new ConcurrentHashMap<>();
        
        @Override
        public void onMessage(ConfigChunkEvent chunk) {
            log.info("接收到配置分片: {}/{}", chunk.getChunkIndex() + 1, chunk.getTotalChunks());
            
            // 保存分片
            chunkMap.put(chunk.getChunkIndex(), chunk);
            
            // 检查是否收集完所有分片
            if (chunkMap.size() == chunk.getTotalChunks()) {
                try {
                    // 合并所有分片
                    Map<String, Object> fullConfig = mergeChunks(chunkMap);
                    
                    // 应用全量配置
                    applyFullConfig(fullConfig);
                    
                    log.info("全量配置同步完成,共{}条配置", fullConfig.size());
                    
                    // 清理分片缓存
                    chunkMap.clear();
                    
                } catch (Exception e) {
                    log.error("配置分片合并失败", e);
                }
            }
        }
        
        private Map<String, Object> mergeChunks(Map<Integer, ConfigChunkEvent> chunks) {
            Map<String, Object> fullConfig = new HashMap<>();
            for (int i = 0; i < chunks.size(); i++) {
                ConfigChunkEvent chunk = chunks.get(i);
                if (chunk != null) {
                    fullConfig.putAll(chunk.getConfigData());
                }
            }
            return fullConfig;
        }
    }
}

八、配套配置与最佳实践

8.1 统一配置类

rocketmq:
  name-server: 127.0.0.1:9876  # NameServer地址
  producer:
    group: ${spring.application.name}-producer-group  # 生产者组
    send-message-timeout: 3000  # 发送超时时间(ms)
    max-message-size: 5242880  # 最大消息大小(5MB)
    compress-message-body-threshold: 8192  # 消息压缩阈值
    retry-times-when-send-failed: 2  # 同步发送失败重试次数
    retry-times-when-send-async-failed: 2  # 异步发送失败重试次数
    retry-next-server: true  # 发送失败时是否重试其他Broker
    
  consumer:
    # 消费端优化
    pull-batch-size: 32  # 每次拉取消息数
    consume-thread-min: 10
    consume-thread-max: 64
    adjust-threadpool-nums-threshold: 100000  # 调整线程池阈值
    pull-threshold-for-queue: 1000  # 队列拉取阈值
    pull-interval: 0  # 拉取间隔,0表示不等待
@Configuration
@Slf4j
public class RocketMQConfig {
    
    /**
     * 生产环境配置
     */
    @Profile("prod")
    @Configuration
    public static class ProdConfig {
        
        @Bean
        public RocketMQTemplate rocketMQTemplate(
                @Value("${rocketmq.name-server}") String nameServer) {
            
            RocketMQTemplate template = new RocketMQTemplate();
            
            // 配置生产者
            DefaultMQProducer producer = new DefaultMQProducer("prod-producer-group");
            producer.setNamesrvAddr(nameServer);
            producer.setSendMsgTimeout(5000);
            producer.setRetryTimesWhenSendFailed(3);
            producer.setMaxMessageSize(1024 * 1024 * 4); // 4MB
            
            template.setProducer(producer);
            template.setMessageConverter(jacksonMessageConverter());
            
            return template;
        }
    }
    
    /**
     * 开发环境配置
     */
    @Profile("dev")
    @Configuration
    public static class DevConfig {
        
        @Bean
        public RocketMQTemplate rocketMQTemplate(
                @Value("${rocketmq.name-server:127.0.0.1:9876}") String nameServer) {
            
            RocketMQTemplate template = new RocketMQTemplate();
            
            DefaultMQProducer producer = new DefaultMQProducer("dev-producer-group");
            producer.setNamesrvAddr(nameServer);
            producer.setSendMsgTimeout(3000);
            
            template.setProducer(producer);
            
            return template;
        }
    }
    
    /**
     * 消息转换器配置
     */
    @Bean
    public MessageConverter jacksonMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        
        converter.setObjectMapper(objectMapper);
        converter.setSerializedPayloadClass(String.class);
        
        return converter;
    }
    
    /**
     * 监控配置
     */
    @Bean
    public MQMonitor mqMonitor(RocketMQTemplate rocketMQTemplate) {
        return new MQMonitor(rocketMQTemplate);
    }
}

8.2 监控和告警

@Component
@Slf4j
public class MQMonitor {
    
    private final RocketMQTemplate rocketMQTemplate;
    private final MeterRegistry meterRegistry;
    
    public MQMonitor(RocketMQTemplate rocketMQTemplate, MeterRegistry meterRegistry) {
        this.rocketMQTemplate = rocketMQTemplate;
        this.meterRegistry = meterRegistry;
        
        // 启动监控
        startMonitoring();
    }
    
    /**
     * 监控消息发送
     */
    public void monitorSend(String topic, long duration, boolean success) {
        // 记录指标
        meterRegistry.timer("rocketmq.send.duration", "topic", topic)
            .record(duration, TimeUnit.MILLISECONDS);
        
        meterRegistry.counter("rocketmq.send.total", "topic", topic)
            .increment();
        
        if (!success) {
            meterRegistry.counter("rocketmq.send.error", "topic", topic)
                .increment();
        }
    }
    
    /**
     * 监控消息消费
     */
    public void monitorConsume(String consumerGroup, String topic, long duration, boolean success) {
        meterRegistry.timer("rocketmq.consume.duration", 
                "consumerGroup", consumerGroup, "topic", topic)
            .record(duration, TimeUnit.MILLISECONDS);
        
        if (!success) {
            meterRegistry.counter("rocketmq.consume.error",
                    "consumerGroup", consumerGroup, "topic", topic)
                .increment();
        }
    }
    
    /**
     * 检查消息堆积
     */
    @Scheduled(fixedDelay = 60000) // 每分钟检查一次
    public void checkMessageBacklog() {
        // 实现消息堆积检查逻辑
        // 可以通过RocketMQ管理API或直接查询Broker获取堆积情况
        
        // 如果堆积超过阈值,发送告警
        if (backlogExceedsThreshold()) {
            sendAlert("消息堆积告警", getBacklogInfo());
        }
    }
}

总结

核心配套原则:

  1. 一对一匹配:发送和消费的Topic、Tag要匹配

  2. 序列化一致:发送端和消费端的消息序列化方式要一致

  3. 异常处理配套:发送失败的重试策略和消费失败的异常处理要协调

  4. 监控配套:发送和消费的监控指标要统一收集和分析

使用建议:

  • 普通消息:使用同步发送 + 并发消费,适合大多数业务场景

  • 顺序消息:使用同步顺序发送 + 顺序消费,保证消息处理顺序

  • 延迟消息:使用延迟发送 + 普通消费,实现定时任务

  • 事务消息:使用事务发送 + 事务监听器 + 最终消费者,保证分布式事务一致性

  • 批量消息:使用批量发送 + 批量消费,提高吞吐量

  • 过滤消息:使用属性发送 + SQL92过滤消费,实现精细化的消息路由

  • 广播消息:使用异步发送 + 广播消费,实现配置同步等场景

Logo

更多推荐