Lectura / escritura de flujo asíncrono .NET

He estado tratando de resolver este ejercicio de examen de “Simultaneidad Concurrente” (en C #):

Sabiendo que la clase Stream contiene los métodos int Read(byte[] buffer, int offset, int size) y void Write(byte[] buffer, int offset, int size) , implementa en C # el método NetToFile que copia todos los datos recibidos de NetworkStream net instancia a la instancia del FileStream file . Para realizar la transferencia, utilice lecturas asíncronas y escrituras síncronas, evitando que se bloquee un hilo durante las operaciones de lectura. La transferencia finaliza cuando la operación de lectura de la net devuelve el valor 0. Para simplificar, no es necesario admitir la cancelación controlada de la operación.

 void NetToFile(NetworkStream net, FileStream file); 

He estado tratando de resolver este ejercicio, pero estoy luchando con una pregunta relacionada con la pregunta en sí. Pero primero, aquí está mi código:

 public static void NetToFile(NetworkStream net, FileStream file) { byte[] buffer = new byte[4096]; // buffer with 4 kB dimension int offset = 0; // read/write offset int nBytesRead = 0; // number of bytes read on each cycle IAsyncResult ar; do { // read partial content of net (asynchronously) ar = net.BeginRead(buffer,offset,buffer.Length,null,null); // wait until read is completed ar.AsyncWaitHandle.WaitOne(); // get number of bytes read on each cycle nBytesRead = net.EndRead(ar); // write partial content to file (synchronously) fs.Write(buffer,offset,nBytesRead); // update offset offset += nBytesRead; } while( nBytesRead > 0); } 

La pregunta que tengo es que, en la statement de pregunta, se dice:

Para realizar la transferencia, utilice lecturas asíncronas y escrituras síncronas, evitando que se bloquee un hilo durante las operaciones de lectura

No estoy seguro de si mi solución logra lo que se quiere en este ejercicio, porque estoy usando AsyncWaitHandle.WaitOne() para esperar hasta que se complete la lectura asincrónica.

Por otro lado, no estoy realmente averiguando qué se supone que es una solución “sin locking” en este escenario, ya que la escritura FileStream está hecha de forma sincrónica … y para hacerlo, tengo que esperar hasta que se complete la lectura de NetworkStream para continuar con la escritura de FileStream , ¿no es así?

¿Puedes ayudarme con esto?


[EDITAR 1] Usar la solución de callback

De acuerdo, si entendí lo que Mitchel Sellers y Willvv respondieron, se me aconsejó utilizar un método de callback para convertir esto en una solución “sin locking”. Aquí está mi código, entonces:

 byte[] buffer; // buffer public static void NetToFile(NetworkStream net, FileStream file) { // buffer with same dimension as file stream data buffer = new byte[file.Length]; //start asynchronous read net.BeginRead(buffer,0,buffer.Length,OnEndRead,net); } //asynchronous callback static void OnEndRead(IAsyncResult ar) { //NetworkStream retrieve NetworkStream net = (NetworkStream) ar.IAsyncState; //get number of bytes read int nBytesRead = net.EndRead(ar); //write content to file //... and now, how do I write to FileStream instance without //having its reference?? //fs.Write(buffer,0,nBytesRead); } 

Como habrás notado, estoy atascado en el método de callback, ya que no tengo una referencia a la instancia de FileStream en la que deseo invocar el método “Write (…)”.

Además, esta no es una solución segura para subprocesos, ya que el campo byte[] está expuesto y puede compartirse entre invocaciones de NetToFile simultáneas. No sé cómo resolver este problema sin exponer este campo byte[] en el scope externo … y estoy casi seguro de que no se expondrá de esta manera.

No quiero utilizar una solución de método lambda o anónima, porque eso no está en el plan de estudios del curso de “Progtwigción concurrente”.

Necesitará usar la callback desde la lectura de NetStream para manejar esto. Y, francamente, podría ser más fácil ajustar la lógica de copia en su propia clase para que pueda mantener la instancia de las secuencias activas.

Así es como me acercaría (no probado):

 public class Assignment1 { public static void NetToFile(NetworkStream net, FileStream file) { var copier = new AsyncStreamCopier(net, file); copier.Start(); } public static void NetToFile_Option2(NetworkStream net, FileStream file) { var completedEvent = new ManualResetEvent(false); // copy as usual but listen for completion var copier = new AsyncStreamCopier(net, file); copier.Completed += (s, e) => completedEvent.Set(); copier.Start(); completedEvent.WaitOne(); } ///  /// The Async Copier class reads the input Stream Async and writes Synchronously ///  public class AsyncStreamCopier { public event EventHandler Completed; private readonly Stream input; private readonly Stream output; private byte[] buffer = new byte[4096]; public AsyncStreamCopier(Stream input, Stream output) { this.input = input; this.output = output; } public void Start() { GetNextChunk(); } private void GetNextChunk() { input.BeginRead(buffer, 0, buffer.Length, InputReadComplete, null); } private void InputReadComplete(IAsyncResult ar) { // input read asynchronously completed int bytesRead = input.EndRead(ar); if (bytesRead == 0) { RaiseCompleted(); return; } // write synchronously output.Write(buffer, 0, bytesRead); // get next GetNextChunk(); } private void RaiseCompleted() { if (Completed != null) { Completed(this, EventArgs.Empty); } } } } 

A pesar de que va contra stream ayudar a las personas con sus tareas, dado que esto tiene más de un año, esta es la manera correcta de lograr esto. Todo lo que necesita para superponer sus operaciones de lectura / escritura – no se necesita engendrar hilos adicionales, ni nada más.

 public static class StreamExtensions { private const int DEFAULT_BUFFER_SIZE = short.MaxValue ; // +32767 public static void CopyTo( this Stream input , Stream output ) { input.CopyTo( output , DEFAULT_BUFFER_SIZE ) ; return ; } public static void CopyTo( this Stream input , Stream output , int bufferSize ) { if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" ); if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" ); byte[][] buf = { new byte[bufferSize] , new byte[bufferSize] } ; int[] bufl = { 0 , 0 } ; int bufno = 0 ; IAsyncResult read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ; IAsyncResult write = null ; while ( true ) { // wait for the read operation to complete read.AsyncWaitHandle.WaitOne() ; bufl[bufno] = input.EndRead(read) ; // if zero bytes read, the copy is complete if ( bufl[bufno] == 0 ) { break ; } // wait for the in-flight write operation, if one exists, to complete // the only time one won't exist is after the very first read operation completes if ( write != null ) { write.AsyncWaitHandle.WaitOne() ; output.EndWrite(write) ; } // start the new write operation write = output.BeginWrite( buf[bufno] , 0 , bufl[bufno] , null , null ) ; // toggle the current, in-use buffer // and start the read operation on the new buffer. // // Changed to use XOR to toggle between 0 and 1. // A little speedier than using a ternary expression. bufno ^= 1 ; // bufno = ( bufno == 0 ? 1 : 0 ) ; read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ; } // wait for the final in-flight write operation, if one exists, to complete // the only time one won't exist is if the input stream is empty. if ( write != null ) { write.AsyncWaitHandle.WaitOne() ; output.EndWrite(write) ; } output.Flush() ; // return to the caller ; return ; } public static async Task CopyToAsync( this Stream input , Stream output ) { await input.CopyToAsync( output , DEFAULT_BUFFER_SIZE ) ; return; } public static async Task CopyToAsync( this Stream input , Stream output , int bufferSize ) { if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" ); if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" ); byte[][] buf = { new byte[bufferSize] , new byte[bufferSize] } ; int[] bufl = { 0 , 0 } ; int bufno = 0 ; Task read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ) ; Task write = null ; while ( true ) { await read ; bufl[bufno] = read.Result ; // if zero bytes read, the copy is complete if ( bufl[bufno] == 0 ) { break; } // wait for the in-flight write operation, if one exists, to complete // the only time one won't exist is after the very first read operation completes if ( write != null ) { await write ; } // start the new write operation write = output.WriteAsync( buf[bufno] , 0 , bufl[bufno] ) ; // toggle the current, in-use buffer // and start the read operation on the new buffer. // // Changed to use XOR to toggle between 0 and 1. // A little speedier than using a ternary expression. bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ; read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ); } // wait for the final in-flight write operation, if one exists, to complete // the only time one won't exist is if the input stream is empty. if ( write != null ) { await write; } output.Flush(); // return to the caller ; return; } } 

Aclamaciones.

Dudo que este sea el código más rápido (hay una sobrecarga de la abstracción de tareas de .NET) pero creo que es un enfoque más limpio para todo lo de la copia asincrónica.

Necesitaba un CopyTransformAsync en el que podía pasar a un delegado para hacer algo a medida que se pasaban trozos a través de la operación de copia. por ejemplo, calcular un resumen del mensaje mientras se copia. Es por eso que me interesé en rodar mi propia opción.

Recomendaciones:

  • CopyToAsync bufferSize es sensible (se requiere un gran buffer)
  • FileOptions.Asynchronous -> lo hace tremendamente lento (no estoy seguro de por qué es eso)
  • El bufferSize de los objetos FileStream puede ser más pequeño (no es tan importante)
  • La prueba Serial es claramente la más rápida y la más intensiva en recursos

Esto es lo que encontré y el código fuente completo del progtwig que usé para probar esto. En mi máquina, estas pruebas se ejecutaron en un disco SSD y es el equivalente de una copia de archivo. Normalmente, no querrás usar esto solo para copiar archivos; en cambio, cuando tengas una transmisión en red (que es mi caso de uso), es cuando querrás usar algo como esto.

 4K buffer Serial... in 0.474s CopyToAsync... timed out CopyToAsync (Asynchronous)... timed out CopyTransformAsync... timed out CopyTransformAsync (Asynchronous)... timed out 8K buffer Serial... in 0.344s CopyToAsync... timed out CopyToAsync (Asynchronous)... timed out CopyTransformAsync... in 1.116s CopyTransformAsync (Asynchronous)... timed out 40K buffer Serial... in 0.195s CopyToAsync... in 0.624s CopyToAsync (Asynchronous)... timed out CopyTransformAsync... in 0.378s CopyTransformAsync (Asynchronous)... timed out 80K buffer Serial... in 0.190s CopyToAsync... in 0.355s CopyToAsync (Asynchronous)... in 1.196s CopyTransformAsync... in 0.300s CopyTransformAsync (Asynchronous)... in 0.886s 160K buffer Serial... in 0.432s CopyToAsync... in 0.252s CopyToAsync (Asynchronous)... in 0.454s CopyTransformAsync... in 0.447s CopyTransformAsync (Asynchronous)... in 0.555s 

Aquí puede ver el Explorador de procesos, gráfico de rendimiento a medida que se ejecuta la prueba. Básicamente, cada parte superior (en el más bajo de los tres gráficos) es el comienzo de la prueba en serie. Puede ver claramente cómo el rendimiento aumenta dramáticamente a medida que crece el tamaño del búfer. Parecería que planea en algún lugar alrededor de 80K, que es lo que el método .NET framework CopyToAsync usa internamente.

Gráfico de rendimiento

Lo bueno aquí es que la implementación final no fue tan complicada:

 static Task CompletedTask = ((Task)Task.FromResult(0)); static async Task CopyTransformAsync(Stream inputStream , Stream outputStream , Func, ArraySegment> transform = null ) { var temp = new byte[bufferSize]; var temp2 = new byte[bufferSize]; int i = 0; var readTask = inputStream .ReadAsync(temp, 0, bufferSize) .ConfigureAwait(false); var writeTask = CompletedTask.ConfigureAwait(false); for (; ; ) { // synchronize read int read = await readTask; if (read == 0) { break; } if (i++ > 0) { // synchronize write await writeTask; } var chunk = new ArraySegment(temp, 0, read); // do transform (if any) if (!(transform == null)) { chunk = transform(chunk); } // queue write writeTask = outputStream .WriteAsync(chunk.Array, chunk.Offset, chunk.Count) .ConfigureAwait(false); // queue read readTask = inputStream .ReadAsync(temp2, 0, bufferSize) .ConfigureAwait(false); // swap buffer var temp3 = temp; temp = temp2; temp2 = temp3; } await writeTask; // complete any lingering write task } 

Este método de entrelazado de lectura / escritura a pesar de los enormes búferes es en algún momento entre 18% más rápido que BCL CopyToAsync .

Por curiosidad, modifiqué las llamadas asincrónicas a las típicas llamadas de patrón asíncronas de inicio / finalización y eso no mejoró un poco la situación, empeoró las cosas. Por todo lo que me gusta bash sobre la tara de abstracción de tareas, hacen algunas cosas ingeniosas cuando escribes tu código con las palabras clave async / await y es mucho más agradable leer ese código.

Wow, ¡todos son muy complejos! Aquí está mi solución asíncrona, y es solo una función. Read () y BeginWrite () se ejecutan al mismo tiempo.

 ///  /// Copies a stream. ///  /// The stream containing the source data. /// The stream that will receive the source data. ///  /// This function copies until no more can be read from the stream /// and does not close the stream when done.
/// Read and write are performed simultaneously to improve throughput.
/// If no data can be read for 60 seconds, the copy will time-out. ///
public static void CopyStream(Stream source, Stream target) { // This stream copy supports a source-read happening at the same time // as target-write. A simpler implementation would be to use just // Write() instead of BeginWrite(), at the cost of speed. byte[] readbuffer = new byte[4096]; byte[] writebuffer = new byte[4096]; IAsyncResult asyncResult = null; for (; ; ) { // Read data into the readbuffer. The previous call to BeginWrite, if any, // is executing in the background.. int read = source.Read(readbuffer, 0, readbuffer.Length); // Ok, we have read some data and we're ready to write it, so wait here // to make sure that the previous write is done before we write again. if (asyncResult != null) { // This should work down to ~0.01kb/sec asyncResult.AsyncWaitHandle.WaitOne(60000); target.EndWrite(asyncResult); // Last step to the 'write'. if (!asyncResult.IsCompleted) // Make sure the write really completed. throw new IOException("Stream write failed."); } if (read <= 0) return; // source stream says we're done - nothing else to read. // Swap the read and write buffers so we can write what we read, and we can // use the then use the other buffer for our next read. byte[] tbuf = writebuffer; writebuffer = readbuffer; readbuffer = tbuf; // Asynchronously write the data, asyncResult.AsyncWaitHandle will // be set when done. asyncResult = target.BeginWrite(writebuffer, 0, read, null, null); } }

Es extraño que nadie haya mencionado a TPL.
Esta es una muy buena publicación del equipo de PFX (Stephen Toub) sobre cómo implementar una copia de transmisión asincrónica simultánea. La publicación contiene refenrece obsoleta a las muestras así que aquí está el primer:
Obtenga Extras de Extensiones Paralelas de code.msdn luego

 var task = sourceStream.CopyStreamToStreamAsync(destinationStream); // do what you want with the task, for example wait when it finishes: task.Wait(); 

También considere usar AsyncEnumerator de J.Richer .

Tienes razón, lo que estás haciendo es básicamente lectura síncrona, porque usas el método WaitOne () y simplemente detiene la ejecución hasta que los datos estén listos, eso es básicamente lo mismo que hacerlo usando Read () en lugar de BeginRead ( ) y EndRead ().

Lo que tienes que hacer es usar el argumento de callback en el método BeginRead (), con él definirás un método de callback (o una expresión lambda), este método se invocará cuando la información haya sido leída (en el método de callback que tiene que verificar el final del flujo y escribir en el flujo de salida), de esta manera no estará bloqueando el hilo principal (no necesitará el WaitOne () ni el EndRead ().

Espero que esto ayude.