package com.paile.kafka.service.impl; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import org.apache.log4j.Logger; import com.paile.command.MessageConsumerCommand; import com.paile.command.receiver.message.MessageConsumerReceiver; import com.paile.kafka.bean.MessageBean; import com.paile.utils.others.ObjectStreamManager; public class GroupConsumerManager { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; private Logger logger = Logger.getLogger(GroupConsumerManager.class); public GroupConsumerManager(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); } public void run(int a_numThreads)throws Exception { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); System.out.println(streams.size()); // executor = Executors.newFixedThreadPool(a_numThreads); // for (final KafkaStream<byte[], byte[]> stream : streams) { ConsumerIterator<byte[], byte[]> it = stream.iterator(); while(it.hasNext()){ byte[] bt = it.next().message(); try { Object object = null; try { object = ObjectStreamManager.getInstance().toObject(bt); if(object!=null&&object instanceof MessageBean){ //接收到消息 把消息封装成MessageConsumerCommand命令,交后续执行 MessageBean bean = (MessageBean) object; MessageConsumerReceiver receiver = new MessageConsumerReceiver(bean); MessageConsumerCommand command = new MessageConsumerCommand(receiver); command.execute(); } } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { throw e; } } } } private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "15000"); props.put("zookeeper.sync.time.ms", "2000"); props.put("auto.commit.interval.ms", "10000"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = "192.168.1.101:2181"; String groupId = "pailegroup"; String topic = "paile01"; int threads = Integer.parseInt("1"); GroupConsumerManager example = new GroupConsumerManager(zooKeeper, groupId, topic); System.out.println("--------"); try { example.run(threads); } catch (Exception e) { e.printStackTrace(); } try { Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); } }
相关推荐
消息中间件kafka 进行消息的分发和接收示例 应用中包含分通道发送和多线程接收
c# kafka 发送端与接收 完整demo 生成 消费
kafka 发送和接收消息-Java版
kettle kafka 消息者插件,用于集成到kettle,接收Kafka消息。
unity利用kafka接收数据,只需填写ip端口,topic 即可接收消息;适用范围,unity编辑器,发布PC应用 说明:如果发布PC不可用,请手动将Plugins\X64文件夹里的dll 文件拷贝到发布文件kafka-Test_Data\Managed 路径下...
使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据,数据可以批量进行操作
.net core 使用kafka推送消息和接收消息,包含生产端和消费端
kafka-connect-twitter, Kafka 连接接收器/Twitter Kafka 连接 TwitterTwitter的Kafka 连接。 提供了源( 从 Twitter 到 Kafka ) 和接收器( 从 Kafka 到 Twitter ):接收接收来自 Kafka的普通字符串,这是使用 Tw
java操作kafka实例,包括发送和接收任务.zip
简单介绍了如何在Spring Cloud中使用RabbitMQ和Kafka来完成消息发送与接收
Springboot整合kafka做消息通信 1 一、 Kafka原理: 2 二、 应用搭建: 6 三、 Topi与死信主题 四、 Consumer 多线程 11 五、 关于kafka分区: 11 六、 Kafka日志刷新政策配置 13 七、 关于批量消息处理: 13...
kafka下载、安装、配置及完整运行代码
可视化kafka测试工具,配置好broker地址即可模拟发送topic消息
主要给大家介绍了关于spring-boot整合spring-kafka实现发送接收消息的相关资料,文中介绍的非常详细,对大家具有一定的参考学习价值,需要的朋友们下面跟着小编一起来看看吧。
生产者用于向Kafka主题发送消息,而消费者用于从Kafka主题接收消息。 生产者代码包括以下部分: 创建一个Properties对象,设置Kafka生产者的配置。这些配置包括Kafka服务器地址、主题名称等。 创建一个...
kafka solution to assignment
整合flume、kafka以此接收消息时的执行步骤
pykafka使用示例,自动消费最新消息,不重复消费: # -* coding:utf8 *- from pykafka import KafkaClient host = '192.168.200.38' client = KafkaClient(hosts="%s:9092" % host) print client.topics # 生产者 #...
ngx_kafka_module, Nginx Kafka 模块,将日志日志数据发送到 Kafka 群集 Nginx Kafka 模块 Nginx Kafka MODULE 用于接收 HTTP POST 数据并向 Kafka 传递消息。如果使用这里 MODULE 时存在任何问题,请随时向我发送...
最后, store-streams会监听来自Kafka消息,使用Kafka Streams对待它们,然后将新消息推回到Kafka 。 项目图 应用领域 商店API 整体式应用程序,它公开了REST API来管理Customers , Products和Orders 。 数据存储...