10 min read
Overview
Using APIs allow for easy automation, seamless integration with other systems, and the ability to scale more efficiently. By using the QuickNode Streams REST API, developers can directly integrate blockchain data into their backends, skipping the need for manual data handling through user interfaces.
In this guide, we will walk you through setting up and managing Streams using the QuickNode REST API. You will learn how to create Streams, configure destinations, and apply filters to process blockchain data.
What You Will Do
- Create a new Stream with customizable configurations with different destinations like Webhooks, AWS S3, and PostgreSQL
- Apply filters to process incoming data
What You Will Need
- A QuickNode account
- A code editor (e.g., VSCode)
- Familiarity with REST APIs and JSON
- Node.js installed
What is Streams?
Streams is a powerful feature offered by QuickNode that allows you to receive real-time and historical blockchain data delivered to various destinations. It supports flexible filtering, guaranteed delivery, and real-time processing, making it an ideal solution for applications that require historical or real-time data.
Key Features of Streams
- Customizable Filtering: Instead of pulling massive datasets and filtering manually, define your own filtering logic, reducing data noise and processing overhead.
- Multiple Destinations: Effortlessly stream data to Webhooks, AWS S3, PostgreSQL, and other services with simple configuration.
- Historical Data Backfilling: Retrieve historical blockchain data using predefined templates, with transparent pricing and completion time estimates provided upfront for better planning.
- Cost-Efficient: Pricing is based on the amount of data processed, ensuring predictable costs even for high-volume applications.
- Data Consistency: With reorg handling built in, you can rely on consistent data delivery and ensure that no blocks or transactions are missed, even during network reorganizations.
API Capabilities
The Streams API allows you to:
- Create and manage streams
- Test and apply filter functions
- Configure multiple destination types
- Monitor and track your streams’ performance
Learn more about Streams REST API capabilities here.
In the next section, we’ll walk through how to write a filtering function to work with Ethereum block data. Then, we’ll create a new Stream using the API and apply the filter function.
Prerequisites
QuickNode Account and API Key
You will need an API key in order to use the Streams API. If you haven't signed up already and created any endpoint, you can create an account here.
To create your keys, log in to your QuickNode Account, click on the avatar icon on the top left, and select API Keys. It will bring you to the API Keys page.
Generate an API key with STREAMS_REST
permissions and keep your API key handy as you'll use it to authenticate your requests.
Webhook Destination
Streams supports multiple destinations, including webhooks. To create a webhook destination, you will need to create a webhook endpoint in your web server. You can use any web server, such as TypedWebhook to create a webhook endpoint easily.
Generally, you see a webhook URL once you visit online webhook creation tools like TypedWebhook. Copy the URL and keep it handy as you'll use it later.
How to Set Up a Stream using API
Stream Setup Flow Overview
Before diving into the code, let's walk through the steps involved in setting up a Stream using the API:
- Filter Function: Write and test a filter function (
main
) that processes blockchain data. - Test Filter Function: Use QuickNode API to validate the filter logic. (
testFilterFunction
) - Create a Stream: Once validated, use the API to create a Stream and configure destinations like Webhooks or AWS S3. (
setupQuickNodeStream
) - Run and Manage Streams: After creating a Stream, you can manage and interact with it using the API.
Step 1: Initialize Your Project
To get started, create a new directory for your project and initialize it with npm. This will create a package.json file that will keep track of your project’s dependencies.
mkdir quicknode-streams-api
cd quicknode-streams-api
npm init -y
Next, install the required dependencies for your project. You can use the following command to install the required dependencies:
npm install axios
Axios is a popular HTTP client library used for making HTTP requests in JavaScript. It provides a simple and intuitive API for sending HTTP requests and handling responses.
Step 2: Writing a Filter Function
After initializing your project, the next step is to write a filter function. A filter function is responsible for filtering out unwanted data from the blockchain. In this example, we’ll extract only block number as integer and block hash from the block
dataset. To learn more about datasets that Streams can process, check out our Streams Docs.
Create a new file named index.js
in the root directory of your project. This file will contain all the code for creating a Stream using the API including filtering logic. We'll build this file step by step.
Copy the code below into the index.js
file and replace the QUICKNODE_API_KEY
and WEBHOOK_URL
placeholders with your actual API key and webhook URL.
const axios = require('axios')
const QUICKNODE_API_KEY = 'YOUR-QUICKNODE-API-KEY' // 👈 Replace with your actual API key
const WEBHOOK_URL = 'YOUR-WEBHOOK-URL' // 👈 Replace with your webhook URL
function main(stream) {
try {
const data = stream.data
// data is an array of objects. If there is a batch of events, the array will contain multiple objects.
// For the test, there is only one object in the array.
const numberDecimal = parseInt(data[0].number, 16)
const filteredData = {
hash: data[0].hash,
number: numberDecimal,
}
return filteredData
} catch (error) {
return { error: error.message, stack: error.stack }
}
}
Next, we test the filter function using the QuickNode API.
Step 3: Testing the Filter Function
Before creating a stream, it's important to validate that your filter logic works with actual data. QuickNode's API provides a convenient method to test filter functions, allowing you to verify the behavior with a specific block number.
These parameters are required for testing the filter function:
network
: The network name, such asethereum-mainnet
.dataset
: The dataset name, such asblock
.filter_function
: The base64-encoded filter function.block
: The block number or hash to test the filter function with.
For details, see Streams - Filter Functions Documentation.
To validate the filter function's result, you need to choose a block number and get the corresponding block hash. The code below uses 17811625
as an example block number on Ethereum Mainnet. Once you have chosen the block number or hash, you can test the filter function using the following code.
Add the following code just after the main()
function.
This function takes a base64-encoded filter function as an input parameter, sends a test request to QuickNode’s API, verifying that the filtering logic behaves correctly by comparing the output with a known block hash and number. We'll call this function from the setupQuickNodeStream
function.
async function testFilterFunction(base64FilterFunction) {
const hash =
'0xb72704063570e4b5a5f972f380fad5e43e1e8c9a1b0e36f204b9282c89adc677' // 👈 Hash of the test block
const number = '17811625' // 👈 Number of the test block
let data = JSON.stringify({
network: 'ethereum-mainnet',
dataset: 'block',
filter_function: base64FilterFunction,
block: number,
})
try {
const response = await axios.post(
'https://api.quicknode.com/streams/rest/v1/streams/test_filter',
data,
{
headers: {
accept: 'application/json',
'Content-Type': 'application/json',
'x-api-key': QUICKNODE_API_KEY,
},
}
)
if (
response.status === 201 &&
response.data.hash === hash &&
response.data.number === number
) {
console.log('Filter function tested successfully!')
return true
} else {
console.error('Error testing filter function:', response.status)
return false
}
} catch (error) {
console.error('Error testing filter function:', error.message)
throw error
}
}
Step 4: Creating a Stream
Now that we have created the filter function, we can create a Stream using the QuickNode API. We'll use the setupQuickNodeStream
function to create a new Stream with the validated filter function.
Add the following code just after the testFilterFunction
function.
This code first converts the filter function to a base64 string, tests it, and proceeds to create a stream if the test is successful. The stream is configured with a webhook destination, including error handling for retries and timeouts.
Check the Create Stream documentation in order to see all the available options for Streams.
async function setupQuickNodeStream(startSlot, endSlot) {
const filterFunctionString = main.toString()
const base64FilterFunction =
Buffer.from(filterFunctionString).toString('base64')
const testResult = await testFilterFunction(base64FilterFunction)
if (!testResult) {
console.error('Filter function failed. Stream not created.')
return
}
console.log('Filter function passed. Proceeding to create the stream.')
const streamConfig = {
name: 'Streams with API Test',
network: 'ethereum-mainnet',
dataset: 'block',
filter_function: base64FilterFunction,
destination: {
url: WEBHOOK_URL,
compression: 'none',
headers: {
'Content-Type': 'application/json',
},
max_retry: 3,
retry_interval_sec: 1,
},
status: 'active',
}
try {
const response = await axios.post(
'https://api.quicknode.com/streams/rest/v1/streams',
streamConfig,
{
headers: {
accept: 'application/json',
'Content-Type': 'application/json',
'x-api-key': QUICKNODE_API_KEY,
},
}
)
console.log('Stream created successfully:', response.data)
return response.data.id
} catch (error) {
console.error('Error creating stream:', error.message)
throw error
}
}
Step 5: Destination Options (Optional)
QuickNode supports multiple destinations, meaning you can direct your blockchain data to various storage or processing services. In this guide, we are using a webhook
destination, but you can configure AWS S3, PostgreSQL, or Snowflake for different needs. Here’s how to configure each.
AWS S3
const streamConfig = {
name: 'Streams with API Test',
network: 'ethereum-mainnet',
dataset: 'block',
filter_function: base64FilterFunction,
destination: {
s3: {
endpoint: 'your-s3-endpoint',
bucket: 'your-bucket-name',
region: 'us-east-1',
access_key: 'YOUR_ACCESS_KEY',
secret_key: 'YOUR_SECRET_KEY',
file_compression_type: 'gzip',
file_type: 'json',
max_retry: 3,
retry_interval_sec: 1,
use_ssl: true,
},
},
status: 'active',
}
PostgreSQL
const streamConfig = {
name: 'Streams with API Test',
network: 'ethereum-mainnet',
dataset: 'block',
filter_function: base64FilterFunction,
destination: {
postgresql: {
host: 'your-db-host',
port: 5432,
database: 'your-database',
table_name: 'your-table-name',
username: 'your-username',
password: 'your-password',
ssl_mode: 'require',
max_retry: 3,
retry_interval_sec: 1,
},
},
status: 'active',
}
Snowflake
const streamConfig = {
name: 'Streams with API Test',
network: 'ethereum-mainnet',
dataset: 'block',
filter_function: base64FilterFunction,
destination: {
snowflake: {
account: 'your-account',
warehouse: 'your-warehouse',
host: 'your-host',
database: 'your-database',
protocol: 'snowflake-protocol',
schema: 'your-schema',
table_name: 'your-table-name',
username: 'your-username',
max_retry: 3,
retry_interval_sec: 1,
},
},
status: 'active',
}
Step 6: Run the Stream
Now that we have configured our Stream, let's run it. We'll use the setupQuickNodeStream
function to create a new Stream with the validated filter function. As you can see, we're passing the start and end slots as parameters. If you want to stream infinitely, you can pass -1
as the end slot.
Add the code below to the end of your index.js
file.
setupQuickNodeStream(1, 100000)
Then, run the script using the following command:
node index.js
Then, you should see a successful message and details of your Stream in the console.
Step 7: Managing Streams
After successfully creating a stream, you can manage and interact with it using the following abilities:
- Retrieve Streams (GET): List all active streams.
- Remove Streams (DELETE): Permanently remove all streams.
- Retrieve Stream by ID (GET): Retrieve a Stream by its unique ID.
- Update Stream (PATCH): Modify an existing stream’s configuration.
- Delete Stream by ID (DELETE): Remove a specific stream by its unique ID.
- Activate Stream by ID (POST): Start a paused stream to resume data flow.
- Pause Stream by ID (POST): Temporarily pause a stream.
- Terminate Stream by ID (POST): Stop a stream permanently without deleting it.
- Test Filter (POST): Test your filtering logic before applying it to ensure it behaves as expected.
Each of these operations enables you to dynamically manage your data streams, providing full control over their lifecycle.
Conclusion
In this guide, we covered how to set up a Stream using the QuickNode API, implement filtering functions, and manage existing streams. By leveraging these API capabilities, you can automate your data pipelines, customize your data flow, and ensure consistency with minimal manual intervention. With QuickNode Streams, handling blockchain data becomes a seamless process, giving you the flexibility to focus on building your applications while we handle the complexities of real-time data management.
We ❤️ Feedback!
Let us know if you have any feedback or requests for new topics. We'd love to hear from you.
Additional Resources
- Documentation: QuickNode Streams
- Documentation: Streams API Documentation
- Technical Guide: How to Build a Blockchain Indexer with Streams
- Technical Guide: How to Backfill Ethereum ERC-20 Token Transfer Data
- Other Streams-related guides