1. Spark SQL operations on Parquet
Parquet files carry their schema with them; in other words, a Parquet file is self-describing. Spark SQL therefore needs no user-supplied schema when operating on Parquet: it derives the SQL schema of the data directly from the schema information stored inside the file.
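The quickest way to see this is to load the file and print the inferred schema. A minimal sketch, assuming the same sqlContext and file path as the example program in section 3:

// Minimal sketch: let Spark SQL infer the schema from the Parquet footer;
// no schema is supplied by hand.
val df = sqlContext.load("E:/open-sources/Hello2/src/spark/examples/sql/idAndName.parquet")
df.printSchema()
// root
//  |-- id: integer (nullable = true)
//  |-- name: string (nullable = true)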
The idAndName.parquet file used in this article has the following content, from which we can see that its hive_schema contains two columns, id and name:
2. About the idAndName.parquet file
idAndName.parquet comes from the article http://bit1129.iteye.com/blog/2202396; that is, the file was written to HDFS by Hive. Its content is as follows:
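The original file was produced by Hive (see the linked article). For readers without a Hive setup, a file with the same id/name schema can also be generated from Spark SQL itself. This is a hypothetical sketch, not how the author created the file; it assumes the sc, sqlContext, and dir values from the example program below:

// Hypothetical sketch: build a two-column DataFrame and save it as Parquet,
// producing a file with the same id/name schema as the Hive-generated one.
import sqlContext.implicits._
val sample = sc.parallelize(Seq((1, "MSN"), (10, "QQ"), (100, "Gtalk"), (1000, "Skype")))
  .toDF("id", "name")
sample.saveAsParquetFile(dir + "/idAndName.generated.parquet")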
3. Source code for processing Parquet files with Spark SQL
As the source code shows, Spark SQL 1.3 can load the Parquet file directly into a DataFrame with load, or, in the pre-1.3 style, register it as a temporary table and query it with SQL.
package spark.examples.sql

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

object SparkSQLParquet {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("SparkSQLParquet").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val dir = "E:/open-sources/Hello2/src/spark/examples/sql"

    // Load the Parquet file directly into a DataFrame.
    // Spark SQL infers from the file's schema that it has two columns: id and name.
    val df = sqlContext.load(dir + "/idAndName.parquet")
    df.select("id", "name").collect().foreach(row => println(row(0) + "," + row(1)))

    // Write only the name column out as a new Parquet file.
    df.select("name").save(dir + "/name." + System.currentTimeMillis() + ".parquet")

    // Read in the Parquet file. Parquet files are self-describing, so the schema is preserved.
    // The result of loading a Parquet file is also a DataFrame.
    val parquetFile = sqlContext.parquetFile(dir + "/idAndName.parquet")

    // Parquet files can also be registered as tables and then used in SQL statements.
    parquetFile.registerTempTable("parquetFile")
    val teenagers = sqlContext.sql("SELECT * FROM parquetFile WHERE id > 10")
    teenagers.map(t => "id: " + t(0) + ", " + "name:" + t(1)).collect().foreach(println)
  }
}
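A note on the load and save calls above: in Spark 1.3 the default data source is Parquet (controlled by spark.sql.sources.default), which is why no format is named. Both methods also accept an explicit source name; a small sketch under that assumption, using a hypothetical output path:

// Equivalent calls with the data source named explicitly; behaves the same as the
// one-argument form because the default source is already parquet.
val dfExplicit = sqlContext.load(dir + "/idAndName.parquet", "parquet")
dfExplicit.select("name").save(dir + "/name.explicit.parquet", "parquet")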
4. Writing Parquet files with Spark SQL
Below is the name.parquet file that the program wrote; its content shows that this Parquet file contains only a single column, name.
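To confirm that only the name column was written, the output can be loaded back and its schema printed. A sketch, using the name.1429009393304.parquet directory that appears in the log below (substitute whatever timestamped path your run produced):

// Sketch: read back the written file and verify it has a single name column.
val written = sqlContext.parquetFile(dir + "/name.1429009393304.parquet")
written.printSchema()
// root
//  |-- name: string (nullable = true)
written.collect().foreach(println)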
5. Log output from running the code above
C:\jdk1.7.0_51\bin\java -Didea.launcher.port=7533 "-Didea.launcher.bin.path=E:\softwareInstalled\JetBrains\IntelliJ IDEA 13.1.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\jdk1.7.0_51\jre\lib\charsets.jar;C:\jdk1.7.0_51\jre\lib\deploy.jar;C:\jdk1.7.0_51\jre\lib\javaws.jar;C:\jdk1.7.0_51\jre\lib\jce.jar;C:\jdk1.7.0_51\jre\lib\jfr.jar;C:\jdk1.7.0_51\jre\lib\jfxrt.jar;C:\jdk1.7.0_51\jre\lib\jsse.jar;C:\jdk1.7.0_51\jre\lib\management-agent.jar;C:\jdk1.7.0_51\jre\lib\plugin.jar;C:\jdk1.7.0_51\jre\lib\resources.jar;C:\jdk1.7.0_51\jre\lib\rt.jar;C:\jdk1.7.0_51\jre\lib\ext\access-bridge-32.jar;C:\jdk1.7.0_51\jre\lib\ext\dnsns.jar;C:\jdk1.7.0_51\jre\lib\ext\jaccess.jar;C:\jdk1.7.0_51\jre\lib\ext\localedata.jar;C:\jdk1.7.0_51\jre\lib\ext\sunec.jar;C:\jdk1.7.0_51\jre\lib\ext\sunjce_provider.jar;C:\jdk1.7.0_51\jre\lib\ext\sunmscapi.jar;C:\jdk1.7.0_51\jre\lib\ext\sunpkcs11.jar;C:\jdk1.7.0_51\jre\lib\ext\zipfs.jar;E:\open-sources\Hello2\out\production\SparkAndScalaExamples;E:\devsoftware\scala-2.10.4\lib\scala-library.jar;E:\devsoftware\scala-2.10.4\lib\scala-swing.jar;E:\devsoftware\scala-2.10.4\lib\scala-actors.jar;E:\open-sources\spark-1.3.0-bin-hadoop2.4\spark-1.3.0-bin-hadoop2.4\lib\spark-assembly-1.3.0-hadoop2.4.0.jar;E:\devsoftware\spark-1.2.0-bin-hadoop2.4\spark-1.2.0-bin-hadoop2.4\dependencies\spark-streaming-flume_2.11-1.2.0.jar;E:\devsoftware\apache-flume-1.5.2-bin(1)\apache-flume-1.5.2-bin\lib\flume-ng-sdk-1.5.2.jar;E:\devsoftware\apache-flume-1.5.2-bin(1)\apache-flume-1.5.2-bin\lib\flume-ng-core-1.5.2.jar;C:\Users\hadoop\Desktop\mysql-connector-java-5.1.34.jar;C:\Users\hadoop\Desktop\mongo-spark-master\mongo-spark-master\lib\mongo-hadoop-core_2.2.0-1.2.0.jar;E:\devsoftware\mongo-java-driver-2.9.3.jar;E:\devsoftware\spark-1.2.0-bin-hadoop2.4\spark-1.2.0-bin-hadoop2.4\lib\spark-examples-1.2.0-hadoop2.4.0.jar;E:\softwareInstalled\JetBrains\IntelliJ IDEA 13.1.3\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain spark.examples.sql.SparkSQLParquet
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/E:/open-sources/spark-1.3.0-bin-hadoop2.4/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/E:/devsoftware/spark-1.2.0-bin-hadoop2.4/spark-1.2.0-bin-hadoop2.4/lib/spark-examples-1.2.0-hadoop2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/04/14 19:02:41 INFO SparkContext: Running Spark version 1.3.0
15/04/14 19:02:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/04/14 19:02:42 INFO SecurityManager: Changing view acls to: hadoop
15/04/14 19:02:42 INFO SecurityManager: Changing modify acls to: hadoop
15/04/14 19:02:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/04/14 19:02:44 INFO Slf4jLogger: Slf4jLogger started
15/04/14 19:02:44 INFO Remoting: Starting remoting
15/04/14 19:02:44 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@TP-A383-D.tom.com:60722]
15/04/14 19:02:44 INFO Utils: Successfully started service 'sparkDriver' on port 60722.
15/04/14 19:02:44 INFO SparkEnv: Registering MapOutputTracker
15/04/14 19:02:44 INFO SparkEnv: Registering BlockManagerMaster
15/04/14 19:02:44 INFO DiskBlockManager: Created local directory at C:\Users\hadoop\AppData\Local\Temp\spark-c52355be-0749-4859-be6d-d5d1e19c32df\blockmgr-e955e7e7-0cb0-4fce-8340-a2fbe58272d4
15/04/14 19:02:44 INFO MemoryStore: MemoryStore started with capacity 133.6 MB
15/04/14 19:02:44 INFO HttpFileServer: HTTP File server directory is C:\Users\hadoop\AppData\Local\Temp\spark-60338787-0109-43c3-a4c4-80cf2c127280\httpd-4e3e2224-c3b4-43fd-9f24-47c4899c8e3e
15/04/14 19:02:44 INFO HttpServer: Starting HTTP Server
15/04/14 19:02:45 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/14 19:02:45 INFO AbstractConnector: Started SocketConnector@0.0.0.0:60723
15/04/14 19:02:45 INFO Utils: Successfully started service 'HTTP file server' on port 60723.
15/04/14 19:02:45 INFO SparkEnv: Registering OutputCommitCoordinator
15/04/14 19:02:45 INFO Server: jetty-8.y.z-SNAPSHOT
15/04/14 19:02:45 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/04/14 19:02:45 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/04/14 19:02:45 INFO SparkUI: Started SparkUI at http://TP-A383-D.tom.com:4040
15/04/14 19:02:45 INFO Executor: Starting executor ID <driver> on host localhost
15/04/14 19:02:45 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@TP-A383-D.tom.com:60722/user/HeartbeatReceiver
15/04/14 19:02:45 INFO NettyBlockTransferService: Server created on 60742
15/04/14 19:02:45 INFO BlockManagerMaster: Trying to register BlockManager
15/04/14 19:02:45 INFO BlockManagerMasterActor: Registering block manager localhost:60742 with 133.6 MB RAM, BlockManagerId(<driver>, localhost, 60742)
15/04/14 19:02:45 INFO BlockManagerMaster: Registered BlockManager
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
15/04/14 19:03:07 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
15/04/14 19:03:07 INFO MemoryStore: ensureFreeSpace(210772) called with curMem=0, maxMem=140142182
15/04/14 19:03:07 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 205.8 KB, free 133.4 MB)
15/04/14 19:03:08 INFO MemoryStore: ensureFreeSpace(32081) called with curMem=210772, maxMem=140142182
15/04/14 19:03:08 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 31.3 KB, free 133.4 MB)
15/04/14 19:03:08 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:60742 (size: 31.3 KB, free: 133.6 MB)
15/04/14 19:03:08 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/04/14 19:03:08 INFO SparkContext: Created broadcast 0 from NewHadoopRDD at newParquet.scala:447
15/04/14 19:03:10 INFO deprecation: mapred.max.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
15/04/14 19:03:10 INFO deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
15/04/14 19:03:10 INFO ParquetRelation2$$anon$1$$anon$2: Using Task Side Metadata Split Strategy
15/04/14 19:03:10 INFO SparkContext: Starting job: collect at SparkPlan.scala:83
15/04/14 19:03:11 INFO DAGScheduler: Got job 0 (collect at SparkPlan.scala:83) with 1 output partitions (allowLocal=false)
15/04/14 19:03:11 INFO DAGScheduler: Final stage: Stage 0(collect at SparkPlan.scala:83)
15/04/14 19:03:11 INFO DAGScheduler: Parents of final stage: List()
15/04/14 19:03:11 INFO DAGScheduler: Missing parents: List()
15/04/14 19:03:11 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at map at SparkPlan.scala:83), which has no missing parents
15/04/14 19:03:11 INFO MemoryStore: ensureFreeSpace(3576) called with curMem=242853, maxMem=140142182
15/04/14 19:03:11 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.5 KB, free 133.4 MB)
15/04/14 19:03:11 INFO MemoryStore: ensureFreeSpace(2534) called with curMem=246429, maxMem=140142182
15/04/14 19:03:11 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.5 KB, free 133.4 MB)
15/04/14 19:03:11 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:60742 (size: 2.5 KB, free: 133.6 MB)
15/04/14 19:03:11 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/04/14 19:03:11 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/04/14 19:03:11 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[2] at map at SparkPlan.scala:83)
15/04/14 19:03:11 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/04/14 19:03:11 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1526 bytes)
15/04/14 19:03:12 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/04/14 19:03:12 INFO ParquetRelation2$$anon$1: Input split: ParquetInputSplit{part: file:/E:/open-sources/Hello2/src/spark/examples/sql/idAndName.parquet start: 0 end: 325 length: 325 hosts: [] requestedSchema: message root { optional int32 id; optional binary name (UTF8); } readSupportMetadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}, org.apache.spark.sql.parquet.row.requested_schema={"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}}}
15/04/14 19:03:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 3276 bytes result sent to driver
15/04/14 19:03:12 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 942 ms on localhost (1/1)
15/04/14 19:03:12 INFO DAGScheduler: Stage 0 (collect at SparkPlan.scala:83) finished in 1.065 s
15/04/14 19:03:12 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/04/14 19:03:13 INFO DAGScheduler: Job 0 finished: collect at SparkPlan.scala:83, took 2.529248 s
1,MSN
10,QQ
100,Gtalk
1000,Skype
null,null
15/04/14 19:03:13 INFO MemoryStore: ensureFreeSpace(210652) called with curMem=248963, maxMem=140142182
15/04/14 19:03:13 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 205.7 KB, free 133.2 MB)
15/04/14 19:03:13 INFO MemoryStore: ensureFreeSpace(32059) called with curMem=459615, maxMem=140142182
15/04/14 19:03:13 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 31.3 KB, free 133.2 MB)
15/04/14 19:03:13 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:60742 (size: 31.3 KB, free: 133.6 MB)
15/04/14 19:03:13 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/04/14 19:03:13 INFO SparkContext: Created broadcast 2 from NewHadoopRDD at newParquet.scala:447
15/04/14 19:03:16 INFO ParquetRelation2$$anon$1$$anon$2: Using Task Side Metadata Split Strategy
15/04/14 19:03:16 INFO SparkContext: Starting job: runJob at newParquet.scala:648
15/04/14 19:03:16 INFO DAGScheduler: Got job 1 (runJob at newParquet.scala:648) with 1 output partitions (allowLocal=false)
15/04/14 19:03:16 INFO DAGScheduler: Final stage: Stage 1(runJob at newParquet.scala:648)
15/04/14 19:03:16 INFO DAGScheduler: Parents of final stage: List()
15/04/14 19:03:16 INFO DAGScheduler: Missing parents: List()
15/04/14 19:03:16 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at map at newParquet.scala:542), which has no missing parents
15/04/14 19:03:16 INFO MemoryStore: ensureFreeSpace(58000) called with curMem=491674, maxMem=140142182
15/04/14 19:03:16 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 56.6 KB, free 133.1 MB)
15/04/14 19:03:16 INFO MemoryStore: ensureFreeSpace(34439) called with curMem=549674, maxMem=140142182
15/04/14 19:03:16 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 33.6 KB, free 133.1 MB)
15/04/14 19:03:16 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:60742 (size: 33.6 KB, free: 133.6 MB)
15/04/14 19:03:16 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0
15/04/14 19:03:16 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:839
15/04/14 19:03:16 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at map at newParquet.scala:542)
15/04/14 19:03:16 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
15/04/14 19:03:16 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1523 bytes)
15/04/14 19:03:16 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/04/14 19:03:16 INFO ParquetRelation2$$anon$1: Input split: ParquetInputSplit{part: file:/E:/open-sources/Hello2/src/spark/examples/sql/idAndName.parquet start: 0 end: 325 length: 325 hosts: [] requestedSchema: message root { optional binary name (UTF8); } readSupportMetadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}, org.apache.spark.sql.parquet.row.requested_schema={"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}}]}}}
15/04/14 19:03:16 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
15/04/14 19:03:16 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 5 records.
15/04/14 19:03:16 INFO CodecConfig: Compression: GZIP
15/04/14 19:03:16 INFO ParquetOutputFormat: Parquet block size to 134217728
15/04/14 19:03:16 INFO ParquetOutputFormat: Parquet page size to 1048576
15/04/14 19:03:16 INFO ParquetOutputFormat: Parquet dictionary page size to 1048576
15/04/14 19:03:16 INFO ParquetOutputFormat: Dictionary is on
15/04/14 19:03:16 INFO ParquetOutputFormat: Validation is off
15/04/14 19:03:16 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
15/04/14 19:03:17 INFO CodecPool: Got brand-new compressor [.gz]
15/04/14 19:03:17 INFO BlockManager: Removing broadcast 1
15/04/14 19:03:17 INFO BlockManager: Removing block broadcast_1_piece0
15/04/14 19:03:17 INFO MemoryStore: Block broadcast_1_piece0 of size 2534 dropped from memory (free 139560603)
15/04/14 19:03:17 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:60742 in memory (size: 2.5 KB, free: 133.6 MB)
15/04/14 19:03:17 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/04/14 19:03:17 INFO BlockManager: Removing block broadcast_1
15/04/14 19:03:17 INFO MemoryStore: Block broadcast_1 of size 3576 dropped from memory (free 139564179)
15/04/14 19:03:17 INFO ContextCleaner: Cleaned broadcast 1
15/04/14 19:03:17 INFO BlockManager: Removing broadcast 0
15/04/14 19:03:17 INFO BlockManager: Removing block broadcast_0
15/04/14 19:03:17 INFO MemoryStore: Block broadcast_0 of size 210772 dropped from memory (free 139774951)
15/04/14 19:03:17 INFO BlockManager: Removing block broadcast_0_piece0
15/04/14 19:03:17 INFO MemoryStore: Block broadcast_0_piece0 of size 32081 dropped from memory (free 139807032)
15/04/14 19:03:17 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:60742 in memory (size: 31.3 KB, free: 133.6 MB)
15/04/14 19:03:17 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/04/14 19:03:17 INFO ContextCleaner: Cleaned broadcast 0
15/04/14 19:03:17 INFO InternalParquetRecordReader: at row 0. reading next block
15/04/14 19:03:17 INFO InternalParquetRecordReader: block read in memory in 0 ms. row count = 5
15/04/14 19:03:17 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 29,150,465
15/04/14 19:03:17 INFO ColumnChunkPageWriteStore: written 96B for [name] BINARY: 5 values, 44B raw, 58B comp, 1 pages, encodings: [RLE, PLAIN, BIT_PACKED]
15/04/14 19:03:17 INFO FileOutputCommitter: Saved output of task 'attempt_201504141903_0005_r_000000_0' to file:/E:/open-sources/Hello2/src/spark/examples/sql/name.1429009393304.parquet/_temporary/0/task_201504141903_0005_r_000000
15/04/14 19:03:17 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1792 bytes result sent to driver
15/04/14 19:03:17 INFO DAGScheduler: Stage 1 (runJob at newParquet.scala:648) finished in 0.971 s
15/04/14 19:03:17 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 971 ms on localhost (1/1)
15/04/14 19:03:17 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/04/14 19:03:17 INFO DAGScheduler: Job 1 finished: runJob at newParquet.scala:648, took 0.998114 s
15/04/14 19:03:17 INFO ParquetFileReader: Initiating action with parallelism: 5
15/04/14 19:03:21 INFO MemoryStore: ensureFreeSpace(212659) called with curMem=335150, maxMem=140142182
15/04/14 19:03:21 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 207.7 KB, free 133.1 MB)
15/04/14 19:03:21 INFO MemoryStore: ensureFreeSpace(32088) called with curMem=547809, maxMem=140142182
15/04/14 19:03:21 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 31.3 KB, free 133.1 MB)
15/04/14 19:03:21 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:60742 (size: 31.3 KB, free: 133.6 MB)
15/04/14 19:03:21 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0
15/04/14 19:03:21 INFO SparkContext: Created broadcast 4 from NewHadoopRDD at newParquet.scala:447
15/04/14 19:03:21 INFO ParquetRelation2$$anon$1$$anon$2: Using Task Side Metadata Split Strategy
15/04/14 19:03:21 INFO SparkContext: Starting job: collect at SparkSQLParquet.scala:26
15/04/14 19:03:21 INFO DAGScheduler: Got job 2 (collect at SparkSQLParquet.scala:26) with 1 output partitions (allowLocal=false)
15/04/14 19:03:21 INFO DAGScheduler: Final stage: Stage 2(collect at SparkSQLParquet.scala:26)
15/04/14 19:03:21 INFO DAGScheduler: Parents of final stage: List()
15/04/14 19:03:21 INFO DAGScheduler: Missing parents: List()
15/04/14 19:03:21 INFO DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[10] at map at DataFrame.scala:776), which has no missing parents
15/04/14 19:03:21 INFO MemoryStore: ensureFreeSpace(4904) called with curMem=579897, maxMem=140142182
15/04/14 19:03:21 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 4.8 KB, free 133.1 MB)
15/04/14 19:03:21 INFO MemoryStore: ensureFreeSpace(3349) called with curMem=584801, maxMem=140142182
15/04/14 19:03:21 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 3.3 KB, free 133.1 MB)
15/04/14 19:03:21 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:60742 (size: 3.3 KB, free: 133.6 MB)
15/04/14 19:03:21 INFO BlockManagerMaster: Updated info of block broadcast_5_piece0
15/04/14 19:03:21 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:839
15/04/14 19:03:21 INFO DAGScheduler: Submitting 1 missing tasks from Stage 2 (MapPartitionsRDD[10] at map at DataFrame.scala:776)
15/04/14 19:03:21 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
15/04/14 19:03:21 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, PROCESS_LOCAL, 1526 bytes)
15/04/14 19:03:21 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
15/04/14 19:03:21 INFO ParquetRelation2$$anon$1: Input split: ParquetInputSplit{part: file:/E:/open-sources/Hello2/src/spark/examples/sql/idAndName.parquet start: 0 end: 325 length: 325 hosts: [] requestedSchema: message root { optional int32 id; optional binary name (UTF8); } readSupportMetadata: {org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}, org.apache.spark.sql.parquet.row.requested_schema={"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}}}
15/04/14 19:03:21 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
15/04/14 19:03:21 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 5 records.
15/04/14 19:03:21 INFO InternalParquetRecordReader: at row 0. reading next block
15/04/14 19:03:21 INFO InternalParquetRecordReader: block read in memory in 1 ms. row count = 5
15/04/14 19:03:21 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1840 bytes result sent to driver
id: 100, name:Gtalk
id: 1000, name:Skype
15/04/14 19:03:21 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 39 ms on localhost (1/1)
15/04/14 19:03:21 INFO DAGScheduler: Stage 2 (collect at SparkSQLParquet.scala:26) finished in 0.039 s
15/04/14 19:03:21 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/04/14 19:03:21 INFO DAGScheduler: Job 2 finished: collect at SparkSQLParquet.scala:26, took 0.061630 s
Process finished with exit code 0