Problemas de concurrencia

Why concurrent collections?

En un mundo en el que los procesos ya no son secuenciales sino paralelos, es cada vez más posible encontrarnos con problemas de concurrencia al acceder a recursos compartidos. Conceptualmente hablando, esto es algo a los que los desarrolladores ya estamos acostumbrados cuando trabajamos con gestores de bases de datos como Oracle o SQL Server, ya que varios usuarios pueden acceder o modificar la información al mismo.

Sin embargo, la gran mayoría de los desarrolladores pocas veces hemos tenido que lidiar con bloqueos en colecciones en menoria, ya que no todo el mundo crea aplicaciones en las que varios threads acceden a recursos compartidos. De hecho, si alguna vez has lo tenido que hacer sabrás perfectamente que antes de la aparición de la TPL era una de las disciplinas más complejas dentro del desarrollo de software. Algo que favorece la calvície :)

Sin embargo, desde la aparición de la TPL en el .NET Framework 4.0 es mucho más sencillo desarrollar aplicaciones que ejecuten procesos en paralelo o de forma asíncrona, pero esto conlleva que en ocasiones nos olvidemos que hay algunos threads que se ejecutan al mismo tiempo, y esto podría llevar a producir efectos no deseados cuando se trata de acceder a recursos compartidos, como una colección de elementos en memoria.

Por ejemplo: supongamos un escenario en el que tenemos una lista de clientes en memoria y un par de tareas (Task) que acceden a esa colección. La primera tarea podría estar verificando si todos los clientes cumplen una condición X, y si no la cumplen eliminar el cliente de la colección. Mientras tanto la segunda tarea podría estar actualizando alguna propiedad de los clientes, como la edad.

Un escenario real

Encierra este escenario algún peligro? A priori podemos pensar que no. Basta con que la segunda tarea verifique si el cliente existe en la colección antes de actualizarlo. Seguro? Pues no. Al menos no basta si la colección que estamos usando no es una de las nuevas definidas dentro del namespace System.Collections.Concurrent. Y para verlo mejor, hagamos un pequeño ejemplo con código, que es lo que nos gusta a todos.

1 – Partiremos de una clase base Customers con un método que crea n objetos de ejemplo:

public class Customer
{
    public int CustomerId { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }

    public static List<Customer> GetSampleCustomers(int n)
    {
        Random r = new Random(Guid.NewGuid().GetHashCode());
        List<Customer> customers = new List<Customer>();
        for (int i = 0; i < n; i++)
        {
            customers.Add(new Customer()
            {
                CustomerId = i,
                Name = string.Format("Customer {0}", i),
                Age = r.Next(20, 80)
            });
        }
        return customers;
    }
}

2 – A continuación en un formulario declararemos e inicializaremos una colección de tipo Dictionary para almacenar estos objetos cliente:

Dictionary<int, Customer> dic1 = new Dictionary<int,Customer>();

3 – Y en el constructor la rellenamos con 100.000 clientes de ejemplo:

public Form1()
{
    InitializeComponent();
    dic1 = Customer.GetSampleCustomers(100000).ToDictionary(p => p.CustomerId);
}

4 – Ahora, vamos a complicar un poco más el ejemplo y crearemos dos métodos que simulen el borrado y la actualización de los objetos que contiene la colección. Luego llamaremos a éstos métodos para 100 objetos aleatorios de forma paralela para ver que efectos se producen:

private void deleteItem1(int id)
{
    if (dic1.ContainsKey(id) && dic1[id].Age < 30) dic1.Remove(id);
}

private void updateItem1(int id)
{
    if (dic1.ContainsKey(id)) dic1[id].Age++;
}

private void button1_Click(object sender, EventArgs e)
{
    for (int i = 0; i < 100; i++)
    {
        Random r = new Random(Guid.NewGuid().GetHashCode());
        int id = r.Next(1, dic1.Count);
        Task.Factory.StartNew(() => deleteItem1(id));
        Task.Factory.StartNew(() => updateItem1(id));
    }
}

Ejecutamos la aplicación y cuendo pulsemos el botón puede pasar que obtengamos un bonito error como este:

image

Concretamente en la línea que incrementa la edad del cliente:

image

Y digo que puede pasar, pero no es seguro. Tal vez tengamos que repetir el proceso varias veces, y es que en un entorno en el que varios threads acceden a un recurso compartido nada nos asegura que se produzca el error. A veces pasa a la primera y a veces a la cuarta, pero hay que tener presente que cuando esté en producción siempre pasará en viernes por la tarde cinco minutos antes de empezar el fin de semana ;)

Analicemos el porqué del error, por que ¿cómo es posible que no encuentre el elemento en la colección si precisamente en la línea anterior verificamos su existencia? Pues ahí está la gracia, que lo verificamos en la línea anterior. A nosotros -pobres developers- nos parece que la línea anterior en la que verificamos si existe y la línea actual en la que lo emininamos van seguidas una detrás de otra, pero realmente en un entorno asíncrono hay todo un mundo entre ambas. Y eso es porque ambas líneas se ejecutan de forma separada, sin ningún tipo de bloqueo, no existe Transaccionalidad al estilo de las bases de datos, de modo que mientras el primer thread verifica que existe ese elemento en la colección y lo borra, llega el segundo thread y… ZASCA! Lo elimina.

image

El nuevo ConcurrentDictionary

Para estos casos (y muchos otros) aparecen en escena un nuevo conjunto de colecciones especializadas en entornos multithreading. Una de las más populares es una variación del clásico Dictionary, el ConcurrentDictionary. Esta clase proporciona una série de métodos alternativos para agregar, modificar o eliminar elementos llamados TryAdd, TryUpdate o TryRemove. La ventaja de éstos métodos es que son transaccionales y proporcionan Atomicidad. Es decir, garantizan que en caso de que se invoquen, se ejecuten totalmente: O bien se añade/modifica/elimina el elemento de la colección o bien se devuelve el valor False porque no se ha podido realizar la operación. También proporciona un método TryGetValue que intenta obtener un elemento de la colección. Todos estos métodos implementan en su interior bloqueos (lock), para asegurar que nadie más accede al elemento mientras se realiza la operación solicitada.

Un ejemplo real que no falla ;)

Vamos a modificar el ejemplo anterior usando esta nueva colección, para observar su comportamiento en el entorno anterior.

1 – Agregaremos un ConcurrentDictionary después de la declaración del anterior:

ConcurrentDictionary<int, Customer> dic2 =
    new ConcurrentDictionary<int, Customer>();

2 – Lo rellenamos en el constructor, al igual que hacíamos antes. Aquí podemos ver la primera diferencia, ya que usamos uno de los nuevos métodos TryXXX:

Customer.GetSampleCustomers(100000).ForEach(c => dic2.TryAdd(c.CustomerId, c));

3 – A continuación vamos a crear los métodos equivalentes para eliminar y modificar el elemento en la colección:

private void deleteItem2(int id)
{
    Customer c;
    if(!dic2.TryRemove(id, out c)) Console.WriteLine("Error deleting!");
}

private void updateItem2(int id)
{
    Customer c;
    if (!dic2.TryGetValue(id, out c))
    {
        Console.WriteLine("Error getting value");
    }
    else
    {
        Customer newc = new Customer() { CustomerId = c.CustomerId, Name = c.Name, Age = c.Age };
        newc.Age++;
        if (!dic2.TryUpdate(id, newc, c)) Console.WriteLine("Error updating!");
    }
}

En el borrado usamos el método TryRemove que necesita el id del elemento a eliminar, y en caso que lo elimine con éxito, devuelve True y el objeto eliminado en el segundo parámetro out (de salida) del método. En caso que no lo elimine devuelve False y el valor default del objeto a eliminar en el segundo parámetro.

La actualización es un poco más compleja, ya que primero debemos asegurarnos de que el elemento a modificar existe en la colección mediante el método TryGetValue, que básicamente funciona igual que el método TryDelete anterior. Una vez hemos comprobado que dicho objeto existe en la colección creamos una réplica del cliente, modificamos su edad y llamamos al método TryUpdate, el cual se encarga de la modificación.

Es interesante notar que para que la modificación se realice correctamente debemos informar de la clave del objeto en la colección, el nuevo valor del objeto (en este caso la réplica del cliente a la que hemos modificado la edad) y un tercer parámetro que es el valor original del objeto, que es usado para comparar.

4 – Probemos los nuevos métodos en otro botón:

private void button2_Click(object sender, EventArgs e)
{
    for (int i = 0; i < 100; i++)
    {
        Random r = new Random(Guid.NewGuid().GetHashCode());
        int id = r.Next(1, dic1.Count);
        Task.Factory.StartNew(() => deleteItem2(id));
        Task.Factory.StartNew(() => updateItem2(id));
    }
}

Ejecutamos y si vamos a la ventana de consola podremos observar el resultado:

Error getting value
Error getting value
Error getting value
Error getting value
Error updating!
Error getting value
Error getting value
Error getting value

En la mayoría de los casos, obtenemos un error de lectura porque el elemento ya ha sido eliminado de la colección, pero en algunos casos el error se producirá en el método TryUpdate. Eso significa que la llamada a TryGetValue encuentra el elemento, pero cuando lo vamos a modificar éste ya no existe en la colección. Perfecto! :)

Conclusión

La ventaja de usar colecciones específicas en entornos multithreading es clara: Garantizan que el acceso a los recursos compartidos sea transaccional, al más puro estilo de las bases de datos. De acuerdo que su programación sea un poco más compleja, pero no tiene nada que ver con lo que se tenía que hacer antiguamente cuando no existía la TPL.

Por otro lado, el uso de estas colecciones debe hacerse sólo en aquelos casos en los que se accede a recursos compartidos, ya que si no, por un lado estamos complicando nuestro codigo y por otro lado un ConcurrentDictionary no es tan eficiente como lo és un Dictionary, sobre todo en entornos más de escritura que de lectura. Tenéis un post muy detallado al respecto del gran James (aka Black Rabbit) respecto a la diferencia de performance:

http://geekswithblogs.net/BlackRabbitCoder/archive/2010/06/09/c-4-the-curious-concurrentdictionary.aspx

Espero sacar tiempo para otro post en el que explicar otras colecciones de este namespace, como las ‘BlockingCollection’ o las ‘’ConcurrentBag’, que son colecciones más especializadas. La primera está optimizada para escenarios dónde el mismo thread produce y consume objetos, mientras que la segunda ha sido especialmente diseñada para escenarios de productor-consumidor.

Aquí tenéis un pequeño proyecto de ejemplo con el código:

https://www.dropbox.com/s/ej3u3s1v55ltb30/TPL04_Concurrency.zip

Nos leemos! :D

Volver al índice de contenidos

Parallel Series: Tasks, la 8ª maravilla

Nota: Antes de nada quiero disculparme por haber dejado sin publicar en esta serie durante tanto tiempo, de hecho casi un año. Afortunadamente no ha sido ningún problema de salud, si no el tener demasiados frentes abiertos. Ahora que parece que se van cerrando algunos pienso aprovechar para terminar la serie y embarcarme en algún otro nuevo que ya os contaré.

The show must go on

En los posts previos hemos hablado de PLINQ y de la clase Parallel, dos características que facilitan mucho la ejecución de datos en paralelo, sin embargo si alguien me preguntase cual es la característica que más me gusta de la Task Parallel Library lo tendría muy claro: El manejo de tareas asíncronas mediante la clase Task.

Durante muchos años los desarrolladores hemos tenido que lidiar -más ben pelearnos- con la multitarea y la ejecución de código asíncrono. Ya dese las primeras versiones de C# podíamos crear hilos a mano mediante la clase Thread, pero el proceso distaba mucho de ser sencillo y además adolecía de cierta complejidad para manejar cancelaciones o actualizar la interfaz de usuario. Y aunque han habido intentos de mejora como la interfaz IAsyncResult también han aparecido engendros infumables como el BackgroundWorker, el cual es tan malo como hacerse cirugía cerebral con manoplas uno mismo.

Task al rescate!

Así que cuando apareció la TPL y la clase Task los desarrolladores encontramos por fin un método simple y cómodo para ejecutar código asíncrono y además en paralelo. Y esto es realmente muy importante ya que hoy en día en muchas aplicaciones (al menos las que están bien diseñadas) se ejecutan tareas en paralelo para acceder a recursos externos o ‘costosos’, bases de datos, y sobre todo para actualizaciones de la interfaz de usuario. De hecho es tan importante que será una de las mejoras más importantes en la siguiente versión de C# 5.0 de la cual hablaremos al final de la serie.

TaskClass

Actions everywhere

Al igual que la clase estática Parallel y gran parte de la TPL, la clase Task se basa en acciones, de modo que si no las controlas demasiado dale una ojeada al post que publiqué hace un tiempo sobre el tema.

En su sintaxis más básica, se puede utilizar de este modo:

var t = new Task(() => {
    Thread.Sleep(1000);
    Console.WriteLine("A");
    });
t.Start();
Console.WriteLine("B");

O lo que es lo mismo:

Task.Factory.StartNew(() => {
    Thread.Sleep(1000);
    Console.WriteLine("A");
    });
Console.WriteLine("B");

La única diferencia es que en la primera declaramos la variable especificando la acción a ejecutar y luego la ejecutamos explícitamente mediante su método ‘Start’, y en la segunda no utilizamos ninguna variable, sólo especificamos y ejecutamos la acción mediante el método ‘StartNew’ de la clase ‘Task.Factory’.

Observando el código anterior, qué os pensáis que se escribirá primero? A o B? Evidentemente B, ya que podemos imaginar cómo el código no se detiene en el Thread.Sleep(1000) (éste se ejecuta en otro Thread) de modo que ejecuta inmediatamente el print ‘B’. Correcto.

Pero las tareas son mucho más, permiten desde devolver resultados hasta manejar éstas ‘unidades de trabajo’ encadenando tareas a continuación de otras, esperando a que terminen una o un grupo de tareas antes de ejecutar otra, cancelar una tarea o propagar excepciones entre ellas.

1) Devolver un valor desde una tarea…

Como en un método, una tarea puede devolver desde un tipo básico (int, string) hasta cualquier tipo complejo. Para hacer el ejemplo más interesante vamos a utilizar un método que examina la red local en busca de servidores SQL Server y devuelve una lista de strings:

public static List<string> GetSQLServerNames()
{
    List<string> sqlservernames = new List<string>();
    sqlservernames.Add("local");
    SqlDataSourceEnumerator enumSQLServers = SqlDataSourceEnumerator.Instance;
    foreach (DataRow server in enumSQLServers.GetDataSources().Rows)
    {
        if (server["InstanceName"] is System.DBNull)
            sqlservernames.Add(server["ServerName"].ToString());
        else
            sqlservernames.Add(
                string.Format("{0}\\{1}", server["ServerName"], server["InstanceName"]));
    }
    Console.WriteLine("end get servers");
    return sqlservernames;
}

La ventaja de usar este método es que tarda unos segundos en ejecutarse, con lo cual es un candidato perfecto para ser ejecutado de forma asíncrona:

var getServersAsync = new Task<List<string>>(() => GetSQLServerNames());
getServersAsync.Start();
Console.WriteLine("end call");

Si lanzamos este código observaremos que se imprime ‘end call’ inmediatamente, y tarda unos segundos en imprimir ‘end get servers’. Realmente se está ejecutando asíncronamente!

2) …y al terminar continuar con otra tarea

Ahora supongamos que tenemos otro método que se encarga de actualizar la interfaz de usuario  a partir de la lista anterior:

private void updateServersListUI(List<string> servers)
{
    comboBox1.Items.Clear();
    servers.ForEach(p => comboBox1.Items.Add(p));
}

¿No sería lógico que lo hiciese al terminar la tarea anterior? Pues la verdad es que si, y encadenar tareas es algo trivial y que ofrece mucha potencia al desarrollador. De hecho estoy seguro que ya se os ha ocurrido alguna aplicación ;)

Encadenar tareas es tan sencillo como utilizar el método ‘ContinueWith’:

var getServersAsync = new Task<List<string>>(() => GetSQLServerNames());
getServersAsync.Start();
Console.WriteLine("end button2");
getServersAsync.ContinueWith((p) => updateServersListUI(getServersAsync.Result));

Sobre el papel debería funcionar pero no lo va a hacer, ya que hasta ahora no nos hemos fijado en un detalle: En la plataforma .NET no es posible actualizar un control desde otro hilo distinto al que lo ha creado, y toda la interfaz de usuario se crea en el main thread o hilo principal. Y esta limitación todavía existe.

Estableciendo el contexto de ejecución

Antes de la TPL para actualizar la interfaz de usuario desde otro hilo debíamos utilizar el método ‘Invoke’ de la clase ‘Control’, de modo que deberíamos modificar el método anterior de este modo:

private void updateServersListUI(List<string> servers)
{
    if (this.InvokeRequired) this.Invoke(new Action(() =>
    {
        comboBox1.Items.Clear();
        servers.ForEach(p => comboBox1.Items.Add(p));
    }));
}

Pero no va a ser necesario, ya que como parte de la magia de la TPL se nos ofrece la posibilidad de llamar a ‘TaskScheduler.FromCurrentSynchronizationContext’ que nos permite acceder a la interfaz de usuario de forma segura. Así pues lo único que hay que modificar el código anterior es encadenar la segunda tarea usando el contexto de sincronización antes mencionado y olvidarnos de la llamada a Invoke:

getServersAsync.ContinueWith((p) => updateServersListUI(getServersAsync.Result),
TaskScheduler.FromCurrentSynchronizationContext());

Esperando la ejecución de varias tareas

Otra característica muy interesante es la posibilidad de esperar a que termine un grupo de tareas entero, o sólo una de ellas. Supongamos que tenemos unja serie de tareas que se encargan de aplicar unos efectos a una serie de imágenes:

var t1 = Task.Factory.StartNew(
    () => pictureBox1.Image = Properties.Resources.Landscape08.Invert());
var t2 = Task.Factory.StartNew(
    () => pictureBox2.Image = Properties.Resources.Landscape08.Grayscale());
var t3 = Task.Factory.StartNew(
    () => pictureBox3.Image = Properties.Resources.Landscape08.Brightness(140));
var t4 = Task.Factory.StartNew(
    () => pictureBox4.Image = Properties.Resources.Landscape08.Contrast(80));
var t5 = Task.Factory.StartNew(
    () => pictureBox5.Image = Properties.Resources.Landscape08.Gamma(1, 5, 1));
var t6 = Task.Factory.StartNew(
    () => pictureBox6.Image = Properties.Resources.Landscape08.Color(255, 0, 0));

Podría ser interesante no seguir ejecutando el código hasta que la primera termine, o hasta que terminen todas, o hasta que terminen todas pero con un timeout. O sea, que si no terminan todas en 100 milisegundos seguir con la ejecución:

Task.WaitAny(new Task[] {t1, t2, t3, t4, t5, t6});
Task.WaitAll(new Task[] {t1, t2, t3, t4, t5, t6});
Task.WaitAll(new Task[] {t1, t2, t3, t4, t5, t6}, 100);

Otra forma de conseguir esto es mediante la creación de una tarea, especificando que debe esperar a que se complete alguna tarea o todas las especificadas:

var t7 = Task.Factory.ContinueWhenAll(new[] { t1, t2, t3, t4, t5, t6 }, (t) =>
    {
    //DoSomething...
    });

Cancelando tareas

Al igual que en posts anteriores, la clase Task también admite cancelaciones, y a mi juicio suelen ser más utilizadas que sus equivalentes en PLINQ o Parallel, ya que pueden permitir a un usuario cancelar el acceso a un recurso que tarda más de lo previsto (una URL p.e.) y las tareas que debían ejecutarse a continuación.

Partiendo de la base de un método que hace un trabajo largo:

private void DoALongWork(CancellationTokenSource cs)
{
    try
    {
        for (int i = 0; i < 100; i++)
        {
            Thread.Sleep(10);
            cs.Token.ThrowIfCancellationRequested();
        }
    }
    catch (OperationCanceledException ex)
    {
        Console.WriteLine(ex.Message);
    }
}

Podemos cancelar la tarea llamando al método ‘Cancel’ del token:

var cs = new CancellationTokenSource();
var t = Task.Factory.StartNew(
    () => DoALongWork(cs)
    );
Thread.Sleep(500);
cs.Cancel();

Y al cancelarse el código entrará en el catch del método ‘DoALongWork’ en el que se realizarán las acciones apropiadas a la cancelación.

Cancelación y estado

Para terminar, en algunos casos puede ser interesante que en caso de éxito la tarea llame a una segunda, pero en cambio si se cancela llame a una tercera. Para poder hacer esto debemos preguntar por el estado de la primera tarea y realizar un pequeño truco: La excepción no debe ser interceptada en un bloque try!

Así pues el código del método quedaría de este modo (sin try):

private void DoALongWork(CancellationTokenSource cs)
{
    for (int i = 0; i < 100; i++)
    {
        Thread.Sleep(10);
        cs.Token.ThrowIfCancellationRequested();
    }
}

Y la llamada de este otro:

var cs = new CancellationTokenSource();
var t = Task.Factory.StartNew(
    () => DoALongWork(cs)
    );
Thread.Sleep(500);
cs.Cancel();

t.ContinueWith(task =>
    {
        switch (task.Status)
        {
            case TaskStatus.Faulted:
                MessageBox.Show("Fail");
                break;
            case TaskStatus.RanToCompletion:
                MessageBox.Show("Success");
                break;
        }
    }, TaskScheduler.FromCurrentSynchronizationContext());

Como podéis observar, podemos cambiar el flujo del código en función del estado de la tarea.

Importante: Para poder ejecutar este último ejemplo es necesario ejecutar sin depuración (Ctrl+F5)

Bueno, nos hemos dejado algunas cosas en el tintero, pero para no hacer muy largo éste post las veremos más adelante, en otros posts avanzados de la misma serie.

Hasta entonces prometo actualizar la serie con más frecuencia que hasta ahora ;)

Volver al índice de contenidos