`

Java实现通用线程池

    博客分类:
  • J2SE
阅读更多

URL: http://blog.csdn.net/polarman/archive/2006/08/09/1042149.aspx

      线程池通俗的描述就是预先创建若干空闲线程,等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务,这样就省去了频繁创建线程的时间,因为频 繁创建线程是要耗费大量的CPU资源的。如果一个应用程序需要频繁地处理大量并发事务,不断的创建销毁线程往往会大大地降低系统的效率,这时候线程池就派 上用场了。
      本文旨在使用Java语言编写一个通用的线程池。当需要使用线程池处理事务时,只需按照指定规范封装好事务处理对象,然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可。并实现线程池的动态修改(修改当前线程数,最大线程数等)。下面是实现代码:

//ThreadTask .java

package polarman.threadpool;

/** *//**
 * 线程任务
 * @author ryang
 * 2006-8-8
 */
public interface ThreadTask ...{
    public void run();
}

//PooledThread.java

package polarman.threadpool;

import java.util.Collection;
import java.util.Vector;

/** *//**
 * 接受线程池管理的线程
 * @author ryang
 * 2006-8-8
 */
public class PooledThread extends Thread ...{
  
    protected Vector tasks = new Vector();
    protected boolean running = false;
    protected boolean stopped = false;
    protected boolean paused = false;
    protected boolean killed = false;
    private ThreadPool pool;
  
    public PooledThread(ThreadPool pool)...{
        this.pool = pool;
    }
  
    public void putTask(ThreadTask task)...{
        tasks.add(task);
    }
  
    public void putTasks(ThreadTask[] tasks)...{
        for(int i=0; i<tasks.length; i++)
            this.tasks.add(tasks[i]);
    }
  
    public void putTasks(Collection tasks)...{
        this.tasks.addAll(tasks);
    }
  
    protected ThreadTask popTask()...{
        if(tasks.size() > 0)
            return (ThreadTask)tasks.remove(0);
        else
            return null;
    }
  
    public boolean isRunning()...{
        return running;
    }
  
    public void stopTasks()...{
        stopped = true;
    }
  
    public void stopTasksSync()...{
        stopTasks();
        while(isRunning())...{
            try ...{
                sleep(5);
            } catch (InterruptedException e) ...{
            }
        }
    }
  
    public void pauseTasks()...{
        paused = true;
    }
  
    public void pauseTasksSync()...{
        pauseTasks();
        while(isRunning())...{
            try ...{
                sleep(5);
            } catch (InterruptedException e) ...{
            }
        }
    }
  
    public void kill()...{
        if(!running)
            interrupt();
        else
            killed = true;
    }
  
    public void killSync()...{
        kill();
        while(isAlive())...{
            try ...{
                sleep(5);
            } catch (InterruptedException e) ...{
            }
        }
    }
  
    public synchronized void startTasks()...{
        running = true;
        this.notify();
    }
  
    public synchronized void run()...{
        try...{
            while(true)...{
                if(!running || tasks.size() == 0)...{
                    pool.notifyForIdleThread();
                    //System.out.println(Thread.currentThread().getId() + ": 空闲");
                    this.wait();
                }else...{
                    ThreadTask task;
                    while((task = popTask()) != null)...{
                        task.run();
                        if(stopped)...{
                            stopped = false;
                            if(tasks.size() > 0)...{
                                tasks.clear();
                                System.out.println(Thread.currentThread().getId() + ": Tasks are stopped");
                                break;
                            }
                        }
                        if(paused)...{
                            paused = false;
                            if(tasks.size() > 0)...{
                                System.out.println(Thread.currentThread().getId() + ": Tasks are paused");
                                break;
                            }
                        }
                    }
                    running = false;
                }

                if(killed)...{
                    killed = false;
                    break;
                }
            }
        }catch(InterruptedException e)...{
            return;
        }
      
        //System.out.println(Thread.currentThread().getId() + ": Killed");
    }
}

//ThreadPool.java

package polarman.threadpool;

import java.util.Collection;
import java.util.Iterator;
import java.util.Vector;

/** *//**
 * 线程池
 * @author ryang
 * 2006-8-8
 */
public class ThreadPool ...{
  
    protected int maxPoolSize;
    protected int initPoolSize;
    protected Vector threads = new Vector();
    protected boolean initialized = false;
    protected boolean hasIdleThread = false;
  
    public ThreadPool(int maxPoolSize, int initPoolSize)...{
        this.maxPoolSize = maxPoolSize;
        this.initPoolSize = initPoolSize;
    }
  
    public void init()...{
        initialized = true;
        for(int i=0; i<initPoolSize; i++)...{
            PooledThread thread = new PooledThread(this);
            thread.start();
            threads.add(thread);
        }
      
        //System.out.println("线程池初始化结束,线程数=" + threads.size() + " 最大线程数=" + maxPoolSize);
    }
  
    public void setMaxPoolSize(int maxPoolSize)...{
        //System.out.println("重设最大线程数,最大线程数=" + maxPoolSize);
        this.maxPoolSize = maxPoolSize;
        if(maxPoolSize < getPoolSize())
            setPoolSize(maxPoolSize);
    }
  
    /** *//**
     * 重设当前线程数
     * 若需杀掉某线程,线程不会立刻杀掉,而会等到线程中的事务处理完成
     * 但此方法会立刻从线程池中移除该线程,不会等待事务处理结束
     * @param size
     */
    public void setPoolSize(int size)...{
        if(!initialized)...{
            initPoolSize = size;
            return;
        }else if(size > getPoolSize())...{
            for(int i=getPoolSize(); i<size && i<maxPoolSize; i++)...{
                PooledThread thread = new PooledThread(this);
                thread.start();
                threads.add(thread);
            }
        }else if(size < getPoolSize())...{
            while(getPoolSize() > size)...{
                PooledThread th = (PooledThread)threads.remove(0);
                th.kill();
            }
        }
      
        //System.out.println("重设线程数,线程数=" + threads.size());
    }
  
    public int getPoolSize()...{
        return threads.size();
    }
  
    protected void notifyForIdleThread()...{
        hasIdleThread = true;
    }
  
    protected boolean waitForIdleThread()...{
        hasIdleThread = false;
        while(!hasIdleThread && getPoolSize() >= maxPoolSize)...{
            try ...{
                Thread.sleep(5);
            } catch (InterruptedException e) ...{
                return false;
            }
        }
      
        return true;
    }
  
    public synchronized PooledThread getIdleThread()...{
        while(true)...{
            for(Iterator itr=threads.iterator(); itr.hasNext();)...{
                PooledThread th = (PooledThread)itr.next();
                if(!th.isRunning())
                    return th;
            }
          
            if(getPoolSize() < maxPoolSize)...{
                PooledThread thread = new PooledThread(this);
                thread.start();
                threads.add(thread);
                return thread;
            }
          
            //System.out.println("线程池已满,等待...");
            if(waitForIdleThread() == false)
                return null;
        }
    }
  
    public void processTask(ThreadTask task)...{
        PooledThread th = getIdleThread();
        if(th != null)...{
            th.putTask(task);
            th.startTasks();
        }
    }
  
    public void processTasksInSingleThread(ThreadTask[] tasks)...{
        PooledThread th = getIdleThread();
        if(th != null)...{
            th.putTasks(tasks);
            th.startTasks();
        }
    }
  
    public void processTasksInSingleThread(Collection tasks)...{
        PooledThread th = getIdleThread();
        if(th != null)...{
            th.putTasks(tasks);
            th.startTasks();
        }
    }
}


下面是线程池的测试程序
//ThreadPoolTest.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import polarman.threadpool.ThreadPool;
import polarman.threadpool.ThreadTask;

public class ThreadPoolTest ...{

    public static void main(String[] args) ...{
        System.out.println(""quit" 退出");
        System.out.println(""task A 10" 启动任务A,时长为10秒");
        System.out.println(""size 2" 设置当前线程池大小为2");
        System.out.println(""max 3" 设置线程池最大线程数为3");
        System.out.println();
       
        final ThreadPool pool = new ThreadPool(3, 2);
        pool.init();
       
        Thread cmdThread = new Thread()...{
            public void run()...{
               
                BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
               
                while(true)...{
                    try ...{
                        String line = reader.readLine();
                        String words[] = line.split(" ");
                        if(words[0].equalsIgnoreCase("quit"))...{
                            System.exit(0);
                        }else if(words[0].equalsIgnoreCase("size") && words.length >= 2)...{
                            try...{
                                int size = Integer.parseInt(words[1]);
                                pool.setPoolSize(size);
                            }catch(Exception e)...{
                            }
                        }else if(words[0].equalsIgnoreCase("max") && words.length >= 2)...{
                            try...{
                                int max = Integer.parseInt(words[1]);
                                pool.setMaxPoolSize(max);
                            }catch(Exception e)...{
                            }
                        }else if(words[0].equalsIgnoreCase("task") && words.length >= 3)...{
                            try...{
                                int timelen = Integer.parseInt(words[2]);
                                SimpleTask task = new SimpleTask(words[1], timelen * 1000);
                                pool.processTask(task);
                            }catch(Exception e)...{
                            }
                        }
                       
                    } catch (IOException e) ...{
                        e.printStackTrace();
                    }
                }
            }
        };
       
        cmdThread.start();
        /**//*
        for(int i=0; i<10; i++){
            SimpleTask task = new SimpleTask("Task" + i, (i+10)*1000);
            pool.processTask(task);
        }*/
    }

}

class SimpleTask implements ThreadTask...{
   
    private String taskName;
    private int timeLen;
   
    public SimpleTask(String taskName, int timeLen)...{
        this.taskName = taskName;
        this.timeLen = timeLen;
    }
   
    public void run() ...{
        System.out.println(Thread.currentThread().getId() +
                ": START TASK "" + taskName + """);
        try ...{
            Thread.sleep(timeLen);
        } catch (InterruptedException e) ...{
        }
       
        System.out.println(Thread.currentThread().getId() +
                ": END TASK "" + taskName + """);
    }
   
}

使用此线程池相当简单,下面两行代码初始化线程池:

ThreadPool pool = new ThreadPool(3, 2);
pool.init();

要处理的任务实现ThreadTask...接口即可(如测试代码里的SimpleTask),这个接口只有一个方法run()
两行代码即可调用:

ThreadTask task = ... //实例化你的任务对象
pool.processTask(task);

本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/coofucoo/archive/2007/09/29/1806529.aspx

分享到:
评论

相关推荐

    java实现通用线程池的代码

    本文旨在使用Java语言编写一个通用的线程池。当需要使用线程池处理事务时,只需按照指定规范封装好事务处理对象,然后用已有的线程池对象去自动选择空 闲线程自动调用事务处理对象即可。并实现线程池的动态修改...

    java实现通用的线程池

    java实现通用的线程池,这是我网上找的资料,O(∩_∩)O~希望大家能用的到。

    一个通用的Java线程池类

    环境:Windows XP ...这里本人翻写一个通用的线程池类,它可以用来作为工具类处理许多多线程问题。代码注释非常详尽,一行注释一行代码。 阅读对象:非常熟悉Java的基本概念,并且熟悉命令行编写代码的人员。

    一个可以直接使用的通用线程池Util

    使用步骤: 1.下载解压之后,在控制台运行javac ThreadPoolTest.java 2.然后根据提示运行java命令...这里本人翻写一个通用的线程池类,它可以用来作为工具类处理许多多线程问题。代码注释非常详尽,一行注释一行代码。

    java中通用的线程池实例代码

    java中通用的线程池实例代码,需要的朋友可以参考一下

    通用多线程模块(jdk线程池的运用)

    介绍一个通用多线程服务模块。是利用jdk线程池,多线程并行处理多任务,以提高执行效率。

    Java多线程读取大文本文件并批量插入MongoDB的实战代码

    Java多线程读取大文本文件并批量插入MongoDB的代码,文本文件,csv文件,可以结合POI改造使其支持excel。 适合做大量文本数据或日志文件...包含Main方法调用案例,基于接口的通用设计,业务模块可自定义实现具体逻辑。

    线程 JAVA java线程 java线程第3版 java线程第2版第3版合集

    本版本还增加了广泛的例子,展示如何实现线程池和其他同步技术,如条件变量、屏障和守护锁。它展示了如何与非线程安全的类共同工作,并特别关注于Swing的线程问题。新增加的一章介绍了如何为多处理器机器编写并行...

    通用Android工具库Common4Android.zip

    Bitmap特效实现类,封装bitmap特效实现方法,如:老照片、RGB偏移等。 BitmapUtil.java Bitmap常用工具类,Bitmap数据类型转换、圆角、缩放、倒影。 ...

    JAVA面试题最全集

    编写代码实现一个线程池 40.描述一下JVM加载class文件的原理机制? 41.试举例说明一个典型的垃圾回收算法? 42.请用java写二叉树算法,实现添加数据形成二叉树功能,并以先序的方式打印出来. 43.请写一个java...

    从使用到原理学习Java线程池

    在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。如何利用已有对象来...

    基于Java子线程中的异常处理方法(通用)

    下面小编就为大家带来一篇基于Java子线程中的异常处理方法(通用)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧

    一种基于Java企业内部及时通讯软件设计.doc

    (3) 实现基于线程池的多端口监听。 (4) 实现客户端间的文字、文件信息共享。 2.2功能需求分析 (1)用户管理。即时通讯系统拥有多个账户,允许多个用户注册。一个用户可以注 册多个标识,注册所使用的帐号类型...

    threadly:一个工具库,用于协助安全的并发Java开发。 提供独特的基于优先级的线程池,以及安全分配线程工作的方式

    它被设计为对java.util.concurrent的补充,并使用java.util.concurrent帮助在有意义的情况下辅助其实现。 从Maven Central将线程库包含到您的项目中: &lt;groupId&gt;org.threadly &lt;artifactId&gt;threadly ...

    IOIF基于开源技术的JAVA开发框架

    实现的主要功能有:客户端Frame、客户端VAD、异常处理、错误处理、AOP日志管理、计划任务、操作日志管理、数据分页功能、线程池管理、内存缓存管理。主要工具有:系统日志工具、应用属性配置工具、系统属性配置工具...

    Java后端知识+Java并发系统设计

    带着你了解了高并发系统设计的三种通用方法:Scale-out、缓存和异步。这三种方法可以在做方案设计时灵活地运用,但它不是具体实施的方案,而是三种思想,在实际运用中会千变万化。 了解了性能的原则、度量指标,以及...

Global site tag (gtag.js) - Google Analytics