November 6, 2023

Indexing New Data from HarperDB into Elasticsearch


Event-driven architectures and change data capture (CDC) are two popular paradigms for streaming data in modern applications. For example, to stream data out of PostgreSQL, Debezium reads the write-ahead log to detect changes and publishes them to Kafka for further processing.

While HarperDB does not currently have native integrations for Debezium or Elasticsearch, we can leverage HarperDB’s application layer to periodically fetch data and index it in Elasticsearch. In this tutorial, we’ll go over a proof of concept to index new data from HarperDB into Elasticsearch. 

Setup

We’ll use Docker Compose to run a local instance of HarperDB and Elasticsearch. 

---
version: "3"
services:
  elasticsearch:
    image: elasticsearch:8.8.0
    ports:
      - 9200:9200
      - 9300:9300
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
  harperdb:
    image: harperdb/harperdb
    container_name: harperdb
    restart: unless-stopped
    volumes:
      - ./harperdb:/home/harperdb/hdb
    ports:
      - 9925:9925
      - 9926:9926
    environment:
      - HDB_ADMIN_USERNAME=HDB_ADMIN
      - HDB_ADMIN_PASSWORD=password

This spins up a single-node instance of Elasticsearch as well as HarperDB. Note that I’m mapping a local directory called `harperdb` to persist data, but you can opt to skip that step. 
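
To start both services, run Docker Compose from the directory containing this file (assumed here to be saved as `docker-compose.yml`). Once Elasticsearch finishes booting, it should answer on port 9200:

docker compose up -d

# Elasticsearch responds with its cluster info once it is ready
curl http://localhost:9200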

We’ll create our standard `dev` schema and `dogs` table via curl:

curl --location localhost:9925 \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic SERCX0FETUlOOnBhc3N3b3Jk' \
--data-raw '{
  "operation": "create_schema",
  "schema": "dev"
}'

curl --location 'http://localhost:9925' \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic SERCX0FETUlOOnBhc3N3b3Jk' \
--data '{
    "operation": "create_table",
    "schema": "dev",
    "table": "dogs",
    "hash_attribute": "id"
}'
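
If you want to confirm the table exists before inserting data, HarperDB's `describe_table` operation should return its metadata:

curl --location 'http://localhost:9925' \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic SERCX0FETUlOOnBhc3N3b3Jk' \
--data '{
    "operation": "describe_table",
    "schema": "dev",
    "table": "dogs"
}'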

We can seed some sample data: 

curl --location 'http://localhost:9925' \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic SERCX0FETUlOOnBhc3N3b3Jk' \
--data '{
    "operation": "insert",
    "schema": "dev",
    "table": "dogs",
    "records": [
        {
            "id": 1,
            "dog_name": "Harper",
            "owner_name": "Stephen",
            "breed_id": 346,
            "age": 7,
            "weight_lbs": 55,
            "adorable": true
        },
        {
            "id": 2,
            "dog_name": "Alby",
            "owner_name": "Kaylan",
            "breed_id": 348,
            "age": 7,
            "weight_lbs": 84,
            "adorable": true
        },
        {
            "id": 3,
            "dog_name": "Billy",
            "owner_name": "Zach",
            "breed_id": 347,
            "age": 6,
            "weight_lbs": 60,
            "adorable": true
        }
    ]
}'
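
As a quick sanity check, HarperDB's SQL operation should return the three records we just inserted:

curl --location 'http://localhost:9925' \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic SERCX0FETUlOOnBhc3N3b3Jk' \
--data '{
    "operation": "sql",
    "sql": "SELECT * FROM dev.dogs"
}'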

Finally, let’s create the index on Elasticsearch:

curl -X PUT http://localhost:9200/dogs
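
Elasticsearch should acknowledge the creation; fetching the index back confirms it exists (its mappings will be generated dynamically as documents are indexed):

curl http://localhost:9200/dogs?pretty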

Writing a Custom Connector

We will write a simple custom connector that periodically polls HarperDB and indexes any records we have not seen before into Elasticsearch. In this simple example, we will use the id field to determine what is new, but for other workloads, using a timestamp field may be more appropriate.

We are using `axios` to request data from HarperDB, the `elasticsearch` client to connect to Elasticsearch, and finally `node-cron` to schedule our polling operation. 
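
Assuming a Node.js project has already been initialized, the three dependencies can be installed with npm:

npm install axios elasticsearch node-cron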

const axios = require('axios');
const elasticsearch = require('elasticsearch');
const cron = require('node-cron');

// Elasticsearch client setup
const elasticClient = new elasticsearch.Client({
  host: 'localhost:9200',
  log: 'error'
});

let lastProcessedId = 0;

async function fetchDataAndIndex() {
  const data = JSON.stringify({
    "operation": "search_by_conditions",
    "schema": "dev",
    "table": "dogs",
    "get_attributes": ["*"],
    "conditions": [
      {
        "search_attribute": "id",
        "search_type": "greater_than",
        "search_value": lastProcessedId
      }
    ]
  });

  const config = {
    method: 'post',
    url: 'http://localhost:9925',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Basic SERCX0FETUlOOnBhc3N3b3Jk'
    },
    data: data
  };

  try {
    const response = await axios(config);
    const newData = response.data;

    for (const item of newData) {
      try {
        const { id, ...rest } = item;

        console.log("data is ", rest);

        await elasticClient.index({
          index: 'dogs',
          id: id,
          body: rest 
        });

        lastProcessedId = id;
        console.log(`Indexed successfully: ${id}`);
      } catch (error) {
        console.error(`Error indexing data: ${error}`);
      }
    }

  } catch (error) {
    console.error(`Error fetching data: ${error}`);
  }
}

// Schedule the API call and indexing job to run every 5 seconds
cron.schedule('*/5 * * * * *', () => {
  console.log('Fetching data from HarperDB and indexing into Elasticsearch...');
  fetchDataAndIndex();
});


The code is relatively simple. We use `cron.schedule` to run the `fetchDataAndIndex` function every 5 seconds. Inside that function, we issue a `search_by_conditions` query to fetch all records whose id is greater than the highest id we have processed so far, index each record's body into Elasticsearch, and advance `lastProcessedId`.
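
To run the connector, save the script to a file (I'm calling it `connector.js` here, but the name is arbitrary) and start it with Node:

node connector.js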

Testing the Connector

Once we run the connector, the first poll will index our three existing records and advance `lastProcessedId` to 3. Subsequent polls will return no new data and skip indexing.

We can run a curl command to search for our indexed data:

curl -X GET "localhost:9200/dogs/_search?pretty" \
--header 'Content-Type: application/json' \
--data '{
    "query": {
        "match": { "dog_name": "harper" }
    }
}'

That query will return one result as we expect. 

While the connector is running, we can insert more records into HarperDB. Reusing the same dog name (or other field values) lets us verify that the search now returns more than one result.
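
For example, inserting a second dog named Harper (the field values below are just illustrative) will be picked up on the next poll, and rerunning the search above will then return two hits:

curl --location 'http://localhost:9925' \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic SERCX0FETUlOOnBhc3N3b3Jk' \
--data '{
    "operation": "insert",
    "schema": "dev",
    "table": "dogs",
    "records": [
        {
            "id": 4,
            "dog_name": "Harper",
            "owner_name": "Jaxon",
            "breed_id": 350,
            "age": 3,
            "weight_lbs": 42,
            "adorable": true
        }
    ]
}'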

Parting Thoughts

In this tutorial, we set up a proof of concept for polling data from HarperDB and continuously indexing it into Elasticsearch. Compared to more traditional change data capture architectures built on Debezium, data isn't "pushed" but rather polled. We are also relying on an incrementing id field; for some workloads, a timestamp field would be more appropriate. Finally, we are not persisting the last indexed id, so restarting the connector would re-index everything from the start. These are all limitations that a production version of this connector would need to address, but until HarperDB comes out with a native Debezium or Elasticsearch connector, we can use this custom workaround to bridge the data gap.
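
As one illustration of that last limitation, here is a minimal sketch of persisting the checkpoint to a local file; the filename and helper functions are my own additions, not part of the connector above, and any durable store (a file, Redis, or even another HarperDB table) would do:

const fs = require('fs');

// Hypothetical checkpoint file for the last indexed id
const CHECKPOINT_FILE = './last_processed_id.json';

function loadLastProcessedId() {
  try {
    // Resume from a previously saved checkpoint if one exists
    return JSON.parse(fs.readFileSync(CHECKPOINT_FILE, 'utf8')).lastProcessedId;
  } catch (err) {
    // No checkpoint yet: start from the beginning
    return 0;
  }
}

function saveLastProcessedId(id) {
  // Overwrite the checkpoint after each successfully indexed record
  fs.writeFileSync(CHECKPOINT_FILE, JSON.stringify({ lastProcessedId: id }));
}

The connector would call `loadLastProcessedId()` once at startup and `saveLastProcessedId(id)` wherever it currently updates `lastProcessedId` in memory.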