Flume-ng 聚集和移动大量日志数据的分布式的,有效的服务。这里我们解释怎样配置 Flume -ng 和 Spa rk Streaming 来从 Flume 获取数据。这里 介绍第二种方法 。
1. 使用自定义 Sink 的拉方式
不是 Flume 直接推送数据到 SparkStreaming ,这种方法运行了一个如下所示的 Flume Sink 。
1. Flume 将数据推送到 Sink 中,然后数据在此处缓存。
2. Spark Streaming 使用一个可靠的 Flume 接收器和操作从 Sink 中拉取数据。只有在 Spark Streaming 接收到数据并且把数据复制后才认为操作成功。
这个方法比前面的方法提供了强大的可靠性和容错保证。然而,这需要配置 Flume 运行一个自定义 Sink 。下面是配置步骤。
1.1 一般需求
选择一台在 Flume 代理中运行自定义 Sink 的机器。 Flume 其余的管道被配置为向那个代理发送数据。 Spark 集群中的机器都能连接到运行自定义 Sink 的那台机器上。
1.2 配置 Flume
在选定的机器上配置 Flume 需要如下的两步。
A . 添加如下的 JAR 包到要运行自定义 Sink 的机器中的 Flume 的 classpath 中 (这里我把如下 jar 放在 /usr/lib/flume-ng/lib/ 目录下)
spark-streaming-flume-sink_2.10-1.3.0-cdh5.4.3.jar
commons-lang3 -3.3.2.jar
B. 配置文件:在那台机器上,通过下面的配置文件配置 Flume 代理发送数据到一个 Avro Sink 中。
agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = <hostname of the local machine>
agent.sinks.spark.port = 41444
agent.sinks.spark.channel = memoryChannel
2. 配置 Spark Streaming 程序
2.1 编程:在流处理程序的代码中,引入 FlumeUtils 并如下创建一个输入 DStream 流。 这里给出 Spark Java 程序例子
import java.util.Arrays ;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction ;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;
public class SparkStreamingFlume2 {
public SparkStreamingFlume2() {
}
public static void main(String[] args) {
if (args. length != 2) {
System. err .println( "Usage: JavaFlumeEventCount1 <host> <port>" );
System. exit (1);
}
StreamingExamples. setStreamingLogLevels ();
String host = args[0];
int port = Integer. parseInt (args[1]);
Duration batchInterval = new Duration(2000);
SparkConf sparkConf = new SparkConf().setAppName( "JavaFlumeEventCount" );
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
StorageLevel storagelevel = StorageLevel. MEMORY_ONLY ();
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils. createPollingStream (ssc, host, port,storagelevel);
flumeStream.count();
flumeStream.count().map( new Function<Long, String>() {
@Override
public String call(Long in) {
return "Received " + in + " flume events." ;
}
}).print();
ssc.start();
ssc.awaitTermination();
}
2.2 启动 Flume 这里主要需要添加 spark-streaming-flume-sink_2.10-1.3.0-cdh5.4.3.jar 和 commons-lang3-3.3.2.jar 到 $FLUME_HOME/lib 目录下
F lume-ng agent – c /etc/flume-ng/conf – f /etc/flume-ng/conf/flume.conf – Dflume.root.logger=DEBUG,console – n agent02
2.3 提交 Spark ,这里需要注意的添加必要的 jar 包,可以在提交的时候加上 --jars 来指定相关的 jar 包,也可以在 sc 中调用 addJar() 添加
spark-submit --master spark://udh-spark-test-04:7077 --class SparkStreamingFlume2 --jars /root/spark-streaming-flume-sink_2.10-1.3.0-cdh5.4.3.jar,/usr/lib/flume-ng/lib/flume-ng-sdk-1.5.0-cdh5.4.3.jar,/usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.3-hadoop2.6.0-cdh5.4.3.jar,/usr/lib/spark/lib/spark-assembly-1.3.0-cdh5.4.3-hadoop2.6.0-cdh5.4.3.jar /root/flume-test02.jar udh-spark-test-03 41444
http://udn.yyuap.com/doc/ae/1511892.html
相关推荐
spark-streaming-flume-sink_2.11-2.0.0.jar的jar包。
使用spark集成flume,由于flume默认只支持pull消息的方式,不过它可以自定义消息拉取方式,现要使用poll方式,可以使用spark-streaming-flume-sink_2.11-2.1.0.jar包下的org.apache.spark.streaming.flume.sink....
spark-streaming-flume_2.11-2.1.0.jar
flume与spark streaming结合(pull方式)报错:org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink....
flume pull 方式需要的jar包,spark-streaming-flume-sink_2.11_2.1.1.jar
将该jar包上传至flume/lib目录下,并将spark-streaming-flume其他版本jar包删除即可使用,该jar包适用于spark2.1.3版本使用
spark-streaming-kafka-0-8_2.11-2.4.0.jar
1.Spark Streaming整合Flume需要的安装包. 2. Spark Streaming拉取Flume数据的flume配置文件.conf 3. Flume向Spark Streaming推数据的flume配置文件.conf
sparkstreming结合flume需要的jar包,scala是2.11版本,spark是1.6.2版本。也有其他版本的,需要的留言找我要
spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar
spark-streaming-kafka-0-10_2.11-2.4.0-cdh6.1.1.jar
这个压缩包中包含的是Flume-1.6.0对接Spark-1.6.2下的Spark Streaming时需要用到的jar包,将这些jar包放置到flume安装目录下对应的lib中
spark-streaming的flume依赖
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
spark streaming 链接kafka必用包,欢迎大家下载与使用
spark3.0.0版本对接kafka数据源需要的jar包,最新的版本导致maven的阿里云仓库不能直接下载下来,所以需要手动导入jar包进行操作,有需要的朋友可以免费下载
spakr streaming的kafka依赖
使用pyspark的stream操作kafka时,需要用到的jar包
spark-streaming_2.11-2.4.0-cdh6.1.1.jar
spark streaming 链接kafka的必用包,国内下载很慢,特意分享出来,欢迎大家下载,速度很快哦