Kafka with Java

Oubelque Khalid
6 min read · Nov 15, 2020

Messaging System

When we transfer data from one application to another, we use a messaging system. This lets applications focus on the data itself instead of worrying about how to share it. Distributed messaging is based on the concept of reliable message queuing: messages are queued asynchronously between client applications and the messaging system. There are two messaging patterns, point-to-point and publish-subscribe (pub-sub). Most messaging systems follow the pub-sub pattern.

  • Point to Point Messaging System

Here, messages are persisted in a queue. A given message can be consumed by at most one consumer, even if several consumers read from the same queue. As soon as a consumer reads a message, it disappears from the queue.

  • Publish-Subscribe Messaging System

Here, messages are persisted in a topic. Consumers can subscribe to one or more topics and consume all the messages in those topics. In this system, message producers are called publishers and message consumers are called subscribers.
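
To make the difference between the two patterns concrete, here is a minimal in-memory sketch in plain Java. It is not Kafka code: the class names (MessagingPatternsSketch, PointToPointQueue, Topic) and their methods are hypothetical and only model the behaviour described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical in-memory model of the two messaging patterns (not Kafka code).
class MessagingPatternsSketch {

    // Point-to-point: a message leaves the queue as soon as one consumer reads it.
    static final class PointToPointQueue {
        private final Queue<String> messages = new ArrayDeque<>();

        void send(String message) {
            messages.add(message);
        }

        // Returns the next message, or null if the queue is empty.
        String receive() {
            return messages.poll();
        }
    }

    // Publish-subscribe: messages stay in the topic, so every subscriber can read all of them.
    static final class Topic {
        private final List<String> messages = new ArrayList<>();

        void publish(String message) {
            messages.add(message);
        }

        // Returns a copy of every message from the given position onward.
        List<String> readFrom(int position) {
            return new ArrayList<>(messages.subList(position, messages.size()));
        }
    }

    public static void main(String[] args) {
        PointToPointQueue queue = new PointToPointQueue();
        queue.send("order-1");
        System.out.println("Consumer A got: " + queue.receive()); // order-1
        System.out.println("Consumer B got: " + queue.receive()); // null, already consumed

        Topic topic = new Topic();
        topic.publish("page-view");
        topic.publish("search");
        System.out.println("Subscriber A sees: " + topic.readFrom(0)); // both messages
        System.out.println("Subscriber B sees: " + topic.readFrom(0)); // both messages again
    }
}
```

In the queue, the second consumer finds nothing left to read, while both subscribers of the topic see the full list of messages.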

Apache Kafka

Apache Kafka is a distributed publish-subscribe messaging system and a robust queue that can handle a high volume of data and pass messages from one endpoint to another. Kafka is suitable for both offline and online message consumption. Kafka messages are persisted on disk and replicated within the cluster to prevent data loss. The Kafka version used in this article (2.4.1) relies on the ZooKeeper synchronization service for cluster coordination. Kafka also integrates well with Apache Storm and Spark for real-time streaming data analysis.

Benefits

Following are a few benefits of Kafka −

  • Reliability − Kafka is distributed, partitioned, replicated, and fault-tolerant.
  • Scalability − The Kafka messaging system scales easily without downtime.
  • Durability − Kafka uses a distributed commit log, which means messages are persisted to disk as quickly as possible; hence it is durable.
  • Performance − Kafka has high throughput for both publishing and subscribing to messages. It maintains stable performance even when many terabytes of messages are stored.

Apache Kafka Use cases

There are many use cases for Apache Kafka. Here are some of the most common:

a. Kafka Messaging

As we know, Kafka is a distributed publish-subscribe messaging system, so it works well as a replacement for a more traditional message broker. Message brokers are used for a variety of reasons, for example to decouple processing from data producers or to buffer unprocessed messages.

Compared to most other messaging systems, Kafka has better throughput, built-in partitioning, replication, and fault tolerance. That makes it a good solution for large-scale message processing applications.

b. Website Activity Tracking

The original use case for Kafka was to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. Site activity (page views, searches, or other actions users may take) is published to central topics, with one topic per activity type.

c. Kafka Metrics

Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data.

d. Commit Log

Kafka can serve as a kind of external commit log for a distributed system. The log helps replicate data between nodes and acts as a re-syncing mechanism for failed nodes to restore their data. Kafka's log compaction feature helps support this usage. In this usage, Kafka is similar to the Apache BookKeeper project.
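
The idea behind log compaction can be sketched in a few lines of plain Java: for each key, only the most recent value is kept, which is enough for a failed node to restore its latest state. This is only a rough model under simplifying assumptions, not Kafka's actual implementation; the class LogCompactionSketch and its compact method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of log compaction: keep only the latest value per key.
class LogCompactionSketch {

    // Each log entry is a two-element array: {key, value}.
    static Map<String, String> compact(List<String[]> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] entry : log) {
            // Remove first so the key is re-inserted at the position of its newest record.
            latest.remove(entry[0]);
            latest.put(entry[0], entry[1]);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> log = new ArrayList<>();
        log.add(new String[] {"node-1", "A"});
        log.add(new String[] {"node-2", "B"});
        log.add(new String[] {"node-1", "C"}); // newer value for node-1

        System.out.println(compact(log)); // {node-2=B, node-1=C}
    }
}
```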

Main parts in a Kafka system

Record: The producer sends messages to Kafka in the form of records. A record is a key-value pair, sent together with the name of the target topic and, optionally, a partition number. The Kafka broker keeps records inside topic partitions. Record order is maintained at the partition level. You can define the logic that determines which partition a record goes to.
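
As an illustration of partition selection, the sketch below maps a record key to a partition number by hashing it. It is a simplified stand-in: Kafka's default partitioner hashes the serialized key bytes with murmur2, while this sketch uses String.hashCode() to stay dependency-free. The class KeyPartitionerSketch and the method partitionFor are hypothetical names.

```java
// Hypothetical key-based partition selection (Kafka itself uses a murmur2 hash of the key bytes).
class KeyPartitionerSketch {

    static int partitionFor(String key, int numPartitions) {
        // Math.floorMod keeps the result in [0, numPartitions) even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        for (int i = 0; i < 4; i++) {
            String key = "id_" + i;
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Because the partition depends only on the key, records with the same key always land in the same partition, which is what preserves their relative order.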

Topic: A topic is a category name under which records are stored and published. A producer writes records to a topic and consumers read from it. A topic can have many partitions but must have at least one.

Partition: A topic partition is the unit of parallelism in Kafka, i.e. two consumers in the same consumer group cannot consume messages from the same partition at the same time. A single consumer can consume from multiple partitions at the same time.
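
One way to picture this rule is as an assignment of partitions to the consumers of a group, where each partition has exactly one owner. The sketch below uses a simple round-robin scheme; it is a hypothetical simplification, and Kafka's real assignment strategies are more involved. The names GroupAssignmentSketch and assign are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical round-robin assignment of partitions to the consumers of one group.
class GroupAssignmentSketch {

    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        // Every partition gets exactly one owner; a consumer may own several partitions.
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers, three partitions: one consumer reads two partitions.
        System.out.println(assign(Arrays.asList("consumer-A", "consumer-B"), 3));
        // {consumer-A=[0, 2], consumer-B=[1]}

        // Three consumers, two partitions: the third consumer stays idle.
        System.out.println(assign(Arrays.asList("consumer-A", "consumer-B", "consumer-C"), 2));
        // {consumer-A=[0], consumer-B=[1], consumer-C=[]}
    }
}
```

The second call shows why adding more consumers than partitions to a group does not increase parallelism.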

Offset: Each record in a partition has an offset associated with it. Think of it like this: a partition is like an array, and offsets are like indexes.
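
The array analogy can be written down directly. In this minimal sketch, a partition is modelled as an append-only list and the offset of a record is simply its index in that list. The class PartitionLogSketch and its methods are hypothetical and exist only to illustrate the idea.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a partition as an append-only list; the offset is the list index.
class PartitionLogSketch {

    private final List<String> records = new ArrayList<>();

    // Appends a record at the end of the log and returns the offset assigned to it.
    long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    // Reads the record stored at the given offset.
    String read(long offset) {
        return records.get((int) offset);
    }

    public static void main(String[] args) {
        PartitionLogSketch partition = new PartitionLogSketch();
        System.out.println(partition.append("first"));  // 0
        System.out.println(partition.append("second")); // 1
        System.out.println(partition.read(1));          // second
    }
}
```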

Producer: Creates a record and publishes it to the broker.

Consumer: Consumes records from the broker.

Java application using Kafka

In this example we will code a simple Java application using Apache Kafka. The producer will send "Hello World" messages, and the consumer will read the produced messages.

First, let's create a Maven Java project.

Then add the Kafka dependency to pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

Kafka Producer class:

The class is commented so that each step needed to create a producer is marked in the code.

package com.shootSkill.kafka_Example;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {

    // Kafka broker address
    private static final String BOOTSTRAP_SERVERS = "127.0.0.1:9092";

    // Kafka topic to publish to
    private static final String FIRST_TOPIC = "first-topic";

    private final KafkaProducer<String, String> producer;

    public Producer() {
        // 2 - Create the producer
        producer = new KafkaProducer<>(getProperties());
        for (int i = 0; i < 10; i++) {
            // 3 - Create a producer record
            String value = "Hello World !!" + i;
            String key = "id_" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(FIRST_TOPIC, key, value);
            // 4 - Send the record asynchronously; the callback runs once the broker has responded
            producer.send(record, (RecordMetadata metadata, Exception exception) -> {
                if (exception == null) {
                    System.out.println("Message sent ");
                    System.out.println("Topic : " + metadata.topic());
                    System.out.println("Partition : " + metadata.partition());
                    System.out.println("Offset : " + metadata.offset());
                    System.out.println("Timestamp : " + metadata.timestamp());
                } else {
                    System.out.println("Error while producing the message: " + exception.getMessage());
                }
            });
        }
        // Wait until all buffered records have been sent, then release resources
        producer.flush();
        producer.close();
    }

    // 1 - Create the producer properties
    public Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        // The constructor sends the ten records and then closes the producer
        new Producer();
    }
}

Kafka Consumer class:

As with the producer, the class is commented so that each step needed to create a consumer is marked in the code.

package com.shootSkill.kafka_Example;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {

    // Kafka broker address
    private static final String BOOTSTRAP_SERVERS = "127.0.0.1:9092";

    // Kafka topic to read from
    private static final String FIRST_TOPIC = "first-topic";

    // Consumer group this consumer belongs to
    private static final String FIRST_CONSUMER_GROUP = "first-group";

    private final KafkaConsumer<String, String> consumer;

    public Consumer() {
        // 2 - Create the consumer
        consumer = new KafkaConsumer<>(getProperties());

        // 3 - Subscribe the consumer to the topic
        consumer.subscribe(Arrays.asList(FIRST_TOPIC));

        // 4 - Poll for data; this loop runs until the process is stopped
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> consumerRecord : records) {
                System.out.println("Key = " + consumerRecord.key() + " Value = " + consumerRecord.value());
            }
        }
    }

    // 1 - Create the consumer properties
    public Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, FIRST_CONSUMER_GROUP);
        // Start from the beginning of the topic when the group has no committed offset yet
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return properties;
    }

    public static void main(String[] args) {
        // The constructor subscribes and then polls forever
        new Consumer();
    }
}

Run ZooKeeper:

$ zookeeper-server-start.sh config/zookeeper.properties

Run the Kafka broker:

$ kafka-server-start.sh config/server.properties

Then run the Producer class, followed by the Consumer class. The consumer console should print the key and value of each produced record, with lines such as "Key = id_0 Value = Hello World !!0".
