- 浏览: 121551 次
- 性别:
- 来自: 杭州
-
文章分类
最新评论
/**
 * Funnel-step calculator: given timestamped events and an ordered list of
 * funnel steps, returns which steps were reached within a sliding window.
 *
 * @param data events as (stepName, epochMillis); step names are assumed to
 *             appear in `rt` (an unknown name would throw NoSuchElementException
 *             — TODO confirm callers pre-filter)
 * @param rt   ordered funnel step names; assumed unique — TODO confirm
 * @param wd   window width in milliseconds measured from the step-0 event
 * @return boolean per step: true if that step was reached in order within `wd`
 */
def taskcal(data: Array[(String, Long)], rt: Array[String], wd: Int): Array[Boolean] = {
  val result = Array.fill[Boolean](rt.length)(false)
  val sortData = data.sortBy(_._2) // process events in time order
  val lastIndex = rt.length - 1
  // NOTE(review): 0 doubles as the "unset" sentinel, so a genuine epoch-0
  // timestamp would be treated as "step not reached" — confirm acceptable.
  var startTimeArray = Array.fill[Long](rt.length)(0L)
  // O(n) index lookup; reversed so the FIRST occurrence wins for duplicate
  // names, matching the original rt.indexOf semantics (which was O(n^2)).
  val indexMap = rt.zipWithIndex.reverse.toMap
  var notFull = true
  for (event <- sortData if notFull) {
    val idx = indexMap(event._1)
    if (idx == 0) {
      // First funnel step: (re)open a window at this event's time.
      startTimeArray(0) = event._2
      result(0) = true
    } else if (startTimeArray(idx - 1) != 0) { // previous step reached?
      if (event._2 - startTimeArray(idx - 1) < wd) {
        // Within the window: propagate the window start and mark the step.
        startTimeArray(idx) = startTimeArray(idx - 1)
        result(idx) = true
      } else {
        // Window expired: clear all step start times (results stay as-is).
        startTimeArray = Array.fill[Long](rt.length)(0L)
      }
    }
    // Whole funnel completed — stop scanning early.
    if (result(lastIndex)) notFull = false
  }
  result
}
def main(args: Array[String]): Unit = {
  // Sample funnel events as (stepName, epochMillis); C is deliberately the
  // earliest event so it is seen before step A opens a window.
  val events = Array(
    ("A", 1450000000000L),
    ("B", 1450000000001L),
    ("C", 1430000000002L),
    ("A", 1460000000001L)
  )
  val funnelSteps = Array("A", "B", "C")
  val windowMillis = 3600000 // one hour
  println(taskcal(events, funnelSteps, windowMillis).mkString(","))
  // bench("r", 100000, taskcal(events, funnelSteps, windowMillis))
}
/**
 * Crude micro-benchmark: runs the by-name thunk `f` exactly `count` times
 * and prints the total wall-clock duration in milliseconds.
 *
 * @param name  label for the printed report
 * @param count number of invocations of `f`
 * @param f     by-name action to time
 */
def bench(name: String, count: Int, f: => Unit): Unit = {
  val begin = System.currentTimeMillis()
  // Original used `0 to count`, which ran count + 1 times (off-by-one).
  var i = 0
  while (i < count) { f; i += 1 }
  val end = System.currentTimeMillis()
  // Original label printed "count:" twice; the second value is elapsed time.
  println(s"name : ${name} count: $count timeMs:${end - begin} ")
}
// NOTE(review): the enclosing `def` begins above this chunk — only the body
// and its closing brace are visible here, so the code is left untouched.
// Output directory for the partitioned parquet sink.
val storageDir = "UserBehaviorDStream"
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
// 10-second micro-batch interval.
val ssc = new StreamingContext(sparkConf, Seconds(10))
// Raw text lines from a local socket source.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
// UDF deriving the yyyyMMddHH partition value from eventTime —
// FunnelUtil.getDatePartiton is defined elsewhere; semantics not visible here.
val getPartitionDate = udf(FunnelUtil.getDatePartiton _)
lines.foreachRDD(rdd => {
// Split CSV lines, keep only well-formed 6-field records, build Rows.
// The println is a debug trace of every raw input line.
val userBehiviorData = rdd.map(x => {println("**:"+x);x.split(",")}).filter(_.length == 6)
.map(r => RowFactory.create(r(0), r(1), r(2), r(3), r(4), r(5)))
val userBehiviorDataDF = sqlContext.createDataFrame(userBehiviorData, getStructType)
// Add the hour partition column; coalesce(1) writes one file per batch.
val userBehiviorDataPartition = userBehiviorDataDF.withColumn("yyyyMMddHH", getPartitionDate(userBehiviorDataDF("eventTime"))).coalesce(1)
userBehiviorDataPartition.write.format("parquet").mode("append").partitionBy("yyyyMMddHH").save(storageDir)
})
ssc.start()
// Blocks until the streaming context is stopped.
ssc.awaitTermination()
}
/**
 * Schema for user-behavior CSV records: six nullable StringType columns
 * matching the 6-field split performed by the streaming ingest.
 */
val getStructType = {
  // Fixed column list — build the fields directly instead of accumulating
  // them in a mutable.ArrayBuffer.
  val columns = Array("userId", "eventTime", "itemId", "itemName", "eventAttribute", "eventDate")
  DataTypes.createStructType(columns.map(DataTypes.createStructField(_, DataTypes.StringType, true)))
}
// NOTE(review): this chunk duplicates the body of taskcal above; its `def`
// line is missing from the visible text, so the code is left byte-identical.
// One flag per funnel step; set true when the step is reached in order.
val result = Array.fill[Boolean](rt.length)(false)
// Process events in timestamp order.
val sortData = data.sortBy(_._2)
val indexArrayLength = rt.length - 1
// Window start time per step; 0 doubles as the "unset" sentinel, so an
// epoch-0 timestamp would be misread as unset — TODO confirm acceptable.
var startTimeArray = Array.fill[Long](rt.length)(0l)
// Step name -> position in the funnel (rt.indexOf makes this O(n^2)).
val indexMap = rt.map(item => item -> rt.indexOf(item)).toMap
var notFull = true
for(itemWithTimeKv <- sortData if notFull ){
val itemIndex = indexMap(itemWithTimeKv._1)
if(itemIndex == 0) { startTimeArray(0) = itemWithTimeKv._2 ; result(0) = true} // first item
else if(startTimeArray(itemIndex-1) !=0) { // pre item exists?
if( (itemWithTimeKv._2 - startTimeArray(itemIndex-1))< wd) { // in range
startTimeArray(itemIndex) = startTimeArray(itemIndex-1)
result(itemIndex) = true
}else // out range
startTimeArray = Array.fill[Long](rt.length)(0l)
}
// Last step reached: the funnel is complete, stop scanning early.
if(result(indexArrayLength) == true) notFull = false
}
result
}
def main(args: Array[String]): Unit = {
  // Fixture events (stepName -> epochMillis) for a three-step A→B→C funnel.
  val sampleEvents = Array(
    "A" -> 1450000000000L,
    "B" -> 1450000000001L,
    "C" -> 1430000000002L,
    "A" -> 1460000000001L
  )
  val steps = Array("A", "B", "C")
  val window = 3600000 // one hour, in milliseconds
  val reached = taskcal(sampleEvents, steps, window)
  println(reached.mkString(","))
  // bench("r", 100000, taskcal(sampleEvents, steps, window))
}
/**
 * Crude micro-benchmark: runs the by-name thunk `f` exactly `count` times
 * and prints the total wall-clock duration in milliseconds.
 *
 * @param name  label for the printed report
 * @param count number of invocations of `f`
 * @param f     by-name action to time
 */
def bench(name: String, count: Int, f: => Unit): Unit = {
  val begin = System.currentTimeMillis()
  // Original used `0 to count`, which ran count + 1 times (off-by-one).
  var i = 0
  while (i < count) { f; i += 1 }
  val end = System.currentTimeMillis()
  // Original label printed "count:" twice; the second value is elapsed time.
  println(s"name : ${name} count: $count timeMs:${end - begin} ")
}
// NOTE(review): duplicate of the streaming chunk above; the enclosing `def`
// begins outside the visible text, so the code is left untouched.
// Output directory for the partitioned parquet sink.
val storageDir = "UserBehaviorDStream"
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
// 10-second micro-batch interval.
val ssc = new StreamingContext(sparkConf, Seconds(10))
// Raw text lines from a local socket source.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
// UDF deriving the yyyyMMddHH partition value from eventTime —
// FunnelUtil.getDatePartiton is defined elsewhere; semantics not visible here.
val getPartitionDate = udf(FunnelUtil.getDatePartiton _)
lines.foreachRDD(rdd => {
// Split CSV lines, keep only well-formed 6-field records, build Rows.
// The println is a debug trace of every raw input line.
val userBehiviorData = rdd.map(x => {println("**:"+x);x.split(",")}).filter(_.length == 6)
.map(r => RowFactory.create(r(0), r(1), r(2), r(3), r(4), r(5)))
val userBehiviorDataDF = sqlContext.createDataFrame(userBehiviorData, getStructType)
// Add the hour partition column; coalesce(1) writes one file per batch.
val userBehiviorDataPartition = userBehiviorDataDF.withColumn("yyyyMMddHH", getPartitionDate(userBehiviorDataDF("eventTime"))).coalesce(1)
userBehiviorDataPartition.write.format("parquet").mode("append").partitionBy("yyyyMMddHH").save(storageDir)
})
ssc.start()
// Blocks until the streaming context is stopped.
ssc.awaitTermination()
}
/**
 * Schema for user-behavior CSV records: six nullable StringType columns
 * matching the 6-field split performed by the streaming ingest.
 */
val getStructType = {
  // Fixed column list — build the fields directly instead of accumulating
  // them in a mutable.ArrayBuffer.
  val columns = Array("userId", "eventTime", "itemId", "itemName", "eventAttribute", "eventDate")
  DataTypes.createStructType(columns.map(DataTypes.createStructField(_, DataTypes.StringType, true)))
}
发表评论
-
Spark SQL运行 过程 抄的别人的,记录 学习
2018-05-13 23:07 1016 次浏览 抄的别人的,觉得写的特别好 val FILESOURCE ... -
thriftserver log4j.properties 生效
2018-04-09 11:46 431/home/isuhadoop/spark2/sbin/sta ... -
udaf 返回的 子属性
2018-03-20 13:22 424 次浏览 udaf 返回的 子属性 spark.sql(" ... -
spark datasource
2018-03-16 16:36 648DataFrameWriter format val c ... -
如何 map 端 Join。
2018-03-04 19:31 605Hive 中 修改表的 rawDataSize = 1 1 ... -
spark thrift server 修改
2018-03-04 12:58 561org.apache.spark.sql.hive.thrif ... -
hive hbase thriftserver run
2018-03-03 15:13 385正确方法 : 0\ 拷贝对应目录到 spark2 jars ... -
scala package
2018-01-25 09:48 503#scala 打包 mvn clean scala:com ... -
SPARK SERVER
2018-01-23 22:15 507sbin/start-thriftserver.sh --dr ... -
driver class
2018-01-21 22:11 507sbin/start-thriftserver.sh -- ... -
spark thrift server 调试
2017-10-20 15:50 844spark-hive-thriftserver 本地调试 ... -
spark SQL conf
2017-10-18 14:36 598org.apache.spark.sql.internal.S ... -
java 死锁 ,内存问题 分析
2017-10-17 10:50 327jstack -l pid /opt/soft/jdk/ ... -
thriftServer proxy
2017-10-16 14:21 909sudo yum install haproxy 257 ... -
hive spark conf
2017-09-26 17:44 1283CREATE TABLE org_userbehavior_a ... -
get day
2017-09-19 08:41 311def timeDayNow() = { var ... -
thriftserver
2017-09-14 19:47 442export SPARK_CONF_DIR=/home/yun ... -
thriftserver dynamicallocation
2017-09-08 14:41 558./sbin/start-thriftserver.sh -- ... -
test code2
2017-09-03 13:45 468package org.test.udf import co ... -
struct streaming SQL udf udaf
2017-08-22 11:50 657spark aggregator class H ...
相关推荐
test code project for UI
pyopengl testcode demopyopengl testcode demopyopengl testcode demopyopengl testcode demopyopengl testcode demo
test code for video
testcode图书管理系统样例 testcode图书管理系统样例 testcode图书管理系统样例
11-2 testcode
php symphony test code
DNLINL代码可能有用maxim公司所用,正弦波测量高速模数转换器(ADC)的INLDNL。
CodedUI test code for excel
国际标准原文,国标依据此文件的第三版制订了《GBT 17421.2-2016 机床检验通则》。 这是第四版,高清英文版。 This part of ISO 230 specifies methods for testing and evaluating the accuracy and repeatability ...
主要展示了avrusb通信的主要几个函数的使用方法,这只是下位机程序即固件程序,代码中,有两种类的通信,一种是cdc类通信,另外是其他设备类
SAE J643_2023 Hydrodynamic Drive Test Code(STABILIZED Aug 2023).pdf
self test code for demo using
stock test code with zf
IEC 60704-2-18-2022 Household and similar electrical appliances - Test code for the determination of airborne acoustical noise -.pdf
达盛arm实验代码,初学者可以参考一下,基础
Test PHP Code
MyTestCode2021博客地址: :
读取文件的测试代码,year!my test code
mybatis批量更新,如果积分不够请留言,免费提供。https://javapub.blog.csdn.net/article/details/118033849
test account manage vc++ c++11