May 16, 2023

Streaming HarperDB Records with NATS and Kafka


Change Data Capture (CDC) is a popular design pattern used to track changes in a source database and stream those changes to downstream processes. In HarperDB, the clustering engine and custom functions can be combined to implement CDC. In this tutorial, we'll see how to use the internal NATS streaming service together with the Fastify Kafka plugin to publish new records to Kafka.

HarperDB Setup

To start out, we need to spin up HarperDB with custom functions enabled alongside Kafka and Zookeeper. Add the following contents to `docker-compose.yml`:

```
services:
  harperdb:
    image: harperdb/harperdb
    container_name: harperdb
    ports:
      - 9925:9925
      - 9926:9926
    volumes:
      - ./harperdb:/home/harperdb/hdb
    environment:
      - HDB_ADMIN_USERNAME=admin
      - HDB_ADMIN_PASSWORD=admin
      - CUSTOM_FUNCTIONS=true
  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka
    container_name: kafka
    links:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
```


For this example, we'll store the database contents locally in the `./harperdb` directory, so create it first: `mkdir harperdb`. Also, note that we are not setting `CLUSTERING_ENABLED=true` in the compose file. Doing so would break the initial startup, so we'll enable clustering via the HarperDB Studio console instead.

Start up the services via `docker-compose up -d`.

Now we need to connect our local instance to HarperDB Studio. Specify the username and password from the compose file, along with the host and port.

After we log in, we can create a cluster user.

Finally, let's create a schema and a table. We'll use our favorite `dev` schema and `dog` table.
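
If you'd rather script this step than click through Studio, `create_schema` and `create_table` are part of HarperDB's operations API. Here is a minimal sketch using Node 18+'s built-in `fetch`; the credentials match the compose file above, and the file name `create-schema.mjs` is just a suggestion:

```
// create-schema.mjs — create the dev schema and dog table via the operations API.
// Run with: node create-schema.mjs
const HDB_URL = "http://localhost:9925";
const AUTH = "Basic " + Buffer.from("admin:admin").toString("base64");

async function operation(body) {
  const res = await fetch(HDB_URL, {
    method: "POST",
    headers: { Authorization: AUTH, "Content-Type": "application/json" },
    body: JSON.stringify(body),
  });
  console.log(body.operation, "->", await res.json());
}

await operation({ operation: "create_schema", schema: "dev" });
await operation({
  operation: "create_table",
  schema: "dev",
  table: "dog",
  hash_attribute: "id", // the primary key attribute
});
```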

Custom Functions Setup

HarperDB has an existing template that consumes the internal NATS stream and publishes changes to WebSockets: https://github.com/HarperDB-Add-Ons/cf-template-websockets

We will modify this setup to publish to Kafka. But first, clone the repo into the `custom_functions` directory of your HarperDB instance:

```
cd harperdb/custom_functions
git clone https://github.com/HarperDB-Add-Ons/cf-template-websockets.git
```

To get this working, rename `config.json.example` to `config.json` and update the NATS user and password to the cluster user we created via HarperDB Studio. Finally, run `npm i` inside the cloned directory to install the dependencies.

NOTE: HarperDB Studio cannot parse file names containing multiple "." characters, so it may report "File does not exist". Simply rename the files if you want to view their contents in the console.

Now restart HarperDB, and we can use the example client file (`client.example.js`) to test the WebSocket connection. 

```
import WebSocket from "ws";

// Connect to the custom function's WebSocket route on port 9926.
const ws = new WebSocket("ws://127.0.0.1:9926/cf-template-websockets/");

ws.on("open", () => {
  console.log("open!");
  // Subscribe to all records ("*") in the dev.dog table.
  ws.send(
    JSON.stringify({
      action: "subscribe",
      schema: "dev",
      table: "dog",
      id: "*",
    })
  );
});

ws.on("message", (msg) => {
  console.log("msg", msg.toString());
});

ws.on("error", console.error);

// Keep the process alive so we can keep receiving messages.
new Promise((r) => setTimeout(r, 1 << 30));
```

Once we run this client (`node client.example.js`), we should see the message "open!", and adding new records to our `dog` table will print them out:

```
curl --location 'http://localhost:9925' \
--header 'Authorization: Basic YWRtaW46YWRtaW4=' \
--header 'Content-Type: application/json' \
--data '{
    "operation": "insert",
    "schema": "dev",
    "table": "dog",
    "records": [
        {
            "dog_name": "Kenny",
            "owner_name": "Kyle",
            "breed_id": 154,
            "age": 7,
            "weight_lbs": 38
        }
    ]
}'
```

```
open!
msg {"dog_name":"Kenny","owner_name":"Kyle","breed_id":154,"age":7,"weight_lbs":38,"id":"2dc7aafb-c454-477f-8943-1005d791935c","__updatedtime__":1683914573393.6323,"__createdtime__":1683914573393.6323}
```

Modifying to Publish to Kafka

Instead of publishing messages back to the WebSocket client, let’s now publish JSON messages to Kafka. To do so, install the Fastify Kafka library: `npm i fastify-kafkajs`.

Then we can import and register the Kafka client. 

```
import fastifyKafkaJS from "fastify-kafkajs";

…

// Register the Kafka client; brokers and clientId match the compose file.
await server.register(fastifyKafkaJS, {
  clientConfig: {
    brokers: ["kafka:9092"],
    clientId: "demo-app",
  },
});
```

We can now simply modify the `onPublishedMessage` function to publish to Kafka instead of writing back to the socket:

```
function onPublishedMessage(message) {
  // Forward each change record to Kafka, keyed by the record id.
  server.kafka.producer
    .send({
      topic: "test-topic",
      messages: [{ key: message.id, value: JSON.stringify(message) }],
    })
    .catch((err) => server.log.error(err)); // send() returns a promise; log failures
}
```

Now restart the server and connect with our WebSocket client again. Publish another record to HarperDB, and we can confirm it reached Kafka by opening a shell in the Kafka container (`docker exec -it kafka sh`) and using the `kafka-console-consumer` binary:

```
sh-4.4$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
{"dog_name":"Kenny","owner_name":"Kyle","breed_id":154,"age":7,"weight_lbs":38,"id":"7d42ad97-0b5b-47e0-8b3b-badc67b58491","__updatedtime__":1683917222162.693,"__createdtime__":1683917222162.693}
```
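
Alternatively, we can verify from Node with a small kafkajs consumer. Below is a minimal sketch; the client and group ids are illustrative, and because the broker advertises itself as `kafka:9092`, it needs to run somewhere that hostname resolves (inside the compose network, or with an `/etc/hosts` entry mapping `kafka` to `127.0.0.1`):

```
import { Kafka } from "kafkajs";

const kafka = new Kafka({ clientId: "demo-consumer", brokers: ["kafka:9092"] });
const consumer = kafka.consumer({ groupId: "demo-group" });

await consumer.connect();
await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

// Print every change record published by the custom function.
await consumer.run({
  eachMessage: async ({ message }) => {
    console.log(message.key?.toString(), message.value.toString());
  },
});
```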

Wrapping Up

In this tutorial, we saw how to use the internal NATS stream to listen for changes to data in HarperDB. We then created a Fastify route that subscribes to those tables and publishes new records to WebSockets and Kafka. You can modify the `onPublishedMessage` function to publish to multiple topics (a sketch follows below), and you can run this WebSocket client in the background to emulate a Debezium-like experience.
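
For example, here is a minimal sketch of a multi-topic `onPublishedMessage` using kafkajs's `sendBatch`; the topic names are illustrative:

```
function onPublishedMessage(message) {
  const payload = [{ key: message.id, value: JSON.stringify(message) }];
  // sendBatch writes the same record to several topics in one round trip.
  server.kafka.producer
    .sendBatch({
      topicMessages: [
        { topic: "dog-changes", messages: payload },
        { topic: "audit-log", messages: payload },
      ],
    })
    .catch((err) => server.log.error(err));
}
```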