1,作为Producer的Flume端配置,其中是以netcat为source数据源,sink是kafka
#agent section producer.sources = s producer.channels = c producer.sinks = r #source section #producer.sources.s.type = seq producer.sources.s.type = netcat producer.sources.s.bind = localhost producer.sources.s.port = 44444 producer.sources.s.channels = c # Each sink's type must be defined producer.sinks.r.type = org.apache.flume.plugins.KafkaSink producer.sinks.r.metadata.broker.list=127.0.0.1:9092 producer.sinks.r.partition.key=0 producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition producer.sinks.r.serializer.class=kafka.serializer.StringEncoder producer.sinks.r.request.required.acks=0 producer.sinks.r.max.message.size=1000000 producer.sinks.r.producer.type=sync producer.sinks.r.custom.encoding=UTF-8 producer.sinks.r.custom.topic.name=test #Specify the channel the sink should use producer.sinks.r.channel = c # Each channel's type is defined. producer.channels.c.type = memory producer.channels.c.capacity = 1000
2,配置consumer,source是Kafka,sink是logger
consumer.sources = s consumer.channels = c consumer.sinks = r consumer.sources.s.type = seq consumer.sources.s.channels = c consumer.sinks.r.type = logger consumer.sinks.r.channel = c consumer.channels.c.type = memory consumer.channels.c.capacity = 100 consumer.sources.s.type = org.apache.flume.plugins.KafkaSource consumer.sources.s.zookeeper.connect=127.0.0.1:2181 consumer.sources.s.group.id=testGroup consumer.sources.s.zookeeper.session.timeout.ms=400 consumer.sources.s.zookeeper.sync.time.ms=200 consumer.sources.s.auto.commit.interval.ms=1000 consumer.sources.s.custom.topic.name=test consumer.sources.s.custom.thread.per.consumer=4
3,分别运行着两个agent
bin/flume-ng agent --conf conf --conf-file conf/producer1.properties --name producer -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf --conf-file conf/comsumer1.properties --name consumer -Dflume.root.logger=INFO,console
4,这时telnet上端口44444
hadoop@stormspark:~/bigdata/apache-flume-1.4.0-bin$ telnet localhost 44444 Trying ::1... Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. 1111111111111111 OK kak^Hfkakakkakakakkakkakkaakaknnnm OK abcdefghijklmnopqrstuvwxyz OK
两个agent都有信息输出了
org.apache.flume.plugins的代码参考 :https://github.com/baniuyao/flume-kafka上面也有详细的使用方法
相关推荐
Flume+kafka+Storm整合 示例简介: 以下为三个组建整合,这里只做操作也演示结果,原理性方面大家多学习基础。 流程顺序是flume获取telnet数据,将接收到的数据发送至kafak,kafka作为Storm的spout,Storm进行有向无...
flumeng-kafka-plugin 技术指标水槽1.4 Kafka 0.8.0 Beta
flume与kafka整合高可靠教程
如果你的flume中配置了向kafka中发送数据的sink,需要将这些jar包放到flume的lib目录下
整合flume、kafka以此接收消息时的执行步骤
Flume配置文件kafkaSource 包含Intercepter,包含正则表达式。
Flume配置文件kafkaSource Interceptor,包含获取数据中的关键词时间日期等信息
flume+kafka+flink+mysql实现nginx数据统计与分析
自己研究大数据多年,写的一个日志数据采集方案笔记,可快速熟悉Flume,Kafka,Hdfs的操作使用,以及相互的操作接口。
flume整合kafka的jar包,将其放入到flume的lib目录下即可。
lnmp(linux+nginx+mysql+php)安装配置及分布式系统大数据处理hadoop集群中的flume+Kafka+Storm+HDFS等实时系统搭分享
这是一本立足于企业真实的商用项目来讲解如何高效从事大数据实践的著作。技术层面,从全栈的角度系统梳理和详尽讲解了大数据的核心技术,包括Spark、Druid、Flume、Kafka等,让我们在纷繁复杂的技术中少走弯路......
Flume-NG-Kafka-Sink 这是一个 Sink 实现,可以将数据发布到主题。 目标是将 Flume 与 Kafka 集成,以便基于拉式的处理系统(如可以处理来自各种 Flume 源(如 Syslog)的数据。 这现在是官方 Flume 发行版(从 v...
Kafka+FlumeNG+Storm+HBase实时处理系统介绍
flume skin 直连kafka,kafka安装过程和flume配置样例,下载下来即可安装一个生产可用的日志采集系统
Flume+Kafka整合环境(监控目录+监控文件) 实验作业,仅供参考
storm与kafka整合jar包。storm与kafka整合jar包storm与kafka整合jar包storm与kafka整合jar包
Kafka+FlumeNG+Storm+HBase构架设计
Flume整合Kafka,应用实践
文档详细的,手把手教你配置流处理框架的前端,kafka,flume,等