简介
Apache Flink具有两个有关联的API:Table API和SQL,用于统一流和批处理(批流一体)。Table API是由Scala和Java语言集成的查询API,它允许以非常直观的方式,组合来自关系运算符(例如filter,join)的查询。
Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批处理输入(DataSet)还是流输入(DataStream),在两个接口中指定的查询都具有相同的语义并指定相同的结果。
Table API和SQL接口以及Flink的DataStream和DataSet API紧密集成在一起。您可以轻松地在所有API和基于API的库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后再使用Table API分析模式,或者您可以在预处理程序上运行Gelly图算法之前,使用SQL查询扫描,过滤和聚合批处理表。
请注意,Table API和SQL尚未完成功能,正在积极开发中。[Table API,SQL]和[stream,batch]输入的每种组合都不支持所有操作。
**
程序依赖
从Flink 1.9开始,Flink提供了两种不同的计划程序实现来评估Table&SQL API程序:Blink计划程序和Flink 1.9之前可用的旧计划程序。计划人员负责将关系运算符转换为可执行的,优化的Flink作业。两位计划者都带有不同的优化规则和运行时类。它们在支持的功能方面也可能有所不同。
注意**对于生产用例,建议使用Flink 1.9之前的旧计划器。
所有Table API和SQL组件都捆绑在flink-table或flink-table-blinkMaven构件中。
以下依赖关系与大多数项目有关:
- flink-table-common:用于通过自定义功能,格式等扩展表生态系统的通用模块。
- flink-table-api-java:适用于使用Java编程语言的纯表程序的Table&SQL API(处于开发初期,不建议使用!)。
- flink-table-api-scala:使用Scala编程语言的纯表程序的Table&SQL API(处于开发初期,不建议使用!)。
- flink-table-api-java-bridge:使用Java编程语言支持带有DataStream / DataSet API的Table&SQL API。
- flink-table-api-scala-bridge:使用Scala编程语言支持带有DataStream / DataSet API的Table&SQL API。
- flink-table-planner:表程序计划程序和运行时。这是1.9版本之前Flink的唯一计划者。仍然是推荐的一种。
- flink-table-planner-blink:新的Blink计划器。
- flink-table-runtime-blink:新的Blink运行时。
- flink-table-uber:将上述API模块以及旧的计划器打包到大多数Table&SQL API用例的分发中。默认情况下,超级JAR文件flink-table-*.jar位于/libFlink版本的目录中。
- flink-table-uber-blink:将上述API模块以及特定于Blink的模块打包到大多数Table&SQL API用例的分发中。默认情况下,超级JAR文件flink-table-blink-*.jar位于/libFlink版本的目录中。
有关如何在表程序中的新旧Blink规划器之间进行切换的更多信息,请参见通用API页面。
表程序依赖性
根据目标编程语言,您需要将Java或Scala API添加到项目中,以便使用Table API和SQL定义管道:
1 | <!-- Either... --> |
此外,如果要在IDE中本地运行Table API和SQL程序,则必须添加以下一组模块之一,具体取决于要使用的计划程序:
1 | <!-- Either... (for the old planner that was available before Flink 1.9) --> |
在内部,表生态系统的一部分在Scala中实现。因此,请确保为批处理和流应用程序都添加以下依赖项:
1 | <dependency> |
扩展依赖
如果要实现与Kafka或一组用户定义的函数进行交互的自定义格式,则以下依赖关系就足够了,并且可以用于SQL Client的JAR文件:
1 | <dependency> |
当前,该模块包括以下扩展点:
- SerializationSchemaFactory
- DeserializationSchemaFactory
- ScalarFunction
- TableFunction
- AggregateFunction
两个Planner之间主要的不同
- Blink将批处理作业视为流的特殊情况。因此,还不支持Table和DataSet之间的转换,并且批处理作业不会转换成DateSet程序,则还是和DataStream程序一样。
- Blink planner 不支持BatchTableSource,而是使用StreamTableSource 代替。
- Blink planner 程序仅支持全新的Catalog,不支持ExternalCatalog,ExternalCatalog不推荐使用。
- 使用FilterableTableSource在**old planner和Blink planner中是不相容的。old planner会将PlannerExpressions 按下FilterableTableSource,而Blink planner**的计划程序将Expressions 按下。
- 基于字符串的键值配置选项(有关详细信息,请参阅有关配置的文档)仅用于Blink计划器。
- 两个planner实现(CalciteConfig)PlannerConfig不同。
- Blink planner会将多个接收器优化为一个DAG(仅在上支持TableEnvironment,而不在上支持StreamTableEnvironment)。old planner将始终将每个接收器优化为一个新的DAG,其中所有DAG彼此独立。
- old planner程序现在不支持目录统计信息,而Blink计划程序则支持。
Table API & SQL程序框架
用于批处理和流式传输的所有Table API和SQL程序都遵循相同的模式。以下代码示例显示了Table API和SQL程序的通用结构。
1 | // create a TableEnvironment for specific planner batch or streaming |
创建TableEnvironment
这TableEnvironment是Table API和SQL集成的中心概念。它负责:
- 在内部catalog中注册Table
- 注册catalog
- 加载可插拔模块
- 执行SQL查询
- 注册用户定义的UDF函数
- 将DataStream或DataSet转换为Table
- 持有对ExecutionEnvironment或的引用StreamExecutionEnvironment
一张表始终绑定到特定的TableEnvironment。不可能在同一查询中组合不同TableEnvironments的表,例如,将它们join或union。
以下是四种不同类型的定义:
1 | // ********************** |
注意:如果/lib目录中只有一个计划器jar ,则可以使用useAnyPlanner(use_any_planner对于python)创建specific EnvironmentSettings。
在Catalog中创建表
TableEnvironment维护使用标识符创建的表目录的映射。每个标识符由3部分组成:目录名称,数据库名称和对象名称。如果未指定目录或数据库,则将使用当前默认值(请参阅表标识符扩展部分中的示例)。
表可以是虚拟(VIEWS)视图或常规(TABLES)。VIEWS可以从现有Table对象创建,通常是Table API或SQL查询的结果。TABLES描述外部数据,例如文件,数据库表或消息队列。
临时表与永久表
表可以是临时的,并与单个Flink会话的生命周期相关,也可以是永久的,并且在多个Flink会话和群集中可见。
永久表需要一个目录(例如Hive Metastore)来维护有关表的元数据。创建永久表后,连接到目录的任何Flink会话都可以看到该表,并且该表将一直存在,直到明确删除该表为止。
另一方面,临时表始终存储在内存中,并且仅在它们在其中创建的Flink会话期间存在。这些表对其他会话不可见。它们未绑定到任何目录或数据库,但可以在一个目录或数据库的名称空间中创建。如果删除了临时表的相应数据库,则不会删除这些临时表。
创建Table
Virtual Tables
TableAPI对象对应于VIEW(虚拟表)是一个SQL术语。它封装了逻辑查询计划。可以在目录中创建它,如下所示:
1 | // get a TableEnvironment |
注意: Table对象类似于VIEW关系数据库系统中的对象,即,定义的查询Table未经过优化,但是当另一个查询引用已注册时将内联Table。如果多个查询引用同一个已注册的查询Table,则将为每个引用查询内联该查询并执行多次,即,Table将不会共享已注册的结果。
Connector Tables
也可以从关系数据库中创建一个TABLE 。连接器描述了存储表数据的外部系统。可以在此处声明诸如Apacha Kafka之类的存储系统或常规文件系统。
1 | tableEnvironment |
查询表
Table API
Table API是用于Scala和Java的语言集成查询API。与SQL相反,查询未指定为SQL字符串,而是以编程语言算子逐步构成。
API基于Table代表表(流或批处理)的类,并提供应用关系操作的方法。这些方法返回一个新Table对象,该对象表示在input上应用关系运算的结果Table。一些关系操作由多个方法调用组成,例如table.groupBy(…).select(),其中groupBy(…)指定的分组table和select(…)对分组的投影table。该Table API文档介绍了支持流媒体和批次表中的所有表API操作。
以下示例显示了一个简单的Table API聚合查询:
1 | // get a TableEnvironment |
注意:** Scala Table API使用Scala符号,这些符号以单个记号(’)开头来引用的属性Table。Table API使用Scala隐式。确保导入,org.apache.flink.api.scala._然后org.apache.flink.table.api.scala._使用Scala隐式转换。
SQL
Flink的SQL集成基于实现SQL标准的Apache Calcite。SQL查询被指定为常规字符串。该SQL文件描述弗林克的流媒体和批量表的SQL支持。以下示例说明如何指定查询并以返回结果Table
。
1 | // get a TableEnvironment |
下面的示例演示如何指定将查询结果插入已注册表的更新查询。
1 | // get a TableEnvironment |
混合Table API和SQL
表API和SQL查询可以轻松混合,因为它们都返回Table对象:
- 可以在TableSQL查询返回的对象上定义Table API 查询。
- 可以通过在Table API查询的结果上定义SQL查询,方法是在中注册结果表,TableEnvironment然后在FROMSQL查询的子句中引用该表。
与DataStream和DataSet API集成
计划中的两个planner都可以与DataStreamAPI 集成。只有old planner程序可以与DataSet API集成,在批量处理情况下的Blink planner不能与两者结合使用。 注意:下面DataSet讨论的API仅与批量使用的old planner程序有关。
Table API和SQL查询可以轻松地与DataStream和DataSet程序集成并嵌入其中。例如,可以查询外部表(例如从RDBMS),进行一些预处理,例如过滤,投影,聚合或与元数据联接,然后使用DataStream或DataSet API(以及在这些API之上构建的任何库,例如CEP或Gelly)。相反,也可以将Table API或SQL查询应用于DataStream或DataSet程序的结果。
Scala的隐式转换
scala Table API功能的隐式转换DataSet,DataStream以及Table类。org.apache.flink.table.api.scala._除了导入org.apache.flink.api.scala._Scala DataStream API 的包外,还可以通过导入包来启用这些转换。
从DataStream或DataSet创建视图
一个DataStream或DataSet可在注册TableEnvironment为视图。结果视图的模式取决于注册的DataStream或的数据类型DataSet。请检查有关将数据类型映射到表模式的部分,以获取详细信息。
注意:从DataStream或创建的DataSet视图只能注册为临时视图。
1 | // get TableEnvironment |
将DataStream或DataSet转换为表
还可以将DataStream或DataSetin直接转换为aTable。如果要在Table API查询中使用Table,这将很方便。
1 | // get TableEnvironment |
将表转换为DataStream或DataSet
Table可以转换为DataStream或DataSet。这样,可以在Table API或SQL查询的结果上运行自定义DataStream或DataSet程序。
将Table转换为DataStreamor时DataSet,您需要指定结果DataStreamor DataSet的数据类型,即,将Table转换为的数据类型。最方便的转换类型通常是Row。以下列表概述了不同选项的功能:
- Row:字段按位置,任意数量的字段,对
null
值的支持,没有类型安全的访问进行映射。 - POJO:字段按名称映射(POJO字段必须命名为
Table
字段),任意数量的字段,支持null
值,类型安全访问。 - 案例类:字段按位置映射,不支持
null
值,类型安全访问。 - 元组:按位置映射字段,限制为22(Scala)或25(Java)字段,不支持
null
值,类型安全访问。 - 原子类型:
Table
必须具有单个字段,不支持null
值,类型安全访问。将Table转换为Stream
对Table数据进行查询,数据流将动态更新,随着新记录到达查询的输入流,它会发生变化。因此,将DataStream这种动态查询转换内容,需要对表的更新进行编码。
有两种将Table转化成DataStream的模式:
- Append Mode: 仅仅在对Table进行Insert动态操作的情况下才能使用此模式,即仅追加的情况,并且以前的结果不会更新。
- Retract Mode: 始终可以使用此模式。它使用标志进行编码INSERT和DELETE更改boolean。
1 | // get TableEnvironment. |
注意:有关动态表及其属性的详细讨论,请参见“ 动态表”文档。
将Table转换为DataSet
Table转换为DataSet如下:
1 | // get TableEnvironment |
**Table API DataStream演示**
1 | package tableAPI |