diff --git a/.github/workflows/github-action-build.yml b/.github/workflows/github-action-build.yml index af2a60920d5..0bbe6fd83d7 100644 --- a/.github/workflows/github-action-build.yml +++ b/.github/workflows/github-action-build.yml @@ -97,7 +97,6 @@ jobs: POSTGRES_PASSWORD: postgres ports: - 5432:5432 - # Add a health check so steps wait until Postgres is ready options: >- --health-cmd="pg_isready -U postgres" --health-interval=10s @@ -129,20 +128,105 @@ jobs: psql -h localhost -U postgres -f sql/texera_ddl.sql psql -h localhost -U postgres -f sql/iceberg_postgres_catalog.sql psql -h localhost -U postgres -f sql/texera_lakefs.sql + psql -h localhost -U postgres -f sql/texera_lakekeeper.sql env: PGPASSWORD: postgres - name: Create texera_db_for_test_cases run: psql -h localhost -U postgres -v DB_NAME=texera_db_for_test_cases -f sql/texera_ddl.sql env: PGPASSWORD: postgres + - name: Start MinIO + run: | + docker run -d --name minio --network host \ + -e MINIO_ROOT_USER=texera_minio \ + -e MINIO_ROOT_PASSWORD=password \ + minio/minio:RELEASE.2025-02-28T09-55-16Z server /data + + for i in $(seq 1 30); do + curl -sf http://localhost:9000/minio/health/live && break + echo "Waiting for MinIO... 
(attempt $i)" + sleep 2 + done + - name: Start Lakekeeper + run: | + docker run --rm --network host \ + -e LAKEKEEPER__PG_DATABASE_URL_READ=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_DATABASE_URL_WRITE=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_ENCRYPTION_KEY=texera_key \ + vakamo/lakekeeper:v0.11.0 migrate + + docker run -d --name lakekeeper --network host \ + -e LAKEKEEPER__PG_DATABASE_URL_READ=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_DATABASE_URL_WRITE=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_ENCRYPTION_KEY=texera_key \ + -e LAKEKEEPER__METRICS_PORT=9091 \ + vakamo/lakekeeper:v0.11.0 serve + + for i in $(seq 1 30); do + docker exec lakekeeper /home/nonroot/lakekeeper healthcheck && break + echo "Waiting for Lakekeeper to be ready... (attempt $i)" + sleep 2 + done + + # Final check - fail with logs if Lakekeeper didn't start + docker exec lakekeeper /home/nonroot/lakekeeper healthcheck || { + echo "Lakekeeper failed to start. 
Container logs:" + docker logs lakekeeper + exit 1 + } + - name: Initialize Lakekeeper Warehouse + run: | + docker run --rm --network host --entrypoint sh minio/mc -c \ + "mc alias set minio http://localhost:9000 texera_minio password && \ + mc mb --ignore-existing minio/texera-iceberg" + + curl -sf -X POST -H 'Content-Type: application/json' \ + -d '{"project-id":"00000000-0000-0000-0000-000000000000","project-name":"default"}' \ + http://localhost:8181/management/v1/project || true + + curl -sf -X POST -H 'Content-Type: application/json' -d '{ + "warehouse-name": "texera", + "project-id": "00000000-0000-0000-0000-000000000000", + "storage-profile": { + "type": "s3", + "bucket": "texera-iceberg", + "region": "us-west-2", + "endpoint": "http://localhost:9000", + "flavor": "s3-compat", + "path-style-access": true, + "sts-enabled": false + }, + "storage-credential": { + "type": "s3", + "credential-type": "access-key", + "aws-access-key-id": "texera_minio", + "aws-secret-access-key": "password" + } + }' http://localhost:8181/management/v1/warehouse - name: Compile with sbt run: sbt clean package + env: + STORAGE_ICEBERG_CATALOG_TYPE: rest + STORAGE_ICEBERG_CATALOG_REST_URI: http://localhost:8181/catalog/ + STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME: texera + STORAGE_S3_ENDPOINT: http://localhost:9000 + STORAGE_S3_REGION: us-west-2 + STORAGE_S3_AUTH_USERNAME: texera_minio + STORAGE_S3_AUTH_PASSWORD: password - name: Set docker-java API version run: | echo "api.version=1.52" >> ~/.docker-java.properties cat ~/.docker-java.properties - name: Run backend tests run: sbt test + env: + STORAGE_ICEBERG_CATALOG_TYPE: rest + STORAGE_ICEBERG_CATALOG_REST_URI: http://localhost:8181/catalog/ + STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME: texera + STORAGE_S3_ENDPOINT: http://localhost:9000 + STORAGE_S3_REGION: us-west-2 + STORAGE_S3_AUTH_USERNAME: texera_minio + STORAGE_S3_AUTH_PASSWORD: password python: strategy: @@ -166,9 +250,87 @@ jobs: run: sudo apt-get update && sudo apt-get 
install -y postgresql - name: Start PostgreSQL Service run: sudo systemctl start postgresql + - name: Configure PostgreSQL for TCP password auth + run: | + sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'postgres';" + PG_HBA=$(sudo -u postgres psql -t -c "SHOW hba_file;" | xargs) + sudo sed -i 's/local\s\+all\s\+all\s\+peer/local all all md5/' "$PG_HBA" + echo "host all all 127.0.0.1/32 md5" | sudo tee -a "$PG_HBA" + echo "host all all ::1/128 md5" | sudo tee -a "$PG_HBA" + sudo systemctl restart postgresql - name: Create Database and User run: | - cd sql && sudo -u postgres psql -f iceberg_postgres_catalog.sql + cd sql + sudo -u postgres psql -f iceberg_postgres_catalog.sql + sudo -u postgres psql -f texera_lakekeeper.sql + - name: Start MinIO + run: | + docker run -d --name minio --network host \ + -e MINIO_ROOT_USER=texera_minio \ + -e MINIO_ROOT_PASSWORD=password \ + minio/minio:RELEASE.2025-02-28T09-55-16Z server /data + + for i in $(seq 1 30); do + curl -sf http://localhost:9000/minio/health/live && break + echo "Waiting for MinIO... (attempt $i)" + sleep 2 + done + - name: Start Lakekeeper + run: | + docker run --rm --network host \ + -e LAKEKEEPER__PG_DATABASE_URL_READ=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_DATABASE_URL_WRITE=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_ENCRYPTION_KEY=texera_key \ + vakamo/lakekeeper:v0.11.0 migrate + + docker run -d --name lakekeeper --network host \ + -e LAKEKEEPER__PG_DATABASE_URL_READ=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_DATABASE_URL_WRITE=postgres://postgres:postgres@localhost:5432/texera_lakekeeper \ + -e LAKEKEEPER__PG_ENCRYPTION_KEY=texera_key \ + -e LAKEKEEPER__METRICS_PORT=9091 \ + vakamo/lakekeeper:v0.11.0 serve + + for i in $(seq 1 30); do + docker exec lakekeeper /home/nonroot/lakekeeper healthcheck && break + echo "Waiting for Lakekeeper to be ready... 
(attempt $i)" + sleep 2 + done + + # Final check - fail with logs if Lakekeeper didn't start + docker exec lakekeeper /home/nonroot/lakekeeper healthcheck || { + echo "Lakekeeper failed to start. Container logs:" + docker logs lakekeeper + exit 1 + } + - name: Initialize Lakekeeper Warehouse + run: | + docker run --rm --network host --entrypoint sh minio/mc -c \ + "mc alias set minio http://localhost:9000 texera_minio password && \ + mc mb --ignore-existing minio/texera-iceberg" + + curl -sf -X POST -H 'Content-Type: application/json' \ + -d '{"project-id":"00000000-0000-0000-0000-000000000000","project-name":"default"}' \ + http://localhost:8181/management/v1/project || true + + curl -sf -X POST -H 'Content-Type: application/json' -d '{ + "warehouse-name": "texera", + "project-id": "00000000-0000-0000-0000-000000000000", + "storage-profile": { + "type": "s3", + "bucket": "texera-iceberg", + "region": "us-west-2", + "endpoint": "http://localhost:9000", + "flavor": "s3-compat", + "path-style-access": true, + "sts-enabled": false + }, + "storage-credential": { + "type": "s3", + "credential-type": "access-key", + "aws-access-key-id": "texera_minio", + "aws-secret-access-key": "password" + } + }' http://localhost:8181/management/v1/warehouse - name: Lint with Ruff run: | cd amber/src/main/python && ruff check . && ruff format --check . 
diff --git a/amber/requirements.txt b/amber/requirements.txt index 803ab682d5e..8cca5d201f8 100644 --- a/amber/requirements.txt +++ b/amber/requirements.txt @@ -43,7 +43,10 @@ bidict==0.22.0 cached_property==1.5.2 psutil==5.9.0 tzlocal==2.1 -pyiceberg==0.8.1 +pyiceberg==0.9.0 +s3fs==2025.9.0 +aiobotocore==2.25.1 +botocore==1.40.53 readerwriterlock==1.0.9 tenacity==8.5.0 SQLAlchemy==2.0.37 diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py b/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py index b1478fadf03..0059808f9f8 100644 --- a/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py +++ b/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py @@ -18,14 +18,17 @@ from pyiceberg.catalog import Catalog from typing import Optional -from core.storage.iceberg.iceberg_utils import create_postgres_catalog +from core.storage.iceberg.iceberg_utils import ( + create_postgres_catalog, + create_rest_catalog, +) from core.storage.storage_config import StorageConfig class IcebergCatalogInstance: """ IcebergCatalogInstance is a singleton that manages the Iceberg catalog instance. - Currently only postgres SQL catalog is supported. + Supports postgres SQL catalog and REST catalog. - Provides a single shared catalog for all Iceberg table-related operations. - Lazily initializes the catalog on first access. - Supports replacing the catalog instance for testing or reconfiguration. @@ -39,16 +42,31 @@ def get_instance(cls): Retrieves the singleton Iceberg catalog instance. - If the catalog is not initialized, it is lazily created using the configured properties. + - Supports "postgres" and "rest" catalog types. :return: the Iceberg catalog instance. 
""" if cls._instance is None: - cls._instance = create_postgres_catalog( - "texera_iceberg", - StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH, - StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME, - StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME, - StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD, - ) + catalog_type = StorageConfig.ICEBERG_CATALOG_TYPE + if catalog_type == "postgres": + cls._instance = create_postgres_catalog( + "texera_iceberg", + StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH, + StorageConfig.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME, + StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME, + StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD, + ) + elif catalog_type == "rest": + cls._instance = create_rest_catalog( + "texera_iceberg", + StorageConfig.ICEBERG_REST_CATALOG_WAREHOUSE_NAME, + StorageConfig.ICEBERG_REST_CATALOG_URI, + StorageConfig.S3_ENDPOINT, + StorageConfig.S3_REGION, + StorageConfig.S3_AUTH_USERNAME, + StorageConfig.S3_AUTH_PASSWORD, + ) + else: + raise ValueError(f"Unsupported catalog type: {catalog_type}") return cls._instance @classmethod diff --git a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py index 9e17b2e0e82..c1f9df2e403 100644 --- a/amber/src/main/python/core/storage/iceberg/iceberg_utils.py +++ b/amber/src/main/python/core/storage/iceberg/iceberg_utils.py @@ -17,7 +17,7 @@ import pyarrow as pa import pyiceberg.table -from pyiceberg.catalog import Catalog +from pyiceberg.catalog import Catalog, load_catalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.expressions import AlwaysTrue from pyiceberg.io.pyarrow import ArrowScan @@ -153,6 +153,44 @@ def create_postgres_catalog( ) +def create_rest_catalog( + catalog_name: str, + warehouse_name: str, + rest_uri: str, + s3_endpoint: str, + s3_region: str, + s3_username: str, + s3_password: str, +) -> Catalog: + """ + Creates a REST catalog instance by connecting to a REST endpoint. 
+ - Configures the catalog to interact with a REST endpoint. + - The warehouse_name parameter specifies the warehouse identifier (name for Lakekeeper). + - Configures S3FileIO for MinIO/S3 storage backend. + :param catalog_name: the name of the catalog. + :param warehouse_name: the warehouse identifier (name for Lakekeeper). + :param rest_uri: the URI of the REST catalog endpoint. + :param s3_endpoint: the S3 endpoint URL. + :param s3_region: the S3 region. + :param s3_username: the S3 access key ID. + :param s3_password: the S3 secret access key. + :return: a Catalog instance (REST catalog). + """ + return load_catalog( + catalog_name, + **{ + "type": "rest", + "uri": rest_uri, + "warehouse": warehouse_name, + "s3.endpoint": s3_endpoint, + "s3.access-key-id": s3_username, + "s3.secret-access-key": s3_password, + "s3.region": s3_region, + "s3.path-style-access": "true", + }, + ) + + def create_table( catalog: Catalog, table_namespace: str, diff --git a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py index 34711beb652..3cc48da6bdf 100644 --- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py +++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py @@ -37,16 +37,19 @@ # Hardcoded storage config only for test purposes. 
StorageConfig.initialize( + catalog_type="rest", postgres_uri_without_scheme="localhost:5432/texera_iceberg_catalog", postgres_username="texera", postgres_password="password", + rest_catalog_uri="http://localhost:8181/catalog/", + rest_catalog_warehouse_name="texera", table_result_namespace="operator-port-result", directory_path="../../../../../../amber/user-resources/workflow-results", commit_batch_size=4096, s3_endpoint="http://localhost:9000", - s3_region="us-east-1", - s3_auth_username="minioadmin", - s3_auth_password="minioadmin", + s3_region="us-west-2", + s3_auth_username="texera_minio", + s3_auth_password="password", ) diff --git a/amber/src/main/python/core/storage/storage_config.py b/amber/src/main/python/core/storage/storage_config.py index c55495ea14c..0e47bdb71ae 100644 --- a/amber/src/main/python/core/storage/storage_config.py +++ b/amber/src/main/python/core/storage/storage_config.py @@ -25,14 +25,17 @@ class StorageConfig: _initialized = False + ICEBERG_CATALOG_TYPE = None ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = None ICEBERG_POSTGRES_CATALOG_USERNAME = None ICEBERG_POSTGRES_CATALOG_PASSWORD = None + ICEBERG_REST_CATALOG_URI = None + ICEBERG_REST_CATALOG_WAREHOUSE_NAME = None ICEBERG_TABLE_RESULT_NAMESPACE = None ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None ICEBERG_TABLE_COMMIT_BATCH_SIZE = None - # S3 configs (for large_binary_manager module) + # S3 configs S3_ENDPOINT = None S3_REGION = None S3_AUTH_USERNAME = None @@ -41,9 +44,12 @@ class StorageConfig: @classmethod def initialize( cls, + catalog_type, postgres_uri_without_scheme, postgres_username, postgres_password, + rest_catalog_uri, + rest_catalog_warehouse_name, table_result_namespace, directory_path, commit_batch_size, @@ -57,9 +63,13 @@ def initialize( "Storage config has already been initialized and cannot be modified." 
) + cls.ICEBERG_CATALOG_TYPE = catalog_type cls.ICEBERG_POSTGRES_CATALOG_URI_WITHOUT_SCHEME = postgres_uri_without_scheme cls.ICEBERG_POSTGRES_CATALOG_USERNAME = postgres_username cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = postgres_password + cls.ICEBERG_REST_CATALOG_URI = rest_catalog_uri + cls.ICEBERG_REST_CATALOG_WAREHOUSE_NAME = rest_catalog_warehouse_name + cls.ICEBERG_TABLE_RESULT_NAMESPACE = table_result_namespace cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = int(commit_batch_size) diff --git a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py index a657f244f38..82537457e69 100644 --- a/amber/src/main/python/pytexera/storage/test_large_binary_manager.py +++ b/amber/src/main/python/pytexera/storage/test_large_binary_manager.py @@ -27,16 +27,19 @@ def setup_storage_config(self): """Initialize StorageConfig for tests.""" if not StorageConfig._initialized: StorageConfig.initialize( + catalog_type="rest", postgres_uri_without_scheme="localhost:5432/test", postgres_username="test", postgres_password="test", + rest_catalog_uri="http://localhost:8181/catalog/", + rest_catalog_warehouse_name="texera", table_result_namespace="test", directory_path="/tmp/test", commit_batch_size=1000, s3_endpoint="http://localhost:9000", - s3_region="us-east-1", - s3_auth_username="minioadmin", - s3_auth_password="minioadmin", + s3_region="us-west-2", + s3_auth_username="texera_minio", + s3_auth_password="password", ) def test_get_s3_client_initializes_once(self): diff --git a/amber/src/main/python/texera_run_python_worker.py b/amber/src/main/python/texera_run_python_worker.py index 3ebf81c201f..8687298f819 100644 --- a/amber/src/main/python/texera_run_python_worker.py +++ b/amber/src/main/python/texera_run_python_worker.py @@ -45,9 +45,12 @@ def init_loguru_logger(stream_log_level) -> None: output_port, logger_level, r_path, + iceberg_catalog_type, 
iceberg_postgres_catalog_uri_without_scheme, iceberg_postgres_catalog_username, iceberg_postgres_catalog_password, + iceberg_rest_catalog_uri, + iceberg_rest_catalog_warehouse_name, iceberg_table_namespace, iceberg_file_storage_directory_path, iceberg_table_commit_batch_size, @@ -58,9 +61,12 @@ def init_loguru_logger(stream_log_level) -> None: ) = sys.argv init_loguru_logger(logger_level) StorageConfig.initialize( + iceberg_catalog_type, iceberg_postgres_catalog_uri_without_scheme, iceberg_postgres_catalog_username, iceberg_postgres_catalog_password, + iceberg_rest_catalog_uri, + iceberg_rest_catalog_warehouse_name, iceberg_table_namespace, iceberg_file_storage_directory_path, iceberg_table_commit_batch_size, diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala index 558b99c9b7b..d2bc5f50253 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala @@ -178,9 +178,12 @@ class PythonWorkflowWorker( Integer.toString(pythonProxyServer.getPortNumber.get()), UdfConfig.pythonLogStreamHandlerLevel, RENVPath, + StorageConfig.icebergCatalogType, StorageConfig.icebergPostgresCatalogUriWithoutScheme, StorageConfig.icebergPostgresCatalogUsername, StorageConfig.icebergPostgresCatalogPassword, + StorageConfig.icebergRESTCatalogUri, + StorageConfig.icebergRESTCatalogWarehouseName, StorageConfig.icebergTableResultNamespace, StorageConfig.fileStorageDirectoryPath.toString, StorageConfig.icebergTableCommitBatchSize.toString, diff --git a/bin/bootstrap-lakekeeper.sh b/bin/bootstrap-lakekeeper.sh new file mode 100755 index 00000000000..1d4adc659c2 --- /dev/null +++ b/bin/bootstrap-lakekeeper.sh @@ -0,0 +1,525 @@ +#!/usr/bin/env bash +# Licensed to 
the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Bootstrap script to start Lakekeeper and create warehouse (idempotent). +# This script does four things: +# 1. Starts Lakekeeper if it's not already running +# 2. Bootstraps the Lakekeeper server (creates default project, idempotent) +# 3. Checks if MinIO bucket exists (and creates it if needed) +# 4. 
Checks and creates the warehouse if it doesn't exist +# +# +# Usage: +# ./bin/bootstrap-lakekeeper.sh + +set -e + +# ============================================================================== +# User Configuration - Edit the values below before running this script +# ============================================================================== + +# Lakekeeper binary path +LAKEKEEPER_BINARY_PATH="" + +# Lakekeeper PostgreSQL connection URLs +#(LAKEKEEPER__PG_DATABASE_URL_READ="postgres://postgres_user:postgres_urlencoded_password@hostname:5432/texera_lakekeeper" +# LAKEKEEPER__PG_DATABASE_URL_WRITE="postgres://postgres_user:postgres_urlencoded_password@hostname:5432/texera_lakekeeper") +LAKEKEEPER__PG_DATABASE_URL_READ="" +LAKEKEEPER__PG_DATABASE_URL_WRITE="" + +# Lakekeeper encryption key +LAKEKEEPER__PG_ENCRYPTION_KEY="texera_key" + +# Lakekeeper metrics port +LAKEKEEPER__METRICS_PORT="9091" + +# ============================================================================== +# End of User Configuration +# ============================================================================== + +# Read remaining configuration from storage.conf +# Priority: user config above > storage.conf > default value + +# Find storage.conf path +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +if [ -n "$TEXERA_HOME" ]; then + STORAGE_CONF_PATH="$TEXERA_HOME/common/config/src/main/resources/storage.conf" +else + STORAGE_CONF_PATH="$SCRIPT_DIR/../common/config/src/main/resources/storage.conf" +fi + +# Extract values from storage.conf using pyhocon for proper HOCON parsing +if [ -f "$STORAGE_CONF_PATH" ]; then + # Check if pyhocon is available + if ! command -v python3 >/dev/null 2>&1; then + echo "✗ Error: python3 is required to parse storage.conf" + echo " Please install Python 3" + exit 1 + fi + + if ! 
python3 -c "import pyhocon" 2>/dev/null; then + echo "✗ Error: pyhocon is required to parse storage.conf" + echo " Install it with: pip install pyhocon" + exit 1 + fi + + # Use batch mode to parse all config values in a single python invocation + CONF_OUTPUT=$(python3 "$SCRIPT_DIR/parse-storage-config.py" --batch \ + REST_URI_FROM_CONF=storage.iceberg.catalog.rest.uri \ + WAREHOUSE_NAME_FROM_CONF=storage.iceberg.catalog.rest.warehouse-name \ + REST_REGION_FROM_CONF=storage.iceberg.catalog.rest.region \ + S3_BUCKET_FROM_CONF=storage.iceberg.catalog.rest.s3-bucket \ + S3_ENDPOINT_FROM_CONF=storage.s3.endpoint \ + S3_USERNAME_FROM_CONF=storage.s3.auth.username \ + S3_PASSWORD_FROM_CONF=storage.s3.auth.password \ + 2>/dev/null) || true + + # Parse the batch output (each line is VAR_NAME=value) + while IFS='=' read -r var_name var_value; do + [ -z "$var_name" ] && continue + declare "$var_name=$var_value" + done <<< "$CONF_OUTPUT" + + # Strip trailing /catalog/ from REST URI + REST_URI_FROM_CONF=$(echo "${REST_URI_FROM_CONF:-}" | sed 's|/catalog/*$||') + + echo "Configuration read from storage.conf:" + echo " REST_URI=$REST_URI_FROM_CONF" + echo " WAREHOUSE_NAME=$WAREHOUSE_NAME_FROM_CONF" + echo " REGION=$REST_REGION_FROM_CONF" + echo " S3_BUCKET=$S3_BUCKET_FROM_CONF" + echo " S3_ENDPOINT=$S3_ENDPOINT_FROM_CONF" + echo " S3_USERNAME=$S3_USERNAME_FROM_CONF" + echo " S3_PASSWORD=***" + echo "" +else + REST_URI_FROM_CONF="" + WAREHOUSE_NAME_FROM_CONF="" + REST_REGION_FROM_CONF="" + S3_BUCKET_FROM_CONF="" + S3_ENDPOINT_FROM_CONF="" + S3_USERNAME_FROM_CONF="" + S3_PASSWORD_FROM_CONF="" + echo "storage.conf not found, using environment variables or defaults" + echo "" +fi + +# Use values from storage.conf with defaults +LAKEKEEPER_BASE_URI="${REST_URI_FROM_CONF:-http://localhost:8181}" +WAREHOUSE_NAME="${WAREHOUSE_NAME_FROM_CONF:-texera}" +S3_REGION="${REST_REGION_FROM_CONF:-us-west-2}" +S3_BUCKET="${S3_BUCKET_FROM_CONF:-texera-iceberg}" 
+S3_ENDPOINT="${S3_ENDPOINT_FROM_CONF:-http://localhost:9000}" +S3_USERNAME="${S3_USERNAME_FROM_CONF:-texera_minio}" +S3_PASSWORD="${S3_PASSWORD_FROM_CONF:-password}" +STORAGE_PATH="s3://${S3_BUCKET}/iceberg/${WAREHOUSE_NAME}" + +echo "==========================================" +echo "Lakekeeper Bootstrap and Warehouse Setup" +echo "==========================================" +echo "Lakekeeper Base URI: $LAKEKEEPER_BASE_URI" +echo "Lakekeeper Binary: ${LAKEKEEPER_BINARY_PATH:-lakekeeper}" +echo "Warehouse Name: $WAREHOUSE_NAME" +echo "S3 Endpoint: $S3_ENDPOINT" +echo "S3 Bucket: $S3_BUCKET" +echo "Storage Path: $STORAGE_PATH" +echo "" + +# Function to check if Lakekeeper is running +check_lakekeeper_running() { + local health_url="${LAKEKEEPER_BASE_URI}/health" + if curl -s -f "$health_url" > /dev/null 2>&1; then + return 0 # Running + else + return 1 # Not running + fi +} + +# Function to bootstrap the Lakekeeper server (creates default project). +# This is idempotent - safe to call even if already bootstrapped. +# Returns: 0=success (or already bootstrapped), 1=failure +bootstrap_lakekeeper_server() { + local base_uri="$1" + local bootstrap_url="${base_uri}/management/v1/bootstrap" + + echo "Bootstrapping Lakekeeper server (creating default project)..." 
+ echo " URL: $bootstrap_url" + + local temp_response + temp_response=$(mktemp) || { + echo "✗ Failed to create temporary file" + return 1 + } + + local http_code + http_code=$(curl -s -o "$temp_response" -w "%{http_code}" \ + -X POST \ + -H "Content-Type: application/json" \ + -d '{"accept-terms-of-use": true}' \ + "$bootstrap_url" 2>/dev/null || echo "000") + + echo " HTTP status: $http_code" + + case "$http_code" in + 000) + echo "✗ Failed to connect to Lakekeeper at $bootstrap_url" + rm -f "$temp_response" || true + return 1 + ;; + 2*) + echo "✓ Lakekeeper server bootstrapped successfully (HTTP $http_code)" + rm -f "$temp_response" || true + return 0 + ;; + *) + if grep -q "CatalogAlreadyBootstrapped" "$temp_response" 2>/dev/null; then + echo "✓ Lakekeeper server already bootstrapped (HTTP $http_code), continuing." + rm -f "$temp_response" || true + return 0 + fi + echo "✗ Failed to bootstrap Lakekeeper server (HTTP $http_code)" + echo " Response body:" + cat "$temp_response" | sed 's/^/ /' || true + rm -f "$temp_response" || true + return 1 + ;; + esac +} + +# Function to check if MinIO bucket exists (requires AWS CLI) +check_minio_bucket() { + local bucket_name="$1" + local endpoint="$2" + local username="$3" + local password="$4" + + if ! command -v aws >/dev/null 2>&1; then + echo "✗ Error: AWS CLI is required for MinIO bucket operations." + echo " Install it with: pip install awscli" + return 1 + fi + + if AWS_ACCESS_KEY_ID="$username" AWS_SECRET_ACCESS_KEY="$password" AWS_DEFAULT_REGION="us-west-2" \ + aws --endpoint-url="$endpoint" s3 ls "s3://${bucket_name}/" >/dev/null 2>&1; then + return 0 # Bucket exists + else + return 1 # Bucket doesn't exist or error + fi +} + +# Function to create MinIO bucket (requires AWS CLI) +create_minio_bucket() { + local bucket_name="$1" + local endpoint="$2" + local username="$3" + local password="$4" + + if ! command -v aws >/dev/null 2>&1; then + echo "✗ Error: AWS CLI is required for MinIO bucket operations." 
+ echo " Install it with: pip install awscli" + return 1 + fi + + if AWS_ACCESS_KEY_ID="$username" AWS_SECRET_ACCESS_KEY="$password" AWS_DEFAULT_REGION="us-west-2" \ + aws --endpoint-url="$endpoint" s3 mb "s3://${bucket_name}" >/dev/null 2>&1; then + return 0 # Success + else + return 1 # Failed + fi +} + +# Function to start Lakekeeper +start_lakekeeper() { + export LAKEKEEPER__METRICS_PORT + export LAKEKEEPER__PG_ENCRYPTION_KEY + + echo "Starting Lakekeeper..." + + # Validate LAKEKEEPER_BINARY_PATH + if [ -z "$LAKEKEEPER_BINARY_PATH" ]; then + echo "✗ Error: LAKEKEEPER_BINARY_PATH is not set." + echo " Please set it in the User Configuration section at the top of this script." + exit 1 + fi + + if [ ! -x "$LAKEKEEPER_BINARY_PATH" ]; then + echo "✗ Error: Lakekeeper binary not found or not executable at '$LAKEKEEPER_BINARY_PATH'" + echo " Please update LAKEKEEPER_BINARY_PATH in the User Configuration section." + exit 1 + fi + + local binary_path="$LAKEKEEPER_BINARY_PATH" + + # Validate required database URLs + if [ -z "$LAKEKEEPER__PG_DATABASE_URL_READ" ] || [ -z "$LAKEKEEPER__PG_DATABASE_URL_WRITE" ]; then + echo "✗ Error: Database URLs not configured." + echo " Please set LAKEKEEPER__PG_DATABASE_URL_READ and LAKEKEEPER__PG_DATABASE_URL_WRITE" + echo " in the User Configuration section at the top of this script." + exit 1 + fi + export LAKEKEEPER__PG_DATABASE_URL_READ + export LAKEKEEPER__PG_DATABASE_URL_WRITE + + # Run migration first + echo "Running Lakekeeper migration..." + if ! "$binary_path" migrate; then + echo "✗ Failed to run Lakekeeper migration" + return 1 + fi + + # Start Lakekeeper in background + echo "Starting Lakekeeper server..." + nohup "$binary_path" serve > /tmp/lakekeeper.log 2>&1 & + local lakekeeper_pid=$! + echo "Lakekeeper started with PID: $lakekeeper_pid" + + # Wait for Lakekeeper to be ready + echo "Waiting for Lakekeeper to be ready..." 
+ local max_attempts=30 + local attempt=1 + while [ $attempt -le $max_attempts ]; do + if check_lakekeeper_running; then + echo "✓ Lakekeeper is ready!" + return 0 + fi + if [ $attempt -eq $max_attempts ]; then + echo "✗ Lakekeeper did not become ready after $max_attempts attempts" + echo " Check logs at /tmp/lakekeeper.log" + return 1 + fi + echo " Waiting for Lakekeeper... ($attempt/$max_attempts)" + sleep 2 + attempt=$((attempt + 1)) + done +} + +# Function to check if warehouse exists +# Returns: 0=exists, 1=not found, 2=connection error +check_warehouse_exists() { + local warehouse_name="$1" + local base_uri="$2" + + local list_url="${base_uri}/management/v1/warehouse" + + echo "Checking if warehouse '$warehouse_name' exists..." + + local temp_response + temp_response=$(mktemp) || { + echo "✗ Failed to create temporary file" + return 2 + } + + local http_code + http_code=$(curl -s -o "$temp_response" -w "%{http_code}" "$list_url" 2>/dev/null || echo "000") + + if [ "$http_code" = "000" ]; then + rm -f "$temp_response" || true + echo "✗ Failed to connect to Lakekeeper at $list_url" + return 2 + fi + + if [ "$http_code" != "200" ]; then + echo "⚠ Warning: Unexpected HTTP status $http_code when listing warehouses" + cat "$temp_response" 2>/dev/null | sed 's/^/ /' || true + rm -f "$temp_response" || true + return 1 + fi + + # Check if warehouse name exists in the response + # Response format: {"warehouses":[{"name":"...",...},...]} + local found=1 + if command -v jq >/dev/null 2>&1; then + if jq -e ".warehouses[] | select(.name == \"$warehouse_name\")" "$temp_response" >/dev/null 2>&1; then + found=0 + fi + else + if grep -q "\"name\"[[:space:]]*:[[:space:]]*\"$warehouse_name\"" "$temp_response" 2>/dev/null; then + found=0 + fi + fi + + rm -f "$temp_response" 2>/dev/null || true + return $found +} + +# Function to create warehouse +# Args: warehouse_name base_uri s3_bucket s3_region s3_endpoint s3_username s3_password +# Returns: 0=success, 1=failure 
+create_warehouse() {
+  local warehouse_name="$1"
+  local base_uri="$2"
+  local bucket="$3"
+  local region="$4"
+  local endpoint="$5"
+  local username="$6"
+  local password="$7"
+
+  local create_url="${base_uri}/management/v1/warehouse"
+
+  # NOTE(review): the payload and request below were reconstructed from the
+  # identical warehouse-creation curl call in the CI workflow and from the
+  # sibling bootstrap_lakekeeper_server() function — confirm against the
+  # original script before applying this patch.
+  local create_payload
+  create_payload=$(cat <<EOF
+{
+  "warehouse-name": "${warehouse_name}",
+  "project-id": "00000000-0000-0000-0000-000000000000",
+  "storage-profile": {
+    "type": "s3",
+    "bucket": "${bucket}",
+    "region": "${region}",
+    "endpoint": "${endpoint}",
+    "flavor": "s3-compat",
+    "path-style-access": true,
+    "sts-enabled": false
+  },
+  "storage-credential": {
+    "type": "s3",
+    "credential-type": "access-key",
+    "aws-access-key-id": "${username}",
+    "aws-secret-access-key": "${password}"
+  }
+}
+EOF
+)
+
+  echo "Creating warehouse '$warehouse_name'..."
+
+  local temp_response
+  temp_response=$(mktemp) || {
+    echo "✗ Failed to create temporary file"
+    return 1
+  }
+
+  local http_code
+  http_code=$(curl -s -o "$temp_response" -w "%{http_code}" \
+    -X POST \
+    -H "Content-Type: application/json" \
+    -d "$create_payload" \
+    "$create_url" 2>/dev/null || echo "000")
+
+  case "$http_code" in
+    2*)
+      echo "✓ Warehouse '$warehouse_name' created successfully (HTTP $http_code)"
+      rm -f "$temp_response" || true
+      return 0
+      ;;
+    *)
+      echo "✗ Failed to create warehouse '$warehouse_name' (HTTP $http_code)"
+      echo "  Response body:"
+      cat "$temp_response" 2>/dev/null | sed 's/^/    /' || true
+      rm -f "$temp_response" || true
+      return 1
+      ;;
+  esac
+}
+
+# Step 1: Check if Lakekeeper is running, start if not
+echo "Step 1: Checking Lakekeeper status..."
+if check_lakekeeper_running; then
+  echo "✓ Lakekeeper is already running"
+else
+  echo "Lakekeeper is not running, attempting to start..."
+  if start_lakekeeper; then
+    echo "✓ Lakekeeper started successfully"
+  else
+    echo "✗ Failed to start Lakekeeper"
+    exit 1
+  fi
+fi
+echo ""
+
+# Step 2: Bootstrap the Lakekeeper server (creates default project)
+echo "Step 2: Bootstrapping Lakekeeper server..."
+if bootstrap_lakekeeper_server "$LAKEKEEPER_BASE_URI"; then
+  echo "✓ Lakekeeper server bootstrap completed"
+else
+  echo "✗ Failed to bootstrap Lakekeeper server"
+  echo "  Please check that Lakekeeper is running and accessible at $LAKEKEEPER_BASE_URI"
+  exit 1
+fi
+echo ""
+
+# Step 3: Check and create MinIO bucket
+echo "Step 3: Checking MinIO bucket..."
+if check_minio_bucket "$S3_BUCKET" "$S3_ENDPOINT" "$S3_USERNAME" "$S3_PASSWORD"; then
+  echo "✓ MinIO bucket '$S3_BUCKET' already exists"
+else
+  echo "MinIO bucket '$S3_BUCKET' does not exist, creating..."
+  if create_minio_bucket "$S3_BUCKET" "$S3_ENDPOINT" "$S3_USERNAME" "$S3_PASSWORD"; then
+    echo "✓ MinIO bucket '$S3_BUCKET' created successfully"
+  else
+    echo "✗ Failed to create MinIO bucket '$S3_BUCKET'"
+    echo "  Please ensure MinIO is running and accessible at $S3_ENDPOINT"
+    exit 1
+  fi
+fi
+echo ""
+
+# Step 4: Check and create warehouse
+echo "Step 4: Checking and creating warehouse..."
+
+set +e # Temporarily disable exit on error to capture function return value
+check_warehouse_exists "$WAREHOUSE_NAME" "$LAKEKEEPER_BASE_URI"
+check_result=$?
+set -e # Re-enable exit on error + +case $check_result in + 0) + echo "✓ Warehouse '$WAREHOUSE_NAME' already exists, skipping creation." + echo "" + echo "==========================================" + echo "✓ Bootstrap completed successfully!" + echo "==========================================" + exit 0 + ;; + 1) + echo "Warehouse '$WAREHOUSE_NAME' does not exist, will create..." + ;; + 2) + exit 1 + ;; + *) + echo "✗ Unexpected error (code: $check_result)" + exit 1 + ;; +esac + +# Create warehouse +if create_warehouse "$WAREHOUSE_NAME" "$LAKEKEEPER_BASE_URI" "$S3_BUCKET" "$S3_REGION" "$S3_ENDPOINT" "$S3_USERNAME" "$S3_PASSWORD"; then + echo "" + echo "==========================================" + echo "✓ Bootstrap completed successfully!" + echo "==========================================" + exit 0 +else + echo "" + echo "==========================================" + echo "✗ Bootstrap failed!" + echo "==========================================" + exit 1 +fi diff --git a/bin/k8s/Chart.yaml b/bin/k8s/Chart.yaml index 1e6dcfbef83..9f6122fc3fc 100644 --- a/bin/k8s/Chart.yaml +++ b/bin/k8s/Chart.yaml @@ -59,6 +59,11 @@ dependencies: repository: oci://docker.io/envoyproxy alias: envoy-gateway + - name: lakekeeper + version: 0.9.0 + repository: https://lakekeeper.github.io/lakekeeper-charts/ + condition: lakekeeper.enabled + - name: metrics-server version: 3.12.2 repository: https://kubernetes-sigs.github.io/metrics-server/ diff --git a/bin/k8s/files/texera_lakekeeper.sql b/bin/k8s/files/texera_lakekeeper.sql new file mode 120000 index 00000000000..6ddbed93822 --- /dev/null +++ b/bin/k8s/files/texera_lakekeeper.sql @@ -0,0 +1 @@ +../../../sql/texera_lakekeeper.sql \ No newline at end of file diff --git a/bin/k8s/templates/external-names.yaml b/bin/k8s/templates/external-names.yaml index 69540067b81..691c92e0b19 100644 --- a/bin/k8s/templates/external-names.yaml +++ b/bin/k8s/templates/external-names.yaml @@ -81,4 +81,10 @@ to access services in the main namespace using 
the same service names. "externalName" (printf "%s-minio.%s.svc.cluster.local" .Release.Name $namespace) ) | nindent 0 }} - +--- +{{/* Lakekeeper ExternalName service for cross-namespace access */}} +{{- include "external-name-service" (dict + "name" (printf "%s-lakekeeper" .Release.Name) + "namespace" $workflowComputingUnitPoolNamespace + "externalName" (printf "%s-lakekeeper.%s.svc.cluster.local" .Release.Name $namespace) +) | nindent 0 }} diff --git a/bin/k8s/templates/lakekeeper-init-job.yaml b/bin/k8s/templates/lakekeeper-init-job.yaml new file mode 100644 index 00000000000..0a3540b6055 --- /dev/null +++ b/bin/k8s/templates/lakekeeper-init-job.yaml @@ -0,0 +1,137 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +{{- if .Values.lakekeeperInit.enabled }} +apiVersion: batch/v1 +kind: Job +metadata: + name: {{ .Release.Name }}-lakekeeper-init + namespace: {{ .Release.Namespace }} +spec: + backoffLimit: 3 + template: + metadata: + name: {{ .Release.Name }}-lakekeeper-init + spec: + restartPolicy: Never + containers: + - name: lakekeeper-init + image: alpine:3.19 + env: + - name: STORAGE_S3_ENDPOINT + value: http://{{ .Release.Name }}-minio:9000 + - name: STORAGE_S3_AUTH_USERNAME + valueFrom: + secretKeyRef: + name: {{ .Release.Name }}-minio + key: root-user + - name: STORAGE_S3_AUTH_PASSWORD + valueFrom: + secretKeyRef: + name: {{ .Release.Name }}-minio + key: root-password + - name: STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET + value: {{ .Values.lakekeeperInit.warehouse.s3Bucket | quote }} + - name: STORAGE_ICEBERG_CATALOG_REST_REGION + value: {{ .Values.lakekeeperInit.warehouse.region | quote }} + - name: STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME + value: {{ .Values.lakekeeperInit.warehouse.name | quote }} + - name: LAKEKEEPER_BASE_URI + value: http://{{ .Release.Name }}-lakekeeper:{{ .Values.lakekeeper.catalog.service.externalPort }} + - name: LAKEKEEPER_PROJECT_ID + value: {{ .Values.lakekeeperInit.defaultProject.id | quote }} + - name: LAKEKEEPER_PROJECT_NAME + value: {{ .Values.lakekeeperInit.defaultProject.name | quote }} + command: + - /bin/sh + - -c + - | + set -e + + apk add --no-cache curl ca-certificates wget + wget -q https://dl.min.io/client/mc/release/linux-amd64/mc -O /usr/local/bin/mc + chmod +x /usr/local/bin/mc + + check_status() { + if [ "$1" -ge 200 ] && [ "$1" -lt 300 ]; then + echo "Created $2 successfully (HTTP $1)." + elif [ "$1" -eq 409 ]; then + echo "$2 already exists (HTTP 409). Treating as success." + else + echo "Failed to create $2. HTTP Code: $1" + echo "ERROR RESPONSE:" + if [ -f /tmp/response.txt ]; then cat /tmp/response.txt; fi + echo "" + exit 1 + fi + } + + echo "Waiting for Lakekeeper health endpoint..." 
+ until curl -s -f "${LAKEKEEPER_BASE_URI}/health" > /dev/null 2>&1; do + sleep 3 + done + + echo "Step 1: Initializing MinIO bucket '${STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET}'..." + mc alias set minio "${STORAGE_S3_ENDPOINT}" "${STORAGE_S3_AUTH_USERNAME}" "${STORAGE_S3_AUTH_PASSWORD}" || true + if mc ls minio/${STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET} > /dev/null 2>&1; then + echo "MinIO bucket '${STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET}' already exists." + else + mc mb minio/${STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET} + echo "MinIO bucket '${STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET}' created successfully." + fi + + echo "Step 2: Initializing default project..." + PROJECT_PAYLOAD="{\"project-id\":\"${LAKEKEEPER_PROJECT_ID}\",\"project-name\":\"${LAKEKEEPER_PROJECT_NAME}\"}" + PROJECT_CODE=$(curl -s -o /tmp/response.txt -w "%{http_code}" \ + -X POST \ + -H "Content-Type: application/json" \ + -d "${PROJECT_PAYLOAD}" \ + "${LAKEKEEPER_BASE_URI}/management/v1/project" || echo "000") + check_status "${PROJECT_CODE}" "Default Project" + + echo "Step 3: Initializing warehouse '${STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME}'..." + CREATE_PAYLOAD=$(cat < /tmp/texera_lakekeeper.sql +{{ .Files.Get "files/texera_lakekeeper.sql" | indent 6 }} + EOF + psql -U postgres -f /tmp/texera_lakekeeper.sql + echo "Initializing Texera database..." 
cat <<'EOF' > /tmp/texera_ddl.sql {{ .Files.Get "files/texera_ddl.sql" | indent 6 }} diff --git a/bin/k8s/templates/webserver-deployment.yaml b/bin/k8s/templates/webserver-deployment.yaml index 56642c54785..983c6269947 100644 --- a/bin/k8s/templates/webserver-deployment.yaml +++ b/bin/k8s/templates/webserver-deployment.yaml @@ -60,6 +60,17 @@ spec: secretKeyRef: name: {{ .Release.Name }}-lakefs-secret key: secret_key + # Workflow Result (Lakekeeper REST catalog) + - name: STORAGE_ICEBERG_CATALOG_TYPE + value: rest + - name: STORAGE_ICEBERG_CATALOG_REST_URI + value: http://{{ .Release.Name }}-lakekeeper:{{ .Values.lakekeeper.catalog.service.externalPort }}/catalog + - name: STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME + value: {{ .Values.lakekeeperInit.warehouse.name | quote }} + - name: STORAGE_ICEBERG_CATALOG_REST_REGION + value: {{ .Values.lakekeeperInit.warehouse.region | quote }} + - name: STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET + value: {{ .Values.lakekeeperInit.warehouse.s3Bucket | quote }} {{- range .Values.texeraEnvVars }} - name: {{ .name }} value: "{{ .value }}" diff --git a/bin/k8s/templates/workflow-computing-unit-manager-deployment.yaml b/bin/k8s/templates/workflow-computing-unit-manager-deployment.yaml index 5241d9160a3..7a0185cd465 100644 --- a/bin/k8s/templates/workflow-computing-unit-manager-deployment.yaml +++ b/bin/k8s/templates/workflow-computing-unit-manager-deployment.yaml @@ -33,6 +33,25 @@ spec: app: {{ .Release.Name }}-{{ .Values.workflowComputingUnitManager.name }} spec: serviceAccountName: {{ .Values.workflowComputingUnitManager.serviceAccountName }} + initContainers: + - name: wait-lakekeeper + image: curlimages/curl:latest + command: + - /bin/sh + - -c + - | + set -e + LAKEKEEPER_BASE_URI="http://{{ .Release.Name }}-lakekeeper:{{ .Values.lakekeeper.catalog.service.externalPort }}" + WAREHOUSE_NAME="{{ .Values.lakekeeperInit.warehouse.name }}" + echo "Waiting for Lakekeeper to become healthy..." 
+ until curl -s -f "${LAKEKEEPER_BASE_URI}/health" > /dev/null 2>&1; do + sleep 1 + done + echo "Waiting for warehouse '${WAREHOUSE_NAME}' to exist..." + until curl -s "${LAKEKEEPER_BASE_URI}/management/v1/warehouse" | grep -q "\"name\"[[:space:]]*:[[:space:]]*\"${WAREHOUSE_NAME}\""; do + sleep 1 + done + echo "Lakekeeper warehouse is ready." containers: - name: {{ .Values.workflowComputingUnitManager.name }} image: {{ .Values.texera.imageRegistry }}/{{ .Values.workflowComputingUnitManager.imageName }}:{{ .Values.texera.imageTag }} @@ -88,16 +107,15 @@ spec: key: secret_key # Workflow Result - name: STORAGE_ICEBERG_CATALOG_TYPE - value: postgres - - name: STORAGE_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME - value: {{ .Release.Name }}-postgresql:5432/texera_iceberg_catalog - - name: STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME - value: postgres - - name: STORAGE_ICEBERG_CATALOG_POSTGRES_PASSWORD - valueFrom: - secretKeyRef: - name: {{ .Release.Name }}-postgresql - key: postgres-password + value: rest + - name: STORAGE_ICEBERG_CATALOG_REST_URI + value: http://{{ .Release.Name }}-lakekeeper:{{ .Values.lakekeeper.catalog.service.externalPort }}/catalog + - name: STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME + value: {{ .Values.lakekeeperInit.warehouse.name | quote }} + - name: STORAGE_ICEBERG_CATALOG_REST_REGION + value: {{ .Values.lakekeeperInit.warehouse.region | quote }} + - name: STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET + value: {{ .Values.lakekeeperInit.warehouse.s3Bucket | quote }} {{- range .Values.texeraEnvVars }} - name: {{ .name }} value: "{{ .value }}" diff --git a/bin/k8s/values.yaml b/bin/k8s/values.yaml index 7558591c4dd..ef9a89f9b55 100644 --- a/bin/k8s/values.yaml +++ b/bin/k8s/values.yaml @@ -113,6 +113,38 @@ lakefs: access_key_id: texera_minio secret_access_key: password +lakekeeper: + enabled: true + postgresql: + enabled: false + internalOpenFGA: false + catalog: + replicas: 1 + image: + repository: vakamo/lakekeeper + tag: v0.11.0 + pullPolicy: 
IfNotPresent + service: + externalPort: 8181 + externalDatabase: + type: postgres + host_read: texera-postgresql + host_write: texera-postgresql + port: 5432 + database: texera_lakekeeper + user: postgres + password: root_password + +lakekeeperInit: + enabled: true + defaultProject: + id: "00000000-0000-0000-0000-000000000000" + name: default + warehouse: + name: texera + region: us-west-2 + s3Bucket: texera-iceberg + # Part2: configurations of Texera-related micro services texeraImages: pullPolicy: Always diff --git a/bin/parse-storage-config.py b/bin/parse-storage-config.py new file mode 100755 index 00000000000..262dba45add --- /dev/null +++ b/bin/parse-storage-config.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Parse storage.conf HOCON file with environment variable resolution. +This script properly handles HOCON syntax including environment variable substitution. + +Usage: + # Single key mode (backward compatible): + python3 bin/parse-storage-config.py storage.iceberg.catalog.rest.uri + + # Batch mode (outputs VAR_NAME=value lines): + python3 bin/parse-storage-config.py --batch VAR1=key.path1 VAR2=key.path2 ... 
+ +Examples: + python3 bin/parse-storage-config.py storage.s3.endpoint + python3 bin/parse-storage-config.py --batch \ + REST_URI=storage.iceberg.catalog.rest.uri \ + WAREHOUSE_NAME=storage.iceberg.catalog.rest.warehouse-name \ + S3_ENDPOINT=storage.s3.endpoint +""" + +import os +import sys +from pathlib import Path + +try: + from pyhocon import ConfigFactory +except ImportError: + print("Error: pyhocon is not installed. Install it with: pip install pyhocon", file=sys.stderr) + sys.exit(1) + + +def find_storage_conf(): + """Find storage.conf path.""" + texera_home = os.environ.get("TEXERA_HOME") + if texera_home: + conf_path = Path(texera_home) / "common" / "config" / "src" / "main" / "resources" / "storage.conf" + else: + script_dir = Path(__file__).parent + conf_path = script_dir.parent / "common" / "config" / "src" / "main" / "resources" / "storage.conf" + + if not conf_path.exists(): + print(f"Error: storage.conf not found at {conf_path}", file=sys.stderr) + sys.exit(1) + + return conf_path + + +def parse_storage_config(): + """Parse storage.conf with environment variable resolution.""" + conf_path = find_storage_conf() + config = ConfigFactory.parse_file(str(conf_path)) + return config + + +def get_value(config, key_path): + """Get value from config by dot-separated key path.""" + try: + return config.get_string(key_path) + except Exception: + return None + + +def main(): + if len(sys.argv) < 2: + config = parse_storage_config() + print(config.get("storage", {})) + return + + if sys.argv[1] == "--batch": + config = parse_storage_config() + for arg in sys.argv[2:]: + if "=" not in arg: + print(f"Error: batch argument must be VAR_NAME=key.path, got '{arg}'", file=sys.stderr) + sys.exit(1) + var_name, key_path = arg.split("=", 1) + value = get_value(config, key_path) + if value is None: + value = "" + print(f"{var_name}={value}") + else: + key_path = sys.argv[1] + config = parse_storage_config() + value = get_value(config, key_path) + if value is None: + 
print(f"Key '{key_path}' not found", file=sys.stderr) + sys.exit(1) + print(value) + + +if __name__ == "__main__": + main() diff --git a/bin/single-node/.env b/bin/single-node/.env index 2c949de0fbe..094ce25416a 100644 --- a/bin/single-node/.env +++ b/bin/single-node/.env @@ -58,6 +58,18 @@ STORAGE_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME=texera-postgres:5432/texera_ STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME=texera STORAGE_ICEBERG_CATALOG_POSTGRES_PASSWORD=password +LAKEKEEPER__PG_DATABASE_URL_READ=postgres://texera:password@texera-postgres:5432/texera_lakekeeper +LAKEKEEPER__PG_DATABASE_URL_WRITE=postgres://texera:password@texera-postgres:5432/texera_lakekeeper +LAKEKEEPER__PG_ENCRYPTION_KEY=texera_key +LAKEKEEPER_BASE_URI=http://texera-lakekeeper:8181 +STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME=texera +STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET=texera-iceberg +STORAGE_S3_AUTH_USERNAME=texera_minio +STORAGE_S3_AUTH_PASSWORD=password +STORAGE_ICEBERG_CATALOG_REST_REGION=us-west-2 + +STORAGE_ICEBERG_CATALOG_REST_URI=http://texera-lakekeeper:8181/catalog +STORAGE_ICEBERG_CATALOG_TYPE=rest # Admin credentials for Texera (used for login and example data loading) USER_SYS_ADMIN_USERNAME=texera -USER_SYS_ADMIN_PASSWORD=texera \ No newline at end of file +USER_SYS_ADMIN_PASSWORD=texera diff --git a/bin/single-node/docker-compose.yml b/bin/single-node/docker-compose.yml index 8c63d0d3652..0fb5ded49ed 100644 --- a/bin/single-node/docker-compose.yml +++ b/bin/single-node/docker-compose.yml @@ -75,6 +75,142 @@ services: timeout: 5s retries: 10 + # Lakekeeper migration init container + # This runs once to migrate the database before the lakekeeper server starts + lakekeeper-migrate: + image: vakamo/lakekeeper:v0.11.0 + container_name: texera-lakekeeper-migrate + depends_on: + postgres: + condition: service_healthy + env_file: + - .env + restart: "no" + entrypoint: ["/home/nonroot/lakekeeper"] + command: ["migrate"] + + # Lakekeeper is the Iceberg REST catalog service + 
lakekeeper: + image: vakamo/lakekeeper:v0.11.0 + container_name: texera-lakekeeper + restart: always + depends_on: + postgres: + condition: service_healthy + minio: + condition: service_started + lakekeeper-migrate: + condition: service_completed_successfully + env_file: + - .env + entrypoint: ["/home/nonroot/lakekeeper"] + command: ["serve"] + ports: + - "8181:8181" + healthcheck: + test: ["CMD", "/home/nonroot/lakekeeper", "healthcheck"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 10s + + lakekeeper-init: + image: alpine:3.19 + container_name: texera-lakekeeper-init + depends_on: + lakekeeper: + condition: service_healthy + minio: + condition: service_started + env_file: + - .env + restart: "no" + entrypoint: [ "/bin/sh", "-c" ] + command: + - | + set -e + + echo "Installing dependencies..." + apk add --no-cache curl ca-certificates + + echo "Installing MinIO Client..." + wget -q https://dl.min.io/client/mc/release/linux-amd64/mc -O /usr/local/bin/mc + chmod +x /usr/local/bin/mc + + check_status() { + if [ "$$1" -ge 200 ] && [ "$$1" -lt 300 ]; then + echo "Created $$2 successfully (HTTP $$1)." + elif [ "$$1" -eq 409 ]; then + echo "$$2 already exists (HTTP 409). Treating as success." + else + echo "Failed to create $$2. HTTP Code: $$1" + echo "ERROR RESPONSE:" + if [ -f /tmp/response.txt ]; then cat /tmp/response.txt; fi + echo "" + exit 1 + fi + } + + echo "Step 1: Initializing MinIO bucket '$$STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET'..." + mc alias set minio "$$STORAGE_S3_ENDPOINT" "$$STORAGE_S3_AUTH_USERNAME" "$$STORAGE_S3_AUTH_PASSWORD" || true + if mc ls minio/$$STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET > /dev/null 2>&1; then + echo "MinIO bucket '$$STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET' already exists." 
+ else + mc mb minio/$$STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET || { + echo "Failed to create MinIO bucket '$$STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET'" + exit 1 + } + echo "MinIO bucket '$$STORAGE_ICEBERG_CATALOG_REST_S3_BUCKET' created successfully." + fi + + + echo "Step 2: Initializing Default Project..." + PROJECT_PAYLOAD='{"project-id": "00000000-0000-0000-0000-000000000000", "project-name": "default"}' + + PROJECT_CODE=$$(curl -s -o /tmp/response.txt -w "%{http_code}" \ + -X POST \ + -H "Content-Type: application/json" \ + -d "$$PROJECT_PAYLOAD" \ + "$$LAKEKEEPER_BASE_URI/management/v1/project" || echo "000") + + check_status "$$PROJECT_CODE" "Default Project" + + + echo "Step 3: Initializing Warehouse '$$STORAGE_ICEBERG_CATALOG_REST_WAREHOUSE_NAME'..." + CREATE_PAYLOAD=$$(cat < IcebergUtil.createRestCatalog( "texera_iceberg", - StorageConfig.fileStorageDirectoryPath + StorageConfig.icebergRESTCatalogWarehouseName ) case "postgres" => IcebergUtil.createPostgresCatalog( diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala index 549cb4b9d17..dd2e40bc30d 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala @@ -107,10 +107,19 @@ private[storage] class IcebergTableWriter[T]( private def flushBuffer(): Unit = { if (buffer.nonEmpty) { // Create a unique file path using the writer's identifier and the filename index - val filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_${filenameIdx}") + // Handle S3 URIs (s3://) differently from local file paths to preserve URI format + val location = table.location() + val filepathString = if (location.startsWith("s3://")) { + // For S3 
URIs, append path component directly as string to preserve s3:// format + val basePath = if (location.endsWith("/")) location else s"$location/" + s"$basePath${writerIdentifier}_${filenameIdx}" + } else { + // For local file paths, use Paths.get() for proper path resolution + Paths.get(location).resolve(s"${writerIdentifier}_${filenameIdx}").toString + } // Increment the filename index by 1 filenameIdx += 1 - val outputFile: OutputFile = table.io().newOutputFile(filepath.toString) + val outputFile: OutputFile = table.io().newOutputFile(filepathString) // Create a Parquet data writer to write a new file val dataWriter: DataWriter[Record] = Parquet .writeData(outputFile) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala index ad6ac07c1ff..39f010ef3fb 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala @@ -22,9 +22,10 @@ package org.apache.texera.amber.util import org.apache.texera.amber.config.StorageConfig import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, LargeBinary, Schema, Tuple} import org.apache.hadoop.conf.Configuration -import org.apache.iceberg.catalog.{Catalog, TableIdentifier} +import org.apache.iceberg.catalog.{Catalog, SupportsNamespaces, TableIdentifier} import org.apache.iceberg.data.parquet.GenericParquetReaders import org.apache.iceberg.data.{GenericRecord, Record} +import org.apache.iceberg.aws.s3.S3FileIO import org.apache.iceberg.hadoop.{HadoopCatalog, HadoopFileIO} import org.apache.iceberg.io.{CloseableIterable, InputFile} import org.apache.iceberg.jdbc.JdbcCatalog @@ -40,6 +41,8 @@ import org.apache.iceberg.{ TableProperties, Schema => IcebergSchema } +import org.apache.iceberg.catalog.Namespace +import org.apache.iceberg.exceptions.AlreadyExistsException import 
java.nio.ByteBuffer import java.nio.file.Path @@ -96,22 +99,32 @@ object IcebergUtil { * TODO: Add authentication support, such as OAuth2, using `OAuth2Properties`. * * @param catalogName the name of the catalog. - * @param warehouse the root path for the warehouse where the tables are stored. + * @param warehouse the warehouse identifier (for Lakekeeper). * @return the initialized RESTCatalog instance. */ def createRestCatalog( catalogName: String, - warehouse: Path + warehouse: String ): RESTCatalog = { val catalog = new RESTCatalog() - catalog.initialize( - catalogName, - Map( - "warehouse" -> warehouse.toString, - CatalogProperties.URI -> StorageConfig.icebergRESTCatalogUri, - CatalogProperties.FILE_IO_IMPL -> classOf[HadoopFileIO].getName - ).asJava + + // Build base properties map + var properties = Map( + "warehouse" -> warehouse, + CatalogProperties.URI -> StorageConfig.icebergRESTCatalogUri + ) + + properties = properties ++ Map( + CatalogProperties.FILE_IO_IMPL -> classOf[S3FileIO].getName, + // S3FileIO configuration for MinIO + "s3.endpoint" -> StorageConfig.s3Endpoint, + "s3.access-key-id" -> StorageConfig.s3Username, + "s3.secret-access-key" -> StorageConfig.s3Password, + "s3.region" -> StorageConfig.s3Region, + "s3.path-style-access" -> "true" ) + + catalog.initialize(catalogName, properties.asJava) catalog } @@ -165,6 +178,20 @@ object IcebergUtil { TableProperties.COMMIT_MIN_RETRY_WAIT_MS -> StorageConfig.icebergTableCommitMinRetryWaitMs.toString ) + val namespace = Namespace.of(tableNamespace) + + catalog match { + case nsCatalog: SupportsNamespaces => + try nsCatalog.createNamespace(namespace, Map.empty[String, String].asJava) + catch { + case _: AlreadyExistsException => () + } + case _ => + throw new IllegalArgumentException( + s"Catalog ${catalog.getClass.getName} does not support namespaces" + ) + } + val identifier = TableIdentifier.of(tableNamespace, tableName) if (catalog.tableExists(identifier) && overrideIfExists) { 
catalog.dropTable(identifier) diff --git a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala index 1249d067835..9b214b9755c 100644 --- a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala +++ b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala @@ -69,6 +69,8 @@ object ComputingUnitManagingResource { private lazy val computingUnitEnvironmentVariables: Map[String, Any] = Map( // Variables for saving results to Iceberg EnvironmentalVariable.ENV_ICEBERG_CATALOG_TYPE -> StorageConfig.icebergCatalogType, + EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_URI -> StorageConfig.icebergRESTCatalogUri, + EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_WAREHOUSE_NAME -> StorageConfig.icebergRESTCatalogWarehouseName, EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME -> StorageConfig.icebergPostgresCatalogUriWithoutScheme, EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_USERNAME -> StorageConfig.icebergPostgresCatalogUsername, EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_PASSWORD -> StorageConfig.icebergPostgresCatalogPassword, diff --git a/sql/texera_lakekeeper.sql b/sql/texera_lakekeeper.sql new file mode 100644 index 00000000000..afdca6946cc --- /dev/null +++ b/sql/texera_lakekeeper.sql @@ -0,0 +1,21 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +\c postgres + +DROP DATABASE IF EXISTS texera_lakekeeper; +CREATE DATABASE texera_lakekeeper;