前面是单机版的实现,现在通过MapReduce来实现FPGrowth算法,主要用了两个MR,具体过程如下:
1、第一个MR扫描所有数据集统计数据集中的频繁一项集,即每个项的出现次数。
2、读取第一个MR产生的文件,对频繁一项集排序,然后上传到HDFS上。
3、第二个MR扫描所有数据集,并根据第二步产生的排序好的频繁一项集来得出频繁项集。
第二个MR的Map阶段过程:首先根据排好序的频繁一项集将事务数据排好序,然后遍历排好序的事务数据,以频繁项为键,事务数据为值传递给Reduce阶段。
第二个MR的Reduce阶段过程:Reduce节点接收到从Map节点过来的数据,遍历这个频繁项对应的事务数据集,将它们构建起该频繁项的条件FP树。从条件FP树进而得到包含本频繁项的频繁项集。
FPGrowth算法MapReduce简单实现:
public class FPGrowthJob { private Configuration conf = null; //频繁一项集生成 public String frequency_1_itemset_gen(String input, String minSupport) { String output = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID(); String[] inputArgs = new String[]{input, output, minSupport}; Frequency1ItemSetMR.main(inputArgs); return output; } //频繁一项集排序 public String frequency_1_itemset_sort(String input) { Map<String, Integer> map = new HashMap<String, Integer>(); SequenceFile.Reader reader = null; try { Path dirPath = new Path(input); Path[] paths = HDFSUtils.getPathFiles(conf, dirPath); FileSystem fs = FileSystem.get(conf); reader = new SequenceFile.Reader(fs, paths[0], conf); Text key = (Text) ReflectionUtils.newInstance( reader.getKeyClass(), conf); IntWritable value = new IntWritable(); while (reader.next(key, value)) { map.put(key.toString(), value.get()); key = new Text(); value = new IntWritable(); } } catch (IOException e) { e.printStackTrace(); } finally { IOUtils.closeQuietly(reader); } List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(); for (Map.Entry<String, Integer> entry : map.entrySet()) { entries.add(entry); } //根据出现频次排序项 Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() { public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) { return ((Integer) o2.getValue()).compareTo((Integer) o1.getValue()); } }); String output = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID() + File.separator + IdentityUtils.generateUUID(); SequenceFile.Writer writer = null; try { Path path = new Path(output); FileSystem fs = FileSystem.get(conf); writer = SequenceFile.createWriter(fs, conf, path, Text.class, IntWritable.class); for (Map.Entry<String, Integer> entry : entries) { writer.append(new Text(entry.getKey()), new IntWritable(entry.getValue())); } } catch (IOException e) { e.printStackTrace(); } finally { IOUtils.closeQuietly(writer); } return output; } //频繁项集生成 public void frequency_itemset_gen(String input, String output, String sort_input) { System.out.println("frequency_itemset_gen input: " + input); System.out.println("frequency_itemset_gen sort input: " + sort_input); String[] inputArgs = new String[]{input, output, sort_input}; FPGrowthMR.main(inputArgs); } public void run(String[] args) { if (null == conf) conf = new Configuration(); try { String[] inputArgs = new GenericOptionsParser( conf, args).getRemainingArgs(); if (inputArgs.length != 3) { System.out.println("error"); System.out.println("1. input path."); System.out.println("2. output path."); System.out.println("3. min support."); System.exit(2); } String fre1_output = frequency_1_itemset_gen(inputArgs[0], inputArgs[2]); String fre1_sort_output = frequency_1_itemset_sort(fre1_output); frequency_itemset_gen(inputArgs[0], inputArgs[1], fre1_sort_output); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { FPGrowthJob job = new FPGrowthJob(); long startTime = System.currentTimeMillis(); job.run(args); long endTime = System.currentTimeMillis(); System.out.println("spend time: " + (endTime - startTime)); } }
第一个MR很简单就不上了,直接贴第二个MR
public class FPGrowthMR { private static void configureJob(Job job) { job.setJarByClass(FPGrowthMR.class); job.setMapperClass(FPGrowthMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(FPGrowthReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); } public static void main(String[] args) { Configuration configuration = new Configuration(); try { String[] inputArgs = new GenericOptionsParser( configuration, args).getRemainingArgs(); if (inputArgs.length != 3) { System.out.println("error"); System.out.println("error, please input two path. input and output"); System.out.println("1. input path."); System.out.println("2. output path."); System.out.println("3. sort input path."); System.exit(2); } // configuration.set("mapred.job.queue.name", "q_hudong"); configuration.set("sort.input.path", inputArgs[2]); Path sortPath = new Path(inputArgs[2]); DistributedCache.addCacheFile(sortPath.toUri(), configuration); Job job = new Job(configuration, "FPGrowth Algorithm"); FileInputFormat.setInputPaths(job, new Path(inputArgs[0])); FileOutputFormat.setOutputPath(job, new Path(inputArgs[1])); configureJob(job); System.out.println(job.waitForCompletion(true) ? 0 : 1); } catch (Exception e) { e.printStackTrace(); } } } class FPGrowthMapper extends Mapper<LongWritable, Text, Text, Text> { private List<Map.Entry<String, Integer>> entries = null; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); URI[] uris = DistributedCache.getCacheFiles(conf); Map<String, Integer> map = new HashMap<String, Integer>(); SequenceFile.Reader reader = null; try { Path path = new Path(uris[0]); FileSystem fs = FileSystem.get(conf); reader = new SequenceFile.Reader(fs, path, conf); Text key = (Text) ReflectionUtils.newInstance( reader.getKeyClass(), conf); IntWritable value = new IntWritable(); while (reader.next(key, value)) { map.put(key.toString(), value.get()); key = new Text(); value = new IntWritable(); } } catch (IOException e) { e.printStackTrace(); } finally { IOUtils.closeQuietly(reader); } entries = new ArrayList<Map.Entry<String, Integer>>(); for (Map.Entry<String, Integer> entry : map.entrySet()) { entries.add(entry); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer tokenizer = new StringTokenizer(value.toString()); tokenizer.nextToken(); List<String> results = new ArrayList<String>(); while (tokenizer.hasMoreTokens()) { String token = tokenizer.nextToken(); String[] items = token.split(","); for (Map.Entry<String, Integer> entry : entries) { String eKey = entry.getKey(); for (String item : items) { if (eKey.equals(item)) { results.add(eKey); break; } } } } String[] values = results.toArray(new String[0]); StringBuilder sb = new StringBuilder(); for (String v : values) { sb.append(v).append(","); } if (sb.length() > 0) sb.deleteCharAt(sb.length() - 1); for (String v : values) { context.write(new Text(v), new Text(sb.toString())); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } } class FPGrowthReducer extends Reducer<Text, Text, Text, IntWritable> { @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String keyItem = key.toString(); System.out.println("key: " + keyItem); Data data = new Data(); for (Text value : values) { Instance instance = new Instance(); StringTokenizer tokenizer = new StringTokenizer(value.toString()); String token = tokenizer.nextToken(); String[] items = token.split(","); List<String> temp = new ArrayList<String>(); for (String item : items) { if (keyItem.equals(item)) { break; } temp.add(item); } instance.setValues(temp.toArray(new String[0])); data.getInstances().add(instance); } context.write(new Text(keyItem), new IntWritable(data.getInstances().size())); FPGrowthBuilder fpBuilder = new FPGrowthBuilder(); fpBuilder.build(data, null); List<List<ItemSet>> frequencies = fpBuilder.obtainFrequencyItemSet(); for (List<ItemSet> frequency : frequencies) { for (ItemSet itemSet : frequency) { StringBuilder sb = new StringBuilder(); for (String i : itemSet.getItems()) { sb.append(i).append(","); } sb.append(keyItem); context.write(new Text(sb.toString()), new IntWritable(itemSet.getSupport())); } } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } }
HDFS上查看结果如下:
尿布 5 莴苣,尿布 4 葡萄酒,尿布 4 豆奶,尿布 3 橙汁,尿布 2 莴苣,葡萄酒,尿布 3 莴苣,豆奶,尿布 2 葡萄酒,豆奶,尿布 2 橙汁,豆奶,尿布 2 橙汁 2 豆奶,橙汁 2 莴苣 5 豆奶,莴苣 3 葡萄酒 4 莴苣,葡萄酒 3 豆奶,葡萄酒 2 豆奶 4
相关推荐
数据挖掘笔记01-031
数据挖掘笔记11-121
数据挖掘笔记07-091
Python学习笔记--皮大庆,非常适合零基础入门。对应英文版本《How to think like a computer scientist》
数据结构高分笔记-1数据结构高分笔记-1数据结构高分笔记-1数据结构高分笔记-1数据结构高分笔记-1数据结构高分笔记-1
数据结构高分笔记-2数据结构高分笔记-2数据结构高分笔记-2数据结构高分笔记-2数据结构高分笔记-2数据结构高分笔记-2
郝斌数据结构自学笔记--知识点+程序源代码.docx
HotApp云笔记 - 精品开源demo-基于免费API(源代码+截图)HotApp云笔记 - 精品开源demo-基于免费API(源代码+截图)HotApp云笔记 - 精品开源demo-基于免费API(源代码+截图)HotApp云笔记 - 精品开源demo-基于免费API(源...
概率论与数理统计-手写笔记-期末考试复习概率论与数理统计-手写笔记-期末考试复习概率论与数理统计-手写笔记-期末考试复习概率论与数理统计-手写笔记-期末考试复习概率论与数理统计-手写笔记-期末考试复习概率论与...
2004考研通信原理笔记--续大我2004考研通信原理笔记--续大我2004考研通信原理笔记--续大我2004考研通信原理笔记--续大我2004考研通信原理笔记--续大我2004考研通信原理笔记--续大我
《技术人的管理之路》读书笔记 --思维导图 《技术人的管理之路》读书笔记 --思维导图 《技术人的管理之路》读书笔记 --思维导图 《技术人的管理之路》读书笔记 --思维导图 《技术人的管理之路》读书笔记 --思维导图 ...
Java基础每日复习笔记-JavaSE高级阶段.2020-10-13-211312.edf
嵌入式Liunx应用程序开发笔记-代码.zip嵌入式Liunx应用程序开发笔记-代码.zip嵌入式Liunx应用程序开发笔记-代码.zip嵌入式Liunx应用程序开发笔记-代码.zip嵌入式Liunx应用程序开发笔记-代码.zip嵌入式Liunx应用程序...
JPA学习笔记-EJB-04JPA关联映射总结 JPA JPA关联映射JPA学习笔记-EJB-04JPA关联映射总结 JPA JPA关联映射
Python金融大数据挖掘与分析全流程详解-学习笔记及案例代码.zip Python金融大数据挖掘与分析全流程详解-学习笔记及案例代码.zip Python金融大数据挖掘与分析全流程详解-学习笔记及案例代码.zip Python金融大数据挖掘...
信息系统项目管理师考试必过笔记---第十章-项目时间进度管理.docx信息系统项目管理师考试必过笔记---第十章-项目时间进度管理.docx信息系统项目管理师考试必过笔记---第十章-项目时间进度管理.docx信息系统项目管理...
信息系统项目管理师考试必过笔记---第十章-项目时间进度管理.pdf信息系统项目管理师考试必过笔记---第十章-项目时间进度管理.pdf信息系统项目管理师考试必过笔记---第十章-项目时间进度管理.pdf信息系统项目管理师...
数据结构笔记-浙江大学
通信电子电路(高频-模电PLUS)-手写笔记-期末考试复习通信电子电路(高频-模电PLUS)-手写笔记-期末考试复习通信电子电路(高频-模电PLUS)-手写笔记-期末考试复习通信电子电路(高频-模电PLUS)-手写笔记-期末考试...
根据《Redis笔记-尚硅谷周阳V1.3》整理,脑图、思维导图xmind