`
芝加哥09
  • 浏览: 59176 次
社区版块
存档分类
最新评论

利用带返回值多线程实现Hadoop中的WordCount实例

    博客分类:
  • Java
 
阅读更多

 更多IT互联网学习资源,尽在通通学 - 知识学习与分享平台

 

学习过Hadoop的都知道,其中有一个经典实例就是统计文档中每个单词出现的次数,即WordCount实例。这里利用Executor框架及带返回值的多线程(Callable/Future)来实现WordCount实例。

 

以下是核心代码:

 

WordCountMapper.java

package com.tongtongxue.wordcount;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.Callable;

/**
 * Counts word occurrences in a contiguous slice of a file array.
 *
 * <p>An instance processes {@code files[start]} (inclusive) through
 * {@code files[end]} (exclusive). Each file is decoded as UTF-8, read line by
 * line, and every line is split into words with a {@link StringTokenizer}
 * using its default delimiters.
 *
 * <p>The class still implements {@code Callable<Map>} so that existing callers
 * typed against the raw {@code Map} keep compiling; {@link #call()} narrows the
 * return type to {@code Map<String, Long>}.
 */
public class WordCountMapper implements Callable<Map> {
    private int start;
    private int end;
    private File[] files;

    /**
     * Creates a mapper with no files assigned; {@link #call()} then returns an
     * empty map because {@code start} and {@code end} are both 0.
     */
    public WordCountMapper() {
    }

    /**
     * @param files the full array of candidate files
     * @param start index of the first file to process (inclusive)
     * @param end   index one past the last file to process (exclusive)
     */
    public WordCountMapper(File[] files, int start, int end) {
        this.files = files;
        this.start = start;
        this.end = end;
    }

    /**
     * Reads every file in the assigned slice and tallies its words.
     *
     * @return a map from word to the number of times it occurs in the slice
     * @throws Exception if a file cannot be opened or read
     */
    @Override
    public Map<String, Long> call() throws Exception {
        Map<String, Long> result = new HashMap<String, Long>();
        for (int i = start; i < end; i++) {
            countWords(files[i], result);
        }
        return result;
    }

    /** Adds the word counts of a single file to {@code counts}. */
    private static void countWords(File file, Map<String, Long> counts) throws Exception {
        // The reader is scoped to this file so it is closed exactly once,
        // whether reading succeeds or fails.
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(file), "utf-8"));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    String word = tokenizer.nextToken();
                    // Single lookup: null means this is the first occurrence.
                    Long previous = counts.get(word);
                    counts.put(word, previous == null ? 1L : previous + 1L);
                }
            }
        } finally {
            reader.close();
        }
    }
}

 

WordCount.java

package com.tongtongxue.wordcount;

import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class WordCount {
    private ExecutorService executorService;
    private int threadNum;
    private List<Future<Map>> tasks = new ArrayList<Future<Map>>();
    private File[] txtFiles;

    public WordCount() {
        // 以cup的个数,作为线程个数
        threadNum = Runtime.getRuntime().availableProcessors();
        executorService = Executors.newFixedThreadPool(threadNum);
    }

    public WordCount(int threadNum) {
        this.threadNum = threadNum;
        executorService = Executors.newFixedThreadPool(threadNum);
    }

    public void count(String dirPath) throws Exception {
        File dir = new File(dirPath);
        txtFiles = dir.listFiles(new FileFilter() {

            @Override
            public boolean accept(File file) {
                String fileName = file.getName();
                if (fileName.endsWith(".txt") || fileName.endsWith(".TXT")) {
                    return true;
                }
                return false;
            }
        });

        int size = txtFiles.length;
        for (int i = 0; i  size) {
                end = size;
            }

            WordCountMapper mapper = new WordCountMapper(txtFiles, start, end);
            FutureTask<Map> futureTask = new FutureTask<Map>(mapper);
            tasks.add(futureTask);

            if (!executorService.isShutdown()) {
                executorService.submit(futureTask);
            }

        }
        showResult();
    }

    public void close() {
        executorService.shutdown();
    }

    public void showResult() throws Exception {
        Map map = new HashMap();
        for (Future<Map> task : tasks) {
            Map result = task.get();
            for (Entry entry : result.entrySet()) {
                String word = entry.getKey();
                Long num = entry.getValue();
                if (map.containsKey(word)) {
                    map.put(word, map.get(word) + num);
                } else {
                    map.put(word, num);
                }
            }
        }

        System.out.println(map.size());

        for (Entry entry : map.entrySet()) {
            System.out.println(entry.getKey() + " ------> " + entry.getValue());
        }
    }
}

 

转载本文链接为:http://www.tongtongxue.com/archives/1141.html

 

 

 

1
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics