Introduction
In modern applications, building a data pipeline with Hazelcast is important and data pipelines are one of the most critical systems in the whole applications architecture. Data pipelines are responsible for processing and transforming raw data into a format that can be used for analysis.
However, it is not that simple to build a useful data pipeline. It is very challenging. There are many challenges like this:
- Scalability: A very high volume of data can be processed, so the system has to be scaled.
- Latency: Lower latency is important for real-time processing.
- Reliability — It is critical to prevent data inaccessibility or data loss.
This is exactly where Hazelcast becomes a disruptive tool. Hazelcast is a high-speed, low-latency distributed in-memory computing platform. It is because of this architecture that this framework is designed for stream processing in real-time and can be easily vertical as well as horizontal in nature.

In building a data pipeline with Hazelcast article, we are going to build a real-time data pipeline using Hazelcast. Overall this pipeline will streamline the data process and prepare it for some business-critical use cases.
Understanding more details about Hazelcast and what does it do!
Understanding Hazelcast
Hazelcast provides in-memory data grid and real-time processing and stream the data. Hazelcast as data platform designed for processing the data fast and scalable, which is necessary in today’s enterprise application
Key Features of Hazelcast:
1. In-memory Data Processing: Hazelcast stores data in memory into distributed nodes instead of disk, therefore read and write operations are very fast.
2. Distributed Architecture: Hazelcast uses the clustering approach to store data, where one or more nodes are connected as members. That means the system will remain operational if a single node fails.
3. Stream Processing (Hazelcast Jet): Hazelcast Jet is a built-in stream processing engine that real-time data streaming. It simplifies complex data workflows by using Directed Acyclic Graphs (DAGs).
4. Scalability and Fault Tolerance: Hazelcast storage is easily scaled up when data volume increases and automatically fault tolerance is configured in case of failure.
Hazelcast Use Cases
Hazelcast can be used in many different scenarios, such as:
- Real-Time Analytics: for log analysis and event detection.
- Caching: boost the performance of slow databases.
- Distributed Messaging: in high-throughput messaging systems.
- Machine Learning Pipelines: for the real-time data enrichment and ML model integration.
Now we will see how Hazelcast can be made the core of a data pipeline. In the next sections we will cover setup and implementation details.
Designing a Data Pipeline
In building a data pipeline with Hazelcast, Hazelcast data pipeline is an asynchronous streaming process, similar to Java Streaming which was introduced in the Java8 release. To design a data pipeline, you need to do much analysis and planning about the different operations and phases of the pipeline. The main goal of the pipeline is to convert the data from raw to meaningful and usable format.
In this section, we will explore the core components of Pipeline and designing and building a data pipeline with Hazelcast.
Core Components of a Data Pipeline
1. Data Ingestion
Hazelcast provides in-built features to integrate tools like Kafka, MQTT, aur REST APIs. These are the source of data to Hazelcast Pipeline to process the real-time data stream.
2. Data Transformation and Process Efficiently
- Hazelcast Jet uses the Directed Acyclic Graph(DAG) model that processes the data in parallel and distributed manner.
- You can use the Hazelcast SQL for data transformation and aggregation easily into required format.
3. Data Storage and Persistence
- Hazelcast provides in-memory and low latency distributed data storage
- Hazelcast ensures the data replication and fault tolerance, to avoid the data loss incase of failure.
4. Data Analysis and Consumption
- Processed data can be shared via event driven systems or by using API.
- Hazelcast provides the integration tools and libraries to Sink the processed data.
Example Data Pipeline Design
Let’s take the example of Real-Time Weather Monitoring System:
- Data Ingestion: IoT devices collect the weather data and push it into Kafka topics.
- Data Transformation: Hazelcast Jet filter and aggregate the data and calculate the average temperature.
- Data Storage: Persist the processed data into Hazelcast map and data store.
- Data Consumption: Last stage of data pipeline visualization or Sink the data real-time update visualization into Grafana dashboard.

You can customize the Hazelcast data pipeline as required, in the next section we will explore the programing example of log monitoring process.
Build a Hazelcast Pipeline: Real-time Log Monitoring
Another example to use Hazelcast data pipeline is real-time log monitoring. Lets explore the Hazelcast data pipeline integration and process the data step-by-step.
1. Setting up Hazelcast
Add maven dependency on your project pom.xml or Gradle file.
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>5.5.2</version> <!--Add the latest version-->
</dependency>
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-jet</artifactId>
<version>4.5.4</version>
</dependency>
Gradle file dependency
implementation 'com.hazelcast:hazelcast:5.5.0'
implementation 'com.hazelcast:hazelcast:4.5.4'
Explore more about Hazelcast setup on below articles:
- Understanding of Hazelcast QueueStore
- Understanding of Hazelcast MapLoader
- Understanding of Hazelcast MapStore
- How to Start Hazelcast Members and Client?
2. Enable jet and start Hazelcast instance
There are multiple other ways to enable Jet Engine; here, we are using programming Config
. Then, start theHazelcast instance.
Config config = new Config();
config.getJetConfig().setEnabled(true); // Enable the Jet engine
// Step 1: Start Hazelcast Jet Instance
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config);
JetService jetService = hazelcastInstance.getJet();
3. Create a Sample Data Source
Create a sample data with a list of logs, in our example, we have taken multiple log levels e.g. INFO, ERROR, etc. which can be ingested from Kafka, MQTT, or REST APIs.
private List<String> getLogData() {
return Arrays.asList(
"INFO - Service started",
"ERROR - Database connection failed",
"INFO - User login successful",
"ERROR - Timeout while calling API",
"ERROR - NullPointerException at line 45"
);
}
// Step 2: Create Data Source
List<String> inputData = hazelcastInstance.getList(LIST_NAME);
inputData.addAll(getLogData());
4. Define Data Processing Pipeline
- Create a sample pipeline
- Ingest data into the pipeline : get the data from list
Sources.list(LIST_NAME)
- Process data stream : filter the data which contains the
“ERROR”
- Sink the data for visualization : Put the data into
Sinks.map(MAP_NAME
private Pipeline createPipeline(List<String> inputData) {
System.out.println(inputData);
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(Sources.list(LIST_NAME))
.filter(line -> line.toString().contains("ERROR")) // Filter for ERROR logs
.map(line -> line.toString().split(" - ")[1]) // Extract the error message
.groupingKey(errorMessage -> errorMessage) // Group by error message
.aggregate(AggregateOperations.counting()) // Count occurrences
.writeTo(Sinks.map(MAP_NAME)); // Store in Hazelcast Map
return pipeline;
}
// Step 3: Define the Pipeline
Pipeline pipeline = createPipeline(inputData);
5. Execute the Pipeline
After the pipeline creation, add job into process and execute the pipeline by using join()
// Step 4: Execute the Pipeline
jetService.newJob(pipeline).join();
6. Print the output result
Visualize the data – here we are getting data from map printing it for demo.
/ Step 5: Fetch Results from Hazelcast Map
System.out.println("Processed Error Counts:");
hazelcastInstance.getMap(MAP_NAME).forEach((key, value) ->
System.out.println(key + " -> " + value));
Complete Sample Code
The complete code to filter the Error and data visualization for log processing is given below.
package com.javatecharc.demo.pipeline;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetService;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import java.util.Arrays;
import java.util.List;
public class HazelcastPipelineDemo {
private static final String LIST_NAME = "errorList";
private static final String MAP_NAME = "errorMap";
public static void main(String[] args) {
HazelcastPipelineDemo hazelcastPipelineDemo = new HazelcastPipelineDemo();
hazelcastPipelineDemo.processDataPipeline();
}
private void processDataPipeline() {
Config config = new Config();
config.getJetConfig().setEnabled(true); // Enable the Jet engine
// Step 1: Start Hazelcast Jet Instance
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config);
JetService jetService = hazelcastInstance.getJet();
// Step 2: Create Data Source
List<String> inputData = hazelcastInstance.getList(LIST_NAME);
inputData.addAll(getLogData());
// Step 3: Define the Pipeline
Pipeline pipeline = createPipeline(inputData);
// Step 4: Execute the Pipeline
jetService.newJob(pipeline).join();
// Step 5: Fetch Results from Hazelcast Map
System.out.println("Processed Error Counts:");
hazelcastInstance.getMap(MAP_NAME).forEach((key, value) ->
System.out.println(key + " -> " + value));
//Step 6: Shutdown the hazelcast Instance
Hazelcast.shutdownAll();
}
private List<String> getLogData() {
return Arrays.asList(
"INFO - Service started",
"ERROR - Database connection failed",
"INFO - User login successful",
"ERROR - Timeout while calling API",
"ERROR - NullPointerException at line 45"
);
}
private Pipeline createPipeline(List<String> inputData) {
System.out.println(inputData);
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(Sources.list(LIST_NAME))
.filter(line -> line.toString().contains("ERROR")) // Filter for ERROR logs
.map(line -> line.toString().split(" - ")[1]) // Extract the error message
.groupingKey(errorMessage -> errorMessage) // Group by error message
.aggregate(AggregateOperations.counting()) // Count occurrences
.writeTo(Sinks.map(MAP_NAME)); // Store in Hazelcast Map
return pipeline;
}
}
Best Practices
Building a data pipeline with Hazelcast is one of the scalable and reliable pipelines to process data streaming. If you follow some best practices then it is more reliable and efficient. Some important tips to design the pipeline.
- Optimize the Hazelcast configuration: Configure the cluster size as per the workload, and use the portioning strategy to improve the performance and workload. Configure the backup data store to avoid the data loss in case of failure.
- Use Hazelcast Jet streaming processing: Use Directed Acyclic Graphs (DAGs) to optimize the fast and reliable processing, implement the streaming processing and aggregation.
- Monitor and Scale: We can integrate Prometheus and Grafana to monitor system metrics visualization.
- Ensure Fault Tolerance: Use job resilience checkpoints and data replication strategy.
- Optimize resource utilization: Use thread pool tuning and memory management to optimize resource utilization.
Conclusion
In this article, we learned the step-by-step guide to building a data pipeline with Hazelcast in an efficient and scalable manner. Hazelcast provides in-memory data storage and Jet features to process, stream, and visualize real-time data as needed.
The sample code is available over the GitHub.