Consumidor productor con calificaciones

Soy nuevo en Clojure y trato de entender cómo usar correctamente sus características de simultaneidad, por lo que cualquier crítica / sugerencia es apreciada. Así que estoy tratando de escribir un pequeño progtwig de prueba en Clojure que funciona de la siguiente manera:

  1. hay 5 productores y 2 consumidores
  2. un productor espera un tiempo aleatorio y luego empuja un número en una cola compartida.
  3. un consumidor debe sacar un número de la cola tan pronto como la cola no esté vacía y luego dormir durante un corto tiempo para simular el trabajo
  4. los consumidores deben bloquear cuando la cola está vacía
  5. los productores deben bloquear cuando la cola tiene más de 4 elementos para evitar que crezca enormemente

Aquí está mi plan para cada paso anterior:

  1. los productores y consumidores serán agentes que realmente no se preocupan por su estado (solo valores nulos o algo así); Solo uso los agentes para enviar una función de “consumidor” o “productor” para hacer en algún momento. Entonces la cola compartida será (def queue (ref [])). Quizás esto debería ser un átomo sin embargo?
  • en la función del agente “productor”, simplemente (Subproceso / reposo (rand-int 1000)) y luego (dosync (alter queue conj (rand-int 100))) para pasar a la cola.
  • Estoy pensando en hacer que los agentes de consumo observen la cola de cambios con add-watcher. Sin embargo, no estoy seguro de esto … despertará a los consumidores en cualquier cambio, incluso si el cambio proviene de un consumidor que saca algo (posiblemente haciéndolo vacío). Tal vez sea suficiente verificar esto en la función de observador. Otro problema que veo es que si todos los consumidores están ocupados, ¿qué sucede cuando un productor agrega algo nuevo a la cola? ¿El evento observado se pone en cola en algún agente de consumo o desaparece?
  • véase más arriba
  • Realmente no sé cómo hacer esto. Escuché que la secuencia de clojure puede ser útil, pero no pude encontrar suficiente documentación sobre cómo usarla y mi prueba inicial no pareció funcionar (lo siento, ya no tengo el código en mí)
  • Aquí está mi opinión sobre esto. Me empeñé en usar solo las estructuras de datos de Clojure para ver cómo funcionaría. Tenga en cuenta que hubiera sido perfectamente habitual e idiomático tomar una cola de locking de la caja de herramientas de Java y usarla aquí; el código sería fácil de adaptar, creo. Actualización: de hecho lo java.util.concurrent.LinkedBlockingQueue a java.util.concurrent.LinkedBlockingQueue , ver a continuación.

    clojure.lang.PersistentQueue

    Llamar (pro-con) para iniciar una ejecución de prueba; luego eche un vistazo a los contenidos de output para ver si sucedió algo y queue-lengths para ver si permanecen dentro del límite dado.

    Actualización: para explicar por qué sentí la necesidad de usar, ensure hacerlo más abajo (me preguntaron sobre esto en el IRC), esto es para evitar errores de escritura (para una definición, consulte el artículo de Wikipedia sobre Aislamiento de instantáneas ). Si sustituyo @queue por (ensure queue) , sería posible que dos o más productores verifiquen la longitud de la cola, descubran que es menor que 4, luego coloquen elementos adicionales en la cola y posiblemente traigan la longitud total de la cola. la cola por encima de 4, rompiendo la restricción. De manera similar, dos consumidores que hacen @queue podrían aceptar el mismo artículo para procesarlo, luego sacar dos artículos de la cola. ensure que no ocurra ninguno de estos escenarios.

     (def go-on? (atom true)) (def queue (ref clojure.lang.PersistentQueue/EMPTY)) (def output (ref ())) (def queue-lengths (ref ())) (def *max-queue-length* 4) (defn overseer ([] (overseer 20000)) ([timeout] (Thread/sleep timeout) (swap! go-on? not))) (defn queue-length-watch [_ _ _ new-queue-state] (dosync (alter queue-lengths conj (count new-queue-state)))) (add-watch queue :queue-length-watch queue-length-watch) (defn producer [tag] (future (while @go-on? (if (dosync (let [l (count (ensure queue))] (when (< l *max-queue-length*) (alter queue conj tag) true))) (Thread/sleep (rand-int 2000)))))) (defn consumer [] (future (while @go-on? (Thread/sleep 100) ; don't look at the queue too often (when-let [item (dosync (let [item (first (ensure queue))] (alter queue pop) item))] (Thread/sleep (rand-int 500)) ; do stuff (dosync (alter output conj item)))))) ; and let us know (defn pro-con [] (reset! go-on? true) (dorun (map #(%1 %2) (repeat 5 producer) (iterate inc 0))) (dorun (repeatedly 2 consumer)) (overseer)) 

    java.util.concurrent.LinkedBlockingQueue

    Una versión de lo anterior escrito usando LinkedBlockingQueue . Tenga en cuenta cómo el esquema general del código es básicamente el mismo, con algunos detalles en realidad siendo un poco más limpio. LBQ queue-lengths de queue-lengths de esta versión, ya que LBQ se encarga de esa restricción para nosotros.

     (def go-on? (atom true)) (def *max-queue-length* 4) (def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*)) (def output (ref ())) (defn overseer ([] (overseer 20000)) ([timeout] (Thread/sleep timeout) (swap! go-on? not))) (defn producer [tag] (future (while @go-on? (.put queue tag) (Thread/sleep (rand-int 2000))))) (defn consumer [] (future (while @go-on? ;; I'm using .poll on the next line so as not to block ;; indefinitely if we're done; note that this has the ;; side effect that nulls = nils on the queue will not ;; be handled; there's a number of other ways to go about ;; this if this is a problem, see docs on LinkedBlockingQueue (when-let [item (.poll queue)] (Thread/sleep (rand-int 500)) ; do stuff (dosync (alter output conj item)))))) ; and let us know (defn pro-con [] (reset! go-on? true) (dorun (map #(%1 %2) (repeat 5 producer) (iterate inc 0))) (dorun (repeatedly 2 consumer)) (overseer))