手动实现简单的线程池
写在前面:
本文使用了 BlockingQueue 作为线程池实现的数据结构,利用生产者/消费者思想进行多任务的处理。
实现方式比较简单,并没有完全实现所有方法,本文可作为线程池和同步队列的入门学习参考。
受限于博主的姿势水平,本文中的一些方法肯定存在优化的空间及更好的实现方式,欢迎探讨。
基于 spring-boot 编写,测试。
1. 自定义线程池接口
package com.getthrough.threadpool.mythreadpool; /** * <p>This interface is a top interface that defined several necessary methods, * it imitates {@link java.util.concurrent.ExecutorService}, * {@link java.util.concurrent.ThreadPoolExecutor} * for personal learning.</p> * @author: getthrough * @date: 2018/5/20 * @description: * @version: */ public interface ThreadPool { /** * to execute the given task in the future, * it can be executed by a thread or a thread pool. * @param runnable the given task */ void execute(Runnable runnable); /** * It will close the thread pool after all submitted tasked are executed, * and will not accept new tasks. */ void shutdown(); /** * test whether the thread pool has been shut down. * @return the boolean result. */ boolean isShutdown(); }
2. 线程池的默认实现
package com.getthrough.threadpool.mythreadpool.impl; import com.getthrough.threadpool.mythreadpool.ThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** * @author: getthrough * @date: 2018/5/20 * @description: * @version: */ public class DefaultThreadPool implements ThreadPool { public Logger logger = LoggerFactory.getLogger(DefaultThreadPool.class); /** * Workers queue, get the task from {@code tasks} and run the task. */ private BlockingQueue<Worker> workers = new LinkedBlockingQueue<>(DEFAULT_POOL_SIZE); /** * The queue to accept the tasks. */ private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue(MAX_POOL_SIZE); private int corePoolSize = 0; private int maxPoolSize = 0; /** * How long will the worker waits(keep alive) for the task if there is no task in tasks. */ private volatile long aliveTime = 0L; /** * The default pool size. */ private static final int DEFAULT_POOL_SIZE = 20; /** * The maximum pool size. */ private static final int MAX_POOL_SIZE = 30; private volatile boolean isShutdown = false; public DefaultThreadPool() throws InterruptedException { this.corePoolSize = DEFAULT_POOL_SIZE; this.maxPoolSize = MAX_POOL_SIZE; new DefaultThreadPool(DEFAULT_POOL_SIZE, MAX_POOL_SIZE); } public DefaultThreadPool(int corePoolSize, int maxPoolSize) { if (corePoolSize <= 0 || maxPoolSize <= 0 || aliveTime < 0) throw new IllegalArgumentException("ERROR:arguments must greater than zero!"); if (corePoolSize > maxPoolSize) throw new IllegalArgumentException("ERROR:corePoolSize can't be greater than maxPoolSize!"); this.corePoolSize = corePoolSize; this.maxPoolSize = maxPoolSize; for (int i = 0; i < corePoolSize; i ++) { Worker worker = new Worker(getTask(0L)); workers.add(worker); worker.start(); } } @Override public void execute(Runnable runnable) { if (isShutdown) { logger.info("pool is closed, you should call start method"); return; } if (workers.size() < corePoolSize) { Worker worker = new Worker(runnable); workers.add(worker); worker.start(); logger.info("task is immediately got by work : {}", worker.getName()); } else if (workers.size() == corePoolSize) { try { tasks.put(runnable); logger.info("task waiting in the task queue..."); } catch (InterruptedException e) { logger.info("application is busy, please try again later!"); } } } @Override public void shutdown() { // reject the new task isShutdown = true; for(;;) { if (tasks.size() == 0){ // clear the work queue workers.clear(); break; } } logger.info("shutting down the pool"); } @Override public boolean isShutdown() { return workers.size() == 0; } private Runnable getTask(long timeOut) { try { return tasks.poll(timeOut, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return null; } public void start() { isShutdown = false; } private class Worker extends Thread{ private Runnable task; Worker(Runnable task) { this.task = task; } @Override public void run() { while ((task != null || (task = getTask(60L)) != null)) { try { // if (!Thread.interrupted()) task.run(); logger.info("worker : {} has finished the task.", getName()); } finally { task = null; } } } } public int getCorePoolSize() { return corePoolSize; } public void setCorePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; } public int getMaxPoolSize() { return maxPoolSize; } public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; } }
3. 简单的 main 方法测试
package com.getthrough.threadpool; import com.getthrough.threadpool.mythreadpool.ThreadPool; import com.getthrough.threadpool.mythreadpool.impl.DefaultThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; /** * @author: getthrough * @date: 2018/5/21 * @description: * @version: */ public class TestClass { private static Logger logger = LoggerFactory.getLogger(DefaultThreadPool.class); public static void main(String[] args) throws InterruptedException { ThreadPool threadPool = new DefaultThreadPool(); for (int i = 0; i < 22; i++) { threadPool.execute(()-> { logger.info("TASK produced"); }); try { TimeUnit.MILLISECONDS.sleep(50L); } catch (InterruptedException e) { e.printStackTrace(); } } TimeUnit.SECONDS.sleep(1L); threadPool.shutdown(); logger.info("shutdown : {}", threadPool.isShutdown()); threadPool.execute(new Runnable() { @Override public void run() { logger.info("task submit after shutdown"); } }); TimeUnit.SECONDS.sleep(1L); ((DefaultThreadPool)threadPool).start(); logger.info("thread pool restarted "); threadPool.execute(new Runnable() { @Override public void run() { logger.info("task submit after restart"); } }); TimeUnit.SECONDS.sleep(1L); threadPool.shutdown(); } }
完整代码获取:https://github.com/Getthrough/my-threadpool/tree/master
相关推荐
Java开发,Android开发,自己实现线程池,明白线程池的实现机制
易语言简易线程池的实现 ——V雪落有声V原创 转载请保留 前文: 为了能充分理解本篇文章的内容,需要了解的知识如下: 1.事件对象的使用:http://baike.baidu.com/view/751499.htm 2.信号量的使用:...
简单易用的线程池,可以异步或同步执行任务,支持functional 和 lambad表达式。 工具库 文件操作。 std::cout风格的日志库,支持颜色高亮、代码定位、异步打印。 INI配置文件的读写。 监听者模式的消息广播器。 基于...
分钟,可手动设置)主线程会自动关闭一部分的线程,保留最小线程数,来释放资源。 3.执行任务的线程等待队列,如果队列中有任务,则执行任务,如果队列中没有任务,则进入内核等待状态,当队列中有任务时继续执行。...
优点:相对简单,易于理解和实现;可以直接重写 run() 函数来定义线程的逻辑;可以通过信号和槽机制与其他对象进行通信。 缺点:对象和线程是紧密耦合的,可能导致设计上的限制和复杂性;不能方便地重复使用线程对象...
* 为什么不要手动线程池设置最大值? * .Net线程池有什么不足? 同步 * CLR怎样实现lock(obj)锁定? * WaitHandle是什么,他和他的派生类怎么使用 * 什么是用双锁实现Singleton,为什么要这样做,为什么有人说双锁...
• 为什么不要手动线程池设置最大值? • .Net线程池有什么不足? 同步 • CLR怎样实现lock(obj)锁定? • WaitHandle是什么,他和他的派生类怎么使用 • 什么是用双锁实现Singleton,为什么要这样做,为什么有人说...
• 为什么不要手动线程池设置最大值? • .Net线程池有什么不足? 同步 • CLR怎样实现lock(obj)锁定? • WaitHandle是什么,他和他的派生类怎么使用 • 什么是用双锁实现Singleton,为什么要这样做,为什么有人说...
1. 基于Aspectj实现动态数据源切换,支持类级、方法级,方法级优先于类级 2. 实现数据源动态注册、修改、删除,无需重启服务 3. 实现同时兼容多种数据源连接池 4. 实现动态数据源连接池属性配置 5. 采用双端队列实现...
手动实现一个可重入锁.mp4 AbstractQueuedSynchronizer(AQS)详解.mp4 使用AQS重写自己的锁.mp4 重入锁原理与演示.mp4 读写锁认识与原理.mp4 细读ReentrantReadWriteLock源码.mp4 ReentrantReadWriteLock锁降级详解....
java除了使用关键字synchronized外,还可以使用ReentrantLock实现独占锁的功能。而且ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。这篇文章主要是从使用的角度来分析...
詹科夫以下是Jakob Jenkov的教程的练习示例,该教程位于Java并发这是Java并发部分... (使用手动实现的阻塞队列的示例)。 线程池。 (使用手工实现的线程池的示例)。 非阻塞算法。 9.1。 易挥发的。 (使用“一个作家
如何手动触发对象的垃圾回收? 什么是Java中的设计模式?列举一些常见的设计模式。 什么是Java中的单例模式?如何实现线程安全的单例模式? 什么是Java中的生命周期回调方法?列举一些常见的生命周期回调方法。 ...
积分管理系统java源码 基础知识 java基础 基本类型(占用的内存)和包装类型 数组和对象 程序控制语句,if、switch、while、for ...手动实现Mini版本的Mybatis 分布式 分布式原理 分布式架构的演进过
它允许手动发布和自动线程池将消息发布到Danmaku服务器上。 您可以调整发布间隔以及自定义模板danmaku池以进行顺序/随机生成。 没有我的最后一刻帮助,这个项目可能无法实现。 :D 该应用程序在单线程模式下的...
第21节手动实现一个可重入锁00:26:31分钟 | 第22节AbstractQueuedSynchronizer(AQS)详解00:49:04分钟 | 第23节使用AQS重写自己的锁00:31:04分钟 | 第24节重入锁原理与演示00:12:24分钟 | 第25节读写锁认识与原理...
第21节手动实现一个可重入锁00:26:31分钟 | 第22节AbstractQueuedSynchronizer(AQS)详解00:49:04分钟 | 第23节使用AQS重写自己的锁00:31:04分钟 | 第24节重入锁原理与演示00:12:24分钟 | 第25节读写锁认识与原理...
第21节手动实现一个可重入锁00:26:31分钟 | 第22节AbstractQueuedSynchronizer(AQS)详解00:49:04分钟 | 第23节使用AQS重写自己的锁00:31:04分钟 | 第24节重入锁原理与演示00:12:24分钟 | 第25节读写锁认识与原理...
第21节手动实现一个可重入锁00:26:31分钟 | 第22节AbstractQueuedSynchronizer(AQS)详解00:49:04分钟 | 第23节使用AQS重写自己的锁00:31:04分钟 | 第24节重入锁原理与演示00:12:24分钟 | 第25节读写锁认识与原理...