`
Kevin12
  • 浏览: 230564 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Spark SQL on hive配置和实战

阅读更多
spark sql 官网:http://spark.apache.org/docs/latest/sql-programming-guide.html#starting-point-sqlcontext

首先要配置好hive,保存元数据到mysql中,参考:http://kevin12.iteye.com/blog/2280777

然后配置Spark SQL,
1.配置hive-site.xml
在master1上的/usr/local/spark/spark-1.6.0-bin-hadoop2.6/conf目录创建hive-site.xml文件,内容如下:
<configuration>
<property>
    <name>hive.metastore.uris</name>
    <value>thrift://master1:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
  </property>
</configuration>

这个配置信息从hive的目录$HIVE_HOME/conf/hive-default.xml.template文件中找到的,默认情况下的value是空值;
2.配置驱动
将$HIVE_HOME/lib/mysql-connector-java-5.1.35-bin.jar 中的mysql驱动拷贝到$SPARK_HOME/lib/下面即可。

注意:
因为之前我spark环境配置了Zookeeper,做HA,现在不在练习HA,将HA的配置去掉。如果不去掉,必须启动Zookeeper集群才可以单独一台节点上启动spark-shell等;
因为我在master1、worker1、worker2上安装了Zookeeper,所以,要将这三台节点上的$SPARK_HOME/conf/spark-env.sh文件中的SPARK_DAEMON_JAVA_OPTS去掉,并且将配置HA是注释的SPARK_MASTER_IP参数的注释去掉;
export SPARK_MASTER_IP=master1
#export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=master1:2181,worker1:2181,worker2:2181 -Dspark.deploy.zookeeper.dir=/spark"

3.启动hive的metastore后台进程
将日志打印到/root/logs/hive目录下,如果目录不存在则先创建
root@master1:~/logs# hive --service metastore >> /root/logs/hive/metastore.log 2>& 1&
因为配置了上面的hive.metastore.uris,所以必须启动hive的service metastore后台进程才可以执行./spark-shell --master spark://master1:7077和./spark-sql --master spark://master1:7077命令。
在$SPARK_HOME/bin下执行 ./spark-shell --master spark://master1:7077命令。
然后依次执行下面的命令
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> hiveContext.sql("show databases").collect.foreach(println)
scala> hiveContext.sql("use testdb").collect.foreach(println)
scala> hiveContext.sql("show tables").collect.foreach(println)
res5: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]
scala> hiveContext.sql("show tables").collect.foreach(println)
[student,false]
[student2,false]
[student3,false]
[student4,false]
[tbsogou,false]
[tmp_pre_hour_seach_info,false]
scala> hiveContext.sql("select d,count(*)cnt from tbsogou group by d ").collect.foreach(println)
当同时用hive和spark sql运行查询时,如果没有资源spark sql会打印下面的语句,一直等待资源释放,获取资源后会执行spark sql进行查询;
16/03/25 20:13:04 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
16/03/25 20:13:19 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
16/03/25 20:13:34 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources


先将$SPARK_HOME下面的examples/src/main/resources/中的所有文件都上传到hdfs的/library/examples/src/main/resources/目录中以备后用。

Starting Point: SQLContext

valsc:SparkContext// An existing SparkContext.valsqlContext=neworg.apache.spark.sql.SQLContext(sc)// this is used to implicitly convert an RDD to a DataFrame.importsqlContext.implicits._
Creating DataFrames

valsc:SparkContext// An existing SparkContext.valsqlContext=neworg.apache.spark.sql.SQLContext(sc)
valdf=sqlContext.read.json("examples/src/main/resources/people.json")// Displays the content of the DataFrame to stdoutdf.show()
DataFrame Operations

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show the content of the DataFrame
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics