Problemas de concurrencia

Why concurrent collections?

En un mundo en el que los procesos ya no son secuenciales sino paralelos, es cada vez más posible encontrarnos con problemas de concurrencia al acceder a recursos compartidos. Conceptualmente hablando, esto es algo a los que los desarrolladores ya estamos acostumbrados cuando trabajamos con gestores de bases de datos como Oracle o SQL Server, ya que varios usuarios pueden acceder o modificar la información al mismo.

Sin embargo, la gran mayoría de los desarrolladores pocas veces hemos tenido que lidiar con bloqueos en colecciones en menoria, ya que no todo el mundo crea aplicaciones en las que varios threads acceden a recursos compartidos. De hecho, si alguna vez has lo tenido que hacer sabrás perfectamente que antes de la aparición de la TPL era una de las disciplinas más complejas dentro del desarrollo de software. Algo que favorece la calvície :)

Sin embargo, desde la aparición de la TPL en el .NET Framework 4.0 es mucho más sencillo desarrollar aplicaciones que ejecuten procesos en paralelo o de forma asíncrona, pero esto conlleva que en ocasiones nos olvidemos que hay algunos threads que se ejecutan al mismo tiempo, y esto podría llevar a producir efectos no deseados cuando se trata de acceder a recursos compartidos, como una colección de elementos en memoria.

Por ejemplo: supongamos un escenario en el que tenemos una lista de clientes en memoria y un par de tareas (Task) que acceden a esa colección. La primera tarea podría estar verificando si todos los clientes cumplen una condición X, y si no la cumplen eliminar el cliente de la colección. Mientras tanto la segunda tarea podría estar actualizando alguna propiedad de los clientes, como la edad.

Un escenario real

Encierra este escenario algún peligro? A priori podemos pensar que no. Basta con que la segunda tarea verifique si el cliente existe en la colección antes de actualizarlo. Seguro? Pues no. Al menos no basta si la colección que estamos usando no es una de las nuevas definidas dentro del namespace System.Collections.Concurrent. Y para verlo mejor, hagamos un pequeño ejemplo con código, que es lo que nos gusta a todos.

1 – Partiremos de una clase base Customers con un método que crea n objetos de ejemplo:

public class Customer
{
    public int CustomerId { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }

    public static List<Customer> GetSampleCustomers(int n)
    {
        Random r = new Random(Guid.NewGuid().GetHashCode());
        List<Customer> customers = new List<Customer>();
        for (int i = 0; i < n; i++)
        {
            customers.Add(new Customer()
            {
                CustomerId = i,
                Name = string.Format("Customer {0}", i),
                Age = r.Next(20, 80)
            });
        }
        return customers;
    }
}

2 – A continuación en un formulario declararemos e inicializaremos una colección de tipo Dictionary para almacenar estos objetos cliente:

Dictionary<int, Customer> dic1 = new Dictionary<int,Customer>();

3 – Y en el constructor la rellenamos con 100.000 clientes de ejemplo:

public Form1()
{
    InitializeComponent();
    dic1 = Customer.GetSampleCustomers(100000).ToDictionary(p => p.CustomerId);
}

4 – Ahora, vamos a complicar un poco más el ejemplo y crearemos dos métodos que simulen el borrado y la actualización de los objetos que contiene la colección. Luego llamaremos a éstos métodos para 100 objetos aleatorios de forma paralela para ver que efectos se producen:

private void deleteItem1(int id)
{
    if (dic1.ContainsKey(id) && dic1[id].Age < 30) dic1.Remove(id);
}

private void updateItem1(int id)
{
    if (dic1.ContainsKey(id)) dic1[id].Age++;
}

private void button1_Click(object sender, EventArgs e)
{
    for (int i = 0; i < 100; i++)
    {
        Random r = new Random(Guid.NewGuid().GetHashCode());
        int id = r.Next(1, dic1.Count);
        Task.Factory.StartNew(() => deleteItem1(id));
        Task.Factory.StartNew(() => updateItem1(id));
    }
}

Ejecutamos la aplicación y cuendo pulsemos el botón puede pasar que obtengamos un bonito error como este:

image

Concretamente en la línea que incrementa la edad del cliente:

image

Y digo que puede pasar, pero no es seguro. Tal vez tengamos que repetir el proceso varias veces, y es que en un entorno en el que varios threads acceden a un recurso compartido nada nos asegura que se produzca el error. A veces pasa a la primera y a veces a la cuarta, pero hay que tener presente que cuando esté en producción siempre pasará en viernes por la tarde cinco minutos antes de empezar el fin de semana ;)

Analicemos el porqué del error, por que ¿cómo es posible que no encuentre el elemento en la colección si precisamente en la línea anterior verificamos su existencia? Pues ahí está la gracia, que lo verificamos en la línea anterior. A nosotros -pobres developers- nos parece que la línea anterior en la que verificamos si existe y la línea actual en la que lo emininamos van seguidas una detrás de otra, pero realmente en un entorno asíncrono hay todo un mundo entre ambas. Y eso es porque ambas líneas se ejecutan de forma separada, sin ningún tipo de bloqueo, no existe Transaccionalidad al estilo de las bases de datos, de modo que mientras el primer thread verifica que existe ese elemento en la colección y lo borra, llega el segundo thread y… ZASCA! Lo elimina.

image

El nuevo ConcurrentDictionary

Para estos casos (y muchos otros) aparecen en escena un nuevo conjunto de colecciones especializadas en entornos multithreading. Una de las más populares es una variación del clásico Dictionary, el ConcurrentDictionary. Esta clase proporciona una série de métodos alternativos para agregar, modificar o eliminar elementos llamados TryAdd, TryUpdate o TryRemove. La ventaja de éstos métodos es que son transaccionales y proporcionan Atomicidad. Es decir, garantizan que en caso de que se invoquen, se ejecuten totalmente: O bien se añade/modifica/elimina el elemento de la colección o bien se devuelve el valor False porque no se ha podido realizar la operación. También proporciona un método TryGetValue que intenta obtener un elemento de la colección. Todos estos métodos implementan en su interior bloqueos (lock), para asegurar que nadie más accede al elemento mientras se realiza la operación solicitada.

Un ejemplo real que no falla ;)

Vamos a modificar el ejemplo anterior usando esta nueva colección, para observar su comportamiento en el entorno anterior.

1 – Agregaremos un ConcurrentDictionary después de la declaración del anterior:

ConcurrentDictionary<int, Customer> dic2 =
    new ConcurrentDictionary<int, Customer>();

2 – Lo rellenamos en el constructor, al igual que hacíamos antes. Aquí podemos ver la primera diferencia, ya que usamos uno de los nuevos métodos TryXXX:

Customer.GetSampleCustomers(100000).ForEach(c => dic2.TryAdd(c.CustomerId, c));

3 – A continuación vamos a crear los métodos equivalentes para eliminar y modificar el elemento en la colección:

private void deleteItem2(int id)
{
    Customer c;
    if(!dic2.TryRemove(id, out c)) Console.WriteLine("Error deleting!");
}

private void updateItem2(int id)
{
    Customer c;
    if (!dic2.TryGetValue(id, out c))
    {
        Console.WriteLine("Error getting value");
    }
    else
    {
        Customer newc = new Customer() { CustomerId = c.CustomerId, Name = c.Name, Age = c.Age };
        newc.Age++;
        if (!dic2.TryUpdate(id, newc, c)) Console.WriteLine("Error updating!");
    }
}

En el borrado usamos el método TryRemove que necesita el id del elemento a eliminar, y en caso que lo elimine con éxito, devuelve True y el objeto eliminado en el segundo parámetro out (de salida) del método. En caso que no lo elimine devuelve False y el valor default del objeto a eliminar en el segundo parámetro.

La actualización es un poco más compleja, ya que primero debemos asegurarnos de que el elemento a modificar existe en la colección mediante el método TryGetValue, que básicamente funciona igual que el método TryDelete anterior. Una vez hemos comprobado que dicho objeto existe en la colección creamos una réplica del cliente, modificamos su edad y llamamos al método TryUpdate, el cual se encarga de la modificación.

Es interesante notar que para que la modificación se realice correctamente debemos informar de la clave del objeto en la colección, el nuevo valor del objeto (en este caso la réplica del cliente a la que hemos modificado la edad) y un tercer parámetro que es el valor original del objeto, que es usado para comparar.

4 – Probemos los nuevos métodos en otro botón:

private void button2_Click(object sender, EventArgs e)
{
    for (int i = 0; i < 100; i++)
    {
        Random r = new Random(Guid.NewGuid().GetHashCode());
        int id = r.Next(1, dic1.Count);
        Task.Factory.StartNew(() => deleteItem2(id));
        Task.Factory.StartNew(() => updateItem2(id));
    }
}

Ejecutamos y si vamos a la ventana de consola podremos observar el resultado:

Error getting value
Error getting value
Error getting value
Error getting value
Error updating!
Error getting value
Error getting value
Error getting value

En la mayoría de los casos, obtenemos un error de lectura porque el elemento ya ha sido eliminado de la colección, pero en algunos casos el error se producirá en el método TryUpdate. Eso significa que la llamada a TryGetValue encuentra el elemento, pero cuando lo vamos a modificar éste ya no existe en la colección. Perfecto! :)

Conclusión

La ventaja de usar colecciones específicas en entornos multithreading es clara: Garantizan que el acceso a los recursos compartidos sea transaccional, al más puro estilo de las bases de datos. De acuerdo que su programación sea un poco más compleja, pero no tiene nada que ver con lo que se tenía que hacer antiguamente cuando no existía la TPL.

Por otro lado, el uso de estas colecciones debe hacerse sólo en aquelos casos en los que se accede a recursos compartidos, ya que si no, por un lado estamos complicando nuestro codigo y por otro lado un ConcurrentDictionary no es tan eficiente como lo és un Dictionary, sobre todo en entornos más de escritura que de lectura. Tenéis un post muy detallado al respecto del gran James (aka Black Rabbit) respecto a la diferencia de performance:

http://geekswithblogs.net/BlackRabbitCoder/archive/2010/06/09/c-4-the-curious-concurrentdictionary.aspx

Espero sacar tiempo para otro post en el que explicar otras colecciones de este namespace, como las ‘BlockingCollection’ o las ‘’ConcurrentBag’, que son colecciones más especializadas. La primera está optimizada para escenarios dónde el mismo thread produce y consume objetos, mientras que la segunda ha sido especialmente diseñada para escenarios de productor-consumidor.

Aquí tenéis un pequeño proyecto de ejemplo con el código:

https://www.dropbox.com/s/ej3u3s1v55ltb30/TPL04_Concurrency.zip

Nos leemos! :D

Volver al índice de contenidos

Parallel Series: Tasks, la 8ª maravilla

Nota: Antes de nada quiero disculparme por haber dejado sin publicar en esta serie durante tanto tiempo, de hecho casi un año. Afortunadamente no ha sido ningún problema de salud, si no el tener demasiados frentes abiertos. Ahora que parece que se van cerrando algunos pienso aprovechar para terminar la serie y embarcarme en algún otro nuevo que ya os contaré.

The show must go on

En los posts previos hemos hablado de PLINQ y de la clase Parallel, dos características que facilitan mucho la ejecución de datos en paralelo, sin embargo si alguien me preguntase cual es la característica que más me gusta de la Task Parallel Library lo tendría muy claro: El manejo de tareas asíncronas mediante la clase Task.

Durante muchos años los desarrolladores hemos tenido que lidiar -más ben pelearnos- con la multitarea y la ejecución de código asíncrono. Ya dese las primeras versiones de C# podíamos crear hilos a mano mediante la clase Thread, pero el proceso distaba mucho de ser sencillo y además adolecía de cierta complejidad para manejar cancelaciones o actualizar la interfaz de usuario. Y aunque han habido intentos de mejora como la interfaz IAsyncResult también han aparecido engendros infumables como el BackgroundWorker, el cual es tan malo como hacerse cirugía cerebral con manoplas uno mismo.

Task al rescate!

Así que cuando apareció la TPL y la clase Task los desarrolladores encontramos por fin un método simple y cómodo para ejecutar código asíncrono y además en paralelo. Y esto es realmente muy importante ya que hoy en día en muchas aplicaciones (al menos las que están bien diseñadas) se ejecutan tareas en paralelo para acceder a recursos externos o ‘costosos’, bases de datos, y sobre todo para actualizaciones de la interfaz de usuario. De hecho es tan importante que será una de las mejoras más importantes en la siguiente versión de C# 5.0 de la cual hablaremos al final de la serie.

TaskClass

Actions everywhere

Al igual que la clase estática Parallel y gran parte de la TPL, la clase Task se basa en acciones, de modo que si no las controlas demasiado dale una ojeada al post que publiqué hace un tiempo sobre el tema.

En su sintaxis más básica, se puede utilizar de este modo:

var t = new Task(() => {
    Thread.Sleep(1000);
    Console.WriteLine("A");
    });
t.Start();
Console.WriteLine("B");

O lo que es lo mismo:

Task.Factory.StartNew(() => {
    Thread.Sleep(1000);
    Console.WriteLine("A");
    });
Console.WriteLine("B");

La única diferencia es que en la primera declaramos la variable especificando la acción a ejecutar y luego la ejecutamos explícitamente mediante su método ‘Start’, y en la segunda no utilizamos ninguna variable, sólo especificamos y ejecutamos la acción mediante el método ‘StartNew’ de la clase ‘Task.Factory’.

Observando el código anterior, qué os pensáis que se escribirá primero? A o B? Evidentemente B, ya que podemos imaginar cómo el código no se detiene en el Thread.Sleep(1000) (éste se ejecuta en otro Thread) de modo que ejecuta inmediatamente el print ‘B’. Correcto.

Pero las tareas son mucho más, permiten desde devolver resultados hasta manejar éstas ‘unidades de trabajo’ encadenando tareas a continuación de otras, esperando a que terminen una o un grupo de tareas antes de ejecutar otra, cancelar una tarea o propagar excepciones entre ellas.

1) Devolver un valor desde una tarea…

Como en un método, una tarea puede devolver desde un tipo básico (int, string) hasta cualquier tipo complejo. Para hacer el ejemplo más interesante vamos a utilizar un método que examina la red local en busca de servidores SQL Server y devuelve una lista de strings:

public static List<string> GetSQLServerNames()
{
    List<string> sqlservernames = new List<string>();
    sqlservernames.Add("local");
    SqlDataSourceEnumerator enumSQLServers = SqlDataSourceEnumerator.Instance;
    foreach (DataRow server in enumSQLServers.GetDataSources().Rows)
    {
        if (server["InstanceName"] is System.DBNull)
            sqlservernames.Add(server["ServerName"].ToString());
        else
            sqlservernames.Add(
                string.Format("{0}\\{1}", server["ServerName"], server["InstanceName"]));
    }
    Console.WriteLine("end get servers");
    return sqlservernames;
}

La ventaja de usar este método es que tarda unos segundos en ejecutarse, con lo cual es un candidato perfecto para ser ejecutado de forma asíncrona:

var getServersAsync = new Task<List<string>>(() => GetSQLServerNames());
getServersAsync.Start();
Console.WriteLine("end call");

Si lanzamos este código observaremos que se imprime ‘end call’ inmediatamente, y tarda unos segundos en imprimir ‘end get servers’. Realmente se está ejecutando asíncronamente!

2) …y al terminar continuar con otra tarea

Ahora supongamos que tenemos otro método que se encarga de actualizar la interfaz de usuario  a partir de la lista anterior:

private void updateServersListUI(List<string> servers)
{
    comboBox1.Items.Clear();
    servers.ForEach(p => comboBox1.Items.Add(p));
}

¿No sería lógico que lo hiciese al terminar la tarea anterior? Pues la verdad es que si, y encadenar tareas es algo trivial y que ofrece mucha potencia al desarrollador. De hecho estoy seguro que ya se os ha ocurrido alguna aplicación ;)

Encadenar tareas es tan sencillo como utilizar el método ‘ContinueWith’:

var getServersAsync = new Task<List<string>>(() => GetSQLServerNames());
getServersAsync.Start();
Console.WriteLine("end button2");
getServersAsync.ContinueWith((p) => updateServersListUI(getServersAsync.Result));

Sobre el papel debería funcionar pero no lo va a hacer, ya que hasta ahora no nos hemos fijado en un detalle: En la plataforma .NET no es posible actualizar un control desde otro hilo distinto al que lo ha creado, y toda la interfaz de usuario se crea en el main thread o hilo principal. Y esta limitación todavía existe.

Estableciendo el contexto de ejecución

Antes de la TPL para actualizar la interfaz de usuario desde otro hilo debíamos utilizar el método ‘Invoke’ de la clase ‘Control’, de modo que deberíamos modificar el método anterior de este modo:

private void updateServersListUI(List<string> servers)
{
    if (this.InvokeRequired) this.Invoke(new Action(() =>
    {
        comboBox1.Items.Clear();
        servers.ForEach(p => comboBox1.Items.Add(p));
    }));
}

Pero no va a ser necesario, ya que como parte de la magia de la TPL se nos ofrece la posibilidad de llamar a ‘TaskScheduler.FromCurrentSynchronizationContext’ que nos permite acceder a la interfaz de usuario de forma segura. Así pues lo único que hay que modificar el código anterior es encadenar la segunda tarea usando el contexto de sincronización antes mencionado y olvidarnos de la llamada a Invoke:

getServersAsync.ContinueWith((p) => updateServersListUI(getServersAsync.Result),
TaskScheduler.FromCurrentSynchronizationContext());

Esperando la ejecución de varias tareas

Otra característica muy interesante es la posibilidad de esperar a que termine un grupo de tareas entero, o sólo una de ellas. Supongamos que tenemos unja serie de tareas que se encargan de aplicar unos efectos a una serie de imágenes:

var t1 = Task.Factory.StartNew(
    () => pictureBox1.Image = Properties.Resources.Landscape08.Invert());
var t2 = Task.Factory.StartNew(
    () => pictureBox2.Image = Properties.Resources.Landscape08.Grayscale());
var t3 = Task.Factory.StartNew(
    () => pictureBox3.Image = Properties.Resources.Landscape08.Brightness(140));
var t4 = Task.Factory.StartNew(
    () => pictureBox4.Image = Properties.Resources.Landscape08.Contrast(80));
var t5 = Task.Factory.StartNew(
    () => pictureBox5.Image = Properties.Resources.Landscape08.Gamma(1, 5, 1));
var t6 = Task.Factory.StartNew(
    () => pictureBox6.Image = Properties.Resources.Landscape08.Color(255, 0, 0));

Podría ser interesante no seguir ejecutando el código hasta que la primera termine, o hasta que terminen todas, o hasta que terminen todas pero con un timeout. O sea, que si no terminan todas en 100 milisegundos seguir con la ejecución:

Task.WaitAny(new Task[] {t1, t2, t3, t4, t5, t6});
Task.WaitAll(new Task[] {t1, t2, t3, t4, t5, t6});
Task.WaitAll(new Task[] {t1, t2, t3, t4, t5, t6}, 100);

Otra forma de conseguir esto es mediante la creación de una tarea, especificando que debe esperar a que se complete alguna tarea o todas las especificadas:

var t7 = Task.Factory.ContinueWhenAll(new[] { t1, t2, t3, t4, t5, t6 }, (t) =>
    {
    //DoSomething...
    });

Cancelando tareas

Al igual que en posts anteriores, la clase Task también admite cancelaciones, y a mi juicio suelen ser más utilizadas que sus equivalentes en PLINQ o Parallel, ya que pueden permitir a un usuario cancelar el acceso a un recurso que tarda más de lo previsto (una URL p.e.) y las tareas que debían ejecutarse a continuación.

Partiendo de la base de un método que hace un trabajo largo:

private void DoALongWork(CancellationTokenSource cs)
{
    try
    {
        for (int i = 0; i < 100; i++)
        {
            Thread.Sleep(10);
            cs.Token.ThrowIfCancellationRequested();
        }
    }
    catch (OperationCanceledException ex)
    {
        Console.WriteLine(ex.Message);
    }
}

Podemos cancelar la tarea llamando al método ‘Cancel’ del token:

var cs = new CancellationTokenSource();
var t = Task.Factory.StartNew(
    () => DoALongWork(cs)
    );
Thread.Sleep(500);
cs.Cancel();

Y al cancelarse el código entrará en el catch del método ‘DoALongWork’ en el que se realizarán las acciones apropiadas a la cancelación.

Cancelación y estado

Para terminar, en algunos casos puede ser interesante que en caso de éxito la tarea llame a una segunda, pero en cambio si se cancela llame a una tercera. Para poder hacer esto debemos preguntar por el estado de la primera tarea y realizar un pequeño truco: La excepción no debe ser interceptada en un bloque try!

Así pues el código del método quedaría de este modo (sin try):

private void DoALongWork(CancellationTokenSource cs)
{
    for (int i = 0; i < 100; i++)
    {
        Thread.Sleep(10);
        cs.Token.ThrowIfCancellationRequested();
    }
}

Y la llamada de este otro:

var cs = new CancellationTokenSource();
var t = Task.Factory.StartNew(
    () => DoALongWork(cs)
    );
Thread.Sleep(500);
cs.Cancel();

t.ContinueWith(task =>
    {
        switch (task.Status)
        {
            case TaskStatus.Faulted:
                MessageBox.Show("Fail");
                break;
            case TaskStatus.RanToCompletion:
                MessageBox.Show("Success");
                break;
        }
    }, TaskScheduler.FromCurrentSynchronizationContext());

Como podéis observar, podemos cambiar el flujo del código en función del estado de la tarea.

Importante: Para poder ejecutar este último ejemplo es necesario ejecutar sin depuración (Ctrl+F5)

Bueno, nos hemos dejado algunas cosas en el tintero, pero para no hacer muy largo éste post las veremos más adelante, en otros posts avanzados de la misma serie.

Hasta entonces prometo actualizar la serie con más frecuencia que hasta ahora ;)

Volver al índice de contenidos

Parallel Series: La clase estática Parallel

3 métodos para los reyes elfos bajo el cielo

Hoy quiero hablaros de la clase estática Parallel. Esta clase provee soporte para paralelizar bucles y regiones, y al igual que PLINQ su uso es muy sencillo. Cabe destacar que está especialmente optimizada para iteraciones, y que en este contexto se desenvuelve un poco mejor que PLINQ. No hay una diferencia significativa en tiempos absolutos, pero puede verse perfectamente si utilizamos el magnífico profiler de Visual Studio 2010. No obstante, pueden existir situaciones en las que si se necesita afinar mucho el rendimiento en iteraciones, y aquí es dónde tiene más sentido utilizar dos de los tres métodos de esta clase: For y ForEach. Al tercero lo llamaremos Cirdan y apenas aparecerá en esta historia (en realidad me refiero a Invoke pero tampoco aparecerá por aquí).

Parallel_Class

Comprendiendo las acciones

Los dos métodos tienen una firma muy similar en su forma más sencilla. Ambos iteran sobre una serie de instrucciones realizando n veces cada una de ellas. Y aquí es dónde vemos aparecer los parámetros de tipo Action:

public static ParallelLoopResult For
    (int fromInclusive, int toExclusive, Action<int> body)
public static ParallelLoopResult ForEach<TSource>
    (IEnumerable<TSource> source, Action<TSource> body)

Un Action<T>, al igual que su hermano Func<T> es uno de los elementos de C# importados de la programación funcional, y desde el momento en que uno se acostumbra a usarlo, cuesta pensar cómo ha podido desarrollar toda su vida anterior. Si no, los que estéis acostumbrados a usar expresiones lambda en LINQ, imagináos que desaparecen de un día para otro.

No quiero empezar a divagar ahora sobre programación funcional, aunque si que quiero hacer incapié en el uso de Actions y lo importantes que se han vuelto en los últimos años. De hecho, recientemente he dedicado un post a cómo Action y Func han simplificado mucho el trabajo con delegados a los desarrolladores.

El método Parallel.For

Pero volviendo al tema que nos ocupa, si observamos la firma del método Parallel.For podremos ver que en lo importante no difiere demasiado de su homólogo for de toda la vida: Ambos tienen un inicio, un final y unas acciones a realizar un número determinado de veces.

Así que partiendo del método IsPrime que ya utilizamos en el anterior post sobre PLINQ, vamos a ver una comparativa entre las sintaxis de éstos dos métodos:

for (int i = 0; i < 100; i++)
{
    if(i.IsPrime())
        Console.WriteLine(string.Format("{0} es primo", i));
    else
        Console.WriteLine(string.Format("{0} no es primo", i));
}

Parallel.For(0, 100, (i) =>
    {
        if (i.IsPrime())
            Console.WriteLine(string.Format("{0} es primo", i));
        else
            Console.WriteLine(string.Format("{0} no es primo", i));
    });

En ambos  casos tenemos una serie de líneas que deben ejecutarse 100 veces. Concretamente desde 0 hasta 99, ya que el elemento superior no se incluye en ninguno de los dos casos. Sólo se ve un poco extraño el uso del Action<T>, pero podéis pensar en que la variable int i del primer bucle for, aquí se transforma en la parte (i) a la izquierda de la expresión lambda (=>). Y las acciones a ejecutar del primer for son exactamente iguales y van a la derecha de la expresión lambda.

So, let’s parallelize!

Viéndolo de este modo debe resultar extremadamente sencillo transformar todos nuestros bucles de este modo, así que ¿debemos hacerlo? La respuesta es NO.

En algunas ocasiones no vamos a obtener rendimiento por el hecho de paralelizar, ya que si el trabajo a realizar es mínimo, tardaremos más tiempo en dividir el trabajo en distintos threads, ejecutarlos y consolidar la información que en ejecutar la tarea sin paralelizar. También podría ser que nos encontrásemos un cuello de botella externo en un dispositivo de I/O, como un puerto, un servidor remoto o un socket.

Otro claro ejemplo de esto son los bucles anidados. Es común anidar varias estructuras for o foreach para realizar ciertos algoritmos. En este caso el candidato a ser paralelizado siempre es el bucle exterior y no es necesario (de hecho sería contraproducente) paralelizar los bucles internos:

Parallel.For(0, 100, (z) =>
    {
        for (int i = 0; i < 100; i++)
        {
            if (i.IsPrime())
                Console.WriteLine(string.Format("{0} es primo", i));
            else
                Console.WriteLine(string.Format("{0} no es primo", i));
        }
    });

Por lo pronto resulta bastante evidente, ya que si paralelizamos en bucle exterior necesitaríamos un ordenador con 100 cores y evidentementemente todavía no existen, así que la TPL tiene que agrupar estas tareas para adaptarlas a los cores disponibles, tardando cierto tiempo en hacer la sincronización (parecido a los primeros ejemplos con monos de la serie). Imagináos entonces si paralelizamos ambos bucles: 100 x 100 = 10.000 cores? Simplemente no tiene sentido.

Mi consejo es que en todos los casos en los que se decida paralelizar un bucle (y esto también vale para las consultas PLINQ) se realice primero una comparativa de rendimiento.

ParallelFor_Profiling

El método Parallel.ForEach

En cuanto al método ForEach es prácticamente igual al anterior con la salvedad que no tenemos un inicio y un final, sino una secuencia de entrada de datos (basada en IEnumerable, como PLINQ) y una variable que usamos para iterar por cada uno de los elementos de la secuencia y realizar una serie de acciones.

Consideremos el siguiente código:

List<FeedDefinition> feeds = new List<FeedDefinition>();
clock.Restart();
var blogs = FeedsEngine.GetBlogsUrls();
foreach (var blog in blogs)
{
    feeds.AddRange(FeedsEngine.GetBlogFeeds(blog));
}
clock.Stop();
this.Text = clock.ElapsedMilliseconds.ToString("n2");
feeds.ForEach(p => Console.WriteLine(p.Name));

Suponiendo que tenemos un método FeedsEngine.GetBlogsUrls que devuelve una lista de urls de proporcionan contenido RSS, el código anterior se conecta a cada una de las urls e intenta descargar toda la información de los posts mediante un método FeedsEngine.GetBlogFeeds(blog).

Nota: El código completo lo podréis encontrar en el post (todavía no publicado) ‘Código de ejemplo de las Parallel Series’, que contiene todos los ejemplos de todos los posts de la serie.

Como podéis imaginar este proceso totalmente secuencial es un serio candidato a ser paralelizado, ya que la mayoría del tiempo de este proceso es tiempo desperdiciado intentando a conectar con un servidor externo y que éste responda a las peticiones. En este caso paralelizar va a ser de gran ayuda aunque es importante comprender que en este caso la ganancia de rendimiento no va a ser por usar más potencia local, sino por lanzar las peticiones a los distintos servidores de forma asíncrona.

Así pues, basta cambiar la parte del bucle foreach por su versión paralelizada:

Parallel.ForEach(blogs, (blog) =>
    {
        feeds.AddRange(FeedsEngine.GetBlogFeeds(blog));
    });

En la que definimos la secuencia de datos a utilizar y declaramos la variable blog al vuelo (el compilador infiere el tipo automáticamente) a la izquierda de la expresión lambda, y a la derecha las acciones que deseamos realizar, que son exactamente iguales a la anterior versión foreach.

Y comprobaremos como se ejecuta mucho más rápido. En mi estación de trabajo pasamos de 6,7 segundos a 1,4 lo que no está nada mal.

Explorando más opciones

En la clase Parallel al igual que en las consultas PLINQ, existe la posibilidad de especificar el grado de paralelismo así como de cancelar la ejecución de un bucle. Sólo debemos usar una de las sobrecargas que utiliza un objeto de tipo ParallelOptions.

private void button11_Click(object sender, EventArgs e)
{
    CancellationTokenSource cs = new CancellationTokenSource();
    var cores = Environment.ProcessorCount;
    clock.Restart();
    var options = new ParallelOptions() {
        MaxDegreeOfParallelism = cores / 2,
        CancellationToken= cs.Token };
    try
    {
        Parallel.For(1, 10, options,
            (i) =>
            {
                dowork_cancel(i, cs);
            });
    }
    catch (Exception ex)
    {
        MessageBox.Show(ex.Message);
    }
    clock.Stop();
    this.Text = clock.ElapsedMilliseconds.ToString("n2");
}

void dowork_cancel(int i, CancellationTokenSource cs)
{
    Thread.Sleep(1000);
    if (i == 5) cs.Cancel();
}

En el caso anterior especificamos un grado de paralelización de la mitad del número de cores y preparemos la consulta para su posible cancelación (algo que simulamos en el interior del método dowork_cancel al llegar el contador a 5).

Próximanante en sus pantallas…

Todavía no he terminado ni la mitad de los posts de esta serie y ya estoy viendo que sería muy interesante ampliarla, mostrando ciertas características avanzadas que -por extensión- no quiero incluir en estos posts, más introductorios.

Más adelante veremos cómo salir o parar (no es lo mismo) un bucle parallelizado mediante un objeto ParallelLoopState, lidiar con variables compartidas o inicializar variables locales a cada partición. Pero eso lo dejamos para los posts avanzados de las Parallel Series.

Volver al índice de contenidos

Parallel Series: Parallel LINQ (PLINQ)

LINQ power!

Creo que estaremos todos de acuerdo en que LINQ ha supuesto una revolución en la forma de desarrollar, y ha hecho que muchos desarrolladores de otros lenguajes nos miren con cierto tono de envidia… E incluso que otras plataformas estén haciendo serios esfuerzos para incorporarlo en sus Frameworks :-)

Ahora, con la llegada de la Task Parallel Library, se abre un mundo de posibilidades gracias a PLINQ, que permite -de forma extremadamente sencilla- convertir cualquier consulta LINQ secuencial en una consulta paralelizable, permitiendo su segmentación y ejecución en los distintos cores en paralelo.

Es decir, cualquier consulta LINQ como ésta, en la que tenemos un array llamado numbers y un método IsPrime que devuelve un valor boolean en función de si un número es primo:

var query =
    from n in numbers
    where n.IsPrime()
    select n;

Puede ser paralelizada simplemente agregando esto:

var query =
    from n in numbers.AsParallel()
    where n.IsPrime()
    select n;

Método IsPrime:

public static class BaseTypesExtensions
{
    public static bool IsPrime(this int n) //1 = false, 2 = true, 3 = true...
    {
        if (n <= 1) return false;
        if ((n & 1) == 0)
        {
            if (n == 2) return true;
            else return false;
        }
        for (int i = 3; (i * i) <= n; i += 2)
        {
            if ((n % i) == 0) return false;
        }
        return n != 1;
    }
}

Partiendo de que el array de números contiene 10 millones de números enteros, y de que mi estación de trabajo actual tiene un procesador i7 con 8 cores, el resultado es abrumador:

La consulta LINQ tarda  5,2 segundos frente a los 1,3 segundos de la segunda.

Es decir, casi 4 segundos menos o un 400% más rápido.

¿Dónde está la magia?

La magia verdadera es que PLINQ nos abstrae de todo el proceso de paralelización, creación de tareas, threads, sincronización y consolidación de los datos.

Y además creo que lo hace de forma muy elegante :-)

Como ya sabemos, las consultas LINQ to objects se basan en IEnumerable<T> (gracias Generics!) que expone un enumerador para recorrer los elementos de una secuencia de elementos de tipo T. Esto hace que todas las colecciones que puedan devolverse en este tipo de consultas (IOrderedEnumerable, IQueryable, etc.) implementen esta interfaz. Hasta aquí nada nuevo bajo el sol.

Sin embargo, en la consulta PLINQ al utilizar el método extensor AsParallel() estamos transformando la secuencia de entrada de IEnumerable<T> a ParallelQuery <T> permitiendo la segmentación de los elementos de la secuencia y ejecutando cada uno de los segmentos en un thread distinto. Y por supuesto, repartiendo el trabajo en los diversos cores (si los hay).

AsParallel

La secuencia de entrada se particiona y se manda por fragmentos a distintos threads que invocan al método IsPrime devolviendo true (T) o false (F), y posteriormente los consolida en una secuencia de salida que puede ser consumida.

No obstante, el hecho de paralelizar el trabajo no garantiza que el resultado sea devuelto en el mismo orden, ya que es posible que un thread termine antes que otro y devuelva su resultado parcial antes de lo esperado. Así que, si la ordenación de los datos de salida es importante tenemos que ir un paso más allá.

asordered

Los primeros elementos deberían ser 2, 3, 5, 7… no 59 y 71 ¿?

PLINQ y la ordenación

Para asegurar la ordenación del conjunto de resultados, basta agregar el método AsOrdered() a la consulta. Este método asegura la correcta ordenación, a costa de implementar una serie de mecanismos de sincronización. Estos mecanismos, lógicamente retardan un poco el tiempo de entrega de los resultados, pero es despreciable. En mi estación de trabajo se arrojan unos valores de 1,311 segundos sin ordenar frente a 1,344 segundos ordenados (apenas 30 milésimas). Estos resultados son la media de una serie de 50 mediciones, con lo que son bastante fiables.

Una vez modificada la consulta:

var query =
    from n in numbers.AsParallel().AsOrdered()
    where n.IsPrime()
    select n;

El resultado es claro:

asordered2

Especificar el grado de paralelización

En la mayoría de las charlas que he dado sobre la TPL se acostumbra a preguntar respecto a funciones que acceden a recursos externos (servicios, sockets, etc.). En estos casos aparece claramente un cuello de botella, y no porque una función necesite hacer uso intensivo de la CPU, sino porque debe esperar un resultado externo. Aquí suele ser interesante especificar el grado de paralelización de deseamos. Otro caso interesante para especificar el grado de paralelización puede ser el típico escenario de productor/consumidor.

Es interesante notar que al especificar el grado de paralelización no estamos forzando a que se usen n particiones, sino que simplemente estamos especificando el valor máximo:

var cores = Environment.ProcessorCount;
var query =
    from n in numbers.AsParallel().AsOrdered().
        WithDegreeOfParallelism(cores / 2)
    where n.IsPrime()
    select n;

De este modo, al definir el grado de paralelización en la mitad del número de cores del procesador nos aseguramos que (por ejemplo) podremos tener un hilo que vaya creando elementos (productor) y otro hilo que vaya consumiendo dichos elementos (consumidor).

Cancelación de una consulta PLINQ

En ocasiones, una consulta PLINQ puede ser cancelada. Bien porque durante el proceso se ha encontrado un error y ya no es necesario terminar de calcular el resto de resultados, o simplemente porque ha sido cancelada por el usuario.

Es estos casos, es necesario utilizar un token de cancelación. Este token tiene su origen en la estructura CancellationTokenSource, que representa ‘una potencial cancelación’ y proporciona los mecanismos para cancelar y comprobar el estado de una tarea asíncrona, de modo que puede utilizarse con todos los elementos de la Task Parallel Library, no sólo con PLINQ.

A continuación, vamos a modificar el código del ejemplo que hemos usado hasta ahora para simular un error y comprobar el funcionamiento de la cancelación de tareas en PLINQ. Para ello lo primero que vamos a hacer es crear una sobrecarga del método IsPrime, que reciba un parámetro de tipo CancellationTokenSource, para poder cancelar la tarea:

public static bool IsPrime(this int n, CancellationTokenSource cs)
{
    if (n == 1000) cs.Cancel();
    return IsPrime(n);
}

A modo de ejemplo, cuando el número a calcular sea 1.000 cancelaremos la tarea, de modo que no sea necesario llegar a los 10 millones. De este modo, por un lado se lanzará una excepción y por otro el tiempo en ejecutar la consulta PLINQ será mucho menor.

private void plinq_cancellable()
{
    var numbers = Enumerable.Range(1, 10000000);
    using (var cs = new CancellationTokenSource())
    {
        clock.Restart();
        var query = numbers.AsParallel().AsOrdered().
            WithCancellation(cs.Token).
            Where(p => p.IsPrime(cs));
        try
        {
            var result = query.ToList();
        }
        catch (OperationCanceledException ex)
        {
            Console.WriteLine(ex.Message);
        }
        clock.Stop();
        this.Text = clock.ElapsedMilliseconds.ToString("n2");
    }
}

Por un lado tenemos que tener la precaución de envolver la consulta dentro de un bloque try-catch (en este caso sólo la llamada a ToArray() que es realmente cuando se ejecuta la consulta), y por el otro especificamos que la consulta puede ser cancelada mediante WithCancellation. A continuación creamos un objeto de tipo CancellationTokenSource para administrar la cancelación de esta consulta. Este objeto será el que finalmente pasemos al método IsPrime() y en caso que se cancele provocará que su propiedad IsCancellationRequested devuelva true y que se produzca una bonita excepción de tipo OperationCanceledException.

WithCancellation

Limitaciones de PLINQ

No quiero extenderme mucho más porque creo que hay material suficiente para hacer un post más adelante sobre temas avanzados. Sin embargo quiero dejar claro que existen algunas limitaciones en PLINQ, como el uso de algunos operadores (Take, SkipWhile) y de las versiones indexadas de Select o ElementAt.

Además existen otros casos en los que por cuestiones de rendimiento no es recomendable usar PLINQ en todos los casos, debido al sobrecoste que puede llegar a ocasionar, como el uso de Join, Union o GroupBy. Sin embargo, trataremos éstas cuestiones más adelante.

Próximamente veremos cómo utilizar la clase estática Parallel, optimizada para trabajar con procesos iterativos, esos típicos bucles que todas las aplicaciones tienen.

Volver al índice de contenidos

Parallel Series: El Alfa

Prólogo

Cada versión del .NET framework nos sorprende con una serie de novedades, y para cada uno de nosotros hay -al menos- una que nos roba el corazón. A mi me sucedió con la aparición de Generics con el framework 2.0, los métodos extensores y LINQ en las versiones 3.0 y 3.5 y la Task Parallel Library en la versión 4.0.

Pensándolo detenidamente tampoco no es tan extraño, al fin y al cabo siempre me ha gustado la programación asíncrona (algo que si alguien le llega a contar a alguno de mis primeros maestros de tecnología digital, se habría muerto de la risa). Sin embargo, con los años éste que escribe ha llegado -más por tozudez que por talento natural- a conocer un poco los entresijos de la programación asíncrona. Una disciplina en la que tan importante es saber lo que hay que hacer, como lo que no hay que hacer.

Así pues, cuando llegó a mis manos la primera preview de Visual Studio 2010, una de las primeras cosas que hice fue mirar que demonios era eso de la Task Parallel Library. Primero porque ya hacía un tiempo que había escuchado acerca de la muerte anunciada de la ley de Moore. Y segundo porque cualquier cosa que hiciese más llevadero el trabajo de realizar y depurar aplicaciones multihilo, bienvenido iba a ser.

tpl

Desde entonces hasta ahora he tenido la suerte de poder dedicar un poco de tiempo a esta maravilla, de modo que me he propuesto crear una serie bastante larga de posts y vídeos sobre el tema. Más que nada porque a mi juicio hay poca documentación (al menos en Español) y algo así no puede quedar relegado al olvido. Y es que en multitud de ocasiones que cuando le explico a alguien el porqué es importante y en que consiste la TPL, acostumbra a decir: “Que guapo! Pues no tenía ni idea…”. Y eso, me mata.

Nacen las Parallel Series

En los próximos posts voy a ir definiendo el índice de contenidos de la serie, aunque a medida que crezca la serie es más que probable que vaya siendo modificado y ampliado. También veremos algo de historia para ponernos en contexto, aclararemos conceptos base que van a ser necesarios más adelante, e iremos desgranando los apartados principales de la Task Parallel Library.

La idea es empezar desde abajo e ir subiendo de nivel, hasta llegar a los aspectos más complejos, como los problemas de concurrencia o la depuración de este tipo de aplicaciones. En cada apartado prometo poner al menos un ejemplo y si puedo más, porque los humanos -y los developers heredamos de esta clase base- aprendemos mucho mejor los conceptos teóricos si van acompañados de la práctica.

Nos vemos muy pronto ;-)

Channel9ParallelSeries1

Volver al índice de contenidos