diff --git a/.claude/skills/operating-weaviate-cli/SKILL.md b/.claude/skills/operating-weaviate-cli/SKILL.md
index 569097a..bc9d38f 100644
--- a/.claude/skills/operating-weaviate-cli/SKILL.md
+++ b/.claude/skills/operating-weaviate-cli/SKILL.md
@@ -139,9 +139,26 @@ weaviate-cli delete collection --collection MyCollection --json
 weaviate-cli delete collection --all --json
 ```
 
-Key create options: `--multitenant`, `--auto_tenant_creation`, `--auto_tenant_activation`, `--shards N`, `--vectorizer `, `--named_vector`, `--replication_deletion_strategy`, `--object_ttl_type`, `--object_ttl_time`, `--object_ttl_filter_expired`, `--object_ttl_property_name` (only when `object_ttl_type=property`)
+Key create options: `--multitenant`, `--auto_tenant_creation`, `--auto_tenant_activation`, `--shards N`, `--vectorizer `, `--named_vector`, `--replication_deletion_strategy`, `--async_replication_config key=value` (repeatable; requires `--async_enabled` and Weaviate >= v1.34.18), `--object_ttl_type`, `--object_ttl_time`, `--object_ttl_filter_expired`, `--object_ttl_property_name` (only when `object_ttl_type=property`)
 
-Mutable fields: `--async_enabled`, `--replication_factor`, `--vector_index`, `--description`, `--training_limit`, `--auto_tenant_creation`, `--auto_tenant_activation`, `--replication_deletion_strategy`, `--object_ttl_type`, `--object_ttl_time`, `--object_ttl_filter_expired`, `--object_ttl_property_name` (only when `object_ttl_type=property`)
+Mutable fields: `--async_enabled`, `--replication_factor`, `--vector_index`, `--description`, `--training_limit`, `--auto_tenant_creation`, `--auto_tenant_activation`, `--replication_deletion_strategy`, `--async_replication_config key=value` (repeatable), `--object_ttl_type`, `--object_ttl_time`, `--object_ttl_filter_expired`, `--object_ttl_property_name` (only when `object_ttl_type=property`)
+
+#### Async Replication Config
+
+```bash
+# Create with async replication tuning (requires --async_enabled and Weaviate >= v1.34.18)
+weaviate-cli create collection --collection MyCol --async_enabled \
+  --async_replication_config max_workers=10 \
+  --async_replication_config frequency=60 \
+  --async_replication_config propagation_concurrency=4 --json
+
+# Update async replication config on existing collection
+weaviate-cli update collection --collection MyCol \
+  --async_replication_config max_workers=20 \
+  --async_replication_config propagation_batch_size=100 --json
+```
+
+Valid keys (all integers): `max_workers`, `hashtree_height`, `frequency`, `frequency_while_propagating`, `alive_nodes_checking_frequency`, `logging_frequency`, `diff_batch_size`, `diff_per_node_timeout`, `pre_propagation_timeout`, `propagation_timeout`, `propagation_limit`, `propagation_delay`, `propagation_concurrency`, `propagation_batch_size`
 
 #### Object TTL
 
diff --git a/.claude/skills/operating-weaviate-cli/references/collections.md b/.claude/skills/operating-weaviate-cli/references/collections.md
index 48909b2..ead530d 100644
--- a/.claude/skills/operating-weaviate-cli/references/collections.md
+++ b/.claude/skills/operating-weaviate-cli/references/collections.md
@@ -43,6 +43,20 @@ weaviate-cli create collection \
 - `--object_ttl_time` -- Time to live in seconds (default: None, TTL disabled when omitted)
 - `--object_ttl_filter_expired` -- Filter expired-but-not-yet-deleted objects from queries
 - `--object_ttl_property_name` -- Date property name for TTL when `object_ttl_type=property` (default: "releaseDate"). **Only valid when `--object_ttl_type=property`**; rejected otherwise.
+- `--async_replication_config` -- Async replication tuning as `key=value` pairs (repeatable). Valid keys: `max_workers`, `hashtree_height`, `frequency`, `frequency_while_propagating`, `alive_nodes_checking_frequency`, `logging_frequency`, `diff_batch_size`, `diff_per_node_timeout`, `pre_propagation_timeout`, `propagation_timeout`, `propagation_limit`, `propagation_delay`, `propagation_concurrency`, `propagation_batch_size`. All values must be integers. Use `reset` to revert all to server defaults. Requires `--async_enabled` on create and Weaviate >= v1.34.18.
+
+**Async replication config examples:**
+```bash
+# Create with custom async replication tuning
+weaviate-cli create collection --collection MyCol --async_enabled \
+  --async_replication_config max_workers=10 \
+  --async_replication_config frequency=60 \
+  --async_replication_config propagation_concurrency=4
+
+# Set a single parameter
+weaviate-cli create collection --collection MyCol --async_enabled \
+  --async_replication_config propagation_batch_size=200
+```
 
 **Object TTL examples:**
 ```bash
@@ -68,10 +82,18 @@ weaviate-cli update collection \
   --json
 ```
 
-Mutable fields: `--async_enabled`, `--replication_factor`, `--vector_index`, `--description`, `--training_limit`, `--auto_tenant_creation`, `--auto_tenant_activation`, `--replication_deletion_strategy`, `--object_ttl_type`, `--object_ttl_time`, `--object_ttl_filter_expired`, `--object_ttl_property_name` (only when `object_ttl_type=property`)
+Mutable fields: `--async_enabled`, `--replication_factor`, `--vector_index`, `--description`, `--training_limit`, `--auto_tenant_creation`, `--auto_tenant_activation`, `--replication_deletion_strategy`, `--async_replication_config`, `--object_ttl_type`, `--object_ttl_time`, `--object_ttl_filter_expired`, `--object_ttl_property_name` (only when `object_ttl_type=property`)
 
 **Immutable (cannot change after creation):** multitenant, vectorizer, named_vector, shards
 
+**Async replication config examples (update):**
+```bash
+# Update async replication tuning on existing collection
+weaviate-cli update collection --collection MyCol \
+  --async_replication_config max_workers=20 \
+  --async_replication_config propagation_batch_size=100
+```
+
 **Object TTL options for update:**
 - `--object_ttl_type` -- TTL event type: create, update, property, **disable** (default: "create")
 - `--object_ttl_time` -- Time to live in seconds (set together with type to enable TTL)
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 64cd126..177c26a 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,4 +1,4 @@
-weaviate-client>=4.20.4
+weaviate-client @ git+https://github.com/weaviate/weaviate-python-client.git@jose/fix-async-repl-config-get
 click==8.1.7
 twine
 pytest
diff --git a/setup.cfg b/setup.cfg
index c7e11bf..2bebd7b 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -37,7 +37,7 @@ classifiers =
 include_package_data = True
 python_requires = >=3.9
 install_requires =
-    weaviate-client>=4.20.4
+    weaviate-client @ git+https://github.com/weaviate/weaviate-python-client.git@main
     click==8.1.7
     semver>=3.0.2
     numpy>=1.24.0
diff --git a/test/unittests/test_managers/test_collection_manager.py b/test/unittests/test_managers/test_collection_manager.py
index 60c8c3e..877a042 100644
--- a/test/unittests/test_managers/test_collection_manager.py
+++ b/test/unittests/test_managers/test_collection_manager.py
@@ -777,3 +777,163 @@ def test_update_collection_with_ttl_disable_type(mock_client, mock_wvc_object_tt
     assert update_call_kwargs["object_ttl_config"] is not None
     # Verify the disable method was called
     mock_wvc_object_ttl["reconfigure"].disable.assert_called_once()
+
+
+def test_create_collection_with_async_replication_config(
+    mock_client, mock_wvc_object_ttl
+):
+    """Test creating a collection with async replication config parameters."""
+    mock_collections = MagicMock()
+    mock_client.collections = mock_collections
+    mock_collections.exists.side_effect = [False, True]
+    mock_client.get_meta.return_value = {"version": "1.36.0"}
+
+    manager = CollectionManager(mock_client)
+
+    async_config = {"max_workers": 10, "frequency": 60, "propagation_concurrency": 4}
+
+    manager.create_collection(
+        collection="TestCollection",
+        replication_factor=3,
+        vector_index="hnsw",
+        async_enabled=True,
+        async_replication_config=async_config,
+    )
+
+    mock_collections.create.assert_called_once()
+    create_call_kwargs = mock_collections.create.call_args.kwargs
+    assert create_call_kwargs["name"] == "TestCollection"
+    assert create_call_kwargs["replication_config"].asyncEnabled is True
+    # Verify async_config is set on the replication config
+    repl_config = create_call_kwargs["replication_config"]
+    assert repl_config.asyncConfig is not None
+    assert repl_config.asyncConfig.maxWorkers == 10
+    assert repl_config.asyncConfig.frequency == 60
+    assert repl_config.asyncConfig.propagationConcurrency == 4
+
+
+def test_create_collection_without_async_replication_config(
+    mock_client, mock_wvc_object_ttl
+):
+    """Test creating a collection without async replication config passes None."""
+    mock_collections = MagicMock()
+    mock_client.collections = mock_collections
+    mock_collections.exists.side_effect = [False, True]
+
+    manager = CollectionManager(mock_client)
+
+    manager.create_collection(
+        collection="TestCollection",
+        replication_factor=3,
+        vector_index="hnsw",
+        async_enabled=True,
+    )
+
+    mock_collections.create.assert_called_once()
+    repl_config = mock_collections.create.call_args.kwargs["replication_config"]
+    assert repl_config.asyncConfig is None
+
+
+def test_create_collection_async_replication_config_requires_async_enabled(
+    mock_client,
+):
+    """Test that async_replication_config is rejected when async_enabled is False."""
+    mock_collections = MagicMock()
+    mock_client.collections = mock_collections
+    mock_collections.exists.return_value = False
+
+    manager = CollectionManager(mock_client)
+
+    with pytest.raises(Exception, match="requires --async_enabled"):
+        manager.create_collection(
+            collection="TestCollection",
+            replication_factor=3,
+            vector_index="hnsw",
+            async_enabled=False,
+            async_replication_config={"max_workers": 10},
+        )
+
+    mock_collections.create.assert_not_called()
+
+
+def test_update_collection_with_async_replication_config(
+    mock_client, mock_wvc_object_ttl
+):
+    """Test updating a collection with async replication config parameters."""
+    mock_collections = MagicMock()
+    mock_client.collections = mock_collections
+    mock_client.collections.exists.side_effect = [True, True]
+    mock_client.get_meta.return_value = {"version": "1.36.0"}
+
+    mock_collection = MagicMock()
+    mock_client.collections.get.return_value = mock_collection
+    mock_collection.config.get.return_value = MagicMock(
+        replication_config=MagicMock(factor=3),
+        multi_tenancy_config=MagicMock(
+            enabled=False, auto_tenant_creation=False, auto_tenant_activation=False
+        ),
+    )
+
+    manager = CollectionManager(mock_client)
+
+    async_config = {"max_workers": 20, "propagation_batch_size": 100}
+
+    manager.update_collection(
+        collection="TestCollection",
+        async_replication_config=async_config,
+    )
+
+    mock_collection.config.update.assert_called_once()
+    repl_config = mock_collection.config.update.call_args.kwargs["replication_config"]
+    assert repl_config.asyncConfig is not None
+    assert repl_config.asyncConfig.maxWorkers == 20
+    assert repl_config.asyncConfig.propagationBatchSize == 100
+
+
+def test_update_collection_without_async_replication_config(
+    mock_client, mock_wvc_object_ttl
+):
+    """Test updating a collection without async replication config passes None."""
+    mock_collections = MagicMock()
+    mock_client.collections = mock_collections
+    mock_client.collections.exists.side_effect = [True, True]
+
+    mock_collection = MagicMock()
+    mock_client.collections.get.return_value = mock_collection
+    mock_collection.config.get.return_value = MagicMock(
+        replication_config=MagicMock(factor=3),
+        multi_tenancy_config=MagicMock(
+            enabled=False, auto_tenant_creation=False, auto_tenant_activation=False
+        ),
+    )
+
+    manager = CollectionManager(mock_client)
+
+    manager.update_collection(
+        collection="TestCollection",
+        description="Updated",
+    )
+
+    mock_collection.config.update.assert_called_once()
+    repl_config = mock_collection.config.update.call_args.kwargs["replication_config"]
+    assert repl_config.asyncConfig is None
+
+
+def test_update_collection_async_replication_config_rejected_when_async_false(
+    mock_client,
+):
+    """Test that async_replication_config is rejected when async_enabled is explicitly False."""
+    mock_collections = MagicMock()
+    mock_client.collections = mock_collections
+    mock_client.collections.exists.return_value = True
+
+    manager = CollectionManager(mock_client)
+
+    with pytest.raises(Exception, match="cannot be used when --async_enabled is False"):
+        manager.update_collection(
+            collection="TestCollection",
+            async_enabled=False,
+            async_replication_config={"max_workers": 10},
+        )
+
+    mock_collections.get.return_value.config.update.assert_not_called()
diff --git a/test/unittests/test_utils.py b/test/unittests/test_utils.py
index 02f9f38..1d14def 100644
--- a/test/unittests/test_utils.py
+++ b/test/unittests/test_utils.py
@@ -5,6 +5,7 @@
     get_random_string,
     pp_objects,
     parse_permission,
+    parse_async_replication_config,
 )
 from weaviate.collections import Collection
 from io import StringIO
@@ -386,3 +387,77 @@ def test_parse_permission_invalid():
 
     with pytest.raises(ValueError, match="Invalid permission action: update_nodes"):
         parse_permission("update_nodes")
+
+
+def test_parse_async_replication_config_none():
+    assert parse_async_replication_config(None) is None
+
+
+def test_parse_async_replication_config_empty():
+    assert parse_async_replication_config(()) is None
+
+
+def test_parse_async_replication_config_single_key():
+    result = parse_async_replication_config(("max_workers=10",))
+    assert result == {"max_workers": 10}
+
+
+def test_parse_async_replication_config_multiple_keys():
+    result = parse_async_replication_config(
+        ("max_workers=10", "frequency=60", "propagation_concurrency=4")
+    )
+    assert result == {"max_workers": 10, "frequency": 60, "propagation_concurrency": 4}
+
+
+def test_parse_async_replication_config_all_keys():
+    all_keys = (
+        "max_workers=1",
+        "hashtree_height=2",
+        "frequency=3",
+        "frequency_while_propagating=4",
+        "alive_nodes_checking_frequency=5",
+        "logging_frequency=6",
+        "diff_batch_size=7",
+        "diff_per_node_timeout=8",
+        "pre_propagation_timeout=9",
+        "propagation_timeout=10",
+        "propagation_limit=11",
+        "propagation_delay=12",
+        "propagation_concurrency=13",
+        "propagation_batch_size=14",
+    )
+    result = parse_async_replication_config(all_keys)
+    assert len(result) == 14
+    assert result["max_workers"] == 1
+    assert result["propagation_batch_size"] == 14
+
+
+def test_parse_async_replication_config_invalid_key():
+    with pytest.raises(ValueError, match="Unknown async replication config key"):
+        parse_async_replication_config(("invalid_key=10",))
+
+
+def test_parse_async_replication_config_invalid_value():
+    with pytest.raises(ValueError, match="Must be an integer"):
+        parse_async_replication_config(("max_workers=abc",))
+
+
+def test_parse_async_replication_config_missing_equals():
+    with pytest.raises(ValueError, match="Expected key=value"):
+        parse_async_replication_config(("max_workers",))
+
+
+def test_parse_async_replication_config_reset():
+    result = parse_async_replication_config(("reset",))
+    assert result == {}
+
+
+def test_parse_async_replication_config_reset_case_insensitive():
+    assert parse_async_replication_config(("Reset",)) == {}
+    assert parse_async_replication_config(("RESET",)) == {}
+    assert parse_async_replication_config((" reset ",)) == {}
+
+
+def test_parse_async_replication_config_reset_with_other_keys():
+    with pytest.raises(ValueError, match="Expected key=value"):
+        parse_async_replication_config(("reset", "max_workers=10"))
diff --git a/weaviate_cli/commands/create.py b/weaviate_cli/commands/create.py
index 8952880..c0f4727 100644
--- a/weaviate_cli/commands/create.py
+++ b/weaviate_cli/commands/create.py
@@ -11,7 +11,12 @@
 )
 from weaviate_cli.managers.alias_manager import AliasManager
 from weaviate_cli.managers.backup_manager import BackupManager
-from weaviate_cli.utils import get_client_from_context, get_async_client_from_context
+from weaviate_cli.utils import (
+    get_client_from_context,
+    get_async_client_from_context,
+    parse_async_replication_config,
+    ASYNC_REPLICATION_CONFIG_HELP,
+)
 from weaviate_cli.managers.collection_manager import CollectionManager
 from weaviate_cli.managers.tenant_manager import TenantManager
 from weaviate_cli.managers.data_manager import DataManager
@@ -215,6 +220,11 @@ def create() -> None:
     type=int,
     help="Rescore limit (default: None, set by Weaviate server).",
 )
+@click.option(
+    "--async_replication_config",
+    multiple=True,
+    help=ASYNC_REPLICATION_CONFIG_HELP,
+)
 @click.pass_context
 def create_collection_cli(
     ctx: click.Context,
@@ -244,6 +254,7 @@ def create_collection_cli(
     object_ttl_time: Optional[int],
     object_ttl_filter_expired: bool,
     object_ttl_property_name: Optional[str],
+    async_replication_config: tuple,
 ) -> None:
     """Create a collection in Weaviate."""
 
@@ -289,6 +300,9 @@ def create_collection_cli(
             object_ttl_time=object_ttl_time,
             object_ttl_filter_expired=object_ttl_filter_expired,
             object_ttl_property_name=object_ttl_property_name,
+            async_replication_config=parse_async_replication_config(
+                async_replication_config
+            ),
         )
     except Exception as e:
         click.echo(f"Error: {e}")
diff --git a/weaviate_cli/commands/update.py b/weaviate_cli/commands/update.py
index 15c41b0..daaa730 100644
--- a/weaviate_cli/commands/update.py
+++ b/weaviate_cli/commands/update.py
@@ -7,7 +7,11 @@
 from weaviate_cli.managers.alias_manager import AliasManager
 from weaviate_cli.managers.tenant_manager import TenantManager
 from weaviate_cli.managers.user_manager import UserManager
-from weaviate_cli.utils import get_client_from_context
+from weaviate_cli.utils import (
+    get_client_from_context,
+    parse_async_replication_config,
+    ASYNC_REPLICATION_CONFIG_HELP,
+)
 from weaviate_cli.managers.collection_manager import CollectionManager
 from weaviate_cli.managers.shard_manager import ShardManager
 from weaviate_cli.managers.data_manager import DataManager
@@ -110,6 +114,11 @@ def update() -> None:
     type=str,
     help="Date property name for TTL when object_ttl_type is 'property' (default: 'releaseDate'). Only valid when --object_ttl_type=property.",
 )
+@click.option(
+    "--async_replication_config",
+    multiple=True,
+    help=ASYNC_REPLICATION_CONFIG_HELP,
+)
 @click.pass_context
 def update_collection_cli(
     ctx: click.Context,
@@ -127,6 +136,7 @@ def update_collection_cli(
     object_ttl_time: Optional[int],
     object_ttl_filter_expired: bool,
     object_ttl_property_name: Optional[str],
+    async_replication_config: tuple,
 ) -> None:
     """Update a collection in Weaviate."""
 
@@ -160,6 +170,9 @@ def update_collection_cli(
             object_ttl_time=object_ttl_time,
             object_ttl_filter_expired=object_ttl_filter_expired,
             object_ttl_property_name=object_ttl_property_name,
+            async_replication_config=parse_async_replication_config(
+                async_replication_config
+            ),
         )
     except Exception as e:
         click.echo(f"Error: {e}")
diff --git a/weaviate_cli/defaults.py b/weaviate_cli/defaults.py
index fcc9bc1..d60471e 100644
--- a/weaviate_cli/defaults.py
+++ b/weaviate_cli/defaults.py
@@ -87,6 +87,7 @@ class CreateCollectionDefaults:
     object_ttl_time: Optional[int] = None
     object_ttl_filter_expired: Optional[bool] = None
     object_ttl_property_name: str = "releaseDate"
+    async_replication_config: Optional[tuple] = None
 
 
 @dataclass
@@ -262,6 +263,7 @@ class UpdateCollectionDefaults:
     object_ttl_time: Optional[int] = None
     object_ttl_filter_expired: Optional[bool] = None
     object_ttl_property_name: str = "releaseDate"
+    async_replication_config: Optional[tuple] = None
 
 
 @dataclass
diff --git a/weaviate_cli/managers/collection_manager.py b/weaviate_cli/managers/collection_manager.py
index d783252..049f577 100644
--- a/weaviate_cli/managers/collection_manager.py
+++ b/weaviate_cli/managers/collection_manager.py
@@ -12,7 +12,7 @@
     DeleteCollectionDefaults,
     GetCollectionDefaults,
 )
-from weaviate_cli.utils import print_json_or_text
+from weaviate_cli.utils import print_json_or_text, older_than_version
 import weaviate.classes.config as wvc
 from prettytable import PrettyTable
 
@@ -227,6 +227,7 @@ def create_collection(
         object_ttl_property_name: Optional[
             str
         ] = CreateCollectionDefaults.object_ttl_property_name,
+        async_replication_config: Optional[Dict[str, int]] = None,
     ) -> None:
 
         if (
@@ -243,6 +244,19 @@ def create_collection(
                 f"Error: Collection '{collection}' already exists in Weaviate. Delete using command."
             )
 
+        if async_replication_config is not None and not async_enabled:
+            raise Exception(
+                "Error: --async_replication_config requires --async_enabled to be set."
+            )
+
+        if async_replication_config is not None and older_than_version(
+            self.client, "1.34.18"
+        ):
+            click.echo(
+                "Warning: --async_replication_config requires Weaviate >= v1.34.18. "
+                "The server may ignore or reject these settings."
+            )
+
         if named_vector_name != "default" and not named_vector:
             raise Exception(
                 "Error: Named vector name is only supported with named vectors. Please use --named_vector to enable named vectors."
@@ -554,6 +568,13 @@ def create_collection(
                     if replication_deletion_strategy
                     else None
                 ),
+                async_config=(
+                    wvc.Configure.Replication.async_config(
+                        **async_replication_config
+                    )
+                    if async_replication_config is not None
+                    else None
+                ),
             ),
             sharding_config=(
                 wvc.Configure.sharding(desired_count=shards) if shards > 0 else None
@@ -620,6 +641,7 @@ def update_collection(
         object_ttl_property_name: Optional[
             str
         ] = UpdateCollectionDefaults.object_ttl_property_name,
+        async_replication_config: Optional[Dict[str, int]] = None,
     ) -> None:
 
         if (
@@ -630,6 +652,19 @@ def update_collection(
             raise Exception(
                 "object_ttl_property_name is only valid when object_ttl_type is 'property'."
             )
 
+        if async_replication_config is not None and async_enabled is False:
+            raise Exception(
+                "Error: --async_replication_config cannot be used when --async_enabled is False."
+            )
+
+        if async_replication_config is not None and older_than_version(
+            self.client, "1.34.18"
+        ):
+            click.echo(
+                "Warning: --async_replication_config requires Weaviate >= v1.34.18. "
+                "The server may ignore or reject these settings."
+            )
+
         if not self.client.collections.exists(collection):
             raise Exception(
@@ -715,6 +750,13 @@ def update_collection(
                         if replication_deletion_strategy
                         else None
                     ),
+                    async_config=(
+                        wvc.Reconfigure.Replication.async_config(
+                            **async_replication_config
+                        )
+                        if async_replication_config is not None
+                        else None
+                    ),
                 )
             ),
             multi_tenancy_config=(
diff --git a/weaviate_cli/utils.py b/weaviate_cli/utils.py
index a1b75d9..9ad0119 100644
--- a/weaviate_cli/utils.py
+++ b/weaviate_cli/utils.py
@@ -116,6 +116,83 @@ def pp_objects(response, main_properties, json_output: bool = False):
         print(f"Total: {len(objects)} objects")
 
 
+ASYNC_REPLICATION_CONFIG_KEYS = {
+    "max_workers",
+    "hashtree_height",
+    "frequency",
+    "frequency_while_propagating",
+    "alive_nodes_checking_frequency",
+    "logging_frequency",
+    "diff_batch_size",
+    "diff_per_node_timeout",
+    "pre_propagation_timeout",
+    "propagation_timeout",
+    "propagation_limit",
+    "propagation_delay",
+    "propagation_concurrency",
+    "propagation_batch_size",
+}
+
+
+ASYNC_REPLICATION_CONFIG_RESET = "reset"
+
+ASYNC_REPLICATION_CONFIG_HELP = (
+    "Async replication config as key=value pairs. Can be specified multiple times. "
+    "Valid keys: " + ", ".join(sorted(ASYNC_REPLICATION_CONFIG_KEYS)) + ". "
+    "All values must be integers. "
+    'Use "reset" to revert all async replication settings to server defaults. '
+    "Requires --async_enabled on create and Weaviate >= v1.34.18."
+)
+
+
+def parse_async_replication_config(
+    config_tuples: Optional[tuple],
+) -> Optional[dict]:
+    """Parse async replication config key=value tuples into a dict.
+
+    Args:
+        config_tuples: Tuple of "key=value" strings, e.g. ("max_workers=10", "frequency=60"),
+            or a single "reset" to revert all settings to server defaults.
+
+    Returns:
+        Dict mapping parameter names to integer values, empty dict for "reset",
+        or None if empty/None.
+
+    Raises:
+        ValueError: If a key is unknown or a value is not a valid integer.
+    """
+    if not config_tuples:
+        return None
+
+    if len(config_tuples) == 1 and config_tuples[0].strip().lower() == "reset":
+        return {}
+
+    result = {}
+    for item in config_tuples:
+        if "=" not in item:
+            raise ValueError(
+                f"Invalid async replication config format: '{item}'. Expected key=value."
+            )
+        key, value = item.split("=", 1)
+        key = key.strip()
+        value = value.strip()
+
+        if key not in ASYNC_REPLICATION_CONFIG_KEYS:
+            raise ValueError(
+                f"Unknown async replication config key: '{key}'. "
+                f"Valid keys: {', '.join(sorted(ASYNC_REPLICATION_CONFIG_KEYS))}"
+            )
+
+        try:
+            result[key] = int(value)
+        except ValueError:
+            raise ValueError(
+                f"Invalid value for '{key}': '{value}'. Must be an integer."
+            )
+
+    return result if result else None
+
+
 def parse_permission(perm: str) -> PermissionsCreateType:
     """
     Convert a permission string to RBAC permission object(s).