¿Cómo funciona el patrón disruptivo de LMAX?

Estoy tratando de entender el patrón disruptivo . He visto el video de InfoQ e intenté leer su artículo. Entiendo que hay un buffer en anillo involucrado, que se inicializa como una matriz extremadamente grande para aprovechar la ubicación del caché, eliminar la asignación de memoria nueva.

Parece que hay uno o más enteros atómicos que registran las posiciones. Cada ‘evento’ parece tener una identificación única y su posición en el anillo se encuentra al encontrar su módulo con respecto al tamaño del anillo, etc., etc.

Desafortunadamente, no tengo un sentido intuitivo de cómo funciona. He hecho muchas aplicaciones comerciales y estudié el modelo de actor , miré a SEDA, etc.

En su presentación, mencionaron que este patrón es básicamente cómo funcionan los enrutadores; sin embargo, no he encontrado ninguna buena descripción de cómo funcionan los enrutadores tampoco.

¿Hay algunos buenos consejos para una mejor explicación?

El proyecto Google Code hace referencia a un documento técnico sobre la implementación del buffer de anillo, sin embargo, es un poco seco, académico y difícil para alguien que quiere aprender cómo funciona. Sin embargo, hay algunas publicaciones en el blog que han comenzado a explicar los aspectos internos de una manera más legible. Hay una explicación del buffer en anillo que es el núcleo del patrón disruptor, una descripción de las barreras del consumidor (la parte relacionada con la lectura del disruptor) y alguna información sobre el manejo de múltiples productores disponibles.

La descripción más simple de Disruptor es: es una forma de enviar mensajes entre hilos de la manera más eficiente posible. Se puede utilizar como alternativa a una cola, pero también comparte varias características con SEDA y Actores.

Comparado con Colas:

The Disruptor proporciona la capacidad de pasar un mensaje a otros hilos, activándolo si es necesario (similar a un BlockingQueue). Sin embargo, hay 3 diferencias distintas.

  1. El usuario de Disruptor define cómo se almacenan los mensajes ampliando la clase Entry y proporcionando una fábrica para realizar la asignación previa. Esto permite la reutilización de la memoria (copia) o la Entrada podría contener una referencia a otro objeto.
  2. Poner mensajes en el Disruptor es un proceso de dos fases, primero se reclama un espacio en el buffer de anillo, que proporciona al usuario la Entrada que puede llenarse con los datos apropiados. Entonces la entrada debe ser confirmada, este enfoque de dos fases es necesario para permitir el uso flexible de la memoria mencionada anteriormente. Es la confirmación lo que hace que el mensaje sea visible para los hilos del consumidor.
  3. Es responsabilidad del consumidor realizar un seguimiento de los mensajes que se han consumido desde el buffer circular. Alejar esta responsabilidad del buffer de anillo ayudó a reducir la cantidad de contención de escritura ya que cada hilo mantiene su propio contador.

Comparado con Actores

El modelo Actor es más cercano al Disruptor que la mayoría de los otros modelos de progtwigción, especialmente si utiliza las clases BatchConsumer / BatchHandler que se proporcionan. Estas clases ocultan todas las complejidades de mantener los números de secuencia consumidos y proporcionan un conjunto de devoluciones de llamada simples cuando ocurren eventos importantes. Sin embargo, hay un par de diferencias sutiles.

  1. The Disruptor utiliza un modelo de consumo de 1 hilo – 1, donde los actores usan un modelo N: M, es decir, puedes tener tantos actores como quieras y se distribuirán a través de un número fijo de hilos (generalmente 1 por núcleo).
  2. La interfaz BatchHandler proporciona una callback adicional (y muy importante) onEndOfBatch() . Esto permite a los consumidores lentos, por ejemplo, aquellos que realizan E / S a lotes de eventos juntos para mejorar el rendimiento. Es posible realizar lotes en otros marcos Actor, sin embargo, como casi todos los demás marcos no proporcionan una callback al final del lote, debe usar un tiempo de espera para determinar el final del lote, lo que da como resultado una latencia deficiente.

Comparado con SEDA

LMAX construyó el patrón Disruptor para reemplazar un enfoque basado en SEDA.

  1. La principal mejora que proporcionó a SEDA fue la capacidad de trabajar en paralelo. Para hacer esto, el Disruptor admite multidifusión de los mismos mensajes (en el mismo orden) a múltiples consumidores. Esto evita la necesidad de etapas de horquilla en la tubería.
  2. También permitimos que los consumidores esperen los resultados de otros consumidores sin tener que poner otra etapa de espera entre ellos. Un consumidor simplemente puede ver el número de secuencia de un consumidor del que depende. Esto evita la necesidad de etapas de unión en la tubería.

Comparado con barreras de memoria

Otra forma de pensarlo es como una barrera de memoria ordenada y estructurada. Donde la barrera del productor forma la barrera de escritura y la barrera del consumidor es la barrera de lectura.

Primero, nos gustaría entender el modelo de progtwigción que ofrece.

Hay uno o más escritores. Hay uno o más lectores. Hay una línea de entradas, totalmente ordenadas de antiguo a nuevo (en la foto de izquierda a derecha). Los escritores pueden agregar nuevas entradas en el extremo derecho. Cada lector lee entradas secuencialmente de izquierda a derecha. Los lectores no pueden leer escritores anteriores, obviamente.

No hay concepto de eliminación de entrada. Uso “lector” en lugar de “consumidor” para evitar la imagen de las entradas que se consumen. Sin embargo, entendemos que las entradas a la izquierda del último lector se vuelven inútiles.

En general, los lectores pueden leer concurrentemente e independientemente. Sin embargo, podemos declarar dependencias entre los lectores. Las dependencias del lector pueden ser un gráfico acíclico arbitrario. Si el lector B depende del lector A, el lector B no puede leer más allá del lector A.

La dependencia del lector surge porque el lector A puede anotar una entrada, y el lector B depende de esa anotación. Por ejemplo, A hace algunos cálculos en una entrada y almacena el resultado en el campo a en la entrada. A luego seguir adelante, y ahora B puede leer la entrada y el valor de a A almacenada. Si el lector C no depende de A, C no debería intentar leer a .

Este es de hecho un modelo de progtwigción interesante. Independientemente del rendimiento, el modelo solo puede beneficiar a muchas aplicaciones.

Por supuesto, el objective principal de LMAX es el rendimiento. Utiliza un anillo de entradas preasignadas. El anillo es lo suficientemente grande, pero está delimitado para que el sistema no se cargue más allá de la capacidad de diseño. Si el anillo está lleno, el (los) escritor (es) esperarán hasta que los lectores más lentos avancen y hagan espacio.

Los objetos de entrada están preasignados y viven para siempre, para reducir el costo de recolección de basura. No insertamos nuevos objetos de entrada ni eliminamos objetos de entrada antiguos; en cambio, un escritor solicita una entrada preexistente, completa sus campos y notifica a los lectores. Esta aparente acción de 2 fases es realmente simplemente una acción atómica

 setNewEntry(EntryPopulator); interface EntryPopulator{ void populate(Entry existingEntry); } 

Las entradas de asignación previa también significan que las entradas adyacentes (muy probablemente) se ubican en celdas de memoria adyacentes, y dado que los lectores leen las entradas secuencialmente, esto es importante para utilizar cachés de CPU.

Y muchos esfuerzos para evitar el locking, CAS, incluso la barrera de la memoria (por ejemplo, usar una variable de secuencia no volátil si solo hay un escritor)

Para desarrolladores de lectores: diferentes lectores de anotaciones deben escribir en diferentes campos, para evitar la contención de escritura. (De hecho, deberían escribir en diferentes líneas de caché). Un lector anotador no debe tocar nada que otros lectores no dependientes puedan leer. Es por eso que digo que estos lectores anotan las entradas, en lugar de modificar las entradas.

Martin Fowler ha escrito un artículo sobre LMAX y el patrón disruptivo , The LMAX Architecture , que puede aclararlo aún más.

De hecho, me tomé el tiempo de estudiar la fuente real, por pura curiosidad, y la idea detrás de esto es bastante simple. La versión más reciente al momento de escribir esta publicación es 3.2.1.

Hay un almacenamiento intermedio que almacena eventos preasignados que almacenará los datos para que los consumidores los lean.

El búfer está respaldado por una matriz de indicadores (matriz de enteros) de su longitud que describe la disponibilidad de las ranuras de búfer (ver más detalles para más detalles). Se accede a la matriz como una java # AtomicIntegerArray, por lo tanto, a los fines de esta explicación, puede asumir que es una.

Puede haber cualquier cantidad de productores. Cuando el productor desea escribir en el búfer, se genera un número largo (como cuando llama a AtomicLong # getAndIncrement, el disruptor realmente usa su propia implementación, pero funciona de la misma manera). Vamos a llamar esto generado por mucho tiempo a producerCallId. De manera similar, un ConsumerCallId se genera cuando un consumidor ENDS lee un slot desde un buffer. Se accede al consumidorCallId más reciente.

(Si hay muchos consumidores, se elige la llamada con la ID más baja).

Estos identificadores se comparan, y si la diferencia entre los dos es menor que el lado de la memoria intermedia, el productor puede escribir.

(Si el producerCallId es mayor que el consumerCallId + bufferSize reciente, significa que el búfer está lleno y el productor está obligado a esperar en el bus hasta que esté disponible).

Al productor se le asigna la ranura en el búfer en función de su callId (que es prducerCallId modulo bufferSize, pero como el bufferSize siempre tiene una potencia de 2 (el límite se aplica en la creación del búfer), la operación real utilizada es producerCallId & (bufferSize – 1 )). Entonces es libre de modificar el evento en ese espacio.

(El algoritmo real es un poco más complicado, involucrando almacenar en caché el consumerId reciente en una referencia atómica separada, con fines de optimización).

Cuando se modificó el evento, el cambio se “publicó”. Al publicar la ranura respectiva en la matriz de banderas se completa con la bandera actualizada. El valor del indicador es el número del bucle (producerCallId dividido por el bufferSize (de nuevo desde que bufferSize tiene una potencia de 2, la operación real es un cambio a la derecha).

De manera similar, puede haber cualquier cantidad de consumidores. Cada vez que un consumidor desea acceder al buffer, se genera un consumerCallId (dependiendo de cómo se agregaron los consumidores al disruptor, el atómico utilizado en la generación del id se puede compartir o separar para cada uno de ellos). Este consumerCallId se compara con el producto más reciente, y si es menor de los dos, el lector puede progresar.

(Del mismo modo, si producerCallId es incluso para el consumerCallId, significa que el buffer es empety y el consumidor está obligado a esperar. La forma de esperar la define WaitStrategy durante la creación del disruptor).

Para los consumidores individuales (los que tienen su propio generador de identificación), lo siguiente que se verifica es la capacidad de consumir por lotes. Las ranuras en el buffer se examinan en orden desde el respectivo al consumidor CallId (el índice se determina de la misma manera que para los productores), a la correspondiente al productorCallId reciente.

Se examinan en un bucle al comparar el valor del indicador escrito en el conjunto de indicadores, frente a un valor de indicador generado para el identificador de llamadas del consumidor. Si las banderas coinciden, significa que los productores que llenan las máquinas tragamonedas han confirmado sus cambios. De lo contrario, el ciclo se interrumpe y se devuelve el valor de cambio commited más alto. Las ranuras de ConsumerCallId a recibidas en changeId se pueden consumir en lote.

Si un grupo de consumidores lee en conjunto (los que tienen un generador de identificador compartido), cada uno solo toma un callId único, y solo se verifica y se devuelve el espacio para ese callId individual.

De este artículo :

El patrón disruptivo es una cola de procesamiento por lotes respaldada por una matriz circular (es decir, la memoria intermedia en anillo) llena de objetos de transferencia preasignados que utiliza barreras de memoria para sincronizar productores y consumidores a través de secuencias.

Las barreras de memoria son difíciles de explicar y el blog de Trisha ha hecho el mejor bash en mi opinión con esta publicación: http://mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast. html

Pero si no quiere sumergirse en los detalles de bajo nivel, puede saber que las barreras de memoria en Java se implementan a través de la palabra clave volatile o a través de java.util.concurrent.AtomicLong . Las secuencias de patrones AtomicLong son AtomicLong y se comunican entre productores y consumidores a través de barreras de memoria en lugar de lockings.

Me resulta más fácil entender un concepto a través del código, por lo que el siguiente código es un helloworld simple de CoralQueue , que es una implementación de patrón disruptor realizada por CoralBlocks con la que estoy afiliado. En el siguiente código, puede ver cómo el patrón disruptor implementa el procesamiento por lotes y cómo el búfer de anillo (es decir, el conjunto circular) permite la comunicación libre de basura entre dos subprocesos:

 package com.coralblocks.coralqueue.sample.queue; import com.coralblocks.coralqueue.AtomicQueue; import com.coralblocks.coralqueue.Queue; import com.coralblocks.coralqueue.util.MutableLong; public class Sample { public static void main(String[] args) throws InterruptedException { final Queue queue = new AtomicQueue(1024, MutableLong.class); Thread consumer = new Thread() { @Override public void run() { boolean running = true; while(running) { long avail; while((avail = queue.availableToPoll()) == 0); // busy spin for(int i = 0; i < avail; i++) { MutableLong ml = queue.poll(); if (ml.get() == -1) { running = false; } else { System.out.println(ml.get()); } } queue.donePolling(); } } }; consumer.start(); MutableLong ml; for(int i = 0; i < 10; i++) { while((ml = queue.nextToDispatch()) == null); // busy spin ml.set(System.nanoTime()); queue.flush(); } // send a message to stop consumer... while((ml = queue.nextToDispatch()) == null); // busy spin ml.set(-1); queue.flush(); consumer.join(); // wait for the consumer thread to die... } } 
Intereting Posts