首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

Hadoop Yarn解决多种应用兼容方法

2013-07-09 
Hadoop Yarn解决多类应用兼容方法package org.apache.hadoop.mapreduce// 省去一些import 信息/** * Prov

Hadoop Yarn解决多类应用兼容方法
package org.apache.hadoop.mapreduce;// 省去一些import 信息/** * Provides a way to access information about the map/reduce cluster. */@InterfaceAudience.Public@InterfaceStability.Evolvingpublic class Cluster { @InterfaceStability.Evolving public static enum JobTrackerStatus {INITIALIZING, RUNNING}; private ClientProtocolProvider clientProtocolProvider; //该客户端协议的提供者,MRv2的提供类为LocalClientProtocolProvider, YarnClientProtocol;而Tez的提供类为YarnTezClientProtocolProvider。 private ClientProtocol client; //生成的是YarnRunner对象 //省略一些类属性... //利用ServiceLoader去加载ClientProtocolProvider的继承类,这是JDK提供的功能:多个服务提供者可以实现相同的接口去实现不同的逻辑,这需要按照一定的策略加载相关的文件配置。ServiceLoader实现了Iterable接口,可以直接迭代访问,采用Lazy加载的方式。 private static ServiceLoader<ClientProtocolProvider> frameworkLoader = ServiceLoader.load(ClientProtocolProvider.class); // 略去一些构造函数 ...// client端JobClient调用Cluster构造函数去初始化集群信息public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { this.conf = conf; this.ugi = UserGroupInformation.getCurrentUser(); initialize(jobTrackAddr, conf); } //初始化集群信息,主要是建立client与JT/RM的连接 private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { synchronized (frameworkLoader) { //for循环访问产生一个ClientProtocolProvider实现,frameWorkLoader里面的访问对象是根据在classpath的加载顺序来访问的,因此其加载顺序的不同会获得不一样的对象。 for (ClientProtocolProvider provider : frameworkLoader) { LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName()); ClientProtocol clientProtocol = null; try { if (jobTrackAddr == null) { clientProtocol = provider.create(conf); // 创建YarnRunner对象,建立与RM的连接 } else { clientProtocol = provider.create(jobTrackAddr, conf);//给定地址创建 } if (clientProtocol != null) { clientProtocolProvider = provider; //赋给属性供下次使用 client = clientProtocol; LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider"); break; //一旦得到一个新的对象(该对象不为NULL)就退出 } else { LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol"); } } catch (Exception e) { LOG.info("Failed to use " + provider.getClass().getName() + " due to error: " + e.getMessage()); } } } if (null == clientProtocolProvider || null == client) { throw new IOException( "Cannot initialize Cluster. Please check your configuration for " + MRConfig.FRAMEWORK_NAME + " and the correspond server addresses."); } } // 省略...}?

?

?从Cluster类中,可以看到Java提供的类ServiceLoader。ServiceLoader是服务加载类,它根据文件配置来在java classpath环境中加载对应接口的实现类。

?

?其配置规定在 classpath中需要把接口名命名成文件名,并把该文件放到classpath环境的META-INF/services文件中。例如:org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider是MRv2提供的接口,那么在Tez部署的client jar包中会有一个相对路径文件名META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider,里面写着Tez的ClientProtocolProvider实现类的限定名:org.apache.tez.mapreduce.YarnTezClientProtocolProvider。

ServiceLoader方法会自动去在classpath环境变量里面按需按序加载这些实现类。

3. ClientProtocolProvider客户端协议提供类

ClientProtocolProvider方法提供两个create方法和一个close方法。create方法用来创建clientProtocol实例,而close用于关闭实例。org.apache.tez.mapreduce.YarnTezClientProtocolProvider是实现代码如下:

?

package org.apache.tez.mapreduce;import java.io.IOException;import java.net.InetSocketAddress;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.mapreduce.protocol.ClientProtocol;import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;import org.apache.tez.mapreduce.hadoop.MRConfig;public class YarnTezClientProtocolProvider extends ClientProtocolProvider {  @Override  public ClientProtocol create(Configuration conf) throws IOException {    //根据mapreduce.framework.name的值来比较你需要的值:FRAMEWORK_NAME=mapreduce.framework.name,YARN_TEZ_FRAMEWORK_NAME=yarn-tez。如果conf里面的参数值是yarn-tez那么就创建一个Tez应用的org.apache.tez.mapreduce.YARNRunner对象。    if (MRConfig.YARN_TEZ_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {      return new YARNRunner(conf);    }    return null;  }  @Override  public ClientProtocol create(InetSocketAddress addr, Configuration conf)      throws IOException {    return create(conf);  }  @Override  public void close(ClientProtocol clientProtocol) throws IOException {    // nothing to do  }}
MRv2的org.apache.hadoop.mapred.YarnClientProtocolProvider的实现逻辑与Tez的一样,都是创建各自的YarnRunner。 MRv2还实现了一个本地运行的ClientProtocolProvider:org.apache.hadoop.mapred.LocalClientProtocolProvider,其就是创建一个org.apache.hadoop.mapred.LocalJobRunner对象,使其初始化本地环境而不需要建立socket与RM通信。结合Cluster.initialize()方法和ClientProtocolProvider.create()代码中可以看到,它根据mapreduce.framework.name参数来确定你的Job采用哪类应用。各自的应用只需要实现相应的接口就可以把自己的Job运行在Yarn上。4. 向Yarn提交Job运行类 YarnRunner?

ClientProtocolProvider用来构造ClientProtocol实现类YarnRunner,它直接产生一个RM代理ResourceMgrDelegate。在MRv2中,ResourceMgrDelegate继承了YarnClientImpl抽象类(同时YarnClientImpl实现了YarnClient接口),通过ApplicationClientProtocol代理直接向RM提交Job,杀死Job,查看Job运行状态等操作。

实际了上,只需要能过YarnClient.createYarnClient()静态方法就可以得到YarnClientImpl对象。

?

PS:Hadoop 的开发版发展速度出奇地快。因此如果想获得hadoop最新进展,最好经常把trunk代码更新下来。 其MRv2的API可以兼容老版的API,接口还是比较稳定,对原有代码的支持度都比较高,但是从其底层实现细节还正在处于不断地进化中。也正是其更新速度快,往往其它依赖Hadoop Yarn的生态系统出现一种滞后状态,例如Tez,Hama等系统,从它们的代码依赖里可以看出这种差别很明显,从它们的实现逻辑上都可以看出端倪。现在Hadoop 已经进入了2.1.0?beta开发阶段,相信在不久的将来,Hadoop 2.0 Stable版本就要发布了。我们拭目以待。

?

?

?

?

?

?

?

?

?

?

?

?

热点排行