MapReduce Source Code Analysis (Part 1)

After downloading the Hadoop source to my machine, I opened it directly in VS Code. Originally I wanted to use IDEA and step through the execution flow with the debugger while reading, but the Maven dependencies kept breaking, so in the end I just read the source straight in VS Code. The version I chose is Hadoop 2.9.2.

Starting from job submission

Path: hadoop-2.9.2-src/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/WordCount.java
[Screenshot 2020-12-06 4.21.21 PM]
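For reference, the tail of WordCount.main() (trimmed; the real example also uses GenericOptionsParser and supports multiple input paths) builds the Job and then calls waitForCompletion(true), which is the call we will follow:

```java
// Trimmed from the hadoop-mapreduce-examples WordCount driver; details may vary slightly.
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// This is the call we follow next: it blocks until the job finishes.
System.exit(job.waitForCompletion(true) ? 0 : 1);
```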
Command + left-click the method under the cursor to jump to its definition.
[Screenshot 2020-12-06 4.32.08 PM]
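Job.waitForCompletion() is a thin wrapper: if the job is still in the DEFINE state it calls submit(), then either prints progress (verbose mode) or quietly polls until the job completes. Roughly (trimmed from the 2.9.2 source, comments mine):

```java
// Trimmed from org.apache.hadoop.mapreduce.Job (2.9.2).
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();                      // the actual submission happens here
  }
  if (verbose) {
    monitorAndPrintJob();          // poll the job and print progress/counters
  } else {
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {        // quietly wait for the job to finish
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
```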
Next, step into the submit() method.
[Screenshot 2020-12-06 4.40.35 PM]
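Job.submit() connects to the cluster (which, under the hood, selects a local runner or a YARN client), builds a JobSubmitter, and hands off to submitJobInternal(). Roughly (trimmed from the 2.9.2 source, comments mine):

```java
// Trimmed from org.apache.hadoop.mapreduce.Job (2.9.2).
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();                 // decide between the old (mapred) and new (mapreduce) API
  connect();                      // create the Cluster, i.e. the client-side connection
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);  // the method we look at next
    }
  });
  state = JobState.RUNNING;
}
```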
Step into submitJobInternal(). This chunk of code is rather long; we will focus on writeSplits() first.

```java
JobStatus submitJobInternal(Job job, Cluster cluster) // internal method that submits the job to the system
    throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs (check the output path)
  checkSpecs(job);

  Configuration conf = job.getConfiguration(); // get the job configuration
  addMRFrameworkToDistributedCache(conf);      // add the MR framework to the distributed cache

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
          "data spill is enabled");
    }

    // copy the job resources (jar, libjars, archives, etc.) to HDFS;
    // by default 10 replicas are kept, stored on different nodes
    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // compute the number of map tasks; all split information is written under submitJobDir
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
        MRJobConfig.DEFAULT_JOB_MAX_MAP);
    if (maxMaps >= 0 && maxMaps < maps) {
      throw new IllegalArgumentException("The number of map tasks " + maps +
          " exceeded limit " + maxMaps);
    }

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf); // clean up the cached token referral

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name);
    // this is where the job is really submitted.
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
```

writeSplits(): this method computes the input splits and returns the number of map tasks.
[Screenshot 2020-12-06 5.30.22 PM]

[Screenshot 2020-12-06 5.31.15 PM]
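writeSplits() itself is short: it simply dispatches to the new-API or old-API split writer depending on which mapper API the job uses. Roughly (trimmed from the 2.9.2 JobSubmitter, comments mine):

```java
// Trimmed from org.apache.hadoop.mapreduce.JobSubmitter (2.9.2).
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf) job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);    // new (mapreduce) API
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);  // old (mapred) API
  }
  return maps;
}
```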
Step into getSplits().
[Screenshot 2020-12-06 5.45.29 PM]
While reading the code above, one part was a blind spot for me: the loop `while (((double) bytesRemaining)/splitSize > SPLIT_SLOP)`.

SPLIT_SLOP defaults to 1.1. The idea is that, as a file is split piece by piece, another split is cut as long as the remaining size divided by the split size (by default the block size) is greater than 1.1; once the ratio is at or below 1.1, the remainder is kept as the last split instead of being cut again. Concretely, with a 128 MB block size, a remainder of no more than 140.8 MB is not split any further.
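To make the 1.1 threshold concrete, here is a small standalone sketch (this is not Hadoop code; the 128 MB block size and the example file sizes are made-up values) that mimics the splitting loop:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone illustration of the split loop; not the real FileInputFormat code.
public class SplitSlopDemo {

    private static final double SPLIT_SLOP = 1.1; // same default as FileInputFormat
    private static final long MB = 1024L * 1024L;

    // Returns the split lengths (in bytes) for a file of the given length.
    static List<Long> computeSplits(long fileLength, long splitSize) {
        List<Long> splits = new ArrayList<>();
        long bytesRemaining = fileLength;
        // Keep cutting full-size splits while the remainder is more than 1.1x splitSize.
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(splitSize);
            bytesRemaining -= splitSize;
        }
        // Whatever is left (at most 1.1x splitSize) becomes the final split.
        if (bytesRemaining != 0) {
            splits.add(bytesRemaining);
        }
        return splits;
    }

    public static void main(String[] args) {
        long splitSize = 128 * MB; // assume the default 128 MB block size
        // 140 MB <= 140.8 MB, so it stays a single split.
        System.out.println("140 MB file: " + computeSplits(140 * MB, splitSize));
        // 260 MB is cut once: 128 MB + 132 MB (132 MB <= 140.8 MB, no further cut).
        System.out.println("260 MB file: " + computeSplits(260 * MB, splitSize));
    }
}
```

With a 128 MB split size it prints a single 140 MB split for the 140 MB file, and two splits (128 MB + 132 MB) for the 260 MB file, because the 132 MB remainder is under the 140.8 MB threshold.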

Now step back out of this method and return to writeNewSplits(). So far the file has only been divided into logical ranges; no physical cutting of data takes place. Next, after the splits are sorted by size, the split data is written to the split files and the split metadata (the SplitMetaInfo array `info`) is obtained; finally the number of maps is returned. The map-phase parallelism of a job is therefore decided by the number of splits computed by the client at submission time: one MapTask per split.
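For reference, writeNewSplits() looks roughly like this (trimmed from the 2.9.2 JobSubmitter, comments mine):

```java
// Trimmed from org.apache.hadoop.mapreduce.JobSubmitter (2.9.2).
@SuppressWarnings("unchecked")
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);  // the logical ranges computed above
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest go first
  Arrays.sort(array, new SplitComparator());
  // write the split file and its metadata under the submit dir
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;                             // = number of MapTasks
}
```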

Now jump back to submitJobInternal().
[Screenshot 2020-12-06 6.07.23 PM]
submitJob() submits the job to the cluster.
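Here submitClient is a ClientProtocol implementation (LocalJobRunner in local mode, YARNRunner when running on YARN), so this call is the point where the staged job (configuration, jar, split files) is handed over to the cluster. The interface method is roughly:

```java
// Paraphrased from org.apache.hadoop.mapreduce.protocol.ClientProtocol.
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException;
```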