¿Cuál es el equivalente async / await de un servidor ThreadPool?

Estoy trabajando en un servidor tcp que se parece a esto usando apis síncronas y el grupo de subprocesos:

TcpListener listener; void Serve(){ while(true){ var client = listener.AcceptTcpClient(); ThreadPool.QueueUserWorkItem(this.HandleConnection, client); //Or alternatively new Thread(HandleConnection).Start(client) } } 

Suponiendo que mi objective es manejar tantas conexiones concurrentes como sea posible con el menor uso de recursos, esto parece que estará rápidamente limitado por la cantidad de subprocesos disponibles. Sospecho que mediante el uso de apis de tarea sin locking, podré manejar mucho más con menos recursos.

Mi impresión inicial es algo así como:

 async Task Serve(){ while(true){ var client = await listener.AcceptTcpClientAsync(); HandleConnectionAsync(client); //fire and forget? } } 

Pero me parece que esto podría causar cuellos de botella. Tal vez HandleConnectionAsync demorará un tiempo inusualmente largo para alcanzar el primero en espera, y evitará que el ciclo de aceptación principal continúe. ¿Esto solo usará un hilo cada vez, o el tiempo de ejecución mágicamente ejecutará cosas en múltiples hilos como lo considere conveniente?

¿Hay alguna manera de combinar estos dos enfoques para que mi servidor use exactamente la cantidad de hilos que necesita para el número de tareas que se ejecutan activamente, pero para que no bloquee los hilos innecesariamente en las operaciones de E / S?

¿Existe una forma idiomática de maximizar el rendimiento en una situación como esta?

Permitiría que Framework manejara el subprocesamiento y no crearía ningún subproceso adicional, a menos que las pruebas de perfil sugirieran que podría ser necesario. Especialmente, si las llamadas dentro de HandleConnectionAsync están mayormente vinculadas a IO.

De todos modos, si desea liberar el hilo de llamada (el despachador) al comienzo de HandleConnectionAsync , hay una solución muy fácil. Puede saltar en un nuevo hilo desde ThreadPool con await Yield() ThreadPool await Yield() . Eso funciona si su servidor se ejecuta en el entorno de ejecución que no tiene ningún contexto de sincronización instalado en el hilo inicial (una aplicación de consola, un servicio WCF), que normalmente es el caso de un servidor TCP.

[EDITADO] Lo siguiente ilustra esto (el código es originalmente de aquí ). Tenga en cuenta que el ciclo while principal no crea ningún subproceso explícitamente:

 using System; using System.Collections.Generic; using System.Net.Sockets; using System.Text; using System.Threading.Tasks; class Program { object _lock = new Object(); // sync lock List _connections = new List(); // pending connections // The core server task private async Task StartListener() { var tcpListener = TcpListener.Create(8000); tcpListener.Start(); while (true) { var tcpClient = await tcpListener.AcceptTcpClientAsync(); Console.WriteLine("[Server] Client has connected"); var task = StartHandleConnectionAsync(tcpClient); // if already faulted, re-throw any error on the calling context if (task.IsFaulted) task.Wait(); } } // Register and handle the connection private async Task StartHandleConnectionAsync(TcpClient tcpClient) { // start the new connection task var connectionTask = HandleConnectionAsync(tcpClient); // add it to the list of pending task lock (_lock) _connections.Add(connectionTask); // catch all errors of HandleConnectionAsync try { await connectionTask; // we may be on another thread after "await" } catch (Exception ex) { // log the error Console.WriteLine(ex.ToString()); } finally { // remove pending task lock (_lock) _connections.Remove(connectionTask); } } // Handle new connection private async Task HandleConnectionAsync(TcpClient tcpClient) { await Task.Yield(); // continue asynchronously on another threads using (var networkStream = tcpClient.GetStream()) { var buffer = new byte[4096]; Console.WriteLine("[Server] Reading from client"); var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length); var request = Encoding.UTF8.GetString(buffer, 0, byteCount); Console.WriteLine("[Server] Client wrote {0}", request); var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server"); await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length); Console.WriteLine("[Server] Response has been written"); } } // The entry point of the console app static void Main(string[] args) { Console.WriteLine("Hit Ctrl-C to exit."); new Program().StartListener().Wait(); } } 

Alternativamente, el código puede verse como a continuación, sin await Task.Yield() . Tenga en cuenta que paso un async lambda a Task.Run , porque aún quiero beneficiarme de las API asincrónicas dentro de HandleConnectionAsync y usar HandleConnectionAsync allí:

 // Handle new connection private static Task HandleConnectionAsync(TcpClient tcpClient) { return Task.Run(async () => { using (var networkStream = tcpClient.GetStream()) { var buffer = new byte[4096]; Console.WriteLine("[Server] Reading from client"); var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length); var request = Encoding.UTF8.GetString(buffer, 0, byteCount); Console.WriteLine("[Server] Client wrote {0}", request); var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server"); await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length); Console.WriteLine("[Server] Response has been written"); } }); } 

[ACTUALIZACIÓN] Basado en el comentario: si se trata de un código de biblioteca, el entorno de ejecución es realmente desconocido y puede tener un contexto de sincronización no predeterminado. En este caso, prefiero ejecutar el bucle del servidor principal en un hilo de grupo (que está libre de cualquier contexto de sincronización):

 private static Task StartListener() { return Task.Run(async () => { var tcpListener = TcpListener.Create(8000); tcpListener.Start(); while (true) { var tcpClient = await tcpListener.AcceptTcpClientAsync(); Console.WriteLine("[Server] Client has connected"); var task = StartHandleConnectionAsync(tcpClient); // if already faulted, re-throw any error on the calling context if (task.IsFaulted) task.Wait(); } }); } 

De esta forma, todas las tareas secundarias creadas dentro de StartListener no se verían afectadas por el contexto de sincronización del código del cliente. Por lo tanto, no tendría que llamar a Task.ConfigureAwait(false) cualquier lugar explícitamente.

Las respuestas existentes han propuesto correctamente utilizar Task.Run(() => HandleConnection(client)); , pero no explica por qué.

Aquí está el porqué: le preocupa que HandleConnectionAsync tarde algún tiempo en llegar al primero en espera. Si se queda con el uso de async IO (como debería hacerlo en este caso), esto significa que HandleConnectionAsync está realizando un trabajo de CPU sin ningún locking. Este es un caso perfecto para el grupo de hilos. Está hecho para ejecutar trabajos de CPU cortos y sin lockings.

Y tiene razón, que HandleConnectionAsync acelerará el HandleConnectionAsync de HandleConnectionAsync antes de volver (tal vez porque hay un trabajo significativo vinculado a la CPU). Esto debe evitarse si necesita una alta frecuencia de nuevas conexiones.

Si está seguro de que no hay un trabajo significativo que limite el ciclo, puede guardar la Task grupo de Task adicional y no hacerlo.

Alternativamente, puede tener múltiples aceptaciones ejecutándose al mismo tiempo. Reemplazar await Serve(); por (por ejemplo):

 var serverTasks = Enumerable.Range(0, Environment.ProcessorCount) .Select(_ => Serve()); await Task.WhenAll(serverTasks); 

Esto elimina los problemas de escalabilidad. Tenga en cuenta que, await se tragará aquí todos los errores menos uno.

Tratar

 TcpListener listener; void Serve(){ while(true){ var client = listener.AcceptTcpClient(); Task.Run(() => this.HandleConnection(client)); //Or alternatively new Thread(HandleConnection).Start(client) } } 

De acuerdo con Microsoft http://msdn.microsoft.com/en-AU/library/hh524395.aspx#BKMK_VoidReturnType , no se debe utilizar el tipo de devolución de nulo porque no puede detectar excepciones. Como ha señalado, necesita tareas de “disparar y olvidar”, por lo que mi conclusión es que siempre debe devolver Tarea (como Microsoft dijo), pero debe detectar el error usando:

 TaskInstance.ContinueWith(i => { /* exception handler */ }, TaskContinuationOptions.OnlyOnFaulted); 

Un ejemplo que utilicé como prueba es a continuación:

 public static void Main() { Awaitable() .ContinueWith( i => { foreach (var exception in i.Exception.InnerExceptions) { Console.WriteLine(exception.Message); } }, TaskContinuationOptions.OnlyOnFaulted); Console.WriteLine("This needs to come out before my exception"); Console.ReadLine(); } public static async Task Awaitable() { await Task.Delay(3000); throw new Exception("Hey I can catch these pesky things"); } 

¿Hay alguna razón por la que deba aceptar conexiones asincrónicas? Quiero decir, ¿espera que cualquier conexión con el cliente te dé algún valor? La única razón para hacerlo sería porque hay otros trabajos en el servidor mientras se espera una conexión. Si hay, probablemente puedas hacer algo como esto:

  public async void Serve() { while (true) { var client = await _listener.AcceptTcpClientAsync(); Task.Factory.StartNew(() => HandleClient(client), TaskCreationOptions.LongRunning); } } 

De esta forma, la aceptación liberará la opción de salir del hilo actual para que se realicen otras cosas, y el manejo se ejecutará en un nuevo hilo. La única sobrecarga sería generar un nuevo hilo para manejar al cliente antes de que vuelva directamente a aceptar una nueva conexión.

Editar: Acabo de darme cuenta de que es casi el mismo código que escribió. Creo que necesito leer su pregunta nuevamente para comprender mejor lo que realmente está preguntando: S

Edit2:

¿Hay alguna manera de combinar estos dos enfoques para que mi servidor use exactamente la cantidad de hilos que necesita para el número de tareas que se ejecutan activamente, pero para que no bloquee los hilos innecesariamente en las operaciones de E / S?

Piensa que mi solución realmente responde esta pregunta. ¿Es realmente necesario sin embargo?

Edit3: Made Task.Factory.StartNew () en realidad crea un nuevo hilo.