热爱技术,追求卓越
不断求索,精益求精

rocketmq安装以及和Spring Boot快速集成

rocketmq安装

以安装目前rocket最新版本4.9.4为例,我的安装目录是:/var/www/data/rocketMq,进入该目录执行如下命令,下载二进制版本:

curl -O https://dlcdn.apache.org/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip

解压:

unzip rocketmq-all-4.9.4-bin-release.zip

目录重命名:

mv rocketmq-all-4.9.4-bin-release rocketmq-4.9.4

在rocketmq-4.9.4目录下创建日志目录:

cd rocketmq-4.9.4
mkdir logs

在rocketmq-4.9.4目录下创建存储目录:

mkdir store

进入到存储目录store,创建commitlog、consumequeue、index目录

cd store
mkdir commitlog
mkdir consumequeue
mkdir index

我们以单点配置模式为例,进入到如下目录:

/var/www/data/rocketMq/rocketmq-4.9.4/conf/2m-2s-async

目录下的文件说明:

broker-a.properties  :单结点a配置文件
broker-a-s.properties :单结点a的从结点配置文件
broker-b.properties  :单结点b配置文件
broker-b-s.properties :单结点b的从结点配置文件

修改broker-a.properties的内容如下:

brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerId=0
#nameServer地址,分号分割
namesrvAddr=127.0.0.1:9876;
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
brokerIP1=127.0.0.1
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/var/www/data/rocketMq/rocketmq-4.9.4/store
#commitLog 存储路径
storePathCommitLog=/var/www/data/rocketMq/rocketmq-4.9.4/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/var/www/data/rocketMq/rocketmq-4.9.4/store/consumequeue
#消息索引存储路径
storePathIndex=/var/www/data/rocketMq/rocketmq-4.9.4/store/index
#checkpoint 文件存储路径
storeCheckpoint=/var/www/data/rocketMq/rocketmq-4.9.4/store/checkpoint
#abort 文件存储路径
abortFile=/var/www/data/rocketMq/rocketmq-4.9.4/store/abort
#限制的消息大小
maxMessageSize=65536
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

进入conf 目录,替换所有xml中的${user.home},保证日志路径正确

sed -i ‘s#原字符串#新字符串#g’ 替换的文件

/var/www/data/rocketMq/rocketmq-4.9.4

修改 runbroker.sh,调整内存大小(这里只是演示用途,实际的内存大小要以生产环境为准)

找到 “JAVA_OPT=”${JAVA_OPT} -server -Xms8g -Xmx8g”,调整为512m

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"

修改 修改 runserver.sh,调整内存大小(这里只是演示用途,实际的内存大小要以生产环境为准)

找到 “JAVA_OPT=”${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m””,修改为

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

启动mqnamesrv

./mqnamesrv

启动mqbroker

./mqbroker -c /var/www/data/rocketMq/rocketmq-4.9.4/conf/2m-2s-async/broker-a.properties

由于是前台运行的模式启动,关闭mqnamesrv和mqbroker直接前台中断执行即可。若有后台启动和关闭可请用nohup命令。

nohup mqnamesrv >/dev/null 2>&1 &
nohup mqbroker -c /var/www/data/rocketMq/rocketmq-4.9.4/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &

参考:

https://mp.weixin.qq.com/s/ROyOh-SVBnssrqrG8FRGxw

Spring Boot快速集成RocketMQ

本文使用的是目前最新版本2.2.2,核心依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

生产者模块pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.youyijiu</groupId>
    <version>0.0.1-SNAPSHOT</version>

    <artifactId>youyijiu-test-mq-producer</artifactId>

    <!-- 基于spring boot 2.6.4 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <skipTests>true</skipTests>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- rocketMq集成 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
    </dependencies>

    <!-- Package as an executable jar -->
    <build>
        <plugins>

            <!-- Compiler 插件, 设定JDK版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>

            <!--配置生成源码包 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

生产者模块application.properties内容:

##web端口
server.port=8080

rocketmq.name-server=localhost:9876
rocketmq.producer.group=my-group1
rocketmq.producer.sendMessageTimeout=300000

测试的MQ用到的常量

/**   
* @Title: MqTopicConstant.java 
* @Package com.youyijiu.mq.consumer 
* @author Jun.Yang 24696026@qq.com   
* @date 2022年8月13日 下午2:42:41    
*/
package com.youyijiu.mq.producer;

/** 
 * @ClassName: MqTopicConstant 
 * @Description: TODO(这里用一句话描述这个类的作用) 
 * @author Jun.Yang 24696026@qq.com 
 * @date 2022年8月13日 下午2:42:41 
 *  
 */
public class MqTopicConstant {

    /**
     * 示例消息队列,test-top-youyijiu-topic个
     */
    public static final String DEMO_TOPIC = "test-top-youyijiu-topic";

    /**
     * 注册tag
     */
    public static final String DEMO_TAG_REGISTERED = "registered";

    /**
     * 修改tag
     */
    public static final String DEMO_TAG_MODIFY = "modify";

    /**
     * consumer group
     */
    public static final String DEMO_CONSUMER_GROUP_REGISTERED = "test-top-youyijiu-group_registered";

    public static final String DEMO_CONSUMER_GROUP_MODIFY = "test-top-youyijiu-group_modify";

}

生产者发送测试

/**   
* @Title: TestApp.java 
* @Package com.youyijiu.mq 
* @author Jun.Yang 24696026@qq.com   
* @date 2022年8月13日 下午2:23:30    
*/
package com.youyijiu.mq;

import javax.annotation.Resource;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import com.youyijiu.mq.producer.MqTopicConstant;


/** 
 * @ClassName: TestApp 
 * @Description: TODO(这里用一句话描述这个类的作用) 
 * @author Jun.Yang 24696026@qq.com 
 * @date 2022年8月13日 下午2:23:30 
 *  
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class TestApp {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void send(){
        rocketMQTemplate.send(MqTopicConstant.DEMO_TOPIC + ":" + MqTopicConstant.DEMO_TAG_REGISTERED,
                MessageBuilder.withPayload("Hello, World! I'm from youyijiu-test-mq-producer 1 ").build());
        rocketMQTemplate.send(MqTopicConstant.DEMO_TOPIC + ":" + MqTopicConstant.DEMO_TAG_REGISTERED,
                MessageBuilder.withPayload("Hello, World! I'm from youyijiu-test-mq-producer 2 ").build());
        rocketMQTemplate.send(MqTopicConstant.DEMO_TOPIC + ":" + MqTopicConstant.DEMO_TAG_REGISTERED,
                MessageBuilder.withPayload("Hello, World! I'm from youyijiu-test-mq-producer 3 ").build());
    }

}

消费者消费

/**   
 * @Title: MqRegisteredListener.java 
 * @Package com.youyijiu.mq.consumer 
 * @author Jun.Yang 24696026@qq.com   
 * @date 2022年8月13日 下午2:34:03    
 */
package com.youyijiu.mq.consumer;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * @ClassName: MqRegisteredListener
 * @Description: TODO(这里用一句话描述这个类的作用)
 * @author Jun.Yang 24696026@qq.com
 * @date 2022年8月13日 下午2:34:03
 * 
 */
@Service
@RocketMQMessageListener(topic = MqTopicConstant.DEMO_TOPIC, 
    consumerGroup = MqTopicConstant.DEMO_CONSUMER_GROUP_REGISTERED, 
    selectorExpression = MqTopicConstant.DEMO_TAG_REGISTERED)
public class MqRegisteredListener implements RocketMQListener<String> {

    private static final Logger log = LoggerFactory
            .getLogger(MqRegisteredListener.class);

    /*
     * (non-Javadoc)
     * 
     * @see
     * org.apache.rocketmq.spring.core.RocketMQListener#onMessage(java.lang.
     * Object)
     */
    @Override
    public void onMessage(String arg0) {
        log.info("received registered message: {}", arg0);
    }

}

上述的消费者和生产者只是简单测试下安装rocketMq是否可用,spring boot引入rocketMq变得非常方便。

参考:

Spring Boot快速集成RocketMQ实战教程

https://mp.weixin.qq.com/s/3LQLDT_X6q0EoFdjZVIaIg
赞(0)
未经允许不得转载:LoveCTO » rocketmq安装以及和Spring Boot快速集成

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

热爱技术 追求卓越 精益求精