Mahout k-Means: Usage and Result Analysis
3. The output directory "clusteredPoints" holds the final clustering result, i.e., the points assigned to each cluster.
4. Unpack and install Mahout 0.7.
5. Running k-Means clustering. The driver's usage is:
bin/mahout kmeans \
    -i <input vectors directory> \
    -c <input clusters directory> \
    -o <output working directory> \
    -k <optional number of initial clusters to sample from input vectors> \
       // if -k is specified, the directory given by -c will be overwritten with k randomly sampled points
    -dm <DistanceMeasure> \
    -x <maximum number of iterations> \
    -cd <optional convergence delta. Default is 0.5> \
    -ow <overwrite output directory if present> \
    -cl <run input vector clustering after computing Canopies> \
       // very important: without this flag you only get the final cluster info, not the points in each cluster
    -xm <execution method: sequential or mapreduce>
For example:

./mahout kmeans -i archer/kmeans_in -c archer/kmeans_clusters -o archer/kmeans_out -k 50 -x 10 -ow -cl

After the run finishes, <output>/clusteredPoints holds the per-point assignments that the ClusterOutput program below reads back.
The k-means input must be a SequenceFile of VectorWritable values. The following program wraps each raw vector in a NamedVector whose name is the line number, so every point can be identified later in clusteredPoints:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.SequentialAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

public class Convert2Kmeans {

    // Dimensionality of the vectors
    public static int Cardinality = 6000;

    public static void main(String[] args) throws IOException {
        String uri = "/tmp/snsVec2.seq";
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://baby6:31054");
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        // Read the original SequenceFile and wrap each vector as a NamedVector
        // (a vector carrying a Name attribute)
        SequenceFile.Reader reader =
                new SequenceFile.Reader(fs, new Path("/kmeans_in_seq/snsVec.seq"), conf);
        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        Writable val = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, new Path(uri),
                    IntWritable.class, VectorWritable.class, CompressionType.BLOCK);
            final IntWritable key1 = new IntWritable();
            final VectorWritable value = new VectorWritable();
            int lineNum = 0;
            Vector vector = null;
            while (reader.next(key, val)) {
                int index = 0;
                StringTokenizer st = new StringTokenizer(val.toString());
                // Wrap the SequentialAccessSparseVector in a NamedVector, using the
                // line number as its name
                vector = new NamedVector(new SequentialAccessSparseVector(Cardinality),
                        lineNum + "");
                while (st.hasMoreTokens()) {
                    if (Integer.parseInt(st.nextToken()) == 1) {
                        vector.set(index, 1);
                    }
                    index++;
                }
                key1.set(lineNum++);
                value.set(vector);
                writer.append(key1, value);
            }
        } finally {
            writer.close();
            reader.close();
        }
    }
}
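As a quick sanity check before running k-means, here is a minimal sketch (a hypothetical helper, not part of the original post; it reuses the hdfs://baby6:31054 address and the /tmp/snsVec2.seq path from the code above) that reads back the first few records and prints each vector's name and non-zero count:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.VectorWritable;

public class VerifyConversion {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://baby6:31054"); // same HDFS address as above
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Reader reader =
                new SequenceFile.Reader(fs, new Path("/tmp/snsVec2.seq"), conf);
        try {
            IntWritable key = new IntWritable();
            VectorWritable value = new VectorWritable();
            int shown = 0;
            // Print the first few NamedVectors to confirm names and sparsity look right
            while (reader.next(key, value) && shown++ < 5) {
                NamedVector v = (NamedVector) value.get();
                System.out.println(key.get() + "\t" + v.getName()
                        + "\tnonZeros=" + v.getNumNondefaultElements());
            }
        } finally {
            reader.close();
        }
    }
}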
Once the job has run with -cl, the clusteredPoints directory can be read back to recover each point's cluster assignment:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.clustering.classify.WeightedVectorWritable;
import org.apache.mahout.math.NamedVector;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Set;

public class ClusterOutput {
    public static void main(String[] args) {
        try {
            BufferedWriter bw;
            Configuration conf = new Configuration();
            conf.set("fs.default.name", "hdfs://baby6:31054");
            FileSystem fs = FileSystem.get(conf);
            SequenceFile.Reader reader = new SequenceFile.Reader(fs,
                    new Path("/kmeans_out/clusteredPoints/part-m-00000"), conf);
            // Write the assignments to uidOfgrp.txt, one "uid \t groupID" pair per line
            bw = new BufferedWriter(new FileWriter(new File("D:\\uidOfgrp.txt")));
            HashMap<String, Integer> clusterIds = new HashMap<String, Integer>(120);
            IntWritable key = new IntWritable();
            WeightedVectorWritable value = new WeightedVectorWritable();
            while (reader.next(key, value)) {
                NamedVector vector = (NamedVector) value.getVector();
                // The vector's Name is the uid assigned during conversion
                String vectorName = vector.getName();
                bw.write(vectorName + "\t" + key.toString() + "\n");
                // Update the size of each group
                if (clusterIds.containsKey(key.toString())) {
                    clusterIds.put(key.toString(), clusterIds.get(key.toString()) + 1);
                } else {
                    clusterIds.put(key.toString(), 1);
                }
            }
            bw.close();
            reader.close();
            // Write each group's size to grpSize.txt
            bw = new BufferedWriter(new FileWriter(new File("D:\\grpSize.txt")));
            Set<String> keys = clusterIds.keySet();
            for (String k : keys) {
                bw.write(k + " " + clusterIds.get(k) + "\n");
            }
            bw.flush();
            bw.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
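The program above recovers the point-to-cluster assignments; the centroids themselves live under a clusters-N-final subdirectory of the output directory. Here is a minimal sketch, assuming Mahout 0.7's ClusterWritable value type; "clusters-10-final/part-r-00000" is a placeholder path that depends on how many iterations your run actually took:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.iterator.ClusterWritable;

public class CenterOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://baby6:31054"); // same HDFS address as above
        FileSystem fs = FileSystem.get(conf);
        // Placeholder path: the "clusters-10-final" suffix depends on the run
        SequenceFile.Reader reader = new SequenceFile.Reader(fs,
                new Path("/kmeans_out/clusters-10-final/part-r-00000"), conf);
        try {
            IntWritable key = new IntWritable();
            ClusterWritable value = new ClusterWritable();
            while (reader.next(key, value)) {
                Cluster cluster = value.getValue();
                // Print cluster id and centroid; getCenter() returns a Mahout Vector
                System.out.println(cluster.getId() + "\t" + cluster.getCenter());
            }
        } finally {
            reader.close();
        }
    }
}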