JMS Client – To produce and consume messages in JMS Queue

In this article, we will code one JMS client to send messages to a JMS Queue and another JMS client to consume messages from it. We will use the same JMS queue created in the previous article.

So, at the end of this article we will have two JMS clients, and we will check the status of the queue along the way:

  • JMS client to enqueue messages (JMS Producer)
  • JMS client to consume messages from the queue (JMS Consumer)
  • Status of the JMS Queue at different stages

1. Assumptions :

A sample queue has already been created in the Oracle WebLogic application server. We will enqueue messages to this queue and consume messages from it.

Details of the queue:

  • Queue Name: TestQueue
  • Queue JNDI Name: jms/test/TestQueue
  • ConnectionFactory JNDI Name: jms/test/TestConnFactory
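
These details, together with the server URL and credentials, end up in the JNDI environment that each client passes to InitialContext. As a minimal sketch, the environment could be assembled from properties rather than from literals in the source. The class name JndiEnvSketch and the property keys wls.url, wls.user and wls.password are hypothetical and not part of WebLogic; the default URL and factory class are the ones used by the clients in this article.

```java
import java.util.Hashtable;
import java.util.Properties;
import javax.naming.Context;

// Sketch only: builds the JNDI environment for the JMS clients from properties.
// The property keys ("wls.url", "wls.user", "wls.password") are hypothetical.
class JndiEnvSketch {

	static Hashtable<String, String> buildEnv(Properties props) {
		Hashtable<String, String> env = new Hashtable<String, String>();
		// fall back to the URL used in this article when no property is given
		env.put(Context.PROVIDER_URL, props.getProperty("wls.url", "t3://localhost:7101"));
		env.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");

		// credentials are added only when both are supplied
		String user = props.getProperty("wls.user");
		String password = props.getProperty("wls.password");
		if (user != null && password != null) {
			env.put(Context.SECURITY_PRINCIPAL, user);
			env.put(Context.SECURITY_CREDENTIALS, password);
		}
		return env;
	}

	public static void main(String[] args) {
		Hashtable<String, String> env = buildEnv(System.getProperties());
		System.out.println("Provider URL: " + env.get(Context.PROVIDER_URL));
	}
}
```

The returned Hashtable can be handed to new InitialContext(env) in the same way as the hard-coded one; values would then be supplied at launch, for example with -Dwls.user=... on the java command line.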

Let’s get started with coding both JMS clients, one for producing messages and one for consuming them from the queue.

2. JMS Client – Produce/Consume messages in JMS Queue :

Step 1: Initial status of queue

Step 1.A: Click TestJMSModule from the available options under JMS Modules

1_JMS_Client_for_weblogic_queue_select_desired_jms_module

Step 1.B: Click TestQueue from the available options under TestJMSModule to check the status of this queue

2_JMS_Client_for_weblogic_queue_select_desired_jms_queue_from_jms_module

Step 1.C: Dashboard of the TestQueue

Click the “Monitoring” tab to view the status of the TestQueue

3_JMS_Client_for_weblogic_queue_dashboard_for_testqueue

Step 1.D: Currently, there are no messages available in the TestQueue to be consumed

4_JMS_Client_for_weblogic_queue_initial_status_of_our_testqueue

Step 2: JMS Queue Producer client –> to send messages to TestQueue

JMSQueueProducer.java

package net.bench.resources.jms.queue.client;

import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JMSQueueProducer {

	// JNDI name for Weblogic, ConnectionFactory & Queue
	private static final String WEBLOGIC_JMS_URL = "t3://localhost:7101"; // Weblogic JMS URL
	public final static String WEBLOGIC_JNDI_FACTORY_NAME = "weblogic.jndi.WLInitialContextFactory"; // Weblogic JNDI
	private static final String CONNECTION_FACTORY_JNDI_NAME = "jms/test/TestConnFactory"; // Weblogic ConnectionFactory JNDI
	private static final String QUEUE_JNDI_NAME = "jms/test/TestQueue"; // Weblogic Queue JNDI

	// variables
	private Hashtable<String, String> wlsEnvParamHashTbl = null;
	private static InitialContext initialContext = null;
	private static QueueConnectionFactory queueConnectionFactory = null;
	private static QueueConnection queueConnection = null;
	private static QueueSession queueSession = null;
	private static Queue queue = null;
	private static QueueSender queueSender = null;
	private static TextMessage textMessage = null;

	/**
	 * default constructor
	 */
	public JMSQueueProducer() {

		wlsEnvParamHashTbl = new Hashtable<String, String>();
		wlsEnvParamHashTbl.put(Context.PROVIDER_URL, WEBLOGIC_JMS_URL); // set Weblogic JMS URL
		wlsEnvParamHashTbl.put(Context.INITIAL_CONTEXT_FACTORY, WEBLOGIC_JNDI_FACTORY_NAME); // set Weblogic JNDI
		wlsEnvParamHashTbl.put(Context.SECURITY_PRINCIPAL, "weblogic"); // set Weblogic UserName
		wlsEnvParamHashTbl.put(Context.SECURITY_CREDENTIALS, "weblogic1"); // set Weblogic PassWord
	}

	/**
	 * This method initializes all necessary connection parameters for establishing JMS Queue communication
	 *
	 * @throws NamingException
	 * @throws JMSException
	 */
	public void initializeConnParams() throws NamingException, JMSException {

		initialContext = new InitialContext(wlsEnvParamHashTbl); // set InitialContext
		queueConnectionFactory = (QueueConnectionFactory) initialContext.lookup(CONNECTION_FACTORY_JNDI_NAME); // lookup using initial context
		queueConnection = queueConnectionFactory.createQueueConnection(); // create QueueConnection from the ConnectionFactory
		queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); // create QueueSession
		queue = (Queue) initialContext.lookup(QUEUE_JNDI_NAME); // lookup Queue JNDI using initial context created above
		queueSender = queueSession.createSender(queue); // create Sender using Queue JNDI as arguments
		textMessage = queueSession.createTextMessage(); // create TextMessage for Queue
		queueConnection.start(); // start Queue connection
	}

	/**
	 * This is the actual method to SEND messages to JMS Queues
	 *
	 * @param message
	 * @throws JMSException
	 */
	public void send(String message) throws JMSException {

		textMessage.setText(message);
		queueSender.send(textMessage);
	}

	/**
	 * This method closes all connections
	 *
	 * @throws JMSException
	 */
	public void closeConnParams() throws JMSException {

		queueSender.close();
		queueSession.close();
		queueConnection.close();
	}

	/**
	 * This is main() method to initiate all necessary calls
	 *
	 * @param args
	 */
	public static void main(String[] args) {

		// create an object to invoke initializeConnParams(), send() and closeConnParams() methods
		JMSQueueProducer jmsQueueProducer = new JMSQueueProducer();
		String enqueueMessage = "JMS Queue testing";

		try {
			jmsQueueProducer.initializeConnParams(); // establishes all necessary connections
			jmsQueueProducer.send(enqueueMessage); // sends the actual message to the queue
			jmsQueueProducer.closeConnParams(); // closes sender, session and connection

			System.out.println("JMSQueueProducer: Text message \"" + enqueueMessage + "\" enqueued to JMS Queue 'TestQueue' successfully ");
		}
		catch (NamingException nex) {
			nex.printStackTrace();
		}
		catch (JMSException jmsex) {
			jmsex.printStackTrace();
		}
	}
}

Output:

JMSQueueProducer: Text message "JMS Queue testing" enqueued to JMS Queue 'TestQueue' successfully
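
One thing to note in closeConnParams(): the three close() calls run one after another, so if the first one throws, the remaining resources are never closed. A minimal sketch of one way to attempt every close regardless is given below. The class CloseAllSketch and the interface Closer are hypothetical helpers introduced only for illustration, and main() uses stand-in lambdas so that it runs without a JMS provider.

```java
import java.util.ArrayList;
import java.util.List;

class CloseAllSketch {

	// Hypothetical functional interface so that any close() method can be passed in.
	interface Closer {
		void close() throws Exception;
	}

	// Attempts every close in order. Returns the first failure, with later
	// failures attached as suppressed exceptions, or null if all closes succeed.
	static Exception closeAll(Closer... closers) {
		Exception first = null;
		for (Closer closer : closers) {
			try {
				closer.close();
			} catch (Exception ex) {
				if (first == null) {
					first = ex;
				} else if (ex != first) {
					first.addSuppressed(ex);
				}
			}
		}
		return first;
	}

	public static void main(String[] args) {
		List<String> closed = new ArrayList<String>();
		// stand-ins for sender, session and connection; the first one fails
		Exception failure = closeAll(
				() -> { throw new IllegalStateException("sender close failed"); },
				() -> closed.add("session"),
				() -> closed.add("connection"));
		System.out.println("Closed: " + closed + ", first failure: " + failure.getMessage());
	}
}
```

In the producer, the call would look like closeAll(queueSender::close, queueSession::close, queueConnection::close), which assumes Java 8 or later for the method references.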

Step 3: Status of the queue after executing the above JMS client to enqueue a message

Step 3.A: Current status of the TestQueue (Dashboard)

Currently, there is one message available to be consumed

5_JMS_Client_for_weblogic_queue_after_enqueqing_message_into_TestQueue

Step 3.B: Select “TestJMSModule!TestQueue” and click “Show Messages” –> to list the message that was just enqueued using the above JMS Java client

6_JMS_Client_for_weblogic_queue_click_show_messages_to_view_recent_msg

Step 3.C: In the list displayed by “Show Messages” –> click on the ID to view the actual message enqueued into the TestQueue

7_JMS_Client_for_weblogic_queue_click_on_ID

Actual message

8_JMS_Client_for_weblogic_queue_click_on_ID_to_view_actual_message

The next step is to consume the enqueued message using another JMS Java client

Step 4: JMS Queue Consumer client –> to consume messages from TestQueue

JMSQueueConsumer.java

package net.bench.resources.jms.queue.client;

import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JMSQueueConsumer implements MessageListener {

	// JNDI name for Weblogic, ConnectionFactory & Queue
	private static final String WEBLOGIC_JMS_URL = "t3://localhost:7101"; // Weblogic JMS URL
	public final static String WEBLOGIC_JNDI_FACTORY_NAME = "weblogic.jndi.WLInitialContextFactory"; // Weblogic JNDI
	private static final String CONNECTION_FACTORY_JNDI_NAME = "jms/test/TestConnFactory"; // Weblogic ConnectionFactory JNDI
	private static final String QUEUE_JNDI_NAME = "jms/test/TestQueue"; // Weblogic Queue JNDI

	// variables
	private Hashtable<String, String> wlsEnvParamHashTbl = null;
	private static InitialContext initialContext = null;
	private static QueueConnectionFactory queueConnectionFactory = null;
	private static QueueConnection queueConnection = null;
	private static QueueSession queueSession = null;
	private static Queue queue = null;
	private static QueueReceiver queueReceiver = null;
	private static TextMessage textMessage = null;
	private boolean quit = false;

	/**
	 * default constructor
	 */
	public JMSQueueConsumer() {

		wlsEnvParamHashTbl = new Hashtable<String, String>();
		wlsEnvParamHashTbl.put(Context.PROVIDER_URL, WEBLOGIC_JMS_URL); // set Weblogic JMS URL
		wlsEnvParamHashTbl.put(Context.INITIAL_CONTEXT_FACTORY, WEBLOGIC_JNDI_FACTORY_NAME); // set Weblogic JNDI
		wlsEnvParamHashTbl.put(Context.SECURITY_PRINCIPAL, "weblogic"); // set Weblogic UserName
		wlsEnvParamHashTbl.put(Context.SECURITY_CREDENTIALS, "weblogic1"); // set Weblogic PassWord
	}

	/**
	 * This method initializes all necessary connection parameters for establishing JMS Queue communication
	 *
	 * @throws NamingException
	 * @throws JMSException
	 */
	public void initializeConnParams() throws NamingException, JMSException {

		initialContext = new InitialContext(wlsEnvParamHashTbl); // set InitialContext
		queueConnectionFactory = (QueueConnectionFactory) initialContext.lookup(CONNECTION_FACTORY_JNDI_NAME); // lookup using initial context
		queueConnection = queueConnectionFactory.createQueueConnection(); // create QueueConnection from the ConnectionFactory
		queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); // create QueueSession
		queue = (Queue) initialContext.lookup(QUEUE_JNDI_NAME); // lookup Queue JNDI using initial context created above
		queueReceiver = queueSession.createReceiver(queue); // create Receiver using Queue JNDI as arguments
		queueReceiver.setMessageListener(this); // set Message Listener
		queueConnection.start(); // start Queue connection
	}

	/**
	 * onMessage() callback from the MessageListener interface, invoked for each message delivered from the queue
	 */
	@Override
	public void onMessage(Message message) {

		String consumedMessagefromQueue = null;

		try {
			if (message instanceof TextMessage) { // It's TextMessage...

				textMessage = (TextMessage) message;
				consumedMessagefromQueue = textMessage.getText();
			}
			else { // If it is not a TextMessage...

				consumedMessagefromQueue = message.toString();
			}

			// finally print the message to the output console
			System.out.println("JMSQueueConsumer: Message consumed from JMS Queue 'TestQueue' is >>>" + consumedMessagefromQueue + "<<<");

			if ("quit".equalsIgnoreCase(consumedMessagefromQueue)) { // null-safe: getText() may return null
				synchronized(this) {
					quit = true;
					this.notifyAll(); // Notify main thread to quit
				}
			}
		}
		catch (JMSException jmsex) {
			jmsex.printStackTrace();
		}
	}

	/**
	 * This method closes all connections
	 *
	 * @throws JMSException
	 */
	public void close() throws JMSException {

		queueReceiver.close();
		queueSession.close();
		queueConnection.close();
	}

	/**
	 * This is main() method to initiate all necessary calls
	 *
	 * @param args
	 */
	public static void main(String[] args) {

		// create an object to invoke initializeConnParams(), onMessage() and close() methods
		JMSQueueConsumer jmsQueueConsumer = new JMSQueueConsumer();

		try {
			// to invoke initializeConnParams() method to establish all necessary connections
			jmsQueueConsumer.initializeConnParams();

			// block the main thread until onMessage() receives a "quit" message
			synchronized(jmsQueueConsumer) {
				while (! jmsQueueConsumer.quit) {
					try {
						jmsQueueConsumer.wait();
					} catch (InterruptedException ie) {
						Thread.currentThread().interrupt(); // restore the interrupt status
						break; // stop waiting and fall through to close()
					}
				}
			}

			// to invoke close() method to close all established connections
			jmsQueueConsumer.close();
		}
		catch (NamingException nex) {
			nex.printStackTrace();
		}
		catch (JMSException jmsex) {
			jmsex.printStackTrace();
		}
	}
}

Output:

JMSQueueConsumer: Message consumed from JMS Queue 'TestQueue' is >>>JMS Queue testing<<<
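
The consumer above keeps its main thread parked with wait() until onMessage() sees a message with the text "quit" and calls notifyAll(). The same hand-off can be sketched with java.util.concurrent.CountDownLatch, which avoids the explicit synchronized blocks and the quit flag. This is only an illustration under assumptions: the class QuitLatchSketch and its onText() method are hypothetical stand-ins for the consumer and its onMessage() callback, and a plain thread plays the role of the JMS delivery thread so that the example runs without WebLogic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class QuitLatchSketch {

	// released exactly once, when a "quit" message arrives
	private final CountDownLatch quitLatch = new CountDownLatch(1);

	// Stand-in for onMessage(): called with the text of each consumed message.
	void onText(String text) {
		System.out.println("Consumed >>>" + text + "<<<");
		if ("quit".equalsIgnoreCase(text)) {
			quitLatch.countDown(); // releases the waiting main thread
		}
	}

	// True once a "quit" message has been seen.
	boolean quitRequested() {
		return quitLatch.getCount() == 0;
	}

	// Blocks until "quit" arrives or the timeout elapses; returns true if quit was received.
	boolean awaitQuit(long timeout, TimeUnit unit) {
		try {
			return quitLatch.await(timeout, unit);
		} catch (InterruptedException ie) {
			Thread.currentThread().interrupt(); // restore the interrupt status
			return false;
		}
	}

	public static void main(String[] args) {
		QuitLatchSketch consumer = new QuitLatchSketch();
		// a separate thread plays the role of the provider's delivery thread
		Thread delivery = new Thread(() -> {
			consumer.onText("JMS Queue testing");
			consumer.onText("quit");
		});
		delivery.start();
		boolean quitReceived = consumer.awaitQuit(5, TimeUnit.SECONDS);
		System.out.println("Quit received: " + quitReceived);
	}
}
```

The timeout on awaitQuit() is a design choice of this sketch: it lets the waiting thread give up if no "quit" message ever arrives, whereas the wait() loop in the consumer blocks indefinitely.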

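Step 5: Status of the queue after executing the above JMS client to consume messages

Note that the consumer client keeps listening until it receives a message whose text is "quit". One message has been consumed, as indicated in the screen capture below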

9_JMS_Client_for_weblogic_queue_after_executing_JMS_Consumer_client

Finally, there are no new messages to be consumed

10_JMS_Client_for_weblogic_queue_after_executing_JMS_Consumer_client_status_of_queue
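
Across the steps above, the message count shown in the console goes from 0 to 1 after the producer runs and back to 0 after the consumer runs, because a queue hands each message to a single consumer and then removes it. The following is only an in-memory analogy of that behaviour using java.util.concurrent.LinkedBlockingQueue; it does not use JMS or WebLogic, and the class name QueueStatusSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueStatusSketch {

	// Returns the number of pending messages at three points:
	// before producing, after producing, and after consuming.
	static List<Integer> simulate(String message) {
		BlockingQueue<String> testQueue = new LinkedBlockingQueue<String>();
		List<Integer> counts = new ArrayList<Integer>();

		counts.add(testQueue.size()); // initial status of the queue

		testQueue.offer(message); // plays the role of the producer
		counts.add(testQueue.size());

		String consumed = testQueue.poll(); // plays the role of the consumer
		System.out.println("Consumed >>>" + consumed + "<<<");
		counts.add(testQueue.size());

		return counts;
	}

	public static void main(String[] args) {
		System.out.println("Pending message counts: " + simulate("JMS Queue testing"));
	}
}
```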

In the next article, we will code one Java client to publish messages to a JMS Topic and another to consume messages by subscribing to that JMS Topic

Related Articles :

  • Oracle WebLogic - Steps to create Connection Factory and Topic
  • Oracle WebLogic - Steps to create Connection Factory and Queue

Happy Coding !!
Happy Learning !!