博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMq入门
阅读量:3920 次
发布时间:2019-05-23

本文共 8774 字,大约阅读时间需要 29 分钟。

RabbitMQ:小规模场景,吞吐量低,性能高,功能全面,erlang语言不好控制;

kafka:吞吐量大,会丢失数据,日志分析,大数据采集;
RocketMQ:兼具了上面两种的优点;

RocketMQ集群:(安装不用了解太多)在Broker Cluster中会有多组主从RabbitMQ,主节点负责数据交互,从节点负责数据备份,生产者与消费者不与集群直接交互,而是通过NameServer Cluster作为管理者进行交互。有点类似微服务架构;RocketMQ并没有提供RabbitMQ一样的图形化界面;但是有一个社区项目rocketmq-externals-master,项目上有rocketmq-console组件,提供图形化管理界面;运行他就可以访问图形化界面;

RocketMQ发送方式
单向发送
生产者向MQ推送完消息后,就放手不管了,效率最高。

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");//        producer.setNamesrvAddr("192.168.232.128:9876");        producer.start();            try {
{
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendOneway(msg); } } catch (Exception e) {
e.printStackTrace(); } producer.shutdown();

消费者:

核心代码解析:

在这里插入图片描述
发送的目标:TopicTest;
发送的内容:Hello world
对消息进行过滤分组:TagA,OrderID188 (暂时先放着)
需要在电脑上配置好环境变量,即可直接连接mq,否则使用
producer.setNamesrvAddr(“192.168.232.128:9876”);直接连上ip也能连接mq;
sendOneway方法就是单向发送。
producer.sendOneway(msg);
没有返回值,不知道消息是否发送成功;
同步发送
生产者往mq发送消息,需要等待mq给生产者响应,他才继续执行自己的逻辑。好处在于成功失败非常明确;
把上面的producer.sendOneway(msg);方法改成SendResult sendResult = producer.send(msg);即是同步发送了;会返回一个SendResult,从里面可以进行判断。失败会进行重试;
异步发送
生产者往mq发送消息,就直接进行自己的逻辑了。但是生产者中有一个回调的方法会去处理mq返回的消息,但是生产者执行逻辑不会管这个,他只是去执行自己原有的逻辑。
代码实现:
将上面的producer.sendOneway(msg);改成下面这个即可,一个是回调失败的消息,一个是回调成功的消息;

producer.send(msg, new SendCallback() {
@Override public void onSuccess(SendResult sendResult) {
countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) {
countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } });

我们如果需要保证回调函数执行完毕再继续执行代码可以加入

CountDownLatch 去执行,给他配置可能出现的执行次数进行计次;在所有消息都回调完毕后再去shutdown处理。
三者区别:单向发送最快,但是消息出错,丢失概率最大。同步发送最慢,但是消息最安全,异步发送介于两者之间;
RocketMQ接收方式

推模式:由Broker收到消息后主动推送给消费者;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");//        consumer.setNamesrvAddr("192.168.232.128:9876");        consumer.subscribe("TopicTest", "*");//按照生产者上面那个TagA去过滤        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);        consumer.setConsumeTimestamp("20181109221800");        consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List
msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n");

核心方法:

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
}
});
创建一个监听,当有消息来了,就会往消费者中发送消息,返回给Broker状态,没有往Broker中发送请求,而是等待。
拉模式:消费者主动去Broker上拉取消息;

Set
mqs = consumer.fetchSubscribeMessageQueues("TopicTest"); //MessageQueue是发布者和消费者消息的最小单位 for (MessageQueue mq : mqs) {
//遍历拉取消息 System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) {
try {
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); //null tarA的过滤 32最大消息数量 getMessageQueueOffset(mq)消息的偏移量,防止重复消费,知道消费到哪里了 System.out.printf("%s%n", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); //判断当前消息的状态 switch (pullResult.getPullStatus()) {
case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) {
e.printStackTrace(); } } }

核心代码:

consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
null tarA的过滤 32最大消息数量 getMessageQueueOffset(mq)消息的偏移量,防止重复消费,知道消费到哪里了
getMessageQueueOffset方法中需要自行去管理偏移量OFFSE_TABLE.put(mq, offset);
新版拉取消息傻瓜模式

DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");      litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);        litePullConsumer.subscribe("TopicTest", "*");        litePullConsumer.start();        try {
while (running) {
List
messageExts = litePullConsumer.poll(); System.out.printf("%s%n", messageExts); } } finally {
litePullConsumer.shutdown(); } }

不用管偏移量之类的东西,指定我们想要的内容,去拉取就可以了。

第三种拉取模式;定制模式

Collection
mqSet = litePullConsumer.fetchMessageQueues("TopicTest"); List
list = new ArrayList<>(mqSet); List
assignList = new ArrayList<>(); for (int i = 0; i < list.size() / 2; i++) {
assignList.add(list.get(i)); } litePullConsumer.assign(assignList); litePullConsumer.seek(assignList.get(0), 10); try {
while (running) {
List
messageExts = litePullConsumer.poll(); System.out.printf("%s %n", messageExts); litePullConsumer.commitSync(); } } finally {
litePullConsumer.shutdown(); }

依旧自己管理偏移量,我们可以管理偏移,知道我们消费到了哪个消息,如果想重复消费消息直接去获取偏移量(可以存到redis中),从偏移量的位置去开始重复消费。更加的灵活。

顺序消息
保证局部消息有序,并不保证全部消息有序;比如有10个订单,完成每个订单需要处理5个步骤,而我们顺序消息保证每个订单各自的5条消息有序,并不会管和订单之间的消息关系。
有序性需要生产者和消费者配合才能够做到有序性;
在生产者端将一个订单的消息放在一个队列中,多少个订单就多少个队列,而消费者端则按队列有序性去mq中拿消息;
生产者代码

for (int i = 0; i < 10; i++) {
int orderId = i; for(int j = 0 ; j <= 5 ; j ++){
Message msg = new Message("OrderTopicTest", "order_"+orderId, "KEY" + orderId, ("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override public MessageQueue select(List
mqs, Message msg, Object arg) {
Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } }

在生产者中,采用同一个reder去发送一个订单的消息,

在消费者中监听消息

consumer.registerMessageListener(new MessageListenerOrderly() {
@Override public ConsumeOrderlyStatus consumeMessage(List
msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true); for(MessageExt msg:msgs){
System.out.println("收到消息内容 "+new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } });

采用MessageListenerOrderly有序的从一个队列中取得消息;

放在一个队列中当然也能保证全部有序,但是效率很低,没人会这样用;
广播消息
所有的消费者均能消费到的消息;
consumer.setMessageModel(MessageModel.BROADCASTING);
设置消费者为广播模式,消息就会共享;
延迟消息
消息定时发送,
在发送消息的时候设置延迟级别,非常的简单;
msg.setDelayTimeLevel(3);
延迟级别设置为3级别;
总共18个级别,代表了18个延迟的时常;
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
在商业版的当中可以设定时间,开源版当中只能这样设置级别;
所有开源版本,阿里巴巴都有保留。
批量消息
把多个消息组在一起传过去。
String topic = “BatchTest”;
List messages = new ArrayList<>();
messages.add(new Message(topic, “Tag”, “OrderID001”, “Hello world 0”.getBytes()));
messages.add(new Message(topic, “Tag”, “OrderID002”, “Hello world 1”.getBytes()));
messages.add(new Message(topic, “Tag”, “OrderID003”, “Hello world 2”.getBytes()));
producer.send(messages);
限制:消息的大小限制,官网上说是不能超过1M,但实际上可以扩大到4M这样子;
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List listItem = splitter.next();
producer.send(listItem);
}
把消息拆分到发送过去;

转载地址:http://rsqrn.baihongyu.com/

你可能感兴趣的文章
固态硬盘与普通硬盘区别
查看>>
Linux压缩文件格式总结
查看>>
ethtool命令
查看>>
修改ip时发现的问题
查看>>
修改路由
查看>>
systool指令
查看>>
dos2unix指令
查看>>
centos7安装dos2unix
查看>>
centos系统安装tar/rpm文件
查看>>
/etc/sysconfig/network-scripts下文件
查看>>
centos7安装ansible遇到的问题
查看>>
centos安装ansible及其依赖包
查看>>
常用ansible命令
查看>>
paramiko
查看>>
dhcp简单配置
查看>>
dhcp安装出现的问题
查看>>
划分子网
查看>>
nmcli中的几种模式
查看>>
建立bond的几种参数
查看>>
查看建立的bond的状态
查看>>