
Structured Streaming SQL UDF / UDAF

 
Spark Aggregator

First, a typed Aggregator that folds rows into an HLLCounter (Apache Kylin's HyperLogLog implementation) and emits the serialized registers as bytes:



  import java.nio.ByteBuffer

  import org.apache.spark.sql.{Encoder, Encoders, Row}
  import org.apache.spark.sql.expressions.Aggregator
  import org.apache.kylin.measure.hllc.HLLCounter // Apache Kylin's HyperLogLog counter

  class HllcdistinctByte extends Aggregator[Row, HLLCounter, Array[Byte]] {
    // A zero value for this aggregation. Should satisfy the property that any b + zero = b
    def zero: HLLCounter = new HLLCounter(14)
    // Combine two values to produce a new value. For performance, the function may modify `buffer`
    // and return it instead of constructing a new object
    def reduce(buffer: HLLCounter, employee: Row): HLLCounter = {
      buffer.add(employee.getString(2))
      buffer
    }
    // Merge two intermediate values
    def merge(b1: HLLCounter, b2: HLLCounter): HLLCounter = {
      b1.merge(b2)
      b1
    }
    // Transform the output of the reduction: serialize the HLL registers to bytes
    def finish(reduction: HLLCounter): Array[Byte] = {
      val out1 = ByteBuffer.allocate(reduction.maxLength())
      reduction.writeRegisters(out1)
      out1.array()
    }
    // Specifies the Encoder for the intermediate value type
    def bufferEncoder: Encoder[HLLCounter] = Encoders.javaSerialization
    // Specifies the Encoder for the final output value type
    def outputEncoder: Encoder[Array[Byte]] = Encoders.BINARY
  }


Usage:

  // spark.implicits._ must be in scope for groupByKey's key encoder
  val uvbytes = new HllcdistinctByte().toColumn
  val uvb = wordsDataFrame
    .where("event_id = '2001'")
    .groupByKey(_.getString(0))
    .agg(uvbytes)
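
The output is just the HLL register bytes. To turn them back into a distinct-count estimate, the bytes can be wrapped back into a counter; a minimal sketch, where the `estimate` helper is hypothetical and assumes Kylin's HLLCounter API, with the precision (14) matching the aggregator above:

  // Hypothetical helper: rebuild a counter from the emitted bytes and
  // read the cardinality estimate (precision must match: 14)
  def estimate(bytes: Array[Byte]): Long = {
    val hllc = new HLLCounter(14)
    hllc.readRegisters(ByteBuffer.wrap(bytes))
    hllc.getCountEstimate()
  }

  val uvCounts = uvb.map { case (key, bytes) => (key, estimate(bytes)) }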


However, with a UserDefinedAggregateFunction the buffer is restricted to Spark SQL types, so the HLL state has to be carried as raw bytes (BinaryType):

  import java.nio.ByteBuffer

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  import org.apache.spark.sql.types._
  import org.apache.kylin.measure.hllc.HLLCounter

  class HLLCUDAFByte extends UserDefinedAggregateFunction { // Ctrl+I in the IDE generates these overrides
    // One string input column
    override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))
    // The aggregation buffer holds the HLL registers serialized as bytes
    override def bufferSchema: StructType = StructType(Array(StructField("hllcbyte", BinaryType, true)))
    override def dataType: DataType = BinaryType
    override def deterministic: Boolean = true
    // Start from an empty counter, serialized into the buffer
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      val hllc = new HLLCounter(14)
      val bytes1 = ByteBuffer.allocate(hllc.maxLength())
      hllc.writeRegisters(bytes1)
      buffer(0) = bytes1.array
    }
    // Every row: deserialize the counter, add the value, serialize it back
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val hllc = new HLLCounter(14)
      hllc.readRegisters(ByteBuffer.wrap(buffer.getAs[Array[Byte]](0)))
      hllc.add(input.getAs[String](0))
      val bytes1 = ByteBuffer.allocate(hllc.maxLength())
      hllc.writeRegisters(bytes1)
      buffer(0) = bytes1.array
    }
    // Merge two partial buffers: deserialize both counters and merge them
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      val hllc = new HLLCounter(14)
      hllc.readRegisters(ByteBuffer.wrap(buffer1.getAs[Array[Byte]](0)))
      val hllc2 = new HLLCounter(14)
      hllc2.readRegisters(ByteBuffer.wrap(buffer2.getAs[Array[Byte]](0)))
      hllc.merge(hllc2)
      val bytes1 = ByteBuffer.allocate(hllc.maxLength())
      hllc.writeRegisters(bytes1)
      buffer1(0) = bytes1.array
    }
    override def evaluate(buffer: Row): Any = buffer.getAs[Array[Byte]](0)
  }
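
Unlike the typed Aggregator, this UDAF can be used directly in untyped DataFrame aggregations. A minimal usage sketch (the column names event_id and word are assumptions, matching the positions used above):

  import org.apache.spark.sql.functions.col

  val hllcByte = new HLLCUDAFByte()
  val uvb2 = wordsDataFrame
    .where("event_id = '2001'")
    .groupBy(col("event_id"))
    .agg(hllcByte(col("word")).as("uv_bytes"))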

This means every update and merge pays a full deserialize/serialize round trip, which wastes a lot of resources. What if we could use a custom type instead? For SQL, the UDAF is registered like this (HLLCUDAFInt being another variant built the same way):

  spark.udf.register("HLLCUDAFInt", new HLLCUDAFInt())
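
One way to cut the cost, assuming Spark 3.0+ is available: wrap the typed Aggregator with org.apache.spark.sql.functions.udaf so it is callable from SQL while the buffer stays a live HLLCounter object, meaning serialization happens only when Spark has to ship the buffer between stages, not on every row. A sketch (the registered name hllc_distinct_bytes is hypothetical):

  import org.apache.spark.sql.functions.udaf

  // Sketch, assuming Spark 3.0+. The buffer stays a live HLLCounter between
  // rows; Kryo only serializes it at exchange points, not per update.
  val hllcAgg = new Aggregator[String, HLLCounter, Array[Byte]] {
    def zero: HLLCounter = new HLLCounter(14)
    def reduce(b: HLLCounter, s: String): HLLCounter = { b.add(s); b }
    def merge(b1: HLLCounter, b2: HLLCounter): HLLCounter = { b1.merge(b2); b1 }
    def finish(r: HLLCounter): Array[Byte] = {
      val out = ByteBuffer.allocate(r.maxLength())
      r.writeRegisters(out)
      out.array()
    }
    def bufferEncoder: Encoder[HLLCounter] = Encoders.kryo[HLLCounter]
    def outputEncoder: Encoder[Array[Byte]] = Encoders.BINARY
  }

  spark.udf.register("hllc_distinct_bytes", udaf(hllcAgg))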

