Data Mining Notes - Classification - Decision Trees - 5

In the previous post I used MapReduce on Hadoop to compute the gain ratio of each attribute, but on reflection the program as a whole was still not really parallelized. After reading some more material online and thinking it through, I reimplemented the decision tree. The overall idea is as follows:

1. Split the large dataset file into N small dataset files, preprocess the data, and upload them to HDFS.

2. For each small dataset file on HDFS, compute its best split attribute and split point (a sketch of the underlying Gini criterion follows this list).

3. Collect the best splits reported for the N small files and vote to pick the overall best split.

4. Each of the N nodes then partitions its own data according to the chosen split, uploads the partitions to HDFS, and the process goes back to step 2.
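
Step 2 boils down to the CART/SPRINT Gini criterion: for each candidate split of an attribute, compute the weighted Gini index of the two resulting partitions and keep the split with the smallest value (this is the minimum that chooseBestAttribute later searches for via minSplitPointGini). The snippet below is only a minimal stand-alone sketch of that calculation; gini and giniOfSplit are hypothetical helpers for illustration and are not taken from the actual CalculateGiniMapper/CalculateGiniReducer implementation.

import java.util.Map;

public class GiniSketch {

	/** Gini impurity of one partition: 1 - sum over classes of p_i^2. */
	static double gini(Map<String, Integer> classCounts, int total) {
		if (total == 0) return 0.0;
		double sum = 0.0;
		for (int count : classCounts.values()) {
			double p = (double) count / total;
			sum += p * p;
		}
		return 1.0 - sum;
	}

	/** Weighted Gini of a binary split: (n1/n) * gini(left) + (n2/n) * gini(right). */
	static double giniOfSplit(Map<String, Integer> left, int leftTotal,
			Map<String, Integer> right, int rightTotal) {
		int total = leftTotal + rightTotal;
		return (double) leftTotal / total * gini(left, leftTotal)
				+ (double) rightTotal / total * gini(right, rightTotal);
	}
}

Each per-split job reports one such value per attribute, and the driver simply keeps the attribute and split point with the smallest weighted Gini.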

 

The concrete implementation is below:

public class DecisionTreeSprintBJob extends AbstractJob {
	
	private Map<String, Map<Object, Integer>> attributeValueStatistics = null;
	
	private Map<String, Set<String>> attributeNameToValues = null;
	
	private Set<String> allAttributes = null;
	
	/** Split the data: break the large data file into N smaller files so each node can run its own job on a piece. */
	private List<String> split(String input, String splitNum) {
		String output = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID();
		String[] args = new String[]{input, output, splitNum};
		DataFileSplitMR.main(args);
		List<String> inputs = new ArrayList<String>();
		Path outputPath = new Path(output);
		try {
			FileSystem fs = outputPath.getFileSystem(conf);
			Path[] paths = HDFSUtils.getPathFiles(fs, outputPath);
			for(Path path : paths) {
				System.out.println("split input path: " + path);
				InputStream in = fs.open(path);
				BufferedReader reader = new BufferedReader(new InputStreamReader(in));
				String line = reader.readLine();
				while (null != line && !"".equals(line)) {
					inputs.add(line);
					line = reader.readLine();
				}
				IOUtils.closeQuietly(in);
				IOUtils.closeQuietly(reader);
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		System.out.println("inputs size: " + inputs.size());
		return inputs;
	}
	
	/** Initialization: collect the full attribute set and per-attribute value statistics, which are later used to fill in default values. */
	private void initialize(String input) {
		System.out.println("initialize start.");
		allAttributes = new HashSet<String>();
		attributeNameToValues = new HashMap<String, Set<String>>();
		attributeValueStatistics = new HashMap<String, Map<Object, Integer>>();
		String output = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID();
		String[] args = new String[]{input, output};
		AttributeStatisticsMR.main(args);
		Path outputPath = new Path(output);
		SequenceFile.Reader reader = null;
		try {
			FileSystem fs = outputPath.getFileSystem(conf);
			Path[] paths = HDFSUtils.getPathFiles(fs, outputPath);
			for(Path path : paths) {
				reader = new SequenceFile.Reader(fs, path, conf);
				AttributeKVWritable key = (AttributeKVWritable) 
						ReflectionUtils.newInstance(reader.getKeyClass(), conf);
				IntWritable value = new IntWritable();
				while (reader.next(key, value)) {
					String attributeName = key.getAttributeName();
					allAttributes.add(attributeName);
					Set<String> values = attributeNameToValues.get(attributeName);
					if (null == values) {
						values = new HashSet<String>();
						attributeNameToValues.put(attributeName, values);
					}
					String attributeValue = key.getAttributeValue();
					values.add(attributeValue);
					Map<Object, Integer> valueStatistics = 
							attributeValueStatistics.get(attributeName);
					if (null == valueStatistics) {
						valueStatistics = new HashMap<Object, Integer>();
						attributeValueStatistics.put(attributeName, valueStatistics);
					}
					valueStatistics.put(attributeValue, value.get());
					value = new IntWritable();
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeQuietly(reader);
		}
		System.out.println("initialize end.");
	}
	
	/** Preprocessing: fill default values into each split file, then upload the filled files back to HDFS. */
	private List<String> preHandle(List<String> inputs) throws IOException {
		List<String> fillInputs = new ArrayList<String>();
		for (String input : inputs) {
			Data data = null;
			try {
				Path inputPath = new Path(input);
				FileSystem fs = inputPath.getFileSystem(conf);
				FSDataInputStream fsInputStream = fs.open(inputPath);
				data = DataLoader.load(fsInputStream, true);
			} catch (IOException e) {
				e.printStackTrace();
			}
			DataHandler.computeFill(data.getInstances(), 
					allAttributes.toArray(new String[0]), 
					attributeValueStatistics, 1.0);
			OutputStream out = null;
			BufferedWriter writer = null;
			String outputDir = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID();
			fillInputs.add(outputDir);
			String output = outputDir + File.separator + IdentityUtils.generateUUID();
			try {
				Path outputPath = new Path(output);
				FileSystem fs = outputPath.getFileSystem(conf);
				out = fs.create(outputPath);
				writer = new BufferedWriter(new OutputStreamWriter(out));
				StringBuilder sb = null;
				for (Instance instance : data.getInstances()) {
					sb = new StringBuilder();
					sb.append(instance.getId()).append("\t");
					sb.append(instance.getCategory()).append("\t");
					Map<String, Object> attrs = instance.getAttributes();
					for (Map.Entry<String, Object> entry : attrs.entrySet()) {
						sb.append(entry.getKey()).append(":");
						sb.append(entry.getValue()).append("\t");
					}
					writer.write(sb.toString());
					writer.newLine();
				}
				writer.flush();
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				IOUtils.closeQuietly(out);
				IOUtils.closeQuietly(writer);
			}
		}
		return fillInputs;
	}
	
	/** Create the Gini-calculation job. */
	private Job createJob(String jobName, String input, String output) {
		Configuration conf = new Configuration();
		conf.set("mapred.job.queue.name", "q_hudong");
		Job job = null;
		try {
			job = new Job(conf, jobName);
			
			FileInputFormat.addInputPath(job, new Path(input));
			FileOutputFormat.setOutputPath(job, new Path(output));
			
			job.setJarByClass(DecisionTreeSprintBJob.class);
			
			job.setMapperClass(CalculateGiniMapper.class);
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(AttributeWritable.class);
			
			job.setReducerClass(CalculateGiniReducer.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(AttributeGiniWritable.class);
			
			job.setInputFormatClass(TextInputFormat.class);
			job.setOutputFormatClass(SequenceFileOutputFormat.class);
		} catch (IOException e) {
			e.printStackTrace();
		}
		return job;
	}
	
	/** Choose the best attribute (lowest Gini) from the job outputs on HDFS. */
	private AttributeGiniWritable chooseBestAttribute(String... outputs) {
		AttributeGiniWritable minSplitAttribute = null;
		double minSplitPointGini = 1.0;
		try {
			for (String output : outputs) {
				System.out.println("choose output: " + output);
				Path outputPath = new Path(output);
				FileSystem fs = outputPath.getFileSystem(conf);
				Path[] paths = HDFSUtils.getPathFiles(fs, outputPath);
				ShowUtils.print(paths);
				SequenceFile.Reader reader = null;
				for (Path path : paths) {
					reader = new SequenceFile.Reader(fs, path, conf);
					Text key = (Text) ReflectionUtils.newInstance(
							reader.getKeyClass(), conf);
					AttributeGiniWritable value = new AttributeGiniWritable();
					while (reader.next(key, value)) {
						double gini = value.getGini();
						System.out.println(value.getAttribute() + " : " + gini);
						if (gini <= minSplitPointGini) {
							minSplitPointGini = gini;
							minSplitAttribute = value;
						}
						value = new AttributeGiniWritable();
					}
					IOUtils.closeQuietly(reader);
				}
				System.out.println("delete hdfs file start: " + outputPath.toString());
				HDFSUtils.delete(conf, outputPath);
				System.out.println("delete hdfs file end: " + outputPath.toString());
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		if (null == minSplitAttribute) {
			System.out.println("minSplitAttribute is null");
		}
		return minSplitAttribute;
	}
	
	private Data obtainData(String input) {
		Data data = null;
		Path inputPath = new Path(input);
		try {
			FileSystem fs = inputPath.getFileSystem(conf);
			Path[] hdfsPaths = HDFSUtils.getPathFiles(fs, inputPath);
			FSDataInputStream fsInputStream = fs.open(hdfsPaths[0]);
			data = DataLoader.load(fsInputStream, true);
		} catch (IOException e) {
			e.printStackTrace();
		}
		return data;
	}
	
	/** Build the decision tree recursively. */
	private Object build(List<String> inputs) throws IOException {
		List<String> outputs = new ArrayList<String>();
		JobControl jobControl = new JobControl("CalculateGini");
		for (String input : inputs) {
			System.out.println("split path: " + input);
			String output = HDFSUtils.HDFS_TEMP_OUTPUT_URL + IdentityUtils.generateUUID();
			outputs.add(output);
			Configuration conf = new Configuration();
			ControlledJob controlledJob = new ControlledJob(conf);
			controlledJob.setJob(createJob(input, input, output));
			jobControl.addJob(controlledJob);
		}
		Thread jcThread = new Thread(jobControl);  
        jcThread.start();  
        while(true){  
            if(jobControl.allFinished()){  
//                System.out.println(jobControl.getSuccessfulJobList());  
                jobControl.stop();  
                AttributeGiniWritable bestAttr = chooseBestAttribute(
                		outputs.toArray(new String[0]));
                String attribute = bestAttr.getAttribute();
        		System.out.println("best attribute: " + attribute);
        		System.out.println("isCategory: " + bestAttr.isCategory());
        		if (bestAttr.isCategory()) {
        			return attribute;
        		}
        		TreeNode treeNode = new TreeNode(attribute);
        		Map<String, List<String>> splitToInputs = 
        				new HashMap<String, List<String>>();
        		for (String input : inputs) {
        			Data data = obtainData(input);
        			String splitPoint = bestAttr.getSplitPoint();
//        			Map<String, Set<String>> attrName2Values = 
//        					DataHandler.attributeValueStatistics(data.getInstances());
        			Set<String> attributeValues = attributeNameToValues.get(attribute);
        			// Copy the value set before mutating it below, so the shared
        			// attributeNameToValues map stays intact for other inputs and recursive calls.
        			if (null != attributeValues) {
        				attributeValues = new HashSet<String>(attributeValues);
        			}
        			System.out.println("attributeValues:");
        			ShowUtils.print(attributeValues);
        			if (attributeNameToValues.size() == 0 || null == attributeValues) {
        				continue;
        			}
        			attributeValues.remove(splitPoint);
        			StringBuilder sb = new StringBuilder();
        			for (String attributeValue : attributeValues) {
        				sb.append(attributeValue).append(",");
        			}
        			if (sb.length() > 0) sb.deleteCharAt(sb.length() - 1);
        			String[] names = new String[]{splitPoint, sb.toString()};
        			DataSplit dataSplit = DataHandler.split(new Data(
        					data.getInstances(), attribute, names));
        			for (DataSplitItem item : dataSplit.getItems()) {
        				if (item.getInstances().size() == 0) continue;
        				String path = item.getPath();
        				String name = path.substring(path.lastIndexOf(File.separator) + 1);
        				String hdfsPath = HDFSUtils.HDFS_TEMP_INPUT_URL + name;
        				HDFSUtils.copyFromLocalFile(conf, path, hdfsPath);
        				String split = item.getSplitPoint();
        				List<String> nextInputs = splitToInputs.get(split);
        				if (null == nextInputs) {
        					nextInputs = new ArrayList<String>();
        					splitToInputs.put(split, nextInputs);
        				}
        				nextInputs.add(hdfsPath);
        			}
        		}
        		for (Map.Entry<String, List<String>> entry : 
        			splitToInputs.entrySet()) {
        			treeNode.setChild(entry.getKey(), build(entry.getValue()));
        		}
        		return treeNode;
            }  
            if (jobControl.getFailedJobList().size() > 0) {
//                System.out.println(jobControl.getFailedJobList());
                jobControl.stop();
                // Bail out when a job fails, otherwise this loop would spin forever.
                return null;
            }
        }
	}
	
	/** Classify the test set with the built tree and write the results to HDFS. */
	private void classify(TreeNode treeNode, String testSet, String output) {
		OutputStream out = null;
		BufferedWriter writer = null;
		try {
			Path testSetPath = new Path(testSet);
			FileSystem testFS = testSetPath.getFileSystem(conf);
			Path[] testHdfsPaths = HDFSUtils.getPathFiles(testFS, testSetPath);
			FSDataInputStream fsInputStream = testFS.open(testHdfsPaths[0]);
			Data testData = DataLoader.load(fsInputStream, true);
			
			DataHandler.computeFill(testData.getInstances(), 
					allAttributes.toArray(new String[0]), 
					attributeValueStatistics, 1.0);
			Object[] results = (Object[]) treeNode.classifySprint(testData);
			ShowUtils.print(results);
			DataError dataError = new DataError(testData.getCategories(), results);
			dataError.report();
			String path = FileUtils.obtainRandomTxtPath();
			out = new FileOutputStream(new File(path));
			writer = new BufferedWriter(new OutputStreamWriter(out));
			StringBuilder sb = null;
			for (int i = 0, len = results.length; i < len; i++) {
				sb = new StringBuilder();
				sb.append(i+1).append("\t").append(results[i]);
				writer.write(sb.toString());
				writer.newLine();
			}
			writer.flush();
			Path outputPath = new Path(output);
			FileSystem fs = outputPath.getFileSystem(conf);
			if (!fs.exists(outputPath)) {
				fs.mkdirs(outputPath);
			}
			String name = path.substring(path.lastIndexOf(File.separator) + 1);
			HDFSUtils.copyFromLocalFile(conf, path, output + 
					File.separator + name);
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeQuietly(out);
			IOUtils.closeQuietly(writer);
		}
	}
	
	public void run(String[] args) {
		try {
			if (null == conf) conf = new Configuration();
			String[] inputArgs = new GenericOptionsParser(
					conf, args).getRemainingArgs();
			if (inputArgs.length != 4) {
				System.out.println("error, please input four paths.");
				System.out.println("1. trainset path.");
				System.out.println("2. testset path.");
				System.out.println("3. result output path.");
				System.out.println("4. data split number.");
				System.exit(2);
			}
			List<String> splitInputs = split(inputArgs[0], inputArgs[3]);
			initialize(inputArgs[0]);
			List<String> inputs = preHandle(splitInputs);
			TreeNode treeNode = (TreeNode) build(inputs);
			TreeNodeHelper.print(treeNode, 0, null);
			classify(treeNode, inputArgs[1], inputArgs[2]);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		DecisionTreeSprintBJob job = new DecisionTreeSprintBJob();
		long startTime = System.currentTimeMillis();
		job.run(args);
		long endTime = System.currentTimeMillis();
		System.out.println("time spent (ms): " + (endTime - startTime));
	}

}
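
For reference, run() expects exactly four arguments: the training-set path, the test-set path, the result output path, and the number of splits N. Assuming the class is packaged into a jar, an invocation would look roughly like hadoop jar <your-jar> DecisionTreeSprintBJob <trainset> <testset> <output> <splitNum>, where the jar name and paths are placeholders, not values from the original post.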