PubSub Messaging with Spring Data Redis

1. Overview

In this second article in the series exploring Spring Data Redis, we’ll look at pub/sub message queues.

In Redis, publishers are not programmed to send their messages to specific subscribers. Rather, published messages are categorized into channels, without knowledge of what (if any) subscribers there may be.

Similarly, subscribers express interest in one or more channels and only receive messages published to those channels, without knowledge of what (if any) publishers there are.

This decoupling of publishers and subscribers can allow for greater scalability and a more dynamic network topology.
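To make the decoupling concrete, here is a minimal in-memory sketch in plain Java. It is not Redis and not part of Spring Data Redis: the InMemoryPubSub class and its methods are hypothetical names used only for illustration. The publisher hands a message to a named channel and learns only how many subscribers received it, which mirrors the count that the Redis PUBLISH command returns.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a pub/sub broker; not a Spring or Redis class.
class InMemoryPubSub {

    // Channel name -> callbacks of the subscribers currently listening on it.
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // Registers a subscriber callback on a channel.
    void subscribe(String channel, Consumer<String> subscriber) {
        subscribers.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(subscriber);
    }

    // Delivers the message to every current subscriber of the channel and
    // returns how many received it. The publisher never sees who they are.
    int publish(String channel, String message) {
        List<Consumer<String>> listeners =
          subscribers.getOrDefault(channel, Collections.emptyList());
        listeners.forEach(listener -> listener.accept(message));
        return listeners.size();
    }
}
```

Publishing to a channel that nobody listens on simply reaches zero subscribers; the message is not stored for later delivery.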

2. Redis Configuration

Let’s start by adding the configuration required for the message queues.

First, we’ll define a MessageListenerAdapter bean that wraps RedisMessageSubscriber, our custom implementation of the MessageListener interface. This bean acts as the subscriber in the pub/sub messaging model:

@Bean
MessageListenerAdapter messageListener() {
    return new MessageListenerAdapter(new RedisMessageSubscriber());
}

RedisMessageListenerContainer is a class provided by Spring Data Redis that gives Redis message listeners asynchronous behavior. It is called internally and, according to the Spring Data Redis documentation, “handles the low level details of listening, converting and message dispatching.” We register our listener and topic with it:

@Bean
RedisMessageListenerContainer redisContainer() {
    RedisMessageListenerContainer container
      = new RedisMessageListenerContainer();
    container.setConnectionFactory(jedisConnectionFactory());
    container.addMessageListener(messageListener(), topic());
    return container;
}

We will also create a bean using a custom-built MessagePublisher interface and a RedisMessagePublisher implementation. This way, we can have a generic message-publishing API, and have the Redis implementation take a redisTemplate and topic as constructor arguments:

@Bean
MessagePublisher redisPublisher() {
    return new RedisMessagePublisher(redisTemplate(), topic());
}

Finally, we’ll set up a topic to which the publisher will send messages, and the subscriber will receive them:

@Bean
ChannelTopic topic() {
    return new ChannelTopic("messageQueue");
}
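The bean methods above call jedisConnectionFactory() and redisTemplate(), which this section does not define. The following is one possible sketch of those two beans, not necessarily the configuration the example project uses. The class name RedisConfig, the default localhost connection, and the choice of GenericToStringSerializer are all assumptions.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericToStringSerializer;

@Configuration
public class RedisConfig {

    // Assumption: Redis is reachable with the factory's defaults (localhost:6379).
    @Bean
    JedisConnectionFactory jedisConnectionFactory() {
        return new JedisConnectionFactory();
    }

    @Bean
    RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(jedisConnectionFactory());
        // Assumption: publish values as plain strings rather than JDK-serialized bytes.
        template.setValueSerializer(new GenericToStringSerializer<>(Object.class));
        return template;
    }
}
```

The messageListener(), redisContainer(), redisPublisher() and topic() beans shown earlier would live in the same configuration class.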

3. Publishing Messages


3.1. Defining the MessagePublisher Interface

Spring Data Redis does not provide a MessagePublisher interface for message distribution. We can define a custom interface whose implementation will use redisTemplate:

public interface MessagePublisher {
    void publish(String message);
}
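One benefit of keeping the interface this small is that callers can be exercised without a Redis server. As a sketch, a hypothetical RecordingMessagePublisher (not part of the article's project) could stand in for the Redis-backed implementation in unit tests. The interface is repeated here only so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Same single-method contract as the interface above, repeated for self-containment.
interface MessagePublisher {
    void publish(String message);
}

// Hypothetical test double: records messages instead of sending them to Redis.
class RecordingMessagePublisher implements MessagePublisher {

    private final List<String> published = new ArrayList<>();

    @Override
    public void publish(String message) {
        published.add(message);
    }

    // Read-only view of everything published so far, in order.
    List<String> published() {
        return Collections.unmodifiableList(published);
    }
}
```

Code that depends only on MessagePublisher does not need to change when the recording implementation is swapped for the Redis one.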

3.2. RedisMessagePublisher Implementation

Our next step is to provide an implementation of the MessagePublisher interface that handles the publishing details using the methods of redisTemplate.

The template offers a rich set of methods for a wide range of operations. Among them, convertAndSend sends a message to the channel identified by a topic:

public class RedisMessagePublisher implements MessagePublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private ChannelTopic topic;

    public RedisMessagePublisher() {
    }

    public RedisMessagePublisher(
      RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) {
        this.redisTemplate = redisTemplate;
        this.topic = topic;
    }

    @Override
    public void publish(String message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

As you can see, the publisher implementation is straightforward. It uses the convertAndSend() method of the redisTemplate to serialize the given message and publish it to the configured topic.

A topic implements publish and subscribe semantics: when a message is published, it goes to all the subscribers who are registered to listen on that topic.

4. Subscribing to Messages

RedisMessageSubscriber implements the Spring Data Redis-provided MessageListener interface:

@Service
public class RedisMessageSubscriber implements MessageListener {

    public static List<String> messageList = new ArrayList<>();

    @Override
    public void onMessage(Message message, byte[] pattern) {
        messageList.add(message.toString());
        System.out.println("Message received: " + message);
    }
}

Note that there is a second parameter called pattern, which we have not used in this example. The Spring Data Redis documentation states that this parameter represents the “pattern matching the channel (if specified)” and that it can be null.
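The pattern parameter matters when a listener subscribes to a channel pattern rather than a single channel; Spring Data Redis provides a PatternTopic class for that purpose. Redis channel patterns are glob-style, and the matching happens on the Redis server. The sketch below only illustrates the idea in plain Java: ChannelPatterns is a hypothetical helper, and it handles just the '*' and '?' wildcards.

```java
import java.util.regex.Pattern;

// Hypothetical helper illustrating glob-style channel matching.
// Redis performs the real matching server-side; this is only an illustration.
class ChannelPatterns {

    // Supports only '*' (any run of characters) and '?' (exactly one character).
    static boolean matches(String glob, String channel) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                // Quote every other character so it is matched literally.
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL).matcher(channel).matches();
    }
}
```

Under this simplified matching, a pattern such as message* covers the messageQueue channel used in this article, and any other channel whose name starts with message.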

5. Sending and Receiving Messages

Now we’ll put it all together. Let’s create a message and then publish it using the RedisMessagePublisher:

String message = "Message " + UUID.randomUUID();
redisMessagePublisher.publish(message);

When we call publish(message), the content is sent to Redis, where it is routed to the message queue topic defined in our publisher. Then it is distributed to the subscribers of that topic.

You may already have noticed that RedisMessageSubscriber is a listener. The RedisMessageListenerContainer we configured registers it on the topic so that it can receive messages.

When a message arrives, the subscriber’s onMessage() method is triggered.

In our example, we can verify that we’ve received messages that have been published by checking the messageList in our RedisMessageSubscriber:

RedisMessageSubscriber.messageList.get(0).contains(message)
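Because the listener container delivers messages asynchronously, a check made immediately after publish() may run before onMessage() has been called. One way to handle this in a test is to wait for the delivery with a CountDownLatch. The sketch below is plain Java with hypothetical names (AwaitableSubscriber, AsyncDeliveryDemo); a single background thread stands in for the container, and the two-second timeout is an arbitrary assumption.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical subscriber that lets a caller wait for the first delivery.
class AwaitableSubscriber {

    // Thread-safe list, since delivery happens on another thread.
    private final List<String> received = new CopyOnWriteArrayList<>();
    private final CountDownLatch firstMessage = new CountDownLatch(1);

    // Plays the role of onMessage(): called on a listener thread.
    void onMessage(String message) {
        received.add(message);
        firstMessage.countDown();
    }

    // Blocks until a message arrives or the timeout elapses.
    boolean awaitFirst(long timeout, TimeUnit unit) throws InterruptedException {
        return firstMessage.await(timeout, unit);
    }

    List<String> received() {
        return received;
    }
}

class AsyncDeliveryDemo {

    // Simulates asynchronous delivery: a background thread stands in
    // for the listener container, and the caller waits on the latch.
    static List<String> publishAndWait(String message) {
        AwaitableSubscriber subscriber = new AwaitableSubscriber();
        ExecutorService listenerThread = Executors.newSingleThreadExecutor();
        try {
            listenerThread.execute(() -> subscriber.onMessage(message));
            if (!subscriber.awaitFirst(2, TimeUnit.SECONDS)) {
                throw new IllegalStateException("No message received within 2 seconds");
            }
            return subscriber.received();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            listenerThread.shutdown();
        }
    }
}
```

The same latch idea could be applied inside a real MessageListener so that a test only inspects the received messages once one has actually arrived.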

6. Conclusion

In this article, we examined a pub/sub message queue implementation using Spring Data Redis.

The implementation of the above example can be found in a GitHub project.
