1. 首页 > 电脑知识

掌握RocketMQ核心概念、架构部署、 操作应用和 高 质量特性 rocketmq架构

作者:admin 更新时间:2025-06-13
摘要:1.概述 全方位带你掌握RocketMQ的核心概念、部署方式、API使用和高级特性。 RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。 自诞生以来,Apache RocketMQ 凭借其简单的架构、丰富的业,掌握RocketMQ核心概念、架构部署、 操作应用和 高 质量特性 rocketmq架构

 

1.概述

全方位带你掌握RocketMQ的核心概念、部署方式、API使用和 高 质量特性。 RocketMQ 一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的 顶级开源项目,具有高性能、高可靠、高实时、分布式特点。 自诞生以来,Apache RocketMQ 凭借其简单的架构、 丰盛的业务功能和极高的可扩展性,已被企业开发人员和云供应商广泛采用。经过十多年的广泛场景打磨,RocketMQ 已成为金融级可靠业务消息的行业标准,广泛应用于互联网、大数据、移动互联网、物联网等领域。

2.核心概念

众所周知,RocketMQ是参考借鉴Kafka研发出来的, 因此很多核心 想法和概念几乎是一致的, 因此我这里不会一一讲述相关 智慧概念,主要重点讲述RocketMQ独有的概念 想法,在叙述之前先来看看RocketMQ的架构图,RocketMQ相关的组件都可以集群部署,比如说NameServer、broker、生产者、消费者等,broker集群还可以部署成一主多从,多主多从等模式,下面是一主多从的示例:

NameServer:可以认为它 一个轻量级注册中心,类似于zookeeper在kafka中的应用, 然而zookeeper相对比较重, 因此kafka都在去zookeeper化,NameServer比较轻量,主要用于对RocketMQ集群信息的管理,包broker、topic、message queue和路由信息`等等,可集群部署。每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。 Broker:消息中转角色,负责存储消息,转发消息。分为Master Broker和Slave Broker,一个Master Broker可以对应多个Slave Broker, 然而一个Slave Broker只能对应一个Master Broker。Broker启动后需要完成一次将自己注册至Name Server的操作;随后每隔30s定期向Name Server上报Topic路由信息

生产者:与Name Server集群中的其中一个节点(随机)建立长连接(Keep-alive),定期从Name Server读取Topic路由信息,并向提供Topic服务的Master Broker建立长连接,且定时向Master Broker发送心跳。

消费者:与Name Server集群中的其中一个节点(随机)建立长连接,定期从Name Server拉取Topic路由信息,并向提供Topic服务的Master Broker、Slave Broker建立长连接,且定时向Master Broker、Slave Broker发送心跳。Consumer既可以从Master Broker订阅消息,也可以从Slave Broker订阅消息,订阅 制度由Broker配置决定。

消费者组:消费者组是包含使用相同消费行为的消费者的负载均衡组。与作为运行实体的消费者不同,消费者组是逻辑资源。 RocketMQ 初始化消费者组中的多个消费者,以实现消费性能的扩展和高可用性灾难恢复。这和kafka中消费者组概念一样,主题topic被消费者组订阅,但实际上是消费者组中的消费者真正地消息消费。

主题: 主题在逻辑上是队列的 ;我们可以发布消息到主题或从主题接收消息。主题是消息的第一级分类

Tag:消息标签,消息的第二级分类。可以用来区分同一topic下的不同业务类型的消息,发送消息的时候也需要指定。消息标签是细粒度的消息分类属性,允许消息在主题级别 下面内容进行细分。消费者通过订阅特定标签来实现消息过滤。这是RocketMQ特有的功能。

队列(message queue) : 队列是RocketMQ 中用于存储和传输消息的容器,是消息存储的最小单位。RocketMQ 中的主题包含多个队列。这样队列支持水平分区和流式存储。 RocketMQ 的队列模型类似于 Kafka 的分区模型,虽然都是存储消息, 然而实现逻辑、使用玩法都是不尽相同的,这在之前分享的文章中有详细 拓展资料,可自行查看

消息: 消息是RocketMQ 中数据传输的最小单位。生产者将业务数据的负载和扩展属性封装成消息,并将消息发送到RocketMQ 代理。 接着,代理根据相关语义将消息传递给消费者。RocketMQ有 下面内容几种消息类型:

普通:普通消息不需要 独特语义,也不与其他普通消息相关联。 FIFO:RocketMQ 使用消息组来确定一组指定消息的顺序。消息按发送顺序传递。 延迟: 无论兄弟们可以指定延迟,使消息仅在延迟 时刻过去后才对消费者可用,而不是在生产时立即传递消息。 事务:RocketMQ 支持分布式事务消息,并确保数据库更新和消息调用的事务一致性

后面三个概念都是RocketMQ中区别于kafka的不同逻辑 智慧点,需要重点已关注下。

3.环境搭建

为了简单直接,快速上手,我们这里使用单机模式部署,首先要求服务器装有jdk1.8+环境, 由于RocketMQ是Java开发的。 这里我按照官网教程安装当前最新版本RocketMQ5.2.0版本,先下载安装包rocketmq-all-5.2.0-bin-release.zip上传服务器, 解压安装包: python 代码解读 代码unzip rocketmq-all-5.2.0-bin-release.zip 

查看安装包: csharp 代码解读 代码bench rk  bin  conf  lib  LICENSE  nohup.out  NOTICE  README.md

启动NameServer: nohup sh bin/mqnamesrv &

验证是否启动成功: tail -f ~/logs/rocketmqlogs/namesrv.log

日志输出打印: 2025-06-05 15:56:10 INFO in – The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

启动 Broker 和 Proxy ohup sh bin/mqbroker -n localhost:9876 –enable-proxy &

验证是否启动成功: tail -f ~/logs/rocketmqlogs/proxy.log

日志输出: 2025-06-05 16:06:42 INFO in – The broker[broker-a, 192.168.231.137:10911] boot success. serializeType=JSON and name server is localhost:9876

按照上面的流程,RocketMQ已经安装好,我们就可以开始发送、消费消息了, 然而为了清楚直观,我们再安装一个RocketMQ 可视化控制台,教程:rocketmq.apache.ac.cn/docs/deploy…,比较简单,这里碍于篇幅 难题就不赘述了。同时关于集群模式部署教程,也请参考官网教程:rocketmq.apache.ac.cn/docs/deploy… 安装好之后访问路径:http://10.10.0.10:8021,这里我改成端口号为8021,防止默认的端口号8080冲突

4.集成客户端发送、消费消息

Java项目中添加依赖:当前最新版本5.0.7  <dependency>       <groupId>org.apache.rocketmq</groupId>       <artifactId>rocketmq-client-java</artifactId>       <version>5.0.7</version>   </dependency>

4.1 普通消息

普通消息是指在RocketMQ 中没有 独特功能的消息 在控制台先创建普调消息类型的主题topic_nor l_test:

编写生产者发送消息代码: public class ProducerNor lExample { ​     private static final Logger logger = LoggerFactory.getLogger(ProducerNor lExample.class); ​     public static void in(String[] args) throws ClientException, IOException {         // 代理地址         String endpoint = “10.10.0.10:8081”;         // 主题名称 主题需要提前创建         String topic = “topic_nor l_test”;         ClientServiceProvider provider = ClientServiceProvider.loadService();         // 客户端配置         ClientConfiguration configuration = ClientConfiguration.newBuilder()                 .setEndpoints(endpoint)                 .build();         // 创建生产者         Producer producer = provider.newProducerBuilder()                 .setClientConfiguration(configuration)                 .build();         // 构建消息         Message message = provider.newMessageBuilder()                 // 设置消息发送到的主题                 .setTopic(topic)                 // 消息key,方便你后续根据消息key快速查找和定位消息,一般用消息体里面的主键 比如说订单id或者用户id                 .setKeys(“key-1”)                 // 标签,消息的二级分类,消费者可以标签过滤消费特定消息                 .setTag(“tag-A”)                 // 消息体                 .setBody(“哈哈