Manage consumers without restarts
The problem:
Let’s assume that your application listens to two Kafka topics and, because of a bug or a business need, you have to stop one of the consumers. If you’re using Spring Kafka, you can set the “autoStartup” property to false and restart the application.
In this article, we are going to see how to stop one of the consumers without restarting the application. We recently ran into this problem ourselves when a bug forced us to wipe the database and consume the topics again from the beginning.
Topic and Spring Kafka configuration:
In our application we have two topics:
- products
- orders
Each message on the orders topic contains a product id and a quantity. We validate these product details against the database and then create the order. During the recovery, however, products and orders were consumed in parallel, so most of the orders failed with a “product does not exist” exception.
My Kafka listener configuration is self-explanatory: one listener per topic, each with its own listener id.
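A minimal sketch of what such a configuration could look like, assuming Spring Boot auto-configuration supplies the consumer factory with String deserializers. The class name, the method names and the order_listener id are assumptions made for illustration. Only the product_listener id appears in the requests used in this article.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical listener class: one @KafkaListener method per topic.
@Component
public class TopicListeners {

    // The id identifies this listener's container in the KafkaListenerEndpointRegistry.
    @KafkaListener(id = "product_listener", topics = "products")
    public void onProduct(String message) {
        System.out.println("Received product message: " + message);
    }

    // "order_listener" is an assumed id for this sketch.
    @KafkaListener(id = "order_listener", topics = "orders")
    public void onOrder(String message) {
        System.out.println("Received order message: " + message);
    }
}
```

When no groupId is given, Spring Kafka uses the listener id as the consumer group id by default, so in this sketch each listener ends up in its own consumer group.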
I’ve also introduced a controller to send messages to these topics.
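Such a controller could be sketched roughly as follows, assuming a KafkaTemplate<String, String> bean provided by Spring Boot. The class name and the returned text are assumptions. The /send path and the topicName and message keys are taken from the curl request used in this article.

```java
import java.util.Map;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller that forwards a message to the requested topic.
@RestController
public class MessageController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Expects a JSON body with "topicName" and "message" keys.
    @PostMapping("/send")
    public String send(@RequestBody Map<String, String> body) {
        String topicName = body.get("topicName");
        // send() is asynchronous: it hands the record to the producer and returns.
        kafkaTemplate.send(topicName, body.get("message"));
        return "Message handed to producer for topic " + topicName;
    }
}
```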
Now I can test the consumers by sending a message to the products or orders topic. (Make sure that ZooKeeper and Kafka are running first, as they are prerequisites for this application.)
curl --location --request POST 'http://localhost:8080/send' --header 'Content-Type: application/json' --data-raw '{ "topicName" : "products", "message" : "CoffeeMachine-1 Created" }'
During recovery, I’d like to stop the orders topic consumer until the products topic has been fully consumed, which creates the required entries in the database.
Solution:
To control whether a consumer is running, Spring Kafka provides the KafkaListenerEndpointRegistry bean, from which you can get the listener containers registered in the application context.
registry.getListenerContainer("<kafkaListenerId>").stop();
registry.getListenerContainer("<kafkaListenerId>").start();
As the code above shows, you can use the listener id to start or stop a consumer, or to get its running status.
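The three operations could be wrapped in a small service, sketched here under the assumption that the registry is injected by Spring. The class name ConsumerControlService and the error handling are assumptions for this example. getListenerContainer, start, stop and isRunning are the Spring Kafka calls it relies on.

```java
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

// Hypothetical helper that starts, stops and inspects listeners by id.
@Service
public class ConsumerControlService {

    private final KafkaListenerEndpointRegistry registry;

    public ConsumerControlService(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    public void stop(String listenerId) {
        find(listenerId).stop();
    }

    public void start(String listenerId) {
        find(listenerId).start();
    }

    public boolean isRunning(String listenerId) {
        return find(listenerId).isRunning();
    }

    // getListenerContainer returns null for an unknown id, so fail with a clear message.
    private MessageListenerContainer find(String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container == null) {
            throw new IllegalArgumentException("No listener registered with id " + listenerId);
        }
        return container;
    }
}
```

Checking for null first avoids a NullPointerException when a wrong listener id is passed in.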
I’ve introduced endpoints to control the consumer’s running status just for this demo. All you need to do is pass the right listenerId to the endpoint.
curl --location --request POST 'http://localhost:8080/stop' --header 'Content-Type: application/json' --data-raw '{ "listenerId" : "product_listener" }'
The /start and /status endpoints work the same way. ConsumerResponse is just a POJO class that contains the topicName and runningStatus fields.
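A POJO of that shape might look like the sketch below. Only the two field names come from this article. The field types, the constructor and the immutability are assumptions.

```java
// Sketch of the response POJO; declare it public in its own file in a real project.
class ConsumerResponse {

    private final String topicName;
    private final boolean runningStatus;

    ConsumerResponse(String topicName, boolean runningStatus) {
        this.topicName = topicName;
        this.runningStatus = runningStatus;
    }

    public String getTopicName() {
        return topicName;
    }

    // Boolean getters use the "is" prefix under the JavaBeans convention.
    public boolean isRunningStatus() {
        return runningStatus;
    }
}
```

With getters named like this, a JSON serializer such as Jackson would typically expose the properties as topicName and runningStatus.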
You can download this project from GitHub: https://github.com/ashoksl/java_samples/tree/master/consumer