Este artículo se trata de cómo traducir el load-balancing con JMS >> a CMS (C++).

En resumen, la idea es tener un único thread que determine qué operaciones hay que hacer, las mande a pedir vía ActiveMQ a distintos procesos/máquinas y luego los recopile, distribuyendo el tiempo que toma cada uno.

Después de resolver los includes, namespaces, librerías, etc. entonces, en un único thread de ejecución lanzamos dos mensajes con una propiedad Selector para ser filtrados según operación por los listeners.

//1
std::auto_ptr<MapMessage> message(session->createMapMessage());
message.get()->setStringProperty("Selector","uno");//OJO
producer->send(message.get());
//2
std::auto_ptr<MapMessage> message2(session->createMapMessage());
message2.get()->setStringProperty("Selector","dos");//OJO
producer->send(message2.get());

(std::auto_ptr es un auto-pointer (con garbage collection, etc.) que ignoraremos por el momento – está porque es un copy/paste de los ejemplos de ActiveMQCpp. De hecho, para saber cómo crear la session, remítase a éstos.)

Luego haremos dos listeners, uno para cada Selector u operación. Por ahora diremos que éstos quedan corriendo como procesos aparte, a la escucha.
Como se debe registrar en el consumidor el listener asíncrono (que implementa MessageListener)

//ej.
consumer->setMessageListener( this );

éstos no podrán abstraerse a un único flujo como el de arriba, así es que tomaremos literalmente la clase de ejemplo SimpleAsyncConsumer, la cual sólo modificaremos para que escuchen con los Selectors que le corresponden:

consumer = session->createConsumer(destination, "Selector='uno'");

Ahora, ¿cómo este mismo consumidor puede retornar sin necesidad de instanciar a su vez un productor? Saltemos de inmediato a la cola temporal para ésto.
Nuevamente, junto con el MessageProducer creamos una TemporaryQueue

TemporaryQueue* tmpQ;
//...
producer = session->createProducer(destination);
tmpQ=session->createTemporaryQueue();

la cual pasamos a los mensajes como su propiedad ReplyTo

message.get()->setCMSReplyTo(tmpQ);
producer->send(message.get());

Así es como el consumidor responderá a esta cola temporal

virtual void onMessage( const Message* message ) throw() {
MessageProducer* productor = session->createProducer(message->getCMSReplyTo());//OJO
MapMessage* respuesta = session->createMapMessage();
productor->send(respuesta);

Que es la que el productor debe quedar escuchando una vez que lanza sus mensajes

//ídem...
message2.get()->setStringProperty("Selector","dos");
producer->send(message2.get());

MessageConsumer* tmpConsumer = session->createConsumer(tmpQ);
tmpConsumer->receive()

Al igual que lo hicimos en Java, nos aseguraremos que el productor haya recibido las respuestas a todos los mensajes que envió antes de continuar. Para tal efecto usamos un map en donde se guardan los ids de los mensajes enviados y un booleano indicando si es que están ok o no.

map<std::string,bool> resultados;
//...
producer->send(message.get());
resultados.insert(std::pair<std::string,bool>(message.get()->getCMSMessageID(),false));

Como los consumidores setean el CMSCorrelationId indicando a qué mensaje están respondiendo

respuesta->setCMSCorrelationID(message->getCMSMessageID());
productor->send(respuesta);

En el productor los marcamos como recibidos

resultados[respuesta->getCMSCorrelationID()]=true;

Por tanto, seguiremos escuchando exactamente hasta que todas las respuestas hayan sido recibidas.
Para eso, como no existe algo así como map.containsValue, tenemos que hacer una pequeña función que recorre el mapa a ver si todavía quedan mensajes por recibir.

bool todoRecibido(map<std::string,bool> resultados){
for(map<std::string,bool>::iterator i=resultados.begin(); i!=resultados.end();i++){
if(i->second==false){
return (false);
}
}
return (true);
}

Por lo tanto, nuestra espera síncrona se hará mientras tal función siga retornando false

while(!todoRecibido(resultados)){
const MapMessage* respuesta = dynamic_cast< const MapMessage* >(tmpConsumer->receive(500));
resultados[respuesta->getCMSCorrelationID()]=true;
respuesta->getInt("intR");
}

Y éso es to, éso es to, éso es todo amigos.

Este sitio utiliza cookies.    Leer más