¿Cómo puedo evitar continuidades sincrónicas en una tarea?

Tengo un código de biblioteca (red de socket) que proporciona una API basada en Task para respuestas pendientes a las solicitudes, en función de TaskCompletionSource . Sin embargo, hay una molestia en el TPL ya que parece imposible evitar las continuas sincrónicas. Lo que me gustaría poder hacer es:

  • decirle a TaskCompletionSource que no debe permitir que las personas que llaman se TaskContinuationOptions.ExecuteSynchronously con TaskContinuationOptions.ExecuteSynchronously , o
  • establezca el resultado ( SetResult / TrySetResult ) de una manera que especifique que TaskContinuationOptions.ExecuteSynchronously debe ignorarse, utilizando el grupo en su lugar

Específicamente, el problema que tengo es que los datos entrantes están siendo procesados ​​por un lector dedicado, y si un llamador puede conectarse con TaskContinuationOptions.ExecuteSynchronously , pueden detener el lector (lo que afecta más que solo a ellos). Anteriormente, he solucionado este problema con algunos hackers que detectan si hay alguna continuación, y si lo hacen, empujan la terminación hacia el ThreadPool , sin embargo, esto tiene un impacto significativo si la persona que llama ha saturado su cola de trabajo, ya que la finalización no procesado de manera oportuna. Si están utilizando Task.Wait() (o similar), entonces esencialmente se estancarán. Asimismo, esta es la razón por la cual el lector tiene un hilo dedicado en lugar de usar trabajadores.

Asi que; antes de intentar regañar al equipo de TPL: ¿me estoy perdiendo una opción?

Puntos clave:

  • No quiero que los llamadores externos puedan secuestrar mi hilo
  • No puedo usar ThreadPool como implementación, ya que debe funcionar cuando el grupo está saturado

El siguiente ejemplo produce resultados (el orden puede variar según el tiempo):

 Continuation on: Main thread Press [return] Continuation on: Thread pool 

El problema es el hecho de que una persona que llamó al azar logró obtener una continuación en “Tema principal”. En el código real, esto estaría interrumpiendo al lector primario; ¡cosas malas!

Código:

 using System; using System.Threading; using System.Threading.Tasks; static class Program { static void Identify() { var thread = Thread.CurrentThread; string name = thread.IsThreadPoolThread ? "Thread pool" : thread.Name; if (string.IsNullOrEmpty(name)) name = "#" + thread.ManagedThreadId; Console.WriteLine("Continuation on: " + name); } static void Main() { Thread.CurrentThread.Name = "Main thread"; var source = new TaskCompletionSource(); var task = source.Task; task.ContinueWith(delegate { Identify(); }); task.ContinueWith(delegate { Identify(); }, TaskContinuationOptions.ExecuteSynchronously); source.TrySetResult(123); Console.WriteLine("Press [return]"); Console.ReadLine(); } } 

Nuevo en .NET 4.6:

.NET 4.6 contiene una nueva TaskCreationOptions : RunContinuationsAsynchronously .


Ya que está dispuesto a usar Reflection para acceder a campos privados …

Puede marcar la Tarea del TCS con el indicador TASK_STATE_THREAD_WAS_ABORTED , lo que provocaría que todas las continuas no estén en línea.

 const int TASK_STATE_THREAD_WAS_ABORTED = 134217728; var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance); stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED); 

Editar:

En lugar de usar Reflection emit, sugiero que use expresiones. Esto es mucho más legible y tiene la ventaja de ser compatible con PCL:

 var taskParameter = Expression.Parameter(typeof (Task)); const string stateFlagsFieldName = "m_stateFlags"; var setter = Expression.Lambda>( Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile(); 

Sin usar Reflection:

Si alguien está interesado, he descubierto una manera de hacer esto sin Reflexión, pero también es un poco “sucio” y, por supuesto, conlleva una penalización de rendimiento no despreciable:

 try { Thread.CurrentThread.Abort(); } catch (ThreadAbortException) { source.TrySetResult(123); Thread.ResetAbort(); } 

No creo que haya nada en TPL que proporcione control API explícito sobre TaskCompletionSource.SetResult continuas de TaskCompletionSource.SetResult . Decidí mantener mi respuesta inicial para controlar este comportamiento para escenarios async/await .

Aquí hay otra solución que impone asincrónicamente en ContinueWith , si la continuación tcs.SetResult tcs.SetResult tiene lugar en el mismo subproceso en el que se SetResult :

 public static class TaskExt { static readonly ConcurrentDictionary s_tcsTasks = new ConcurrentDictionary(); // SetResultAsync static public void SetResultAsync( this TaskCompletionSource @this, TResult result) { s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread); try { @this.SetResult(result); } finally { Thread thread; s_tcsTasks.TryRemove(@this.Task, out thread); } } // ContinueWithAsync, TODO: more overrides static public Task ContinueWithAsync( this Task @this, Action> action, TaskContinuationOptions continuationOptions = TaskContinuationOptions.None) { return @this.ContinueWith((Func, Task>)(t => { Thread thread = null; s_tcsTasks.TryGetValue(t, out thread); if (Thread.CurrentThread == thread) { // same thread which called SetResultAsync, avoid potential deadlocks // using thread pool return Task.Run(() => action(t)); // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread) // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning); } else { // continue on the same thread var task = new Task(() => action(t)); task.RunSynchronously(); return Task.FromResult(task); } }), continuationOptions).Unwrap(); } } 

Actualizado para abordar el comentario:

No controlo a la persona que llama, no puedo hacer que usen una variante específica de continuación: si pudiera, el problema no existiría en primer lugar.

No sabía que no controlas a la persona que llama. Sin embargo, si no lo controla, probablemente tampoco esté pasando el objeto TaskCompletionSource directamente a la persona que llama. Lógicamente, estarías pasando la parte del token , es decir, tcs.Task . En ese caso, la solución puede ser aún más fácil, agregando otro método de extensión a lo anterior:

 // ImposeAsync, TODO: more overrides static public Task ImposeAsync(this Task @this) { return @this.ContinueWith(new Func, Task>(antecedent => { Thread thread = null; s_tcsTasks.TryGetValue(antecedent, out thread); if (Thread.CurrentThread == thread) { // continue on a pool thread return antecedent.ContinueWith(t => t, TaskContinuationOptions.None).Unwrap(); } else { return antecedent; } }), TaskContinuationOptions.ExecuteSynchronously).Unwrap(); } 

Utilizar:

 // library code var source = new TaskCompletionSource(); var task = source.Task.ImposeAsync(); // ... // client code task.ContinueWith(delegate { Identify(); }, TaskContinuationOptions.ExecuteSynchronously); // ... // library code source.SetResultAsync(123); 

Esto realmente funciona tanto para await como para ContinueWith ( violín ) y está libre de reflections.

¿Qué hay en lugar de hacer

 var task = source.Task; 

haces esto en cambio

 var task = source.Task.ContinueWith( x => x.Result ); 

Por lo tanto, siempre está agregando una continuación que se ejecutará de forma asíncrona y luego no importa si los suscriptores desean una continuación en el mismo contexto. Es una especie de currying la tarea, ¿no?

si puedes y estás listo para usar la reflexión, esto debería hacerlo;

 public static class MakeItAsync { static public void TrySetAsync(this TaskCompletionSource source, T result) { var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var continuations = (List)continuation.GetValue(source.Task); foreach (object c in continuations) { var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var options = (TaskContinuationOptions)option.GetValue(c); options &= ~TaskContinuationOptions.ExecuteSynchronously; option.SetValue(c, options); } source.TrySetResult(result); } } 

Actualizado , publiqué una respuesta por separado para tratar con ContinueWith en lugar de await (porque ContinueWith no se preocupa por el contexto de sincronización actual).

Puede usar un contexto de sincronización estúpido para imponer asincronía en la continuación activada llamando a SetResult/SetCancelled/SetException en TaskCompletionSource . Creo que el contexto de sincronización actual (en el punto de await tcs.Task ) es el criterio que TPL usa para decidir si hacer que tal continuación sea síncrona o asíncrona.

Lo siguiente funciona para mí:

 if (notifyAsync) { tcs.SetResultAsync(null); } else { tcs.SetResult(null); } 

SetResultAsync se implementa así:

 public static class TaskExt { static public void SetResultAsync(this TaskCompletionSource tcs, T result) { FakeSynchronizationContext.Execute(() => tcs.SetResult(result)); } // FakeSynchronizationContext class FakeSynchronizationContext : SynchronizationContext { private static readonly ThreadLocal s_context = new ThreadLocal(() => new FakeSynchronizationContext()); private FakeSynchronizationContext() { } public static FakeSynchronizationContext Instance { get { return s_context.Value; } } public static void Execute(Action action) { var savedContext = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance); try { action(); } finally { SynchronizationContext.SetSynchronizationContext(savedContext); } } // SynchronizationContext methods public override SynchronizationContext CreateCopy() { return this; } public override void OperationStarted() { throw new NotImplementedException("OperationStarted"); } public override void OperationCompleted() { throw new NotImplementedException("OperationCompleted"); } public override void Post(SendOrPostCallback d, object state) { throw new NotImplementedException("Post"); } public override void Send(SendOrPostCallback d, object state) { throw new NotImplementedException("Send"); } } } 

SynchronizationContext.SetSynchronizationContext es muy barato en términos de sobrecarga que agrega. De hecho, la implementación de WPF Dispatcher.BeginInvoke adopta un enfoque muy similar.

TPL compara el contexto de sincronización de destino en el punto de await con el del punto de tcs.SetResult . Si el contexto de sincronización es el mismo (o no hay un contexto de sincronización en ambos lugares), la continuación se llama directamente, sincrónicamente. De lo contrario, se pone en cola utilizando SynchronizationContext.Post en el contexto de sincronización de destino, es decir, el comportamiento normal de await . Lo que hace este enfoque es siempre imponer el comportamiento SynchronizationContext.Post (o una continuación del hilo de la agrupación si no hay un contexto de sincronización de destino).

Actualizado , esto no funcionará para la task.ContinueWith . ContinueWith , porque ContinueWith no se preocupa por el contexto de sincronización actual. Sin embargo, funciona para la await task ( violín ). También funciona para la tarea de await task.ConfigureAwait(false) .

OTOH, este enfoque funciona para ContinueWith .

El enfoque simular aborto se veía realmente bien, pero condujo a los hilos de secuestro de TPL en algunos escenarios .

Luego tuve una implementación que era similar a verificar el objeto de continuación , pero solo comprobando si había alguna continuación, ya que en realidad existen demasiados escenarios para que el código dado funcione bien, pero eso significaba que incluso cosas como Task.Wait daban como resultado un Task.Wait búsqueda de grupo.

En última instancia, después de inspeccionar montones y montones de IL, el único escenario seguro y útil es el escenario SetOnInvokeMres (continuación de restablecimiento manual-evento-delgado). Hay muchos otros escenarios:

  • algunos no son seguros y conducen al secuestro de hilos
  • el rest no son útiles, ya que finalmente conducen al grupo de hilos

Así que, al final, opté por buscar un objeto de continuación no nulo; si es nulo, está bien (sin continuación); si no es nulo, comprobación de caso especial para SetOnInvokeMres , si es así: fino (seguro de invocar); de lo contrario, permita que el grupo de subprocesos ejecute TrySetComplete , sin TrySetComplete a la tarea que haga nada especial como suplantar el aborto. Task.Wait usar el enfoque SetOnInvokeMres , que es el escenario específico que queremos probar realmente para no estancarnos.

 Type taskType = typeof(Task); FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic); Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic); if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null) { var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true); var il = method.GetILGenerator(); var hasContinuation = il.DefineLabel(); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel(); // check if null il.Emit(OpCodes.Brtrue_S, nonNull); il.MarkLabel(goodReturn); il.Emit(OpCodes.Ldc_I4_1); il.Emit(OpCodes.Ret); // check if is a SetOnInvokeMres - if so, we're OK il.MarkLabel(nonNull); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); il.Emit(OpCodes.Isinst, safeScenario); il.Emit(OpCodes.Brtrue_S, goodReturn); il.Emit(OpCodes.Ldc_I4_0); il.Emit(OpCodes.Ret); IsSyncSafe = (Func)method.CreateDelegate(typeof(Func));