rocketmq使用示例

  • spring cloud中集成rocketmq

{———-}

spring cloud中集成rocketmq

代码地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 接收bean 将消息发送到spring event事件中进行处理 解耦数据处理
* 屏蔽rockemq的依赖性 可以不影响业务转换别的消息组件
*/
@Component
public class ConsumerMessageListener implements MessageListener {

private static final Logger logger = LoggerFactory.getLogger(ConsumerMessageListener.class);
@Autowired
ApplicationContext applicationContext;

public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
try {
logger.info("rocket 接收到消息:{}",message);
//do something..
applicationContext.publishEvent(new RocketmqEvent(message));

return Action.CommitMessage; //消息处理正常
} catch (Exception e) {
//消费失败
logger.error("rocket 消息处理失败 :{}",message);
return Action.ReconsumeLater;//消息加入重试
}
}

/**
* 接收示例
*/

@EventListener(condition = "#event.topic=='your.topic' && #event.tag=='your.tag'")
public void MessageListener(RocketmqEvent rocketmqEvent) throws IOException {
String string = rocketmqEvent.getMsg();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 发送bean类
* 引用统一发送 解耦数据处理
* 屏蔽rockemq的依赖性 可以不影响业务转换别的消息组件
*/
@Component
public class MessageQueueTemplate {

@Autowired
ProducerBean producerBean;

public void send(RocketmqEvent rocketmqEvent){

Message message = new Message();
message.setTopic(rocketmqEvent.getTopic());
message.setTag(rocketmqEvent.getTag());
message.setBody(rocketmqEvent.getMsg().getBytes());
producerBean.send(message);
}
}