2.1 环境搭建
依赖工具
JDK :1.8+
Maven
IntelliJ IDEA
2.1.1 源码拉取
从官方仓库 https://github.com/apache/rocketmq clone
或者download
源码。
源码目录结构:
broker: broker 模块(broke 启动进程)
client :消息客户端,包含消息生产者、消息消费者相关类
common :公共包
dev :开发者信息(非源代码)
distribution :部署实例文件夹(非源代码)
example: RocketMQ 例代码
filter :消息过滤相关基础类
filtersrv:消息过滤服务器实现相关类(Filter启动进程)
logappender:日志实现相关类
namesrv:NameServer实现相关类(NameServer启动进程)
openmessageing:消息开放标准
remoting:远程通信模块,给予Netty
srcutil:服务工具类
store:消息存储实现相关类
style:checkstyle相关实现
test:测试相关类
tools:工具类,监控命令相关实现类
2.1.2 导入IDEA
执行安装
clean install -Dmaven.test.skip=true
2.1.3 调试
创建conf
配置文件夹,从distribution
拷贝broker.conf
和logback_broker.xml
和logback_namesrv.xml
启动NameServer
展开namesrv模块,右键NamesrvStartup.java
配置ROCKETMQ_HOME
重新启动
控制台打印结果
The Name Server boot success. serializeType=JSON
启动Broker
broker.conf
配置文件内容
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# namesrvAddr地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable=true
# 存储路径
storePathRootDir=E:\\RocketMQ\\data\\rocketmq\\dataDir
# commitLog路径
storePathCommitLog=E:\\RocketMQ\\data\\rocketmq\\dataDir\\commitlog
# 消息队列存储路径
storePathConsumeQueue=E:\\RocketMQ\\data\\rocketmq\\dataDir\\consumequeue
# 消息索引存储路径
storePathIndex=E:\\RocketMQ\\data\\rocketmq\\dataDir\\index
# checkpoint文件路径
storeCheckpoint=E:\\RocketMQ\\data\\rocketmq\\dataDir\\checkpoint
# abort文件存储路径
abortFile=E:\\RocketMQ\\data\\rocketmq\\dataDir\\abort
创建数据文件夹
dataDir
启动
BrokerStartup
,配置broker.conf
和ROCKETMQ_HOME
发送消息
进入example模块的
org.apache.rocketmq.example.quickstart
指定Namesrv地址
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
运行
main
方法,发送消息
消费消息
进入example模块的
org.apache.rocketmq.example.quickstart
指定Namesrv地址
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
运行
main
方法,消费消息