
Kafka offset

Tracking Kafka offset ranges in a Spark Streaming direct stream, with mapWithState aggregation (plain counts and HLLC-based distinct counts) written to Redis.
package cn.analysys.stream.state

import java.nio.ByteBuffer

import cn.analysys.meta.MetaMapInfo
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import cn.analysys.third.hllc.HLLCounter
import cn.analysys.utils.{BridgeUtils, ParseUtils}
import org.apache.spark.streaming.State
import org.apache.spark.streaming.dstream.DStream


object StateProcess {
  val M = 10 // HLLCounter precision
  val TOPN = 20 // keep only the top 20 dimension values per key
  val MINUTECOMBINE = 300000 // 5 minutes, in milliseconds
  val TIMEOUTMINUTE = 1440 // state idle timeout: one day, in minutes
  val OUTPUT = 100 // only counts above this threshold are written out
  // mapWithState function: running count per key. The HLLC slot stays null so the output
  // shape matches the HLLC-based functions below.
  val funcLongAdd = (key: String, tempId: Option[Long], state: State[Long]) => {
    val sum = state.getOption.getOrElse(0L) + tempId.getOrElse(1L)
    val hllc: HLLCounter = null
    val output = (key, (sum, hllc))
    state.update(sum)
    output
  }

  def isNewKey(key: String, hllc: HLLCounter): Boolean = {
    var isNew = false
    val begin = hllc.getCountEstimate
    hllc.add(key)
    val end = hllc.getCountEstimate
    if (end > begin) isNew = true
    isNew
  }


  // mapWithState function: returns the page record only the first time a key is seen
  // (the HLLC estimate grows); repeated keys come back as null and can be filtered out.
  val appKeyDeviceIdessionIDDistinct = (key: Int, loadPage: Option[LoadPageClassType], state: State[HLLCounter]) => {
    val hllc = state.getOption().getOrElse(new HLLCounter(M))
    var page: LoadPageClassType = null
    val begin = hllc.getCountEstimate
    hllc.add(key)
    val end = hllc.getCountEstimate
    if (end > begin) page = loadPage.getOrElse(null)
    val output = page
    state.update(hllc)
    output
  }

  // To reduce downstream data volume, keys whose tempId has already been counted are emitted
  // with an empty key.
  val funcHllcAdd = (key: String, tempId: Option[String], state: State[HLLCounter]) => {
    val hllc = state.getOption().getOrElse(new HLLCounter(M))
    val begin = hllc.getCountEstimate
    hllc.add(tempId.getOrElse(""))
    val end = hllc.getCountEstimate
    val output = (if (end > begin) key else "", (hllc.getCountEstimate, hllc))
    state.update(hllc)
    output
  }
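
  // Wiring sketch (illustrative, not part of the original job): funcHllcAdd is meant to be
  // plugged into mapWithState through a StateSpec. The (key, tempId) pair stream is assumed
  // to be built upstream; the idle timeout reuses TIMEOUTMINUTE.
  def hllcStateStream(pairs: DStream[(String, String)]): DStream[(String, (Long, HLLCounter))] =
    pairs.mapWithState(
      org.apache.spark.streaming.StateSpec.function(funcHllcAdd)
        .timeout(org.apache.spark.streaming.Minutes(TIMEOUTMINUTE)))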

  // Every KV function returns a String key with either a Long or an HLLC value.
  // Convention for output keys: split on MetaMapInfo.SEG; two segments are stored as K:V,
  // three segments as K:K:V.
  // KVType: MetaMapInfo.KEYTYPEKV, MetaMapInfo.KEYTYPEKKV
  // To limit the load on Redis: 1. two-segment keys keep the maximum value per key and are
  //                                written to storage directly;
  //                             2. three-segment keys are sorted on the third field, only the
  //                                top 20 are kept (plus a recorded total over all dimension
  //                                values), then written to the store.
  def longDstreamSortAndOutPut(resultDstreamOrg: DStream[(String, (Long, HLLCounter))], KVType: String,
                               isHLLC: Boolean, sorted: Boolean = false) = {
    //debug
    //    if(isHLLC)
    //      resultDstream.foreachRDD(rRdd => rRdd.take(5).foreach(x => println(s" key : ${x._1}   hllc value ${x._2._2.getCountEstimate } ")))
    //    else resultDstream.foreachRDD(rRdd => rRdd.take(5).foreach(x => println(s" key : ${x._1}  long   value ${x._2._1}")))
    val resultDstream = resultDstreamOrg.filter(result => result._2._1 > OUTPUT)
    resultDstream.foreachRDD(resultRdd => {
      val comparedTopRdd = resultRdd.reduceByKey((a, b) => if (a._1 > b._1) a else b) // mapWithState emits one record per input record, so keep only the max per key
      if (MetaMapInfo.KEYTYPEKV.equals(KVType)) {
        // Two-segment key: write directly
        comparedTopRdd.foreachPartition(partition => {
          BridgeUtils.initRedis(MetaMapInfo.URL, MetaMapInfo.TABLENAME, MetaMapInfo.USERNAME, MetaMapInfo.PASSWORD)
          if (isHLLC) partition.foreach(KV =>  BridgeUtils.putRedis(KV._1, getByteFromHllc(KV._2._2)))
          else partition.foreach(KV =>  BridgeUtils.putRedis(KV._1, KV._2._1))
        })
      } else {
        // Three-segment key: needs sorting
        val dimentionRdd = comparedTopRdd.filter(
          kv => kv._1.contains(MetaMapInfo.DIMSEG) && kv._1.split(MetaMapInfo.DIMSEG).size == 2)
          .map(kv => {
            val dims = kv._1.split(MetaMapInfo.DIMSEG)
            (dims(0), (dims(1), kv._2))
          })
        // Top 20: sorting required
        if (sorted) {
          dimentionRdd.groupByKey().map(keyValueCollect => {
            val key = keyValueCollect._1
            val valueCollection = keyValueCollect._2
            val sortedKeyValue = valueCollection.toArray.sortWith(_._2._1 > _._2._1)
            (key, if (sortedKeyValue.size > TOPN) sortedKeyValue.take(TOPN) else sortedKeyValue)
          }).foreachPartition(partition => {
            BridgeUtils.initRedis(MetaMapInfo.URL, MetaMapInfo.TABLENAME, MetaMapInfo.USERNAME, MetaMapInfo.PASSWORD)
            partition.foreach(kvCollection => {
              val key = kvCollection._1
              // val sumValue = kvCollection._2
              val dimValue = kvCollection._2
              // BridgeUtils.putRedis(key, MetaMapInfo.ALLDIMENSIONCOUNT, sumValue)
              if (isHLLC) dimValue.foreach(kv => if (kv._2._2 != null) BridgeUtils.putRedis(key, kv._1, getByteFromHllc(kv._2._2)))
              else dimValue.foreach(kv => BridgeUtils.putRedis(key, kv._1, kv._2._1))
            })
          })
        }
        else {
          dimentionRdd.foreachPartition(partition => {
            BridgeUtils.initRedis(MetaMapInfo.URL, MetaMapInfo.TABLENAME, MetaMapInfo.USERNAME, MetaMapInfo.PASSWORD)
            partition.foreach(kvCollection => {
              val key = kvCollection._1
              val dimValue = kvCollection._2
              if (isHLLC && dimValue._2._2 != null) BridgeUtils.putRedis(key, dimValue._1, getByteFromHllc(dimValue._2._2))
              else BridgeUtils.putRedis(key, dimValue._1, dimValue._2._1)
            })
          })
        }
      }
    })
  }
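
  // Usage sketch (illustrative): a state stream such as the one from hllcStateStream above can
  // be flushed with either key convention described in the comments before this method, e.g.
  //   longDstreamSortAndOutPut(hllcStateStream(pairs), MetaMapInfo.KEYTYPEKV, isHLLC = true)
  //   longDstreamSortAndOutPut(kkvStream, MetaMapInfo.KEYTYPEKKV, isHLLC = false, sorted = true)
  // where `kkvStream` is a hypothetical stream whose keys already contain MetaMapInfo.DIMSEG.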


  def getByteFromHllc(hllc: HLLCounter): Array[Byte] = {
    val out1 = ByteBuffer.allocate(hllc.maxLength())
    hllc.writeRegisters(out1)
    out1.array()
  }

  def getHllcFromByte(bytes: Array[Byte], compressMode: Int): HLLCounter = {
    val hllCounter = new HLLCounter(compressMode)
    hllCounter.readRegisters(ByteBuffer.wrap(bytes))
    hllCounter
  }
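
  // Round-trip sketch (illustrative): bytes written to Redis can be restored into a counter,
  // provided the same precision M is used on both sides (getHllcFromByte passes its second
  // argument straight to the HLLCounter constructor).
  def hllcRoundTripExample(): Unit = {
    val original = new HLLCounter(M)
    original.add("tempId-1")
    original.add("tempId-2")
    val bytes = getByteFromHllc(original) // fixed-size register dump
    val restored = getHllcFromByte(bytes, M)
    println(s"estimate after round trip: ${restored.getCountEstimate}")
  }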


  def createContext(brokers: String, topics: String, batchseconds: Int, checkpointDirectory: String
                    , DataType: String, calDate: String, offset: String): StreamingContext = {

    println(s" createContext \n  " +
      s" calbrokersDate : ${brokers} \n  " +
      s" topics : ${topics} \n  " +
      s" batchseconds : ${batchseconds}  \n " +
      s" checkpointDirectory : ${checkpointDirectory}  \n " +
      s" DataType : ${DataType}  \n " +
      s" calDate : ${calDate}  \n " +
      s" offset : ${offset}   ")

    val sparkConf = new SparkConf().setAppName("ArkStreamApp")
      .set("spark.streaming.backpressure.enabled","true")
      .set("spark.streaming.kafka.maxRatePerPartition","5")

    val ssc = new StreamingContext(sparkConf, Seconds(batchseconds))
    ssc.checkpoint(checkpointDirectory)
    val topicsSet = topics.split(",").toSet
    // Start from the latest offsets by default; switch to the earliest when offset == "smallest".
    var kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "auto.offset.reset" -> "largest"
    )
    if ("smallest".equals(offset)) kafkaParams = kafkaParams.updated("auto.offset.reset", "smallest")


    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    var offsetRanges = Array.empty[OffsetRange]

    messages.transform(r => {
      offsetRanges = r.asInstanceOf[HasOffsetRanges].offsetRanges
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
      r
    }).count().print()
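
    // Offset handling beyond logging (illustrative only, not in the original job): since the
    // direct stream's RDDs implement HasOffsetRanges, the per-partition ranges could also be
    // persisted after every batch, e.g. through the same Redis bridge. The key layout below
    // is hypothetical.
    // messages.foreachRDD { rdd =>
    //   val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    //   BridgeUtils.initRedis(MetaMapInfo.URL, MetaMapInfo.TABLENAME, MetaMapInfo.USERNAME, MetaMapInfo.PASSWORD)
    //   ranges.foreach(o => BridgeUtils.putRedis(s"offset:${o.topic}:${o.partition}", o.untilOffset))
    // }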


//    val dataInfoStream = messages.flatMap(
//      record => ParseUtils.parseArray(record._2, DataType)
//    ).filter(record => record.dataType.equals(MetaMapInfo.PINFO)) // only process pinfo records
//      .map(record => {
//      var mapData = record.dataMap
//      mapData = ParseUtils.eventTimeProcess(mapData) // event-time handling
//      mapData = ParseUtils.appIDProcess(mapData) // appID handling
//      mapData = ParseUtils.sourceProcess(mapData) // source-type handling
//      mapData
//    }).filter(dataMap =>
//      (dataMap.contains(MetaMapInfo.H5MAP("ApplicationKey"))
//        && dataMap(MetaMapInfo.H5MAP("ApplicationKey")) != null
//        && !(dataMap(MetaMapInfo.H5MAP("ApplicationKey")).isEmpty)
//        && dataMap("APPID") != null
//        && !dataMap("APPID").isEmpty
//        && dataMap("APPID") != "-1"
//        && dataMap("APPID").equals("100024")
//        && (!(dataMap("eventTime").isEmpty))
//        && dataMap("eventTime").contains(calDate)))



    //dataInfoStream.foreachRDD( r => r.take(100).foreach( record =>  println(s" dataInfoStream2 ${record("eventTime")} ")))
//   dataInfoStream.count().print
    // business type:
    // 1. UV-style metrics: COUNT(DISTINCT TEMPID)
    //val UV = "UV"
    //val REGION_PROVINCE = "PROVINCE"
    //val REGION_CITY = "CITY"
    //val DEVICE_TYPE = "DEVICE-TYPE"
    //DstreamUVProcess.uvDStreamProcess(dataInfoStream)
    //DstreamUVProcess.uvDStreamProcessDim(dataInfoStream)
    // 2. Plain count metrics
    //val PAGE_COUNT = "PAGE-COUNT"
    //DstreamPagePvProcess.pagePVDStreamProcess(dataInfoStream)
    // 3. Landing-page statistics
    //val SOURCE_TYPE = "SOURCE-TYPE"
    //val SOURCE_AD = "SOURCE-AD"
    //val SOURCE_KEYWORD = "SOURCE-KEYWORD"
    //val LOAD_PAGE_INCOUNT = "LOAD-PAGE-INCOUNT"
    //DstreamLoadPageProcess.loadPageDStreamProcess(dataInfoStream)
    ssc
  }

}
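
For completeness, a minimal driver sketch (broker list, topic, batch interval, DataType and calDate below are placeholders, not values from the original post) showing how createContext pairs with StreamingContext.getOrCreate: on a restart the context, and with it the Kafka offsets recorded in the checkpoint, is recovered from checkpointDirectory instead of being rebuilt from scratch. It reuses the imports at the top of the file.

object StateProcessApp {
  def main(args: Array[String]): Unit = {
    val checkpointDir = "/tmp/ark-stream-checkpoint" // hypothetical path
    // Rebuild the StreamingContext from the checkpoint if one exists; otherwise create it.
    val ssc = StreamingContext.getOrCreate(checkpointDir,
      () => StateProcess.createContext(
        "broker1:9092,broker2:9092", "ark-topic", 30, checkpointDir,
        "H5", "20180101", "largest"))
    ssc.start()
    ssc.awaitTermination()
  }
}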