论坛首页 Java企业应用论坛

自己实现的java lock

浏览 14265 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2011-06-02   最后修改:2011-06-03
    在做一个ajax web推送功能的时候碰到一个问题。 如果多个用户同时访问List,或者Map中的同一条数据的时候,如果对数据进行插入或者排序,就会出现并发问题。为了避免这个问题查阅了 java.util.concurrent 里面的一些类。发现没有将锁对象公开的实现类。 所以自己就尝试写一个!

实现代码

Lock  接口实现
package com.fantasy.framework.util.concurrent;

import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.fantasy.framework.util.common.DateUtil;

public class ClassLock implements Lock {

	private static final Log logger = LogFactory.getLog(ClassLock.class);

	private ConditionObject condition = new ConditionObject();
	transient Thread owner = null;

	/**
	 * 获取锁定。 <br/>
	 * 如果锁定不可用,出于线程调度目的,将禁用当前线程,并且在获得锁定之前,该线程将一直处于休眠状态。
	 */
	public void lock() {
		if (owner == null || owner == Thread.currentThread()) {
			owner = Thread.currentThread();
		} else {
			try {
				condition.await();
			} catch (InterruptedException e) {
				logger.debug(e);
			}
			owner = Thread.currentThread();
		}
	}

	/**
	 * 如果当前线程未被中断,则获取锁定。 <br/>
	 * 如果锁定可用,则获取锁定,并立即返回。 <br/>
	 */
	public void lockInterruptibly() throws InterruptedException {
		if (!Thread.currentThread().isInterrupted()) {
			this.lock();
		}
	}
	
	/**
	 * 和接口描述不一样<br/>
	 * 
	 */
	public Condition newCondition() {
		return this.condition;
	}

	/**
	 * 仅在调用时锁定为空闲状态才获取该锁定。 <br/>
	 * 如果锁定可用,则获取锁定,并立即返回值 true。 <br/>
	 * 如果锁定不可用,则此方法将立即返回值 false。
	 */
	public boolean tryLock() {
		if (owner == null || owner == Thread.currentThread()) {
			owner = Thread.currentThread();
			return true;
		}
		return false;
	}

	/**
	 * 如果锁定在给定的等待时间内空闲,并且当前线程未被中断,则获取锁定
	 */
	public boolean tryLock(long time, TimeUnit unit)throws InterruptedException {
		if (owner == null || owner == Thread.currentThread()) {
			owner = Thread.currentThread();
			return true;
		} else {
			if (condition.await(time, unit)) {
				owner = Thread.currentThread();
				return true;
			} else {
				return false;
			}
		}
	}

	/**
	 * 释放锁定
	 */
	public void unlock() {
		if (this.owner == Thread.currentThread()){
			this.owner = null;
			this.condition.signal();
		}
	}

	public class ConditionObject implements Condition {
		private BlockingQueue<Thread> threadQueues = new LinkedBlockingQueue<Thread>();

		/**
		 * 造成当前线程在接到信号或被中断之前一直处于等待状态。
		 */
		public void await() throws InterruptedException {
			if(!this.threadQueues.contains(Thread.currentThread()) && owner != Thread.currentThread())
				this.threadQueues.offer(Thread.currentThread());
			try {
				while (true)
					awaitNanos(TimeUnit.SECONDS.toNanos(60));
			} catch (InterruptedException e) {
				logger.error(e);
			}
		}

		/**
		 * 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
		 */
		public boolean await(long time, TimeUnit unit)throws InterruptedException {
			return awaitNanos(unit.toNanos(time)) > 0;
		}
		
		public long awaitNanos(long nanosTimeout) throws InterruptedException {
			Thread current = Thread.currentThread();
			if (Thread.interrupted())
				throw new InterruptedException();
			if(!this.threadQueues.contains(current) && owner != Thread.currentThread())
				this.threadQueues.offer(current);
			long start = System.currentTimeMillis();
			try {
				TimeUnit.NANOSECONDS.sleep(nanosTimeout);
			} catch (InterruptedException e) {
				logger.error(e);
			}
			long end = System.currentTimeMillis();
			return TimeUnit.MILLISECONDS.toNanos(TimeUnit.NANOSECONDS.toMillis(nanosTimeout) - (end - start));
		}

		public boolean awaitUntil(Date deadline) throws InterruptedException {
			return awaitNanos(DateUtil.interval(deadline, new Date(),Calendar.MILLISECOND)) <= 0 ? false : true;
		}

		/**
		 * 造成当前线程在接到信号之前一直处于等待状态。
		 */
		public void awaitUninterruptibly() {
			try {
				while (true)
					awaitNanos(TimeUnit.SECONDS.toNanos(60));
			} catch (InterruptedException e) {
				logger.debug(e);
			}
		}

		/**
		 * 唤醒一个等待线程。
		 */
		public void signal() {
			if(owner != null){
				owner.interrupt();
			}else{
				Thread thread = threadQueues.poll();
				if (thread != null) {
					thread.interrupt();
				}
			}
		}

		/**
		 * 唤醒所有等待线程。
		 */
		public void signalAll() {
			throw new RuntimeException("signalAll 方法未实现");
		}

	}

}



队列实现
package com.fantasy.framework.util.concurrent;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class LinkedQueue<E> implements BlockingQueue<E> {

	private static final Log logger = LogFactory.getLog(LinkedQueue.class);
	private LinkedList<E> items = new LinkedList<E>();//普通队列
	protected final ClassLock takeLock = new ClassLock();//取出锁
	protected final ClassLock putLock = new ClassLock();//存入锁

	public void fullyLock() {
		putLock.lock();
		takeLock.lock();
	}

	public void fullyUnlock() {
		takeLock.unlock();
		putLock.unlock();
	}

	public E element() {
		return items.element();
	}

	public boolean offer(E o) {
		try {
			return items.offer(o);
		} finally {
			if (this.takeLock.owner != null) {
				this.takeLock.newCondition().signal();
			}
		}
	}
	
	public E peek() {
		return items.peek();
	}

	public E poll() {
		return items.poll();
	}
	
	public E remove() {
		return items.remove();
	}

	public boolean add(E o) {
		return items.add(o);
	}

	public boolean addAll(Collection<? extends E> c) {
		return items.addAll(c);
	}
	
	public void clear() {
		items.clear();
	}

	public boolean contains(Object o) {
		return items.contains(o);
	}

	public boolean containsAll(Collection<?> c) {
		return items.containsAll(c);
	}

	public boolean isEmpty() {
		return items.isEmpty();
	}

	public Iterator<E> iterator() {
		return items.iterator();
	}

	public boolean remove(Object o) {
		return items.remove(o);
	}

	public boolean removeAll(Collection<?> c) {
		return items.removeAll(c);
	}

	public boolean retainAll(Collection<?> c) {
		return items.retainAll(c);
	}

	public int size() {
		return items.size();
	}

	public Object[] toArray() {
		return items.toArray();
	}

	public <T> T[] toArray(T[] a) {
		return items.toArray(a);
	}

	public int drainTo(Collection<? super E> c) {
		return 0;
	}

	public int drainTo(Collection<? super E> c, int maxElements) {
		return 0;
	}

	public boolean offer(E o, long timeout, TimeUnit unit)
			throws InterruptedException {
		try {
			if (this.putLock.tryLock(timeout, unit)) {
				this.offer(o);
				return true;
			}
			return false;
		} finally {
			this.putLock.unlock();
		}
	}

	public E poll(long time, TimeUnit unit) throws InterruptedException {
		try {
			long start = System.currentTimeMillis();
			long timeout = unit.toMillis(time);
			if (this.takeLock.tryLock(time,unit)) {
				long end = System.currentTimeMillis();
				E e = this.poll();
				if(e == null){
					this.takeLock.newCondition().await((timeout - (end - start)), TimeUnit.MILLISECONDS);
					return this.poll();
				}
				return e;
			}
			return null;
		} finally {
			this.takeLock.unlock();
		}
	}

	public static void main(String[] args) throws Exception{
		final LinkedQueue<String> queue = new LinkedQueue<String>();
		(new Thread(new Runnable() {

			public void run() {
				final Thread thread = Thread.currentThread();
				try {					
					queue.takeLock.lock();
					System.out.println("获取takeLock");
					Thread.currentThread().sleep(5000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}				
				queue.takeLock.unlock();
				queue.add("123123");
				System.out.println("释放takeLock");
			}

		})).start();
		long start = System.currentTimeMillis();
		queue.poll(10000, TimeUnit.MILLISECONDS);
		long end = System.currentTimeMillis();
		System.out.println("运行时间>"+(end - start));
	}
	
	public void put(E o){
		try {
			this.putLock.lock();
			this.offer(o);
		} finally {
			this.putLock.unlock();
		}
	}

	public int remainingCapacity() {
		return Integer.MAX_VALUE;
	}

	public E take(){
		try {
			this.takeLock.lock();
			if (this.size() == 0) {
				this.takeLock.newCondition().awaitUninterruptibly();
			}
			return this.poll();
		} finally {
			this.takeLock.unlock();
		}
	}

	public List<E> toList() {
		return this.items;
	}

}



主要在使用 LinkedQueue 就可以使用锁了
LinkedQueue<Message> queue = new LinkedQueue<Message>();
		try{
			queue.fullyLock();
			//对队列排序或者插入排序的时候 锁定 takeLock 和   putLock
		}finally{
			queue.fullyUnlock();
		}



基本思路:如果一个线程获得锁,其他线程再获取该锁时,会挂起该线程,并将线程放入一个等待队列。待线程锁释放的时候再去检查等待队列,出队。并激活线程

========================================================================================================

放弃了这个方案!确实很多方面考虑不足

请教有没人 试过给队列排序 或者 按下标插入值的。   

LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
				List list = new ArrayList<String>(queue);
//集合排序 插入
				queue.clear();
				queue.addAll(list);



这样做转换 是不是效率很低?
   发表时间:2011-06-02  
cocurrent BlockQueue的东西不能满足你??
0 请登录后投票
   发表时间:2011-06-02  
agapple 写道
cocurrent BlockQueue的东西不能满足你??

开始也试过!  但是它没公开 private void fullyUnlock(); 与 private void fullyUnlock(); 因为我想直接操作锁。   也试过继承 LinkedBlockingQueue 这个类修改了一下。 但还是出现了一些问题。所以...
0 请登录后投票
   发表时间:2011-06-03  
没有明白你这样做的原因。

就算concurrent里面的不满足你,你也可以换一种方式。
单独操作一个lcok不可以么。 jdk提供了锁的。
0 请登录后投票
   发表时间:2011-06-03  
如果多个用户同时访问List(CopyOnWriteArrayList),
或者Map中的同一条数据的时候,(ConcurrentHashMap)
0 请登录后投票
   发表时间:2011-06-03  
又在发明轮子
0 请登录后投票
   发表时间:2011-06-03  
这也算轮子?
0 请登录后投票
   发表时间:2011-06-03  
我果然是闲的蛋疼!~  呵呵         
LinkedBlockingQueue   已经实现好了。 
只是 fullyLock   和  fullyUnlock  是私有的。  
而且 还想加个 toList 方法。 
好对队列排序!~   不知道谁有好点的建议或者实现?

或者告诉一下 如何对   LinkedBlockingQueue   进行排序  和    插入
    
0 请登录后投票
   发表时间:2011-06-03  
java_user 写道
又在发明轮子

不知道怎么的,我特别烦动不动就冒什么轮子轮子的。。很装X的感觉。
0 请登录后投票
   发表时间:2011-06-03   最后修改:2011-06-03
只看了你的第一个lock()的实现,我震惊了,连状态更新的原子性都没保证。。。 先把基本概念了解清楚吧。。。
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics