rocketmq使用示例 发表于 2018-12-08 | 更新于 2019-09-03 | 分类于 java , message-queue , rocketmq | 阅读次数: spring cloud中集成rocketmq {———-} spring cloud中集成rocketmq代码地址 12345678910111213141516171819202122232425262728293031323334/** * 接收bean 将消息发送到spring event事件中进行处理 解耦数据处理 * 屏蔽rockemq的依赖性 可以不影响业务转换别的消息组件 */@Componentpublic class ConsumerMessageListener implements MessageListener { private static final Logger logger = LoggerFactory.getLogger(ConsumerMessageListener.class); @Autowired ApplicationContext applicationContext; public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message); try { logger.info("rocket 接收到消息:{}",message); //do something.. applicationContext.publishEvent(new RocketmqEvent(message)); return Action.CommitMessage; //消息处理正常 } catch (Exception e) { //消费失败 logger.error("rocket 消息处理失败 :{}",message); return Action.ReconsumeLater;//消息加入重试 } } /** * 接收示例 */ @EventListener(condition = "#event.topic=='your.topic' && #event.tag=='your.tag'") public void MessageListener(RocketmqEvent rocketmqEvent) throws IOException { String string = rocketmqEvent.getMsg(); } 1234567891011121314151617181920/** * 发送bean类 * 引用统一发送 解耦数据处理 * 屏蔽rockemq的依赖性 可以不影响业务转换别的消息组件 */@Componentpublic class MessageQueueTemplate { @Autowired ProducerBean producerBean; public void send(RocketmqEvent rocketmqEvent){ Message message = new Message(); message.setTopic(rocketmqEvent.getTopic()); message.setTag(rocketmqEvent.getTag()); message.setBody(rocketmqEvent.getMsg().getBytes()); producerBean.send(message); }} 相关文章 rocketmq图解 rocketmq安装 rocketmq简介 赏了 微信支付 支付宝