¿Es posible leer desde un InputStream con un tiempo de espera?

Específicamente, el problema es escribir un método como este:

int maybeRead(InputStream in, long timeout) 

donde el valor de retorno es el mismo que en .read () si los datos están disponibles dentro de “milisegundos de tiempo de espera” y de lo contrario -2. Antes de que el método regrese, cualquier subproceso generado debe salir.

Para evitar los argumentos, el tema aquí java.io.InputStream, según lo documentado por Sun (cualquier versión de Java). Tenga en cuenta que esto no es tan simple como parece. A continuación, se incluyen algunos hechos que son respaldados directamente por la documentación de Sun.

  1. El método in.read () puede no ser interrumpible.

  2. Envolver el InputStream en un Reader o InterruptibleChannel no ayuda, porque todas esas clases pueden hacer es llamar a los métodos de InputStream. Si fuera posible utilizar esas clases, sería posible escribir una solución que simplemente ejecute la misma lógica directamente en el InputStream.

  3. Siempre es aceptable que in.available () devuelva 0.

  4. El método in.close () puede bloquear o no hacer nada.

  5. No hay una forma general de matar otro hilo.

Usando inputStream.available ()

Siempre es aceptable que System.in.available () devuelva 0.

He encontrado lo contrario: siempre devuelve el mejor valor para la cantidad de bytes disponibles. Javadoc para InputStream.available() :

 Returns an estimate of the number of bytes that can be read (or skipped over) from this input stream without blocking by the next invocation of a method for this input stream. 

Una estimación es inevitable debido a la temporización / estancamiento. La cifra puede ser una subestimación única porque constantemente llegan nuevos datos. Sin embargo, siempre se “pone al día” en la próxima llamada: debe tener en cuenta todos los datos recibidos, y la barra que llega justo en el momento de la nueva llamada. Devuelve permanentemente 0 cuando hay datos que no cumplen con la condición anterior.

Primera advertencia: las subclases concretas de InputStream son responsables de () disponible

InputStream es una clase abstracta. No tiene una fuente de datos. No tiene sentido que tenga datos disponibles. Por lo tanto, javadoc for available() también declara:

 The available method for class InputStream always returns 0. This method should be overridden by subclasses. 

Y, de hecho, las clases de flujo de entrada concretas anulan disponible (), proporcionando valores significativos, no constantes de 0.

Segunda advertencia: asegúrese de utilizar el retorno de carro al escribir la entrada en Windows.

Si usa System.in , su progtwig solo recibe información cuando su caparazón de comando se la entrega. Si está utilizando la redirección / canalización de archivos (por ejemplo, algún archivo> java myJavaApp o algún comando | java myJavaApp), los datos de entrada generalmente se entregan inmediatamente. Sin embargo, si ingresa manualmente la entrada, el traspaso de datos puede retrasarse. Por ejemplo, con el shell windows cmd.exe, los datos se almacenan en búfer dentro del shell cmd.exe. Los datos solo se pasan al progtwig Java en ejecución después del retorno de carro (control-m o ). Esa es una limitación del entorno de ejecución. Por supuesto, InputStream.available () devolverá 0 durante el tiempo que el shell almacena en búfer los datos; ese es el comportamiento correcto; no hay datos disponibles en ese punto. Tan pronto como los datos estén disponibles desde el shell, el método devuelve un valor> 0. NB: Cygwin también usa cmd.exe.

La solución más simple (sin locking, por lo que no se requiere tiempo de espera)

Solo usa esto:

  byte[] inputData = new byte[1024]; int result = is.read(inputData, 0, is.available()); // result will indicate number of bytes read; -1 for EOF with no data read. 

O equivalente,

  BufferedReader br = new BufferedReader(new InputStreamReader(System.in, Charset.forName("ISO-8859-1")),1024); // ... // inside some iteration / processing logic: if (br.ready()) { int readCount = br.read(inputData, bufferOffset, inputData.length-bufferOffset); } 

Solución más rica (llena al máximo el búfer dentro del período de tiempo de espera)

Declara esto:

 public static int readInputStreamWithTimeout(InputStream is, byte[] b, int timeoutMillis) throws IOException { int bufferOffset = 0; long maxTimeMillis = System.currentTimeMillis() + timeoutMillis; while (System.currentTimeMillis() < maxTimeMillis && bufferOffset < b.length) { int readLength = java.lang.Math.min(is.available(),b.length-bufferOffset); // can alternatively use bufferedReader, guarded by isReady(): int readResult = is.read(b, bufferOffset, readLength); if (readResult == -1) break; bufferOffset += readResult; } return bufferOffset; } 

Luego usa esto:

  byte[] inputData = new byte[1024]; int readCount = readInputStreamWithTimeout(System.in, inputData, 6000); // 6 second timeout // readCount will indicate number of bytes read; -1 for EOF with no data read. 

Suponiendo que su flujo no está respaldado por un socket (por lo que no puede usar Socket.setSoTimeout() ), creo que la forma estándar de resolver este tipo de problema es usar un futuro.

Supongamos que tengo el siguiente ejecutor y transmisiones:

  ExecutorService executor = Executors.newFixedThreadPool(2); final PipedOutputStream outputStream = new PipedOutputStream(); final PipedInputStream inputStream = new PipedInputStream(outputStream); 

Tengo un escritor que escribe algunos datos y luego espera 5 segundos antes de escribir la última información y cerrar la transmisión:

  Runnable writeTask = new Runnable() { @Override public void run() { try { outputStream.write(1); outputStream.write(2); Thread.sleep(5000); outputStream.write(3); outputStream.close(); } catch (Exception e) { e.printStackTrace(); } } }; executor.submit(writeTask); 

La forma normal de leer esto es la siguiente. La lectura bloqueará indefinidamente los datos, por lo que se completa en 5s:

  long start = currentTimeMillis(); int readByte = 1; // Read data without timeout while (readByte >= 0) { readByte = inputStream.read(); if (readByte >= 0) System.out.println("Read: " + readByte); } System.out.println("Complete in " + (currentTimeMillis() - start) + "ms"); 

qué salidas:

 Read: 1 Read: 2 Read: 3 Complete in 5001ms 

Si hubiera un problema más fundamental, como que el escritor no respondiera, el lector bloquearía para siempre. Si envuelvo la lectura en un futuro, puedo controlar el tiempo de espera de la siguiente manera:

  int readByte = 1; // Read data with timeout Callable readTask = new Callable() { @Override public Integer call() throws Exception { return inputStream.read(); } }; while (readByte >= 0) { Future future = executor.submit(readTask); readByte = future.get(1000, TimeUnit.MILLISECONDS); if (readByte >= 0) System.out.println("Read: " + readByte); } 

qué salidas:

 Read: 1 Read: 2 Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228) at java.util.concurrent.FutureTask.get(FutureTask.java:91) at test.InputStreamWithTimeoutTest.main(InputStreamWithTimeoutTest.java:74) 

Puedo atrapar la TimeoutException y hacer la limpieza que desee.

Me gustaría cuestionar el enunciado del problema en lugar de simplemente aceptarlo a ciegas. Solo necesita tiempos de espera de la consola o de la red. Si este último tiene Socket.setSoTimeout() y HttpURLConnection.setReadTimeout() ambos hacen exactamente lo que se requiere, siempre y cuando los configure correctamente cuando los construya / adquiera. Dejándolo en un punto arbitrario más adelante en la aplicación cuando todo lo que tiene es el InputStream es un diseño pobre que conduce a una implementación muy incómoda.

Si su InputStream está respaldado por un Socket, puede establecer un tiempo de espera de Socket (en milisegundos) usando setSoTimeout . Si la llamada de lectura () no se desbloquea dentro del tiempo de espera especificado, arrojará una excepción SocketTimeoutException.

Solo asegúrese de llamar a setSoTimeout en el Socket antes de realizar la llamada a read ().

No he usado las clases del paquete Java NIO, pero parece que podrían ser de ayuda aquí. Específicamente, java.nio.channels.Channels y java.nio.channels.InterruptibleChannel .

Esta es una forma de obtener un NIO FileChannel de System.in y verificar la disponibilidad de datos mediante un tiempo de espera, que es un caso especial del problema descrito en la pregunta. Ejecútelo en la consola, no escriba ninguna entrada y espere los resultados. Se probó con éxito en Java 6 en Windows y Linux.

 import java.io.FileInputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; public class Main { static final ByteBuffer buf = ByteBuffer.allocate(4096); public static void main(String[] args) { long timeout = 1000 * 5; try { InputStream in = extract(System.in); if (! (in instanceof FileInputStream)) throw new RuntimeException( "Could not extract a FileInputStream from STDIN."); try { int ret = maybeAvailable((FileInputStream)in, timeout); System.out.println( Integer.toString(ret) + " bytes were read."); } finally { in.close(); } } catch (Exception e) { throw new RuntimeException(e); } } /* unravels all layers of FilterInputStream wrappers to get to the * core InputStream */ public static InputStream extract(InputStream in) throws NoSuchFieldException, IllegalAccessException { Field f = FilterInputStream.class.getDeclaredField("in"); f.setAccessible(true); while( in instanceof FilterInputStream ) in = (InputStream)f.get((FilterInputStream)in); return in; } /* Returns the number of bytes which could be read from the stream, * timing out after the specified number of milliseconds. * Returns 0 on timeout (because no bytes could be read) * and -1 for end of stream. */ public static int maybeAvailable(final FileInputStream in, long timeout) throws IOException, InterruptedException { final int[] dataReady = {0}; final IOException[] maybeException = {null}; final Thread reader = new Thread() { public void run() { try { dataReady[0] = in.getChannel().read(buf); } catch (ClosedByInterruptException e) { System.err.println("Reader interrupted."); } catch (IOException e) { maybeException[0] = e; } } }; Thread interruptor = new Thread() { public void run() { reader.interrupt(); } }; reader.start(); for(;;) { reader.join(timeout); if (!reader.isAlive()) break; interruptor.start(); interruptor.join(1000); reader.join(1000); if (!reader.isAlive()) break; System.err.println("We're hung"); System.exit(1); } if ( maybeException[0] != null ) throw maybeException[0]; return dataReady[0]; } } 

Curiosamente, cuando se ejecuta el progtwig dentro de NetBeans 6.5 en lugar de en la consola, el tiempo de espera no funciona en absoluto, y la llamada a System.exit () es realmente necesaria para eliminar los subprocesos zombie. Lo que sucede es que el hilo interruptor bloquea (!) En la llamada a reader.interrupt (). Otro progtwig de prueba (que no se muestra aquí) también intenta cerrar el canal, pero eso tampoco funciona.

Como dijo jt, NIO es la mejor (y correcta) solución. Si realmente estás atrapado con un InputStream, podrías

  1. Genera un hilo cuyo trabajo exclusivo es leer desde InputStream y poner el resultado en un búfer que se puede leer desde el hilo original sin bloquear. Esto debería funcionar bien si solo tienes una instancia de la transmisión. De lo contrario, es posible que pueda eliminar el hilo utilizando los métodos desaprobados en la clase Thread, aunque esto puede causar pérdidas de recursos.

  2. Confíe en está disponible para indicar datos que se pueden leer sin bloquear. Sin embargo, en algunos casos (como en el caso de los Sockets) puede llevar una lectura de locking potencial para isAvailable para informar algo distinto de 0.