¿Imposible crear un grupo de subprocesos en caché con un límite de tamaño?

Parece imposible crear un grupo de subprocesos en caché con un límite en la cantidad de subprocesos que puede crear.

Así es como se implementa Executors.newCachedThreadPool estático en la biblioteca estándar de Java:

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } 

Entonces, usando esa plantilla para continuar creando un grupo de subprocesos en caché de tamaño fijo:

 new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue()); 

Ahora, si usa esto y envía 3 tareas, todo estará bien. Enviar cualquier tarea adicional dará lugar a excepciones de ejecución rechazadas.

Probando esto

 new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue()); 

Como resultado, todos los subprocesos se ejecutarán secuencialmente. Es decir, el grupo de subprocesos nunca creará más de un subproceso para manejar sus tareas.

Este es un error en el método de ejecución de ThreadPoolExecutor? O tal vez esto es intencional? O hay alguna otra manera?

Editar: Quiero algo exactamente como el grupo de subprocesos en caché (crea subprocesos a petición y luego los mata después de un tiempo de espera) pero con un límite en el número de subprocesos que puede crear y la posibilidad de continuar haciendo cola tareas adicionales una vez que tiene alcanzar su límite de hilo Según la respuesta de sjlee, esto es imposible. En cuanto al método execute () de ThreadPoolExecutor, es imposible. Necesitaría la subclase ThreadPoolExecutor y anular execute () de forma parecida a como lo hace SwingWorker, pero lo que hace SwingWorker en su ejecución () es un truco completo.

    El ThreadPoolExecutor tiene los siguientes comportamientos clave, y tus problemas pueden explicarse por estos comportamientos.

    Cuando se envían tareas,

    1. Si el grupo de subprocesos no ha alcanzado el tamaño de núcleo, crea nuevos subprocesos.
    2. Si se ha alcanzado el tamaño del núcleo y no hay hilos inactivos, pone en cola las tareas.
    3. Si se ha alcanzado el tamaño de núcleo, no hay subprocesos inactivos, y la cola se llena, crea nuevos subprocesos (hasta que alcanza el tamaño máximo).
    4. Si se alcanzó el tamaño máximo, no hay subprocesos inactivos, y la cola se llena, la política de rechazo se activa.

    En el primer ejemplo, tenga en cuenta que SynchronousQueue tiene esencialmente un tamaño de 0. Por lo tanto, en el momento en que alcanza el tamaño máximo (3), la política de rechazo comienza (# 4).

    En el segundo ejemplo, la cola de elección es una LinkedBlockingQueue que tiene un tamaño ilimitado. Por lo tanto, te quedas atascado con el comportamiento n. ° 2.

    Realmente no se puede jugar mucho con el tipo de caché o el tipo fijo, ya que su comportamiento está casi completamente determinado.

    Si desea tener un grupo de hilos delimitado y dynamic, debe utilizar un tamaño de núcleo positivo y un tamaño máximo combinados con una cola de tamaño finito. Por ejemplo,

     new ThreadPoolExecutor(10, // core size 50, // max size 10*60, // idle timeout TimeUnit.SECONDS, new ArrayBlockingQueue(20)); // queue with a size 

    A menos que me haya perdido algo, la solución a la pregunta original es simple. El siguiente código implementa el comportamiento deseado como se describe en el póster original. Generará hasta 5 hilos para trabajar en una cola ilimitada y los hilos inactivos finalizarán después de 60 segundos.

     tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue()); tp.allowCoreThreadTimeOut(true); 

    Tenía el mismo problema. Como ninguna otra respuesta reúne todos los problemas, voy a agregar la mía:

    Ahora está escrito claramente en documentos : si utiliza una cola que no bloquea ( LinkedBlockingQueue ) la configuración de subprocesos máximos no tiene ningún efecto, solo se utilizan subprocesos centrales.

    asi que:

     public class MyExecutor extends ThreadPoolExecutor { public MyExecutor() { super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue()); allowCoreThreadTimeOut(true); } public void setThreads(int n){ setMaximumPoolSize(Math.max(1, n)); setCorePoolSize(Math.max(1, n)); } } 

    Este ejecutor tiene:

    1. No hay concepto de subprocesos máx. Ya que estamos utilizando una cola ilimitada. Esto es algo bueno porque tal cola puede hacer que el ejecutor cree un número masivo de hilos adicionales no centrales, si sigue su política habitual.

    2. Una cola de tamaño máximo Integer.MAX_VALUE . Submit() lanzará RejectedExecutionException si el número de tareas pendientes supera Integer.MAX_VALUE . No estoy seguro de que nos quedemos sin memoria primero o esto sucederá.

    3. Tiene 4 hilos principales posibles. Los hilos de núcleo inactivos salen automáticamente si están inactivos durante 5 segundos. Por lo tanto, sí, estrictamente a petición. El número puede variarse usando el método setThreads() .

    4. Se asegura de que la cantidad mínima de hilos centrales nunca sea inferior a uno, o bien submit() rechazará todas las tareas. Dado que los hilos centrales deben ser> = hilos máximos, el método setThreads() establece hilos máximos, aunque la configuración del hilo máximo es inútil para una cola sin límites.

    En su primer ejemplo, las tareas subsiguientes se rechazan porque AbortPolicy es el RejectedExecutionHandler predeterminado. ThreadPoolExecutor contiene las siguientes políticas, que puede cambiar a través del método setRejectedExecutionHandler :

     CallerRunsPolicy AbortPolicy DiscardPolicy DiscardOldestPolicy 

    Parece que quieres un grupo de subprocesos en caché con una CallerRunsPolicy.

    Ninguna de las respuestas solucionó mi problema, que tenía que ver con la creación de una cantidad limitada de conexiones HTTP utilizando el cliente HTTP de Apache (versión 3.x). Como me tomó algunas horas encontrar una buena configuración, compartiré:

     private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); 

    Esto crea un ThreadPoolExecutor que comienza con cinco y contiene un máximo de diez subprocesos que se ejecutan simultáneamente utilizando CallerRunsPolicy para su ejecución.

    Según el Javadoc para ThreadPoolExecutor:

    Si hay más de corePoolSize pero menos de máximo subprocesos dePoolSize en ejecución, solo se creará un nuevo subproceso si la cola está llena . Al establecer corePoolSize y maximumPoolSize de la misma manera, crea un grupo de subprocesos de tamaño fijo.

    (Énfasis mío)

    La respuesta de jitter es lo que quieres, aunque la mía responde a tu otra pregunta. 🙂

    hay una opción más En lugar de utilizar SynchronousQueue nuevo, también puede usar cualquier otra cola, pero debe asegurarse de que su tamaño sea 1, por lo que obligará a los ejecutores a crear un nuevo hilo.

    No parece que ninguna de las respuestas responda la pregunta, de hecho, no veo una forma de hacerlo, incluso si subclasificas desde PooledExecutorService, ya que muchos de los métodos / propiedades son privados, por ejemplo, haciendo que addIfUnderMaximumPoolSize estuviera protegido, podrías Haz lo siguiente:

     class MyThreadPoolService extends ThreadPoolService { public void execute(Runnable run) { if (poolSize() == 0) { if (addIfUnderMaximumPoolSize(run) != null) return; } super.execute(run); } } 

    Lo más cerca que conseguí fue esto, pero incluso esa no es una solución muy buena

     new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue()) { public void execute(Runnable command) { if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) { super.setCorePoolSize(super.getCorePoolSize() + 1); } super.execute(command); } protected void afterExecute(Runnable r, Throwable t) { // nothing in the queue if (getQueue().isEmpty() && getPoolSize() > min) { setCorePoolSize(getCorePoolSize() - 1); } }; }; 

    ps no probado el anterior

    Esto es lo que quieres (al menos eso creo). Para una explicación, verifique la respuesta de Jonathan Feinberg

    Executors.newFixedThreadPool(int n)

    Crea un grupo de subprocesos que reutiliza una cantidad fija de subprocesos que operan desde una cola compartida ilimitada. En cualquier punto, como mucho nThreads los hilos serán tareas de procesamiento activas. Si se envían tareas adicionales cuando todos los hilos están activos, esperarán en la cola hasta que haya un hilo disponible. Si un hilo termina debido a un fallo durante la ejecución antes del apagado, uno nuevo tomará su lugar si es necesario para ejecutar tareas posteriores. Los hilos en el grupo existirán hasta que se cierre explícitamente.

    Aquí hay otra solución. Creo que esta solución se comporta como desea (aunque no está orgulloso de esta solución):

     final LinkedBlockingQueue queue = new LinkedBlockingQueue() { public boolean offer(Runnable o) { if (size() > 1) return false; return super.offer(o); }; public boolean add(Runnable o) { if (super.offer(o)) return true; else throw new IllegalStateException("Queue full"); } }; RejectedExecutionHandler handler = new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { queue.add(r); } }; dbThreadExecutor = new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler); 
    1. Puedes usar ThreadPoolExecutor como lo sugiere @sjlee

      Puede controlar el tamaño del grupo de forma dinámica. Eche un vistazo a esta pregunta para más detalles:

      Grupo de hilos dynamic

      O

    2. Puede usar la API newWorkStealingPool , que se ha introducido con java 8.

       public static ExecutorService newWorkStealingPool() 

      Crea un grupo de subprocesos de robo de trabajo utilizando todos los procesadores disponibles como su nivel de paralelismo de destino.

    De forma predeterminada, el nivel de paralelismo se establece en el número de núcleos de CPU en su servidor. Si tiene un servidor de núcleo de 4 núcleos, el tamaño del grupo de subprocesos sería 4. Esta API devuelve el tipo ForkJoinPool de ExecutorService y permite el robo de trabajo de subprocesos inactivos al robar tareas de subprocesos ocupados en ForkJoinPool.