Table API是用于流和批处理的统一的关系API。Table API查询可以直接在批处理或流输入上运行而无需修改。Table API是SQL语言的超集,是专门为与Apache Flink配合使用而设计的。Table API是用于Scala和Java的语言集成的API。Table API查询不是将查询指定为SQL,而是以Java或Scala中的语言嵌入样式定义,并具有IDE支持,例如自动完成和语法验证。
Table API与Flink的SQL集成共享其API的许多概念和部分。看一下Common Concepts&API,了解如何注册表或创建Table对象。“ 流概念”页面讨论了流的特定概念,例如动态表和时间属性。
下面我们将进行Table API在流式处理中的操作,在实际的工作中,Flink也是大多数场景都是在流计算中使用。
Group Windows
“Group”窗口根据时间或行计数间隔将组行聚合为有限的组,并每组评估一次聚合函数。对于批处理表,窗口是按时间间隔对记录进行分组的便捷快捷方式。
Windows是使用该window(w: GroupWindow)子句定义的,并且需要使用该as子句指定的别名。为了按窗口对表进行分组,必须在groupBy(…)子句中像常规分组属性一样引用窗口别名。以下示例显示如何在表上定义窗口聚合。
1 | val table = input |
在流环境中,如果窗口聚合除对窗口以外的一个或多个属性进行分组,则只能并行计算窗口聚合,即,groupBy(…)子句引用了窗口别名和至少一个其他属性。groupBy(…)子句仅引用一个窗口别名(如在上面的例子)只能由一个单一的,非平行的任务进行评估。以下示例显示如何使用其他分组属性定义窗口聚合。
1 | val table = input |
窗口属性诸如开始,结束,或一个时间窗口的rowtime时间戳可以在选择语句被添加为窗口别名作为的一个属性w.start,w.end以及w.rowtime分别。窗口开始和行时间时间戳是包含窗口的上下边界。相反,窗口结束时间戳是唯一的窗口上边界。例如,从下午2点开始的30分钟滚动窗口将具有14:00:00.000开始时间戳记,14:29:59.999行时间时间戳记和14:30:00.000结束时间戳记。
1 | val table = input |
该Window参数定义行如何映射到窗口。Window不是用户可以实现的接口。相反,Table API提供了一组Window具有特定语义的预定义类,这些预定义类被转换为基础DataStream或DataSet操作。支持的窗口定义在下面列出。
Tumbling Windows
滚动窗口将行分配给固定长度的非重叠连续窗口。例如,5分钟的滚动窗口以5分钟为间隔对行进行分组。可以在事件时间,处理时间或行数上定义滚动窗口。
滚动窗口是使用以下Tumble
类定义的:
方法 | 描述 |
---|---|
over | 将窗口的长度定义为时间或行计数间隔。 |
on | 用于分组(时间间隔)或排序(行计数)的时间属性。对于批查询,它可以是任何Long或Timestamp属性。对于流查询,它必须是声明的event-time或processing-time time属性。 |
as | 为窗口分配别名。别名用于在以下groupBy() 子句中引用窗口,并可以选择在子句中选择窗口属性,例如窗口开始,结束或行时间时间戳select() 。 |
1 | // Tumbling Event-time Window |
Sliding Windows
滑动窗口的大小固定,并以指定的滑动间隔滑动。如果滑动间隔小于窗口大小,则滑动窗口重叠。因此,可以将行分配给多个窗口。例如,一个15分钟大小的滑动窗口和5分钟的滑动间隔将每行分配给3个15分钟大小的不同窗口,这些窗口以5分钟的间隔进行评估。可以在事件时间,处理时间或行数上定义滑动窗口。
滑动窗口是通过使用以下Slide
类来定义的:
方法 | 描述 |
---|---|
over | 将窗口的长度定义为时间或行计数间隔。 |
every | 将幻灯片间隔定义为时间间隔或行计数间隔。滑动间隔必须与尺寸间隔具有相同的类型。 |
on | 用于分组(时间间隔)或排序(行计数)的时间属性。对于批查询,它可以是任何Long或Timestamp属性。对于流查询,它必须是声明的event-time或processing-time time属性。 |
as | 为窗口分配别名。别名用于在以下groupBy() 子句中引用窗口,并可以选择在子句中选择窗口属性,例如窗口开始,结束或行时间时间戳select() 。 |
1 | // Sliding Event-time Window |
Session Windows
会话窗口没有固定的大小,但是其边界由不活动的时间间隔定义,即,如果在定义的间隔时间段内未出现任何事件,则会话窗口关闭。例如,间隔30分钟的会话窗口在30分钟不活动后观察到一行时开始(否则该行将被添加到现有窗口),如果在30分钟内未添加任何行,则关闭该窗口。会话窗口可以在事件时间或处理时间工作。
通过使用以下Session
类定义会话窗口:
方法 | 描述 |
---|---|
withGap | 将两个窗口之间的间隔定义为时间间隔。 |
on | 用于分组(时间间隔)或排序(行计数)的时间属性。对于批查询,它可以是任何Long或Timestamp属性。对于流查询,它必须是声明的event-time或processing-time time属性。 |
as | 为窗口分配别名。别名用于在以下groupBy() 子句中引用窗口,并可以选择在子句中选择窗口属性,例如窗口开始,结束或行时间时间戳select() 。 |
1 | // Session Event-time Window |
Over Windows
窗口聚合是标准SQL(OVER子句)中已知的,并在SELECT查询的子句中定义。与在GROUP BY子句中指定的组窗口不同,在窗口上方不会折叠行。取而代之的是,在窗口聚合中,为每个输入行在其相邻行的范围内计算一个聚合。
窗口是使用window(w: OverWindow)子句(over_window(OverWindow)在Python API中使用)定义的,并通过select()方法中的别名进行引用。以下示例显示如何在表上定义窗口聚合
1 | val table = input |
OverWindow在限定范围内的数据对其进行聚合计算。OverWindow不是用户可以实现的接口。相反,Table API提供了Over用于配置上方窗口属性的类。可以在事件时间或处理时间以及指定为时间间隔或行计数的范围上定义窗口上方。受支持的over窗口定义作为Over(和其他类)上的方法公开,并在下面列出:
方法 | 需要 | 描述 |
---|---|---|
partitionBy | 可选的 | 在一个或多个属性上定义输入的分区。每个分区都经过单独排序,并且聚合函数分别应用于每个分区。 注意:在流环境中,仅当窗口包含partition by子句时,才可以并行计算整个窗口聚合。如果没有 partitionBy(...) 流,则由单个非并行任务处理。 |
orderBy | 需要 | 定义每个分区内的行顺序,从而定义将聚合函数应用于行的顺序。 注意:对于流查询,它必须是声明的event-time或processing-time time属性。当前,仅支持单个sort属性。 |
preceding | 可选的 | 定义窗口中包含的并在当前行之前的行的间隔。该间隔可以指定为时间间隔或行计数间隔。 用时间间隔的大小(例如, 10.minutes 时间间隔或10.rows 行计数间隔)指定窗口边界。窗口上的无边界是使用常量指定的,即 UNBOUNDED_RANGE 用于时间间隔或UNBOUNDED_ROW 用于行计数间隔。Windows上的无边界从分区的第一行开始。如果 preceding 条款被省略,UNBOUNDED_RANGE 并且CURRENT_RANGE 被用作默认preceding 和following 用于该窗口。 |
following | 可选的 | 定义窗口中包含并紧随当前行的行的窗口间隔。该间隔必须与前面的间隔(时间或行计数)以相同的单位指定。 目前,不支持具有当前行之后的行的窗口。相反,您可以指定两个常量之一: - CURRENT_ROW 将窗口的上限设置为当前行。- CURRENT_RANGE 将窗口的上限设置为当前行的排序键,即,与当前行具有相同排序键的所有行都包含在窗口中。 |
如果following 省略该子句,则时间间隔窗口CURRENT_RANGE 的上限定义为,行计数间隔窗口的上限定义为CURRENT_ROW 。 | ||
as | 需要 | 为上方窗口分配别名。别名用于引用以下select() 子句中的over窗口。 |
注意:**当前,select()
必须在相同的窗口内计算同一调用中的所有聚合函数。
Unbounded Over Windows
1 | // Unbounded Event-time over window (assuming an event-time attribute "rowtime") |
bounded Over Windows
1 | // Bounded Event-time over window (assuming an event-time attribute "rowtime") |
基于行的操作
基于行的操作生成具有多列的输出。
算子 | 描述 | ||
---|---|---|---|
Map Batch\Streaming | 使用用户定义的标量函数或内置标量函数执行映射操作。如果输出类型是复合类型,则输出将被展平。``` | ||
class MyMapFunction extends ScalarFunction { | |||
def eval(a: String): Row = { | |||
Row.of(a, “pre-“ + a) | |||
} | |||
override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = | |||
Types.ROW(Types.STRING, Types.STRING) | |||
} | |||
val func = new MyMapFunction() | |||
val table = input | |||
.map(func(‘c)).as(‘a, ‘b) | |||
| |||
class MyFlatMapFunction extends TableFunction[Row] { | |||
def eval(str: String): Unit = { | |||
if (str.contains(“#”)) { | |||
str.split(“#”).foreach({ s => | |||
val row = new Row(2) | |||
row.setField(0, s) | |||
row.setField(1, s.length) | |||
collect(row) | |||
}) | |||
} | |||
} | |||
override def getResultType: TypeInformation[Row] = { | |||
Types.ROW(Types.STRING, Types.INT) | |||
} | |||
} | |||
val func = new MyFlatMapFunction | |||
val table = input | |||
.flatMap(func(‘c)).as(‘a, ‘b) | |||
| |||
val myAggFunc = new MyMinMax | |||
val table = input | |||
.window(Tumble over 5.minutes on ‘rowtime as ‘w) // define window | |||
.groupBy(‘key, ‘w) // group by key and window | |||
.aggregate(myAggFunc(‘a) as (‘x, ‘y)) | |||
.select(‘key, ‘x, ‘y, ‘w.start, ‘w.end) // access window properties and aggregate results | |||
| |||
import java.lang.{Integer => JInteger} | |||
import org.apache.flink.table.api.Types | |||
import org.apache.flink.table.functions.TableAggregateFunction | |||
/** | |||
* Accumulator for top2. | |||
*/ | |||
class Top2Accum { | |||
var first: JInteger = _ | |||
var second: JInteger = _ | |||
} | |||
/** | |||
* The top2 user-defined table aggregate function. | |||
*/ | |||
class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] { | |||
override def createAccumulator(): Top2Accum = { | |||
val acc = new Top2Accum | |||
acc.first = Int.MinValue | |||
acc.second = Int.MinValue | |||
acc | |||
} | |||
def accumulate(acc: Top2Accum, v: Int) { | |||
if (v > acc.first) { | |||
acc.second = acc.first | |||
acc.first = v | |||
} else if (v > acc.second) { | |||
acc.second = v | |||
} | |||
} | |||
def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit = { | |||
val iter = its.iterator() | |||
while (iter.hasNext) { | |||
val top2 = iter.next() | |||
accumulate(acc, top2.first) | |||
accumulate(acc, top2.second) | |||
} | |||
} | |||
def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): Unit = { | |||
// emit the value and rank | |||
if (acc.first != Int.MinValue) { | |||
out.collect(JTuple2.of(acc.first, 1)) | |||
} | |||
if (acc.second != Int.MinValue) { | |||
out.collect(JTuple2.of(acc.second, 2)) | |||
} | |||
} | |||
} | |||
val top2 = new Top2 | |||
val orders: Table = tableEnv.from(“Orders”) | |||
val result = orders | |||
.groupBy(‘key) | |||
.flatAggregate(top2(‘a) as (‘v, ‘rank)) | |||
.select(‘key, ‘v, ‘rank) | |||
| |||
val top2 = new Top2 | |||
val orders: Table = tableEnv.from(“Orders”) | |||
val result = orders | |||
.window(Tumble over 5.minutes on ‘rowtime as ‘w) // define window | |||
.groupBy(‘a, ‘w) // group by key and window | |||
.flatAggregate(top2(‘b) as (‘v, ‘rank)) | |||
.select(‘a, w.start, ‘w.end, ‘w.rowtime, ‘v, ‘rank) // access window properties and aggregate results | |||
|
内置函数
我们在讲解 Flink Table & SQL 所支持的常用算子前,需要说明一点,Flink 自从 0.9 版本开始支持 Table & SQL 功能一直处于完善开发中,且在不断进行迭代。
我们在官网中也可以看到这样的提示:
Please note that the Table API and SQL are not yet feature complete and are being actively developed. Not all operations are supported by every combination of [Table API, SQL] and [stream, batch] input.
Flink Table & SQL 的开发一直在进行中,并没有支持所有场景下的计算逻辑。从我个人实践角度来讲,在使用原生的 Flink Table & SQL 时,务必查询官网当前版本对 Table & SQL 的支持程度,尽量选择场景明确,逻辑不是极其复杂的场景。
常用算子
目前 Flink SQL 支持的语法主要如下:
复制
1 | query: |
可以看到 Flink SQL 和传统的 SQL 一样,支持了包含查询、连接、聚合等场景,另外还支持了包括窗口、排序等场景。下面我就以最常用的算子来做详细的讲解。
SELECT/AS/WHERE**
SELECT、WHERE 和传统 SQL 用法一样,用于筛选和过滤数据,同时适用于 DataStream 和 DataSet。
1 | SELECT * FROM Table; |
当然我们也可以在 WHERE 条件中使用 =、<、>、<>、>=、<=,以及 AND、OR 等表达式的组合:
1 | SELECT name,age FROM Table where name LIKE '%小明%'; |
GROUP BY / DISTINCT/HAVING
GROUP BY 用于进行分组操作,DISTINCT 用于结果去重。
HAVING 和传统 SQL 一样,可以用来在聚合函数之后进行筛选。
1 | SELECT DISTINCT name FROM Table; |
JOIN
JOIN 可以用于把来自两个表的数据联合起来形成结果表,目前 Flink 的 Join 只支持等值连接。Flink 支持的 JOIN 类型包括:
1 | JOIN - INNER JOIN |
例如,用用户表和商品表进行关联:
1 | SELECT * |
LEFT JOIN、RIGHT JOIN 、FULL JOIN 相与我们传统 SQL 中含义一样。
WINDOW**
根据窗口数据划分的不同,目前 Apache Flink 有如下 3 种:
- 滚动窗口,窗口数据有固定的大小,窗口中的数据不会叠加;
- 滑动窗口,窗口数据有固定大小,并且有生成间隔;
- 会话窗口,窗口数据没有固定的大小,根据用户传入的参数进行划分,窗口数据无叠加;
滚动窗口
滚动窗口的特点是:有固定大小、窗口中的数据不会重叠,如下图所示:
滚动窗口的语法:
复制
1 | SELECT |
举例说明,我们需要计算每个用户每天的订单数量:
1 | SELECT user, TUMBLE_START(timeLine, INTERVAL '1' DAY) as winStart, SUM(amount) FROM Orders GROUP BY TUMBLE(timeLine, INTERVAL '1' DAY), user; |
其中,TUMBLE_START 和 TUMBLE_END 代表窗口的开始时间和窗口的结束时间,TUMBLE (timeLine, INTERVAL ‘1’ DAY) 中的 timeLine 代表时间字段所在的列,INTERVAL ‘1’ DAY 表示时间间隔为一天。
滑动窗口**
滑动窗口有固定的大小,与滚动窗口不同的是滑动窗口可以通过 slide 参数控制滑动窗口的创建频率。需要注意的是,多个滑动窗口可能会发生数据重叠,具体语义如下:
滑动窗口的语法与滚动窗口相比,只多了一个 slide 参数:
1 | SELECT |
例如,我们要每间隔一小时计算一次过去 24 小时内每个商品的销量:
1 | SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product |
上述案例中的 INTERVAL ‘1’ HOUR 代表滑动窗口生成的时间间隔。
会话窗口**
会话窗口定义了一个非活动时间,假如在指定的时间间隔内没有出现事件或消息,则会话窗口关闭。
会话窗口的语法如下:
1 | SELECT |
举例,我们需要计算每个用户过去 1 小时内的订单量:
1 | SELECT user, SESSION_START(rowtime, INTERVAL '1' HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL '1' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL '1' HOUR), user |
内置函数
Flink 中还有大量的内置函数,我们可以直接使用,将内置函数分类如下:
- 比较函数
- 逻辑函数
- 算术函数
- 字符串处理函数
- 时间函数
比较函数
逻辑函数
算术函数
字符串处理函数
时间函数