- Message Sending to a queue.
- Message Reception from a queue.
- Synchronous Message Reception.
- Asynchronous Message Reception.
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="messageDestination" class="org.apache.activemq.command.ActiveMQQueue" >
<constructor-arg value="employee" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="receiveTimeout" value="10000" />
</bean>
<bean id="employeeMsgSender" class="com.employee.proxy.EmployeeMsgSender">
<property name="destination" ref="messageDestination" />
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
<bean id="employeeMsgConsumer" class="com.employee.proxy.EmployeeMsgConsumer">
<property name="destination" ref="messageDestination" />
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
public void sendEmployeeToQueue(final Employee employee) {
System.out.println("Producer sends " + employee);
ObjectMapper objectMapper = new ObjectMapper();
String strEmployee = null;
try {
strEmployee = objectMapper.writeValueAsString(employee);
} catch (JsonGenerationException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
final String employeeMsg = strEmployee;
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session)
throws JMSException {
return session.createTextMessage(employeeMsg);
}
});
}
public Employee getNewEmployeeMsg(){
TextMessage textMessage = (TextMessage) jmsTemplate
.receive(destination);
String employeeMsg = null;
try {
employeeMsg = textMessage.getText();
} catch (JMSException e1) {
e1.printStackTrace();
}
ObjectMapper objectMapper = new ObjectMapper();
Employee employee = null;
try {
employee = objectMapper.readValue(employeeMsg, Employee.class);
} catch (JsonParseException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return employee;
}
MessageConverter:
If we want to keep this message conversion separated, we can use MessageConverter. For this, Create a class which implements MessageConverter and implements the methods fromMessage and toMessage.
public class EmployeeConverter implements MessageConverter{
@Override
public Object fromMessage(Message arg0) throws JMSException,
MessageConversionException {
Employee employee = null;
ObjectMapper objectMapper = new ObjectMapper();
ActiveMQTextMessage activeMQTxtMsg = (ActiveMQTextMessage) arg0;
try {
String employeeMsg = activeMQTxtMsg.getText();
employee = objectMapper.readValue(employeeMsg, Employee.class);
} catch (JMSException | IOException e) {
e.printStackTrace();
}
return employee;
}
@Override
public Message toMessage(Object arg0, Session arg1) throws JMSException,
MessageConversionException {
Employee employee = (Employee) arg0;
ObjectMapper objectMapper = new ObjectMapper();
String strEmployee = null;
try {
strEmployee = objectMapper.writeValueAsString(employee);
} catch (JsonGenerationException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
Message message = arg1.createTextMessage(strEmployee);
return message;
}
}
Now we don’t need to convert the employee object before sending and after receiving. Instead we can directly send the Employee object by convertAndSend method and receive employee object by receiveAndConvert method.Spring will take of converting before sending and after receiving the message by calling the above methods in MessageConverted class. Please see the below code snippets,
convertAndSendEmployee method of EmployeeMsgSender. We can call this method instead of calling the sendEmployeeToQueue method.
public void convertAndSendEmployee(final Employee employee) {
jmsTemplate.convertAndSend(destination,employee);
}
public Employee receiveAndConvertEmployee(){
Employee employee = (Employee) jmsTemplate
.receiveAndConvert(destination);
return employee;
}
Please Click here to download this example.
public class EmployeeMessageListener implements MessageListener {
@Override
public void onMessage(Message arg0) {
Employee employee = null;
ObjectMapper objectMapper = new ObjectMapper();
ActiveMQTextMessage activeMQTxtMsg = (ActiveMQTextMessage) arg0;
try {
String employeeMsg = activeMQTxtMsg.getText();
employee = objectMapper.readValue(employeeMsg, Employee.class);
} catch (JMSException | IOException e) {
e.printStackTrace();
}
System.out.println("Received Employee Object is : "+employee);
}
}
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="employeeMessageListener" class="com.employee.listener.EmployeeMessageListener"/>
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destinationName" value="employee" />
<property name="messageListener" ref="employeeMessageListener" />
</bean>
When we start your application by instantiating ApplicationContext, this’ll start listening to the queue names employee for the incoming message. We can test this by sending message to this queue. The onMessage method will be called when the queue received any message.
Please Click here to download this example.
Please have a look at the message converter, before closing the JMS queue exploration.