Paralelo no funciona con Entity Framework

Tengo una lista de ID, y necesito ejecutar varios procedimientos almacenados en cada ID.

Cuando estoy usando un ciclo foreach estándar, funciona bien, pero cuando tengo muchos registros, funciona bastante lento.

Quise convertir el código para que funcione con EF, pero recibo una excepción: “El proveedor subyacente falló en Abrir”.

Estoy usando este código, dentro de Parallel.ForEach:

using (XmlEntities osContext = new XmlEntities()) { //The code } 

Pero todavía arroja la excepción.

¿Alguna idea de cómo puedo usar Parallel con EF? ¿Necesito crear un nuevo contexto para cada procedimiento que estoy ejecutando? Tengo alrededor de 10 procedimientos, así que creo que es muy malo crear 10 contextos, uno para cada uno.

Las conexiones de bases de datos subyacentes que utiliza Entity Framework no son seguras para subprocesos . Tendrá que crear un nuevo contexto para cada operación en otro hilo que va a realizar.

Su preocupación sobre cómo paralelizar la operación es válida; que muchos contextos serán costosos de abrir y cerrar.

En cambio, es posible que desee invertir cómo piensa sobre la paralelización del código. Parece que está pasando por encima de una serie de elementos y luego llama a los procedimientos almacenados en serie para cada elemento.

Si puede, cree una nueva Task (o Task , si no necesita un resultado) para cada procedimiento y luego en esa Task , abra un contexto único, Task todos los elementos y luego ejecuta el procedimiento almacenado. De esta forma, solo tiene un número de contextos igual al número de procedimientos almacenados que está ejecutando en paralelo.

Supongamos que tiene un MyDbContext con dos procedimientos almacenados, DoSomething1 y DoSomething2 , que toman una instancia de una clase, MyItem .

Implementar lo anterior se vería así:

 // You'd probably want to materialize this into an IList to avoid // warnings about multiple iterations of an IEnumerable. // You definitely *don't* want this to be an IQueryable // returned from a context. IEnumerable items = ...; // The first stored procedure is called here. Task t1 = Task.Run(() => { // Create the context. using (var ctx = new MyDbContext()) // Cycle through each item. foreach (MyItem item in items) { // Call the first stored procedure. // You'd of course, have to do something with item here. ctx.DoSomething1(item); } }); // The second stored procedure is called here. Task t2 = Task.Run(() => { // Create the context. using (var ctx = new MyDbContext()) // Cycle through each item. foreach (MyItem item in items) { // Call the first stored procedure. // You'd of course, have to do something with item here. ctx.DoSomething2(item); } }); // Do something when both of the tasks are done. 

Si no puede ejecutar los procedimientos almacenados en paralelo (cada uno depende de que se ejecute en un orden determinado), puede paralelizar sus operaciones, es un poco más complejo.

Consideraría la creación de particiones personalizadas en sus artículos (utilizando el método estático Create en la clase Partitioner ). Esto le dará los medios para obtener las IEnumerator (tenga en cuenta que esto no es IEnumerable por lo que no puede foreach ).

Para cada IEnumerator que recupere, creará una nueva Task (si necesita un resultado) y en el cuerpo de la Task , creará el contexto y luego recorrerá los elementos devueltos por IEnumerator , llamando a los procedimientos almacenados en orden.

Eso se vería así:

 // Get the partitioner. OrdinalPartitioner partitioner = Partitioner.Create(items); // Get the partitions. // You'll have to set the parameter for the number of partitions here. // See the link for creating custom partitions for more // creation strategies. IList> paritions = partitioner.GetPartitions( Environment.ProcessorCount); // Create a task for each partition. Task[] tasks = partitions.Select(p => Task.Run(() => { // Create the context. using (var ctx = new MyDbContext()) // Remember, the IEnumerator implementation // might implement IDisposable. using (p) // While there are items in p. while (p.MoveNext()) { // Get the current item. MyItem current = p.Current; // Call the stored procedures. Process the item ctx.DoSomething1(current); ctx.DoSomething2(current); } })). // ToArray is needed (or something to materialize the list) to // avoid deferred execution. ToArray(); 

EF no es seguro para subprocesos, por lo que no puede usar Paralelo.

Eche un vistazo a Entity Framework y Multi threading

y este artículo .

Esto es lo que uso y funciona muy bien. Además, admite el manejo de las excepciones de error y tiene un modo de depuración que hace que sea mucho más fácil rastrear las cosas

 public static ConcurrentQueue Parallel(this IEnumerable items, Action action, int? parallelCount = null, bool debugMode = false) { var exceptions = new ConcurrentQueue(); if (debugMode) { foreach (var item in items) { try { action(item); } // Store the exception and continue with the loop. catch (Exception e) { exceptions.Enqueue(e); } } } else { var partitions = Partitioner.Create(items).GetPartitions(parallelCount ?? Environment.ProcessorCount).Select(partition => Task.Factory.StartNew(() => { while (partition.MoveNext()) { try { action(partition.Current); } // Store the exception and continue with the loop. catch (Exception e) { exceptions.Enqueue(e); } } })); Task.WaitAll(partitions.ToArray()); } return exceptions; } 

Lo usa como el siguiente, donde como db es el DbContext original y db.CreateInstance () crea una nueva instancia utilizando la misma cadena de conexión.

  var batch = db.Set().ToList(); var exceptions = batch.Parallel((item) => { using (var batchDb = db.CreateInstance()) { var batchTime = batchDb.GetDBTime(); var someData = batchDb.Set().Where(x=>x.ID = item.ID).ToList(); //do stuff to someData item.WasMigrated = true; //note that this record is attached to db not batchDb and will only be saved when db.SaveChanges() is called batchDb.SaveChanges(); } }); if (exceptions.Count > 0) { logger.Error("ContactRecordMigration : Content: Error processing one or more records", new AggregateException(exceptions)); throw new AggregateException(exceptions); //optionally throw an exception } db.SaveChanges(); //save the item modifications 

Es un poco difícil resolver este problema sin saber cuál es el resultado de la excepción interna, si es que lo hay. Esto podría ser simplemente un problema con la forma en que se configura la cadena de conexión o la configuración del proveedor.

En general, debe tener cuidado con el código paralelo y EF. Sin embargo, lo que estás haciendo debe funcionar. Una pregunta en mi mente; ¿Se está realizando algún trabajo en otra instancia de ese contexto antes del paralelo? De acuerdo con su publicación, está haciendo un contexto separado en cada hilo. Eso es bueno. Parte de mí se pregunta, sin embargo, si hay una cierta contención constructora interesante entre los contextos múltiples. Si no está utilizando ese contexto en cualquier lugar antes de esa llamada paralela, sugeriría tratar de ejecutar incluso una consulta simple contra el contexto para abrirlo y asegurarse de que todos los bits EF se activen antes de ejecutar el método paralelo. Lo admito, no he intentado exactamente lo que hiciste aquí, pero lo hice de cerca y funcionó.