0xfave/onchain-intelligence-engine

Solana Token Data ETL Pipeline

A real-time Solana on-chain intelligence engine that tracks DEX activity and surfaces high-signal trading and risk events before they appear on dashboards.

Overview

The pipeline extracts transaction data from Solana RPC, transforms it into structured analytics rows, and loads them into ClickHouse for fast querying and analysis.

Architecture

┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│ Helius RPC      │────▶│ ETL Pipeline     │────▶│ ClickHouse      │
│ (Data Source)   │     │ (Rust + Carbon)  │     │ (Time-Series)   │
└─────────────────┘     └──────────────────┘     └─────────────────┘
                               │                        ▲
                               ▼                        │
                        ┌──────────────────┐     ┌─────────────────┐
                        │ Token Holders    │     │ Backfill        │
                        │ Background Job   │     │ Historical Data │
                        └──────────────────┘     └─────────────────┘

Supported Protocols

Protocol   | Status    | Events Tracked
Pump.fun   | ✅ Active | Buy, Sell, Create, Migrate
Moonshot   | ✅ Active | Buy, Sell, Create, MigrateFunds
Launchlabs | ✅ Active | Trade, Create, MigrateToAmm, MigrateToCpswap
PumpSwap   | ✅ Active | Swap, AddLiquidity, RemoveLiquidity

Features

Real-time Streaming

  • Live transaction monitoring via Helius RPC
  • Carbon library decoders for battle-tested instruction parsing
  • Sub-second latency from transaction to ClickHouse

Historical Backfill

  • cargo run -- --backfill - Backfill 6 months of historical data
  • Resumable - stores progress in ClickHouse, continues from where it stopped
  • Parallel metadata fetching with rate limiting

Token Metadata Enrichment

  • Fetches token names, symbols, and images via Helius getTokenMetadata
  • Automatic metadata backfill for all discovered tokens
  • Image URIs from on-chain metadata (Arweave/IPFS)

Holder Tracking

  • Complete holder lists for all tracked tokens
  • Adaptive refresh intervals (5min for new tokens, 60min for established)
  • Memory-bounded tracking (5000 tokens max)

Quick Start

# Clone and setup
git clone <repo>
cd solana-token-data-etl

# Install dependencies
cargo build

# Configure environment
cp .env.example .env
# Edit .env with your Helius API key and ClickHouse credentials

# Run real-time crawler
cargo run

# Run historical backfill (6 months)
cargo run -- --backfill

Environment Variables

# Helius RPC (required)
HELIUS_API_KEY=your_api_key
HELIUS_RPC_URL=https://mainnet.helius-rpc.com/?api-key=$HELIUS_API_KEY

# ClickHouse (required)
CLICKHOUSE_HOST=localhost
CLICKHOUSE_PORT=8123
CLICKHOUSE_DB=default
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=

# Optional overrides
CONFIG_PATH=config.toml
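
As an illustration of how the CLICKHOUSE_* variables above might be consumed, here is a minimal sketch using only the standard library. The `ClickHouseSettings` struct, the `required` helper, and the plain-HTTP URL are assumptions made for this example, not the project's actual code; treating a missing password as empty is also an assumption based on the blank value shown above.

```rust
use std::env;

/// Connection settings read from the CLICKHOUSE_* variables (hypothetical struct).
#[derive(Debug, Clone, PartialEq)]
pub struct ClickHouseSettings {
    pub host: String,
    pub port: u16,
    pub database: String,
    pub user: String,
    pub password: String,
}

/// Hypothetical helper: fail with a readable message when a variable is missing.
fn required(get: &impl Fn(&str) -> Option<String>, key: &str) -> Result<String, String> {
    get(key).ok_or_else(|| format!("missing required variable {}", key))
}

impl ClickHouseSettings {
    /// `get` abstracts the variable source so parsing can be exercised
    /// without touching the process environment.
    pub fn from_lookup(get: impl Fn(&str) -> Option<String>) -> Result<Self, String> {
        let port = required(&get, "CLICKHOUSE_PORT")?;
        let port = port
            .parse::<u16>()
            .map_err(|e| format!("invalid CLICKHOUSE_PORT {:?}: {}", port, e))?;
        Ok(Self {
            host: required(&get, "CLICKHOUSE_HOST")?,
            port,
            database: required(&get, "CLICKHOUSE_DB")?,
            user: required(&get, "CLICKHOUSE_USER")?,
            // Assumption: a missing password is treated as empty.
            password: get("CLICKHOUSE_PASSWORD").unwrap_or_default(),
        })
    }

    /// Reads the settings from the process environment.
    pub fn from_env() -> Result<Self, String> {
        Self::from_lookup(|key| env::var(key).ok())
    }

    /// Assumes plain HTTP; 8123 is ClickHouse's default HTTP port.
    pub fn http_url(&self) -> String {
        format!("http://{}:{}", self.host, self.port)
    }
}
```

Calling `ClickHouseSettings::from_env()` at startup would surface a missing or malformed variable before any RPC traffic begins.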

config.toml

[protocols.pumpfun]
enabled = true
program_id = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P"

[protocols.pumpswap]
enabled = true
program_id = "pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA"

[protocols.launchlabs]
enabled = true
program_id = "LanMV9sAd7wArD4vJFi2qDdfnVhFxYSUg6eADduJ3uj"

[protocols.moonshot]
enabled = true
program_id = "MoonCVVNZFSYkqNXP6bxHLPL6QQJiMagDL3qcqUQTrG"

[holder_refresh]
max_tracked_tokens = 5000
new_token_threshold_secs = 3600
refresh_interval_new_secs = 300
refresh_interval_established_secs = 3600
inactive_threshold_secs = 86400
wake_up_interval_secs = 30
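
The [holder_refresh] values above could drive the refresh decision roughly as follows. This is a sketch, not the project's implementation: the `HolderRefreshConfig` struct and the `refresh_interval` function are hypothetical names, and only the three keys that affect the interval are modelled.

```rust
use std::time::Duration;

/// Mirrors three [holder_refresh] keys from config.toml (hypothetical struct;
/// field names follow the TOML keys).
#[derive(Debug, Clone, Copy)]
pub struct HolderRefreshConfig {
    pub new_token_threshold_secs: u64,
    pub refresh_interval_new_secs: u64,
    pub refresh_interval_established_secs: u64,
}

impl Default for HolderRefreshConfig {
    /// Defaults copied from the config.toml example.
    fn default() -> Self {
        Self {
            new_token_threshold_secs: 3600,
            refresh_interval_new_secs: 300,
            refresh_interval_established_secs: 3600,
        }
    }
}

/// Hypothetical helper: pick the holder refresh interval from the token's age.
/// Tokens younger than the threshold count as "new".
pub fn refresh_interval(cfg: &HolderRefreshConfig, token_age: Duration) -> Duration {
    if token_age.as_secs() < cfg.new_token_threshold_secs {
        Duration::from_secs(cfg.refresh_interval_new_secs)
    } else {
        Duration::from_secs(cfg.refresh_interval_established_secs)
    }
}
```

With the defaults shown, a token created 10 minutes ago would be refreshed every 5 minutes, and one created 2 hours ago every 60 minutes.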

Database Schema

dex_events - All DEX trades

CREATE TABLE dex_events (
    slot Int64,
    block_time DateTime,
    tx_signature String,
    market LowCardinality(String),      -- "pumpfun", "moonshot", "launchlabs", "pumpswap"
    event_type LowCardinality(String),  -- "trade", "create", "migrate"
    action LowCardinality(String),      -- "Buy", "Sell", "Create", "Migrate"
    is_buy UInt8,
    is_sell UInt8,
    is_create UInt8,
    user String,
    pool String,
    base_mint String,
    quote_mint String,
    base_token_symbol String,
    quote_token_symbol String,
    token_name Nullable(String),
    token_symbol Nullable(String),
    token_uri Nullable(String),
    token_creator Nullable(String),
    token_deployer Nullable(String),
    amount UInt64,
    sol_amount Nullable(UInt64),
    token_amount Nullable(UInt64),
    price Nullable(Float64),
    fees_sol Nullable(Float64),
    indexed_at Int64
) ENGINE = MergeTree
PARTITION BY toYYYYMM(block_time)
ORDER BY (market, base_mint, block_time, slot);

tokens - Token metadata

CREATE TABLE tokens (
    mint String,
    symbol String,
    name String,
    decimals UInt8,
    image_uri Nullable(String),
    market LowCardinality(String),
    creator String,
    deployer String,
    creation_tx String,
    quote_token String,
    status LowCardinality(String) DEFAULT 'default',
    created_at Int64,
    updated_at Int64 DEFAULT now64()
) ENGINE = ReplacingMergeTree(updated_at)
ORDER BY (mint);

token_all_holders - Complete holder lists

CREATE TABLE token_all_holders (
    mint String,
    holder String,
    balance UInt64,
    updated_at Int64 DEFAULT now64()
) ENGINE = ReplacingMergeTree(updated_at)
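-- ReplacingMergeTree deduplicates on the sorting key, so key on (mint, holder)
-- and sort by balance at query time.
ORDER BY (mint, holder);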

Example Queries

-- Top traded tokens in the last 24h
SELECT base_mint, COUNT(*) as trades, SUM(amount) as volume
FROM dex_events
WHERE block_time > now() - INTERVAL 1 DAY
GROUP BY base_mint
ORDER BY volume DESC
LIMIT 20;

-- Top holders for a token
SELECT holder, balance
FROM token_all_holders
WHERE mint = 'TOKEN_MINT_ADDRESS'
ORDER BY balance DESC
LIMIT 20;

-- Biggest Pump.fun buyers in the last hour
SELECT user, SUM(sol_amount) as total_bought
FROM dex_events
WHERE market = 'pumpfun'
    AND action = 'Buy'
    AND block_time > now() - INTERVAL 1 HOUR
GROUP BY user
ORDER BY total_bought DESC
LIMIT 10;

-- Track a specific wallet's activity
SELECT *
FROM dex_events
WHERE user = 'WALLET_ADDRESS'
ORDER BY block_time DESC
LIMIT 100;

Backfill Behavior

The backfill command is resumable:

  1. First run: fetches all transactions from 6 months ago to now
  2. Stores the last processed signature per protocol in the backfill_state table
  3. Subsequent runs: continue from the last stored signature and fetch only new transactions
  4. Fetches token metadata (name, symbol, image) for all discovered tokens

# Initial backfill (takes hours for full history)
cargo run -- --backfill

# Next day - only fetches new transactions (minutes)
cargo run -- --backfill
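
The checkpoint logic described above can be sketched as follows. This is an illustration under stated assumptions, not the code in backfill.rs: a HashMap stands in for the backfill_state table, the `resume_backfill` function is hypothetical, and the input is assumed to list signatures newest first.

```rust
use std::collections::HashMap;

/// In-memory stand-in for the backfill_state table:
/// protocol name -> last processed signature.
pub type BackfillState = HashMap<String, String>;

/// Hypothetical helper. Returns the signatures still to process, oldest first,
/// and advances the checkpoint for `protocol`.
/// `newest_first` is assumed to be ordered from newest to oldest.
pub fn resume_backfill(
    state: &mut BackfillState,
    protocol: &str,
    newest_first: &[&str],
) -> Vec<String> {
    let checkpoint = state.get(protocol).cloned();

    // Walk back from the newest signature until the checkpoint is reached.
    // With no checkpoint (first run) everything is pending.
    let mut pending: Vec<String> = newest_first
        .iter()
        .take_while(|sig| Some(**sig) != checkpoint.as_deref())
        .map(|sig| sig.to_string())
        .collect();

    // Process in chronological order.
    pending.reverse();

    // The newest pending signature becomes the new checkpoint.
    if let Some(latest) = pending.last() {
        state.insert(protocol.to_string(), latest.clone());
    }
    pending
}
```

A second call with no new signatures returns an empty list and leaves the checkpoint untouched, which is why a repeated run finishes quickly.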

Design Decisions

1. Multi-Protocol Storage

All protocols write to a single dex_events table, distinguished by the market column, instead of protocol-specific tables.

Benefits:

  • One query for cross-protocol analytics
  • No schema changes when adding new protocols
  • Shared storage and query optimization
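
To illustrate the second benefit, here is a sketch of how a market enum could map onto the market column. The string values come from the comment in the dex_events schema; the enum shape and method names are assumptions and may differ from the actual src/core/market.rs.

```rust
/// Hypothetical market enum; one variant per supported protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Market {
    PumpFun,
    Moonshot,
    LaunchLabs,
    PumpSwap,
}

impl Market {
    /// Every known market, in one place.
    pub const ALL: [Market; 4] = [
        Market::PumpFun,
        Market::Moonshot,
        Market::LaunchLabs,
        Market::PumpSwap,
    ];

    /// Value stored in the `market` column of dex_events.
    pub fn as_str(self) -> &'static str {
        match self {
            Market::PumpFun => "pumpfun",
            Market::Moonshot => "moonshot",
            Market::LaunchLabs => "launchlabs",
            Market::PumpSwap => "pumpswap",
        }
    }

    /// Inverse of `as_str`; returns None for an unknown market name.
    pub fn parse(s: &str) -> Option<Market> {
        Market::ALL.iter().copied().find(|m| m.as_str() == s)
    }
}
```

Under this layout, supporting another protocol means adding a variant and a string value; the table schema and existing queries stay as they are.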

2. Adaptive Holder Refresh

Different refresh rates based on token age:

Token Age | Refresh Interval | Rationale
< 1 hour  | 5 minutes        | Rapid holder changes
>= 1 hour | 60 minutes       | Stable distribution

3. Memory-Bounded Tracking

At most 5000 tokens are tracked for holders. When the cap is reached, the token with the oldest last_trade is evicted.

Benefits:

  • Prevents unbounded memory growth
  • Prioritizes active/trading tokens
  • Automatic cleanup of stale data
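
A minimal sketch of this eviction policy, assuming a hypothetical `HolderTracker` that maps each mint to the Unix time of its last trade. The linear scan on eviction is a simplification chosen for brevity; the project's holder_fetcher.rs may use a different structure.

```rust
use std::collections::HashMap;

/// Hypothetical tracker: mint address -> Unix time of the last observed trade.
pub struct HolderTracker {
    max_tracked_tokens: usize,
    last_trade: HashMap<String, i64>,
}

impl HolderTracker {
    pub fn new(max_tracked_tokens: usize) -> Self {
        Self {
            max_tracked_tokens,
            last_trade: HashMap::new(),
        }
    }

    /// Records a trade for `mint`. If the cap is exceeded, evicts the mint
    /// with the oldest last trade and returns it.
    pub fn record_trade(&mut self, mint: &str, block_time: i64) -> Option<String> {
        // Keep the most recent trade time seen for this mint.
        let ts = self.last_trade.entry(mint.to_string()).or_insert(block_time);
        *ts = (*ts).max(block_time);

        if self.last_trade.len() <= self.max_tracked_tokens {
            return None;
        }

        // Over the cap: find the mint with the oldest last trade (linear scan).
        let oldest = self
            .last_trade
            .iter()
            .min_by_key(|(_, ts)| **ts)
            .map(|(mint, _)| mint.clone())?;
        self.last_trade.remove(&oldest);
        Some(oldest)
    }

    pub fn is_tracked(&self, mint: &str) -> bool {
        self.last_trade.contains_key(mint)
    }

    pub fn len(&self) -> usize {
        self.last_trade.len()
    }
}
```

Because every trade refreshes the timestamp, actively traded tokens stay in the map while idle ones drift toward eviction.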

4. Metadata Backfill

Token names/symbols/images fetched via Helius getTokenMetadata API.

Behavior:

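  • Names/symbols are empty on creation and filled during backfill
  • Images are fetched from the on-chain URI (Arweave/IPFS)
  • Backfilled rows supersede the initial empty ones: ReplacingMergeTree keeps the row with the highest updated_at per mint, and deduplication happens at merge time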

Project Structure

src/
├── main.rs                    # CLI entry point (run / --backfill)
├── core/
│   ├── app_config.rs          # Config loading from TOML
│   ├── backfill.rs            # Historical data backfill
│   ├── crawler.rs             # Main ETL orchestrator
│   ├── datasource/rpc.rs      # Generic RPC connection
│   ├── event.rs               # DexEventRow struct
│   ├── holder_fetcher.rs      # Token holder refresh job
│   └── market.rs              # Market enum
├── protocols/
│   ├── pumpfun/               # Pump.fun processor
│   ├── pumpswap/              # PumpSwap processor
│   ├── moonshot/              # Moonshot processor
│   └── launchlabs/            # Launchlabs processor
└── storage/
    └── clickhouse.rs          # ClickHouse client & queries

config.toml                    # Protocol configuration
migrations/                    # ClickHouse schema
.env                           # Environment variables

Technology Stack

  • Language: Rust 2021 Edition
  • RPC: Helius (enhanced Solana RPC)
  • Decoders: Carbon library (battle-tested)
  • Database: ClickHouse (time-series analytics)
  • HTTP: reqwest (token metadata fetching)
  • Async: Tokio runtime

License

MIT License - see LICENSE.md
