28 Mar 2012
I previously posted about implementing the non-blocking IO method Stream.ReadAsync as an extension method. But even with that method, the consumer still has to do a lot of work: create a buffer, fill it, read the data back out of it, and so on.
When we need to read a file, we mostly use the StreamReader.ReadToEnd method. It would be great to have a non-blocking IO version of ReadToEnd as a Task-based ReadToEndAsync method.
First, let's see how ReadToEndAsync is used.
using (var fs = File.Open(fileName, FileMode.Open))
using (var reader = new StreamReader(fs)) {
    var task = reader
        .ReadToEndAsync()
        .ContinueWith(t => {
            var content = t.Result;
            // do your work with file content
        });
    //
    // do other things while reading the file (non-blocking IO)
    //
    // wait until the file reading is complete
    task.Wait();
}
OK. It is pretty easy to use, right? :)
But the implementation is not that straightforward. Let's look at it.
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class StreamReaderTaskParallelism {
    public static Task<string> ReadToEndAsync(this StreamReader reader) {
        var stream = reader.BaseStream;
        if (stream is MemoryStream) {
            // in-memory data is not IO-bound, so a plain ReadToEnd on a worker thread is enough
            return Task.Factory.StartNew<string>(() => reader.ReadToEnd());
        }
        var ts = new TaskCompletionSource<string>();
        var writer = new MemoryStream();
        var enumerator = EnumerateReadAsync(stream, writer).GetEnumerator();
        Action action = null;
        action = delegate {
            try {
                if (enumerator.MoveNext()) {
                    // a read is pending: run this delegate again when it completes
                    enumerator.Current.ContinueWith(delegate { action(); });
                }
                else {
                    // end of stream: decode the collected bytes
                    // note: StreamReader(Stream) decodes as UTF-8 by default; pass
                    // reader.CurrentEncoding if the source reader uses another encoding
                    using (var r = new StreamReader(writer)) {
                        writer.Position = 0;
                        var result = r.ReadToEnd();
                        ts.TrySetResult(result);
                    }
                    enumerator.Dispose();
                }
            }
            catch (Exception ex) {
                // surface any failure through the returned Task
                ts.TrySetException(ex);
            }
        };
        action();
        return ts.Task;
    }
    private static IEnumerable<Task<int>> EnumerateReadAsync(Stream stream, MemoryStream writer) {
        var buffer = new byte[4 * 1024];
        while (true) {
            var pendingRead = stream.ReadAsync(buffer, 0, buffer.Length);
            yield return pendingRead;
            // execution resumes here on the next MoveNext, after the read has completed
            int bytesRead = pendingRead.Result;
            if (bytesRead <= 0) break;
            writer.Write(buffer, 0, bytesRead);
        }
    }
}
First, we check whether the underlying stream is a MemoryStream. If it is, we just call ReadToEnd, since reading from memory is not an IO-bound operation.
If the operation is IO-bound, we should use the non-blocking IO method ReadAsync. In outline, the steps are:
- create a TaskCompletionSource, which controls the completion of the returned Task.
- schedule the reads recursively with the non-blocking ReadAsync method, chaining each one with ContinueWith until the end of the stream.
- return the Task from the TaskCompletionSource.
- when reading completes, the TaskCompletionSource supplies the result to the returned Task and signals that it is complete. If there is an error, it sets the Exception on the Task instead.
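Before looking at the read loop, it may help to see the TaskCompletionSource part of these steps in isolation. The following is only a minimal sketch: the class TaskCompletionSourceSketch and the method ComputeAsync are hypothetical names made up for illustration, and the work is done on a thread-pool thread rather than by non-blocking IO.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskCompletionSourceSketch {
    // Hypothetical helper: runs 'work' on a thread-pool thread and reports
    // the outcome through a TaskCompletionSource.
    public static Task<int> ComputeAsync(Func<int> work) {
        // 1. create the TaskCompletionSource that controls the returned Task
        var ts = new TaskCompletionSource<int>();
        // 2. schedule the work; the caller is not blocked
        ThreadPool.QueueUserWorkItem(_ => {
            try {
                // 4a. success: hand the result to the Task and complete it
                ts.TrySetResult(work());
            }
            catch (Exception ex) {
                // 4b. failure: the Task completes in the faulted state
                ts.TrySetException(ex);
            }
        });
        // 3. return the Task immediately, usually before the work has finished
        return ts.Task;
    }
}
```

The caller only ever sees the Task: reading Result (or calling Wait) on a faulted Task throws an AggregateException that wraps the original exception.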
var ts = new TaskCompletionSource<string>();
var writer = new MemoryStream();
var enumerator = EnumerateReadAsync(stream, writer).GetEnumerator();
Action action = null;
action = delegate {
    try {
        if (enumerator.MoveNext()) {
            enumerator.Current.ContinueWith(delegate { action(); });
        }
        else {
            using (var r = new StreamReader(writer)) {
                writer.Position = 0;
                var result = r.ReadToEnd();
                ts.TrySetResult(result);
            }
            enumerator.Dispose();
        }
    }
    catch (Exception ex) {
        ts.TrySetException(ex);
    }
};
action();
The difficulty is that we need to chain ContinueWith recursively until the end of the file.
To achieve that, we retrieve an enumerator from the enumerable and use it inside a delegate. The delegate moves the enumerator to its next element; if there is one, it retrieves that element and uses ContinueWith to schedule itself (recursively, in a sense) to run when the current Task completes. When the enumerator reaches the end, the TaskCompletionSource sets the result and the enumerator is disposed. If any Exception is thrown, it is set on the Task. Once the delegate is created, we simply invoke it to get the execution started.
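The same driver loop can be pulled out into a reusable helper that works for any sequence of tasks, not just file reads. This is a sketch under assumptions, not part of the implementation above: TaskIteration and Iterate are hypothetical names, and this version also disposes the enumerator on the error path.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class TaskIteration {
    // Hypothetical helper: runs the tasks produced by 'steps' one after another.
    // The next element is only requested after the previous task has completed.
    public static Task Iterate(IEnumerable<Task> steps) {
        var ts = new TaskCompletionSource<object>();
        var enumerator = steps.GetEnumerator();
        Action moveNext = null;
        moveNext = delegate {
            try {
                if (enumerator.MoveNext()) {
                    // re-enter this delegate when the current task completes
                    enumerator.Current.ContinueWith(delegate { moveNext(); });
                }
                else {
                    // sequence exhausted: clean up and complete the outer Task
                    enumerator.Dispose();
                    ts.TrySetResult(null);
                }
            }
            catch (Exception ex) {
                // an exception thrown inside the iterator ends the loop
                enumerator.Dispose();
                ts.TrySetException(ex);
            }
        };
        moveNext(); // kick off the first step
        return ts.Task;
    }
}
```

Because ContinueWith schedules the continuation instead of calling it inline, the "recursion" does not grow the call stack: each invocation of the delegate returns before the next one starts.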
Now we need the EnumerateReadAsync method, which uses yield so that the compiler turns it into a state machine (see the in-depth article from Jon Skeet).
private static IEnumerable<Task<int>> EnumerateReadAsync(Stream stream, MemoryStream writer) {
    var buffer = new byte[4 * 1024];
    while (true) {
        var pendingRead = stream.ReadAsync(buffer, 0, buffer.Length);
        yield return pendingRead;
        int bytesRead = pendingRead.Result;
        if (bytesRead <= 0) break;
        writer.Write(buffer, 0, bytesRead);
    }
}
Each Task<int> is yielded from the EnumerateReadAsync method. When that Task<int> completes, its continuation runs and calls MoveNext on the enumerator, which resumes EnumerateReadAsync just after the yield. When there is nothing left to read from the file, the loop breaks and MoveNext returns false, which completes the Task.
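To watch the yield and resume steps on their own, the iterator can be driven by hand. The sketch below is for illustration only and makes some assumptions: it uses the Stream.ReadAsync that is built into .NET 4.5 and later (not the extension method from the earlier post), IteratorWalkthrough, EnumerateReads and CountReads are hypothetical names, and it blocks on every read, which the real implementation avoids.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class IteratorWalkthrough {
    // Same shape as EnumerateReadAsync, with a configurable buffer size.
    public static IEnumerable<Task<int>> EnumerateReads(Stream stream, MemoryStream writer, int bufferSize) {
        var buffer = new byte[bufferSize];
        while (true) {
            var pendingRead = stream.ReadAsync(buffer, 0, buffer.Length);
            yield return pendingRead;            // this MoveNext call returns true
            int bytesRead = pendingRead.Result;  // runs at the start of the next MoveNext call
            if (bytesRead <= 0) break;           // makes that MoveNext call return false
            writer.Write(buffer, 0, bytesRead);
        }
    }

    // Drives the iterator with a plain foreach, waiting for each read,
    // and returns how many tasks the iterator yielded.
    public static int CountReads(Stream source, MemoryStream writer, int bufferSize) {
        int reads = 0;
        foreach (var pendingRead in EnumerateReads(source, writer, bufferSize)) {
            pendingRead.Wait(); // blocking stand-in for ContinueWith
            reads++;
        }
        return reads;
    }
}
```

For a 10-byte MemoryStream and a 4-byte buffer, the iterator yields four tasks (reads of 4, 4, 2 and 0 bytes). The final zero-byte read is what lets the loop break, so MoveNext returns false on the call after it.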