最近项目遇到一个特殊场景,需要kafka传递100万条数据过去,1个G左右,由于其他环节限制,不能进行拆包。
一开始生产者一直报网络问题,经过需要修改如下参数,为了探寻之前说的不能超过1G的说法,把所有参数上限都设置成了接近2G
config/server.properties
socket.request.max.bytes=2048576000
log.segment.bytes=2073741824
message.max.bytes=2048576000
replica.fetch.max.bytes=2048576000
fetch.message.max.bytes=2048576000 ---这个好像不应该设置在server.propeties里面,这个是consumer的参数,后续验证吧。
replica.socket.timeout.ms=300000 --这个参数好像也不需要设置,但是需要在生产者里设置request.timeout.ms。否则,发送时间过长导致发送失败。
参数说明:
socket.request.max.bytes =100*1024*1024 |
socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖 |
log.segment.bytes =1024*1024*1024 |
topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖 |
message.max.bytes =6525000 |
表示消息体的最大大小,单位是字节 |
replica.fetch.max.bytes =1024*1024 |
replicas每次获取数据的最大大小 |
fetch.message.max.bytes=1024*1024 |
每个拉取请求的每个topic分区尝试获取的消息的字节大小。这些字节将被读入每个分区的内存,因此这有助于控制消费者使用的内存。 拉取请求的大小至少与服务器允许的最大消息的大小一样大,否则生产者可能发送大于消费者可以拉取的消息。 |
replica.socket.timeout.ms |
网络请求的socket超时,该值最少是replica.fetch.wait.max.ms |
生产者设定
props.put("max.request.size", 2073741824);
props.put("buffer.memory", 2073741824);
props.put("timeout.ms", 30000000);
props.put("request.timeout.ms", 30000000);
相关推荐
生产者demo 向test_lyl2主题中循环写入10条json数据 注意事项:要写入json数据需加上value_serializer参数,如下代码 ''' producer = KafkaProducer( value_serializer=lambda v: json.dumps(v).encode('utf-8')...
03-Kafka生产者--向Kafka写入数据(Java)-附件资源
Kafka生产者幂等性与事务 38 7.1 幂等性 第八章 分区和副本机制 8.1 生产者分区写入策略 8.2 消费者组Rebalance机制 8.3 消费者分区分配策略 8.4 副本机制 第九章 高级(High Level)API与低级(Low Level)API 9.1...
这是NodeRed内部的Apache Kafka消费者和生产者客户端的实现。 这些使您能够收集和使用将通过Kafka服务器传递的事件。 它易于使用:将适当的节点拖到上颚,填写Zookeeper服务器信息,将其连接到其他节点,然后进行...
python模拟kafka生产者, 读取配置文件, 根据配置文件的信息, 向kafka中写入数据信息。
kafka生产者扫描任务表中数据写入kafka中,消费者根据不同的任务数据执行不同的业务逻辑
每个Broker都是独立的,并且可以处理来自生产者和消费者的请求。这种分布式架构使得Kafka具有高可用性,即使部分Broker发生故障,系统仍然可以正常运行。此外,Kafka使用复制机制来确保数据的容错性,每个分区都有多...
kafka 生产者 创建一个包含目标主题和内容的 ProducerRecord 对象,可指定键或分区,发送前要把对象序列化成字符数组。 数据被传给分区器,如果指定了分区就直接把指定的分区返回。如果没有指定分区,分区器就根据 ...
这是maxwell的守护进程,一个读取mysql binlogs并写入
由于kafka在写入时会存在并发问题,采用连接池思想,抽取一种连接池的方式,连接池是采用Apache pool作为池管理,然后将生产者的连接点放到池中,在编译时需注意kafka版本问题以及所对应的scala,kafka版本是kafka_...
远程服务器编写程序, 不停读取文件的内容成为一个字符串,然后再加上来自的网站、下载日期等信息,组合成一个JSON字符串,通过调用kafka的生产者API将字符串写入Kafka。 2、JSON数据格式设计: { “source”: ...
关于Kafka生产者/消费者/ Kafka Connect / Kafka Streams和KSQl的文章 创建kafka生产者和消费者API Suriya NUS示例Kafka项目: : 在本地运行Kafka群集: 在本地Mahcine上运行的Kafka Handy命令参考链接: ://...
消费者生产者 这是一个监听的Kafka Spring Boot应用程序 程序参数 -Dspring.application.name =消费者生产者服务 融合卡夫卡命令 融合本地服务开始 融合的本地服务站 融合局部破坏 生产者命令 kafka-topics.sh --...
kafka是一个消息队列系统,不同的生产者和消费者,可以向kafka写入自己的数据,实现数据的流动;kafka解耦了生产者和消费者,它的ack机制保证了消息传递的可靠性,借助zookeeper等实现了分布式的消息队列。 生产者...
本篇会给出如何使用python来读写kafka, 包含生产者和消费者. 以下使用kafka-python客户端 生产者 爬虫大多时候作为消息的发送端, 在消息发出去后最好能记录消息被发送到了哪个分区, offset是多少, 这些记录在很多...
生产者:将RFC3339格式的时间转换为kafka并产生时间 指标:从生产者和消费者收集指标。 配置导出器 姓名 描述 类型 默认 METRICS_PATH 指标路径 细绳 /指标 LISTEN_PORT 出口商监听端口 数字 8081 配置使用者 姓名...
生产者可以进行有关配置,使得不一定等到数据认为是已提交的之后,才进行下一轮的投递,这是在可用性和一致性的之间的平衡 分区副本复制方式和同步条件: 每个分区所在的broker需要向分区首领所在的broker每6s(可...