spark aggregator
import java.nio.ByteBuffer

import org.apache.kylin.measure.hllc.HLLCounter // Kylin's HyperLogLog implementation
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.Aggregator

// Typed aggregator: the buffer is a live HLLCounter object, so nothing is
// serialized between consecutive reduce() calls.
class HllcdistinctByte extends Aggregator[Row, HLLCounter, Array[Byte]] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b.
  // 14 is the HLL precision (2^14 registers).
  def zero: HLLCounter = new HLLCounter(14)

  // Combine two values to produce a new value. For performance, the function may modify
  // `buffer` and return it instead of constructing a new object.
  def reduce(buffer: HLLCounter, employee: Row): HLLCounter = {
    buffer.add(employee.getString(2))
    buffer
  }

  // Merge two intermediate values
  def merge(b1: HLLCounter, b2: HLLCounter): HLLCounter = {
    b1.merge(b2)
    b1
  }

  // Transform the output of the reduction: dump the HLL registers into a byte array
  def finish(reduction: HLLCounter): Array[Byte] = {
    val out1 = ByteBuffer.allocate(reduction.maxLength())
    reduction.writeRegisters(out1)
    out1.array()
  }

  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[HLLCounter] = Encoders.javaSerialization[HLLCounter]

  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Array[Byte]] = Encoders.BINARY
}
Usage (requires import spark.implicits._ for the key encoder):

val uvbytes = new HllcdistinctByte().toColumn
val uvb = wordsDataFrame
  .where("event_id = '2001'")
  .groupByKey(_.getString(0))
  .agg(uvbytes)
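The aggregation yields the raw HLL register bytes per key. To read an actual distinct-count estimate back out, rebuild a counter from those bytes; a small sketch, assuming HLLCounter is Kylin's org.apache.kylin.measure.hllc.HLLCounter and the same precision (14) as in the aggregator:

import java.nio.ByteBuffer
import org.apache.kylin.measure.hllc.HLLCounter

// Rebuild a counter from the bytes produced by finish() and read the estimate
def estimateFromBytes(bytes: Array[Byte]): Long = {
  val hllc = new HLLCounter(14) // precision must match the aggregator's
  hllc.readRegisters(ByteBuffer.wrap(bytes))
  hllc.getCountEstimate()       // approximate distinct count
}

val uv = uvb.collect().map { case (key, bytes) => key -> estimateFromBytes(bytes) }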
A UserDefinedAggregateFunction, by contrast, can only use Spark SQL's built-in DataTypes in its buffer schema, so the HLL state is pinned to a BinaryType byte array:
import java.nio.ByteBuffer

import org.apache.kylin.measure.hllc.HLLCounter
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class HLLCUDAFByte extends UserDefinedAggregateFunction { // Ctrl+I in the IDE generates the override stubs
  override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))

  override def bufferSchema: StructType = StructType(Array(StructField("hllcbyte", BinaryType, true)))

  override def dataType: DataType = BinaryType

  override def deterministic: Boolean = true

  // Start from an empty counter, serialized into the buffer
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = {
    val hllc = new HLLCounter(14)
    val bytes1 = ByteBuffer.allocate(hllc.maxLength())
    hllc.writeRegisters(bytes1)
    bytes1.array
  }

  // Deserialize the registers, add one input value, serialize them back
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val hllc = new HLLCounter(14)
    hllc.readRegisters(ByteBuffer.wrap(buffer.getAs[Array[Byte]](0)))
    hllc.add(input.getAs[String](0))
    val bytes1 = ByteBuffer.allocate(hllc.maxLength())
    hllc.writeRegisters(bytes1)
    buffer(0) = bytes1.array
  }

  // Deserialize both partial states, merge, serialize back
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val hllc = new HLLCounter(14)
    hllc.readRegisters(ByteBuffer.wrap(buffer1.getAs[Array[Byte]](0)))
    val hllc2 = new HLLCounter(14)
    hllc2.readRegisters(ByteBuffer.wrap(buffer2.getAs[Array[Byte]](0)))
    hllc.merge(hllc2)
    val bytes1 = ByteBuffer.allocate(hllc.maxLength())
    hllc.writeRegisters(bytes1)
    buffer1(0) = bytes1.array
  }

  // The final result is just the serialized registers
  override def evaluate(buffer: Row): Any = buffer.getAs[Array[Byte]](0)
}
Register it so it can be called from SQL:

spark.udf.register("HLLCUDAFByte", new HLLCUDAFByte())

The cost is visible above: every update and every merge has to deserialize the HLL registers out of the byte array and serialize them back in again, burning CPU and allocations on each row. Can a custom buffer type avoid the round trip?
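Within the UDAF API it cannot: the buffer is limited to built-in DataTypes, and the UserDefinedType API is not public in Spark 2.x. The typed Aggregator above already keeps the HLLCounter as a live object between reduce() calls, so the remaining cost is the buffer encoder at shuffle boundaries. A minimal sketch of trading Encoders.javaSerialization for the usually cheaper Encoders.kryo (the class name HllcDistinctKryo is made up for illustration, and it assumes HLLCounter is Kryo-serializable):

import java.nio.ByteBuffer

import org.apache.kylin.measure.hllc.HLLCounter
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.Aggregator

class HllcDistinctKryo extends Aggregator[Row, HLLCounter, Array[Byte]] {
  def zero: HLLCounter = new HLLCounter(14)

  // The counter is mutated in place; nothing is (de)serialized per row
  def reduce(b: HLLCounter, row: Row): HLLCounter = { b.add(row.getString(2)); b }

  def merge(b1: HLLCounter, b2: HLLCounter): HLLCounter = { b1.merge(b2); b1 }

  def finish(r: HLLCounter): Array[Byte] = {
    val out = ByteBuffer.allocate(r.maxLength())
    r.writeRegisters(out)
    out.array()
  }

  // Kryo instead of Java serialization: encoding still only happens when
  // Spark ships the buffer between stages, but each round trip is cheaper
  def bufferEncoder: Encoder[HLLCounter] = Encoders.kryo[HLLCounter]

  def outputEncoder: Encoder[Array[Byte]] = Encoders.BINARY
}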