Installing and Configuring RocketMQ
Published: 2019-06-23


Installing RocketMQ

Download: https://github.com/apache/rocketmq

➜  incubator-rocketmq git:(master) pwd
/Users/xinxingegeya/workspace-github/incubator-rocketmq
➜  incubator-rocketmq git:(master)

The checkout used here is the master branch of rocketmq.

Next, run mvn install, selecting the appropriate profile with the -P option:

➜  incubator-rocketmq git:(master) mvn clean package install -Prelease-all -Dmaven.test.skip=true -U

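About the -U flag: it forces Maven to check for updates to all SNAPSHOT dependencies, so the build is based on their latest state. Without it, Maven checks for updates once a day by default, whereas continuous integration should run far more often than that.

Then go to the build output under distribution/target: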

➜  incubator-rocketmq git:(master) cd distribution/target/apache-rocketmq
➜  apache-rocketmq git:(master) ll
total 56
-rw-r--r--   1 xinxingegeya  staff   524B  2 23 14:40 DISCLAIMER
-rw-r--r--   1 xinxingegeya  staff    15K  6  8 10:35 LICENSE
-rw-r--r--   1 xinxingegeya  staff   1.3K  6  8 10:35 NOTICE
-rw-r--r--   1 xinxingegeya  staff   2.4K  6  8 10:35 README.md
drwxr-xr-x  36 xinxingegeya  staff   1.2K  6 13 18:27 bin
drwxr-xr-x  10 xinxingegeya  staff   340B  6  8 10:35 conf
drwxr-xr-x  25 xinxingegeya  staff   850B  6 13 18:27 lib

Running the Name Server

➜  apache-rocketmq-all git:(master) ✗ nohup sh bin/mqnamesrv &
[1] 43055
appending output to nohup.out
➜  apache-rocketmq-all git:(master) ✗ tail -f ~/logs/rocketmqlogs/namesrv.log
2017-05-15 14:07:28 INFO main - serverOnewaySemaphoreValue=256
2017-05-15 14:07:28 INFO main - serverAsyncSemaphoreValue=64
2017-05-15 14:07:28 INFO main - serverChannelMaxIdleTimeSeconds=120
2017-05-15 14:07:28 INFO main - serverSocketSndBufSize=4096
2017-05-15 14:07:28 INFO main - serverSocketRcvBufSize=4096
2017-05-15 14:07:28 INFO main - serverPooledByteBufAllocatorEnable=true
2017-05-15 14:07:28 INFO main - useEpollNativeSelector=false
2017-05-15 14:07:28 INFO main - load KV config table OK
2017-05-15 14:07:28 INFO NettyEventExecuter - NettyEventExecuter service started
2017-05-15 14:07:28 INFO main - The Name Server boot success. serializeType=JSON

Running the Broker

➜  apache-rocketmq-all git:(master) ✗ nohup sh bin/mqbroker -n localhost:9876 &
[2] 43087
appending output to nohup.out
➜  apache-rocketmq-all git:(master) ✗ tail -f ~/logs/rocketmqlogs/broker.log
2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=FooBarGroup, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=TOOLS_CONSUMER, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=CID_ONS-HTTP-PROXY, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=FILTERSRV_CONSUMER, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=CID_ONSAPI_PULL, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2017-05-15 14:08:25 INFO main - load /Users/xinxingegeya/store/config/subscriptionGroup.json OK
2017-05-15 14:08:25 INFO main - Set user specified name server address: localhost:9876
2017-05-15 14:08:25 INFO PullRequestHoldService - PullRequestHoldService service started
2017-05-15 14:08:26 INFO main - register broker to name server localhost:9876 OK
2017-05-15 14:08:26 INFO main - The broker[Yale-Li, 10.99.24.152:10911] boot success. serializeType=JSON and name server is localhost:9876
2017-05-15 14:08:35 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2017-05-15 14:08:35 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2017-05-15 14:08:36 INFO BrokerControllerScheduledThread1 - register broker to name server localhost:9876 OK
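
Besides tailing the logs, a quick way to confirm that both processes are accepting connections is to open a TCP connection to their ports. The following is a minimal sketch using only the JDK. The class name PortCheck and the 2-second timeout are illustrative choices, not part of RocketMQ; the ports 9876 (name server) and 10911 (broker) are the ones that appear in the commands and log output of this post and may differ in other setups.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of RocketMQ: checks whether a TCP port accepts connections.
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports assumed from this post: 9876 for the name server, 10911 for the broker.
        System.out.println("name server (9876): " + isListening("localhost", 9876, 2000));
        System.out.println("broker (10911): " + isListening("localhost", 10911, 2000));
    }
}
```

A successful connection only shows that something is listening on the port; the "boot success" lines in the logs remain the authoritative signal.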

The shutdown commands are as follows:

> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

 

Installing RocketMQ Console

Download: https://github.com/apache/rocketmq-externals

mvn spring-boot:run

After running this command, open the console in a browser; the default port is 8080.

 

A Simple Producer and Consumer

Producer

package com.rocketmq.demo;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * Created by xinxingegeya on 2017/5/13.
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("mytest_producer_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {
                // Create a message instance, specifying topic, tag and message body.
                Message msg = new Message(
                        "TopicTest1" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}
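
One detail worth spelling out: the producer encodes the message body with RemotingHelper.DEFAULT_CHARSET, which to my knowledge is "UTF-8", so whoever reads the body should decode it with the same charset rather than rely on the platform default. The sketch below illustrates that round trip with the JDK only. BodyCodec is a hypothetical helper, not a RocketMQ class, and the use of UTF-8 on both sides is an assumption based on that constant.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of RocketMQ: encodes and decodes message bodies as UTF-8.
final class BodyCodec {

    /** Encodes text the way the producer in this post is assumed to (UTF-8). */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a message body with an explicit charset instead of the platform default. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Hello RocketMQ 0");
        System.out.println(decode(body));
    }
}
```

On the consuming side this would correspond to decoding msg.getBody() with an explicit charset, which matters as soon as message bodies contain non-ASCII text.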

Consumer

package com.rocketmq.demo;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Created by xinxingegeya on 2017/5/15.
 */
public class Consumer {

    /**
     * In push mode the consumer wraps the polling loop internally.
     * A MessageListener is registered, and once messages are fetched its
     * consumeMessage() is called to consume them, so from the user's point
     * of view the messages appear to be pushed.
     *
     * @param args
     * @throws MQClientException
     */
    public static void main(String[] args) throws MQClientException {
        /**
         * An application creates one Consumer and maintains it itself, e.g. as a global object or singleton.
         * Note: the application must ensure that the ConsumerGroupName is unique.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mytest_consumer_group_name");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("Consumer");

        /**
         * Subscribe to messages under the topic whose tag equals TagA, TagC or TagD.
         */
        consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

        /**
         * Subscribe to all messages under the topic.
         * Note: one consumer object can subscribe to multiple topics.
         */
        consumer.subscribe("TopicTest2", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * By default msgs contains only one message; set consumeMessageBatchMaxSize
             * to receive messages in batches.
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());
                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("TopicTest1")) {
                    // Consumption logic for TopicTest1
                    if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                        // Handle TagA
                        System.out.println(new String(msg.getBody()));
                    } else if (msg.getTags() != null && msg.getTags().equals("TagC")) {
                        // Handle TagC
                    } else if (msg.getTags() != null && msg.getTags().equals("TagD")) {
                        // Handle TagD
                    }
                } else if (msg.getTopic().equals("TopicTest2")) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /**
         * The Consumer must be initialized by calling start() before use; calling it once is enough.
         */
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
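
To make the subscription expressions concrete: "TagA || TagC || TagD" selects messages whose tag equals any one of the listed tags, and "*" selects every message in the topic. The following is a simplified, JDK-only model of that rule, meant purely as an illustration. TagExpression is a hypothetical helper and not a RocketMQ class; the real filtering is done by the RocketMQ client and broker and may handle edge cases differently.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of RocketMQ: models tag-expression matching.
final class TagExpression {

    /** Splits an expression such as "TagA || TagC || TagD" into its individual tags. */
    static Set<String> parse(String expression) {
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
    }

    /** Returns true if a message carrying the given tag would be selected by the expression. */
    static boolean matches(String expression, String tag) {
        if ("*".equals(expression.trim())) {
            return true; // "*" subscribes to all messages, tagged or not
        }
        return tag != null && parse(expression).contains(tag);
    }

    public static void main(String[] args) {
        String expression = "TagA || TagC || TagD";
        System.out.println("TagC -> " + matches(expression, "TagC"));
        System.out.println("TagB -> " + matches(expression, "TagB"));
        System.out.println("*    -> " + matches("*", "TagB"));
    }
}
```

Under this model, the messages sent by the producer with tag TagA on TopicTest1 are selected by the first subscription, while a message tagged TagB on the same topic would not be.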

============END============

Reposted from: https://my.oschina.net/xinxingegeya/blog/900162
