Cómo convertir OutputStream a InputStream?

Estoy en la etapa de desarrollo, donde tengo dos módulos y de uno obtuve salida como OutputStream y la segunda, que acepta solo InputStream . ¿Sabes cómo convertir OutputStream a InputStream (no al revés, me refiero realmente a este modo) que podré conectar estas dos partes?

Gracias

Un OutputStream es aquel en el que escribe datos. Si algún módulo expone un OutputStream , la expectativa es que hay algo leyendo en el otro extremo.

Algo que expone un InputStream , por otro lado, indica que tendrá que escuchar esta secuencia, y habrá datos que puede leer.

Por lo tanto, es posible conectar un InputStream a un OutputStream

InputStream----read---> intermediateBytes[n] ----write----> OutputStream

Como alguien mencionó, esto es lo que el método copy() de IOUtils le permite hacer. No tiene sentido ir por el otro lado … espero que tenga sentido

ACTUALIZAR:

Por supuesto, cuanto más pienso en esto, más puedo ver cómo esto realmente sería un requisito. Sé que algunos de los comentarios mencionaron los flujos de entrada / salida de Piped, pero hay otra posibilidad.

Si el flujo de salida que está expuesto es un ByteArrayOutputStream , siempre puede obtener el contenido completo llamando al método toByteArray() . Luego puede crear un contenedor de flujo de entrada utilizando la subclase ByteArrayInputStream . Estas dos son pseudo-streams, ambas básicamente envuelven una matriz de bytes. Usar las streams de esta manera, por lo tanto, es técnicamente posible, pero para mí todavía es muy extraño …

Parece que hay muchos enlaces y otras cosas similares, pero no hay un código real que use tuberías. La ventaja de usar java.io.PipedInputStream y java.io.PipedOutputStream es que no hay consumo adicional de memoria. ByteArrayOutputStream.toByteArray() devuelve una copia del búfer original, por lo que significa que tenga lo que tenga en la memoria, ahora tiene dos copias. Luego, escribir en un InputStream significa que ahora tiene tres copias de los datos.

El código:

 // take the copy of the stream and re-write it to an InputStream PipedInputStream in = new PipedInputStream(); final PipedOutputStream out = new PipedOutputStream(in); new Thread(new Runnable() { public void run () { try { // write the original OutputStream to the PipedOutputStream originalByteArrayOutputStream.writeTo(out); } catch (IOException e) { // logging and exception handling should go here } } }).start(); 

Este código supone que el originalByteArrayOutputStream es un ByteArrayOutputStream ya que suele ser la única secuencia de salida utilizable, a menos que esté escribiendo en un archivo. ¡Espero que esto ayude! Lo bueno de esto es que, dado que se trata de un hilo separado, también funciona en paralelo, por lo que cualquier cosa que consum tu flujo de entrada también se transferirá desde tu flujo de salida anterior. Eso es beneficioso porque el buffer puede permanecer más pequeño y tendrá menos latencia y menos uso de memoria.

Necesitarás una clase intermedia que amortiguará entre. Cada vez que se InputStream.read(byte[]...) , la clase de búfer rellenará la matriz de bytes pasados ​​con el siguiente fragmento transferido desde OutputStream.write(byte[]...) . Dado que los tamaños de los fragmentos pueden no ser los mismos, la clase de adaptador deberá almacenar una cierta cantidad hasta que tenga suficiente para llenar el búfer de lectura y / o pueda almacenar cualquier desbordamiento de búfer.

Este artículo tiene un buen resumen de algunos enfoques diferentes para este problema:

http://blog.ostermiller.org/convert-java-outputstream-inputstream

Como las secuencias de entrada y salida son solo puntos de inicio y finalización, la solución es almacenar datos temporalmente en una matriz de bytes. Por lo tanto, debe crear ByteArrayOutputStream intermedio, desde el que crea byte[] que se utiliza como entrada para el nuevo ByteArrayInputStream .

 public void doTwoThingsWithStream(InputStream inStream, OutputStream outStream){ //create temporary bayte array output stream ByteArrayOutputStream baos = new ByteArrayOutputStream(); doFirstThing(inStream, baos); //create input stream from baos InputStream isFromFirstData = new ByteArrayInputStream(baos.toByteArray()); doSecondThing(isFromFirstData, outStream); } 

Espero eso ayude.

La biblioteca de código abierto de easystream tiene soporte directo para convertir un OutputStream en un InputStream: http://io-tools.sourceforge.net/easystream/tutorial/tutorial.html

También enumeran otras opciones: http://io-tools.sourceforge.net/easystream/OutputStream_to_InputStream.html

 ByteArrayOutputStream buffer = (ByteArrayOutputStream) aOutputStream; byte[] bytes = buffer.toByteArray(); InputStream inputStream = new ByteArrayInputStream(bytes); 

Encontré el mismo problema al convertir un ByteArrayOutputStream en ByteArrayInputStream y lo resolví utilizando una clase derivada de ByteArrayOutputStream que puede devolver un ByteArrayInputStream que se inicializa con el búfer interno de ByteArrayOutputStream . De esta forma, no se utiliza memoria adicional y la “conversión” es muy rápida:

 package info.whitebyte.utils; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; /** * This class extends the ByteArrayOutputStream by * providing a method that returns a new ByteArrayInputStream * which uses the internal byte array buffer. This buffer * is not copied, so no additional memory is used. After * creating the ByteArrayInputStream the instance of the * ByteArrayInOutStream can not be used anymore. * 

* The ByteArrayInputStream can be retrieved using getInputStream(). * @author Nick Russler */ public class ByteArrayInOutStream extends ByteArrayOutputStream { /** * Creates a new ByteArrayInOutStream. The buffer capacity is * initially 32 bytes, though its size increases if necessary. */ public ByteArrayInOutStream() { super(); } /** * Creates a new ByteArrayInOutStream, with a buffer capacity of * the specified size, in bytes. * * @param size the initial size. * @exception IllegalArgumentException if size is negative. */ public ByteArrayInOutStream(int size) { super(size); } /** * Creates a new ByteArrayInputStream that uses the internal byte array buffer * of this ByteArrayInOutStream instance as its buffer array. The initial value * of pos is set to zero and the initial value of count is the number of bytes * that can be read from the byte array. The buffer array is not copied. This * instance of ByteArrayInOutStream can not be used anymore after calling this * method. * @return the ByteArrayInputStream instance */ public ByteArrayInputStream getInputStream() { // create new ByteArrayInputStream that respects the current count ByteArrayInputStream in = new ByteArrayInputStream(this.buf, 0, this.count); // set the buffer of the ByteArrayOutputStream // to null so it can't be altered anymore this.buf = null; return in; } }

Puse el material en github: https://github.com/nickrussler/ByteArrayInOutStream

La biblioteca io-extras puede ser útil. Por ejemplo, si quiere gzip un InputStream usando GZIPOutputStream y quiere que ocurra de forma síncrona (usando el tamaño de búfer predeterminado de 8192):

 InputStream is = ... InputStream gz = IOUtil.pipe(is, o -> new GZIPOutputStream(o)); 

Tenga en cuenta que la biblioteca tiene una cobertura de prueba 100% unitaria (¡por lo que vale la pena, por supuesto!) Y está en Maven Central. La dependencia de Maven es:

  com.github.davidmoten io-extras 0.1  

Asegúrate de buscar una versión posterior.

Desde mi punto de vista, java.io.PipedInputStream / java.io.PipedOutputStream es la mejor opción para considerar. En algunas situaciones, es posible que desee utilizar ByteArrayInputStream / ByteArrayOutputStream. El problema es que necesita duplicar el búfer para convertir un ByteArrayOutputStream a ByteArrayInputStream. También ByteArrayOutpuStream / ByteArrayInputStream están limitados a 2 GB. Aquí hay una implementación de OutpuStream / InputStream que escribí para eludir las limitaciones de ByteArrayOutputStream / ByteArrayInputStream (código de Scala, pero fácilmente comprensible para los desarrolladores de Java):

 import java.io.{IOException, InputStream, OutputStream} import scala.annotation.tailrec /** Acts as a replacement for ByteArrayOutputStream * */ class HugeMemoryOutputStream(capacity: Long) extends OutputStream { private val PAGE_SIZE: Int = 1024000 private val ALLOC_STEP: Int = 1024 /** Pages array * */ private var streamBuffers: Array[Array[Byte]] = Array.empty[Array[Byte]] /** Allocated pages count * */ private var pageCount: Int = 0 /** Allocated bytes count * */ private var allocatedBytes: Long = 0 /** Current position in stream * */ private var position: Long = 0 /** Stream length * */ private var length: Long = 0 allocSpaceIfNeeded(capacity) /** Gets page count based on given length * * @param length Buffer length * @return Page count to hold the specified amount of data */ private def getPageCount(length: Long) = { var pageCount = (length / PAGE_SIZE).toInt + 1 if ((length % PAGE_SIZE) == 0) { pageCount -= 1 } pageCount } /** Extends pages array * */ private def extendPages(): Unit = { if (streamBuffers.isEmpty) { streamBuffers = new Array[Array[Byte]](ALLOC_STEP) } else { val newStreamBuffers = new Array[Array[Byte]](streamBuffers.length + ALLOC_STEP) Array.copy(streamBuffers, 0, newStreamBuffers, 0, streamBuffers.length) streamBuffers = newStreamBuffers } pageCount = streamBuffers.length } /** Ensures buffers are bug enough to hold specified amount of data * * @param value Amount of data */ private def allocSpaceIfNeeded(value: Long): Unit = { @tailrec def allocSpaceIfNeededIter(value: Long): Unit = { val currentPageCount = getPageCount(allocatedBytes) val neededPageCount = getPageCount(value) if (currentPageCount < neededPageCount) { if (currentPageCount == pageCount) extendPages() streamBuffers(currentPageCount) = new Array[Byte](PAGE_SIZE) allocatedBytes = (currentPageCount + 1).toLong * PAGE_SIZE allocSpaceIfNeededIter(value) } } if (value < 0) throw new Error("AllocSpaceIfNeeded < 0") if (value > 0) { allocSpaceIfNeededIter(value) length = Math.max(value, length) if (position > length) position = length } } /** * Writes the specified byte to this output stream. The general * contract for write is that one byte is written * to the output stream. The byte to be written is the eight * low-order bits of the argument b. The 24 * high-order bits of b are ignored. * 

* Subclasses of OutputStream must provide an * implementation for this method. * * @param b the byte. */ @throws[IOException] override def write(b: Int): Unit = { val buffer: Array[Byte] = new Array[Byte](1) buffer(0) = b.toByte write(buffer) } /** * Writes len bytes from the specified byte array * starting at offset off to this output stream. * The general contract for write(b, off, len) is that * some of the bytes in the array b are written to the * output stream in order; element b[off] is the first * byte written and b[off+len-1] is the last byte written * by this operation. *

* The write method of OutputStream calls * the write method of one argument on each of the bytes to be * written out. Subclasses are encouraged to override this method and * provide a more efficient implementation. *

* If b is null, a * NullPointerException is thrown. *

* If off is negative, or len is negative, or * off+len is greater than the length of the array * b, then an IndexOutOfBoundsException is thrown. * * @param b the data. * @param off the start offset in the data. * @param len the number of bytes to write. */ @throws[IOException] override def write(b: Array[Byte], off: Int, len: Int): Unit = { @tailrec def writeIter(b: Array[Byte], off: Int, len: Int): Unit = { val currentPage: Int = (position / PAGE_SIZE).toInt val currentOffset: Int = (position % PAGE_SIZE).toInt if (len != 0) { val currentLength: Int = Math.min(PAGE_SIZE - currentOffset, len) Array.copy(b, off, streamBuffers(currentPage), currentOffset, currentLength) position += currentLength writeIter(b, off + currentLength, len - currentLength) } } allocSpaceIfNeeded(position + len) writeIter(b, off, len) } /** Gets an InputStream that points to HugeMemoryOutputStream buffer * * @return InputStream */ def asInputStream(): InputStream = { new HugeMemoryInputStream(streamBuffers, length) } private class HugeMemoryInputStream(streamBuffers: Array[Array[Byte]], val length: Long) extends InputStream { /** Current position in stream * */ private var position: Long = 0 /** * Reads the next byte of data from the input stream. The value byte is * returned as an int in the range 0 to * 255. If no byte is available because the end of the stream * has been reached, the value -1 is returned. This method * blocks until input data is available, the end of the stream is detected, * or an exception is thrown. * *

A subclass must provide an implementation of this method. * * @return the next byte of data, or -1 if the end of the * stream is reached. */ @throws[IOException] def read: Int = { val buffer: Array[Byte] = new Array[Byte](1) if (read(buffer) == 0) throw new Error("End of stream") else buffer(0) } /** * Reads up to len bytes of data from the input stream into * an array of bytes. An attempt is made to read as many as * len bytes, but a smaller number may be read. * The number of bytes actually read is returned as an integer. * *

This method blocks until input data is available, end of file is * detected, or an exception is thrown. * *

If len is zero, then no bytes are read and * 0 is returned; otherwise, there is an attempt to read at * least one byte. If no byte is available because the stream is at end of * file, the value -1 is returned; otherwise, at least one * byte is read and stored into b. * *

The first byte read is stored into element b[off], the * next one into b[off+1], and so on. The number of bytes read * is, at most, equal to len. Let k be the number of * bytes actually read; these bytes will be stored in elements * b[off] through b[off+k-1], * leaving elements b[off+k] through * b[off+len-1] unaffected. * *

In every case, elements b[0] through * b[off] and elements b[off+len] through * b[b.length-1] are unaffected. * *

The read(b, off, len) method * for class InputStream simply calls the method * read() repeatedly. If the first such call results in an * IOException, that exception is returned from the call to * the read(b, off, len) method. If * any subsequent call to read() results in a * IOException, the exception is caught and treated as if it * were end of file; the bytes read up to that point are stored into * b and the number of bytes read before the exception * occurred is returned. The default implementation of this method blocks * until the requested amount of input data len has been read, * end of file is detected, or an exception is thrown. Subclasses are encouraged * to provide a more efficient implementation of this method. * * @param b the buffer into which the data is read. * @param off the start offset in array b * at which the data is written. * @param len the maximum number of bytes to read. * @return the total number of bytes read into the buffer, or * -1 if there is no more data because the end of * the stream has been reached. * @see java.io.InputStream#read() */ @throws[IOException] override def read(b: Array[Byte], off: Int, len: Int): Int = { @tailrec def readIter(acc: Int, b: Array[Byte], off: Int, len: Int): Int = { val currentPage: Int = (position / PAGE_SIZE).toInt val currentOffset: Int = (position % PAGE_SIZE).toInt val count: Int = Math.min(len, length - position).toInt if (count == 0 || position >= length) acc else { val currentLength = Math.min(PAGE_SIZE - currentOffset, count) Array.copy(streamBuffers(currentPage), currentOffset, b, off, currentLength) position += currentLength readIter(acc + currentLength, b, off + currentLength, len - currentLength) } } readIter(0, b, off, len) } /** * Skips over and discards n bytes of data from this input * stream. The skip method may, for a variety of reasons, end * up skipping over some smaller number of bytes, possibly 0. * This may result from any of a number of conditions; reaching end of file * before n bytes have been skipped is only one possibility. * The actual number of bytes skipped is returned. If n is * negative, the skip method for class InputStream always * returns 0, and no bytes are skipped. Subclasses may handle the negative * value differently. * * The skip method of this class creates a * byte array and then repeatedly reads into it until n bytes * have been read or the end of the stream has been reached. Subclasses are * encouraged to provide a more efficient implementation of this method. * For instance, the implementation may depend on the ability to seek. * * @param n the number of bytes to be skipped. * @return the actual number of bytes skipped. */ @throws[IOException] override def skip(n: Long): Long = { if (n < 0) 0 else { position = Math.min(position + n, length) length - position } } } }

Fácil de usar, sin duplicación de búfer, sin límite de memoria de 2 GB

 val out: HugeMemoryOutputStream = new HugeMemoryOutputStream(initialCapacity /*may be 0*/) out.write(...) ... val in1: InputStream = out.asInputStream() in1.read(...) ... val in2: InputStream = out.asInputStream() in2.read(...) ... 

Si desea hacer un OutputStream desde un InputStream, hay un problema básico. Un método que escribe en un bloque OutputStream hasta que finaliza. Entonces, el resultado está disponible cuando el método de escritura finaliza. Esto tiene 2 consecuencias:

  1. Si usa solo un hilo, debe esperar hasta que todo esté escrito (por lo que debe almacenar los datos del flujo en la memoria o en el disco).
  2. Si desea acceder a los datos antes de que finalicen, necesita un segundo hilo.

La variante 1 puede implementarse mediante matrices de bytes o archivarse. La variante 1 se puede implementar usando pipies (ya sea directamente o con una abstracción adicional, por ejemplo, RingBuffer o Google Lib del otro comentario).

De hecho, con Java estándar no hay otra manera de resolver el problema. Cada solución es una implementación de uno de estos.

Hay un concepto llamado “continuación” (ver wikipedia para más detalles). En este caso, básicamente esto significa:

  • hay un flujo de salida especial que espera una cierta cantidad de datos
  • si se llega a la cantidad, la secuencia da control a su contraparte, que es una stream de entrada especial
  • la secuencia de entrada hace que la cantidad de datos esté disponible hasta que se lea; luego, devuelve el control a la secuencia de salida

Si bien algunos idiomas tienen este concepto incorporado, para Java necesitas algo de “magia”. Por ejemplo, “commons-javaflow” de apache implementa tales para java. La desventaja es que esto requiere algunas modificaciones especiales de bytecode en tiempo de comstackción. Por lo tanto, tendría sentido colocar todas las cosas en una biblioteca adicional con scripts de comstackción personalizados.

Publicación anterior pero podría ayudar a otros, usar de esta manera:

 OutputStream out = new ByteArrayOutputStream(); ... out.write(); ... ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(out.toString().getBytes())); 

Aunque no puede convertir un OutputStream a un InputStream, java proporciona una forma de utilizar PipedOutputStream y PipedInputStream para que los datos escritos en un PipedOutputStream estén disponibles a través de un PipedInputStream asociado.
Hace algún tiempo me enfrenté a una situación similar cuando manejaba bibliotecas de terceros que requerían que se les pasara una instancia de InputStream en lugar de una instancia de OutputStream.
La forma en que resolvió este problema es utilizar PipedInputStream y PipedOutputStream.
Por cierto, son difíciles de usar y debes usar multihilo para lograr lo que quieres. Recientemente publiqué una implementación en github que puedes usar.
Aquí está el enlace . Puede ir a través de la wiki para comprender cómo usarla.