A
point-to-point (PTP) product or application is built on the concept of message
queues, senders and receivers. Each message is addressed to a specific queue
and receiving clients extract messages from the queues established to hold
their messages. Queues retain all messages sent to them until the messages are
consumed. The following post introduces the basic concepts of JMS
point-to-point messaging and illustrates them with a code sample using ActiveMQ.
POINT-TO-POINT MESSAGING
PTP
messaging has the following characteristics:
- Each message
has only one consumer.
- A sender and a
receiver of a message have no timing dependencies. The
receiver can fetch the message whether or not it was running when the
client sent the message.
- The receiver acknowledges
the successful processing of a message.
ACTIVEMQ
EXAMPLE
Let's
illustrate the above characteristics by creating a message producer that sends
a message containing a first and last name to a queue. In turn a message
consumer will read the message and transform it into a greeting.
The Producer class
contains a constructor which creates a message producer and needed connection
and session objects. The sendName() operation takes as input a first
and last name which are set on a TextMessage which in turn is sent to
the queue set on the message producer.
public
class Producer {
private static final Logger LOGGER =
LoggerFactory
.getLogger(Producer.class);
private String clientId;
private Connection connection;
private Session session;
private MessageProducer messageProducer;
public void create(String clientId, String
queueName) throws JMSException {
this.clientId = clientId;
// create a Connection Factory
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
// create a Connection
connection = connectionFactory.createConnection();
connection.setClientID(clientId);
// create a Session
session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// create the
Queue to which messages will be sent
Queue queue =
session.createQueue(queueName);
// create a MessageProducer for sending
messages
messageProducer =
session.createProducer(queue);
}
public void closeConnection() throws
JMSException {
connection.close();
}
public void sendName(String firstName,
String lastName) throws JMSException {
String text = firstName + " "
+ lastName;
// create a JMS TextMessage
TextMessage textMessage =
session.createTextMessage(text);
// send the message to the queue
destination
messageProducer.send(textMessage);
LOGGER.debug(clientId + ": sent
message with text='{}'", text);
}
}
The Consumer class contains a constructor which creates a message consumer and needed connection and session objects.
The getGreeting() operation reads a message from the queue and creates a greeting which is returned. Aside from the timeout parameter an additional acknowledge parameter is passed which is used to determine whether the received message should be acknowledged or not.
public
class Consumer {
private static final Logger LOGGER =
LoggerFactory
.getLogger(Consumer.class);
private static String NO_GREETING =
"no greeting";
private String clientId;
private Connection connection;
private Session session;
private MessageConsumer messageConsumer;
public void create(String clientId, String
queueName) throws JMSException {
this.clientId = clientId;
// create a Connection Factory
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
// create a Connection
connection =
connectionFactory.createConnection();
connection.setClientID(clientId);
// create a Session
session =
connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// create the Queue from which messages
will be received
Queue queue = session.createQueue(queueName);
// create a MessageConsumer for
receiving messages
messageConsumer =
session.createConsumer(queue);
// start the connection in order to
receive messages
connection.start();
}
public void closeConnection() throws
JMSException {
connection.close();
}
public String getGreeting(int timeout,
boolean acknowledge)
throws JMSException {
String greeting = NO_GREETING;
// read a message from the queue
destination
Message message =
messageConsumer.receive(timeout);
// check if a message was received
if (message != null) {
// cast the message to the correct
type
TextMessage textMessage =
(TextMessage) message;
// retrieve the message content
String text =
textMessage.getText();
LOGGER.debug(clientId + ":
received message with text='{}'", text);
if (acknowledge) {
// acknowledge the successful
processing of the message
message.acknowledge();
LOGGER.debug(clientId + ":
message acknowledged");
} else {
LOGGER.debug(clientId + ":
message not acknowledged");
}
// create greeting
greeting = "Hello " +
text + "!";
} else {
LOGGER.debug(clientId + ": no
message received");
}
LOGGER.info("greeting={}",
greeting);
return greeting;
}
}
No comments:
Post a Comment