`
greemranqq
  • 浏览: 966114 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

大数据去除重复--实战(一)

阅读更多

          最近快过年了,来了一个紧急任务,加班加点的一周,终于上线了。也没多少时间去研究出去重复数据的算法,上一篇文章的算法,理论是可以的!但是由于我采用的行迭代的方式,JVM 会出现栈的深度溢出,我就换了一种方式,这里再次介绍给大家:

          回顾一下题目:超过内存限制的URL,去除重复数据!

   

          我的方法是根据hashCode 范围进行分组。比如文件A,假设有1亿行,A作为原始文件,然后循环读取每一行,根据hashCode 值分成两部分文件。初始我以int 的最大值和最小值为边界,0 作为分界线开始分,hashCode 大于0的放右边A-right,小于0的放一边,A-left

         假设第一次分割成 A-right , A-left 两个 文件,范围分别是2^31-0  和 0-2^31,然后记录两个文件hashCode 的范围,作为下一次分割条件,然后继续判断A-right 和A-left 文件是否是内存范围,迭代此方法,直到分割成内存能够读取的文件A-N,然后读入内存进行除重复,追加进汇总文件。

         这里临时画个图解释(图有点丑,谅解~。~):

       

 

这里我先临时写了一个生成文件的类,数据格式:www.+随即数字+.com 的形式

package com.file;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.Random;

/**
 * 生成内存不能一次读取的文件
 * @author Ran
 *
 */
public class InitFiles {
	
	private final static String PREFIX = "www.";
	private final static String SUFFIX = ".com";
	
	private final static Random RANDOM = new Random();
	// 每一次写入的行数,方便监控
	private final static int STRING_LINE_NUM = 1000*100*4;
	
	// 随即生成的数字 ,假设是网站地址
	public static final int RANDOM_NUM = Integer.MAX_VALUE;
	
	// 缓冲区大小
	static final int CACHE_SIZE = 1024*1024;
	
	// 生成文件的大小,这里就2G吧
	static final long FILE_MAX_LENGTH = 1024*1024*1024*2l;
	// 记录总行数
	static long file_lines = 0;
	public static void main(String[] args)   {
           // 看看运行状态
	    printMem();
	    long begin = System.currentTimeMillis();
	    writeFile("bigDate.txt");
	    long result = System.currentTimeMillis()-begin;
	    System.out.println("总行数"+file_lines);
	    System.out.println("生成文件总时间:"+result+"毫秒");
	}
	
	
	/**
	 * 循环写入文件,这里临时写死,生成的大概是1G左右的文件
	 */
	public static void writeFile(String fileName){
		Writer writer = null;
		File file = null;
		// 停止标志
		boolean isWrite = true;
		try {
			file = new File(fileName);
			writer = new BufferedWriter(new FileWriter(file),CACHE_SIZE);
			while(isWrite){
				writer.write(initStr());
				if(file.length() > FILE_MAX_LENGTH){
					isWrite = false;
				}
			}
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} finally{
			try {
				if(writer != null){
					writer.flush();
					writer.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	/**
	 * 生成字符串,大量字符串 写入,会快一些
	 * @return
	 */
	public static String initStr(){
		StringBuilder sb = new StringBuilder(1000000);
		int i = 0;
		// 默认10W行 一次写入
		while(i<STRING_LINE_NUM){
			sb.append(PREFIX);
			sb.append(getContext());
			sb.append(SUFFIX);
			sb.append("\r\n");
			i++;
		}
		file_lines += i;
		return sb.toString();
	}
	
	/**
	 * 生成随机数,作为网站中间地址
	 * 参数 自己调节,测试方便
	 * @return
	 */
	public static String getContext(){
		String context = String.valueOf(RANDOM.nextInt(RANDOM_NUM));
		return context;
	}

       /**
	 * 每次写入时 信息
	 */
	public static void printMem(){
		print("已用内存:"+currRuntime.totalMemory()/1024/1024+" MB");
		print("最大内存:"+currRuntime.maxMemory()/1024/1024+" MB");
		print("可用内存:"+currRuntime.freeMemory()/1024/1024+" MB");
	}

       /**
	 * 方便我测试 是否打印,影响速度
	 * @param str
	 */
	public static void print(String str){
		System.out.println(str);
	}
}

 

生成2G 文件,我这里花了:

总行数:136800000 行

生成文件总时间:75812毫秒

 

下面是除去重复的类代码:

package com.files;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartFile {
	// 内存监控
	final static Runtime currRuntime = Runtime.getRuntime ();
	// 最小的空闲空间,额,可以用来 智能控制,- -考虑到GC ,暂时没用
	final static long MEMERY_LIMIT = 1024 * 1024 * 3;
	// 内存限制,我内存最多容纳的文件大小
	static final long FILE_LIMIT_SIZE = 1024*1024*20;
	// 文件写入缓冲区 ,我默认1M
	static final int CACHE_SIZE = 1024*1024;
	// 默认文件后缀
	static final String FILE_SUFFIX = ".txt";
	// 临时分割的文件目录,可以删除~。~
	static final String FILE_PREFIX = "test/";
	// 汇总的文件名
	static final String REQUST_FILE_NAME = "resultFile.txt";
	
	// 存放大文件 引用,以及分割位置
	static List<ChildFile> bigChildFiles = new ArrayList<ChildFile>();
	
	// 存放小文件的,驱除重复数据
	static Map<String,String> fileLinesMap = new HashMap<String,String>(10000);
	

	public static void main(String[] args) {
	long begin = System.currentTimeMillis();
	new PartFile().partFile(new File("bigData.txt"),
                   Integer.MAX_VALUE,Integer.MIN_VALUE);
	long result = System.currentTimeMillis()-begin;
	System.out.println("除去重复时间为:"+result +" 毫秒");
	}
     
	
	// 按hashCode 范围分割
	public  void partFile(File origFile,long maxNum,long minNum) {
		String line = null;
		long hashCode = 0;
		long max_left_hashCode = 0;
		long min_left_hashCode = 0;
		long max_right_hashCode = 0;
		long min_right_hashCode = 0;
		BufferedWriter rightWriter = null;
		BufferedWriter leftWriter = null;
		BufferedReader reader = null;
		try {
		reader = new BufferedReader(new FileReader(origFile));
		long midNum = (maxNum+minNum)/2;
		// 以文件hashCode 范围作为子文件名
		File leftFile = new File(FILE_PREFIX+minNum+"_"+midNum+FILE_SUFFIX);
		File rightFile = new File(FILE_PREFIX+midNum +"_"+maxNum+FILE_SUFFIX);
			
		leftWriter = new BufferedWriter(new FileWriter(leftFile),CACHE_SIZE);
		rightWriter = new BufferedWriter(new FileWriter(rightFile),CACHE_SIZE);
			
		ChildFile leftChild = new ChildFile(leftFile);
		ChildFile rightChild = new ChildFile(rightFile);
			
		// 字符串 组合写入也行
		// StringBuilder leftStr = new StringBuilder(100000);
		// StringBuilder rightStr = new StringBuilder(100000);
		// hashCode 的范围作为分割线
			while ((line = reader.readLine()) != null) {
				hashCode = line.hashCode();
					if (hashCode > midNum) {
						if(max_right_hashCode < hashCode || max_right_hashCode == 0){
							max_right_hashCode = hashCode;
						}else if(min_right_hashCode > hashCode || min_right_hashCode == 0){
							min_right_hashCode = hashCode;
						}
						// 按行写入缓存
						writeToFile(rightWriter, line);
				}else {
						if(max_left_hashCode < hashCode || max_left_hashCode == 0){
							max_left_hashCode = hashCode;
						}else if(min_left_hashCode > hashCode || min_left_hashCode == 0){
							min_left_hashCode = hashCode;
						}
						writeToFile(leftWriter, line);
				}
			}
			// 保存子文件信息
			leftChild.setHashCode(min_left_hashCode, max_left_hashCode);
			rightChild.setHashCode(min_right_hashCode, max_right_hashCode);
			
			closeWriter(rightWriter);
			closeWriter(leftWriter);
			closeReader(reader);
			
			
			// 删除原始文件,保留最原始的文件
			
			if(!origFile.getName().equals("bigData.txt")){
				origFile.delete();
			}
			
			// 分析子文件信息,是否写入或者迭代
			analyseChildFile(rightChild, leftChild);

		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	// 分析子文件信息
	public void analyseChildFile(ChildFile rightChild,ChildFile leftChild){
		// 将分割后 还是大于内存的文件保存  继续分割
		File rightFile = rightChild.getChildFile();
		if(isSurpassFileSize(rightFile)){
			bigChildFiles.add(rightChild);
		}else if(rightFile.length()>0){
			orderAndWriteToFiles(rightFile);
		}
		
		File leftFile = leftChild.getChildFile();
		if(isSurpassFileSize(leftFile)){
			bigChildFiles.add(leftChild);
		}else if(leftFile.length()>0){
			orderAndWriteToFiles(leftFile);
		}
		
		// 未超出直接内存排序,写入文件,超出继续分割,从末尾开始,不易栈深度溢出
		if(bigChildFiles.size() > 0 ){
			ChildFile e  = bigChildFiles.get(bigChildFiles.size()-1);
			bigChildFiles.remove(e);
			// 迭代分割
			partFile(e.getChildFile(), e.getMaxHashCode(), e.getMinHashCode());
		}
	}
	

	// 将小文件读到内存排序除重复
	public  void orderAndWriteToFiles(File file){
		BufferedReader reader = null;
		String line = null;
		BufferedWriter totalWriter = null;
		StringBuilder sb = new StringBuilder(1000000);
		try {
			totalWriter = new BufferedWriter(new FileWriter(REQUST_FILE_NAME,true),CACHE_SIZE);
			reader = new BufferedReader(new FileReader(file));
			while((line = reader.readLine()) != null){
				if(!fileLinesMap.containsKey(line)){
					fileLinesMap.put(line, null);
					sb.append(line+"\r\n");
					//totalWriter.write(line+"\r\n");
				}
			}
			totalWriter.write(sb.toString());
			fileLinesMap.clear();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}finally{
			closeReader(reader);
			closeWriter(totalWriter);
			// 删除子文件
			file.delete();
		}
	}
	

	
	// 判断该文件是否超过 内存限制
	public  boolean isSurpassFileSize(File file){
		return FILE_LIMIT_SIZE <  file.length();
	}
	

	// 将数据写入文件
	public  void writeToFile(BufferedWriter writer, String writeInfo) {
		try {
			writer.write(writeInfo+"\r\n");
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	// 关闭流
	public  void closeReader(Reader reader) {
		if (reader != null) {
			try {
				reader.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	// 关闭流
	public  void closeWriter(Writer writer) {
		if (writer != null) {
			try {
				writer.flush();
				writer.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	
	// 内部类,记录子文件信息
	class ChildFile{
		// 文件 和 内容 hash 分布
		File childFile;
		long maxHashCode;
		long minHashCode;
		
		public ChildFile(File childFile){
			this.childFile = childFile;
		}
		
		public ChildFile(File childFile, long maxHashCode, long minHashCode) {
			super();
			this.childFile = childFile;
			this.maxHashCode = maxHashCode;
			this.minHashCode = minHashCode;
		}
		
		public File getChildFile() {
			return childFile;
		}
		public void setChildFile(File childFile) {
			this.childFile = childFile;
		}
		public long getMaxHashCode() {
			return maxHashCode;
		}
		public void setMaxHashCode(long maxHashCode) {
			this.maxHashCode = maxHashCode;
		}
		public long getMinHashCode() {
			return minHashCode;
		}
		public void setMinHashCode(long minHashCode) {
			this.minHashCode = minHashCode;
		}
		
		public void setHashCode(long minHashCode,long maxHashCode){
			this.setMaxHashCode(maxHashCode);
			this.setMinHashCode(minHashCode);
		}
		
	}
}

 

 

除去重复时间为:1228984 毫秒,20 多分钟

我的老电脑,总内存小,JVM 才几十M,大一些应该会快。 

 

方法分析:

       1.采用hashCode 范围迭代分割的方式,可以分割成内存可以容纳的小文件,然后完成功能

       2.我们发现每次迭代,相当于重复读取里面的文件,然后再进行分割,这样浪费了很多时间,那么有没有更好的方式呢? 我们可以这样设计,假设我们知道文件的总大小,已经大概的行数,比如2G,1亿行,我们一开始就分配区间,在分配完全均匀的情况下,1亿行数据,最多占用1亿个空间,那么可以这样分配,用hashCode 的范围,也就是Integer的最大值和最小值进行模拟分配。分配范围 根据内存进行,比如:读取第一行的的hashCode 值为100,那么,我们可以分配到1-1000000,(这里以100W 为单位),也就是说只要hashCode 范围在这个区间的都分配到这里,同理,读到任何一个hashCode 值,除以单位(100W),就能找到你的区间,比如hasCode 是 2345678,那么就是200W-300W的区间。这里有些区间可能空的,就删除了,有些区间很多,就用上面的迭代,空间足够 就直接写入汇总文件。当然区间单位的颗粒度划分,根据内存和数据总量 自己弄,这样下来就会一次读取 ,就能尽量的分配均匀,就不会大量迭代读取浪费时间了。

 

       3.我们发现分割的时候是直接写入,没有进行任何排序或者其他操作,如果我们在分割的时候保存一部分到集合内存,发现有重复的,就不写入分割文件了,如果写入量超过集合限制了,就清空集合,这样能保证单个小文件 一次达到除重复的效果,大文件部分除重复,减少子文件的数据量,如果重复数据较多,可以采用,极端的情况下完全不重复,那么集合会浪费你的空间,并且除重复的时候会浪费你的时间,这里自己对数据进行整体考虑。

 

       4.这里内存的控制是我测试进行控制,用JDK 的方法进行内存监控,因为涉及到回收频率 以及时间上的问题,我没有动态的对集合进行监控,占了多少内存,还剩多少内存等等,可以尝试。

 

 

小结:

        1.首先很感谢Jason.Hao     的的帮助,主动去实践,才会发现一些理论和实践的差距,更加体会深刻。

        2.在程序设计,以及一些异常控制上没多做处理,仅仅测试一下理论功能。

        3.这是根据hashCode 范围分离的方法,如果有其他方法或者建议,欢迎大家共同实践哦

 

        最后我的新书spring 到了,兴奋啊。关于《spring in action 》 我大概看了一下,主要是介绍spring 的一些使用技巧,和一些常用功能,也就是说教你如何使用,使用方便。

        关于《spring 核心技术内幕》 里面主要对spring 的一些架构设计进行分析,包括bean 加载, Ioc,AOP等,也就是说能加深你对源码的一些理解,稍微深一点。记得还有一本关于 精通AOP的书名字忘记了,有好的,欢迎分享,推荐哦。

        

  • 大小: 6.8 KB
分享到:
评论
1 楼 woommy 2014-01-26  
很好,有始有终。这个方法的核心原理是,hashcode不同的字符串一定不是相同的字符串,所以经过hashcode分开后的文件之间不存在重复的字符串,然后每个文件又可以读入内存,处理起来就容易了。但是如果对性能有要求还有优化的空间。

相关推荐

Global site tag (gtag.js) - Google Analytics