org.apache.spark.sql.hive.thriftserver.server.UdfLoadUtils
package org.apache.spark.sql.hive.thriftserver.server

import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types.{DataType, DataTypes}

import scala.collection.mutable.ArrayBuffer
import scala.io.Source

object UdfLoadUtils {

  val configArray: Array[String] = getConfigArray

  /** Registers every function listed in udf.config.
   *  Each config line is "name,className" for a UDAF, or
   *  "name,className,returnType" for a Java UDF (only UDFs need the return type). */
  def udfRegister(spark: SparkSession): Unit = {
    configArray.foreach { record =>
      val registerInfoArray = record.split(",")
      println(s"register udf info : $record")
      if (registerInfoArray.size == 2) {
        // UDAF: instantiate the class by reflection and register the instance.
        val Array(udfName, className) = registerInfoArray
        val instance = getUDAFInstanceByClass(className)
        spark.sqlContext.udf.register(udfName, instance)
      } else if (registerInfoArray.size == 3) {
        // Java UDF: map the configured type name to a Spark DataType.
        val Array(udfName, className, returnType) = registerInfoArray
        val returnDataType: DataType = returnType match {
          // Numeric types
          case "ByteType"    => DataTypes.ByteType
          case "ShortType"   => DataTypes.ShortType
          case "IntegerType" => DataTypes.IntegerType
          case "LongType"    => DataTypes.LongType
          case "FloatType"   => DataTypes.FloatType
          case "DoubleType"  => DataTypes.DoubleType
          // String type
          case "StringType"  => DataTypes.StringType
          // Binary type
          case "BinaryType"  => DataTypes.BinaryType
          // Boolean type
          case "BooleanType" => DataTypes.BooleanType
          // Datetime types
          case "TimestampType" => DataTypes.TimestampType
          case "DateType"      => DataTypes.DateType
          // Complex types (DecimalType, ArrayType, MapType, StructType) are not supported yet.
          case _ => null
        }
        spark.sqlContext.udf.registerJava(udfName, className, returnDataType)
      }
    }
  }

  /** Instantiates a UDAF by reflection; the class needs a public no-arg constructor. */
  def getUDAFInstanceByClass(className: String): UserDefinedAggregateFunction = {
    var instance: UserDefinedAggregateFunction = null
    try {
      instance = Class.forName(className).newInstance.asInstanceOf[UserDefinedAggregateFunction]
    } catch {
      case ex: Throwable =>
        println(s"instance $className error, error info : ${ex.getCause}")
        ex.printStackTrace()
    }
    instance
  }

  /** Reads udf.config from the first location that has it: the SparkFiles cache
   *  (files shipped with --files), the working directory, then the SparkFiles root. */
  def getConfigArray: Array[String] = {
    val configArray = new ArrayBuffer[String]()
    try {
      val path = SparkFiles.get("udf.config")
      println(s"SparkFiles udf.config , path : $path")
      configArray ++= Source.fromFile(path).getLines().toArray
      println(s"SparkFiles udf.config , path : $path done!")
    } catch {
      case _: Throwable => // not found here; try the next location
    }
    try {
      println("local udf.config , path : ./udf.config")
      val localFiles = Source.fromFile("./udf.config").getLines().toArray
      if (configArray.isEmpty) configArray ++= localFiles
      println("local udf.config , path : ./udf.config done!")
    } catch {
      case _: Throwable => // not found here; try the next location
    }
    try {
      val path = SparkFiles.getRootDirectory() + "/udf.config"
      println(s"SparkFiles root udf.config , path : $path")
      val sparkFilesRoot = Source.fromFile(path).getLines().toArray
      if (configArray.isEmpty) configArray ++= sparkFilesRoot
      println(s"SparkFiles root udf.config , path : $path done!")
    } catch {
      case _: Throwable => // no udf.config anywhere; configArray stays empty
    }
    configArray.toArray
  }
}
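For reference, a minimal udf.config in the line format the loader parses might look like this (the class names are hypothetical placeholders; note the parser has no comment syntax, so the file should contain only data lines):

mySum,com.example.udf.MySumUdaf
myUpper,com.example.udf.MyUpperUdf,StringType

Shipping the file with --files is what makes SparkFiles.get("udf.config") resolve, for example:

sbin/start-thriftserver.sh --files /path/to/udf.config --master yarn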
org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver.server

import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentHashMap

import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation}

/**
 * Executes queries using Spark SQL, and maintains a list of handles to active queries.
 */
private[thriftserver] class SparkSQLOperationManager()
  extends OperationManager with Logging {

  val handleToOperation = ReflectionUtils
    .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")

  val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
  val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()

  // Custom UDFs/UDAFs from udf.config are registered once, lazily, on the first statement.
  var udfNotInited = true

  override def newExecuteStatementOperation(
      parentSession: HiveSession,
      statement: String,
      confOverlay: JMap[String, String],
      async: Boolean): ExecuteStatementOperation = synchronized {
    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
      " initialized or was already closed.")
    if (udfNotInited) {
      UdfLoadUtils.udfRegister(sqlContext.sparkSession)
      udfNotInited = false
    }
    val conf = sqlContext.sessionState.conf
    val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
      runInBackground)(sqlContext, sessionToActivePool)
    handleToOperation.put(operation.getHandle, operation)
    logDebug(s"Created Operation for $statement with session=$parentSession, " +
      s"runInBackground=$runInBackground")
    operation
  }
}
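As a sketch of the kind of class a two-field config entry points at, here is a minimal UserDefinedAggregateFunction (the package and class name are assumptions for illustration). It must have a public no-arg constructor, because getUDAFInstanceByClass instantiates it via Class.forName(...).newInstance:

package com.example.udf

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Hypothetical UDAF summing a DoubleType column; registered by the config line
//   mySum,com.example.udf.MySumUdaf
class MySumUdaf extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
  override def bufferSchema: StructType = StructType(StructField("sum", DoubleType) :: Nil)
  override def dataType: DataType = DoubleType
  override def deterministic: Boolean = true

  // Start each aggregation buffer at zero.
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0.0

  // Fold one input row into the buffer, skipping nulls.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getDouble(0) + input.getDouble(0)

  // Combine two partial sums from different partitions.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)

  override def evaluate(buffer: Row): Any = buffer.getDouble(0)
}

Once the server has registered it, any JDBC client can call it directly, e.g. SELECT mySum(price) FROM orders from beeline.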