掌握RocketMQ核心概念、架构部署、 操作应用和 高 质量特性 rocketmq架构
1.概述
全方位带你掌握RocketMQ的核心概念、部署方式、API使用和 高 质量特性。 RocketMQ 一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的 顶级开源项目,具有高性能、高可靠、高实时、分布式特点。 自诞生以来,Apache RocketMQ 凭借其简单的架构、 丰盛的业务功能和极高的可扩展性,已被企业开发人员和云供应商广泛采用。经过十多年的广泛场景打磨,RocketMQ 已成为金融级可靠业务消息的行业标准,广泛应用于互联网、大数据、移动互联网、物联网等领域。
2.核心概念
众所周知,RocketMQ是参考借鉴Kafka研发出来的, 因此很多核心 想法和概念几乎是一致的, 因此我这里不会一一讲述相关 智慧概念,主要重点讲述RocketMQ独有的概念 想法,在叙述之前先来看看RocketMQ的架构图,RocketMQ相关的组件都可以集群部署,比如说NameServer、broker、生产者、消费者等,broker集群还可以部署成一主多从,多主多从等模式,下面是一主多从的示例:
NameServer:可以认为它 一个轻量级注册中心,类似于zookeeper在kafka中的应用, 然而zookeeper相对比较重, 因此kafka都在去zookeeper化,NameServer比较轻量,主要用于对RocketMQ集群信息的管理,包broker、topic、message queue和路由信息`等等,可集群部署。每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。 Broker:消息中转角色,负责存储消息,转发消息。分为Master Broker和Slave Broker,一个Master Broker可以对应多个Slave Broker, 然而一个Slave Broker只能对应一个Master Broker。Broker启动后需要完成一次将自己注册至Name Server的操作;随后每隔30s定期向Name Server上报Topic路由信息
生产者:与Name Server集群中的其中一个节点(随机)建立长连接(Keep-alive),定期从Name Server读取Topic路由信息,并向提供Topic服务的Master Broker建立长连接,且定时向Master Broker发送心跳。
消费者:与Name Server集群中的其中一个节点(随机)建立长连接,定期从Name Server拉取Topic路由信息,并向提供Topic服务的Master Broker、Slave Broker建立长连接,且定时向Master Broker、Slave Broker发送心跳。Consumer既可以从Master Broker订阅消息,也可以从Slave Broker订阅消息,订阅 制度由Broker配置决定。
消费者组:消费者组是包含使用相同消费行为的消费者的负载均衡组。与作为运行实体的消费者不同,消费者组是逻辑资源。 RocketMQ 初始化消费者组中的多个消费者,以实现消费性能的扩展和高可用性灾难恢复。这和kafka中消费者组概念一样,主题topic被消费者组订阅,但实际上是消费者组中的消费者真正地消息消费。
主题: 主题在逻辑上是队列的 ;我们可以发布消息到主题或从主题接收消息。主题是消息的第一级分类
Tag:消息标签,消息的第二级分类。可以用来区分同一topic下的不同业务类型的消息,发送消息的时候也需要指定。消息标签是细粒度的消息分类属性,允许消息在主题级别 下面内容进行细分。消费者通过订阅特定标签来实现消息过滤。这是RocketMQ特有的功能。
队列(message queue) : 队列是RocketMQ 中用于存储和传输消息的容器,是消息存储的最小单位。RocketMQ 中的主题包含多个队列。这样队列支持水平分区和流式存储。 RocketMQ 的队列模型类似于 Kafka 的分区模型,虽然都是存储消息, 然而实现逻辑、使用玩法都是不尽相同的,这在之前分享的文章中有详细 拓展资料,可自行查看
消息: 消息是RocketMQ 中数据传输的最小单位。生产者将业务数据的负载和扩展属性封装成消息,并将消息发送到RocketMQ 代理。 接着,代理根据相关语义将消息传递给消费者。RocketMQ有 下面内容几种消息类型:
普通:普通消息不需要 独特语义,也不与其他普通消息相关联。 FIFO:RocketMQ 使用消息组来确定一组指定消息的顺序。消息按发送顺序传递。 延迟: 无论兄弟们可以指定延迟,使消息仅在延迟 时刻过去后才对消费者可用,而不是在生产时立即传递消息。 事务:RocketMQ 支持分布式事务消息,并确保数据库更新和消息调用的事务一致性
后面三个概念都是RocketMQ中区别于kafka的不同逻辑 智慧点,需要重点已关注下。
3.环境搭建
为了简单直接,快速上手,我们这里使用单机模式部署,首先要求服务器装有jdk1.8+环境, 由于RocketMQ是Java开发的。 这里我按照官网教程安装当前最新版本RocketMQ5.2.0版本,先下载安装包rocketmq-all-5.2.0-bin-release.zip上传服务器, 解压安装包: python 代码解读 代码unzip rocketmq-all-5.2.0-bin-release.zip
查看安装包: csharp 代码解读 代码bench rk bin conf lib LICENSE nohup.out NOTICE README.md
启动NameServer: nohup sh bin/mqnamesrv &
验证是否启动成功: tail -f ~/logs/rocketmqlogs/namesrv.log
日志输出打印: 2025-06-05 15:56:10 INFO in – The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
启动 Broker 和 Proxy ohup sh bin/mqbroker -n localhost:9876 –enable-proxy &
验证是否启动成功: tail -f ~/logs/rocketmqlogs/proxy.log
日志输出: 2025-06-05 16:06:42 INFO in – The broker[broker-a, 192.168.231.137:10911] boot success. serializeType=JSON and name server is localhost:9876
按照上面的流程,RocketMQ已经安装好,我们就可以开始发送、消费消息了, 然而为了清楚直观,我们再安装一个RocketMQ 可视化控制台,教程:rocketmq.apache.ac.cn/docs/deploy…,比较简单,这里碍于篇幅 难题就不赘述了。同时关于集群模式部署教程,也请参考官网教程:rocketmq.apache.ac.cn/docs/deploy… 安装好之后访问路径:http://10.10.0.10:8021,这里我改成端口号为8021,防止默认的端口号8080冲突
4.集成客户端发送、消费消息
Java项目中添加依赖:当前最新版本5.0.7 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.7</version> </dependency>
4.1 普通消息
普通消息是指在RocketMQ 中没有 独特功能的消息 在控制台先创建普调消息类型的主题topic_nor l_test:
编写生产者发送消息代码: public class ProducerNor lExample { private static final Logger logger = LoggerFactory.getLogger(ProducerNor lExample.class); public static void in(String[] args) throws ClientException, IOException { // 代理地址 String endpoint = “10.10.0.10:8081”; // 主题名称 主题需要提前创建 String topic = “topic_nor l_test”; ClientServiceProvider provider = ClientServiceProvider.loadService(); // 客户端配置 ClientConfiguration configuration = ClientConfiguration.newBuilder() .setEndpoints(endpoint) .build(); // 创建生产者 Producer producer = provider.newProducerBuilder() .setClientConfiguration(configuration) .build(); // 构建消息 Message message = provider.newMessageBuilder() // 设置消息发送到的主题 .setTopic(topic) // 消息key,方便你后续根据消息key快速查找和定位消息,一般用消息体里面的主键 比如说订单id或者用户id .setKeys(“key-1”) // 标签,消息的二级分类,消费者可以标签过滤消费特定消息 .setTag(“tag-A”) // 消息体 .setBody(“哈哈