List<MessageVo> msgList = new ArrayList<MessageVo>(); try { Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(queueName); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: " + mq + "%n"); try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); if (pullResult != null) { putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) { for (MessageExt messageExt : pullResult.getMsgFoundList()) { msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody())); } } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; default: break; } } } catch (Exception e) { e.printStackTrace(); }
private Queue<List<MessageVo>> messageQueue = new LinkedBlockingQueue<List<MessageVo>>(); DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(groupName); consumer.setNamesrvAddr(url); scheduleService = new MQPullConsumerScheduleService(groupName); scheduleService.setMessageModel(MessageModel.CLUSTERING); scheduleService.setDefaultMQPullConsumer(consumer); List<MessageVo> msgList = new ArrayList<MessageVo>(); try { Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(queueName); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: " + mq + "%n"); try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); if (pullResult != null) { putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) { for (MessageExt messageExt : pullResult.getMsgFoundList()) { msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody())); } } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; default: break; } } } catch (Exception e) { e.printStackTrace(); } List<MessageVo> msgList = new ArrayList<MessageVo>(); try { Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(queueName); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: " + mq + "%n"); try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); if (pullResult != null) { putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) { for (MessageExt messageExt : pullResult.getMsgFoundList()) { msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody())); } } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; default: break; } } } catch (Exception e) { e.printStackTrace(); }
https://segmentfault.com/a/1190000009007514
相关推荐
主要介绍了springboot中如何实现kafa指定offset消费,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
rocketmq推消息模式分享,讲述了基本原理,消息与消息队列,长轮询,offset存储机制,消息异常重推机制的总体概述
公司Ubuntu12.04服务器突然开机起来后就卡住不动,打印kernel offset信息 目前已尝试以下方法,均失败 1:开机启动选择Ubuntu选项时,选择recovery mode启动,仍然卡住,显示kernel offset信息 2:在开机选项...
2、广播消费(BROADCASTING) 1、CONSUME_FROM_LAST_OFFSET:默认策略, 2、CONSUME_FROM_FIRST_OFFSE
kafka tool offset explorer 2.2
offsetExplore2 实际上是 Kafka 的一个工具,用于管理和监控 Apache Kafka 中的偏移量(offset)。在 Kafka 中,偏移量是用来标识消费者在一个特定分区中的位置的标识符,它可以用来记录消费者消费消息的进度。 ...
kafak 0.9API 旧高级消费 新高级消费,指定offset消费,并且持续监控offset
Offset
offset函数高级使用方法,是中高级人员使用必学的一些常用技巧。
kafka-manager1.3.0.8 可修改offset值
比较器仿真必用,详细的仿真过程,以及EDA软件的充分介绍。
an algorithm to offset a polyline, the same funtion you can see in AutoCAD, command offset.
Apache Flink如何管理Kafka消费者offsets
共有3个安装包: kafka:kafka_2.12-2.8.0.tgz zookeeper:apache-zookeeper-3.7.0-bin.tar.gz kafka可视化工具:offsetexplorer_64bit.exe
小伙伴们好啊,今天老祝和大家一起来学习OFFSET函数的常用套路。 1、函数作用: 用于生成数据区域的引用,再将这个引用作为半成品,作为动态图表的数据源、或是作为其他函数的参数,进行二次加工。 2、函数用法: ...
kafka管理工具 监控 偏移量 消费 offset,必备技能.docx
Excel的offset函数基础教程,从零开始哦
offset()函数使用方法,这里有几个例子,可以看一下就非常明白这个用法了
包含Mac 和 windows版本, 可以连接kafka,非常方便的查看topic、consumer、consumer-group 等信息。 1、首先在Properties页签下填写好 zookeeper 地址和端口 2、再从 Advanced页签下填写 broker地址和端口
kafka-manager最新编译版1.3.3.22,开箱即用(已测试通过)。支持kafka版本2.x,解决了异常Unknown offset schema version 3