Hadoop源代码分析完整版.doc
《Hadoop源代码分析完整版.doc》由会员分享,可在线阅读,更多相关《Hadoop源代码分析完整版.doc(83页珍藏版)》请在淘文阁 - 分享文档赚钱的网站上搜索。
1、Hadoop源代码分析(一)关键字: 分布式 云计算 Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。 GoogleCluster: Chubby: GFS: BigTable: MapReduce: 很快,Apache上就出现了一个类似的解决方案,目前它们都属于Apache的Hadoop项目,对应的分别是: Chubby-ZooKeeper GFS-HDFS BigTable-HBase MapReduce-Hadoop 目前,基于类似思想的Open Source项目还很多,如Facebook用于用户分析的Hive。 HDFS作为一个分布
2、式文件系统,是所有这些项目的基础。分析好HDFS,有利于了解其他系统。由于Hadoop的HDFS和MapReduce是同一个项目,我们就把他们放在一块,进行分析。下图是MapReduce整个项目的顶层包图和他们的依赖关系。Hadoop包之间的依赖关系比较复杂,原因是HDFS提供了一个分布式文件系统,该系统提供API,可以屏蔽本地文件系统和分布式文件系统,甚至象Amazon S3这样的在线存储系统。这就造成了分布式文件系统的实现,或者是分布式文件系统的底层的实现,依赖于某些貌似高层的功能。功能的相互引用,造成了蜘蛛网型的依赖关系。一个典型的例子就是包conf,conf用于读取系统配置,它依赖于f
3、s,主要是读取配置文件的时候,需要使用文件系统,而部分的文件系统的功能,在包fs中被抽象了。Hadoop的关键部分集中于图中蓝色部分,这也是我们考察的重点。 大小: 78.3 KB Hadoop源代码分析(二)下面给出了Hadoop的包的功能分析。 PackageDependencestool提供一些命令行工具,如DistCp,archivemapreduceHadoop的Map/Reduce实现filecache提供HDFS文件的本地缓存,用于加快Map/Reduce的数据访问速度fs文件系统的抽象,可以理解为支持多种文件系统实现的统一文件访问接口hdfsHDFS,Hadoop的分布式文件系
4、统实现ipc一个简单的IPC的实现,依赖于io提供的编解码功能参考:io表示层。将各种数据编码/解码,方便于在网络上传输net封装部分网络功能,如DNS,socketsecurity用户和用户组信息conf系统的配置参数metrics系统统计数据的收集,属于网管范畴util工具类record根据DDL(数据描述语言)自动生成他们的编解码函数,目前可以提供C+和Javahttp基于Jetty的HTTP Servlet,用户通过浏览器可以观察文件系统的一些状态信息和日志log提供HTTP访问日志的HTTP ServletHadoop源代码分析(三)由于Hadoop的MapReduce和HDFS都有
5、通信的需求,需要对通信的对象进行序列化。Hadoop并没有采用Java的序列化,而是引入了它自己的系统。org.apache.hadoop.io中定义了大量的可序列化对象,他们都实现了Writable接口。实现了Writable接口的一个典型例子如下:Java代码 1. public class MyWritable implements Writable 2. / Some data 3. private int counter; 4. private long timestamp; 5. 6. public void write(DataOutput out) throws IOExcep
6、tion 7. out.writeInt(counter); 8. out.writeLong(timestamp); 9. 10. 11. public void readFields(DataInput in) throws IOException 12. counter = in.readInt(); 13. timestamp = in.readLong(); 14. 15. 16. public static MyWritable read(DataInput in) throws IOException 17. MyWritable w = new MyWritable(); 18
7、. w.readFields(in); 19. return w; 20. 21. public class MyWritable implements Writable / Some data private int counter; private long timestamp; public void write(DataOutput out) throws IOException out.writeInt(counter); out.writeLong(timestamp); public void readFields(DataInput in) throws IOException
8、 counter = in.readInt(); timestamp = in.readLong(); public static MyWritable read(DataInput in) throws IOException MyWritable w = new MyWritable(); w.readFields(in); return w; 其中的write和readFields分别实现了把对象序列化和反序列化的功能,是Writable接口定义的两个方法。下图给出了庞大的org.apache.hadoop.io中对象的关系。 这里,我把ObjectWritable标为红色,是因为相对于
9、其他对象,它有不同的地位。当我们讨论Hadoop的RPC时,我们会提到RPC上交换的信息,必须是Java的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组。ObjectWritable对象保存了一个可以在RPC上传输的对象和对象的类型信息。这样,我们就有了一个万能的,可以用于客户端/服务器间传输的Writable对象。例如,我们要把上面例子中的对象作为RPC请求,需要根据MyWritable创建一个ObjectWritable,ObjectWritable往流里会写如下信息对象类名长度,对象类名,对象自己的串行化结果这样,到了对端,ObjectWritable可以
10、根据对象类名创建对应的对象,并解串行。应该注意到,ObjectWritable依赖于WritableFactories,那存储了Writable子类对应的工厂。我们需要把MyWritable的工厂,保存在WritableFactories中(通过WritableFactories.setFactory)。Hadoop源代码分析(五)介绍完org.apache.hadoop.io以后,我们开始来分析org.apache.hadoop.rpc。RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。当我们讨论HDFS的,通信可能发生在: Client-NameNode之
11、间,其中NameNode是服务器 Client-DataNode之间,其中DataNode是服务器 DataNode-NameNode之间,其中NameNode是服务器 DataNode-DateNode之间,其中某一个DateNode是服务器,另一个是客户端 如果我们考虑Hadoop的Map/Reduce以后,这些系统间的通信就更复杂了。为了解决这些客户机/服务器之间的通信,Hadoop引入了一个RPC框架。该RPC框架利用的Java的反射能力,避免了某些RPC解决方案中需要根据某种接口语言(如CORBA的IDL)生成存根和框架的问题。但是,该RPC框架要求调用的参数和返回结果必须是Java
12、的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组。同时,接口方法应该只抛出IOException异常。(参考自既然是RPC,当然就有客户端和服务器,当然,org.apache.hadoop.rpc也就有了类Client和类Server。但是类Server是一个抽象类,类RPC封装了Server,利用反射,把某个对象的方法开放出来,变成RPC中的服务器。下图是org.apache.hadoop.rpc的类图。 大小: 130.3 KB Hadoop源代码分析(六)既然是RPC,自然就有客户端和服务器,当然,org.apache.hadoop.rpc也就有了类Cli
13、ent和类Server。在这里我们来仔细考察org.apache.hadoop.rpc.Client。下面的图包含了org.apache.hadoop.rpc.Client中的关键类和关键方法。由于Client可能和多个Server通信,典型的一次HDFS读,需要和NameNode打交道,也需要和某个/某些DataNode通信。这就意味着某一个Client需要维护多个连接。同时,为了减少不必要的连接,现在Client的做法是拿ConnectionId(图中最右侧)来做为Connection的ID。ConnectionId包括一个InetSocketAddress(IP地址+端口号或主机名+端口
14、号)对象和一个用户信息对象。这就是说,同一个用户到同一个InetSocketAddress的通信将共享同一个连接。 连接被封装在类Client.Connection中,所有的RPC调用,都是通过Connection,进行通信。一个RPC调用,自然有输入参数,输出参数和可能的异常,同时,为了区分在同一个Connection上的不同调用,每个调用都有唯一的id。调用是否结束也需要一个标记,所有的这些都体现在对象Client.Call中。Connection对象通过一个Hash表,维护在这个连接上的所有Call:Java代码 1. private Hashtable calls = new Hash
15、table(); private Hashtable calls = new Hashtable(); 一个RPC调用通过addCall,把请求加到Connection里。为了能够在这个框架上传输Java的基本类型,String和Writable接口的实现类,以及元素为以上类型的数组,我们一般把Call需要的参数打包成为ObjectWritable对象。Client.Connection会通过socket连接服务器,连接成功后回校验客户端/服务器的版本号(Client.ConnectionwriteHeader()方法),校验成功后就可以通过Writable对象来进行请求的发送/应答了。注意,
16、每个Client.Connection会起一个线程,不断去读取socket,并将收到的结果解包,找出对应的Call,设置Call并通知结果已经获取。Call使用Obejct的wait和notify,把RPC上的异步消息交互转成同步调用。还有一点需要注意,一个Client会有多个Client.Connection,这是一个很自然的结果。Hadoop源代码分析(七)聊完了Client聊Server,按惯例,先把类图贴出来。 需要注意的是,这里的Server类是个抽象类,唯一抽象的地方,就是Java代码 1. public abstract Writable call(Writable param,
17、 long receiveTime) throws IOException; public abstract Writable call(Writable param, long receiveTime) throws IOException; 这表明,Server提供了一个架子,Server的具体功能,需要具体类来完成。而具体类,当然就是实现call方法。我们先来分析Server.Call,和Client.Call类似,Server.Call包含了一次请求,其中,id和param的含义和Client.Call是一致的。不同点在后面三个属性,connection是该Call来自的连接,当然,当
18、请求处理结束时,相应的结果会通过相同的connection,发送给客户端。属性timestamp是请求到达的时间戳,如果请求很长时间没被处理,对应的连接会被关闭,客户端也就知道出错了。最后的response是请求处理的结果,可能是一个Writable的串行化结果,也可能一个异常的串行化结果。Server.Connection维护了一个来之客户端的socket连接。它处理版本校验,读取请求并把请求发送到请求处理线程,接收处理结果并把结果发送给客户端。Hadoop的Server采用了Java的NIO,这样的话就不需要为每一个socket连接建立一个线程,读取socket上的数据。在Server中,
19、只需要一个线程,就可以accept新的连接请求和读取socket上的数据,这个线程,就是上面图里的Listener。请求处理线程一般有多个,它们都是Server.Handle类的实例。它们的run方法循环地取出一个Server.Call,调用Server.call方法,搜集结果并串行化,然后将结果放入Responder队列中。对于处理完的请求,需要将结果写回去,同样,利用NIO,只需要一个线程,相关的逻辑在Responder里。Hadoop源代码分析(八)(注:本节需要用到一些Java反射的背景)有了Client和Server,很自然就能RPC啦。下面轮到RPC.java啦。一般来说,分布式对
20、象一般都会要求根据接口生成存根和框架。如CORBA,可以通过IDL,生成存根和框架。但是,在org.apache.hadoop.rpc,我们就不需要这样的步骤了。上类图。为了分析Invoker,我们需要介绍一些Java反射实现Dynamic Proxy的背景。Dynamic Proxy是由两个class实现的:java.lang.reflect.Proxy 和 java.lang.reflect.InvocationHandler,后者是一个接口。所谓Dynamic Proxy是这样一种class:它是在运行时生成的class,在生成它时你必须提供一组interface给它,然后该class就
21、宣称它实现了这些interface。这个Dynamic Proxy其实就是一个典型的Proxy模式,它不会替你作实质性的工作,在生成它的实例时你必须提供一个handler,由它接管实际的工作。这个handler,在Hadoop的RPC中,就是Invoker对象。我们可以简单地理解:就是你可以通过一个接口来生成一个类,这个类上的所有方法调用,都会传递到你生成类时传递的InvocationHandler实现中。在Hadoop的RPC中,Invoker实现了InvocationHandler的invoke方法(invoke方法也是InvocationHandler的唯一方法)。Invoker会把所有
22、跟这次调用相关的调用方法名,参数类型列表,参数列表打包,然后利用前面我们分析过的Client,通过socket传递到服务器端。就是说,你在proxy类上的任何调用,都通过Client发送到远方的服务器上。Invoker使用Invocation。Invocation封装了一个远程调用的所有相关信息,它的主要属性有: methodName,调用方法名,parameterClasses,调用方法参数的类型列表和parameters,调用方法参数。注意,它实现了Writable接口,可以串行化。RPC.Server实现了org.apache.hadoop.ipc.Server,你可以把一个对象,通过R
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- Hadoop 源代码 分析 完整版
限制150内