Data Mining Notes - Classification - Decision Tree - 4


The code I wrote earlier in this series all ran on a single machine. Hadoop is very popular these days, so I tried using Hadoop MapReduce to handle the construction of the decision tree as well. I have not worked with Hadoop much, so please bear with the rough edges.

 

I looked at how Mahout handles decision trees and random forests. Roughly speaking, the job runs with a single Mapper: the map method does the data conversion and collection, and the decision tree itself is built in the cleanup method. The finished tree is then serialized to HDFS, and when a sample set needs to be classified the tree structure is read back from HDFS. So the tree-building step in Mahout does not appear to be distributed itself; I have not studied the Mahout source carefully, so I may simply have missed it. Below is a simple Hadoop version of a decision tree that I implemented with the C4.5 algorithm, using MapReduce to compute the gain ratio. The generated tree is not saved to HDFS yet; I will look into that when I have time (a small sketch is included at the end of this post). For reference, the standard C4.5 quantities are listed next, followed by the concrete code implementation.
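
These are the textbook C4.5 definitions (not taken from this post's code) for a data set D split by attribute A into subsets D_1 ... D_v:

Info(D)      = - Σ_i p_i * log2(p_i)                    (entropy of the class distribution)
Info_A(D)    = Σ_j (|D_j| / |D|) * Info(D_j)            (entropy after splitting on A)
Gain(A)      = Info(D) - Info_A(D)
SplitInfo(A) = - Σ_j (|D_j| / |D|) * log2(|D_j| / |D|)
GainRatio(A) = Gain(A) / SplitInfo(A)

The reducer shown later accumulates the per-value category counts needed for these quantities and emits one gain ratio per attribute.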

 

public class DecisionTreeC45Job extends AbstractJob {
	
	/** Prepare the data set: write the training data (with default values filled in) to a local file and upload it to HDFS. */
	public String prepare(Data trainData) {
		String path = FileUtils.obtainRandomTxtPath();
		DataHandler.writeData(path, trainData);
		System.out.println(path);
		String name = path.substring(path.lastIndexOf(File.separator) + 1);
		String hdfsPath = HDFSUtils.HDFS_TEMP_INPUT_URL + name;
		HDFSUtils.copyFromLocalFile(conf, path, hdfsPath);
		return hdfsPath;
	}
	
	/** Choose the best attribute: read the files produced by the MapReduce job and pick the attribute with the largest gain ratio. */
	public AttributeGainWritable chooseBestAttribute(String output) {
		AttributeGainWritable maxAttribute = null;
		Path path = new Path(output);
		try {
			FileSystem fs = path.getFileSystem(conf);
			Path[] paths = HDFSUtils.getPathFiles(fs, path);
			ShowUtils.print(paths);
			double maxGainRatio = 0.0;
			SequenceFile.Reader reader = null;
			for (Path p : paths) {
				reader = new SequenceFile.Reader(fs, p, conf);
				Text key = (Text) ReflectionUtils.newInstance(
						reader.getKeyClass(), conf);
				AttributeGainWritable value = new AttributeGainWritable();
				while (reader.next(key, value)) {
					double gainRatio = value.getGainRatio();
					if (gainRatio >= maxGainRatio) {
						maxGainRatio = gainRatio;
						maxAttribute = value;
					}
					value = new AttributeGainWritable();
				}
				IOUtils.closeQuietly(reader);
			}
			System.out.println("output: " + path.toString());
			HDFSUtils.delete(conf, path);
			System.out.println("hdfs delete file : " + path.toString());
		} catch (IOException e) {
			e.printStackTrace();
		}
		return maxAttribute;
	}
	
	/** Build the decision tree recursively. */
	public Object build(String input, Data data) {
		Object preHandleResult = preHandle(data);
		if (null != preHandleResult) return preHandleResult;
		String output = HDFSUtils.HDFS_TEMP_OUTPUT_URL;
		HDFSUtils.delete(conf, new Path(output));
		System.out.println("delete output path : " + output);
		String[] paths = new String[]{input, output};
		// Compute the gain ratio of every attribute with a MapReduce job
		CalculateC45GainRatioMR.main(paths);
		
		AttributeGainWritable bestAttr = chooseBestAttribute(output);
		String attribute = bestAttr.getAttribute();
		System.out.println("best attribute: " + attribute);
		System.out.println("isCategory: " + bestAttr.isCategory());
		if (bestAttr.isCategory()) {
			return attribute;
		}
		String[] splitPoints = bestAttr.obtainSplitPoints();
		System.out.print("splitPoints: ");
		ShowUtils.print(splitPoints);
		TreeNode treeNode = new TreeNode(attribute);
		String[] attributes = data.getAttributesExcept(attribute);
		
		// Split the data set on the chosen attribute and upload each split to HDFS
		DataSplit dataSplit = DataHandler.split(new Data(
				data.getInstances(), attribute, splitPoints));
		for (DataSplitItem item : dataSplit.getItems()) {
			String path = item.getPath();
			String name = path.substring(path.lastIndexOf(File.separator) + 1);
			String hdfsPath = HDFSUtils.HDFS_TEMP_INPUT_URL + name;
			HDFSUtils.copyFromLocalFile(conf, path, hdfsPath);
			treeNode.setChild(item.getSplitPoint(), build(hdfsPath, 
					new Data(attributes, item.getInstances())));
		}
		return treeNode;
	}
	
	/** Classify: use the decision tree to predict the class of each test instance and upload the results to HDFS. */
	private void classify(TreeNode treeNode, String trainSet, String testSet, String output) {
		OutputStream out = null;
		BufferedWriter writer = null;
		try {
			Path trainSetPath = new Path(trainSet);
			FileSystem trainFS = trainSetPath.getFileSystem(conf);
			Path[] trainHdfsPaths = HDFSUtils.getPathFiles(trainFS, trainSetPath);
			FSDataInputStream trainFSInputStream = trainFS.open(trainHdfsPaths[0]);
			Data trainData = DataLoader.load(trainFSInputStream, true);
			
			Path testSetPath = new Path(testSet);
			FileSystem testFS = testSetPath.getFileSystem(conf);
			Path[] testHdfsPaths = HDFSUtils.getPathFiles(testFS, testSetPath);
			FSDataInputStream fsInputStream = testFS.open(testHdfsPaths[0]);
			Data testData = DataLoader.load(fsInputStream, true);
			
			DataHandler.fill(testData.getInstances(), trainData.getAttributes(), 0);
			Object[] results = (Object[]) treeNode.classify(testData);
			ShowUtils.print(results);
			DataError dataError = new DataError(testData.getCategories(), results);
			dataError.report();
			String path = FileUtils.obtainRandomTxtPath();
			out = new FileOutputStream(new File(path));
			writer = new BufferedWriter(new OutputStreamWriter(out));
			StringBuilder sb = null;
			for (int i = 0, len = results.length; i < len; i++) {
				sb = new StringBuilder();
				sb.append(i+1).append("\t").append(results[i]);
				writer.write(sb.toString());
				writer.newLine();
			}
			writer.flush();
			Path outputPath = new Path(output);
			FileSystem fs = outputPath.getFileSystem(conf);
			if (!fs.exists(outputPath)) {
				fs.mkdirs(outputPath);
			}
			String name = path.substring(path.lastIndexOf(File.separator) + 1);
			HDFSUtils.copyFromLocalFile(conf, path, output + 
					File.separator + name);
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeQuietly(writer);
			IOUtils.closeQuietly(out);
		}
	}
	
	public void run(String[] args) {
		try {
			if (null == conf) conf = new Configuration();
			String[] inputArgs = new GenericOptionsParser(
					conf, args).getRemainingArgs();
			if (inputArgs.length != 3) {
				System.out.println("error, please input three path.");
				System.out.println("1. trainset path.");
				System.out.println("2. testset path.");
				System.out.println("3. result output path.");
				System.exit(2);
			}
			Path input = new Path(inputArgs[0]);
			FileSystem fs = input.getFileSystem(conf);
			Path[] hdfsPaths = HDFSUtils.getPathFiles(fs, input);
			FSDataInputStream fsInputStream = fs.open(hdfsPaths[0]);
			Data trainData = DataLoader.load(fsInputStream, true);
			/** Fill in default values for missing attributes */
			DataHandler.fill(trainData, 0);
			String hdfsInput = prepare(trainData);
			TreeNode treeNode = (TreeNode) build(hdfsInput, trainData);
			TreeNodeHelper.print(treeNode, 0, null);
			classify(treeNode, inputArgs[0], inputArgs[1], inputArgs[2]);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		DecisionTreeC45Job job = new DecisionTreeC45Job();
		long startTime = System.currentTimeMillis();
		job.run(args);
		long endTime = System.currentTimeMillis();
		System.out.println("spend time: " + (endTime - startTime));
	}

}
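
A typical invocation might look like the following; the jar name and the class name shown without a package are placeholders, since the package layout is not included in this post:

hadoop jar decision-tree.jar DecisionTreeC45Job /user/hadoop/c45/trainset /user/hadoop/c45/testset /user/hadoop/c45/result

The first path is the training set, the second the test set, and the third the directory the classification results are copied into, matching the three arguments checked in run().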

The concrete implementation of CalculateC45GainRatioMR:

public class CalculateC45GainRatioMR {
	
	private static void configureJob(Job job) {
		job.setJarByClass(CalculateC45GainRatioMR.class);
		
		job.setMapperClass(CalculateC45GainRatioMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(AttributeWritable.class);

		job.setReducerClass(CalculateC45GainRatioReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(AttributeGainWritable.class);
		
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(SequenceFileOutputFormat.class);
	}

	public static void main(String[] args) {
		Configuration configuration = new Configuration();
		try {
			String[] inputArgs = new GenericOptionsParser(
						configuration, args).getRemainingArgs();
			if (inputArgs.length != 2) {
				System.out.println("error, please input two path. input and output");
				System.exit(2);
			}
			Job job = new Job(configuration, "Decision Tree");
			
			FileInputFormat.setInputPaths(job, new Path(inputArgs[0]));
			FileOutputFormat.setOutputPath(job, new Path(inputArgs[1]));
			
			configureJob(job);
			
			System.out.println(job.waitForCompletion(true) ? 0 : 1);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

class CalculateC45GainRatioMapper extends Mapper<LongWritable, Text, 
	Text, AttributeWritable> {
	
	@Override
	protected void setup(Context context) throws IOException,
			InterruptedException {
		super.setup(context);
	}

	/**
	 * Each input line is assumed to look like:
	 *   <id> <category> <attrName:attrValue> <attrName:attrValue> ...
	 * A line that carries only an id and a category (no attribute:value pairs)
	 * is emitted under the category itself as the key.
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		StringTokenizer tokenizer = new StringTokenizer(line);
		Long id = Long.parseLong(tokenizer.nextToken());
		String category = tokenizer.nextToken();
		boolean isCategory = true;
		while (tokenizer.hasMoreTokens()) {
			isCategory = false;
			String attribute = tokenizer.nextToken();
			String[] entry = attribute.split(":");
			context.write(new Text(entry[0]), new AttributeWritable(id, category, entry[1]));
		}
		if (isCategory) {
			context.write(new Text(category), new AttributeWritable(id, category, category));
		}
	}
	
	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		super.cleanup(context);
	}
}

class CalculateC45GainRatioReducer extends Reducer<Text, AttributeWritable, Text, AttributeGainWritable> {
	
	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		super.setup(context);
	}
	
	@Override
	protected void reduce(Text key, Iterable<AttributeWritable> values,
			Context context) throws IOException, InterruptedException {
		String attributeName = key.toString();
		double totalNum = 0.0;
		// per attribute value: how many instances of each category it contains
		Map<String, Map<String, Integer>> attrValueSplits = 
				new HashMap<String, Map<String, Integer>>();
		// overall class distribution, needed for the entropy of the whole set
		Map<String, Integer> categoryCounts = new HashMap<String, Integer>();
		Iterator<AttributeWritable> iterator = values.iterator();
		boolean isCategory = false;
		while (iterator.hasNext()) {
			AttributeWritable attribute = iterator.next();
			String attributeValue = attribute.getAttributeValue();
			if (attributeName.equals(attributeValue)) {
				// the key is a category marker, not a real attribute
				isCategory = true;
				break;
			}
			Map<String, Integer> attrValueSplit = attrValueSplits.get(attributeValue);
			if (null == attrValueSplit) {
				attrValueSplit = new HashMap<String, Integer>();
				attrValueSplits.put(attributeValue, attrValueSplit);
			}
			String category = attribute.getCategory();
			Integer categoryNum = attrValueSplit.get(category);
			attrValueSplit.put(category, null == categoryNum ? 1 : categoryNum + 1);
			Integer totalCategoryCount = categoryCounts.get(category);
			categoryCounts.put(category, null == totalCategoryCount ? 1 : totalCategoryCount + 1);
			totalNum++;
		}
		if (isCategory) {
			System.out.println("is Category");
			// the detection loop above already consumed one value, and Hadoop's
			// value iterable cannot be restarted, so start counting at 1
			int sum = 1;
			while (iterator.hasNext()) {
				iterator.next();
				sum += 1;
			}
			System.out.println("sum: " + sum);
			context.write(key, new AttributeGainWritable(attributeName,
					sum, true, null));
		} else {
			// Info(D): entropy of the class distribution over the whole set
			double totalEntropy = 0.0;
			for (Integer categoryNum : categoryCounts.values()) {
				double p = categoryNum / totalNum;
				totalEntropy -= p * (Math.log(p) / Math.log(2));
			}
			// Info_A(D): weighted entropy after splitting on this attribute
			double conditionalEntropy = 0.0;
			// SplitInfo(A): entropy of the split proportions themselves
			double splitInfo = 0.0;
			for (Map<String, Integer> attrValueSplit : attrValueSplits.values()) {
				double totalCategoryNum = 0;
				for (Integer categoryNum : attrValueSplit.values()) {
					totalCategoryNum += categoryNum;
				}
				double entropy = 0.0;
				for (Integer categoryNum : attrValueSplit.values()) {
					double p = categoryNum / totalCategoryNum;
					entropy -= p * (Math.log(p) / Math.log(2));
				}
				double dj = totalCategoryNum / totalNum;
				conditionalEntropy += dj * entropy;
				splitInfo -= dj * (Math.log(dj) / Math.log(2));
			}
			// GainRatio(A) = (Info(D) - Info_A(D)) / SplitInfo(A)
			double gainInfo = totalEntropy - conditionalEntropy;
			double gainRatio = splitInfo == 0.0 ? 0.0 : gainInfo / splitInfo;
			StringBuilder splitPoints = new StringBuilder();
			for (String attrValue : attrValueSplits.keySet()) {
				splitPoints.append(attrValue).append(",");
			}
			splitPoints.deleteCharAt(splitPoints.length() - 1);
			System.out.println("attribute: " + attributeName);
			System.out.println("gainRatio: " + gainRatio);
			System.out.println("splitPoints: " + splitPoints.toString());
			context.write(key, new AttributeGainWritable(attributeName,
					gainRatio, false, splitPoints.toString()));
		}
	}
	
	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		super.cleanup(context);
	}
	
}
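
As mentioned at the top of the post, the finished tree is not persisted to HDFS. Below is a minimal sketch of how that could be done with plain Java serialization. It assumes TreeNode (or a wrapper around it) implements java.io.Serializable, which the code above does not show, so treat it as an illustration rather than part of the job:

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TreeNodeHdfsStore {

	/** Serialize the whole tree to a single HDFS file (overwriting it if it exists). */
	public static void save(Configuration conf, TreeNode root, String hdfsPath) throws IOException {
		Path path = new Path(hdfsPath);
		FileSystem fs = path.getFileSystem(conf);
		FSDataOutputStream out = fs.create(path, true);
		ObjectOutputStream oos = new ObjectOutputStream(out);
		try {
			oos.writeObject(root);
		} finally {
			oos.close();
		}
	}

	/** Read the tree back before classifying a new sample set. */
	public static TreeNode load(Configuration conf, String hdfsPath) throws IOException, ClassNotFoundException {
		Path path = new Path(hdfsPath);
		FileSystem fs = path.getFileSystem(conf);
		FSDataInputStream in = fs.open(path);
		ObjectInputStream ois = new ObjectInputStream(in);
		try {
			return (TreeNode) ois.readObject();
		} finally {
			ois.close();
		}
	}
}

With something like this in place, DecisionTreeC45Job.run could call save(conf, treeNode, ...) right after building the tree and load it back before classify, instead of rebuilding the tree for every run.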

 

 
