深入理解map/reduce
……Hadoop 只是个工具,map/reduce 没有多神奇。
package mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Minimal in-memory analogue of Hadoop's Mapper: user code implements
 * {@link #map} and emits (K, V) pairs through a {@link Context}; emitted
 * values are grouped per key and kept sorted by key (TreeMap) for a later
 * reduce phase.
 *
 * @param <K0> input key type
 * @param <V0> input value type
 * @param <K>  output key type (assumed Comparable for the TreeMap ordering)
 * @param <V>  output value type
 */
public abstract class Mapper<K0, V0, K, V> {

    /** Accumulates emitted pairs, grouped by key, sorted by natural key order. */
    private final Map<K, List<V>> map = new TreeMap<K, List<V>>();

    /**
     * Returns the grouped map output.
     * NOTE(review): exposes the internal mutable map directly; callers are
     * trusted not to modify it.
     */
    public Map<K, List<V>> getResult() {
        return map;
    }

    /**
     * Processes one input record, writing any number of output pairs to
     * {@code context}.
     */
    public abstract void map(K0 key, V0 value, Context context)
            throws IOException, InterruptedException;

    /** Collector handed to {@link #map}; appends each emitted value to its key's list. */
    public class Context {
        public void write(K k, V v) {
            List<V> list = map.get(k);
            if (list == null) {
                list = new ArrayList<V>();
                // Only a freshly created list must be registered; an existing
                // list is already referenced by the map (the original re-put
                // it on every write).
                map.put(k, list);
            }
            list.add(v);
        }
    }
}
package mapreduce;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

/**
 * Minimal in-memory analogue of Hadoop's Reducer: user code implements
 * {@link #reduce}, which folds the values collected for one key into output
 * pairs written through a {@link Context}.
 *
 * @param <K>  intermediate key type
 * @param <V>  intermediate value type
 * @param <K1> output key type (assumed Comparable for the TreeMap ordering)
 * @param <V1> output value type
 */
public abstract class Reducer<K, V, K1, V1> {

    // Program to the interface: nothing here needs TreeMap-specific
    // operations, only its sorted-map construction.
    private final Map<K1, V1> map = new TreeMap<K1, V1>();

    /**
     * Returns the reduce output, sorted by key.
     * NOTE(review): exposes the internal mutable map directly; callers are
     * trusted not to modify it.
     */
    public Map<K1, V1> getResult() {
        return map;
    }

    /** Folds all values of one key into output pairs written to {@code context}. */
    public abstract void reduce(K k, Iterable<V> list, Context context)
            throws IOException, InterruptedException;

    /** Collector handed to {@link #reduce}; stores one result per output key. */
    public class Context {
        public void write(K1 k, V1 v) {
            map.put(k, v);
        }
    }
}
package mapreduce;

import java.io.IOException;

/**
 * Entry-point contract for a runnable map/reduce job, mirroring Hadoop's
 * own {@code Tool} interface.
 */
public interface Tool {

    /**
     * Executes the job over the given input records.
     *
     * @param args the raw input records
     * @return a job status code
     */
    int run(String[] args) throws IOException, InterruptedException;
}
wordcount 示例
package mr.maillog;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import mapreduce.Mapper;
import mapreduce.Reducer;
import mapreduce.Tool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Classic word-count driver for the toy in-memory map/reduce framework:
 * the map phase splits each input line into lowercase words, the reduce
 * phase sums the per-word counts, and the result is logged in key order.
 */
public class WordCount implements Tool {

    private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);

    /** Map phase: emit (word, 1) for every whitespace-separated token. */
    public static final class M extends Mapper<Long, String, String, Long> {
        @Override
        public void map(Long key, String value, Context context)
                throws IOException, InterruptedException {
            // Lowercase first so "Hadoop" and "hadoop" count as one word.
            for (String word : value.toLowerCase().split("\\s+")) {
                context.write(word, 1L);
            }
        }
    }

    /** Reduce phase: sum the 1s emitted for one word. */
    public static final class R extends Reducer<String, Long, String, Long> {
        @Override
        public void reduce(String k, Iterable<Long> list, Context context)
                throws IOException, InterruptedException {
            long count = 0L; // primitive accumulator avoids per-iteration autoboxing
            for (Long item : list) {
                count += item;
            }
            context.write(k, count);
        }
    }

    /**
     * Runs map over every input line, then reduce over the grouped map
     * output, and logs each (word, count) pair in sorted order.
     *
     * @param data input lines; each element is mapped with its index as key
     * @return 1, as in the original (NOTE(review): the Hadoop Tool convention
     *         would be 0 on success — kept for compatibility)
     */
    @Override
    public int run(String[] data) throws IOException, InterruptedException {
        M m = new M();
        // One Context suffices: it only forwards writes into the mapper's
        // shared result map (the original allocated a new one per line).
        Mapper<Long, String, String, Long>.Context mapCtx = m.new Context();
        for (int i = 0; i < data.length; i++) {
            m.map((long) i, data[i], mapCtx);
        }

        R r = new R();
        Reducer<String, Long, String, Long>.Context reduceCtx = r.new Context();
        // entrySet avoids the keySet-then-get double lookup of the original.
        for (Map.Entry<String, List<Long>> entry : m.getResult().entrySet()) {
            r.reduce(entry.getKey(), entry.getValue(), reduceCtx);
        }

        for (Map.Entry<String, Long> entry : r.getResult().entrySet()) {
            LOG.info("{}\t{}", entry.getKey(), entry.getValue());
        }
        return 1;
    }

    /**
     * Demo entry point: counts words in a few paragraphs about Hadoop.
     */
    public static void main(String[] args) throws IOException,
            InterruptedException {
        LOG.info("Hi");
        String[] data = {
                "What Is Apache Hadoop?",
                "The Apache? Hadoop? project develops open-source software for reliable, scalable, distributed computing.",
                "The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using a simple programming model. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-avaiability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-availabile service on top of a cluster of computers, each of which may be prone to failures.",
                "The project includes these subprojects:",
                "Hadoop Common: The common utilities that support the other Hadoop subprojects.",
                "Hadoop Distributed File System (HDFS?): A distributed file system that provides high-throughput access to application data.",
                "Hadoop MapReduce: A software framework for distributed processing of large data sets on compute clusters.",
                "Other Hadoop-related projects at Apache include:",
                "Avro?: A data serialization system.",
                "Cassandra?: A scalable multi-master database with no single points of failure.",
                "Chukwa?: A data collection system for managing large distributed systems.",
                "HBase?: A scalable, distributed database that supports structured data storage for large tables.",
                "Hive?: A data warehouse infrastructure that provides data summarization and ad hoc querying.",
                "Mahout?: A Scalable machine learning and data mining library.",
                "Pig?: A high-level data-flow language and execution framework for parallel computation.",
                "ZooKeeper?: A high-performance coordination service for distributed applications." };
        new WordCount().run(data);
    }
}