Flink从源码看状态定时清理

状态是Flink中的一个重要特征,不论是存储在内存中,还是在rockdb中,但是我们有时候需要对状态进行清查,不需要保存太久的状态,否则这个状态会太大,这个时候就会用到Flink的清除。

为什么需要清理状态

其一是Flink中的状态是有时效性的,也就是在一定的实际内,是有效的,一旦过了某个时间点,它就没有什么价值了,其二是需要控制flink的状态大小,能够有效的管理不断增长的state的规模大小。


从flink的1.6中,就引入了关于状态时效性方面的特性,在1.8中,引入了基于TTL的对过去state的清理,让我们可以通过程序的 方式,对state进行清理,否则还的依赖其他额外的操作,来对state清理,这样会容易导致出错,并且不容易控制。


Apache Flink的1.6.0版本引入了State TTL功能。它使流处理应用程序的开发人员配置过期时间,并在定义时间超时(Time to Live)之后进行清理。在Flink 1.8.0中,该功能得到了扩展,包括对RocksDB和堆状态后端(FSStateBackend和MemoryStateBackend)的历史数据进行持续清理,从而实现旧条目的连续清理过程(根据TTL设置)。


Flink程序中的状态,是通过状态描述符来定义的,所以我们定义ttl,是通过flink程序中的StateTtlConfiguration对象,传递给状态描述符,来实现状态的清理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class StateDemo {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 10L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new MyFlatMapFunction())
.print();

// the printed output will be (1,4) and (1,5)
env.execute();
}
}

class MyFlatMapFunction extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

private static final long serialVersionUID = 1808329479322205953L;
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;

// 状态过期清除
// flink 的状态清理是惰性策略,也就是我们访问的状态,可能已经过期了,但是还没有删除状态数据,我们可以配置
// 是否返回过期状态的数据,不论是否返回过期数据,数据被访问后会立即清除过期状态。并且截止1.8.0 的版本
// 状态的清除针对的是process time ,还不支持event time,可能在后期的版本中会支持。

// flink的内部,状态ttl 功能是通过上次相关状态访问的附加时间戳和实际状态值来实现的,这样的方案会增加存储
// 上的开销,但是会允许flink程序在查询数据,cp的时候访问数据的过期状态
StateTtlConfig ttlConfig =
StateTtlConfig.newBuilder(Time.days(1)) //它是生存时间值
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
//状态可见性配置是否在读取访问时返回过期值
// .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot() // 在快照的时候进行删除
.build();


@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

// access the state value
Tuple2<Long, Long> currentSum = sum.value();

// update the count
currentSum.f0 += 1;

// add the second field of the input value
currentSum.f1 += input.f1;

// update the state
sum.update(currentSum);

// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}

@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set

//设置stage过期时间
descriptor.enableTimeToLive(ttlConfig);
sum = getRuntimeContext().getState(descriptor);
}
}

上面是定义了StateTtlConfig ,用于描述状态清除的配置信息,这个类是状态清除的核心配置类。Flink提供了多个选项来配置TTL功能的行为。

状态的时间什么被修改?

默认情况下,是当我们的数据状态修改会更新数据的ttl时间,当然我们也可以在读取数据时候对它进行更新,这样做会出现额外写入操作来更新时间戳操作。

过去的状态数据是否可以访问?

state ttl采用惰性策略来清理过期状态。这可能导致我们的应用程序会去尝试读取已过期但处于尚未删除状态的数据。我们可以观察此类读取请求是否返回了过期状态。无论哪种情况,数据被访问后会立即清除过期状态

注意**
使用Flink 1.8.0,用户只能根据处理时间(Processing Time)定义状态TTL。未来的Apache Flink版本中计划支持事件时间(Event Time)
Flink内部,状态TTL功能是通过存储上次相关状态访问的附加时间戳以及实际状态值来实现的。虽然这种方法增加了一些存储开销,但它允许Flink程序在查询数据、checkpointing,数据恢复的时候访问数据的过期状态。

如何避免读取过期数据?

在读取操作中访问状态对象时,Flink将检查其时间戳并清除状态是否已过期(取决于配置的状态可见性,是否返回过期状态)。由于这种延迟删除的特性,永远不会再次访问的过期状态数据将永远占用存储空间,除非被垃圾回收。


那么如何在没有应用程序逻辑明确的处理它的情况下删除过期的状态呢?通常,我们可以配置不同的策略进行后台删除。

完整快照自动删除过期状态

当获取检查点或保存点的完整快照时,Flink 1.6.0已经支持自动删除过期状态。大家注意,过期状态删除不适用于增量检查点。必须明确启用完全快照的状态删除

1
2
3
4
5
6
7
8
9
10
11
StateTtlConfig ttlConfig =
StateTtlConfig.newBuilder(Time.days(1)) //它是生存时间值
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupFullSnapshot() // 在快照的时候进行删除
.build();
/** Cleanup expired state in full snapshot on checkpoint. */
@Nonnull
public Builder cleanupFullSnapshot() {
cleanupStrategies.activate(CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT);
return this;
}


堆状态后端的增量清理**
此方法特定于堆状态后端(FSStateBackend和MemoryStateBackend)。它的实现方法是存储后端在所有状态条目上维护一个惰性全局迭代器。某些事件(例如状态访问)会触发增量清理。每次触发增量清理时,迭代器都会向前迭代删除已遍历的过期数据。以下代码示例演示如何启用增量清理

1
2
3
4
5
StateTtlConfig ttlConfig =
StateTtlConfig.newBuilder(Time.days(1)) //它是生存时间值
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupIncrementally(15,false)//
.build();


如果启用,则每次进行状态访问都会触发清理步骤。对于每个清理步骤,都会检查一定数量的数据是否过期。
参数说明:
第一个参数是检查每个清理步骤的状态条目数。
第二个参数是一个标志,用于数据处理后触发清理步骤,此外对于每次状态访问同样有效。

注意**
第一个是增量清理所花费的时间增加了数据处理延迟。
第二个应该可以忽略不计,但仍然值得一提:如果没有状态访问或没有数据处理记录,则不会删除过期状态

RocksDB后台压缩可以过滤掉过期状态

如果你的Flink应用程序使用RocksDB作为状态后端存储,则可以启用另一个基于Flink特定压缩过滤器的清理策略。RocksDB定期运行异步压缩以合并状态更新并减少存储。Flink压缩过滤器使用TTL检查状态条目的到期时间戳,并丢弃所有过期值。


激活此功能的第一步是通过设置以下Flink配置选项来配置RocksDB状态后端

1
state.backend.rocksdb.ttl.compaction.filter.enabled

配置RocksDB状态后端后,将为状态启用压缩清理策略,如以下代码示例所示

1
2
3
4
5
StateTtlConfig ttlConfig =
StateTtlConfig.newBuilder(Time.days(1)) //它是生存时间值
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupInRocksdbCompactFilter() // 基于rocksdb的定期压缩合并进行清理
.build();


使用定时器删除(Timers)
手动清除状态的另一种方法是基于Flink定时器。这是社区目前正在评估未来版本的想法。通过这种方法,为每个状态访问注册清理定时器。这种方法更容易预测,因为状态一旦到期就会被删除。但是,这种方法代价很大,因为定时器消耗存储资源,并且会频繁读取状态信息。

StateTtlConfig
这个类用于配置state TTL 的逻辑
UpdateType 这个枚举类表示此选项值配置何时更新上次访问时间戳记,以延长状态TTL

1
2
3
4
5
public enum UpdateType {
Disabled,//禁用过期,状态不过期
OnCreateAndWrite,// 创建并且写时候
OnReadAndWrite// 读然后写
}


StateVisibility 这个枚举类表示是否可以返回过期的用户值

1
2
3
4
public enum StateVisibility {
ReturnExpiredIfNotCleanedUp,// 如果还没有清除,那么返回
NeverReturnExpired// 不返回过期的值
}


CleanupStrategies 这个抽象类是定义TTL 清除策略的类,里面有个枚举类Strategies ,用于定义清除策略

1
2
3
4
5
enum Strategies {
FULL_STATE_SCAN_SNAPSHOT,// 快照扫描时候
INCREMENTAL_CLEANUP,//增量清理
ROCKSDB_COMPACTION_FILTER// rocksdb 压缩过滤
}


CleanupStrategy接口
这是一个空接口,用于定义各个清除策略的实现类,有三个实现类,分别是:
EmptyCleanupStrategy:这个类没有任何内容,是个空类
IncrementalCleanupStrategy:增量清理策略的配置实现类
RocksdbCompactFilterCleanupStrategy:基于rocksdb 压缩过滤策略的配置实现类


StateTtlConfig 的核心属性:

1
2
3
4
5
private UpdateType updateType = OnCreateAndWrite;
private StateVisibility stateVisibility = NeverReturnExpired;
private TimeCharacteristic timeCharacteristic = ProcessingTime;
private Time ttl;
private CleanupStrategies cleanupStrategies = new CleanupStrategies();

说明:
StateTtlConfig 这个类主要有上面5个属性
updateType :代表什么时候更新上次的时间戳
stateVisibility :表示状态过期后是否给用户返回相关值
timeCharacteristic :表示时间特征,现在只支持process time上的状态ttl定义
ttl:表示时间频率
cleanupStrategies :代表清除策略,有三种,EmptyCleanupStrategy,IncrementalCleanupStrategy和RocksdbCompactFilterCleanupStrategy