Spark SQL Source Code Analysis

Spark SQL is the unsung hero that gives Spark applications their efficiency, fault tolerance, and rich ecosystem. Before studying how Spark SQL parses and plans queries under the hood, look at the architecture diagrams below; they show the execution flow and the components involved, and they complement each other.
(Architecture diagram 1)


(Architecture diagram 2)

Execution Flow


First, the overall flow. When a SQL statement or a DataFrame operation is executed in Spark, the work on the cluster goes through two major phases:

  1. LogicalPlan (logical plan): think of it as a tree data structure, the logical operator tree
  2. PhysicalPlan (physical plan): the physical operator tree


Each of these two phases in turn contains several sub-stages.

Logical plan (LogicalPlan):

  1. Unresolved LogicalPlan: merely a tree data structure at this point, carrying no data or schema information
  2. Analyzed LogicalPlan: each node is bound to its metadata; as the diagram shows, the catalog participates in this step
  3. Optimized LogicalPlan: optimization rules are applied to rewrite inefficient parts of the logical plan


Physical plan (PhysicalPlan):

  1. From the logical operator tree, generate a list of physical operator trees (Iterator[PhysicalPlan]); a single logical operator tree may correspond to several physical operator trees (visible in the diagram above as the multiple green pentagons in the PhysicalPlan stage)
  2. Select the best physical operator tree (SparkPlan) from the list according to a given strategy
  3. Prepare the selected physical operator tree for submission, for example ensuring partitioning requirements are met, reusing physical operator nodes, and performing code generation; the result is the Prepared SparkPlan

Finally, the physical operator tree produces an RDD, and the job is submitted for execution when an action is triggered.
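The whole pipeline can be observed directly from code. The sketch below assumes a table or temporary view named t_user already exists; explain(true) prints the plan at each of the four stages described above:

// A minimal sketch, assuming t_user is registered as a table or temporary view
val df = spark.sql("select name from t_user where age > 18")

// With extended = true, Spark prints, in order:
// == Parsed Logical Plan ==      the unresolved logical operator tree
// == Analyzed Logical Plan ==    the tree after binding catalog information
// == Optimized Logical Plan ==   the tree after rule-based optimization
// == Physical Plan ==            the selected and prepared physical operator tree
df.explain(true)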

Flow Description

(flow diagram)


The logical execution plan is a series of abstract transformations. It does not involve executors or drivers; it only turns the user's code and expressions into an optimized version.

The user's code is first converted into an unresolved logical plan. It is called unresolved because it is not necessarily valid: the table and column names it references may or may not exist.

Spark then uses the catalog, a repository of metadata about all tables and DataFrames, inside the analyzer to resolve and validate the referenced table and column names.

If the unresolved logical plan passes this validation, the result is called a resolved logical plan. It is then handed to the Catalyst Optimizer, which applies a series of optimizations to produce the optimized logical plan.
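Conversely, if a referenced table does not exist, analysis fails. A minimal sketch (not_exists_table is a hypothetical, unregistered table name):

import org.apache.spark.sql.AnalysisException

try {
  // As shown later, ofRows() inside spark.sql() calls assertAnalyzed() right away,
  // so the resolution check happens here, not at action time.
  spark.sql("select * from not_exists_table")
} catch {
  case e: AnalysisException =>
    println(e.getMessage) // something like "Table or view not found: not_exists_table"
}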

AST -> Unresolved Logical Plan


In essence: the SQL statement is first parsed by the Parser module into a syntax tree, and this tree is the Unresolved Logical Plan.


At the front of the architecture diagram, the input is either a SQL statement submitted to Spark or a DataFrame operation; either way, the next step is logical plan generation:

  1. With spark.sql(), the SQL text goes through SQL parsing, and the parse result is the logical plan
  2. With the DataFrame API, the API calls themselves build the logical plan directly (see the sketch after this list)
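A minimal sketch comparing the two entry points, again assuming t_user is registered:

// Both entry points produce a logical plan and then go through the same Catalyst pipeline
val bySql = spark.sql("select name from t_user where age > 18")
val byApi = spark.table("t_user").where("age > 18").select("name")

// After analysis and optimization the two plans are essentially equivalent
println(bySql.queryExecution.optimizedPlan)
println(byApi.queryExecution.optimizedPlan)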


Here we take spark.sql() as the entry point and walk through the source code step by step to see how the underlying logic is implemented.

spark.sql("select * from t_user")

The sql() entry point
⬇️

/**
 * Executes a SQL query using Spark, returning the result as a `DataFrame`.
 * The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
 *
 * @since 2.0.0
 */
def sql(sqlText: String): DataFrame = {
  Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}

⬇️

trait ParserInterface {
  /**
   * Parse a string to a [[LogicalPlan]].
   */
  @throws[ParseException]("Text cannot be parsed to a LogicalPlan")
  def parsePlan(sqlText: String): LogicalPlan

  .......
}

Here parsePlan returns a LogicalPlan: this is where a SQL string is parsed, via the AST, into an unresolved LogicalPlan.
⬇️

/** Creates LogicalPlan for a given SQL string. */
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
  astBuilder.visitSingleStatement(parser.singleStatement()) match {
    case plan: LogicalPlan => plan
    case _ =>
      val position = Origin(None, None)
      throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
  }
}

ANTLR4 is a parser generator written in Java; a language's grammar is declared in a grammar file. Recognition happens in two stages:

  • Lexical analysis: the character stream is split into tokens, grouped into token classes
  • Parsing: the token stream is turned into a parse tree according to the grammar


The main job of astBuilder is to convert the ANTLR4 parse tree into Catalyst expressions and plan nodes.
astBuilder is an instance of SparkSqlAstBuilder; it is the bridge from the ANTLR parse tree to the unresolved logical plan. parser.singleStatement() builds the parse tree for the whole statement, and astBuilder.visitSingleStatement then walks it with the visitor pattern, starting from the entry rule of SqlBase.g4 (the Spark SQL grammar file).


It recursively visits the statement node and everything below it; whenever a node is matched, the corresponding TreeNode of the Logical Plan is constructed.
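The following toy Scala sketch (not the real Spark or ANTLR API) illustrates the idea of a visitor walking a parse tree and building logical-plan TreeNodes:

// Toy sketch (not the real Spark API): a visitor turns "parse tree" nodes into "logical plan" nodes
sealed trait ParseNode
case class SelectStmt(table: String, columns: Seq[String]) extends ParseNode

sealed trait LogicalNode
case class UnresolvedRelation(table: String) extends LogicalNode
case class Project(columns: Seq[String], child: LogicalNode) extends LogicalNode

object ToyAstBuilder {
  // Analogous to astBuilder.visitSingleStatement: visit the parse tree recursively and
  // construct the corresponding logical plan node for each matched rule
  def visit(node: ParseNode): LogicalNode = node match {
    case SelectStmt(table, cols) => Project(cols, UnresolvedRelation(table))
  }
}

// ToyAstBuilder.visit(SelectStmt("t_user", Seq("*")))
// => Project(List(*), UnresolvedRelation(t_user))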


⬇️

protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
  logDebug(s"Parsing command: $command")

  val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
  lexer.removeErrorListeners()
  lexer.addErrorListener(ParseErrorListener)

  val tokenStream = new CommonTokenStream(lexer)
  val parser = new SqlBaseParser(tokenStream)
  parser.addParseListener(PostProcessor)
  parser.removeErrorListeners()
  parser.addErrorListener(ParseErrorListener)

  try {
    try {
      // first, try parsing with potentially faster SLL mode
      parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
      toResult(parser)
    }
    catch {
      case e: ParseCancellationException =>
        // if we fail, parse with LL mode
        tokenStream.seek(0) // rewind input stream
        parser.reset()

        // Try Again.
        parser.getInterpreter.setPredictionMode(PredictionMode.LL)
        toResult(parser)
    }
  }
  catch {
    case e: ParseException if e.command.isDefined =>
      throw e
    case e: ParseException =>
      throw e.withCommand(command)
    case e: AnalysisException =>
      val position = Origin(e.line, e.startPosition)
      throw new ParseException(Option(command), e.message, position, position)
  }
}

This method instantiates the lexer (SqlBaseLexer) and the parser (SqlBaseParser), then tries to parse first in the potentially faster SLL prediction mode and, if that fails, rewinds the token stream and retries in LL mode. Finally the ANTLR parser is passed to the toResult callback, i.e. the matching logic inside parsePlan shown above.


After the steps above we have an Unresolved LogicalPlan. It is still only a tree structure: no data is bound to its nodes and it carries no schema information. The next question is how each node of this Unresolved LogicalPlan gets its metadata bound.
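Before moving on, it helps to compare the trees before and after analysis. A sketch, assuming t_user is registered:

val qe = spark.sql("select * from t_user").queryExecution

// Before analysis: the table appears as 'UnresolvedRelation and the columns are not bound
println(qe.logical)

// After analysis: with the catalog's help, the relation, column names and types are all bound
println(qe.analyzed)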

Unresolved Logical Plan -> Logical Plan


In essence: the Unresolved Logical Plan is resolved into a Logical Plan by the Analyzer module, with the help of the table information in the Catalog.


As the flow diagram shows, the Analysis phase's main responsibility is to resolve the Logical Plan that the SQL parser could not resolve.
We start from here:

/**
 * Executes a SQL query using Spark, returning the result as a `DataFrame`.
 * The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
 *
 * @since 2.0.0
 */
def sql(sqlText: String): DataFrame = {
  Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}

As seen in the previous stage, sessionState.sqlParser.parsePlan(sqlText) is where logical plan generation begins; the resulting Unresolved Logical Plan is then passed as an argument into ofRows().

package org.apache.spark.sql

private[sql] object Dataset {
  def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T] = {
    val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
    // Eagerly bind the encoder so we verify that the encoder matches the underlying
    // schema. The user will get an error if this is not the case.
    dataset.deserializer
    dataset
  }

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }
}

ofRows() first creates a QueryExecution object. QueryExecution defines the key steps of SQL execution and is the central class of the whole process; ofRows() then wraps it in a DataFrame. The members of QueryExecution are lazy, so they are evaluated only when accessed: the executedPlan member, for example, is only evaluated once an action shows up in the program.
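A minimal, Spark-independent sketch of how lazy val evaluation behaves:

// A lazy val is evaluated on first access and the result is reused afterwards
object LazyDemo extends App {
  lazy val plan: String = {
    println("computing plan ...")    // printed only once
    "analyzed plan"
  }
  println("QueryExecution created")  // plan has not been computed yet
  println(plan)                      // first access triggers the computation
  println(plan)                      // reuses the cached result
}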

def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan)

Creating the QueryExecution

class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {

  // TODO: Move the planner an optimizer into here from SessionState.
  protected def planner = sparkSession.sessionState.planner

  def assertAnalyzed(): Unit = analyzed

  def assertSupported(): Unit = {
    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
      UnsupportedOperationChecker.checkForBatch(analyzed)
    }
  }

  lazy val analyzed: LogicalPlan = {
    SparkSession.setActiveSession(sparkSession)
    sparkSession.sessionState.analyzer.executeAndCheck(logical)
  }
  .......

QueryExecution is the primary workflow Spark uses to execute relational queries, designed to give developers convenient access to the intermediate phases of query execution. Its most important parts are its members, almost all of which are lazy, and most of its methods exist to be called by those lazy members. The analyzed member invokes the analyzer, which binds metadata to the Unresolved LogicalPlan and produces the Resolved (Analyzed) LogicalPlan.

package org.apache.spark.sql.catalyst.analysis

/**
 * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
 * [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
 */
class Analyzer(
    catalog: SessionCatalog,
    conf: SQLConf,
    maxIterations: Int)
  extends RuleExecutor[LogicalPlan] with CheckAnalysis {

  def this(catalog: SessionCatalog, conf: SQLConf) = {
    this(catalog, conf, conf.optimizerMaxIterations)
  }

  def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
    val analyzed = execute(plan)
    try {
      checkAnalysis(analyzed)
      EliminateBarriers(analyzed)
    } catch {
      case e: AnalysisException =>
        val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
        ae.setStackTrace(e.getStackTrace)
        throw ae
    }
  }

  override def execute(plan: LogicalPlan): LogicalPlan = {
    AnalysisContext.reset()
    try {
      executeSameContext(plan)
    } finally {
      AnalysisContext.reset()
    }
  }

  private def executeSameContext(plan: LogicalPlan): LogicalPlan = super.execute(plan)

  def resolver: Resolver = conf.resolver

  protected val fixedPoint = FixedPoint(maxIterations)

  /**
   * Override to provide additional rules for the "Resolution" batch.
   */
  val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil

  /**
   * Override to provide rules to do post-hoc resolution. Note that these rules will be executed
   * in an individual batch. This batch is to run right after the normal resolution batch and
   * execute its rules in one pass.
   */
  val postHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil

  lazy val batches: Seq[Batch] = Seq(
    Batch("Hints", fixedPoint,
      new ResolveHints.ResolveBroadcastHints(conf),
      ResolveHints.RemoveAllHints),
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveAggAliasInGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveSubqueryColumnAliases ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables(conf) ::
      ResolveTimeZone(conf) ::
      ResolvedUuidExpressions ::
      TypeCoercion.typeCoercionRules(conf) ++
      extendedResolutionRules : _*),
    Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
    Batch("View", Once,
      AliasViewChild(conf)),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
    Batch("FixNullability", Once,
      FixNullability),
    Batch("Subquery", Once,
      UpdateOuterReferences),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )

Here, lazy val analyzed calls sparkSession.sessionState.analyzer.executeAndCheck(logical), which in turn calls execute: logical is the unresolved logical plan produced by the SQL parser, and analyzed is the analyzed logical plan. So what exactly does execute do?


The super.execute(plan) method:

package org.apache.spark.sql.catalyst.rules

object RuleExecutor {
  protected val queryExecutionMeter = QueryExecutionMetering()

  /** Dump statistics about time spent running specific rules. */
  def dumpTimeSpent(): String = {
    queryExecutionMeter.dumpTimeSpent()
  }

  /** Resets statistics about time spent running specific rules */
  def resetMetrics(): Unit = {
    queryExecutionMeter.resetMetrics()
  }
}

abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {

  /**
   * An execution strategy for rules that indicates the maximum number of executions. If the
   * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
   */
  abstract class Strategy { def maxIterations: Int }

  /** A strategy that only runs once. */
  case object Once extends Strategy { val maxIterations = 1 }

  /** A strategy that runs until fix point or maxIterations times, whichever comes first. */
  case class FixedPoint(maxIterations: Int) extends Strategy

  /** A batch of rules. */
  protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

  /** Defines a sequence of rule batches, to be overridden by the implementation. */
  protected def batches: Seq[Batch]

  /**
   * Defines a check function that checks for structural integrity of the plan after the execution
   * of each rule. For example, we can check whether a plan is still resolved after each rule in
   * `Optimizer`, so we can catch rules that return invalid plans. The check function returns
   * `false` if the given plan doesn't pass the structural integrity check.
   */
  protected def isPlanIntegral(plan: TreeType): Boolean = true

  /**
   * Executes the batches of rules defined by the subclass. The batches are executed serially
   * using the defined execution strategy. Within each batch, rules are also executed serially.
   */
  def execute(plan: TreeType): TreeType = {
    var curPlan = plan
    val queryExecutionMetrics = RuleExecutor.queryExecutionMeter

    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true

      // Run until fix point (or the max number of iterations as specified in the strategy.
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan)
            val runTime = System.nanoTime() - startTime

            if (!result.fastEquals(plan)) {
              queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
              queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
              logTrace(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }
            queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
            queryExecutionMetrics.incNumExecution(rule.ruleName)

            // Run the structural integrity checker against the plan after each rule.
            if (!isPlanIntegral(result)) {
              val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
                "the structural integrity of the plan is broken."
              throw new TreeNodeException(result, message, null)
            }

            result
        }
        iteration += 1
        if (iteration > batch.strategy.maxIterations) {
          // Only log if this is a rule that is supposed to run more than once.
          if (iteration != 2) {
            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
            if (Utils.isTesting) {
              throw new TreeNodeException(curPlan, message, null)
            } else {
              logWarning(message)
            }
          }
          continue = false
        }

        if (curPlan.fastEquals(lastPlan)) {
          logTrace(
            s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
          continue = false
        }
        lastPlan = curPlan
      }

      if (!batchStartPlan.fastEquals(curPlan)) {
        logDebug(
          s"""
            |=== Result of Batch ${batch.name} ===
            |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
          """.stripMargin)
      } else {
        logTrace(s"Batch ${batch.name} has no effect.")
      }
    }

    curPlan
  }
}



This method takes each batch defined in the Analyzer and applies the batch's rules according to its strategy (Once or FixedPoint), resolving the unresolved logical plan step by step. The batches are the ones defined above in class Analyzer as lazy val batches: Seq[Batch], i.e. a set of grouped transformation rules.

The rules used by the Analyzer are executed through RuleExecutor. batches consists of several batches, and each batch is made up of rules; the Resolution batch, for example, contains ResolveRelations, ResolveReferences, ResolveFunctions and more, each rule with its own handling logic (see the corresponding objects in Analyzer). Also note that different batches apply their rules a different number of times: a batch such as Simple Sanity Check runs only once (Once), while the Resolution batch runs to a fixed point (FixedPoint(maxIterations), 100 by default), i.e. at most 100 iterations unless two consecutive iterations produce identical plans, in which case it stops early.

/**
 * Input: the old plan; output: the new plan. That is all a Rule does.
 * The real logic lives in the concrete rules that extend this class; the analyze phase is
 * simply the process of running each of those rules.
 */
abstract class Rule[TreeType <: TreeNode[_]] extends Logging {

  /** Name for this rule, automatically inferred based on class name. */
  val ruleName: String = {
    val className = getClass.getName
    if (className endsWith "$") className.dropRight(1) else className
  }

  def apply(plan: TreeType): TreeType
}

rule(plan) here calls rule.apply(plan), which transforms the TreeNode tree that plan refers to.
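As an illustration, here is a minimal custom rule, a hypothetical example sketched against the 2.x Catalyst API discussed here, that rewrites the expression a + 0 into a; it only demonstrates the Rule + transform mechanics, not an actual Spark rule:

import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical example: walk every expression in the plan and simplify a + 0 to a
object SimplifyAddZero extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Add(left, Literal(0, _)) => left
  }
}

// SimplifyAddZero(somePlan) returns a new, transformed plan; Catalyst trees are immutable,
// so the original plan is left untouched.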


After analysis, the columns of every table, their types, and where the data lives are all known, and the column types and positions used by the Project and Filter operators are determined. With this information, the LogicalPlan could already be converted into a Physical Plan and executed.


However, the quality of the SQL submitted by different users varies. Executing it directly would mean that semantically identical queries written differently could differ enormously in execution efficiency; to get good performance, users would have to do a lot of manual SQL tuning, which badly hurts the experience.


To make Spark SQL run as efficiently as possible regardless of how familiar the user is with SQL tuning or how good the submitted SQL is, the LogicalPlan is optimized before execution.

Logical Plan -> Optimized Logical Plan

Purpose: the Optimizer applies a series of rule-based optimization strategies to the Logical Plan, producing the Optimized Logical Plan. The optimized execution plan is still logical; it cannot be executed by Spark directly.

The Optimizer's main job is to take the Logical Plan that the Analyzer resolved and optimize it according to different optimization batches, rewriting both plan nodes (LogicalPlan) and expressions (Expression); it is the step right before conversion into a physical plan. It works the same way as the Analyzer: the processing is done by the Rule[LogicalPlan]s inside its batches.

package org.apache.spark.sql.execution

/**
 * The primary workflow for executing relational queries using Spark. Designed to allow easy
 * access to the intermediate phases of query execution for developers.
 *
 * While this is not a public class, we should avoid changing the function names for the sake of
 * changing them, because a lot of developers use the feature for debugging.
 */
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {

  // TODO: Move the planner an optimizer into here from SessionState.
  protected def planner = sparkSession.sessionState.planner

  def assertAnalyzed(): Unit = analyzed

  def assertSupported(): Unit = {
    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
      UnsupportedOperationChecker.checkForBatch(analyzed)
    }
  }

  lazy val analyzed: LogicalPlan = {
    SparkSession.setActiveSession(sparkSession)
    sparkSession.sessionState.analyzer.executeAndCheck(logical)
  }

  // If cached results exist for the query, substitute them in directly; the logic is simple
  // and not expanded here.
  lazy val withCachedData: LogicalPlan = {
    assertAnalyzed()
    assertSupported()
    sparkSession.sharedState.cacheManager.useCachedData(analyzed)
  }

  // Optimize the Logical Plan
  lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)

  lazy val sparkPlan: SparkPlan = {
    SparkSession.setActiveSession(sparkSession)
    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
    // but we will implement to choose the best plan.
    planner.plan(ReturnAnswer(optimizedPlan)).next()
  }
  .......

Note that Optimizer also extends RuleExecutor and follows the same pattern as the Analyzer: traverse the tree and apply rules to each node.


The batches in Optimizer fall broadly into three kinds of strategies: operator push-down (e.g. PushDownPredicate, LimitPushDown), operator combining (e.g. CombineFilters, CombineLimits), and constant folding with strength reduction (e.g. ConstantFolding, BooleanSimplification). The rule objects each Batch uses are all defined in the Optimizer:

package org.apache.spark.sql.catalyst.optimizer

/**
 * Abstract class all optimizers should inherit of, contains the standard batches (extending
 * Optimizers can override this.
 */
abstract class Optimizer(sessionCatalog: SessionCatalog)
  extends RuleExecutor[LogicalPlan] {

  // Check for structural integrity of the plan in test mode. Currently we only check if a plan is
  // still resolved after the execution of each rule.
  override protected def isPlanIntegral(plan: LogicalPlan): Boolean = {
    !Utils.isTesting || plan.resolved
  }

  protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)

  def batches: Seq[Batch] = {
    val operatorOptimizationRuleSet =
      Seq(
        // Operator push down
        PushProjectionThroughUnion,
        ReorderJoin,
        EliminateOuterJoin,
        PushPredicateThroughJoin,
        PushDownPredicate,
        LimitPushDown,
        ColumnPruning,
        InferFiltersFromConstraints,
        // Operator combine
        CollapseRepartition,
        CollapseProject,
        CollapseWindow,
        CombineFilters,
        CombineLimits,
        CombineUnions,
        // Constant folding and strength reduction
        NullPropagation,
        ConstantPropagation,
        FoldablePropagation,
        OptimizeIn,
        ConstantFolding,
        ReorderAssociativeOperator,
        LikeSimplification,
        BooleanSimplification,
        SimplifyConditionals,
        RemoveDispensableExpressions,
        SimplifyBinaryComparison,
        PruneFilters,
        EliminateSorts,
        SimplifyCasts,
        SimplifyCaseConversionExpressions,
        RewriteCorrelatedScalarSubquery,
        EliminateSerialization,
        RemoveRedundantAliases,
        RemoveRedundantProject,
        SimplifyCreateStructOps,
        SimplifyCreateArrayOps,
        SimplifyCreateMapOps,
        CombineConcats) ++
        extendedOperatorOptimizationRules

    val operatorOptimizationBatch: Seq[Batch] = {
      val rulesWithoutInferFiltersFromConstraints =
        operatorOptimizationRuleSet.filterNot(_ == InferFiltersFromConstraints)
      Batch("Operator Optimization before Inferring Filters", fixedPoint,
        rulesWithoutInferFiltersFromConstraints: _*) ::
      Batch("Infer Filters", Once,
        InferFiltersFromConstraints) ::
      Batch("Operator Optimization after Inferring Filters", fixedPoint,
        rulesWithoutInferFiltersFromConstraints: _*) :: Nil
    }

    (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
    // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
    // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
    // However, because we also use the analyzer to canonicalized queries (for view definition),
    // we do not eliminate subqueries or compute current time in the analyzer.
    Batch("Finish Analysis", Once,
      EliminateSubqueryAliases,
      EliminateView,
      ReplaceExpressions,
      ComputeCurrentTime,
      GetCurrentDatabase(sessionCatalog),
      RewriteDistinctAggregates,
      ReplaceDeduplicateWithAggregate) ::
    //////////////////////////////////////////////////////////////////////////////////////////
    // Optimizer rules start here
    //////////////////////////////////////////////////////////////////////////////////////////
    // - Do the first call of CombineUnions before starting the major Optimizer rules,
    //   since it can reduce the number of iteration and the other rules could add/move
    //   extra operators between two adjacent Union operators.
    // - Call CombineUnions again in Batch("Operator Optimizations"),
    //   since the other rules might make two separate Unions operators adjacent.
    Batch("Union", Once,
      CombineUnions) ::
    Batch("Pullup Correlated Expressions", Once,
      PullupCorrelatedPredicates) ::
    Batch("Subquery", Once,
      OptimizeSubqueries) ::
    Batch("Replace Operators", fixedPoint,
      ReplaceIntersectWithSemiJoin,
      ReplaceExceptWithFilter,
      ReplaceExceptWithAntiJoin,
      ReplaceDistinctWithAggregate) ::
    Batch("Aggregate", fixedPoint,
      RemoveLiteralFromGroupExpressions,
      RemoveRepetitionFromGroupExpressions) :: Nil ++
    operatorOptimizationBatch) :+
    Batch("Join Reorder", Once,
      CostBasedJoinReorder) :+
    Batch("Decimal Optimizations", fixedPoint,
      DecimalAggregates) :+
    Batch("Object Expressions Optimization", fixedPoint,
      EliminateMapObjects,
      CombineTypedFilters) :+
    Batch("LocalRelation", fixedPoint,
      ConvertToLocalRelation,
      PropagateEmptyRelation) :+
    // The following batch should be executed after batch "Join Reorder" and "LocalRelation".
    Batch("Check Cartesian Products", Once,
      CheckCartesianProducts) :+
    Batch("RewriteSubquery", Once,
      RewritePredicateSubquery,
      ColumnPruning,
      CollapseProject,
      RemoveRedundantProject)
  }

The Optimizer's rules transform not only plans but also expressions; in both cases the principle is the same: traverse the tree and apply the optimization rules. Note the traversal order: transforms over the LogicalPlan are typically applied top-down (pre-order), while expression transforms are often applied bottom-up (post-order).

The batches are executed just as in the Analyzer, by iterating through them in RuleExecutor's execute method, so this is not repeated here.
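The effect of these rules is easy to observe through optimizedPlan. A sketch, assuming t_user is registered:

val qe = spark.sql("select id from t_user where age > 10 + 5").queryExecution

// Before optimization: the filter condition is still (age > (10 + 5))
println(qe.analyzed)

// After optimization: ConstantFolding collapses 10 + 5 into 15, and predicates are
// pushed as close to the data source as possible
println(qe.optimizedPlan)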

Catalyst

The three stages above all belong to Catalyst: the SQL statement is first parsed by the Parser module into a syntax tree, the Unresolved Logical Plan; the Analyzer module, with the help of the table information in the Catalog, resolves it into a Logical Plan; the Optimizer then applies rule-based optimization strategies to produce the Optimized Logical Plan. The optimized execution plan is still logical and cannot be executed by Spark directly, so it must now be converted into a Physical Plan.

Physical Plan



Starting from the Optimized LogicalPlan and ending with submission, the PhysicalPlan stage goes through three steps:

  • Conversion into Iterator[PhysicalPlan]
  • SparkPlan
  • Prepared SparkPlan


SparkPlanner converts the Optimized LogicalPlan into a list of physical plans: it applies its planning strategies (Strategy) to the Optimized LogicalPlan to produce a list of SparkPlans, i.e. Iterator[PhysicalPlan]; at this point one Optimized LogicalPlan may correspond to multiple SparkPlans.


The best SparkPlan is then selected from Iterator[PhysicalPlan].


The selected SparkPlan goes through prepareForExecution, which applies a number of rules (Rule) to turn it into the Prepared SparkPlan so that it runs correctly on every node. Spark SQL has more than 65 SparkPlan implementations, covering data-source RDD creation and all kinds of data processing.

package org.apache.spark.sql.execution

lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)

lazy val sparkPlan: SparkPlan = {
  SparkSession.setActiveSession(sparkSession)
  // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
  // but we will implement to choose the best plan.
  planner.plan(ReturnAnswer(optimizedPlan)).next()
}

// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

/**
 * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
 * row format conversions as needed.
 */
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}

/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
  python.ExtractPythonUDFs,
  PlanSubqueries(sparkSession),
  EnsureRequirements(sparkSession.sessionState.conf),
  CollapseCodegenStages(sparkSession.sessionState.conf),
  ReuseExchange(sparkSession.sessionState.conf),
  ReuseSubquery(sparkSession.sessionState.conf))

......

After Parser, Analyzer, and Optimizer, a SQL statement is finally turned into a SparkPlan physical operator tree that can actually run inside Spark. In this tree, a leaf SparkPlan starts by creating an RDD, and walking up the tree each non-leaf node applies a transformation through its execute method to produce a new RDD; when an action finally runs, the result is returned to the user.
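Combined with the QueryExecution source above, this stage can also be observed directly; a sketch using the 2.x members discussed here:

val qe = spark.sql("select * from t_user").queryExecution

println(qe.sparkPlan)     // the SparkPlan the planner picked from Iterator[PhysicalPlan]
println(qe.executedPlan)  // the Prepared SparkPlan after prepareForExecution
val rdd = qe.toRdd        // RDD[InternalRow]; actual computation happens when an action runs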


Besides applying transformations to RDDs, a SparkPlan also handles partitioning and ordering; and besides execute, it can run executeBroadcast to broadcast data directly across the cluster.


A SparkPlan covers three main aspects:
  1. Every SparkPlan records its metadata (Metadata) and metrics (Metric).
  2. While transforming RDDs it also handles partitioning and sorting.
  3. Finally, the SparkPlan is submitted to the Spark cluster for execution as the physical plan, primarily through execute, with executeBroadcast as a complement.


At this point, the SQL/Dataset/DataFrame in the user program has been transformed, through the series of steps above, into the RDDs that Spark actually executes.