在Spark> = 1.6中,可以使用按列分区查询和缓存。参见:SPARK-11410和SPARK-4849使用重分区方法:
val df = sc.parallelize(Seq(("A",1),("B",2),("A",3),("C",1))).toDF("k","v")
val partitioned = df.repartition($"k")
partitioned.explain
// scala> df.repartition($"k").explain(true)// == Parsed Logical Plan ==// 'RepartitionByExpression ['k], None// +- Project [_1#5 AS k#7,_2#6 AS v#8]// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27// // == Analyzed Logical Plan ==// k: string, v: int// RepartitionByExpression [k#7], None// +- Project [_1#5 AS k#7,_2#6 AS v#8]// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27// // == Optimized Logical Plan ==// RepartitionByExpression [k#7], None// +- Project [_1#5 AS k#7,_2#6 AS v#8]// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27// // == Physical Plan ==// TungstenExchange hashpartitioning(k#7,200), None// +- Project [_1#5 AS k#7,_2#6 AS v#8]// +- Scan PhysicalRDD[_1#5,_2#6]
与RDDs不同,Spark Dataset(包括Dataset [Row] a.k.a DataFrame)现在不能使用自定义分区器。你通常可以通过创建一个人工分区列来解决这个问题,但它不会给你相同的灵活性。
Spark< 1.6.0: 您可以做的一件事是在创建DataFrame之前预分区输入数据
import org.apache.spark.sql.types._
import org.apache.spark.sql.Rowimport org.apache.spark.HashPartitioner
val schema =StructType(Seq(StructField("x",StringType,false),StructField("y",LongType,false),StructField("z",DoubleType,false)))
val rdd = sc.parallelize(Seq(Row("foo",1L,0.5),Row("bar",0L,0.0),Row("??",-1L,2.0),Row("foo",-1L,0.0),Row("??",3L,0.6),Row("bar",-3L,0.99)))
val partitioner =newHashPartitioner(5)
val partitioned = rdd.map(r =>(r.getString(0), r)).partitionBy(partitioner).values
val df = sqlContext.createDataFrame(partitioned, schema)
由于从RDD创建DataFrame只需要一个简单的映射阶段现有的分区布局应该保留*:
assert(df.rdd.partitions == partitioned.partitions)
以同样的方式,您可以重新分区现有的DataFrame:
sqlContext.createDataFrame(
df.rdd.map(r =>(r.getInt(1), r)).partitionBy(partitioner).values,
df.schema
)
所以看起来这不是不可能的。问题仍然存在,如果它是有意义的。我会说,大多数时候它不:
>重新分区是一个昂贵的过程。在典型情况下,大多数数据必须序列化,混排和反序列化。另一方面,可以从预分割数据中受益的操作数量相对较少,并且如果内部API未被设计为利用该属性,则进一步受限。
>在某些情况下联接,但它需要内部支持,
>窗口函数调用与匹配分区器。同上,限于一个窗口定义。它已经在内部进行了分区,因此预分区可能是多余的,
>使用GROUP BY的简单聚合 – 可以减少临时缓冲区**的内存占用,但总体成本要高得多。或多或少相当于groupByKey.mapValues(_。reduce)(当前行为)vs reduceByKey(预分区)。不太可能在实践中有用。
>使用SqlContext.cacheTable进行数据压缩。由于它看起来像是使用运行长度编码,应用OrderedRDDFunctions.repartitionAndSortWithinPartitions可以提高压缩率。
>性能高度依赖于密钥的分布。如果它是倾斜的,它将导致次优资源利用。在最坏的情况下,根本不可能完成这项工作。
>使用高级声明性API的一个重点是将自己与低级实现细节隔离开来。正如@dwysakowicz和@RomiKuntsman已经提到的,优化是Catalyst Optimizer的工作。它是一个非常复杂的野兽,我真的怀疑你可以轻松地改进,没有深入到它的内部。
使用JDBC源分区:
JDBC数据源支持predicates
argument.它可以如下使用:
sqlContext.read.jdbc(url, table,Array("foo = 1","foo = 3"), props)
它为每个谓词创建一个JDBC分区。请记住,如果使用单个谓词创建的集合不是不相交的,则会在结果表中看到重复的集合。
DataFrameWriter中的partitionBy方法:
Spark DataFrameWriter提供了partitionBy方法,可用于在写入时“分区”数据。它使用提供的列集分隔写入数据
val df =Seq(("foo",1.0),("bar",2.0),("foo",1.5),("bar",2.6)).toDF("k","v")
df.write.partitionBy("k").json("/tmp/foo.json")
这使得基于键的查询读取上的谓词下推:
val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k"==="bar")
但它不等同于DataFrame.repartition。特别是聚合:
val cnts = df1.groupBy($"k").sum()
仍将需要TunnstenExchange:
cnts.explain
// == Physical Plan ==// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])// +- TungstenExchange hashpartitioning(k#90,200), None// +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])// +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
*分区布局我的意思是只有一个数据分布。分区RDD不再是分区器。
**假设没有早期预测。如果聚合仅覆盖列的小子集,则可能没有任何增益。
相关推荐
Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet...
熟悉Spark的分区对于Spark性能调优很重要,本文总结Spark通过各种函数创建RDD、DataFrame时默认的分区数,其中主要和sc.defaultParallelism、sc.defaultMinPartitions以及HDFS文件的Block数量有关,还有很坑的某些...
Apache Spark是一个快速、通用、可扩展的大数据分析平台。...13. 描述Spark的分区策略。 14. Spark的持久化(Persistence)或缓存(Caching)有哪些级别? 15. 解释Spark的任务调度。 16. Spark Streaming
- RDD是Spark的基本抽象,代表一个不可变、可分区、可并行计算的数据集。RDD可以在集群上进行分布式计算。 2. **数据流处理:** - Spark 提供了弹性分布式数据流(DStream)用于实时数据处理。它是以微批处理的...
对于每个分区,数据通过“列族”在物理上进一步分区,“列族”指定了数据“列”的集合。 数据模型用于宽而稀疏的表,其中列是动态的,并且很可能稀疏。 尽管HBase是一个非常有用的大数据存储,但是它的访问机制...
该连接器支持, 以及按正交维度和分区。 示例Scala代码: import org . apache . spark . sql . DataFrameval target = " localhost:9080 "import uk . co . gresearch . spark . dgraph . graphx . _val graph : ...
与Redis群集一起使用时,Spark-Redis会意识到其分区方案,并会根据重新分片和节点故障事件进行调整。 Spark-Redis还支持Spark流(DStream)和结构化流。版本兼容性和分支该库具有多个分支,每个分支对应于一个受...
如果您经常需要对大型空间3D数据集进行分区或执行空间查询(邻居搜索,窗口查询,交叉匹配,聚类...),spark3D非常适合您。 它包含优化的类和方法,从而节省了实现时间! 此外,所有这些扩展的一大优势是可以通过...
java实现数据同步源码 BigData-In-Practice 大数据项目仓库、涉及 Hadoop、Spark、Kafka、Hbase..... ...样例,关于HiveContext、SQLContext、SparkSession、RDD、DataFrame、Dataset的使用 Zookeeper
将数据从分区发送到与 Executor 关联的 Python 进程,以及 等待 Python 进程反序列化数据,在其上运行 UDF,重新序列化数据,然后将其发回。 相比之下,一个 Hive UDF,无论是用 Scala 还是 Java 编写的,都可以在 ...
Dataframe的API repartition() VS coalesce() - repartition()确实在存储器中的新鲜重新分区,它可以增加或减少分区数由主叫参数所指示的。 另一方面, coalesce()避免了改组,并将分区数减少到调用参数所指示的数 ...
HDFS的HA共识检测和处理损坏的块的过程实木复合地板和立柱式货架备用名称节点vs备用名称节点Hadoop生态系统구성요소별分区의의미 Hadoop生态系统中的“分区”是什么? 什么是MapReduce溢出? vm.swappiness vm....