/**
 * Executes a SQL query using Spark, returning the result as a `DataFrame`.
 * The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
 *
 * @since 2.0.0
 */
def sql(sqlText: String): DataFrame = {
  Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}
⬇️
trait ParserInterface {
  /**
   * Parse a string to a [[LogicalPlan]].
   */
  @throws[ParseException]("Text cannot be parsed to a LogicalPlan")
  def parsePlan(sqlText: String): LogicalPlan

  // ...
}
/** Creates LogicalPlan for a given SQL string. */
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
  astBuilder.visitSingleStatement(parser.singleStatement()) match {
    case plan: LogicalPlan => plan
    case _ =>
      val position = Origin(None, None)
      throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
  }
}
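To see what this parsing step produces on its own, the standalone Catalyst parser can be invoked directly: CatalystSqlParser.parsePlan returns a LogicalPlan in which nothing has been looked up in the catalog yet. A minimal sketch (the query and table name are made up for illustration):

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Parsing only: no catalog lookup and no type checking happen here.
val unresolved = CatalystSqlParser.parsePlan("SELECT name, age + 1 FROM people WHERE age > 21")

// The tree still contains 'UnresolvedRelation and 'UnresolvedAttribute nodes,
// printed with a leading quote because they are not resolved yet.
println(unresolved.treeString)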
/**
 * Executes a SQL query using Spark, returning the result as a `DataFrame`.
 * The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
 *
 * @since 2.0.0
 */
def sql(sqlText: String): DataFrame = {
  Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}
private[sql] object Dataset {

  def apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T] = {
    val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
    // Eagerly bind the encoder so we verify that the encoder matches the underlying
    // schema. The user will get an error if this is not the case.
    dataset.deserializer
    dataset
  }

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }
}
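Because ofRows calls qe.assertAnalyzed() before the Dataset is even constructed, analysis errors surface at the spark.sql(...) call itself rather than when an action eventually runs. A small illustration, assuming spark is an active SparkSession and the table name is deliberately bogus:

import org.apache.spark.sql.AnalysisException

// Fails immediately during analysis -- no action has been triggered yet --
// because the relation cannot be resolved against the catalog.
try {
  spark.sql("SELECT * FROM table_that_does_not_exist")
} catch {
  case e: AnalysisException => println(s"Rejected at analysis time: ${e.getMessage}")
}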
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {

  // TODO: Move the planner an optimizer into here from SessionState.
  protected def planner = sparkSession.sessionState.planner

  def assertAnalyzed(): Unit = analyzed

  def assertSupported(): Unit = {
    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
      UnsupportedOperationChecker.checkForBatch(analyzed)
    }
  }
package org.apache.spark.sql.catalyst.analysis

/**
 * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
 * [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
 */
class Analyzer(
    catalog: SessionCatalog,
    conf: SQLConf,
    maxIterations: Int)
  extends RuleExecutor[LogicalPlan] with CheckAnalysis {

  /**
   * Override to provide additional rules for the "Resolution" batch.
   */
  val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil

  /**
   * Override to provide rules to do post-hoc resolution. Note that these rules will be executed
   * in an individual batch. This batch is to run right after the normal resolution batch and
   * execute its rules in one pass.
   */
  val postHocResolutionRules: Seq[Rule[LogicalPlan]] = Nil
  /**
   * An execution strategy for rules that indicates the maximum number of executions. If the
   * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
   */
  abstract class Strategy { def maxIterations: Int }

  /** A strategy that only runs once. */
  case object Once extends Strategy { val maxIterations = 1 }

  /** A strategy that runs until fix point or maxIterations times, whichever comes first. */
  case class FixedPoint(maxIterations: Int) extends Strategy

  /** A batch of rules. */
  protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

  /** Defines a sequence of rule batches, to be overridden by the implementation. */
  protected def batches: Seq[Batch]

  /**
   * Defines a check function that checks for structural integrity of the plan after the execution
   * of each rule. For example, we can check whether a plan is still resolved after each rule in
   * `Optimizer`, so we can catch rules that return invalid plans. The check function returns
   * `false` if the given plan doesn't pass the structural integrity check.
   */
  protected def isPlanIntegral(plan: TreeType): Boolean = true
  /**
   * Executes the batches of rules defined by the subclass. The batches are executed serially
   * using the defined execution strategy. Within each batch, rules are also executed serially.
   */
  def execute(plan: TreeType): TreeType = {
    var curPlan = plan
    val queryExecutionMetrics = RuleExecutor.queryExecutionMeter

    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true

      // Run until fix point (or the max number of iterations as specified in the strategy).
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan)
            val runTime = System.nanoTime() - startTime

            // Run the structural integrity checker against the plan after each rule.
            if (!isPlanIntegral(result)) {
              val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
                "the structural integrity of the plan is broken."
              throw new TreeNodeException(result, message, null)
            }

            result
        }
        iteration += 1
        if (iteration > batch.strategy.maxIterations) {
          // Only log if this is a rule that is supposed to run more than once.
          if (iteration != 2) {
            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
            if (Utils.isTesting) {
              throw new TreeNodeException(curPlan, message, null)
            } else {
              logWarning(message)
            }
          }
          continue = false
        }

        if (curPlan.fastEquals(lastPlan)) {
          logTrace(
            s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
          continue = false
        }
        lastPlan = curPlan
      }

      if (!batchStartPlan.fastEquals(curPlan)) {
        logDebug(
          s"""
            |=== Result of Batch ${batch.name} ===
            |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
          """.stripMargin)
      } else {
        logTrace(s"Batch ${batch.name} has no effect.")
      }
    }

    curPlan
  }
}
This method walks through every batch defined by the concrete RuleExecutor subclass (here the Analyzer) and, following the strategy declared for that batch (Once or FixedPoint) together with the rules it contains, iteratively rewrites the unresolved logical plan. The batches in question are the ones declared in class Analyzer above as lazy val batches: Seq[Batch], which assembles the full set of resolution rules.
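For orientation, a heavily abbreviated sketch of what that batches value looks like inside the Analyzer (names taken from Spark 2.x; the real list contains far more rules and varies by version):

  lazy val batches: Seq[Batch] = Seq(
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution),
    Batch("Resolution", fixedPoint,
      Seq(ResolveRelations,     // look up UnresolvedRelation in the SessionCatalog
          ResolveReferences,    // bind UnresolvedAttribute against child output
          ResolveFunctions)     // resolve UnresolvedFunction via the function registry
        ++ extendedResolutionRules: _*),
    Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*)
  )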
  /** Name for this rule, automatically inferred based on class name. */
  val ruleName: String = {
    val className = getClass.getName
    if (className endsWith "$") className.dropRight(1) else className
  }

  def apply(plan: TreeType): TreeType
}
In the call rule(plan) inside execute, the plan argument is the current logical plan; rule.apply pattern-matches over the TreeNodes of that plan and returns a transformed copy of the tree.
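A rule is therefore just a function from plan to plan, usually written with the transform helpers on TreeNode. As a toy illustration (not a rule that ships with Spark), a rule that removes double negation from every expression in a plan could look like this:

import org.apache.spark.sql.catalyst.expressions.Not
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Toy rule: rewrite NOT(NOT(x)) to x wherever it appears in the plan's expressions.
object SimplifyDoubleNegation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Not(Not(child)) => child
  }
}

Rules written this way can also be handed to a running session through spark.experimental.extraOptimizations, which comes up again below when the Optimizer's batch list is discussed.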
After analysis, the set of columns, their data types, and the storage location of every table referenced by the query are known, and the types and positions of the attributes consumed by the Project and Filter operators are fixed as well. With this information in hand, the LogicalPlan could already be converted directly into a Physical Plan and executed.
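This can be observed on a concrete query by comparing the plan before and after analysis through the Dataset's queryExecution field (table and column names here are hypothetical):

val df = spark.sql("SELECT name FROM people WHERE age > 21")
val qe = df.queryExecution

// Parsed but unresolved: attributes still show up as 'name and 'age.
println(qe.logical.treeString)

// Analyzed: every attribute is now an AttributeReference with a concrete
// data type and expression ID, bound to the resolved relation.
println(qe.analyzed.treeString)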
/**
 * The primary workflow for executing relational queries using Spark. Designed to allow easy
 * access to the intermediate phases of query execution for developers.
 *
 * While this is not a public class, we should avoid changing the function names for the sake of
 * changing them, because a lot of developers use the feature for debugging.
 */
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {

  // TODO: Move the planner an optimizer into here from SessionState.
  protected def planner = sparkSession.sessionState.planner

  def assertAnalyzed(): Unit = analyzed

  def assertSupported(): Unit = {
    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
      UnsupportedOperationChecker.checkForBatch(analyzed)
    }
  }

  lazy val sparkPlan: SparkPlan = {
    SparkSession.setActiveSession(sparkSession)
    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
    //       but we will implement to choose the best plan.
    planner.plan(ReturnAnswer(optimizedPlan)).next()
  }

  // ...
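All of the intermediate phases exposed by QueryExecution can also be printed in one go with Dataset.explain(extended = true), which is often the quickest way to compare the parsed, analyzed, optimized, and physical plans before digging into the Optimizer (the query is again hypothetical):

// Prints the "== Parsed Logical Plan ==", "== Analyzed Logical Plan ==",
// "== Optimized Logical Plan ==" and "== Physical Plan ==" sections.
spark.sql("SELECT name FROM people WHERE age > 21").explain(true)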
package org.apache.spark.sql.catalyst.optimizer

/**
 * Abstract class all optimizers should inherit of, contains the standard batches (extending
 * Optimizers can override this.
 */
abstract class Optimizer(sessionCatalog: SessionCatalog)
  extends RuleExecutor[LogicalPlan] {

  // Check for structural integrity of the plan in test mode. Currently we only check if a plan is
  // still resolved after the execution of each rule.
  override protected def isPlanIntegral(plan: LogicalPlan): Boolean = {
    !Utils.isTesting || plan.resolved
  }

  val operatorOptimizationBatch: Seq[Batch] = {
    val rulesWithoutInferFiltersFromConstraints =
      operatorOptimizationRuleSet.filterNot(_ == InferFiltersFromConstraints)
    Batch("Operator Optimization before Inferring Filters", fixedPoint,
      rulesWithoutInferFiltersFromConstraints: _*) ::
    Batch("Infer Filters", Once,
      InferFiltersFromConstraints) ::
    Batch("Operator Optimization after Inferring Filters", fixedPoint,
      rulesWithoutInferFiltersFromConstraints: _*) :: Nil
  }

  (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
  // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
  // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
  // However, because we also use the analyzer to canonicalized queries (for view definition),
  // we do not eliminate subqueries or compute current time in the analyzer.
  Batch("Finish Analysis", Once,
    EliminateSubqueryAliases,
    EliminateView,
    ReplaceExpressions,
    ComputeCurrentTime,
    GetCurrentDatabase(sessionCatalog),
    RewriteDistinctAggregates,
    ReplaceDeduplicateWithAggregate) ::
  //////////////////////////////////////////////////////////////////////////////////////////
  // Optimizer rules start here
  //////////////////////////////////////////////////////////////////////////////////////////
  // - Do the first call of CombineUnions before starting the major Optimizer rules,
  //   since it can reduce the number of iteration and the other rules could add/move
  //   extra operators between two adjacent Union operators.
  // - Call CombineUnions again in Batch("Operator Optimizations"),
  //   since the other rules might make two separate Unions operators adjacent.
  Batch("Union", Once,
    CombineUnions) ::
  Batch("Pullup Correlated Expressions", Once,
    PullupCorrelatedPredicates) ::
  Batch("Subquery", Once,
    OptimizeSubqueries) ::
  Batch("Replace Operators", fixedPoint,
    ReplaceIntersectWithSemiJoin,
    ReplaceExceptWithFilter,
    ReplaceExceptWithAntiJoin,
    ReplaceDistinctWithAggregate) ::
  Batch("Aggregate", fixedPoint,
    RemoveLiteralFromGroupExpressions,
    RemoveRepetitionFromGroupExpressions) :: Nil ++ operatorOptimizationBatch) :+
  Batch("Join Reorder", Once,
    CostBasedJoinReorder) :+
  Batch("Decimal Optimizations", fixedPoint,
    DecimalAggregates) :+
  Batch("Object Expressions Optimization", fixedPoint,
    EliminateMapObjects,
    CombineTypedFilters) :+
  Batch("LocalRelation", fixedPoint,
    ConvertToLocalRelation,
    PropagateEmptyRelation) :+
  // The following batch should be executed after batch "Join Reorder" and "LocalRelation".
  Batch("Check Cartesian Products", Once,
    CheckCartesianProducts) :+
  Batch("RewriteSubquery", Once,
    RewritePredicateSubquery,
    ColumnPruning,
    CollapseProject,
    RemoveRedundantProject)
  }
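The batch list itself is fixed inside the Optimizer, but Spark leaves a public hook for experimentation: rules assigned to spark.experimental.extraOptimizations are picked up by SparkOptimizer and run as an additional user-provided batch alongside the built-in ones. A sketch reusing the toy rule from earlier:

// Experimental API: append a user-provided optimizer rule to the session.
spark.experimental.extraOptimizations = Seq(SimplifyDoubleNegation)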
  lazy val sparkPlan: SparkPlan = {
    SparkSession.setActiveSession(sparkSession)
    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
    //       but we will implement to choose the best plan.
    planner.plan(ReturnAnswer(optimizedPlan)).next()
  }

  // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

  /** Internal version of the RDD. Avoids copies and has no schema */
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

  /**
   * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
   * row format conversions as needed.
   */
  protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
    preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
  }

  /** A sequence of rules that will be applied in order to the physical plan before execution. */
  protected def preparations: Seq[Rule[SparkPlan]] = Seq(
    python.ExtractPythonUDFs,
    PlanSubqueries(sparkSession),
    EnsureRequirements(sparkSession.sessionState.conf),
    CollapseCodegenStages(sparkSession.sessionState.conf),
    ReuseExchange(sparkSession.sessionState.conf),
    ReuseSubquery(sparkSession.sessionState.conf))
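The effect of these preparation rules shows up in the final plan: EnsureRequirements inserts Exchange nodes where a child's partitioning or ordering does not satisfy its parent's requirements, and CollapseCodegenStages fuses adjacent operators into WholeStageCodegen stages. One way to observe this on a hypothetical join (the exact operators depend on statistics and configuration):

val joined = spark.sql(
  "SELECT p.name, o.amount FROM people p JOIN orders o ON p.id = o.person_id")

// executedPlan is the result of prepareForExecution: depending on the chosen join
// strategy it contains Exchange or BroadcastExchange nodes, and most operators
// are wrapped in WholeStageCodegen stages.
println(joined.queryExecution.executedPlan.treeString)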