How To: Get data from OLAP cubes

cuberubikToday I had to read some data from different OLAP cubes and show this information in our application. This could be accomplished using ADOMD.NET client library, which is the multidimensional equivalent to the ADO.NET libraries.

This library provides a set of classes (Connections, Commands and Readers) that allows developers to connect, query and get data from these multidimensional sources in the same way the classical ADO.NET library does.

Maybe the easiest way to read data is using the class AdomdDataReader. This object retrieves a read-only, forward-only, flattened stream of data from an analytical data source, and also implements the IDataReader interface (like the good-old DataReader).

In non dynamic scenarios, we developers, tend to use ORMs like NHibertate, EF or Dapper (my favorite), to translate data from our models to classes. But these ORMs doesn’t support multidimensional sources, because after all, they are only supersets or extensions of the ADO.NET model and are based on this existing object model.

So, our goal here will be replicate in a multidimensional environment the way Dapper allows you to connect to a server, read some data and (the most important part) return these data as a typed List<T> of objects.

Let’s see how it works before dig into the details:

First of all we have a SQL script that returns some sample data of 3 different measures year by year (gains in period, gains year-to-date and the amount of money).

mddata

And we’d like to translate these data into a generic List of my own DTO:

Notice that in C# properties can not contain spaces so we previously have to ‘map’ the DataReader column names into valid property names. The trick here is create an attribute for decorating each property with the equivalent column name in the reader.

The magic here is inside the generic Query<T> Method. This method is just an extension method of the AdomdConnection object (Dapper style) and it’s mission will be:

  1. Replace the parameters in the command string “…Where ({ @PortfolioId })”
  2. Execute the command against a valid connection
  3. Transform the returned IDataReader into a List<MyResultDTO>

We’ll focus on the third one. Let’s create another extension method of the IDataReader interface. OMG! Code on interfaces? Yes… I’ll burn in hell, I know it… :D

This is the extension method with some auxiliary functions:

EDIT (2018-12-11) : Added custom attribute to the post, used to decorate our DTO.

As you can see the method iterates over the reader’s data and uses reflection to create an object per each row, map each column’s value to the correct property and then add the new object to the returned collection (*). Easy!

adomd_result

(*) image courtesy of oz-code, the best debugger addin for visual studio #imho ;)

Hope this helps! ;)

How To: Connect via SFTP using SSH.NET

Overview

In these times of APIs-everywhere, it may sound like an anachronism the use SFTP to connect to a remote server and get a list of files for exchanging information, but in the financial world (sadly) it’s more common than you think.

In my current project I’ve to connect to a remote server via Secure File Transfer Protocol (aka SFTP) using a user name, a RSA private key and a phassphrase. Once connected, the goal is to read some files from a remote folder and download them to a local folder.

To accomplish this I recommend you to use SSH.NET, one of the most popular SSH libraries for .NET, available on nuget.

sshnet

More info about SSH.NET here: https://github.com/sshnet/SSH.NET/

Using SSH.NET to connect to the server

The most difficult part here is configure the connection, as usual. We need to provide the server url and port, in combination with the username and a file that contains the private key of the RSA certificate, and -of course- the passphrase.

Here’s the final code, and it works :)

Bonus * the trick *

I’ve spent a lot of time before it works, receiving a SSH exception “Invalid private key file”, but the same key file works fine when using the app Filezilla to connect, so… #WhatTheHell :S

PuTTY-User-Key-File-2: ssh-rsa
Encryption: aes256-cbc
Comment: rsa-key-KBL-20171006
Public-Lines: 6
AAB3NzaC1yc2EAAAJQAAAQEAnHp1CA4xF04ZdOQ/rsxJoW9fPJ2RD
FgMNVIqsUsjeRbIoZ2y8SMD9b7MMB0lpKXgJ2dYDgOnh2q
j4VTpEoI2JWh4NdQgSH0O+2oLmQnwgDPT7Kva095ggEQiqScX4+31aY02/nz
mK86sxq/sUsW/UqgS+pPViRQLVzDXFf8XIYSSZngmV+Rk108BQ==
Private-Lines: 14
khHfZWB0vBIFtKc4s98xGDFhwZNJQByUTtE7um0tcU4cwy1QTPf3GIuN
vyhHxIGx3LBtFJqzbZVJtOJVSYjkBTGbNc62D0uJCxYVf8PUUStI6GbOkkyDW/Vt
beWZ3s/DugsImjcbPxdEz/2X86uzd5U5v4/wGKQr8GWJtNksMcJ
k7JgRYGA/t0cSE2980MxhZOBg2Gn7+0A6mWgSf2Rr7hpcqsou1
hmc1HYtN7Oj4WT7hvRt8ZAC3/ekTdJ4K3K7vKglSHoQ
tnimHaanJHz4RGBb78Alllk+OYk3TN0Etcwod3401cLpjYYeq6veZLA/KfCHuiJ+
+Zqoy//NY9egfXd1hB0kmiemwO8wGfLS7ppS/WvPOknW9I8SNMllR1vmO3Hk6S3x
KfAG03ZWNoKDLvAIUllNyMpf9p8oKLF2ny4bJKsfNtr4Y0ejQJUFkC
uNz1S4VJ8j1fRcIjx4yT121B9BDfp486RUmnEgsOFEtmVyHPfNxYzDXq2MjTf4l/
MI5cKWHrDbDC/Cu3YvkF3gekLAb/j9Cie/feHmSnbuZ2VEr6zUt10yaH4hPejCOw
FYDZb0I8xxxWJZ6BbpLWDqeD0Oiss8UnDhha/iKvodA9LIG0T
VRlScvGvuzClGYkc7UIWIoARvdxp46YlGMu4mWGeVkNcrxXnmUkdKyNqGjAGJoK/
Private-MAC: d288fffe72914eb62ed60a7e50fd8ce775

The key file (*.ppk) looks something similar to this

This is because the private key must be compatible with SshNet, so we have to convert the private key using PuTTY key Generator (the same app we used to create the certificate). Once opened in PuTTY, just export your file key to OpenSSH and use this new ppk file instead of the previous one.

exportosk

If you open the new file you will see that the key file now begins with

—–BEGIN RSA PRIVATE KEY—–

And ends with

—–END RSA PRIVATE KEY—–

Finally, this is a valid SSH private key and it works like a charm ;)

Hope you enjoy it!

DotNet Spain Conference 2015

Este fin de semana he tenido la oportunidad de participar en el mayor evento de la comunidad .NET en España hasta la fecha.

Se ha realizado en la Universidad de Alcalá, con record de asistentes. Cosa que no me extraña viendo la cantidad de charlas y hands on lab que se han realizado en varios tracks en paralelo. La verdad es que ha sido un éxito a todos los niveles, y sólo puedo decir que va a ser difícil superarlo en futuras ediciones. Se ha puesto el listón muy alto! :D

dotnetspain

En lo que respecta a mi, tuve la gran suerte de presentar -y que me aprobaran- una charla conjunta con mi colega Alex Casquete, sobre buenas prácticas sobre async, algo que precisamente no es mainstream pero que curiosamente ha tenido una aceptación muy buena. De hecho ha sido mejor de la que yo pensaba que podía tener en un principio, así que muchas gracias a todos por tanto feedback positivo!

Os dejo aquí las slides de la charla y las demos que utilizamos, ya que -para mi sorpresa- me las ha pedido bastante gente:

Sin embargo, en un evento como este no todo son sesiones técnicas. Lo mejor de todo es que he tenido la oportunidad de volver a encontrarme con un montón de viejos conocidos, con los que compartir buenos momentos y algunas cervezas, de modo que el networking ha sido… fantástico!

Os dejo con algunas fotos del eventazo. Gracias a todos! :D

Parallel Series: Tasks, la 8ª maravilla

Nota: Antes de nada quiero disculparme por haber dejado sin publicar en esta serie durante tanto tiempo, de hecho casi un año. Afortunadamente no ha sido ningún problema de salud, si no el tener demasiados frentes abiertos. Ahora que parece que se van cerrando algunos pienso aprovechar para terminar la serie y embarcarme en algún otro nuevo que ya os contaré.

The show must go on

En los posts previos hemos hablado de PLINQ y de la clase Parallel, dos características que facilitan mucho la ejecución de datos en paralelo, sin embargo si alguien me preguntase cual es la característica que más me gusta de la Task Parallel Library lo tendría muy claro: El manejo de tareas asíncronas mediante la clase Task.

Durante muchos años los desarrolladores hemos tenido que lidiar -más ben pelearnos- con la multitarea y la ejecución de código asíncrono. Ya dese las primeras versiones de C# podíamos crear hilos a mano mediante la clase Thread, pero el proceso distaba mucho de ser sencillo y además adolecía de cierta complejidad para manejar cancelaciones o actualizar la interfaz de usuario. Y aunque han habido intentos de mejora como la interfaz IAsyncResult también han aparecido engendros infumables como el BackgroundWorker, el cual es tan malo como hacerse cirugía cerebral con manoplas uno mismo.

Task al rescate!

Así que cuando apareció la TPL y la clase Task los desarrolladores encontramos por fin un método simple y cómodo para ejecutar código asíncrono y además en paralelo. Y esto es realmente muy importante ya que hoy en día en muchas aplicaciones (al menos las que están bien diseñadas) se ejecutan tareas en paralelo para acceder a recursos externos o ‘costosos’, bases de datos, y sobre todo para actualizaciones de la interfaz de usuario. De hecho es tan importante que será una de las mejoras más importantes en la siguiente versión de C# 5.0 de la cual hablaremos al final de la serie.

TaskClass

Actions everywhere

Al igual que la clase estática Parallel y gran parte de la TPL, la clase Task se basa en acciones, de modo que si no las controlas demasiado dale una ojeada al post que publiqué hace un tiempo sobre el tema.

En su sintaxis más básica, se puede utilizar de este modo:

var t = new Task(() => {
    Thread.Sleep(1000);
    Console.WriteLine("A");
    });
t.Start();
Console.WriteLine("B");

O lo que es lo mismo:

Task.Factory.StartNew(() => {
    Thread.Sleep(1000);
    Console.WriteLine("A");
    });
Console.WriteLine("B");

La única diferencia es que en la primera declaramos la variable especificando la acción a ejecutar y luego la ejecutamos explícitamente mediante su método ‘Start’, y en la segunda no utilizamos ninguna variable, sólo especificamos y ejecutamos la acción mediante el método ‘StartNew’ de la clase ‘Task.Factory’.

Observando el código anterior, qué os pensáis que se escribirá primero? A o B? Evidentemente B, ya que podemos imaginar cómo el código no se detiene en el Thread.Sleep(1000) (éste se ejecuta en otro Thread) de modo que ejecuta inmediatamente el print ‘B’. Correcto.

Pero las tareas son mucho más, permiten desde devolver resultados hasta manejar éstas ‘unidades de trabajo’ encadenando tareas a continuación de otras, esperando a que terminen una o un grupo de tareas antes de ejecutar otra, cancelar una tarea o propagar excepciones entre ellas.

1) Devolver un valor desde una tarea…

Como en un método, una tarea puede devolver desde un tipo básico (int, string) hasta cualquier tipo complejo. Para hacer el ejemplo más interesante vamos a utilizar un método que examina la red local en busca de servidores SQL Server y devuelve una lista de strings:

public static List<string> GetSQLServerNames()
{
    List<string> sqlservernames = new List<string>();
    sqlservernames.Add("local");
    SqlDataSourceEnumerator enumSQLServers = SqlDataSourceEnumerator.Instance;
    foreach (DataRow server in enumSQLServers.GetDataSources().Rows)
    {
        if (server["InstanceName"] is System.DBNull)
            sqlservernames.Add(server["ServerName"].ToString());
        else
            sqlservernames.Add(
                string.Format("{0}\\{1}", server["ServerName"], server["InstanceName"]));
    }
    Console.WriteLine("end get servers");
    return sqlservernames;
}

La ventaja de usar este método es que tarda unos segundos en ejecutarse, con lo cual es un candidato perfecto para ser ejecutado de forma asíncrona:

var getServersAsync = new Task<List<string>>(() => GetSQLServerNames());
getServersAsync.Start();
Console.WriteLine("end call");

Si lanzamos este código observaremos que se imprime ‘end call’ inmediatamente, y tarda unos segundos en imprimir ‘end get servers’. Realmente se está ejecutando asíncronamente!

2) …y al terminar continuar con otra tarea

Ahora supongamos que tenemos otro método que se encarga de actualizar la interfaz de usuario  a partir de la lista anterior:

private void updateServersListUI(List<string> servers)
{
    comboBox1.Items.Clear();
    servers.ForEach(p => comboBox1.Items.Add(p));
}

¿No sería lógico que lo hiciese al terminar la tarea anterior? Pues la verdad es que si, y encadenar tareas es algo trivial y que ofrece mucha potencia al desarrollador. De hecho estoy seguro que ya se os ha ocurrido alguna aplicación ;)

Encadenar tareas es tan sencillo como utilizar el método ‘ContinueWith’:

var getServersAsync = new Task<List<string>>(() => GetSQLServerNames());
getServersAsync.Start();
Console.WriteLine("end button2");
getServersAsync.ContinueWith((p) => updateServersListUI(getServersAsync.Result));

Sobre el papel debería funcionar pero no lo va a hacer, ya que hasta ahora no nos hemos fijado en un detalle: En la plataforma .NET no es posible actualizar un control desde otro hilo distinto al que lo ha creado, y toda la interfaz de usuario se crea en el main thread o hilo principal. Y esta limitación todavía existe.

Estableciendo el contexto de ejecución

Antes de la TPL para actualizar la interfaz de usuario desde otro hilo debíamos utilizar el método ‘Invoke’ de la clase ‘Control’, de modo que deberíamos modificar el método anterior de este modo:

private void updateServersListUI(List<string> servers)
{
    if (this.InvokeRequired) this.Invoke(new Action(() =>
    {
        comboBox1.Items.Clear();
        servers.ForEach(p => comboBox1.Items.Add(p));
    }));
}

Pero no va a ser necesario, ya que como parte de la magia de la TPL se nos ofrece la posibilidad de llamar a ‘TaskScheduler.FromCurrentSynchronizationContext’ que nos permite acceder a la interfaz de usuario de forma segura. Así pues lo único que hay que modificar el código anterior es encadenar la segunda tarea usando el contexto de sincronización antes mencionado y olvidarnos de la llamada a Invoke:

getServersAsync.ContinueWith((p) => updateServersListUI(getServersAsync.Result),
TaskScheduler.FromCurrentSynchronizationContext());

Esperando la ejecución de varias tareas

Otra característica muy interesante es la posibilidad de esperar a que termine un grupo de tareas entero, o sólo una de ellas. Supongamos que tenemos unja serie de tareas que se encargan de aplicar unos efectos a una serie de imágenes:

var t1 = Task.Factory.StartNew(
    () => pictureBox1.Image = Properties.Resources.Landscape08.Invert());
var t2 = Task.Factory.StartNew(
    () => pictureBox2.Image = Properties.Resources.Landscape08.Grayscale());
var t3 = Task.Factory.StartNew(
    () => pictureBox3.Image = Properties.Resources.Landscape08.Brightness(140));
var t4 = Task.Factory.StartNew(
    () => pictureBox4.Image = Properties.Resources.Landscape08.Contrast(80));
var t5 = Task.Factory.StartNew(
    () => pictureBox5.Image = Properties.Resources.Landscape08.Gamma(1, 5, 1));
var t6 = Task.Factory.StartNew(
    () => pictureBox6.Image = Properties.Resources.Landscape08.Color(255, 0, 0));

Podría ser interesante no seguir ejecutando el código hasta que la primera termine, o hasta que terminen todas, o hasta que terminen todas pero con un timeout. O sea, que si no terminan todas en 100 milisegundos seguir con la ejecución:

Task.WaitAny(new Task[] {t1, t2, t3, t4, t5, t6});
Task.WaitAll(new Task[] {t1, t2, t3, t4, t5, t6});
Task.WaitAll(new Task[] {t1, t2, t3, t4, t5, t6}, 100);

Otra forma de conseguir esto es mediante la creación de una tarea, especificando que debe esperar a que se complete alguna tarea o todas las especificadas:

var t7 = Task.Factory.ContinueWhenAll(new[] { t1, t2, t3, t4, t5, t6 }, (t) =>
    {
    //DoSomething...
    });

Cancelando tareas

Al igual que en posts anteriores, la clase Task también admite cancelaciones, y a mi juicio suelen ser más utilizadas que sus equivalentes en PLINQ o Parallel, ya que pueden permitir a un usuario cancelar el acceso a un recurso que tarda más de lo previsto (una URL p.e.) y las tareas que debían ejecutarse a continuación.

Partiendo de la base de un método que hace un trabajo largo:

private void DoALongWork(CancellationTokenSource cs)
{
    try
    {
        for (int i = 0; i < 100; i++)
        {
            Thread.Sleep(10);
            cs.Token.ThrowIfCancellationRequested();
        }
    }
    catch (OperationCanceledException ex)
    {
        Console.WriteLine(ex.Message);
    }
}

Podemos cancelar la tarea llamando al método ‘Cancel’ del token:

var cs = new CancellationTokenSource();
var t = Task.Factory.StartNew(
    () => DoALongWork(cs)
    );
Thread.Sleep(500);
cs.Cancel();

Y al cancelarse el código entrará en el catch del método ‘DoALongWork’ en el que se realizarán las acciones apropiadas a la cancelación.

Cancelación y estado

Para terminar, en algunos casos puede ser interesante que en caso de éxito la tarea llame a una segunda, pero en cambio si se cancela llame a una tercera. Para poder hacer esto debemos preguntar por el estado de la primera tarea y realizar un pequeño truco: La excepción no debe ser interceptada en un bloque try!

Así pues el código del método quedaría de este modo (sin try):

private void DoALongWork(CancellationTokenSource cs)
{
    for (int i = 0; i < 100; i++)
    {
        Thread.Sleep(10);
        cs.Token.ThrowIfCancellationRequested();
    }
}

Y la llamada de este otro:

var cs = new CancellationTokenSource();
var t = Task.Factory.StartNew(
    () => DoALongWork(cs)
    );
Thread.Sleep(500);
cs.Cancel();

t.ContinueWith(task =>
    {
        switch (task.Status)
        {
            case TaskStatus.Faulted:
                MessageBox.Show("Fail");
                break;
            case TaskStatus.RanToCompletion:
                MessageBox.Show("Success");
                break;
        }
    }, TaskScheduler.FromCurrentSynchronizationContext());

Como podéis observar, podemos cambiar el flujo del código en función del estado de la tarea.

Importante: Para poder ejecutar este último ejemplo es necesario ejecutar sin depuración (Ctrl+F5)

Bueno, nos hemos dejado algunas cosas en el tintero, pero para no hacer muy largo éste post las veremos más adelante, en otros posts avanzados de la misma serie.

Hasta entonces prometo actualizar la serie con más frecuencia que hasta ahora ;)

Volver al índice de contenidos

Parallel Series: La clase estática Parallel

3 métodos para los reyes elfos bajo el cielo

Hoy quiero hablaros de la clase estática Parallel. Esta clase provee soporte para paralelizar bucles y regiones, y al igual que PLINQ su uso es muy sencillo. Cabe destacar que está especialmente optimizada para iteraciones, y que en este contexto se desenvuelve un poco mejor que PLINQ. No hay una diferencia significativa en tiempos absolutos, pero puede verse perfectamente si utilizamos el magnífico profiler de Visual Studio 2010. No obstante, pueden existir situaciones en las que si se necesita afinar mucho el rendimiento en iteraciones, y aquí es dónde tiene más sentido utilizar dos de los tres métodos de esta clase: For y ForEach. Al tercero lo llamaremos Cirdan y apenas aparecerá en esta historia (en realidad me refiero a Invoke pero tampoco aparecerá por aquí).

Parallel_Class

Comprendiendo las acciones

Los dos métodos tienen una firma muy similar en su forma más sencilla. Ambos iteran sobre una serie de instrucciones realizando n veces cada una de ellas. Y aquí es dónde vemos aparecer los parámetros de tipo Action:

public static ParallelLoopResult For
    (int fromInclusive, int toExclusive, Action<int> body)
public static ParallelLoopResult ForEach<TSource>
    (IEnumerable<TSource> source, Action<TSource> body)

Un Action<T>, al igual que su hermano Func<T> es uno de los elementos de C# importados de la programación funcional, y desde el momento en que uno se acostumbra a usarlo, cuesta pensar cómo ha podido desarrollar toda su vida anterior. Si no, los que estéis acostumbrados a usar expresiones lambda en LINQ, imagináos que desaparecen de un día para otro.

No quiero empezar a divagar ahora sobre programación funcional, aunque si que quiero hacer incapié en el uso de Actions y lo importantes que se han vuelto en los últimos años. De hecho, recientemente he dedicado un post a cómo Action y Func han simplificado mucho el trabajo con delegados a los desarrolladores.

El método Parallel.For

Pero volviendo al tema que nos ocupa, si observamos la firma del método Parallel.For podremos ver que en lo importante no difiere demasiado de su homólogo for de toda la vida: Ambos tienen un inicio, un final y unas acciones a realizar un número determinado de veces.

Así que partiendo del método IsPrime que ya utilizamos en el anterior post sobre PLINQ, vamos a ver una comparativa entre las sintaxis de éstos dos métodos:

for (int i = 0; i < 100; i++)
{
    if(i.IsPrime())
        Console.WriteLine(string.Format("{0} es primo", i));
    else
        Console.WriteLine(string.Format("{0} no es primo", i));
}

Parallel.For(0, 100, (i) =>
    {
        if (i.IsPrime())
            Console.WriteLine(string.Format("{0} es primo", i));
        else
            Console.WriteLine(string.Format("{0} no es primo", i));
    });

En ambos  casos tenemos una serie de líneas que deben ejecutarse 100 veces. Concretamente desde 0 hasta 99, ya que el elemento superior no se incluye en ninguno de los dos casos. Sólo se ve un poco extraño el uso del Action<T>, pero podéis pensar en que la variable int i del primer bucle for, aquí se transforma en la parte (i) a la izquierda de la expresión lambda (=>). Y las acciones a ejecutar del primer for son exactamente iguales y van a la derecha de la expresión lambda.

So, let’s parallelize!

Viéndolo de este modo debe resultar extremadamente sencillo transformar todos nuestros bucles de este modo, así que ¿debemos hacerlo? La respuesta es NO.

En algunas ocasiones no vamos a obtener rendimiento por el hecho de paralelizar, ya que si el trabajo a realizar es mínimo, tardaremos más tiempo en dividir el trabajo en distintos threads, ejecutarlos y consolidar la información que en ejecutar la tarea sin paralelizar. También podría ser que nos encontrásemos un cuello de botella externo en un dispositivo de I/O, como un puerto, un servidor remoto o un socket.

Otro claro ejemplo de esto son los bucles anidados. Es común anidar varias estructuras for o foreach para realizar ciertos algoritmos. En este caso el candidato a ser paralelizado siempre es el bucle exterior y no es necesario (de hecho sería contraproducente) paralelizar los bucles internos:

Parallel.For(0, 100, (z) =>
    {
        for (int i = 0; i < 100; i++)
        {
            if (i.IsPrime())
                Console.WriteLine(string.Format("{0} es primo", i));
            else
                Console.WriteLine(string.Format("{0} no es primo", i));
        }
    });

Por lo pronto resulta bastante evidente, ya que si paralelizamos en bucle exterior necesitaríamos un ordenador con 100 cores y evidentementemente todavía no existen, así que la TPL tiene que agrupar estas tareas para adaptarlas a los cores disponibles, tardando cierto tiempo en hacer la sincronización (parecido a los primeros ejemplos con monos de la serie). Imagináos entonces si paralelizamos ambos bucles: 100 x 100 = 10.000 cores? Simplemente no tiene sentido.

Mi consejo es que en todos los casos en los que se decida paralelizar un bucle (y esto también vale para las consultas PLINQ) se realice primero una comparativa de rendimiento.

ParallelFor_Profiling

El método Parallel.ForEach

En cuanto al método ForEach es prácticamente igual al anterior con la salvedad que no tenemos un inicio y un final, sino una secuencia de entrada de datos (basada en IEnumerable, como PLINQ) y una variable que usamos para iterar por cada uno de los elementos de la secuencia y realizar una serie de acciones.

Consideremos el siguiente código:

List<FeedDefinition> feeds = new List<FeedDefinition>();
clock.Restart();
var blogs = FeedsEngine.GetBlogsUrls();
foreach (var blog in blogs)
{
    feeds.AddRange(FeedsEngine.GetBlogFeeds(blog));
}
clock.Stop();
this.Text = clock.ElapsedMilliseconds.ToString("n2");
feeds.ForEach(p => Console.WriteLine(p.Name));

Suponiendo que tenemos un método FeedsEngine.GetBlogsUrls que devuelve una lista de urls de proporcionan contenido RSS, el código anterior se conecta a cada una de las urls e intenta descargar toda la información de los posts mediante un método FeedsEngine.GetBlogFeeds(blog).

Nota: El código completo lo podréis encontrar en el post (todavía no publicado) ‘Código de ejemplo de las Parallel Series’, que contiene todos los ejemplos de todos los posts de la serie.

Como podéis imaginar este proceso totalmente secuencial es un serio candidato a ser paralelizado, ya que la mayoría del tiempo de este proceso es tiempo desperdiciado intentando a conectar con un servidor externo y que éste responda a las peticiones. En este caso paralelizar va a ser de gran ayuda aunque es importante comprender que en este caso la ganancia de rendimiento no va a ser por usar más potencia local, sino por lanzar las peticiones a los distintos servidores de forma asíncrona.

Así pues, basta cambiar la parte del bucle foreach por su versión paralelizada:

Parallel.ForEach(blogs, (blog) =>
    {
        feeds.AddRange(FeedsEngine.GetBlogFeeds(blog));
    });

En la que definimos la secuencia de datos a utilizar y declaramos la variable blog al vuelo (el compilador infiere el tipo automáticamente) a la izquierda de la expresión lambda, y a la derecha las acciones que deseamos realizar, que son exactamente iguales a la anterior versión foreach.

Y comprobaremos como se ejecuta mucho más rápido. En mi estación de trabajo pasamos de 6,7 segundos a 1,4 lo que no está nada mal.

Explorando más opciones

En la clase Parallel al igual que en las consultas PLINQ, existe la posibilidad de especificar el grado de paralelismo así como de cancelar la ejecución de un bucle. Sólo debemos usar una de las sobrecargas que utiliza un objeto de tipo ParallelOptions.

private void button11_Click(object sender, EventArgs e)
{
    CancellationTokenSource cs = new CancellationTokenSource();
    var cores = Environment.ProcessorCount;
    clock.Restart();
    var options = new ParallelOptions() {
        MaxDegreeOfParallelism = cores / 2,
        CancellationToken= cs.Token };
    try
    {
        Parallel.For(1, 10, options,
            (i) =>
            {
                dowork_cancel(i, cs);
            });
    }
    catch (Exception ex)
    {
        MessageBox.Show(ex.Message);
    }
    clock.Stop();
    this.Text = clock.ElapsedMilliseconds.ToString("n2");
}

void dowork_cancel(int i, CancellationTokenSource cs)
{
    Thread.Sleep(1000);
    if (i == 5) cs.Cancel();
}

En el caso anterior especificamos un grado de paralelización de la mitad del número de cores y preparemos la consulta para su posible cancelación (algo que simulamos en el interior del método dowork_cancel al llegar el contador a 5).

Próximanante en sus pantallas…

Todavía no he terminado ni la mitad de los posts de esta serie y ya estoy viendo que sería muy interesante ampliarla, mostrando ciertas características avanzadas que -por extensión- no quiero incluir en estos posts, más introductorios.

Más adelante veremos cómo salir o parar (no es lo mismo) un bucle parallelizado mediante un objeto ParallelLoopState, lidiar con variables compartidas o inicializar variables locales a cada partición. Pero eso lo dejamos para los posts avanzados de las Parallel Series.

Volver al índice de contenidos

Parallel Series: Parallel LINQ (PLINQ)

LINQ power!

Creo que estaremos todos de acuerdo en que LINQ ha supuesto una revolución en la forma de desarrollar, y ha hecho que muchos desarrolladores de otros lenguajes nos miren con cierto tono de envidia… E incluso que otras plataformas estén haciendo serios esfuerzos para incorporarlo en sus Frameworks :-)

Ahora, con la llegada de la Task Parallel Library, se abre un mundo de posibilidades gracias a PLINQ, que permite -de forma extremadamente sencilla- convertir cualquier consulta LINQ secuencial en una consulta paralelizable, permitiendo su segmentación y ejecución en los distintos cores en paralelo.

Es decir, cualquier consulta LINQ como ésta, en la que tenemos un array llamado numbers y un método IsPrime que devuelve un valor boolean en función de si un número es primo:

var query =
    from n in numbers
    where n.IsPrime()
    select n;

Puede ser paralelizada simplemente agregando esto:

var query =
    from n in numbers.AsParallel()
    where n.IsPrime()
    select n;

Método IsPrime:

public static class BaseTypesExtensions
{
    public static bool IsPrime(this int n) //1 = false, 2 = true, 3 = true...
    {
        if (n <= 1) return false;
        if ((n & 1) == 0)
        {
            if (n == 2) return true;
            else return false;
        }
        for (int i = 3; (i * i) <= n; i += 2)
        {
            if ((n % i) == 0) return false;
        }
        return n != 1;
    }
}

Partiendo de que el array de números contiene 10 millones de números enteros, y de que mi estación de trabajo actual tiene un procesador i7 con 8 cores, el resultado es abrumador:

La consulta LINQ tarda  5,2 segundos frente a los 1,3 segundos de la segunda.

Es decir, casi 4 segundos menos o un 400% más rápido.

¿Dónde está la magia?

La magia verdadera es que PLINQ nos abstrae de todo el proceso de paralelización, creación de tareas, threads, sincronización y consolidación de los datos.

Y además creo que lo hace de forma muy elegante :-)

Como ya sabemos, las consultas LINQ to objects se basan en IEnumerable<T> (gracias Generics!) que expone un enumerador para recorrer los elementos de una secuencia de elementos de tipo T. Esto hace que todas las colecciones que puedan devolverse en este tipo de consultas (IOrderedEnumerable, IQueryable, etc.) implementen esta interfaz. Hasta aquí nada nuevo bajo el sol.

Sin embargo, en la consulta PLINQ al utilizar el método extensor AsParallel() estamos transformando la secuencia de entrada de IEnumerable<T> a ParallelQuery <T> permitiendo la segmentación de los elementos de la secuencia y ejecutando cada uno de los segmentos en un thread distinto. Y por supuesto, repartiendo el trabajo en los diversos cores (si los hay).

AsParallel

La secuencia de entrada se particiona y se manda por fragmentos a distintos threads que invocan al método IsPrime devolviendo true (T) o false (F), y posteriormente los consolida en una secuencia de salida que puede ser consumida.

No obstante, el hecho de paralelizar el trabajo no garantiza que el resultado sea devuelto en el mismo orden, ya que es posible que un thread termine antes que otro y devuelva su resultado parcial antes de lo esperado. Así que, si la ordenación de los datos de salida es importante tenemos que ir un paso más allá.

asordered

Los primeros elementos deberían ser 2, 3, 5, 7… no 59 y 71 ¿?

PLINQ y la ordenación

Para asegurar la ordenación del conjunto de resultados, basta agregar el método AsOrdered() a la consulta. Este método asegura la correcta ordenación, a costa de implementar una serie de mecanismos de sincronización. Estos mecanismos, lógicamente retardan un poco el tiempo de entrega de los resultados, pero es despreciable. En mi estación de trabajo se arrojan unos valores de 1,311 segundos sin ordenar frente a 1,344 segundos ordenados (apenas 30 milésimas). Estos resultados son la media de una serie de 50 mediciones, con lo que son bastante fiables.

Una vez modificada la consulta:

var query =
    from n in numbers.AsParallel().AsOrdered()
    where n.IsPrime()
    select n;

El resultado es claro:

asordered2

Especificar el grado de paralelización

En la mayoría de las charlas que he dado sobre la TPL se acostumbra a preguntar respecto a funciones que acceden a recursos externos (servicios, sockets, etc.). En estos casos aparece claramente un cuello de botella, y no porque una función necesite hacer uso intensivo de la CPU, sino porque debe esperar un resultado externo. Aquí suele ser interesante especificar el grado de paralelización de deseamos. Otro caso interesante para especificar el grado de paralelización puede ser el típico escenario de productor/consumidor.

Es interesante notar que al especificar el grado de paralelización no estamos forzando a que se usen n particiones, sino que simplemente estamos especificando el valor máximo:

var cores = Environment.ProcessorCount;
var query =
    from n in numbers.AsParallel().AsOrdered().
        WithDegreeOfParallelism(cores / 2)
    where n.IsPrime()
    select n;

De este modo, al definir el grado de paralelización en la mitad del número de cores del procesador nos aseguramos que (por ejemplo) podremos tener un hilo que vaya creando elementos (productor) y otro hilo que vaya consumiendo dichos elementos (consumidor).

Cancelación de una consulta PLINQ

En ocasiones, una consulta PLINQ puede ser cancelada. Bien porque durante el proceso se ha encontrado un error y ya no es necesario terminar de calcular el resto de resultados, o simplemente porque ha sido cancelada por el usuario.

Es estos casos, es necesario utilizar un token de cancelación. Este token tiene su origen en la estructura CancellationTokenSource, que representa ‘una potencial cancelación’ y proporciona los mecanismos para cancelar y comprobar el estado de una tarea asíncrona, de modo que puede utilizarse con todos los elementos de la Task Parallel Library, no sólo con PLINQ.

A continuación, vamos a modificar el código del ejemplo que hemos usado hasta ahora para simular un error y comprobar el funcionamiento de la cancelación de tareas en PLINQ. Para ello lo primero que vamos a hacer es crear una sobrecarga del método IsPrime, que reciba un parámetro de tipo CancellationTokenSource, para poder cancelar la tarea:

public static bool IsPrime(this int n, CancellationTokenSource cs)
{
    if (n == 1000) cs.Cancel();
    return IsPrime(n);
}

A modo de ejemplo, cuando el número a calcular sea 1.000 cancelaremos la tarea, de modo que no sea necesario llegar a los 10 millones. De este modo, por un lado se lanzará una excepción y por otro el tiempo en ejecutar la consulta PLINQ será mucho menor.

private void plinq_cancellable()
{
    var numbers = Enumerable.Range(1, 10000000);
    using (var cs = new CancellationTokenSource())
    {
        clock.Restart();
        var query = numbers.AsParallel().AsOrdered().
            WithCancellation(cs.Token).
            Where(p => p.IsPrime(cs));
        try
        {
            var result = query.ToList();
        }
        catch (OperationCanceledException ex)
        {
            Console.WriteLine(ex.Message);
        }
        clock.Stop();
        this.Text = clock.ElapsedMilliseconds.ToString("n2");
    }
}

Por un lado tenemos que tener la precaución de envolver la consulta dentro de un bloque try-catch (en este caso sólo la llamada a ToArray() que es realmente cuando se ejecuta la consulta), y por el otro especificamos que la consulta puede ser cancelada mediante WithCancellation. A continuación creamos un objeto de tipo CancellationTokenSource para administrar la cancelación de esta consulta. Este objeto será el que finalmente pasemos al método IsPrime() y en caso que se cancele provocará que su propiedad IsCancellationRequested devuelva true y que se produzca una bonita excepción de tipo OperationCanceledException.

WithCancellation

Limitaciones de PLINQ

No quiero extenderme mucho más porque creo que hay material suficiente para hacer un post más adelante sobre temas avanzados. Sin embargo quiero dejar claro que existen algunas limitaciones en PLINQ, como el uso de algunos operadores (Take, SkipWhile) y de las versiones indexadas de Select o ElementAt.

Además existen otros casos en los que por cuestiones de rendimiento no es recomendable usar PLINQ en todos los casos, debido al sobrecoste que puede llegar a ocasionar, como el uso de Join, Union o GroupBy. Sin embargo, trataremos éstas cuestiones más adelante.

Próximamente veremos cómo utilizar la clase estática Parallel, optimizada para trabajar con procesos iterativos, esos típicos bucles que todas las aplicaciones tienen.

Volver al índice de contenidos