
Hadoop Remote Development

2012-12-25 

Ugh, the company has me working on Flex stuff again lately, so there has been little time for Hadoop. For now, here is a quick summary of what I did over this period.


Approach: compile and package the local classes -> upload them to the Hadoop cluster -> run.

The above is nothing new; there is plenty of material about it online.
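In code, that workflow boils down to a handful of driver-side calls. The following is only a condensed sketch (the class name RemoteSubmitSketch is made up for illustration; JarTools and JobTools are the helper classes listed later in this post, and the full WordCount example below shows the same steps in context):

package org.yangzc.hadoop.demo;

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.yangzc.hadoop.tools.JarTools;
import org.yangzc.hadoop.tools.JobTools;

// Skeleton of the compile/package -> upload -> run flow; the job itself still
// needs a mapper, reducer, input and output before it can actually run.
public class RemoteSubmitSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Make the cluster's configuration files visible to the client.
        JobTools.addClassPath("D:/workspace/Hadoop/conf");
        Thread.currentThread().setContextClassLoader(JobTools.getClassLoader());
        // Package the locally compiled classes and tell Hadoop which jar to ship.
        File jarpath = JarTools.makeJar("bin");
        conf.set("mapred.jar", jarpath.toString());
        Job job = new Job(conf, "remote job sketch");
        // ...set the mapper, reducer, input and output paths here...
        job.waitForCompletion(true);
    }
}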


I won't say much about Hadoop configuration itself; a quick Baidu search will turn it up.

To get remote development working, I made a lot of mistakes during configuration, mostly in the masters file, and the hosts file on my Ubuntu machine also kept causing problems.
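For reference, the code below addresses the cluster as hdfs://ubuntu:9000, so the Windows client and the Ubuntu nodes all have to resolve the hostname ubuntu to the same address. A hosts entry along these lines is the kind of thing to check (the IP address is only a placeholder for your machine's real address):

# C:\WINDOWS\system32\drivers\etc\hosts on the client, /etc/hosts on the cluster nodes
# 192.168.1.100 is a placeholder; use the actual address of the Hadoop machine.
192.168.1.100    ubuntu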


I also took a quick look at the Hadoop source code; my only impression is that there are configuration items everywhere.


So here I will mainly just post the code.


Implementation: again using WordCount as the example.

The code is as follows:

WordCount.java

package org.yangzc.hadoop.demo;

import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.MapFile.Reader;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.yangzc.hadoop.tools.FileTools;
import org.yangzc.hadoop.tools.JarTools;
import org.yangzc.hadoop.tools.JobTools;

/**
 * Test case: WordCount submitted to the cluster from a remote development machine.
 * @author yangzc
 */
public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Upload the local input file to HDFS.
        FileTools.copyToRemote(conf, "C:/Documents and Settings/Administrator/桌面/test.txt",
                "hdfs://ubuntu:9000/usr/devsoft/tmp/ss1.txt");
        //FileTools.copyToLocal(conf, "C:/Documents and Settings/Administrator/桌面/indexhaha.txt", "hdfs://ubuntu:9000/usr/devsoft/tmp/e8e035dcd123404fa8cf8f132fa37e4a");

        // Put the cluster configuration directory on the classpath so the client
        // talks to the remote cluster.
        JobTools.addClassPath("D:/workspace/Hadoop/conf");
        Thread.currentThread().setContextClassLoader(JobTools.getClassLoader());

        // Package the locally compiled classes and tell Hadoop which jar to ship.
        File jarpath = JarTools.makeJar("bin");
        conf.set("mapred.jar", jarpath.toString());

        Job job = new Job(conf, "word count");
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://ubuntu:9000/usr/devsoft/tmp/ss1.txt"));
        Path destpath = new Path("hdfs://ubuntu:9000/usr/devsoft/tmp/"
                + UUID.randomUUID().toString().replace("-", ""));
        FileOutputFormat.setOutputPath(job, destpath);
        Path rtnpath = FileOutputFormat.getOutputPath(job);
        System.out.println(rtnpath);
        job.waitForCompletion(false);

        //FileTools.copyDirToLocal(conf, "C:/Documents and Settings/Administrator/桌面", destpath.toString());
        // NOTE: MapFile.Reader expects the output to be a MapFile (an index/data pair);
        // with the default TextOutputFormat used by this job, this lookup will not work as-is.
        Reader reader = new Reader(FileSystem.get(conf), destpath.toString() + "/part-r-00000", conf);
        Writable wa = reader.get(new Text("hao"), new IntWritable(1));
        System.out.println(wa.toString());
    }
}
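As noted in the comment above, MapFile.Reader only applies when the job output actually is a MapFile. Since this job keeps the default TextOutputFormat, part-r-00000 is plain text, and a sketch like the following reads it back directly (the class name and the output-directory placeholder are made up for illustration):

package org.yangzc.hadoop.demo;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustration only: read the plain-text job output line by line.
public class TextOutputReader {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Replace with the UUID output directory printed by the WordCount driver.
        String outputDir = "REPLACE_WITH_OUTPUT_DIR";
        Path part = new Path("hdfs://ubuntu:9000/usr/devsoft/tmp/" + outputDir + "/part-r-00000");
        FileSystem fs = part.getFileSystem(conf);
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(part)));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line); // each line is "word<TAB>count"
        }
        reader.close();
        fs.close();
    }
}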


Helper classes:

FileTools.java

package org.yangzc.hadoop.tools;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileTools {

    public static List<File> getFileList(String parent) {
        List<File> filelst = new ArrayList<File>();
        // Collect the file list recursively.
        getList(filelst, parent);
        return filelst;
    }

    private static void getList(List<File> filelst, String parent) {
        File p = new File(parent);
        if (!p.exists()) return;
        if (p.isDirectory()) {
            File clst[] = p.listFiles();
            if (clst == null || clst.length == 0) return;
            for (File f : clst) {
                getList(filelst, f.getAbsolutePath());
            }
        } else {
            filelst.add(p);
        }
    }

    public static void copyToRemote(Configuration conf, String local, String dest) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        fs.copyFromLocalFile(new Path(local), new Path(dest));
        fs.close();
    }

    public static void copyToLocal(Configuration conf, String local, String dest) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream fis = fs.open(new Path(dest));
        FileOutputStream fos = new FileOutputStream(local);
        byte buf[] = new byte[1024];
        int len = -1;
        while ((len = fis.read(buf, 0, 1024)) != -1) {
            fos.write(buf, 0, len);
        }
        fos.flush();
        fos.close();
        fis.close();
        fs.close();
    }

    public static void copyDirToLocal(Configuration conf, String localdir, String destdir) throws IOException {
        if (!new File(localdir).exists()) new File(localdir).mkdirs();
        FileSystem fs = FileSystem.get(conf);
        FileStatus filestatus[] = fs.listStatus(new Path(destdir));
        for (FileStatus f : filestatus) {
            Path path = f.getPath();
            String local = localdir + "/" + path.getName();
            try {
                copyToLocal(conf, local, path.toString());
            } catch (Exception e) {
                System.out.println("read remote file error!!!");
            }
        }
    }
}
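A quick usage sketch for these helpers on their own (the class name and paths are placeholders, not taken from the WordCount example):

package org.yangzc.hadoop.demo;

import org.apache.hadoop.conf.Configuration;
import org.yangzc.hadoop.tools.FileTools;

// Illustration only: push one file to HDFS and pull a whole result directory back.
public class FileToolsUsage {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Upload a local file to the cluster.
        FileTools.copyToRemote(conf, "C:/tmp/input.txt", "hdfs://ubuntu:9000/usr/devsoft/tmp/input.txt");
        // Download every file under a remote directory into a local directory.
        FileTools.copyDirToLocal(conf, "C:/tmp/results", "hdfs://ubuntu:9000/usr/devsoft/tmp/output");
    }
}

One thing to be aware of: each helper calls fs.close(), and FileSystem.get(conf) normally hands out a shared cached instance, so closing it can surprise other code in the same JVM that still holds that instance.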


JarTools.java

package org.yangzc.hadoop.tools;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class JarTools {

    public static File makeJar(String classPath) throws Exception {
        if (classPath == null || "".equals(classPath))
            throw new Exception("classPath must not be empty!!!");
        JarOutputStream jos = null;
        try {
            Manifest manifest = new Manifest();
            manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
            // Create a temporary jar file.
            final File jarfile = File.createTempFile("tmp_", ".jar",
                    new File(System.getProperty("java.io.tmpdir")));
            // Register a shutdown hook so the temporary jar is removed on exit.
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    jarfile.delete();
                }
            });
            FileOutputStream fos = new FileOutputStream(jarfile);
            jos = new JarOutputStream(fos, manifest);
            File file = new File(classPath);
            if (!file.isDirectory()) return null;
            List<File> files = FileTools.getFileList(classPath);
            for (File f : files) {
                // Store each file with a path relative to classPath, using '/' separators.
                String filepath = f.getAbsolutePath().replace(new File(classPath).getAbsolutePath(), "");
                if (filepath.startsWith("\\")) {
                    filepath = filepath.substring(1);
                }
                JarEntry entry = new JarEntry(filepath.replace("\\", "/"));
                jos.putNextEntry(entry);
                FileInputStream is = new FileInputStream(f);
                byte buf[] = new byte[1024];
                int len = -1;
                while ((len = is.read(buf, 0, 1024)) != -1) {
                    jos.write(buf, 0, len);
                }
                is.close();
            }
            jos.flush();
            return jarfile;
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (jos != null) jos.close();
        }
        return null;
    }

    public static void main(String[] args) {
        try {
            JarTools.makeJar("D:/workspace/JarMaker/bin/");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


JobTools.java

package org.yangzc.hadoop.tools;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapreduce.Job;

public class JobTools {

    private static List<URL> classPaths = new ArrayList<URL>();

    public static void addClassPath(String classPath) {
        if (classPath != null && !"".equals(classPath)) {
            File f = new File(classPath);
            if (!f.exists()) return;
            try {
                classPaths.add(f.getCanonicalFile().toURI().toURL());
            } catch (MalformedURLException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void addJarDir(String lib) {
        File file = new File(lib);
        if (!file.exists()) return;
        if (!file.isDirectory()) return;
        File fs[] = file.listFiles();
        if (fs == null || fs.length == 0) return;
        for (File f : fs) {
            addClassPath(f.getAbsolutePath());
        }
    }

    public static URLClassLoader getClassLoader() {
        // Start from the current context class loader and fall back to this
        // class's own loader, then to the system class loader.
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null)
            classLoader = JobTools.class.getClassLoader();
        if (classLoader == null)
            classLoader = ClassLoader.getSystemClassLoader();
        return new URLClassLoader(classPaths.toArray(new URL[0]), classLoader);
    }

    public static boolean runJob(Job job) {
        try {
            return job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return false;
    }

    public static void main(String[] args) {
    }
}
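For completeness, here is a sketch of submitting through JobTools.runJob instead of calling waitForCompletion directly; unlike the WordCount driver above, which passes verbose = false, runJob passes true, so map/reduce progress is printed to the console (the class name is made up, and the job still needs its mapper, reducer and paths configured):

package org.yangzc.hadoop.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.yangzc.hadoop.tools.JobTools;

// Sketch only: runJob blocks until the job finishes and swallows checked exceptions.
public class RunJobSketch {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "word count");
        // ...configure the mapper, reducer, input and output as in WordCount above...
        boolean ok = JobTools.runJob(job);
        System.out.println(ok ? "job succeeded" : "job failed");
    }
}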
