Uncategorized

Kafka Consumer with re-try (re try) mechanism

As we know kafka is a high throughput , low latency message broker platform with almost real time message processing.

But there are some use cases where we need to retry a message in case of any failures, especially network failures in microservices environment. And as there is no “delay queue” mechanism available in kafka, we can’t really process a given mkessage after certain amount of delay.

But we can still implement a retry mechanism by disabling auto commit and introducing the delay in the application code instead of at Kafka side.

The retry (re-try) flow as described below,

The above diagram is almost self explanatory.

Here is the code for consumer in java

package com.fastcart.kafkamessaging.standalones;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import com.google.common.primitives.Ints;

public class Consumer {

	private long WAIT_TIME = 5000;
	private int MAX_RETRY_COUNT = 6;
	private int current_retry_count = 0;
	private DelayQueue<DelayMessage> delayQueue = new DelayQueue<>();

	public static void main(String[] args) throws InterruptedException {
		Consumer c = new Consumer();

		c.startConsumer();

	}

	private void startConsumer() {

		
		KafkaConsumer<String, String> consumer = createConsumer();
		// TODO resume all partitions on start up if there are any
		if(consumer.paused()!=null) {
		consumer.resume(consumer.paused());
		}
		System.out.println("Consumer started...");
		try {
			while (true) {

				try {
					ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
				//	System.out.println("---------- Batch Of records consumer 1 :" + consumerRecords.count());
					for (ConsumerRecord<String, String> record : consumerRecords) {
						System.out.println("key: " + record.key() + " and Value :" + record.value());
						boolean doRetry = doSomeProcess(record);
						if (doRetry && current_retry_count < MAX_RETRY_COUNT) {
							// if retry is true
							System.out.println("retry in progress..."+current_retry_count);
							current_retry_count++;
							TopicPartition topicPartition = (TopicPartition) consumerRecords.partitions().toArray()[record.partition()];
							consumer.seek(topicPartition, record.offset());
							List<TopicPartition> topicPartitionList = new ArrayList<>();
							topicPartitionList
									.add(topicPartition);
							delayQueue.put(new DelayMessage("Delay Process", WAIT_TIME));
							consumer.pause(topicPartitionList);
							delayQueue.take();
							consumer.resume(topicPartitionList);

						} else {
							consumer.commitSync();
						}
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}

			}
		} finally {
			consumer.close();
			// TODO: we can also re initiate consumer in case of failures
		}
	}

	private boolean doSomeProcess(ConsumerRecord<String, String> record) {
		System.out.println("Print the record" + record);
		int number = new Random().nextInt(1000) ;
		return number % 2 == 0;

	}

	/**
	 * Create Consumer
	 * 
	 * @return
	 */
	private KafkaConsumer<String, String> createConsumer() {
		Properties properties = new Properties();
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, "dev-fastcart-group");
		properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "fastcart-client" + new Random().nextInt());
		// start reading from last committed messages instead of current messages.
		properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
		// disable automatic commits
		properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
		properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, this.getMaxPollIntervalInMs());
		properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, this.getSessionTimeOutInMs());

		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
		List<String> topics = new ArrayList<>();
		topics.add("fastcart-topic");
		consumer.subscribe(topics);
		return consumer;
	}

	private int getSessionTimeOutInMs() {
		return (int) Math.addExact(WAIT_TIME, 2 * 60 * 1000);
	}

	private int getMaxPollIntervalInMs() {
		return (int) Math.addExact(WAIT_TIME, 1 * 60 * 1000);
	}

}

class DelayMessage implements Delayed {
	private String message = null;
	private long delayTime;

	public DelayMessage(String message, long delayTimeInMills) {

		this.message = message;
		this.delayTime = System.currentTimeMillis() + delayTimeInMills;
	}

	@Override
	public long getDelay(TimeUnit unit) {
		long diff = delayTime - System.currentTimeMillis();
		return unit.convert(diff, TimeUnit.MILLISECONDS);
	}

	@Override
	public int compareTo(Delayed o) {
		return Ints.saturatedCast(this.delayTime - ((DelayMessage) o).delayTime);
	}

	@Override
	public String toString() {
		// TODO Auto-generated method stub
		return "Message:" + message;
	}
}

This can be tested with following Producer

package com.fastcart.kafkamessaging.standalones;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {

	public static void main(String[] args) {

		Properties properties = new Properties();
		properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		
		KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
		Future<RecordMetadata> response;
		for(int i=0;i<2;i++) {
		ProducerRecord<String, String> record = new ProducerRecord<String, String>("fastcart-topic","message_key","Hello World-"+System.currentTimeMillis());

		 response= producer.send(record);
		producer.flush();
		try {
			RecordMetadata recordMetdata = response.get();
			System.out.println("Partition number: "+recordMetdata.partition());
		} catch (InterruptedException | ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		}
		producer.close();
		
	}

}