LINQ Power
I think we’ll all agree that LINQ has been a revolution in the way we develop, one that has made many developers from other languages look at us with a certain envy… and has even pushed other platforms to make serious efforts to incorporate something similar into their frameworks :-)
Now, with the arrival of the Task Parallel Library, a world of possibilities opens up thanks to PLINQ, which makes it extremely simple to turn any sequential LINQ query into a parallel one, partitioning it and executing it on different cores at the same time.
Take any LINQ query like the following, where we have an array called numbers and an IsPrime extension method that returns whether a number is prime.
Assuming this implementation of IsPrime:
public static bool IsPrime(this int n) // 1 = false, 2 = true, 3 = true...
{
    if (n <= 1) return false;                // 0, 1 and negatives are not prime
    if ((n & 1) == 0)                        // even numbers: only 2 is prime
    {
        return n == 2;
    }
    for (int i = 3; (i * i) <= n; i += 2)    // test odd divisors up to sqrt(n)
    {
        if ((n % i) == 0) return false;
    }
    return true;
}
The LINQ query that returns only the prime numbers looks like this:
var query =
    from n in numbers
    where n.IsPrime()
    select n;
And this can be parallelized simply by adding AsParallel like this:
var query =
    from n in numbers.AsParallel()
    where n.IsPrime()
    select n;
Given that the numbers array contains the first 10 million integers, and that my current workstation has an 8-core i7 processor, the result is striking:
The sequential LINQ query takes 5.2 seconds, compared to 1.3 seconds for the parallel one.
That is, almost 4 seconds less, or roughly four times faster. Not bad, eh?
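For reference, this is roughly how such a comparison can be timed with a Stopwatch; it's just a minimal sketch (the exact figures will of course vary by machine) that reuses the IsPrime extension method shown above:
var numbers = Enumerable.Range(1, 10000000).ToArray();

var clock = Stopwatch.StartNew();
var sequential = (from n in numbers
                  where n.IsPrime()
                  select n).ToArray();       // plain LINQ
clock.Stop();
Console.WriteLine("LINQ:  {0} ms", clock.ElapsedMilliseconds);

clock.Restart();
var parallel = (from n in numbers.AsParallel()
                where n.IsPrime()
                select n).ToArray();         // PLINQ
clock.Stop();
Console.WriteLine("PLINQ: {0} ms", clock.ElapsedMilliseconds);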
Where’s the Magic?
The real magic is that PLINQ abstracts away the whole parallelization process: task creation, threads, synchronization, and consolidation of the data.
And I think it does it very elegantly :-)
As we already know, LINQ queries are based on IEnumerable&lt;T&gt;.
However, in the PLINQ query the AsParallel() extension method transforms the input sequence from an IEnumerable&lt;T&gt; into a ParallelQuery&lt;T&gt;.
The input sequence is partitioned and handed out in chunks to different threads, each of which invokes the IsPrime method and returns true or false; the partial results are then consolidated into an output sequence that can be consumed.
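In terms of types, here's a minimal sketch of what that first step changes (nothing here beyond what the example already uses): the static type of the source goes from IEnumerable&lt;int&gt; to ParallelQuery&lt;int&gt;, so the operators that follow bind to ParallelEnumerable instead of Enumerable:
IEnumerable<int> sequentialSource = Enumerable.Range(1, 100);
ParallelQuery<int> parallelSource = sequentialSource.AsParallel();

// Where now resolves to ParallelEnumerable.Where, which partitions the work
var primes = parallelSource.Where(n => n.IsPrime());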
However, parallelizing the work doesn’t guarantee that the results come back in the original order, since one thread may finish before another and deliver its partial result earlier than expected. So, if the ordering of the output data is important, we have to go one step further.
PLINQ and Ordering
To ensure the ordering of the result set, just add the AsOrdered() method to the query. It guarantees the correct order at the cost of a series of internal synchronization mechanisms, which logically delay the delivery of results a little, but the cost is negligible: on my workstation it yields 1.311 seconds unordered versus 1.344 seconds ordered (barely 30 milliseconds more). These figures are the average of 50 measurements, so they’re fairly reliable.
The modified query looks like this:
var query =
    from n in numbers.AsParallel().AsOrdered()
    where n.IsPrime()
    select n;
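For what it’s worth, averages like the ones quoted above can be obtained with a simple measurement loop; here’s a rough sketch (it only reuses the numbers array and the IsPrime method from the example):
var orderedAverage = Enumerable.Range(0, 50)
    .Select(_ =>
    {
        var clock = Stopwatch.StartNew();
        numbers.AsParallel().AsOrdered()
               .Where(n => n.IsPrime())
               .ToArray();                  // force full execution of the query
        clock.Stop();
        return clock.ElapsedMilliseconds;
    })
    .Average();

Console.WriteLine("Ordered: {0:n2} ms on average over 50 runs", orderedAverage);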
Specifying the Degree of Parallelism
In most of the talks I’ve given about the TPL, people ask about functions that access external resources (services, sockets, etc.). In these cases a bottleneck clearly appears, not because the function makes intensive use of the CPU, but because it must wait for an external result. Here it’s usually worth specifying the degree of parallelism we want. Another interesting case for specifying it is the typical producer/consumer scenario.
It’s worth noting that when we specify the degree of parallelism we’re not forcing exactly that number of partitions to be used; we’re simply setting the maximum value:
var cores = Environment.ProcessorCount;

var query =
    from n in numbers.AsParallel().AsOrdered().
                      WithDegreeOfParallelism(cores / 2)
    where n.IsPrime()
    select n;
This way, by setting the degree of parallelism to half the number of processor cores, we make sure that (for example) one thread can be creating elements (the producer) while another consumes them (the consumer).
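Going back to the external-resource case mentioned above, here’s a hedged sketch of the opposite situation: when each element spends most of its time waiting on I/O rather than using the CPU, it can make sense to raise the degree of parallelism above the core count. Both urls (a collection of addresses to check) and UrlIsAlive (a helper that queries an external service) are hypothetical:
var aliveUrls =
    from url in urls.AsParallel().
                     WithDegreeOfParallelism(16)   // more worker tasks than cores: they mostly wait
    where UrlIsAlive(url)                          // hypothetical: blocks on the network, not the CPU
    select url;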
Canceling a PLINQ Query
Sometimes we need to cancel a PLINQ query, either because an error was found during the process and it’s no longer necessary to compute the rest of the results, or simply because the user canceled it.
In these cases we use a cancellation token, obtained from a CancellationTokenSource: the source is the object used to request cancellation, and the token lets the running code check whether cancellation has been requested. This mechanism works across the whole Task Parallel Library, not just PLINQ.
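In a nutshell, here’s a minimal sketch of that relationship, independent of PLINQ:
using (var cts = new CancellationTokenSource())
{
    CancellationToken token = cts.Token;   // handed to the code doing the work

    cts.Cancel();                          // somebody requests cancellation...

    // ...and the token reflects it: prints True
    Console.WriteLine(token.IsCancellationRequested);
}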
Next, we’re going to modify the example we’ve used so far to simulate an error and see how cancellation works in PLINQ. The first thing we’ll do is create an overload of the IsPrime method that receives a CancellationTokenSource parameter, so the task can be canceled:
public static bool IsPrime(this int n, CancellationTokenSource cs)
{
    if (n == 1000) cs.Cancel(); // simulate a cancellation request partway through
    return IsPrime(n);
}
As an example, when the number to evaluate is 1,000 we cancel the task, so there’s no need to reach 10 million. This way an exception will be thrown and, at the same time, the PLINQ query will take much less time to execute.
private void plinq_cancellable()
{
    var numbers = Enumerable.Range(1, 10000000);
    using (var cs = new CancellationTokenSource())
    {
        var clock = Stopwatch.StartNew();
        var query = numbers.AsParallel().AsOrdered().
                    WithCancellation(cs.Token).
                    Where(p => p.IsPrime(cs));
        try
        {
            var result = query.ToArray();
        }
        catch (OperationCanceledException ex)
        {
            Console.WriteLine(ex.Message);
        }
        clock.Stop();
        this.Text = clock.ElapsedMilliseconds.ToString("n2");
    }
}
On one hand we have to be careful to wrap the execution of the query in a try-catch block (in this case just the call to ToArray(), which is when the query really executes), and on the other hand we mark the query as cancelable through WithCancellation.
We also create a CancellationTokenSource object to manage the cancellation of this query. This is the object we finally pass to the IsPrime() method; when it gets canceled, its IsCancellationRequested property returns true and PLINQ throws a nice OperationCanceledException.
PLINQ Limitations
I don’t want to go on much longer, because I think there’s enough material for another post on more advanced topics. However, I do want to make clear that PLINQ has some limitations, such as the use of certain operators (Take, SkipWhile) and the indexed versions of Select or ElementAt.
Additionally, there are cases where, for performance reasons, PLINQ isn’t recommended because of the overhead it can introduce, such as the use of Join, Union or GroupBy. We’ll address these issues later.
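As a small taste of what’s coming, a hedged sketch: with operators such as the indexed overload of Select, PLINQ may decide to fall back to sequential execution by default, and WithExecutionMode can be used to ask it to parallelize anyway (whether that actually pays off depends on the query):
var indexedQuery = numbers.AsParallel().
                           WithExecutionMode(ParallelExecutionMode.ForceParallelism).
                           Select((n, index) => new { Index = index, Prime = n.IsPrime() });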
Coming up next…
In the next post we’ll see how to use the static Parallel class, optimized for working with iterative processes, those typical loops that all applications have.

Resistance is futile.