Kafka migration from hosted service to in-house cluster

Overview:

Freshworks operates five Kafka clusters across various regions in development and production environments. Kafka is central to all inter-product communication at Freshworks, so much so that the team handling our Kafka clusters is internally known as Central.

Our biggest cluster contains 21 Kafka nodes, handling over 12 TB of data per day with a peak egress throughput of ~250 MB/s.

 

We used a hosted service provider for our Kafka infrastructure but moved to a self-hosted solution, which in the long run is more resource-efficient for teams looking to scale.

To achieve this transition, we had to prepare on three fronts:

  1. Set up replication: Set up a Kafka Connect cluster that will be used to replicate all the data from the source (externally hosted) cluster to the destination (self-hosted) cluster.
  2. Migrate consumers: Move all the consumer groups from the source cluster to the destination cluster. Move them to the same message they last consumed on the source cluster so there is no duplicate message processing. This is the trickiest part of the migration.
  3. Migrate producers: Move the producers from the source cluster to the destination cluster.

 

Set up replication

To set up the replication correctly, we ensured that:

  • All the topics, with the same number of partitions, are created automatically in the destination cluster
  • We could scale the replication cluster nodes up or down to handle unexpected traffic
  • We could monitor the lag between the source and destination clusters so that data flows in near real time.

 

Step 1. Setting up the Kafka Connect cluster

We used cp-docker-images to set up the replication cluster. To start a worker node, we launched the image with the environment variables listed in Appendix 1, kafka_connect_env.sh.

Once the configurations were set, we started the image on port 8083 with a docker run command.
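
A minimal sketch of such a command, assuming the Confluent cp-kafka-connect image and an env file holding the Appendix 1 variables in KEY=VALUE form (the image tag, container name, and paths are illustrative, not our exact command):

    # Start a single Kafka Connect worker node, exposing the REST API on 8083.
    # Image tag and env-file path are placeholders.
    docker run -d \
      --name kafka-connect-worker-1 \
      --env-file ./kafka_connect_env.sh \
      -p 8083:8083 \
      confluentinc/cp-kafka-connect:5.4.0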

To scale up the replication cluster, we brought up additional worker nodes with the same configuration, changing only the CONNECT_REST_ADVERTISED_HOST_NAME env variable for each worker node.

 

Step 2. Adding the Replicator task to the Connect cluster

Once the Connect cluster is up and running, the replicator configuration needs to be loaded into the cluster, where it will be picked up by the Kafka Connect nodes. The task definition can be found in Appendix 2, replicator_task.json. The most important configuration is ‘tasks.max’, which specifies the number of tasks to run. Depending on the traffic in the cluster, we can increase or decrease this number. As long as nodes are available, these tasks will be distributed evenly among the nodes in the Kafka Connect cluster.
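
An abridged sketch of what such a task definition looks like (the full version is in Appendix 2; the connector name, topic regex, bootstrap addresses, and task count below are placeholders):

    {
      "name": "replicator-source-to-destination",
      "config": {
        "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
        "tasks.max": "16",
        "topic.regex": ".*",
        "src.kafka.bootstrap.servers": "source-kafka:9092",
        "dest.kafka.bootstrap.servers": "destination-kafka:9092",
        "src.consumer.group.id": "replicator-group"
      }
    }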

To load this configuration into the cluster, log in to any of the cluster nodes and post it to the Kafka Connect REST API.
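
A sketch of that call, assuming the task definition is saved locally as replicator_task.json and the Connect REST API is listening on port 8083:

    # Submit the replicator task definition to the Connect cluster
    curl -X POST -H "Content-Type: application/json" \
      --data @replicator_task.json \
      http://localhost:8083/connectors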

The commands below can be helpful once the replication is set up:
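
For example (replicator-source-to-destination is an illustrative connector name; use the name from your task definition):

    # List the connectors deployed on the Connect cluster
    curl http://localhost:8083/connectors

    # Check the status of the replicator and its tasks
    curl http://localhost:8083/connectors/replicator-source-to-destination/status

    # Restart the connector if any of its tasks have failed
    curl -X POST http://localhost:8083/connectors/replicator-source-to-destination/restart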

The lag for the consumer group mentioned in the config, “replicator-group”, can be used to check whether the replication is happening, and whether it is happening in near real time.

 

Step 3. Monitoring the replication

In our task definition, replicator_task.json, we set the consumer group ID to replicator-group. We can now use this group to find the replication delay in consuming from the source cluster. Since the replicator acts as a normal Kafka consumer, we can use the standard kafka-consumer-groups.sh utility. We ran it as a cron job and exported the metrics to a file. Later, we exported this file to our Prometheus server to set up the relevant alerts.
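
A sketch of the describe command the cron job runs (the bootstrap address and output path are illustrative); the LAG column shows how far replication is behind:

    kafka-consumer-groups.sh --bootstrap-server source-kafka:9092 \
      --group replicator-group --describe > /var/log/kafka/replicator_lag.txt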

 

Migrate consumers

Now that we have a replication setup in which all the messages are copied from the source cluster to the destination cluster, we need to move our consumer groups, one at a time. This can be tricky and involves multiple steps.

Step 1: Stop all the consumers of a consumer group that are reading from the source cluster

Step 2: Find all the topics that this consumer group was listening to

Step 3: Find all the offsets for each partition that this consumer group has committed to

Step 4: Seek, in the destination cluster, to the timestamp of the last message the group consumed on the source cluster

Step 5: Since more than one message can carry the same millisecond timestamp, we need to traverse the messages from that point and find the exact message that was last consumed. If your consumers can handle duplicate messages, you can skip this step

Step 6: Commit the offset in the new cluster for this consumer group and restart consumers

We used the kafka-python library to automate this whole process. Sketches of the code for each step are shown below.

 

Step 2: Find all the topics for this consumer group.
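
A sketch using kafka-python's KafkaAdminClient; the bootstrap address and group ID are placeholders:

    from kafka import KafkaAdminClient

    SOURCE_BOOTSTRAP = "source-kafka:9092"   # placeholder
    GROUP_ID = "my-consumer-group"           # placeholder

    admin = KafkaAdminClient(bootstrap_servers=SOURCE_BOOTSTRAP)

    # {TopicPartition: OffsetAndMetadata} for everything this group has committed
    committed = admin.list_consumer_group_offsets(GROUP_ID)
    topics = sorted({tp.topic for tp in committed})
    print(topics)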

Step 3: Find all the partitions of the topics in the source cluster

We can get all the committed partition offsets from the same list_consumer_group_offsets call.

Since we have to re-read the last message again, we will decrease all the offsets by 1.
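
Continuing from the Step 2 sketch, the committed map already carries the partitions, so we only need to step each offset back by one (a committed offset points at the next message to be read):

    # Last message actually processed on the source = committed offset - 1
    last_consumed = {
        tp: meta.offset - 1
        for tp, meta in committed.items()
        if meta.offset > 0   # skip partitions with no commits yet
    }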

Step 4 Details:

To get to the first message in the destination cluster at that timestamp, we can use the offsets_for_times method, which takes us to the first message whose timestamp is greater than or equal to the one we ask for.
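
A sketch of this step, continuing from the previous snippets; the read_record helper and the bootstrap address are our own illustrative additions, not part of kafka-python:

    from kafka import KafkaConsumer

    DEST_BOOTSTRAP = "destination-kafka:9092"   # placeholder

    source = KafkaConsumer(bootstrap_servers=SOURCE_BOOTSTRAP, enable_auto_commit=False)
    dest = KafkaConsumer(bootstrap_servers=DEST_BOOTSTRAP, enable_auto_commit=False)

    def read_record(consumer, tp, offset):
        # Fetch the single record at (topic, partition, offset).
        consumer.assign([tp])
        consumer.seek(tp, offset)
        while True:
            for records in consumer.poll(timeout_ms=5000).values():
                if records:
                    return records[0]

    # Timestamp (ms) of the last message each partition consumed on the source
    last_timestamps = {
        tp: read_record(source, tp, offset).timestamp
        for tp, offset in last_consumed.items()
    }

    # Earliest offset on the destination whose timestamp >= the source timestamp
    dest_start = dest.offsets_for_times(last_timestamps)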

Step 5 Details:

Since multiple messages can carry the same millisecond timestamp, we need to get to the exact last-consumed message to avoid duplicate message processing.


To do this, we create a hash map of the last message consumed in the source cluster, keyed by topic partition and storing its SHA-256 value. Then, in the destination cluster, we start from the same millisecond and compare messages one at a time to see if the SHA-256 value matches. Once we find a match, we have arrived at the last consumed message for that topic partition.
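
A sketch of the matching logic, continuing from the previous snippets; hashing the key plus the value is our assumption about what identifies a message in these topics:

    import hashlib

    def sha256_of(record):
        # Hash key + value; assumes this pair identifies a message uniquely enough
        return hashlib.sha256((record.key or b"") + (record.value or b"")).hexdigest()

    dest_last_consumed = {}
    for tp, src_offset in last_consumed.items():
        target = sha256_of(read_record(source, tp, src_offset))
        # dest_start[tp] can be None if replication has not caught up yet
        candidate = dest_start[tp].offset
        while True:
            record = read_record(dest, tp, candidate)
            if sha256_of(record) == target:
                dest_last_consumed[tp] = record.offset   # last already-processed message
                break
            candidate += 1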

Step 6 Details:

Now that we have these offsets in the destination cluster, we can commit them and restart the consumers with the destination cluster configs.
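
A sketch of the final commit, continuing from the previous snippets; the two-argument OffsetAndMetadata form assumes kafka-python 2.0.x:

    from kafka import KafkaConsumer
    from kafka.structs import OffsetAndMetadata

    committer = KafkaConsumer(
        bootstrap_servers=DEST_BOOTSTRAP,
        group_id=GROUP_ID,
        enable_auto_commit=False,
    )
    committer.assign(list(dest_last_consumed))

    # Commit the offset *after* the last processed message, so the group
    # resumes from the first unread record on the destination cluster.
    committer.commit({
        tp: OffsetAndMetadata(offset + 1, "")
        for tp, offset in dest_last_consumed.items()
    })
    committer.close()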

For the full script, please look at Appendix 3.

 

Migrating producers 

In our producer use cases, we did not have to worry about the ordering of messages in a topic. Since the data was already being replicated, a rolling restart of the producers against the destination cluster was sufficient for us. If you do have to worry about message ordering, you should stop all of your producers to a topic, preferably buffering their messages somewhere to avoid downtime, let the replication catch up, replay those messages, and then restart your producers against the destination cluster.

Summary

  • Since this migration involves every team consuming from or producing to the Kafka cluster, ensure each team has allocated the right bandwidth for it.
  • If the consumers can handle a few seconds of duplicate messages, just restart them with the new configuration without stopping them. If they cannot, stop the consumers completely, move their offsets as described above, and then restart them with the new configuration.
  • Have the necessary alerts in the system to keep the replication lag between the source cluster and the destination cluster to a minimum.

 

APPENDIX

  1. kafka_connect_env.sh

 

  2. replicator_task.json

 

  3. Full migration script