`
bit1129
  • 浏览: 1051484 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark九十】Spark定义计算逻辑函数最佳实践

阅读更多

这里所谓的Spark定义的计算逻辑函数指的是在Spark中,任务执行的计算逻辑都是定义在Driver Program的函数中的,由于Scala定义函数的多样性,因此有必要总结下各种情况下的函数定义,对Spark将函数序列化到计算节点(Worker)的影响

 

Spark建议的三种做法+一种不推荐的做法

1.定义内部函数常量

package spark.examples.rddapi

import org.apache.spark.{SparkContext, SparkConf}

object ReduceTest_20 {

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05")
    val sc = new SparkContext(conf);
    val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3)

    /**
     * Reduces the elements of this RDD using the specified commutative and
     * associative binary operator.
     */
    //r是结果不是集合,直接不是RDD
    def func(k1: (Int, String), k2: (Int, String)) = {
      (k1._1 + k2._1, k1._2 + k2._2)
    }

    //对RDD的元素类型不要求,不需要是KV类型
    val r = z1.reduce(func)
    println(r) //结果:(149,AB1Z1EFYZX),对二元组的第一个元素和第二个元素分别做累加操作
  }

}

 在上面这个例子定义了一个函数func,并且将它放到了main函数中作为一个局部变量,其实也可以把func定义为和main平级(此时func是个全局函数),这种全局函数的定义跟下面第三种定义函数的方式道理一样。

 

2. 定义函数字面量直接传递到RDD定义的高阶函数中、

package spark.examples.rddapi

import org.apache.spark.{SparkContext, SparkConf}

object ReduceTest_21 {

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05")
    val sc = new SparkContext(conf);
    val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3)

    /**
     * Reduces the elements of this RDD using the specified commutative and
     * associative binary operator.
     */
    //r是结果不是集合,直接不是RDD
    //对RDD的元素类型不要求,不需要是KV类型
    val r = z1.reduce((k1: (Int, String), k2: (Int, String)) =>(k1._1 + k2._1, k1._2 + k2._2))
    println(r) //结果:(149,AB1Z1EFYZX)
  }

}

 

3. 将函数计算逻辑作为全局函数定义到Scala object中

 

Scala object函数定义:

package spark.examples.rddapi

object ReduceTestFunctions {
  def compute(k1: (Int, String), k2: (Int, String)) = {
    (k1._1 + k2._1, k1._2 + k2._2)
  }
}

  

 

Spark程序中引用Scala object函数定义

package spark.examples.rddapi

import org.apache.spark.{SparkContext, SparkConf}

object ReduceTestFunctions_20 {

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05")
    val sc = new SparkContext(conf);
    val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3)

    /**
     * Reduces the elements of this RDD using the specified commutative and
     * associative binary operator.
     */
    //r是结果不是集合,直接不是RDD
    //对RDD的元素类型不要求,不需要是KV类型
    val r = z1.reduce(ReduceTestFunctions.compute(_, _))
    println(r) //结果:(149,AB1Z1EFYZX)
  }

}

 

说明:

通过在Scala object中定义函数,因为Scala object是单例的,那么在序列化时就不需要序列化这个object,仅仅把function序列化到Worker节点即可

 

4.在普通Scala类中定义函数(不推荐)

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  • 大小: 42.3 KB
分享到:
评论

相关推荐

    实验七:Spark初级编程实践

    使用命令./bin/spark-shell启动spark 图2启动spark 2. Spark读取文件系统的数据 (1) 在spark-shell中读取Linux系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数; 图3 spark统计行数 (2) 在spark-...

    开发者最佳实践日-Spark-Ecosystem

    开发者最佳实践日-Spark-Ecosystem

    Spark:最佳实践

    Spark:最佳实践

    sparkRDD函数大全

    spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。

    All in Spark 实践

    Spark 应用实践分享

    Spark SQL最佳实践.pdf

    spark 优化最佳实践,推荐下载,好使请点赞

    Spark SQL最佳实践

    Spark SQL最佳实践

    基于Spark Streaming的大数据实时流计算平台和框架,并且是基于运行在yarn模式运行的spark streaming

    一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、...基于Spark Streaming的大数据实时流计算平台和框架(包括:调度平台,开发框架,开发demo),并且是基于运行在yarn模式运行的spark streaming

    spark与sparkStreaming经典视频教程

    spark与sparkStreaming经典视频教程,学习spark和sparkStreaming非常经典得课程,提供代码,环境,本人之前购买得教程,分享给大家,同时欢迎大家和我一起交流

    Spark分布式内存计算框架视频教程

    3.SparkStreaming计算思路 4.入门案例 5.SparkStreaming工作原理 6.DStream及函数 7.集成Kafka 8.案例:百度搜索风云榜(实时ELT、窗口Window和状态State) 9.SparkStreaming Checkpoint 10.消费Kafka偏移量管理 第...

    《Spark编程基础及项目实践》试卷及答案2套.pdf

    《Spark编程基础及项目实践》试卷及答案2套.pdf《Spark编程基础及项目实践》试卷及答案2套.pdf《Spark编程基础及项目实践》试卷及答案2套.pdf《Spark编程基础及项目实践》试卷及答案2套.pdf《Spark编程基础及项目...

    《Spark编程基础及项目实践》课后习题及答案7.pdf

    《Spark编程基础及项目实践》课后习题及答案7.pdf《Spark编程基础及项目实践》课后习题及答案7.pdf《Spark编程基础及项目实践》课后习题及答案7.pdf《Spark编程基础及项目实践》课后习题及答案7.pdf《Spark编程基础...

    Spark和TiDB (Spark on TiDB)

    SparkTI (Spark on TiDB)是TiDB基于Apache Spark的独立于原生系统的计算引擎。它将Spark和TiDB深度集成,在原有MySQL Workload之外借助Spark支持了更多样的用户场景和API。这个项目在SparkSQL和Catalyst引擎之外实现...

    Spark项目实战视频

    Spark项目实战视频,内容涵盖scala,kafka,sparkML,mongodb等相关内容

    基于Spark的PSO并行计算

    项目名称:基于Spark的PSO并行计算 编程语言:scala 项目内容:将粒子群算法pso实现的了并行,并成功集成了bencmark的测试函数,可以利用该标准的测试函数,来验证算法的性能. 测试结果:在benchmark的20个测试函数当中有9...

    基于Spark的实践.pptx

    云计算Spark实践参考,包含详细代码和操作步骤: 理解Spark原理 开发Spark程序:开发环境、程序提交、运行模式 内核讲解:RDD 工作机制:任务调度、资源分配

    spark streaming实时网站分析项目实战.rar

    操作步骤: 一.数据采集:视频网站访问日志(编辑python...1.数据库访问dao层方法定义 2.hbase操作工具类开发 3.将spark streaming的处理结果写到hbase中 4.映射到hive数据仓库中 四.数据可视化:见数据可视化项目

    Spark 入门实战系列

    Spark 入门实战系列,适合初学者,文档包括十部分内容,质量很好,为了感谢文档作者,也为了帮助更多的人入门,传播作者的心血,特此友情转贴: 1.Spark及其生态圈简介.pdf 2.Spark编译与部署(上)--基础环境搭建....

    06Spark Streaming原理和实践

    06Spark Streaming原理和实践

    spring boot + scala + spark http驱动spark计算

    原始用的jetty做的http接口,最近有时间,研究了下spring boot + scala + spark做大数据计算

Global site tag (gtag.js) - Google Analytics