diff --git a/.claude/skills/contributing-to-weaviate-cli/references/architecture.md b/.claude/skills/contributing-to-weaviate-cli/references/architecture.md index 3bde4fc..edd3430 100644 --- a/.claude/skills/contributing-to-weaviate-cli/references/architecture.md +++ b/.claude/skills/contributing-to-weaviate-cli/references/architecture.md @@ -81,6 +81,8 @@ class CollectionManager: self.client.collections.create(name=collection, ...) ``` +Manager files: `collection_manager.py`, `tenant_manager.py`, `data_manager.py`, `backup_manager.py`, `export_manager.py`, `role_manager.py`, `user_manager.py`, `node_manager.py`, `shard_manager.py`, `cluster_manager.py`, `alias_manager.py`, `benchmark_manager.py`, `config_manager.py` + Managers handle: - Input validation and error messages - Weaviate client API calls diff --git a/.claude/skills/operating-weaviate-cli/SKILL.md b/.claude/skills/operating-weaviate-cli/SKILL.md index 569097a..43806b3 100644 --- a/.claude/skills/operating-weaviate-cli/SKILL.md +++ b/.claude/skills/operating-weaviate-cli/SKILL.md @@ -113,13 +113,13 @@ weaviate-cli [--config-file FILE] [--user USER] [--json] [opti | Group | Description | |-------|-------------| -| `create` | Create collections, tenants, data, backups, roles, users, aliases, replications | -| `get` | Inspect collections, tenants, shards, backups, roles, users, nodes, aliases, replications | +| `create` | Create collections, tenants, data, backups, exports, roles, users, aliases, replications | +| `get` | Inspect collections, tenants, shards, backups, exports, roles, users, nodes, aliases, replications | | `update` | Update collections, tenants, shards, data, users, aliases | | `delete` | Delete collections, tenants, data, roles, users, aliases, replications | | `query` | Query data (fetch/vector/keyword/hybrid/uuid), replications, sharding state | | `restore` | Restore backups | -| `cancel` | Cancel backups and replications | +| `cancel` | Cancel backups, exports, and replications | | 
`assign` | Assign roles to users, permissions to roles | | `revoke` | Revoke roles from users, permissions from roles | | `benchmark` | Run QPS benchmarks | @@ -219,6 +219,25 @@ Backends: `s3`, `gcs`, `filesystem`. Options: `--include`, `--exclude`, `--wait` See [references/backups.md](references/backups.md). +### Collection Export + +```bash +weaviate-cli create export-collection --export_id my-export --backend s3 --file_format parquet --wait --json +weaviate-cli create export-collection --export_id my-export --backend s3 --include "Movies,Books" --json +weaviate-cli create export-collection --export_id my-export --backend s3 --exclude "TempData" --json +weaviate-cli create export-collection --export_id my-export --backend s3 --bucket my-bucket --path /exports --json +weaviate-cli get export-collection --export_id my-export --backend s3 --json +weaviate-cli cancel export-collection --export_id my-export --backend s3 --json +``` + +Backends: `filesystem`, `s3`, `gcs`, `azure`. File formats: `parquet`. + +Options: `--include`, `--exclude` (mutually exclusive), `--wait`, `--bucket`, `--path` + +**Prerequisite**: The export backend must be configured on the Weaviate cluster (e.g., `ENABLE_BACKUP=true` for S3 via MinIO in local-k8s). + +See [references/exports.md](references/exports.md). + ### RBAC (Roles, Users, Permissions) ```bash @@ -362,6 +381,13 @@ hot/active <--> cold/inactive 5. For timestamp-based TTL on existing collections: `--inverted_index timestamp` must be set at creation or already enabled 6. For property-based TTL: the date property must exist, be `date` type, and have filterable or rangeable index +### Collection Export Workflow +1. `create export-collection --backend s3 --export_id my-export --wait` -- create and wait for completion +2. `get export-collection --backend s3 --export_id my-export` -- check status (includes shard-level progress) +3. 
`cancel export-collection --backend s3 --export_id my-export` -- cancel in-progress export + +**Prerequisite**: The export backend must be configured on the cluster. For local-k8s, deploy with `ENABLE_BACKUP=true` to enable S3 via MinIO. + ### Alias Workflow 1. `create collection --collection Movies_v1` -- create the target collection 2. `create alias Movies Movies_v1` -- create alias pointing to collection @@ -416,6 +442,7 @@ When new commands or options are added to `weaviate-cli`: - [references/search.md](references/search.md) -- Search types, options, and selection guide - [references/tenants.md](references/tenants.md) -- Tenant state machine and management - [references/backups.md](references/backups.md) -- Backup/restore options and notes +- [references/exports.md](references/exports.md) -- Collection export options and notes - [references/rbac.md](references/rbac.md) -- Permission format, actions, and examples - [references/cluster.md](references/cluster.md) -- Nodes, shards, replication operations - [references/benchmark.md](references/benchmark.md) -- Benchmark options and output modes diff --git a/.claude/skills/operating-weaviate-cli/references/exports.md b/.claude/skills/operating-weaviate-cli/references/exports.md new file mode 100644 index 0000000..37985e7 --- /dev/null +++ b/.claude/skills/operating-weaviate-cli/references/exports.md @@ -0,0 +1,59 @@ +# Collection Export Reference + +Export collections from Weaviate to external storage backends in Parquet format. 
+ +## Create Export +```bash +weaviate-cli create export-collection --export_id my-export --backend s3 --file_format parquet --wait --json +weaviate-cli create export-collection --export_id my-export --backend s3 --include "Movies,Books" --json +weaviate-cli create export-collection --export_id my-export --backend gcs --exclude "TempData" --json +weaviate-cli create export-collection --export_id my-export --backend s3 --bucket my-bucket --path /exports --wait --json +``` + +## Check Export Status +```bash +weaviate-cli get export-collection --export_id my-export --backend s3 --json +``` + +Returns shard-level progress including objects exported per shard, errors, and timing. + +## Cancel Export +```bash +weaviate-cli cancel export-collection --export_id my-export --backend s3 --json +``` + +Only works while the export is in progress. Returns an error if the export has already completed. + +## Options + +**Create:** +- `--export_id` -- Export identifier (default: "test-export") +- `--backend` -- filesystem, s3, gcs, azure (default: filesystem) +- `--file_format` -- Export format: parquet (default: parquet) +- `--include` -- Comma-separated collections to include +- `--exclude` -- Comma-separated collections to exclude +- `--wait` -- Wait for completion +- `--bucket` -- Bucket name for cloud storage backends +- `--path` -- Path within the storage backend + +**Get Status:** +- `--export_id`, `--backend` -- Same as create +- `--bucket`, `--path` -- Optional, for locating the export + +**Cancel:** +- `--export_id`, `--backend` -- Same as create +- `--bucket`, `--path` -- Optional, for locating the export + +## Prerequisites + +1. The export backend must be configured on the Weaviate cluster +2. For local-k8s, deploy with `ENABLE_BACKUP=true` to enable S3 via MinIO +3. 
`--include` and `--exclude` are mutually exclusive + +## Notes + +- `--wait` blocks until the export completes (SUCCESS, FAILED, or CANCELED) +- Without `--wait`, the command returns immediately with status STARTED +- Poll progress with `get export-collection` to monitor shard-level status +- Export uses the same storage backends as backups (S3, GCS, Azure, filesystem) +- The `--bucket` defaults to the cluster's configured backup bucket if not specified diff --git a/requirements-dev.txt b/requirements-dev.txt index 3646e43..25915e4 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,4 +1,4 @@ -weaviate-client>=4.16.7 +weaviate-client @ git+https://github.com/weaviate/weaviate-python-client.git@export_collection click==8.1.7 twine pytest diff --git a/setup.cfg b/setup.cfg index bae19a6..0d0df9f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -37,7 +37,7 @@ classifiers = include_package_data = True python_requires = >=3.9 install_requires = - weaviate-client>=4.19.0 + weaviate-client @ git+https://github.com/weaviate/weaviate-python-client.git@export_collection click==8.1.7 semver>=3.0.2 numpy>=1.24.0 diff --git a/test/integration/test_export_integration.py b/test/integration/test_export_integration.py new file mode 100644 index 0000000..6aa4ed2 --- /dev/null +++ b/test/integration/test_export_integration.py @@ -0,0 +1,169 @@ +import json +import pytest +import weaviate +from weaviate_cli.managers.collection_manager import CollectionManager +from weaviate_cli.managers.config_manager import ConfigManager +from weaviate_cli.managers.data_manager import DataManager +from weaviate_cli.managers.export_manager import ExportManager + + +EXPORT_COLLECTION = "ExportTestCollection" + + +@pytest.fixture +def client() -> weaviate.WeaviateClient: + config = ConfigManager() + return config.get_client() + + +@pytest.fixture +def collection_manager(client: weaviate.WeaviateClient) -> CollectionManager: + return CollectionManager(client) + + +@pytest.fixture +def 
data_manager(client: weaviate.WeaviateClient) -> DataManager: + return DataManager(client) + + +@pytest.fixture +def export_manager(client: weaviate.WeaviateClient) -> ExportManager: + return ExportManager(client) + + +@pytest.fixture +def setup_collection(collection_manager, data_manager): + """Create a collection with data for export tests.""" + try: + collection_manager.create_collection( + collection=EXPORT_COLLECTION, + replication_factor=1, + vectorizer="none", + force_auto_schema=True, + ) + data_manager.create_data( + collection=EXPORT_COLLECTION, + limit=100, + randomize=True, + consistency_level="one", + ) + yield + finally: + if collection_manager.client.collections.exists(EXPORT_COLLECTION): + collection_manager.delete_collection(collection=EXPORT_COLLECTION) + + +def test_create_export_and_get_status( + export_manager: ExportManager, setup_collection, capsys +): + """Test creating an export and getting its status.""" + try: + # Create export with wait + export_manager.create_export( + export_id="integration-test-export", + backend="s3", + file_format="parquet", + include=EXPORT_COLLECTION, + wait=True, + json_output=False, + ) + + out = capsys.readouterr().out + assert "integration-test-export" in out + assert "created successfully" in out + + # Get status + export_manager.get_export_status( + export_id="integration-test-export", + backend="s3", + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert data["export_id"] == "integration-test-export" + assert data["status"] == "SUCCESS" + assert EXPORT_COLLECTION in data["collections"] + assert "shard_status" in data + except Exception: + raise + + +def test_create_export_json_output( + export_manager: ExportManager, setup_collection, capsys +): + """Test creating an export with JSON output.""" + export_manager.create_export( + export_id="integration-json-export", + backend="s3", + file_format="parquet", + wait=True, + json_output=True, + ) + + out = 
capsys.readouterr().out + data = json.loads(out) + assert data["status"] == "success" + assert data["export_id"] == "integration-json-export" + assert data["export_status"] == "SUCCESS" + + +def test_create_export_with_exclude( + export_manager: ExportManager, setup_collection, capsys +): + """Test creating an export with exclude filter.""" + export_manager.create_export( + export_id="integration-exclude-export", + backend="s3", + file_format="parquet", + exclude=EXPORT_COLLECTION, + wait=True, + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert data["status"] == "success" + assert EXPORT_COLLECTION not in data.get("collections", []) + + +def test_create_export_include_and_exclude_raises( + export_manager: ExportManager, setup_collection +): + """Test that specifying both include and exclude raises an error.""" + with pytest.raises(Exception) as exc_info: + export_manager.create_export( + export_id="should-fail", + backend="s3", + file_format="parquet", + include=EXPORT_COLLECTION, + exclude="OtherCollection", + ) + assert "include" in str(exc_info.value).lower() + assert "exclude" in str(exc_info.value).lower() + + +def test_cancel_export(export_manager: ExportManager, setup_collection, capsys): + """Test canceling an export.""" + # Create export without waiting + export_manager.create_export( + export_id="integration-cancel-export", + backend="s3", + file_format="parquet", + wait=False, + ) + capsys.readouterr() # Clear output + + # Try to cancel — may succeed or fail depending on timing + try: + export_manager.cancel_export( + export_id="integration-cancel-export", + backend="s3", + json_output=True, + ) + out = capsys.readouterr().out + data = json.loads(out) + assert data["status"] == "success" + except Exception: + # Export may have already finished — that's OK + pass diff --git a/test/unittests/test_managers/test_export_manager.py b/test/unittests/test_managers/test_export_manager.py new file mode 100644 index 
0000000..cfbf9b9 --- /dev/null +++ b/test/unittests/test_managers/test_export_manager.py @@ -0,0 +1,378 @@ +import json +import pytest +from unittest.mock import MagicMock +from datetime import datetime + +from weaviate_cli.managers.export_manager import ExportManager + + +@pytest.fixture +def mock_client_with_export(mock_client: MagicMock) -> MagicMock: + """Configure mock_client with sensible defaults for ExportManager tests.""" + mock_export = MagicMock() + + # Default create return + mock_create_return = MagicMock() + mock_create_return.export_id = "test-export" + mock_create_return.backend = "filesystem" + mock_create_return.path = "/exports/test-export" + mock_create_return.status = MagicMock(value="STARTED") + mock_create_return.started_at = None + mock_create_return.collections = ["Movies", "Books"] + mock_export.create.return_value = mock_create_return + + # Default get_status return + mock_status_return = MagicMock() + mock_status_return.export_id = "test-export" + mock_status_return.backend = "filesystem" + mock_status_return.path = "/exports/test-export" + mock_status_return.status = MagicMock(value="SUCCESS") + mock_status_return.started_at = None + mock_status_return.collections = ["Movies"] + mock_status_return.error = None + mock_status_return.took_in_ms = 1234 + mock_status_return.shard_status = None + mock_export.get_status.return_value = mock_status_return + + # Default cancel return + mock_export.cancel.return_value = True + + mock_client.export = mock_export + return mock_client + + +@pytest.fixture +def export_manager(mock_client_with_export: MagicMock) -> ExportManager: + return ExportManager(mock_client_with_export) + + +# --------------------------------------------------------------------------- +# create_export — validation +# --------------------------------------------------------------------------- + + +def test_create_export_include_and_exclude_raises( + export_manager: ExportManager, +) -> None: + """create_export raises when both 
include and exclude are specified.""" + with pytest.raises(Exception) as exc_info: + export_manager.create_export( + export_id="test", + backend="filesystem", + file_format="parquet", + include="Movies", + exclude="Books", + ) + + assert "include" in str(exc_info.value).lower() + assert "exclude" in str(exc_info.value).lower() + + +# --------------------------------------------------------------------------- +# create_export — success +# --------------------------------------------------------------------------- + + +def test_create_export_text_output(export_manager: ExportManager, capsys) -> None: + """create_export emits text success message.""" + export_manager.create_export( + export_id="my-export", + backend="filesystem", + file_format="parquet", + json_output=False, + ) + + out = capsys.readouterr().out + assert "my-export" in out + assert "created successfully" in out + + +def test_create_export_json_output(export_manager: ExportManager, capsys) -> None: + """create_export with json_output=True emits JSON with status=success.""" + export_manager.create_export( + export_id="my-export", + backend="filesystem", + file_format="parquet", + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert data["status"] == "success" + assert data["export_id"] == "test-export" + assert data["collections"] == ["Movies", "Books"] + + +# --------------------------------------------------------------------------- +# create_export — argument passing +# --------------------------------------------------------------------------- + + +def test_create_export_passes_correct_args_with_include( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """create_export passes include_collections as a list.""" + export_manager.create_export( + export_id="my-export", + backend="s3", + file_format="parquet", + include="Movies,Books", + ) + + mock_client_with_export.export.create.assert_called_once() + call_kwargs = 
mock_client_with_export.export.create.call_args.kwargs + assert call_kwargs["export_id"] == "my-export" + assert call_kwargs["include_collections"] == ["Movies", "Books"] + assert call_kwargs["exclude_collections"] is None + + +def test_create_export_passes_correct_args_with_exclude( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """create_export passes exclude_collections as a list.""" + export_manager.create_export( + export_id="my-export", + backend="filesystem", + file_format="parquet", + exclude="Movies", + ) + + call_kwargs = mock_client_with_export.export.create.call_args.kwargs + assert call_kwargs["include_collections"] is None + assert call_kwargs["exclude_collections"] == ["Movies"] + + +def test_create_export_passes_none_collections_when_not_specified( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """create_export passes None for both when neither is specified.""" + export_manager.create_export( + export_id="my-export", + backend="filesystem", + file_format="parquet", + ) + + call_kwargs = mock_client_with_export.export.create.call_args.kwargs + assert call_kwargs["include_collections"] is None + assert call_kwargs["exclude_collections"] is None + + +def test_create_export_passes_config_with_bucket_and_path( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """create_export passes ExportConfig when bucket/path are set.""" + export_manager.create_export( + export_id="my-export", + backend="s3", + file_format="parquet", + bucket="my-bucket", + path="/my/path", + ) + + call_kwargs = mock_client_with_export.export.create.call_args.kwargs + config = call_kwargs["config"] + assert config is not None + assert config.bucket == "my-bucket" + assert config.path == "/my/path" + + +def test_create_export_no_config_when_bucket_and_path_none( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """create_export passes config=None when 
bucket and path are not set.""" + export_manager.create_export( + export_id="my-export", + backend="filesystem", + file_format="parquet", + ) + + call_kwargs = mock_client_with_export.export.create.call_args.kwargs + assert call_kwargs["config"] is None + + +def test_create_export_with_wait( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """create_export passes wait_for_completion=True.""" + export_manager.create_export( + export_id="my-export", + backend="filesystem", + file_format="parquet", + wait=True, + ) + + call_kwargs = mock_client_with_export.export.create.call_args.kwargs + assert call_kwargs["wait_for_completion"] is True + + +# --------------------------------------------------------------------------- +# get_export_status — success +# --------------------------------------------------------------------------- + + +def test_get_export_status_text_output(export_manager: ExportManager, capsys) -> None: + """get_export_status emits text output.""" + export_manager.get_export_status( + export_id="my-export", + backend="filesystem", + json_output=False, + ) + + out = capsys.readouterr().out + assert "test-export" in out + assert "SUCCESS" in out + assert "1234" in out + + +def test_get_export_status_json_output(export_manager: ExportManager, capsys) -> None: + """get_export_status with json_output=True emits JSON.""" + export_manager.get_export_status( + export_id="my-export", + backend="filesystem", + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert data["export_id"] == "test-export" + assert data["status"] == "SUCCESS" + assert data["took_in_ms"] == 1234 + + +def test_get_export_status_passes_correct_args( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """get_export_status passes correct args to client.""" + export_manager.get_export_status( + export_id="my-export", + backend="s3", + bucket="my-bucket", + path="/my/path", + ) + + 
mock_client_with_export.export.get_status.assert_called_once() + call_kwargs = mock_client_with_export.export.get_status.call_args.kwargs + assert call_kwargs["export_id"] == "my-export" + assert call_kwargs["bucket"] == "my-bucket" + assert call_kwargs["path"] == "/my/path" + + +def test_get_export_status_with_shard_status_json( + export_manager: ExportManager, mock_client_with_export: MagicMock, capsys +) -> None: + """get_export_status includes shard_status in JSON output when present.""" + mock_shard_progress = MagicMock() + mock_shard_progress.status = MagicMock(value="SUCCESS") + mock_shard_progress.objects_exported = 500 + mock_shard_progress.error = None + mock_shard_progress.skip_reason = None + + mock_status = mock_client_with_export.export.get_status.return_value + mock_status.shard_status = {"Movies": {"shard1": mock_shard_progress}} + + export_manager.get_export_status( + export_id="my-export", + backend="filesystem", + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert "shard_status" in data + assert data["shard_status"]["Movies"]["shard1"]["status"] == "SUCCESS" + assert data["shard_status"]["Movies"]["shard1"]["objects_exported"] == 500 + + +def test_get_export_status_with_error_json( + export_manager: ExportManager, mock_client_with_export: MagicMock, capsys +) -> None: + """get_export_status includes error in JSON output when present.""" + mock_status = mock_client_with_export.export.get_status.return_value + mock_status.status = MagicMock(value="FAILED") + mock_status.error = "Something went wrong" + + export_manager.get_export_status( + export_id="my-export", + backend="filesystem", + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert data["status"] == "FAILED" + assert data["error"] == "Something went wrong" + + +# --------------------------------------------------------------------------- +# cancel_export — success +# 
--------------------------------------------------------------------------- + + +def test_cancel_export_success_text_output( + export_manager: ExportManager, capsys +) -> None: + """cancel_export when successful emits text success message.""" + export_manager.cancel_export( + export_id="my-export", + backend="filesystem", + json_output=False, + ) + + out = capsys.readouterr().out + assert "my-export" in out + assert "canceled successfully" in out + + +def test_cancel_export_success_json_output( + export_manager: ExportManager, capsys +) -> None: + """cancel_export when successful and json_output=True emits JSON.""" + export_manager.cancel_export( + export_id="my-export", + backend="filesystem", + json_output=True, + ) + + out = capsys.readouterr().out + data = json.loads(out) + assert data["status"] == "success" + assert "my-export" in data["message"] + + +def test_cancel_export_passes_correct_args( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """cancel_export passes correct args to client.""" + export_manager.cancel_export( + export_id="my-export", + backend="gcs", + bucket="my-bucket", + path="/my/path", + ) + + mock_client_with_export.export.cancel.assert_called_once() + call_kwargs = mock_client_with_export.export.cancel.call_args.kwargs + assert call_kwargs["export_id"] == "my-export" + assert call_kwargs["bucket"] == "my-bucket" + assert call_kwargs["path"] == "/my/path" + + +# --------------------------------------------------------------------------- +# cancel_export — failure +# --------------------------------------------------------------------------- + + +def test_cancel_export_failure_raises( + export_manager: ExportManager, mock_client_with_export: MagicMock +) -> None: + """cancel_export when client returns False raises an exception.""" + mock_client_with_export.export.cancel.return_value = False + + with pytest.raises(Exception) as exc_info: + export_manager.cancel_export( + export_id="my-export", + 
backend="filesystem", + ) + + assert "my-export" in str(exc_info.value) + assert "could not be canceled" in str(exc_info.value) diff --git a/weaviate_cli/commands/cancel.py b/weaviate_cli/commands/cancel.py index 649768a..7234c14 100644 --- a/weaviate_cli/commands/cancel.py +++ b/weaviate_cli/commands/cancel.py @@ -1,10 +1,12 @@ import json import click import sys +from typing import Optional from weaviate_cli.utils import get_client_from_context from weaviate_cli.managers.backup_manager import BackupManager from weaviate_cli.managers.cluster_manager import ClusterManager -from weaviate_cli.defaults import CancelBackupDefaults +from weaviate_cli.managers.export_manager import ExportManager +from weaviate_cli.defaults import CancelBackupDefaults, CancelExportCollectionDefaults # Create Group @@ -85,3 +87,59 @@ def cancel_replication_cli(ctx: click.Context, op_id: str, json_output: bool) -> finally: if client: client.close() + + +@cancel.command("export-collection") +@click.option( + "--export_id", + default=CancelExportCollectionDefaults.export_id, + help=f"Identifier for the export (default: {CancelExportCollectionDefaults.export_id}).", +) +@click.option( + "--backend", + default=CancelExportCollectionDefaults.backend, + type=click.Choice(["filesystem", "s3", "gcs", "azure"]), + help=f"The backend used for storing the export (default: {CancelExportCollectionDefaults.backend}).", +) +@click.option( + "--bucket", + default=CancelExportCollectionDefaults.bucket, + help="Bucket name for cloud storage backends.", +) +@click.option( + "--path", + default=CancelExportCollectionDefaults.path, + help="Path within the storage backend.", +) +@click.option( + "--json", "json_output", is_flag=True, default=False, help="Output in JSON format." 
+) +@click.pass_context +def cancel_export_collection_cli( + ctx: click.Context, + export_id: str, + backend: str, + bucket: Optional[str], + path: Optional[str], + json_output: bool, +) -> None: + """Cancel a collection export in Weaviate.""" + client = None + try: + client = get_client_from_context(ctx) + export_manager = ExportManager(client) + export_manager.cancel_export( + export_id=export_id, + backend=backend, + bucket=bucket, + path=path, + json_output=json_output, + ) + except Exception as e: + click.echo(f"Error: {e}") + if client: + client.close() + sys.exit(1) + finally: + if client: + client.close() diff --git a/weaviate_cli/commands/create.py b/weaviate_cli/commands/create.py index d47d60b..d927dfc 100644 --- a/weaviate_cli/commands/create.py +++ b/weaviate_cli/commands/create.py @@ -11,6 +11,7 @@ ) from weaviate_cli.managers.alias_manager import AliasManager from weaviate_cli.managers.backup_manager import BackupManager +from weaviate_cli.managers.export_manager import ExportManager from weaviate_cli.utils import get_client_from_context, get_async_client_from_context from weaviate_cli.managers.collection_manager import CollectionManager from weaviate_cli.managers.tenant_manager import TenantManager @@ -22,6 +23,7 @@ from weaviate_cli.defaults import ( CreateBackupDefaults, CreateCollectionDefaults, + CreateExportCollectionDefaults, CreateTenantsDefaults, CreateDataDefaults, CreateRoleDefaults, @@ -822,3 +824,88 @@ def create_replication_cli( finally: if client: client.close() + + +@create.command("export-collection") +@click.option( + "--export_id", + default=CreateExportCollectionDefaults.export_id, + help=f"Identifier for the export (default: {CreateExportCollectionDefaults.export_id}).", +) +@click.option( + "--backend", + default=CreateExportCollectionDefaults.backend, + type=click.Choice(["filesystem", "s3", "gcs", "azure"]), + help=f"The backend used for storing the export (default: {CreateExportCollectionDefaults.backend}).", +) 
+@click.option( + "--file_format", + default=CreateExportCollectionDefaults.file_format, + type=click.Choice(["parquet"]), + help=f"The file format for the export (default: {CreateExportCollectionDefaults.file_format}).", +) +@click.option( + "--include", + default=CreateExportCollectionDefaults.include, + help="Comma separated list of collections to include in the export.", +) +@click.option( + "--exclude", + default=CreateExportCollectionDefaults.exclude, + help="Comma separated list of collections to exclude from the export.", +) +@click.option( + "--wait", + is_flag=True, + help="Wait for the export to complete before returning.", +) +@click.option( + "--bucket", + default=CreateExportCollectionDefaults.bucket, + help="Bucket name for cloud storage backends.", +) +@click.option( + "--path", + default=CreateExportCollectionDefaults.path, + help="Path within the storage backend.", +) +@click.option( + "--json", "json_output", is_flag=True, default=False, help="Output in JSON format." +) +@click.pass_context +def create_export_collection_cli( + ctx: click.Context, + export_id: str, + backend: str, + file_format: str, + include: Optional[str], + exclude: Optional[str], + wait: bool, + bucket: Optional[str], + path: Optional[str], + json_output: bool, +) -> None: + """Create a collection export in Weaviate.""" + client: Optional[WeaviateClient] = None + try: + client = get_client_from_context(ctx) + export_manager = ExportManager(client) + export_manager.create_export( + export_id=export_id, + backend=backend, + file_format=file_format, + include=include, + exclude=exclude, + wait=wait, + bucket=bucket, + path=path, + json_output=json_output, + ) + except Exception as e: + click.echo(f"Error: {e}") + if client: + client.close() + sys.exit(1) + finally: + if client: + client.close() diff --git a/weaviate_cli/commands/get.py b/weaviate_cli/commands/get.py index 3f0a057..86f7bf9 100644 --- a/weaviate_cli/commands/get.py +++ b/weaviate_cli/commands/get.py @@ -8,6 +8,7 
@dataclass
class CreateExportCollectionDefaults:
    # Default option values for the `create export-collection` command.
    export_id: str = "test-export"
    backend: str = "filesystem"
    file_format: str = "parquet"
    include: Optional[str] = None
    exclude: Optional[str] = None
    wait: bool = False
    bucket: Optional[str] = None
    path: Optional[str] = None


@dataclass
class GetExportCollectionDefaults:
    # Default option values for the `get export-collection` command.
    export_id: str = "test-export"
    backend: str = "filesystem"
    bucket: Optional[str] = None
    path: Optional[str] = None


@dataclass
class CancelExportCollectionDefaults:
    # Default option values for the `cancel export-collection` command.
    export_id: str = "test-export"
    backend: str = "filesystem"
    bucket: Optional[str] = None
    path: Optional[str] = None


@get.command("export-collection")
@click.option(
    "--export_id",
    default=GetExportCollectionDefaults.export_id,
    help=f"Identifier for the export (default: {GetExportCollectionDefaults.export_id}).",
)
@click.option(
    "--backend",
    default=GetExportCollectionDefaults.backend,
    type=click.Choice(["filesystem", "s3", "gcs", "azure"]),
    help=f"The backend used for storing the export (default: {GetExportCollectionDefaults.backend}).",
)
@click.option(
    "--bucket",
    default=GetExportCollectionDefaults.bucket,
    help="Bucket name for cloud storage backends.",
)
@click.option(
    "--path",
    default=GetExportCollectionDefaults.path,
    help="Path within the storage backend.",
)
@click.option(
    "--json", "json_output", is_flag=True, default=False, help="Output in JSON format."
)
@click.pass_context
def get_export_collection_cli(
    ctx: click.Context,
    export_id: str,
    backend: str,
    bucket: Optional[str],
    path: Optional[str],
    json_output: bool,
) -> None:
    """Get the status of a collection export in Weaviate.

    Delegates to ExportManager.get_export_status. On any failure the error
    is echoed and the process exits with status 1; the client connection is
    closed exactly once via the finally block on every path.
    """
    # Annotated like the sibling create command for consistency.
    client: Optional[WeaviateClient] = None
    try:
        client = get_client_from_context(ctx)
        export_manager = ExportManager(client)
        export_manager.get_export_status(
            export_id=export_id,
            backend=backend,
            bucket=bucket,
            path=path,
            json_output=json_output,
        )
    except Exception as e:
        click.echo(f"Error: {e}")
        # No client.close() here: the finally block below still runs after
        # sys.exit (SystemExit) and performs the single close.
        sys.exit(1)
    finally:
        if client:
            client.close()
from weaviate_cli.defaults import (
    CreateExportCollectionDefaults,
    GetExportCollectionDefaults,
    CancelExportCollectionDefaults,
)


# CLI option value -> weaviate client enum, for the supported storage backends.
BACKEND_MAP = {
    "filesystem": ExportStorage.FILESYSTEM,
    "s3": ExportStorage.S3,
    "gcs": ExportStorage.GCS,
    "azure": ExportStorage.AZURE,
}

# CLI option value -> weaviate client enum, for the supported file formats.
FILE_FORMAT_MAP = {
    "parquet": ExportFileFormat.PARQUET,
}


class ExportManager:
    """Manager for collection exports: create, query status, and cancel.

    Thin layer between the CLI commands and the weaviate client's export
    API; handles input validation and output formatting (plain or JSON).
    """

    def __init__(self, client: WeaviateClient) -> None:
        self.client: WeaviateClient = client

    @staticmethod
    def _resolve_backend(backend: str) -> ExportStorage:
        """Map a backend name to its ExportStorage enum value.

        Raises a descriptive Exception (instead of a bare KeyError) when the
        manager is called programmatically with a value that click's Choice
        validation never saw.
        """
        try:
            return BACKEND_MAP[backend]
        except KeyError:
            raise Exception(
                f"Unsupported backend '{backend}'. Choose one of: {', '.join(BACKEND_MAP)}."
            )

    @staticmethod
    def _resolve_file_format(file_format: str) -> ExportFileFormat:
        """Map a file-format name to its ExportFileFormat enum value.

        Raises a descriptive Exception instead of a bare KeyError for
        unsupported values.
        """
        try:
            return FILE_FORMAT_MAP[file_format]
        except KeyError:
            raise Exception(
                f"Unsupported file format '{file_format}'. Choose one of: {', '.join(FILE_FORMAT_MAP)}."
            )

    def create_export(
        self,
        export_id: str = CreateExportCollectionDefaults.export_id,
        backend: str = CreateExportCollectionDefaults.backend,
        file_format: str = CreateExportCollectionDefaults.file_format,
        include: Optional[str] = CreateExportCollectionDefaults.include,
        exclude: Optional[str] = CreateExportCollectionDefaults.exclude,
        wait: bool = CreateExportCollectionDefaults.wait,
        bucket: Optional[str] = CreateExportCollectionDefaults.bucket,
        path: Optional[str] = CreateExportCollectionDefaults.path,
        json_output: bool = False,
    ) -> None:
        """Create a collection export and print the outcome.

        Args:
            export_id: Identifier for the export.
            backend: Storage backend name (key of BACKEND_MAP).
            file_format: Export file format name (key of FILE_FORMAT_MAP).
            include: Comma-separated collection names to include.
            exclude: Comma-separated collection names to exclude.
            wait: Block until the export completes.
            bucket: Optional bucket for cloud backends.
            path: Optional path within the backend.
            json_output: Emit JSON instead of plain text.

        Raises:
            Exception: if both include and exclude are given, or the
                backend/file format is unsupported.
        """
        if include and exclude:
            raise Exception(
                "Cannot specify both --include and --exclude. Use one or the other."
            )

        backend_enum = self._resolve_backend(backend)
        file_format_enum = self._resolve_file_format(file_format)

        # Only build an ExportConfig when the caller overrides bucket/path;
        # otherwise let the client use its defaults.
        config = None
        if bucket or path:
            config = ExportConfig(bucket=bucket, path=path)

        # Split comma-separated lists, dropping empty fragments from inputs
        # like "A,,B" or trailing commas.
        include_collections = (
            [c.strip() for c in include.split(",") if c.strip()] if include else None
        )
        exclude_collections = (
            [c.strip() for c in exclude.split(",") if c.strip()] if exclude else None
        )

        result = self.client.export.create(
            export_id=export_id,
            backend=backend_enum,
            file_format=file_format_enum,
            include_collections=include_collections,
            exclude_collections=exclude_collections,
            wait_for_completion=wait,
            config=config,
        )

        if json_output:
            data = {
                "status": "success",
                "export_id": result.export_id,
                "backend": result.backend,
                "path": result.path,
                # "status" already reports command success, so the export's
                # own state lives under a separate key.
                "export_status": result.status.value,
                "collections": result.collections,
            }
            if result.started_at:
                data["started_at"] = str(result.started_at)
            click.echo(json.dumps(data, indent=2, default=str))
        else:
            click.echo(
                f"Export '{export_id}' created successfully with status '{result.status.value}'."
            )
            if result.collections:
                click.echo(f"Collections: {', '.join(result.collections)}")

    def get_export_status(
        self,
        export_id: str = GetExportCollectionDefaults.export_id,
        backend: str = GetExportCollectionDefaults.backend,
        bucket: Optional[str] = GetExportCollectionDefaults.bucket,
        path: Optional[str] = GetExportCollectionDefaults.path,
        json_output: bool = False,
    ) -> None:
        """Fetch and print the status of an existing export.

        Raises:
            Exception: if the backend name is unsupported.
        """
        backend_enum = self._resolve_backend(backend)

        result = self.client.export.get_status(
            export_id=export_id,
            backend=backend_enum,
            bucket=bucket,
            path=path,
        )

        self._print_export_status(result, json_output=json_output)

    def cancel_export(
        self,
        export_id: str = CancelExportCollectionDefaults.export_id,
        backend: str = CancelExportCollectionDefaults.backend,
        bucket: Optional[str] = CancelExportCollectionDefaults.bucket,
        path: Optional[str] = CancelExportCollectionDefaults.path,
        json_output: bool = False,
    ) -> None:
        """Cancel a running export and print the outcome.

        Raises:
            Exception: if the backend name is unsupported, or the client
                reports the export could not be canceled.
        """
        backend_enum = self._resolve_backend(backend)

        success = self.client.export.cancel(
            export_id=export_id,
            backend=backend_enum,
            bucket=bucket,
            path=path,
        )

        if success:
            if json_output:
                click.echo(
                    json.dumps(
                        {
                            "status": "success",
                            "message": f"Export '{export_id}' canceled successfully.",
                        },
                        indent=2,
                    )
                )
            else:
                click.echo(f"Export '{export_id}' canceled successfully.")
        else:
            raise Exception(f"Export '{export_id}' could not be canceled.")

    def _print_export_status(
        self, result: ExportStatusReturn, json_output: bool = False
    ) -> None:
        """Render an ExportStatusReturn as JSON or human-readable text.

        Optional fields (started_at, error, took_in_ms, shard_status) are
        emitted only when present, so the output stays compact.
        """
        if json_output:
            data = {
                "export_id": result.export_id,
                "backend": result.backend,
                "path": result.path,
                "status": result.status.value,
                "collections": result.collections,
            }
            if result.started_at:
                data["started_at"] = str(result.started_at)
            if result.error:
                data["error"] = result.error
            if result.took_in_ms is not None:
                data["took_in_ms"] = result.took_in_ms
            if result.shard_status:
                # Nested mapping: collection -> shard -> per-shard progress;
                # error/skip_reason keys appear only when set.
                data["shard_status"] = {
                    collection: {
                        shard: {
                            "status": progress.status.value,
                            "objects_exported": progress.objects_exported,
                            **({"error": progress.error} if progress.error else {}),
                            **(
                                {"skip_reason": progress.skip_reason}
                                if progress.skip_reason
                                else {}
                            ),
                        }
                        for shard, progress in shards.items()
                    }
                    for collection, shards in result.shard_status.items()
                }
            click.echo(json.dumps(data, indent=2, default=str))
        else:
            click.echo(f"Export ID: {result.export_id}")
            click.echo(f"Backend: {result.backend}")
            click.echo(f"Path: {result.path}")
            click.echo(f"Status: {result.status.value}")
            if result.collections:
                click.echo(f"Collections: {', '.join(result.collections)}")
            if result.started_at:
                click.echo(f"Started at: {result.started_at}")
            if result.error:
                click.echo(f"Error: {result.error}")
            if result.took_in_ms is not None:
                click.echo(f"Took: {result.took_in_ms}ms")
            if result.shard_status:
                click.echo("Shard Status:")
                for collection, shards in result.shard_status.items():
                    click.echo(f"  {collection}:")
                    for shard, progress in shards.items():
                        status_line = f"    {shard}: {progress.status.value} ({progress.objects_exported} objects)"
                        if progress.error:
                            status_line += f" - Error: {progress.error}"
                        if progress.skip_reason:
                            status_line += f" - Skipped: {progress.skip_reason}"
                        click.echo(status_line)