
The Kafka Consumer API

Explore the Kafka Consumer API by learning how to subscribe to topics, poll for messages, process data, and commit offsets. Understand the role of consumer groups and how they enable scalable, fault-tolerant stream processing using Kafka Java clients.

Introduction to Kafka consumers

In Apache Kafka, consumers are responsible for reading data from topics and processing it—whether that means storing it, transforming it, or triggering downstream actions.

If producers are responsible for sending events into Kafka, consumers are responsible for turning those events into value.

At a high level, consuming data from Kafka involves four core steps:

  1. Subscribing to topics

  2. Polling for messages

  3. Processing messages

  4. Committing offsets

Kafka Consumer high-level workflow

Understanding the Kafka consumer workflow is essential before building scalable, fault-tolerant streaming applications.

Kafka Consumer high-level workflow

Let’s walk through each step in order.

Step 1: Subscribing to topics

The first step in consuming data from Kafka is to subscribe to one or more Kafka topics. This is typically done by creating a consumer instance and specifying the names of the topics to subscribe to. The consumer then sends a request to one of the Kafka brokers to join the corresponding consumer group and start receiving messages from the subscribed topics.
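In the Java client, those pieces — broker addresses, a group ID, and deserializers — are passed as configuration properties when the consumer is created. Below is a broker-free sketch of the minimal set, using plain java.util.Properties with the standard Kafka consumer config key names (the broker address is a placeholder):

```java
import java.util.Properties;

public class SubscribeConfigSketch {
    // The minimum a consumer needs before it can subscribe: where the
    // brokers are, which consumer group to join, and how to deserialize
    // record keys and values.
    static Properties minimalConsumerConfig(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = minimalConsumerConfig("test-group");
        System.out.println("group.id=" + props.getProperty("group.id"));
    }
}
```

With these properties in hand, subscribing is a single call on the created consumer, as the full sample later in this lesson shows.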

Step 2: Polling for messages

Once the consumer has subscribed to a topic, it begins polling for messages: it sends fetch requests to a Kafka broker to retrieve a batch of records from the subscribed topics. The consumer can cap the batch size through configuration — for example, the max.poll.records setting — and then waits for the broker to respond with the fetched messages.
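The relevant knobs are ordinary consumer settings. A broker-free sketch using the standard config key names, with the Java client's default values shown for illustration:

```java
import java.util.Properties;

public class PollTuningSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Upper bound on the number of records returned by a single poll()
        props.setProperty("max.poll.records", "500");
        // The broker waits until at least this many bytes are available...
        props.setProperty("fetch.min.bytes", "1");
        // ...or until this much time has passed, whichever comes first
        props.setProperty("fetch.max.wait.ms", "500");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```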

Step 3: Processing messages

When the consumer receives a batch of messages from the Kafka broker, it begins processing the messages. This involves applying some business logic to the message data, which could include transforming it, storing it in a database, or forwarding it to another system. The exact details of how the messages are processed depend on the specific use case and requirements of the consumer application.
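Because processing is ordinary application code, it needs no Kafka APIs at all. A broker-free sketch, where Message is a hypothetical stand-in for ConsumerRecord<String, String> and a HashMap stands in for a database:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProcessingSketch {
    // A stand-in for a consumed record; the real type would be
    // ConsumerRecord<String, String> from the Kafka client library.
    record Message(String key, String value) {}

    // Example business logic: normalize the value and store it in a map
    // acting as a placeholder for a database or downstream system.
    static void process(List<Message> batch, Map<String, String> store) {
        for (Message m : batch) {
            store.put(m.key(), m.value().trim().toUpperCase());
        }
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        process(List.of(new Message("key1", " value1 "),
                        new Message("key2", "value2")), store);
        System.out.println(store.get("key1")); // VALUE1
        System.out.println(store.get("key2")); // VALUE2
    }
}
```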

Step 4: Committing offsets

After the consumer has processed a batch of messages, it needs to commit the offsets of the messages that have been successfully processed. This is important for ensuring that the consumer can resume reading from where it left off in case of a failure or a restart. The consumer can commit the offsets manually by explicitly calling the commit API, or it can use the automatic offset commit feature provided by the Kafka Consumer API.
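One convention is worth spelling out: the committed offset is the position of the next record to read, i.e., the last processed offset plus one. A broker-free sketch of that bookkeeping (with the real client, the same per-partition information is recorded by auto-commit or passed to commitSync()/commitAsync()):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCommitSketch {
    public static void main(String[] args) {
        // Per-partition map of the offset to commit. By convention the
        // committed offset is the offset of the NEXT record to read,
        // i.e. last processed offset + 1 — the same convention the
        // Kafka commit APIs use.
        Map<Integer, Long> toCommit = new HashMap<>();

        long[][] processed = { {0, 41}, {0, 42}, {1, 7} }; // {partition, offset}
        for (long[] rec : processed) {
            toCommit.put((int) rec[0], rec[1] + 1);
        }
        // After a failure or restart, the consumer resumes from these offsets.
        System.out.println("partition 0 resumes at " + toCommit.get(0)); // 43
        System.out.println("partition 1 resumes at " + toCommit.get(1)); // 8
    }
}
```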

In addition to these core steps, several other aspects of Kafka consumers are important to consider. For example, consumers can run in single-threaded or multi-threaded mode, which affects the parallelism and throughput of the consumer application. Consumers can also use various partition assignment strategies to control how a topic's partitions are distributed across the members of a consumer group. By carefully tuning these aspects, developers can optimize the performance and scalability of their Kafka consumers.

Kafka consumer groups

A consumer group is a key feature of the Kafka architecture that enables multiple consumer instances to work together to read data from Kafka topics. A consumer group is a logical entity that consists of one or more consumer instances that share the same group ID. When a consumer group subscribes to a Kafka topic, the group instances work together to read data from the topic in a coordinated manner.

Conceptually, a consumer group works by dividing the partitions of a Kafka topic among the group instances. Within a group, each partition can be read by only one consumer instance at a time, so the instances coordinate to ensure that every partition is processed by exactly one of them. This coordination is done through a combination of heartbeats and group rebalancing.
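The division itself can be pictured with a small broker-free sketch. This uses a simplified round-robin; Kafka's built-in assignors (range, round-robin, sticky) differ in their details:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {
    // Distributes partitions across consumers round-robin — a simplified
    // picture of what Kafka's assignors do during group coordination.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        consumers.forEach(c -> out.put(c, new ArrayList<>()));
        for (int p = 0; p < partitions; p++) {
            out.get(consumers.get(p % consumers.size())).add(p);
        }
        return out;
    }

    public static void main(String[] args) {
        // 6 partitions, 3 consumers: each instance gets 2 partitions,
        // and no partition is read by more than one instance.
        System.out.println(assign(List.of("c1", "c2", "c3"), 6));
        // {c1=[0, 3], c2=[1, 4], c3=[2, 5]}
    }
}
```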

Kafka consumer groups load balancing

As the above illustration shows, the partitions of a single Kafka topic are divided among the consumer instances within each group, so that each partition is consumed by exactly one instance per group.

When a consumer instance joins a group, it sends a heartbeat message to the group coordinator to indicate that it is alive and processing messages. If the coordinator does not receive a heartbeat from a consumer instance within a certain period of time, it assumes that the instance has failed and initiates a rebalance of the group.
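The heartbeat and failure-detection windows are plain consumer settings. A config sketch using the standard key names (the values shown are the Java client's defaults):

```java
import java.util.Properties;

public class LivenessConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // How often the consumer's background thread sends heartbeats
        props.setProperty("heartbeat.interval.ms", "3000");
        // If no heartbeat arrives within this window, the coordinator
        // declares the instance dead and triggers a rebalance
        props.setProperty("session.timeout.ms", "45000");
        // Separately, if poll() is not called within this window, the
        // consumer is also considered failed (e.g., stuck in processing)
        props.setProperty("max.poll.interval.ms", "300000");
        System.out.println("session.timeout.ms=" + props.getProperty("session.timeout.ms"));
    }
}
```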

During a rebalance, the group coordinator reassigns the partitions of the topic among the active group instances in a way that ensures that each partition is read by only one instance. The assignment strategy used for rebalancing can be controlled by the consumer group configuration. Once the rebalance is complete, each instance is notified of its assigned partitions and can start polling for messages.
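With the Java client, the strategy is chosen via the partition.assignment.strategy setting, which names one or more assignor classes that ship with the client. A config sketch:

```java
import java.util.Properties;

public class AssignorConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Built-in choices include RangeAssignor, RoundRobinAssignor,
        // StickyAssignor, and CooperativeStickyAssignor.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```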

The use of consumer groups provides several benefits for Kafka consumers. One key benefit is load balancing because the group instances can work together to distribute the processing load across multiple machines. This can improve the throughput and fault tolerance of the consumer application.

Another benefit of consumer groups is the ability to scale the number of consumer instances up or down as needed, without disrupting the overall processing of the Kafka topic. For example, if a consumer group experiences high message volume, additional consumer instances can be added to the group to increase the processing capacity.

Sample code

Click the “Run” button in the widget below. This will initiate the build process and start the Kafka consumer application. Once the application has started, you should see the “Started Kafka Consumer for Topic: …” message.


After that, follow the steps outlined below:

package com.example;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

public class KafkaConsumerExample {
    public static void main(String[] args) {

        String defaultTopicName = "test-topic";
        String defaultGroupName = "test-group";

        String topicName = System.getenv("TOPIC_NAME") != null ? System.getenv("TOPIC_NAME") : defaultTopicName;
        String groupName = System.getenv("CONSUMER_GROUP_NAME") != null ? System.getenv("CONSUMER_GROUP_NAME") : defaultGroupName;

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupName);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topicName));

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Stopping Kafka Consumer");
            consumer.wakeup(); // wakeup() is thread-safe; close() here would clash with the polling thread
        }));

        System.out.println("Started Kafka Consumer for Topic: "+ topicName + ", Consumer Group: " + groupName);


        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // Expected: thrown by poll() after the shutdown hook calls wakeup()
        } finally {
            consumer.close();
        }
    }

}
Kafka Consumer application

Click the “+” button to open a new terminal tab and enter the following commands to send messages using the Kafka console producer:

cd /app/confluent-7.9.1/bin/
Change into the bin folder
./kafka-console-producer --bootstrap-server localhost:9092 --topic test-topic --property "parse.key=true" --property "key.separator=:"
Starting the console producer

You should see a > prompt. You can enter messages that you want to send to the topic (press the “enter” key after each message to send it to Kafka). Enter the messages using this format: <key>:<value>. For example, key1:value1, key2:value2, key3:value3, etc.
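For reference, the console producer splits each input line at the first occurrence of the configured key.separator. A simplified sketch of that parsing (the real implementation lives in Kafka's tools):

```java
public class KeySeparatorSketch {
    public static void main(String[] args) {
        // With parse.key=true and key.separator=":", the part before the
        // first separator becomes the record key, the rest becomes the value.
        String line = "key1:value1";
        int i = line.indexOf(':');
        String key = line.substring(0, i);
        String value = line.substring(i + 1);
        System.out.println("key=" + key + ", value=" + value);
    }
}
```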

Return to the consumer application terminal to verify that the messages were received. You should see logs similar to the following:

topic=test-topic, partition=0, offset=0, key=key1, value=value1
topic=test-topic, partition=0, offset=1, key=key2, value=value2
topic=test-topic, partition=0, offset=2, key=key3, value=value3
...
Application logs

Code explanation

  • Lines 3–12: We import the required packages.

  • Lines 20–21: We read the values for the topic name and consumer group from environment variables, if available. Otherwise, default values are used.

  • Lines 23–28: We create a new Properties object to specify the configuration for the Kafka consumer.

    • We set the bootstrap.servers property to localhost:9092, which specifies the location of the Kafka brokers.

    • We set the consumer group name.

    • We set the key and value deserializer properties to org.apache.kafka.common.serialization.StringDeserializer.

    • We set AUTO_OFFSET_RESET to earliest.

  • Lines 30–31: We create a new instance of KafkaConsumer, and it subscribes to the topic specified in the topicName variable.

  • Lines 33–36: We add a shutdown hook to the runtime to ensure that the consumer is properly closed when the application is shut down.

  • Lines 42–48: We execute the following logic in an infinite loop:

    • The consumer polls for new messages on the topic, with a timeout of 100 milliseconds.

    • If messages are received, each message’s topic, partition, offset, key, and value are printed to the console.

In this lesson, we learned about Kafka consumers and the concept of consumer groups, along with practical examples of how to use them with the Kafka Java client.