Using Apache Kafka for Asynchronous Communication in Microservices

Written by: Wale Martins

While microservice architecture might not be a silver bullet for all systems, it definitely has its advantages, especially when building a complex system with a lot of different components. Of course, if you're considering microservices, you have to give serious thought to how the different services will communicate.

In this post, we'll look at how to set up an Apache Kafka instance, create a user service to publish data to topics, and build a notification service to consume data from those topics. Specifically, we're going to build a two-factor authentication app where a user will register, receive a mail with a verification code, and complete their registration using the code. The source code can be found here.

Why Apache Kafka?

Kafka is a distributed streaming platform created at LinkedIn and open-sourced in 2011 to handle high-throughput, low-latency transmission and processing of streams of records in real time. Its three major capabilities make it ideal for this use case:

  • Publishing and subscribing to streams of records. In this respect, it is similar to a message queue or enterprise messaging system.

  • Storing streams of records in a fault-tolerant way.

  • Processing streams of records as they occur.

Setting Up Apache Kafka

Before starting this tutorial, the following will be required:

  • Docker for Mac or Docker for Windows

  • Knowledge of Docker Compose

  • Knowledge of Node.js

We will be using the Wurstmeister Kafka Docker image. Note that Kafka uses ZooKeeper for coordination between different Kafka nodes.

A docker-compose.yml similar to the one below is used to pull the images for Kafka and ZooKeeper. One of the required configuration options for the Kafka service is KAFKA_ZOOKEEPER_CONNECT, which tells Kafka where to find the ZooKeeper instance.

  version: '2.1'
  services:
    zookeeper:
      container_name: zookeeper
      image: wurstmeister/zookeeper
      ports:
        - "2181:2181"
    kafka:
      container_name: kafka
      image: wurstmeister/kafka
      ports:
        - "9092"
      depends_on:
        - "zookeeper"
      environment:
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
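
With the file above saved as docker-compose.yml, both containers can be started in the background with the standard Compose command:

  docker-compose up -d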

Publishing Data to Kafka Topics

To publish data to a Kafka topic, we are going to create a user service that provides two endpoints:

  • /api/register - Stores the user's details in an in-memory store (node-cache) and publishes the user data to a Kafka topic, user_account_created (a sketch of this endpoint follows this list).

  • /api/verify - Verifies that the code provided by the user is correct and publishes the user data to a Kafka topic, user_account_verified.
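
As a rough illustration, here is a minimal sketch of what the /api/register handler might look like, assuming an Express app, a node-cache instance, and the sendMessage helper shown below (exposed here as a hypothetical kafkaService object); the code generator and response shape are our own placeholders:

  const express = require('express');
  const NodeCache = require('node-cache');

  const app = express();
  app.use(express.json());
  const cache = new NodeCache({ stdTTL: 600 }); // pending registrations expire after 10 minutes

  app.post('/api/register', (req, res) => {
    const { first_name, email } = req.body;
    // placeholder: any six-digit one-time-code generator would do
    const code = Math.floor(100000 + Math.random() * 900000).toString();

    // keep the pending registration in the in-memory store
    cache.set(email, { first_name, email, code });

    // publish the event for the notification service to consume
    kafkaService.sendMessage('user_account_created', { first_name, email, code });

    res.status(201).json({ message: 'Verification code sent' });
  });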

We use the node-rdkafka npm package to create a producer that connects to Kafka from our Node.js app:

  const kafka = require('node-rdkafka');
  const logger = require('./logger');
  const KAFKA_BROKER_LIST = process.env.KAFKA_BROKER_LIST; // comma-separated broker list

  const producer = new kafka.Producer({
    debug: 'all',
    'client.id': 'user-api',
    'metadata.broker.list': KAFKA_BROKER_LIST,
    'compression.codec': 'gzip',
    'retry.backoff.ms': 200,
    'message.send.max.retries': 10,
    'socket.keepalive.enable': true,
    'queue.buffering.max.messages': 100000,
    'queue.buffering.max.ms': 1000,
    'batch.num.messages': 1000000,
    dr_cb: true // request delivery reports from librdkafka
  });

  producer.connect({}, err => {
    if (err) {
      logger.error('connect', err);
    }
  });

  // resolves once the producer has connected and is ready to publish
  const producerReady = new Promise(resolve => {
    producer.on('ready', () => {
      logger.info('producer ready');
      resolve(producer);
    });
  });
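
One detail the snippet above enables but does not use: with dr_cb set, node-rdkafka emits delivery-report events, but only while the client is being polled. An optional addition of ours (not part of the original service) would surface those reports:

  producer.setPollInterval(100); // poll librdkafka every 100 ms so events are dispatched
  producer.on('delivery-report', (err, report) => {
    if (err) {
      logger.error('delivery failed', err);
    } else {
      logger.info(`delivered to ${report.topic} [${report.partition}] @ ${report.offset}`);
    }
  });
  producer.on('event.error', err => logger.error('producer error', err));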

We create a promise that resolves to a producer that is ready to publish. It is used in our sendMessage function, which publishes data to a given Kafka topic partition:

  KafkaService.prototype.sendMessage = function sendMessage(
    topic,
    payload,
    partition = 0
  ) {
    return producerReady
      .then(producer => {
        // node-rdkafka expects the message value as a Buffer
        const message = Buffer.from(JSON.stringify(payload));
        producer.produce(topic, partition, message);
      })
      .catch(error => logger.error('unable to send message', error));
  };
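
With this in place, the verify endpoint, for example, can publish its event in one line (values shown are illustrative, and kafkaService is a hypothetical KafkaService instance):

  // from /api/verify, once the submitted code matches the cached one
  kafkaService.sendMessage('user_account_verified', {
    first_name: 'Jane',
    email: 'jane@example.com'
  });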

Consuming Data from Kafka Topics

To consume data from our Kafka topics, we are going to create a notification service that listens for data coming from those topics and, depending on which topic the data arrived on, sends an email with either a verification code or a success message.

We create a consumer that connects to Kafka, where KAFKA_BROKER_LIST is a comma-separated list of all Kafka brokers.

  process.stdin.resume(); // keep process alive
  require('dotenv').config();
  const Kafka = require('node-rdkafka');
  const logger = require('./logger');
  const sendMail = require('./email');
  const KAFKA_BROKER_LIST = process.env.KAFKA_BROKER_LIST;

  const consumer = new Kafka.KafkaConsumer({
    // debug: 'all',
    'metadata.broker.list': KAFKA_BROKER_LIST,
    'group.id': 'notification-service',
    'enable.auto.commit': false // we commit offsets manually after processing
  });

The consumer object returned by node-rdkafka is an event emitter. We wait for the ready event to subscribe to our topics, user_account_created and user_account_verified, and then listen for data on those topics:

  const topics = [
    'user_account_created',
    'user_account_verified'
  ];
  // counter used to commit offsets once every numMessages received
  let counter = 0;
  const numMessages = 5;

  consumer.on('ready', function(arg) {
    logger.info('consumer ready. ' + JSON.stringify(arg));
    consumer.subscribe(topics);
    // start consuming messages
    consumer.consume();
  });

  consumer.on('data', function(message) {
    counter++;
    // commit offsets every numMessages
    if (counter % numMessages === 0) {
      logger.info('calling commit');
      consumer.commitMessage(message); // commits this message's offset + 1
    }
    // parse the actual message contents
    const data = JSON.parse(message.value.toString());
    logger.info('data value', data);

    if (message.topic === 'user_account_created') {
      const to = data.email;
      const subject = 'Verify Account';
      const content = `Hello ${data.first_name},
      Please use this code ${data.code} to complete your verification`;
      sendMail(subject, content, to);
    } else if (message.topic === 'user_account_verified') {
      const to = data.email;
      const subject = 'Account Verified';
      const content = `Hello ${data.first_name},
      You have successfully been verified`;
      sendMail(subject, content, to);
    }
  });

  consumer.on('disconnected', function(arg) {
    logger.info('consumer disconnected. ' + JSON.stringify(arg));
  });

  // log all errors
  consumer.on('event.error', function(err) {
    logger.error('Error from consumer', err, 'code:', err.code);
  });

  // connect, which triggers the ready event above
  consumer.connect();

The data event handler is called whenever a message is published to one of the topics we subscribed to. Here we parse the incoming payload and check the message's topic field to see which topic the data came from, so we can carry out the appropriate action.
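
Since the process is kept alive indefinitely with process.stdin.resume(), it is also worth disconnecting the consumer cleanly when the service stops; a minimal sketch (our addition, not part of the original service):

  // disconnect cleanly on termination so the broker can rebalance the group promptly
  ['SIGINT', 'SIGTERM'].forEach(signal => {
    process.on(signal, () => consumer.disconnect());
  });
  consumer.on('disconnected', () => process.exit(0));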

Conclusion

Our two-factor authentication app demonstrates the communication pattern between just two microservices using Apache Kafka (other messaging systems, such as RabbitMQ and ZeroMQ, could fill the same role), but by decoupling communication between those services, we add flexibility for the future. For example, say we later add a recommendation service that needs to send out recommendations whenever a new user signs up: it simply subscribes to the user_account_verified topic, and the user service does not need to change at all.
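
To make that concrete, here is a hypothetical sketch of such a recommendation service: a second consumer in its own consumer group, so it receives every event on the topic without affecting the notification service or the user service.

  const Kafka = require('node-rdkafka');

  const consumer = new Kafka.KafkaConsumer({
    'metadata.broker.list': process.env.KAFKA_BROKER_LIST,
    'group.id': 'recommendation-service' // its own group, so it gets its own copy of each event
  });

  consumer.on('ready', () => {
    consumer.subscribe(['user_account_verified']);
    consumer.consume();
  });

  consumer.on('data', message => {
    const user = JSON.parse(message.value.toString());
    // hypothetical: compute and send recommendations for the new user here
  });

  consumer.connect();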
