From ce53bfd773093d585fcbbf88ba139523d1c1d8e4 Mon Sep 17 00:00:00 2001 From: Jose Luis Franco Arza Date: Tue, 10 Mar 2026 10:47:32 +0100 Subject: [PATCH] Add collection export commands (create, get, cancel). Implements CLI support for exporting collections to external storage backends (S3, GCS, Azure, filesystem) in Parquet format. Includes unit tests, integration tests, and skill documentation updates. Closes #158 Co-Authored-By: Claude Opus 4.6 --- .../references/architecture.md | 2 + .../skills/operating-weaviate-cli/SKILL.md | 33 +- .../references/exports.md | 59 +++ requirements-dev.txt | 2 +- setup.cfg | 2 +- test/integration/test_export_integration.py | 169 ++++++++ .../test_managers/test_export_manager.py | 378 ++++++++++++++++++ weaviate_cli/commands/cancel.py | 60 ++- weaviate_cli/commands/create.py | 87 ++++ weaviate_cli/commands/get.py | 58 +++ weaviate_cli/defaults.py | 28 ++ weaviate_cli/managers/export_manager.py | 204 ++++++++++ 12 files changed, 1076 insertions(+), 6 deletions(-) create mode 100644 .claude/skills/operating-weaviate-cli/references/exports.md create mode 100644 test/integration/test_export_integration.py create mode 100644 test/unittests/test_managers/test_export_manager.py create mode 100644 weaviate_cli/managers/export_manager.py diff --git a/.claude/skills/contributing-to-weaviate-cli/references/architecture.md b/.claude/skills/contributing-to-weaviate-cli/references/architecture.md index 3bde4fc..edd3430 100644 --- a/.claude/skills/contributing-to-weaviate-cli/references/architecture.md +++ b/.claude/skills/contributing-to-weaviate-cli/references/architecture.md @@ -81,6 +81,8 @@ class CollectionManager: self.client.collections.create(name=collection, ...) 
``` +Manager files: `collection_manager.py`, `tenant_manager.py`, `data_manager.py`, `backup_manager.py`, `export_manager.py`, `role_manager.py`, `user_manager.py`, `node_manager.py`, `shard_manager.py`, `cluster_manager.py`, `alias_manager.py`, `benchmark_manager.py`, `config_manager.py` + Managers handle: - Input validation and error messages - Weaviate client API calls diff --git a/.claude/skills/operating-weaviate-cli/SKILL.md b/.claude/skills/operating-weaviate-cli/SKILL.md index 569097a..43806b3 100644 --- a/.claude/skills/operating-weaviate-cli/SKILL.md +++ b/.claude/skills/operating-weaviate-cli/SKILL.md @@ -113,13 +113,13 @@ weaviate-cli [--config-file FILE] [--user USER] [--json] [opti | Group | Description | |-------|-------------| -| `create` | Create collections, tenants, data, backups, roles, users, aliases, replications | -| `get` | Inspect collections, tenants, shards, backups, roles, users, nodes, aliases, replications | +| `create` | Create collections, tenants, data, backups, exports, roles, users, aliases, replications | +| `get` | Inspect collections, tenants, shards, backups, exports, roles, users, nodes, aliases, replications | | `update` | Update collections, tenants, shards, data, users, aliases | | `delete` | Delete collections, tenants, data, roles, users, aliases, replications | | `query` | Query data (fetch/vector/keyword/hybrid/uuid), replications, sharding state | | `restore` | Restore backups | -| `cancel` | Cancel backups and replications | +| `cancel` | Cancel backups, exports, and replications | | `assign` | Assign roles to users, permissions to roles | | `revoke` | Revoke roles from users, permissions from roles | | `benchmark` | Run QPS benchmarks | @@ -219,6 +219,25 @@ Backends: `s3`, `gcs`, `filesystem`. Options: `--include`, `--exclude`, `--wait` See [references/backups.md](references/backups.md). 
+### Collection Export + +```bash +weaviate-cli create export-collection --export_id my-export --backend s3 --file_format parquet --wait --json +weaviate-cli create export-collection --export_id my-export --backend s3 --include "Movies,Books" --json +weaviate-cli create export-collection --export_id my-export --backend s3 --exclude "TempData" --json +weaviate-cli create export-collection --export_id my-export --backend s3 --bucket my-bucket --path /exports --json +weaviate-cli get export-collection --export_id my-export --backend s3 --json +weaviate-cli cancel export-collection --export_id my-export --backend s3 --json +``` + +Backends: `filesystem`, `s3`, `gcs`, `azure`. File formats: `parquet`. + +Options: `--include`, `--exclude` (mutually exclusive), `--wait`, `--bucket`, `--path` + +**Prerequisite**: The export backend must be configured on the Weaviate cluster (e.g., `ENABLE_BACKUP=true` for S3 via MinIO in local-k8s). + +See [references/exports.md](references/exports.md). + ### RBAC (Roles, Users, Permissions) ```bash @@ -362,6 +381,13 @@ hot/active <--> cold/inactive 5. For timestamp-based TTL on existing collections: `--inverted_index timestamp` must be set at creation or already enabled 6. For property-based TTL: the date property must exist, be `date` type, and have filterable or rangeable index +### Collection Export Workflow +1. `create export-collection --backend s3 --export_id my-export --wait` -- create and wait for completion +2. `get export-collection --backend s3 --export_id my-export` -- check status (includes shard-level progress) +3. `cancel export-collection --backend s3 --export_id my-export` -- cancel in-progress export + +**Prerequisite**: The export backend must be configured on the cluster. For local-k8s, deploy with `ENABLE_BACKUP=true` to enable S3 via MinIO. + ### Alias Workflow 1. `create collection --collection Movies_v1` -- create the target collection 2. 
`create alias Movies Movies_v1` -- create alias pointing to collection @@ -416,6 +442,7 @@ When new commands or options are added to `weaviate-cli`: - [references/search.md](references/search.md) -- Search types, options, and selection guide - [references/tenants.md](references/tenants.md) -- Tenant state machine and management - [references/backups.md](references/backups.md) -- Backup/restore options and notes +- [references/exports.md](references/exports.md) -- Collection export options and notes - [references/rbac.md](references/rbac.md) -- Permission format, actions, and examples - [references/cluster.md](references/cluster.md) -- Nodes, shards, replication operations - [references/benchmark.md](references/benchmark.md) -- Benchmark options and output modes diff --git a/.claude/skills/operating-weaviate-cli/references/exports.md b/.claude/skills/operating-weaviate-cli/references/exports.md new file mode 100644 index 0000000..37985e7 --- /dev/null +++ b/.claude/skills/operating-weaviate-cli/references/exports.md @@ -0,0 +1,59 @@ +# Collection Export Reference + +Export collections from Weaviate to external storage backends in Parquet format. + +## Create Export +```bash +weaviate-cli create export-collection --export_id my-export --backend s3 --file_format parquet --wait --json +weaviate-cli create export-collection --export_id my-export --backend s3 --include "Movies,Books" --json +weaviate-cli create export-collection --export_id my-export --backend gcs --exclude "TempData" --json +weaviate-cli create export-collection --export_id my-export --backend s3 --bucket my-bucket --path /exports --wait --json +``` + +## Check Export Status +```bash +weaviate-cli get export-collection --export_id my-export --backend s3 --json +``` + +Returns shard-level progress including objects exported per shard, errors, and timing. 
+ +## Cancel Export +```bash +weaviate-cli cancel export-collection --export_id my-export --backend s3 --json +``` + +Only works while the export is in progress. Returns an error if the export has already completed. + +## Options + +**Create:** +- `--export_id` -- Export identifier (default: "test-export") +- `--backend` -- filesystem, s3, gcs, azure (default: filesystem) +- `--file_format` -- Export format: parquet (default: parquet) +- `--include` -- Comma-separated collections to include +- `--exclude` -- Comma-separated collections to exclude +- `--wait` -- Wait for completion +- `--bucket` -- Bucket name for cloud storage backends +- `--path` -- Path within the storage backend + +**Get Status:** +- `--export_id`, `--backend` -- Same as create +- `--bucket`, `--path` -- Optional, for locating the export + +**Cancel:** +- `--export_id`, `--backend` -- Same as create +- `--bucket`, `--path` -- Optional, for locating the export + +## Prerequisites + +1. The export backend must be configured on the Weaviate cluster +2. For local-k8s, deploy with `ENABLE_BACKUP=true` to enable S3 via MinIO +3. 
`--include` and `--exclude` are mutually exclusive + +## Notes + +- `--wait` blocks until the export completes (SUCCESS, FAILED, or CANCELED) +- Without `--wait`, the command returns immediately with status STARTED +- Poll progress with `get export-collection` to monitor shard-level status +- Export uses the same storage backends as backups (S3, GCS, Azure, filesystem) +- The `--bucket` defaults to the cluster's configured backup bucket if not specified diff --git a/requirements-dev.txt b/requirements-dev.txt index 3646e43..25915e4 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,4 +1,4 @@ -weaviate-client>=4.16.7 +weaviate-client @ git+https://github.com/weaviate/weaviate-python-client.git@export_collection click==8.1.7 twine pytest diff --git a/setup.cfg b/setup.cfg index bae19a6..0d0df9f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -37,7 +37,7 @@ classifiers = include_package_data = True python_requires = >=3.9 install_requires = - weaviate-client>=4.19.0 + weaviate-client @ git+https://github.com/weaviate/weaviate-python-client.git@export_collection click==8.1.7 semver>=3.0.2 numpy>=1.24.0 diff --git a/test/integration/test_export_integration.py b/test/integration/test_export_integration.py new file mode 100644 index 0000000..6aa4ed2 --- /dev/null +++ b/test/integration/test_export_integration.py @@ -0,0 +1,169 @@ +import json +import pytest +import weaviate +from weaviate_cli.managers.collection_manager import CollectionManager +from weaviate_cli.managers.config_manager import ConfigManager +from weaviate_cli.managers.data_manager import DataManager +from weaviate_cli.managers.export_manager import ExportManager + + +EXPORT_COLLECTION = "ExportTestCollection" + + +@pytest.fixture +def client() -> weaviate.WeaviateClient: + config = ConfigManager() + return config.get_client() + + +@pytest.fixture +def collection_manager(client: weaviate.WeaviateClient) -> CollectionManager: + return CollectionManager(client) + + +@pytest.fixture +def 
data_manager(client: weaviate.WeaviateClient) -> DataManager: + return DataManager(client) + + +@pytest.fixture +def export_manager(client: weaviate.WeaviateClient) -> ExportManager: + return ExportManager(client) + + +@pytest.fixture +def setup_collection(collection_manager, data_manager): + """Create a collection with data for export tests.""" + try: + collection_manager.create_collection( + collection=EXPORT_COLLECTION, + replication_factor=1, + vectorizer="none", + force_auto_schema=True, + ) + data_manager.create_data( + collection=EXPORT_COLLECTION, + limit=100, + randomize=True, + consistency_level="one", + ) + yield + finally: + if collection_manager.client.collections.exists(EXPORT_COLLECTION): + collection_manager.delete_collection(collection=EXPORT_COLLECTION) + + +def test_create_export_and_get_status( + export_manager: ExportManager, setup_collection, capsys +): + """Test creating an export and getting its status.""" + try: + # Create export with wait + export_manager.create_export( + export_id="integration-test-export", + backend="s3", + file_format="parquet", + include=EXPORT_COLLECTION, + wait=True, + json_output=False, + ) + + out = capsys.readouterr().out + assert "integration-test-export" in out + assert "created successfully" in out + + # Get status + export_manager.get_export_status( + export_id="integration-test-export", + backend="s3", + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert data["export_id"] == "integration-test-export" + assert data["status"] == "SUCCESS" + assert EXPORT_COLLECTION in data["collections"] + assert "shard_status" in data + except Exception: + raise + + +def test_create_export_json_output( + export_manager: ExportManager, setup_collection, capsys +): + """Test creating an export with JSON output.""" + export_manager.create_export( + export_id="integration-json-export", + backend="s3", + file_format="parquet", + wait=True, + json_output=True, + ) + + out = 
capsys.readouterr().out + data = json.loads(out) + assert data["status"] == "success" + assert data["export_id"] == "integration-json-export" + assert data["export_status"] == "SUCCESS" + + +def test_create_export_with_exclude( + export_manager: ExportManager, setup_collection, capsys +): + """Test creating an export with exclude filter.""" + export_manager.create_export( + export_id="integration-exclude-export", + backend="s3", + file_format="parquet", + exclude=EXPORT_COLLECTION, + wait=True, + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert data["status"] == "success" + assert EXPORT_COLLECTION not in data.get("collections", []) + + +def test_create_export_include_and_exclude_raises( + export_manager: ExportManager, setup_collection +): + """Test that specifying both include and exclude raises an error.""" + with pytest.raises(Exception) as exc_info: + export_manager.create_export( + export_id="should-fail", + backend="s3", + file_format="parquet", + include=EXPORT_COLLECTION, + exclude="OtherCollection", + ) + assert "include" in str(exc_info.value).lower() + assert "exclude" in str(exc_info.value).lower() + + +def test_cancel_export(export_manager: ExportManager, setup_collection, capsys): + """Test canceling an export.""" + # Create export without waiting + export_manager.create_export( + export_id="integration-cancel-export", + backend="s3", + file_format="parquet", + wait=False, + ) + capsys.readouterr() # Clear output + + # Try to cancel — may succeed or fail depending on timing + try: + export_manager.cancel_export( + export_id="integration-cancel-export", + backend="s3", + json_output=True, + ) + out = capsys.readouterr().out + data = json.loads(out) + assert data["status"] == "success" + except Exception: + # Export may have already finished — that's OK + pass diff --git a/test/unittests/test_managers/test_export_manager.py b/test/unittests/test_managers/test_export_manager.py new file mode 100644 index 
0000000..cfbf9b9 --- /dev/null +++ b/test/unittests/test_managers/test_export_manager.py @@ -0,0 +1,378 @@ +import json +import pytest +from unittest.mock import MagicMock +from datetime import datetime + +from weaviate_cli.managers.export_manager import ExportManager + + +@pytest.fixture +def mock_client_with_export(mock_client: MagicMock) -> MagicMock: + """Configure mock_client with sensible defaults for ExportManager tests.""" + mock_export = MagicMock() + + # Default create return + mock_create_return = MagicMock() + mock_create_return.export_id = "test-export" + mock_create_return.backend = "filesystem" + mock_create_return.path = "/exports/test-export" + mock_create_return.status = MagicMock(value="STARTED") + mock_create_return.started_at = None + mock_create_return.collections = ["Movies", "Books"] + mock_export.create.return_value = mock_create_return + + # Default get_status return + mock_status_return = MagicMock() + mock_status_return.export_id = "test-export" + mock_status_return.backend = "filesystem" + mock_status_return.path = "/exports/test-export" + mock_status_return.status = MagicMock(value="SUCCESS") + mock_status_return.started_at = None + mock_status_return.collections = ["Movies"] + mock_status_return.error = None + mock_status_return.took_in_ms = 1234 + mock_status_return.shard_status = None + mock_export.get_status.return_value = mock_status_return + + # Default cancel return + mock_export.cancel.return_value = True + + mock_client.export = mock_export + return mock_client + + +@pytest.fixture +def export_manager(mock_client_with_export: MagicMock) -> ExportManager: + return ExportManager(mock_client_with_export) + + +# --------------------------------------------------------------------------- +# create_export — validation +# --------------------------------------------------------------------------- + + +def test_create_export_include_and_exclude_raises( + export_manager: ExportManager, +) -> None: + """create_export raises when both 
include and exclude are specified.""" + with pytest.raises(Exception) as exc_info: + export_manager.create_export( + export_id="test", + backend="filesystem", + file_format="parquet", + include="Movies", + exclude="Books", + ) + + assert "include" in str(exc_info.value).lower() + assert "exclude" in str(exc_info.value).lower() + + +# --------------------------------------------------------------------------- +# create_export — success +# --------------------------------------------------------------------------- + + +def test_create_export_text_output(export_manager: ExportManager, capsys) -> None: + """create_export emits text success message.""" + export_manager.create_export( + export_id="my-export", + backend="filesystem", + file_format="parquet", + json_output=False, + ) + + out = capsys.readouterr().out + assert "my-export" in out + assert "created successfully" in out + + +def test_create_export_json_output(export_manager: ExportManager, capsys) -> None: + """create_export with json_output=True emits JSON with status=success.""" + export_manager.create_export( + export_id="my-export", + backend="filesystem", + file_format="parquet", + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert data["status"] == "success" + assert data["export_id"] == "test-export" + assert data["collections"] == ["Movies", "Books"] + + +# --------------------------------------------------------------------------- +# create_export — argument passing +# --------------------------------------------------------------------------- + + +def test_create_export_passes_correct_args_with_include( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """create_export passes include_collections as a list.""" + export_manager.create_export( + export_id="my-export", + backend="s3", + file_format="parquet", + include="Movies,Books", + ) + + mock_client_with_export.export.create.assert_called_once() + call_kwargs = 
mock_client_with_export.export.create.call_args.kwargs + assert call_kwargs["export_id"] == "my-export" + assert call_kwargs["include_collections"] == ["Movies", "Books"] + assert call_kwargs["exclude_collections"] is None + + +def test_create_export_passes_correct_args_with_exclude( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """create_export passes exclude_collections as a list.""" + export_manager.create_export( + export_id="my-export", + backend="filesystem", + file_format="parquet", + exclude="Movies", + ) + + call_kwargs = mock_client_with_export.export.create.call_args.kwargs + assert call_kwargs["include_collections"] is None + assert call_kwargs["exclude_collections"] == ["Movies"] + + +def test_create_export_passes_none_collections_when_not_specified( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """create_export passes None for both when neither is specified.""" + export_manager.create_export( + export_id="my-export", + backend="filesystem", + file_format="parquet", + ) + + call_kwargs = mock_client_with_export.export.create.call_args.kwargs + assert call_kwargs["include_collections"] is None + assert call_kwargs["exclude_collections"] is None + + +def test_create_export_passes_config_with_bucket_and_path( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """create_export passes ExportConfig when bucket/path are set.""" + export_manager.create_export( + export_id="my-export", + backend="s3", + file_format="parquet", + bucket="my-bucket", + path="/my/path", + ) + + call_kwargs = mock_client_with_export.export.create.call_args.kwargs + config = call_kwargs["config"] + assert config is not None + assert config.bucket == "my-bucket" + assert config.path == "/my/path" + + +def test_create_export_no_config_when_bucket_and_path_none( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """create_export passes config=None when 
bucket and path are not set.""" + export_manager.create_export( + export_id="my-export", + backend="filesystem", + file_format="parquet", + ) + + call_kwargs = mock_client_with_export.export.create.call_args.kwargs + assert call_kwargs["config"] is None + + +def test_create_export_with_wait( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """create_export passes wait_for_completion=True.""" + export_manager.create_export( + export_id="my-export", + backend="filesystem", + file_format="parquet", + wait=True, + ) + + call_kwargs = mock_client_with_export.export.create.call_args.kwargs + assert call_kwargs["wait_for_completion"] is True + + +# --------------------------------------------------------------------------- +# get_export_status — success +# --------------------------------------------------------------------------- + + +def test_get_export_status_text_output(export_manager: ExportManager, capsys) -> None: + """get_export_status emits text output.""" + export_manager.get_export_status( + export_id="my-export", + backend="filesystem", + json_output=False, + ) + + out = capsys.readouterr().out + assert "test-export" in out + assert "SUCCESS" in out + assert "1234" in out + + +def test_get_export_status_json_output(export_manager: ExportManager, capsys) -> None: + """get_export_status with json_output=True emits JSON.""" + export_manager.get_export_status( + export_id="my-export", + backend="filesystem", + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert data["export_id"] == "test-export" + assert data["status"] == "SUCCESS" + assert data["took_in_ms"] == 1234 + + +def test_get_export_status_passes_correct_args( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """get_export_status passes correct args to client.""" + export_manager.get_export_status( + export_id="my-export", + backend="s3", + bucket="my-bucket", + path="/my/path", + ) + + 
mock_client_with_export.export.get_status.assert_called_once() + call_kwargs = mock_client_with_export.export.get_status.call_args.kwargs + assert call_kwargs["export_id"] == "my-export" + assert call_kwargs["bucket"] == "my-bucket" + assert call_kwargs["path"] == "/my/path" + + +def test_get_export_status_with_shard_status_json( + export_manager: ExportManager, mock_client_with_export: MagicMock, capsys +) -> None: + """get_export_status includes shard_status in JSON output when present.""" + mock_shard_progress = MagicMock() + mock_shard_progress.status = MagicMock(value="SUCCESS") + mock_shard_progress.objects_exported = 500 + mock_shard_progress.error = None + mock_shard_progress.skip_reason = None + + mock_status = mock_client_with_export.export.get_status.return_value + mock_status.shard_status = {"Movies": {"shard1": mock_shard_progress}} + + export_manager.get_export_status( + export_id="my-export", + backend="filesystem", + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert "shard_status" in data + assert data["shard_status"]["Movies"]["shard1"]["status"] == "SUCCESS" + assert data["shard_status"]["Movies"]["shard1"]["objects_exported"] == 500 + + +def test_get_export_status_with_error_json( + export_manager: ExportManager, mock_client_with_export: MagicMock, capsys +) -> None: + """get_export_status includes error in JSON output when present.""" + mock_status = mock_client_with_export.export.get_status.return_value + mock_status.status = MagicMock(value="FAILED") + mock_status.error = "Something went wrong" + + export_manager.get_export_status( + export_id="my-export", + backend="filesystem", + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert data["status"] == "FAILED" + assert data["error"] == "Something went wrong" + + +# --------------------------------------------------------------------------- +# cancel_export — success +# 
--------------------------------------------------------------------------- + + +def test_cancel_export_success_text_output( + export_manager: ExportManager, capsys +) -> None: + """cancel_export when successful emits text success message.""" + export_manager.cancel_export( + export_id="my-export", + backend="filesystem", + json_output=False, + ) + + out = capsys.readouterr().out + assert "my-export" in out + assert "canceled successfully" in out + + +def test_cancel_export_success_json_output( + export_manager: ExportManager, capsys +) -> None: + """cancel_export when successful and json_output=True emits JSON.""" + export_manager.cancel_export( + export_id="my-export", + backend="filesystem", + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert data["status"] == "success" + assert "my-export" in data["message"] + + +def test_cancel_export_passes_correct_args( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """cancel_export passes correct args to client.""" + export_manager.cancel_export( + export_id="my-export", + backend="gcs", + bucket="my-bucket", + path="/my/path", + ) + + mock_client_with_export.export.cancel.assert_called_once() + call_kwargs = mock_client_with_export.export.cancel.call_args.kwargs + assert call_kwargs["export_id"] == "my-export" + assert call_kwargs["bucket"] == "my-bucket" + assert call_kwargs["path"] == "/my/path" + + +# --------------------------------------------------------------------------- +# cancel_export — failure +# --------------------------------------------------------------------------- + + +def test_cancel_export_failure_raises( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """cancel_export when client returns False raises an exception.""" + mock_client_with_export.export.cancel.return_value = False + + with pytest.raises(Exception) as exc_info: + export_manager.cancel_export( + export_id="my-export", + 
backend="filesystem", + ) + + assert "my-export" in str(exc_info.value) + assert "could not be canceled" in str(exc_info.value) diff --git a/weaviate_cli/commands/cancel.py b/weaviate_cli/commands/cancel.py index 649768a..7234c14 100644 --- a/weaviate_cli/commands/cancel.py +++ b/weaviate_cli/commands/cancel.py @@ -1,10 +1,12 @@ import json import click import sys +from typing import Optional from weaviate_cli.utils import get_client_from_context from weaviate_cli.managers.backup_manager import BackupManager from weaviate_cli.managers.cluster_manager import ClusterManager -from weaviate_cli.defaults import CancelBackupDefaults +from weaviate_cli.managers.export_manager import ExportManager +from weaviate_cli.defaults import CancelBackupDefaults, CancelExportCollectionDefaults # Create Group @@ -85,3 +87,59 @@ def cancel_replication_cli(ctx: click.Context, op_id: str, json_output: bool) -> finally: if client: client.close() + + +@cancel.command("export-collection") +@click.option( + "--export_id", + default=CancelExportCollectionDefaults.export_id, + help=f"Identifier for the export (default: {CancelExportCollectionDefaults.export_id}).", +) +@click.option( + "--backend", + default=CancelExportCollectionDefaults.backend, + type=click.Choice(["filesystem", "s3", "gcs", "azure"]), + help=f"The backend used for storing the export (default: {CancelExportCollectionDefaults.backend}).", +) +@click.option( + "--bucket", + default=CancelExportCollectionDefaults.bucket, + help="Bucket name for cloud storage backends.", +) +@click.option( + "--path", + default=CancelExportCollectionDefaults.path, + help="Path within the storage backend.", +) +@click.option( + "--json", "json_output", is_flag=True, default=False, help="Output in JSON format." 
+) +@click.pass_context +def cancel_export_collection_cli( + ctx: click.Context, + export_id: str, + backend: str, + bucket: Optional[str], + path: Optional[str], + json_output: bool, +) -> None: + """Cancel a collection export in Weaviate.""" + client = None + try: + client = get_client_from_context(ctx) + export_manager = ExportManager(client) + export_manager.cancel_export( + export_id=export_id, + backend=backend, + bucket=bucket, + path=path, + json_output=json_output, + ) + except Exception as e: + click.echo(f"Error: {e}") + if client: + client.close() + sys.exit(1) + finally: + if client: + client.close() diff --git a/weaviate_cli/commands/create.py b/weaviate_cli/commands/create.py index d47d60b..d927dfc 100644 --- a/weaviate_cli/commands/create.py +++ b/weaviate_cli/commands/create.py @@ -11,6 +11,7 @@ ) from weaviate_cli.managers.alias_manager import AliasManager from weaviate_cli.managers.backup_manager import BackupManager +from weaviate_cli.managers.export_manager import ExportManager from weaviate_cli.utils import get_client_from_context, get_async_client_from_context from weaviate_cli.managers.collection_manager import CollectionManager from weaviate_cli.managers.tenant_manager import TenantManager @@ -22,6 +23,7 @@ from weaviate_cli.defaults import ( CreateBackupDefaults, CreateCollectionDefaults, + CreateExportCollectionDefaults, CreateTenantsDefaults, CreateDataDefaults, CreateRoleDefaults, @@ -822,3 +824,88 @@ def create_replication_cli( finally: if client: client.close() + + +@create.command("export-collection") +@click.option( + "--export_id", + default=CreateExportCollectionDefaults.export_id, + help=f"Identifier for the export (default: {CreateExportCollectionDefaults.export_id}).", +) +@click.option( + "--backend", + default=CreateExportCollectionDefaults.backend, + type=click.Choice(["filesystem", "s3", "gcs", "azure"]), + help=f"The backend used for storing the export (default: {CreateExportCollectionDefaults.backend}).", +) 
+@click.option( + "--file_format", + default=CreateExportCollectionDefaults.file_format, + type=click.Choice(["parquet"]), + help=f"The file format for the export (default: {CreateExportCollectionDefaults.file_format}).", +) +@click.option( + "--include", + default=CreateExportCollectionDefaults.include, + help="Comma separated list of collections to include in the export.", +) +@click.option( + "--exclude", + default=CreateExportCollectionDefaults.exclude, + help="Comma separated list of collections to exclude from the export.", +) +@click.option( + "--wait", + is_flag=True, + help="Wait for the export to complete before returning.", +) +@click.option( + "--bucket", + default=CreateExportCollectionDefaults.bucket, + help="Bucket name for cloud storage backends.", +) +@click.option( + "--path", + default=CreateExportCollectionDefaults.path, + help="Path within the storage backend.", +) +@click.option( + "--json", "json_output", is_flag=True, default=False, help="Output in JSON format." +) +@click.pass_context +def create_export_collection_cli( + ctx: click.Context, + export_id: str, + backend: str, + file_format: str, + include: Optional[str], + exclude: Optional[str], + wait: bool, + bucket: Optional[str], + path: Optional[str], + json_output: bool, +) -> None: + """Create a collection export in Weaviate.""" + client: Optional[WeaviateClient] = None + try: + client = get_client_from_context(ctx) + export_manager = ExportManager(client) + export_manager.create_export( + export_id=export_id, + backend=backend, + file_format=file_format, + include=include, + exclude=exclude, + wait=wait, + bucket=bucket, + path=path, + json_output=json_output, + ) + except Exception as e: + click.echo(f"Error: {e}") + if client: + client.close() + sys.exit(1) + finally: + if client: + client.close() diff --git a/weaviate_cli/commands/get.py b/weaviate_cli/commands/get.py index 3f0a057..86f7bf9 100644 --- a/weaviate_cli/commands/get.py +++ b/weaviate_cli/commands/get.py @@ -8,6 +8,7 
@@ collection_name_complete, ) from weaviate_cli.managers.alias_manager import AliasManager +from weaviate_cli.managers.export_manager import ExportManager from weaviate_cli.managers.role_manager import RoleManager from weaviate_cli.managers.tenant_manager import TenantManager from weaviate_cli.managers.user_manager import UserManager @@ -19,6 +20,7 @@ from weaviate.rbac.models import Role from weaviate_cli.defaults import ( GetBackupDefaults, + GetExportCollectionDefaults, GetTenantsDefaults, GetShardsDefaults, GetCollectionDefaults, @@ -565,3 +567,59 @@ def get_replications_cli(ctx: click.Context, json_output: bool) -> None: finally: if client: client.close() + + +@get.command("export-collection") +@click.option( + "--export_id", + default=GetExportCollectionDefaults.export_id, + help=f"Identifier for the export (default: {GetExportCollectionDefaults.export_id}).", +) +@click.option( + "--backend", + default=GetExportCollectionDefaults.backend, + type=click.Choice(["filesystem", "s3", "gcs", "azure"]), + help=f"The backend used for storing the export (default: {GetExportCollectionDefaults.backend}).", +) +@click.option( + "--bucket", + default=GetExportCollectionDefaults.bucket, + help="Bucket name for cloud storage backends.", +) +@click.option( + "--path", + default=GetExportCollectionDefaults.path, + help="Path within the storage backend.", +) +@click.option( + "--json", "json_output", is_flag=True, default=False, help="Output in JSON format." 
+) +@click.pass_context +def get_export_collection_cli( + ctx: click.Context, + export_id: str, + backend: str, + bucket: Optional[str], + path: Optional[str], + json_output: bool, +) -> None: + """Get the status of a collection export in Weaviate.""" + client = None + try: + client = get_client_from_context(ctx) + export_manager = ExportManager(client) + export_manager.get_export_status( + export_id=export_id, + backend=backend, + bucket=bucket, + path=path, + json_output=json_output, + ) + except Exception as e: + click.echo(f"Error: {e}") + if client: + client.close() + sys.exit(1) + finally: + if client: + client.close() diff --git a/weaviate_cli/defaults.py b/weaviate_cli/defaults.py index 9e5e3d2..f3a4517 100644 --- a/weaviate_cli/defaults.py +++ b/weaviate_cli/defaults.py @@ -302,3 +302,31 @@ class GetAliasDefaults: alias_name: Optional[str] = None collection: Optional[str] = None all: bool = False + + +@dataclass +class CreateExportCollectionDefaults: + export_id: str = "test-export" + backend: str = "filesystem" + file_format: str = "parquet" + include: Optional[str] = None + exclude: Optional[str] = None + wait: bool = False + bucket: Optional[str] = None + path: Optional[str] = None + + +@dataclass +class GetExportCollectionDefaults: + export_id: str = "test-export" + backend: str = "filesystem" + bucket: Optional[str] = None + path: Optional[str] = None + + +@dataclass +class CancelExportCollectionDefaults: + export_id: str = "test-export" + backend: str = "filesystem" + bucket: Optional[str] = None + path: Optional[str] = None diff --git a/weaviate_cli/managers/export_manager.py b/weaviate_cli/managers/export_manager.py new file mode 100644 index 0000000..839b5fd --- /dev/null +++ b/weaviate_cli/managers/export_manager.py @@ -0,0 +1,204 @@ +import json +import click +from typing import Optional +from weaviate.client import WeaviateClient +from weaviate.export.export import ( + ExportConfig, + ExportFileFormat, + ExportStorage, + ExportStatusReturn, +) 
from weaviate_cli.defaults import (
    CreateExportCollectionDefaults,
    GetExportCollectionDefaults,
    CancelExportCollectionDefaults,
)


# CLI choice string -> client ExportStorage enum.
BACKEND_MAP = {
    "filesystem": ExportStorage.FILESYSTEM,
    "s3": ExportStorage.S3,
    "gcs": ExportStorage.GCS,
    "azure": ExportStorage.AZURE,
}

# CLI choice string -> client ExportFileFormat enum.
FILE_FORMAT_MAP = {
    "parquet": ExportFileFormat.PARQUET,
}


def _resolve_choice(mapping: dict, value: str, option_name: str):
    """Look up *value* in *mapping*, raising a descriptive error instead of KeyError.

    The CLI layer already restricts values via click.Choice, but this manager is
    also importable as a library, where a bare KeyError would be unhelpful.
    """
    try:
        return mapping[value]
    except KeyError:
        raise Exception(
            f"Invalid {option_name} '{value}'. "
            f"Valid options: {', '.join(sorted(mapping))}."
        ) from None


def _parse_collection_list(value: Optional[str]) -> Optional[list]:
    """Split a comma-separated collection list into stripped names.

    Returns None when *value* is None/empty so the client receives "no filter"
    rather than an empty list. Blank entries (e.g. trailing commas) are dropped.
    """
    if not value:
        return None
    return [name.strip() for name in value.split(",") if name.strip()]


class ExportManager:
    """Manages collection exports (create, status, cancel) through the Weaviate client.

    Each public method prints its result via click.echo — either human-readable
    text or JSON when json_output is True — and raises Exception on failure so
    the CLI command wrappers can report the error and exit non-zero.
    """

    def __init__(self, client: WeaviateClient) -> None:
        self.client: WeaviateClient = client

    def create_export(
        self,
        export_id: str = CreateExportCollectionDefaults.export_id,
        backend: str = CreateExportCollectionDefaults.backend,
        file_format: str = CreateExportCollectionDefaults.file_format,
        include: Optional[str] = CreateExportCollectionDefaults.include,
        exclude: Optional[str] = CreateExportCollectionDefaults.exclude,
        wait: bool = CreateExportCollectionDefaults.wait,
        bucket: Optional[str] = CreateExportCollectionDefaults.bucket,
        path: Optional[str] = CreateExportCollectionDefaults.path,
        json_output: bool = False,
    ) -> None:
        """Start a collection export and print the resulting status.

        Args:
            export_id: Identifier for the export job.
            backend: Storage backend name (key of BACKEND_MAP).
            file_format: Export file format name (key of FILE_FORMAT_MAP).
            include: Comma-separated collections to include (mutually
                exclusive with *exclude*).
            exclude: Comma-separated collections to exclude (mutually
                exclusive with *include*).
            wait: When True, block until the export completes.
            bucket: Bucket for cloud storage backends (optional).
            path: Path within the storage backend (optional).
            json_output: Emit JSON instead of human-readable text.

        Raises:
            Exception: If both include and exclude are given, or a choice
                value is invalid, or the client call fails.
        """
        if include and exclude:
            raise Exception(
                "Cannot specify both --include and --exclude. Use one or the other."
            )

        backend_enum = _resolve_choice(BACKEND_MAP, backend, "backend")
        file_format_enum = _resolve_choice(FILE_FORMAT_MAP, file_format, "file format")

        # Only build a config object when the caller customized the target;
        # otherwise let the client/server defaults apply.
        config = None
        if bucket or path:
            config = ExportConfig(bucket=bucket, path=path)

        include_collections = _parse_collection_list(include)
        exclude_collections = _parse_collection_list(exclude)

        result = self.client.export.create(
            export_id=export_id,
            backend=backend_enum,
            file_format=file_format_enum,
            include_collections=include_collections,
            exclude_collections=exclude_collections,
            wait_for_completion=wait,
            config=config,
        )

        if json_output:
            data = {
                "status": "success",
                "export_id": result.export_id,
                "backend": result.backend,
                "path": result.path,
                "export_status": result.status.value,
                "collections": result.collections,
            }
            if result.started_at:
                data["started_at"] = str(result.started_at)
            # default=str guards against non-JSON-serializable values.
            click.echo(json.dumps(data, indent=2, default=str))
        else:
            click.echo(
                f"Export '{export_id}' created successfully with status '{result.status.value}'."
            )
            if result.collections:
                click.echo(f"Collections: {', '.join(result.collections)}")

    def get_export_status(
        self,
        export_id: str = GetExportCollectionDefaults.export_id,
        backend: str = GetExportCollectionDefaults.backend,
        bucket: Optional[str] = GetExportCollectionDefaults.bucket,
        path: Optional[str] = GetExportCollectionDefaults.path,
        json_output: bool = False,
    ) -> None:
        """Fetch and print the status of an existing export.

        Args:
            export_id: Identifier of the export to inspect.
            backend: Storage backend name (key of BACKEND_MAP).
            bucket: Bucket for cloud storage backends (optional).
            path: Path within the storage backend (optional).
            json_output: Emit JSON instead of human-readable text.
        """
        backend_enum = _resolve_choice(BACKEND_MAP, backend, "backend")

        result = self.client.export.get_status(
            export_id=export_id,
            backend=backend_enum,
            bucket=bucket,
            path=path,
        )

        self._print_export_status(result, json_output=json_output)

    def cancel_export(
        self,
        export_id: str = CancelExportCollectionDefaults.export_id,
        backend: str = CancelExportCollectionDefaults.backend,
        bucket: Optional[str] = CancelExportCollectionDefaults.bucket,
        path: Optional[str] = CancelExportCollectionDefaults.path,
        json_output: bool = False,
    ) -> None:
        """Cancel a running export and print the outcome.

        Args:
            export_id: Identifier of the export to cancel.
            backend: Storage backend name (key of BACKEND_MAP).
            bucket: Bucket for cloud storage backends (optional).
            path: Path within the storage backend (optional).
            json_output: Emit JSON instead of human-readable text.

        Raises:
            Exception: If the client reports the export could not be canceled.
        """
        backend_enum = _resolve_choice(BACKEND_MAP, backend, "backend")

        success = self.client.export.cancel(
            export_id=export_id,
            backend=backend_enum,
            bucket=bucket,
            path=path,
        )

        if success:
            if json_output:
                click.echo(
                    json.dumps(
                        {
                            "status": "success",
                            "message": f"Export '{export_id}' canceled successfully.",
                        },
                        indent=2,
                    )
                )
            else:
                click.echo(f"Export '{export_id}' canceled successfully.")
        else:
            raise Exception(f"Export '{export_id}' could not be canceled.")

    def _print_export_status(
        self, result: ExportStatusReturn, json_output: bool = False
    ) -> None:
        """Print an ExportStatusReturn, including optional per-shard progress.

        Optional fields (started_at, error, took_in_ms, shard_status) are only
        emitted when present so the output stays compact.
        """
        if json_output:
            data = {
                "export_id": result.export_id,
                "backend": result.backend,
                "path": result.path,
                "status": result.status.value,
                "collections": result.collections,
            }
            if result.started_at:
                data["started_at"] = str(result.started_at)
            if result.error:
                data["error"] = result.error
            if result.took_in_ms is not None:
                data["took_in_ms"] = result.took_in_ms
            if result.shard_status:
                # Nested mapping: collection -> shard -> progress summary.
                data["shard_status"] = {
                    collection: {
                        shard: {
                            "status": progress.status.value,
                            "objects_exported": progress.objects_exported,
                            **({"error": progress.error} if progress.error else {}),
                            **(
                                {"skip_reason": progress.skip_reason}
                                if progress.skip_reason
                                else {}
                            ),
                        }
                        for shard, progress in shards.items()
                    }
                    for collection, shards in result.shard_status.items()
                }
            click.echo(json.dumps(data, indent=2, default=str))
        else:
            click.echo(f"Export ID: {result.export_id}")
            click.echo(f"Backend: {result.backend}")
            click.echo(f"Path: {result.path}")
            click.echo(f"Status: {result.status.value}")
            if result.collections:
                click.echo(f"Collections: {', '.join(result.collections)}")
            if result.started_at:
                click.echo(f"Started at: {result.started_at}")
            if result.error:
                click.echo(f"Error: {result.error}")
            if result.took_in_ms is not None:
                click.echo(f"Took: {result.took_in_ms}ms")
            if result.shard_status:
                click.echo("Shard Status:")
                for collection, shards in result.shard_status.items():
                    click.echo(f"  {collection}:")
                    for shard, progress in shards.items():
                        status_line = f"    {shard}: {progress.status.value} ({progress.objects_exported} objects)"
                        if progress.error:
                            status_line += f" - Error: {progress.error}"
                        if progress.skip_reason:
                            status_line += f" - Skipped: {progress.skip_reason}"
                        click.echo(status_line)