gRPC Bidirectional Streaming in Java: A Complete Guide

Overview

gRPC is a high-performance RPC framework developed by Google, enabling efficient communication between services in different programming languages. One of its standout features is bidirectional streaming, allowing both the client and server to send a sequence of messages to each other. This guide will help you set up and implement gRPC bidirectional streaming in Java.

In gRPC bidirectional streaming in java, both the client and server can send and receive streams of messages simultaneously. This pattern allows for full-duplex communication, making it perfect for use cases such as Messaging applications, real-time collaborative systems, or live video streaming where both parties continuously send and receive data.

gRPC Bidirectional Streaming in Java

In this article, we will explore how to implement a gRPC bidirectional streaming service in Java with an example.

How Bidirectional Streaming Works

  1. The client and server establish a stream.
  2. Both client and server send messages on the stream independently.
  3. The stream remains open, allowing both parties to send multiple messages until one side closes the connection.

Pre-Requisites

Example Use Case

Consider a real-time Messaging application where the client and server continuously exchange messages. Each client can send and receive messages, and both streams remain active during the conversation.

Steps to Implement gRPC Bidirectional Streaming in Java

1. Create Maven Project in IDE

Create Maven project using IDE or Maven command

Maven Project Structure

2. Maven Dependency Management

Include the necessary gRPC and protobuf dependencies on your pom.xml or build.gradle

<dependencies>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty</artifactId>
        <version>1.50.0</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-protobuf</artifactId>
        <version>1.50.0</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-stub</artifactId>
        <version>1.50.0</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.21.9</version>
    </dependency>
</dependencies>

3. Define the gRPC Service in Protobuf

Here’s a messaging.proto file for a bidirectional Messaging Service:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.javatecharc.grpc.proto.message";
option java_outer_classname = "MessagingProto";
option optimize_for = SPEED;

service MessagingService {
    rpc SendMessage(stream SampleMessage) returns (stream SampleMessage);
}

message SampleMessage {
    string sender = 1;
    string message = 2;
    string timestamp = 3;
}

The sendMessage RPC method allows both client and server to exchange streams of SampleMessage objects simultaneously.

4. Generate Java Code

Use the protoc compiler to generate Java code from the protobuf file, or use protobuf-maven-plugin in pom.xml

protoc --java_out=src/main/java --grpc-java_out=src/main/java src/main/proto/messaging.proto

This will generate the necessary Java classes for your gRPC service.

5. Implement the gRPC Server

Next, implement the server that handles bidirectional streaming by receiving and sending messages over the stream.

package com.javatecharc.demo.grpc.bidirectional;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class MessagingServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = ServerBuilder.forPort(15050)
                .addService(new MessagingService())
                .build();

        System.out.println("Starting server on port 15050...");
        server.start();
        server.awaitTermination();
    }
}

6. Implement the gRPC Service to Send the Message

Implement the sendMessage on service class to receive and send back to client

package com.javatecharc.demo.grpc.bidirectional;

import com.javatecharc.grpc.proto.message.MessagingServiceGrpc;
import com.javatecharc.grpc.proto.message.SampleMessage;
import io.grpc.stub.StreamObserver;

import java.text.SimpleDateFormat;
import java.util.Date;

public class MessagingService extends MessagingServiceGrpc.MessagingServiceImplBase {
    @Override
    public StreamObserver<SampleMessage> sendMessage(StreamObserver<SampleMessage> responseObserver) {
        return new StreamObserver<>() {

            @Override
            public void onNext(SampleMessage clientMessage) {
                System.out.println("Received from client: " + clientMessage.getSender() + ": " + clientMessage.getMessage());

                // Respond with an acknowledgment
                String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
                SampleMessage response = SampleMessage.newBuilder()
                        .setSender("Grpc Server")
                        .setMessage("Message received: " + clientMessage.getMessage())
                        .setTimestamp(timeStamp)
                        .build();
                responseObserver.onNext(response);
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Error: " + t.getMessage());
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
}

In this service:

  • The sendMessage() method returns a StreamObserver that listens to client messages (onNext()).
  • When the server receives a message, it prints the message and sends an acknowledgment back to the client.

7. Implement the gRPC Client

Now, let’s implement the client that sends and receives messages in real-time.

package com.javatecharc.demo.grpc.bidirectional;

import com.javatecharc.grpc.proto.message.MessagingServiceGrpc;
import com.javatecharc.grpc.proto.message.SampleMessage;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Scanner;

public class MessagingClient {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 15050)
                .usePlaintext()
                .build();

        MessagingServiceGrpc.MessagingServiceStub asyncStub = MessagingServiceGrpc.newStub(channel);

        StreamObserver<SampleMessage> requestObserver = asyncStub.sendMessage(new StreamObserver<>() {
            @Override
            public void onNext(SampleMessage serverMessage) {
                System.out.println("Received from server: " + serverMessage.getSender() + ": " + serverMessage.getMessage());
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("Error: " + t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("Messaging completed.");
                channel.shutdown();
            }
        });

        // Simulate message by sending messages from client to server
        Scanner scanner = new Scanner(System.in);
        for (int i = 0; i < 5; i++) {
            System.out.print("Enter your message: ");
            String message = scanner.nextLine();
            String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

            SampleMessage sampleMessage = SampleMessage.newBuilder()
                    .setSender("JavaTechARC")
                    .setMessage(message)
                    .setTimestamp(timeStamp)
                    .build();

            requestObserver.onNext(sampleMessage);
        }

        requestObserver.onCompleted(); // End the sending message after sending messages
    }
}

This client:

  • Sends messages to the server through the requestObserver.
  • Receives and prints messages from the server through the StreamObserver.
  • Allows the user to enter messages through the console and sends them to the server.

Testing the Bidirectional Streaming

  1. Run the server: Execute the MessagingServer class. It should display “Server started on port 15050“.
  2. Run the client: Execute the MessagingClient class. You can type messages in the console and see the server’s responses.
  3. Interact: Type messages in the client console. You should see the messages echoed back from the server.

Sample Execution:

output

Conclusion

gRPC bidirectional streaming in Java provides a powerful way to implement real-time, full-duplex communication between a client and server. With this model, both parties can continuously send and receive messages over the same connection, making it ideal for use cases like live messaging, collaborative tools, and real-time data streams.

The source code of the examples can be found on github.

Leave a Comment

Your email address will not be published. Required fields are marked *

Index
Scroll to Top