Building Real-time Applications with Amplication and Kafka

Muly Gottlieb
Nov 9, 2023

Modern applications require near-real-time communication. For example, if you were working on an airline system, you'd need to ensure that your notifications are sent in real time.

Imagine a situation where a flight was canceled, and your users were only informed an hour later.

That would be chaotic and frustrating for your users and would significantly degrade the user experience of your application. Therefore, you need real-time notifications for such mission-critical use cases. One tool that lets you build such real-time systems is Kafka.

What is Kafka?


Kafka (https://kafka.apache.org/) is a distributed streaming platform that handles real-time data streams. It operates on a **publish-subscribe** model, allowing producers to publish messages to topics and consumers (subscribers) to subscribe and consume those messages.

A typical Kafka architecture is comprised of these components:

  • Topics: A topic is a stream of records (messages). It is the category or feed name to which producers publish messages and from which consumers consume them.
  • Producers: Producers are responsible for publishing (writing) messages. They produce data in Kafka by sending messages to a specific topic.
  • Consumers (Subscribers): Consumers are responsible for consuming (reading) messages. They read data by subscribing to one or more topics.
  • Brokers: Brokers are the fundamental units of a Kafka cluster. They are responsible for handling the storage and replication of Kafka topics. A Kafka cluster consists of one or more brokers that work together to provide fault tolerance and scalability. Brokers store and manage the topic partitions and handle the data transfer between producers and consumers.

As described above, Kafka works on a pub-sub model. In a nutshell, events (messages) are published to a topic that has a set of subscribers. Each subscriber receives the messages published to that topic and processes them independently.
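To make the publish-subscribe flow concrete, here is a minimal in-memory sketch. It is not Kafka's API: the `InMemoryBroker` class and the `flight.status` topic name are hypothetical and exist only for illustration. Real Kafka additionally persists messages in logs and lets consumers read them at their own pace.

```typescript
// A toy, in-memory stand-in for a broker. This is NOT Kafka's API; it only
// illustrates how one published message reaches every subscriber of a topic.
type Handler<T> = (message: T) => void;

class InMemoryBroker<T> {
  // topic name -> handlers subscribed to that topic
  private readonly subscribers = new Map<string, Handler<T>[]>();

  // A consumer registers interest in a topic.
  subscribe(topic: string, handler: Handler<T>): void {
    const handlers = this.subscribers.get(topic) ?? [];
    handlers.push(handler);
    this.subscribers.set(topic, handlers);
  }

  // A producer publishes a message; every subscriber of the topic receives it.
  // Returns the number of subscribers that were notified.
  publish(topic: string, message: T): number {
    const handlers = this.subscribers.get(topic) ?? [];
    for (const handler of handlers) {
      handler(message);
    }
    return handlers.length;
  }
}

// Usage: two independent consumers on the same (hypothetical) topic.
const broker = new InMemoryBroker<{ flight: string; status: string }>();
const notificationsSeen: string[] = [];
const auditLogSeen: string[] = [];

broker.subscribe("flight.status", (m) => {
  notificationsSeen.push(`${m.flight} is ${m.status}`);
});
broker.subscribe("flight.status", (m) => {
  auditLogSeen.push(m.flight);
});

const delivered = broker.publish("flight.status", { flight: "XY123", status: "CANCELED" });
```

The producer does not know who is listening, so a new consumer can be added without changing the publishing code.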

Such pub-sub architectures are known to:

  1. Be highly scalable horizontally.
  2. Have high throughput.
  3. Have low latency.

Additionally, Kafka offers strong durability by persisting messages on disk and replicating them across a cluster of servers.


Building an Application with Kafka

Now that we understand Kafka, it's time to get our hands dirty and build an application around it to showcase its real-time functionality. But building applications, especially microservices, from scratch is time-consuming. You need to spend a lot of time writing boilerplate, setting up Kafka connections, and much more.

One of the best approaches to bootstrapping a microservice with Kafka is through Amplication. If you haven't heard of it, it's a platform that generates production-ready backend services. It lets you bootstrap your microservices and create a deployable service in a few minutes. Luckily for us, it has a pre-built Kafka integration that we can use to set up Kafka communication directly in our app.


Step 01: Setting up Amplication

Creating an Amplication Project

First things first, you'll need an Amplication project. If you're new to Amplication, here are the steps you'll need to follow to create your very first Amplication project:

  1. Authenticate with GitHub.
  2. Connect a GitHub repository to store the project.
  3. Choose to generate GraphQL, REST, and an Admin UI.
  4. Add MongoDB as a database.
  5. Create a new project with one service: Order Service (from the Amplication template).

After you've created the project, you can head over to your connected GitHub repo to see the newly created project. Amplication automatically syncs your code with GitHub, so all you have to do is focus on delivering business value.


Before we proceed any further, I'd recommend creating two more services with Amplication so that we can demo the Kafka integration later in this article. To do so, head over to your newly created project, click on "Add Resource", and then choose "Service" as shown below:


Next, you'll be directed to the same flow as before. Over here, I've created two services: "Product" and "Notification". Once you successfully finish creating the services, you'll see all three services, as shown below:


In this example architecture, the three services were created for the following purposes:

  1. Notification Service: Responsible for managing notifications.
  2. Product Service: Responsible for managing a list of products.
  3. Order Service: Responsible for managing orders.

With these, we can build an order notification system that triggers a notification (like the airline scenario discussed above) when an order is created.

Step 02: Building Entities

Earlier, we used a default set of entities for the Order Service (from the Amplication template). However, the two other services, Product and Notification, do not have any entities yet, so we need to create them. In Amplication, navigate to each service, open the "Entities" section, and add new entities for Product and Notification, as shown below:

The Product Entity


As shown above, the Product entity is kept simple with only a single attribute - "name".

The Notification Entity


I've kept the Notification entity simple, with only a Title and a Message. But you're free to extend it with more attributes, such as the User and Notification Type, using Amplication's ample data types.

At this point, it is advisable to click "Rebuild" so that Amplication generates the code for your services with everything you have defined so far and opens a PR in your GitHub repository. Wait for the build to finish, then check out the PR to see the services that Amplication created for you, with all the relevant entities, DB access, authentication and authorization, APIs, and more. When you are happy with the results, go ahead and merge the PR. The first PR from Amplication is big because it creates the entire service(s) for you with all relevant modules. Rebuilding after each group of changes yields smaller follow-up PRs, which makes it easier to see the specific code changes that correspond to each group of changes you made in Amplication.

Step 03: Adding a Message Broker

Once you've added your entities, you're ready to integrate Kafka. To do so, you first need to add a Message Broker. The broker is responsible for managing the Kafka cluster and ensuring communication across microservices.

With Amplication, you can create a broker by navigating to "Add Resource" and by selecting "Message Broker," as shown below:


Once you've created the broker successfully, you should see the message broker settings as seen below:


Step 04: Adding Topics to the Broker

Next, you can start creating your Topics. To do so, navigate to your Broker and click on "Topics". You'll see the screen below:


For our use case, let's create a topic that carries events about order status changes. For example, if an order changes from sent to delivered, we might want to send a notification, update the logs, and more.

Therefore, let's create the Order Status Topic. Once you've created the topic, your screen should look like the following:


Step 05: Connecting Microservices with the Topic

Next, we must set up connections between our microservices and the topic. This lets the microservices communicate with each other through the topic. Luckily, you don't have to code it yourself. Instead, you do this in Amplication itself.

Message broker integration is configured per microservice, so head over to your microservices in Amplication. Let's first set up the integration for the Order Service.

The Order Service will send messages to the broker when the order status changes.

Therefore, the Order Service will act as a publisher. To map this in Amplication, head to the Order Service and navigate to "Connections". You should see the output below:


Enable the integration and set the message pattern to "Send".


This ensures that the order service can send messages to the topic.

Likewise, navigate to your Notification Service, turn on its message broker integration, and set the message pattern on the Order Status topic to "Receive". This means that the Notification Service will listen to messages that are published to the Order Status topic. If you've done this right, you should see the configuration below:
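One way to picture the result of this step is as a small table of service-to-topic connections. The sketch below is a hypothetical model written for this article, not the format Amplication uses internally; the `TopicConnection` type and the helper functions are assumptions.

```typescript
// Hypothetical model of the "Connections" settings. Amplication stores this
// in its own format, which is not shown in this article.
type MessagePattern = "Send" | "Receive";

interface TopicConnection {
  service: string;
  topic: string;
  pattern: MessagePattern;
}

// The two connections configured in this step.
const connections: TopicConnection[] = [
  { service: "order-service", topic: "order.status", pattern: "Send" },
  { service: "notification-service", topic: "order.status", pattern: "Receive" },
];

// List the services that use a given pattern on a topic.
function servicesFor(topic: string, pattern: MessagePattern): string[] {
  return connections
    .filter((c) => c.topic === topic && c.pattern === pattern)
    .map((c) => c.service);
}

// A topic is only useful once it has at least one sender and one receiver.
function isWired(topic: string): boolean {
  return servicesFor(topic, "Send").length > 0 && servicesFor(topic, "Receive").length > 0;
}
```

With the connections above, `isWired("order.status")` is true, while a topic that no service sends to or receives from would not be considered wired.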


Step 06: Adding the Kafka Plugin

Now that everything is ready with the generic topics and brokers, we can add the Kafka plugin to use Kafka as the backbone for this event-driven architecture. To do so, you'll have to add the Kafka plugin at the microservice level.

Open your microservice in Amplication, navigate to the plugins section, and install Kafka.


Ensure that you install Kafka on all services that use the topic. In our case, these are the Order and Notification Services.

If you've done everything correctly, your changelog on Amplication should look as below:


Now that we've added the Kafka integration, go ahead and commit the changes to ensure that we are saving our state as we go. Click "Rebuild", and Amplication will open a new PR on your GitHub repository containing the changes you made since the last "Rebuild".


Did you know? Once you click "Build," Amplication will automatically build all your services to ensure they work as expected 🥳.

Next, head to your GitHub repository to check out what Amplication has done behind the scenes.

If you look at the PR, you'll notice that Amplication has changed the following files:

  1. apps/order-service/.env
  2. apps/order-service/docker-compose.yml
  3. apps/order-service/package.json
  4. apps/order-service/src/kafka/kafka.module.ts
  5. apps/order-service/src/app.module.ts

Similarly, the same set of files has been updated in the Notification Service.

Amplication makes these changes to ensure your Kafka connectivity is running smoothly. Let's understand the changes that Amplication has made on each file:

Change 1: apps/order-service/.env


The env changes let you run the Kafka client locally.

Change 2: apps/order-service/docker-compose.yml


When you start the service locally, the Docker Compose file lets you spin up a Kafka Broker along with Zookeeper and the Kafka web UI.

Change 3: apps/order-service/package.json


The package.json includes the library for Kafka.

Change 4: apps/order-service/src/kafka/kafka.module.ts


A Kafka module was generated that creates a Kafka client with dependency injection. The producer service that was generated is responsible for emitting events.

Change 5: apps/order-service/src/app.module.ts


The Kafka module is automatically added to the global app module.

Next, go ahead and merge the PR and clone the repo locally to your machine.

After you've cloned the repo, we can add custom logic by ourselves to help showcase the communication across these services.

Did you know? Amplication includes a sophisticated git merging strategy ("Smart git sync") that knows how to seamlessly handle merging your custom code with the generated code, in a way that minimizes collisions and makes sure that your changes are never overwritten by Amplication 🥳.

Step 07: Adding the Custom Kafka Communication Logic

Before proceeding, install the dependencies of each service locally. Execute the following command to initialize a service (replace service-dir with the appropriate directory name):

cd apps && cd service-dir && npm i

Make sure to do this for every service.

Now, let's add an event that's emitted from the Order Service. To do so, open apps/order-service/src/order/order.service.ts and include the following code:

import { Injectable } from "@nestjs/common";
import { PrismaService } from "../prisma/prisma.service";
import { OrderServiceBase } from "./base/order.service.base";
import { Prisma, Order } from "@prisma/client";
import { KafkaProducerService } from "src/kafka/kafka.producer.service";
import { MyMessageBrokerTopics } from "src/kafka/topics";

@Injectable()
export class OrderService extends OrderServiceBase {
  constructor(protected readonly prisma: PrismaService, protected readonly kafkaProducer: KafkaProducerService) {
    super(prisma);
  }

  async create<T extends Prisma.OrderCreateArgs>(args: Prisma.SelectSubset<T, Prisma.OrderCreateArgs>): Promise<Order> {
    // Persist the order using the generated base implementation.
    const order = await super.create(args);

    // Publish an event to the Order Status topic once the order is saved.
    await this.kafkaProducer.emitMessage(MyMessageBrokerTopics.OrderStatus, {
      key: null,
      value: {
        id: order.id,
        price: order.totalPrice,
        status: "CREATED",
      },
    });

    console.log("Message pushed to Topic");

    return order;
  }
}

The snippet above will create an order and immediately publish an event to the Order Status topic with the Order ID, Status, and Price of the order. Its consumers can do anything they want with the data.
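If you want to unit test the event payload without a running broker, one option is to move its construction into a pure function. The sketch below assumes a simplified order shape; `OrderLike`, `OrderStatusEvent`, `buildOrderCreatedMessage`, and the extra status values are hypothetical names introduced here, not part of the generated code.

```typescript
// Statuses an order event might carry. Only "CREATED" is used in this
// article's snippet; the other values are assumptions for illustration.
type OrderStatus = "CREATED" | "SENT" | "DELIVERED";

// Shape of the event value emitted by the Order Service.
interface OrderStatusEvent {
  id: string;
  price: number | null;
  status: OrderStatus;
}

// Minimal view of an order; the real Order type is generated by Prisma.
interface OrderLike {
  id: string;
  totalPrice: number | null;
}

// Build the { key, value } pair that would be handed to emitMessage.
// The key stays null, matching the snippet in this step.
function buildOrderCreatedMessage(order: OrderLike): { key: string | null; value: OrderStatusEvent } {
  return {
    key: null,
    value: { id: order.id, price: order.totalPrice, status: "CREATED" },
  };
}

// Usage with a made-up order.
const message = buildOrderCreatedMessage({ id: "order-1", totalPrice: 1550 });
console.log(JSON.stringify(message.value));
```

The `create` method could then pass the result of this function to `emitMessage`, keeping the Kafka call itself as a thin wrapper.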

Step 08: Launching the Services

Next, we can launch the two services, Order and Notification, locally to see the real-time communication in action. But before we proceed, we'll have to make some changes to ensure we can run the app locally.

  1. Update the environment variables of the Order Service
  2. Update the environment variables of the Notification Service

We'll be updating the database connections and application ports to avoid conflicts. Therefore, update your env as shown below:

Order Environment

BCRYPT_SALT=10
COMPOSE_PROJECT_NAME=amp_clofcb6670j1eif01vzhspkid
PORT=3000
DB_USER=admin
DB_PASSWORD=admin
DB_PORT=27017
DB_NAME=orders
DB_URL=mongodb://admin:admin@localhost:27017/orders?authSource=admin
KAFKA_BROKERS=localhost:9092
KAFKA_ENABLE_SSL=false
KAFKA_CLIENT_ID=order-service
KAFKA_GROUP_ID=order-service

Notification Environment

BCRYPT_SALT=10
COMPOSE_PROJECT_NAME=amp_clofcvcyj16kofn01h2m0jk7i
PORT=3001
KAFKA_BROKERS=localhost:9092
KAFKA_ENABLE_SSL=false
KAFKA_CLIENT_ID=notification-service
KAFKA_GROUP_ID=notification-service
DB_USER=admin
DB_PASSWORD=admin
DB_PORT=27017
DB_NAME=notifications
DB_URL=mongodb://admin:admin@localhost:27017/notifications?authSource=admin
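To show how the KAFKA_* variables above could be consumed, here is a minimal sketch that turns them into a typed settings object and fails fast when one is missing. The `readKafkaSettings` helper is hypothetical and is not the code Amplication generates; treating KAFKA_BROKERS as a comma-separated list is also an assumption.

```typescript
// Hypothetical helper: turn the KAFKA_* variables into a typed settings object.
interface KafkaSettings {
  brokers: string[];
  ssl: boolean;
  clientId: string;
  groupId: string;
}

function readKafkaSettings(env: Record<string, string | undefined>): KafkaSettings {
  // Return the trimmed value of a variable, or throw if it is absent or empty.
  const required = (name: string): string => {
    const value = env[name];
    if (value === undefined || value.trim() === "") {
      throw new Error(`Missing environment variable: ${name}`);
    }
    return value.trim();
  };

  return {
    // Assumption: KAFKA_BROKERS may hold several comma-separated host:port pairs.
    brokers: required("KAFKA_BROKERS")
      .split(",")
      .map((b) => b.trim())
      .filter((b) => b.length > 0),
    ssl: required("KAFKA_ENABLE_SSL").toLowerCase() === "true",
    clientId: required("KAFKA_CLIENT_ID"),
    groupId: required("KAFKA_GROUP_ID"),
  };
}

// Usage with the Order Service values from this step.
const orderKafkaSettings = readKafkaSettings({
  KAFKA_BROKERS: "localhost:9092",
  KAFKA_ENABLE_SSL: "false",
  KAFKA_CLIENT_ID: "order-service",
  KAFKA_GROUP_ID: "order-service",
});
```

In a real service you would pass `process.env` instead of a literal object; the literal keeps the sketch self-contained.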

Now, let's start the Order Service. To do so, open the Order Service folder in your terminal and run the following:

# start the database that the server component will connect to
npm run docker:dev

# initialize the database
npm run db:init

# start the server component
npm run start

Execute the same commands for your Notification Service as well to start your services.

The commands will pull the required Docker images and launch the containers and the app.

Next, navigate in your browser to localhost:8080 to view the Kafka UI. Head over to "Consumers" to see the services being registered:


This means that our services are now ready for communication.

Next, let's inspect the Order Service by visiting http://localhost:3000/api/. This will open up its Swagger UI.


Let's go ahead and create an order first. To do so, let's invoke the POST /api/orders endpoint. When doing so, ensure that you're using the following payload:

{
  "discount": 1550,
  "quantity": 1550,
  "totalPrice": 1550
}

After you've done so, you'll see the output shown below:


This confirms that our database connection has been set up properly and that our Order Service is now working.

Next, head over to the kafka.controller.ts in your Notification service and add the snippet below:

import { EventPattern, Payload } from "@nestjs/microservices";
import { KafkaMessage } from "./KafkaMessage";
import { Controller } from "@nestjs/common";

@Controller("kafka-controller")
export class KafkaController {
  @EventPattern("order.status")
  async onOrderStatus(
    @Payload()
    message: KafkaMessage
  ): Promise<void> {
    const { value, topic } = message as any;
    console.log({ value, topic });
  }
}

The function above processes all messages that are received through Kafka onto the Notification Service (on the order.status topic). You can handle all of your events here. But to keep things simple, I've only logged the message and the topic that the message came from.

To test the changes, restart your notification server and re-execute the POST /api/orders/ endpoint with our custom payload. Next, head to your terminal to view the logs on the Notification service. If you've done everything right, you should see the output shown below:


The figure above depicts the payload we passed from the Order Service along with the name of the topic that we defined in Amplication.

You can further customize this method to add a service like an Email Service that sends emails to the user.
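As a starting point for that kind of customization, the sketch below maps an order status event to a notification with a title and a message, mirroring the Notification entity from Step 02. The `toNotification` function, the interface names, and the lowercase field names are hypothetical; check them against your generated entity before reusing them.

```typescript
// Value published by the Order Service in Step 07.
interface OrderStatusValue {
  id: string;
  price: number | null;
  status: string;
}

// Mirrors the Notification entity from Step 02 (Title and Message).
// The exact generated field names are an assumption.
interface NotificationDraft {
  title: string;
  message: string;
}

// Hypothetical mapper: turn an order status event into a notification draft.
function toNotification(value: OrderStatusValue): NotificationDraft {
  const status = value.status.toLowerCase();
  const price = value.price === null ? "an unknown price" : `a total of ${value.price}`;
  return {
    title: `Order ${status}`,
    message: `Order ${value.id} was ${status} with ${price}.`,
  };
}

// Usage with the payload emitted when an order is created.
const draft = toNotification({ id: "order-1", price: 1550, status: "CREATED" });
console.log(draft.title, "-", draft.message);
```

Inside `onOrderStatus`, the draft could then be saved through the Notification service or handed to an email sender.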

Wrapping up

And there we have it 🥳!

It took less than 20 minutes to build two microservices that are connected to MongoDB and communicate through Kafka for seamless event-driven communication. As you can see, using Amplication with integrations like Kafka speeds up microservice development and helps ensure your systems can deliver real-time notifications. More importantly, you can deliver a lot of VALUE really FAST by using Amplication to speed up your backend development processes, allowing you to "JUST CODE WHAT MATTERS".

Thank you for reading!