How to Set Up a Stream using REST API

Created on
Updated on
Nov 26, 2024

10 min read

Overview

Using APIs allows for easy automation, seamless integration with other systems, and more efficient scaling. With the QuickNode Streams REST API, developers can integrate blockchain data directly into their backends, with no manual data handling through a user interface.

In this guide, we will walk you through setting up and managing Streams using the QuickNode REST API. You will learn how to create Streams, configure destinations, and apply filters to process blockchain data.

What You Will Do

  • Create a new Stream with a customizable configuration and a destination such as a Webhook, AWS S3, or PostgreSQL
  • Apply filters to process incoming data

What You Will Need

What is Streams?

Streams is a powerful feature offered by QuickNode that allows you to receive real-time and historical blockchain data delivered to various destinations. It supports flexible filtering, guaranteed delivery, and real-time processing, making it an ideal solution for applications that require historical or real-time data.

Key Features of Streams


  • Customizable Filtering: Instead of pulling massive datasets and filtering manually, define your own filtering logic, reducing data noise and processing overhead.
  • Multiple Destinations: Effortlessly stream data to Webhooks, AWS S3, PostgreSQL, and other services with simple configuration.
  • Historical Data Backfilling: Retrieve historical blockchain data using predefined templates, with transparent pricing and completion time estimates provided upfront for better planning.
  • Cost-Efficient: Pricing is based on the amount of data processed, ensuring predictable costs even for high-volume applications.
  • Data Consistency: With reorg handling built in, you can rely on consistent data delivery and ensure that no blocks or transactions are missed, even during network reorganizations.

API Capabilities

The Streams API allows you to:

  • Create and manage streams
  • Test and apply filter functions
  • Configure multiple destination types
  • Monitor and track your streams’ performance

Learn more about Streams REST API capabilities here.

In the next section, we’ll walk through how to write a filtering function to work with Ethereum block data. Then, we’ll create a new Stream using the API and apply the filter function.

Prerequisites

QuickNode Account and API Key

You will need an API key to use the Streams API. If you don't have a QuickNode account yet, you can create an account here.

To create your keys, log in to your QuickNode Account, click on the avatar icon on the top left, and select API Keys. It will bring you to the API Keys page.

Generate an API key with STREAMS_REST permissions and keep your API key handy as you'll use it to authenticate your requests.
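
One way to keep the key out of source control is to read it from an environment variable and build the request headers in one place. The sketch below assumes an environment variable named QUICKNODE_API_KEY and a helper named buildStreamsHeaders; both names are illustrative and not part of the QuickNode API. The header names match the ones used in the requests later in this guide.

```javascript
// Hypothetical helper: builds the headers used for Streams REST API requests.
// The environment variable name QUICKNODE_API_KEY is an assumption for this sketch.
function buildStreamsHeaders(apiKey = process.env.QUICKNODE_API_KEY) {
  if (!apiKey) {
    throw new Error('Missing API key: set QUICKNODE_API_KEY or pass a key explicitly')
  }
  return {
    accept: 'application/json',
    'Content-Type': 'application/json',
    'x-api-key': apiKey,
  }
}

// Example: pass a key explicitly instead of reading the environment
const exampleHeaders = buildStreamsHeaders('demo-key')
console.log(Object.keys(exampleHeaders))
```

Failing early on a missing key tends to give a clearer message than an authentication error returned by the API.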

Webhook Destination

Streams supports multiple destinations, including webhooks. To use a webhook destination, you need an HTTP endpoint that can receive POST requests. You can host one on your own web server or use an online tool such as TypedWebhook to create one quickly.

Online webhook tools like TypedWebhook generally show you a webhook URL as soon as you open them. Copy the URL and keep it handy, as you'll use it later.
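
If you would rather receive the data on your own machine, a small receiver can be sketched with Node's built-in http module. This is a minimal sketch, assuming the payload arrives as a JSON POST body; the createWebhookServer name and port 3000 are illustrative. A local server also has to be reachable from the internet (for example through a tunnel) before it can serve as a webhook URL.

```javascript
const http = require('http')

// Hypothetical local receiver for testing: collects the POST body and
// hands the parsed JSON to a callback. Not a production-ready server.
function createWebhookServer(onPayload) {
  return http.createServer((req, res) => {
    if (req.method !== 'POST') {
      res.writeHead(405).end()
      return
    }
    let body = ''
    req.on('data', (chunk) => {
      body += chunk
    })
    req.on('end', () => {
      let payload
      try {
        payload = JSON.parse(body)
      } catch (error) {
        res.writeHead(400).end('invalid JSON')
        return
      }
      onPayload(payload)
      res.writeHead(200).end('ok')
    })
  })
}

// Usage (left commented out so the snippet does not keep a port open):
// createWebhookServer((payload) => console.log(payload)).listen(3000)
```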

How to Set Up a Stream using API

Stream Setup Flow Overview

Before diving into the code, let's walk through the steps involved in setting up a Stream using the API:


  1. Filter Function: Write and test a filter function (main) that processes blockchain data.
  2. Test Filter Function: Use the QuickNode API to validate the filter logic. (testFilterFunction)
  3. Create a Stream: Once validated, use the API to create a Stream and configure destinations like Webhooks or AWS S3. (setupQuickNodeStream)
  4. Run and Manage Streams: After creating a Stream, you can manage and interact with it using the API.

Step 1: Initialize Your Project

To get started, create a new directory for your project and initialize it with npm. This will create a package.json file that will keep track of your project’s dependencies.

mkdir quicknode-streams-api
cd quicknode-streams-api
npm init -y

Next, install axios, the only dependency this project needs:

npm install axios

Axios is a popular HTTP client library used for making HTTP requests in JavaScript. It provides a simple and intuitive API for sending HTTP requests and handling responses.

Step 2: Writing a Filter Function

After initializing your project, the next step is to write a filter function. A filter function removes unwanted data from the blockchain payload before it is delivered. In this example, we'll extract only the block number (converted to an integer) and the block hash from the block dataset. To learn more about datasets that Streams can process, check out our Streams Docs.

Create a new file named index.js in the root directory of your project. This file will contain all the code for creating a Stream using the API including filtering logic. We'll build this file step by step.

Copy the code below into the index.js file and replace the QUICKNODE_API_KEY and WEBHOOK_URL placeholders with your actual API key and webhook URL.

index.js
const axios = require('axios')

const QUICKNODE_API_KEY = 'YOUR-QUICKNODE-API-KEY' // 👈 Replace with your actual API key
const WEBHOOK_URL = 'YOUR-WEBHOOK-URL' // 👈 Replace with your webhook URL

function main(stream) {
  try {
    // If stream is configured with metadata in the body, the data may be nested under a `data` key
    const data = stream.data ? stream.data : stream

    // data is an array of objects. If there is a batch of events, the array will contain multiple objects.
    // For the test, there is only one object in the array.
    const numberDecimal = parseInt(data[0].number, 16)
    const filteredData = {
      hash: data[0].hash,
      number: numberDecimal,
    }
    return filteredData
  } catch (error) {
    return { error: error.message, stack: error.stack }
  }
}
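
Before calling the API, you can run the filter locally against a hand-made payload to see how it behaves. The sketch below repeats the filter so it runs on its own; the sampleBlock object is a hypothetical, heavily trimmed stand-in for a real block, which carries many more fields.

```javascript
// Copy of the filter function above so this snippet runs on its own
function main(stream) {
  try {
    const data = stream.data ? stream.data : stream
    const numberDecimal = parseInt(data[0].number, 16)
    return { hash: data[0].hash, number: numberDecimal }
  } catch (error) {
    return { error: error.message, stack: error.stack }
  }
}

// Hypothetical sample: the block number is a hex string, as the filter expects
const sampleBlock = { hash: '0xabc', number: '0x10fc8a9' }

const fromArray = main([sampleBlock]) // payload is a bare array
const fromWrapped = main({ data: [sampleBlock] }) // payload nested under `data`
const fromEmpty = main([]) // no block present, so the catch branch runs

console.log(fromArray) // { hash: '0xabc', number: 17811625 }
console.log(fromWrapped) // { hash: '0xabc', number: 17811625 }
console.log(fromEmpty.error)
```

Both payload shapes give the same result, and an empty batch returns an error object instead of throwing.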

Next, we test the filter function using the QuickNode API.

Step 3: Testing the Filter Function

Before creating a stream, it's important to validate that your filter logic works with actual data. QuickNode's API provides a convenient method to test filter functions, allowing you to verify the behavior with a specific block number.

These parameters are required for testing the filter function:

  • network: The network name, such as ethereum-mainnet.
  • dataset: The dataset name, such as block.
  • filter_function: The base64-encoded filter function.
  • block: The block number or hash to test the filter function with.

For details, see Streams - Filter Functions Documentation.
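
The four parameters can be assembled as in the sketch below. The buildTestFilterPayload helper and the placeholder filter are illustrative names, not part of the API. Decoding the base64 field back to text is a quick way to confirm the encoding step did what you expect.

```javascript
// Hypothetical helper: assembles the request body for testing a filter function
function buildTestFilterPayload(filterFn, block) {
  return {
    network: 'ethereum-mainnet',
    dataset: 'block',
    // The function source is sent as base64 text
    filter_function: Buffer.from(filterFn.toString()).toString('base64'),
    block: String(block),
  }
}

// Placeholder filter used only for this demonstration
function main(stream) {
  return stream
}

const payload = buildTestFilterPayload(main, 17811625)

// Decoding the field gives back the original source
const decoded = Buffer.from(payload.filter_function, 'base64').toString('utf8')
console.log(decoded === main.toString()) // true
```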

To validate the filter function's result, you need to choose a block number and get the corresponding block hash. The code below uses 17811625 as an example block number on Ethereum Mainnet. Once you have chosen the block number or hash, you can test the filter function using the following code.

Add the following code just after the main() function.

This function takes a base64-encoded filter function as its input parameter, sends a test request to QuickNode's API, and verifies that the filtering logic behaves correctly by comparing the output with a known block hash and number. We'll call this function from the setupQuickNodeStream function.

index.js continued
async function testFilterFunction(base64FilterFunction) {
  const hash =
    '0xb72704063570e4b5a5f972f380fad5e43e1e8c9a1b0e36f204b9282c89adc677' // 👈 Hash of the test block
  const number = '17811625' // 👈 Number of the test block

  const data = JSON.stringify({
    network: 'ethereum-mainnet',
    dataset: 'block',
    filter_function: base64FilterFunction,
    block: number,
  })

  try {
    const response = await axios.post(
      'https://api.quicknode.com/streams/rest/v1/streams/test_filter',
      data,
      {
        headers: {
          accept: 'application/json',
          'Content-Type': 'application/json',
          'x-api-key': QUICKNODE_API_KEY,
        },
      }
    )

    // The filter returns the block number as an integer, so compare numerically
    // rather than strictly against the string above.
    if (
      response.status === 201 &&
      response.data.hash === hash &&
      Number(response.data.number) === Number(number)
    ) {
      console.log('Filter function tested successfully!')
      return true
    } else {
      console.error('Error testing filter function:', response.status)
      return false
    }
  } catch (error) {
    console.error('Error testing filter function:', error.message)
    throw error
  }
}
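
By default, axios rejects the promise for any non-2xx response, so failed requests end up in the catch block, where error.message alone can be terse. A small helper along these lines could make the log output more useful. It is a sketch built on the general shape of axios errors (response, request, message); the describeRequestError name is hypothetical.

```javascript
// Hypothetical helper: turns an axios-style error into a one-line description
function describeRequestError(error) {
  if (error.response) {
    // The server replied with a non-2xx status
    const body = JSON.stringify(error.response.data)
    return `HTTP ${error.response.status}: ${body}`
  }
  if (error.request) {
    // The request was sent but no response arrived
    return `No response received: ${error.message}`
  }
  // The request could not be built or sent
  return `Request setup failed: ${error.message}`
}

// Hand-made objects that mimic the axios error shape
const rejected = describeRequestError({
  response: { status: 401, data: { message: 'unauthorized' } },
})
const timedOut = describeRequestError({
  request: {},
  message: 'timeout of 1000ms exceeded',
})

console.log(rejected) // HTTP 401: {"message":"unauthorized"}
console.log(timedOut) // No response received: timeout of 1000ms exceeded
```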

Step 4: Creating a Stream

Now that we have created the filter function, we can create a Stream using the QuickNode API. We'll use the setupQuickNodeStream function to create a new Stream with the validated filter function.

Add the following code just after the testFilterFunction function.

This code first converts the filter function to a base64 string, tests it, and creates the stream only if the test succeeds. The stream is configured with a webhook destination, including retry settings (max_retry and retry_interval_sec).

See the Create Stream documentation for all the available Stream options.

index.js continued
async function setupQuickNodeStream(startSlot, endSlot) {
  const filterFunctionString = main.toString()
  const base64FilterFunction =
    Buffer.from(filterFunctionString).toString('base64')

  const testResult = await testFilterFunction(base64FilterFunction)
  if (!testResult) {
    console.error('Filter function failed. Stream not created.')
    return
  }

  console.log('Filter function passed. Proceeding to create the stream.')

  const streamConfig = {
    name: 'Streams with API Test',
    network: 'ethereum-mainnet',
    dataset: 'block',
    filter_function: base64FilterFunction,
    // Block range to stream. The field names start_range and end_range are
    // assumed here; confirm them in the Create Stream documentation.
    start_range: startSlot,
    end_range: endSlot,
    destination: {
      url: WEBHOOK_URL,
      compression: 'none',
      headers: {
        'Content-Type': 'application/json',
      },
      max_retry: 3,
      retry_interval_sec: 1,
    },
    status: 'active',
  }

  try {
    const response = await axios.post(
      'https://api.quicknode.com/streams/rest/v1/streams',
      streamConfig,
      {
        headers: {
          accept: 'application/json',
          'Content-Type': 'application/json',
          'x-api-key': QUICKNODE_API_KEY,
        },
      }
    )

    console.log('Stream created successfully:', response.data)
    return response.data.id
  } catch (error) {
    console.error('Error creating stream:', error.message)
    throw error
  }
}

Step 5: Destination Options (Optional)

QuickNode supports multiple destinations, meaning you can direct your blockchain data to various storage or processing services. In this guide, we are using a webhook destination, but you can configure AWS S3, PostgreSQL, or Snowflake for different needs. Here’s how to configure each.

AWS S3

AWS S3 Bucket
const streamConfig = {
  name: 'Streams with API Test',
  network: 'ethereum-mainnet',
  dataset: 'block',
  filter_function: base64FilterFunction,
  destination: {
    s3: {
      endpoint: 'your-s3-endpoint',
      bucket: 'your-bucket-name',
      region: 'us-east-1',
      access_key: 'YOUR_ACCESS_KEY',
      secret_key: 'YOUR_SECRET_KEY',
      file_compression_type: 'gzip',
      file_type: 'json',
      max_retry: 3,
      retry_interval_sec: 1,
      use_ssl: true,
    },
  },
  status: 'active',
}

PostgreSQL

PostgreSQL
const streamConfig = {
  name: 'Streams with API Test',
  network: 'ethereum-mainnet',
  dataset: 'block',
  filter_function: base64FilterFunction,
  destination: {
    postgresql: {
      host: 'your-db-host',
      port: 5432,
      database: 'your-database',
      table_name: 'your-table-name',
      username: 'your-username',
      password: 'your-password',
      ssl_mode: 'require',
      max_retry: 3,
      retry_interval_sec: 1,
    },
  },
  status: 'active',
}

Snowflake

Snowflake
const streamConfig = {
  name: 'Streams with API Test',
  network: 'ethereum-mainnet',
  dataset: 'block',
  filter_function: base64FilterFunction,
  destination: {
    snowflake: {
      account: 'your-account',
      warehouse: 'your-warehouse',
      host: 'your-host',
      database: 'your-database',
      protocol: 'snowflake-protocol',
      schema: 'your-schema',
      table_name: 'your-table-name',
      username: 'your-username',
      max_retry: 3,
      retry_interval_sec: 1,
    },
  },
  status: 'active',
}
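
Since the destination examples differ only in the destination object, the shared fields could be factored into one helper. The sketch below mirrors the shapes shown in this guide: webhook settings sit directly under destination, while the other types are nested under a key named after the type. The buildStreamConfig name and its arguments are hypothetical.

```javascript
// Hypothetical helper: builds a stream config for one destination type,
// following the object shapes used in the examples above.
function buildStreamConfig(base64FilterFunction, type, settings) {
  const nestedTypes = ['s3', 'postgresql', 'snowflake']
  if (type !== 'webhook' && !nestedTypes.includes(type)) {
    throw new Error(`Unsupported destination type: ${type}`)
  }

  // Retry defaults taken from the examples; caller settings override them
  const merged = { max_retry: 3, retry_interval_sec: 1, ...settings }

  return {
    name: 'Streams with API Test',
    network: 'ethereum-mainnet',
    dataset: 'block',
    filter_function: base64FilterFunction,
    destination: type === 'webhook' ? merged : { [type]: merged },
    status: 'active',
  }
}

const pgConfig = buildStreamConfig('BASE64_PLACEHOLDER', 'postgresql', {
  host: 'your-db-host',
  port: 5432,
})
console.log(pgConfig.destination.postgresql.host) // your-db-host
```

Keeping the retry defaults in one place means a change to them applies to every destination type.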

Step 6: Run the Stream

Now that we have configured our Stream, let's run it. Call the setupQuickNodeStream function, which tests the filter function and then creates the Stream. The two arguments are the start and end of the range to stream. If you want to stream indefinitely, pass -1 as the end.

Add the code below to the end of your index.js file.

index.js continued
setupQuickNodeStream(1, 100000)

Then, run the script using the following command:

node index.js

You should then see a success message and the details of your Stream in the console.
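
Because a wrong range may only show up after the request is sent, a pre-flight check on the two arguments can be useful. This is a minimal sketch: the validateRange helper is hypothetical, and the rules it enforces (non-negative integer start, end either -1 or not below start) are assumptions, not documented API constraints.

```javascript
// Hypothetical pre-flight check; -1 as the end marks an open-ended range
function validateRange(start, end) {
  if (!Number.isInteger(start) || start < 0) {
    throw new RangeError(`start must be a non-negative integer, got ${start}`)
  }
  if (!Number.isInteger(end)) {
    throw new RangeError(`end must be an integer, got ${end}`)
  }
  if (end !== -1 && end < start) {
    throw new RangeError(`end (${end}) must be -1 or at least start (${start})`)
  }
  return { start, end, openEnded: end === -1 }
}

console.log(validateRange(1, 100000)) // { start: 1, end: 100000, openEnded: false }
console.log(validateRange(1, -1)) // { start: 1, end: -1, openEnded: true }
```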

Step 7: Managing Streams

After successfully creating a stream, you can manage and interact with it through the Streams REST API. These operations let you manage your data streams dynamically, giving you full control over their lifecycle.
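
As a rough sketch of what a management call might look like, the helper below builds an axios-style request config for a single stream. It assumes a stream is addressed at the collection URL used earlier followed by its ID, and that GET and DELETE are supported on that path. Neither assumption is confirmed by this guide, so check the Streams REST API reference before relying on it. The streamRequest name is hypothetical.

```javascript
const STREAMS_URL = 'https://api.quicknode.com/streams/rest/v1/streams'

// Hypothetical helper: builds (but does not send) a request config.
// ASSUMPTION: a single stream lives at `${STREAMS_URL}/<id>` and accepts
// GET (fetch details) and DELETE (remove); verify against the API reference.
function streamRequest(method, streamId, apiKey) {
  const allowed = ['GET', 'DELETE']
  if (!allowed.includes(method)) {
    throw new Error(`Unsupported method in this sketch: ${method}`)
  }
  return {
    method,
    url: `${STREAMS_URL}/${encodeURIComponent(streamId)}`,
    headers: {
      accept: 'application/json',
      'x-api-key': apiKey,
    },
  }
}

const request = streamRequest('GET', 'stream-123', 'demo-key')
console.log(request.url)
// With axios installed, the config could be sent with: axios(request)
```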

Conclusion

In this guide, we covered how to set up a Stream using the QuickNode API, implement filtering functions, and manage existing streams. By leveraging these API capabilities, you can automate your data pipelines, customize your data flow, and ensure consistency with minimal manual intervention. With QuickNode Streams, handling blockchain data becomes a seamless process, giving you the flexibility to focus on building your applications while we handle the complexities of real-time data management.

