04 Sep

MapReduce en el mundo real con C#

Dejo aquí un artículo que mi amigo Jesús ha escrito en su blog , más adelante le volveré a echar un vistazo.

Durante el curso pasado estuve cursando el máster de Ingeniería y Tecnología del Software en la Universidad de Sevilla, durante el máster he tratado muchos temas en mayor o menor profundidad. De ellos, uno de los que más me interesó fue el de tratamiento de grandes cantidades de datos.

Hicimos un trabajo sobre Big Data, en el cual hicimos un pequeño estudio sobre tecnologías actuales como Hadoop, Disco y otras alternativas.

Muchas de ellas tienen potentes motores de MapReduce para el tratamiento de datos de manera paralela y distribuida. En este post no quiero entrar en profundidad sobre lo que es Hadoop o MapReduce, aunque si no sabes lo que es MapReduce y vas a continuar leyendo deberías echarle un ojo a esta presentación donde puedes entender el concepto en 1 minuto.

En el trabajo me asignaron un proyecto donde había que tratar unas cantidades de datos considerables. A continuación explicaré como lo enfoqué.

Contexto
El proyecto consistía en tratar grandes cantidades de facturas telefónicas, grandes cantidades es un concepto relativo, en este caso son unos 100 megabytes de texto plano, formato csv, sobre los que hay que realizar un intenso procesamiento.
Después del procesado había que generar un PDF con un resumen de la factura. Es decir, imaginaros vuestra factura mensual de teléfono fijo, pasarle una aplicación y generar un PDF donde se resumen el consumo que has tenido de voz (nacional, internacional), datos, roaming, etc etc. Obviamente esto es un poco más complicado porque en el caso a tratar existían unas tarifas “ad-hoc” que había que tener en cuenta y aplicar.

Enfoque
Obviamente 100 megabytes de texto plano no es problema que haya que abordar con Hadoop en un cluster de amazon con 50 instancias, pero si es cierto que aplicando técnicas convencionales de procesamiento los tiempos de ejecución se alargaban bastante (en torno a 30 minutos), ya que como he comentado anteriormente el procesamiento es intenso. Así que mi enfoque fue el implementar MapReduce usando hilos de ejecución.
Basicamente y para que se entienda mejor, implementar MapReduce en Hadoop viene a ser esto:

Donde cada nodo es un ordenador conectados por red (flechas). Mi enfoque fue que cada nodo fuera un hilo de ejecución, por tanto las flechas ya no representan la red, sino la memoria RAM.

Implementación
Es esta parte adjunto la clase MapReducer en la cual esta toda la lógica a aplicar sobre los datos. La clase no está completa puesto que hay mucha lógica que no tiene que ver con el post. Por ejemplo, dado que las facturas en realidad es un conjunto se subfacturas, hice la clase MapReducer genérica y los tipos se inyectan por reflexión en tiempo de ejecución, lo que hace que el código sea más complejo de leer, así que he omitido toda esa parte puesto que no tiene interés. Básicamente la clase tiene 3 métodos, Map, Reduce y MapReduce.
El único con visibilidad pública es MapReduce puesto que nunca se llaman a Map o Reduce por separado.

Adjunto el código simplificado y comentado.


using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MyNameSpace
{
public class MapReducer<t>
{
private const int NUM_CORES = 16;
//Aquí es donde se añadiran los trozos durante la fase de "Map"
private ConcurrentBag<report> listBag = new ConcurrentBag<report>();
//Simplemente una envolutura para la "bolsa" de trozos que implementa toda la lógica de concurrencia
private BlockingCollection<report> listChunks;

//Comparador por el cual decidiremos cuando dos trozos tiene la misma clave para poder actualizarlos (Fase "Reduce")
private IEqualityComparer<report> comparer;

//Aquí es donde se reducirán los trozos que se generaron en la fase de "Map"
private ConcurrentDictionary<report report=""> reportStore;

List<tarifa> _tarifas;

public MapReducer()
{
//Aquí se inicializan los objetos que controlaran la concurrencia
listChunks = new BlockingCollection<report>(listBag);
comparer = new ReportComparer();
reportStore = new ConcurrentDictionary<report report="">(comparer);

_tarifas = _tarLoader.BuildTarifas().GetTarifas(typeof(T));
}

private void Map(string filename)
{
//ProduceFileBlocks() parte el fichero en tantos trozos como hilos disponibles haya en el pool.
Parallel.ForEach(ProduceFileBlocks(data as List<t>), block =>
{
//Aquí se realiza el procesado de los datos
Extractor<t> extractor = new Extractor<t>(_tarifas);
var extracted = extractor.Extract(block);
//Aquí se añaden al conjunto de trozos que posteriormente se reduciran
foreach (var item in extracted)
{
listChunks.Add(item);
}

});
//Este es el mecanismo de sincronización, cuando todos los hilos hayan hecho
//CompleteAdding() empezarán a reducir
listChunks.CompleteAdding();
}

private void Reduce()
{
Parallel.ForEach(listChunks.GetConsumingEnumerable(), report =>
{
//Básicamente aquí lo que se hace es ver si cada trozo procesado en el Map está contenido en el la Dictionary,
// y en caso de que si que esté, se actualiza con el valor antiguo más el nuevo.
if (reportStore.ContainsKey(report))
{
var old = reportStore[report];
var update = new Report();

update.Duracion = old.Duracion + report.Duracion;
update.Volumen = old.Volumen + report.Volumen;
update.Ocurrencias = old.Ocurrencias + report.Ocurrencias;
update.SubtotalVodafone = old.SubtotalVodafone + report.SubtotalVodafone;
update.SubtotalHeineken = old.SubtotalHeineken + report.SubtotalHeineken;
update.Diferencia = old.Diferencia + report.Diferencia;
//Este método mira las claves de 'old' y 'update', y si son iguales, actualiza el diccioanrio con 'update'
//Aquí es importante definir un comparador "custom" que dependerá de la lógica de cada implementación
reportStore.TryUpdate(old, update, update);
}
else
{
//Si no hay ninguna clave en el Dictionary se añade el primero tal cual
reportStore.TryAdd(report, report);
}

});
}
//Método que engrana las dos fases. Es el único público, ni Map ni Reduce lo son puesto que no tiene sentido llamarlos aislados
public void MapReduce(string filename)
{
//Inicialización previa en caso de que se ejecute dos veces y las "bolsas" estén llenas.
if (listChunks.IsAddingCompleted)
{
listBag = new ConcurrentBag<report>();
listChunks = new BlockingCollection<report>(listBag);
}
//Aquí se hace map en tantos hilos como hayas configurado. Yo tengo un pool de 16.
System.Threading.ThreadPool.QueueUserWorkItem(delegate(object state)
{
Map(filename);
});
Reduce();
}
//Método para ver los resultados
public void PrintResults()
{
foreach (KeyValuePair<report report=""> item in reportStore)
{
Console.WriteLine(item.Value);
}
}

}
}
</report></report></report></t></t></t></report></report></tarifa></report></report></report></report></report></t>
Conclusiones
.
Bueno, básicamente la idea era explicar un poco cómo funciona esto y que puede ser aplicado en proyectos reales sin tener que currar en Facebook y sin tener que usar un cluster con 50 máquinas

Share this

Leave a reply