Understanding of Hazelcast QueueStore

Hazelcast is well known for its in-memory data and platform, which include various distributed data structures such as maps, sets, and queues. Among these, Hazelcast’s distributed queues offer a strong mechanism for managing data and messages across a cluster of nodes. To enhance the performance of these queues, Hazelcast provides an important interface called Hazelcast QueueStore, which integrates in-memory queues with persistent storage.

Hazelcast QueueStore

What is Hazelcast QueueStore?

Hazelcast QueueStore API is an interface that allows you to connect a Hazelcast distributed in-memory queue to an external store, such as a database or a file system. By using this, it can ensure that the messages in your queue are not lost even if cluster shuts down or restarts.

Key Features of QueueStore

  • Persistence: QueueStore provides a feature to persist the messages in to persistence layer and ensures that data or messages should not be loss if the cluster fails.
  • Durability:  As messages are stored on persistence storage, with QueueStore, you can rely on the durability of queue operations.
  • Customizable Storage Logic: QueueStore allows you to customize the implementation, the custom storage logic, allowing you to choose the best storage as needed, whether it’s a relational database, NoSQL database, or even a simple file system.

How Does QueueStore Work?

When you configure a Hazelcast queue with a QueueStore, the queue operations interact with the persistent storage as follows:

  • Removing Data: When an item is removed from the queue, the delete method of QueueStore ensures that the item is also removed from the persistent storage, maintaining consistency.
  • Storing Data: When an item is added to the queue, the store method of QueueStore is called to save this item in the persistent storage.
  • Loading Data: When the queue is accessed, the QueueStore can retrieve items from the persistent store using the load method. This ensures that even after a system restart, the queue can continue processing the remaining items.
IQueue<String> iQueue = hazelcastInstance.getQueue("employee_queue");
iQueue.add("Test");
iQueue.remove("Test");

Hazelcast QueueStore Method:

  • T load(Long key) – Loads the value from Datastore to Queue for an specific key .
  • Map<Long,T> loadAll(Collection<Long> keys) – Loads the given multiple keys.
  • Set<Long> loadAllKeys() – Loads all of the keys from the database store.
  • void store(Long key, T value) – Stores the key-value pair to the datastore.
  • void storeAll(Map<Long,T> map) – Stores the multiple entries.
  • void delete(Long key) – Deletes the entry with a given key from the store.
  • void deleteAll(Collection<Long> keys) – Deletes multiple entries from the store.

QueueStore configurations

<hazelcast>
    <queue name="myQueue">
        <queue-store enabled="true">
            <class-name>com.mycompany.MyQueueStore</class-name>
            <properties>
               <property name="binary">false</property>
               <property name="memory-limit">1000</property>
               <property name="bulk-load">500</property>
           </properties>
        </queue-store>
    </queue>
</hazelcast>

Custom Hazelcast QueueStore implementation

QueueStore allow the user to customize the implementation. You can create database connection to connect internal datasource.

@Slf4j
public class DemoQueueStore implements QueueStore&lt;Employee&gt; {
    ConnectionPool pool = new HikariDataSourcePool();

    /**
     * Employee Queue has 2 column empId as key of Q and Data into Json format
     * @param empId
     * @param employee
     */
    @Override
    public void store(Long empId, Employee employee) {
        String storeQuery = "INSERT INTO EMPLOYEE_UPDATED_QUEUE(EMPID, DATA) VALUES(?, to_json(?::jsonb)";
        try (Connection connection = pool.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(storeQuery)) {
            preparedStatement.setInt(1, employee.getEmpId());
            preparedStatement.setString(2, new Gson().toJson(employee));

            preparedStatement.executeUpdate();

        } catch (Exception exception) {
            log.error("Exception : {}", exception);
            throw new DatabaseSQLException(exception.getMessage());
        }
    }

    @Override
    public void storeAll(Map&lt;Long, Employee&gt; map) {
    }

    @Override
    public void delete(Long aLong) {
    }

    @Override
    public void deleteAll(Collection&lt;Long&gt; collection) {
    }

    @Override
    public Employee load(Long aLong) {
        return null;
    }

    @Override
    public Map&lt;Long, Employee&gt; loadAll(Collection&lt;Long&gt; collection) {
        return Map.of();
    }

    @Override
    public Set&lt;Long&gt; loadAllKeys() {
        return Set.of();
    }
}

Conclusion

Hazelcast QueueStore is similar like MapStore a efficient tool that enhances the reliability and durability of distributed queues by integrating them with persistent storage. By implementing QueueStore, you can safeguard your application against data loss, ensuring that tasks and messages are preserved even in the event of failures. This makes QueueStore an essential feature for any distributed system that requires both the speed of in-memory processing and the safety of persistent storage.The code snippet available on github.

2 thoughts on “Understanding of Hazelcast QueueStore”

  1. Pingback: Hazelcast Hikari connection pooling

  2. Pingback: Understanding Hazelcast PN Counter: A Comprehensive Guide - Java Tech ARC 3i

Leave a Comment

Your email address will not be published. Required fields are marked *

Index
Scroll to Top