problema

La pregunta que nos planteamos es ¿cómo usar JMS para paralelizar los procesos de un algoritmo? Si tengo un cálculo que se demora mucho y después otro, pero que son independientes el uno del otro, ¿cómo repartirlos en múltiples procesadores (y múltiples máquinas) para luego recoger los resultados y proseguir en menos tiempo? Algo así como multi-threading pero distribuído en un cluster (i.e. entre paraleliación, load-balancing). ¿Puede JMS ayudarnos en ésto?

Con Queues, tendríamos que crear mínimo una cola de peticiones (A) y una cola de respuestas (B),
A: ¿cuánto es 1+1?
B: ¡es 2!
Porque sino se mezclarían los mensajes
A: ¿1+1? ¿2+2? ¡2! ¿3+3? ¡4! ¡6!

Las colas conservan el orden, pero los cálculos no necesariamente retornarán en órden
A: ¿1+1? ¿2+2? ¿3+3?
B: ¡4! ¡6! ¡2!
Lo cual se puede solucionar fácilmente asignándole un tóken a cada cálculo
A: ¿a:1+1? ¿b:2+2? ¿c:3+3?
B: ¡b:4! ¡c:6! ¡a:2!
Pero esto podría causar tacos en caso que los mensajes fueran pesados y se demoraran en transferir. Por ejemplo, para saber el resultado de c tengo que esperar que los que pidieron a y b reciban sus mensajes. Si el que pidió a se cayó, toda la cola quedaría paralizada.
Más importante que este riesgo, si es que a y b los pidieron dos máquinas distintas, no nos sirve que la máquina que pidió a reciba b, porque no sabrá qué hacer con éste y lo perderá o al menos tendría que reencolarlo.
O sea que el orden de la cola no es necesariamente lo que nos sirve.

Con Pub/Sub, tendríamos que publicar un problema y quedar suscritos a su solución
Pub: ¿cuánto es 1+1?
Sub: ¡es 2!
Lo cual resuelve los problemas del orden ya que todos los nodos estarán suscritos a la respuesta y, si bien también utilizarían el tóken para saber si es la respuesta a su problema, sería más eficiente ya que no necesitan esperar a que los demás reciban sus mensajes (no hay tacos ni paralización) y como todos reciben el mensaje, los que no lo usarán simplemente pueden ignorar los mensajes de los demás.
Sin embargo, por lo menos en su implementación más básica, se darán dos nuevos problemas.
Si es que no hay ninguno de los nodos suscritos a una pregunta arriba, ésta se perdería
Pub: ¿cuánto es 1+1?
Sub: ….
O – peor aún – se daría a menudo que hayan más de un nodo desocupado que tome un problema y se ponga a resolver el mismo problema gastando recursos innecesariamente
Pub: ¿cuánto es 1+1?
Sub: ¡es 2!
¡es 2!
etc.

Sin embargo, podría ser que con una combinación de estas lógicas lleguemos a una solución al problema. Como siempre, veamos antes qué se ha hecho ya al respecto.

approachs

Esta página de Camel >> habla de que el problema se ha planteado antes.
De hecho, se habla de parallel processing y load balancing vía JMS. Para paralelizar y preservar el órden de los mensajes recomiendan los Exclusive Consumers >> y los Message Groups >> de ActiveMQ. Pero parece que ésto es al revés, si ya paralelicé, cómo mantener el órden asociando ciertos mensajes a un único thread… También ver los Message Selectors >>

A ver vamos por partes. ¿Qué es Camel? ¿Nos sirve? En la misma página dicen que la definición no es muy clara, y que mejor remítase a StackOverflow >>. En resumen es una librería java para EAI. Corta, se trata de implementar los buenos patrones de integración entre distintas aplicaciones y distintos protocolos. Es decir, no es lo que estamos buscando.

¿Qué hay de los MDB pools?

Según Oracle >> que las colas de por sí proveen de load-balancing

Parece que la solución va por definir dos colas, como dijimos arriba, una de petición y otra de respuesta.
Luego, las clases mandan un mensaje a la petición y quedan a la escucha (en tanto clases durante su vida, no como listeners) de la respuesta a ese mensaje en particular, el cual puede identificar ya sea con el selector o con el correlationId. Los demás mensajes que no han sido pedidos por sus ‘dueños’ simplemente quedan ahí hasta que alguien los tome y no nos importa el órden de entrada o salida.

selectors

Vamos a hacer la prueba de concepto con JMS y después veremos cómo adaptarlo a CMS.
La idea es que un @Stateful bean publique dos preguntas a la cola, se suscriba a sus respuestas y las junte. Por otra parte habrán dos @MessageDriven beans a la espera de cada una de las operaciones. Para filtrar las operaciones usaremos selector. También haya que resolver cómo escoger las respuestas específicas a cada pregunta. Veamos si funciona.

Entonces el staeful crea los dos mensajes y los tira marcados con su Selector (para la inyección de la cola ver >>)

@Inject private Queue queue;
@Inject private transient Session sesion;
//...
MessageProducer productor = sesion.createProducer(queue);
//suma
MapMessage msg = sesion.createMapMessage();
msg.setInt("operando1", this.numero);
msg.setInt("operando2", this.numero2);
msg.setStringProperty("Selector", "uno");
productor.send(msg);
//mult
MapMessage msg2 = sesion.createMapMessage();
msg2.setInt("operando1", this.numero);
msg2.setInt("operando2", this.numero2);
msg2.setStringProperty("Selector", "dos");
productor.send(msg2);

Los cuales son procesados por dos message-driven, uno para cada Selector

@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination", propertyValue="queue.cola"),
@ActivationConfigProperty(propertyName="acknowledgeMode", propertyValue="Auto-acknowledge"),
@ActivationConfigProperty(propertyName="messageSelector",propertyValue="Select='uno'") //O DOS
})
@ResourceAdapter("activemq-rar-5.5.1.rar")
public class MDB implements MessageListener {
@Inject private Queue queue; //ídem
@Inject private transient Session sesion;

public MDB() {}

public void onMessage(Message message) {
if(message instanceof MapMessage){
try {
MapMessage mapa = (MapMessage)message;
int retorno=//...
Thread.sleep(5000); //hace algo

//lo retorna
MessageProducer productor = sesion.createProducer(queue);
MapMessage msg = sesion.createMapMessage();
msg.setInt("retorno", retorno);
productor.send(msg);
}
catch(JMSException e) {
e.printStackTrace();
}
}
}
}

Como vemos, los MDB hacen su pega y después mandan la respuesta a la misma cola (marcada con algún tipo de tóken para que sepa de qué respuesta se trata. Para ésto, el productor que primero vimos, luego de enviar los mensajes, instancia un MessageListener que queda a la espera de ésta respuesta.

//el envío de msg que vimos arriba
productor.send(msg2);

//suscripción retorno, aquí declararíamos el selector de retorno
MessageConsumer consumidor = sesion.createConsumer(queue);
Listener listener = new Listener();
consumidor.setMessageListener(listener);
session.recover(); //se reinicia la cola

Dos cosas a notar.

  • Lo primero que Listener es un POJO que extiende MessageListener (pero no es un MDB).
  • Más importante, tenemos que reiniciar la cola ya que, como la conexión ya está started, es muy tarde para empezar a escuchar. Hacerlo con recover funciona, siempre y lo haga antes de que termine el request.

Por lo tanto se presentan dos problemas (o síntomas del mismo problema).

  • Que si no ponemos un tiempo de espera después de recover, el thread del request termina antes de poder recibir mensajes. Tener que definir este tiempo es un problema ya que no hay ningún criterio para hacerlo.
  • Que la cantidad de mensajes que recibimos también es arbitrario. Es decir, puede pasar que el thread siga escuchando después de haber recibido sus mensajes.

Una solución flaite para éso podría ser un loop que espere a haber recibido la cantidad de mensajes que está esperando (o haber recibido cada uno de las respuesta que espera)

MessageConsumer consumidor = sesion.createConsumer(queue);//aquí tb se puede usar Selector
Listener listener = new Listener();
consumidor.setMessageListener(listener);
session.recover();
while(listener.recibidos>2){
Thread.sleep(100);//espera un poco más
}

Lo cual funciona. Sin embargo, no sé si este session.recover es lo que corresponde. Cuek! Funciona pero sólo las primeras dos veces, luego recibe sólo el primer mensaje y se queda para siempre en el loop. Puede que la clave esté aquí >>

Each transacted session supports a single series of transactions.

Será que el problema no es porque termine el thread, sino porque como la sesión es inyectada, es transacted (manejada por el container) y la está cerrando por x motivo. Quizás baste con instanciarla a mano.

@Inject private transient Connection conexion;
Session sesion2 = conexion.createSession(false, Session.AUTO_ACKNOWLEDGE); //NO TRANSACTED
MessageConsumer consumidor = sesion2.createConsumer(queue);
Escucha listener = new Escucha();
consumidor.setMessageListener(listener);
conexion.start();//!

Mmm, no pareceriera haber diferencia entre ambas opciones. ¿Será que se necesita independizar el thread de esucha del de espera?
O quizás necesite usar varios MessageListeners síncronos, independientes entre sí.
Parece que en realidad no hay diferencia, porque por estar en un SFSB, se sigue considerando Managed >>.

In an Enterprise JavaBeans component, you cannot use the Session.commit and Session.rollback methods. Instead, you use distributed transactions, which are described in Using the JMS API in Java EE Applications.

Sin embargo el link que da no provee más info de cómo hacerlo

This tutorial does not provide any examples of bean-managed transactions.

De hecho dice explícitamente que no haga justo lo que estoy haciendo (pero en la misma sesión…) No funciona ni siquiera demarcando las transacciones JTA

@Inject UserTransaction utx;
utx.begin();
//...
utx.commit();

Tendrá que ver con este bug de Arjuna?

ARJUNA-16037 Could not find new XAResource to use for recovering non-serializable XAResource

Temporary Queues

Los Temporary Queues >> ¿proveerán una solución para ésto? En todo caso, me interesa estudiarlos porque proveen una solución para el asunto de los tókens. Aers.

Básicamente en la sesión de cada productor, creamos una TemporaryQueue

TemporaryQueue tmpQ = session.createTemporaryQueue();
msg.setJMSReplyTo(respuesta); //y los mensajes deben responderse a ésta
MapMessage msg = session.createMapMessage();
msg.setInt(//etc.

Luego en los MDB mando las respuestas a esta cola que viene en el mensaje

public void onMessage(Message message) {
MessageProducer porducer = session.createProducer(mapa.getJMSReplyTo());//OJO

Para recibir las respuestas, el productor se queda a la escucha de la cola temporal que creó

MessageConsumer consumer = sesion.createConsumer(tmpQ);//OJO
consumer.setMessageListener(new Listener());

Con lo que, como decíamos, además de eliminar la necesidad de estar mandando tókens para usarlos de selectors de las respuestas ya que necesariamente cada pregunta recibirá las respuestas a los mensajes que ella mismo hico, ¡efectivamente se resuelve el problema transaccional de más arriba!

De hecho, tampoco estoy necesitando hacer recover de la sesión al ponerse a escuchar en una nueva cola. O sea es óptimo. El único problema que he observado es el del cierre de la conexión por el container, nada que no pueda arreglarse con manejo transaccional.

JMSCorrelationID

Otra propiedad que nos interesa es la id de correlación, que nos permite saber en respuesta a qué mensaje estamos recibiendo otro. Cada vez que un MDB responde un mensaje, le setea a la respuesta el id de éste como JMSCorrelationID

public void onMessage(Message message) {
//...
msg.setJMSCorrelationID(message.getJMSMessageID());
producer.send(msg);
}

Así, en el productor que está esperando las respuestas a sus preguntas, podemos saber exactamente de cuál de ellos está recibiendo una respuesta. Por ejemplo, podríamos dejar los ids de los mensajes enviados en un mapa, y dejarlo esperando las respuestas (asíncronamente) hasta que cada uno de ellos haya sido procesado, en lugar de usar un contador como lo hemos estado haciendo hasta ahora.

private HashMap<String,Boolean> results;
//al enviar cada mensaje
productor.send(msg);
resuls.put(msg.getJMSMessageID(), false);
//y al recibirlos
public void onMessage(Message msg) {
his.results.put(msg.getJMSCorrelationID(), true);
}//etc.

Entonces, dejamos el thread esperando que estén todos las respuestas para cerrar la sesión

while(listener.results.containsValue(false)){
Thread.sleep(1000);
}
Este sitio utiliza cookies.    Leer más