Cola de seguridad de hilos C ++ 11

Un proyecto en el que estoy trabajando usa varios subprocesos para trabajar en una colección de archivos. Cada hilo puede agregar archivos a la lista de archivos que se procesarán, así que armé (lo que pensé que era) una cola segura para subprocesos. Las porciones relevantes siguen:

// qMutex is a std::mutex intended to guard the queue // populatedNotifier is a std::condition_variable intended to // notify waiting threads of a new item in the queue void FileQueue::enqueue(std::string&& filename) { std::lock_guard lock(qMutex); q.push(std::move(filename)); // Notify anyone waiting for additional files that more have arrived populatedNotifier.notify_one(); } std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout) { std::unique_lock lock(qMutex); if (q.empty()) { if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) { std::string ret = q.front(); q.pop(); return ret; } else { return std::string(); } } else { std::string ret = q.front(); q.pop(); return ret; } } 

Sin embargo, de vez en cuando estoy segfaulting dentro del bloque if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { } , y la inspección en gdb indica que los valores predeterminados están ocurriendo porque la cola está vacía. ¿Cómo es esto posible? Comprendí que wait_for solo devuelve cv_status::no_timeout cuando se ha notificado, y esto solo debería ocurrir después de que FileQueue::enqueue acaba de FileQueue::enqueue un nuevo elemento en la cola.

De acuerdo con la condition_variables estándar condition_variables las variables pueden reactivarse espuriamente, incluso si el evento no se ha producido. En caso de cv_status::no_timeout espuria, devolverá cv_status::no_timeout (ya que se despertó en lugar de agotar el tiempo de espera), aunque no se haya notificado. La solución correcta para esto es, por supuesto, verificar si el despertador fue realmente legítimo antes de proceder.

Los detalles se especifican en el estándar §30.5.1 [thread.condition.condvar]:

-La función se desbloqueará cuando se la señalice mediante una llamada a notify_one (), una llamada a notify_all (), la expiración del tiempo de espera absoluto (30.2.4) especificado por abs_time, o de manera espuria.

Devuelve: cv_status :: timeout si el tiempo de espera absoluto (30.2.4) especificado por abs_time expiró, other-ise cv_status :: no_timeout.

Solo mirándolo, cuando revisa una variable de condición, es mejor usar un ciclo while (de modo que si se despierta y aún no es inválido, vuelva a verificar). Acabo de escribir una plantilla para una cola asíncrona, espero que esto ayude.

 #ifndef SAFE_QUEUE #define SAFE_QUEUE #include  #include  #include  // A threadsafe-queue. template  class SafeQueue { public: SafeQueue(void) : q() , m() , c() {} ~SafeQueue(void) {} // Add an element to the queue. void enqueue(T t) { std::lock_guard lock(m); q.push(t); c.notify_one(); } // Get the "front"-element. // If the queue is empty, wait till a element is avaiable. T dequeue(void) { std::unique_lock lock(m); while(q.empty()) { // release lock as long as the wait and reaquire it afterwards. c.wait(lock); } T val = q.front(); q.pop(); return val; } private: std::queue q; mutable std::mutex m; std::condition_variable c; }; #endif 

Esta es probablemente la forma en que debes hacerlo:

 void push(std::string&& filename) { { std::lock_guard lock(qMutex); q.push(std::move(filename)); } populatedNotifier.notify_one(); } bool try_pop(std::string& filename, std::chrono::milliseconds timeout) { std::unique_lock lock(qMutex); if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); })) return false; filename = std::move(q.front()); q.pop(); return true; } 

Agregando a la respuesta aceptada, diría que la implementación de una correcta cola multi productores / consumidores múltiples es difícil (más fácil desde C ++ 11, sin embargo)

Le sugiero que pruebe la (muy buena) biblioteca de impulso libre de locking , la estructura “queue” hará lo que quiera, con garantías de espera / locking y sin la necesidad de un comstackdor C ++ 11 .

Estoy agregando esta respuesta ahora porque la biblioteca sin candado es bastante nueva para impulsar (desde 1.53 creo)

Reescribiría tu función de dequeue como:

 std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout) { std::unique_lock lock(qMutex); while(q.empty()) { if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout ) return std::string(); } std::string ret = q.front(); q.pop(); return ret; } 

Es más corto y no tiene código duplicado como el tuyo. Solo problema, puede esperar más tiempo. Para evitar eso, debe recordar la hora de inicio antes del ciclo, verifique el tiempo de espera y ajuste el tiempo de espera en consecuencia. O especifica el tiempo absoluto en la condición de espera.

También hay una solución GLib para este caso, aún no lo intenté, pero creo que es una buena solución. https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues.html#g-async-queue-new

Le puede gustar lfqueue, https://github.com/Taymindis/lfqueue . Es una cola simultánea sin locking. Actualmente lo estoy usando para consumir la cola de múltiples llamadas entrantes y funciona como un encanto.