From f9d75d1043f0fdfe16a5b2f7fb1937361c944c77 Mon Sep 17 00:00:00 2001 From: Mohamed Shahin Date: Thu, 21 Nov 2024 14:25:37 +0000 Subject: [PATCH 1/4] new feature - add batch import from a local json file --- README.md | 6 +- cli.py | 8 ++ weaviate_cli/commands/batch.py | 99 +++++++++++++++++++++++ weaviate_cli/managers/batch_manager.py | 108 +++++++++++++++++++++++++ 4 files changed, 220 insertions(+), 1 deletion(-) create mode 100644 weaviate_cli/commands/batch.py create mode 100644 weaviate_cli/managers/batch_manager.py diff --git a/README.md b/README.md index 75ac7e9..ec3c359 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ A powerful command-line interface for managing and interacting with Weaviate vector databases directly from your terminal. ## Key Features -- **Collections**: Create, update, delete and get collection configurations +- **Collections**: Create, batch, update, delete and get collection configurations - **Data Management**: Import, query, update and delete data with various search types (vector, keyword, hybrid) - **Multi-tenancy**: Manage tenants and their states across collections - **Backup & Restore**: Create and restore backups with support for S3, GCS and filesystem @@ -40,6 +40,9 @@ weaviate-cli create collection --collection movies --vectorizer transformers # Import test data weaviate-cli create data --collection movies --limit 1000 +# Batch data import from local json file +weaviate-cli batch insert --collection --path --vectorizer --replication-factor 3 + # Query data weaviate-cli query data --collection movies --search-type hybrid --query "action movies" ``` @@ -47,6 +50,7 @@ weaviate-cli query data --collection movies --search-type hybrid --query "action ## Core Commands - **create**: Create collections, tenants, backups or import data +- **batch**: batch import a collection from a local file - **delete**: Remove collections, tenants or data - **update**: Modify collection settings, tenant states or data - **get**: Retrieve collection info, tenant details or shard status diff --git a/cli.py b/cli.py index a61fdf9..5750130 100644 --- a/cli.py +++ b/cli.py @@ -9,8 +9,12 @@ from weaviate_cli.commands.query import query from weaviate_cli.commands.restore import restore from weaviate_cli.commands.cancel import cancel +<<<<<<< HEAD from weaviate_cli.commands.assign import assign from weaviate_cli.commands.revoke import revoke +======= +from weaviate_cli.commands.batch import batch +>>>>>>> 19aebe7 (new feature - add batch import from a local json file) from weaviate_cli import __version__ @@ -60,8 +64,12 @@ def main(ctx: click.Context, config_file: Optional[str], user: Optional[str]): main.add_command(restore) main.add_command(query) main.add_command(cancel) +<<<<<<< HEAD main.add_command(assign) main.add_command(revoke) +======= +main.add_command(batch) +>>>>>>> 19aebe7 (new feature - add batch import from a local json file) if __name__ == "__main__": main() diff --git a/weaviate_cli/commands/batch.py b/weaviate_cli/commands/batch.py new file mode 100644 index 0000000..014385e --- /dev/null +++ b/weaviate_cli/commands/batch.py @@ -0,0 +1,99 @@ +import click +import sys +import os +import json +from weaviate_cli.utils import get_client_from_context +from weaviate.exceptions import WeaviateConnectionError +from weaviate_cli.defaults import CreateCollectionDefaults +from weaviate_cli.managers.batch_manager import BatchManager + +@click.group() +def batch() -> None: + """Batch operations in Weaviate.""" + pass + +@batch.command("insert") +@click.option( + "--collection", + required=True, + help="The name of the collection (class) to insert data into.", +) +@click.option( + "--path", + required=True, + type=str, + help="Path to the JSON file containing the data.", +) +@click.option( + "--vectorizer", + default=CreateCollectionDefaults.vectorizer, + type=click.Choice( + ["contextionary", "transformers", "openai", "ollama", "cohere", "jinaai"] + ), + help="Vectorizer to use.", +) +@click.option( + "--shards", + default=1, + help="Number of shards for the collection (default: 1).", +) +@click.option( + "--replication-factor", + default=1, + help="Replication factor for the collection (default: 1).", +) +@click.pass_context +def batch_insert_cli(ctx, collection, path, vectorizer, shards, replication_factor): + """ + Insert data into a Weaviate collection (class) in batch mode. + """ + # Validate the file path and extension + if not os.path.isfile(path): + click.echo(f"Error: The file {path} does not exist.") + sys.exit(1) + if not path.endswith(".json"): + click.echo("Error: The file must have a .json extension.") + sys.exit(1) + + # Load the JSON data + try: + with open(path, "r") as file: + data = json.load(file) + except json.JSONDecodeError: + click.echo(f"Error: The file {path} is not a valid JSON file.") + sys.exit(1) + + # Validate JSON structure + if not isinstance(data, list) or not all(isinstance(obj, dict) for obj in data): + click.echo("Error: The JSON file must contain a list of objects (e.g., [{...}, {...}]).") + sys.exit(1) + + # Initialize the Weaviate client + client = None + try: + client = get_client_from_context(ctx) + batch_manager = BatchManager(client) + + # Create the collection (if it doesn't exist) + click.echo(f"Ensuring collection '{collection}' exists...") + batch_manager.create_collection( + collection=collection, + vectorizer=vectorizer, + shards=shards, + replication_factor=replication_factor, + force_auto_schema=True, + ) + + # Insert the data in batch mode + click.echo(f"Inserting data into collection '{collection}'...") + batch_manager.batch_insert(collection, data) + + except WeaviateConnectionError as wce: + click.echo(f"Connection error: {wce}") + sys.exit(1) + except Exception as e: + click.echo(f"Error: {e}") + sys.exit(1) + finally: + if client: + client.close() \ No newline at end of file diff --git a/weaviate_cli/managers/batch_manager.py b/weaviate_cli/managers/batch_manager.py new file mode 100644 index 0000000..76f6199 --- /dev/null +++ b/weaviate_cli/managers/batch_manager.py @@ -0,0 +1,108 @@ +import click +from typing import Dict, List, Optional +import weaviate.classes.config as wvc +from weaviate.client import WeaviateClient + + +class BatchManager: + def __init__(self, client: WeaviateClient) -> None: + self.client = client + + def create_collection( + self, + collection: str, + vectorizer: str = "contextionary", + shards: int = 1, + replication_factor: int = 1, + force_auto_schema: bool = True, + ) -> None: + """ + Create a collection dynamically for batch insertion. + + Args: + collection (str): Name of the collection to create. + vectorizer (str): Vectorizer type (e.g., openai, transformers). + shards (int): Number of shards for the collection. + replication_factor (int): Replication factor for the collection. + force_auto_schema (bool): Whether to let Weaviate infer schema from inserted data. + """ + if self.client.collections.exists(collection): + click.echo(f"Collection '{collection}' already exists. Skipping creation.") + return + + # Map vectorizers to Weaviate configurations + vectorizer_map: Dict[str, wvc.VectorizerConfig] = { + "contextionary": wvc.Configure.Vectorizer.text2vec_contextionary(), + "transformers": wvc.Configure.Vectorizer.text2vec_transformers(), + "openai": wvc.Configure.Vectorizer.text2vec_openai(), + "ollama": wvc.Configure.Vectorizer.text2vec_ollama( + model="snowflake-arctic-embed:33m" + ), + "cohere": wvc.Configure.Vectorizer.text2vec_cohere(), + "jinaai": wvc.Configure.Vectorizer.text2vec_jinaai(), + } + + # Validate vectorizer + if vectorizer not in vectorizer_map: + raise ValueError( + f"Invalid vectorizer '{vectorizer}'. Choose from: {list(vectorizer_map.keys())}" + ) + + try: + # Create collection with configuration + self.client.collections.create( + name=collection, + vector_index_config=wvc.Configure.VectorIndex.hnsw(), + replication_config=wvc.Configure.replication( + factor=replication_factor, + async_enabled=False, + deletion_strategy=wvc.ReplicationDeletionStrategy.DELETE_ON_CONFLICT, + ), + sharding_config=wvc.Configure.sharding(desired_count=shards), + vectorizer_config=vectorizer_map[vectorizer], + properties=None if force_auto_schema else [], + ) + click.echo(f"Collection '{collection}' created successfully with vectorizer '{vectorizer}'.") + except Exception as e: + raise Exception(f"Error creating collection '{collection}': {e}") + + def batch_insert( + self, + collection: str, + data: List[Dict], + ) -> None: + """ + Insert data into a collection in batch. + + Args: + collection (str): Name of the collection. + data (List[Dict]): Data to be inserted. + """ + if not self.client.collections.exists(collection): + raise Exception(f"Collection '{collection}' does not exist. Cannot insert data.") + + try: + # Perform batch insertion using Weaviate's dynamic batch + with self.client.batch.dynamic() as batch: + for record in data: + # Remove the reserved 'id' key, if present - to avoid Error message: WeaviateInsertInvalidPropertyError("It is forbidden to insert `id` or `vector` + if "id" in record: + record.pop("id") + + # Add the object to the batch + batch.add_object( + collection=collection, + properties=record, + ) + click.echo(f"Processed record") # add '{record}' <- if you would like to see the record being processed + except Exception as e: + raise Exception(f"Batch insertion failed: {e}") + + # Check for failed objects + failed_objects = self.client.batch.failed_objects + if failed_objects: + click.echo(f"Number of failed objects: {len(failed_objects)}") + for i, failed_obj in enumerate(failed_objects, 1): + click.echo(f"Failed object {i}: {failed_obj}") + else: + click.echo(f"All objects successfully inserted into '{collection}'.") \ No newline at end of file From 9574bc1637bd6a3d70ca30d4dc05efd372c955af Mon Sep 17 00:00:00 2001 From: Mohamed Shahin Date: Wed, 27 Nov 2024 11:37:09 +0000 Subject: [PATCH 2/4] Formatted files to pass Black checks --- weaviate_cli/commands/batch.py | 10 +++++++--- weaviate_cli/managers/batch_manager.py | 14 ++++++++++---- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/weaviate_cli/commands/batch.py b/weaviate_cli/commands/batch.py index 014385e..e5c9050 100644 --- a/weaviate_cli/commands/batch.py +++ b/weaviate_cli/commands/batch.py @@ -5,13 +5,15 @@ from weaviate_cli.utils import get_client_from_context from weaviate.exceptions import WeaviateConnectionError from weaviate_cli.defaults import CreateCollectionDefaults -from weaviate_cli.managers.batch_manager import BatchManager +from weaviate_cli.managers.batch_manager import BatchManager + @click.group() def batch() -> None: """Batch operations in Weaviate.""" pass + @batch.command("insert") @click.option( "--collection", @@ -65,7 +67,9 @@ def batch_insert_cli(ctx, collection, path, vectorizer, shards, replication_fact # Validate JSON structure if not isinstance(data, list) or not all(isinstance(obj, dict) for obj in data): - click.echo("Error: The JSON file must contain a list of objects (e.g., [{...}, {...}]).") + click.echo( + "Error: The JSON file must contain a list of objects (e.g., [{...}, {...}])." + ) sys.exit(1) # Initialize the Weaviate client @@ -96,4 +100,4 @@ def batch_insert_cli(ctx, collection, path, vectorizer, shards, replication_fact sys.exit(1) finally: if client: - client.close() \ No newline at end of file + client.close() diff --git a/weaviate_cli/managers/batch_manager.py b/weaviate_cli/managers/batch_manager.py index 76f6199..bcb2275 100644 --- a/weaviate_cli/managers/batch_manager.py +++ b/weaviate_cli/managers/batch_manager.py @@ -62,7 +62,9 @@ def create_collection( vectorizer_config=vectorizer_map[vectorizer], properties=None if force_auto_schema else [], ) - click.echo(f"Collection '{collection}' created successfully with vectorizer '{vectorizer}'.") + click.echo( + f"Collection '{collection}' created successfully with vectorizer '{vectorizer}'." + ) except Exception as e: raise Exception(f"Error creating collection '{collection}': {e}") @@ -79,7 +81,9 @@ def batch_insert( data (List[Dict]): Data to be inserted. """ if not self.client.collections.exists(collection): - raise Exception(f"Collection '{collection}' does not exist. Cannot insert data.") + raise Exception( + f"Collection '{collection}' does not exist. Cannot insert data." + ) try: # Perform batch insertion using Weaviate's dynamic batch @@ -94,7 +98,9 @@ def batch_insert( collection=collection, properties=record, ) - click.echo(f"Processed record") # add '{record}' <- if you would like to see the record being processed + click.echo( + f"Processed record" + ) # add '{record}' <- if you would like to see the record being processed except Exception as e: raise Exception(f"Batch insertion failed: {e}") @@ -105,4 +111,4 @@ def batch_insert( for i, failed_obj in enumerate(failed_objects, 1): click.echo(f"Failed object {i}: {failed_obj}") else: - click.echo(f"All objects successfully inserted into '{collection}'.") \ No newline at end of file + click.echo(f"All objects successfully inserted into '{collection}'.") From 42c5a8fda03e76a8d5f47a5ae362474864916e33 Mon Sep 17 00:00:00 2001 From: Rodrigo Lopez Date: Thu, 16 Jan 2025 17:02:12 +0100 Subject: [PATCH 3/4] Fix conflicts after rebasing --- cli.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cli.py b/cli.py index 5750130..da36bd0 100644 --- a/cli.py +++ b/cli.py @@ -9,12 +9,9 @@ from weaviate_cli.commands.query import query from weaviate_cli.commands.restore import restore from weaviate_cli.commands.cancel import cancel -<<<<<<< HEAD from weaviate_cli.commands.assign import assign from weaviate_cli.commands.revoke import revoke -======= from weaviate_cli.commands.batch import batch ->>>>>>> 19aebe7 (new feature - add batch import from a local json file) from weaviate_cli import __version__ @@ -64,12 +61,9 @@ def main(ctx: click.Context, config_file: Optional[str], user: Optional[str]): main.add_command(restore) main.add_command(query) main.add_command(cancel) -<<<<<<< HEAD main.add_command(assign) main.add_command(revoke) -======= main.add_command(batch) ->>>>>>> 19aebe7 (new feature - add batch import from a local json file) if __name__ == "__main__": main() From 8b86a8d5c6448033b9351ba06e150335d72d5c9f Mon Sep 17 00:00:00 2001 From: Rodrigo Lopez Date: Thu, 16 Jan 2025 17:51:16 +0100 Subject: [PATCH 4/4] batch: Remove nested ids from data --- weaviate_cli/datasets/movies.json | 2 +- weaviate_cli/managers/batch_manager.py | 28 +++++++++++++++++++++++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/weaviate_cli/datasets/movies.json b/weaviate_cli/datasets/movies.json index 7970d22..d8340cb 100644 --- a/weaviate_cli/datasets/movies.json +++ b/weaviate_cli/datasets/movies.json @@ -1,6 +1,6 @@ [ { - "budget": 237000000, + "budget": "237000000", "cast": "Sam Worthington Zoe Saldana Sigourney Weaver Stephen Lang Michelle Rodriguez", "director": "James Cameron", "genres": "Action Adventure Fantasy Science Fiction", diff --git a/weaviate_cli/managers/batch_manager.py b/weaviate_cli/managers/batch_manager.py index bcb2275..36f6b88 100644 --- a/weaviate_cli/managers/batch_manager.py +++ b/weaviate_cli/managers/batch_manager.py @@ -89,9 +89,31 @@ def batch_insert( # Perform batch insertion using Weaviate's dynamic batch with self.client.batch.dynamic() as batch: for record in data: - # Remove the reserved 'id' key, if present - to avoid Error message: WeaviateInsertInvalidPropertyError("It is forbidden to insert `id` or `vector` - if "id" in record: - record.pop("id") + + def remove_id_keys(data: Dict) -> Dict: + if not isinstance(data, dict): + return data + cleaned_data = {} + for key, value in data.items(): + if key != "id": + if isinstance(value, dict): + cleaned_data[key] = remove_id_keys(value) + elif isinstance(value, list): + cleaned_data[key] = [ + ( + remove_id_keys(item) + if isinstance(item, dict) + else item + ) + for item in value + ] + else: + cleaned_data[key] = value + + return cleaned_data + + # Remove all keys and subkeys that start with 'id' + record = remove_id_keys(record) # Add the object to the batch batch.add_object(