`

将Kafka作为Flume的Source

 
阅读更多

1. 配置flume

kafka-source.properties

agent1.sources = source1  
agent1.channels = channel1  
agent1.sinks = sink1  
  
agent1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.source1.batchSize = 5000
agent1.sources.source1.batchDurationMillis = 2000
agent1.sources.source1.kafka.bootstrap.servers = centos1:9092
agent1.sources.source1.kafka.topics = mytopic
agent1.sources.source1.kafka.consumer.group.id = group1
  
agent1.sinks.sink1.type=logger  
  
agent1.channels.channel1.type=memory  
agent1.channels.channel1.capacity=1000  
agent1.channels.channel1.transactionCapacity=100  
  
agent1.sources.source1.channels=channel1  
agent1.sinks.sink1.channel=channel1  

 

2. 启动flume

flume-ng agent -n agent1 -c conf -f conf/kafka-source.properties -Dflume.root.logger=INFO,console

 

3.启动Kafka producer

kafka-console-producer.sh --broker-list centos1:9092 --topic mytopic

 

 

 

 参考

 http://flume.apache.org/FlumeUserGuide.html

分享到:
评论

相关推荐

    Flume 抽取MYSQL Oracle数据 JSON格式 推送Kafka

    Flume二次开发,支持抽取MYSQL Oracle数据库数据 以JSON格式推送至Kafka。 demo: sql_json.sources.sql_source.type = com.hbn.rdb.source.SQLSource sql_json.sources.sql_source.connectionurl = jdbc:oracle:...

    Flume配置文件kafkaSource

    Flume配置文件kafkaSource 包含Intercepter,包含正则表达式。

    kafka+flume+kafka中问题.pdf

    如果在一个Flume Agent中同时使用Kafka Source和Kafka Sink来处理events,便会遇到Kafka Topic覆盖问题,具体 表现为,Kafka Source可以正常从指定的Topic中读取数据,但在Kafka Sink中配置的目标Topic不起作用,...

    Flume配置文件kafkaSource Interceptor

    Flume配置文件kafkaSource Interceptor,包含获取数据中的关键词时间日期等信息

    flume-ng-kafka-source

    注意flume-ng-kafka-source 已合并到即将发布的flume 1.6 中。 这个 repo不会在维护中。 该项目用于 与进行通信。 Kafka源码配置 agent_log.sources.kafka0.type = com.vipshop.flume.source.kafka.KafkaSource ...

    flume-ng-sql-source

    flume-ng从数据库抽取数据到kafka,支持按数据库中时间字段,准实时抽取实时数据。已经在oracle-kafka中长期测试可用

    FlumeKafkaSink:Flume-ng Sink 插件生成到 Kafka

    FlumeKafkaSink 这个项目是汇插件生产,以 。...# Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = 192.168.2.102 a1.sources.r1.port = 44444 a1.channels.c1.type =

    快速学习-Flume 对接 Kafka

    第 6 章 Flume 对接 Kafka 1)配置 flume(flume-kafka.conf) # define a1.sources = r1 a1.sinks = k1 a1.channels = c1 # source a1.sources.r1.type = exec a1.sources.r1.command = tail -F -c +0 /opt/module/...

    flume-ng-sql-source-release-1.5.2.zip

    flume-ng-sql-source-release-1.5.2.jar 用flume-ng-sql-source 从数据库抽取数据到kafka,支持sql

    基于Kafka的多台远程服务器上的网页文件接入到本地的技术方案以及Flume+Kafka调研

    远程服务器编写程序, 不停读取文件的内容成为一个字符串,然后再加上来自的网站、下载日期等信息,组合成一个JSON字符串,通过调用kafka的生产者API将字符串写入Kafka。 2、JSON数据格式设计: { “source”: ...

    学习kafa的笔记,可以看看目录选择下载

    1. 什么是kafka? 2. 消息队列介绍 3. 为什么使用消息队列? 4. kafka的特点 5. kafka的使用场景 6. kafka系统的架构...19. kafka和flume的整合—kafkasource—kafkasink 20. kafka原理加强—日志分段条件 .....

    flume-ng-sql-source-1.4.4.jar

    flume实现oracle增量数据读取github最新版1.4.4,Flume是最初只是一个日志收集器,但随着flume-ng-sql-source插件的出现,使得Flume从关系数据库采集数据成为可能

    flume-mysql.zip

    里面由三部分组成 1、flume连接mysql的jar,可用于同步至kafka,数据格式:json格式 2、mysql的jar,flume需要这个 3、1.9版本的flume的tar包

    kafka-consumer-channel:允许将Flume用作高速通用Kafka消费者

    Flume的“ Kafka消费者”渠道这是Flume的非常高吞吐量的通道,可将Flume用作高速可靠的Kafka用户。 它走多快? 当通过10GigE连接从1个经纪人Kafka群集消耗1000个字节的事件时,它的时钟速率约为360 MB / s,并附有...

    Flink+Kafka的JavaAPI应用

    此案例使用的是IDEA开发工具,项目...使用JavaAPI操作Flink的流处理,Flink从Kafka中获取数据,执行处理后再执行输出。 根据(《Flink入门与实战》徐葳著)教材最后的综合案例改变,适合没有学习不会使用Flume的人使用

    Flume用法

    Source:从哪收集,一般使用:avro(序列化),exec(命令行),spooling(目录),taildir(目录和文件,包含offset,不会数据丢失),kafka Channel:数据存哪里:(memory,kafka,file) Sink:数据输出到哪里:(hdfs,logger,...

    kafka-effective:更有效地使用apache kafka消费者和生产者

    后来读了flume中有关与kafka集成的代码(org.apache.flume.source.kafka.KafkaSource),觉得其中使用kafka Consumer的方式比较合理,可以实现消息的批处理,所以就参考并封装了一下,变得更加通用,形成了这个项目。...

    Hadoop+Hive+Spark+Kafka+Zookeeper+Flume+Sqoop+Azkaban+Scala

    Data Source Data Transformation Data Sink 窗口模型 状态管理与检查点机制 Standalone 集群部署 六、HBase 简介 系统架构及数据结构 基本环境搭建 集群环境搭建 常用 Shell 命令 Java API 过滤器详解 可显示字数...

    flume 真实案例配置文件

    flume agent收集数据 一个源对于两个sink 同时输出到hdfs和kafka 的配置文件,注意其中的 source绑定channel时候 channel1 channel2 不能分开写,该配置文件已经过集群实验成功收集到数据的

    大数据流处理框架介绍.pdf

    其实flume的使⽤就是编写配置⽂件,下⾯是使⽤flume将Nginx的⽇志对接kafka的配置⽂件,我们将该收集任务命名为 exec-memory-kafka,只需如下编写: #配置source、sink、channel exec-memory-kafka.sources=exec-...

Global site tag (gtag.js) - Google Analytics