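Files processed by Mahout must be in SequenceFile format, so plain text files have to be converted to SequenceFiles first. SequenceFile is a Hadoop class that lets us write binary key-value pairs to a file.
Mahout provides a way to convert the files under a given directory into SequenceFiles. (You may find Tika (http://lucene.apache.org/tika) helpful in converting binary documents to text.)
Usage: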
$MAHOUT_HOME/bin/mahout seqdirectory \
--input <PARENT DIR WHERE DOCS ARE LOCATED> --output <OUTPUT DIRECTORY> \
<-c <CHARSET NAME OF THE INPUT DOCUMENTS> {UTF-8|cp1252|ascii...}> \
<-chunk <MAX SIZE OF EACH CHUNK in Megabytes> 64> \
<-prefix <PREFIX TO ADD TO THE DOCUMENT ID>>
For example:
bin/mahout seqdirectory --input /hive/hadoopuser/ --output /mahout/seq/ --charset UTF-8
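Running the k-means example
The idea behind k-means
First, arbitrarily pick k of the n data objects as the initial cluster centers. Each remaining object is then assigned to the cluster whose center it is most similar to (that is, closest to by distance). Next, recompute the center of every newly formed cluster as the mean of all objects in that cluster. Repeat these two steps until the criterion function converges.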
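The steps above can be sketched in plain Java. This is only a minimal in-memory illustration, not Mahout code: the class name KMeansSketch, the use of Euclidean distance, and the stopping rule (no center moves by more than delta) are my own assumptions for the example.

```java
import java.util.Arrays;

/** Minimal in-memory k-means sketch; hypothetical helper, not part of Mahout. */
final class KMeansSketch {

    /** Squared Euclidean distance between two points of equal dimension. */
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int d = 0; d < a.length; d++) {
            double diff = a[d] - b[d];
            sum += diff * diff;
        }
        return sum;
    }

    /** Index of the center closest to the given point. */
    static int nearest(double[] point, double[][] centers) {
        int best = 0;
        for (int c = 1; c < centers.length; c++) {
            if (squaredDistance(point, centers[c]) < squaredDistance(point, centers[best])) {
                best = c;
            }
        }
        return best;
    }

    /**
     * Runs k-means from the given initial centers and returns the final centers.
     * Stops when no center moves by more than delta, or after maxIterations.
     */
    static double[][] cluster(double[][] points, double[][] initialCenters,
                              double delta, int maxIterations) {
        int k = initialCenters.length;
        int dim = points[0].length;
        double[][] centers = new double[k][];
        for (int c = 0; c < k; c++) {
            centers[c] = initialCenters[c].clone();
        }
        for (int iter = 0; iter < maxIterations; iter++) {
            double[][] sums = new double[k][dim];
            int[] counts = new int[k];
            // Assignment step: add each point to its nearest cluster.
            for (double[] p : points) {
                int c = nearest(p, centers);
                counts[c]++;
                for (int d = 0; d < dim; d++) {
                    sums[c][d] += p[d];
                }
            }
            // Update step: each center becomes the mean of its assigned points.
            boolean converged = true;
            for (int c = 0; c < k; c++) {
                if (counts[c] == 0) {
                    continue; // an empty cluster keeps its previous center
                }
                double[] updated = new double[dim];
                for (int d = 0; d < dim; d++) {
                    updated[d] = sums[c][d] / counts[c];
                }
                if (Math.sqrt(squaredDistance(updated, centers[c])) > delta) {
                    converged = false;
                }
                centers[c] = updated;
            }
            if (converged) {
                break;
            }
        }
        return centers;
    }

    public static void main(String[] args) {
        double[][] points = {{1, 1}, {1, 2}, {2, 1}, {8, 8}, {9, 8}, {8, 9}};
        double[][] initial = {points[0], points[3]};
        System.out.println(Arrays.deepToString(cluster(points, initial, 0.001, 10)));
    }
}
```

Here the initial centers are passed in explicitly so the run is repeatable; a real run would pick them at random, as the description says.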
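Procedure, following the steps on the official wiki page https://cwiki.apache.org/confluence/display/MAHOUT/Clustering+of+synthetic+control+data:
First, download the data set synthetic_control.data (on the wiki page above, click the link at "Input data set. Download it here") and load it into the distributed file system:
$HADOOP_HOME/bin/hadoop fs -mkdir testdata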
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/synthetic_control.data testdata
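Second, run the k-means algorithm. From the Mahout installation directory, either run the job directly:
mahout org.apache.mahout.clustering.syntheticcontrol.kmeans.Job
or submit the examples job jar through Hadoop:
$HADOOP_HOME/bin/hadoop jar /home/hadoop/mahout-distribution-0.4/mahout-examples-0.4-job.jar org.apache.mahout.clustering.syntheticcontrol.kmeans.Job
The run takes a while because the algorithm iterates, so be patient.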
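Finally, check the results. To print them directly to the console:
mahout vectordump --seqFile /user/hadoop/output/data/part-00000
Alternatively, run the following commands in order (the second one copies the results from the distributed file system to the local disk):
$HADOOP_HOME/bin/hadoop fs -lsr output
$HADOOP_HOME/bin/hadoop fs -get output $MAHOUT_HOME/examples
cd $MAHOUT_HOME/examples/output
If the directory contains the following entries, the algorithm ran successfully:
canopies clusters-1 clusters-3 clusters-5 clusters-7 points
clusters-0 clusters-2 clusters-4 clusters-6 data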
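For a long time I did not know how to inspect the k-means results. For example, to look at part-r-00000 inside a clusters-i directory, dump it from the distributed file system into a local text file with this command:
./mahout seqdumper -s /user/hadoop/output/clusters-9/part-r-00000 -o /home/hadoop/out/part-0
In the dumped output, n is the number of samples in a cluster, c is the cluster center (one value per attribute), and r is the cluster radius (one value per attribute).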
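To make n, c and r concrete, here is a small plain-Java sketch that computes them for one cluster. It is not Mahout code: the class name ClusterStatsSketch is made up, and I am assuming that the radius is the per-attribute standard deviation of the points around the center, which is my understanding of what Mahout reports; please check it against the source of the version you run.

```java
import java.util.Arrays;

/** Per-cluster summary (n, c, r); hypothetical helper, not part of Mahout. */
final class ClusterStatsSketch {
    final int n;       // number of points in the cluster
    final double[] c;  // center: mean of each attribute
    final double[] r;  // radius: standard deviation of each attribute (assumed definition)

    private ClusterStatsSketch(int n, double[] c, double[] r) {
        this.n = n;
        this.c = c;
        this.r = r;
    }

    /** Computes n, c and r from the points assigned to one cluster. */
    static ClusterStatsSketch of(double[][] points) {
        int dim = points[0].length;
        double[] sum = new double[dim];
        double[] sumOfSquares = new double[dim];
        for (double[] p : points) {
            for (int d = 0; d < dim; d++) {
                sum[d] += p[d];
                sumOfSquares[d] += p[d] * p[d];
            }
        }
        int n = points.length;
        double[] c = new double[dim];
        double[] r = new double[dim];
        for (int d = 0; d < dim; d++) {
            c[d] = sum[d] / n;
            // Population variance; clamp at zero to absorb rounding error.
            double variance = sumOfSquares[d] / n - c[d] * c[d];
            r[d] = Math.sqrt(Math.max(0.0, variance));
        }
        return new ClusterStatsSketch(n, c, r);
    }

    public static void main(String[] args) {
        ClusterStatsSketch s = of(new double[][]{{1, 10}, {3, 10}, {5, 10}});
        System.out.println("n=" + s.n + " c=" + Arrays.toString(s.c) + " r=" + Arrays.toString(s.r));
    }
}
```

An attribute on which all points agree gets radius 0, while a widely spread attribute gets a large radius, so r gives a quick feel for how tight a cluster is along each attribute.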
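Mahout k-means clustering implementation:
(1) The input parameter specifies all the data points to be clustered, and clusters specifies the initial cluster centers.
If the parameter k is given, org.apache.mahout.clustering.kmeans.RandomSeedGenerator.buildRandom uses org.apache.hadoop.fs to read k random points directly from the files specified by input and writes them into clusters.
(2) The cluster centers of the current iteration are computed from the original data points and the cluster centers of the previous iteration (or the initial clusters), and are written to the clusters-N directory.
This step is implemented by KMeansMapper, KMeansCombiner, KMeansReducer and KMeansDriver in org.apache.mahout.clustering.kmeans.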
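KMeansMapper: when the mapper is initialized in configure, it reads in the cluster centers produced by the previous iteration, or the initial centers (every mapper reads all cluster centers). For each input point, the map method finds the nearest cluster and adds the point to it. The output key is the ID of the cluster the point belongs to, and the value is a KMeansInfo instance holding the number of points and the running sum of each component.
KMeansCombiner: locally adds up the point counts and component sums emitted by KMeansMapper for the same cluster ID.
KMeansReducer: adds up the point counts and component sums for the same cluster ID and computes the cluster center for this iteration. It also uses the input delta to decide whether the cluster has converged, the test being that the distance between the previous center and the new center is less than delta. It outputs every cluster center together with its converged flag.
KMeansDriver: controls the iteration until the maximum number of iterations is exceeded or all clusters have converged. After each round, KMeansDriver reads all clusters under that round's clusters-N directory; if every cluster has converged, the whole k-means run has converged.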
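The division of labour between mapper, combiner and reducer can be simulated in a single JVM. The sketch below is not Mahout's code and does not use the Hadoop API: KMeansIterationSketch, PartialSum (standing in for the role KMeansInfo plays above) and Result are names I made up, each input split is just an array, and Euclidean distance is assumed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** One simulated k-means iteration in map/combine/reduce style; not Mahout code. */
final class KMeansIterationSketch {

    /** Point count plus per-component sums for one cluster ID. */
    static final class PartialSum {
        long count;
        final double[] sums;
        PartialSum(int dim) { sums = new double[dim]; }
        void addPoint(double[] p) {
            count++;
            for (int d = 0; d < sums.length; d++) sums[d] += p[d];
        }
        void merge(PartialSum other) {
            count += other.count;
            for (int d = 0; d < sums.length; d++) sums[d] += other.sums[d];
        }
    }

    /** Reducer output: the new center and its converged flag. */
    static final class Result {
        final double[] center;
        final boolean converged;
        Result(double[] center, boolean converged) {
            this.center = center;
            this.converged = converged;
        }
    }

    static double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int d = 0; d < a.length; d++) sum += (a[d] - b[d]) * (a[d] - b[d]);
        return Math.sqrt(sum);
    }

    /** Map + combine for one split: assign points to the nearest center, sum locally per cluster ID. */
    static Map<Integer, PartialSum> mapAndCombine(double[][] split, double[][] centers) {
        Map<Integer, PartialSum> local = new HashMap<>();
        for (double[] p : split) {
            int nearest = 0;
            for (int c = 1; c < centers.length; c++) {
                if (distance(p, centers[c]) < distance(p, centers[nearest])) nearest = c;
            }
            local.computeIfAbsent(nearest, id -> new PartialSum(p.length)).addPoint(p);
        }
        return local;
    }

    /** Reduce: merge partial sums per cluster ID, compute the new center, test it against delta. */
    static Map<Integer, Result> reduce(List<Map<Integer, PartialSum>> partials,
                                       double[][] centers, double delta) {
        Map<Integer, PartialSum> merged = new TreeMap<>();
        for (Map<Integer, PartialSum> partial : partials) {
            for (Map.Entry<Integer, PartialSum> e : partial.entrySet()) {
                merged.computeIfAbsent(e.getKey(), id -> new PartialSum(e.getValue().sums.length))
                      .merge(e.getValue());
            }
        }
        Map<Integer, Result> out = new TreeMap<>();
        for (Map.Entry<Integer, PartialSum> e : merged.entrySet()) {
            PartialSum s = e.getValue();
            double[] center = new double[s.sums.length];
            for (int d = 0; d < center.length; d++) center[d] = s.sums[d] / s.count;
            boolean converged = distance(center, centers[e.getKey()]) < delta;
            out.put(e.getKey(), new Result(center, converged));
        }
        return out;
    }

    public static void main(String[] args) {
        double[][] centers = {{0, 0}, {10, 10}};
        List<Map<Integer, PartialSum>> partials = new ArrayList<>();
        partials.add(mapAndCombine(new double[][]{{1, 1}, {9, 9}}, centers));
        partials.add(mapAndCombine(new double[][]{{1, 3}, {11, 11}}, centers));
        reduce(partials, centers, 0.5).forEach((id, r) ->
            System.out.println("cluster " + id + " converged=" + r.converged));
    }
}
```

The point of carrying counts and sums instead of means is that they can be added in any order, which is what makes the local combine step safe. A driver would call these two functions repeatedly, feeding the new centers back in, until every Result is converged or the iteration limit is reached.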
bin/mahout kmeans \
-i <input vectors directory> \
-c <input clusters directory> \
-o <output working directory> \
-k <optional number of initial clusters to sample from input vectors> \
-dm <DistanceMeasure> \
-x <maximum number of iterations> \
-cd <optional convergence delta. Default is 0.5> \
-ow <overwrite output directory if present> \
-cl <run input vector clustering after computing Canopies> \
-xm <execution method: sequential or mapreduce>
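Note: when -k is specified, all clusters in the -c directory are overwritten, and k points are sampled at random from the input vectors to serve as the initial cluster centers.
Parameter tuning: Mahout k-means clustering has two important parameters, the convergence delta and the maximum number of iterations. In my view, the smaller the delta, the stricter the convergence condition, so the number of clusters that end up converged may be lower. The maximum number of iterations can be chosen by watching how many clusters have converged after each iteration: once that number barely changes, or starts to oscillate, the iteration can stop.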
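A toy experiment makes the effect of the two parameters easier to see. The sketch below is a one-dimensional, in-memory k-means that records how many clusters moved less than delta in each iteration. It is not Mahout code; the class name ConvergenceSketch and the sample data are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Tracks converged-cluster counts per iteration for 1-D k-means; not Mahout code. */
final class ConvergenceSketch {

    /**
     * Runs 1-D k-means and returns, for every iteration executed, the number of
     * clusters whose center moved less than delta. Stops as soon as all clusters
     * have converged or maxIterations is reached.
     */
    static List<Integer> convergedPerIteration(double[] points, double[] initialCenters,
                                               double delta, int maxIterations) {
        double[] centers = initialCenters.clone();
        List<Integer> history = new ArrayList<>();
        for (int iter = 0; iter < maxIterations; iter++) {
            double[] sums = new double[centers.length];
            int[] counts = new int[centers.length];
            // Assign every point to its nearest center.
            for (double p : points) {
                int nearest = 0;
                for (int c = 1; c < centers.length; c++) {
                    if (Math.abs(p - centers[c]) < Math.abs(p - centers[nearest])) {
                        nearest = c;
                    }
                }
                sums[nearest] += p;
                counts[nearest]++;
            }
            // Move each center to the mean of its points and count the converged ones.
            int converged = 0;
            for (int c = 0; c < centers.length; c++) {
                double updated = counts[c] == 0 ? centers[c] : sums[c] / counts[c];
                if (Math.abs(updated - centers[c]) < delta) {
                    converged++;
                }
                centers[c] = updated;
            }
            history.add(converged);
            if (converged == centers.length) {
                break;
            }
        }
        return history;
    }

    public static void main(String[] args) {
        double[] points = {1, 2, 3, 10, 11, 12};
        double[] initial = {1, 2};
        System.out.println("delta=0.5: " + convergedPerIteration(points, initial, 0.5, 10));
        System.out.println("delta=6.0: " + convergedPerIteration(points, initial, 6.0, 10));
    }
}
```

With a small delta this data needs several iterations before both clusters are flagged as converged, and the count even drops along the way; with a very large delta the run stops after the first iteration, before the centers have settled. Watching this per-iteration count is the same signal the paragraph above suggests for choosing the maximum number of iterations.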
Comments
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
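import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.clustering.conversion.InputDriver;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;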
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.ClassUtils;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
import org.apache.mahout.utils.clustering.ClusterDumper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class Job extends AbstractJob {
private static final Logger log = LoggerFactory.getLogger(Job.class);
private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
private Job() {
}
public static void main(String[] args) throws Exception {
if (args.length > 0) {
log.info("Running with only user-supplied arguments");
ToolRunner.run(new Configuration(), new Job(), args);
} else {
log.info("Running with default arguments");
Path output = new Path("output");
Configuration conf = new Configuration();
HadoopUtil.delete(conf, output);
run(conf, new Path("testdata"), output,
new EuclideanDistanceMeasure(), 6, 0.5, 10);
}
}
@Override
public int run(String[] args) throws Exception {
System.out.println("run args");
addInputOption();
addOutputOption();
addOption(DefaultOptionCreator.distanceMeasureOption().create());
addOption(DefaultOptionCreator.numClustersOption().create());
addOption(DefaultOptionCreator.t1Option().create());
addOption(DefaultOptionCreator.t2Option().create());
addOption(DefaultOptionCreator.convergenceOption().create());
addOption(DefaultOptionCreator.maxIterationsOption().create());
addOption(DefaultOptionCreator.overwriteOption().create());
Map<String, String> argMap = parseArguments(args);
if (argMap == null) {
return -1;
}
Path input = getInputPath();
Path output = getOutputPath();
String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
if (measureClass == null) {
measureClass = SquaredEuclideanDistanceMeasure.class.getName();
}
double convergenceDelta = Double
.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
int maxIterations = Integer
.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
HadoopUtil.delete(getConf(), output);
}
DistanceMeasure measure = ClassUtils.instantiateAs(measureClass,
DistanceMeasure.class);
if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
int k = Integer
.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
run(getConf(), input, output, measure, k, convergenceDelta,
maxIterations);
} else {
double t1 = Double
.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
double t2 = Double
.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
System.out.println("output: " + output);
System.out.println("t1: " + t1);
System.out.println("t2: " + t2);
run(getConf(), input, output, measure, t1, t2, convergenceDelta,
maxIterations);
}
return 0;
}
/**
* Run the kmeans clustering job on an input dataset using the given
* number of clusters k and iteration parameters. All output data will be
* written to the output directory, which will be initially deleted if it
* exists. The clustered points will reside in the path
* <output>/clusteredPoints. By default, the job expects a file containing
* equal length space delimited data that resides in a directory named
* "testdata", and writes output to a directory named "output".
*
* @param conf
* the Configuration to use
* @param input
* the String denoting the input directory path
* @param output
* the String denoting the output directory path
* @param measure
* the DistanceMeasure to use
* @param k
* the number of clusters in Kmeans
* @param convergenceDelta
* the double convergence criteria for iterations
* @param maxIterations
* the int maximum number of iterations
*/
public static void run(Configuration conf, Path input, Path output,
DistanceMeasure measure, int k, double convergenceDelta,
int maxIterations) throws Exception {
Path directoryContainingConvertedInput = new Path(output,
DIRECTORY_CONTAINING_CONVERTED_INPUT);
log.info("Preparing Input");
InputDriver.runJob(input, directoryContainingConvertedInput,
"org.apache.mahout.math.RandomAccessSparseVector");
log.info("Running random seed to get initial clusters");
Path clusters = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
clusters = RandomSeedGenerator.buildRandom(conf,
directoryContainingConvertedInput, clusters, k, measure);
log.info("Running KMeans");
KMeansDriver.run(conf, directoryContainingConvertedInput, clusters,
output, measure, convergenceDelta, maxIterations, true, false);
// run ClusterDumper
ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
output, maxIterations), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(null);
}
/**
* Run the kmeans clustering job on an input dataset using the given
* distance measure, t1, t2 and iteration parameters. All output data will
* be written to the output directory, which will be initially deleted if it
* exists. The clustered points will reside in the path
* <output>/clusteredPoints. By default, the job expects a file
* containing synthetic_control.data, as obtained from
* http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series,
* to reside in a directory named "testdata", and writes output to a
* directory named "output".
*
* @param conf
* the Configuration to use
* @param input
* the String denoting the input directory path
* @param output
* the String denoting the output directory path
* @param measure
* the DistanceMeasure to use
* @param t1
* the canopy T1 threshold
* @param t2
* the canopy T2 threshold
* @param convergenceDelta
* the double convergence criteria for iterations
* @param maxIterations
* the int maximum number of iterations
*/
public static void run(Configuration conf, Path input, Path output,
DistanceMeasure measure, double t1, double t2,
double convergenceDelta, int maxIterations) throws Exception {
System.out.println("run canopy output: " + output);
Path directoryContainingConvertedInput = new Path(output,
DIRECTORY_CONTAINING_CONVERTED_INPUT);
log.info("Preparing Input");
InputDriver.runJob(input, directoryContainingConvertedInput,
"org.apache.mahout.math.RandomAccessSparseVector");
log.info("Running Canopy to get initial clusters");
CanopyDriver.run(conf, directoryContainingConvertedInput, output,
measure, t1, t2, false, false);
log.info("Running KMeans");
System.out.println("kmeans cluster starting...");
KMeansDriver.run(conf, directoryContainingConvertedInput, new Path(
output, Cluster.INITIAL_CLUSTERS_DIR+"-final"), output, measure,
convergenceDelta, maxIterations, true, false);
// run ClusterDumper
ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
output, maxIterations), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(null);
}
/**
* Return the path to the final iteration's clusters
*/
private static Path finalClusterPath(Configuration conf, Path output,
int maxIterations) throws IOException {
FileSystem fs = FileSystem.get(conf);
for (int i = maxIterations; i >= 0; i--) {
Path clusters = new Path(output, "clusters-" + i);
if (fs.exists(clusters)) {
return clusters;
}
}
return null;
}
}