`
hope598
  • 浏览: 65403 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

大家帮忙看看这个多线程有问题没

阅读更多

整个流程是这样的,循环从一个文件中读取数据,每读6万条后就要暂停,启动三个线程(每次仅允许三个线程同时处理)来处理这6万条数据,处理结束后,再继续读。。。循环这样直到文件中数据全部处理完。大家帮忙看看,有什么问题没,例如处理流程、并发。。。等方面,多谢!

public class CopyOfTest {
	public static void main(String[] args) {
		Producer p = new Producer();
		while(p.producer() > 0){
			p.cunsumer();
		}
	}
}

class Producer {
	private Lock lock = new ReentrantLock();
	private Condition notEmpty = lock.newCondition();
	private Condition notFull = lock.newCondition();
	private List<List<String>> dataList = null;
	private BufferedReader  reader;
	private static ExecutorService executor = Executors.newFixedThreadPool(3);
	
	public Producer() {
		try {
			reader = new BufferedReader(new InputStreamReader(new FileInputStream(new File("D:/11.txt")), "GBK"));
			dataList = new ArrayList<List<String>>();
		} catch (UnsupportedEncodingException e1) {
			e1.printStackTrace();
		} catch (FileNotFoundException e1) {
			e1.printStackTrace();
		}
	}
	
	public int producer() {
		String text = "";
		List<String> tmp = new ArrayList<String>();
		int count = 0;
		lock.lock();
		try {
			try {
				System.out.println("+++++++++++");
				if (dataList.size() == 0) {
					notEmpty.signalAll();
				}
				if (dataList.size() == 3) {
					try {
						notFull.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				while ((text = reader.readLine()) != null) {
					count++;
					System.out.println(count + "+++" + text);
					tmp.add(text);
					if (count % 20000 == 0) {
						dataList.add(tmp);
						tmp = new ArrayList<String>();
					}
					if (dataList.size() == 3) {
						break;
					}
				}
				if (count % 60000 != 0) {
					dataList.add(tmp);
				}
				tmp = null;
			} catch (IOException e1) {
				e1.printStackTrace();
			}
			if (dataList.size() < 1) {
				executor.shutdown();
			}
			return dataList.size();
		} finally {
			lock.unlock();
		}
	}
	
	public void cunsumer() {
		lock.lock();
		int len = dataList.size();
		Future<?>[] futureArr = new Future<?>[len];
		try {
			if (dataList.size() == 0) {
				try {
					notEmpty.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			if (dataList.size() == 3) {
				notFull.signalAll();
			}
			for (int i = 0; i < len; i++) {
				futureArr[i] = executor.submit(new SingleThread12(dataList.get(i)));
			}
			boolean flag = true;
			while (flag) {
				for (Future<?> f : futureArr) {
					flag = flag && f.isDone();
				}
				flag = !flag;
			}
			dataList = new ArrayList<List<String>>();
		} finally {
			lock.unlock();
		}
	}
}

 

 

0
0
分享到:
评论
18 楼 quietwater 2013-03-16  
这个问题我是这样分析的。
生产者读取文件内容,3万条一组,放入长度为3的阻塞队列,一旦读取文件完毕并放入阻塞队列,置生产者状态位为真,默认为假。
消费者从队列中拿一组数据,成功取出后启动一个线程处理该数据,并将计数器加一,默认为零。当计数器为3时,休眠再检查。当生产者状态为真,并且阻塞队列为空时退出。

这里生产者状态为volatile boolean
计数值需要同步增加和减少,包括查看个数
17 楼 lovexp2010 2013-03-13  
唔系好人 写道

  九楼的内部类,你不用继承线程???


实现Runnable接口的匿名类,呵呵~
16 楼 backkom1982 2013-03-13  
你现在的思路,可以用CountdownLatch类实现,不需要自己关心线程之间的调度。

更好的方式,其实同学已经提到了,使用阻塞队列用多个consumer和provider同时处理。

比阻塞队列性能更好的工具可以使用disruptor
15 楼 唔系好人 2013-03-13  
yuyue007 写道
看了评论,个人还是觉得用多线程处理比较好,而且在处理的时候,可以一边读,一般处理数据,这样能最大限度的利用资源,节约时间。

贴一个我写的代码,欢迎指正。

package test.thread.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author yole.yu <br/>
 * Created date: 2013-3-13 <br/>
 */
public class ProducerConsumerTest {

    public static void main(String[] args) throws InterruptedException{

        Producer producer = new Producer();
        producer.read();

        BlockingQueue<String> data = producer.getData();
        Consumer consumer = new Consumer(data, producer);
        consumer.consume();
    }
}

class Producer{
    private volatile boolean endOfFile = false;
    private volatile int dataSequence=0;
    // store data, it's a warehouse
    private BlockingQueue<String> data = new ArrayBlockingQueue<String>(60);

    public void read(){
        //use a new thread to process read 
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("start produce...");
                while(!endOfFile){
                    try {
                        // read data from your file, I use dataSequence to identify data
                        data.put(++dataSequence + "");
                        System.out.println("produced " + dataSequence);
                        //assume dataSequence=123 means end of file
                        if(dataSequence == 123){
                            endOfFile = true;
                            System.out.println("all data have been read");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

    }

    public BlockingQueue<String> getData(){
        return data;
    }

    public boolean isEndOfFile() {
        return endOfFile;
    }
}

class Consumer{
    private int threadSize=3;
    private BlockingQueue<String> datas;
    private Producer producer;

    public Consumer(BlockingQueue<String> data, Producer producer){
        this.datas = data;
        this.producer = producer;
    }
    
    public void consume(){
        System.out.println("start consume..");
        ExecutorService pool = Executors.newFixedThreadPool(threadSize);
        for(int i=0; i<3; i++){
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    while(datas.size()>0 || !producer.isEndOfFile()){
                        try {
                            String data = datas.take();
                            System.out.println(Thread.currentThread().getName() + "-----" + data);
                            TimeUnit.MILLISECONDS.sleep(300);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        pool.shutdown(); 
    }

    public int getThreadSize() {
        return threadSize;
    }

    public void setThreadSize(int threadSize) {
        this.threadSize = threadSize;
    }
}

  九楼的内部类,你不用继承线程???
14 楼 yuyue007 2013-03-13  
beykery 写道
我奇怪你为何不使用阻塞队列,你这个逻辑直接用阻塞队列,写起来更简单。

下面写了个阻塞队列的,貌似他不感兴趣哦。。
13 楼 beykery 2013-03-13  
我奇怪你为何不使用阻塞队列,你这个逻辑直接用阻塞队列,写起来更简单。
12 楼 hope598 2013-03-13  
zjhlht 写道
advantech 写道
这个程序的思路可以这样,你建n个长度1000的队列,读程序往队列里读,读满一个就把该队列放到一个线程去处理,然后换下一个空队列继续读。
线程处理完后将空队列重新交给读程序,如此循环。根本不需要用锁。


同意!这个方法推荐~~~~~避免了很多问题

并且读的速度远远大于处理的速度,读完6万笔数据后,业务的处理要持续1、2分钟,这个时间段读操作要阻塞的
11 楼 kidding87 2013-03-13  
zjhlht 写道
advantech 写道
这个程序的思路可以这样,你建n个长度1000的队列,读程序往队列里读,读满一个就把该队列放到一个线程去处理,然后换下一个空队列继续读。
线程处理完后将空队列重新交给读程序,如此循环。根本不需要用锁。


同意!这个方法推荐~~~~~避免了很多问题



边读边处理不是更好些
10 楼 hope598 2013-03-13  
zjhlht 写道
advantech 写道
这个程序的思路可以这样,你建n个长度1000的队列,读程序往队列里读,读满一个就把该队列放到一个线程去处理,然后换下一个空队列继续读。
线程处理完后将空队列重新交给读程序,如此循环。根本不需要用锁。


同意!这个方法推荐~~~~~避免了很多问题

内存中最多只允许存在三个线程在处理业务,不能一直的读数据出来,因为数据量比较大,经常会出现内存溢出。。。
9 楼 yuyue007 2013-03-13  
看了评论,个人还是觉得用多线程处理比较好,而且在处理的时候,可以一边读,一般处理数据,这样能最大限度的利用资源,节约时间。

贴一个我写的代码,欢迎指正。

package test.thread.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author yole.yu <br/>
 * Created date: 2013-3-13 <br/>
 */
public class ProducerConsumerTest {

    public static void main(String[] args) throws InterruptedException{

        Producer producer = new Producer();
        producer.read();

        BlockingQueue<String> data = producer.getData();
        Consumer consumer = new Consumer(data, producer);
        consumer.consume();
    }
}

class Producer{
    private volatile boolean endOfFile = false;
    private volatile int dataSequence=0;
    // store data, it's a warehouse
    private BlockingQueue<String> data = new ArrayBlockingQueue<String>(60);

    public void read(){
        //use a new thread to process read 
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("start produce...");
                while(!endOfFile){
                    try {
                        // read data from your file, I use dataSequence to identify data
                        data.put(++dataSequence + "");
                        System.out.println("produced " + dataSequence);
                        //assume dataSequence=123 means end of file
                        if(dataSequence == 123){
                            endOfFile = true;
                            System.out.println("all data have been read");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

    }

    public BlockingQueue<String> getData(){
        return data;
    }

    public boolean isEndOfFile() {
        return endOfFile;
    }
}

class Consumer{
    private int threadSize=3;
    private BlockingQueue<String> datas;
    private Producer producer;

    public Consumer(BlockingQueue<String> data, Producer producer){
        this.datas = data;
        this.producer = producer;
    }
    
    public void consume(){
        System.out.println("start consume..");
        ExecutorService pool = Executors.newFixedThreadPool(threadSize);
        for(int i=0; i<3; i++){
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    while(datas.size()>0 || !producer.isEndOfFile()){
                        try {
                            String data = datas.take();
                            System.out.println(Thread.currentThread().getName() + "-----" + data);
                            TimeUnit.MILLISECONDS.sleep(300);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        pool.shutdown(); 
    }

    public int getThreadSize() {
        return threadSize;
    }

    public void setThreadSize(int threadSize) {
        this.threadSize = threadSize;
    }
}
8 楼 zjhlht 2013-03-13  
advantech 写道
这个程序的思路可以这样,你建n个长度1000的队列,读程序往队列里读,读满一个就把该队列放到一个线程去处理,然后换下一个空队列继续读。
线程处理完后将空队列重新交给读程序,如此循环。根本不需要用锁。


同意!这个方法推荐~~~~~避免了很多问题
7 楼 advantech 2013-03-12  
这个程序的思路可以这样,你建n个长度1000的队列,读程序往队列里读,读满一个就把该队列放到一个线程去处理,然后换下一个空队列继续读。
线程处理完后将空队列重新交给读程序,如此循环。根本不需要用锁。
6 楼 derta2009 2013-03-12  
额,你这看的我晕啊,我按你的需求,大概写了例子意思一下啊。
public class ThreadPoolTest {

	public static void main(String[] args) {
		ThreadPoolTest tp=new ThreadPoolTest();
		ExecutorService es = Executors.newFixedThreadPool(6);//一次6个线程同时运行,每个线程处理10000条数据
		for(int i=0;i<10;i++){
			es.execute(tp.new newThread(i));
		}
		es.shutdown();
	}
	
	class newThread  extends Thread{
		private int i ;
		
		public newThread(int i) {
			this.i = i;
		}
		List<Integer> list = new ArrayList<Integer>();
		@Override
		public void run() {

			//读取一万调数据
			for(int i=0;i<10000;i++){
				list.add(i);
			}
			
			//处理数据
			System.out.println("处理了10000条数据");
		}
		
	}
}
5 楼 hope598 2013-03-12  
这个说成多线程的确不合适。。。还请多多指点,3q
4 楼 java_min 2013-03-12  
你确定你写的是多线程吗,好好看看多线程什么意思,多线程是怎么处理数据的。
3 楼 hope598 2013-03-12  
因为业务处理时间有点长,如果一次性全读出来,会出现内存溢出。。。
2 楼 panggezi 2013-03-12  
为啥非得等到6万条处理完producer才继续读?不然用BlockingQueue多方便。
1 楼 zj304292653 2013-03-12  
我觉得你这个消费者用三个线程没有意义,整个消费流程都被锁住了,还不如单线程

相关推荐

    有问题的QT程序,请大家帮忙看一下吧

    有问题的QT程序 这个程序的功能是客户端向服务器发送一个字符串,服务器接收到这个字符串之后,进行处理(复制一个相同的...(提示说不能向别的线程发送信号/数据,但我所有的问题都是在同一个线程里面处理的啊?)

    小米云相册导出至本地 原图下载 支持多线程

    多线程有bug 下载结束后 自动关闭软件 不急着要的话 建议单线程比较稳定 小米的cookie 打开云相册 刷新后 按f12 抓包获取 小米cookie 有时效 有一个接口可以直接获取 等v2.0后发布更新 大家先用着 昨天其实也发布...

    用C#做的一个聊天软件

    这段代码是百度好友为我解惑帮忙写的两份聊天软件的其中之一,两个功能是一样的,完全可以实现点对点的聊天,只不过一个是用到了异步另一个是多线程同步(其实我觉得通过多线程控制窗体会使得程序清晰易懂,虽说会...

    leetcode下载-hy_computerAndJava_basic:计算机基础知识知识

    在那个时候基础不好(没学过操作系统),对jvm和多线程只是死记硬背,很多概念根部不理解 只有系统的学习,才能更深的理解 提升 高并发架构(消息队列,搜索引擎,缓存,数据库高级)-&gt;分布式系统-&gt;springcloud微服务 -&gt;k8s ...

    心蓝12306购票软件2013年1月6日最新版

    这个问题是由于您的证书安装不正确导致,请重新下载证书,并按里面的“SRCA根证书安装说明手册”重新安装即可解决问题。如果还不行,这也是您自己电脑的问题,请联系您的网络管理员或者与我们的客服取得联系以取得...

    易语言NetDB数据库操作中间件

    3、**本次更新包括数据库读写的权限控制,测试环境有限,有问题再改进,这个只是个雏形,后期还会更强。 4、**主要演示文件是服务端和网盘,请详细阅读。 ================ [2020-2-4日] ================ 1、**...

    CoreData:Demo中详细讲解了CoreData的各种操作,并且给出了详细的注释。写技术文章不容易,希望各位能帮忙点个Star,谢谢!

    我在简书的博客中写了一系列总计六篇的CoreData文章,总字数大概3W+,从CoreData的基础使用到使用进阶,再到多线程、版本迁移等高级用法,讲解非常详细。 但CoreData的学习还是应该偏实践,我根据博客中讲到的知识点...

    精易模块[源码] V5.15

    11、修复“线程_等待”命令注释反的问题,返回真表示线程结束,假表示已超时。感谢易友【tone】反馈。 12、修复“类_识图-&gt;找图_从字节集”命令,载入大文件直接奔溃的BUG,感谢易友【tone】反馈。 精易模块 V3.80...

    my-python-practices:我的python做法

    my-python-practicessimplemultithreadsCrawler.py:一个简单的多线程生产者消费者爬虫,实际使用的时候可以重写parser的parse_links方法,来写自己的解析规则,然后将解析后将继续要爬的地址放入url队列,生产者会...

    易语言-易语言快速文件搜索

    非原创,此源码是该模块的作者为了测试多线程而写的。 经过测试,调试情况下运行没发生什么BUG。希望懂的大神有时间的话帮忙看看为什么编译出来后第二次搜索会崩溃。

    .NET 分布式组件库 Exceptionless Foundatio.zip

    如果您觉得本文对您有帮助,想让更多人了解Exceptionless,感谢您帮忙点的【推荐】。如果您对 Exceptionless 感兴趣或者是想学习 Exceptionless 的代码,可以加入群Exceptionless QQ群:330316486。 ...

    精易官方免费模块v3.60版

    感谢 90后辉煌 提供参考代码,让我们期待已久的超时功能回归,也请大家帮忙测试 【网页_访问_EX 为测试版本,完善后,将会删除该命令,功能会添加到 网页_访问()】 2.增加 线程池类1“取状态” 感谢 我叫林舒书 ...

Global site tag (gtag.js) - Google Analytics