Flink Table API & SQL

简介


Apache Flink具有两个有关联的API:Table API和SQL,用于统一流和批处理(批流一体)。Table API是由Scala和Java语言集成的查询API,它允许以非常直观的方式,组合来自关系运算符(例如filter,join)的查询。


Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批处理输入(DataSet)还是流输入(DataStream),在两个接口中指定的查询都具有相同的语义并指定相同的结果。


Table API和SQL接口以及Flink的DataStream和DataSet API紧密集成在一起。您可以轻松地在所有API和基于API的库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后再使用Table API分析模式,或者您可以在预处理程序上运行Gelly图算法之前,使用SQL查询扫描,过滤和聚合批处理表。


请注意,Table API和SQL尚未完成功能,正在积极开发中。[Table API,SQL]和[stream,batch]输入的每种组合都不支持所有操作。
**

程序依赖


从Flink 1.9开始,Flink提供了两种不同的计划程序实现来评估Table&SQL API程序:Blink计划程序和Flink 1.9之前可用的旧计划程序。计划人员负责将关系运算符转换为可执行的,优化的Flink作业。两位计划者都带有不同的优化规则和运行时类。它们在支持的功能方面也可能有所不同。


注意**对于生产用例,建议使用Flink 1.9之前的旧计划器。


所有Table API和SQL组件都捆绑在flink-table或flink-table-blinkMaven构件中。
以下依赖关系与大多数项目有关:

  • flink-table-common:用于通过自定义功能,格式等扩展表生态系统的通用模块。
  • flink-table-api-java:适用于使用Java编程语言的纯表程序的Table&SQL API(处于开发初期,不建议使用!)。
  • flink-table-api-scala:使用Scala编程语言的纯表程序的Table&SQL API(处于开发初期,不建议使用!)。
  • flink-table-api-java-bridge:使用Java编程语言支持带有DataStream / DataSet API的Table&SQL API。
  • flink-table-api-scala-bridge:使用Scala编程语言支持带有DataStream / DataSet API的Table&SQL API。
  • flink-table-planner:表程序计划程序和运行时。这是1.9版本之前Flink的唯一计划者。仍然是推荐的一种。
  • flink-table-planner-blink:新的Blink计划器。
  • flink-table-runtime-blink:新的Blink运行时。
  • flink-table-uber:将上述API模块以及旧的计划器打包到大多数Table&SQL API用例的分发中。默认情况下,超级JAR文件flink-table-*.jar位于/libFlink版本的目录中。
  • flink-table-uber-blink:将上述API模块以及特定于Blink的模块打包到大多数Table&SQL API用例的分发中。默认情况下,超级JAR文件flink-table-blink-*.jar位于/libFlink版本的目录中。

有关如何在表程序中的新旧Blink规划器之间进行切换的更多信息,请参见通用API页面。

表程序依赖性


根据目标编程语言,您需要将Java或Scala API添加到项目中,以便使用Table API和SQL定义管道:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!-- Either... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<!-- or... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>


此外,如果要在IDE中本地运行Table API和SQL程序,则必须添加以下一组模块之一,具体取决于要使用的计划程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!-- Either... (for the old planner that was available before Flink 1.9) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<!-- or.. (for the new Blink planner) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>


在内部,表生态系统的一部分在Scala中实现。因此,请确保为批处理和流应用程序都添加以下依赖项:

1
2
3
4
5
6
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>

扩展依赖


如果要实现与Kafka或一组用户定义的函数进行交互的自定义格式,则以下依赖关系就足够了,并且可以用于SQL Client的JAR文件:

1
2
3
4
5
6
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>


当前,该模块包括以下扩展点:

  • SerializationSchemaFactory
  • DeserializationSchemaFactory
  • ScalarFunction
  • TableFunction
  • AggregateFunction

两个Planner之间主要的不同

  1. Blink将批处理作业视为流的特殊情况。因此,还不支持Table和DataSet之间的转换,并且批处理作业不会转换成DateSet程序,则还是和DataStream程序一样。
  2. Blink planner 不支持BatchTableSource,而是使用StreamTableSource 代替。
  3. Blink planner 程序仅支持全新的Catalog,不支持ExternalCatalog,ExternalCatalog不推荐使用。
  4. 使用FilterableTableSource在**old plannerBlink planner中是不相容的。old planner会将PlannerExpressions 按下FilterableTableSource,而Blink planner**的计划程序将Expressions 按下。
  5. 基于字符串的键值配置选项(有关详细信息,请参阅有关配置的文档)仅用于Blink计划器。
  6. 两个planner实现(CalciteConfig)PlannerConfig不同。
  7. Blink planner会将多个接收器优化为一个DAG(仅在上支持TableEnvironment,而不在上支持StreamTableEnvironment)。old planner将始终将每个接收器优化为一个新的DAG,其中所有DAG彼此独立。
  8. old planner程序现在不支持目录统计信息,而Blink计划程序则支持。

Table API & SQL程序框架


用于批处理和流式传输的所有Table API和SQL程序都遵循相同的模式。以下代码示例显示了Table API和SQL程序的通用结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// create a TableEnvironment for specific planner batch or streaming
val tableEnv = ... // see "Create a TableEnvironment" section

// create a Table
tableEnv.connect(...).createTemporaryTable("table1")
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable")

// create a Table from a Table API query
val tapiResult = tableEnv.from("table1").select(...)

// create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable")

// execute
tableEnv.execute("scala_job")

创建TableEnvironment


这TableEnvironment是Table API和SQL集成的中心概念。它负责:

  • 在内部catalog中注册Table
  • 注册catalog
  • 加载可插拔模块
  • 执行SQL查询
  • 注册用户定义的UDF函数
  • 将DataStream或DataSet转换为Table
  • 持有对ExecutionEnvironment或的引用StreamExecutionEnvironment


一张表始终绑定到特定的TableEnvironment。不可能在同一查询中组合不同TableEnvironments的表,例如,将它们join或union。


以下是四种不同类型的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)

// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
val fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)

注意:如果/lib目录中只有一个计划器jar ,则可以使用useAnyPlanner(use_any_planner对于python)创建specific EnvironmentSettings。

在Catalog中创建表


TableEnvironment维护使用标识符创建的表目录的映射。每个标识符由3部分组成:目录名称,数据库名称和对象名称。如果未指定目录或数据库,则将使用当前默认值(请参阅表标识符扩展部分中的示例)。
表可以是虚拟(VIEWS)视图或常规(TABLES)。VIEWS可以从现有Table对象创建,通常是Table API或SQL查询的结果。TABLES描述外部数据,例如文件,数据库表或消息队列。

临时表与永久表

表可以是临时的,并与单个Flink会话的生命周期相关,也可以是永久的,并且在多个Flink会话和群集中可见。
永久表需要一个目录(例如Hive Metastore)来维护有关表的元数据。创建永久表后,连接到目录的任何Flink会话都可以看到该表,并且该表将一直存在,直到明确删除该表为止。


另一方面,临时表始终存储在内存中,并且仅在它们在其中创建的Flink会话期间存在。这些表对其他会话不可见。它们未绑定到任何目录或数据库,但可以在一个目录或数据库的名称空间中创建。如果删除了临时表的相应数据库,则不会删除这些临时表。

创建Table

Virtual Tables
TableAPI对象对应于VIEW(虚拟表)是一个SQL术语。它封装了逻辑查询计划。可以在目录中创建它,如下所示:

1
2
3
4
5
6
7
8
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// table is the result of a simple projection query
val projTable: Table = tableEnv.from("X").select(...)

// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable)


注意: Table对象类似于VIEW关系数据库系统中的对象,即,定义的查询Table未经过优化,但是当另一个查询引用已注册时将内联Table。如果多个查询引用同一个已注册的查询Table,则将为每个引用查询内联该查询并执行多次,即,Table将不会共享已注册的结果。


Connector Tables


也可以从关系数据库中创建一个TABLE 。连接器描述了存储表数据的外部系统。可以在此处声明诸如Apacha Kafka之类的存储系统或常规文件系统。

1
2
3
4
5
6
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")

查询表

Table API

Table API是用于Scala和Java的语言集成查询API。与SQL相反,查询未指定为SQL字符串,而是以编程语言算子逐步构成。


API基于Table代表表(流或批处理)的类,并提供应用关系操作的方法。这些方法返回一个新Table对象,该对象表示在input上应用关系运算的结果Table。一些关系操作由多个方法调用组成,例如table.groupBy(…).select(),其中groupBy(…)指定的分组table和select(…)对分组的投影table。该Table API文档介绍了支持流媒体和批次表中的所有表API操作。


以下示例显示了一个简单的Table API聚合查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section

// register Orders table
// scan registered Orders table
val orders = tableEnv.from("Orders")

// compute revenue for all customers from France
val revenue = orders
.filter('cCountry === "FRANCE")
.groupBy('cID, 'cName)
.select('cID, 'cName, 'revenue.sum AS 'revSum)
// emit or convert Table
// execute query


注意:** Scala Table API使用Scala符号,这些符号以单个记号(’)开头来引用的属性Table。Table API使用Scala隐式。确保导入,org.apache.flink.api.scala._然后org.apache.flink.table.api.scala._使用Scala隐式转换。

SQL

Flink的SQL集成基于实现SQL标准的Apache Calcite。SQL查询被指定为常规字符串。该SQL文件描述弗林克的流媒体和批量表的SQL支持。以下示例说明如何指定查询并以返回结果Table

1
2
3
4
5
6
7
8
9
10
11
12
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// emit or convert Table
// execute query


下面的示例演示如何指定将查询结果插入已注册表的更新查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// execute query

混合Table API和SQL

表API和SQL查询可以轻松混合,因为它们都返回Table对象:

  • 可以在TableSQL查询返回的对象上定义Table API 查询。
  • 可以通过在Table API查询的结果上定义SQL查询,方法是在中注册结果表,TableEnvironment然后在FROMSQL查询的子句中引用该

与DataStream和DataSet API集成

计划中的两个planner都可以与DataStreamAPI 集成。只有old planner程序可以与DataSet API集成,在批量处理情况下的Blink planner不能与两者结合使用。 注意:下面DataSet讨论的API仅与批量使用的old planner程序有关。


Table API和SQL查询可以轻松地与DataStreamDataSet程序集成并嵌入其中。例如,可以查询外部表(例如从RDBMS),进行一些预处理,例如过滤,投影,聚合或与元数据联接,然后使用DataStream或DataSet API(以及在这些API之上构建的任何库,例如CEP或Gelly)。相反,也可以将Table API或SQL查询应用于DataStream或DataSet程序的结果。

Scala的隐式转换

scala Table API功能的隐式转换DataSet,DataStream以及Table类。org.apache.flink.table.api.scala._除了导入org.apache.flink.api.scala._Scala DataStream API 的包外,还可以通过导入包来启用这些转换。

从DataStream或DataSet创建视图

一个DataStream或DataSet可在注册TableEnvironment为视图。结果视图的模式取决于注册的DataStream或的数据类型DataSet。请检查有关将数据类型映射到表模式的部分,以获取详细信息。


注意:从DataStream或创建的DataSet视图只能注册为临时视图。

1
2
3
4
5
6
7
8
9
10
11
// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

val stream: DataStream[(Long, String)] = ...

// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream)

// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, 'myLong, 'myString)

将DataStream或DataSet转换为表

还可以将DataStream或DataSetin直接转换为aTable。如果要在Table API查询中使用Table,这将很方便。

1
2
3
4
5
6
7
8
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, String)] = ...
// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)
// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

将表转换为DataStream或DataSet

Table可以转换为DataStream或DataSet。这样,可以在Table API或SQL查询的结果上运行自定义DataStream或DataSet程序。


将Table转换为DataStreamor时DataSet,您需要指定结果DataStreamor DataSet的数据类型,即,将Table转换为的数据类型。最方便的转换类型通常是Row。以下列表概述了不同选项的功能:

  • Row:字段按位置,任意数量的字段,对null值的支持,没有类型安全的访问进行映射。
  • POJO:字段按名称映射(POJO字段必须命名为Table字段),任意数量的字段,支持null值,类型安全访问。
  • 案例类:字段按位置映射,不支持null值,类型安全访问。
  • 元组:按位置映射字段,限制为22(Scala)或25(Java)字段,不支持null值,类型安全访问。
  • 原子类型Table必须具有单个字段,不支持null值,类型安全访问。

    将Table转换为Stream


对Table数据进行查询,数据流将动态更新,随着新记录到达查询的输入流,它会发生变化。因此,将DataStream这种动态查询转换内容,需要对表的更新进行编码。


有两种将Table转化成DataStream的模式:

  1. Append Mode: 仅仅在对Table进行Insert动态操作的情况下才能使用此模式,即仅追加的情况,并且以前的结果不会更新。
  2. Retract Mode: 始终可以使用此模式。它使用标志进行编码INSERT和DELETE更改boolean。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// get TableEnvironment. 
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

注意:有关动态表及其属性的详细讨论,请参见“ 动态表”文档。

将Table转换为DataSet


Table转换为DataSet如下:

1
2
3
4
5
6
7
8
9
10
11
12
// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv = BatchTableEnvironment.create(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)


**Table API DataStream演示**
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package tableAPI

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

object table_stream {

case class GameData(user_id: String, game_id: String, game_time: Long, game_score: Int)

def main(args: Array[String]): Unit = {
val stream_env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val table_env: StreamTableEnvironment = StreamTableEnvironment.create(stream_env, settings)
// stream_env.setParallelism(1)
val socketStream: DataStream[String] = stream_env.socketTextStream("localhost", 8888)

val gameStream: DataStream[GameData] = socketStream.map((line: String) => {
val array_data: Array[String] = line.split(",")
GameData(array_data(0), array_data(1), array_data(2).toLong, array_data(3).toInt)
})



// 打印表结构
// table_env.registerDataStream("t_game_detail", gameStream)
// val t_game_detail: Table = table_env.scan("t_game_detail")
// t_game_detail.printSchema()


//查询,去重操作
//这里需注意,使用去重操作时,不能使用toAppendStream。
//val t_game_detail: Table = table_env.fromDataStream(gameStream).select("user_id").distinct()

//过滤操作
//val t_game_detail: Table = table_env.fromDataStream(gameStream).filter("game_score>500")

//分组聚合操作
import org.apache.flink.table.api.scala._
val t_game_detail: Table = table_env.fromDataStream(gameStream).groupBy('user_id).select('user_id, 'user_id.count as 'count)//.select('user_id, 'game_score.sum as 'sum_game_score)
//.groupBy('user_id).select('user_id, 'game_score.avg as 'avg_game_score)


// 这里如果对原始数据没有做字段的添加,修改,删除等,toAppendStream的类型可以为case class类型
// 如果对字段进行修改了,可以设置为Row类型,会简化操作
val result: DataStream[(Boolean, Row)] = table_env.toRetractStream[Row](t_game_detail).filter((_: (Boolean, Row))._1 == true)


result.print("t_game_detail")

stream_env.execute("stream table")


}

}