
How to Build a Real-Time Token Transfer Indexer with QuickNode Streams and Functions

Created on
Updated on
Nov 26, 2024

15 min read

Streams

Streams is available to all users with a QuickNode plan. For teams with unique requirements, we offer tailored datasets, dedicated support, and custom integrations. Contact our team for more information.

Overview

In this guide, we'll walk you through creating a real-time token transfer indexer using QuickNode's Streams and Functions. This system will monitor ERC20, ERC721, and ERC1155 token transfers on the Base blockchain, process the data, and send enriched information to a webhook for further use.

What You Will Do


  • Set up a Stream on QuickNode that filters ERC20, ERC721, and ERC1155 token transfer events from the blockchain.
  • Implement a Function that processes and enriches the transfer data and sends the processed data to a webhook.

What You Will Need


  • A QuickNode account.
  • Basic understanding of JavaScript and Ethereum transactions and events. Check out our guide on Ethereum transactions and events to learn more.
  • A QuickNode RPC endpoint on Base Mainnet.
  • A QuickNode IPFS gateway.
  • A webhook URL to receive the enriched token transfer data. You can get a free webhook URL at webhook.site.

Building a Token Transfer Indexer

Create a Stream on QuickNode

First, navigate to the QuickNode Streams page in the Dashboard and click "Create Stream".

Next, create a Stream with the following settings:


  • Chain: Base
  • Network: Mainnet
  • Dataset: Block with Receipts
  • Stream start: Latest block (you can change this based on your needs)
  • Stream payload: Modify the Stream before streaming
  • Reorg Handling: Leave as-is

Select the option to modify the payload before streaming.

Next, copy and paste the following code to filter the streaming data for ERC20, ERC721, and ERC1155 token transfers, extract key data, and generate the payload to send to your function for processing.

function stripPadding(logTopic) {
return logTopic ? '0x' + logTopic.slice(-40).toLowerCase() : ''
}

function parseSingleData(data) {
if (!data || data === '0x') return { tokenId: 0, value: 0 }
const idHex = data.slice(2, 66).replace(/^0+/, '') || '0'
const valueHex = data.slice(66).replace(/^0+/, '') || '0'
const id = idHex === '0' ? 0 : BigInt('0x' + idHex)
const value = valueHex === '0' ? 0 : BigInt('0x' + valueHex)
return { tokenId: id, value: value }
}

function parseBatchData(data) {
  // ABI layout of the data field: [ids offset][values offset], then each
  // array as a 32-byte length word followed by its 32-byte elements.
  if (!data || data.length < 130) return { ids: [], values: [] }

  // Offsets are byte counts from the start of the data; convert them to
  // positions in the hex string (2 hex chars per byte, plus the '0x' prefix).
  const idsArrayOffset = parseInt(data.slice(2, 66), 16) * 2 + 2
  const valuesArrayOffset = parseInt(data.slice(66, 130), 16) * 2 + 2

  // Read the length word first, then the elements that follow it.
  const readArray = offset => {
    const length = parseInt(data.slice(offset, offset + 64), 16) || 0
    return Array.from({ length }, (_, i) => {
      const start = offset + 64 * (i + 1)
      const hex = data.slice(start, start + 64).replace(/^0+/, '') || '0'
      return hex === '0' ? 0 : BigInt('0x' + hex)
    })
  }

  return {
    ids: readArray(idsArrayOffset),
    values: readArray(valuesArrayOffset),
  }
}

function main(data) {
try {
if (!data || !data.streamData) {
return null
}

const streamData = Array.isArray(data.streamData)
? data.streamData
: [data.streamData]
const erc20Transfers = []
const erc721Transfers = []
const erc1155Transfers = []

streamData.forEach(stream => {
if (!stream || !stream.block || !stream.receipts) {
return
}

const blockTimestamp = stream.block.timestamp
? parseInt(stream.block.timestamp, 16) * 1000
: Date.now()

stream.receipts.forEach(receipt => {
if (!receipt || !receipt.logs) return

receipt.logs.forEach(log => {
if (!log || !log.topics || log.topics.length === 0) return

if (
log.topics[0] ===
'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
) {
if (log.topics.length === 3 && log.data && log.data !== '0x') {
const valueHex = log.data.slice(2).replace(/^0+/, '')
const value = valueHex ? BigInt('0x' + valueHex).toString() : '0'
erc20Transfers.push({
type: 'ERC20',
sender: stripPadding(log.topics[1]),
receiver: stripPadding(log.topics[2]),
value: value,
contract: log.address,
txHash: log.transactionHash,
txIndex: log.transactionIndex,
blockTimestamp: blockTimestamp,
})
} else if (
log.topics.length === 4 &&
(!log.data || log.data === '0x')
) {
const tokenId = BigInt(log.topics[3]).toString()
erc721Transfers.push({
type: 'ERC721',
sender: stripPadding(log.topics[1]),
receiver: stripPadding(log.topics[2]),
tokenId: tokenId,
contract: log.address,
txHash: log.transactionHash,
txIndex: log.transactionIndex,
blockTimestamp: blockTimestamp,
})
}
} else if (
log.topics[0] ===
'0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62'
) {
const { tokenId, value } = parseSingleData(log.data)
erc1155Transfers.push({
type: 'ERC1155_Single',
operator: stripPadding(log.topics[1]),
sender: stripPadding(log.topics[2]),
receiver: stripPadding(log.topics[3]),
tokenId: tokenId.toString(),
value: value.toString(),
contract: log.address,
txHash: log.transactionHash,
txIndex: log.transactionIndex,
blockTimestamp: blockTimestamp,
})
} else if (
log.topics[0] ===
'0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb'
) {
const { ids, values } = parseBatchData(log.data)
ids.forEach((id, index) => {
erc1155Transfers.push({
type: 'ERC1155_Batch',
operator: stripPadding(log.topics[1]),
sender: stripPadding(log.topics[2]),
receiver: stripPadding(log.topics[3]),
tokenId: id.toString(),
value: values[index].toString(),
contract: log.address,
txHash: log.transactionHash,
txIndex: log.transactionIndex,
blockTimestamp: blockTimestamp,
})
})
}
})
})
})

if (
!erc20Transfers.length &&
!erc721Transfers.length &&
!erc1155Transfers.length
) {
return null
}

return {
erc20: erc20Transfers,
erc721: erc721Transfers,
erc1155: erc1155Transfers,
}
} catch (e) {
console.error('Error in main function:', e)
return { error: e.message }
}
}
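
The data field of an ERC1155 TransferBatch log is the ABI encoding of two dynamic uint256 arrays: two 32-byte offset words, followed by each array as a length word and its elements. The following is a minimal standalone sketch of that decoding, assuming standard ABI encoding and well-formed input. The helper names (wordAt, readUintArray, decodeTransferBatchData) and the sample payload are hypothetical and are not part of the Stream filter.

```javascript
// Read the 32-byte word that starts at the given byte offset of a hex payload.
function wordAt(hex, byteOffset) {
  return BigInt('0x' + hex.slice(byteOffset * 2, byteOffset * 2 + 64))
}

// Decode one dynamic uint256[] whose length word sits at `byteOffset`.
function readUintArray(hex, byteOffset) {
  const length = Number(wordAt(hex, byteOffset))
  const items = []
  for (let i = 0; i < length; i++) {
    // Elements follow the length word, one 32-byte word each.
    items.push(wordAt(hex, byteOffset + 32 * (i + 1)).toString())
  }
  return items
}

function decodeTransferBatchData(data) {
  const hex = data.startsWith('0x') ? data.slice(2) : data
  // Two offset words are the minimum; anything shorter carries no arrays.
  if (hex.length < 128) return { ids: [], values: [] }
  const idsOffset = Number(wordAt(hex, 0))
  const valuesOffset = Number(wordAt(hex, 32))
  return {
    ids: readUintArray(hex, idsOffset),
    values: readUintArray(hex, valuesOffset),
  }
}

// Hypothetical payload encoding ids = [1, 2] and values = [10, 20].
// Words: ids offset (0x40), values offset (0xa0), then each array.
const word = n => BigInt(n).toString(16).padStart(64, '0')
const sampleData = '0x' + [0x40, 0xa0, 2, 1, 2, 2, 10, 20].map(word).join('')

const decoded = decodeTransferBatchData(sampleData)
console.log(decoded) // { ids: [ '1', '2' ], values: [ '10', '20' ] }
```

Note that the number of tokens comes from the length word of each array, not from the distance between the two offsets, because that distance also includes the length word itself.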

Test the Stream filter

Click the Run test button to test your filter against a single block of data. Once the test is complete, you will see a sample of the data payload that the Stream will generate.
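
If you want to sanity-check the classification logic outside the dashboard, a small local script can help. ERC20 and ERC721 emit the same Transfer event signature, so the filter tells them apart by the number of topics and by whether the data field is empty. The sketch below reproduces only that decision. The function name classifyTransferLog and the sample logs are hypothetical.

```javascript
const TRANSFER_TOPIC =
  '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'

// Returns 'ERC20', 'ERC721', or null for anything that is not a Transfer log
// in one of the two expected shapes.
function classifyTransferLog(log) {
  if (!log || !Array.isArray(log.topics) || log.topics[0] !== TRANSFER_TOPIC) {
    return null
  }
  const hasData = Boolean(log.data) && log.data !== '0x'
  // ERC20: from and to are indexed, the amount is in the data field.
  if (log.topics.length === 3 && hasData) return 'ERC20'
  // ERC721: from, to, and tokenId are all indexed, so the data field is empty.
  if (log.topics.length === 4 && !hasData) return 'ERC721'
  return null
}

// Hypothetical logs: 32-byte words built from short hex values.
const pad = hex => '0x' + hex.padStart(64, '0')
const erc20Log = {
  topics: [TRANSFER_TOPIC, pad('a1'), pad('b2')],
  data: pad('3e8'), // amount 1000
}
const erc721Log = {
  topics: [TRANSFER_TOPIC, pad('a1'), pad('b2'), pad('2a')], // tokenId 42
  data: '0x',
}

console.log(classifyTransferLog(erc20Log)) // ERC20
console.log(classifyTransferLog(erc721Log)) // ERC721
console.log(BigInt(erc20Log.data).toString()) // 1000
console.log(BigInt(erc721Log.topics[3]).toString()) // 42
```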

Click the Next button and then choose "Functions" for your Stream destination. Then, in the Function dropdown, choose the "Create a new Function" option.

Implement the Function

In the Functions settings page, under "Select a namespace", choose "Create a new namespace", enter a name (e.g., TokenIndexing), and click "Create namespace". Next, give your Function a name (e.g., BaseTokenIndexer) and click next; you can leave all the other settings at their defaults. In the next step, paste the following code. It implements the Function that retrieves token metadata via RPC, enriches each transfer, and sends the result to your webhook. Make sure to replace the values of BASE_QUICKNODE_URL, WEBHOOK_URL, and QUICKNODE_IPFS_GATEWAY with your own. Keep the trailing slash on the gateway URL, because the code appends ipfs/ directly to it.

const { Web3 } = require('web3')
const axios = require('axios')

// Minimal ABIs
const ERC20_ABI = [
{
constant: true,
inputs: [],
name: 'name',
outputs: [{ name: '', type: 'string' }],
type: 'function',
},
{
constant: true,
inputs: [],
name: 'symbol',
outputs: [{ name: '', type: 'string' }],
type: 'function',
},
{
constant: true,
inputs: [],
name: 'decimals',
outputs: [{ name: '', type: 'uint8' }],
type: 'function',
},
]

const ERC721_ABI = [
{
constant: true,
inputs: [],
name: 'name',
outputs: [{ name: '', type: 'string' }],
type: 'function',
},
{
constant: true,
inputs: [],
name: 'symbol',
outputs: [{ name: '', type: 'string' }],
type: 'function',
},
{
constant: true,
inputs: [{ name: '_tokenId', type: 'uint256' }],
name: 'tokenURI',
outputs: [{ name: '', type: 'string' }],
type: 'function',
},
]

const ERC1155_ABI = [
{
constant: true,
inputs: [],
name: 'name',
outputs: [{ name: '', type: 'string' }],
type: 'function',
},
{
constant: true,
inputs: [],
name: 'symbol',
outputs: [{ name: '', type: 'string' }],
type: 'function',
},
{
// uri(uint256) is the ERC1155 metadata lookup; one ABI entry is enough
constant: true,
inputs: [{ name: '_id', type: 'uint256' }],
name: 'uri',
outputs: [{ name: '', type: 'string' }],
type: 'function',
},
]

// Base QuickNode URL
const BASE_QUICKNODE_URL =
'https://my-real-endpoint.base-mainnet.quiknode.pro/token/'

// Webhook URL
const WEBHOOK_URL = 'https://webhook.site/real-webhook-url'

// IPFS Gateway URL
const QUICKNODE_IPFS_GATEWAY = 'https://real-ipfs-gateway/'

// Create a Web3 instance
let web3

async function setupWeb3() {
web3 = new Web3(BASE_QUICKNODE_URL)
}

const ERC20_CACHE = new Map()

async function getERC20Info(contractAddress) {
if (ERC20_CACHE.has(contractAddress)) {
return ERC20_CACHE.get(contractAddress)
}

const contract = new web3.eth.Contract(ERC20_ABI, contractAddress)
let retries = 3
while (retries > 0) {
try {
const [name, symbol, decimals] = await Promise.all([
contract.methods.name().call(),
contract.methods.symbol().call(),
contract.methods.decimals().call(),
])
const info = { name, symbol, decimals: parseInt(decimals) }
ERC20_CACHE.set(contractAddress, info)
return info
} catch (error) {
console.error(
`Error fetching ERC20 info for ${contractAddress} (Retry ${
4 - retries
}/3):`,
error
)
retries--
if (retries === 0) {
console.error(
`Failed to fetch ERC20 info for ${contractAddress} after 3 attempts`
)
return { name: 'Unknown', symbol: 'Unknown', decimals: 18 }
}
await new Promise(resolve => setTimeout(resolve, 1000)) // Wait 1 second before retrying
}
}
}

async function fetchTokenMetadata(url, retries = 3) {
if (!url || url === 'Unknown') {
console.log('No valid URL provided for metadata')
return {}
}

// Handle IPFS URIs
if (url.startsWith('ipfs://')) {
url = `${QUICKNODE_IPFS_GATEWAY}ipfs/${url.slice(7)}`
}

for (let i = 0; i < retries; i++) {
try {
console.log(`Attempting to fetch metadata from: ${url}`)
const response = await axios.get(url, { timeout: 5000 })
let metadata = response.data

// If the metadata is a string, it might be another URL to follow
if (
typeof metadata === 'string' &&
(metadata.startsWith('http') || metadata.startsWith('ipfs://'))
) {
return await fetchTokenMetadata(metadata, retries - 1)
}

console.log('Successfully fetched metadata:', metadata)
return metadata
} catch (error) {
console.error(
`Error fetching metadata (attempt ${i + 1}/${retries}):`,
error.message
)
if (i === retries - 1) {
return {}
}
await new Promise(resolve => setTimeout(resolve, 1000)) // Wait 1 second before retrying
}
}
return {}
}

async function getNFTInfo(contractAddress, tokenId, type) {
const ABI = type === 'ERC721' ? ERC721_ABI : ERC1155_ABI
const contract = new web3.eth.Contract(ABI, contractAddress)
try {
console.log(
`Fetching NFT info for contract: ${contractAddress}, tokenId: ${tokenId}`
)
let [name, symbol, tokenURI] = await Promise.all([
contract.methods
.name()
.call()
.catch(() => 'Unknown'),
contract.methods
.symbol()
.call()
.catch(() => 'Unknown'),
(type === 'ERC721'
? contract.methods.tokenURI(tokenId).call()
: contract.methods.uri(tokenId).call()
).catch(() => 'Unknown'),
])

console.log(`Raw tokenURI: ${tokenURI}`)

// Handle ERC1155 URI with {id} placeholder
if (type === 'ERC1155' && tokenURI.includes('{id}')) {
const hexId = web3.utils.padLeft(web3.utils.toHex(tokenId), 64).slice(2)
tokenURI = tokenURI.replace('{id}', hexId)
console.log(`Adjusted ERC1155 tokenURI: ${tokenURI}`)
}

const metadata = await fetchTokenMetadata(tokenURI)
console.log(`Fetched metadata:`, metadata)

return {
name,
symbol,
tokenURI,
metadata,
image: metadata.image || 'No image available',
description: metadata.description || 'No description available',
}
} catch (error) {
console.error(
`Error fetching NFT info for ${contractAddress} token ${tokenId}:`,
error
)
return {
name: 'Unknown',
symbol: 'Unknown',
tokenURI: 'Unknown',
metadata: {},
image: 'No image available',
description: 'No description available',
}
}
}

function formatTokenValue(value, decimals) {
const divisor = BigInt(10) ** BigInt(decimals)
const bigIntValue = BigInt(value)
const integerPart = bigIntValue / divisor
const fractionalPart = bigIntValue % divisor

// Pad the fractional part with leading zeros
const fractionalStr = fractionalPart.toString().padStart(decimals, '0')

// Trim trailing zeros
const trimmedFractionalStr = fractionalStr.replace(/0+$/, '')

return trimmedFractionalStr
? `${integerPart}.${trimmedFractionalStr}`
: `${integerPart}`
}

async function processERC20Transfers(transfers) {
const results = []
for (const transfer of transfers) {
const tokenInfo = await getERC20Info(transfer.contract)
results.push({
...transfer,
...tokenInfo,
valueFormatted: formatTokenValue(transfer.value, tokenInfo.decimals),
})
await new Promise(resolve => setTimeout(resolve, 100)) // Add a 100ms delay between requests
}
return results
}

async function processNFTTransfers(transfers, type) {
return Promise.all(
transfers.map(async transfer => {
console.log(
`Processing ${type} transfer for contract: ${transfer.contract}, tokenId: ${transfer.tokenId}`
)
const nftInfo = await getNFTInfo(
transfer.contract,
transfer.tokenId,
type
)
console.log(`Processed ${type} transfer:`, nftInfo)
return {
...transfer,
...nftInfo,
metadata: nftInfo.metadata,
image: nftInfo.image,
description: nftInfo.description,
}
})
)
}

async function sendToWebhook(data) {
try {
await axios.post(WEBHOOK_URL, data)
console.log('Data sent to webhook successfully')
} catch (error) {
console.error('Error sending data to webhook:', error)
}
}

async function main(params) {
try {
if (!params || !params.data) {
throw new Error('Invalid input: params.data is missing')
}

const { erc20 = [], erc721 = [], erc1155 = [] } = params.data
await setupWeb3()

const enrichedData = {
erc20: await processERC20Transfers(erc20),
erc721: await processNFTTransfers(erc721, 'ERC721'),
erc1155: await processNFTTransfers(erc1155, 'ERC1155'),
}

await sendToWebhook(enrichedData)

return {
status: 'Data processed and sent to webhook',
transferCounts: {
erc20: erc20.length,
erc721: erc721.length,
erc1155: erc1155.length,
},
}
} catch (error) {
console.error('Error in main function:', error)
return {
status: 'Error processing data',
error: error.message,
}
}
}

exports.main = main
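
The Function formats ERC20 amounts with BigInt arithmetic rather than floating-point division. The short sketch below illustrates why: raw token amounts routinely exceed the range in which a JavaScript Number is exact. The helper name toDecimalString is hypothetical and mirrors the same divide-and-remainder approach.

```javascript
// Convert a raw integer amount (as a string) to a decimal string,
// using only BigInt arithmetic so no precision is lost.
function toDecimalString(rawValue, decimals) {
  const raw = BigInt(rawValue)
  const base = 10n ** BigInt(decimals)
  const whole = raw / base
  const fraction = (raw % base)
    .toString()
    .padStart(decimals, '0') // restore leading zeros of the fractional part
    .replace(/0+$/, '') // drop trailing zeros
  return fraction ? `${whole}.${fraction}` : `${whole}`
}

console.log(toDecimalString('1500000', 6)) // 1.5
console.log(toDecimalString('1000000000000000000', 18)) // 1
console.log(toDecimalString('1000000000000000001', 18)) // 1.000000000000000001

// The same amount through a Number loses the final unit.
console.log(String(Number('1000000000000000001') / 1e18)) // 1
```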

Click save and close to exit the Function wizard. Next, go back to Streams, and click "Create Stream" to return to the Stream wizard.

Connecting the Stream to the Function

In the Streams wizard, select the Function you just created as the Stream destination. Then, click deploy to start your Stream, and check your webhook to see the processed data coming in. Now let's take a moment to recap how the Stream, Function, QuickNode RPC endpoint, and IPFS gateway work together to capture and index token transfers.

How it Works


  • Stream Filtering: The Stream captures all token transfer events for ERC20, ERC721, and ERC1155 tokens.
  • Data Processing: The Function receives the filtered data and enriches it:
    • For ERC20 tokens: It fetches token name, symbol, and decimals.
    • For NFTs (ERC721 and ERC1155): It retrieves token metadata, including name, symbol, and token URI.
  • Data Enrichment: The Function uses the QuickNode RPC endpoint to make additional blockchain calls for fetching token information.
  • IPFS Integration: For NFTs with IPFS-based metadata, the Function uses the QuickNode IPFS gateway to retrieve the metadata.
  • Webhook Delivery: The enriched data is sent to the specified webhook URL for further use or storage.
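
The token URI handling described above can be sketched in plain JavaScript without any library. This is an illustration under stated assumptions, not the Function's exact code: the name resolveTokenUri and the gateway URL are hypothetical, and the handling of URIs written as ipfs://ipfs/<cid> is an extra convenience that the Function itself does not include. The {id} substitution follows the ERC1155 metadata convention of a lowercase hex ID, zero-padded to 64 characters, with no 0x prefix.

```javascript
// Hypothetical gateway URL; replace it with your own (keep the trailing slash).
const IPFS_GATEWAY = 'https://gateway.example/'

function resolveTokenUri(rawUri, tokenId, gateway = IPFS_GATEWAY) {
  if (!rawUri || rawUri === 'Unknown') return null

  // ERC1155 metadata URIs may contain an {id} placeholder.
  const hexId = BigInt(tokenId).toString(16).padStart(64, '0')
  let uri = rawUri.split('{id}').join(hexId)

  // Rewrite ipfs:// URIs so they can be fetched over HTTPS via the gateway.
  if (uri.startsWith('ipfs://')) {
    const path = uri.slice('ipfs://'.length).replace(/^ipfs\//, '')
    uri = `${gateway}ipfs/${path}`
  }
  return uri
}

console.log(resolveTokenUri('ipfs://QmExampleCid/{id}.json', '255'))
console.log(resolveTokenUri('https://example.com/meta/7', '7'))
```

HTTP(S) URIs pass through unchanged, so the same helper can be called for both ERC721 and ERC1155 tokens.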

Key Features


  • Real-time Processing: Capture and process token transfers as they happen on the blockchain.
  • Multi-token Support: Handle ERC20, ERC721, and ERC1155 transfers in a single system.
  • Data Enrichment: Automatically fetch and append relevant token information to transfer data.
  • Error Handling: Implement retries and fallbacks for robust data retrieval.
  • Scalability: Leverage QuickNode's infrastructure to handle high volumes of transfer events.
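
The Function in this guide retries failed calls with a fixed one-second pause. If you want to space retries out further under load, one common alternative is exponential backoff. The sketch below is a generic helper, not part of the guide's code. The names withRetry and backoffDelay and the default values are assumptions chosen for illustration.

```javascript
// Delay before the next attempt: base, 2x base, 4x base, ...
const backoffDelay = (attempt, baseDelayMs) => baseDelayMs * 2 ** attempt

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

// Run an async task up to `retries` times, waiting longer after each failure.
async function withRetry(task, { retries = 3, baseDelayMs = 500 } = {}) {
  let lastError
  for (let attempt = 0; attempt < retries; attempt++) {
    try {
      return await task(attempt)
    } catch (error) {
      lastError = error
      // Do not wait after the final attempt.
      if (attempt < retries - 1) {
        await sleep(backoffDelay(attempt, baseDelayMs))
      }
    }
  }
  throw lastError
}

// Usage with a hypothetical task that fails twice before succeeding.
let calls = 0
const flakyTask = async () => {
  calls += 1
  if (calls < 3) throw new Error(`attempt ${calls} failed`)
  return 'ok'
}

withRetry(flakyTask, { retries: 3, baseDelayMs: 10 }).then(result => {
  console.log(result, 'after', calls, 'calls')
})
```

A caller can still fall back to default values, as the Function does for token name and symbol, by catching the error that withRetry rethrows after the last attempt.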

Customization Options


  • Modify the Stream filter to focus on specific contracts or event types.
  • Adjust the Function code to add more custom processing logic or integrate with other services.
  • Implement additional data storage solutions beyond the webhook (e.g., database integration) directly within the function.
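
As an example of the first option, the Stream filter could skip any log whose emitting contract is not on an allowlist. The sketch below shows one way to express that check. The addresses are placeholders, and the names WATCHED_CONTRACTS and isWatched are hypothetical.

```javascript
// Placeholder addresses; replace them with the contracts you care about.
// Stored in lowercase so the comparison is case-insensitive.
const WATCHED_CONTRACTS = new Set(
  [
    '0x1111111111111111111111111111111111111111',
    '0x2222222222222222222222222222222222222222',
  ].map(address => address.toLowerCase())
)

// True when the log was emitted by one of the watched contracts.
function isWatched(log, watched = WATCHED_CONTRACTS) {
  return (
    Boolean(log) &&
    typeof log.address === 'string' &&
    watched.has(log.address.toLowerCase())
  )
}

// Hypothetical logs to show the effect of the check.
const logs = [
  { address: '0x1111111111111111111111111111111111111111', topics: [] },
  { address: '0x3333333333333333333333333333333333333333', topics: [] },
]
console.log(logs.filter(log => isWatched(log)).length) // 1
```

In the filter, a check like this would sit at the top of the receipt.logs.forEach callback, returning early for logs that are not watched, so that the topic matching only runs for the contracts you index.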

Conclusion

This guide showed how to build a real-time token transfer indexer using QuickNode's Streams and Functions. The setup provides a foundation for blockchain applications such as portfolio trackers, trading bots, and analytics platforms. Because the filtering runs in the Stream and the enrichment runs in the Function, you can adapt the same pattern to other chains and to larger volumes of transfer data.
