Hazelcast is well known for its in-memory data grid and platform, which includes distributed data structures such as maps, sets, and queues. Among these, Hazelcast’s distributed queues offer a strong mechanism for managing data and messages across a cluster of nodes. To make these queues durable, Hazelcast provides an interface called QueueStore, which integrates in-memory queues with persistent storage.
What is Hazelcast QueueStore?
The Hazelcast QueueStore API is an interface that lets you connect a Hazelcast distributed in-memory queue to an external store, such as a database or a file system. With a QueueStore in place, the messages in your queue are not lost even if the cluster shuts down or restarts.
Key Features of QueueStore
- Persistence: QueueStore persists queue messages to a persistence layer, so data is not lost if the cluster fails.
- Durability: Because messages are written to persistent storage, you can rely on the durability of queue operations.
- Customizable Storage Logic: QueueStore is an interface that you implement yourself, so you can choose the storage that fits your needs, whether it’s a relational database, a NoSQL database, or even a simple file system.
How Does QueueStore Work?
When you configure a Hazelcast queue with a QueueStore, the queue operations interact with the persistent storage as follows:
- Removing Data: When an item is removed from the queue, the delete method of QueueStore ensures that the item is also removed from the persistent storage, maintaining consistency.
- Storing Data: When an item is added to the queue, the store method of QueueStore is called to save this item in the persistent storage.
- Loading Data: When the queue is accessed, the QueueStore can retrieve items from the persistent store using the load method. This ensures that even after a system restart, the queue can continue processing the remaining items.
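// Obtain the distributed queue; with a QueueStore configured, its operations are mirrored to the store.
IQueue<String> iQueue = hazelcastInstance.getQueue("employee_queue");
iQueue.add("Test");    // triggers QueueStore.store(...)
iQueue.remove("Test"); // triggers QueueStore.delete(...)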
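To make the sequence above concrete, here is a minimal sketch of the same write-through idea that runs without Hazelcast. The class WriteThroughQueue and the TreeMap standing in for the external store are hypothetical names invented for this illustration. It is not how Hazelcast is implemented internally; it only mirrors the order of the store, delete, and load steps described above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

/** Toy write-through queue (illustration only, not Hazelcast code). */
class WriteThroughQueue {
    // Stand-in for the external store: item id -> item, sorted by id.
    private final TreeMap<Long, String> backingStore;
    // In-memory view of the queue, in insertion order.
    private final Deque<Map.Entry<Long, String>> memory = new ArrayDeque<>();
    private long nextId;

    WriteThroughQueue(TreeMap<Long, String> backingStore) {
        this.backingStore = backingStore;
        // "Loading data": rebuild the in-memory queue from whatever survived in the store.
        for (Map.Entry<Long, String> row : backingStore.entrySet()) {
            memory.addLast(Map.entry(row.getKey(), row.getValue()));
        }
        nextId = backingStore.isEmpty() ? 0L : backingStore.lastKey() + 1;
    }

    void offer(String item) {
        long id = nextId++;
        backingStore.put(id, item);          // "storing data": persist first
        memory.addLast(Map.entry(id, item)); // then make it visible in memory
    }

    String poll() {
        Map.Entry<Long, String> head = memory.pollFirst();
        if (head == null) {
            return null;                     // queue is empty
        }
        backingStore.remove(head.getKey());  // "removing data": keep the store consistent
        return head.getValue();
    }

    int size() {
        return memory.size();
    }
}

class WriteThroughQueueDemo {
    public static void main(String[] args) {
        TreeMap<Long, String> disk = new TreeMap<>();
        WriteThroughQueue queue = new WriteThroughQueue(disk);
        queue.offer("a");
        queue.offer("b");
        queue.offer("c");
        queue.poll(); // consumes "a" and deletes it from the store

        // Simulate a restart: a new queue instance is built over the same store.
        WriteThroughQueue restarted = new WriteThroughQueue(disk);
        System.out.println(restarted.poll()); // prints b
    }
}
```

The point of the sketch is that the remaining items ("b" and "c") are still available after the simulated restart because every add and remove was mirrored to the store.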
Hazelcast QueueStore Methods
- T load(Long key) – Loads the value for a specific key from the datastore into the queue.
- Map<Long,T> loadAll(Collection<Long> keys) – Loads the values for the given keys.
- Set<Long> loadAllKeys() – Loads all of the keys from the database store.
- void store(Long key, T value) – Stores the key-value pair to the datastore.
- void storeAll(Map<Long,T> map) – Stores multiple entries.
- void delete(Long key) – Deletes the entry with a given key from the store.
- void deleteAll(Collection<Long> keys) – Deletes multiple entries from the store.
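As a rough illustration of what each of these seven methods is expected to do, the sketch below implements them against a plain in-memory map. SimpleQueueStore is a hypothetical local interface that only mirrors the signatures listed above so the example compiles without Hazelcast on the classpath; in a real project you would implement Hazelcast's own QueueStore interface instead. InMemoryQueueStore is likewise an invented name, and an in-memory map is of course not persistent.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in that mirrors the seven methods listed above. */
interface SimpleQueueStore<T> {
    void store(Long key, T value);
    void storeAll(Map<Long, T> map);
    void delete(Long key);
    void deleteAll(Collection<Long> keys);
    T load(Long key);
    Map<Long, T> loadAll(Collection<Long> keys);
    Set<Long> loadAllKeys();
}

/** Keeps every entry in a map; a real store would talk to a database or file. */
class InMemoryQueueStore<T> implements SimpleQueueStore<T> {
    private final Map<Long, T> rows = new ConcurrentHashMap<>();

    @Override
    public void store(Long key, T value) {
        rows.put(key, value);
    }

    @Override
    public void storeAll(Map<Long, T> map) {
        rows.putAll(map);
    }

    @Override
    public void delete(Long key) {
        rows.remove(key);
    }

    @Override
    public void deleteAll(Collection<Long> keys) {
        for (Long key : keys) {
            rows.remove(key);
        }
    }

    @Override
    public T load(Long key) {
        return rows.get(key); // null when the key is unknown
    }

    @Override
    public Map<Long, T> loadAll(Collection<Long> keys) {
        Map<Long, T> found = new HashMap<>();
        for (Long key : keys) {
            T value = rows.get(key);
            if (value != null) {
                found.put(key, value); // unknown keys are simply skipped
            }
        }
        return found;
    }

    @Override
    public Set<Long> loadAllKeys() {
        return new TreeSet<>(rows.keySet()); // sorted copy of the stored keys
    }
}
```

Swapping the map for JDBC calls or file I/O gives a persistent variant with the same shape.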
QueueStore Configuration
<hazelcast>
    <queue name="myQueue">
        <queue-store enabled="true">
            <class-name>com.mycompany.MyQueueStore</class-name>
            <properties>
                <property name="binary">false</property>
                <property name="memory-limit">1000</property>
                <property name="bulk-load">500</property>
            </properties>
        </queue-store>
    </queue>
</hazelcast>
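The same settings can also be expressed programmatically. The following is a minimal sketch, assuming Hazelcast 4.x or 5.x on the classpath (in 3.x, IQueue lives in com.hazelcast.core) and assuming that com.mycompany.MyQueueStore from the XML above exists and can be loaded by the member. The class name QueueStoreConfigExample is made up for this example.

```java
import com.hazelcast.collection.IQueue;
import com.hazelcast.config.Config;
import com.hazelcast.config.QueueStoreConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class QueueStoreConfigExample {
    public static void main(String[] args) {
        // Mirrors the <queue-store> element of the XML configuration.
        QueueStoreConfig storeConfig = new QueueStoreConfig()
                .setEnabled(true)
                .setClassName("com.mycompany.MyQueueStore") // assumed to be on the classpath
                .setProperty("binary", "false")
                .setProperty("memory-limit", "1000")
                .setProperty("bulk-load", "500");

        // Attach the store configuration to the queue named "myQueue".
        Config config = new Config();
        config.getQueueConfig("myQueue").setQueueStoreConfig(storeConfig);

        // Start a member with this configuration and obtain the queue.
        HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
        IQueue<String> queue = instance.getQueue("myQueue");
        queue.add("Test"); // persisted through MyQueueStore.store(...)

        instance.shutdown();
    }
}
```

As far as the Hazelcast documentation describes them, binary controls whether items are handed to the store in serialized form, memory-limit is the number of items kept in memory before further items live only in the store, and bulk-load is the batch size used when the queue is initialized from the store. Check the reference manual for your Hazelcast version before relying on the exact defaults.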
Custom Hazelcast QueueStore Implementation
QueueStore lets you supply your own implementation. For example, you can obtain a database connection from a connection pool and persist the queue items to an internal datasource.
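import com.google.gson.Gson;
import com.hazelcast.collection.QueueStore; // Hazelcast 4.x and later; 3.x uses com.hazelcast.core.QueueStore
import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Collection;
import java.util.Map;
import java.util.Set;

// Employee, ConnectionPool, HikariDataSourcePool and DatabaseSQLException are application classes.
@Slf4j
public class DemoQueueStore implements QueueStore<Employee> {

    private final ConnectionPool pool = new HikariDataSourcePool();
    private final Gson gson = new Gson();

    /**
     * The EMPLOYEE_UPDATED_QUEUE table has two columns: EMPID, the key of the queue item,
     * and DATA, the employee serialized as JSON.
     *
     * @param empId    key of the queue item
     * @param employee item to persist
     */
    @Override
    public void store(Long empId, Employee employee) {
        String storeQuery = "INSERT INTO EMPLOYEE_UPDATED_QUEUE(EMPID, DATA) VALUES(?, to_json(?::jsonb))";
        try (Connection connection = pool.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(storeQuery)) {
            // Persist under the key passed in, so delete(key) and load(key) can find the row later.
            preparedStatement.setLong(1, empId);
            preparedStatement.setString(2, gson.toJson(employee));
            preparedStatement.executeUpdate();
        } catch (Exception exception) {
            log.error("Failed to store employee with key {}", empId, exception);
            throw new DatabaseSQLException(exception.getMessage());
        }
    }

    // The remaining methods are left as stubs in this example.

    @Override
    public void storeAll(Map<Long, Employee> map) {
    }

    @Override
    public void delete(Long key) {
    }

    @Override
    public void deleteAll(Collection<Long> keys) {
    }

    @Override
    public Employee load(Long key) {
        return null;
    }

    @Override
    public Map<Long, Employee> loadAll(Collection<Long> keys) {
        return Map.of();
    }

    @Override
    public Set<Long> loadAllKeys() {
        return Set.of();
    }
}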
Conclusion
Hazelcast QueueStore, like MapStore, is an efficient tool that enhances the reliability and durability of distributed queues by integrating them with persistent storage. By implementing QueueStore, you can safeguard your application against data loss, ensuring that tasks and messages are preserved even in the event of failures. This makes QueueStore an essential feature for any distributed system that requires both the speed of in-memory processing and the safety of persistent storage. The code snippet is available on GitHub.