DavyJones2010

Java SE: Effective Java Notes "Concurrency"


Use the Advanced Concurrent Tools instead of the legacy wait() and notify().

Java Advanced Concurrent Tools:

    1> Executor Framework: ExecutorService, Callable, Executors ...

    2> Concurrent Collection: ConcurrentMap, BlockingQueue ... (Is there any ConcurrentSet*(1) and ConcurrentList*(2) ?)

    3> Synchronizer: CountDownLatch, Semaphore, CyclicBarrier, Exchanger, Locks: (ReentrantLock, ReadLock, WriteLock)...
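The three groups above can be combined in one small sketch. It assumes Java 8+ (lambdas and ConcurrentHashMap.newKeySet()); the class and method names are made up for illustration. On the question in item 2: java.util.concurrent has no class called ConcurrentSet or ConcurrentList, and the closest standard options are ConcurrentHashMap.newKeySet(), CopyOnWriteArraySet, ConcurrentSkipListSet and CopyOnWriteArrayList.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: executor + concurrent set + latch, no wait()/notify().
public class ConcurrentToolsSketch {

    // Runs `workers` tasks on a small pool; each task records its id in a thread-safe set.
    static Set<Integer> collectWorkerIds(int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Set<Integer> seen = ConcurrentHashMap.newKeySet(); // concurrent Set view (Java 8+)
        CountDownLatch done = new CountDownLatch(workers);
        try {
            for (int i = 0; i < workers; i++) {
                final int id = i;
                pool.execute(() -> {
                    try {
                        seen.add(id);
                    } finally {
                        done.countDown(); // signal completion even if the task fails
                    }
                });
            }
            // The latch replaces a hand-written wait()/notify() loop.
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectWorkerIds(8).size());
    }
}
```

The latch is counted down in a finally block so that a failing task cannot leave the caller waiting until the timeout.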

 

Never call an alien method from within a synchronized region.

1) Example for ConcurrentModificationException:

public class ConcurrentModificationTest {
	@Test(expected=ConcurrentModificationException.class)
	public void test() {
		Set<String> set = Sets.newHashSet("A", "B", "C");
		for (String str : set) {
			if ("B".equals(str)) {
				set.remove(str);
			}
		}
	}
}

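    We cannot structurally modify the set while iterating over it with a for-each loop.

    But how does the JDK detect this kind of issue (concurrent modification)? The check lives in the iterator of HashMap, which backs HashSet; in older JDK sources it looks like this: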

final Entry<K,V> nextEntry() {
    if (modCount != expectedModCount)
        throw new ConcurrentModificationException(); // Exception is thrown here
    Entry<K,V> e = next;
    if (e == null)
        throw new NoSuchElementException();

    if ((next = e.next) == null) {
        Entry[] t = table;
        while (index < t.length && (next = t[index++]) == null)
           ;
    }
    current = e;
    return e;
}

    So the exception is not thrown by set.remove(str) itself; it is thrown on the next iteration step, when the iterator compares modCount with expectedModCount after the removal.
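    The same timing can be observed by driving the iterator by hand. This is a minimal sketch with a hypothetical class name; it relies on the fail-fast check that OpenJDK's HashSet iterator performs, which the Javadoc describes as best-effort.

```java
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Hypothetical demo class: shows which call actually throws.
public class FailFastDemo {

    // Returns true if remove() succeeds and the following next() throws.
    static boolean thrownByNext() {
        Set<String> set = new HashSet<>(Arrays.asList("A", "B", "C"));
        Iterator<String> it = set.iterator();
        String first = it.next();
        // Pick an element the iterator has not just returned.
        String other = "A".equals(first) ? "B" : "A";
        boolean removed = set.remove(other); // succeeds; the set's modCount changes
        try {
            it.next(); // the iterator's expectedModCount is now stale
            return false;
        } catch (ConcurrentModificationException e) {
            return removed;
        }
    }

    public static void main(String[] args) {
        System.out.println(thrownByNext());
    }
}
```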

    We can therefore avoid the exception by breaking out of the for-loop right after the removal. But this shortcut is limited (it can remove only one element per loop) and discouraged (it is cleaner to collect the elements to keep in a new filteredSet instead).
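    Two standard alternatives remove any number of elements without the break trick: Iterator.remove(), which keeps the iterator's expected count in sync, and Collection.removeIf (Java 8+). A minimal sketch, with a hypothetical class name and sample data:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Hypothetical demo class: safe removal while iterating.
public class SafeRemoval {

    static Set<String> sample() {
        return new HashSet<>(Arrays.asList("A", "B", "C"));
    }

    // Removes through the iterator, so no ConcurrentModificationException is raised.
    static Set<String> removeWithIterator(String unwanted) {
        Set<String> set = sample();
        for (Iterator<String> it = set.iterator(); it.hasNext();) {
            if (unwanted.equals(it.next())) {
                it.remove();
            }
        }
        return set;
    }

    // Same result with Collection.removeIf (Java 8+).
    static Set<String> removeWithRemoveIf(String unwanted) {
        Set<String> set = sample();
        set.removeIf(unwanted::equals);
        return set;
    }

    public static void main(String[] args) {
        System.out.println(removeWithIterator("B").size());
        System.out.println(removeWithRemoveIf("B").size());
    }
}
```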

@Test
public void test() {
	Set<String> set = Sets.newHashSet("A", "B", "C");
	for (String str : set) {
		if ("B".equals(str)) {
			set.remove(str);
			break; // Here we just jump out of the loop, thus do not have a chance to execute nextEntry() and check modCount == expectedModCount
		}
	}
	assertEquals(2, set.size());
}

 

2) ObservableSet example ("Effective Java, 2nd Edition", Chapter 10):

public class ForwardingSet<E> implements Set<E> {
	private final Set<E> s;
	public ForwardingSet(Set<E> s) {
		this.s = s;
	}

	public void clear() {
		s.clear();
	}
	public boolean contains(Object o) {
		return s.contains(o);
	}
	public boolean isEmpty() {
		return s.isEmpty();
	}
	public int size() {
		return s.size();
	}
	public Iterator<E> iterator() {
		return s.iterator();
	}
	public boolean add(E e) {
		return s.add(e);
	}
	public boolean remove(Object o) {
		return s.remove(o);
	}
	public boolean containsAll(Collection<?> c) {
		return s.containsAll(c);
	}
	public boolean addAll(Collection<? extends E> c) {
		return s.addAll(c);
	}
	public boolean removeAll(Collection<?> c) {
		return s.removeAll(c);
	}
	public boolean retainAll(Collection<?> c) {
		return s.retainAll(c);
	}
	public Object[] toArray() {
		return s.toArray();
	}
	public <T> T[] toArray(T[] a) {
		return s.toArray(a);
	}
	@Override
	public boolean equals(Object o) {
		return s.equals(o);
	}
	@Override
	public int hashCode() {
		return s.hashCode();
	}
	@Override
	public String toString() {
		return s.toString();
	}
}
public class ObservableSet<E> extends ForwardingSet<E> {
	private final List<SetObserver<E>> observers = new ArrayList<SetObserver<E>>();

	public ObservableSet(Set<E> set) {
		super(set);
	}

	public void addObserver(SetObserver<E> observer) {
		synchronized (observers) {
			observers.add(observer);
		}
	}

	public boolean removeObserver(SetObserver<E> observer) {
		synchronized (observers) {
			return observers.remove(observer);
		}
	}

	@Override
	public boolean add(E element) {
		boolean added = super.add(element);
		if (added) {
			synchronized (observers) {
				for (SetObserver<E> observer : observers)
					observer.elementAdded(this, element);
			}
		}
		return added;
	}
}
@Test(expected = ConcurrentModificationException.class)
public void addTest() throws InterruptedException {
	ObservableSet<Integer> set = new ObservableSet<>(new HashSet<Integer>());

	set.addObserver(new SetObserver<Integer>() {
		@Override
		public void elementAdded(final ObservableSet<Integer> observalSet,
				Integer element) {
			System.out.println(element + " added to set");
			final SetObserver<Integer> setObserverInstance = this;
			if (23 == element) {
				observalSet.removeObserver(setObserverInstance);
			}
		}
	});

	for (int i = 0; i < 100; i++) {
		set.add(i);
	}
}

    set.add() acquires the "observers" lock on the current thread, and observalSet.removeObserver() needs the same lock.

    Because intrinsic locks in Java are reentrant, the main thread, which already holds the lock, enters removeObserver() without blocking and modifies "observers" while add() is still iterating over it. Thus ConcurrentModificationException is thrown.

    It behaves exactly like the first example above.
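    Reentrancy itself is easy to check in isolation. In this minimal sketch (hypothetical class and method names), a thread that already holds a monitor enters a second synchronized block on the same object without blocking:

```java
// Hypothetical demo class: intrinsic locks are reentrant.
public class ReentrancyDemo {

    private static final Object LOCK = new Object();

    // Acquires LOCK, then calls a method that acquires it again.
    static boolean outer() {
        synchronized (LOCK) {
            return inner(); // same thread, same monitor: no blocking
        }
    }

    static boolean inner() {
        synchronized (LOCK) {
            return Thread.holdsLock(LOCK);
        }
    }

    public static void main(String[] args) {
        System.out.println(outer());
    }
}
```

    This is why synchronized (observers) in removeObserver() offers no protection against the thread that is already iterating inside add().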

    Another version, which calls removeObserver() from a new thread:

@Test
public void addTest() throws InterruptedException {
	ObservableSet<Integer> set = new ObservableSet<>(new HashSet<Integer>());

	set.addObserver(new SetObserver<Integer>() {
		@Override
		public void elementAdded(final ObservableSet<Integer> observalSet,
				Integer element) {
			System.out.println(element);
			final SetObserver<Integer> observerInstance = this;
			if (23 == element) {
				new Thread(new Runnable() {
					@Override
					public void run() {
					observalSet.removeObserver(observerInstance);
					}
				}).start();
			}
		}
	});

	for (int i = 0; i < 100; i++) {
		set.add(i);
	}
}

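    Here we create a new thread to call observalSet.removeObserver(). The new thread needs the "observers" lock, but the main thread holds it while it notifies observers inside add(), so the removal thread has to wait until the main thread leaves that synchronized block and releases the lock.

    The result is that removeObserver() runs only after the notification for element 23 has finished, and no exception is thrown. How many later elements are still printed depends on thread scheduling; in a run where the new thread starts slowly, all 100 elements are added before the observer is removed.
    This pattern also makes it easy to run into a deadlock: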

public boolean isObserverEmpty() {
	synchronized (observers) {
		return observers.isEmpty();
	}
}
@Test
public void addTest() {
	ObservableSet<Integer> set = new ObservableSet<>(new HashSet<Integer>());

	set.addObserver(new SetObserver<Integer>() {
		@Override
		public void elementAdded(final ObservableSet<Integer> observalSet,
				Integer element) {
			System.out.println(element);
			final SetObserver<Integer> observerInstance = this;
			new Thread(new Runnable() {
				@Override
				public void run() {
					// Blocks forever: the main thread holds the "observers" lock
					observalSet.removeObserver(observerInstance);
				}
			}).start();
			// We are still inside the synchronized block of add() here
			try {
				while (!observalSet.isObserverEmpty()) {
					Thread.sleep(10);
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}
	});
	set.add(0); // never returns
}

    The example above deadlocks: the main thread holds the "observers" lock inside add() (Thread.sleep does not release it) and waits for the new thread to remove the observer; at the same time, the new thread is blocked trying to acquire the "observers" lock in order to remove the observer.

    This is a typical deadlock example. The same mechanism, calling alien methods from a synchronized region, has led to exceptions, deadlocks and starvation in practice, for example in Java GUI toolkits.

    The root reason why this kind of program is awkward and dangerous is that elementAdded() is designed to be overridden by the client, yet it is invoked from inside a synchronized region.
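    One way out is to move the alien call outside the synchronized region: copy the observer list while holding the lock, then notify from the copy. The sketch below is a simplified, self-contained variant and not the exact classes above; OpenCallSketch and SafeObservableSet are hypothetical names, and the element set uses plain composition instead of ForwardingSet. Like the original, it only guards the observer list and does not make the element set itself thread-safe.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical demo class: notify observers outside the lock.
public class OpenCallSketch {

    interface SetObserver<E> {
        void elementAdded(SafeObservableSet<E> set, E element);
    }

    static class SafeObservableSet<E> {
        private final Set<E> elements = new HashSet<>();
        private final List<SetObserver<E>> observers = new ArrayList<>();

        void addObserver(SetObserver<E> observer) {
            synchronized (observers) {
                observers.add(observer);
            }
        }

        boolean removeObserver(SetObserver<E> observer) {
            synchronized (observers) {
                return observers.remove(observer);
            }
        }

        boolean add(E element) {
            boolean added = elements.add(element);
            if (added) {
                List<SetObserver<E>> snapshot;
                synchronized (observers) {
                    snapshot = new ArrayList<>(observers); // copy under the lock
                }
                // Alien methods run without the lock and iterate over the copy.
                for (SetObserver<E> observer : snapshot) {
                    observer.elementAdded(this, element);
                }
            }
            return added;
        }
    }

    // Returns how many notifications a self-removing observer receives.
    static int runDemo() {
        SafeObservableSet<Integer> set = new SafeObservableSet<>();
        final int[] notified = {0};
        set.addObserver(new SetObserver<Integer>() {
            @Override
            public void elementAdded(SafeObservableSet<Integer> s, Integer element) {
                notified[0]++;
                if (element == 23) {
                    s.removeObserver(this); // safe: add() iterates over a snapshot
                }
            }
        });
        for (int i = 0; i < 100; i++) {
            set.add(i);
        }
        return notified[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints 24 (elements 0..23)
    }
}
```

    An alternative with the same effect is to store the observers in a java.util.concurrent.CopyOnWriteArrayList, which makes the explicit synchronization and the snapshot unnecessary.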

    "Inside a synchronized region, do not invoke a method that is designed to be overridden."

    "To avoid liveness and safety failures, never cede control to the client within a synchronized method or block."

 

 

Reference Links:
1) http://stackoverflow.com/questions/6992608/why-there-is-no-concurrenthashset-against-concurrenthashmap

2) http://stackoverflow.com/questions/6916385/is-there-a-concurrent-list-in-javas-jdk

 
