Kafka

Learn about Apache Kafka and how it enables real-time, event-based streaming.

Need for a streaming-based system in Big Data

A unified platform for handling all the real-time data feeds a large company might have.

High throughput to support high volume event feeds.

Support real-time processing of these feeds to create new, derived feeds.

Support large data backlogs to handle periodic ingestion from offline systems.

Support low-latency delivery to handle more traditional messaging use cases.

Guarantee fault tolerance in the presence of machine failures.

What is Kafka

Kafka is a distributed, persistent, reliable, high-throughput, publish-subscribe (pub-sub) event messaging system.

Kafka Concepts

An Apache Kafka cluster consists of one or more servers; each of these servers is what we call a broker. Brokers are also responsible for maintaining general state information about the system, leader election, and so on.

Producers write data to brokers.

Consumers read data from brokers.

All this is distributed.

Data is stored in topics.

Topics are split into partitions, which are replicated.

Topic: feed name to which messages are published

A topic consists of partitions. Partition: an ordered, immutable sequence of messages that is continually appended to, i.e. a commit log.

The number of partitions of a topic is configurable.

The number of partitions determines the maximum parallelism of a consumer group.
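
To make the configurable partition count concrete, here is a minimal sketch of creating a topic programmatically with the Kafka AdminClient API (available in newer Kafka client versions). The broker address localhost:9092, the topic name "page-views", and the choice of 6 partitions with replication factor 1 are assumptions made only for this illustration.

//A minimal sketch: create a topic with a chosen number of partitions (assumed names/values).
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
public class CreateTopicExample {
   public static void main(String[] args) throws Exception {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      try (AdminClient admin = AdminClient.create(props)) {
         //6 partitions, replication factor 1 (a single-broker development setup)
         NewTopic topic = new NewTopic("page-views", 6, (short) 1);
         admin.createTopics(Collections.singleton(topic)).all().get();
      }
   }
}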

Kafka Logs

A log is simply another way to view a partition: a data source writes messages to the log, and one or more consumers read that data from the log whenever they want.

Offset: messages in a partition are each assigned a unique (per-partition) and sequential id called the offset. Consumers track their position via (offset, partition, topic) tuples; see the sketch after these definitions.

Replicas: “backups” of a partition. They exist solely to prevent data loss; replicas are never read from and never written to by clients. They do NOT help to increase producer or consumer parallelism. Kafka tolerates (numReplicas - 1) dead brokers before losing data.
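
The following is a minimal sketch (not part of the original walkthrough) showing that a consumer's position is just a (topic, partition, offset) pointer: the consumer is manually assigned a single partition and told to seek to a specific offset before polling. The topic name "page-views", partition 0, and offset 42 are assumptions for illustration.

//A minimal sketch: read a single partition starting from a chosen offset.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class SeekExample {
   public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      //No consumer group is needed for manual assignment; disable auto-commit.
      props.put("enable.auto.commit", "false");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      //The consumer's position is just a (topic, partition, offset) pointer.
      TopicPartition partition0 = new TopicPartition("page-views", 0);
      consumer.assign(Collections.singleton(partition0));
      consumer.seek(partition0, 42L); //start reading at offset 42 of partition 0
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records)
         System.out.printf("partition = %d, offset = %d, value = %s\n",
            record.partition(), record.offset(), record.value());
      consumer.close();
   }
}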

Consumer Group

Kafka can have multiple consumer processes/instances running. Each consumer group has a unique group id. Within a consumer group, exactly one consumer instance reads the data from a given partition at any one time. If there is more than one consumer group, one instance from each of these groups can read from the same partition. However, if the number of consumers exceeds the number of partitions, some consumers will be inactive. For example, if there are 8 consumers and 6 partitions in a single consumer group, 2 consumers will be inactive.
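
A minimal sketch of the configuration side of this (a fragment that assumes the same imports as the SimpleConsumer example later in this section, a local broker, a hypothetical topic "page-views", and a hypothetical group name "analytics-group"): every consumer created with the same group.id joins the same consumer group, and Kafka gives each partition to exactly one instance in that group.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "analytics-group"); //same group.id => same consumer group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("page-views"));
//Run this in 6 processes against a 6-partition topic: each process reads one partition.
//Run it in 8 processes: 2 of them stay idle, exactly as in the example above.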

Role of ZooKeeper in Apache Kafka

Apache ZooKeeper serves as the coordination interface between the Kafka brokers and consumers.

Also, we can say it is a distributed configuration and synchronization service.

Basically, the ZooKeeper cluster shares this information with the Kafka servers.

Moreover, Kafka stores basic metadata in ZooKeeper, such as information about topics, brokers, consumer offsets (queue readers) and so on.

In addition, the failure of a ZooKeeper node or a broker does not affect the Kafka cluster, because the critical information stored in ZooKeeper is replicated across the ZooKeeper ensemble. Kafka restores its state once ZooKeeper restarts, leading to zero downtime for Kafka.

Writing data to Kafka

You use Kafka “producers” to write data to Kafka brokers.

Available for JVM (Java, Scala), C/C++, Python, Ruby, etc.

In order to send messages asynchronously to a topic, the KafkaProducer class provides the send() method. The signature of send() is:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback);
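
As a sketch of how the callback parameter can be used (this is not in the original example): the callback is invoked once the broker has acknowledged or rejected the record. It assumes a producer built like the one in the Simple Kafka Producer Application below, plus imports of org.apache.kafka.clients.producer.Callback and org.apache.kafka.clients.producer.RecordMetadata; the topic name, key and value are placeholders.

producer.send(
   new ProducerRecord<String, String>("page-views", "key1", "value1"),
   new Callback() {
      @Override
      public void onCompletion(RecordMetadata metadata, Exception exception) {
         if (exception != null)
            exception.printStackTrace(); //the send failed
         else
            System.out.printf("stored at partition %d, offset %d\n",
               metadata.partition(), metadata.offset());
      }
   });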

Simple Kafka Producer Application

However, make sure to first start ZooKeeper and the Kafka broker, and then create your own topic on the Kafka broker using the topic creation command (kafka-topics.sh --create). Then create a Java class named SimpleProducer.java and proceed with the following code:

//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named "SimpleProducer"
public class SimpleProducer {
   public static void main(String[] args) throws Exception {
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Assign topicName to string variable
      String topicName = args[0].toString();
      // create instance for properties to access producer configs
      Properties props = new Properties();
      //Assign localhost id
      props.put("bootstrap.servers", "localhost:9092");
      //Set acknowledgements for producer requests.
      props.put("acks", "all");
      //If the request fails, the producer can automatically retry
      props.put("retries", 0);
      //Specify batch size in config
      props.put("batch.size", 16384);
      //Wait up to 1 ms before sending, so records can be batched together
      props.put("linger.ms", 1);
      //buffer.memory controls the total amount of memory available to the producer for buffering.
      props.put("buffer.memory", 33554432);
      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      //Send ten messages whose key and value are both the loop index
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName,
            Integer.toString(i), Integer.toString(i)));
      System.out.println("Messages sent successfully");
      producer.close();
   }
}

Acks

In Kafka, a message is considered committed when “any required” ISR (in-sync replicas) for that partition have applied it to their data log.

Message acking is about conveying this “Yes, committed!” information back from the brokers to the producer client.

Exact meaning of “any required” is defined by request.required.acks.

Only producers must configure acking. The exact behavior is configured via request.required.acks, which determines when a produce request is considered completed. It allows you to trade latency (speed) <-> durability (data safety).

Consumers: acking, and how you configure it on the producer side, does not matter to consumers, because only committed messages are ever given out to consumers. They do not need to worry about potentially seeing a message that could be lost if the leader fails.

Typical values of request.required.acks:

0: the producer never waits for an ack from the broker. Gives the lowest latency but the weakest durability guarantees.

1: the producer gets an ack after the leader replica has received the data. Gives better durability, as we wait until the lead broker acks the request. Only messages that were written to the now-dead leader but not yet replicated will be lost.

-1: the producer gets an ack after all in-sync replicas have received the data. Gives the best durability, as Kafka guarantees that no data is lost as long as at least one in-sync replica remains.

Beware of interplay with request.timeout.ms!

"The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.”

Reading data from Kafka

After creating a Kafka producer to send messages to the Apache Kafka cluster, we will now create a Kafka consumer to consume messages from the cluster.

A Kafka consumer subscribes to one or more topics in the Kafka cluster and then feeds on the messages from those topics. In addition, the heartbeat tells us about the connectivity of the consumer to the Kafka cluster: it is sent by the consumer to let the ZooKeeper or broker coordinator know whether the consumer is still connected to the cluster. If the heartbeat is absent, the consumer is no longer connected to the cluster, and the broker coordinator has to re-balance the load. The heartbeat is an overhead to the cluster, so, keeping data throughput and overhead in mind, we can configure the interval at which the consumer sends heartbeats.
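
A minimal sketch of the consumer settings that govern this heartbeat; the values shown are assumptions for illustration, and heartbeat.interval.ms and session.timeout.ms are the relevant consumer configuration keys.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
//How often the consumer sends a heartbeat to the group coordinator
props.put("heartbeat.interval.ms", "3000");
//How long the coordinator waits without a heartbeat before declaring the consumer
//dead and triggering a re-balance
props.put("session.timeout.ms", "30000");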

Moreover, we can group consumers, and the consumers in a Kafka consumer group share the partitions of the topics they subscribe to. For example, if there are N partitions in a topic and N consumers in a consumer group that has subscribed to that topic, each consumer reads data from one partition of the topic. This is just a heads-up that consumers can be organized into groups.

Make sure the producer application steps remain the same here: start ZooKeeper and the Kafka broker first. Then create a SimpleConsumer application with a Java class named SimpleConsumer.java and type the following code:

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      //Kafka consumer subscribes to a list of topics here.
      consumer.subscribe(Arrays.asList(topicName));
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
            // print the offset, key and value for each consumer record
            System.out.printf("offset = %d, key = %s, value = %s\n",
               record.offset(), record.key(), record.value());
      }
   }
}

Re-balancing of a Consumer

Basically, the addition of more consumer processes/threads will cause Kafka to re-balance. Likewise, if any consumer or broker fails to send its heartbeat to ZooKeeper, the Kafka cluster detects this and re-balances. During a re-balance, Kafka assigns the available partitions to the available threads, possibly moving a partition to another process.
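
To observe a re-balance from the consumer side, a ConsumerRebalanceListener can be passed to subscribe(). The following is a minimal sketch, not part of the original code; it assumes the consumer and topicName from the SimpleConsumer example above, plus imports of java.util.Collection, org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.ConsumerRebalanceListener.

consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      //called before a re-balance takes partitions away from this consumer,
      //e.g. a good place to commit offsets for the partitions being revoked
      System.out.println("Revoked: " + partitions);
   }
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      //called after the re-balance, with the partitions now owned by this consumer
      System.out.println("Assigned: " + partitions);
   }
});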