`
qindongliang1922
  • 浏览: 2146924 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:116311
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:124580
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:58448
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:70345
社区版块
存档分类
最新评论

使用Spark SQL的临时表解决一个小问题

阅读更多


最近在使用spark处理一个业务场景时,遇到一个小问题,我在scala代码里,使用spark sql访问hive的表,然后根据一批id把需要的数据过滤出来,本来是非常简单的需求直接使用下面的伪SQL即可:
````
select * from table where  id in (id1,id2,id3,id4,idn)
````



但现在遇到的问题是id条件比较多,大概有几万个,这样量级的in是肯定会出错的,看网上文章hive的in查询超过3000个就报错了。

如何解决?

主要有两种解决方法:


(一)分批执行,就是把几万个id,按3000一组查询一次,最后把所有的查询结果在汇合起来。


(二)使用join,把几万个id创建成一张hive表,然后两表关联,可以一次性把结果给获取到。




这里倾向于第二种解决办法,比较灵活和方便扩展,尽量不要把数据集分散,一旦分散意味着客户端需要做更多的工作来合并结果集,比如随便一个sum或者dinstict,如果是第一种则需要在最终的结果集再次sum或者distinct。


下面看看如何使用第二种解决:


由于我们id列表是动态的,每个任务的id列表都有可能变换,所以要满足第二种方法,就得把他们变成一张临时表存储在内存中,当spark任务停止时,就自动销毁,因为他们不需要持久化到硬盘上。

在spark中使用临时表是非常简单的,我们只需要把id列表的数据放入rdd中,然后再把rdd注册成一个张表,就可以和hive库里面已有的表做各种join操作了,一个demo代码如下:
````
import org.apache.spark.sql.SparkSession


object SparkSQLJoinDemo {

  def main(args: Array[String]): Unit = {

    val spark=SparkSession//
      .builder()
      .appName(" spark sql demo ")
      .enableHiveSupport().getOrCreate()

    import spark.implicits._
    import spark.sql

    sql(" use  hivedb ")//指定hive的db库

    val ids="1,2,3,4,5"//模拟的id列表

    val data=ids.split(",").toSeq//转化成Seq结构

    val school_table=spark.sparkContext.makeRDD(data).toDF("id")//指定列名

    school_table.createOrReplaceTempView("temp_table")//在spark的内存里面创建一张临时表

    //这里假设hive_table是存在hive里面的一张表
    val xr=sql("select * from hive_table where hive_id=id")

    println("数据量:"+xr.count()) //打印数据量

    spark.close()//close

    
  }

}

````
上面代码里的ids,就是我们需要转化成内存表的数据,然后需要转成Seq,并生成RDD,再通过RDD转成DataFrame,注意如果要使用DF,需要导入 import spark.implicits._包下面的函数,这样就能隐式的直接转成DF,在转成DF的同时,我们给数据指定了列名叫id,这里如果有多列,后面可以继续逗号分隔,添加多个列名,最终我们给它注册成了内存临时表,然后在下面的语句中就可以直接使用hive里面存在的表与内存表进行join,最终我们打印一下成功join后数量,可以验证下程序是否正常运行。

有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。

0
0
分享到:
评论

相关推荐

    Spark SQL常见4种数据源详解

    一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的...

    SparkStreaming流式日志过滤与分析

    记得自己要引入环境 (1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间...(4)DataFrame注册为临时表,使用SQL过滤 (5)将过滤后的数据保存到MySQL

    SparkSQL通过Hive创建DataFrame

    SparkSQL通过Hive创建DataFrame问题分析 ...分析:确实没有临时表View,并且没有开启Hive支持 解决:开启Hive支持 val spark: SparkSession = SparkSession.builder() .appName(SparkUtils) .master(lo

    city_info.sql

    使用Spark SQL从MySQL中查询出来城市信息(city_id、city_name、area),用户访问行为数据要跟城市信息进行join,city_id、city_name、area、product_id,RDD,转换成DataFrame,注册成一个临时表

    03开源NewSql数据库TiDB-Deep Dive into TiDB

    在这一版本中,SQL 执行引擎引入新的内部数据表示方式 --- `Chunk`,一个结构中保存一批数据而不仅是一行数据,同一列的数据在内存中连续存放,使得内存使用更紧凑,这样带来了几点好处:1. 显著减小了内存消耗; 2....

    技巧篇:pyspark常用操作梳理

    创建临时表 创建临时视图 基于dataframe进行操作 了解表结构 查看数据 查看列名 持久化 列操作 列名称重命名 条件筛选 利用when做条件判断 利用between做多条件判断 in数据判断 数据去重 分组统计 ...

    PRoST:Apache Spark之上的RDF存储和SPARQL处理

    对于其他临时解决方案,还可以使用常见的Hadoop技术(例如Spark SQL,Hive或Impala)查询图形。刊物Cossu,Matteo,MichaelFärber和Georg Lausen。 “ PRoST:使用混合分区策略的SPARQL查询的分布式执行”。 ...

    kyuubi:Kyuubi是基于Apache Spark构建的用于大规模数据处理和分析的分布式多租户JDBC服务器

    Kyuubi依靠Apache Spark提供高性能的数据查询功能,并且引擎功能的每一项改进都可以帮助Kyuubi的性能取得质的飞跃。 此外,Kyuubi通过引擎缓存提高了临时响应速度,并通过水平缩放和负载平衡增强了并发性。 它提供...

    data-analysis:数据分析项目

    数据分析 数据科学项目 Yelp SQL分析(SQL) 我的目标是使用SQL查询对Yelp数据集执行多种分析。 对于该项目,所有查询均在SQLite而非MySQL上执行。... 然后,我最终使用了给我的UDF创建了一个临时视图,其中的另一列包

    大数据与人工智能-fy.docx

    由于Redis本质上是一个内存数据库,所以内存硬件的容量大小直接决定了Redis可用的数据库空间 D.比较适合存储视频文件(正确答案) 大数据与人工智能-fy全文共22页,当前为第2页。9. 关于Redis说法不正确的是 [单选题] ...

Global site tag (gtag.js) - Google Analytics