
Filters

Updated on
Mar 31, 2025

Filters enable you to customize and filter your stream's payload using JavaScript (ECMAScript) code. With the help of filters, you can match specific patterns and rules against the data from your stream and personalize the data sent to your destination. This feature is in beta and can be configured in the QuickNode developer portal or through the Streams REST API.

Understanding Filters

In Streams, a filter_function is an optional configuration option for your stream. This function must be named main, and it modifies your stream's data before it is sent to its destination. Using filters, you can precisely control the data you receive, ensuring you only pay for and process the data you need.

Benefits of Filters

  • Cost Efficiency: Reduce your data volume, and therefore your costs, down to a minimum of 2.5KB per block.
  • Customizability: Implement custom filter functions to match specific patterns or criteria, offering great flexibility in data handling.
  • Enhanced Data Relevance: Filters ensure that you receive data that is directly relevant to your needs, increasing the overall usefulness of the data.
  • Modify the Payload Before Streaming: Customize the payload from your stream before it is sent to your Streams destination.

Pricing & Data Usage

Understanding Streams Data Billing

QuickNode Streams are billed based on the amount of data delivered to your destination. When using Filters, there are a few important considerations regarding how data is measured and billed:

  • You are charged based on the size of the data after filtering is applied
  • The free tier includes 1GB of data per month across all streams
  • Additional data is billed according to your plan's overage rate
  • You can filter data down to the Filter Processing Minimum of 2.5KB per block

Filter Processing Minimum

When using Filters with your streams, a minimum data charge of 2.5KB applies to each processed block, even if the filtered payload is smaller. This minimum reflects the computational resources required to run your custom filter logic against blockchain data. Here's how billing works in different scenarios:

Scenario                                   | Billing Amount
-------------------------------------------|------------------
No filter used, payload is 1KB             | 1KB (actual size)
No filter used, payload is 3KB             | 3KB (actual size)
Filter used, filtered result is 1KB        | 2.5KB (minimum)
Filter used, filtered result is 3KB        | 3KB (actual size)
Filter used, everything filtered out (0KB) | 2.5KB (minimum)
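The billing rule above can be sketched as a small helper. This is purely illustrative (the function name and KB units are our own choices, not part of any QuickNode API); the 2.5KB Filter Processing Minimum applies only when a filter is attached:

```javascript
// Sketch of the Filter Processing Minimum billing rule; sizes in KB.
const FILTER_PROCESSING_MINIMUM_KB = 2.5;

function billedSizeKB(payloadKB, filterUsed) {
  // Without a filter you pay the actual payload size; with a filter,
  // the processing minimum acts as a floor on the billed amount.
  return filterUsed ? Math.max(payloadKB, FILTER_PROCESSING_MINIMUM_KB) : payloadKB;
}
```

Each row of the table corresponds to one call, e.g. `billedSizeKB(1, true)` yields 2.5 and `billedSizeKB(3, true)` yields 3.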

Monitoring Data Usage

You can monitor your Streams usage in several places:

  • Stream Metrics Dashboard: Each stream has a dedicated metrics page showing detailed usage statistics
  • Filters Metrics: View filter execution performance, including statistics on "Sub-Threshold Blocks" (blocks whose filtered payload falls below the Filter Processing Minimum)
  • Usage Insights Panel: See cumulative billable data and blocks processed for each stream on its stream details page
  • Account Billing Page: Track overall Streams usage and historical trends

Example Filter Functions

Below are examples of how you might define a filter function to target data from Streams.

Please note:

  • Your filter must be named main.
  • Your filter function must return an object.

Payload Shape (Data & Metadata)

Streams will send the payload data in stream.data and the metadata in stream.metadata.
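As a minimal sketch, a filter can read both parts of the payload like this. The exact keys inside stream.data and stream.metadata depend on your chosen dataset, so the dataset and network fields below are assumptions; inspect your own stream's payload for the real shape:

```javascript
function main(stream) {
  const data = stream.data;
  const metadata = stream.metadata;

  return {
    // Assumed metadata keys for illustration only
    dataset: metadata.dataset,
    network: metadata.network,
    // Most datasets deliver data as an array of one or more items
    itemCount: Array.isArray(data) ? data.length : 1,
  };
}
```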

Return Hash and Block Number

This filter works with the block dataset.

function main(stream) {
  const data = stream.data

  var numberDecimal = parseInt(data[0].number, 16)
  var filteredData = {
    hash: data[0].hash,
    number: numberDecimal,
  }
  return filteredData
}

Get Receipts for ERC20 Transfers

This filter works with the block_with_receipts dataset.

function main(stream) {
  try {
    const data = stream.data
    var filteredReceipts = []

    data.receipts.forEach(receipt => {
      let relevantLogs = receipt.logs.filter(
        log =>
          log.topics[0] ===
            '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef' &&
          log.topics.length === 3
      )
      if (relevantLogs.length > 0) {
        filteredReceipts.push(receipt)
      }
    })

    return {
      totalReceipts: data.receipts.length,
      filteredCount: filteredReceipts.length,
      receipts: filteredReceipts,
    }
  } catch (e) {
    return { error: e.message }
  }
}

Get Transactions and Receipts for Specific Addresses

This filter works with the block_with_receipts dataset.

function main(stream) {
  try {
    const data = stream.data

    const addresses = [
      '0x56220b7e25c7d0885159915cdebf5819f2090f57',
      '0x351e1b4079cf180971025a3b35dadea1d809de26',
      '0xa61551e4e455edebaa7c59f006a1d2956d46eecc',
    ]
    var addressSet = new Set(addresses.map(address => address.toLowerCase()))
    var paddedAddressSet = new Set(
      addresses.map(
        address => '0x' + address.toLowerCase().slice(2).padStart(64, '0')
      )
    )

    var matchingTransactions = []
    var matchingReceipts = []

    data.block.transactions.forEach(transaction => {
      let transactionMatches =
        (transaction.from && addressSet.has(transaction.from.toLowerCase())) ||
        (transaction.to && addressSet.has(transaction.to.toLowerCase()))

      if (transactionMatches) {
        matchingTransactions.push(transaction)
      }
    })

    data.receipts.forEach(receipt => {
      let receiptMatches =
        receipt.logs &&
        receipt.logs.some(
          log =>
            log.topics &&
            log.topics.length > 1 &&
            (paddedAddressSet.has(log.topics[1]) ||
              (log.topics.length > 2 && paddedAddressSet.has(log.topics[2])))
        )
      if (receiptMatches) {
        matchingReceipts.push(receipt)
      }
    })

    if (matchingTransactions.length === 0 && matchingReceipts.length === 0) {
      return null
    }

    return {
      transactions: matchingTransactions,
      receipts: matchingReceipts,
    }
  } catch (e) {
    return { error: e.message }
  }
}

How to Use Filters

To apply a filter to your stream, specify the filter_function configuration option when creating a stream with the Streams REST API, or in the "Stream payload" step of the Streams configuration wizard. You can then define your custom filter function to match specific criteria or patterns according to your requirements.

To omit sending payloads when your filter doesn't match any data within a block, add conditional logic that returns null instead of an empty result.
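For example, a filter like the following (using the block dataset; the "has at least one transaction" condition is just an illustrative assumption) skips delivery entirely for non-matching blocks:

```javascript
function main(stream) {
  const data = stream.data;
  const transactions = data[0].transactions || [];

  // No matches: return null so no payload is delivered for this block
  if (transactions.length === 0) {
    return null;
  }

  return { hash: data[0].hash, txCount: transactions.length };
}
```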

Using Key-Value Store with Streams filters


Key-Value Store can be accessed and managed seamlessly within a Streams filter. You can create new lists and key-value sets, add and remove items from your lists, update key-value sets, retrieve a set's value, and check whether data from the streaming dataset matches an item on your list. To learn more, please visit the Key-Value Store documentation.


Note

All qnLib methods are asynchronous and return Promises. Always use the await keyword when calling these methods, and ensure your filter's main function is declared with async.

Available Key-Value Store functions inside your Streams filter

Lists


  • qnUpsertList: Creates or updates a list in Key-Value Store.
  • qnGetList: Retrieves all items from a specific list in Key-Value Store.
  • qnGetAllLists: Retrieves all lists in Key-Value Store.
  • qnAddListItem: Adds an item to a specific list in Key-Value Store.
  • qnRemoveListItem: Removes an item from a specific list in Key-Value Store.
  • qnContainsListItem: Checks if an item exists in a specific list in Key-Value Store.
  • qnDeleteList: Deletes a specific list in Key-Value Store.

Example filter code for lists

The filter code below can be used within Streams to demonstrate ways that Key-Value Store lists can be used within your Streams filter.

async function main() {
  // List of results for each operation
  let results = {}

  try {
    // Create a new list
    results.createList = await qnLib.qnUpsertList('list_docs_example', {
      add_items: ['item1', 'item2'],
    })

    // Update a list
    results.qnUpsertList = await qnLib.qnUpsertList('list_docs_example', {
      add_items: ['item3'],
      remove_items: ['item1'],
    })

    // Get a list
    results.qnGetList = await qnLib.qnGetList('list_docs_example')

    // Get all lists
    results.qnGetAllLists = await qnLib.qnGetAllLists()

    // Add item to the list
    results.qnAddListItem4 = await qnLib.qnAddListItem('list_docs_example', 'item4')
    results.qnAddListItem5 = await qnLib.qnAddListItem('list_docs_example', 'item5')

    // Remove item from the list
    results.qnRemoveListItem4 = await qnLib.qnRemoveListItem('list_docs_example', 'item4')

    // Get a list after changes
    results.qnGetListAfterChanges = await qnLib.qnGetList('list_docs_example')

    // Check if an item is in the list
    results.qnContainsListItem2 = await qnLib.qnContainsListItem(
      'list_docs_example',
      'item2'
    )
    results.qnContainsListItem1 = await qnLib.qnContainsListItem(
      'list_docs_example',
      'item1'
    )

    // Delete the list
    results.qnDeleteList = await qnLib.qnDeleteList('list_docs_example')
  } catch (error) {
    results.error = error.message
  }

  return results
}

Sets


  • qnAddSet: Creates a key-value set in Key-Value Store.
  • qnBulkSets: Creates and removes key-value sets in bulk in Key-Value Store.
  • qnDeleteSet: Deletes a key-value set from Key-Value Store.
  • qnGetSet: Retrieves the value of a specific key from Key-Value Store.
  • qnListAllSets: Lists all keys for sets in Key-Value Store.

Example filter code for Key-Value Store sets

The filter code below can be used within Streams to demonstrate ways that Key-Value Store sets can be used within your Streams filter.

async function main() {
  // List of results for each operation
  let results = {}

  try {
    // Create a set
    results.qnAddSet = await qnLib.qnAddSet('set_docs_example', 'value1')

    // Get a set
    results.qnGetSet = await qnLib.qnGetSet('set_docs_example')

    // Get all sets
    results.qnListAllSets = await qnLib.qnListAllSets()

    // Bulk add/remove sets
    results.qnBulkSets = await qnLib.qnBulkSets({
      add_sets: {
        set_docs_example2: 'value1',
        set_docs_example3: 'value2',
      },
      delete_sets: ['set_docs_example'],
    })

    // Get all sets after the bulk operation
    results.qnListAllSets2 = await qnLib.qnListAllSets()

    // Delete key-value sets
    results.qnDeleteSet2 = await qnLib.qnDeleteSet('set_docs_example2')
    results.qnDeleteSet3 = await qnLib.qnDeleteSet('set_docs_example3')
  } catch (error) {
    results.error = error.message
  }

  return results
}

Common Pitfalls with Key-Value Store Methods

Missing await keyword

All qnLib methods are asynchronous and return Promises. Failing to use await will result in unexpected behavior:

// INCORRECT - returns a Promise, not the actual result
function main() {
  const result = qnLib.qnGetList('myList');
  // result is a Promise object, not the actual list items
  return result;
}

// CORRECT - returns the resolved value from the Promise
async function main() {
  const result = await qnLib.qnGetList('myList');
  // result contains the actual list items
  return result;
}

Boolean checks with async methods

Be especially careful when using methods like qnContainsListItem in conditional statements:

// INCORRECT - checks if the Promise exists, not the result
function main() {
  if (qnLib.qnContainsListItem('myList', 'item')) {
    // This will always evaluate to true regardless of whether
    // the item is in the list or not!
    return { exists: true };
  }
  return { exists: false };
}

// CORRECT - awaits the boolean result
async function main() {
  if (await qnLib.qnContainsListItem('myList', 'item')) {
    // This will only evaluate to true if the item is in the list
    return { exists: true };
  }
  return { exists: false };
}

Error handling for large operations

When working with large datasets, it's important to implement proper error handling:

// INCORRECT - no error handling for large list operations
async function main() {
  // This might fail with large lists
  await qnLib.qnUpsertList('myList', { add_items: veryLargeArray });
  return { success: true };
}

// CORRECT - handles potential size-related errors
async function main() {
  try {
    // Use chunking for large operations
    const chunkSize = 50000;
    for (let i = 0; i < veryLargeArray.length; i += chunkSize) {
      const chunk = veryLargeArray.slice(i, i + chunkSize);
      await qnLib.qnUpsertList('myList', { add_items: chunk });
    }
    return { success: true };
  } catch (error) {
    return { error: `Failed to update list: ${error.message}` };
  }
}

Cascading async operations

Chain operations carefully to avoid cascading failures:

// INCORRECT - continues execution even if a prerequisite operation fails
async function main() {
  await qnLib.qnAddListItem('myList', 'important_item');
  const result = await qnLib.qnGetList('myList');
  return { items: result };
}

// CORRECT - checks intermediate results before continuing
async function main() {
  try {
    const addResult = await qnLib.qnAddListItem('myList', 'important_item');
    if (!addResult) {
      return { error: "Failed to add item to list" };
    }

    const items = await qnLib.qnGetList('myList');
    return { items };
  } catch (error) {
    return { error: `Operation failed: ${error.message}` };
  }
}

Performance considerations for lookups - balancing network calls vs. memory usage

Choose the right approach based on the size of your lists:

// APPROACH 1: Individual lookups - better for VERY LARGE watchlists
// Good when your watchlist has many thousands of items but you only need to check a few
async function main(stream) {
  const addresses = extractAddresses(stream.data);
  // Assume this extracts just a few addresses (e.g., 5-10)
  const results = [];

  for (const address of addresses) {
    // With large watchlists, individual containment checks can be more efficient
    // than retrieving the entire list
    if (await qnLib.qnContainsListItem('watchlist', address)) {
      results.push(address);
    }
  }

  return { matches: results };
}

// APPROACH 2: In-memory filtering - better for smaller watchlists
// Good when you need to check many items against a reasonably sized watchlist
async function main(stream) {
  const addresses = extractAddresses(stream.data);
  // Assume this extracts many addresses (e.g., 100+)

  // If the watchlist is relatively small (hundreds or a few thousand items),
  // fetching it once is more efficient than many individual lookups
  const watchlist = await qnLib.qnGetList('watchlist');
  const watchlistSet = new Set(watchlist);

  // Perform lookups in memory
  const results = addresses.filter(address => watchlistSet.has(address));

  return { matches: results };
}

// APPROACH 3: Hybrid strategy - balancing both approaches
async function main(stream) {
  const addresses = extractAddresses(stream.data);

  // Choose an approach based on scale
  if (addresses.length <= 10) {
    // For a few lookups, use individual containment checks
    const results = [];
    for (const address of addresses) {
      if (await qnLib.qnContainsListItem('watchlist', address)) {
        results.push(address);
      }
    }
    return { matches: results };
  } else {
    // For many lookups against a manageable watchlist, load it into memory
    const watchlist = await qnLib.qnGetList('watchlist');
    const watchlistSet = new Set(watchlist);
    const results = addresses.filter(address => watchlistSet.has(address));
    return { matches: results };
  }
}

Decoding EVM Data

When working with EVM-compatible chains, you can decode transaction receipts and logs using the decodeEVMReceipts function. This function takes raw transaction receipts and contract ABIs as inputs, and transforms the encoded blockchain data into a human-readable format.

The decoding process:

  1. Matches event signatures in transaction logs with the provided ABIs
  2. Decodes parameters according to their types (addresses, integers, strings, etc.)
  3. Returns structured data with named parameters in a decodedLogs object, instead of raw hex data

The decodeEVMReceipts function accepts two parameters:

  • receipts: Array of transaction receipts to decode
  • abis: Array of contract ABIs (can be passed as strings or objects)

This enables you to:

  • Monitor specific smart contract events
  • Extract and process event parameters
  • Filter transactions based on decoded data
  • Track multiple contracts and event types simultaneously

Below are examples of filter code that rely on decodeEVMReceipts to decode EVM data, using the block_with_receipts dataset.

Basic ERC20 Transfer Events

function main(stream) {
  const erc20Abi = `[{
    "anonymous": false,
    "inputs": [
      {"indexed": true, "type": "address", "name": "from"},
      {"indexed": true, "type": "address", "name": "to"},
      {"indexed": false, "type": "uint256", "name": "value"}
    ],
    "name": "Transfer",
    "type": "event"
  }]`

  const data = stream.data
  var result = decodeEVMReceipts(data[0].receipts, [erc20Abi])

  // Filter for receipts with decoded logs
  result = result.filter(
    receipt => receipt.decodedLogs && receipt.decodedLogs.length > 0
  )

  return { result }
}

Dynamic ABI Loading with Key-Value Store

You can store contract ABIs in Key-Value Store and load them dynamically in your filter. This approach is particularly useful when working with multiple contracts or when ABIs need to be updated frequently.

Uploading ABIs to Key-Value Store

Before using ABIs in a filter like the example below, you need to upload them to Key-Value Store. You can do this using the REST API; you'll need to create an API key in the QuickNode dashboard to use the Key-Value Store REST APIs.

curl -X POST \
  "https://api.quicknode.com/kv/rest/v1/sets" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  -H "x-api-key: YOUR_API_KEY" \
  -d '{
    "key": "usdc_abi",
    "value": "[{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"name\":\"from\",\"type\":\"address\"},{\"indexed\":true,\"name\":\"to\",\"type\":\"address\"},{\"indexed\":false,\"name\":\"value\",\"type\":\"uint256\"}],\"name\":\"Transfer\",\"type\":\"event\"}]"
  }'

USDC and Uniswap ABI example

async function main(stream) {
  // Fetch ABIs from Key-Value Store; requires you to have ABIs uploaded there first
  const usdcAbi = await qnLib.qnGetSet('usdc_abi')
  const uniswapAbi = await qnLib.qnGetSet('uniswap_abi')

  const USDC_ADDRESS =
    '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48'.toLowerCase()

  const data = stream.data ? stream.data : stream
  var result = decodeEVMReceipts(data[0].receipts, [usdcAbi, uniswapAbi])

  // Filter for USDC events only
  result = result.filter(
    receipt =>
      receipt.decodedLogs &&
      receipt.decodedLogs.length > 0 &&
      receipt.decodedLogs.some(
        log => log.address?.toLowerCase() === USDC_ADDRESS
      )
  )

  return { result }
}

Complex Event Decoding (NFT Marketplace)

function main(stream) {
  // OpenSea Seaport OrderFulfilled event
  // Tested on Ethereum, block 21292520
  const seaportAbi = `[{
    "anonymous": false,
    "inputs": [
      {"type": "bytes32", "name": "orderHash", "indexed": false},
      {"type": "address", "name": "offerer", "indexed": true},
      {"type": "address", "name": "zone", "indexed": true},
      {"type": "address", "name": "recipient", "indexed": false},
      {
        "components": [
          {"type": "uint8", "name": "itemType"},
          {"type": "address", "name": "token"},
          {"type": "uint256", "name": "identifier"},
          {"type": "uint256", "name": "amount"}
        ],
        "type": "tuple[]",
        "name": "offer",
        "indexed": false
      },
      {
        "components": [
          {"type": "uint8", "name": "itemType"},
          {"type": "address", "name": "token"},
          {"type": "uint256", "name": "identifier"},
          {"type": "uint256", "name": "amount"},
          {"type": "address", "name": "recipient"}
        ],
        "type": "tuple[]",
        "name": "consideration",
        "indexed": false
      }
    ],
    "name": "OrderFulfilled",
    "type": "event"
  }]`

  const SEAPORT_ADDRESS =
    '0x00000000006c3852cbEf3e08E8dF289169EdE581'.toLowerCase()

  const data = stream.data
  var result = decodeEVMReceipts(data[0].receipts, [seaportAbi])

  result = result.filter(
    receipt =>
      receipt.decodedLogs &&
      receipt.decodedLogs.length > 0 &&
      receipt.decodedLogs.some(
        log =>
          log.address?.toLowerCase() === SEAPORT_ADDRESS &&
          log.name === 'OrderFulfilled'
      )
  )

  return { result }
}

Multiple Contract Events

async function main(stream) {
  // Load multiple ABIs for different protocols
  const aaveAbi = await qnLib.qnGetSet('aave_v3_pool_abi')
  const uniswapAbi = await qnLib.qnGetSet('uniswap_v3_pool_abi')
  const curveAbi = await qnLib.qnGetSet('curve_pool_abi')

  const AAVE_ADDRESS =
    '0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2'.toLowerCase()
  const UNISWAP_POOL =
    '0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8'.toLowerCase()

  const data = stream.data
  var result = decodeEVMReceipts(data[0].receipts, [
    aaveAbi,
    uniswapAbi,
    curveAbi,
  ])

  // Get all DeFi protocol events
  result = result.filter(
    receipt =>
      receipt.decodedLogs &&
      receipt.decodedLogs.length > 0 &&
      receipt.decodedLogs.some(log =>
        [AAVE_ADDRESS, UNISWAP_POOL].includes(log.address?.toLowerCase())
      )
  )

  return { result }
}

Working with Decoded Data

The decoded data will include:

  • Transaction metadata (hash, block number, etc.)
  • Decoded events in decodedLogs array
  • Event parameters with their proper types (address, uint256, etc.)

For example, a decoded ERC20 transfer event would look like:

{
  "address": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
  "name": "Transfer",
  "from": "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640",
  "to": "0x6f1cdbbb4d53d226cf4b917bf768b94acbab6168",
  "value": "138908045566"
}
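Because decoded logs are plain objects, downstream processing is ordinary JavaScript. As a sketch, the helper below sums transfer amounts across receipts; the receipts here are mock data standing in for decodeEVMReceipts output, and values are raw token units kept as strings to stay JSON-serializable:

```javascript
// Aggregate the total transferred value from decoded Transfer logs.
// Uses BigInt because uint256 values can exceed Number.MAX_SAFE_INTEGER.
function totalTransferValue(receipts) {
  let total = 0n;
  for (const receipt of receipts) {
    for (const log of receipt.decodedLogs || []) {
      if (log.name === 'Transfer') {
        total += BigInt(log.value);
      }
    }
  }
  // Return as a string to keep the filter's payload JSON-serializable
  return total.toString();
}
```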

Common Use Cases

  • Monitoring specific contract events
  • Filtering transactions by event type
  • Extracting parameter values from events
  • Tracking token transfers and approvals
  • Decoding complex marketplace orders
  • Monitoring DeFi protocol activities

Managing ABIs with Key-Value Store

You can store and manage your contract ABIs using Key-Value Store:

  1. Upload ABIs using REST APIs
  2. Update ABIs when contracts are upgraded
  3. Load multiple ABIs dynamically in your filter
  4. Share ABIs across different streams

Note

The filter function must be named main and return an object containing the modified data.
