自定义集合类是一个基本优先级别队列,优先级别在其中表示为 System.Collections.Concurrent.ConcurrentQueue<T> 对象的数组。在每个队列中不进行其他排序。通过实现类中的 System.Collections.Concurrent.IProducerConsumerCollection<T> 接口,然后将类实例用作 System.Collections.Concurrent.BlockingCollection<T> 的内部存储机制,来向自定义集合类添加限制和阻塞功能。
namespace ProdConsumerCS { using System; using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; // 实现具有边界和阻止功能的优先级队列。 public class SimplePriorityQueue<TPriority, TValue> : IProducerConsumerCollection<KeyValuePair<int, TValue>> { // 数组中每一个队列表示同一个优先级. // 队列中的元素具有相同的优先级 private ConcurrentQueue<KeyValuePair<int, TValue>>[] _queues = null; // 内部存储的队列个数. private int priorityCount = 0; private int m_count = 0; public SimplePriorityQueue(int priCount) { this.priorityCount = priCount; _queues = new ConcurrentQueue<KeyValuePair<int, TValue>>[priorityCount]; for (int i = 0; i < priorityCount; i++) _queues[i] = new ConcurrentQueue<KeyValuePair<int, TValue>>(); } // IProducerConsumerCollection members public bool TryAdd(KeyValuePair<int, TValue> item) { _queues[item.Key].Enqueue(item); Interlocked.Increment(ref m_count); return true; } public bool TryTake(out KeyValuePair<int, TValue> item) { bool success = false; // Loop through the queues in priority order // looking for an item to dequeue. for (int i = 0; i < priorityCount; i++) { // Lock the internal data so that the Dequeue // operation and the updating of m_count are atomic. lock (_queues) { success = _queues[i].TryDequeue(out item); if (success) { Interlocked.Decrement(ref m_count); return true; } } } // If we get here, we found nothing. // Assign the out parameter to its default value and return false. item = new KeyValuePair<int, TValue>(0, default(TValue)); return false; } public int Count { get { return m_count; } } // Required for ICollection void ICollection.CopyTo(Array array, int index) { CopyTo(array as KeyValuePair<int, TValue>[], index); } // CopyTo is problematic in a producer-consumer. // The destination array might be shorter or longer than what // we get from ToArray due to adds or takes after the destination array was allocated. // Therefore, all we try to do here is fill up destination with as much // data as we have without running off the end. public void CopyTo(KeyValuePair<int, TValue>[] destination, int destStartingIndex) { if (destination == null) throw new ArgumentNullException(); if (destStartingIndex < 0) throw new ArgumentOutOfRangeException(); int remaining = destination.Length; KeyValuePair<int, TValue>[] temp = this.ToArray(); for (int i = 0; i < destination.Length && i < temp.Length; i++) destination[i] = temp[i]; } public KeyValuePair<int, TValue>[] ToArray() { KeyValuePair<int, TValue>[] result; lock (_queues) { result = new KeyValuePair<int, TValue>[this.Count]; int index = 0; foreach (var q in _queues) { if (q.Count > 0) { q.CopyTo(result, index); index += q.Count; } } return result; } } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } public IEnumerator<KeyValuePair<int, TValue>> GetEnumerator() { for (int i = 0; i < priorityCount; i++) { foreach (var item in _queues[i]) yield return item; } } public bool IsSynchronized { get { throw new NotSupportedException(); } } public object SyncRoot { get { throw new NotSupportedException(); } } } public class TestBlockingCollection { static void Main() { int priorityCount = 7; SimplePriorityQueue<int, int> queue = new SimplePriorityQueue<int, int>(priorityCount); var bc = new BlockingCollection<KeyValuePair<int, int>>(queue, 50); CancellationTokenSource cts = new CancellationTokenSource(); Task.Run(() => { if (Console.ReadKey(true).KeyChar == 'c') cts.Cancel(); }); // Create a Task array so that we can Wait on it // and catch any exceptions, including user cancellation. Task[] tasks = new Task[2]; // Create a producer thread. You can change the code to // make the wait time a bit slower than the consumer // thread to demonstrate the blocking capability. tasks[0] = Task.Run(() => { // We randomize the wait time, and use that value // to determine the priority level (Key) of the item. Random r = new Random(); int itemsToAdd = 40; int count = 0; while (!cts.Token.IsCancellationRequested && itemsToAdd-- > 0) { int waitTime = r.Next(2000); int priority = waitTime % priorityCount; var item = new KeyValuePair<int, int>(priority, count++); bc.Add(item); Console.WriteLine("added pri {0}, data={1}", item.Key, item.Value); } Console.WriteLine("Producer is done adding."); bc.CompleteAdding(); }, cts.Token); //Give the producer a chance to add some items. Thread.SpinWait(1000000); // Create a consumer thread. The wait time is // a bit slower than the producer thread to demonstrate // the bounding capability at the high end. Change this value to see // the consumer run faster to demonstrate the blocking functionality // at the low end. tasks[1] = Task.Run(() => { while (!bc.IsCompleted && !cts.Token.IsCancellationRequested) { Random r = new Random(); int waitTime = r.Next(2000); Thread.SpinWait(waitTime * 70); // KeyValuePair is a value type. Initialize to avoid compile error in if(success) KeyValuePair<int, int> item = new KeyValuePair<int, int>(); bool success = false; success = bc.TryTake(out item); if (success) { // Do something useful with the data. Console.WriteLine("removed Pri = {0} data = {1} collCount= {2}", item.Key, item.Value, bc.Count); } else Console.WriteLine("No items to retrieve. count = {0}", bc.Count); } Console.WriteLine("Exited consumer loop"); }, cts.Token); try { Task.WaitAll(tasks, cts.Token); } catch (OperationCanceledException e) { if (e.CancellationToken == cts.Token) Console.WriteLine("Operation was canceled by user. Press any key to exit"); } catch (AggregateException ae) { foreach (var v in ae.InnerExceptions) Console.WriteLine(v.Message); } finally { cts.Dispose(); } Console.ReadKey(true); } } }
默认情况下,System.Collections.Concurrent.BlockingCollection<T> 的存储为 System.Collections.Concurrent.ConcurrentQueue<T>。
相关推荐
Nachos实现id、限制线程数和按优先级调度算法(增改源码) Nachos实现id、限制线程数和按优先级调度算法
详细介绍IP优先级、TOS优先级、DSCP优先级和802.1p优先级的区别
路由优先级路由优先级路由优先级路由优先级路由优先级路由优先级路由优先级
NachOS线程调度_基于优先级和Round Robin算法
线程池提交优先级,执行优先级
优先级调度 和 轮转调度算法的介绍,含相关测试数据
描述解决线程优先级翻转的优先级继承协议实现
该器件返回最高优先级和次最高优先级请求代码 要求设计输入 15 位 reg 信号,输出是双优先级,分别用四位二进制代码表示最高优先级和次最高优先级。 1、列出真值表 2、设计电路、编写代码 3、设计测试电路代码 4、...
C语言运算符优先级和结合性表,全面介绍了C语言运算符的优先级和结合性。
最高优先级编码器 最高优先级编码器 最高优先级编码器 最高优先级编码器
从语言优先级及优先级口诀,一共有十五个优先级
此计算器实现了有优先级计算,除了+、-、*、/等基本运算外,还有括号、三角函数、开根等附加运算。
优先级大小优先级大小优先级大小优先级大小优先级大小优先级大小
模拟进程调度优先级算法和时间轮转算法。。无bug,可支持模拟100个进程同时测试。。,内有代码注释说明
运算符的优先级和结合性
操作系统——动态优先级调度算法源代码,多道系统中多进程并发执行,为了提高系统性能解决进程死锁问题,进程的优先级是动态变化的。正在执行的进程优先级会随时间降低,而挂起的进程或等待的进程的优先级会逐渐升高...
C++进程优先级调度进程优先级调度进程优先级调度C++进程优先级调度进程优先级调度进程优先级调度
BGP04-BGP属性之本地优先级和MEDBGP04-BGP属性之本地优先级和MED
C语言运算符优先级和口诀 便于在学习C语言中查阅哦。
c和c++运算符优先级, 方便参考