preámbulo

Ya vimos cómo distribuír operaciones entre varios procesos o máquinas (ésa es la gracia) por medio de CMS >> pero ¿cómo saber cuántos procesos dejar a la esucha para cada operación?

Por ejemplo, puedo tener un proceso que haga multiplicación, suma y  trasposición de matrices.
Entonces tendría un thread principal que encola un mensaje por cada operación y queda a la espera de las tres respuestas.
Simultáneamente, en alguna máquina tendría que tener tres procesos escuchando, uno que sepa multiplicar, uno que sepa sumar y uno que sepa trasponer.
Pero podría ser que la suma fuera mucho más frecuente que las demás operaciones, o mucho más liviana. Entonces tendría sentido tener más procesos de suma corriendo por máquina – digamos tres de suma, uno de multiplicación y uno de trasposición – a sabiendas que voy a tener recursos disponibles para procesar las sumas de varios threads principales simultáneamente.
Es más, por otro lado, podría estar aceptando mensajes para cuyos procesos no tengo recursos, porque, por ejemplo, el proceso que traspone está disponible, a pesar que me queda poca ram o que los cpus están al máximo.
Tampoco parece plausible destinar una máquina por proceso, por razones obvias, y porque tampoco constituye una solución al problema.

Se trata entonces de optimizar los recursos de cada máquina, y al mismo tiempo a no estar obligados a predeterminar ‘en duro’ cuántos procesos deben éstas correr, sino que ésto sea determinado dinámicamente según la disponiblidad de recursos.

Para éso, la idea es que cada máquina corra un solo listener, y que éste sea capaz de determinar si puede o no aceptar los mensajes que están en la cola. Éste tendría un selector con las operaciones que sabe hacer, pero además debiera checkear ciertos criterios al recibir un mensaje para reencolarlo en caso que no sea capaz de procesarlo.

//TODO checkear si lanzar una excepción lo reencola automáticamente
//i.e. transacciones

Para ésto, los criterios que se me ocurren serían

  • si hay CPUs disponibles
  • si puedo reservar el espacio de memoria necesario para el cálculo (como memoria dinámica usando new >>)
  • si la matriz ya está en memoria, para reutilizar el pointer (óptimo)

De dar positivo, el listener debiera abrir un nuevo fork en el thread para que haga la pega

pointers

Respecto a ester último punto, lo otro en que habíamos pensado que podía ser útil el multi-threading es en no tener que pasar las matrices gigantes por red entre cada proceso, sino que solamente pasar los punteros como parámetros de método >>

void metodo(arma::mat& matrix, ...){
//

Sin embargo, el límite de esta estrategia necesariamente es la máquina – de hecho ActiveMQ puede correr por métodos dentro de una VM – por lo que para hacer load-balancing entre varias máquinas necesariamente habrá que pensar en una solución, como por ej. un sistema de archivos compartido… si es que éste es realmente un problema – ya vimos que con protocol buffers estos archivos pueden pesar al rededor de 400Mb, faltaría hacer un poco de testing al respecto.

librerías

Hasta C++0x había que usar librerías >>, de las cuales no todas eran independientes de la plataforma. Una de ellas es boost >>, que ya tenemos en el proyecto para otros efectos.
Otra opción quizás es decaf >>, que viene incluído con ActiveMQCpp. Pero nadie lo usa >>
¿Qué es 0x y cómo saber si lo tengo? Es C++11. ¿Cómo saber si mi compilador lo soporta? Si uso g++ 4.6.3, parece que no tiene mucho soporte aún >>
Ok, entonces ahora tengo razones de peso para usar boost, que es lo que quiero 🙂

recursos

Para uso del CPU, ver SIGAR
Por OS >>

threading-an-sich

Primero que nada, si bien los headers de boost threading están (hace tiempo instalados por package manager) y se pueden usar en el proyecto así

#include <boost/thread.hpp> //todos los headers

en tiempo de compilación usarlos escupe el siguiente error

undefined reference to `boost::thread`

ésto porque a diferencia del resto de boost, Boost.Thread no está implementado en los headers >>, sino que hay que importarlo con el linker. En debian, sería con

-lboost_thread

Luego, hay que iniciar un thread pasándole un objeto invocable al constructor. Por ejemplo, le podríamos pasar una función directamente

void funcion(){
cout << "hola" << endl;
}

int main(){
boost::thread hilo(funcion);
return 0;
}

Sin embargo, el comportamiento de ésto es irregular, y muchas veces ejecuta la función dos veces, probablemente porque lo que se le pasa al thread es un copia de ésta. Lo podemos testear ejecutándolo varias veces

for i in {1..5}; do ./ejecutable; done

y he aquí uno de los resultados

holahola
holaholahola
holaholahola

Al parecer, ésto no tiene que ver con que el objeto se copie, sino con que el thread principal termina antes que el que hace el output, razón por la cual la más de las veces no alcanza a hacer el salto de línea, u otra se come media palabra. Es decir que la función, al ser un puntero, vive menos que el thread que la ocupa. Si la esperamos

int main(){
boost::thread hilo(funcion);
hilo.join(); //juntamos los hilos.
return 0;
}

funciona. Ahora, esto no explica por qué hay más outputs que llamadas a la función…

En todo caso, lo que nos interesa es independizar los hilos, no estarlos juntando o esperando. Es por ésto que la documentación de boost hace hincapié en la utilización de callables. Ésto es, objetos función >> , digamos objetos que pueden ser llamados como funciones. En C++ ésto se logra sobescribiendo el operador ( )

struct objetoFuncion{
void operator()(){//sobrescribe operator()
cout << "hola" << endl;
}
};

Éste sí es copiado dentro del nuevo thread y tiene una existencia independiente

int main(){
objetoFuncion o;
boost::thread hilo(o);
return 0;
}

gtest

Sin embargo ésto último nos da un segmentation fault. Éste es un error de memoria… después de mucho lidiar, intentaremos usar Valgrind para detectarlo. Ésta nos muestra que los leaks de memoria se están produciendo en Google Test. De hecho si deslinkeamos la librería, el código funciona. Ya sabíamos que no podríamos testear el multi-threading con Google Test, mas ¿no podemos siquiera tenerlos juntos?

detach

Más encima, el código se comporta erráticamente como al principio, siendo que destruir el thread principal no debiera impedir que el segundo ejecute hasta el final… ¿será que tengo que usar detach explícitamente?
Destruirlo no da resultado

boost::thread hilo(o);
hilo.~thread();

Tampoco usar el método detach

boost::thread hilo(o);
hilo.detach();

Podríamos pensar que se trata de que todos los threads intentan acceder concurrentemente a la consola… en ese sentido haremos la prueba con un archivo. Algo así

void operator()(){//sobrescribe operator
ofstream archivo;
archivo.open("test.txt");
archivo << "test \n";
archivo.close();
}

pasa lo mismo…

El problema aún es que el secundo thread muere con el primero. En efecto, no estaba entendiendo bien para qué sirve detach >> que es para liberar los recursos asociados al thread, y de hecho todos los threads mueren con junto con main.
En nuestro caso no hay problema con ello ya que dejaremos el listener corriendo indefinidamente.

Como últimas consideraciones básicas, anotar la posibilidad de pasar el objeto función por referencia en lugar de ser copiado usando boost::ref(), ésto probablemente nos sirva para las matrices dado su tamaño

objetoFuncion o;
o.propiedad=1;
boost::thread hilo(boost::ref(o)); //dentro del thread, propiedad++
o.propiedad==2//true

en este caso hay que asegurarse que o no sea destruído antes que el thread al cual se lo pasamos.

Y la posibilidad de pasarle parámetros al operator( )

struct objetoFuncion{
void operator()(int a, int b){
cout << a+b << endl;
}
};

int main(){
objetoFuncion o;
boost::thread hilo(o,1,1);//paso de parámetros
hilo.join();
return 0;
}

eso sí, ésto está limitado a 9 parámetros.

El sitio utiliza cookies, para iniciar sesión o para cotizar los servicios. No usamos cookies de terceros.    Leer más
Privacidad