Descarga asincrónica “acelerada” en F #

Estoy intentando descargar las más de 3000 fotos a las que se hace referencia desde la copia de seguridad xml de mi blog. El problema que encontré es que si solo una de esas fotos ya no está disponible, se bloquea todo el asincronismo porque AsyncGetResponse no hace tiempos de espera.

ildjarn me ayudó a armar una versión de AsyncGetResponse que falla en el tiempo de espera, pero al usarla se obtienen muchos más tiempos de espera, como si las solicitudes que simplemente están en cola agotaran el tiempo de espera. Parece que todas las WebRequests se lanzan ‘inmediatamente’, la única forma de hacerlo funcionar es establecer el tiempo de espera en el tiempo requerido para descargarlas todas juntas: lo cual no es genial porque significa que tengo que ajustar el tiempo de espera dependiendo de la cantidad de imágenes

¿He alcanzado los límites de la async de vainilla? ¿Debo buscar extensiones reactivas?

Esto es un poco embarazoso, porque ya he hecho dos preguntas aquí sobre este código en particular, ¡y todavía no lo tengo funcionando como quiero!

Creo que debe haber una forma mejor de descubrir que un archivo no está disponible que usar un tiempo de espera excedido. No estoy exactamente seguro, pero ¿hay alguna manera de hacer que arroje una excepción si no se puede encontrar un archivo? Entonces podrías simplemente envolver tu código async dentro de try .. with y deberías evitar la mayoría de los problemas.

De todos modos, si quiere escribir su propio “administrador de concurrencia” que ejecuta cierto número de solicitudes en paralelo y pone en cola las solicitudes pendientes restantes, entonces la opción más fácil en F # es usar agentes (el tipo de MailboxProcessor ). El siguiente objeto encapsula el comportamiento:

 type ThrottlingAgentMessage = | Completed | Work of Async /// Represents an agent that runs operations in concurrently. When the number /// of concurrent operations exceeds 'limit', they are queued and processed later type ThrottlingAgent(limit) = let agent = MailboxProcessor.Start(fun agent -> /// Represents a state when the agent is blocked let rec waiting () = // Use 'Scan' to wait for completion of some work agent.Scan(function | Completed -> Some(working (limit - 1)) | _ -> None) /// Represents a state when the agent is working and working count = async { while true do // Receive any message let! msg = agent.Receive() match msg with | Completed -> // Decrement the counter of work items return! working (count - 1) | Work work -> // Start the work item & continue in blocked/working state async { try do! work finally agent.Post(Completed) } |> Async.Start if count < limit then return! working (count + 1) else return! waiting () } working 0) /// Queue the specified asynchronous workflow for processing member x.DoWork(work) = agent.Post(Work work) 

Nada es siempre fácil. 🙂

Creo que los problemas que está abordando son intrínsecos al dominio del problema (en lugar de simplemente ser un problema con el modelo de progtwigción asíncrono, aunque interactúan de alguna manera).

Supongamos que quiere descargar 3000 imágenes. Primero, en su proceso .NET, hay algo así como System.Net.ConnectionLimit o algo por lo que olvido el nombre, que p. Ej. Acelerará el número de conexiones HTTP simultáneas que su proceso .NET puede ejecutar simultáneamente (y el valor predeterminado es solo ‘2 ‘ Creo). Entonces podrías encontrar ese control y establecerlo en un número más alto, y sería útil.

Pero luego, la máquina y la conexión a Internet tienen un ancho de banda finito. Por lo tanto, incluso si pudieras intentar iniciar concurrentemente 3000 conexiones HTTP, cada conexión individual sería más lenta en función de las limitaciones del ancho de banda. Entonces esto también interactuaría con tiempos de espera. (Y esto ni siquiera considera qué clases de throttles / limits hay en el servidor. Tal vez, si envía 3000 solicitudes, piense que es un ataque de DoS y una lista negra de su IP).

Así que este es realmente un dominio de problemas donde una buena solución requiere de un control inteligente y control de flujo para administrar cómo se utilizan los recursos del sistema subyacente.

Como en la otra respuesta, los agentes F # (MailboxProcessors) son un buen modelo de progtwigción para crear dicha lógica de regulación / control de flujo.

(Incluso con todo eso, si la mayoría de los archivos de imagen son como 1MB pero luego hay un archivo de 1GB mezclado allí, ese único archivo podría desconectarse).

De todos modos, esto no es tanto una respuesta a la pregunta, sino solo señalar cuánta complejidad intrínseca hay en el dominio del problema en sí mismo. (Quizás también sea sugestivo de por qué los “gestores de descarga” de UI son tan populares).

Aquí hay una variación de la respuesta de Tomás, porque necesitaba un agente que pudiera devolver los resultados.

 type ThrottleMessage<'a> = | AddJob of (Async<'a>*AsyncReplyChannel<'a>) | DoneJob of ('a*AsyncReplyChannel<'a>) | Stop /// This agent accumulates 'jobs' but limits the number which run concurrently. type ThrottleAgent<'a>(limit) = let agent = MailboxProcessor>.Start(fun inbox -> let rec loop(jobs, count) = async { let! msg = inbox.Receive() //get next message match msg with | AddJob(job) -> if count < limit then //if not at limit, we work, else loop return! work(job::jobs, count) else return! loop(job::jobs, count) | DoneJob(result, reply) -> reply.Reply(result) //send back result to caller return! work(jobs, count - 1) //no need to check limit here | Stop -> return () } and work(jobs, count) = async { match jobs with | [] -> return! loop(jobs, count) //if no jobs left, wait for more | (job, reply)::jobs -> //run job, post Done when finished async { let! result = job inbox.Post(DoneJob(result, reply)) } |> Async.Start return! loop(jobs, count + 1) //job started, go back to waiting } loop([], 0) ) member m.AddJob(job) = agent.PostAndAsyncReply(fun rep-> AddJob(job, rep)) member m.Stop() = agent.Post(Stop) 

En mi caso particular, solo necesito usarlo como un ‘mapa’ de una sola vez, así que agregué una función estática:

  static member RunJobs limit jobs = let agent = ThrottleAgent<'a>(limit) let res = jobs |> Seq.map (fun job -> agent.AddJob(job)) |> Async.Parallel |> Async.RunSynchronously agent.Stop() res 

Parece que funciona bien …

Aquí hay una solución lista para usar:

FSharpx.Control ofrece una función Async.ParallelWithThrottle . No estoy seguro de si es la mejor implementación ya que usa SemaphoreSlim . Pero la facilidad de uso es excelente y, como mi aplicación no necesita un rendimiento superior, funciona lo suficientemente bien para mí. Aunque dado que es una biblioteca, si alguien sabe cómo mejorarla, siempre es una buena idea hacer que las bibliotecas tengan el mejor rendimiento de la lista para que el rest de nosotros pueda usar el código que funciona y ¡simplemente terminar nuestro trabajo!