Solana token transaction indexer (data pipeline)

Requirement: get last ~30 days of token transfer data for major tokens (stream new data into the database, handle failures, ensure pipeline recovery and backfill historical data).
The Helius API will be used as a data source for this demo. Standard WebSocket was picked over Enhanced WebSocket or LaserStream, as I don't have access to a paid API key at the moment.
Demo link: demo1.runatyr.dev
Architecture diagram

Implementation steps
Stream new data into the database
First, start the database and websocket connection.
Use the logsSubscribe WebSocket method to get transfers. Pass the mint/token address in the mentions param and confirmed for commitment. In this demo we monitor BONK token transactions, which helps avoid the high transaction volume of tokens like USDC that causes rate-limit errors on my free API key.
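For reference, the logsSubscribe subscription message sent over the WebSocket looks roughly like this (a sketch; the variable name and mint placeholder are illustrative):

// JSON-RPC subscription request for all transactions mentioning a mint
const subscribeMsg = {
  jsonrpc: '2.0',
  id: 1,
  method: 'logsSubscribe',
  params: [
    { mentions: ['<BONK_MINT_ADDRESS>'] }, // Token/mint address to monitor
    { commitment: 'confirmed' },           // Notify at "confirmed" level
  ],
};
this.ws.send(JSON.stringify(subscribeMsg));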
A separate WebSocket subscription is needed for each token to be monitored. It is not a good idea to use a single WebSocket on the SPL Token program address and then filter by token address, as that initially generates millions of transactions (all tokens).
ℹ️ Note: For Enterprise customers, use the Enhanced WebSocket, as it allows using a single WebSocket for multiple addresses with the accountInclude parameter.
To handle WebSocket disconnections we use a scheduleReconnect method that runs whenever a disconnection is detected, except for intentional disconnections. If multiple retries are needed, each one waits longer than the previous one (exponential backoff) to avoid overloading the server.
scheduleReconnect() {
  if (this.reconnectTimeout) {
    clearTimeout(this.reconnectTimeout);
  }
  log(`Reconnecting in ${this.reconnectDelay}ms...`);
  this.reconnectTimeout = setTimeout(() => {
    this.connect();
  }, this.reconnectDelay);
  // Exponential backoff (max 30 seconds)
  this.reconnectDelay = Math.min(
    this.reconnectDelay * 2,
    config.indexer.maxReconnectDelayMs
  );
}
This is the summarized data flow for the WebSocket: we connect, receive a signature and logs, and call back into index.js with the received signature.
index.js → websocket.js (connect to Helius WebSocket)
Helius WebSocket → websocket.js (receive signature + logs)
websocket.js → index.js (onSignature) (callback with signature)
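A minimal sketch of that onSignature hand-off in websocket.js (assumed shape, using the Node ws library and the standard logsNotification payload; not the demo's exact code):

// Handle incoming WebSocket messages and forward signatures to index.js
this.ws.on('message', (raw) => {
  const msg = JSON.parse(raw);
  if (msg.method !== 'logsNotification') return; // Skip subscription acks
  const { signature, err, logs } = msg.params.result.value;
  if (err) return; // Skip failed transactions
  this.onSignature(signature, logs); // Callback registered by index.js
});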
Data enrichment via RPC node
Since the standard WebSocket logsSubscribe only provides basic data (signature + logs), this demo includes additional steps to fetch full transaction details, such as token balances, which allow calculating balance changes and thus determining the senders/receivers of transactions. We get that extra info from direct RPC queries:
index.js → rpc.js (getTransaction) (fetch full transaction details)
rpc.js → index.js (return transaction data)
index.js → parser.js (parseTransaction) (extract token transfers)
parser.js → parser.js (calculateBalanceChanges) (compute pre/post balance diff)
parser.js → parser.js (filter by mints) (always filter - only save configured mints)
parser.js → index.js (return filtered transfers array)
ℹ️ Note: If we used the Enhanced WebSocket, the previous steps would not be needed, as the WebSocket messages already include enriched data such as fee, preBalances and postBalances.
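For illustration, a minimal sketch of the balance-diff step (a hypothetical helper in the spirit of parser.js's calculateBalanceChanges; assumes the transaction was fetched with jsonParsed encoding):

// Compute per-owner balance deltas for one mint from pre/post token balances
function calculateBalanceChanges(tx, mint) {
  const changes = new Map(); // owner -> raw token amount delta
  for (const b of tx.meta.preTokenBalances) {
    if (b.mint !== mint) continue;
    changes.set(b.owner, -BigInt(b.uiTokenAmount.amount));
  }
  for (const b of tx.meta.postTokenBalances) {
    if (b.mint !== mint) continue;
    const prev = changes.get(b.owner) ?? 0n;
    changes.set(b.owner, prev + BigInt(b.uiTokenAmount.amount));
  }
  return changes; // Negative delta => sender, positive delta => receiver
}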
Memory buffering and database insert
The database mainly stores the from/to addresses, signature, mint/token address and amount. It has indexes on the mint address and the from/to addresses to speed up queries.
ℹ️ Expand to see database createSchema
async createSchema() {
  const schema = `
    CREATE TABLE IF NOT EXISTS token_transfers (
      id BIGSERIAL PRIMARY KEY,
      signature VARCHAR(88) NOT NULL UNIQUE,
      slot BIGINT NOT NULL,
      block_time TIMESTAMP,
      mint VARCHAR(44) NOT NULL,
      from_address VARCHAR(44) NOT NULL,
      to_address VARCHAR(44) NOT NULL,
      amount BIGINT NOT NULL,
      decimals INT NOT NULL,
      created_at TIMESTAMP NOT NULL DEFAULT NOW()
    );
    CREATE INDEX IF NOT EXISTS idx_transfers_mint
      ON token_transfers(mint, block_time DESC);
    CREATE INDEX IF NOT EXISTS idx_transfers_from
      ON token_transfers(from_address, block_time DESC);
    CREATE INDEX IF NOT EXISTS idx_transfers_to
      ON token_transfers(to_address, block_time DESC);
    CREATE INDEX IF NOT EXISTS idx_transfers_slot
      ON token_transfers(slot DESC);
  `;
  await this.pool.query(schema); // Execute the DDL (assumes a pg Pool on this.pool)
}
Finally, we buffer the transfers in memory and insert them into the database in batches. We use batch inserts to improve performance (database insert operations are needed less often):
index.js → index.js (buffer) (accumulate transfers)
index.js → database.js (batchInsert) (insert when buffer full/timer)
We buffer and then insert when there are 100 or more transfers in the buffer, or when 1 second has passed (the thresholds depend on how quickly you need the data available).
ℹ️ Expand to see buffering code snippet
if (transfers.length > 0) {
  this.stats.parsed += transfers.length; // Update stats
  this.buffer.push(...transfers); // Add to in-memory buffer
  // Flush if buffer is full
  if (this.buffer.length >= config.indexer.batchSize) { // Default: 100
    await this.flushBuffer();
  }
}

// ...omitted code

async flushBuffer() {
  if (this.buffer.length === 0) return; // Nothing to do
  const batch = [...this.buffer]; // Copy buffer (clone array)
  this.buffer = []; // Clear buffer immediately
  try {
    // Single INSERT with multiple rows
    const inserted = await this.db.batchInsert(batch);
    this.stats.inserted += inserted;
    if (inserted > 0) {
      log(`Inserted ${inserted} transfers (buffer: ${batch.length})`);
    }
  } catch (error) {
    // On failure: re-add batch to buffer (retry later)
    this.buffer.push(...batch);
  }
}
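batchInsert itself is not shown above; here is a minimal sketch of what such a multi-row insert could look like in database.js (assumes node-postgres; the transfer field names are assumptions):

async batchInsert(transfers) {
  const cols = ['signature', 'slot', 'block_time', 'mint',
                'from_address', 'to_address', 'amount', 'decimals'];
  const values = [];
  // Build one placeholder group per row: ($1, ..., $8), ($9, ..., $16), ...
  const rows = transfers.map((t, i) => {
    values.push(t.signature, t.slot, t.blockTime, t.mint,
                t.from, t.to, t.amount, t.decimals);
    const base = i * cols.length;
    return `(${cols.map((_, j) => `$${base + j + 1}`).join(', ')})`;
  });
  const result = await this.pool.query(
    `INSERT INTO token_transfers (${cols.join(', ')})
     VALUES ${rows.join(', ')}
     ON CONFLICT (signature) DO NOTHING`, // Skip duplicate signatures safely
    values
  );
  return result.rowCount; // Rows actually inserted (duplicates excluded)
}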
Backfilling historical data
- For this demo, I plan on using a two-phase RPC approach (getSignaturesForAddress → getTransaction). This is not yet implemented in the demo at the time of writing, but it's planned.
- For a paying customer: the getTransactionsForAddress RPC method, or the Enhanced Transactions API, can be used to combine getSignaturesForAddress + getTransaction in a single call.
- The backfilling strategy consists of getting signatures for the mint address we are tracking, getting transaction details for those signatures, inserting, and repeating until we fill the database back to the oldest desired data. Here is the step-by-step process (see the sketch after this list):
  - Find the oldest slot number in the database
  - Calculate the target timestamp (current time minus 30 days)
  - Fetch signature batches (e.g. 1000 signatures limit, before a certain signature)
  - Filter historical (slot < oldestKnownSlot)
  - Fetch transaction batches (concurrent, with rate limiting)
  - Parse and insert (reusing the existing parser)
  - Check if we reached the target timestamp, otherwise repeat the process
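A minimal sketch of that planned loop (hypothetical rpc, parser and db helpers mirroring the modules described earlier; not actual demo code):

async function backfill(mint, targetTimestamp, oldestKnownSlot) {
  let before; // Paginate backwards, starting from the newest signature
  while (true) {
    // getSignaturesForAddress returns newest-first, up to `limit` entries
    const sigs = await rpc.getSignaturesForAddress(mint, { limit: 1000, before });
    if (sigs.length === 0) break; // No more history available
    // Keep only signatures older than what the database already holds
    const historical = sigs.filter((s) => s.slot < oldestKnownSlot);
    for (const s of historical) {
      const tx = await rpc.getTransaction(s.signature);
      const transfers = parser.parseTransaction(tx); // Reuse the existing parser
      await db.batchInsert(transfers); // Duplicates skipped via ON CONFLICT
    }
    const oldest = sigs[sigs.length - 1];
    if (oldest.blockTime <= targetTimestamp) break; // Reached the 30-day horizon
    before = oldest.signature; // Next page starts before the oldest seen
  }
}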
Handle failures and recover the pipeline
This is a summary of failure and recovery handling:
- The WebSocket has a scheduleReconnect method to auto-reconnect on failure
- RPC requests are retried if they fail; jitter and exponential backoff are used to prevent multiple simultaneous requests that would fail again
- Database duplicate prevention: ON CONFLICT (signature) DO NOTHING
- Database flush retry: a failed batch is re-added to the buffer
- try/catch blocks around transaction processing, RPC requests and other non-critical processes, to log the error without crashing
- Error metrics are tracked
ℹ️ Expand to see RPC retry mechanism code logic
// The base delay is 400ms and jitter is 200ms.
// This means retry 1 waits 200-600ms, retry 2 waits 600-1000ms, and so on.
// We log a warning for each retry being done.
if (!response.ok) {
  // Retry on rate limit (429) or server errors (5xx)
  if ((response.status === 429 || response.status >= 500) && retryCount < this.maxRetries) {
    const baseDelay = this.baseDelay * Math.pow(2, retryCount); // Exponential backoff
    const delay = this.getJitteredDelay(baseDelay);
    logWarn(`HTTP ${response.status} for ${signature}, retrying in ${delay.toFixed(0)}ms (attempt ${retryCount + 1}/${this.maxRetries})`);
    await this.sleep(delay);
    return this.getTransaction(signature, retryCount + 1, delay);
  }
  // ...non-retryable errors are handled below (omitted)
}
ℹ️ Expand to see RPC retry mechanism code logging and metrics
// Retries are logged and, on success, counted in a metric.
// This helps track whether retries are happening and whether they are working.
if (retryCount > 0) {
  logWarn(`✅ Retry succeeded for ${signature} after ${retryCount} attempt(s) (waited ${lastDelay.toFixed(0)}ms)`);
  if (this.stats) {
    this.stats.rpcErrorsResolved++;
  }
}

// Example log entry:
// token-indexer-85f8d5fb75-226kd indexer [2025-12-19T18:15:55.851Z] ✅ Retry succeeded for 4uov9Qmo5sccY7VmTwSgUJECA61SFvRRFQSrRYwBL8TtYN8miVT7v4KSCCxfo3G7DeBnttE2P86t5ddcRPiM26bV after 1 attempt(s) (waited 337ms)
Here we can see that retries are happening while errors are not increasing, thanks to resolved retries:

These are other failure/recovery methods that are not implemented in the demo but would be a good idea for production (see the dead-letter sketch below):
- Dead-letter queue: after 3 failed retry attempts on a transaction, insert it into a failed_transactions database table for future manual review
- Rate limiting: a max of X concurrent RPC calls to avoid hitting rate limits when doing concurrent requests
- Metric generation and alerting based on error rate and buffer size
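A hypothetical sketch of the dead-letter idea (the failed_transactions table and helper are assumptions, not demo code):

// Persist a permanently failed signature for manual review instead of dropping it.
// Assumes a failed_transactions table with a UNIQUE constraint on signature.
async function deadLetter(pool, signature, reason) {
  await pool.query(
    `INSERT INTO failed_transactions (signature, reason, failed_at)
     VALUES ($1, $2, NOW())
     ON CONFLICT (signature) DO NOTHING`,
    [signature, reason]
  );
}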
Recent Improvements: Transaction Loss Investigation
After initial deployment, end-user tests revealed approximately 5% transaction loss: signatures were received from the WebSocket but never made it to the database. Here's how the issue was diagnosed and resolved:
1. Instrumentation & Metrics
Added comprehensive tracking at each pipeline stage:
- Prometheus metrics: rpc_null_total, rpc_failed_total, parser_no_transfers_total, duplicates_total
- Structured logging with full signatures (not truncated) for traceability
- Debug logging to trace individual transactions through the entire pipeline
Live Metrics Dashboard: view real-time pipeline metrics in Grafana

2. Root Cause Discovery
Analysis revealed the most likely cause: RPC getTransaction() was returning null for ~5% of signatures. Investigation showed this most likely happens because WebSocket notifications fire immediately when transactions reach “confirmed” status, while RPC nodes need additional time (100-500ms) to index the transaction data.
3. Solution: Retry Logic for Null Responses
The key fix: Added retry logic specifically for null RPC responses. Previously, the system only retried on HTTP errors (429, 5xx) but would immediately give up on null responses. The new logic applies the same exponential backoff retry mechanism when getTransaction() returns null, giving the RPC node time to index the transaction data.
Additionally, the existing retry jitter was improved from a fixed ±200ms to linearly increasing (200ms → 300ms → 400ms → 500ms → 600ms) to better distribute retry attempts and prevent thundering-herd effects.
ℹ️ Expand to see implementation details
Retry Schedule (exponential backoff with increasing jitter):
- Retry 1: 200-600ms delay
- Retry 2: 500-1100ms delay
- Retry 3: 1200-2000ms delay
- Retry 4: 2700-3700ms delay
- Retry 5: 5800-7000ms delay
Code snippet:
// Retry on null response (transaction not yet confirmed/indexed)
if (data.result === null && retryCount < this.maxRetries) {
  const baseDelay = this.baseDelay * Math.pow(2, retryCount);
  const delay = this.getJitteredDelay(baseDelay, retryCount);
  logger.warn({
    event: 'rpc_retry',
    signature: signature,
    attempt: retryCount + 1,
    max_attempts: this.maxRetries,
    delay_ms: Math.round(delay),
    reason: 'null_result'
  }, 'RPC returned null, retrying');
  await this.sleep(delay);
  return this.getTransaction(signature, retryCount + 1, delay);
}

// All retries exhausted and still null - transaction will be LOST
if (data.result === null && retryCount >= this.maxRetries) {
  this.attemptStats.fail++;
  if (this.stats) {
    this.stats.rpcFailed++;
  }
  logger.error({
    event: 'rpc_fail',
    rpc_fail_attempt: this.maxRetries,
    signature: signature,
    reason: 'null_after_retries',
    tx_lost: true
  }, `RPC returned null after ${this.maxRetries} attempts - TRANSACTION LOST`);
}
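getJitteredDelay itself is not shown; a minimal sketch consistent with the retry schedule above (an assumption, not the demo's exact code):

// Jitter grows linearly with the retry count: ±200ms, ±300ms, ±400ms, ...
getJitteredDelay(baseDelay, retryCount = 0) {
  const jitter = 200 + retryCount * 100;
  // Uniform random value in [baseDelay - jitter, baseDelay + jitter]
  return baseDelay + (Math.random() * 2 - 1) * jitter;
}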
4. Results
Transaction loss improved from 5% to 0.5% (a 10x improvement). The remaining 0.5% was identified using Loki log aggregation: filtering by event="rpc_fail" and analyzing the http_status field revealed that all remaining failures were HTTP 429 (rate limits), which is expected behavior on the free tier during high transaction volumes.
Demo app and infrastructure info
The app consists of two main components:
- Backend data pipeline: receives signatures and logs, queries RPC to get transaction data for those signatures, saves it to the database and exposes the data via an API.
- User-facing app: at demo1.runatyr.dev I've published a web app that queries the demo API every 2 seconds and shows incoming transactions for the BONK token.
ℹ️ Note: Initially I chose the USDC token, but it produces too many transactions, which then generate RPC rate-limit errors. This is not an issue with a paid account, as you get a higher rate limit and also don't need to query RPC separately, since Helius Enhanced WebSockets already include the transaction data.
You can filter by address (1), do a few BONK transactions to see them appearing live, and there is also a live backend metrics link (2) to see metrics from the backend data pipeline in Grafana.

Live backend metrics example:

ℹ️ Note about scaling (horizontal scaling): multiple indexer instances can run in parallel (different tokens per instance, or load balanced); the database supports concurrent writes via ON CONFLICT handling.
This is running on a self-hosted Kubernetes cluster that follows best practices such as:
- Infrastructure as Code
- CI/CD pipeline: manifest syncs, Dockerfile builds and tag updates
- Separate dev and prod environments (dev link: https://demo1-dev.runatyr.dev/)
- Observability: Prometheus, Alertmanager, Grafana