Flink 分布式缓存-demo

分布式缓存

Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。

此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。


当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。**用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。



Broadcast 广播变量

一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的任务在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个任务中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。
注意:
1:广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大,避免发生OOM。因为广播出去的数据,会常驻内存,除非程序执行结束。
2:广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。
个人建议:如果数据集在几十兆或者百兆的时候,可以选择进行广播,如果数据集的大小上G的话,就不建议进行广播了。

区别

1.广播变量是基于内存的,是将变量分发到各个worker节点的内存上(避免多次复制,节省内存)
2.分布式缓存是基于磁盘的,将文件copy到各个节点上,当函数运行时可以在本地文件系统检索该文件(避免多次复制,提高执行效率)

Code

接下来我们来看一下简单的使用方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
* 分布式缓存
* 第一步:首先需要在 env 环境中注册一个文件,该文件可以来源于本地,也可以来源于 HDFS ,并且为该文件取一个名字。
* 第二步:在使用分布式缓存时,可根据注册的名字直接获取。
*/

public class DistributeCache {
public static void main(String[] args) throws Exception {

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile("/Users/cpeixin/cache/distributedcache.txt", "distributedCache");
//1:注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试
DataSource<String> data = env.fromElements("Linea", "Lineb", "Linec", "Lined");

// RichFuction除了提供原来MapFuction的方法之外,还提供open, close, getRuntimeContext 和setRuntimeContext方法,
// 这些功能可用于参数化函数(传递参数),创建和完成本地状态,访问广播变量以及访问运行时信息以及有关迭代中的信息。
DataSet<String> result = data.map(new RichMapFunction<String, String>() {
private ArrayList<String> dataList = new ArrayList<String>();

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//2:使用该缓存文件
File myFile = getRuntimeContext().getDistributedCache().getFile("distributedCache");
List<String> lines = FileUtils.readLines(myFile);
for (String line : lines) {
dataList.add(line);
System.err.println("分布式缓存为:" + line);
}
}

@Override
public String map(String value) throws Exception {
//在这里就可以使用dataList
System.err.println("使用datalist:" + dataList + "-------" +value);
//业务逻辑
return dataList +":" + value;
}
});
result.printToErr();
}

}


下面给出一个更加贴合生产场景下的需求实现,在流计算中使用DistributedCache来完成我们的计算,整个过程其实有些像与维度数据的join

1
2
3
4
5
6
7
 1. Prepare resources on hdfs
[robin@node01 ~]$ hdfs dfs -mkdir /flink/cache
[robin@node01 ~]$ vi gender.txt
1, male
2, female
[robin@node01 ~]$ hdfs dfs -put gender.txt /flink/cache
2. Turn on the source (socket)

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package distributeCache

import java.io.File
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.mutable
import scala.io.{BufferedSource, Source}

object DistrubutedCacheTest {
def main(args: Array[String]): Unit = {
//1. Environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

//2. Read the resources on hdfs and set them in the distributed cache
env.registerCachedFile("hdfs://node01:9000/flink/cache/gender.txt","hdfsGenderInfo")

//3. Read the student information sent by the socket in real time, calculate it, and output the result
//(101,"jackson",1,"Shanghai")
env.socketTextStream("node01",8888)
.filter((_: String).trim.nonEmpty)
.map(new RichMapFunction[String,(Int,String,Char,String)] {

//Used to store student information read from the distributed cache
val map:mutable.Map[Int,Char]= mutable.HashMap()
var bs: BufferedSource = _

override def open(parameters: Configuration): Unit = {
//1. Read the data stored in the distributed cache
var file:File = getRuntimeContext.getDistributedCache.getFile("hdfsGenderInfo")

//2. Encapsulate the read information into a map instance for storage
bs = Source.fromFile(file)
val lst: List[String] = bs.getLines().toList
for(perLine <-lst){
val arr: Array[String] = perLine.split(",")
val genderFlg: Int = arr(0).trim.toInt
val genderName: Char = arr(1).trim.toCharArray()(0)
map.put(genderFlg,genderName)
}
}

override def map(perStudentInfo: String): (Int, String, Char, String) = {
//Get student details
val arr: Array[String] = perStudentInfo.split(",")
val id: Int = arr(0).trim.toInt
val name: String = arr(1).trim
val genderFlg: Int = arr(2).trim.toInt
val address: String = arr(3).trim
//According to the data in the distributed cache stored in the container Map, replace the gender identifier in the student information with the real gender
var genderName: Char = map.getOrElse(genderFlg, 'x')
(id, name, genderName, address)
}

override def close(): Unit = {
if(bs != null){
bs.close()
}
}
}).print("The information completed by the student is ->")

//4. Start
env.execute(this.getClass.getSimpleName)
}
}


结果
image.png
image.png