`
zhouxy1123
  • 浏览: 5839 次
文章分类
社区版块
存档分类
最新评论

一个例子教会你java 基本线程api

 
阅读更多

 


 这里提供一个java 代码例子并结合jps 和jstack来对java 基本线程api一探究竟。这里提供的代码(附在文后),与其说是例子,不如说是一个工具更合适些。你可以在eclipse下运行,也可以打成jar 包单独运行。它提供了一些命令,可以帮助我们理解java 的线程api,如果你在eclipse下运行,可以在控制台里使用这些命令,并通过jstack来观察线程发生了什么。

 

这些命令包括wait,建立一个wait 线程,使用object wait;notifyone ,建立一个notify one 线程,调用object notify 方法;notifyall 调用object notify all 方法;sleep,建立唯一sleep线程,调用sleep方法;join4sleeper,建立一个join线程,等待sleep 线程结束,调用join;wakeup,唤醒sleep 线程;lockall,获得锁,并持有锁,使用synchronized 块;unlock ,解锁;notifynaughty,notify all 但是不释放锁;stopnaughty,释放naughty 线程;waitwhile,有时间等待,调用 wait(time)。这里就是提供的基本命令。这里我们通过jstack来看一下java线程会是一个什么状态。

首先我们敲入wait,在通过jstack,会发现jstack显示线程状态为in object.wait()而Thread.state 为waiting(on object monitor)。

 

 

敲入waitwhile,使用jstack,会发现多一个线程,state为timed_waiting(on object monitor)。



 

再敲入notifyone,通过jstack 会发现少一个线程,控制台也会打出对应结束的等待线程。



 

再使用notifyone,会是另一停止等待。

可见每次notify 只会使一个等待线程结束等待。

我们再看一下notifyall,我们先执行多次wait,之后执行notifyall,发现控制台里打印出刚才的等待线程结束。可见object.notifyAll可以使“所有”的等待线程继续执行。这里的所有是指什么呢。为了理解这个,我们现要理解一个object monitor,在java里每个object实例都有一个monitor对象,用来管理线程同步,这个可以monitor看成是一个集合类实例,包含三部分,一个是获得这个monitor的 线程,另一个是包含所有尝试获得monitor,却被其他线程捷足先登的线程集合,和包含获得monitor 之后调用wait 方法的线程集。这里的所有就是指所有在调用wait 方法线程集内的线程。

是不是调用了notify,或notifyAll,in objet.wait()的线程就一定能继续执行呢?回答是不一定。

这里我们敲几次wait,之后执行,notifynaughty,发现wait 的线程并没有继续执行而结束,而是线程state 进入了Blocked 状态。这是怎么回事呢?这是因为,线程获得锁后执行wait,不是被notify了想继续执行就继续执行的,还要要从新获得monitor(monitor:“呔,舍弃了哥不是你说让哥回来就回来的”)。



 notifynaughty,调用了notifyAll 但是并没有释放锁。(这家伙是挺讨厌的,让别人看着干着急)。这里Thread.state Blocked 状态是说线程未获得monitor 而被阻塞。

还有一个问题是,如果你发现,你每次使用notifyone,最先wait 的线程先执行,是不是notify 会使最先wait的线程先执行呢,notify 实现了公平等待?答案是这只是一个错觉,看一下notify的api doc 会发现不是这样的,不要误会。

我们再看一下lockall,执行lockall,再执行wait,jstack 发现多出来的线程并不是在in object.wait()和waiting 状态而是waiting for monitor entry 和 Blocked的,这进一步说明了,先要调用wait 方法,先要获得固有锁,如果没有获得,那么就要进入Blocked状态。直到锁被放弃。

之后我们再看一下sleep,会发现又多了一个线程,线程状态是wait on condition,state 是 Timed_Waiting(sleeping) 。



 

之后使用join4sleeper,会产生等待线程,这个线程的状态和调用wait 方法的线程状态一样。

之后是使用wakeup,会发现前面的两个线程已经执行完成后退出了。可以看出join 线程是在sleep 线程之后的。join 可以用来控制线程间的执行顺序。此外在sleep 方法是使用锁很tricky的事,因为sleep 不会释放锁,如果sleep 睡太沉,中断线程要先获得锁,那么王子是救不了睡美人的。

到现在,你果你仔细,会发现每次调用jstack主线程都是在同一条语句,这是因为主线程在等待用户输入调用了inputsteam 的read 方法而卡主了线程。可以发现线程是卡住的,但是状态是runnable 的可见对于oio(old io 对应nio)是和线程状态没啥关系的,且不可中断。(对应nio 不是这样的,而且可以中断对应线程,可以看intercept 的java api doc)

此外在java 线程API 中是不是所有的状态有waiting 的线程都可以中断?如果在java 1.6下,是的。但是由于新引入了显示锁,结果就不是了。5 显示锁的状态不现实貌似是。而显示锁 reentrylock 相对于synchronized的状态则是waiting(parking) 而jstack的线程状态和和sleep 一样是wait on condition。

此外固有锁无法获得锁,那么其他线程只能等,和尾生似的,你不来,哥就敢等个地老天荒。reentrylock 除了提供这种功能外,还有其它三种,trylock:拿不到,哥不伺候了,走你。trylock(time):不要这么急么,先等会再说,3秒钟不能再多了。lockInterruptibly:哥才不像你们呢,哥要一直等下去,你说什么,不来了,哥走了。(被中断,退出锁定)。

还有显示锁的性能要比固有锁好。java 6 接近一倍(java concurrency in practice)。要是看到这里,你可能要跳脚了,前面讲的那么大堆关于固有锁的没有啊?并不是的,因为显示锁和固有锁有相同的语义,你明白synchronized,wait,notify 你就能明白 reentrylock 和condition的用法。

 

后面示例程序,have fun

 

 

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {

	public static void main(String[] args) {
		ThreadTest test = new ThreadTest();
		Thread sleeper = null;
		Thread naughty = null;
		BufferedReader reader = new BufferedReader(new InputStreamReader(
				System.in));
		while (true) {
			try {
				String command = reader.readLine();
				Command comm = null;
				try {
					comm = Enum.valueOf(Command.class, command.trim()
							.toLowerCase());
				} catch (Exception e) {
					System.out.println("there is no this command");
					continue;
				}
				switch (comm) {
				case exit:
					test.close();
					test.beginNotifyAllThread();
					if (sleeper != null) {
						sleeper.interrupt();
					}
					if (naughty != null) {
						naughty.interrupt();
					}
					System.out.println(comm.name());
					return;
				case wait:
					test.beginWaitThread();
					break;
				case notifyone:
					test.beginNotifyOneThread();
					break;
				case notifyall:
					test.beginNotifyAllThread();
					break;
				case notifynaughty:
					if (naughty != null) {
						naughty.interrupt();
					}
					naughty = test.beginNotifAllNaughty();
					break;
				case stopnaughty:
					if (naughty != null) {
						naughty.interrupt();
						naughty = null;
					} else {
						System.out.println("there is no naughty boy");
					}
				case sleep:
					if (sleeper == null) {
						sleeper = test.beginSleepThread();
					} else {
						System.out.println("there is already a sleeper");
					}
					break;
				case wakeup:
					if (sleeper != null) {
						sleeper.interrupt();
						sleeper = null;

					} else {
						System.out.println("there is no sleeper");
					}
					break;
				case lockall:
					test.beginLockAll();
					break;
				case unlock:
					test.beginUnLock();
					break;
				case waitwhile:
					test.beginWaiWhile();
					break;
				case join4sleeper:
					if (sleeper != null) {
						test.beginJoinForSleeper(sleeper);
					} else {
						System.out.println("there is no sleeper");
					}
					break;
				default:
					break;
				}

			} catch (IOException e) {
				System.out.println(e.getMessage());
			}
		}
	}

	public enum Command {
		exit, wait, notifyone, notifyall, sleep, wakeup, notifynaughty, lockall, unlock, waitwhile, join4sleeper, stopnaughty;

	}

	public static class ThreadTest {

		Object lock = new Object();
		volatile boolean wait = true;
		volatile boolean finish = false;
		volatile boolean locked = false;
		public static final int time = 5000;
		final Runnable notifyOneRun;
		final Runnable sleep;
		final Runnable waitRun;
		final Runnable notifyAllRun;
		final Runnable notifyAllNaughty;
		final Runnable lockAll;
		final Runnable unlock;
		final Runnable waitWhile;
		final AtomicInteger threadCount = new AtomicInteger();

		public ThreadTest() {
			notifyAllNaughty = new Runnable() {
				@Override
				public void run() {
					synchronized (lock) {
						System.out.println("notify all");
						wait = false;
						lock.notifyAll();
						try {
							Thread.sleep(Integer.MAX_VALUE);
						} catch (InterruptedException e) {

						}
					}

				}
			};

			waitWhile = new Runnable() {

				@Override
				public void run() {
					try {
						synchronized (lock) {
							wait = true;
							while (wait) {
								lock.wait(Integer.MAX_VALUE);
							}
						}
						System.out.println(new StringBuilder(Thread
								.currentThread().getName()).append("finish"));
					} catch (InterruptedException e) {
						System.out.println(new StringBuilder(Thread
								.currentThread().getName())
								.append(" wait finish"));
					}
				}

			};
			lockAll = new Runnable() {

				@Override
				public void run() {
					synchronized (lock) {
						System.out.println("lock all");
						locked = true;
						while (locked) {

						}
						System.out.println("unlocked now!");
						;
					}

				}
			};

			unlock = new Runnable() {

				@Override
				public void run() {
					System.out.println("unlock");
					locked = false;

				}
			};
			sleep = new Runnable() {

				@Override
				public void run() {
					try {
						System.out.println("begin sleep");
						while (!Thread.currentThread().isInterrupted()) {
							Thread.sleep(time);
						}
						System.out.println("should not happen");
					} catch (InterruptedException e) {
						System.out.println(new StringBuilder(Thread
								.currentThread().getName()).append(" wake up"));
					}

				}
			};
			waitRun = new Runnable() {
				@Override
				public void run() {
					synchronized (lock) {
						System.out.println("begin wait");
						try {
							wait = true;
							while (wait) {
								lock.wait();
							}
							System.out.println(new StringBuilder(Thread
									.currentThread().getName())
									.append("finish"));
						} catch (InterruptedException e) {
							System.out.println(new StringBuilder(Thread
									.currentThread().getName())
									.append(" wait finish"));
						}
					}

				}
			};

			notifyOneRun = new Runnable() {

				@Override
				public void run() {
					synchronized (lock) {
						System.out.println("notify one");
						wait = false;
						lock.notify();
					}

				}
			};

			notifyAllRun = new Runnable() {
				@Override
				public void run() {
					synchronized (lock) {
						System.out.println("notify all");
						wait = false;
						lock.notifyAll();
					}

				}
			};
		}

		public Thread beginWaitThread() {
			Thread t = new Thread(waitRun, "wait" + threadCount.addAndGet(1));
			t.start();
			return t;
		}

		public Thread beginNotifyOneThread() {
			Thread t = new Thread(notifyOneRun, "notifyOne"
					+ threadCount.addAndGet(1));
			t.start();
			return t;
		}

		public Thread beginNotifyAllThread() {
			Thread t = new Thread(notifyAllRun, "notifyAll"
					+ threadCount.addAndGet(1));
			t.start();
			return t;
		}

		public Thread beginSleepThread() {
			Thread t = new Thread(sleep, "sleep" + threadCount.addAndGet(1));
			t.start();
			return t;
		}

		public Thread beginLockAll() {
			Thread t = new Thread(lockAll, "lockAll" + threadCount.addAndGet(1));
			t.start();
			return t;
		}

		public Thread beginUnLock() {
			Thread t = new Thread(unlock, "unlock" + threadCount.addAndGet(1));
			t.start();
			return t;
		}

		public Thread beginWaiWhile() {
			Thread t = new Thread(waitWhile, "waitWhile"
					+ threadCount.addAndGet(1));
			t.start();
			return t;
		}

		public Thread beginNotifAllNaughty() {
			Thread t = new Thread(notifyAllNaughty, "notifyAllNaughty"
					+ threadCount.addAndGet(1));
			t.start();
			return t;
		}

		public Thread beginJoinForSleeper(final Thread sleeper) {
			Thread t = new Thread(new Runnable() {

				@Override
				public void run() {
					System.out.println("begin join");
					try {
						sleeper.join();
					} catch (InterruptedException e) {
					}

				}
			}, "beginJoinForSleeper waiting" + sleeper.getName()
					+ threadCount.addAndGet(1));
			t.start();
			return t;
		}

		public void close() {
			this.locked = false;
		}
	}

 

 

  • 大小: 104 KB
  • 大小: 98 KB
  • 大小: 141 KB
  • 大小: 44.1 KB
  • 大小: 69.9 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics