Flink 数据倾斜

无论是对于 Flink、Spark 这样的实时计算框架还是 Hive 等离线计算框架,数据量从来都不是问题,真正引起问题导致严重后果的是数据倾斜。所谓数据倾斜,是指在大规模并行处理的数据中,其中某个运行节点处理的数据远远超过其他部分,这会导致该节点压力极大,最终出现运行失败从而导致整个任务的失败。


我们在这一课时中将分析出现数据倾斜的原因,Flink 任务中最容易出现数据倾斜的几个算子并且给出解决方案。

数据倾斜背景和危害

数据倾斜产生的原因和危害和解决方案有哪些呢?我们一一来看。

数据倾斜原理

目前我们所知道的大数据处理框架,比如 Flink、Spark、Hadoop 等之所以能处理高达千亿的数据,是因为这些框架都利用了分布式计算的思想,集群中多个计算节点并行,使得数据处理能力能得到线性扩展。


在实际生产中 Flink 都是以集群的形式在运行,在运行的过程中包含了两类进程。其中 TaskManager 实际负责执行计算的 Worker,在其上执行 Flink Job 的一组 Task,Task 则是我们执行具体代码逻辑的容器。理论上只要我们的任务 Task 足够多就可以对足够大的数据量进行处理。


但是实际上大数据量经常出现,一个 Flink 作业包含 200 个 Task 节点,其中有 199 个节点可以在很短的时间内完成计算。但是有一个节点执行时间远超其他结果,并且随着数据量的持续增加,导致该计算节点挂掉,从而整个任务失败重启。我们可以在 Flink 的管理界面中看到任务的某一个 Task 数据量远超其他节点。

数据倾斜原因和解决方案

Flink 任务出现数据倾斜的直观表现是任务节点频繁出现反压,但是增加并行度后并不能解决问题;部分节点出现 OOM 异常,是因为大量的数据集中在某个节点上,导致该节点内存被爆,任务失败重启。


产生数据倾斜的原因主要有 2 个方面:

  • 业务上有严重的数据热点,比如滴滴打车的订单数据中北京、上海等几个城市的订单量远远超过其他地区;
  • 技术上大量使用了 KeyBy、GroupBy 等操作,错误的使用了分组 Key,人为产生数据热点。


因此解决问题的思路也很清晰:

  • 业务上要尽量避免热点 key 的设计,例如我们可以把北京、上海等热点城市分成不同的区域,并进行单独处理;
  • 技术上出现热点时,要调整方案打散原来的 key,避免直接聚合;此外 Flink 还提供了大量的功能可以避免数据倾斜。


那么我们就从典型的场景入手,看看在 Flink 任务中出现数据倾斜的主要场景和解决方案。

两阶段聚合解决 KeyBy 热点

KeyBy 是我们经常使用的分组聚合函数之一。在实际的业务中经常会碰到这样的场景:双十一按照下单用户所在的省聚合求订单量最高的前 10 个省,或者按照用户的手机类型聚合求访问量最高的设备类型等。


上述场景在我们进行 KeyBy 时就会出现严重的数据倾斜,如下图所示:
image.png




如果我们直接简单地使用 KeyBy 算子,模拟一个简单的统计 PV 的场景如下:

1
2
3
4
5
6
DataStream sourceStream = ...;
windowedStream = sourceStream.keyBy("type")
.window(TumblingEventTimeWindows.of(Time.minutes(1)));
windowedStream.process(new MyPVFunction())
.addSink(new MySink())...
env.execute()...

我们在根据 type 进行 KeyBy 时,如果数据的 type 分布不均匀就会导致大量的数据分配到一个 task 中去,发生数据倾斜。


那么我们的解决思路是:

  • 首先把分组的 key 打散,比如加随机后缀;
  • 对打散后的数据进行聚合;
  • 把打散的 key 还原为真正的 key;
  • 二次 KeyBy 进行结果统计,然后输出。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    DataStream sourceStream = ...;
    resultStream = sourceStream
    .map(record -> {
    Record record = JSON.parseObject(record, Record.class);
    String type = record.getType();
    record.setType(type + "#" + new Random().nextInt(100));
    return record;
    })
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new CountAggregate())
    .map(count -> {
    String key = count.getKey.substring(0, count.getKey.indexOf("#"));
    return RecordCount(key,count.getCount);
    })
    //二次聚合
    .keyBy(0)
    .process(new CountProcessFunction);

    resultStream.sink()...
    env.execute()...
    其中 CountAggregate 函数实现如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class CountAggregate implements AggregateFunction<Record,CountRecord,CountRecord> {
    @Override
    public CountRecord createAccumulator() {
    return new CountRecord(null, 0L);
    }
    @Override
    public CountRecord add(Record value, CountRecord accumulator) {
    if(accumulator.getKey() == null){
    accumulator.setKey(value.key);
    }
    accumulator.setCount(value.count);
    return accumulator;
    }
    @Override
    public CountRecord getResult(CountRecord accumulator) {
    return accumulator;
    }
    @Override
    public CountRecord merge(CountRecord a, CountRecord b) {
    return new CountRecord(a.getKey(),a.getCount()+b.getCount()) ;
    }
    }
    CountProcessFunction 的实现如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class CountProcessFunction extends KeyedProcessFunction<String, CountRecord, CountRecord> {
    private ValueState<Long> state = this.getRuntimeContext().getState(new ValueStateDescriptor("count",Long.class));
    @Override
    public void processElement(CountRecord value, Context ctx, Collector<CountRecord> out) throws Exception {
    if(state.value()==0){
    state.update(value.count);
    ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000L * 5);
    }else{
    state.update(state.value() + value.count);
    }
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<CountRecord> out) throws Exception {
    //这里可以做业务操作,例如每 5 分钟将统计结果发送出去
    //out.collect(...);
    //清除状态
    state.clear();
    //其他操作
    ...
    }
    }
    通过上面打散聚合再二次聚合的方式,我们就可以实现热点 Key 的打散,消除数据倾斜。

    GroupBy + Aggregation 分组聚合热点问题

    业务上通过 GroupBy 进行分组,然后紧跟一个 SUM、COUNT 等聚合操作是非常常见的。我们都知道 GroupBy 函数会根据 Key 进行分组,完全依赖 Key 的设计,如果 Key 出现热点,那么会导致巨大的 shuffle,相同 key 的数据会被发往同一个处理节点;如果某个 key 的数据量过大则会直接导致该节点成为计算瓶颈,引起反压。


    我们还是按照上面的分组统计 PV 的场景,SQL 语句如下:
    1
    2
    3
    4
    5
    6
    7
    8
    select
    date,
    type,
    sum(count) as pv
    from table
    group by
    date,
    type;
    我们可以通过内外两层聚合的方式将 SQL 改写为:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    select date,
    type,
    sum(pv) as pv
    from(
    select
    date,
    type,
    sum(count) as pv
    from table
    group by
    date,
    type,
    floor(rand()*100) --随机打散成100份
    )
    group by
    date,
    type;
    在上面的 SQL 拆成了内外两层,第一层通过随机打散 100 份的方式减少数据热点,当然这个打散的方式可以根据业务灵活指定。

通常我们在使用 Flink 处理实时业务时,上游一般都是消息系统,Kafka 是使用最广泛的大数据消息系统。当使用 Flink 消费 Kafka 数据时,也会出现数据倾斜。


需要十分注意的是,我们 Flink 消费 Kafka 的数据时,是推荐上下游并行度保持一致,即 Kafka 的分区数等于 Flink Consumer 的并行度。


但是会有一种情况,为了加快数据的处理速度,来设置 Flink 消费者的并行度大于 Kafka 的分区数。如果你不做任何的设置则会导致部分 Flink Consumer 线程永远消费不到数据。这时候你需要设置 Flink 的 Redistributing,也就是数据重分配。


Flink 提供了多达 8 种重分区策略,类图如下图所示:

在我们接收到 Kafka 消息后,可以通过自定义数据分区策略来实现数据的负载均衡,例如:
复制

1
2
3
4
5
6
dataStream
.setParallelism(2)
// 采用REBALANCE分区策略重分区
.rebalance() //.rescale()
.print()
.setParallelism(4);

其中,Rebalance 分区策略,数据会以 round-robin 的方式对数据进行再次分区,可以全局负载均衡。
Rescale 分区策略基于上下游的并行度,会将数据以循环的方式输出到下游的每个实例中。

其他

Flink 一直在不断地迭代,不断出现各种各样的手段解决我们遇到的数据倾斜问题。例如,MiniBatch 微批处理手段等,需要我们开发者不断地去发现,并学习新的解决问题的办法。