How to Stream Blockchain Data to Kafka using Functions


Overview

Real-time blockchain data processing often requires robust message queues to handle high throughput and ensure reliable data delivery. Apache Kafka is one of the most popular solutions for this use case.

In this guide, we'll show you how to set up a local Kafka cluster and stream blockchain data to it using QuickNode Functions.

You can find the code from this guide in QuickNode's awesome-functions GitHub repo.

Quick Demo

Watch this short demonstration of setting up and using Kafka with QuickNode Functions:

In this demo, you'll see:

  • Setting up a local Kafka environment with Docker
  • Exposing Kafka via ngrok
  • Creating a Function to process blockchain data
  • Configuring a Stream to send data to Kafka
  • Monitoring data flow in the Kafka UI

Now, let's go through the setup step by step.

What You Will Need

  • A QuickNode account with access to Streams and Functions
  • Docker and Docker Compose installed
  • The ngrok CLI installed
  • Basic familiarity with JavaScript and the command line

What You Will Do


  • Set up a local Kafka cluster using Docker
  • Expose Kafka to the internet using ngrok
  • Create a Function to stream blockchain data to Kafka
  • Configure Streams to deliver data to your Function
  • Monitor data flow using Kafka UI

Local Setup

First, let's set up our local Kafka environment. Create a new directory for your project and add the following docker-compose.yml:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://${NGROK_URL:-localhost:9092}
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: PLAINTEXT
      DYNAMIC_CONFIG_ENABLED: 'true'

This configuration sets up:

  • A Zookeeper instance for Kafka cluster management
  • A Kafka broker configured for both internal and external access
  • A web-based UI for monitoring Kafka

Exposing Kafka

To make our local Kafka accessible to QuickNode Functions, we'll use ngrok. Open a terminal and run:

ngrok tcp 9092

You'll receive a forwarding URL like tcp://2.tcp.ngrok.io:18139. Copy this URL; you'll need it for both Docker and the Function.

Now, start Kafka with the ngrok URL. Don't forget to replace the forwarding URL with your own, without the tcp:// prefix:

NGROK_URL=2.tcp.ngrok.io:18139 docker-compose up -d
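
Once the stack is up, you can confirm all three containers are running with standard Docker Compose commands:

docker-compose ps

# Tail the broker logs if something looks off
docker-compose logs -f kafka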

Creating the Function

In the QuickNode dashboard, create a new Function and add kafkajs as a dependency. Here's our Function code:

const { Kafka } = require('kafkajs');

// Configure Kafka broker with your ngrok URL without the `tcp://` prefix
const KAFKA_BROKER = process.env.KAFKA_BROKER || '2.tcp.ngrok.io:18139'; // 👈 Replace with your own ngrok URL

async function initializeKafka() {
  try {
    const kafka = new Kafka({
      clientId: 'quicknode-stream-producer',
      brokers: [KAFKA_BROKER],
      retry: {
        initialRetryTime: 100,
        retries: 5
      }
    });

    const producer = kafka.producer();
    await producer.connect();
    console.log('Successfully connected to Kafka broker');
    return producer;
  } catch (error) {
    console.error('Failed to initialize Kafka:', error);
    throw error;
  }
}

async function main(params) {
  let producer = null;
  try {
    producer = await initializeKafka();

    const {
      metadata: { dataset, network },
      data,
      user_data
    } = params;

    // Create topic name
    const sanitizedDataset = dataset.toLowerCase().replace(/[^a-z0-9-]/g, '-');
    const topic = `${network.toLowerCase()}-${sanitizedDataset}`;

    // Prepare message payload
    const messagePayload = {
      dataset,
      network,
      timestamp: new Date().toISOString(),
      data,
      user_data
    };

    // Send to Kafka
    const result = await producer.send({
      topic,
      messages: [
        {
          key: `${network}-${dataset}-${Date.now()}`,
          value: JSON.stringify(messagePayload),
          headers: {
            network,
            dataset,
            timestamp: new Date().toISOString()
          }
        }
      ]
    });

    console.log(`Successfully sent data to Kafka topic ${topic}`);

    // Always disconnect before returning
    await producer.disconnect();
    console.log('Kafka producer disconnected');

    return {
      status: 'success',
      message: `Data sent to Kafka topic ${topic}`,
      metadata: {
        dataset,
        network,
        kafka_result: result
      }
    };

  } catch (error) {
    if (producer) {
      await producer.disconnect();
      console.log('Kafka producer disconnected after error');
    }

    return {
      status: 'error',
      message: error.message,
      metadata: {
        dataset: params.metadata?.dataset,
        network: params.metadata?.network
      }
    };
  }
}

module.exports = { main };

Leave testing parameters as is, and click "Save & close".
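
For context, the Function's main receives a params object shaped roughly like the payload below. The field values here are purely illustrative, and the exact contents of data depend on the dataset you stream:

{
  "metadata": {
    "dataset": "block",
    "network": "ethereum-mainnet"
  },
  "data": { "...": "dataset payload for this batch" },
  "user_data": null
}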

Key features of this Function:

  • Creates a Kafka producer for each invocation
  • Automatically creates topics based on network and dataset
  • Includes metadata in message headers
  • Properly handles connections and disconnections
  • Includes error handling and logging
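
Before wiring up a Stream, you can sanity-check the Function's output with a small local consumer. Here's a minimal sketch using the same kafkajs library; the broker address mirrors the placeholder in the Function, and the topic name ethereum-mainnet-block is hypothetical:

const { Kafka } = require('kafkajs');

async function run() {
  const kafka = new Kafka({
    clientId: 'local-verifier',
    brokers: ['2.tcp.ngrok.io:18139'] // 👈 same ngrok address as in the Function
  });

  const consumer = kafka.consumer({ groupId: 'local-verifier-group' });
  await consumer.connect();

  // Subscribe to the topic your Function creates (`network-dataset`)
  await consumer.subscribe({ topic: 'ethereum-mainnet-block', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`${topic}[${partition}]: ${message.value.toString()}`);
    }
  });
}

run().catch(console.error);

Run it with node consumer.js after installing kafkajs locally; each message your Function produces should print as JSON.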

Setting Up the Stream


  1. Go to the Streams section in your QuickNode dashboard
  2. Click "Create Stream"
  3. Select your desired chain, network, and dataset
  4. Leave other settings as is
  5. For the destination, choose "Select an existing Function" and pick your Kafka Function
  6. Start the Stream

Monitoring Data Flow

Visit the Kafka UI at http://localhost:8080 to monitor:

  • Topics being created
  • Messages arriving in real-time
  • Consumer groups and partitions
  • Broker health

Kafka UI

You can also check your Function logs in the QuickNode dashboard for detailed execution information.
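
If you prefer a terminal over the UI, you can also tail a topic with the console consumer bundled in the Kafka container (the topic name ethereum-mainnet-block below is hypothetical; substitute the one your Function created):

docker-compose exec kafka kafka-console-consumer \
  --bootstrap-server kafka:29092 \
  --topic ethereum-mainnet-block \
  --from-beginning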

Common Issues and Solutions

Connection Refused

If you see connection errors:

  • Verify ngrok is running (you can probe the tunnel directly; see below)
  • Check the ngrok URL in your Function code
  • Ensure Kafka containers are running
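
One quick sanity check is a raw TCP probe against the ngrok forwarding address (the host and port below are placeholders; use your own):

nc -vz 2.tcp.ngrok.io 18139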

Missing Topics

If topics aren't appearing:

  • Topics are created automatically on first message
  • Check Function logs for successful message delivery
  • Verify the topic naming convention in the code (you can also list topics from the CLI, as shown below)
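
To double-check from the command line, the Confluent Kafka image ships with the standard admin tools; listing topics from inside the broker container looks like this:

docker-compose exec kafka kafka-topics \
  --bootstrap-server kafka:29092 \
  --list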

Message Delivery Issues

If messages aren't appearing:

  • Check Function logs for successful delivery
  • Verify topic names in Kafka UI
  • Ensure proper Kafka connection configuration

Production Considerations

While this setup works great for development, for production you should consider:

  1. Using a managed Kafka service instead of local setup
  2. Implementing proper security measures
  3. Setting up monitoring and alerting
  4. Handling connection retries and backoff
  5. Implementing message delivery guarantees (see the sketch after this list)
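
To make the retry/backoff and delivery-guarantee points concrete, here's a hedged kafkajs sketch for a managed, TLS/SASL-protected cluster. The broker address, SASL mechanism, and environment variables are placeholders; check your provider's documentation for the exact values:

const { Kafka, logLevel } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'quicknode-stream-producer',
  brokers: ['broker-1.example.com:9093'], // placeholder managed broker
  ssl: true,                              // encrypt traffic in transit
  sasl: {
    mechanism: 'scram-sha-256',           // depends on your provider
    username: process.env.KAFKA_USERNAME,
    password: process.env.KAFKA_PASSWORD
  },
  logLevel: logLevel.WARN,
  retry: {
    initialRetryTime: 300, // backoff starting point in ms
    retries: 8
  }
});

// Idempotent producer: the broker deduplicates retries, so
// re-sends don't create duplicate messages on a partition
const producer = kafka.producer({
  idempotent: true,
  maxInFlightRequests: 1
});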

Final Thoughts

Subscribe to our newsletter for more articles and guides on blockchain development. Regardless of what you are building, we would love to hear about it. Drop us a line on Discord or Twitter and let us know what you're working on!

We ❤️ Feedback!

Let us know if you have any feedback or requests for new topics. We'd love to hear from you.
