Kafka Consumer with re-try (re try) mechanism
As we know kafka is a high throughput , low latency message broker platform with almost real time message processing.
But there are some use cases where we need to retry a message in case of any failures, especially network failures in microservices environment. And as there is no “delay queue” mechanism available in kafka, we can’t really process a given mkessage after certain amount of delay.
But we can still implement a retry mechanism by disabling auto commit and introducing the delay in the application code instead of at Kafka side.
The retry (re-try) flow as described below,
The above diagram is almost self explanatory.
Here is the code for consumer in java
package com.fastcart.kafkamessaging.standalones;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.google.common.primitives.Ints;
public class Consumer {
private long WAIT_TIME = 5000;
private int MAX_RETRY_COUNT = 6;
private int current_retry_count = 0;
private DelayQueue<DelayMessage> delayQueue = new DelayQueue<>();
public static void main(String[] args) throws InterruptedException {
Consumer c = new Consumer();
c.startConsumer();
}
private void startConsumer() {
KafkaConsumer<String, String> consumer = createConsumer();
// TODO resume all partitions on start up if there are any
if(consumer.paused()!=null) {
consumer.resume(consumer.paused());
}
System.out.println("Consumer started...");
try {
while (true) {
try {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
// System.out.println("---------- Batch Of records consumer 1 :" + consumerRecords.count());
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println("key: " + record.key() + " and Value :" + record.value());
boolean doRetry = doSomeProcess(record);
if (doRetry && current_retry_count < MAX_RETRY_COUNT) {
// if retry is true
System.out.println("retry in progress..."+current_retry_count);
current_retry_count++;
TopicPartition topicPartition = (TopicPartition) consumerRecords.partitions().toArray()[record.partition()];
consumer.seek(topicPartition, record.offset());
List<TopicPartition> topicPartitionList = new ArrayList<>();
topicPartitionList
.add(topicPartition);
delayQueue.put(new DelayMessage("Delay Process", WAIT_TIME));
consumer.pause(topicPartitionList);
delayQueue.take();
consumer.resume(topicPartitionList);
} else {
consumer.commitSync();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
consumer.close();
// TODO: we can also re initiate consumer in case of failures
}
}
private boolean doSomeProcess(ConsumerRecord<String, String> record) {
System.out.println("Print the record" + record);
int number = new Random().nextInt(1000) ;
return number % 2 == 0;
}
/**
* Create Consumer
*
* @return
*/
private KafkaConsumer<String, String> createConsumer() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dev-fastcart-group");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "fastcart-client" + new Random().nextInt());
// start reading from last committed messages instead of current messages.
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
// disable automatic commits
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, this.getMaxPollIntervalInMs());
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, this.getSessionTimeOutInMs());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
List<String> topics = new ArrayList<>();
topics.add("fastcart-topic");
consumer.subscribe(topics);
return consumer;
}
private int getSessionTimeOutInMs() {
return (int) Math.addExact(WAIT_TIME, 2 * 60 * 1000);
}
private int getMaxPollIntervalInMs() {
return (int) Math.addExact(WAIT_TIME, 1 * 60 * 1000);
}
}
class DelayMessage implements Delayed {
private String message = null;
private long delayTime;
public DelayMessage(String message, long delayTimeInMills) {
this.message = message;
this.delayTime = System.currentTimeMillis() + delayTimeInMills;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = delayTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Ints.saturatedCast(this.delayTime - ((DelayMessage) o).delayTime);
}
@Override
public String toString() {
// TODO Auto-generated method stub
return "Message:" + message;
}
}
This can be tested with following Producer
package com.fastcart.kafkamessaging.standalones;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
public class Producer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Future<RecordMetadata> response;
for(int i=0;i<2;i++) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("fastcart-topic","message_key","Hello World-"+System.currentTimeMillis());
response= producer.send(record);
producer.flush();
try {
RecordMetadata recordMetdata = response.get();
System.out.println("Partition number: "+recordMetdata.partition());
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
producer.close();
}
}