JMS Implementation

Java Messaging Service API to send messages between different components. We can see the definition of it in many places.So we’re skipping to define it. We are just going to see the various implementations of sending and receiving messages. As we know, we have two JMS mechanisms, Queue and Topic. Now we will just explore the implementation of JMS queue, not the JMS topic that we will see in the next part of JMS article. This article covers the below,
  1. Message Sending to a queue.
  2. Message Reception from a queue. 
      1. Synchronous Message Reception.
      2. Asynchronous Message Reception.
Note : the examples are based on spring framework.
Message Sending and Synchronous Message Reception:
We need to create jmstemplate in application context. Please see the below application context,
      <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="messageDestination" class="org.apache.activemq.command.ActiveMQQueue" >
<constructor-arg value="employee" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="receiveTimeout" value="10000" />
</bean>
<bean id="employeeMsgSender" class="com.employee.proxy.EmployeeMsgSender">
<property name="destination" ref="messageDestination" />
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
<bean id="employeeMsgConsumer" class="com.employee.proxy.EmployeeMsgConsumer">
<property name="destination" ref="messageDestination" />
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
Destination represents the queue and the queue name is employee. EmployeeMsgSender and 
EmployeeMsgConsumer will send and receive messages respectively by using this jmstemplate. 
please see below code snippets,

sendEmployeeToQueue method of EmployeeMsgSender,
 public void sendEmployeeToQueue(final Employee employee) {  
System.out.println("Producer sends " + employee);
ObjectMapper objectMapper = new ObjectMapper();
String strEmployee = null;
try {
strEmployee = objectMapper.writeValueAsString(employee);
} catch (JsonGenerationException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
final String employeeMsg = strEmployee;
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session)
throws JMSException {
return session.createTextMessage(employeeMsg);
}
});
}
getNewEmployeeMsg method of EmployeeMsgConsumer,
 public Employee getNewEmployeeMsg(){  
TextMessage textMessage = (TextMessage) jmsTemplate
.receive(destination);
String employeeMsg = null;
try {
employeeMsg = textMessage.getText();
} catch (JMSException e1) {
e1.printStackTrace();
}
ObjectMapper objectMapper = new ObjectMapper();
Employee employee = null;
try {
employee = objectMapper.readValue(employeeMsg, Employee.class);
} catch (JsonParseException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return employee;
}

MessageConverter:
                      If we want to keep this message conversion separated, we can use MessageConverter. For this, Create a class which implements MessageConverter and implements the methods fromMessage and toMessage.

 public class EmployeeConverter implements MessageConverter{  
@Override
public Object fromMessage(Message arg0) throws JMSException,
MessageConversionException {
Employee employee = null;
ObjectMapper objectMapper = new ObjectMapper();
ActiveMQTextMessage activeMQTxtMsg = (ActiveMQTextMessage) arg0;
try {
String employeeMsg = activeMQTxtMsg.getText();
employee = objectMapper.readValue(employeeMsg, Employee.class);
} catch (JMSException | IOException e) {
e.printStackTrace();
}
return employee;
}
@Override
public Message toMessage(Object arg0, Session arg1) throws JMSException,
MessageConversionException {
Employee employee = (Employee) arg0;
ObjectMapper objectMapper = new ObjectMapper();
String strEmployee = null;
try {
strEmployee = objectMapper.writeValueAsString(employee);
} catch (JsonGenerationException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
Message message = arg1.createTextMessage(strEmployee);
return message;
}
}

Now we don’t need to convert the employee object before sending and after receiving. Instead we can directly send the Employee object by convertAndSend method and receive employee object by receiveAndConvert method.Spring will take of converting before sending and after receiving the message by calling the above methods in MessageConverted class. Please see the below code snippets,

convertAndSendEmployee method of EmployeeMsgSender. We can call this method instead of calling the sendEmployeeToQueue method.

 public void convertAndSendEmployee(final Employee employee) {  
jmsTemplate.convertAndSend(destination,employee);
}

receiveAndConvertEmployee method of EmployeeMsgConsumer. We can use this method instead of the getNewEmployeeMsg method.
 public Employee receiveAndConvertEmployee(){  
Employee employee = (Employee) jmsTemplate
.receiveAndConvert(destination);
return employee;
}

Please Click here to download this example.

Asynchronous Message Reception:

Create a class which implements MessageListener interface and it’s onMessage method.
 public class EmployeeMessageListener implements MessageListener {  
@Override
public void onMessage(Message arg0) {
Employee employee = null;
ObjectMapper objectMapper = new ObjectMapper();
ActiveMQTextMessage activeMQTxtMsg = (ActiveMQTextMessage) arg0;
try {
String employeeMsg = activeMQTxtMsg.getText();
employee = objectMapper.readValue(employeeMsg, Employee.class);
} catch (JMSException | IOException e) {
e.printStackTrace();
}
System.out.println("Received Employee Object is : "+employee);
}
}
Create a message listener container and add this above message listener class as a reference. We also need to pass the connection factory reference and the queue name. Please see the below application context for the better understanding,
      <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="employeeMessageListener" class="com.employee.listener.EmployeeMessageListener"/>
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destinationName" value="employee" />
<property name="messageListener" ref="employeeMessageListener" />
</bean>

When we start your application by instantiating ApplicationContext, this’ll start listening to the queue names employee for the incoming message. We can test this by sending message to this queue. The onMessage method will be called when the queue received any message.

Please Click here to download this example.

Please have a look at the message converter, before closing the JMS queue exploration.

Published