`

kafka生产者写入大消息

 
阅读更多

最近项目遇到一个特殊场景,需要kafka传递100万条数据过去,1个G左右,由于其他环节限制,不能进行拆包。

 

一开始生产者一直报网络问题,经过需要修改如下参数,为了探寻之前说的不能超过1G的说法,把所有参数上限都设置成了接近2G

  

config/server.properties

 

socket.request.max.bytes=2048576000  

log.segment.bytes=2073741824

message.max.bytes=2048576000

replica.fetch.max.bytes=2048576000

fetch.message.max.bytes=2048576000  ---这个好像不应该设置在server.propeties里面,这个是consumer的参数,后续验证吧。

replica.socket.timeout.ms=300000   --这个参数好像也不需要设置,但是需要在生产者里设置request.timeout.ms。否则,发送时间过长导致发送失败。

 

参数说明:

 

socket.request.max.bytes =100*1024*1024

socket请求的最大数值,防止serverOOMmessage.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖

 

 

log.segment.bytes =1024*1024*1024

topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖

 

message.max.bytes =6525000

表示消息体的最大大小,单位是字节

 

replica.fetch.max.bytes =1024*1024

replicas每次获取数据的最大大小

 

fetch.message.max.bytes=1024*1024

每个拉取请求的每个topic分区尝试获取的消息的字节大小。这些字节将被读入每个分区的内存,因此这有助于控制消费者使用的内存。 拉取请求的大小至少与服务器允许的最大消息的大小一样大,否则生产者可能发送大于消费者可以拉取的消息。

 

replica.socket.timeout.ms

网络请求的socket超时,该值最少是replica.fetch.wait.max.ms

 

 

 

 

 生产者设定

  props.put("max.request.size", 2073741824);

  props.put("buffer.memory", 2073741824);

  props.put("timeout.ms", 30000000);

  props.put("request.timeout.ms", 30000000);

分享到:
评论

相关推荐

    对python操作kafka写入json数据的简单demo分享

    生产者demo 向test_lyl2主题中循环写入10条json数据 注意事项:要写入json数据需加上value_serializer参数,如下代码 ''' producer = KafkaProducer( value_serializer=lambda v: json.dumps(v).encode('utf-8')...

    03-Kafka生产者--向Kafka写入数据(Java)-附件资源

    03-Kafka生产者--向Kafka写入数据(Java)-附件资源

    企业级消息队列Kafka视频教程

    Kafka生产者幂等性与事务 38 7.1 幂等性 第八章 分区和副本机制 8.1 生产者分区写入策略 8.2 消费者组Rebalance机制 8.3 消费者分区分配策略 8.4 副本机制 第九章 高级(High Level)API与低级(Low Level)API 9.1...

    node-red-contrib-kafka-nodes:节点将Apache Kafka集成到NodeRed中。 生产者和消费者有单独的节点

    这是NodeRed内部的Apache Kafka消费者和生产者客户端的实现。 这些使您能够收集和使用将通过Kafka服务器传递的事件。 它易于使用:将适当的节点拖到上颚,填写Zookeeper服务器信息,将其连接到其他节点,然后进行...

    kafka_producer.zip

    python模拟kafka生产者, 读取配置文件, 根据配置文件的信息, 向kafka中写入数据信息。

    springboot-kafka

    kafka生产者扫描任务表中数据写入kafka中,消费者根据不同的任务数据执行不同的业务逻辑

    kafka概述及原理.pdf

    每个Broker都是独立的,并且可以处理来自生产者和消费者的请求。这种分布式架构使得Kafka具有高可用性,即使部分Broker发生故障,系统仍然可以正常运行。此外,Kafka使用复制机制来确保数据的容错性,每个分区都有多...

    kafka 生产者

    kafka 生产者 创建一个包含目标主题和内容的 ProducerRecord 对象,可指定键或分区,发送前要把对象序列化成字符数组。 数据被传给分区器,如果指定了分区就直接把指定的分区返回。如果没有指定分区,分区器就根据 ...

    maxwell,maxwell的守护进程,一个mysql到json的kafka生产者.zip

    这是maxwell的守护进程,一个读取mysql binlogs并写入

    kafka连接池_python版本

    由于kafka在写入时会存在并发问题,采用连接池思想,抽取一种连接池的方式,连接池是采用Apache pool作为池管理,然后将生产者的连接点放到池中,在编译时需注意kafka版本问题以及所对应的scala,kafka版本是kafka_...

    基于Kafka的多台远程服务器上的网页文件接入到本地的技术方案以及Flume+Kafka调研

    远程服务器编写程序, 不停读取文件的内容成为一个字符串,然后再加上来自的网站、下载日期等信息,组合成一个JSON字符串,通过调用kafka的生产者API将字符串写入Kafka。 2、JSON数据格式设计: { “source”: ...

    kafka-tutorial-demo

    关于Kafka生产者/消费者/ Kafka Connect / Kafka Streams和KSQl的文章 创建kafka生产者和消费者API Suriya NUS示例Kafka项目: : 在本地运行Kafka群集: 在本地Mahcine上运行的Kafka Handy命令参考链接: ://...

    kafka-consumer-producer:这是一个Kafka Spring Boot应用程序,可监听一个主题并将其写入另一个主题

    消费者生产者 这是一个监听的Kafka Spring Boot应用程序 程序参数 -Dspring.application.name =消费者生产者服务 融合卡夫卡命令 融合本地服务开始 融合的本地服务站 融合局部破坏 生产者命令 kafka-topics.sh --...

    kafka基础笔记

    kafka是一个消息队列系统,不同的生产者和消费者,可以向kafka写入自己的数据,实现数据的流动;kafka解耦了生产者和消费者,它的ack机制保证了消息传递的可靠性,借助zookeeper等实现了分布式的消息队列。 生产者...

    深入了解如何基于Python读写Kafka

    本篇会给出如何使用python来读写kafka, 包含生产者和消费者. 以下使用kafka-python客户端 生产者 爬虫大多时候作为消息的发送端, 在消息发出去后最好能记录消息被发送到了哪个分区, offset是多少, 这些记录在很多...

    epochConvertor:从KAFKA主题读取纪元并将RFC3339写入另一个主题

    生产者:将RFC3339格式的时间转换为kafka并产生时间 指标:从生产者和消费者收集指标。 配置导出器 姓名 描述 类型 默认 METRICS_PATH 指标路径 细绳 /指标 LISTEN_PORT 出口商监听端口 数字 8081 配置使用者 姓名...

    kafka保证数据可靠性的方式

    生产者可以进行有关配置,使得不一定等到数据认为是已提交的之后,才进行下一轮的投递,这是在可用性和一致性的之间的平衡 分区副本复制方式和同步条件: 每个分区所在的broker需要向分区首领所在的broker每6s(可...

Global site tag (gtag.js) - Google Analytics