Mahout系列----MinHash聚类 (Mahout series: MinHash clustering)
Map (excerpt: body of the mapper's map method):

// Body of the mapper's map step (excerpt). Computes a MinHash signature for one feature
// vector and emits the record once per hash function, keyed by a group of consecutive
// MinHash values.
//
// Names used below but declared outside this excerpt (fields or method parameters):
// features, item, context, minVectorSize, numHashFunctions, minHashValues, hashValue,
// bytesToHash, hashFunction, keyGroups, cluster, vector, debugOutput.
Vector featureVector = features.get();
// Skip vectors whose size() is below the configured minimum.
// NOTE(review): for sparse vectors size() is presumably the cardinality rather than the
// number of non-zero entries -- confirm this is the intended measure.
if (featureVector.size() < minVectorSize) {
  return;
}

// Reset the running minimum of every hash function to the largest possible int.
for (int i = 0; i < numHashFunctions; i++) {
  minHashValues[i] = Integer.MAX_VALUE;
}

// Single pass over the non-zero elements. Each element is encoded once (big-endian int)
// and then fed to every hash function. A minimum does not depend on visiting order, so this
// yields the same signature as scanning the vector once per hash function, without
// re-iterating the vector and re-encoding every element numHashFunctions times.
for (Vector.Element ele : featureVector.nonZeroes()) {
  // Hash either the element's value (cast to int) or its index, depending on hashValue.
  int value = hashValue ? (int) ele.get() : ele.index();
  bytesToHash[0] = (byte) (value >> 24);
  bytesToHash[1] = (byte) (value >> 16);
  bytesToHash[2] = (byte) (value >> 8);
  bytesToHash[3] = (byte) value;
  for (int i = 0; i < numHashFunctions; i++) {
    // NOTE(review): the same encoded bytes are reused for every hash function, which
    // assumes hash() does not modify its input array -- confirm for all HashFunction impls.
    int hashIndex = hashFunction[i].hash(bytesToHash);
    // If the new hash value is smaller than the current minimum, replace it.
    if (minHashValues[i] > hashIndex) {
      minHashValues[i] = hashIndex;
    }
  }
}
// NOTE(review): a vector with no non-zero elements leaves every minHashValues[i] at
// Integer.MAX_VALUE, so all such vectors end up under one shared cluster key.

// Emit one record per hash function. The key for function i joins keyGroups consecutive
// MinHash values starting at index i (wrapping around), separated by '-'.
for (int i = 0; i < numHashFunctions; i++) {
  StringBuilder clusterIdBuilder = new StringBuilder();
  for (int j = 0; j < keyGroups; j++) {
    clusterIdBuilder.append(minHashValues[(i + j) % numHashFunctions]).append('-');
  }
  // Remove the trailing '-'.
  // NOTE(review): with keyGroups < 1 the builder is empty and this throws.
  clusterIdBuilder.deleteCharAt(clusterIdBuilder.length() - 1);
  cluster.set(clusterIdBuilder.toString());
  // Debug mode emits the feature vector itself; otherwise the item is emitted.
  if (debugOutput) {
    vector.set(featureVector);
    context.write(cluster, vector);
  } else {
    context.write(cluster, item);
  }
}

Reduce:
?protected void reduce(Text cluster, Iterable<Writable> points, Context context)
??? throws IOException, InterruptedException {
??? Collection<Writable> pointList = Lists.newArrayList();
??? for (Writable point : points) {
????? if (debugOutput) {
??????? Vector pointVector = ((VectorWritable) point).get().clone();
??????? Writable writablePointVector = new VectorWritable(pointVector);
??????? pointList.add(writablePointVector);
????? } else {
??????? Writable pointText = new Text(point.toString());
??????? pointList.add(pointText);
????? }
??? }
??? if (pointList.size() >= minClusterSize) {
????? context.getCounter(Clusters.ACCEPTED).increment(1);
????? for (Writable point : pointList) {
??????? context.write(cluster, point);
????? }
??? } else {
????? context.getCounter(Clusters.DISCARDED).increment(1);
??? }
? }
?