`

交流 code 09-18

 
阅读更多
/**
 * Spark batch job: reads tab-separated user-behaviour lines, keeps the rows of a
 * single user and writes them as Parquet, partitioned by `record_date`.
 *
 * An explicit `main` is defined instead of extending `scala.App`. Spark's
 * documentation warns that `App` subclasses may not work correctly, because
 * `App` relies on `DelayedInit` to initialise object fields.
 *
 * Usage: `DataProcess [inputPath] [outputPath]`. Both arguments are optional and
 * default to the paths that were previously hard-coded.
 */
object DataProcess {
  private val DefaultInputPath = "./testdata/ods"
  private val DefaultOutputPath = "./testdata/dwd2"

  def main(args: Array[String]): Unit = {
    val inputPath = args.lift(0).getOrElse(DefaultInputPath)
    val mergePartitonStroragePath = args.lift(1).getOrElse(DefaultOutputPath)

    val spark = SparkSession
      .builder()
      .appName("UserBehiviorToHHDataPartition")
      .getOrCreate()
    try {
      import spark.implicits._
      val dataCollection = spark.sparkContext.textFile(inputPath)
      // Fields used per line:
      //   0 user id, 1 event time (divided by 1000, presumably ms -> s),
      //   2 behaviour id, 4 behaviour property, 5 record date.
      // Field 3 is ignored.
      // NOTE(review): confirm this layout against the ODS schema.
      val datadf = dataCollection
        .map(_.split("\t"))
        .map(r => (r(0).toInt, (r(1).toLong / 1000).toInt, r(2).toInt, r(4), r(5)))
        .toDF("user_id", "event_time", "behivior_id", "behivior_pop", "record_date")
        // NOTE(review): hard-coded single-user filter; looks like a debugging aid.
        // Confirm it is intended.
        .filter($"user_id" === 4707776)
      datadf.write
        .format("parquet")
        .mode("overwrite")
        .partitionBy("record_date")
        .save(mergePartitonStroragePath)
    } finally {
      // Release the session even when reading or writing fails.
      spark.stop()
    }
  }
}


/**
 * Spark UDF: looks up `keyName` in the JSON object text `jsonInfo` and returns
 * the value as a string.
 *
 * The JSON is parsed with `FunnelUtil.gson` into a `java.util.Map[String, String]`.
 * Only non-fatal exceptions are swallowed; fatal errors such as
 * `OutOfMemoryError` propagate instead of being hidden.
 *
 * @param jsonInfo JSON object text to parse
 * @param keyName  key to look up
 * @return the value for `keyName`. Returns `""` when parsing fails,
 *         `fromJson` yields null, the key is absent, or the value is null.
 */
class JsonInfoGetString extends UDF2[String, String, String] {
  def call(jsonInfo: String, keyName: String): String = {
    import scala.util.control.NonFatal
    try {
      val typeOfHashMap: Type = new TypeToken[java.util.Map[String, String]]() {}.getType
      val map: java.util.Map[String, String] = FunnelUtil.gson.fromJson(jsonInfo, typeOfHashMap)
      // Both the parsed map and the looked-up value may be null.
      // Wrap each in Option rather than relying on an NPE being caught.
      Option(map).flatMap(m => Option(m.get(keyName))).getOrElse("")
    } catch {
      case NonFatal(_) => ""
    }
  }
}

/**
 * Spark UDF: looks up `keyName` in the JSON object text `jsonInfo` and returns
 * its value converted to a `Long`. Any fractional part is truncated toward zero.
 *
 * The JSON is parsed with `FunnelUtil.gson` into a `java.util.Map[String, Object]`.
 * The value's `toString` is then parsed as a number. Only non-fatal exceptions
 * are swallowed; fatal errors propagate.
 *
 * @param jsonInfo JSON object text to parse
 * @param keyName  key to look up
 * @return the numeric value for `keyName`. Returns `0L` when parsing fails,
 *         the key is absent, the value is null, or it is not numeric.
 */
class JsonInfoGetLong extends UDF2[String, String, Long] {
  def call(jsonInfo: String, keyName: String): Long = {
    import scala.util.control.NonFatal
    try {
      val typeOfHashMap: Type = new TypeToken[java.util.Map[String, Object]]() {}.getType
      val map: java.util.Map[String, Object] = FunnelUtil.gson.fromJson(jsonInfo, typeOfHashMap)
      Option(map)
        .flatMap(m => Option(m.get(keyName)))
        // Parse through Double, not Float. A Float keeps only ~7 significant digits,
        // so values such as 13-digit millisecond timestamps were silently rounded.
        // NOTE(review): Gson typically decodes JSON numbers in an Object-valued map
        // as Double, so integers beyond 2^53 remain inexact unless
        // FunnelUtil.gson is configured otherwise.
        .map(_.toString.toDouble.toLong)
        .getOrElse(0L)
    } catch {
      case NonFatal(_) => 0L
    }
  }
}


/**
 * Spark UDF: looks up `keyName` in the JSON object text `jsonInfo` and returns
 * its value converted to a number.
 *
 * NOTE(review): despite the class name, the result type is `Float`, not `Double`
 * (`UDF2[String, String, Float]`). It is kept as-is for compatibility with
 * existing callers and schemas.
 *
 * The JSON is parsed with `FunnelUtil.gson` into a `java.util.Map[String, Object]`.
 * The value's `toString` is then parsed as a `Float`. Only non-fatal exceptions
 * are swallowed; fatal errors propagate.
 *
 * @param jsonInfo JSON object text to parse
 * @param keyName  key to look up
 * @return the numeric value for `keyName`. Returns `0.0f` when parsing fails,
 *         the key is absent, the value is null, or it is not numeric.
 */
class JsonInfoGetDouble extends UDF2[String, String, Float] {
  def call(jsonInfo: String, keyName: String): Float = {
    import scala.util.control.NonFatal
    try {
      val typeOfHashMap: Type = new TypeToken[java.util.Map[String, Object]]() {}.getType
      val map: java.util.Map[String, Object] = FunnelUtil.gson.fromJson(jsonInfo, typeOfHashMap)
      Option(map)
        .flatMap(m => Option(m.get(keyName)))
        .map(_.toString.toFloat)
        .getOrElse(0.0f)
    } catch {
      case NonFatal(_) => 0.0f
    }
  }
}


    def benchTest(name:String,sql:String): Unit ={
      val b2 = System.currentTimeMillis()
      spark.sql(sql).show(10,false)
      val e2 = System.currentTimeMillis()
      println(s"name $name ,  cost:  ${e2-b2}")
    }
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics