package com.paile.kafka.service.impl; import java.util.Properties; import java.util.Random; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import org.apache.log4j.Logger; import com.paile.kafka.bean.MessageBean; import com.paile.kafka.service.IKafkaService; import com.paile.utils.others.Const; /*** * kafka消息服务类 * * @author libo * */ public class KafkaServiceImpl implements IKafkaService { private Logger logger = Logger.getLogger(KafkaServiceImpl.class); /*** * 发送一条消息 */ public void sendSinglePartitionMessage(String broke_list, String topic, Object message) throws Exception { Producer<Integer, Object> producer = null; try { Properties props = new Properties(); props.put("metadata.broker.list", broke_list); props.put("serializer.class", "com.paile.kafka.CustomEncoder"); props.put("key.serializer.class", "kafka.serializer.StringEncoder"); props.put("producer.type", "sync");//是否同步 sync:同步 async:异步 props.put("request.required.acks", "1"); producer = new Producer<Integer, Object>(new ProducerConfig(props)); KeyedMessage<Integer, Object> data = new KeyedMessage<Integer, Object>(topic, Const.defaultPartitionKey, message); producer.send(data); } catch (Exception e) { logger.error("发送消息到Kafka失败,", e); throw e; }finally{ if(producer!=null) producer.close(); } } /*** * 发送多分区消息 */ public void sendMutilPartitionMessage(String broke_list, String topic,MessageBean message) throws Exception { Producer<String, MessageBean> producer = null; try { Properties props = new Properties(); props.put("metadata.broker.list", broke_list); props.put("serializer.class", "com.paile.kafka.productor.CustomEncoder"); props.put("key.serializer.class", "kafka.serializer.StringEncoder"); props.put("producer.type", "async");//是否同步 sync:同步 async:异步 props.put("partitioner.class", "com.paile.kafka.productor.SimplePartitioner");//分区算法类 props.put("request.required.acks", "1"); producer = new Producer<String, MessageBean>(new ProducerConfig(props)); Random random = new Random(); int partitionKey = random.nextInt(255); KeyedMessage<String, MessageBean> data = new KeyedMessage<String, MessageBean>(topic, String.valueOf(partitionKey), message); producer.send(data); } catch (Throwable e) { logger.error("发送消息到Kafka失败,", e); System.out.println(e.getMessage()); throw new Exception(e.getMessage()); }finally{ if(producer!=null) producer.close(); } } /*** * 接收消息 */ @Override public void startConsumer(String zookeeperConnect, String groupId, String topic,int threads) throws Exception { GroupConsumerManager example = new GroupConsumerManager(zookeeperConnect, groupId, topic); try { example.run(threads); } catch (Exception e) { e.printStackTrace(); } try { Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); } public static void main(String[] args){ KafkaServiceImpl service = new KafkaServiceImpl(); try { for(int i=0;i<10;i++){ MessageBean bean = new MessageBean(); bean.setId("00"+i); bean.setData("111111111111111"); bean.setImg(new byte[0]); service.sendMutilPartitionMessage("192.168.1.101:9092", "paile01",bean); } System.out.println(""); } catch (Exception e) { e.printStackTrace(); } } }
相关推荐
星环kafka使用方法,基于星环大数据平台的kafka,了解kafka的基本功能和使用,内部培训文档
java程序实现springboot集成kafka进行消息发布和订阅并打包,可直接进行linux主机应用。支持动态修改kafka集群配置,和监听主题;支持动态指向jdk版本。
Kafka是一个分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。
kafka消息队列消费者生产者读写样例。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。
使用springboot整合kafka,并进行基于kafka的发布订阅消息队列模式的消息发布与消费测试。
基于Kafka的消息发布订阅服务的设计与实现_卢帅.caj
kafka分布式发布订阅消息系统 v3.3.1.tgz
kafka分布式发布订阅消息系统 v2.6.3.tgz
参与翻译(4人):fbm, 飞翔的猴子, Khiyuan, nesteaa 感谢这些同志们的辛勤工作,翻译的真不错,目前见到的最好的Kafka中文文章
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量...
教程视频:Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统
kafka的本质 是一个数据存储平台,流平台 , 只是他在做消息发布,消息消费的时候我们可以把他当做消息中间件来用。 而且kafka在设计之初就是采用分布式架构设计的, 基于集群的方式工作,且可以自由伸缩,所以 ...
课程分为理论和实践两部分,理论部分介绍Kafka架构、技术原理以及Kafka的功能拓展及应用,实践部分带领学员深入Kafka源代码,重点分析Kafka消息发布与订阅、存储、数据处理框架等技术细节。 视频大小:2.9G
Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。 Kafka专为分布式高吞吐量系统而设计。 Kafka往往工作...
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量...
首先,让我们来看一下基础的消息(Message)相关术语: Topic: Kafka按照Topic分类来维护消息 Producer: 我们将发布(p
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作 流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量...
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域
unity利用kafka接收数据,只需填写ip端口,topic 即可接收消息;适用范围,unity编辑器,发布PC应用 说明:如果发布PC不可用,请手动将Plugins\X64文件夹里的dll 文件拷贝到发布文件kafka-Test_Data\Managed 路径下...
emq连接kafka,并发布消息到kafka的emq插件。插件主要使用emq的钩子函数,将数据通过ekaf发送到kafka中。