`
cuiliwei
  • 浏览: 2394 次
  • 性别: Icon_minigender_1
  • 来自: 济南
最近访客 更多访客>>
社区版块
存档分类
最新评论

阻塞队列实现

阅读更多
实现代码:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A bounded, blocking FIFO buffer backed by a fixed-size circular array.
 *
 * <p>All state is guarded by a single {@link ReentrantLock}. Producers wait on
 * {@code notFull} while the buffer is at capacity and consumers wait on
 * {@code notEmpty} while it holds no elements. Both operations print a
 * diagnostic line to {@code System.out} while holding the lock.
 */
public class BlockingQuery {

    /** Circular buffer holding the queued elements; its length is the capacity. */
    private final Object[] item;

    /** Next slot to read, next slot to write, and number of elements currently stored. */
    private int takeIndex, putIndex, count;

    /** Guards {@code item}, {@code takeIndex}, {@code putIndex} and {@code count}. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Signalled by {@link #take()} after a slot has been freed. */
    private final Condition notFull = lock.newCondition();

    /** Signalled by {@link #put(Object)} after an element has been stored. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a buffer with the given fixed capacity.
     *
     * @param cap maximum number of elements the buffer can hold
     * @throws IllegalArgumentException if {@code cap} is zero or negative
     */
    BlockingQuery(int cap) {
        if (cap <= 0) {
            throw new IllegalArgumentException("init error");
        }
        item = new Object[cap];
    }

    /**
     * Appends {@code x} to the buffer, waiting while the buffer is full.
     *
     * @param x the element to store
     * @throws InterruptedException if the thread is interrupted while waiting for space
     */
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            // Re-check in a loop: a woken thread must confirm there is still room.
            while (count == item.length) {
                System.out.println("current count == "+ count + Thread.currentThread().getName()+ " is waiting to put  ....");
                notFull.await();
            }
            item[putIndex] = x;
            ++count;
            // Wrap the write index around the end of the array.
            if (++putIndex == item.length) {
                putIndex = 0;
            }
            System.out.println("current count == "+ count + " is signal others to take  ....");
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, waiting while the buffer is empty.
     *
     * @return the element that was stored earliest among those still queued
     * @throws InterruptedException if the thread is interrupted while waiting for an element
     */
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            // Re-check in a loop: a woken thread must confirm an element is still available.
            while (count == 0) {
                System.out.println("current count == "+ count + Thread.currentThread().getName()+ " is waiting to take  ....");
                notEmpty.await();
            }
            Object obj = item[takeIndex];
            // Clear the vacated slot so the buffer does not keep the dequeued
            // object reachable until some later put happens to overwrite it.
            item[takeIndex] = null;
            --count;
            // Wrap the read index around the end of the array.
            if (++takeIndex == item.length) {
                takeIndex = 0;
            }
            System.out.println("current count == "+ count + " is signal others to put  ....");
            notFull.signal();
            return obj;
        } finally {
            lock.unlock();
        }
    }

}

测试代码:

public class BlockingTest {

public static void main(String[] args) {

final BlockingQuery query = new BlockingQuery(10);
//final BlockingQueue<Object> query = new ArrayBlockingQueue<Object>(10);
/**
* 同时启动1000个线程放对象
*
*/
for (int i = 0; i < 1000; i++) {
final int seq = i;
new Thread(new Runnable() {

@Override
public void run() {

try {
query.put("INDEX==" + seq);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

}

/**
*
* 同时启动1000个线程读取数据
*
*/
for (int i = 0; i < 1000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
                    try {
Object obj = query.take();
                        System.out.println(obj.toString());
                    } catch (InterruptedException e) {
e.printStackTrace();
}
                   
}
}

).start();

}

}

}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics