推广

第九篇|Spark的五种JOIN策略解析

iseeyu2年前 (2024-02-21)推广136

image

如上图所示:Shuffle Hash Join的基本步骤主要有以下两点:

  • 首先,对于两张参与JOIN的表,分别按照join key进行重分区,该过程会涉及Shuffle,其目的是将相同join key的数据发送到同一个分区,方便分区内进行join。
  • 其次,对于每个Shuffle之后的分区,会将小表的分区数据构建成一个Hash table,然后根据join key与大表的分区数据记录进行匹配。

条件与特点

  • 仅支持等值连接,join key不需要排序
  • 支持除了全外连接(full outer joins)之外的所有join类型
  • 需要对小表构建Hash map,属于内存密集型的操作,如果构建Hash表的一侧数据比较大,可能会造成OOM
  • 将参数spark.sql.join.prefersortmergeJoin (default true)置为false

Broadcast Hash Join

简介

也称之为Map端JOIN。当有一张表较小时,我们通常选择Broadcast Hash Join,这样可以避免Shuffle带来的开销,从而提高性能。比如事实表与维表进行JOIN时,由于维表的数据通常会很小,所以可以使用Broadcast Hash Join将维表进行Broadcast。这样可以避免数据的Shuffle(在Spark中Shuffle操作是很耗时的),从而提高JOIN的效率。在进行 Broadcast Join 之前,Spark 需要把处于 Executor 端的数据先发送到 Driver 端,然后 Driver 端再把数据广播到 Executor 端。如果我们需要广播的数据比较多,会造成 Driver 端出现 OOM。具体如下图示:

第九篇|Spark的五种JOIN策略解析

image

Broadcast Hash Join主要包括两个阶段:

  • Broadcast阶段 :小表被缓存在executor中
  • Hash Join阶段:在每个 executor中执行Hash Join

条件与特点

  • 仅支持等值连接,join key不需要排序
  • 支持除了全外连接(full outer joins)之外的所有join类型
  • Broadcast Hash Join相比其他的JOIN机制而言,效率更高。但是,Broadcast Hash Join属于网络密集型的操作(数据冗余传输),除此之外,需要在Driver端缓存数据,所以当小表的数据量较大时,会出现OOM的情况
  • 被广播的小表的数据量要小于spark.sql.autoBroadcastJoinThreshold值,默认是10MB(10485760)
  • 被广播表的大小阈值不能超过8GB,spark2.4源码如下:BroadcastExchangeExec.scala
longMetric("dataSize") += dataSize
          if (dataSize >= (8L << 30)) {
            throw new SparkException(
              s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
          }
  • 基表不能被broadcast,比如左连接时,只能将右表进行广播。形如:fact_table.join(broadcast(dimension_table),可以不使用broadcast提示,当满足条件时会自动转为该JOIN方式。

Sort Merge Join

简介

该JOIN机制是Spark默认的,可以通过参数spark.sql.join.preferSortMergeJoin进行配置,默认是true,即优先使用Sort Merge Join。一般在两张大表进行JOIN时,使用该方式。Sort Merge Join可以减少集群中的数据传输,该方式不会先加载所有数据的到内存,然后进行hashjoin,但是在JOIN之前需要对join key进行排序。具体图示:

image

Sort Merge Join主要包括三个阶段:

  • Shuffle Phase : 两张大表根据Join key进行Shuffle重分区
  • Sort Phase: 每个分区内的数据进行排序
  • Merge Phase: 对来自不同表的排序好的分区数据进行JOIN,通过遍历元素,连接具有相同Join key值的行来合并数据集

条件与特点

  • 仅支持等值连接
  • 支持所有join类型
  • Join Keys是排序的
  • 参数spark.sql.join.prefersortmergeJoin (默认true)设定为true

Cartesian Join

简介

如果 Spark 中两张参与 Join 的表没指定join key(ON 条件)那么会产生 Cartesian product join,这个 Join 得到的结果其实就是两张行数的乘积。

条件

  • 仅支持内连接
  • 支持等值和不等值连接
  • 开启参数spark.sql.crossJoin.enabled=true

Broadcast Nested Loop Join

简介

该方式是在没有合适的JOIN机制可供选择时,最终会选择该种join策略。优先级为:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.

在Cartesian 与Broadcast Nested Loop Join之间,如果是内连接,或者非等值连接,则优先选择Broadcast Nested Loop策略,当时非等值连接并且一张表可以被广播时,会选择Cartesian Join。

条件与特点

  • 支持等值和非等值连接
  • 支持所有的JOIN类型,主要优化点如下:
    • 当右外连接时要广播左表
    • 当左外连接时要广播右表
    • 当内连接时,要广播左右两张表

Spark是如何选择JOIN策略的

等值连接的情况

有join提示(hints)的情况,按照下面的顺序

  • 1.Broadcast Hint:如果join类型支持,则选择broadcast hash join
  • 2.Sort merge hint:如果join key是排序的,则选择 sort-merge join
  • 3.shuffle hash hint:如果join类型支持, 选择 shuffle hash join
  • 4.shuffle replicate NL hint: 如果是内连接,选择笛卡尔积方式

没有join提示(hints)的情况,则逐个对照下面的规则

  • 1.如果join类型支持,并且其中一张表能够被广播(spark.sql.autoBroadcastJoinThreshold值,默认是10MB),则选择 broadcast hash join
  • 2.如果参数spark.sql.join.preferSortMergeJoin设定为false,且一张表足够小(可以构建一个hash map) ,则选择shuffle hash join
  • 3.如果join keys 是排序的,则选择sort-merge join
  • 4.如果是内连接,选择 cartesian join
  • 5.如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcast nested loop join

非等值连接情况

有join提示(hints),按照下面的顺序

  • 1.broadcast hint:选择broadcast nested loop join.
  • 2.shuffle replicate NL hint: 如果是内连接,则选择cartesian product join

没有join提示(hints),则逐个对照下面的规则

  • 1.如果一张表足够小(可以被广播),则选择 broadcast nested loop join
  • 2.如果是内连接,则选择cartesian product join
  • 3.如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcast nested loop join

join策略选择的源码片段

  object JoinSelection extends Strategy
    with PredicateHelper
    with JoinSelectionHelper {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

      case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
        def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
          getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
            buildSide =>
              Seq(joins.BroadcastHashJoinExec(
                leftKeys,
                rightKeys,
                joinType,
                buildSide,
                nonEquiCond,
                planLater(left),
                planLater(right)))
          }
        }

        def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {
          getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
            buildSide =>
              Seq(joins.ShuffledHashJoinExec(
                leftKeys,
                rightKeys,
                joinType,
                buildSide,
                nonEquiCond,
                planLater(left),
                planLater(right)))
          }
        }

        def createSortMergeJoin() = {
          if (RowOrdering.isOrderable(leftKeys)) {
            Some(Seq(joins.SortMergeJoinExec(
              leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))
          } else {
            None
          }
        }

        def createCartesianProduct() = {
          if (joinType.isInstanceOf[InnerLike]) {
            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition)))
          } else {
            None
          }
        }

        def createJoinWithoutHint() = {
          createBroadcastHashJoin(false)
            .orElse {
              if (!conf.preferSortMergeJoin) {
                createShuffleHashJoin(false)
              } else {
                None
              }
            }
            .orElse(createSortMergeJoin())
            .orElse(createCartesianProduct())
            .getOrElse {
              val buildSide = getSmallerSide(left, right)
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
            }
        }

        createBroadcastHashJoin(true)
          .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
          .orElse(createShuffleHashJoin(true))
          .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
          .getOrElse(createJoinWithoutHint())

    
          if (canBuildLeft(joinType)) BuildLeft else BuildRight
        }

        def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {
          val maybeBuildSide = if (buildLeft && buildRight) {
            Some(desiredBuildSide)
          } else if (buildLeft) {
            Some(BuildLeft)
          } else if (buildRight) {
            Some(BuildRight)
          } else {
            None
          }

          maybeBuildSide.map { buildSide =>
            Seq(joins.BroadcastNestedLoopJoinExec(
              planLater(left), planLater(right), buildSide, joinType, condition))
          }
        }

        def createCartesianProduct() = {
          if (joinType.isInstanceOf[InnerLike]) {
            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
          } else {
            None
          }
        }

        def createJoinWithoutHint() = {
          createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))
            .orElse(createCartesianProduct())
            .getOrElse {
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), desiredBuildSide, joinType, condition))
            }
        }

        createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
          .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
          .getOrElse(createJoinWithoutHint())
      case _ => Nil
    }
  }

总结

本文主要介绍了Spark提供的5种JOIN策略,并对三种比较重要的JOIN策略进行了图示解析。首先对影响JOIN的因素进行了梳理,然后介绍了5种Spark的JOIN策略,并对每种JOIN策略的具体含义和触发条件进行了阐述,最后给出了JOIN策略选择对应的源码片段。希望本文能够对你有所帮助。

『大数据技术与数仓』

扫描二维码推送至手机访问。

版权声明:本文由西安泽虎代运营发布,如需转载请注明出处。

转载请注明出处https://0291.com.cn/post/57496.html

相关文章

#管理

#管理

我认为领导者的真正的艺术,最大的点应该是如何去识人,如何去用人,如何去选人。你去看一看二月河的雍正王朝,这当中就充满了智慧。你会发现在争夺太子的时候,四爷胤禛和八爷胤,其实打的是一塌糊涂水深火热。八爷党支持的人非常多,包括当时的中堂佟国维、马齐,各省的都府皇子当中包括有九爷...

5月打新赚钱势头归来?部分新股报价谨慎,询价对象数量缩减,网下中签率持续上升,打新生态正改变

5月打新赚钱势头归来?部分新股报价谨慎,询价对象数量缩减,网下中签率持续上升,打新生态正改变

财联社5月25日讯(记者 高艳云)北交所“转板第一股”登陆科创板上市首日破发,但并未打破5月新股打新势头。5月25日,观典防务科创板上市首日收跌23.63%,总市值日内缩水12.30亿元,若以原股东无交易的假设情形计算,原1.76万户股东户均浮亏近7万元。剔除转板而无打新情...

做运营总是完不成任务,来试试这招!

做运营总是完不成任务,来试试这招!

为什么用尽浑身解数,用户活跃度就是不够?为什么拼命进行宣传,用户数量还是那么少?老板总说:引导用户去使用APP,达不成目标,扣你这个月奖金!啊呸,你说引导就引导呀,APP做的那么垃圾,用户会用才怪呢! 相信很多运营人都被以上问题困扰过,用户活跃度太低,用户数量太少,尤其是引导站...

从“人-货-场”逻辑:看品牌—商家如何做好“抖

从“人-货-场”逻辑:看品牌—商家如何做好“抖

本文4395字,预计阅读15分钟 自2020年10月正式封杀第三方外链至今,抖音电商可谓 动作频频,其中较为重要的两条分别指向电商的另 外两个重要组成部分:带货端(核心指:主播/达人)和流量端(核心指:广告) 。 01 针对带货端:上线保证金政策...

小编分享企业建站能带来哪些竞争优势。

小编分享企业建站能带来哪些竞争优势。

现在很多人已经把企业转为线上,但是,你必须了解建设一个网站会给你带来什么?首先它的价格更低:网站建设公司自行开发系统,一般的开发成本都会在千元以上,就算网站建设公司使用自己已经开发的系统建站,客户的一些个性化需要也需要二次开发,而使用企业网站管理系统能满足绝大多数企业网站的功能需求,且成本非常低,甚...

海尔市场营销环境分析

优质文档宏观环境分析 1、人口环境 我国人口众多,分布不均,各地收入水平差距也很大,导致家电的密集度和发展程度参差不齐,从一定程度上也不利于家电行业的均衡持久发展. 2、经济环境 随着中国加入WTO,为本国家电企业开拓国际市场提供了广阔天地.加入WTO意味着大家可以在一个...

现在,非常期待与您的又一次邂逅

我们努力让每一部企业宣传片和抖音短视频成为商业大片