¿Cómo crear una Cola de locking en .NET?

Tengo un escenario en el que tengo varios subprocesos que agregan a una cola y varios subprocesos que se leen desde la misma cola. Si la cola alcanza un tamaño específico, todos los hilos que llenan la cola se bloquearán al agregar hasta que se elimine un elemento de la cola.

La solución a continuación es lo que estoy usando en este momento y mi pregunta es: ¿Cómo se puede mejorar esto? ¿Hay algún objeto que ya habilite este comportamiento en el BCL que debería usar?

internal class BlockingCollection : CollectionBase, IEnumerable { //todo: might be worth changing this into a proper QUEUE private AutoResetEvent _FullEvent = new AutoResetEvent(false); internal T this[int i] { get { return (T) List[i]; } } private int _MaxSize; internal int MaxSize { get { return _MaxSize; } set { _MaxSize = value; checkSize(); } } internal BlockingCollection(int maxSize) { MaxSize = maxSize; } internal void Add(T item) { Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.WaitOne(); List.Add(item); Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId)); checkSize(); } internal void Remove(T item) { lock (List) { List.Remove(item); } Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId)); } protected override void OnRemoveComplete(int index, object value) { checkSize(); base.OnRemoveComplete(index, value); } internal new IEnumerator GetEnumerator() { return List.GetEnumerator(); } private void checkSize() { if (Count < MaxSize) { Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.Set(); } else { Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.Reset(); } } } 

Eso se ve muy inseguro (muy poca sincronización); ¿Qué tal algo así como:

 class SizeQueue { private readonly Queue queue = new Queue(); private readonly int maxSize; public SizeQueue(int maxSize) { this.maxSize = maxSize; } public void Enqueue(T item) { lock (queue) { while (queue.Count >= maxSize) { Monitor.Wait(queue); } queue.Enqueue(item); if (queue.Count == 1) { // wake up any blocked dequeue Monitor.PulseAll(queue); } } } public T Dequeue() { lock (queue) { while (queue.Count == 0) { Monitor.Wait(queue); } T item = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return item; } } } 

(editar)

En realidad, desearía una forma de cerrar la cola para que los lectores comiencen a salir limpiamente, tal vez algo como una bandera bool, si está configurada, una cola vacía simplemente regresa (en lugar de bloquear):

 bool closing; public void Close() { lock(queue) { closing = true; Monitor.PulseAll(queue); } } public bool TryDequeue(out T value) { lock (queue) { while (queue.Count == 0) { if (closing) { value = default(T); return false; } Monitor.Wait(queue); } value = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return true; } } 

Utilice .net 4 BlockingCollection, para poner en cola el uso de Add (), para quitar la cola del uso de Take (). Internamente utiliza ConcurrentQueue sin locking. Más información aquí Rápida y mejor técnica de cola de productor / consumidor BlockingCollection vs concurrente Cola

“¿Cómo se puede mejorar esto?”

Bueno, necesitas ver cada método en tu clase y considerar qué pasaría si otro hilo llamara simultáneamente a ese método o cualquier otro método. Por ejemplo, pones un candado en el método Eliminar, pero no en el método Agregar. ¿Qué sucede si un hilo se agrega al mismo tiempo que otro hilo? ¿Se quita? Cosas malas.

Además, considere que un método puede devolver un segundo objeto que proporciona acceso a los datos internos del primer objeto, por ejemplo, GetEnumerator. Imagine que un hilo está pasando por ese enumerador, otro hilo está modificando la lista al mismo tiempo. No está bien.

Una buena regla general es simplificar el proceso reduciendo el número de métodos en la clase al mínimo absoluto.

En particular, no herede otra clase contenedora, ya que expondrá todos los métodos de esa clase, proporcionando una forma para que la persona que llama corrompa los datos internos, o para ver cambios parcialmente completos en los datos (igual de malo, porque los datos aparece corrompido en ese momento). Oculte todos los detalles y sea completamente despiadado sobre cómo permite el acceso a ellos.

Le recomiendo encarecidamente que use soluciones listas para usar: obtenga un libro sobre cómo enhebrar o use una biblioteca de terceros. De lo contrario, dado lo que estás intentando, vas a depurar tu código durante mucho tiempo.

Además, ¿no tendría más sentido para Remove devolver un artículo (por ejemplo, el que se agregó primero, ya que es una cola), en lugar de que la persona que llama elija un elemento específico? Y cuando la cola está vacía, quizás Remove también debe bloquearse.

Actualización: ¡la respuesta de Marc realmente implementa todas estas sugerencias! 🙂 Pero lo dejaré aquí ya que puede ser útil entender por qué su versión es una mejora.

Puede usar BlockingCollection y ConcurrentQueue en System.Collections.Concurrent Namespace

  public class ProducerConsumerQueue : BlockingCollection { ///  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality ///  public ProducerConsumerQueue() : base(new ConcurrentQueue()) { } ///  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality ///  ///  public ProducerConsumerQueue(int maxSize) : base(new ConcurrentQueue(), maxSize) { } } 

Acabo de terminar esto usando las Extensiones Reactivas y recordé esta pregunta:

 public class BlockingQueue { private readonly Subject _queue; private readonly IEnumerator _enumerator; private readonly object _sync = new object(); public BlockingQueue() { _queue = new Subject(); _enumerator = _queue.GetEnumerator(); } public void Enqueue(T item) { lock (_sync) { _queue.OnNext(item); } } public T Dequeue() { _enumerator.MoveNext(); return _enumerator.Current; } } 

No necesariamente completamente seguro, pero muy simple.

Esto es lo que vine a optar por una cola de locking limitada y segura de hilos.

 using System; using System.Collections.Generic; using System.Text; using System.Threading; public class BlockingBuffer { private Object t_lock; private Semaphore sema_NotEmpty; private Semaphore sema_NotFull; private T[] buf; private int getFromIndex; private int putToIndex; private int size; private int numItems; public BlockingBuffer(int Capacity) { if (Capacity <= 0) throw new ArgumentOutOfRangeException("Capacity must be larger than 0"); t_lock = new Object(); buf = new T[Capacity]; sema_NotEmpty = new Semaphore(0, Capacity); sema_NotFull = new Semaphore(Capacity, Capacity); getFromIndex = 0; putToIndex = 0; size = Capacity; numItems = 0; } public void put(T item) { sema_NotFull.WaitOne(); lock (t_lock) { while (numItems == size) { Monitor.Pulse(t_lock); Monitor.Wait(t_lock); } buf[putToIndex++] = item; if (putToIndex == size) putToIndex = 0; numItems++; Monitor.Pulse(t_lock); } sema_NotEmpty.Release(); } public T take() { T item; sema_NotEmpty.WaitOne(); lock (t_lock) { while (numItems == 0) { Monitor.Pulse(t_lock); Monitor.Wait(t_lock); } item = buf[getFromIndex++]; if (getFromIndex == size) getFromIndex = 0; numItems--; Monitor.Pulse(t_lock); } sema_NotFull.Release(); return item; } } 

No he explorado completamente el TPL pero pueden tener algo que se adapte a sus necesidades, o al menos, algún forraje Reflector para inspirarse.

Espero que ayude.

Bueno, podrías mirar la clase System.Threading.Semaphore . Aparte de eso, no, tienes que hacer esto tú mismo. AFAIK no hay tal colección incorporada.

Si desea el máximo rendimiento, permitiendo que varios lectores lean y solo un escritor escriba, BCL tiene algo llamado ReaderWriterLockSlim que debería ayudar a adelgazar su código …