Message Driven Beans (MDB)

There are two types of communication: synchronous and asynchronous.

Synchronous communication means that the invoker must wait for the target to complete the request before proceeding (it's a bit like a phone conversation). Asynchronous communication means that you don't have to wait around; you rely on the request being completed at a later time (it's like an answer phone, where you hope your message will be heard).

Message-Oriented Middleware (MOM) enables messaging in much the same way as an answer phone: it allows you to send messages asynchronously. When a message is sent, the MOM stores it and immediately acknowledges the request. The message sender is known as the producer, the receiver is known as the consumer, and the location where the message is sent is known as the destination.

There are a number of MOM products on the market, such as IBM WebSphere MQ, Apache ActiveMQ and Oracle Advanced Queuing.

Asynchronous messaging suits slow network connections and servers that occasionally go offline, where a synchronous service would be badly affected by having to wait on slow connections, overloaded servers, etc. As an example, let's presume that ActionBazaar wants to ship items directly from the warehouse, having made a deal with another company to perform this service. However, the other company's servers are slow and occasionally go offline, so communicating synchronously with this company could cause lots of problems, as our system would hang until each request completed. We could make the process asynchronous (use a MOM server): once a message has been sent and its receipt acknowledged (the message may not have been fully processed yet), we are happy to continue, knowing that at some point the message will be processed. The other big advantage is that the systems are loosely coupled, which means that if the other company's service is bad enough we could switch companies easily.

There are two types of messaging models

Point-to-Point

A single message travels from a single producer to a single consumer; PTP destinations are called queues. Messages are not delivered in any guaranteed order, and if more than one receiver exists a random one will be chosen to receive the message. This type of model suits the above example, where we want to guarantee that the message is received once and only once.
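The point-to-point behaviour described above can be sketched in plain Java without a MOM. This is only an in-memory illustration, not JMS code: the class PointToPointSketch, its deliver method and the consumer names are all hypothetical, and a real queue would also persist messages and handle acknowledgement.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical in-memory stand-in for a PTP queue; not a real MOM.
final class PointToPointSketch {

    /** Hands every queued message to exactly one randomly chosen consumer. */
    static Map<String, List<String>> deliver(List<String> messages, List<String> consumers, Random random) {
        Deque<String> queue = new ArrayDeque<>(messages);
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String consumer : consumers) {
            inboxes.put(consumer, new ArrayList<>());
        }
        while (!queue.isEmpty()) {
            String message = queue.poll(); // removed from the queue, so no other consumer can receive it
            String chosen = consumers.get(random.nextInt(consumers.size()));
            inboxes.get(chosen).add(message);
        }
        return inboxes;
    }

    public static void main(String[] args) {
        Map<String, List<String>> inboxes = deliver(
                List.of("ship item 1", "ship item 2", "ship item 3"),
                List.of("warehouse-A", "warehouse-B"),
                new Random());
        // Which warehouse gets which request varies from run to run,
        // but every request appears in exactly one inbox.
        System.out.println(inboxes);
    }
}
```

Whichever consumer is picked, the total number of delivered messages always equals the number sent, which is the once-and-only-once property the shipping example relies on.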

Publish-Subscribe

This is similar to a newsgroup: a single producer produces a message that is received by any number of consumers who happen to be connected at the time. The destination in this model is called a topic and the consumer is called a subscriber.

This model is ideal for broadcasting information to the masses about hot topics like possible downtime, hot new shares, etc.
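To contrast this with a queue, here is a minimal in-memory sketch of a topic. It is not JMS code; TopicSketch and the subscriber names are hypothetical, and it only models non-durable subscribers, who receive the messages published while they are subscribed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a topic with non-durable subscribers.
final class TopicSketch {
    private final Map<String, List<String>> inboxes = new LinkedHashMap<>();

    /** Registers a subscriber and returns the inbox that collects its messages. */
    List<String> subscribe(String subscriber) {
        return inboxes.computeIfAbsent(subscriber, name -> new ArrayList<>());
    }

    /** Copies the message to every subscriber registered at this moment. */
    void publish(String message) {
        for (List<String> inbox : inboxes.values()) {
            inbox.add(message);
        }
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        List<String> alice = topic.subscribe("alice");
        topic.publish("downtime tonight");
        List<String> bob = topic.subscribe("bob"); // joined too late for the first message
        topic.publish("hot new shares");
        System.out.println("alice: " + alice); // prints "alice: [downtime tonight, hot new shares]"
        System.out.println("bob: " + bob);     // prints "bob: [hot new shares]"
    }
}
```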

Java Messaging Service (JMS)

The JMS API is to messaging what the Java Database Connectivity (JDBC) API is to database access: it is a simple and small API. I will show you an example and then go into detail.

JMS example (Producer)

@Resource(name="jms/QueueConnectionFactory")
  private ConnectionFactory connectionFactory;

@Resource(name="jms/ShippingRequestQueue")
  private Destination destination;

Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

MessageProducer producer = session.createProducer(destination);

ObjectMessage message = session.createObjectMessage();
ShippingRequest shippingRequest = new ShippingRequest();
shippingRequest.setItem(item);
shippingRequest.setShippingAddress(address);
shippingRequest.setShippingMethod(method);
shippingRequest.setInsuranceAmount(amount);

message.setObject(shippingRequest);

producer.send(message);

session.close();
connection.close();

The first @Resource injects the connection factory, which encapsulates all the configuration information needed to connect to the MOM; the second @Resource injects the queue to send to. The Connection object represents the live MOM connection; connections are thread safe and designed to be shareable. The Session provides a single-threaded, task-oriented context for sending and receiving messages. The first argument (true) states that the session should be transactional; the second argument sets the acknowledgement mode and is only used when the session is not transactional. A MessageProducer is required to send a message and is created by the session. We then create a message, build the object that it will carry and set that object as the payload. The producer then sends the message and lastly we tidy up by closing the session and the connection.

A message is made up of different parts

Message Headers

Headers are name-value pairs common to all messages. Think of them as the to and from addresses, postage and postmark on an envelope. Other common headers include

  • JMSCorrelationID
  • JMSReplyTo
  • JMSMessageID

Message Properties

Message properties are just like headers, but they are not common across all messages and are created by the application. You could set a property like this

message.setBooleanProperty("Fragile", true);

A property can be a boolean, byte, double, float, int, long, short, String or an Object.

Message Body

This is the payload. In the example above we created an ObjectMessage, but you can use any of the following

  • BytesMessage
  • MapMessage
  • StreamMessage
  • TextMessage

I have also discussed JMS in my JBoss section and have a detailed example.

MDBs handle multithreading right out of the box, without any additional code, and they are also pooled by the container. An MDB instance is taken from the pool to handle each message, which increases performance. Most of the complexity is handled behind the scenes, which means that the code is easier to read.

MDBs are just plain old Java objects (POJOs) that follow a set of simple rules

Now let's have a look at a consumer

JMS example (Consumer)

package com.ejb3inaction.actionbazaar.buslogic;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.ejb.MessageDrivenContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.sql.DataSource;

@MessageDriven(
   name = "ShippingRequestProcessor",
   activationConfig = {
      @ActivationConfigProperty( propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
      @ActivationConfigProperty( propertyName = "destination", propertyValue ="jms/ShippingRequestQueue")
   }
)
public class ShippingRequestProcessorMDB implements MessageListener {

  private Connection connection;
  private DataSource dataSource;

  // used for the transaction rollback
  @Resource
  private MessageDrivenContext context;

  @Resource(name = "jdbc/TurtleDS", mappedName = "java:/DefaultDS")
  public void setDataSource(DataSource dataSource) {
    this.dataSource = dataSource;
  }

  @PostConstruct
  public void initialize() {
    try {
       connection = dataSource.getConnection();
    } catch (SQLException sqle) {
         sqle.printStackTrace();
    }
  }

  @PreDestroy
  public void cleanup() {
    try {
      connection.close();
      connection = null;
    } catch (SQLException sqle) {
        sqle.printStackTrace();
    }
  }

  public void onMessage(Message message) {
    try {
      ObjectMessage objectMessage = (ObjectMessage) message;
      ShippingRequest shippingRequest = (ShippingRequest) objectMessage.getObject();
      processShippingRequest(shippingRequest);
      System.out.println("Shipping request processed.");
    } catch (JMSException jmse) {
        jmse.printStackTrace();
        context.setRollbackOnly();
    } catch (SQLException sqle) {
        sqle.printStackTrace();
        context.setRollbackOnly();
    }
  }

  // This method would use JPA in the real world to persist the data
  private void processShippingRequest(ShippingRequest request) throws SQLException {
    // Bind the values as parameters instead of concatenating them into the SQL,
    // which avoids quoting mistakes and SQL injection.
    String sql = "INSERT INTO SHIPPING_REQUESTS("
                 + "ITEM, SHIPPING_ADDRESS, SHIPPING_METHOD, INSURANCE_AMOUNT) "
                 + "VALUES (?, ?, ?, ?)";
    // try-with-resources closes the statement even if the insert fails
    try (java.sql.PreparedStatement statement = connection.prepareStatement(sql)) {
      statement.setObject(1, request.getItem());
      statement.setObject(2, request.getShippingAddress());
      statement.setObject(3, request.getShippingMethod());
      statement.setObject(4, request.getInsuranceAmount());
      System.out.println(sql);
      statement.executeUpdate();
    }
  }
}

The code above is pretty simple. The @MessageDriven annotation identifies this class as an MDB and carries its configuration. The container invokes the onMessage method for each incoming message, which in turn calls the processShippingRequest method. I have included the code for a database connection, but in the real world you would probably use JPA to persist the data to a database.

MDB Annotations

MDBs support the smallest number of annotations; in fact @MessageDriven and @ActivationConfigProperty are the only two MDB-specific annotations. The @MessageDriven annotation is defined as follows

@MessageDriven definition

@Target(TYPE)
@Retention(RUNTIME)
public @interface MessageDriven {
  String name() default "";
  Class messageListenerInterface() default Object.class;
  ActivationConfigProperty[] activationConfig() default {};
  String mappedName() default "";
  String description() default "";
}

The name parameter specifies the name of the MDB, the messageListenerInterface parameter specifies which message listener interface the MDB implements, and the activationConfig parameter is used to specify listener-specific configuration properties. The container uses the listener interface to register the MDB with the message provider and to pass incoming messages by invoking the implemented message listener methods. You have three options for specifying the listener interface; the first two are shown here and the third is to use a deployment descriptor

option one

@MessageDriven (
   name="shippingRequestJMSProcessor",
   messageListenerInterface=javax.jms.MessageListener.class
)
public class ShippingRequestProcessorMDB {

option two

...
import javax.jms.MessageListener;
...

public class ShippingRequestProcessorMDB implements MessageListener {

The activationConfig property allows you to provide messaging system-specific configuration through an array of @ActivationConfigProperty annotations. The annotation is defined as

@ActivationConfigProperty definition

public @interface ActivationConfigProperty {
   String propertyName();
   String propertyValue();
}

Each activation property is a name-value pair that the underlying message provider understands. The three most commonly used JMS parameters are acknowledgeMode, subscriptionDurability and messageSelector.

Messages are not normally removed from the queue until the consumer acknowledges them. There are a number of acknowledgement modes

Acknowledge Mode | Description | Supported by MDB
Auto-acknowledge | The session automatically acknowledges receipt after a message has been received or is successfully processed; this is the default option if not specified | Yes
Client-acknowledge | You have to manually acknowledge the receipt of the message by calling the acknowledge method on the message | No
Dups-ok-acknowledge | The session can lazily acknowledge receipt of the message; this is similar to Auto-acknowledge but useful when the application can handle delivery of duplicate messages and rigorous acknowledgement is not a requirement | Yes
SESSION_TRANSACTED | This is returned for transacted sessions if the Session.getAcknowledgeMode method is invoked | No
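The idea that a message stays on the queue until it is acknowledged can be illustrated with a small in-memory sketch. This is not how a JMS provider is implemented; AcknowledgeSketch, drain and the handler are hypothetical names, and returning true from the handler simply stands in for an acknowledgement.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical in-memory model of "remove only after acknowledgement".
final class AcknowledgeSketch {

    /**
     * Delivers the message at the head of the queue to the handler and removes it
     * only when the handler acknowledges it (returns true). Returns the number of
     * delivery attempts; maxAttempts guards against a handler that never acknowledges.
     */
    static int drain(Deque<String> queue, Predicate<String> handler, int maxAttempts) {
        int attempts = 0;
        while (!queue.isEmpty() && attempts < maxAttempts) {
            attempts++;
            if (handler.test(queue.peek())) {
                queue.poll(); // acknowledged: the message leaves the queue
            }
            // not acknowledged: the message stays at the head and is redelivered
        }
        return attempts;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(List.of("a", "b"));
        Set<String> failedOnce = new HashSet<>();
        // Hypothetical handler: processing "b" fails on its first delivery only.
        Predicate<String> handler = m -> !m.equals("b") || !failedOnce.add(m);
        System.out.println("attempts: " + drain(queue, handler, 10)); // prints "attempts: 3"
    }
}
```

Two messages take three deliveries here because "b" is redelivered once, which is also why a consumer has to tolerate seeing the same message more than once.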

You can make sure that a subscriber receives every message published to a topic by making its subscription durable. If the durable subscriber is not connected to the topic when a message is received, the MOM retains a copy until the subscriber connects and then delivers the message.

durable subscription

MessageConsumer matchFishingSubscription = session.createDurableSubscriber(matchFishingTopic, "PaulValle");

session.unsubscribe("PaulValle");

making the MDB a durable subscriber

activationConfig = {
      @ActivationConfigProperty( propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
      @ActivationConfigProperty( propertyName = "destination", propertyValue ="jms/ShippingRequestTopic"),
      @ActivationConfigProperty( propertyName = "subscriptionDurability", propertyValue ="Durable")
   }

In the above example we create a durable subscription on the javax.jms.Topic matchFishingTopic with a subscription ID of "PaulValle". From now on, all messages published to the topic will be held until a consumer with the subscription ID "PaulValle" receives them. You can use unsubscribe to remove the durable subscription. By the way, the default for subscriptionDurability is NonDurable.
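What "held until the subscriber receives them" means can be sketched in memory as follows. This is an illustration only, not JMS code: DurableTopicSketch and its methods are hypothetical, and a real MOM would keep the retained messages in persistent storage.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a topic that supports durable subscriptions.
final class DurableTopicSketch {
    // Messages waiting for each durable subscription ID, kept even while its consumer is offline.
    private final Map<String, Deque<String>> pendingBySubscription = new HashMap<>();

    void createDurableSubscription(String subscriptionId) {
        pendingBySubscription.putIfAbsent(subscriptionId, new ArrayDeque<>());
    }

    /** Removes the subscription, so nothing more is retained for it. */
    void unsubscribe(String subscriptionId) {
        pendingBySubscription.remove(subscriptionId);
    }

    void publish(String message) {
        for (Deque<String> pending : pendingBySubscription.values()) {
            pending.add(message);
        }
    }

    /** Called when the consumer (re)connects: hands over and clears everything retained so far. */
    List<String> receivePending(String subscriptionId) {
        Deque<String> pending = pendingBySubscription.get(subscriptionId);
        if (pending == null) {
            return new ArrayList<>();
        }
        List<String> delivered = new ArrayList<>(pending);
        pending.clear();
        return delivered;
    }

    public static void main(String[] args) {
        DurableTopicSketch topic = new DurableTopicSketch();
        topic.createDurableSubscription("PaulValle");
        topic.publish("match report 1"); // the subscriber is offline, the message is retained
        topic.publish("match report 2");
        System.out.println(topic.receivePending("PaulValle")); // prints "[match report 1, match report 2]"
        topic.unsubscribe("PaulValle");
        topic.publish("match report 3"); // nobody is subscribed, nothing is retained
        System.out.println(topic.receivePending("PaulValle")); // prints "[]"
    }
}
```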

You can filter messages using the messageSelector property. Using the headers and properties of messages, you can specify which messages you want to receive

messageSelector

MessageConsumer consumer = session.createConsumer(destination, "Fragile IS TRUE");

or

activationConfig = {
      @ActivationConfigProperty( propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
      @ActivationConfigProperty( propertyName = "destination", propertyValue ="jms/ShippingRequestTopic"),
      @ActivationConfigProperty( propertyName = "messageSelector", propertyValue ="Fragile IS TRUE")
   }

The most common message selector types are below

Type | Description | Example
Literals | Strings, exact or approximate numeric values, or booleans | 'BidManagerMDB', 100, TRUE
Identifiers | A message property or header name, case sensitive | RECIPIENT, NumOfBids, Fragile, JMSTimestamp
Whitespace | Same as defined in the Java language specification: space, tab, form feed, line terminator | n/a
Comparison Operators | Operators such as: =, >=, <=, <> | RECIPIENT='BidManagerMDB', NumOfBids>=100
Logical Operators | The three types: NOT, OR, AND | RECIPIENT='BidManagerMDB' AND NumOfBids>=100
Null Comparison | IS NULL and IS NOT NULL | FirstName IS NOT NULL
True/False comparison | IS [NOT] TRUE and IS [NOT] FALSE | Fragile IS TRUE, Fragile IS FALSE
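To make the effect of a selector concrete, the sketch below expresses roughly what RECIPIENT='BidManagerMDB' AND NumOfBids>=100 asks of a message's properties, using plain Java predicates. It is not a selector parser and not part of JMS; SelectorSketch and bidManagerWithAtLeast are hypothetical names, and a message is reduced to a map of its properties.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical illustration of selector semantics using predicates over message properties.
final class SelectorSketch {

    /** Roughly equivalent to: RECIPIENT='BidManagerMDB' AND NumOfBids>=minBids */
    static Predicate<Map<String, Object>> bidManagerWithAtLeast(int minBids) {
        Predicate<Map<String, Object>> recipientMatches =
                properties -> "BidManagerMDB".equals(properties.get("RECIPIENT"));
        Predicate<Map<String, Object>> enoughBids = properties -> {
            Object bids = properties.get("NumOfBids");
            // A missing or non-integer property never matches.
            return bids instanceof Integer && (Integer) bids >= minBids;
        };
        return recipientMatches.and(enoughBids);
    }

    public static void main(String[] args) {
        Predicate<Map<String, Object>> selector = bidManagerWithAtLeast(100);
        Map<String, Object> popular = Map.of("RECIPIENT", "BidManagerMDB", "NumOfBids", 150);
        Map<String, Object> quiet = Map.of("RECIPIENT", "BidManagerMDB", "NumOfBids", 20);
        Map<String, Object> noBidCount = Map.of("RECIPIENT", "BidManagerMDB");
        System.out.println(selector.test(popular));    // prints "true"
        System.out.println(selector.test(quiet));      // prints "false"
        System.out.println(selector.test(noBidCount)); // prints "false"
    }
}
```

Property names are compared exactly, which mirrors the case sensitivity of identifiers noted in the table.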

MDB Lifecycle

The container will be responsible for the following

The MDBs can be in one of three states: does not exist, idle and busy

MDBs have two lifecycle callbacks, @PostConstruct and @PreDestroy. As with session beans, these can be used for allocating and releasing injected resources that are used by the onMessage method. Both callbacks appear in the consumer example above.
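The three states and the two callbacks can be pictured as a small state machine. The sketch below is a simplified model under my own assumptions about ordering (resources are injected before @PostConstruct, and an instance is destroyed only while idle); MdbLifecycleSketch is a hypothetical class and no container is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one MDB instance as seen by the container.
final class MdbLifecycleSketch {
    enum State { DOES_NOT_EXIST, IDLE, BUSY }

    private State state = State.DOES_NOT_EXIST;
    private final List<String> events = new ArrayList<>();

    /** Container creates the instance and places it in the pool. */
    void create() {
        require(State.DOES_NOT_EXIST);
        events.add("inject resources");
        events.add("@PostConstruct");
        state = State.IDLE;
    }

    /** Container takes the idle instance from the pool to handle one message. */
    void onMessage(String message) {
        require(State.IDLE);
        state = State.BUSY;
        events.add("onMessage(" + message + ")");
        state = State.IDLE; // returned to the pool
    }

    /** Container removes the instance from the pool. */
    void destroy() {
        require(State.IDLE);
        events.add("@PreDestroy");
        state = State.DOES_NOT_EXIST;
    }

    State state() { return state; }

    List<String> events() { return events; }

    private void require(State expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }

    public static void main(String[] args) {
        MdbLifecycleSketch bean = new MdbLifecycleSketch();
        bean.create();
        bean.onMessage("shipping request");
        bean.destroy();
        // prints "[inject resources, @PostConstruct, onMessage(shipping request), @PreDestroy]"
        System.out.println(bean.events());
    }
}
```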

MDB Best Practices

MDBs do have some pitfalls and some best practices to watch out for, here are a few tips