Disclaimer: I acknowledge this method was obtained from another blog; I’m pretty sure it was Stephen Toub’s, but I cannot seem to find his post again.
But here are my modified versions of his Interleaved() method.
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
public static class TaskExtensions {
/// <summary>
/// Return tasks in order of completion.
/// </summary>
/// <param name="tasks"></param>
/// <returns></returns>
public static IEnumerable<Task<Task>> InOrderOfCompletion( this IEnumerable<Task> tasks ) {
var inputTasks = tasks.ToList();
var buckets = new TaskCompletionSource<Task>[ inputTasks.Count ];
var results = new Task<Task>[ buckets.Length ];
for ( var i = 0; i < buckets.Length; i++ ) {
buckets[ i ] = new TaskCompletionSource<Task>();
results[ i ] = buckets[ i ].Task;
}
var nextTaskIndex = -1;
void Continuation( Task completed ) {
var bucket = buckets[ Interlocked.Increment( ref nextTaskIndex ) ];
bucket.TrySetResult( completed );
}
foreach ( var inputTask in inputTasks ) {
inputTask?.ContinueWith( Continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default );
}
return results;
}
/// <summary>
/// Return tasks in order of completion.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="tasks"></param>
/// <returns></returns>
[ItemNotNull]
[NotNull]
public static IEnumerable<Task<Task<T>>> InOrderOfCompletion<T>( this IEnumerable<Task<T>> tasks ) {
var inputTasks = tasks.ToList();
var buckets = new TaskCompletionSource<Task<T>>[ inputTasks.Count ];
var results = new Task<Task<T>>[ buckets.Length ];
for ( var i = 0; i < buckets.Length; i++ ) {
buckets[ i ] = new TaskCompletionSource<Task<T>>();
results[ i ] = buckets[ i ].Task;
}
var nextTaskIndex = -1;
void Continuation( Task<T> completed ) {
var bucket = buckets[ Interlocked.Increment( ref nextTaskIndex ) ];
bucket.TrySetResult( completed );
}
foreach ( var inputTask in inputTasks ) {
inputTask?.ContinueWith( Continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default );
}
return results;
}
/// <summary>
/// Return tasks in order of completion.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="tasks"></param>
/// <returns></returns>
[ItemNotNull]
[NotNull]
public static IEnumerable<Task<Task<T>>> InOrderOfCompletion<T>( this IDictionary<TimeSpan, Task<T>> tasks ) {
var inputTasks = tasks.ToList();
var buckets = new TaskCompletionSource<Task<T>>[ inputTasks.Count ];
var results = new Task<Task<T>>[ buckets.Length ];
for ( var i = 0; i < buckets.Length; i++ ) {
buckets[ i ] = new TaskCompletionSource<Task<T>>();
results[ i ] = buckets[ i ].Task;
}
var nextTaskIndex = -1;
void Continuation( Task<T> completed ) {
var bucket = buckets[ Interlocked.Increment( ref nextTaskIndex ) ];
bucket.TrySetResult( completed );
}
foreach ( var inputTask in inputTasks ) {
inputTask.Value?.ContinueWith( Continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default );
}
return results;
}
}
And here is my test code.
public static class Examples {
public static async Task TestsInOrderOfCompletion( CancellationToken token ) {
var rng = new Random();
//Add to the list, out of "order"..
var tasks = new ConcurrentDictionary<TimeSpan, Task<String>> {
[ TimeSpan.FromSeconds( 3 ) ] = Task.Delay( TimeSpan.FromSeconds( 3 ), token ).ContinueWith( _ => "3 seconds", token ),
[ TimeSpan.FromSeconds( 1 ) ] = Task.Delay( TimeSpan.FromSeconds( 1 ), token ).ContinueWith( _ => "1 second", token ),
[ TimeSpan.FromSeconds( 2 ) ] = Task.Delay( TimeSpan.FromSeconds( 2 ), token ).ContinueWith( _ => "2 seconds", token ),
[ TimeSpan.FromSeconds( 5 ) ] = Task.Delay( TimeSpan.FromSeconds( 5 ), token ).ContinueWith( _ => "5 seconds", token ),
[ TimeSpan.FromSeconds( 8 ) ] = Task.Delay( TimeSpan.FromSeconds( 8 ), token ).ContinueWith( _ => "8 seconds", token ),
[ TimeSpan.FromSeconds( 7 ) ] = Task.Delay( TimeSpan.FromSeconds( 7 ), token ).ContinueWith( _ => "7 seconds", token ),
[ TimeSpan.FromSeconds( 9 ) ] = Task.Delay( 1000, token ).ContinueWith( _ => "9 seconds", token ),
[ TimeSpan.FromSeconds( 10 ) ] = Task.Delay( 1000, token ).ContinueWith( _ => "10 seconds", token ),
[ TimeSpan.FromSeconds( 4 ) ] = Task.Delay( 1000, token ).ContinueWith( _ => "4 seconds", token )
};
//Add a few more to the list, also in "random" order..
for ( var i = 0; i < 25; i++ ) {
var millisecondsDelay = rng.Next( 10000 );
var task = Task.Delay( millisecondsDelay, token ).ContinueWith( _ => $"{millisecondsDelay / 1000.0:F4}", token ); //return how many milliseconds we just delayed
tasks[ TimeSpan.FromMilliseconds( millisecondsDelay ) ] = task;
}
foreach ( var bucket in tasks.InOrderOfCompletion() ) {
try {
var task = await bucket.ConfigureAwait( false );
var result = await task.ConfigureAwait( false );
Console.WriteLine( $"{DateTime.Now:hh:mm:ss}, TaskId #{task.Id:N0}, {result} ms" );
}
catch ( OperationCanceledException ) { }
catch ( Exception exception ) {
Debug.WriteLineIf( Debugger.IsAttached, exception.ToString() );
}
}
}
}