Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
<img src="/resources/tiders_logo2.png" alt="Tiders" width="1000">

[![Documentation](https://img.shields.io/badge/documentation-blue?style=for-the-badge&logo=readthedocs)](https://yulesa.github.io/tiders-docs/)
[![PyPI](https://img.shields.io/badge/PyPI-lightgreen?style=for-the-badge&logo=pypi&labelColor=white)](https://pypi.org/project/tiders/)
[![tiders-core](https://img.shields.io/badge/github-black?style=for-the-badge&logo=github)](https://github.com/yulesa/tiders-core)
[![tiders-rpc-client](https://img.shields.io/badge/github-black?style=for-the-badge&logo=github)](https://github.com/yulesa/tiders-rpc-client)
[![Documentation](https://img.shields.io/badge/documentation-blue?style=for-the-badge&logo=readthedocs)](https://yulesa.github.io/tiders-docs/)
[![telegram](https://img.shields.io/badge/telegram-blue?style=for-the-badge&logo=telegram)](https://t.me/tidersindexer)

Tiders is an open-source framework that simplifies getting data out of blockchains and into your favorite tools. Whether you are building a DeFi dashboard, tracking NFT transfers, or running complex analytics, Tiders handles the heavy lifting of fetching, cleaning, transforming and storing blockchain data.

Expand Down
6 changes: 6 additions & 0 deletions examples/uniswap_v3/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ CLICKHOUSE_PASSWORD=default
CLICKHOUSE_DATABASE=default
CLICKHOUSE_SECURE=false

# --- PostgreSQL (only needed with --database postgresql) ---

POSTGRES_USER=postgres
POSTGRES_PASSWORD=secret
POSTGRES_DB=tiders

# --- Iceberg (only needed with --database iceberg) ---

ICEBERG_NAMESPACE=default
87 changes: 84 additions & 3 deletions examples/uniswap_v3/uniswap_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#
# uv run uniswap_v3.py --provider <hypersync|sqd|rpc> --from_block 12369621 --to_block 12370621
# [--rpc_url URL] \ # only needed with --provider rpc
# [--database BACKEND] # default: pyarrow. Options: pyarrow, duckdb, delta_lake, clickhouse, iceberg
# [--database BACKEND] # default: pyarrow. Options: pyarrow, duckdb, delta_lake, clickhouse, iceberg, postgresql
#
# Output is written to data/uniswap_v3/
#
Expand Down Expand Up @@ -77,7 +77,7 @@
DATA_PATH = str(Path.cwd() / "data")
Path(DATA_PATH).mkdir(parents=True, exist_ok=True)

WRITER_CHOICES = ["clickhouse", "delta_lake", "duckdb", "iceberg", "pyarrow"]
WRITER_CHOICES = ["clickhouse", "delta_lake", "duckdb", "iceberg", "pyarrow", "postgresql"]

# Table name aliases
POOL_CREATED_LOGS_TABLE = "uniswap_v3_factory_pool_created_logs"
Expand Down Expand Up @@ -199,6 +199,31 @@ async def create_writer(database: str) -> cc.Writer:
kind=cc.WriterKind.CLICKHOUSE,
config=cc.ClickHouseWriterConfig(client=client),
)

if database == "postgresql":
import psycopg

host = os.environ.get("POSTGRES_HOST", "localhost")
port = int(os.environ.get("POSTGRES_PORT", "5432"))
user = os.environ.get("POSTGRES_USER", "postgres")
password = os.environ.get("POSTGRES_PASSWORD", "secret")
dbname = os.environ.get("POSTGRES_DB", "tiders")

_conninfo = " ".join(
[
f"host={host}",
f"port={port}",
f"dbname={dbname}",
f"user={user}",
f"password={password}",
]
)
connection = await psycopg.AsyncConnection.connect(_conninfo, autocommit=False)

return cc.Writer(
kind=cc.WriterKind.POSTGRESQL,
config=cc.PostgresqlWriterConfig(connection=connection),
)

if database == "iceberg":
from pyiceberg.catalog import load_catalog
Expand Down Expand Up @@ -350,8 +375,52 @@ async def load_pool_addresses(database: str) -> list[str]:
addresses = {value.as_py() for value in table["pool"] if value.as_py()}
return sorted(addresses)

if database == "clickhouse":
import clickhouse_connect

client = await clickhouse_connect.get_async_client(
host=os.environ.get("CLICKHOUSE_HOST", "localhost"),
port=int(os.environ.get("CLICKHOUSE_PORT", "8123")),
username=os.environ.get("CLICKHOUSE_USER", "default"),
password=os.environ.get("CLICKHOUSE_PASSWORD", "default"),
database=os.environ.get("CLICKHOUSE_DATABASE", "default"),
secure=os.environ.get("CLICKHOUSE_SECURE", "false").lower() == "true",
)
result = await client.query(
f"SELECT DISTINCT pool FROM {POOL_CREATED_TABLE} WHERE pool IS NOT NULL"
)
addresses = {row[0] for row in result.result_rows if row[0]}
return sorted(addresses)

if database == "postgresql":
import psycopg

host = os.environ.get("POSTGRES_HOST", "localhost")
port = int(os.environ.get("POSTGRES_PORT", "5432"))
user = os.environ.get("POSTGRES_USER", "postgres")
password = os.environ.get("POSTGRES_PASSWORD", "secret")
dbname = os.environ.get("POSTGRES_DB", "tiders")

_conninfo = " ".join(
[
f"host={host}",
f"port={port}",
f"dbname={dbname}",
f"user={user}",
f"password={password}",
]
)
async with await psycopg.AsyncConnection.connect(_conninfo) as conn:
async with conn.cursor() as cur:
await cur.execute(
f"SELECT DISTINCT pool FROM {POOL_CREATED_TABLE} WHERE pool IS NOT NULL"
)
rows = await cur.fetchall()
addresses = {row[0] for row in rows if row[0]}
return sorted(addresses)

raise ValueError(
f"Pool loading for database '{database}' is not supported. Use one of: duckdb, pyarrow, delta_lake."
f"Pool loading for database '{database}' is not supported. Use one of: duckdb, pyarrow, delta_lake, clickhouse, postgresql."
)


Expand Down Expand Up @@ -401,6 +470,13 @@ def _pool_event_steps() -> list[cc.Step]:
),
),
)
steps.append(
cc.Step(
name="join_blocks_data",
kind=cc.StepKind.JOIN_BLOCK_DATA,
config=cc.JoinBlockDataConfig(),
)
)

# Convert binary columns (addresses, hashes, topics) to "0x..." hex strings.
steps.append(
Expand Down Expand Up @@ -440,6 +516,7 @@ async def run_pool_events_pipeline(
logs=[
ingest.evm.LogRequest(
address=pool_addresses,
include_blocks=True
)
],
fields=ingest.evm.Fields(
Expand All @@ -455,6 +532,10 @@ async def run_pool_events_pipeline(
topic3=True,
data=True,
),
block=ingest.evm.BlockFields(
timestamp=True,
number=True,
)
),
),
)
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ homepage = "https://github.com/yulesa/tiders"
readme = "README.md"
license = "MIT OR Apache-2.0"
authors = [
{ name = "Ozgur Akkurt", email = "ozgur@steelcake.com" },
{ name = "Yule Andrade", email = "yulesa@gmail.com" },
{ name = "Ozgur Akkurt", email = "ozgur@steelcake.com" },
]
requires-python = ">=3.11"
dependencies = [
Expand All @@ -27,7 +27,7 @@ iceberg = ["pyiceberg>=0.10.0"]
polars = ["polars>=1.34.0"]
pandas = ["pandas>=2.0.0"]
datafusion = ["datafusion>=50.1.0"]
postgresql = ["psycopg[binary]>=3.1.0"]
postgresql = ["psycopg[binary]>=3.1.0; python_version < '3.14'"]
all = ["tiders[duckdb,clickhouse,delta_lake,iceberg,polars,pandas,datafusion,postgresql]"]

[project.scripts]
Expand Down
Loading