1. 配置flume
kafka-source.properties
agent1.sources = source1 agent1.channels = channel1 agent1.sinks = sink1 agent1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource agent1.sources.source1.batchSize = 5000 agent1.sources.source1.batchDurationMillis = 2000 agent1.sources.source1.kafka.bootstrap.servers = centos1:9092 agent1.sources.source1.kafka.topics = mytopic agent1.sources.source1.kafka.consumer.group.id = group1 agent1.sinks.sink1.type=logger agent1.channels.channel1.type=memory agent1.channels.channel1.capacity=1000 agent1.channels.channel1.transactionCapacity=100 agent1.sources.source1.channels=channel1 agent1.sinks.sink1.channel=channel1
2. 启动flume
flume-ng agent -n agent1 -c conf -f conf/kafka-source.properties -Dflume.root.logger=INFO,console
3.启动Kafka producer
kafka-console-producer.sh --broker-list centos1:9092 --topic mytopic
参考
http://flume.apache.org/FlumeUserGuide.html
相关推荐
Flume二次开发,支持抽取MYSQL Oracle数据库数据 以JSON格式推送至Kafka。 demo: sql_json.sources.sql_source.type = com.hbn.rdb.source.SQLSource sql_json.sources.sql_source.connectionurl = jdbc:oracle:...
Flume配置文件kafkaSource 包含Intercepter,包含正则表达式。
如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体 表现为,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,...
Flume配置文件kafkaSource Interceptor,包含获取数据中的关键词时间日期等信息
注意flume-ng-kafka-source 已合并到即将发布的flume 1.6 中。 这个 repo不会在维护中。 该项目用于 与进行通信。 Kafka源码配置 agent_log.sources.kafka0.type = com.vipshop.flume.source.kafka.KafkaSource ...
flume-ng从数据库抽取数据到kafka,支持按数据库中时间字段,准实时抽取实时数据。已经在oracle-kafka中长期测试可用
FlumeKafkaSink 这个项目是汇插件生产,以 。...# Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = 192.168.2.102 a1.sources.r1.port = 44444 a1.channels.c1.type =
第 6 章 Flume 对接 Kafka 1)配置 flume(flume-kafka.conf) # define a1.sources = r1 a1.sinks = k1 a1.channels = c1 # source a1.sources.r1.type = exec a1.sources.r1.command = tail -F -c +0 /opt/module/...
flume-ng-sql-source-release-1.5.2.jar 用flume-ng-sql-source 从数据库抽取数据到kafka,支持sql
远程服务器编写程序, 不停读取文件的内容成为一个字符串,然后再加上来自的网站、下载日期等信息,组合成一个JSON字符串,通过调用kafka的生产者API将字符串写入Kafka。 2、JSON数据格式设计: { “source”: ...
1. 什么是kafka? 2. 消息队列介绍 3. 为什么使用消息队列? 4. kafka的特点 5. kafka的使用场景 6. kafka系统的架构...19. kafka和flume的整合—kafkasource—kafkasink 20. kafka原理加强—日志分段条件 .....
flume实现oracle增量数据读取github最新版1.4.4,Flume是最初只是一个日志收集器,但随着flume-ng-sql-source插件的出现,使得Flume从关系数据库采集数据成为可能
里面由三部分组成 1、flume连接mysql的jar,可用于同步至kafka,数据格式:json格式 2、mysql的jar,flume需要这个 3、1.9版本的flume的tar包
Flume的“ Kafka消费者”渠道这是Flume的非常高吞吐量的通道,可将Flume用作高速可靠的Kafka用户。 它走多快? 当通过10GigE连接从1个经纪人Kafka群集消耗1000个字节的事件时,它的时钟速率约为360 MB / s,并附有...
此案例使用的是IDEA开发工具,项目...使用JavaAPI操作Flink的流处理,Flink从Kafka中获取数据,执行处理后再执行输出。 根据(《Flink入门与实战》徐葳著)教材最后的综合案例改变,适合没有学习不会使用Flume的人使用
Source:从哪收集,一般使用:avro(序列化),exec(命令行),spooling(目录),taildir(目录和文件,包含offset,不会数据丢失),kafka Channel:数据存哪里:(memory,kafka,file) Sink:数据输出到哪里:(hdfs,logger,...
后来读了flume中有关与kafka集成的代码(org.apache.flume.source.kafka.KafkaSource),觉得其中使用kafka Consumer的方式比较合理,可以实现消息的批处理,所以就参考并封装了一下,变得更加通用,形成了这个项目。...
Data Source Data Transformation Data Sink 窗口模型 状态管理与检查点机制 Standalone 集群部署 六、HBase 简介 系统架构及数据结构 基本环境搭建 集群环境搭建 常用 Shell 命令 Java API 过滤器详解 可显示字数...
flume agent收集数据 一个源对于两个sink 同时输出到hdfs和kafka 的配置文件,注意其中的 source绑定channel时候 channel1 channel2 不能分开写,该配置文件已经过集群实验成功收集到数据的
其实flume的使⽤就是编写配置⽂件,下⾯是使⽤flume将Nginx的⽇志对接kafka的配置⽂件,我们将该收集任务命名为 exec-memory-kafka,只需如下编写: #配置source、sink、channel exec-memory-kafka.sources=exec-...