锁定老帖子 主题:自己实现的java lock
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2011-06-02
最后修改:2011-06-03
实现代码 Lock 接口实现 package com.fantasy.framework.util.concurrent; import java.util.Calendar; import java.util.Date; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.fantasy.framework.util.common.DateUtil; public class ClassLock implements Lock { private static final Log logger = LogFactory.getLog(ClassLock.class); private ConditionObject condition = new ConditionObject(); transient Thread owner = null; /** * 获取锁定。 <br/> * 如果锁定不可用,出于线程调度目的,将禁用当前线程,并且在获得锁定之前,该线程将一直处于休眠状态。 */ public void lock() { if (owner == null || owner == Thread.currentThread()) { owner = Thread.currentThread(); } else { try { condition.await(); } catch (InterruptedException e) { logger.debug(e); } owner = Thread.currentThread(); } } /** * 如果当前线程未被中断,则获取锁定。 <br/> * 如果锁定可用,则获取锁定,并立即返回。 <br/> */ public void lockInterruptibly() throws InterruptedException { if (!Thread.currentThread().isInterrupted()) { this.lock(); } } /** * 和接口描述不一样<br/> * */ public Condition newCondition() { return this.condition; } /** * 仅在调用时锁定为空闲状态才获取该锁定。 <br/> * 如果锁定可用,则获取锁定,并立即返回值 true。 <br/> * 如果锁定不可用,则此方法将立即返回值 false。 */ public boolean tryLock() { if (owner == null || owner == Thread.currentThread()) { owner = Thread.currentThread(); return true; } return false; } /** * 如果锁定在给定的等待时间内空闲,并且当前线程未被中断,则获取锁定 */ public boolean tryLock(long time, TimeUnit unit)throws InterruptedException { if (owner == null || owner == Thread.currentThread()) { owner = Thread.currentThread(); return true; } else { if (condition.await(time, unit)) { owner = Thread.currentThread(); return true; } else { return false; } } } /** * 释放锁定 */ public void unlock() { if (this.owner == Thread.currentThread()){ this.owner = null; this.condition.signal(); } } public class ConditionObject implements Condition { private BlockingQueue<Thread> threadQueues = new LinkedBlockingQueue<Thread>(); /** * 造成当前线程在接到信号或被中断之前一直处于等待状态。 */ public void await() throws InterruptedException { if(!this.threadQueues.contains(Thread.currentThread()) && owner != Thread.currentThread()) this.threadQueues.offer(Thread.currentThread()); try { while (true) awaitNanos(TimeUnit.SECONDS.toNanos(60)); } catch (InterruptedException e) { logger.error(e); } } /** * 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 */ public boolean await(long time, TimeUnit unit)throws InterruptedException { return awaitNanos(unit.toNanos(time)) > 0; } public long awaitNanos(long nanosTimeout) throws InterruptedException { Thread current = Thread.currentThread(); if (Thread.interrupted()) throw new InterruptedException(); if(!this.threadQueues.contains(current) && owner != Thread.currentThread()) this.threadQueues.offer(current); long start = System.currentTimeMillis(); try { TimeUnit.NANOSECONDS.sleep(nanosTimeout); } catch (InterruptedException e) { logger.error(e); } long end = System.currentTimeMillis(); return TimeUnit.MILLISECONDS.toNanos(TimeUnit.NANOSECONDS.toMillis(nanosTimeout) - (end - start)); } public boolean awaitUntil(Date deadline) throws InterruptedException { return awaitNanos(DateUtil.interval(deadline, new Date(),Calendar.MILLISECOND)) <= 0 ? false : true; } /** * 造成当前线程在接到信号之前一直处于等待状态。 */ public void awaitUninterruptibly() { try { while (true) awaitNanos(TimeUnit.SECONDS.toNanos(60)); } catch (InterruptedException e) { logger.debug(e); } } /** * 唤醒一个等待线程。 */ public void signal() { if(owner != null){ owner.interrupt(); }else{ Thread thread = threadQueues.poll(); if (thread != null) { thread.interrupt(); } } } /** * 唤醒所有等待线程。 */ public void signalAll() { throw new RuntimeException("signalAll 方法未实现"); } } } 队列实现 package com.fantasy.framework.util.concurrent; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class LinkedQueue<E> implements BlockingQueue<E> { private static final Log logger = LogFactory.getLog(LinkedQueue.class); private LinkedList<E> items = new LinkedList<E>();//普通队列 protected final ClassLock takeLock = new ClassLock();//取出锁 protected final ClassLock putLock = new ClassLock();//存入锁 public void fullyLock() { putLock.lock(); takeLock.lock(); } public void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } public E element() { return items.element(); } public boolean offer(E o) { try { return items.offer(o); } finally { if (this.takeLock.owner != null) { this.takeLock.newCondition().signal(); } } } public E peek() { return items.peek(); } public E poll() { return items.poll(); } public E remove() { return items.remove(); } public boolean add(E o) { return items.add(o); } public boolean addAll(Collection<? extends E> c) { return items.addAll(c); } public void clear() { items.clear(); } public boolean contains(Object o) { return items.contains(o); } public boolean containsAll(Collection<?> c) { return items.containsAll(c); } public boolean isEmpty() { return items.isEmpty(); } public Iterator<E> iterator() { return items.iterator(); } public boolean remove(Object o) { return items.remove(o); } public boolean removeAll(Collection<?> c) { return items.removeAll(c); } public boolean retainAll(Collection<?> c) { return items.retainAll(c); } public int size() { return items.size(); } public Object[] toArray() { return items.toArray(); } public <T> T[] toArray(T[] a) { return items.toArray(a); } public int drainTo(Collection<? super E> c) { return 0; } public int drainTo(Collection<? super E> c, int maxElements) { return 0; } public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { try { if (this.putLock.tryLock(timeout, unit)) { this.offer(o); return true; } return false; } finally { this.putLock.unlock(); } } public E poll(long time, TimeUnit unit) throws InterruptedException { try { long start = System.currentTimeMillis(); long timeout = unit.toMillis(time); if (this.takeLock.tryLock(time,unit)) { long end = System.currentTimeMillis(); E e = this.poll(); if(e == null){ this.takeLock.newCondition().await((timeout - (end - start)), TimeUnit.MILLISECONDS); return this.poll(); } return e; } return null; } finally { this.takeLock.unlock(); } } public static void main(String[] args) throws Exception{ final LinkedQueue<String> queue = new LinkedQueue<String>(); (new Thread(new Runnable() { public void run() { final Thread thread = Thread.currentThread(); try { queue.takeLock.lock(); System.out.println("获取takeLock"); Thread.currentThread().sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } queue.takeLock.unlock(); queue.add("123123"); System.out.println("释放takeLock"); } })).start(); long start = System.currentTimeMillis(); queue.poll(10000, TimeUnit.MILLISECONDS); long end = System.currentTimeMillis(); System.out.println("运行时间>"+(end - start)); } public void put(E o){ try { this.putLock.lock(); this.offer(o); } finally { this.putLock.unlock(); } } public int remainingCapacity() { return Integer.MAX_VALUE; } public E take(){ try { this.takeLock.lock(); if (this.size() == 0) { this.takeLock.newCondition().awaitUninterruptibly(); } return this.poll(); } finally { this.takeLock.unlock(); } } public List<E> toList() { return this.items; } } 主要在使用 LinkedQueue 就可以使用锁了 LinkedQueue<Message> queue = new LinkedQueue<Message>(); try{ queue.fullyLock(); //对队列排序或者插入排序的时候 锁定 takeLock 和 putLock }finally{ queue.fullyUnlock(); } 基本思路:如果一个线程获得锁,其他线程再获取该锁时,会挂起该线程,并将线程放入一个等待队列。待线程锁释放的时候再去检查等待队列,出队。并激活线程 ======================================================================================================== 放弃了这个方案!确实很多方面考虑不足 请教有没人 试过给队列排序 或者 按下标插入值的。 LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(); List list = new ArrayList<String>(queue); //集合排序 插入 queue.clear(); queue.addAll(list); 这样做转换 是不是效率很低? 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2011-06-02
cocurrent BlockQueue的东西不能满足你??
|
|
返回顶楼 | |
发表时间:2011-06-02
agapple 写道 cocurrent BlockQueue的东西不能满足你??
开始也试过! 但是它没公开 private void fullyUnlock(); 与 private void fullyUnlock(); 因为我想直接操作锁。 也试过继承 LinkedBlockingQueue 这个类修改了一下。 但还是出现了一些问题。所以... |
|
返回顶楼 | |
发表时间:2011-06-03
没有明白你这样做的原因。
就算concurrent里面的不满足你,你也可以换一种方式。 单独操作一个lcok不可以么。 jdk提供了锁的。 |
|
返回顶楼 | |
发表时间:2011-06-03
如果多个用户同时访问List(CopyOnWriteArrayList),
或者Map中的同一条数据的时候,(ConcurrentHashMap) |
|
返回顶楼 | |
发表时间:2011-06-03
又在发明轮子
|
|
返回顶楼 | |
发表时间:2011-06-03
这也算轮子?
|
|
返回顶楼 | |
发表时间:2011-06-03
我果然是闲的蛋疼!~ 呵呵
LinkedBlockingQueue 已经实现好了。 只是 fullyLock 和 fullyUnlock 是私有的。 而且 还想加个 toList 方法。 好对队列排序!~ 不知道谁有好点的建议或者实现? 或者告诉一下 如何对 LinkedBlockingQueue 进行排序 和 插入 |
|
返回顶楼 | |
发表时间:2011-06-03
java_user 写道 又在发明轮子
不知道怎么的,我特别烦动不动就冒什么轮子轮子的。。很装X的感觉。 |
|
返回顶楼 | |
发表时间:2011-06-03
最后修改:2011-06-03
只看了你的第一个lock()的实现,我震惊了,连状态更新的原子性都没保证。。。 先把基本概念了解清楚吧。。。
|
|
返回顶楼 | |