本文共 8774 字,大约阅读时间需要 29 分钟。
RabbitMQ:小规模场景,吞吐量低,性能高,功能全面,erlang语言不好控制;
kafka:吞吐量大,会丢失数据,日志分析,大数据采集; RocketMQ:兼具了上面两种的优点;RocketMQ集群:(安装不用了解太多)在Broker Cluster中会有多组主从RabbitMQ,主节点负责数据交互,从节点负责数据备份,生产者与消费者不与集群直接交互,而是通过NameServer Cluster作为管理者进行交互。有点类似微服务架构;RocketMQ并没有提供RabbitMQ一样的图形化界面;但是有一个社区项目rocketmq-externals-master,项目上有rocketmq-console组件,提供图形化管理界面;运行他就可以访问图形化界面;
RocketMQ发送方式 单向发送 生产者向MQ推送完消息后,就放手不管了,效率最高。DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");// producer.setNamesrvAddr("192.168.232.128:9876"); producer.start(); try { { Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendOneway(msg); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown();
消费者:
核心代码解析:
发送的目标:TopicTest; 发送的内容:Hello world 对消息进行过滤分组:TagA,OrderID188 (暂时先放着) 需要在电脑上配置好环境变量,即可直接连接mq,否则使用 producer.setNamesrvAddr(“192.168.232.128:9876”);直接连上ip也能连接mq; sendOneway方法就是单向发送。 producer.sendOneway(msg); 没有返回值,不知道消息是否发送成功; 同步发送 生产者往mq发送消息,需要等待mq给生产者响应,他才继续执行自己的逻辑。好处在于成功失败非常明确; 把上面的producer.sendOneway(msg);方法改成SendResult sendResult = producer.send(msg);即是同步发送了;会返回一个SendResult,从里面可以进行判断。失败会进行重试; 异步发送 生产者往mq发送消息,就直接进行自己的逻辑了。但是生产者中有一个回调的方法会去处理mq返回的消息,但是生产者执行逻辑不会管这个,他只是去执行自己原有的逻辑。 代码实现: 将上面的producer.sendOneway(msg);改成下面这个即可,一个是回调失败的消息,一个是回调成功的消息;producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } });
我们如果需要保证回调函数执行完毕再继续执行代码可以加入
CountDownLatch 去执行,给他配置可能出现的执行次数进行计次;在所有消息都回调完毕后再去shutdown处理。 三者区别:单向发送最快,但是消息出错,丢失概率最大。同步发送最慢,但是消息最安全,异步发送介于两者之间; RocketMQ接收方式推模式:由Broker收到消息后主动推送给消费者;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");// consumer.setNamesrvAddr("192.168.232.128:9876"); consumer.subscribe("TopicTest", "*");//按照生产者上面那个TagA去过滤 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n");
核心方法:
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { } }); 创建一个监听,当有消息来了,就会往消费者中发送消息,返回给Broker状态,没有往Broker中发送请求,而是等待。 拉模式:消费者主动去Broker上拉取消息;Setmqs = consumer.fetchSubscribeMessageQueues("TopicTest"); //MessageQueue是发布者和消费者消息的最小单位 for (MessageQueue mq : mqs) { //遍历拉取消息 System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); //null tarA的过滤 32最大消息数量 getMessageQueueOffset(mq)消息的偏移量,防止重复消费,知道消费到哪里了 System.out.printf("%s%n", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); //判断当前消息的状态 switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } }
核心代码:
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); null tarA的过滤 32最大消息数量 getMessageQueueOffset(mq)消息的偏移量,防止重复消费,知道消费到哪里了 getMessageQueueOffset方法中需要自行去管理偏移量OFFSE_TABLE.put(mq, offset); 新版拉取消息傻瓜模式DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test"); litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); litePullConsumer.subscribe("TopicTest", "*"); litePullConsumer.start(); try { while (running) { ListmessageExts = litePullConsumer.poll(); System.out.printf("%s%n", messageExts); } } finally { litePullConsumer.shutdown(); } }
不用管偏移量之类的东西,指定我们想要的内容,去拉取就可以了。
第三种拉取模式;定制模式CollectionmqSet = litePullConsumer.fetchMessageQueues("TopicTest"); List list = new ArrayList<>(mqSet); List assignList = new ArrayList<>(); for (int i = 0; i < list.size() / 2; i++) { assignList.add(list.get(i)); } litePullConsumer.assign(assignList); litePullConsumer.seek(assignList.get(0), 10); try { while (running) { List messageExts = litePullConsumer.poll(); System.out.printf("%s %n", messageExts); litePullConsumer.commitSync(); } } finally { litePullConsumer.shutdown(); }
依旧自己管理偏移量,我们可以管理偏移,知道我们消费到了哪个消息,如果想重复消费消息直接去获取偏移量(可以存到redis中),从偏移量的位置去开始重复消费。更加的灵活。
顺序消息 保证局部消息有序,并不保证全部消息有序;比如有10个订单,完成每个订单需要处理5个步骤,而我们顺序消息保证每个订单各自的5条消息有序,并不会管和订单之间的消息关系。 有序性需要生产者和消费者配合才能够做到有序性; 在生产者端将一个订单的消息放在一个队列中,多少个订单就多少个队列,而消费者端则按队列有序性去mq中拿消息; 生产者代码for (int i = 0; i < 10; i++) { int orderId = i; for(int j = 0 ; j <= 5 ; j ++){ Message msg = new Message("OrderTopicTest", "order_"+orderId, "KEY" + orderId, ("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(Listmqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } }
在生产者中,采用同一个reder去发送一个订单的消息,
在消费者中监听消息consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for(MessageExt msg:msgs){ System.out.println("收到消息内容 "+new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } });
采用MessageListenerOrderly有序的从一个队列中取得消息;
放在一个队列中当然也能保证全部有序,但是效率很低,没人会这样用; 广播消息 所有的消费者均能消费到的消息; consumer.setMessageModel(MessageModel.BROADCASTING); 设置消费者为广播模式,消息就会共享; 延迟消息 消息定时发送, 在发送消息的时候设置延迟级别,非常的简单; msg.setDelayTimeLevel(3); 延迟级别设置为3级别; 总共18个级别,代表了18个延迟的时常; 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 在商业版的当中可以设定时间,开源版当中只能这样设置级别; 所有开源版本,阿里巴巴都有保留。 批量消息 把多个消息组在一起传过去。 String topic = “BatchTest”; List messages = new ArrayList<>(); messages.add(new Message(topic, “Tag”, “OrderID001”, “Hello world 0”.getBytes())); messages.add(new Message(topic, “Tag”, “OrderID002”, “Hello world 1”.getBytes())); messages.add(new Message(topic, “Tag”, “OrderID003”, “Hello world 2”.getBytes())); producer.send(messages); 限制:消息的大小限制,官网上说是不能超过1M,但实际上可以扩大到4M这样子; ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { List listItem = splitter.next(); producer.send(listItem); } 把消息拆分到发送过去;转载地址:http://rsqrn.baihongyu.com/