RocketMQ Installation and Configuration
Installing RocketMQ
Download: https://github.com/apache/rocketmq
➜ incubator-rocketmq git:(master) pwd
/Users/xinxingegeya/workspace-github/incubator-rocketmq
➜ incubator-rocketmq git:(master)
This is a checkout of RocketMQ's master branch.
Next, run mvn install, using the -P option to select the corresponding profile:
➜ incubator-rocketmq git:(master) mvn clean package install -Prelease-all -Dmaven.test.skip=true -U
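About the -U option: it forces Maven to check every SNAPSHOT dependency for updates, so the build is based on the latest state. Without it, Maven checks for SNAPSHOT updates once a day by default, and continuous integration usually runs far more often than that.
Change into the build output under distribution/target: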
➜ incubator-rocketmq git:(master) cd distribution/target/apache-rocketmq
➜ apache-rocketmq git:(master) ll
total 56
-rw-r--r-- 1 xinxingegeya staff 524B 2 23 14:40 DISCLAIMER
-rw-r--r-- 1 xinxingegeya staff 15K 6 8 10:35 LICENSE
-rw-r--r-- 1 xinxingegeya staff 1.3K 6 8 10:35 NOTICE
-rw-r--r-- 1 xinxingegeya staff 2.4K 6 8 10:35 README.md
drwxr-xr-x 36 xinxingegeya staff 1.2K 6 13 18:27 bin
drwxr-xr-x 10 xinxingegeya staff 340B 6 8 10:35 conf
drwxr-xr-x 25 xinxingegeya staff 850B 6 13 18:27 lib
Starting the Name Server
➜ apache-rocketmq-all git:(master) ✗ nohup sh bin/mqnamesrv &
[1] 43055
appending output to nohup.out
➜ apache-rocketmq-all git:(master) ✗ tail -f ~/logs/rocketmqlogs/namesrv.log
2017-05-15 14:07:28 INFO main - serverOnewaySemaphoreValue=256
2017-05-15 14:07:28 INFO main - serverAsyncSemaphoreValue=64
2017-05-15 14:07:28 INFO main - serverChannelMaxIdleTimeSeconds=120
2017-05-15 14:07:28 INFO main - serverSocketSndBufSize=4096
2017-05-15 14:07:28 INFO main - serverSocketRcvBufSize=4096
2017-05-15 14:07:28 INFO main - serverPooledByteBufAllocatorEnable=true
2017-05-15 14:07:28 INFO main - useEpollNativeSelector=false
2017-05-15 14:07:28 INFO main - load KV config table OK
2017-05-15 14:07:28 INFO NettyEventExecuter - NettyEventExecuter service started
2017-05-15 14:07:28 INFO main - The Name Server boot success. serializeType=JSON
Starting the Broker
➜ apache-rocketmq-all git:(master) ✗ nohup sh bin/mqbroker -n localhost:9876 &
[2] 43087
appending output to nohup.out
➜ apache-rocketmq-all git:(master) ✗ tail -f ~/logs/rocketmqlogs/broker.log
2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=FooBarGroup, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=TOOLS_CONSUMER, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=CID_ONS-HTTP-PROXY, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=FILTERSRV_CONSUMER, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2017-05-15 14:08:25 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=CID_ONSAPI_PULL, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2017-05-15 14:08:25 INFO main - load /Users/xinxingegeya/store/config/subscriptionGroup.json OK
2017-05-15 14:08:25 INFO main - Set user specified name server address: localhost:9876
2017-05-15 14:08:25 INFO PullRequestHoldService - PullRequestHoldService service started
2017-05-15 14:08:26 INFO main - register broker to name server localhost:9876 OK
2017-05-15 14:08:26 INFO main - The broker[Yale-Li, 10.99.24.152:10911] boot success. serializeType=JSON and name server is localhost:9876
2017-05-15 14:08:35 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2017-05-15 14:08:35 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2017-05-15 14:08:36 INFO BrokerControllerScheduledThread1 - register broker to name server localhost:9876 OK
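Besides tailing the logs, a quick way to confirm that both processes are up is to try a TCP connection to their ports. The following is only a minimal sketch using the JDK, not a RocketMQ tool: the class name PortCheck and the method isListening are made up for illustration, and the ports 9876 (name server) and 10911 (broker) are simply the ones that appear in the logs above, so adjust them if your setup differs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of RocketMQ.
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the log output above; they are assumptions for any other setup.
        System.out.println("name server (9876): " + isListening("127.0.0.1", 9876, 1000));
        System.out.println("broker (10911):     " + isListening("127.0.0.1", 10911, 1000));
    }
}
```

An open port only shows that something is listening there. The "boot success" lines in the logs remain the more reliable signal.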
The shutdown commands are as follows:
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
Installing RocketMQ Console
Download: https://github.com/apache/rocketmq-externals
mvn spring-boot:run
After running this command, open the console in a browser; the default port is 8080.
A Simple Producer and Consumer
Producer
package com.rocketmq.demo;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * Created by xinxingegeya on 2017/5/13.
 */
public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("mytest_producer_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {
                // Create a message instance, specifying topic, tag and message body.
                Message msg = new Message(
                        "TopicTest1" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // Call send to deliver the message to one of the brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                // Back off briefly before trying the next message.
                Thread.sleep(1000);
            }
        }

        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}
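The message body travels as raw bytes, so the sending and receiving sides have to agree on a charset. The sketch below uses only the JDK and assumes UTF-8, which as far as I know is the value of RemotingHelper.DEFAULT_CHARSET; check that against your client version. The class BodyCharsetSketch and its methods are hypothetical names used just for this illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the encode/decode round trip; not a RocketMQ class.
class BodyCharsetSketch {

    /** Encodes text the way a producer would before putting it in a message body. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a message body with the same charset the producer used. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Hello RocketMQ 0");
        System.out.println(body.length + " bytes -> " + decode(body));
    }
}
```

Calling new String(bytes) with no charset falls back to the platform default, which may not match what the producer used once the body contains non-ASCII text.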
Consumer
package com.rocketmq.demo;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Created by xinxingegeya on 2017/5/15.
 */
public class Consumer {

    /**
     * In push mode the consumer wraps the polling loop itself. You register a
     * MessageListener, and each fetched message is handed to its consumeMessage()
     * method, so to the user it feels as if messages are pushed to the application.
     *
     * @param args command-line arguments (unused)
     * @throws MQClientException if the consumer cannot be started
     */
    public static void main(String[] args) throws MQClientException {
        /*
         * Create one Consumer per application and let the application own it,
         * for example as a global object or a singleton.
         * Note: the application is responsible for keeping ConsumerGroupName unique.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mytest_consumer_group_name");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("Consumer");

        /*
         * Subscribe to messages in the topic whose tag is TagA, TagC or TagD.
         */
        consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

        /*
         * Subscribe to every message in the topic.
         * Note: one consumer object can subscribe to several topics.
         */
        consumer.subscribe("TopicTest2", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * By default msgs holds a single message; set consumeMessageBatchMaxSize
             * to receive messages in batches.
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());
                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("TopicTest1")) {
                    // Consumption logic for TopicTest1
                    if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                        // Handle TagA; decode with the charset the producer used
                        System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                    } else if (msg.getTags() != null && msg.getTags().equals("TagC")) {
                        // Handle TagC
                    } else if (msg.getTags() != null && msg.getTags().equals("TagD")) {
                        // Handle TagD
                    }
                } else if (msg.getTopic().equals("TopicTest2")) {
                    System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         * The Consumer must be initialized with start() before use; calling it once is enough.
         */
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
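To make the subscription expressions used above ("TagA || TagC || TagD" and "*") more concrete, here is a simplified, JDK-only illustration of how such an expression can be read: split on "||", trim each tag, and treat "*" as "match everything". This is a sketch of the semantics, not RocketMQ's actual filtering code; the class TagFilterSketch and its methods are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical illustration of tag-expression semantics; not RocketMQ's implementation.
class TagFilterSketch {

    /** Splits an expression such as "TagA || TagC" into the set {TagA, TagC}. */
    static Set<String> parseTags(String expression) {
        Set<String> tags = new LinkedHashSet<>();
        for (String part : expression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    /** Returns true if a message carrying messageTag would pass the expression. */
    static boolean matches(String expression, String messageTag) {
        String trimmed = expression == null ? "" : expression.trim();
        if (trimmed.isEmpty() || trimmed.equals("*")) {
            return true; // wildcard: every message in the topic is accepted
        }
        return messageTag != null && parseTags(trimmed).contains(messageTag);
    }

    public static void main(String[] args) {
        String expression = "TagA || TagC || TagD";
        System.out.println(parseTags(expression));
        System.out.println("TagA accepted: " + matches(expression, "TagA"));
        System.out.println("TagB accepted: " + matches(expression, "TagB"));
    }
}
```

With a tag-based subscription, the if/else chain in the listener above only needs to tell apart the tags that were subscribed to.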
============END============