Flink Table API window操作

Table API是用于流和批处理的统一的关系API。Table API查询可以直接在批处理或流输入上运行而无需修改。Table API是SQL语言的超集,是专门为与Apache Flink配合使用而设计的。Table API是用于Scala和Java的语言集成的API。Table API查询不是将查询指定为SQL,而是以Java或Scala中的语言嵌入样式定义,并具有IDE支持,例如自动完成和语法验证。


Table API与Flink的SQL集成共享其API的许多概念和部分。看一下Common Concepts&API,了解如何注册表或创建Table对象。“ 流概念”页面讨论了流的特定概念,例如动态表和时间属性。


下面我们将进行Table API在流式处理中的操作,在实际的工作中,Flink也是大多数场景都是在流计算中使用。

Group Windows

“Group”窗口根据时间或行计数间隔将组行聚合为有限的组,并每组评估一次聚合函数。对于批处理表,窗口是按时间间隔对记录进行分组的便捷快捷方式。


Windows是使用该window(w: GroupWindow)子句定义的,并且需要使用该as子句指定的别名。为了按窗口对表进行分组,必须在groupBy(…)子句中像常规分组属性一样引用窗口别名。以下示例显示如何在表上定义窗口聚合。

1
2
3
4
val table = input
.window([w: GroupWindow] as 'w) // define window with alias w
.groupBy('w) // group the table by window w
.select('b.sum) // aggregate


在流环境中,如果窗口聚合除对窗口以外的一个或多个属性进行分组,则只能并行计算窗口聚合,即,groupBy(…)子句引用了窗口别名和至少一个其他属性。groupBy(…)子句仅引用一个窗口别名(如在上面的例子)只能由一个单一的,非平行的任务进行评估。以下示例显示如何使用其他分组属性定义窗口聚合。

1
2
3
4
val table = input
.window([w: GroupWindow] as 'w) // define window with alias w
.groupBy('w, 'a) // group the table by attribute a and window w
.select('a, 'b.sum) // aggregate


窗口属性诸如开始,结束,或一个时间窗口的rowtime时间戳可以在选择语句被添加为窗口别名作为的一个属性w.start,w.end以及w.rowtime分别。窗口开始和行时间时间戳是包含窗口的上下边界。相反,窗口结束时间戳是唯一的窗口上边界。例如,从下午2点开始的30分钟滚动窗口将具有14:00:00.000开始时间戳记,14:29:59.999行时间时间戳记和14:30:00.000结束时间戳记。

1
2
3
4
val table = input
.window([w: GroupWindow] as 'w) // define window with alias w
.groupBy('w, 'a) // group the table by attribute a and window w
.select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps


该Window参数定义行如何映射到窗口。Window不是用户可以实现的接口。相反,Table API提供了一组Window具有特定语义的预定义类,这些预定义类被转换为基础DataStream或DataSet操作。支持的窗口定义在下面列出。

Tumbling Windows

滚动窗口将行分配给固定长度的非重叠连续窗口。例如,5分钟的滚动窗口以5分钟为间隔对行进行分组。可以在事件时间,处理时间或行数上定义滚动窗口。


滚动窗口是使用以下Tumble类定义的:

方法描述
over将窗口的长度定义为时间或行计数间隔。
on用于分组(时间间隔)或排序(行计数)的时间属性。对于批查询,它可以是任何Long或Timestamp属性。对于流查询,它必须是声明的event-time或processing-time time属性
as为窗口分配别名。别名用于在以下groupBy()子句中引用窗口,并可以选择在子句中选择窗口属性,例如窗口开始,结束或行时间时间戳select()
1
2
3
4
5
6
// Tumbling Event-time Window
.window(Tumble over 10.minutes on 'rowtime as 'w)
// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.minutes on 'proctime as 'w)
// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.rows on 'proctime as 'w)

Sliding Windows


滑动窗口的大小固定,并以指定的滑动间隔滑动。如果滑动间隔小于窗口大小,则滑动窗口重叠。因此,可以将行分配给多个窗口。例如,一个15分钟大小的滑动窗口和5分钟的滑动间隔将每行分配给3个15分钟大小的不同窗口,这些窗口以5分钟的间隔进行评估。可以在事件时间,处理时间或行数上定义滑动窗口。


滑动窗口是通过使用以下Slide类来定义的:

方法描述
over将窗口的长度定义为时间或行计数间隔。
every将幻灯片间隔定义为时间间隔或行计数间隔。滑动间隔必须与尺寸间隔具有相同的类型。
on用于分组(时间间隔)或排序(行计数)的时间属性。对于批查询,它可以是任何Long或Timestamp属性。对于流查询,它必须是声明的event-time或processing-time time属性
as为窗口分配别名。别名用于在以下groupBy()子句中引用窗口,并可以选择在子句中选择窗口属性,例如窗口开始,结束或行时间时间戳select()
1
2
3
4
5
6
// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)
// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide over 10.rows every 5.rows on 'proctime as 'w)

Session Windows


会话窗口没有固定的大小,但是其边界由不活动的时间间隔定义,即,如果在定义的间隔时间段内未出现任何事件,则会话窗口关闭。例如,间隔30分钟的会话窗口在30分钟不活动后观察到一行时开始(否则该行将被添加到现有窗口),如果在30分钟内未添加任何行,则关闭该窗口。会话窗口可以在事件时间或处理时间工作。


通过使用以下Session类定义会话窗口:

方法描述
withGap将两个窗口之间的间隔定义为时间间隔。
on用于分组(时间间隔)或排序(行计数)的时间属性。对于批查询,它可以是任何Long或Timestamp属性。对于流查询,它必须是声明的event-time或processing-time time属性
as为窗口分配别名。别名用于在以下groupBy()子句中引用窗口,并可以选择在子句中选择窗口属性,例如窗口开始,结束或行时间时间戳select()
1
2
3
4
// Session Event-time Window
.window(Session withGap 10.minutes on 'rowtime as 'w)
// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session withGap 10.minutes on 'proctime as 'w)

Over Windows


窗口聚合是标准SQL(OVER子句)中已知的,并在SELECT查询的子句中定义。与在GROUP BY子句中指定的组窗口不同,在窗口上方不会折叠行。取而代之的是,在窗口聚合中,为每个输入行在其相邻行的范围内计算一个聚合。
窗口是使用window(w: OverWindow)子句(over_window(OverWindow)在Python API中使用)定义的,并通过select()方法中的别名进行引用。以下示例显示如何在表上定义窗口聚合

1
2
3
val table = input
.window([w: OverWindow] as 'w) // define over window with alias w
.select('a, 'b.sum over 'w, 'c.min over 'w) // aggregate over the over window w


OverWindow在限定范围内的数据对其进行聚合计算。OverWindow不是用户可以实现的接口。相反,Table API提供了Over用于配置上方窗口属性的类。可以在事件时间或处理时间以及指定为时间间隔或行计数的范围上定义窗口上方。受支持的over窗口定义作为Over(和其他类)上的方法公开,并在下面列出:

方法需要描述
partitionBy可选的在一个或多个属性上定义输入的分区。每个分区都经过单独排序,并且聚合函数分别应用于每个分区。
注意:在流环境中,仅当窗口包含partition by子句时,才可以并行计算整个窗口聚合。如果没有partitionBy(...)流,则由单个非并行任务处理。
orderBy需要定义每个分区内的行顺序,从而定义将聚合函数应用于行的顺序。
注意:对于流查询,它必须是声明的event-time或processing-time time属性。当前,仅支持单个sort属性。
preceding可选的定义窗口中包含的并在当前行之前的行的间隔。该间隔可以指定为时间间隔或行计数间隔。
用时间间隔的大小(例如,10.minutes时间间隔或10.rows行计数间隔)指定窗口边界
窗口上的无边界是使用常量指定的,即UNBOUNDED_RANGE用于时间间隔或UNBOUNDED_ROW用于行计数间隔。Windows上的无边界从分区的第一行开始。
如果preceding条款被省略,UNBOUNDED_RANGE并且CURRENT_RANGE被用作默认precedingfollowing用于该窗口。
following可选的定义窗口中包含并紧随当前行的行的窗口间隔。该间隔必须与前面的间隔(时间或行计数)以相同的单位指定。
目前,不支持具有当前行之后的行的窗口。相反,您可以指定两个常量之一:
- CURRENT_ROW 将窗口的上限设置为当前行。
- CURRENT_RANGE 将窗口的上限设置为当前行的排序键,即,与当前行具有相同排序键的所有行都包含在窗口中。
如果following省略该子句,则时间间隔窗口CURRENT_RANGE的上限定义为,行计数间隔窗口的上限定义为CURRENT_ROW
as需要为上方窗口分配别名。别名用于引用以下select()子句中的over窗口。


注意:**当前,select()必须在相同的窗口内计算同一调用中的所有聚合函数。

Unbounded Over Windows

1
2
3
4
5
6
7
8
9
// Unbounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
// Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
// Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)

// Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)

bounded Over Windows

1
2
3
4
5
6
7
8
9
// Bounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
// Bounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)

// Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)

基于行的操作

基于行的操作生成具有多列的输出。

算子描述
Map
Batch\Streaming
使用用户定义的标量函数或内置标量函数执行映射操作。如果输出类型是复合类型,则输出将被展平。```
class MyMapFunction extends ScalarFunction {
def eval(a: String): Row = {
Row.of(a, “pre-“ + a)
}
override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
Types.ROW(Types.STRING, Types.STRING)
}
val func = new MyMapFunction()
val table = input
.map(func(‘c)).as(‘a, ‘b)
1
2
 |
| **FlatMap**<br />Batch\Streaming | 使用表函数执行flatMap操作。
class MyFlatMapFunction extends TableFunction[Row] {
def eval(str: String): Unit = {
if (str.contains(“#”)) {
str.split(“#”).foreach({ s =>
val row = new Row(2)
row.setField(0, s)
row.setField(1, s.length)
collect(row)
})
}
}
override def getResultType: TypeInformation[Row] = {
Types.ROW(Types.STRING, Types.INT)
}
}
val func = new MyFlatMapFunction
val table = input
.flatMap(func(‘c)).as(‘a, ‘b)
1
2
3
 |
| **Aggregate**<br />Batch\Streaming<br />**Result Updating** | 使用聚合函数执行聚合操作。您必须使用select语句关闭“聚合”,并且select语句不支持聚合功能。如果输出类型是复合类型,则聚合的输出将被展平。<br />case class MyMinMaxAcc(var min: Int, var max: Int)<br />class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {<br />  def accumulate(acc: MyMinMaxAcc, value: Int): Unit = {<br />    if (value < acc.min) {<br />      acc.min = value<br />    }<br />    if (value > acc.max) {<br />      acc.max = value<br />    }<br />  }<br />  override def createAccumulator(): MyMinMaxAcc = MyMinMaxAcc(0, 0)<br />  <br />  def resetAccumulator(acc: MyMinMaxAcc): Unit = {<br />    acc.min = 0<br />    acc.max = 0<br />  }<br />  override def getValue(acc: MyMinMaxAcc): Row = {<br />    Row.of(Integer.valueOf(acc.min), Integer.valueOf(acc.max))<br />  }<br />  override def getResultType: TypeInformation[Row] = {<br />    new RowTypeInfo(Types.INT, Types.INT)<br />  }<br />}<br />val myAggFunc = new MyMinMax<br />val table = input<br />  .groupBy('key)<br />  .aggregate(myAggFunc('a) as ('x, 'y))<br />  .select('key, 'x, 'y) |
| **组窗口聚合**<br />Batch\Streaming | 在[组窗口](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tableApi.html#group-windows)和可能的一个或多个分组键上对表进行分组和聚集。您必须使用select语句关闭“聚合”。并且select语句不支持“ *”或聚合函数。
val myAggFunc = new MyMinMax
val table = input
.window(Tumble over 5.minutes on ‘rowtime as ‘w) // define window
.groupBy(‘key, ‘w) // group by key and window
.aggregate(myAggFunc(‘a) as (‘x, ‘y))
.select(‘key, ‘x, ‘y, ‘w.start, ‘w.end) // access window properties and aggregate results
1
2
3
4
 |
| **FlatAggregate**

Streaming<br />**Result Updating** | 类似于**GroupBy聚合**。使用以下运行表聚合运算符将分组键上的行分组,以逐行聚合行。与AggregateFunction的区别在于TableAggregateFunction可以为一个组返回0个或更多记录。您必须使用select语句关闭“ flatAggregate”。并且select语句不支持聚合函数。<br />除了使用`emitValue`输出结果,还可以使用`emitUpdateWithRetract`方法。与不同`emitValue`,`emitUpdateWithRetract`用于发出已更新的值。此方法以缩回模式增量输出数据,即,一旦有更新,我们必须先缩回旧记录,然后再发送新的更新记录。如果在表聚合函数中定义了这两种方法,则该`emitUpdateWithRetract`方法将优先于该`emitValue`方法使用,因为这被认为比该方法更有效,`emitValue`因为它可以递增地输出值。有关详细信息,请[参见表聚合函数](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/functions/udfs.html#table-aggregation-functions)。
import java.lang.{Integer => JInteger}
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.TableAggregateFunction
/**
* Accumulator for top2.
*/
class Top2Accum {
var first: JInteger = _
var second: JInteger = _
}
/**
* The top2 user-defined table aggregate function.
*/
class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] {
override def createAccumulator(): Top2Accum = {
val acc = new Top2Accum
acc.first = Int.MinValue
acc.second = Int.MinValue
acc
}
def accumulate(acc: Top2Accum, v: Int) {
if (v > acc.first) {
acc.second = acc.first
acc.first = v
} else if (v > acc.second) {
acc.second = v
}
}
def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit = {
val iter = its.iterator()
while (iter.hasNext) {
val top2 = iter.next()
accumulate(acc, top2.first)
accumulate(acc, top2.second)
}
}
def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): Unit = {
// emit the value and rank
if (acc.first != Int.MinValue) {
out.collect(JTuple2.of(acc.first, 1))
}
if (acc.second != Int.MinValue) {
out.collect(JTuple2.of(acc.second, 2))
}
}
}
val top2 = new Top2
val orders: Table = tableEnv.from(“Orders”)
val result = orders
.groupBy(‘key)
.flatAggregate(top2(‘a) as (‘v, ‘rank))
.select(‘key, ‘v, ‘rank)
1
2
**注意:**对于流式查询,根据聚合类型和不同分组关键字的数量,计算查询结果所需的状态可能会无限增长。请提供具有有效保留间隔的查询配置,以防止出现过多的状态。有关详细信息,请参见[查询配置](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html)。 |
| **Group Window FlatAggregate**<br />**Streaming** | 在[组窗口](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tableApi.html#group-windows)和可能的一个或多个分组键上对表进行分组和聚集。您必须使用select语句关闭“ flatAggregate”。并且select语句不支持聚合函数。
val top2 = new Top2
val orders: Table = tableEnv.from(“Orders”)
val result = orders
.window(Tumble over 5.minutes on ‘rowtime as ‘w) // define window
.groupBy(‘a, ‘w) // group by key and window
.flatAggregate(top2(‘b) as (‘v, ‘rank))
.select(‘a, w.start, ‘w.end, ‘w.rowtime, ‘v, ‘rank) // access window properties and aggregate results
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
 |


<br />
<br />下面是根据上面所讲到的操作,所写的小实例,每五秒统计一次每个用户目前的游戏得分情况
```scala
package tableAPI

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Slide, Table, Tumble}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala._


object table_stream_window {
case class GameData(user_id: String, game_id: String, game_time: Long, game_score: Int)
def main(args: Array[String]): Unit = {
val stream_env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
stream_env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val table_env: StreamTableEnvironment = StreamTableEnvironment.create(stream_env, settings)
stream_env.setParallelism(1)
val socketStream: DataStream[String] = stream_env.socketTextStream("localhost", 8888)

val gameStream: DataStream[GameData] = socketStream.map((line: String) => {
val array_data: Array[String] = line.split(",")
GameData(array_data(0), array_data(1), array_data(2).toLong, array_data(3).toInt)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GameData](Time.seconds(3)){
override def extractTimestamp(element: GameData): Long = {
element.game_time
}
})

//分组聚合操作
import org.apache.flink.table.api.scala._


//创建动态Table,并且指定event time
val game_table: Table = table_env.fromDataStream(gameStream,'user_id,'game_id,'game_time.rowtime,'game_score)


//开启窗口,滑动窗口
// game_table.window(Slide.over("10.second").every("5.second").on("game_time").as("window"))

//开启窗口,滚动窗口
// val game_score_sum: Table = game_table.window(Tumble.over("5.second").on("game_time").as("window"))
val game_score_sum: Table = game_table.window(Tumble over 5.second on 'game_time as 'window)
.groupBy('window, 'user_id) //必须指定窗口字段
.select('user_id, 'game_score.sum, 'window.start, 'window.end)

table_env
.toRetractStream[Row](game_score_sum)
.filter((_: (Boolean, Row))._1==true)
.print("user_sum_score")

table_env.execute("user_sum_score——job")
}
}

内置函数

我们在讲解 Flink Table & SQL 所支持的常用算子前,需要说明一点,Flink 自从 0.9 版本开始支持 Table & SQL 功能一直处于完善开发中,且在不断进行迭代。
我们在官网中也可以看到这样的提示:

Please note that the Table API and SQL are not yet feature complete and are being actively developed. Not all operations are supported by every combination of [Table API, SQL] and [stream, batch] input.

Flink Table & SQL 的开发一直在进行中,并没有支持所有场景下的计算逻辑。从我个人实践角度来讲,在使用原生的 Flink Table & SQL 时,务必查询官网当前版本对 Table & SQL 的支持程度,尽量选择场景明确,逻辑不是极其复杂的场景。

常用算子

目前 Flink SQL 支持的语法主要如下:
复制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
query:
values
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] [ [ catalogName . ] schemaName . ] tableName
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| UNNEST '(' expression ')'
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
...

可以看到 Flink SQL 和传统的 SQL 一样,支持了包含查询、连接、聚合等场景,另外还支持了包括窗口、排序等场景。下面我就以最常用的算子来做详细的讲解。

SELECT/AS/WHERE**
SELECT、WHERE 和传统 SQL 用法一样,用于筛选和过滤数据,同时适用于 DataStream 和 DataSet。

1
2
SELECT * FROM Table;
SELECT name,age FROM Table;

当然我们也可以在 WHERE 条件中使用 =、<、>、<>、>=、<=,以及 AND、OR 等表达式的组合:

1
2
3
4
5
SELECT name,age FROM Table where name LIKE '%小明%';
SELECT * FROM Table WHERE age = 20;
SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)

GROUP BY / DISTINCT/HAVING
GROUP BY 用于进行分组操作,DISTINCT 用于结果去重。
HAVING 和传统 SQL 一样,可以用来在聚合函数之后进行筛选。

1
2
3
4
SELECT DISTINCT name FROM Table;
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name HAVING
SUM(score) > 300;

JOIN
JOIN 可以用于把来自两个表的数据联合起来形成结果表,目前 Flink 的 Join 只支持等值连接。Flink 支持的 JOIN 类型包括:

1
2
3
4
JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN

例如,用用户表和商品表进行关联:

1
2
3
4
5
6
SELECT *
FROM User LEFT JOIN Product ON User.name = Product.buyer
SELECT *
FROM User RIGHT JOIN Product ON User.name = Product.buyer
SELECT *
FROM User FULL OUTER JOIN Product ON User.name = Product.buyer

LEFT JOIN、RIGHT JOIN 、FULL JOIN 相与我们传统 SQL 中含义一样。

WINDOW**
根据窗口数据划分的不同,目前 Apache Flink 有如下 3 种:

  • 滚动窗口,窗口数据有固定的大小,窗口中的数据不会叠加;
  • 滑动窗口,窗口数据有固定大小,并且有生成间隔;
  • 会话窗口,窗口数据没有固定的大小,根据用户传入的参数进行划分,窗口数据无叠加;

滚动窗口
滚动窗口的特点是:有固定大小、窗口中的数据不会重叠,如下图所示:

滚动窗口的语法:
复制

1
2
3
4
5
6
7
8
9
SELECT 
[gk],
[TUMBLE_START(timeCol, size)],
[TUMBLE_END(timeCol, size)],
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)

举例说明,我们需要计算每个用户每天的订单数量:

1
SELECT user, TUMBLE_START(timeLine, INTERVAL '1' DAY) as winStart, SUM(amount) FROM Orders GROUP BY TUMBLE(timeLine, INTERVAL '1' DAY), user;

其中,TUMBLE_START 和 TUMBLE_END 代表窗口的开始时间和窗口的结束时间,TUMBLE (timeLine, INTERVAL ‘1’ DAY) 中的 timeLine 代表时间字段所在的列,INTERVAL ‘1’ DAY 表示时间间隔为一天。

滑动窗口**
滑动窗口有固定的大小,与滚动窗口不同的是滑动窗口可以通过 slide 参数控制滑动窗口的创建频率。需要注意的是,多个滑动窗口可能会发生数据重叠,具体语义如下:

滑动窗口的语法与滚动窗口相比,只多了一个 slide 参数:

1
2
3
4
5
6
7
8
9
SELECT 
[gk],
[HOP_START(timeCol, slide, size)] ,
[HOP_END(timeCol, slide, size)],
agg1(col1),
...
aggN(colN)
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)

例如,我们要每间隔一小时计算一次过去 24 小时内每个商品的销量:

1
SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product

上述案例中的 INTERVAL ‘1’ HOUR 代表滑动窗口生成的时间间隔。

会话窗口**
会话窗口定义了一个非活动时间,假如在指定的时间间隔内没有出现事件或消息,则会话窗口关闭。

会话窗口的语法如下:

1
2
3
4
5
6
7
8
9
SELECT 
[gk],
SESSION_START(timeCol, gap) AS winStart,
SESSION_END(timeCol, gap) AS winEnd,
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)

举例,我们需要计算每个用户过去 1 小时内的订单量:

1
SELECT user, SESSION_START(rowtime, INTERVAL '1' HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL '1' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL '1' HOUR), user

内置函数

Flink 中还有大量的内置函数,我们可以直接使用,将内置函数分类如下:

  • 比较函数
  • 逻辑函数
  • 算术函数
  • 字符串处理函数
  • 时间函数

比较函数

逻辑函数

算术函数

字符串处理函数

时间函数