Trusted answers to developer questions

Related Tags

golang
apache
kafka
big data

How to use Kafka with Go

Educative Answers Team

Apache Kafka

Apache Kafka is an open-source, fault-tolerant messaging system designed to handle large volumes of data efficiently, which makes it a popular choice in Big Data applications.


Usage

Once Kafka has been set up, a Go program can communicate with the Kafka server using the Confluent Kafka Go library.

To use the library, include the following import in the program:

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
)
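Before the import will resolve, the package must be added to the project. A typical way to do this with Go modules (the module path below matches the import path above):

```shell
go get github.com/confluentinc/confluent-kafka-go/kafka
```
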

Producer

The next step is to set up a producer. A producer in Kafka publishes data to a topic:

// os.Hostname comes from the standard library "os" package.
hostname, _ := os.Hostname()

producer, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "host1:9092,host2:9092",
    "client.id":         hostname,
    "acks":              "all"})

This script sets up a producer object for a Kafka server. The bootstrap.servers property is a comma-separated list of host:port pairs used for the initial connection to the cluster. The client.id property can be any unique string that identifies this client; in this case, the hostname of the current machine is used. The acks property set to "all" means a message is only considered delivered once all in-sync replicas have acknowledged it:

// The Topic field is a *string, so the topic name needs its own variable.
topic := "educative"
del_chan := make(chan kafka.Event, 10000)

err = producer.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value: []byte(value)},
    del_chan,
)

channel_out := <-del_chan
message_report := channel_out.(*kafka.Message)

if message_report.TopicPartition.Error != nil {
    fmt.Printf("Delivery failed: %v\n", message_report.TopicPartition.Error)
} else {
    fmt.Printf("Message delivered\n")
}

close(del_chan)

This script demonstrates how to send data to Kafka. The TopicPartition field of the kafka.Message specifies which topic and partition the message is sent to; kafka.PartitionAny lets Kafka choose the partition. The Value field contains the payload, here the contents of the variable value. Once delivery succeeds or fails, a delivery report arrives on the del_chan channel; channel_out receives that report, and the code that follows prints either the delivery error or a confirmation that the message was delivered.

Consumer

Next, the consumer side needs to be set up. The consumer receives data from Kafka streams and processes it as required:

consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
     "bootstrap.servers":    "host1:9092,host2:9092",
     "group.id":             "foo",
     "auto.offset.reset":    "smallest"})

As can be seen, the setup code for the consumer is quite similar to the producer's. The group.id property assigns this consumer to a consumer group, which Kafka uses to distribute partitions among the group's members. The auto.offset.reset property defines where consumption starts in each partition when no committed offset exists; here it is set to "smallest" (an alias for "earliest"), so data will be consumed starting at the earliest available offset.

retrieved := consumer.Poll(100) // wait up to 100 ms for an event
switch message := retrieved.(type) {
case *kafka.Message:
    fmt.Printf("%s\n", message.Value)
case kafka.Error:
    fmt.Printf("%% Error: %v\n", message)
case nil:
    // no event arrived within the timeout
default:
    fmt.Printf("Ignored %v\n", message)
}

This script retrieves a single event from Kafka. A type switch on the result of Poll determines whether a message was received, an error occurred, or some other event arrived, in which case it is ignored.
