As we know, Kafka is a high-throughput, low-latency message broker platform that offers near-real-time message processing.
However, there are use cases where we need to retry a message after a failure, especially a network failure in a microservices environment. Because Kafka has no built-in “delay queue” mechanism, we cannot ask Kafka itself to redeliver a given message after a certain delay.
We can still implement a retry mechanism, though, by disabling auto-commit and introducing the delay in the application code rather than on the Kafka side.
The retry flow is described below:
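The diagram above is almost self-explanatory. The consumer polls a record and processes it. If processing asks for a retry, the consumer does the following:

- seeks back to that record's offset;
- pauses the partition;
- waits for a fixed delay;
- resumes the partition and polls the same record again.

Once processing succeeds, or the maximum retry count is reached, the offset is committed and the consumer moves on.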
Here is the consumer code in Java:
package com.fastcart.kafkamessaging.standalones; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.Random; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import com.google.common.primitives.Ints; public class Consumer { private long WAIT_TIME = 5000; private int MAX_RETRY_COUNT = 6; private int current_retry_count = 0; private DelayQueue<DelayMessage> delayQueue = new DelayQueue<>(); public static void main(String[] args) throws InterruptedException { Consumer c = new Consumer(); c.startConsumer(); } private void startConsumer() { KafkaConsumer<String, String> consumer = createConsumer(); // TODO resume all partitions on start up if there are any if(consumer.paused()!=null) { consumer.resume(consumer.paused()); } System.out.println("Consumer started..."); try { while (true) { try { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000)); // System.out.println("---------- Batch Of records consumer 1 :" + consumerRecords.count()); for (ConsumerRecord<String, String> record : consumerRecords) { System.out.println("key: " + record.key() + " and Value :" + record.value()); boolean doRetry = doSomeProcess(record); if (doRetry && current_retry_count < MAX_RETRY_COUNT) { // if retry is true System.out.println("retry in progress..."+current_retry_count); current_retry_count++; TopicPartition topicPartition = (TopicPartition) consumerRecords.partitions().toArray()[record.partition()]; consumer.seek(topicPartition, record.offset()); List<TopicPartition> 
topicPartitionList = new ArrayList<>(); topicPartitionList .add(topicPartition); delayQueue.put(new DelayMessage("Delay Process", WAIT_TIME)); consumer.pause(topicPartitionList); delayQueue.take(); consumer.resume(topicPartitionList); } else { consumer.commitSync(); } } } catch (InterruptedException e) { e.printStackTrace(); } } } finally { consumer.close(); // TODO: we can also re initiate consumer in case of failures } } private boolean doSomeProcess(ConsumerRecord<String, String> record) { System.out.println("Print the record" + record); int number = new Random().nextInt(1000) ; return number % 2 == 0; } /** * Create Consumer * * @return */ private KafkaConsumer<String, String> createConsumer() { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dev-fastcart-group"); properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "fastcart-client" + new Random().nextInt()); // start reading from last committed messages instead of current messages. 
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); // disable automatic commits properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, this.getMaxPollIntervalInMs()); properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, this.getSessionTimeOutInMs()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); List<String> topics = new ArrayList<>(); topics.add("fastcart-topic"); consumer.subscribe(topics); return consumer; } private int getSessionTimeOutInMs() { return (int) Math.addExact(WAIT_TIME, 2 * 60 * 1000); } private int getMaxPollIntervalInMs() { return (int) Math.addExact(WAIT_TIME, 1 * 60 * 1000); } } class DelayMessage implements Delayed { private String message = null; private long delayTime; public DelayMessage(String message, long delayTimeInMills) { this.message = message; this.delayTime = System.currentTimeMillis() + delayTimeInMills; } @Override public long getDelay(TimeUnit unit) { long diff = delayTime - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return Ints.saturatedCast(this.delayTime - ((DelayMessage) o).delayTime); } @Override public String toString() { // TODO Auto-generated method stub return "Message:" + message; } }
This can be tested with the following producer:
package com.fastcart.kafkamessaging.standalones; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; public class Producer { public static void main(String[] args) { Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); Future<RecordMetadata> response; for(int i=0;i<2;i++) { ProducerRecord<String, String> record = new ProducerRecord<String, String>("fastcart-topic","message_key","Hello World-"+System.currentTimeMillis()); response= producer.send(record); producer.flush(); try { RecordMetadata recordMetdata = response.get(); System.out.println("Partition number: "+recordMetdata.partition()); } catch (InterruptedException | ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } producer.close(); } }