`

Java线程(第三版)

阅读更多

1. Thread生命周期

// 创建Thread
extends Thread或者implements Runnable接口

// 启动Thread
thread.start()
isAlive(): 可以判断该Thread是否终结

// Thread终结
run()方法执行到return语句,执行到代码最后一行,抛出一个异常

// Thread加入
join(): 用于一个开始执行独立task的thread,来观察该thread是否完成,但是要小心block

// 停止Thread - 设定标记
private volatile boolean done = false;
public void run() {
    while(!done) {}
}
public void setDone() {done=true;}
producer.setDone();

// 停止Thread - 中断
public void run() {
    while(!isInterrupted) {}
}
producer.interrupt();

// 取得当前thread的引用
Thread.currentThread();

 

2. 数据同步

// volatile
对变量进行原子操作,使其不会有中间状态出现
可以保证变量每次都从主存储器中读出,但递增和递减不能用于volatile

// 锁机制 - java.util.concurrent.locks.Lock
private Lock myLock = new ReentrantLock();
try {
    myLock.lock();
    //todo
} finally {myLock.unlock();} 
定义一个Lock可以多个方法使用,那么意味着这些方法不能异步执行,因为只有一个方法获得锁
每次只有一个方法能够执行

// synchronized也可以锁住快
synchronized(this) {...} 但同步块一般就不能跨方法使用

// Nested Lock
两个不同类的方法相互调用,那么可以使用ReentrantLock来解决此问题,在进入同步块时候,
该方法会判定lock是否获得,而不会产生死锁。ReentrantLock的常见方法
getHoldCount(): 返回当前thread对lock所要求的数量
isLocked(): 是否thread获取lock
isHeldByCurrentThread(): 该lock是否由当前thread所有
getQueueLength(): 多少thread在等待取得该lock的估计值

// tryLock() - 返回值可以判定是否获取lock
private Lock myLock = new ReentrantLock();
try {
    if (myLock.tryLock()) {
        // todo something
    } else {
        // todo other task    
    }
} finally {myLock.unlock();} 

// 解决死锁
public void run() {
    while (!getDone()) {
      //todo task
    }
}
public synchronized boolean getDone() {
    return done;
}
public synchronized void setDone(boolean b) {
    done = b;
}

// 公平锁
private Lock myLock = new ReentrantLock(true);
会以接近发出时间的顺序来执行

 

3. Thread Notifiaction

// wait()与notify()
public synchronized void run() {
    while (true) {
        try {
            if (done) {
                wait();
            }
        }
    }
}
public synchronized void setDone(boolean b) {
    done = b;
    if (!done) {
        notify();
    }
}

// 使用synchronized块来执行等待通知机制
wait()和notify()必须执行在同步块或同步锁中
private Object doneLock = new Object();
public void run() {
    synchronized(doneLock) {
        while (true) {
            try {
                if (done) {
                    doneLock.wait();
                } else {
                    repaint();
                    doneLock.wait(100);
                }
            }
        }
    }
}
public void setDone(boolean b) {
    synchronized(doneLock) {
    done = b;
    if (!done)
        doneLock.notify();
    }
}

// 条件变量
Condition应该和Lock绑定在一起
使用await()和signal()代替等待通知机制
private boolean done = true;
private Lock lock = new ReentrantLock();
private Condition cv = lock.newCondition();
public void run() {
    try {
        lock.lock();
        while (true) {
            try {
                if (done) {
                    cv.await();
                } else {
                    nextCharacter();
                    cv.await(getPauseTime(), TimeUnit.MILLISECONDS);
                }
            }
        }
    } finally {
        lock.unlock();
    }
}
public void setDone(boolean b) {
    try {
        lock.lock();
        done = b;
        if (!done)
            cv.signal();
        }
    } finally {
        lock.unlock();
    }
}

 

4. 极简同步技巧

 // 变量存储原理
CPU将数据从主存储器中读到寄存器中,对存储器操作,然后将数据存回主存储器
volatile变量仅能用于单一的载入和存储操作,所以5.0提供了atomic class来代替

// Atomic Class
AtomicInteger, AtomicLong, AtomicBoolean, AtomicReference
AtomicInteger score = new AtomicInteger(0);
AtomicReference<CharacterSource> generator = new AtomicReference<CharacterSource>;
generator.getAndSet(newGenerator); // 只有一个Thread取得值并设置该值
generator.get(); // 取得旧值,然后使用compareAndSet()改变旧值
如果需要对一批数据操作,可以把一批数据放入Class,然后操作Classic为Atomic

// ThreadLocal
对变量设置为ThreadLocal,可以保证其他Thread不能访问,因而不需要同步,也没有竞争
public abstract class Calculator {
    protected abstract Object doLocalCalculate(Object param);
    private static ThreadLocal<HashMap> results = new ThreadLocal<HashMap>() {
        protected HashMap initialValue() {
            return new HashMap();
        } 
    };
    public Object calculate(Object param) {
        HashMap hm = results.get();
        Object o = hm.get(param);
        if (o != null) 
            return o;
        o = doLocalCalculate(param);
        hm.put(param, o);
        return o;
    }
}

public class CalculatorTest extends Calculator implements Runnable {
    public static void main(String[] args) {
        int nThreads = 5;
        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(new CalculatorTest());
            t.start();
        }
    }
    public void run() {
        for (int i = 0; i < 30; i++) {
            Integer p = new Integer(i % 5);
            calculate(p);
        }
    }
    protected Object doLocalCalculate(Object p) {
        System.out.println("Doing calculation of " + p + " in thread " + Thread.currentThread());
        return p;
    }
}

 

5. 高级同步议题 

// 术语
Barrier(屏障): 多个Thread集合,所有Thread到齐才允许继续下去
Condition variable(条件变量): 与Lock相关的变量,用于等待通知机制
Critical section(临界区): synchronized method或block
Event variable(事件变量): 条件变量的另一名称
Monitor(监视器): 有些系统是一个Lock,有些系统类似等待通知机制
Mutex(互斥): Lock的另个名称
Semaphore(信号量): 同Lock的功能,能锁住对象

// Semaphore
等同于带计数器的Lock
acquire()与release()类似Lock中lock()与unlock()方法
与Lock的区别:构造器的时候 Semaphore(long permits),需要指定可以被允许的数目

// Barrier 
可以有两种方法实现Barrier同样的功能
1. 设置Thread等待条件变量,最后达到的Thread通知所有的Thread
2. 使用join()来等待Thread终结
CyclicBarrier(int parties) //构造Barrier
await() //等待

// Latch
和Barrieer具有相同功能
CountDownLatch(int count) //构造
await() //等待
countDown() //递减计数,当达到0,所有等待的Thread释放

// 读写Lock
Lock lock = new ReentrantReadWriteLock();
lock.writeLock();
lock.readLock();

// 防止死锁的最佳实践是按照顺序获得Lock

 

6. Collection Class

// java.util.concurrent容器
ConcurrentHashMap: 实现无序Map
ConcurrentLinkedQueue: 无限的FIFO Queue
ArrayBlockingQueue: 有限FIFO Queue
LinkedBlockingQueue: 可是是有限或无线FIFO Queue
SynchronousQueue: 有限的FIFO Queue
PriorityBlockingQueue: 优先级的Queue
DelayQueue: getDelay()取出到期的的元素,没有到期不会取出

// Collections提供同步容器
synchronizedList(list)
synchronizedMap(map)
synchronizedSet(set)

// Iterator与Enumeration
最好使用synchronize过的容器来迭代

// 生产者/消费者模式
/**生产者*/
public class FibonacciProducer implements Runnable {
    private Thread thr;
    private BlockingQueue<Integer> queue;
    public FibonacciProducer(BlockingQueue<Integer> q) {
        queue = q;
        thr = new Thread(this);
        thr.start();
    }
    public void run() {
        try {
            for(int x=0;;x++) {
                Thread.sleep(1000);
                queue.put(new Integer(x));
                System.out.println("Produced request " + x);
            }
        } catch (InterruptedException ex) {
        }
    }
}
/**消费者*/
public class FibonacciConsumer implements Runnable {
    private Fibonacci fib = new Fibonacci();
    private Thread thr;
    private BlockingQueue<Integer> queue;
    public FibonacciConsumer(BlockingQueue<Integer> q) {
        queue = q;
        thr = new Thread(this);
        thr.start();
    }
    public void run() {
        int request, result;
        try {
            while (true) {
                request = queue.take().intValue();
                result = fib.calculateWithCache(request);
                System.out.println("Calculated result of " + result + " from " + request);
            }
        } catch (InterruptedException ex) {
        }
    }
}
/**测试*/
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
new FibonacciProducer(queue);

int nThreads = Integer.parseInt(args[0]);
for (int i = 0; i < nThreads; i++)
    new FibonacciConsumer(queue);
    
// 建议
使用Collection class的时候,通过接口来运用
对有竞争的算法,考虑并发的Collection

  

7. Thread调度

// Thread优先级
Thread.MAX_PRIORITY;
Thread.MIN_PRIORITY;
Thread.NORM_PRIORITY;
Thread t = new Thread();
t.setPriority(0);
t.getPriority();
每个操作系统优先级都不同

  

8. ThreadPool

// ThreadPoolExecutor
execute(Runable task): 执行task  
shutdown(): 任何送到Executor的task允许执行,不接受新的task
shutdownNow(): 没有启动的task不会执行
<T> Future<T> submit(Callable<T> task): 有返回值
<T> Future<T> submit(Runnable task): 有返回值
<T> Future<T> invokeAll(Collection<Callable<T>> tasks): 执行所有task
purge(): 查看整个Queue并删除任何被取消的对象

// 定义ThreadPool
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue)
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize: 线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间,如果指定为0,则Thread不会等待就离开
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
一般用LinkedBlockingQueue作为workQueue,不要直接调用该Queue的任何方法,不然ThreadPool的内部运作很混淆
实例:
portalThreadPool = new ThreadPoolExecutor(20, 50, 0L, TimeUnit.MILLISECONDS, 
new LinkedBlockingQueue<Runnable>());

// Callable可以返回结果或者抛出checked异常

  

9. Task调度

// TimerTask
cancel(): 取消此计时器任务
run(): 此计时器任务要执行的操作 
scheduledExecutionTime(): 返回此任务最近实际执行的已安排执行时间

// Timer
cancel(): 终止此计时器,丢弃所有当前已安排的任务 
purge(): 从此计时器的任务队列中移除所有已取消的任务 
schedule(TimerTask task, Date time) 
安排在指定的时间执行指定的任务 
schedule(TimerTask task, Date firstTime, long period) 
安排指定的任务在指定的时间开始进行重复的固定延迟执行 
schedule(TimerTask task, long delay) 
安排在指定延迟后执行指定的任务 
schedule(TimerTask task, long delay, long period) 
安排指定的任务从指定的延迟后开始进行重复的固定延迟执行 
scheduleAtFixedRate(TimerTask task, Date firstTime, long period) 
安排指定的任务在指定的时间开始进行重复的固定速率执行
scheduleAtFixedRate(TimerTask task, long delay, long period) 
安排指定的任务在指定的延迟后开始进行重复的固定速率执行

// ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor(int corePoolSize) 
使用给定核心池大小创建一个新 ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) 
使用给定初始参数创建一个新 ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) 
使用给定的初始参数创建一个新 ScheduledThreadPoolExecutor 
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) 
使用给定初始参数创建一个新 ScheduledThreadPoolExecutor
execute(Runnable command): 使用所要求的零延迟执行命令
schedule(Callable<V> callable, long delay, TimeUnit unit) 
创建并执行在给定延迟后启用的 ScheduledFuture。 
schedule(Runnable command, long delay, TimeUnit unit) 
创建并执行在给定延迟后启用的一次性操作 
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) 
创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,
然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 
创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟

// Callable使用
class TimeoutTask implements Callable {
    public Integer call() throws IOException {
        return new Integer(0);
    }
}
ScheduledThreadPoolExecutor ste = new ScheduledThreadPoolExecutor(20);
Future<Integer> fTaskResult = ste.scheduleAtFixedRate(new TimeoutTask(), 0L, 5L, TimeUnit.SECONDS);

  

10. I/O

// NIO
使用一个Thread来处理所有的客户端Socket
Selector追踪集合点的Scoket和所有开放的客户端Socket
当其中任何Socket有数据,Selector会被通知,未处理数据
的Socket会通过selectedKeys()方法返回
public abstract class TCPNIOServer implements Runnable {
    protected ServerSocketChannel channel = null;
    private boolean done = false;
    protected Selector selector;
    protected int port = 8000;

    public void startServer() throws IOException {
        channel = ServerSocketChannel.open();
        channel.configureBlocking(false);
        ServerSocket server = channel.socket();
        server.bind(new InetSocketAddress(port));
        selector = Selector.open();
        channel.register(selector, SelectionKey.OP_ACCEPT);
    }

    public synchronized void stopServer() throws IOException {
        done = true;
        channel.close();
    }

    protected synchronized boolean getDone() {
        return done;
    }

    public void run() {
        try {
            startServer();
        } catch (IOException ioe) {
            System.out.println("Can't start server:  " + ioe);
            return;
        }
        while (!getDone()) {
            try {
                selector.select();
            } catch (IOException ioe) {
                System.err.println("Server error: " + ioe);
                return;
            }
            Iterator it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = (SelectionKey) it.next();
                if (key.isReadable() || key.isWritable()) {
                    // Key represents a socket client
                    try {
                        handleClient(key);
                    } catch (IOException ioe) {
                        // Client disconnected
                        key.cancel();
                    }
                } else if (key.isAcceptable()) {
                    try {
                        handleServer(key);
                    } catch (IOException ioe) {
                        // Accept error; treat as fatal
                        throw new IllegalStateException(ioe);
                    }
                } else System.out.println("unknown key state");
                it.remove();
            }
        }
    }

    protected void handleServer(SelectionKey key) throws IOException {
         SocketChannel sc = channel.accept();
         sc.configureBlocking(false);
         sc.register(selector, SelectionKey.OP_READ);
         registeredClient(sc);
     }

    protected abstract void handleClient(SelectionKey key) throws IOException;
    protected abstract void registeredClient(SocketChannel sc) throws IOException;
}

以上是采用单Thread来处理,多Thread使用Thread Pool,在请求送到服务器时,
handleClient()方法将请求放进thread pool的Queue中,依次处理
public class CalcServer extends TCPNIOServer {
    static ThreadPoolExecutor pool;

    class FibClass implements Runnable {
        long n;
        SocketChannel clientChannel;
        ByteBuffer buffer = ByteBuffer.allocateDirect(8);

        FibClass(long n, SocketChannel sc) {
            this.n = n;
            clientChannel = sc;
        }

        private long fib(long n) {
            if (n == 0)
                return 0L;
            if (n == 1)
                return 1L;
            return fib(n - 1) + fib(n - 2);
        }

        public void run() {
            try {
                long answer = fib(n);
                buffer.putLong(answer);
                buffer.flip();
                clientChannel.write(buffer);
                if (buffer.remaining() > 0) {
                    Selector s = Selector.open();
                    clientChannel.register(s, SelectionKey.OP_WRITE);
                    while (buffer.remaining() > 0) {
                        s.select();
                        clientChannel.write(buffer);
                    }
                    s.close();
                }
            } catch (IOException ioe) {
                System.out.println("Client error " + ioe);
            }
        }
    }

    protected void handleClient(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocateDirect(8);
        sc.read(buffer);
        buffer.flip();
        long n = buffer.getLong();
        FibClass fc = new FibClass(n, sc);
        pool.execute(fc);
    }

    protected void registeredClient(SocketChannel sc) {
    }

    public static void main(String[] args) throws Exception {
        CalcServer cs = new CalcServer();
        cs.port = Integer.parseInt(args[0]);
        int tpSize = Integer.parseInt(args[1]);
        pool = new ThreadPoolExecutor(
                        tpSize, tpSize, 50000L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>());
        cs.run();
        System.out.println("Calc server waiting for requests...");
    }
}

  

11. 其他Thread议题

// Thread Group
System thread group - Main thread group - Applet thread group
使用group可以对group中所有thread进行操作

// Thread与Security
checkAccess(Thread t): 如果不允许调用线程修改thread参数,则抛出SecurityException 
checkAccess(ThreadGroup g): 如果不允许调用线程修改线程组参数,则抛出SecurityException
public void interrupt() {
    SecurityManager sm = System.getSecurityManager();
    if (sm != null)
        sm.checkAccess(this);
} 

permission java.security.AllPermission;
permission java.security.RuntimePermission "thread";
permission java.lang.RuntimePermission "stopThread"; //stop方法特殊

// Daemon Thread
setDaemon(true);

// Thread与Class加载
getContextClassLoader()

// Thread与异常
Thread的run方法不能抛出未检查型异常,所以需要使用UncaugthExceptionHandler来处理
try {
    AppTimeoutThread timeThread = new AppTimeoutThread(10000, new MSCtrlTimeoutException(
        "subSessionIsLoggedOn()")); //超时
    timeThread.setUncaughtExceptionHandler(new ChallengeTimeoutHanlder(soapApiMsg,
        reqPortalMsg, soapAdapter));
    timeThread.start();
    soapApiMsg = soapAdapter.subSessionIsLoggedOn(reqPortalMsg.getUserIp());
    timeThread.cancel();
} catch (Exception e) {
    log.error("call subSessionIsLoggedOn() timeout", e);
}
class ChallengeTimeoutHanlder implements UncaughtExceptionHandler {
    private PortalMsg soapApiMsg;
    private PortalMsg reqPortalMsg;
    private SOAPAdapterI soapAdapter;
    public ChallengeTimeoutHanlder(PortalMsg soapApiMsg, PortalMsg reqPortalMsg,
        SOAPAdapterI soapAdapter) {
        this.soapApiMsg = soapApiMsg;
        this.reqPortalMsg = reqPortalMsg;
        this.soapAdapter = soapAdapter;
    }
    public void uncaughtException(Thread t, Throwable e) {
        soapApiMsg = soapAdapter.subSessionIsLoggedOn(reqPortalMsg.getUserIp());
    }
}

// Thread与内存
Java的stack默认为1024KB
Linux与Windows程序最大内存为2GB
Solaris为4GB
java -Xss128k MyClass  //指定Stack尺寸

  

12. Thread性能

// JVM参数
Solaris  -Xms3500m -Xmx3500m
Intel  -Xmn1800m -Xms1800m

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics