本文主要介绍什么是RocketMQ
RocketMQ是一个低延时、高可靠、可伸缩、易于使用的分布式消息中间件,是由阿里巴巴开源捐献给Apache的顶级项目。RocketMQ具有高吞吐、低延迟、海量消息堆积等优点,同时提供顺序消息、事务消息、定时消息、消息重试于追踪等功能,非常适合在电商、金融等领域使用。
RocketMQ的应用场景
RocketMQ的应用场景如下:
- 削峰填谷:诸如秒杀、抢红包等大型活动皆会带来较高的流量脉冲,很可能因为没有做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,RocketMQ可提供削峰填谷的服务来解决这些问题。
- 异步解耦:交易系统作为淘宝、天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等,整体业务系统庞大而且复杂,RocketMQ可实现异步通信和应用解耦,确保主站业务的连续性。
- 顺序收发:细数一下,日常需要保证顺序的应用场景非常多,例如证券交易过程中的时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等,与先进先出原理类似,RocketMQ提供的顺序消息即保证消息的FIFO。
- 分布式事务一致性:交易系统、红包等场景需要确保数据的最终一致性,大量引入RocketMQ的分布式事务,即可以实现系统之间的解耦,又可以保证最终的数据一致性。
- 大数据分析:数据在”流动”中产生价值,传统数据分析大都基于批量计算模型,无法做到实时的数据分析,利用RocketMQ与流式计算引擎相结合,可以很方便地实现对业务数据进行实时分析。
- 分布式缓存同步:电商促销的时候商品需要实时感知价格的变化,大量并发访问会导致页面响应时间长,集中式缓存因为带宽瓶颈限制商品变更的访问流量,通过RocketMQ构建分布式缓存,可实时通知商品数据的变化。
安装RocketMQ(单机版)
- 安装JDK
- 从官网下载编译好的安装包

- 解压压缩包
- 进入bin目录,启动namesrv,启动NameServer。默认情况下NameServer监听的端口是9876。
tail -f ~/logs/rocketmqlogs/namesrv.log 可以查看启动日志

- 启动消息服务器Broker,指定NameServer的IP地址和端口。默认情况下会加载conf/broker.conf
输入 tail -f ~/logs/rocketmqlogs/broker.log 查看日志
如果 tail -f ~/logs/rocketmqlogs/broker.log 提示找不到文件,则打开当前目录下的 nohup.out
日志文件查看,出现如下日志表示启动失败,提示内存无法分配

内存不足的问题
这是因为bin 目录下启动 nameserv 与 broker 的 runbroker.sh 和 runserver.sh 文件中默认分配的内
存太大,rocketmq比较耗内存,所以默认分配的内存比较大,而系统实际内存却太小导致启动失败,
通常像虚拟机上安装的 CentOS 服务器内存可能是没有高的,只能调小。实际中应该根据服务器内存情况,配置一个合适的值。
解决办法
修改runbroker.sh和runserver.sh
1 2 3 4 5 6
| JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512g" Xms 是指设定程序启动时占用内存大小。一般来讲,大点,程序会启动的快一点,但是也可能会导致机器暂时 间变慢。 Xmx 是指设定程序运行期间最大可占用的内存大小。如果程序运行需要占用更多的内存,超出了这个设置值, 就会抛出OutOfMemory异常。 xmn 年轻代的heap大小,一般设置为Xmx的3、4分之一
|
停止服务
【sh mqshutdown broker】 //停止 brokersh
【sh mqshutdown namesrv】 //停止 nameserver
停止服务的时候需要注意,要先停止broker,其次停止nameserver。
broker.conf文件
默认情况下,启动broker会加载conf/broker.conf文件,这个文件里面就是一些常规的配置信息
namesrvAddr //nameserver地址
brokerCl usterName //Cluster名称,如果集群机器数比较多,可以分成多个cluster,每个cluster提供
给不同的业务场景使用
brokerName //broker名称,如果配置主从模式,master和slave需要配置相同的名称来表名关系
brokerId=0 //在主从模式中,一个master broker可以有多个slave,0表示master,大于0表示不同
slave的id
brokerRole=SYNC_MASTER/ASYNC_MASTER/SLAVE ; 同步表示slave和master消息同步完成后再返回
信息给客户端
autoCreateTopicEnable = true ; topic不存在的情况下自动创建
RocketMQ如何发送消息
Spring Cloud Alibaba已集成RocketMQ,使用Spring Cloud Stream可以对RocketMQ发送和接收消息。
- 在pom.xml中引入jar包
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-stream-binder-rocketmq</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-test-support</artifactId> <scope>test</scope> </dependency> </dependencies>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.3.0.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.1.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
|
- 配置application.properties
1 2 3 4
| server.port=8080 spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876 spring.cloud.stream.bindings.output.destination=TopicTest spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
|
- 使用Binder发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @SpringBootApplication @EnableBinding({Source.class}) public class RocketmqDemoApplication {
public static void main(String[] args) { SpringApplication.run(RocketmqDemoApplication.class, args); } }
@RestController public class SendController { @Autowired private Source source;
@GetMapping("/send") public String send(String msg) { MessageBuilder builder = MessageBuilder.withPayload(msg); Message message = builder.build(); source.output().send(message); return "Hello RocketMQ, Send " + msg; } }
|
@EnableBinding({Source.class})表示绑定配置文件中名称为output的消息通道Binding,Source类中定义的消息通道名称为output。发送http请求http://localhost:8080/send?msg=test将消息发送到RocketMQ中。
实际项目中会存在多个发送消息通道,可以自定义消息通道的名称,参考Source类自定义一个接口,修改通道名称和相关配置即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public interface OrderSource { String OUTPUT = "orderSourcec"; @Output(OrderSource.OUTPUT) MessageChannel output(); }
@SpringBootApplication @EnableBinding({Source.class, OrderSource.class}) public class RocketmqDemoApplication {
public static void main(String[] args) { SpringApplication.run(RocketmqDemoApplication.class, args); } }
server.port=8080 spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876 spring.cloud.stream.bindings.output.destination=TopicTest spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
spring.cloud.stream.bindings.orderOutput.destination=TopicOrder spring.cloud.stream.rocketmq.bindings.orderOutput.producer.group=order-group
|
RocketMQ如何消费消息
- 引入相关依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-stream-binder-rocketmq</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies>
|
- 配置application.properties
1 2 3 4
| server.port=8081 spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876 spring.cloud.stream.bindings.output.destination=TopicTest spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
|
- 定义消息监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @EnableBinding({Sink.class}) @SpringBootApplication public class App { public static void main( String[] args ) { SpringApplication.run(App.class); } @StreamListener(Sink.INPUT) public void receive(String msg) { System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis()); } }
|
@EnableBinding({Sink.class})表示绑定配置文件中名称为input的消息通道Binding,Sink类中定义的消息通道的名称为input,@StreamListener表示定义一个消息监听器,接收RocketMQ中的消息。
实际项目中会存在多个接收消息的通道,可以自定义消息通道的名称,参考Sink类自定义一个接口,修改通道名称和相关配置即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public interface InputChannel {
String USER_INPUT = "userInput"; String ORDER_INPUT = "orderInput";
@Input(InputChannel.USER_INPUT) SubscribableChannel userInput();
@Input(InputChannel.ORDER_INPUT) SubscribableChannel orderInput(); }
@EnableBinding({Sink.class, InputChannel.class}) @SpringBootApplication public class App { public static void main( String[] args ) { SpringApplication.run(App.class); }
@StreamListener(Sink.INPUT) public void receive(String msg) { System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis()); }
@StreamListener(InputChannel.ORDER_INPUT) public void receiveOrderInput(String msg) { System.out.println(" receive: " + msg + ", receiveTime= " + System.currentTimeMillis()); } }
server.port=8081 spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876 spring.cloud.stream.bindings.input.destination=TopicTest spring.cloud.stream.rocketmq.bindings.input.producer.group=demo-group
spring.cloud.stream.bindings.orderInput.destination=TopicOrder spring.cloud.stream.rocketmq.bindings.orderInput.producer.group=order-group
|