`

大数据学习0基础葵花宝典

阅读更多
BIGDATA葵花宝典

1、虚拟机安装CentOS
VMware workstation 12
CentOS 6.6
(百度下安装步骤,网上很多)
2、CentOS配置网络和安装桌面系统
2.1 网络配置
选择桥接模式
vi /etc/sysconfig/network-scripts/ifcfg-eth0
把ONBOOT=no这行的no修改为yes,reboot重启可联通网络
修改/etc/hosts,IP地址对应机器名,把之前的行注释调,加入:IP地址 机器名
service network restart //重启网络

安装vim软件:yum -y install vim

shutdown -h now //关闭系统

2.2 安装桌面系统
用Scala开发工具才操作
1)、使用命令 runlevel 查看当前的运行级别 ,
2)、使用命令 yum grouplist | more  查看是否安装了桌面环境的组件,
3)、当前运行级别是3,而且也没有安装桌面环境的软件
4)、yum groupinstall -y   "Desktop"   "Desktop Platform"   "Desktop Platform Development"  "Fonts"  "General Purpose Desktop"  "X Window System"  "Chinese Support [zh]" "Internet Browser"
     后面的是安装软件过程,需要等等一阵时间。
5)、编辑/etc/inittab文件,修改启级别为5,“id:3:initdefault:”修改为“id:5:initdefault:”
6)、然后重新启动就可以进入桌面环境
7)、重启的过程中,设置一下桌面环境的几个参数就可以正常进入登陆界面了

3、查看测试环境版本以及下载相对应版本
1) echo $JA[align=center][/align]VA_HOME:/usr/java/jdk1.7.0_75
下载路径:www.oracle.com -> 鼠标悬停“Downloads”上点击“Java for Developers”->拖到网页最下面Java Archive页,点击“Downloads”选择下载对应的版本即可
2) hadoop version:Hadoop 2.6.0-cdh5.8.3
下载路径:http://hadoop.apache.org ->左边点击“Download Hadoop”->点击“releases”->选择镜像站“mirror site”

4、配置SSH(免密码登录)
命令:ssh-keygen -t rsa  --(ll .ssh/ id_rsa为私钥,id_rsa.pub为公钥)
命令:cd .ssh/
命令:cat id_rsa.pub >> authorized_keys --生成一个权限文件
命令:chmod 644 authorized_keys --给一个644的权限
  验证下:ssh bigdata 第一次登陆需要数据YES  退出exit
ssh IP/HOSTNAME 连接命令
5、 安装JDK和HADOOP
5.1、安装和配置JDK
安装命令:rpm -ivh jdk-7u75-linux-x64.rpm,
安装好的路径一般在/usr目录下,命令:ll /usr/java/default/
环境变量:vi /etc/profile 修改该文件,最末尾加入:
export JAVA_HOME=/usr/java/jdk1.7.0_75
export PATH=$PATH:$JAVA_HOME/bin

          source /etc/profile 生效操作 (该操作不需要)
卸载:rpm -e packgename,rpm -e jdk1.8.0_101(文件安装的跟目录)
5.2、安装及配置Hadoop
安装:tar zxf hadoop-2.6.0.tar.gz  //解压到当前目录
配置环境变量:修改vi /etc/profile,在最后加上如下三行:
export JAVA_HOME=/usr/java/jdk1.7.0_75
export HADOOP_HOME=/opt/hadoop-2.6.0
export PATH=$PATH:$JAVA_HOME/bin: $HADOOP_HOME/bin

       配置文件路径:安装目录 /etc/Hadoop

   创建文件:
           /opt/hadoop-2.6.0/current/tmp
           /opt/hadoop-2.6.0/current/data
mkdir –p /opt/hadoop-2.6.0/current/dfs/name

       下面的配置文件路径为:/opt/hadoop-2.6.0/etc/hadoop
5.2.1、Core-site.xml
<property>
    <name>fs.default.name</name>
    <value>hdfs://bigdata:9000</value>
  </property>

<property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/hadoop-2.6.0/current/tmp</value>
  </property>
<property>
    <name>fs.trash.interval</name>
    <value>10</value>
  </property>

5.2.2、Hdfs-site.xml
<property>
   <name>dfs.namenode.name.dir</name>
   <value>/opt/hadoop-2.6.0/current/dfs/name</value>
</property>
<property>
   <name>dfs.datanode.data.dir</name>
   <value>/opt/hadoop-2.6.0/current/data</value>
</property>
<property>  --副本的数量
   <name>dfs.replication</name>
   <value>1</value>
</property>
<property> --是否启用WEB
   <name>dfs.webhdfs.enabled</name>
   <value>true</value>
</property>
<property> --设置用户组
   <name>dfs.permissions.superusergroup</name>
   <value>staff</value>
</property>
<property> --是否开启HDFS的权限
   <name>dfs.permissions.enabled</name>
   <value>false</value>
</property>
5.2.3、Yarn-site.xml
<property> 配置resourcemanager名称
   <name>yarn.resourcemanager.hostname</name>
   <value>bigdata</value>
</property>
<property>  配置nodemanager
   <name>yarn.nodemanager.aux-services</name>
   <value>mapreduce_shuffle</value>
</property>
<property> 配置nodemanager的mapreduce的类
   <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
   <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property> 配置resourcemanager地址的端口
   <name>yarn.resourcemanager.address</name>
   <value>bigdata:18040</value>
</property>
<property> 配置resourcemanager调度器的端口
   <name>yarn.resourcemanager.scheduler.address</name>
   <value>bigdata:18030</value>
</property>
<property> tracker的端口地址
   <name>yarn.resourcemanager.resource-tracker.address</name>
   <value>bigdata:18025</value>
</property>
<property>  admin的端口地址
   <name>yarn.resourcemanager.admin.address</name>
   <value>bigdata:18141</value>
</property>
<property> webapp端口地址
   <name>yarn.resourcemanager.webapp.address</name>
   <value>bigdata:18088</value>
</property>
<property> 配置日志 启用
   <name>yarn.log-aggregation-enable</name>
   <value>true</value>
</property>
<property> 日志聚合,是以秒为单位
   <name>yarn.log-aggregation.retain-seconds</name>
   <value>86400</value>
</property>
<property> 日志检查,多长时间检查一次,是以秒为单位
   <name>yarn.log-aggregation.retain-check-interval-seconds</name>
   <value>86400</value>
</property>
<property>
   <name>yarn.nodemanager.remote-app-log-dir</name>
   <value>/tmp/logs</value>
</property>
<property>
   <name>yarn.nodemanager.remote-app-log-dir-suffix</name>
   <value>logs</value>
</property>

5.2.4、Mapred-site.xml
复制文件:cp mapred-site.xml.template mapred-site.xml

<property> mapreduce框架
  <name>mapreduce.framework.name</name>
  <value>yarn</value>
</property>
<property> JOB端口地址
  <name>mapreduce.jobtracker.http.address</name>
  <value>bigdata:50030</value>
</property>
<property> JOB跑的历史记录地址
  <name>mapreduce.jobhisotry.address</name>
  <value>bigdata:10020</value>
</property>
<property>
  <name>mapreduce.jobhistory.webapp.address</name>
  <value>bigdata:19888</value>
</property>
<property> 已完成的日志目录
  <name>mapreduce.jobhistory.done-dir</name>
  <value>/jobhistory/done</value>
</property>
<property> 中间完成情况的日志目录
  <name>mapreduce.intermediate-done-dir</name>
  <value>/jobhisotry/done_intermediate</value>
</property>
<property>
  <name>mapreduce.job.ubertask.enable</name>
  <value>true</value>
</property>

5.2.5、Hadoop-env.sh
目录:/opt/hadoop-2.6.0/etc/hadoop
export JAVA_HOME=/usr/java/default/

5.2.6、Slaves
Bigdata  //计算机名称

5.2.7、格式化HDFS
hdfs namenode -format
5.2.8、启动Hadoop集群
/opt/hadoop-2.6.0/sbin/start-all.sh
5.2.9、验证Hadoop集群
1) jps
2) 通过页面端口来查看
查看防火墙是否关闭:chkconfig iptables --list
关闭防火墙:service iptables stop (临时)
    永久  vim /etc/selinux/config 把SELINUX=enforcing修改为SELINUX=disabled
          chkconfig iptables off
http://172.29.20.53:18088/  http://172.29.20.53:50070/

问题,启动hadoop有警告
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [bigdata]
解决过程:直接在log4j日志中去除告警信息。
在/opt /hadoop-2.6.0/etc/hadoop/log4j.properties文件中添加
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR

6、 HDFS的一些命令操作
技术相关文档:http://hadoop.apache.org/ ->左边点击Documentation->选择相关的版本
创建目录:hdfs dfs -mkdir /demo
上传文件:hdfs dfs -put demo.txt /demo/
查看目录:hdfs dfs -ls /demo
查看文件:hdfs dfs -cat /demo/demo.txt
删除文件:hdfs dfs -rm /demo/demo.txt
删除目录:hdfs dfs –rm –r /demo
下载文件:hdfs dfs -get /demo/demo.txt ./

7、 Spark安装、配置、验证
7.1、Scala安装和配置
解压:tar zxf scala-2.10.5.tgz
配置:vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.7.0_75
export SCALA_HOME=/opt/scala-2.10.5
export HADOOP_HOME=/opt/hadoop-2.6.0
export PATH=$PATH:$HADOOP_HOME/bin:$SCALA_HOME/bin:$JAVA_HOME/bin
7.2、Spark安装和配置
解压:tar zxf spark-1.6.0-bin-hadoop2.6
配置:1)vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.7.0_75
export SCALA_HOME=/opt/scala-2.10.5
export HADOOP_HOME=/opt/hadoop-2.6.0
export SPARK_HOME=/opt/spark-1.6.0-bin-hadoop2.6
export PATH=$PATH:$SCALA_HOME/bin:$JAVA_HOME/bin:$SPARK_HOME/bin
      2)spark-env.sh
        路径:/opt/spark-1.6.0-bin-hadoop2.6/conf
复制文件:cp spark-env.sh.template spark-env.sh
export SPARK_MASTER_IP=bigdata
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=2g
3)slaves
路径:/opt/spark-1.6.0-bin-hadoop2.6/conf
复制文件:cp slaves.template slaves
增加计算机名称或者是IP地址
7.3、Spark启动
先启动Hadoop
/opt/spark-1.6.0-bin-hadoop2.6/sbin/start-all.sh
用jps查看会多处两个进程:Master、Worker
页面地址和端口:http://172.29.20.53:8080/
7.4、Spark_shell启动和验证
启动:/opt/spark-1.6.0-bin-hadoop2.6/bin/spark-shell
验证,执行如下代码:
scala> val file=sc.textFile("hdfs://bigdata:9000/demo/demo.txt")
scala>val count=file.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey(_+_)
scala> count.collect
第一行是读取HDFS的demo.txt文件
第二行是对文件进行操作
第三行是提交并执行Job
   

  :help  --获取帮助
  :quit   --退出spark-shell
7.5、Spark四大天王简介
Spark Streaming:
Spark Streaming基于微批量方式的计算和处理,可以用于处理实时的流数据。它使用DStream,简单来说就是一个弹性分布式数据集(RDD)系列,处理实时数据。

Spark SQL:
Spark SQL可以通过JDBC API将Spark数据集暴露出去,而且还可以用传统的BI和可视化工具在Spark数据上执行类似SQL的查询。用户还可以用Spark SQL对不同格式的数据(如JSON,Parquet以及数据库等)执行ETL,将其转化,然后暴露给特定的查询。

Spark MLlib:
MLlib是一个可扩展的Spark机器学习库,由通用的学习算法和工具组成,包括二元分类、线性回归、聚类、协同过滤、梯度下降以及底层优化原语。用于机器学习和统计等场景

Spark GraphX:
GraphX是用于图计算和并行图计算的新的(alpha)Spark API。通过引入弹性分布式属性图(Resilient Distributed Property Graph),一种顶点和边都带有属性的有向多重图,扩展了Spark RDD。为了支持图计算,GraphX暴露了一个基础操作符集合(如subgraph,joinVertices和aggregateMessages)和一个经过优化的Pregel API变体。此外,GraphX还包括一个持续增长的用于简化图分析任务的图算法和构建器集合。
8、 基于IDEA构建SPARK开发环境
8.1 手动安装SBT
 下载地址:http://www.scala-sbt.org/
 sudo tar zxvf sbt-0.13.13.tgz (把解压后的文件放到/opt/sbt中)
 建立启动sbt的脚本文件
路径在/opt/sbt中新建一个文件sbt,命令vim sbt
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar /opt/sbt/bin/sbt-launch.jar "$@"

 修改sbt文件权限
命令为:chmod u+x sbt
 配置PATH环境变量,保证在控制台中可以使用sbt命令
vim ~/.bashrc  最后加入:export PATH=/opt/sbt/:$PATH  
source ~/.bashrc 生效
 测试sbt是否安装成功
第一次执行时,会下载一些文件包,然后才能正常使用,要确保联网了,安装成功后显示如下:
sbt sbt-version
[info] Set current project to sbt (in build file:/opt/sbt/)
[info] 0.13.13

8.2 安装IDEA和相关插件
浏览器:yum install firefox
安装中文输入法:yum install "@Chinese Support"
IntelliJ idea下载地址:https://www.jetbrains.com/idea/
把/opt/idea/bin配置到PATH环境变量中,vim ~./.bashrc
把插件解压放到/opt/idea/plugins目录下即可
启动命令:./idea.sh
IDEA工具打开后,需要安装SBT插件,打开idea的首选项,然后找到 Plugins ,点击 Browser repositores... 按钮,输入 sbt 搜索,然后找到 sbt 的插件进行安装,如下图所示:

8.3 创建SBT工程
创建SBT项目,依赖包下载完成后的目录结构如下图:

plugins.sbt 文件放置插件配置
build.sbt 是整体的项目配置信息
build.properties 可以设置 sbt 版本
java 目录存放 java 文件
scala 目录存放 scala 文件
resources 目录用来存放配置文件
test 相关目录用来存放测试相关文件

测试一下运行:


9、 Spark Streaming
学习:http://spark.apache.org/docs/1.6.0/streaming-programming-guide.html

安装netcat
netcat 在centos里叫:nc.x86_64,可以用:yum search nc找下。如果有执行下面命令: yum install nc.x86_64,安装完成后用nc –help验证是否可以用。
9.1 A Quick Example
   def main(args: Array[String]): Unit = {
    // 创建一个local StreamingContext,包含2个工作线程,并将批次间隔设为1秒
// master至少需要2个CPU核,以避免出现任务饿死的情况
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf,Seconds(1))

// 创建一个连接到hostname:port的DStream,如:localhost:9999
    val lines = ssc.socketTextStream("bigdata",9999)

// 将每一行分割成多个单词
    val words = lines.flatMap(_.split(" "))

// 对每一批次中的单词进行计数
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_+_)

// 将该DStream产生的RDD的头十个元素打印到控制台上
    wordCounts.print()

    ssc.start()             // 启动流式计算
    ssc.awaitTermination()  // 等待直到计算终止

  }



运行结果如下:


9.2 初始化StreamingContext
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
context对象创建后,你还需要如下步骤:
1、创建DStream对象,并定义好输入数据源。
2、基于数据源DStream定义好计算逻辑和输出。
3、调用streamingContext.start() 启动接收并处理数据。
4、调用streamingContext.awaitTermination() 等待流式处理结束(不管是手动结束,还是发生异常错误)
5、你可以主动调用 streamingContext.stop() 来手动停止处理流程。
9.3 离散数据流 (DStreams)
离散数据流(DStream)是Spark Streaming最基本的抽象。它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。

9.4 输入DStream和接收器
Spark Streaming主要提供两种内建的流式数据源:
基础数据源(Basic sources): 在StreamingContext API 中可直接使用的源,如:文件系统,套接字连接或者Akka actor。
 文件数据流(File Streams): 可以从任何兼容HDFS API(包括:HDFS、S3、NFS等)的文件系统,创建方式如下:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
另外,文件数据流不是基于接收器的,所以不需要为其单独分配一个CPU core。
 基于自定义Actor的数据流(Streams based on Custom Actors): DStream可以由Akka actor创建得到,只需调用 streamingContext.actorStream(actorProps, actor-name)。详见自定义接收器(Custom Receiver Guide)。actorStream暂时不支持Python API。
 RDD队列数据流(Queue of RDDs as a Stream): 如果需要测试Spark Streaming应用,你可以创建一个基于一批RDD的DStream对象,只需调用 streamingContext.queueStream(queueOfRDDs)。RDD会被一个个依次推入队列,而DStream则会依次以数据流形式处理这些RDD的数据。

高级数据源(Advanced sources): 需要依赖额外工具类的源,如:Kafka、Flume、Kinesis、Twitter等数据源。这些数据源都需要增加额外的依赖。
自定义数据源

9.5 接收器可靠性
从可靠性角度来划分,大致有两种数据源。其中,像Kafka、Flume这样的数据源,它们支持对所传输的数据进行确认。系统收到这类可靠数据源过来的数据,然后发出确认信息,这样就能够确保任何失败情况下,都不会丢数据。因此我们可以将接收器也相应地分为两类:
可靠接收器(Reliable Receiver) – 可靠接收器会在成功接收并保存好Spark数据副本后,向可靠数据源发送确认信息。
不可靠接收器(Unreliable Receiver) – 不可靠接收器不会发送任何确认信息。不过这种接收器常用语于不支持确认的数据源,或者不想引入数据确认的复杂性的数据源。
9.6 DStream支持的transformation算子
和RDD类似,DStream也支持从输入DStream经过各种transformation算子映射成新的DStream。DStream支持很多RDD上常见的transformation算子,一些常用的见下表:
Transformation算子 用途
map(func) 返回会一个新的DStream,并将源DStream中每个元素通过func映射为新的元素
flatMap(func) 和map类似,不过每个输入元素不再是映射为一个输出,而是映射为0到多个输出
filter(func) 返回一个新的DStream,并包含源DStream中被func选中(func返回true)的元素
repartition(numPartitions) 更改DStream的并行度(增加或减少分区数)
union(otherStream) 返回新的DStream,包含源DStream和otherDStream元素的并集
count() 返回一个包含单元素RDDs的DStream,其中每个元素是源DStream中各个RDD中的元素个数
reduce(func) 返回一个包含单元素RDDs的DStream,其中每个元素是通过源RDD中各个RDD的元素经func(func输入两个参数并返回一个同类型结果数据)聚合得到的结果。func必须满足结合律,以便支持并行计算。
countByValue() 如果源DStream包含的元素类型为K,那么该算子返回新的DStream包含元素为(K, Long)键值对,其中K为源DStream各个元素,而Long为该元素出现的次数。
reduceByKey(func, [numTasks]) 如果源DStream 包含的元素为 (K, V) 键值对,则该算子返回一个新的也包含(K, V)键值对的DStream,其中V是由func聚合得到的。注意:默认情况下,该算子使用Spark的默认并发任务数(本地模式为2,集群模式下由spark.default.parallelism 决定)。你可以通过可选参数numTasks来指定并发任务个数。
join(otherStream, [numTasks]) 如果源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中源DStream和otherDStream中每个K都对应一个 (K, (V, W))键值对元素。
cogroup(otherStream, [numTasks]) 如果源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中每个元素类型为包含(K, Seq[V], Seq[W])的tuple。
transform(func) 返回一个新的DStream,其包含的RDD为源RDD经过func操作后得到的结果。利用该算子可以对DStream施加任意的操作。
updateStateByKey(func) 返回一个包含新”状态”的DStream。源DStream中每个key及其对应的values会作为func的输入,而func可以用于对每个key的“状态”数据作任意的更新操作。

……不想往下看了,还有好多内容

10、 Spark SQL, DataFrames 以及 Datasets 编程指南
Spark SQL的一种用法是直接执行SQL查询语句,你可使用最基本的SQL语法,也可以选择HiveQL语法。Spark SQL可以从已有的Hive中读取数据。更详细的请参考Hive Tables 这一节。如果用其他编程语言运行SQL,Spark SQL将以DataFrame返回结果。你还可以通过命令行command-line 或者 JDBC/ODBC 使用Spark SQL。
DataFrame是一种分布式数据集合,每一条数据都由几个命名字段组成。概念上来说,她和关系型数据库的表 或者 R和Python中的data frame等价,只不过在底层,DataFrame采用了更多优化。DataFrame可以从很多数据源(sources)加载数据并构造得到,如:结构化数据文件,Hive中的表,外部数据库,或者已有的RDD。
Dataset是Spark-1.6新增的一种API,目前还是实验性的。Dataset想要把RDD的优势(强类型,可以使用lambda表达式函数)和Spark SQL的优化执行引擎的优势结合到一起。Dataset可以由JVM对象构建(constructed )得到,而后Dataset上可以使用各种transformation算子(map,flatMap,filter 等)。
10.1 入门
10.1.1 创建DataFrames和一些操作
Spark SQL所有的功能入口都是SQLContext 类,及其子类。不过要创建一个SQLContext对象,首先需要有一个SparkContext对象
val conf = new SparkConf().setMaster("local[2]").setAppName("JSON DATA")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val df = sqlContext.read.json("/opt/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")
df.show() //显示内容
df.printSchema() //打印数据树形结构
df.select("name").show()
df.filter(df("age")>21).show()
df.groupBy("age").count().show()

10.1.2 编程方式执行SQL查询
val df = sqlContext.sql("SELECT * FROM table")
10.1.3 创建Dataset
10.1.3.1 和RDD互操作-利用反射推导schema
Dataset API和RDD类似,不过Dataset不使用Java序列化或者Kryo,而是使用专用的编码器(Encoder )来序列化对象和跨网络传输通信。如果这个编码器和标准序列化都能把对象转字节,那么编码器就可以根据代码动态生成,并使用一种特殊数据格式,这种格式下的对象不需要反序列化回来,就能允许Spark进行操作,如过滤、排序、哈希等。
// 定义一个case class.
// 注意:Scala 2.10的case class最多支持22个字段,要绕过这一限制,
// 你可以使用自定义class,并实现Product接口。当然,你也可以改用编程方式定义schema
case class Person(name: String, age: Int)
上面创建的case class类放到函数外面。

val conf = new SparkConf().setMaster("local[2]").setAppName("JSON DATA")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// 为了支持RDD到DataFrame的隐式转换
import sqlContext.implicits._

val path = "/opt/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.txt"

// 创建一个包含Person对象的RDD,并将其注册成table
val people = sc.textFile(path).map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// sqlContext.sql方法可以直接执行SQL语句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// SQL查询的返回结果是一个DataFrame,且能够支持所有常见的RDD算子
// 查询结果中每行的字段可以按字段索引访问:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// 或者按字段名访问:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] 会一次性返回多列,并以Map[String, T]为返回结果类型
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回结果: Map("name" -> "Justin", "age" -> 19)

10.1.3.2 和RDD互操作-编程方式定义Schema
如果不能事先通过case class定义schema(例如,记录的字段结构是保存在一个字符串,或者其他文本数据集中,需要先解析,又或者字段对不同用户有所不同),那么你可能需要按以下三个步骤,以编程方式的创建一个DataFrame:
 从已有的RDD创建一个包含Row对象的RDD
 用StructType创建一个schema,和步骤1中创建的RDD的结构相匹配
 把得到的schema应用于包含Row对象的RDD,调用这个方法来实现这一步:SQLContext.createDataFrame
val conf = new SparkConf().setMaster("local[2]").setAppName("Dataset")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val path = "/opt/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.txt"
// 创建一个RDD
val people = sc.textFile(path)

// 数据的schema被编码与一个字符串中
val schemaString = "name age"

// Import Row.
//    import org.apache.spark.sql.Row;

// Import Spark SQL 各个数据类型
//    import org.apache.spark.sql.types.{StructType,StructField,StringType};

// 基于前面的字符串生成schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 将RDD[people]的各个记录转换为Rows,即:得到一个包含Row对象的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// 将schema应用到包含Row对象的RDD上,得到一个DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// 将DataFrame注册为table
peopleDataFrame.registerTempTable("people")

// 执行SQL语句
val results = sqlContext.sql("SELECT name FROM people")

// SQL查询的结果是DataFrame,且能够支持所有常见的RDD算子
// 并且其字段可以以索引访问,也可以用字段名访问
results.map(t => "Name: " + t(0)).collect().foreach(println)

10.2 数据源
Spark SQL支持基于DataFrame操作一系列不同的数据源。DataFrame既可以当成一个普通RDD来操作,也可以将其注册成一个临时表来查询。把DataFrame注册为table之后,你就可以基于这个table执行SQL语句了。本节将描述加载和保存数据的一些通用方法,包含了不同的Spark数据源,然后深入介绍一下内建数据源可用选项。
10.2.1 通用加载/保存函数
在最简单的情况下,所有操作都会以默认类型数据源来加载数据(默认是Parquet,除非修改了spark.sql.sources.default 配置).
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

 手动指定选项
你也可以手动指定数据源,并设置一些额外的选项参数。数据源可由其全名指定(如,org.apache.spark.sql.parquet),而对于内建支持的数据源,可以使用简写名(json, parquet, jdbc)。任意类型数据源创建的DataFrame都可以用下面这种语法转成其他类型数据格式。
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

 直接对文件使用SQL
Spark SQL还支持直接对文件使用SQL查询,不需要用read方法把文件加载进来。
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

 保存模式
Save操作有一个可选参数SaveMode,用这个参数可以指定如何处理数据已经存在的情况。很重要的一点是,这些保存模式都没有加锁,所以其操作也不是原子性的。另外,如果使用Overwrite模式,实际操作是,先删除数据,再写新数据。
仅Scala/Java 所有支持的语言 含义
SaveMode.ErrorIfExists (default) "error" (default) (默认模式)从DataFrame向数据源保存数据时,如果数据已经存在,则抛异常。
SaveMode.Append "append" 如果数据或表已经存在,则将DataFrame的数据追加到已有数据的尾部。
SaveMode.Overwrite "overwrite" 如果数据或表已经存在,则用DataFrame数据覆盖之。
SaveMode.Ignore "ignore" 如果数据已经存在,那就放弃保存DataFrame数据。这和SQL里CREATE TABLE IF NOT EXISTS有点类似。

 保存到持久化表
在使用HiveContext的时候,DataFrame可以用saveAsTable方法,将数据保存成持久化的表。与registerTempTable不同,saveAsTable会将DataFrame的实际数据内容保存下来,并且在HiveMetastore中创建一个游标指针。持久化的表会一直保留,即使Spark程序重启也没有影响,只要你连接到同一个metastore就可以读取其数据。读取持久化表时,只需要用用表名作为参数,调用SQLContext.table方法即可得到对应DataFrame。
默认情况下,saveAsTable会创建一个”managed table“,也就是说这个表数据的位置是由metastore控制的。同样,如果删除表,其数据也会同步删除。
10.2.2 Parquet文件
Parquet 是一种流行的列式存储格式。Spark SQL提供对Parquet文件的读写支持,而且Parquet文件能够自动保存原始数据的schema。写Parquet文件的时候,所有的字段都会自动转成nullable,以便向后兼容。
 编程方式加载数据
代码省略
10.2.3 JSON数据集
Spark SQL在加载JSON数据的时候,可以自动推导其schema并返回DataFrame。用SQLContext.read.json读取一个包含String的RDD或者JSON文件,即可实现这一转换。
注意,通常所说的json文件只是包含一些json数据的文件,而不是我们所需要的JSON格式文件。JSON格式文件必须每一行是一个独立、完整的的JSON对象。因此,一个常规的多行json文件经常会加载失败。
    val conf = new SparkConf().setMaster("local[2]").setAppName("sparkSQLJSON")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // 数据集是由路径指定的
    // 路径既可以是单个文件,也可以还是存储文本文件的目录
    val path = "/opt/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json"
    val people = sqlContext.read.json(path)

    // 推导出来的schema,可由printSchema打印出来
    people.printSchema()
    // root
    //  |-- age: integer (nullable = true)
    //  |-- name: string (nullable = true)

    // 将DataFrame注册为table
    people.registerTempTable("people")

    // 跑SQL语句吧!
    val teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age >= 13 AND age <= 19")

    teenagers.map(t => "Name: " + t(0) + ", Age:" + t(1)).collect().foreach(println)

    teenagers.map(t => "Name: " + t.getAs[String]("name") + ", Age:" + t.getAs[Int]("age")).collect().foreach(println)

    // 另一种方法是,用一个包含JSON字符串的RDD来创建DataFrame
    val anotherPeopleRDD = sc.parallelize(
      """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

SQL版的应用
CREATE TEMPORARY TABLE jsonTable
USING org.apache.spark.sql.json
OPTIONS (
  path "examples/src/main/resources/people.json"
)

SELECT * FROM jsonTable

10.2.4 Hive表
Spark SQL支持从Apache Hive读写数据。然而,Hive依赖项太多,所以没有把Hive包含在默认的Spark发布包里。要支持Hive,需要在编译spark的时候增加-Phive和-Phive-thriftserver标志。这样编译打包的时候将会把Hive也包含进来。注意,hive的jar包也必须出现在所有的worker节点上,访问Hive数据时候会用到(如:使用hive的序列化和反序列化SerDes时)。

Hive配置在conf/目录下hive-site.xml,core-site.xml(安全配置),hdfs-site.xml(HDFS配置)文件中。请注意,如果在YARN cluster(yarn-cluster mode)模式下执行一个查询的话,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必须在驱动器(driver)和所有执行器(executor)都可用。一种简便的方法是,通过spark-submit命令的–jars和–file选项来提交这些文件。
代码省略
10.2.5 用JDBC连接其他数据库
Spark SQL也可以用JDBC访问其他数据库。这一功能应该优先于使用JdbcRDD。因为它返回一个DataFrame,而DataFrame在Spark SQL中操作更简单,且更容易和来自其他数据源的数据进行交互关联。JDBC数据源在java和python中用起来也很简单,不需要用户提供额外的ClassTag。(注意,这与Spark SQL JDBC server不同,Spark SQL JDBC server允许其他应用执行Spark SQL查询)

首先,你需要在spark classpath中包含对应数据库的JDBC driver,下面这行包括了用于访问postgres的数据库driver
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
远程数据库的表可以通过Data Sources API,用DataFrame或者SparkSQL 临时表来装载。以下是选项列表:
属性名 含义
url 需要连接的JDBC URL
dbtable 需要读取的JDBC表。注意,任何可以填在SQL的where子句中的东西,都可以填在这里。(既可以填完整的表名,也可填括号括起来的子查询语句)
driver JDBC driver的类名。这个类必须在master和worker节点上都可用,这样各个节点才能将driver注册到JDBC的子系统中。
partitionColumn, lowerBound, upperBound, numPartitions 这几个选项,如果指定其中一个,则必须全部指定。他们描述了多个worker如何并行的读入数据,并将表分区。partitionColumn必须是所查询的表中的一个数值字段。注意,lowerBound和upperBound只是用于决定分区跨度的,而不是过滤表中的行。因此,表中所有的行都会被分区然后返回。
fetchSize JDBC fetch size,决定每次获取多少行数据。在JDBC驱动上设成较小的值有利于性能优化(如,Oracle上设为10)

Spark SQL连接MYSQL查询的例子。
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://172.29.20.35:3306/xedk"
val user = "xedk"
val pwd = "admin"

val conf = new SparkConf().setMaster("local[2]").setAppName("sparkSQL connMySQL")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

//注意:集群上运行时,一定要添加这句话,否则会报找不到mysql驱动的错误
val prop = new Properties()
prop.put("driver", "com.mysql.jdbc.Driver")

val jdbcMYSQL = sqlContext.read.format("jdbc").options(
  Map("url" -> url,
      "user" -> user,
      "password" -> pwd,
      "driver" -> driver,
      "dbtable" -> "cr_customer")
).load()

jdbcMYSQL.registerTempTable("cr_customer")
//字段区分大小写
sqlContext.sql("select CIFNO,CIFNAME from cr_customer").collect().take(10).foreach(println)

10.2.6 疑难解答
JDBC driver class必须在所有client session或者executor上,对java的原生classloader可见。这是因为Java的DriverManager在打开一个连接之前,会做安全检查,并忽略所有对原声classloader不可见的driver。最简单的一种方法,就是在所有worker节点上修改compute_classpath.sh,并包含你所需的driver jar包。
一些数据库,如H2,会把所有的名字转大写。对于这些数据库,在Spark SQL中必须也使用大写。
10.3 性能调优
对于有一定计算量的Spark作业来说,可能的性能改进的方式,不是把数据缓存在内存里,就是调整一些开销较大的选项参数。
10.3.1 内存缓存
Spark SQL可以通过调用SQLContext.cacheTable(“tableName”)或者DataFrame.cache()把tables以列存储格式缓存到内存中。随后,Spark SQL将会扫描必要的列,并自动调整压缩比例,以减少内存占用和GC压力。你也可以用SQLContext.uncacheTable(“tableName”)来删除内存中的table。
你还可以使用SQLContext.setConf 或在SQL语句中运行SET key=value命令,来配置内存中的缓存。
属性名 默认值 含义
spark.sql.inMemoryColumnarStorage.compressed TRUE 如果设置为true,Spark SQL将会根据数据统计信息,自动为每一列选择单独的压缩编码方式。
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列式缓存批量的大小。增大批量大小可以提高内存利用率和压缩率,但同时也会带来OOM(Out Of Memory)的风险。

10.3.2 其他配置选项
以下选项同样也可以用来给查询任务调性能。不过这些选项在未来可能被放弃,因为spark将支持越来越多的自动优化。
10.4 分布式SQL引擎
10.4.1 运行Thrift JDBC/ODBC server
10.4.2 使用Spark SQL命令行工具

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics