Hadoop核心源码剖析系列(一)
通过《Hadoop之HDFS架构演进之路》这篇文章,我们了解到HDFS的核心组成是NameNode和DataNode。开始剖析源码之前,要先理解HDFS的本质:HDFS是一个分布式文件系统,核心功能必然是读数据和写数据。而读写数据又会引出另一个问题:数据怎么写、写到哪里、怎么读出来,这就是另一个核心功能,即元数据管理。把以上问题解决了,很大程度上就算是精通了HDFS。
看到下面的图片,想必大家都很熟悉,那么进程列表中的NameNode和DataNode是怎么产生的呢?
对一个百万行级别代码的技术框架做源码阅读和剖析其实很难,尤其是在不清楚源码架构设计的情况下更加无从下手,只能从能看到的东西一步一步剖析。对于分布式框架,首先要了解它底层的网络通信架构。
Hadoop各个节点之间以及客户端与节点之间的通信基于RPC协议。RPC(Remote Procedure Call Protocol)即远程过程调用协议:不同进程间的方法调用,本质上是客户端调用服务端的方法,方法的执行发生在服务端。
一个RPC的实现主要包含三个部分,客户端、服务端和网络协议接口,见下图。
在pom.xml引入依赖:
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
    </dependency>
</dependencies>
定义通信协议接口:
package RPC;

/**
 * 网络协议:客户端和服务端共同遵守的接口
 */
public interface Protocol {
    // Hadoop RPC要求协议接口定义一个versionID,用于客户端和服务端校验版本,可自定义
    long versionID = 123456789L;
    void hello(String msg);
    void add(int num);
}
定义服务端实现类:
package RPC;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

import java.io.IOException;

/**
 * RPC 服务端
 */
public class NameNodeRPCServer implements Protocol {
    public static void main(String[] args) throws IOException {
        Server server = new RPC.Builder(new Configuration())
                .setBindAddress("bigdata01")
                .setPort(9999)
                .setProtocol(Protocol.class)
                .setInstance(new NameNodeRPCServer())
                .build();
        // 启动服务端
        System.out.println("我是RPC服务端,我准备启动了");
        server.start();
        System.out.println("启动完成");
    }

    @Override
    public void hello(String msg) {
        System.out.println("hello " + msg);
    }

    @Override
    public void add(int num) {
        System.out.println("add " + num);
    }
}
定义客户端类:
package RPC;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

import java.io.IOException;
import java.net.InetSocketAddress;

/**
 * RPC 客户端
 */
public class Client {
    public static void main(String[] args) throws IOException {
        // 获取服务端的代理对象,调用代理的方法,真正的执行发生在服务端
        Protocol namenode = RPC.getProxy(Protocol.class,
                Protocol.versionID,
                new InetSocketAddress("bigdata01", 9999),
                new Configuration());
        namenode.hello("hadoop architecture");
        // 用完后关闭代理,释放连接
        RPC.stopProxy(namenode);
    }
}
启动NameNodeRPCServer,再到服务器控制台执行jps,你会发现多了一个NameNodeRPCServer进程。所以不管是NameNode还是DataNode,本质上都是一个RPC服务端进程,于是我们可以从NameNode和DataNode这两个类入手(本篇文章只讲NameNode,DataNode放到下篇文章讲解)。
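jps 的输出大致如下(进程号仅为示意,以实际环境为准):

25345 NameNodeRPCServer
25401 Jps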
这里我们选择企业中使用较多且稳定的Hadoop版本展开,选定hadoop 2.7.3版本。顺便跟大家分享一种高效阅读源码的方法:场景驱动。比如你想了解组件启动、写数据或者读数据的过程,就带着目的去阅读,从大方向把握,遇到无关紧要的方法可以先略过,只关注重点代码;把整个流程走通之后,想深究细节再回头深入,这样比较省时省力。下面我们以NameNode的启动初始化流程为例来说明。
剖析一个类,首先看类的注释说明,其次找 main() 方法,结合流程图阅读。
/**
* NameNode serves as both directory namespace manager and
* 'inode table' for the Hadoop DFS. There is a single NameNode
* running in any DFS deployment. (Well, except when there
* is a second backup/failover NameNode, or when using federated NameNodes.)
*
* The NameNode controls two critical tables:
* 1) filename-> blocksequence (namespace)
* 2) block-> machinelist ('inodes')
*
*
*
 * NameNode服务管理着HDFS集群的命名空间和 'inode table'。
* 一个HDFS集群里面只有一个NameNode.(除了HA方案,或者是联邦)
*
* Namenode管理了两张极其重要的表:
* 1)一张表管理了文件与block之间的映射关系。
* 2)另一张表管理了block文件块与 DataNode主机之间的映射关系。
*
* The first table is stored on disk and is very precious.
* The second table is rebuilt every time the NameNode comes up.
*
* 第一张表存储到了磁盘上面。(因为文件与block块之间的关系是不会发生变化的)
* 每次NameNode重启的时候重新构建第二张映射表。
*
* 'NameNode' refers to both this class as well as the 'NameNode server'.
* The 'FSNamesystem' class actually performs most of the filesystem
* management. The majority of the 'NameNode' class itself is concerned
* with exposing the IPC interface and the HTTP server to the outside world,
* plus some configuration management.
*
*
* Namenode服务是由三个重要的类支撑的:
* 1)NameNode类:
* 管理配置的参数
* 2)NameNode server:
* IPC Server:
 * NameNodeRPCServer:开放端口(比如8020/9000),等待别人调用
* HTTP Server:
* NameNodeHttpServer:开放50070界面,我们可以通过这个界面了解HDFS的情况
* 3) FSNameSystem:
 * 这个类非常重要,管理了HDFS的元数据。
 */
找到NameNode的 main()方法入口,我们只取关键代码剖析,避免钻牛角尖,不然只会让自己越陷越深。
public static void main(String argv[]) throws Exception {
  // 解析参数:如果传入的是帮助参数,打印使用说明后正常退出
  if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
    System.exit(0);
  }

  try {
    StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
    // 创建NameNode的核心代码
    NameNode namenode = createNameNode(argv, null);
    if (namenode != null) {
      // 让线程阻塞在这儿。这也就是为什么大家敲jps命令的时候能一直看到NameNode进程
      namenode.join();
    }
  } catch (Throwable e) {
    LOG.error("Failed to start namenode.", e);
    terminate(1, e);
  }
}
在 main() 中,我们整体看下来,可以发现下面这一行是关键代码,开始创建NameNode后台进程,所以我们主要看createNameNode() 这个方法。
NameNode namenode = createNameNode(argv, null);
跳转到 createNameNode() 方法,首先会判断启动HDFS时传入的参数是否合法,然后针对参数执行不同的操作。
public static NameNode createNameNode(String argv[], Configuration conf)
    throws IOException {
  LOG.info("createNameNode " + Arrays.asList(argv));
  if (conf == null)
    conf = new HdfsConfiguration();
  /**
   * 我们操作HDFS集群的时候会传进来如下的参数:
   *
   * hdfs namenode -format
   *
   * hadoop-daemon.sh start namenode
   */
  StartupOption startOpt = parseArguments(argv);
  if (startOpt == null) {
    printUsage(System.err);
    return null;
  }
  setStartupOption(conf, startOpt);

  switch (startOpt) {
    case FORMAT: {
      boolean aborted = format(conf, startOpt.getForceFormat(),
          startOpt.getInteractiveFormat());
      terminate(aborted ? 1 : 0);
      return null; // avoid javac warning
    }
    case GENCLUSTERID: {
      System.err.println("Generating new cluster id:");
      System.out.println(NNStorage.newClusterID());
      terminate(0);
      return null;
    }
    case FINALIZE: {
      System.err.println("Use of the argument '" + StartupOption.FINALIZE +
          "' is no longer supported. To finalize an upgrade, start the NN " +
          " and then run `hdfs dfsadmin -finalizeUpgrade'");
      terminate(1);
      return null; // avoid javac warning
    }
    case ROLLBACK: {
      boolean aborted = doRollback(conf, true);
      terminate(aborted ? 1 : 0);
      return null; // avoid warning
    }
    case BOOTSTRAPSTANDBY: {
      String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
      int rc = BootstrapStandby.run(toolArgs, conf);
      terminate(rc);
      return null; // avoid warning
    }
    case INITIALIZESHAREDEDITS: {
      boolean aborted = initializeSharedEdits(conf,
          startOpt.getForceFormat(),
          startOpt.getInteractiveFormat());
      terminate(aborted ? 1 : 0);
      return null; // avoid warning
    }
    case BACKUP:
    case CHECKPOINT: {
      NamenodeRole role = startOpt.toNodeRole();
      DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
      return new BackupNode(conf, role);
    }
    case RECOVER: {
      NameNode.doRecovery(startOpt, conf);
      return null;
    }
    case METADATAVERSION: {
      printMetadataVersion(conf);
      terminate(0);
      return null; // avoid javac warning
    }
    case UPGRADEONLY: {
      DefaultMetricsSystem.initialize("NameNode");
      new NameNode(conf);
      terminate(0);
      return null;
    }
    default: {
      DefaultMetricsSystem.initialize("NameNode");
      // 关键代码
      return new NameNode(conf);
    }
  }
}
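对应地,我们平时执行的 hdfs namenode -format、hadoop-daemon.sh start namenode 等命令,就分别走上面switch中不同的case分支;正常启动NameNode走的是default分支,也就是直接 new NameNode(conf)。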
public NameNode(Configuration conf) throws IOException {
  this(conf, NamenodeRole.NAMENODE);
}

protected NameNode(Configuration conf, NamenodeRole role)
    throws IOException {
  this.conf = conf;
  this.role = role;
  setClientNamenodeAddress(conf);
  String nsId = getNameServiceId(conf);
  String namenodeId = HAUtil.getNameNodeId(conf, nsId);
  this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
  state = createHAState(getStartupOption(conf));
  this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
  this.haContext = createHAContext();
  try {
    initializeGenericKeys(conf, nsId, namenodeId);
    // 我们在阅读分析源码的时候,一定要留意关键的方法
    // initialize(conf) 就是初始化的核心方法
    initialize(conf);
    try {
      haContext.writeLock();
      state.prepareToEnterState(haContext);
      state.enterState(haContext);
    } finally {
      haContext.writeUnlock();
    }
  } catch (IOException e) {
    this.stop();
    throw e;
  } catch (HadoopIllegalArgumentException e) {
    this.stop();
    throw e;
  }
  this.started.set(true);
}
/**
 * Initialize name-node.
 *
 * @param conf the configuration
 */
protected void initialize(Configuration conf) throws IOException {
  if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
    String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
    if (intervals != null) {
      conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
          intervals);
    }
  }
  UserGroupInformation.setConfiguration(conf);
  loginAsNameNodeUser(conf);
  NameNode.initMetrics(conf, this.getRole());
  StartupProgressMetrics.register(startupProgress);
  /**
   * namenode的启动流程
   * 服务端:
   *   RPCServer:  9000/8020
   *   HttpServer: 50070
   */
  if (NamenodeRole.NAMENODE == role) {
    // 启动HttpServer
    startHttpServer(conf);
  }
  this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
  // 加载元数据
  // 加载元数据这件事,在集群刚启动时不做重点分析,
  // 后面的源码剖析系列会专门讲元数据管理。
  // 按照场景驱动:集群刚初始化启动,其实没什么元数据。
  loadNamesystem(conf);
  // 这里就是Hadoop RPC
  // NameNodeRpcServer里面有两个主要的RPC服务:
  // 1) ClientRPCServer:  主要管理的协议是hdfs的客户端(用户)操作HDFS的方法
  // 2) ServiceRPCServer: 主要管理的协议是服务之间互相调用的方法(注册、心跳等)
  rpcServer = createRpcServer(conf);
  if (clientNamenodeAddress == null) {
    // This is expected for MiniDFSCluster. Set it now using
    // the RPC server's bind address.
    clientNamenodeAddress =
        NetUtils.getHostPortString(rpcServer.getRpcAddress());
    LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
        + " this namenode/service.");
  }
  if (NamenodeRole.NAMENODE == role) {
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
  pauseMonitor = new JvmPauseMonitor(conf);
  pauseMonitor.start();
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
  // 启动一些公共的服务,NameNode RPC的服务就是在里面启动的
  // 1) 进行资源检查,检查是否有足够的磁盘存储元数据
  // 2) 进入安全模式检查,检查是否可以退出安全模式
  startCommonServices(conf);
}
private void startHttpServer(final Configuration conf) throws IOException {
  // getHttpServerBindAddress 里面设置了主机名和端口号
  httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
  httpServer.start();
  httpServer.setStartupProgress(startupProgress);
}
protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
  // 里面设置的是50070端口
  InetSocketAddress bindAddress = getHttpServerAddress(conf);
  // If DFS_NAMENODE_HTTP_BIND_HOST_KEY exists then it overrides the
  // host name portion of DFS_NAMENODE_HTTP_ADDRESS_KEY.
  final String bindHost = conf.getTrimmed(DFS_NAMENODE_HTTP_BIND_HOST_KEY);
  if (bindHost != null && !bindHost.isEmpty()) {
    bindAddress = new InetSocketAddress(bindHost, bindAddress.getPort());
  }
  return bindAddress;
}
protected InetSocketAddress getHttpServerAddress(Configuration conf) {
  return getHttpAddress(conf);
}

/** @return the NameNode HTTP address.
 * public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
 * public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
 * public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
 */
public static InetSocketAddress getHttpAddress(Configuration conf) {
  return NetUtils.createSocketAddr(
      conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
}

protected NameNodeRpcServer createRpcServer(Configuration conf)
    throws IOException {
  return new NameNodeRpcServer(conf, this);
}
//由于NameNodeRpcServer的构造方法内容过长,这里只取关键部分进行说明
public NameNodeRpcServer(Configuration conf, NameNode nn)
    throws IOException {
  this.nn = nn;
  this.namesystem = nn.getNamesystem();
  this.retryCache = namesystem.getRetryCache();
  this.metrics = NameNode.getNameNodeMetrics();

  int handlerCount =
      conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
          DFS_NAMENODE_HANDLER_COUNT_DEFAULT);

  RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
      ProtobufRpcEngine.class);
  /**
   * 如下就是一堆协议,协议里面就会有一堆方法
   */
  // 客户端调用namenode的那些方法,都在这个协议里面
  ClientNamenodeProtocolServerSideTranslatorPB
      clientProtocolServerTranslator =
          new ClientNamenodeProtocolServerSideTranslatorPB(this);
  BlockingService clientNNPbService = ClientNamenodeProtocol.
      newReflectiveBlockingService(clientProtocolServerTranslator);

  // datanode与namenode之间调用的协议
  DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
      new DatanodeProtocolServerSideTranslatorPB(this);
  BlockingService dnProtoPbService = DatanodeProtocolService
      .newReflectiveBlockingService(dnProtoPbTranslator);

  // namenode之间互相调用的协议
  NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
      new NamenodeProtocolServerSideTranslatorPB(this);
  BlockingService NNPbService = NamenodeProtocolService
      .newReflectiveBlockingService(namenodeProtocolXlator);
  ......
  // 看到这里是不是觉得很熟悉:启动serviceRpcServer服务,用来处理DataNode等服务发送过来的请求
  this.serviceRpcServer = new RPC.Builder(conf)
      .setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
      .setInstance(clientNNPbService)
      .setBindAddress(bindHost)
      .setPort(serviceRpcAddr.getPort())
      .setNumHandlers(serviceHandlerCount)
      .setVerbose(false)
      .setSecretManager(namesystem.getDelegationTokenSecretManager())
      .build();

  // 然后又添加了很多的通信协议,这些协议最后都会注册到serviceRpcServer上,从而对外提供更多的方法
  // Add all the RPC protocols that the namenode implements
  DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
      serviceRpcServer);
  DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
      serviceRpcServer);
  DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
      serviceRpcServer);
  ......
  // 启动clientRpcServer
  // 这个服务主要服务于用户使用的客户端发送过来的请求
  this.clientRpcServer = new RPC.Builder(conf)
      .setProtocol(
          org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
      .setInstance(clientNNPbService)
      .setBindAddress(bindHost)
      .setPort(rpcAddr.getPort())
      .setNumHandlers(handlerCount)
      .setVerbose(false)
      .setSecretManager(namesystem.getDelegationTokenSecretManager())
      .build();
  ......
/** Start the services common to active and standby states */
private void startCommonServices(Configuration conf) throws IOException {
  // 启动FSNamesystem的公共服务(资源检查、安全模式检查等)
  namesystem.startCommonServices(conf, haContext);
  registerNNSMXBean();
  if (NamenodeRole.NAMENODE != role) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
  // RPC服务端在这里启动起来了
  rpcServer.start();
  plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
      ServicePlugin.class);
  for (ServicePlugin p : plugins) {
    try {
      p.start(this);
    } catch (Throwable t) {
      LOG.warn("ServicePlugin " + p + " could not be started", t);
    }
  }
  LOG.info(getRole() + " RPC up at: " + rpcServer.getRpcAddress());
  if (rpcServer.getServiceRpcAddress() != null) {
    LOG.info(getRole() + " service RPC up at: "
        + rpcServer.getServiceRpcAddress());
  }
}
资源检查

/**
 * Start services common to both active and standby states
 *
 * namenode启动的时候,进行存储资源检查
 */
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    // NameNode资源检查:通过core-site.xml、hdfs-site.xml两个配置文件,就知道元数据存在哪儿
    // 需要检查三个目录,因为这三个目录都涉及到元数据:
    // (1) NameNode的两个目录:存储fsimage的目录和存储editlog的目录。
    //     一般情况下(默认情况)这两个使用的是同一个目录。
    // 加载配置文件,配置文件里面有存储元数据的目录
    nnResourceChecker = new NameNodeResourceChecker(conf);
    // 检查是否有足够的磁盘空间存储元数据
    checkAvailableResources();
    assert safeMode != null && !isPopulatingReplQueues();
    StartupProgress prog = NameNode.getStartupProgress();
    prog.beginPhase(Phase.SAFEMODE);
    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        getCompleteBlocksTotal());
    // HDFS的安全模式:设置block总数,进而决定是否进入安全模式
    setBlockTotal();
    // 启动重要的心跳服务
    blockManager.activate(conf);
  } finally {
    writeUnlock();
  }
  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
    dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
}
//检查元数据目录的磁盘预留阈值,默认100MB
//public static final String DFS_NAMENODE_DU_RESERVED_KEY = "dfs.namenode.resource.du.reserved";
//public static final long DFS_NAMENODE_DU_RESERVED_DEFAULT = 1024 * 1024 * 100; // 100 MB
public NameNodeResourceChecker(Configuration conf) throws IOException {
  this.conf = conf;
  volumes = new HashMap<String, CheckedVolume>();
  // 读取磁盘预留阈值配置
  duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
      DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);
}
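为了便于理解这个检查的思路,下面给出一个脱离Hadoop的极简示意(仅为示意,并非NameNodeResourceChecker的真实实现;类名DiskCheckDemo、方法hasAvailableSpace以及目录路径均为假设),核心就是比较元数据目录所在磁盘的可用空间与预留阈值:

import java.io.File;

public class DiskCheckDemo {
    // 假设的预留阈值:100MB,对应 dfs.namenode.resource.du.reserved 的默认值
    private static final long DU_RESERVED = 1024L * 1024 * 100;

    // 示意方法:判断某个元数据目录所在磁盘的可用空间是否高于预留阈值
    public static boolean hasAvailableSpace(String metadataDir) {
        File dir = new File(metadataDir);
        // getUsableSpace() 返回该目录所在分区当前可用的字节数
        return dir.getUsableSpace() > DU_RESERVED;
    }

    public static void main(String[] args) {
        // 路径仅为示例,实际由 dfs.namenode.name.dir 等配置决定
        String dir = "/tmp";
        System.out.println(dir + " 可用空间是否足够: " + hasAvailableSpace(dir));
    }
}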
1.检查磁盘资源是否足够
/**
 * Perform resource checks and cache the results.
 */
void checkAvailableResources() {
  Preconditions.checkState(nnResourceChecker != null,
      "nnResourceChecker not initialized");
  // 检查是否有足够的磁盘空间
  // 如果资源不够,hasResourcesAvailable会被置为false,后面NameNode就会因此进入安全模式
  hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
}
2.检查NameNode是否处于安全模式
/**
 * Set the total number of blocks in the system.
 */
public void setBlockTotal() {
  // safeMode is volatile, and may be set to null at any time
  SafeModeInfo safeMode = this.safeMode;
  if (safeMode == null)
    return;
  // 设置安全模式的block总数
  // 在HDFS中block有两种状态:一种是正常的complete block,一种是正在构建中的under construction block
  // getCompleteBlocksTotal()获取所有complete状态(即正常)的block的个数
  safeMode.setBlockTotal((int) getCompleteBlocksTotal());
}
/**
 * Set total number of blocks.
 *
 * total:complete状态的block的个数,也就是正常的block的个数
 *
 * 下面以集群共有1000个block为例
 */
private synchronized void setBlockTotal(int total) {
  this.blockTotal = total;
  // 计算安全模式阈值
  // 1000 * 0.999 = 999
  this.blockThreshold = (int) (blockTotal * threshold);
  // 999
  this.blockReplQueueThreshold =
      (int) (blockTotal * replQueueThreshold);
  if (haEnabled) {
    // After we initialize the block count, any further namespace
    // modifications done while in safe mode need to keep track
    // of the number of total blocks in the system.
    this.shouldIncrementallyTrackBlocks = true;
  }
  if (blockSafe < 0)
    this.blockSafe = 0;
  // 检查是否需要进入安全模式
  checkMode();
}
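为了直观感受这个阈值的计算,下面是一个独立的小例子(类名SafeModeThresholdDemo和各数值均为示例假设;0.999对应配置项 dfs.namenode.safemode.threshold-pct 的默认值):

public class SafeModeThresholdDemo {
    public static void main(String[] args) {
        int blockTotal = 1000;    // 假设集群共有1000个complete block
        float threshold = 0.999f; // 默认阈值比例

        // 与源码中一致的计算方式:强转int会向下取整
        int blockThreshold = (int) (blockTotal * threshold);
        System.out.println("blockThreshold = " + blockThreshold); // 999

        // DataNode汇报的block数达到999个之前,集群会一直处于安全模式
        int blockSafe = 998;
        System.out.println("需要进入/停留在安全模式: " + (blockSafe < blockThreshold)); // true
    }
}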
/**
 * Check and trigger safe mode if needed.
 */
private void checkMode() {
  // Have to have write-lock since leaving safemode initializes
  // repl queues, which requires write lock
  assert hasWriteLock();
  if (inTransitionToActive()) {
    return;
  }
  // if smmthread is already running, the block threshold must have been
  // reached before, there is no need to enter the safe mode again
  // 判断是否需要进入安全模式
  if (smmthread == null && needEnter()) {
    // 进入安全模式
    enter();
    // check if we are ready to initialize replication queues
    if (canInitializeReplQueues() && !isPopulatingReplQueues()
        && !haEnabled) {
      initializeReplQueues();
    }
    reportStatus("STATE* Safe mode ON.", false);
    return;
  }
}
什么时候会进入安全模式:
1.当DataNode汇报上来的block数量小于阈值(block总数 × 0.999)时
2.当存活的DataNode数量小于设定的最小DataNode数量(datanodeThreshold)时
3.当NameNode元数据目录的可用空间小于阈值(默认100MB)时
以上三种情况任意一种出现,都会导致HDFS进入安全模式,对应的判断逻辑就在needEnter()中:
private boolean needEnter() {
  // 1000 * 0.999 = 999
  return (threshold != 0 && blockSafe < blockThreshold) ||
      (datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
      (!nameNodeHasResourcesAvailable());
}
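最后,用一个脱离源码的小例子把needEnter()的三个条件串起来(类名NeedEnterDemo和所有数值、变量均为示例假设):

public class NeedEnterDemo {
    public static void main(String[] args) {
        // 条件1:block汇报进度(假设阈值比例0.999,block总数1000)
        int blockThreshold = (int) (1000 * 0.999f); // 999
        int blockSafe = 1000;                       // 已汇报的block数
        boolean blockNotEnough = blockSafe < blockThreshold;

        // 条件2:存活DataNode数量(假设要求至少3个)
        int datanodeThreshold = 3;
        int liveDataNodes = 3;
        boolean datanodeNotEnough = datanodeThreshold != 0 && liveDataNodes < datanodeThreshold;

        // 条件3:元数据目录磁盘资源(假设检查通过)
        boolean hasResources = true;

        boolean needEnter = blockNotEnough || datanodeNotEnough || !hasResources;
        // 三个条件都不触发时为false,即可以退出(或不进入)安全模式
        System.out.println("是否进入安全模式: " + needEnter);
    }
}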