import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class PriorityQueue< T >{ /** Main lock guarding all access */ final ReentrantLock lock = new ReentrantLock( ); /** Condition for waiting takes */ private final Condition notEmpty = lock.newCondition( ); /** Condition for waiting puts */ @SuppressWarnings ( "unused" ) private final Condition notFull = lock.newCondition( ); private final AtomicInteger count = new AtomicInteger( ); private List< Queue< T > > lbqList; private int capacity; private Priority< T > priority; private boolean queue; public PriorityQueue( int capacity , Priority< T > priority , boolean queue ) { this.capacity = capacity; this.priority = priority; this.queue = queue; init( ); } private void init( ) { lbqList = new ArrayList<>( this.capacity ); for ( int i = 0 ; i < capacity ; i++ ) { if ( queue ) { lbqList.add( new LinkedList<>( ) ); } else { lbqList.add( new LinkedBlockingQueue<>( ) ); } } } public int add( T t ) throws Exception { int pr = priority.getPriority( t ); Queue< T > queue = lbqList.get( pr ); if ( count.get( ) > 0 ) { count.getAndIncrement( ); queue.add( t ); if ( count.get( ) <= 1 ) { final ReentrantLock lock = this.lock; try { lock.lockInterruptibly( ); notEmpty.signal( ); } catch ( Exception e ) { return -1; } finally { lock.unlock( ); } } } else { final ReentrantLock lock = this.lock; try { lock.lockInterruptibly( ); queue.add( t ); count.getAndIncrement( ); notEmpty.signal( ); return pr; } catch ( Exception e ) { throw e; } finally { lock.unlock( ); } } return pr; } public T get( ) throws InterruptedException { T t; if ( count.get( ) > 0 ) { t = getData(); if(t != null){ return t; } } final ReentrantLock lock = this.lock; lock.lock( ); try { while ( ( t = getData( ) ) == null ) notEmpty.await( ); return t; } finally { lock.unlock( ); } } public static interface Priority< T > { int getPriority( T t ); } public T getData(){ int i = 0; T t; for( ; ;){ t = lbqList.get( i++ ).poll( ); if( t != null){ count.getAndDecrement( ); return t; } if( i >= capacity ){ return null; } } } public int size( ) { return count.get( ); } public boolean isEmpty( ) { return count.get( ) == 0; } }
test代码
@Test public void testQ( ) throws InterruptedException { Priority< Notification > priority = new Priority< Notification >( ) { @Override public int getPriority( Notification t ) { return t.getPriority( ); } }; PriorityQueue< Notification > rq = new PriorityQueue<>( 6 , priority , false ); ThreadPoolExecutor te = new ThreadPoolExecutor( 8 , 8 , 100 , TimeUnit.MICROSECONDS , new LinkedBlockingQueue< Runnable >( ) ); ThreadPoolExecutor ted = new ThreadPoolExecutor(4 , 4 , 100 , TimeUnit.MICROSECONDS , new LinkedBlockingQueue< Runnable >( ) ); int sum = 100; int read = sum , write = sum; boolean boo = true; AtomicLong at = new AtomicLong( ); for ( ; ; ) { int i = rr.nextInt( 10 ); if ( boo && read > 0) { for (int j = 0 ; i >= j ; j++ ) { te.execute( new Runnable( ) { @Override public void run( ) { try { Notification nf = new Notification( ); nf.setId( at.getAndIncrement( ) ); //nf.setPriority( r.nextInt( 6 ) ); nf.setPriority( nf.getId( )%30==0?5:1 ); rq.add( nf ); System.out.println( String.format( " read tid : %d data : %s" ,Thread.currentThread( ).getId( ) , nf ) ); } catch ( Exception e ) { e.printStackTrace( ); } } } ); } read = read - i; boo = !boo; } else if( !boo && write > 0){ for ( int j = 0 ; i >= j ; j++ ) { ted.execute( new Runnable( ) { @Override public void run( ) { try { Notification nf = rq.get( ); System.out.println( String.format( " write tid : %d data : %s" ,Thread.currentThread( ).getId( ) , nf ) ); } catch ( InterruptedException e ) { // TODO 自动生成的 catch 块 e.printStackTrace( ); } } } ); } boo = !boo; write = write - i; } if ( read <= 0 && write <= 0 ) { break; } if( read <= 0){ boo = false; } if( write <= 0 ){ boo = true; } } System.out.println( "---------------------------------------------------------------------" ); Thread.sleep( 1000000000 ); }
相关推荐
使用顺序存储实现优先级队列,展示优先级队列和普通队列的区别之处。
基于优先级队列方案抵御DDoS攻击,秦琳琳,张永平,本文提出基于优先级队列的自适应调整方案,采用带宽分配策略把合法用户的数据包分配到高优先级队列,恶意或可疑攻击者的数据包分
用c语言实现的,简单易懂,希望对大家有用。
优先级队列cpp文件PriorityQueue.cpp
优先级队列,能够实现队列的进队出队,判断队列优先级并对其排序。
算法中优先级队列问题...用堆排序的算法来做的例子
dheap(插入元素)、pop_max_dheap(删除最大元素)、pop_min_dheap(删除最小元素),is_dheap(堆验证)五个泛型算法,在此基础上实现了一个能在对数时间内获取最大和最小元素的优先级队列,相当于原stl优先级队列...
一个用堆实现的最大优先级队列,支持模板,动态内存申请,一个小例子,VS2008编译通过,大家一起进步。
使用c++模板实现的堆排序、优先级队列,在vs2010下编译运行通过。压缩文件里为两个工程文件,如果有vs2010的话解压缩打开sln文件就可以了,没有的话,新建工程将文件复制过去就ok了。如果有问题可以留言。
队列及优先级队列的std=C99结构方法声明,固定内存大小,构造函数时申请最大内存空间,不支持动态内存申请。优先级队列的比较函数需要另外明细
队列及优先级队列的std=C99结构方法实现,固定内存大小,构造函数时申请最大内存空间,不支持动态内存申请。优先级队列的比较函数需要另外明细
10.链式队列以及优先级队列应用.ppt
ES6的优先级队列 描述 ES6实现具有TypeScript支持的优先级队列数据结构。 请访问以了解更多有关如何将此文档翻译成更多语言的信息。 内容 安装 纱 yarn add prioqueue NPM npm install prioqueue 深入 优先级队列...
优先级队列的实现,包括堆的实现,最大堆的生成
优先级队列的C++实现
优先级队列测试程序c++
PriorityQueue-MEX-Matlab 优先级队列 matlab