`

【线程】线程并发的一些高级API

    博客分类:
  • Java
 
阅读更多

Lock&Condition

 

Executor

任务执行器,通过线程池来完成任务的执行。

使用线程池的好处:

仅需维护一定数量的线程去执行任务,降低频繁创建或销毁线程而带来的性能损耗。

 

interface ExecutorService extends Executor

interface ScheduledExecutorService extends ExecutorService

 

Executor  属于顶层接口,仅提供了一个执行线程任务的方法

void execute(Runnable command);

 

ExecutorService 接口则定义了更多适用的方法

 <T> Future<T> submit(Callable<T> task);

 <T> Future<T> submit(Runnable task, T result);

 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);//批量任务

 

 

创建线程池,使用Executors

//1.单一线程

ExecutorService executor = Executors.newSingleThreadExecutor();

//2.可无限创建新线程,一般用于请求多但处理时间短的情况

ExecutorService executor = Executors.newCachedThreadPool();

//3.固定线程数量的线程池

ExecutorService executor = Executors.newFixedThreadPool(nThreads);

 

//4. 按指定频率执行任务的线程池

ScheduledExecutorService service = Executors.newScheduledThreadPool(corePoolSize);

 

扩展:

手动配置线程池的参数

ExecutorService executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

 

Callable & Future 的用法

package org.thread;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

public class App {
	
	ExecutorService executor = Executors.newSingleThreadExecutor();
	ArchiveSearcher searcher = new ArchiveSearcher();

	void showSearch(final String target) throws InterruptedException {
		//提交任务1到线程池中执行
		Future<String> future = 
			executor.submit(new Callable<String>() {
				@Override
				public String call() throws Exception {
					return searcher.search(target);
				}
		});
		
		//任务2由当前线程负责执行
		displayOtherThings();
		
		//当任务2执行完成后,当前线程尝试获取任务1的执行结果
		try {
			disPlayText(future.get());
			executor.shutdown();
		} catch (ExecutionException e) {
			e.printStackTrace();
			cleanup();
			return;
		}
	}

	private void cleanup() {
		executor.shutdownNow();
	}

	private void disPlayText(String text) {
		System.out.println("Result from future is :" + text);
	}

	private void displayOtherThings() {
		for(int i=0;i<10;i++) {
			System.out.println(Thread.currentThread().getName() + "..." + i);
		}
	}

	public static void main(String[] args) throws InterruptedException {
		new App().showSearch("UFO");
	}
}

class ArchiveSearcher {
	public String search(String target) {
		int mills = ThreadLocalRandom.current().nextInt(10000);
		System.out.format(Thread.currentThread().getName() + " sleep %s ms%n", mills);
		try {
			Thread.sleep(mills);
		} catch (InterruptedException e) {}
		return "Hello Callable & Future! The target is: " + target;
	}
}

 

任务调度

package schedulePool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadScheduledTest {
	/**
	 * 线程调度中,同一时间段内只能有1个任务被调度.
	 * 如果前面的任务抛出了异常,后面的调度将不会再执行!
	 */
	public static void main(String[] args) throws Exception {
		ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
		System.out.println("start at: "+System.currentTimeMillis());
		
		/**
		 * AtFixedRate调度的基本规则:
		 * 		initialDelay之后开启第1个任务的调度
		 * 		第二个调度:initialDelay+period, 
		 * 		then initialDelay + 2 * period,
		 * 		...
		 * 需要注意的是:
		 * 	前一个任务执行所需时间大于period值时,Executor不会按period值调度下一个任务,
		 * 	而是等到前一个任务完成之后立即开始调度下一个任务
		 *  但,一般情况下线程调度的执行周期都是比较长的值,如1天执行1次
		 */
		executor.scheduleAtFixedRate(new Runnable() {
			@Override
			public void run() {
				long current = System.currentTimeMillis();
				System.out.println(Thread.currentThread().getName()+": " + current);
				try {
					Thread.sleep(4000);
				} catch (InterruptedException e) {}
				System.out.println(Thread.currentThread().getName()+": " + "Done "+System.currentTimeMillis());
			}
		}, 2, 3, TimeUnit.SECONDS);
		
		/**
		 * 以上一个任务完成的时间为计算基准,在此基础上延迟若干秒后才开始新的调度
		 * 前一个任务完成后 + delay ===> 开始调度下一个任务
		 */
		executor.scheduleWithFixedDelay(new Runnable() {
			@Override
			public void run() {
				long current = System.currentTimeMillis();
				System.out.println(Thread.currentThread().getName()+": " + current);
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {}
				System.out.println(Thread.currentThread().getName()+": " + "Done "+System.currentTimeMillis());
			}
		}, 2, 3, TimeUnit.SECONDS);
	}
}

 

 

 

并发集合

java.util.concurrent提供了一些集合框架的辅助类

BlockingQueue 任务列队

ArrayBlockingQueue LinkedBlockingQueue

ConcurrentMap

HashMap的并发模式:ConcurrentHashMap

TreeMap的并发模式:ConcurrentSkipListMap

 

对这些集合中的元素进行操作,会发生happens-before关系,能够有效避免内存一致性错误

 

原子操作

 

class Sequencer {
	
	private final AtomicLong sequenceNumber = new AtomicLong(0);

	public long next() {
		return sequenceNumber.getAndIncrement();
	}
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics