
Getting Started with Mahout K-means Clustering

 

Files processed by Mahout must be in SequenceFile format, so text files first need to be converted. SequenceFile is a Hadoop class that lets us write binary key-value pairs to a file.

Mahout provides a tool that converts every file under a given directory into SequenceFiles. (You may find Tika (http://lucene.apache.org/tika) helpful in converting binary documents to text.) Usage:

$MAHOUT_HOME/bin/mahout seqdirectory \
--input <PARENT DIR WHERE DOCS ARE LOCATED> --output <OUTPUT DIRECTORY> \
<-c <CHARSET NAME OF THE INPUT DOCUMENTS> {UTF-8|cp1252|ascii...}> \
<-chunk <MAX SIZE OF EACH CHUNK in Megabytes> 64> \
<-prefix <PREFIX TO ADD TO THE DOCUMENT ID>>

 

For example:

bin/mahout seqdirectory --input /hive/hadoopuser/ --output /mahout/seq/ --charset UTF-8
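
Under the hood, a SequenceFile is just a stream of binary key/value records. As a minimal sketch of what seqdirectory produces, the following writes one by hand using the classic Hadoop 1.x API (the API Mahout 0.4 builds against); the output path and document contents here are invented for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SeqFileDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("demo.seq");  // illustrative output path

    // seqdirectory keys each document by its path and stores the text as the value.
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, path, Text.class, Text.class);
    try {
      writer.append(new Text("/docs/a.txt"), new Text("contents of document a"));
      writer.append(new Text("/docs/b.txt"), new Text("contents of document b"));
    } finally {
      writer.close();
    }
  }
}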
   

 

Running the k-means Example

How the k-means algorithm works
First, pick k of the n data objects arbitrarily as the initial cluster centers. Assign each remaining object to the cluster whose center it is most similar to, i.e. closest by distance. Then recompute each cluster's center as the mean of all objects now assigned to it. Repeat this assign-and-recompute cycle until the criterion function converges.
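
To make the loop above concrete, here is a minimal, self-contained sketch in plain Java. It is illustrative only; it has nothing to do with Mahout's distributed implementation, and the sample points are invented:

import java.util.Arrays;
import java.util.Random;

public class SimpleKMeans {

  // Squared Euclidean distance between two points.
  static double dist(double[] a, double[] b) {
    double sum = 0;
    for (int i = 0; i < a.length; i++) {
      double d = a[i] - b[i];
      sum += d * d;
    }
    return sum;
  }

  // Returns the cluster assignment of each point after at most maxIter iterations.
  static int[] kmeans(double[][] points, int k, int maxIter, double delta) {
    Random rnd = new Random(42);
    double[][] centers = new double[k][];
    // Step 1: pick k points at random as the initial centers.
    for (int c = 0; c < k; c++) {
      centers[c] = points[rnd.nextInt(points.length)].clone();
    }
    int[] assign = new int[points.length];
    for (int iter = 0; iter < maxIter; iter++) {
      // Step 2: assign every point to its nearest center.
      for (int p = 0; p < points.length; p++) {
        int best = 0;
        for (int c = 1; c < k; c++) {
          if (dist(points[p], centers[c]) < dist(points[p], centers[best])) {
            best = c;
          }
        }
        assign[p] = best;
      }
      // Step 3: recompute each center as the mean of its points.
      double[][] sums = new double[k][points[0].length];
      int[] counts = new int[k];
      for (int p = 0; p < points.length; p++) {
        counts[assign[p]]++;
        for (int d = 0; d < points[p].length; d++) {
          sums[assign[p]][d] += points[p][d];
        }
      }
      double moved = 0;
      for (int c = 0; c < k; c++) {
        if (counts[c] == 0) continue;  // keep an empty cluster's center in place
        for (int d = 0; d < sums[c].length; d++) {
          sums[c][d] /= counts[c];
        }
        moved = Math.max(moved, dist(centers[c], sums[c]));
        centers[c] = sums[c];
      }
      // Step 4: stop once no center has moved more than delta.
      if (moved < delta) break;
    }
    return assign;
  }

  public static void main(String[] args) {
    double[][] points = {{1, 1}, {1.5, 2}, {8, 8}, {8.5, 9}, {1, 2}, {9, 8}};
    System.out.println(Arrays.toString(kmeans(points, 2, 10, 1e-4)));
  }
}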

Procedure: follow the steps on the official wiki at https://cwiki.apache.org/confluence/display/MAHOUT/Clustering+of+synthetic+control+data:

First, download the synthetic_control.data dataset (the "Input data set. Download it here" link on the wiki page above) and load it into the distributed file system:

$HADOOP_HOME/bin/hadoop fs -mkdir testdata
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/synthetic_control.data testdata

Next, run the k-means job. From the Mahout installation directory, either run

mahout org.apache.mahout.clustering.syntheticcontrol.kmeans.Job

or

$HADOOP_HOME/bin/hadoop jar /home/hadoop/mahout-distribution-0.4/mahout-examples-0.4-job.jar org.apache.mahout.clustering.syntheticcontrol.kmeans.Job

The run takes a while because the algorithm iterates, so be patient.

Finally, inspect the results. To print them straight to the console:

mahout vectordump --seqFile /user/hadoop/output/data/part-00000

Alternatively, run the following to copy the results out of the distributed file system and look at them locally:

$HADOOP_HOME/bin/hadoop fs -lsr output
$HADOOP_HOME/bin/hadoop fs -get output $MAHOUT_HOME/examples
cd $MAHOUT_HOME/examples/output

If you see the following directory listing, the job ran successfully:

canopies  clusters-0  clusters-1  clusters-2  clusters-3  clusters-4  clusters-5  clusters-6  clusters-7  data  points

For a long time I could not figure out how to inspect the k-means results. To examine, say, part-r-00000 under clusters-i, dump it from the distributed file system to a local text file:

./mahout seqdumper -s /user/hadoop/output/clusters-9/part-r-00000 -o /home/hadoop/out/part-0

In the dump, n is the number of samples in the cluster, c is the cluster's center (one value per attribute), and r is the cluster's radius per attribute.
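
For reference, a dumped cluster record looks roughly like this (the identifier and numbers are made up purely for illustration):

VL-523{n=80 c=[29.922, 30.407, ...] r=[3.463, 3.352, ...]}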

 

How Mahout implements k-means:

(1) The input parameter points at all the data points to cluster; clusters points at the initial cluster centers. If k is given, org.apache.mahout.clustering.kmeans.RandomSeedGenerator.buildRandom reads k points at random from the input files (via org.apache.hadoop.fs) and writes them into clusters.

(2) Each iteration computes new cluster centers from the data points and the centers of the previous iteration (or the initial clusters), writing them to a clusters-N directory. This is implemented by KMeansMapper, KMeansCombiner, KMeansReducer and KMeansDriver in org.apache.mahout.clustering.kmeans:

KMeansMapper: when the mapper is initialized in configure, it reads in the cluster centers produced by the previous iteration (or the initial centers); every mapper reads all the centers. For each input point, map finds the nearest cluster and emits that cluster's ID as the key, with a KMeansInfo instance (the point count and the per-component sums) as the value.

KMeansCombiner: locally accumulates the point counts and component sums emitted by KMeansMapper under the same cluster ID.

KMeansReducer: accumulates the point counts and component sums under each cluster ID and computes that iteration's cluster center. Using the supplied delta, it marks a cluster as converged when the distance between the previous center and the new center is < delta, and emits each center together with its convergence flag.

KMeansDriver: drives the iterations until the maximum number of iterations is exceeded or every cluster has converged. After each round it reads all the clusters in the clusters-N directory; once all of them are converged, the whole k-means run has converged.
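
The combiner step works because the value type described above (KMeansInfo: a point count plus per-component sums) can be merged associatively anywhere between map and reduce. Below is a toy, single-process sketch of that idea; PartialSum is an invented name for illustration, not a Mahout class:

import java.util.Arrays;

public class PartialSum {
  int n;          // number of points folded into this partial result
  double[] sum;   // running per-component sum of those points

  PartialSum(double[] point) {
    n = 1;
    sum = point.clone();
  }

  // Combiner/reducer step: merge another partial result for the same cluster ID.
  void merge(PartialSum other) {
    n += other.n;
    for (int d = 0; d < sum.length; d++) {
      sum[d] += other.sum[d];
    }
  }

  // Reducer step: the new cluster center is the component-wise mean.
  double[] center() {
    double[] c = new double[sum.length];
    for (int d = 0; d < sum.length; d++) {
      c[d] = sum[d] / n;
    }
    return c;
  }

  public static void main(String[] args) {
    // Three points assigned to the same cluster, merged in two stages as a
    // combiner would: the final center is their mean, (2.0, 2.0).
    PartialSum a = new PartialSum(new double[] {1, 1});
    a.merge(new PartialSum(new double[] {2, 2}));  // local combine
    PartialSum b = new PartialSum(new double[] {3, 3});
    a.merge(b);                                    // reduce
    System.out.println(Arrays.toString(a.center()));
  }
}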

bin/mahout kmeans \
    -i <input vectors directory> \
    -c <input clusters directory> \
    -o <output working directory> \
    -k <optional number of initial clusters to sample from input vectors> \
    -dm <DistanceMeasure> \
    -x <maximum number of iterations> \
    -cd <optional convergence delta. Default is 0.5> \
    -ow <overwrite output directory if present> \
    -cl <run input vector clustering after the iterations complete> \
    -xm <execution method: sequential or mapreduce>
Note: when -k is specified, any clusters in the -c directory will be overwritten, and k points sampled at random from the input vectors will be written there as the initial cluster centers.
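
Putting the flags together, a full invocation might look like this (all paths are illustrative):

bin/mahout kmeans \
    -i output/data \
    -c output/random-seeds \
    -o output/kmeans \
    -k 6 \
    -dm org.apache.mahout.common.distance.EuclideanDistanceMeasure \
    -x 10 \
    -cd 0.5 \
    -ow \
    -cl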

Parameter tuning: Mahout's k-means has two important knobs: the convergence delta and the maximum number of iterations. In my experience, the smaller the delta, the stricter the convergence test, so fewer clusters may be marked converged in each round. A sensible maximum iteration count can be chosen by watching the number of converged clusters after each round: once that number stops changing (or merely oscillates), it is time to stop iterating.

 

 

Comments
#2 chenwq 2012-06-03
package org.apache.mahout.clustering.kmeans;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.clustering.conversion.InputDriver;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.ClassUtils;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
import org.apache.mahout.utils.clustering.ClusterDumper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Job extends AbstractJob {

  private static final Logger log = LoggerFactory.getLogger(Job.class);

  private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";

  private Job() {
  }

  public static void main(String[] args) throws Exception {
    if (args.length > 0) {
      log.info("Running with only user-supplied arguments");
      ToolRunner.run(new Configuration(), new Job(), args);
    } else {
      log.info("Running with default arguments");
      Path output = new Path("output");
      Configuration conf = new Configuration();
      HadoopUtil.delete(conf, output);
      run(conf, new Path("testdata"), output,
          new EuclideanDistanceMeasure(), 6, 0.5, 10);
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    System.out.println("run args");
    addInputOption();
    addOutputOption();
    addOption(DefaultOptionCreator.distanceMeasureOption().create());
    addOption(DefaultOptionCreator.numClustersOption().create());
    addOption(DefaultOptionCreator.t1Option().create());
    addOption(DefaultOptionCreator.t2Option().create());
    addOption(DefaultOptionCreator.convergenceOption().create());
    addOption(DefaultOptionCreator.maxIterationsOption().create());
    addOption(DefaultOptionCreator.overwriteOption().create());

    Map<String, String> argMap = parseArguments(args);
    if (argMap == null) {
      return -1;
    }

    Path input = getInputPath();
    Path output = getOutputPath();
    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
    if (measureClass == null) {
      measureClass = SquaredEuclideanDistanceMeasure.class.getName();
    }
    double convergenceDelta = Double.parseDouble(
        getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
    int maxIterations = Integer.parseInt(
        getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
      HadoopUtil.delete(getConf(), output);
    }
    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass,
        DistanceMeasure.class);
    if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
      int k = Integer.parseInt(
          getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
      run(getConf(), input, output, measure, k, convergenceDelta,
          maxIterations);
    } else {
      double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
      double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
      System.out.println("output: " + output);
      System.out.println("t1: " + t1);
      System.out.println("t2: " + t2);
      run(getConf(), input, output, measure, t1, t2, convergenceDelta,
          maxIterations);
    }
    return 0;
  }

  /**
   * Run the kmeans clustering job on an input dataset using the given number
   * of clusters k and iteration parameters. All output data will be written
   * to the output directory, which will be initially deleted if it exists.
   * The clustered points will reside in the path <output>/clustered-points.
   * By default, the job expects a file containing equal-length,
   * space-delimited data in a directory named "testdata", and writes output
   * to a directory named "output".
   *
   * @param conf the Configuration to use
   * @param input the Path of the input directory
   * @param output the Path of the output directory
   * @param measure the DistanceMeasure to use
   * @param k the number of clusters in Kmeans
   * @param convergenceDelta the double convergence criteria for iterations
   * @param maxIterations the int maximum number of iterations
   */
  public static void run(Configuration conf, Path input, Path output,
      DistanceMeasure measure, int k, double convergenceDelta,
      int maxIterations) throws Exception {
    Path directoryContainingConvertedInput = new Path(output,
        DIRECTORY_CONTAINING_CONVERTED_INPUT);
    log.info("Preparing Input");
    InputDriver.runJob(input, directoryContainingConvertedInput,
        "org.apache.mahout.math.RandomAccessSparseVector");
    log.info("Running random seed to get initial clusters");
    Path clusters = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
    clusters = RandomSeedGenerator.buildRandom(conf,
        directoryContainingConvertedInput, clusters, k, measure);
    log.info("Running KMeans");
    KMeansDriver.run(conf, directoryContainingConvertedInput, clusters,
        output, measure, convergenceDelta, maxIterations, true, false);
    // run ClusterDumper
    ClusterDumper clusterDumper = new ClusterDumper(
        finalClusterPath(conf, output, maxIterations),
        new Path(output, "clusteredPoints"));
    clusterDumper.printClusters(null);
  }

  /**
   * Run the kmeans clustering job on an input dataset using the given
   * distance measure, t1, t2 and iteration parameters. All output data will
   * be written to the output directory, which will be initially deleted if
   * it exists. The clustered points will reside in the path
   * <output>/clustered-points. By default, the job expects a file containing
   * synthetic_control.data as obtained from
   * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series
   * to reside in a directory named "testdata", and writes output to a
   * directory named "output".
   *
   * @param conf the Configuration to use
   * @param input the Path of the input directory
   * @param output the Path of the output directory
   * @param measure the DistanceMeasure to use
   * @param t1 the canopy T1 threshold
   * @param t2 the canopy T2 threshold
   * @param convergenceDelta the double convergence criteria for iterations
   * @param maxIterations the int maximum number of iterations
   */
  public static void run(Configuration conf, Path input, Path output,
      DistanceMeasure measure, double t1, double t2,
      double convergenceDelta, int maxIterations) throws Exception {
    System.out.println("run canopy output: " + output);
    Path directoryContainingConvertedInput = new Path(output,
        DIRECTORY_CONTAINING_CONVERTED_INPUT);
    log.info("Preparing Input");
    InputDriver.runJob(input, directoryContainingConvertedInput,
        "org.apache.mahout.math.RandomAccessSparseVector");
    log.info("Running Canopy to get initial clusters");
    CanopyDriver.run(conf, directoryContainingConvertedInput, output,
        measure, t1, t2, false, false);
    log.info("Running KMeans");
    System.out.println("kmeans cluster starting...");
    KMeansDriver.run(conf, directoryContainingConvertedInput,
        new Path(output, Cluster.INITIAL_CLUSTERS_DIR + "-final"), output,
        measure, convergenceDelta, maxIterations, true, false);
    // run ClusterDumper
    ClusterDumper clusterDumper = new ClusterDumper(
        finalClusterPath(conf, output, maxIterations),
        new Path(output, "clusteredPoints"));
    clusterDumper.printClusters(null);
  }

  /**
   * Return the path to the final iteration's clusters.
   */
  private static Path finalClusterPath(Configuration conf, Path output,
      int maxIterations) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    for (int i = maxIterations; i >= 0; i--) {
      Path clusters = new Path(output, "clusters-" + i);
      if (fs.exists(clusters)) {
        return clusters;
      }
    }
    return null;
  }
}
#1 chenwq 2012-06-03
http://blog.csdn.net/lwm_1985/article/details/7221464
