`
0428loveyu
  • 浏览: 29148 次
  • 性别: Icon_minigender_2
  • 来自: 西安
文章分类
社区版块
存档分类
最新评论

Java线程池例子

 
阅读更多
/**
 * 
 */
package iotest.serversocket;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.zip.GZIPOutputStream;

/**
 * @author Brandon B. Lin
 * 
 */
public class GZipThread extends Thread {

	private List<File> jobQueue;
	private static int filesCompressed = 0;

	public GZipThread(List<File> jobQueue) {
		this.jobQueue = jobQueue;
	}

	private static synchronized void incrementFilesCompressed() {
		filesCompressed++;
	}

	@Override
	public void run() {
		while (filesCompressed != GZipAllFiles
				.getNumbersOfFilesToBeCompressed()) { // if there is any job, even not in jobqueue
			
			File input = getAJobFromQueue();
			if(input == null) {
				return;
			} else {		
				incrementFilesCompressed();
				compressedAnFile(input);
			}
		}
	}
	
	/**
	 * Get a job from queue if any , null if all job done
	 */
	private File getAJobFromQueue() {
		
		File result = null;
		synchronized (jobQueue) {
			while (jobQueue.isEmpty()) {
				if (filesCompressed == GZipAllFiles
						.getNumbersOfFilesToBeCompressed()) {
					System.out.println(Thread.currentThread() + " ending!");
					return result;
				}
				
				try {
					jobQueue.wait();
				} catch (InterruptedException exception) {
					exception.printStackTrace();
				}
			}
		    result =  jobQueue.remove(jobQueue.size() - 1);
	    }
		return result;
    }

	/**
	 * compress an file
	 */
	private void compressedAnFile(File fileToBeCompressed) {
		if (!fileToBeCompressed.getName().endsWith(".gz")) { // 不压缩已经压缩的文件
			try {
				InputStream in = new BufferedInputStream(new FileInputStream(
						fileToBeCompressed));
				File output = new File(fileToBeCompressed.getParent(),
						fileToBeCompressed.getName() + ".gz");
				if (!output.exists()) { // 不重复压缩
					OutputStream out = new BufferedOutputStream(
							new GZIPOutputStream(new FileOutputStream(output)));
					copyIntoOut(in, out);
				}
			} catch (IOException exception) {
				exception.printStackTrace();
			}
		}
	}

	/**
	 * copy data from in to out
	 */
	private void copyIntoOut(InputStream in, OutputStream out)
			throws IOException {
		int readByte;
		while ((readByte = in.read()) != -1) {
			out.write(readByte);
		}
		out.flush();
		out.close();
		in.close();
	}

}

/**
 * 
 */
package iotest.serversocket;

import java.io.File;
import java.util.Vector;

/**
 * Entry point of the gzip thread-pool example: starts {@link #THREAD_COUNT}
 * {@link GZipThread} workers, queues the files to compress, and then
 * interrupts the workers to tell them no more jobs will arrive.
 * 
 * @author Brandon B. Lin
 * 
 */
public class GZipAllFiles {

	public final static int THREAD_COUNT = 4;

	/** Path used when no command-line argument is given. */
	private static final String DEFAULT_DIRECTORY = "F:\\java\\UML\\";

	/**
	 * Total number of queued jobs, or -1 until the total is known. Volatile
	 * because it is written by the main thread and read by the workers.
	 */
	private static volatile int filesToBeCompressed = -1;
	private static GZipThread[] threads = new GZipThread[THREAD_COUNT];
	private static Vector<File> jobQueue = new Vector<>();

	/**
	 * @param args
	 *            optional; {@code args[0]} is the file or directory to
	 *            compress, defaulting to {@code F:\java\UML\}
	 */
	public static void main(String[] args) {
		createThreadPool();
		String directoryToBeCompressed = (args != null && args.length > 0)
				? args[0]
				: DEFAULT_DIRECTORY;
		addFilesToJobQueue(directoryToBeCompressed);
		notifyAllThreadNoMoreJobs();
	}
	
	/**
	 * create THREAD_COUNT threads in thread pool and start them
	 */
	private static void createThreadPool() {
		for (int i = 0; i < threads.length; i++) {
			threads[i] = new GZipThread(jobQueue);
			threads[i].start();
		}
	}
	
	/**
	 * If the parameter is a file, add it to the job queue; if it is a
	 * directory, add its first-level regular files to the job queue.
	 * Afterwards the total number of queued files is published; it is 0 if
	 * the path does not exist.
	 */
	private static void addFilesToJobQueue(String directory) {
		File file = new File(directory);
		int totalFiles = 0;
		
		if (file.exists()) {
			if (file.isDirectory()) {
				totalFiles += addDirectoryToJobQueue(file);
			} else {
				addAnFileToJobQueue(file);
				totalFiles++;
			}
		} else {
			System.err.println("Nothing to compress, path does not exist: "
					+ directory);
		}
		
		// Must be published in one step: workers compare their progress
		// counter against this value to decide when to stop.
		filesToBeCompressed = totalFiles;
	}
	
	/**
	 * Adds the first-level files (not sub-directories) of a directory to the
	 * job queue.
	 * 
	 * @return the number of files added; 0 if the directory cannot be listed
	 */
	private static int addDirectoryToJobQueue(File directory) {
		File[] files = directory.listFiles();
		if (files == null) {
			// listFiles() returns null on an I/O error or if the path is not
			// a directory; report it instead of failing with an NPE.
			System.err.println("Cannot list directory: " + directory);
			return 0;
		}
		int numberOfFiles = 0;
		for (File candidate : files) {
			if (!candidate.isDirectory()) {
				addAnFileToJobQueue(candidate);
				numberOfFiles++;
			}
		}
		return numberOfFiles;
	}
	
	/**
	 * Add a file (not a directory) to the front of the job queue and wake up
	 * any threads waiting on the queue.
	 */
	private static void addAnFileToJobQueue(File fileToAdd) {
		synchronized (jobQueue) {
			jobQueue.add(0, fileToAdd);
			jobQueue.notifyAll();
		}
	}
	
	/**
	 * Notify all threads in thread pool that no more job will be added,
	 * by interrupting each of them.
	 */
	private static void notifyAllThreadNoMoreJobs() {
		for (int i = 0; i < threads.length; i++) {
			threads[i].interrupt();
		}
	}

	/**
	 * How many files need to be compressed.
	 * 
	 * @return the total number of queued files, or -1 while the total is not
	 *         yet known
	 */ 
	public static int getNumbersOfFilesToBeCompressed() {
		return filesToBeCompressed;
	}

}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics