Banear

lunes, 26 de agosto de 2013

Colas de mensajería (Maven, Spring-JMS y ActiveMQ)

En esta entrada vamos a ver como configurar (de forma rápida), como configurar colas con Spring y un gestor de colas, como es el caso de Apache ActiveMQ, para enviar y recibir mensajes. Para ello, podéis descargar:

- La versión 5.7.0 de ActiveMQ
- Tomcat 7

Os creáis un proyecto Maven con la siguiente configuración:

En el pom.xml, la dependencia a Spring y al core de ActiveMQ:
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
        http://maven.apache.org/maven-v4_0_0.xsd">
 <artifactId>spring-activemq-sample</artifactId>
 <modelVersion>4.0.0</modelVersion>
 <packaging>war</packaging>
 <groupId>papidal</groupId>
 <version>1.0</version>
  <properties> 
        <releaseCandidate>1</releaseCandidate>
        <spring.version>3.1.1.RELEASE</spring.version>
        <java.version>1.5</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.javadoc.reporting.version>2.7</maven.javadoc.reporting.version>
        <commons.logging.version>1.1.1</commons.logging.version>
        <log4j.version>1.2.16</log4j.version>
        <context.path>spring-activemq-sample</context.path>
    </properties>
 <build> 
  <resources>
   <resource>
    <directory>src/main/resources</directory>
    <filtering>true</filtering>
   </resource>
  </resources>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-war-plugin</artifactId>
    <configuration>
     <warName>${context.path}</warName>
    </configuration>
   </plugin>
  </plugins>
 </build>
 <dependencies>
  <dependency>
   <groupId>log4j</groupId>
   <artifactId>log4j</artifactId>
   <version>${log4j.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
  </dependency>
  <dependency>
     <groupId>org.springframework</groupId>
     <artifactId>spring-jms</artifactId>
     <version>${spring.version}</version>
  </dependency>
  <dependency>
     <groupId>org.springframework</groupId>
     <artifactId>spring-jms</artifactId>
     <version>3.1.0.RELEASE</version>
     <scope>compile</scope>
  </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-asm</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-expression</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-web</artifactId>
   <version>${spring.version}</version>
  </dependency>
  <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-webmvc</artifactId>
   <version>${spring.version}</version>
  </dependency>
  <dependency>
   <groupId>javax.servlet</groupId>
   <artifactId>servlet-api</artifactId>
   <version>2.4</version>
  </dependency>
 </dependencies>
</project>

Un contexto, context.xml, en el que estarán vuestras colas de entrada y salida, así como la conexión al gestor de colas:

<?xml version="1.0" encoding="UTF-8"?>
<Context>

   <!--
     Conexión ActiveMQ
   -->
   <Resource name="jms/mqConnectionFactory"
      auth="Container"
      type="org.apache.activemq.ActiveMQConnectionFactory"
      description="JMS Connection Factory"
          factory="org.apache.activemq.jndi.JNDIReferenceFactory"
          brokerURL="tcp://localhost:61616" />

   <!-- Cola 1 -->
   <Resource name="jms/testQueueOne"
      auth="Container"
      type="org.apache.activemq.command.ActiveMQQueue"
          factory="org.apache.activemq.jndi.JNDIReferenceFactory"
          physicalName="TestQueueOne"/>

   <!-- Cola 2 -->
   <Resource name="jms/testQueueTwo"
      auth="Container"
      type="org.apache.activemq.command.ActiveMQQueue"
          factory="org.apache.activemq.jndi.JNDIReferenceFactory"
          physicalName="TestQueueTwo"/>

</Context>

El resto de configuración, en spring-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:jee="http://www.springframework.org/schema/jee"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/jee
          http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">


 <!-- Recursos jndi -->
 <jee:jndi-lookup id="mqConnectionFactory" jndi-name="java:comp/env/jms/mqConnectionFactory" />
 <jee:jndi-lookup id="testQueueOne" jndi-name="java:comp/env/jms/testQueueOne" />
 <jee:jndi-lookup id="testQueueTwo" jndi-name="java:comp/env/jms/testQueueTwo" />

 <!-- Clase que recibe los mensajes -->
 <bean id="testMessageListener" class="papidal.TestMessageListener">
     <property name="testMessageSender" ref ="testMessageSender" />
    </bean>

 
 <bean id="poiMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
     <property name="connectionFactory" ref ="mqConnectionFactory" />
     <property name="destination" ref ="testQueueOne"/>
  <property name="messageListener" ref ="testMessageListener"/>
  <property name="concurrentConsumers" value="2" />
    </bean>

 <!-- Clase que envía los mensajes -->
 <bean id="testMessageSender" class="papidal.TestMessageSender">
  <property name="jmsTemplate" ref="jmsTemplate"/>
  <property name="testQueue" ref="testQueueTwo"/>
 </bean>

 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="mqConnectionFactory" />
 </bean>

</beans>

En el web.xml, indicáis dónde tenemos la configuración de spring:

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://java.sun.com/xml/ns/javaee"
   xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
   xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
                       http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
   id="WebApp_ID"
   version="2.5">

 <!--
  Fichero de configuración de spring
 -->
 <context-param>
  <param-name>contextConfigLocation</param-name>
  <param-value>
   /WEB-INF/config/spring-config.xml
  </param-value>
 </context-param>

 <!--
  Cargamos el contexto de Spring
 -->
 <listener>
  <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
 </listener>

</web-app>

La clase que escucha en la cola 1, los mensaje que llegan:

package papidal;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;


public class TestMessageListener implements MessageListener
{

 private TestMessageSender messageSender_i;

 private static final Logger logger_c = Logger.getLogger(TestMessageListener.class);


 //Escucha los mensajes
 public void onMessage(Message message)
 {
  logger_c.info("Recibo mensaje de la cola [" + message +"]");

  if (message instanceof TextMessage)
  {
   try
   {
    String msgText = ((TextMessage) message).getText();
    logger_c.debug("Mensaje recibido: " + msgText);

    messageSender_i.sendMessage(msgText);

   }
   catch (JMSException jmsEx_p)
   {
    String errMsg = "Error obteniendo el mensaje";
    logger_c.error(errMsg, jmsEx_p);
   }
  }
  else
  {
   String errMsg = "El mensaje no es del tipo TextMessage";
   logger_c.error(errMsg);
   throw new RuntimeException(errMsg);
  }
 }

 
 public void setTestMessageSender(TestMessageSender messageSender_p)
 {
  this.messageSender_i = messageSender_p;
 }
}


La clase que usamos para enviar los mensajes recibidos hacia la cola 2:

package papidal;

import javax.jms.JMSException;
import javax.jms.Queue;
import org.apache.log4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

// Enviamos mensajes a una cola
@Service
public class TestMessageSender
{
 private JmsTemplate jmsTemplate_i;
 private Queue testQueue_i;
 private static final Logger logger_c = Logger .getLogger(TestMessageSender.class);

 
 // Envio del mensaje
 public void sendMessage(String message_p) throws JMSException
 {
  logger_c.debug("Ponemos el mensaje en la cola[" + testQueue_i.toString() + "] Mensaje[" + message_p + "]");
  jmsTemplate_i.convertAndSend(testQueue_i, message_p);
 }

 
 public void setJmsTemplate(JmsTemplate tmpl)
 {
  this.jmsTemplate_i = tmpl;
 }

 
 public void setTestQueue(Queue queue)
 {
  this.testQueue_i = queue;
 }
}



Levantáis el gestor de colas:
"D:\apache-activemq-5.7.0\bin\activemq.bat"

Para administrar:

Simplemente con levantar el servidor ya veremos la cola 1 (en mi caso, un Tomcat 7) con 2 consumidores.

Le damos a "send To", metemos cualquier cosa (típicamente un xml)  y vemos que por la consola de Tomcat hemos recibido el mensaje correctamente, que hemos enviado por la cola 1 y que lo hemos a su vez enviado a la cola 2. 

En el ActiveMQ podemos verlo también (fijaros en la columna de encolados y desencolados. Como la cola para enviar mensajes es la dos, y nosotros escuchamos sólo la 1, la cola 2 no tiene consumidores activos ya que sólo la usamos para enviar mensajes. Si algún modulo quiere escucharla, no hay problema). 

Acordaros de darle a F5 para refrescar la pantalla ;)



A partir de aquí, se pueden definir canales, jugar con el xpath para aceptar o descartar mensajes en función del identificador del envoltorio, usar jaxb para hacer marshall/unmarshall, definir el encoding del parseo de xml a clases Java y viceversa, ...

jueves, 22 de agosto de 2013

Big Data - Configurar Hadoop y ejemplo de MapReduce

Paso 1. Configurando Hadoop en Linux (un clúster de una máquina)

En mi caso he usado la última versión estable de Hadoop del 1 de Agosto de 2013 y el Ubuntu. En concreto:
Linux Ubuntu 12.0.4 LTS

Os recomiendo este enlace http://www.michael-noll.com/tutorials/running-hadoop-on-ubuntu-linux-single-node-cluster/ donde se explica bastante bien como montarlo paso a paso. Aunque posiblemente os encontraréis con algunos problemas que paso a detallar a continuación:

- Que os de un rechazo de conexión en el puerto 22, es decir, "connect to host localhost port 22 connection refused". Esto quiere decir que probablemente, no tenéis descargado el ssh y el sshd o bien no se estén ejecutando. Para comprobar si existen, hacéis un "which ssh" y un "which sshd", si no devuelve una ruta, deberíais bajar el paquete e instalarlo (sudo apt-get install ssh).

- Si habéis seguido todas las instrucciones del enlace, es decir, habéis creado un usuario para hadoop, habéis descargado y configurado hadoop, habéis deshabilitado ipv6, disteis permisos al fichero /etc/hosts, ... llega el momento de levantar los procesos de Hadoop, para ello tenéis la opción de usar el .sh deprecado (/usr/local/hadoop/bin/start-all.sh) o bien en dos pasos, la opción recomendada actualmente por Hadoop:

/usr/local/hadoop/bin/start-dfs.sh (levanta los procesos NameNode, SecondaryNameNode y DataNode)
/usr/local/hadoop/bin/start-mapred.sh (levanta los procesos JobTracker y TaskTracker)

Una vez lanzados los servicios, con jps comprobamos que efectivamente están levantados. Al teclear jps debe saliros como mínimo algo de este estilo:

5113 TaskTracker
4507 DataNode
5239 Jps
4772 SecondaryNameNode
4245 NameNode
4873 JobTracker

Si tenéis Eclipse abierto u otros procesos Java lanzados, os saldrán también, pero no nos interesan (salvo que estén ocupando los puertos que teníamos reservados para Hadoop).
Si alguno de los procesos mencionados no os aparecen, pongo por aquí unas pequeñas ayudas. En general es recomendable lanzar uno a uno los que no aparezcan para obtener más información que a veces no se muestra en el log:

1) Si no aparece DataNode: lanzar "bin/hadoop datanode" y ver el error. Puede ser que necesitemos limpiar las carpetas tmp de hadoop "rm -Rf /app/tmp/hadoop-hduser/*" (no tiene porque estar sólo ahí) y "bin/hadoop namenode -format" con los servicios parados (stop-dfs.sh y stop-mapred.sh).

2) Si no aparece JobTracker: lanzar "bin/hadoop jobtracker" y ver el error. Si está relacionado con permisos/privilegios usar el comando "sudo chown -R hduser /usr/local/hadoop". Si después os da algo de este estilo: "org.apache.hadoop.mapred.SafeModeException: JobTracker is in safe mode", tendréis que salir del modo seguro "bin/hadoop dfsadmin -safemode leave"

Para ver los puertos asignados, podéis usar este comando: "sudo netstat -plten | grep java"

Si volvéis a levantar todo y veis los 6 con el comando "jps", perfecto, sino comentar por aquí el error por si me suena.



Paso2. Ejemplo con MapReduce

Para este ejemplo rapidillo, pero útil para ver cómo funciona (el concepto me recuerda bastante a los Reader/Processor/Writer de Spring-Batch ya que además la ejecución se paraleliza) usaremos:

- Un fichero con los terremotos de magnitud mayor o igual 1.5 que he obtenido a partir de fuentes oficiales, de unos 5Mb. Descargar.
- Una clase central que invocará a nuestras clases Map y Reduce. Descargar.
- La clase que se encarga del Map. Descargar
- La clase que se encarga del Reduce. Descargar




Colocamos nuestro fichero en el sistema de ficheros de Hadoop:

bin/hadoop dfs -put /home/hduser/terremotos terremotos
bin/hadoop dfs -ls /user/hduser/terremotos

Generaremos un jar (Desde eclipse podéis hacerlos fácilmente mediante la opción de exportar) con las clases mencionadas, y con los servicios de hadoop levantados, ejecutaremos el jar: "java -jar Terremotos.jar"

Y así obtenemos la lista de terremotos en Galicia a lo largo de la historia reciente:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/terremotosOutput/part-00000
Coruña  240
Costa gallega   252
Lugo    1276
OTROS   56518
Ourense 519
Pontevedra      249

Si queremos volver a lanzar el jar, podemos borrar el directorio de salida con "bin/hadoop dfs -rmr /user/hduser/terremotosOutput"


Para pasar el fichero del sistema de ficheros de hadoop al local, podéis hacer algo de este estilo:  ./hadoop dfs -copyToLocal /user/hduser/terremotosOutput/part-00000  /home/hduser