SwingWorker, done () se ejecuta antes de que las llamadas a process () hayan finalizado

He estado trabajando con SwingWorker s por un tiempo y he terminado con un comportamiento extraño, al menos para mí. Comprendo claramente que, por motivos de rendimiento, varias invocaciones al método publish () se combinan en una sola invocación. Tiene mucho sentido para mí y sospecho que SwingWorker mantiene algún tipo de cola para procesar todas las llamadas.

De acuerdo con el tutorial y API, cuando SwingWorker finaliza su ejecución, doInBackground () finaliza normalmente o el hilo de trabajo se cancela desde el exterior, luego se invoca el método done () . Hasta aquí todo bien.

Pero tengo un ejemplo (similar al que se muestra en los tutoriales) donde hay llamadas al método process() realizadas después de que se ejecuta el método done() . Como ambos métodos se ejecutan en el hilo de despacho de eventos , esperaría que done() se ejecutara después de que todas las invocaciones de process() hayan finalizado. En otras palabras:

Esperado:

 Writing... Writing... Stopped! 

Resultado:

 Writing... Stopped! Writing... 

Código de muestra

 import java.awt.BorderLayout; import java.awt.Dimension; import java.awt.Graphics; import java.awt.event.ActionEvent; import java.util.List; import javax.swing.AbstractAction; import javax.swing.Action; import javax.swing.JButton; import javax.swing.JFrame; import javax.swing.JPanel; import javax.swing.JScrollPane; import javax.swing.JTextArea; import javax.swing.SwingUtilities; import javax.swing.SwingWorker; public class Demo { private SwingWorker worker; private JTextArea textArea; private Action startAction, stopAction; private void createAndShowGui() { startAction = new AbstractAction("Start writing") { @Override public void actionPerformed(ActionEvent e) { Demo.this.startWriting(); this.setEnabled(false); stopAction.setEnabled(true); } }; stopAction = new AbstractAction("Stop writing") { @Override public void actionPerformed(ActionEvent e) { Demo.this.stopWriting(); this.setEnabled(false); startAction.setEnabled(true); } }; JPanel buttonsPanel = new JPanel(); buttonsPanel.add(new JButton(startAction)); buttonsPanel.add(new JButton(stopAction)); textArea = new JTextArea(30, 50); JScrollPane scrollPane = new JScrollPane(textArea); JFrame frame = new JFrame("Test"); frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE); frame.add(scrollPane); frame.add(buttonsPanel, BorderLayout.SOUTH); frame.pack(); frame.setLocationRelativeTo(null); frame.setVisible(true); } private void startWriting() { stopWriting(); worker = new SwingWorker() { @Override protected Void doInBackground() throws Exception { while(!isCancelled()) { publish("Writing...\n"); } return null; } @Override protected void process(List chunks) { String string = chunks.get(chunks.size() - 1); textArea.append(string); } @Override protected void done() { textArea.append("Stopped!\n"); } }; worker.execute(); } private void stopWriting() { if(worker != null && !worker.isCancelled()) { worker.cancel(true); } } public static void main(String[] args) { SwingUtilities.invokeLater(new Runnable() { @Override public void run() { new Demo().createAndShowGui(); } }); } } 

RESPUESTA CORTA:

Esto sucede porque publish () no progtwig directamente el process , sino que establece un temporizador que activará la progtwigción de un bloque de proceso () en el EDT después del DELAY . Entonces, cuando el trabajador es cancelado, todavía hay un temporizador esperando para progtwigr un proceso () con los datos de la última publicación. La razón para usar un temporizador es implementar la optimización donde se puede ejecutar un único proceso con los datos combinados de varias publicaciones.

RESPUESTA LARGA:

Veamos cómo publish () y cancelar interactúan entre ellos, para eso, sumerjamosnos en algún código fuente.

Primero la parte fácil, cancel(true) :

 public final boolean cancel(boolean mayInterruptIfRunning) { return future.cancel(mayInterruptIfRunning); } 

Esta cancelación termina llamando al siguiente código:

 boolean innerCancel(boolean mayInterruptIfRunning) { for (;;) { int s = getState(); if (ranOrCancelled(s)) return false; if (compareAndSetState(s, CANCELLED)) // <----- break; } if (mayInterruptIfRunning) { Thread r = runner; if (r != null) r.interrupt(); // <----- } releaseShared(0); done(); // <----- return true; } 

El estado de SwingWorker está establecido en CANCELLED , el hilo se interrumpe y se llama done() , pero esto no es hecho por SwingWorker, pero el future hecho (), que se especifica cuando la instancia se crea en el constructor SwingWorker:

 future = new FutureTask(callable) { @Override protected void done() { doneEDT(); // <----- setState(StateValue.DONE); } }; 

Y el código doneEDT() es:

 private void doneEDT() { Runnable doDone = new Runnable() { public void run() { done(); // <----- } }; if (SwingUtilities.isEventDispatchThread()) { doDone.run(); // <----- } else { doSubmit.add(doDone); } } 

Que llama a los SwingWorkers done() directamente si estamos en el EDT, que es nuestro caso. En este punto, el SwingWorker debe detenerse, no se debe llamar a más publish() , esto es fácil de demostrar con la siguiente modificación:

 while(!isCancelled()) { textArea.append("Calling publish\n"); publish("Writing...\n"); } 

Sin embargo, recibimos un mensaje de "Escritura ..." de process (). Entonces, veamos cómo se llama process (). El código fuente para publish(...) es

 protected final void publish(V... chunks) { synchronized (this) { if (doProcess == null) { doProcess = new AccumulativeRunnable() { @Override public void run(List args) { process(args); // <----- } @Override protected void submit() { doSubmit.add(this); // <----- } }; } } doProcess.add(chunks); // <----- } 

Vemos que la run() del Runnable doProcess es quién termina llamando al process(args) , pero este código simplemente llama a doProcess.add(chunks) no a doProcess.run() y hay una doSubmit alrededor también. Veamos doProcess.add(chunks) .

 public final synchronized void add(T... args) { boolean isSubmitted = true; if (arguments == null) { isSubmitted = false; arguments = new ArrayList(); } Collections.addAll(arguments, args); // <----- if (!isSubmitted) { //This is what will make that for multiple publishes only one process is executed submit(); // <----- } } 

Entonces, lo que publish() realmente hace es agregar los fragmentos en algunos arguments ArrayList internos y llamar a submit() . Acabamos de ver que solo envía llamadas doSubmit.add(this) , que es el mismo método add , ya que doProcess y doSubmit extienden AccumulativeRunnable , sin embargo, esta vez V es Runnable lugar de String como en doProcess . Entonces un pedazo es el ejecutable que llama process(args) . Sin embargo, la llamada a submit() es un método completamente diferente definido en la clase de doSubmit :

 private static class DoSubmitAccumulativeRunnable extends AccumulativeRunnable implements ActionListener { private final static int DELAY = (int) (1000 / 30); @Override protected void run(List args) { for (Runnable runnable : args) { runnable.run(); } } @Override protected void submit() { Timer timer = new Timer(DELAY, this); // <----- timer.setRepeats(false); timer.start(); } public void actionPerformed(ActionEvent event) { run(); // <----- } } 

Crea un temporizador que dispara el código actionPerformed una vez después de DELAY milisegundos. Una vez que se activa el evento, el código se pondrá en cola en el EDT que llamará a una run() interna run() que termina run(flush()) de doProcess y ejecutando el process(chunk) , donde el fragmento es el dato de los arguments Lista de arreglo. Me salté algunos detalles, la cadena de llamadas "ejecutar" es así:

  • doSubmit.run ()
  • doSubmit.run (flush ()) // En realidad, un bucle de ejecutables pero solo tendrá uno (*)
  • doProcess.run ()
  • doProcess.run (flush ())
  • proceso (pedazo)

(*) isSubmited and flush() (que restablece este valor booleano) lo hace para que las llamadas adicionales a publicar no agreguen ejecutables doProcess para ser llamados en doSubmit.run (flush ()), sin embargo, sus datos no se ignoran. Ejecutando así un proceso único para cualquier cantidad de publicaciones llamadas durante la vida de un temporizador.

En general, lo que publish("Writing...") es progtwigr la llamada a process(chunk) en el EDT después de un RETRASO. Esto explica por qué incluso después de que cancelamos el hilo y no hay más publicaciones, aparece una ejecución del proceso, porque en el momento en que cancelamos el trabajo hay (con alta probabilidad) un temporizador que progtwigrá un process() después de done() progtwigdo.

¿Por qué se usa este temporizador en lugar de solo el proceso de progtwigción () en el EDT con invokeLater(doProcess) ? Para implementar la optimización del rendimiento explicada en los documentos :

Debido a que el método de proceso se invoca asíncronamente en el hilo de envío de eventos, pueden producirse múltiples invocaciones al método de publicación antes de que se ejecute el método de proceso. Para fines de rendimiento, todas estas invocaciones se combinan en una sola invocación con argumentos concatenados. Por ejemplo:

  publish("1"); publish("2", "3"); publish("4", "5", "6"); might result in: process("1", "2", "3", "4", "5", "6") 

Ahora sabemos que esto funciona porque todas las publicaciones que ocurren dentro de un intervalo DELAY están agregando sus arguments en esa variable interna, vimos arguments y el process(chunk) se ejecutará con todos esos datos de una vez.

¿ES ESTE UN ERROR? ¿TRABAJO?

Es difícil saber si esto es un error o no, podría tener sentido procesar los datos que el hilo de fondo ha publicado, dado que el trabajo ya está hecho y usted podría estar interesado en actualizar la GUI con tanta información como pueda. (si eso es lo process() está haciendo el process() , por ejemplo). Y entonces podría no tener sentido si done() requiere que se procesen todos los datos y / o una llamada a process () después de done () cree incoherencias en los datos / GUI.

Hay una solución obvia si no desea que se ejecute ningún nuevo proceso () después de done (), simplemente verifique si el trabajador también se cancela en el método de process .

 @Override protected void process(List chunks) { if (isCancelled()) return; String string = chunks.get(chunks.size() - 1); textArea.append(string); } 

Es más complicado hacer que done () se ejecute después de ese último proceso (); por ejemplo, solo se podría usar también un temporizador que progtwigrá el trabajo done () real después de> DELAY. Aunque no puedo pensar que este sea un caso común, si cancelas No debería ser importante perder un proceso más () cuando sabemos que de hecho estamos cancelando la ejecución de todos los futuros.

Después de leer la excelente respuesta de DSquare y concluir que se necesitarían algunas subclases, se me ocurrió esta idea para cualquiera que necesite asegurarse de que todos los fragmentos publicados se hayan procesado en el EDT antes de continuar.

NB Intenté escribirlo en Java en lugar de Jython (mi idioma de elección y oficialmente el mejor idioma del mundo), pero es un poco complicado porque, por ejemplo, la publish es final , por lo que tendrías que inventar otro método para llamarlo, y también porque tienes que (bostezar) parametrizar todo con generics en Java.

Este código debe ser comprensible para cualquier persona de Java: solo para ayudar, con self.publication_counter.get() , se evalúa como False cuando el resultado es 0.

 # this is how you say Worker... is a subclass of SwingWorker in Python/Jython class WorkerAbleToWaitForPublicationToFinish( javax.swing.SwingWorker ): # __init__ is the constructor method in Python/Jython def __init__( self ): # we just add an "attribute" (here, publication_counter) to the object being created (self) to create a field of the new object self.publication_counter = java.util.concurrent.atomic.AtomicInteger() def await_processing_of_all_chunks( self ): while self.publication_counter.get(): time.sleep( 0.001 ) # fully functional override of the Java method def process( self, chunks ): for chunk in chunks: pass # DO SOMETHING WITH EACH CHUNK # decrement the counter by the number of chunks received # NB do this AFTER dealing with the chunks self.publication_counter.addAndGet( - len( chunks ) ) # fully functional override of the Java method def publish( self, *chunks ): # increment the counter by the number of chunks received # NB do this BEFORE publishing the chunks self.publication_counter.addAndGet( len( chunks )) self.super__publish( chunks ) 

Entonces en tu código de llamada, pones algo como:

  engine.update_xliff_task.get() engine.update_xliff_task.await_processing_of_all_chunks() 

PD el uso de una cláusula while como esta (es decir, una técnica de sondeo) no es muy elegante. Miré las clases java.util.concurrent disponibles, tales como CountDownLatch y Phaser (ambas con métodos de locking de hilos), pero no creo que ninguna de ellas sea adecuada para este propósito …

luego

Estaba lo suficientemente interesado en esto para ajustar una clase de concurrencia adecuada (escrita en Java, que se encuentra en el sitio de Apache) llamada CounterLatch . Su versión detiene el hilo en await() si se alcanza el valor de un contador AtomicLong . Mi versión aquí le permite hacer eso, o lo contrario: decir “espere hasta que el contador scope cierto valor antes de levantar el pestillo”:

Nota: el uso de AtomicLong para la signal y AtomicBoolean para el released : porque en el Java original usan la palabra clave volatile . Creo que usar las clases atómicas logrará el mismo propósito.

 class CounterLatch(): def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ): self.count = java.util.concurrent.atomic.AtomicLong( initial ) self.signal = java.util.concurrent.atomic.AtomicLong( wait_value ) class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ): def tryAcquireShared( sync_self, arg ): if lift_on_reached: return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1 else: return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1 def tryReleaseShared( self, args ): return True self.sync = Sync() self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False def await( self, *args ): if args: assert len( args ) == 2 assert type( args[ 0 ] ) is int timeout = args[ 0 ] assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit unit = args[ 1 ] return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)) else: self.sync.acquireSharedInterruptibly( 1 ) def count_relative( self, n ): previous = self.count.addAndGet( n ) if previous == self.signal.get(): self.sync.releaseShared( 0 ) return previous 

Entonces mi código ahora se ve así:

En el constructor SwingWorker:

 self.publication_counter_latch = CounterLatch() 

En SW.publish:

 self.publication_counter_latch.count_relative( len( chunks ) ) self.super__publish( chunks ) 

En el hilo esperando a que se detenga el procesamiento de fragmentos:

 worker.publication_counter_latch.await()