2023年12月9日发(作者:致母亲)
本文的目的在于详细记录2.4.1版本下hadoop的DataNode的启动过程,作此记录,也为以后回过头看DataNode留下方便。本文的思路是结合DataNode的代码,来分析他的启动过程。
说明,限于篇幅,对文章中引用的代码都只留了关键部分。
DataNode同NameNode都一样,是一个java进程,所以从main方法开始,看代码:
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class DataNode extends Configured
implements InterDatanodeProtocol, ClientDatanodeProtocol,
DataNodeMXBean {
public static void main(String args[]) {
cureMain(args, null);
}
public static void cureMain(String args[], SecureResources resources) {
DataNode datanode = createDataNode(args, null, resources);
}
public static DataNode createDataNode(String args[], Configuration conf,
SecureResources resources) throws IOException {
DataNode dn = instantiateDataNode(args, conf, resources);
if (dn != null) {
anodeDaemon();
}
return dn;
}
public static DataNode instantiateDataNode(String args [], Configuration conf, SecureResources resources) throws IOException {
return makeInstance(dataLocations, conf, resources);
}
static DataNode makeInstance(Collection
Configuration conf, SecureResources resources) throws IOException {
return new DataNode(conf, locations, resources); 29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
}
DataNode(final Configuration conf,
final List
final SecureResources resources) throws IOException {
startDataNode(conf, dataDirs, resources);
}
public void runDatanodeDaemon() throws IOException {
ll();
// start dataXceiveServer
();
if (localDataXceiverServer != null) {
();
}
();
startPlugins(conf);
}
}
上面的代码跟踪不难,但是最终需要注意两个方法:startDataNode和runDatanodeDaemon方法,前面一个用于初始化DataNode,后面一个启动DataNode的后台线程,这些线程是会伴随DataNode进程一直跑着的。接着,让我们重点研究下方法startDataNode,看代码:
?
1
2
3
4
5
6
7
8
void startDataNode(Configuration conf,
List
// DatanodeProtocol namenode,
SecureResources resources
) throws IOException {
storage = new DataStorage();
// global DN ttings 9
10
11
12
13
14
15
16
17
18
registerMXBean();
initDataXceiver(conf);
startInfoServer(conf);
pauMonitor = new JvmPauMonitor(conf);
();
initIpcServer(conf);
blockPoolManager = new BlockPoolManager(this);
hNamenodes(conf);
}
registerMXBean这个方法可以忽略,用来注册MBean信息;initDataXceiver这个方法应该来说还是比较重要,实例化的dataXceiverServer用来接受客户端或者其他datanode的数据接收或者发送请求;startInfoServer方法用来启动datanode的web服务器;pauMonitor用来监控jvm是否有停顿;initIpcServer方法比较重要,用来启动datanode上的rpc服务,主要包括两个服务:ClientDatanodeProtocolPB和InterDatanodeProtocolPB。
然后属于DataNode的重点来了,blockPoolManager对象的实例化,注意一点,2.4.1 这个版本的hadoop已经支持了hadoop Federation的特性,而blockPooolManager就是支撑这个特性来的。现在让我们来看看他里面的东西。还是先上代码吧。
?
1
2
3
4
5
6
7
8
class BlockPoolManager {
BlockPoolManager(DataNode dn) {
= dn;
}
void refreshNamenodes(Configuration conf)
throws IOException {
synchronized (refreshNamenodesLock) { 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
doRefreshNamenodes(newAddressMap);
}
}
private void doRefreshNamenodes(
Map
synchronized (this) {
startAll();
}
}
synchronized void startAll() throws IOException {
for (BPOfferService bpos : offerServices) {
();
}
}
}
class BPOfferService {
void start() {
for (BPServiceActor actor : bpServices) {
();
}
}
}
class BPServiceActor implements Runnable {
void start() {
if ((bpThread != null) && (e())) {
//Thread is started already
return;
}
bpThread = new Thread(this, formatThreadName());
mon(true); // needed for JUnit testing
();
}
public void run() {
while (true) {
connectToNNAndHandshake();
break;
}
while (shouldRun()) { 53
54
55
56
57
58
59
60
61
62
63
64
offerService();
}
}
private void offerService() throws Exception {
while (shouldRun()) {
HeartbeatRespon resp = ndHeartBeat();
List
}
}
}
顺着代码往下走,整个思路都会比较清晰了,BPServiceActor这个类做了具体的事情,包括datanode跟namenode的握手,发送心跳和报告块信息,执行namenode发回来的命名。
详细的过程就不啰嗦了。
到这里DataNode的启动过程就搞了一个段落。
本文发布于:2023-12-09 21:31:43,感谢您对本站的认可!
本文链接:https://www.wtabcd.cn/zhishi/a/1702128704241138.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文word下载地址:Hadoop中DataNode的启动过程详解.doc
本文 PDF 下载地址:Hadoop中DataNode的启动过程详解.pdf
留言与评论(共有 0 条评论) |