fetch.min.bytes

This property specifies the minimum amount of data the broker should send back to a consumer. The broker waits for messages to accumulate until their aggregate size exceeds fetch.min.bytes before sending them to the consumer. Setting a higher value reduces the back-and-forth between the consumer and the broker. The value can be set high when there are a large number of consumers or when the consumers run CPU-intensive processing on the received data.

fetch.max.wait.ms

The configuration fetch.max.wait.ms specifies how long the broker should wait when fewer than fetch.min.bytes of data are available. The broker responds as soon as either condition is met: enough data accumulates, or the wait time elapses.
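As a sketch, these two settings are usually tuned together when building the consumer's configuration. The property keys are the real Kafka configuration names; the class name and the values are illustrative, not defaults:

```java
import java.util.Properties;

public class FetchTuning {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // Ask the broker to accumulate at least 1 KB before responding...
        props.put("fetch.min.bytes", "1024");
        // ...but never make the consumer wait longer than 500 ms for it.
        props.put("fetch.max.wait.ms", "500");
        return props;
    }
}
```

With these values, a fetch returns after at most 500 ms even if fewer than 1024 bytes have piled up.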

max.partition.fetch.bytes

The configuration max.partition.fetch.bytes specifies the maximum number of bytes the broker returns per partition. This value has to be higher than the largest message the broker will accept; otherwise, the consumer can hang trying to read a message that is greater than max.partition.fetch.bytes but within acceptable limits for the broker. Setting too high a value may return more records than the consumer can process in a timely manner. This can be a problem because the poll() call is also responsible for sending out heartbeats, so a delay in the next invocation of poll() may result in a session timeout followed by a rebalance (covered later). The situation can be mitigated by either choosing a lower value for this configuration or setting a high enough session timeout.

By default, max.partition.fetch.bytes is set to 1MB. If we have 20 partitions and 4 consumers, each consumer will read from 5 partitions. This implies that each consumer must have at least 5MB of memory available for ConsumerRecords. In practice, each consumer needs more memory than that, because other consumers in the group can fail; a failure triggers a rebalance, and the remaining consumers divide the failed consumer's partitions amongst themselves.
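The sizing arithmetic above can be sketched directly. The numbers (20 partitions, 4 consumers, the 1 MB default) come from the example in the text; the class and method names are ours:

```java
public class FetchMemoryEstimate {
    // Default value of max.partition.fetch.bytes: 1 MB.
    static final long MAX_PARTITION_FETCH_BYTES = 1024 * 1024;

    // Minimum memory a consumer should reserve for ConsumerRecords,
    // using the worst case: some consumers own the ceiling of the division.
    static long minBytesPerConsumer(int totalPartitions, int consumers) {
        int partitionsPerConsumer = (totalPartitions + consumers - 1) / consumers;
        return partitionsPerConsumer * MAX_PARTITION_FETCH_BYTES;
    }

    public static void main(String[] args) {
        // 4 live consumers: 5 partitions each -> 5 MB minimum.
        System.out.println(minBytesPerConsumer(20, 4) / (1024 * 1024));
        // One consumer fails: 3 consumers split 20 partitions, so the
        // busiest consumers now own 7 partitions -> 7 MB minimum.
        System.out.println(minBytesPerConsumer(20, 3) / (1024 * 1024));
    }
}
```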

session.timeout.ms

This configuration setting allows the consumer to be out of contact with the broker for session.timeout.ms milliseconds and still be considered alive. If session.timeout.ms milliseconds pass without the broker receiving a heartbeat from the consumer, a rebalance is triggered and the partitions assigned to the out-of-contact consumer are redistributed among the consumers still considered alive. By default this configuration is set to 3 seconds. The setting is closely related to the configuration heartbeat.interval.ms, which determines how frequently the KafkaConsumer's poll() method sends a heartbeat to the broker. Naturally, heartbeat.interval.ms must be lower than session.timeout.ms, and is usually set to one third of its value. Choosing a lower value for session.timeout.ms helps detect a failed consumer faster, but may also result in unnecessary rebalances if a consumer takes longer to complete the poll loop or pauses for garbage collection.
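A small sketch of the one-third convention, deriving the heartbeat interval from the session timeout rather than setting the two independently (class and method names are ours; the keys are the real configuration names):

```java
import java.util.Properties;

public class SessionTuning {
    // Build the liveness-related settings together, heartbeating at one
    // third of the session timeout as is conventional.
    public static Properties liveness(int sessionTimeoutMs) {
        Properties props = new Properties();
        props.put("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        props.put("heartbeat.interval.ms", String.valueOf(sessionTimeoutMs / 3));
        return props;
    }
}
```

For the 3-second default discussed above, this yields a heartbeat every 1000 ms.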

auto.offset.reset

When consumers read from a partition, they also commit an offset to remember the position they last read in the partition. We’ll cover commits and offsets in depth later. When a consumer can’t find a valid offset (which can happen if the consumer is down for long enough that the record pointed to by the last committed offset ages out), it can choose to start reading either from the beginning of the partition or from the newest record. The configuration auto.offset.reset controls this behavior: set it to earliest to read from the beginning, or latest (the default) to read only new records when a valid offset can’t be determined for a partition.
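A minimal sketch that guards against typos in this setting. The three accepted values are real Kafka values ("none" makes the consumer raise an error instead of resetting); the class and method names are ours:

```java
import java.util.Properties;
import java.util.Set;

public class OffsetResetConfig {
    // Kafka accepts exactly these values; "latest" is the default.
    private static final Set<String> VALID = Set.of("earliest", "latest", "none");

    public static Properties withResetPolicy(String policy) {
        if (!VALID.contains(policy)) {
            throw new IllegalArgumentException("invalid auto.offset.reset: " + policy);
        }
        Properties props = new Properties();
        props.put("auto.offset.reset", policy);
        return props;
    }
}
```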

enable.auto.commit

By default, the offsets are committed automatically, but this behavior can be changed by setting enable.auto.commit to false. You may want to control when offsets are committed and how frequently (using the related configuration auto.commit.interval.ms) to minimize duplicates and avoid missing data.
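The two commit modes might be expressed as follows (class and method names are ours; the keys are the real configuration names, and the 5000 ms interval is an illustrative value):

```java
import java.util.Properties;

public class CommitConfig {
    // Let the consumer commit automatically at a fixed interval; the
    // interval only matters when auto-commit is enabled.
    public static Properties autoCommit(long intervalMs) {
        Properties props = new Properties();
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", String.valueOf(intervalMs));
        return props;
    }

    // Disable auto-commit so the application decides when to commit,
    // e.g. by calling the consumer's commitSync() or commitAsync().
    public static Properties manualCommit() {
        Properties props = new Properties();
        props.put("enable.auto.commit", "false");
        return props;
    }
}
```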

max.poll.records

This configuration specifies the maximum number of records returned by the poll() call to the Kafka consumer. Setting a limit on the number of records helps control the amount of data the consumer will process in a poll cycle.

client.id

Similar to the producer configuration, the setting client.id acts as an identifier for a consumer. This can be any string that identifies the consumer and can be used for metrics, logging, or quotas.
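The two settings above might appear together in a consumer's configuration like this (the client id string and the record cap are illustrative values, not defaults; the class name is ours):

```java
import java.util.Properties;

public class PollAndIdConfig {
    public static Properties props() {
        Properties props = new Properties();
        // Cap how many records a single poll() call may hand back.
        props.put("max.poll.records", "500");
        // Logical name surfaced in broker-side metrics, logs, and quotas.
        props.put("client.id", "orders-service-1");
        return props;
    }
}
```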

receive.buffer.bytes and send.buffer.bytes

These configurations control the receive and send buffer sizes of the TCP sockets used when reading and writing data. If set to -1, the OS defaults are used. These configurations can be set to higher values, especially when consumers communicate with brokers in a different data center, since the network links connecting them typically have higher latency and lower bandwidth.
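A sketch of a cross-datacenter profile; the keys are the real configuration names, while the specific buffer sizes are illustrative assumptions, not recommendations:

```java
import java.util.Properties;

public class SocketBufferConfig {
    public static Properties crossDatacenter() {
        Properties props = new Properties();
        // Larger TCP buffers help on high-latency, lower-bandwidth links;
        // a value of -1 would defer to the operating system defaults.
        props.put("receive.buffer.bytes", String.valueOf(1024 * 1024));
        props.put("send.buffer.bytes", String.valueOf(128 * 1024));
        return props;
    }
}
```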

partition.assignment.strategy

Topic partitions are assigned to consumers. The assignment of partitions to consumers is controlled by a PartitionAssignor. If left to Kafka, there are two built-in assignment strategies:

  • Range: This scheme assigns a consecutive subset of each topic's partitions to the subscribing consumers. Consider two consumers, C1 and C2, and two topics, T1 and T2, where T1 has 3 partitions and T2 has 2 partitions. The algorithm assigns partitions 0 and 1 of T1 to the first consumer, C1, and partition 2 of T1 to the second consumer, C2. For T2, partition 0 is assigned to C1 and partition 1 to C2. Note that the assignment for each topic is done independently of other topics, so if topic T2 also had three partitions, consumer C1 would still receive the first two partitions of each topic in its share.

  • Round Robin: The partitions from all the topics are assigned to consumers sequentially, one by one. Consider the same two consumers and two topics, where T1 has 3 partitions and T2 has 2 partitions. The algorithm assigns partition 0 of T1 to the first consumer, C1, then partition 1 of T1 to C2, partition 2 of T1 to C1, partition 0 of T2 to C2, and finally partition 1 of T2 to C1.

We can pick one of the built-in strategies by configuring the partition.assignment.strategy setting to either org.apache.kafka.clients.consumer.RangeAssignor or org.apache.kafka.clients.consumer.RoundRobinAssignor. To use a custom strategy, set this configuration to the fully qualified name of your class.
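The two strategies described above can be simulated in a few lines. This is a simplified sketch of the assignment logic, not Kafka's actual assignor code, and it reproduces the C1/C2, T1/T2 example from the bullets:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentDemo {
    // Range: for each topic independently, hand out consecutive partitions,
    // giving earlier consumers the extra partition when the count is uneven.
    static Map<String, List<String>> range(List<String> consumers, Map<String, Integer> topics) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        consumers.forEach(c -> out.put(c, new ArrayList<>()));
        for (Map.Entry<String, Integer> t : topics.entrySet()) {
            int n = t.getValue(), c = consumers.size();
            int per = n / c, extra = n % c, p = 0;
            for (int i = 0; i < c; i++) {
                int take = per + (i < extra ? 1 : 0);
                for (int k = 0; k < take; k++, p++) {
                    out.get(consumers.get(i)).add(t.getKey() + "-" + p);
                }
            }
        }
        return out;
    }

    // Round robin: lay out every partition of every topic in order and
    // deal them to consumers one at a time.
    static Map<String, List<String>> roundRobin(List<String> consumers, Map<String, Integer> topics) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        consumers.forEach(c -> out.put(c, new ArrayList<>()));
        int i = 0;
        for (Map.Entry<String, Integer> t : topics.entrySet()) {
            for (int p = 0; p < t.getValue(); p++) {
                out.get(consumers.get(i++ % consumers.size())).add(t.getKey() + "-" + p);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("T1", 3);
        topics.put("T2", 2);
        List<String> consumers = List.of("C1", "C2");
        System.out.println(range(consumers, topics));      // C1 gets T1-0, T1-1, T2-0
        System.out.println(roundRobin(consumers, topics)); // C1 gets T1-0, T1-2, T2-1
    }
}
```

Running it shows the difference: range keeps each topic's assignment independent, while round robin balances across topics, which matches the example above.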
