diff --git a/README.md b/README.md index bcee02f..8cea55d 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,10 @@ Tiders +[![Documentation](https://img.shields.io/badge/documentation-blue?style=for-the-badge&logo=readthedocs)](https://yulesa.github.io/tiders-docs/) [![PyPI](https://img.shields.io/badge/PyPI-lightgreen?style=for-the-badge&logo=pypi&labelColor=white)](https://pypi.org/project/tiders/) [![tiders-core](https://img.shields.io/badge/github-black?style=for-the-badge&logo=github)](https://github.com/yulesa/tiders-core) [![tiders-rpc-client](https://img.shields.io/badge/github-black?style=for-the-badge&logo=github)](https://github.com/yulesa/tiders-rpc-client) -[![Documentation](https://img.shields.io/badge/documentation-blue?style=for-the-badge&logo=readthedocs)](https://yulesa.github.io/tiders-docs/) +[![telegram](https://img.shields.io/badge/telegram-blue?style=for-the-badge&logo=telegram)](https://t.me/tidersindexer) Tiders is an open-source framework that simplifies getting data out of blockchains and into your favorite tools. Whether you are building a DeFi dashboard, tracking NFT transfers, or running complex analytics, Tiders handles the heavy lifting of fetching, cleaning, transforming and storing blockchain data. diff --git a/examples/uniswap_v3/.env.example b/examples/uniswap_v3/.env.example index 05e22c9..31d7066 100644 --- a/examples/uniswap_v3/.env.example +++ b/examples/uniswap_v3/.env.example @@ -18,6 +18,12 @@ CLICKHOUSE_PASSWORD=default CLICKHOUSE_DATABASE=default CLICKHOUSE_SECURE=false +# --- PostgreSQL (only needed with --database postgresql) --- + +POSTGRES_USER=postgres +POSTGRES_PASSWORD=secret +POSTGRES_DB=tiders + # --- Iceberg (only needed with --database iceberg) --- ICEBERG_NAMESPACE=default diff --git a/examples/uniswap_v3/uniswap_v3.py b/examples/uniswap_v3/uniswap_v3.py index 0802190..4573323 100644 --- a/examples/uniswap_v3/uniswap_v3.py +++ b/examples/uniswap_v3/uniswap_v3.py @@ -33,7 +33,7 @@ # # uv run uniswap_v3.py --provider --from_block 12369621 --to_block 12370621 # [--rpc_url URL] \ # only needed with --provider rpc -# [--database BACKEND] # default: pyarrow. Options: pyarrow, duckdb, delta_lake, clickhouse, iceberg +# [--database BACKEND] # default: pyarrow. Options: pyarrow, duckdb, delta_lake, clickhouse, iceberg, postgresql # # Output is written to data/uniswap_v3/ # @@ -77,7 +77,7 @@ DATA_PATH = str(Path.cwd() / "data") Path(DATA_PATH).mkdir(parents=True, exist_ok=True) -WRITER_CHOICES = ["clickhouse", "delta_lake", "duckdb", "iceberg", "pyarrow"] +WRITER_CHOICES = ["clickhouse", "delta_lake", "duckdb", "iceberg", "pyarrow", "postgresql"] # Table name aliases POOL_CREATED_LOGS_TABLE = "uniswap_v3_factory_pool_created_logs" @@ -199,6 +199,31 @@ async def create_writer(database: str) -> cc.Writer: kind=cc.WriterKind.CLICKHOUSE, config=cc.ClickHouseWriterConfig(client=client), ) + + if database == "postgresql": + import psycopg + + host = os.environ.get("POSTGRES_HOST", "localhost") + port = int(os.environ.get("POSTGRES_PORT", "5432")) + user = os.environ.get("POSTGRES_USER", "postgres") + password = os.environ.get("POSTGRES_PASSWORD", "secret") + dbname = os.environ.get("POSTGRES_DB", "tiders") + + _conninfo = " ".join( + [ + f"host={host}", + f"port={port}", + f"dbname={dbname}", + f"user={user}", + f"password={password}", + ] + ) + connection = await psycopg.AsyncConnection.connect(_conninfo, autocommit=False) + + return cc.Writer( + kind=cc.WriterKind.POSTGRESQL, + config=cc.PostgresqlWriterConfig(connection=connection), + ) if database == "iceberg": from pyiceberg.catalog import load_catalog @@ -350,8 +375,52 @@ async def load_pool_addresses(database: str) -> list[str]: addresses = {value.as_py() for value in table["pool"] if value.as_py()} return sorted(addresses) + if database == "clickhouse": + import clickhouse_connect + + client = await clickhouse_connect.get_async_client( + host=os.environ.get("CLICKHOUSE_HOST", "localhost"), + port=int(os.environ.get("CLICKHOUSE_PORT", "8123")), + username=os.environ.get("CLICKHOUSE_USER", "default"), + password=os.environ.get("CLICKHOUSE_PASSWORD", "default"), + database=os.environ.get("CLICKHOUSE_DATABASE", "default"), + secure=os.environ.get("CLICKHOUSE_SECURE", "false").lower() == "true", + ) + result = await client.query( + f"SELECT DISTINCT pool FROM {POOL_CREATED_TABLE} WHERE pool IS NOT NULL" + ) + addresses = {row[0] for row in result.result_rows if row[0]} + return sorted(addresses) + + if database == "postgresql": + import psycopg + + host = os.environ.get("POSTGRES_HOST", "localhost") + port = int(os.environ.get("POSTGRES_PORT", "5432")) + user = os.environ.get("POSTGRES_USER", "postgres") + password = os.environ.get("POSTGRES_PASSWORD", "secret") + dbname = os.environ.get("POSTGRES_DB", "tiders") + + _conninfo = " ".join( + [ + f"host={host}", + f"port={port}", + f"dbname={dbname}", + f"user={user}", + f"password={password}", + ] + ) + async with await psycopg.AsyncConnection.connect(_conninfo) as conn: + async with conn.cursor() as cur: + await cur.execute( + f"SELECT DISTINCT pool FROM {POOL_CREATED_TABLE} WHERE pool IS NOT NULL" + ) + rows = await cur.fetchall() + addresses = {row[0] for row in rows if row[0]} + return sorted(addresses) + raise ValueError( - f"Pool loading for database '{database}' is not supported. Use one of: duckdb, pyarrow, delta_lake." + f"Pool loading for database '{database}' is not supported. Use one of: duckdb, pyarrow, delta_lake, clickhouse, postgresql." ) @@ -401,6 +470,13 @@ def _pool_event_steps() -> list[cc.Step]: ), ), ) + steps.append( + cc.Step( + name="join_blocks_data", + kind=cc.StepKind.JOIN_BLOCK_DATA, + config=cc.JoinBlockDataConfig(), + ) + ) # Convert binary columns (addresses, hashes, topics) to "0x..." hex strings. steps.append( @@ -440,6 +516,7 @@ async def run_pool_events_pipeline( logs=[ ingest.evm.LogRequest( address=pool_addresses, + include_blocks=True ) ], fields=ingest.evm.Fields( @@ -455,6 +532,10 @@ async def run_pool_events_pipeline( topic3=True, data=True, ), + block=ingest.evm.BlockFields( + timestamp=True, + number=True, + ) ), ), ) diff --git a/pyproject.toml b/pyproject.toml index 603af62..7cfff6d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,8 +7,8 @@ homepage = "https://github.com/yulesa/tiders" readme = "README.md" license = "MIT OR Apache-2.0" authors = [ - { name = "Ozgur Akkurt", email = "ozgur@steelcake.com" }, { name = "Yule Andrade", email = "yulesa@gmail.com" }, + { name = "Ozgur Akkurt", email = "ozgur@steelcake.com" }, ] requires-python = ">=3.11" dependencies = [ @@ -27,7 +27,7 @@ iceberg = ["pyiceberg>=0.10.0"] polars = ["polars>=1.34.0"] pandas = ["pandas>=2.0.0"] datafusion = ["datafusion>=50.1.0"] -postgresql = ["psycopg[binary]>=3.1.0"] +postgresql = ["psycopg[binary]>=3.1.0; python_version < '3.14'"] all = ["tiders[duckdb,clickhouse,delta_lake,iceberg,polars,pandas,datafusion,postgresql]"] [project.scripts]