[LEADS-290] Database Integration e2e #202

bsatapat-jpg wants to merge 1 commit into lightspeed-core:main
Conversation
Walkthrough

This pull request restructures the configuration system from a single storage configuration into a list of storage backends (file and database).

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant SystemLoader
    participant YAMLConfig
    participant StorageFactory
    participant Pipeline
    participant DBBackend
    participant FileBackend
    SystemLoader->>YAMLConfig: load config_data["storage"]
    YAMLConfig-->>SystemLoader: storage backend list
    SystemLoader->>SystemLoader: _parse_storage_config(backends)
    SystemLoader-->>Pipeline: SystemConfig with storage list
    Pipeline->>StorageFactory: get_file_config(config.storage)
    StorageFactory-->>Pipeline: FileBackendConfig (with output_dir)
    Pipeline->>StorageFactory: create_database_backend(config.storage)
    StorageFactory->>DBBackend: instantiate SQLStorageBackend
    DBBackend-->>StorageFactory: backend instance (or None)
    StorageFactory-->>Pipeline: Optional[SQLStorageBackend]
    Pipeline->>DBBackend: _initialize_storage(RunInfo)
    DBBackend-->>Pipeline: initialized
    Pipeline->>Pipeline: _process_eval_data()
    loop per result
        Pipeline->>DBBackend: _save_result_to_storage(result)
        DBBackend-->>Pipeline: saved
    end
    Pipeline->>DBBackend: _finalize_storage()
    DBBackend-->>Pipeline: finalized
```
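The list-based `storage` section that this flow consumes might look roughly like the sketch below. This is a hypothetical illustration, not the PR's shipped `config/system.yaml`: the `type` values (`file`, `sqlite`, `postgres`, `mysql`), `output_dir`, and the connection parameters (`host`, `port`, `user`, `database`) come from the review's discussion of `FileBackendConfig` and `DatabaseBackendConfig`.

```yaml
storage:
  - type: file
    output_dir: ./eval_output
  - type: postgres
    host: localhost
    port: 5432
    user: eval
    database: eval_results
```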
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 inconclusive

✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 5
🧹 Nitpick comments (2)
src/lightspeed_evaluation/core/storage/config.py (1)

111-115: Consider security implications of plaintext password storage.

The `password` field stores credentials in plaintext within the configuration. While this may be acceptable for local development, consider documenting that production deployments should use environment variables or secrets management.

📝 Suggested documentation improvement

```diff
 # Connection parameters (required for remote databases, ignored for SQLite)
-host: Optional[str] = Field(default=None, description="Database host")
-port: Optional[int] = Field(default=None, description="Database port")
-user: Optional[str] = Field(default=None, description="Database username")
-password: Optional[str] = Field(default=None, description="Database password")
+host: Optional[str] = Field(default=None, description="Database host")
+port: Optional[int] = Field(default=None, description="Database port")
+user: Optional[str] = Field(default=None, description="Database username")
+password: Optional[str] = Field(
+    default=None,
+    description="Database password. For production, consider using environment variables.",
+)
```

src/lightspeed_evaluation/core/output/generator.py (1)
484-487: Redundant `hasattr` check can be simplified.

Per the `SystemConfig` model definition, `storage` is defined with `default_factory=list`, so it always exists as an attribute (never `None`, just possibly empty). The `hasattr` guard is defensive but unnecessary. The `get_file_config()` function already handles empty lists by returning a default `FileBackendConfig()`.

♻️ Suggested simplification

```diff
 def _get_included_config_sections(self) -> list[str]:
     """Get list of configuration sections to include in summaries.

     Returns:
         List of section names (e.g., ['llm', 'embedding', 'api']).
     """
-    if self.system_config is not None and hasattr(self.system_config, "storage"):
+    if self.system_config is not None:
         file_config = get_file_config(self.system_config.storage)
         return file_config.summary_config_sections
     return DEFAULT_STORED_CONFIGS
```
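The `default_factory` guarantee that this comment relies on can be demonstrated with a plain-dataclass analogue. This is an illustrative sketch, not the project's Pydantic `SystemConfig`, but Pydantic's `default_factory` gives the same attribute-existence behavior:

```python
from dataclasses import dataclass, field


@dataclass
class SystemConfig:
    # default_factory guarantees `storage` always exists, defaulting to []
    storage: list = field(default_factory=list)


cfg = SystemConfig()
print(hasattr(cfg, "storage"), cfg.storage)  # True []
```

Because the attribute is always present, the `hasattr` guard can never change the outcome, which is why only the `None` check on the parent object is needed.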
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/lightspeed_evaluation/core/storage/factory.py`:
- Around line 69-83: The helpers get_database_config and the analogous
get_file_config currently return the first matching instance and silently ignore
duplicates; change each to collect all matches (iterate storage_configs and
append when isinstance(..., DatabaseBackendConfig) or FileBackendConfig), and if
more than one match is found raise a clear ValueError (or a ConfigError) naming
the duplicate backend type and listing the found entries so misconfiguration
fails fast; alternatively, if you prefer caller-side handling, change the return
type to List[DatabaseBackendConfig] / List[FileBackendConfig] and return all
matches instead of a single Optional, updating callers accordingly.
- Around line 38-42: _build_connection_url() currently returns a plain
interpolated DSN string containing raw credentials; change _build_connection_url
(the function in factory.py) to build and return a sqlalchemy.engine.URL object
via sqlalchemy.engine.URL.create(...) (use the same driver mapping logic:
"postgresql" vs "mysql+pymysql" and DEFAULT_PORTS fallback) so SQLAlchemy will
escape credentials correctly, and update the return type hint in factory.py to
sqlalchemy.engine.URL; also update any type hints and usages in sql_storage.py
(where create_engine is called) to accept a URL object and pass the URL directly
to create_engine() rather than a formatted string.
In `@src/lightspeed_evaluation/core/system/loader.py`:
- Around line 196-219: The current _parse_storage_config method logs unknown
storage backend types and drops them; change this to raise
core.system.exceptions.ConfigurationError when backend_type is unrecognized so
invalid configs fail fast; locate _parse_storage_config and replace the
logger.warning branch with raising ConfigurationError including the invalid type
and item contents, and optionally refactor the whole parsing to use Pydantic's
TypeAdapter with StorageBackendConfig (Annotated[Union[FileBackendConfig,
DatabaseBackendConfig], Field(discriminator="type")]) to let Pydantic
validate/dispatch automatically and surface validation errors instead of manual
dispatch.
In `@src/lightspeed_evaluation/pipeline/evaluation/pipeline.py`:
- Around line 184-191: Initialization catches StorageError but simply drops
self.storage_backend without cleaning up; also subsequent save failures only log
and continue, causing repeated warnings and potential resource leaks. Update the
try/except around get_file_config/RunInfo/self.storage_backend.initialize to, on
exception, attempt to gracefully teardown the partially-initialized backend
(call its close/shutdown/teardown if available), set self.storage_backend =
None, and log that persistence has been disabled; likewise, in the result-save
path that currently logs on save failures (the block handling storage write
errors), on the first write error perform the same teardown-and-disable sequence
(call close/shutdown if present, set self.storage_backend = None) so persistence
is disabled after the first real storage failure to avoid repeated warnings and
leaked state.
- Around line 158-166: The finalize/storage shutdown call is not guaranteed
because exceptions from future.result() can skip the _finalize_storage() call;
wrap the processing and any code that waits on futures (where future.result() is
called) in a try/finally so _finalize_storage() always runs. Specifically, call
self._initialize_storage() before the try, run results =
self._process_eval_data(evaluation_data) (and any future.result() joins) inside
the try, and move self._finalize_storage() into the finally block to ensure
cleanup even on exceptions raised by future.result().
---
Nitpick comments:
In `@src/lightspeed_evaluation/core/output/generator.py`:
- Around line 484-487: The hasattr(self.system_config, "storage") check is
redundant because SystemConfig.storage always exists (default_factory=list);
simplify the branch in the method by only checking if self.system_config is not
None, then call get_file_config(self.system_config.storage) and return
file_config.summary_config_sections, otherwise return
DEFAULT_STORED_CONFIGS—remove the hasattr(...) guard and rely on get_file_config
to handle empty lists.
In `@src/lightspeed_evaluation/core/storage/config.py`:
- Around line 111-115: The password field currently stores credentials in
plaintext; update the configuration to discourage plaintext storage and support
secrets: change the Field description for password in the Pydantic model to
document that production credentials must come from environment variables or a
secrets manager, and migrate the type to a secret-aware type (e.g.,
pydantic.SecretStr) or add an env-loading path so callers of the model (where
password is referenced) accept secrets rather than raw strings; specifically
update the Field description for password and modify the model definition (the
password field and any places that consume it) to use SecretStr or an env-based
loader to avoid embedding plaintext credentials.
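As a concrete illustration of the environment-variable approach suggested for the `password` field above, here is a minimal stdlib sketch. The class below, the `password_env` parameter, and the `EVAL_DB_PASSWORD` variable name are all hypothetical and not part of the PR:

```python
import os


class DatabaseBackendConfig:
    """Hypothetical sketch: prefer an environment variable over plaintext config."""

    def __init__(self, password=None, password_env="EVAL_DB_PASSWORD"):
        # Fall back to the environment when no plaintext value is configured
        self.password = password if password is not None else os.environ.get(password_env)

    def __repr__(self):
        # Never echo the credential in logs or reprs
        return "DatabaseBackendConfig(password=***)"


os.environ["EVAL_DB_PASSWORD"] = "s3cret"
cfg = DatabaseBackendConfig()
print(repr(cfg))  # DatabaseBackendConfig(password=***)
```

Pydantic's `SecretStr`, recommended in the prompt above, additionally masks the value in model dumps and reprs out of the box.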
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 762b1a9a-dfae-4386-8b69-157561b3037e
📒 Files selected for processing (24)
- config/system.yaml
- src/lightspeed_evaluation/__init__.py
- src/lightspeed_evaluation/core/models/__init__.py
- src/lightspeed_evaluation/core/models/system.py
- src/lightspeed_evaluation/core/output/generator.py
- src/lightspeed_evaluation/core/storage/__init__.py
- src/lightspeed_evaluation/core/storage/config.py
- src/lightspeed_evaluation/core/storage/exceptions.py
- src/lightspeed_evaluation/core/storage/factory.py
- src/lightspeed_evaluation/core/storage/sql_storage.py
- src/lightspeed_evaluation/core/system/exceptions.py
- src/lightspeed_evaluation/core/system/loader.py
- src/lightspeed_evaluation/pipeline/evaluation/pipeline.py
- src/lightspeed_evaluation/runner/evaluation.py
- tests/unit/core/config/test_models.py
- tests/unit/core/models/test_system.py
- tests/unit/core/output/conftest.py
- tests/unit/core/output/test_final_coverage.py
- tests/unit/core/output/test_generator.py
- tests/unit/core/storage/test_config.py
- tests/unit/core/storage/test_factory.py
- tests/unit/core/system/test_loader.py
- tests/unit/pipeline/evaluation/conftest.py
- tests/unit/runner/test_evaluation.py
💤 Files with no reviewable changes (3)
- src/lightspeed_evaluation/core/storage/exceptions.py
- src/lightspeed_evaluation/__init__.py
- src/lightspeed_evaluation/core/models/__init__.py
```python
# Remote database (postgres, mysql)
driver = "postgresql" if config.type == "postgres" else "mysql+pymysql"
port = config.port or DEFAULT_PORTS.get(config.type, 5432)

return f"{driver}://{config.user}:{config.password}@{config.host}:{port}/{config.database}"
```
🧩 Analysis chain

🌐 Web query:
> In SQLAlchemy, when usernames or passwords contain reserved characters like @, :, or /, is the recommended approach to build the engine URL with sqlalchemy.engine.URL.create(...).render_as_string(hide_password=False) instead of manual string interpolation?

💡 Result:

No. The recommended approach in SQLAlchemy for handling usernames or passwords with reserved characters like @, :, or / is to use sqlalchemy.engine.URL.create to build the URL object directly and pass it to create_engine. This avoids manual string interpolation and URL-parsing issues entirely, as the URL object accepts raw unescaped credentials and handles escaping internally when needed. Manual string interpolation with urllib.parse.quote or quote_plus on credentials is an alternative but error-prone, as it requires correct encoding and can lead to parsing ambiguities. Using URL.create(...).render_as_string(hide_password=False) to build a string for the engine is unnecessary and not recommended: the URL object can be passed directly to create_engine without converting to a string, bypassing string parsing. Official docs emphasize programmatic URL creation to "bypass the need for string parsing". Even when rendered to a string, render_as_string may not always produce a round-trippable URL due to historical encoding choices (e.g., issues with brackets or query characters in database names), as noted in GitHub issues. Example from docs and community sources:

```python
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

url = URL.create(
    "postgresql+pg8000",
    username="dbuser",
    password="kx@jj5/g",  # raw, unescaped
    host="pghost10",
    database="appdb",
)
engine = create_engine(url)  # passes the URL object directly
```

Citations:
- 1: http://docs.sqlalchemy.org/en/latest/core/engines.html
- 2: https://stackoverflow.com/questions/66371841/how-do-i-use-sqlalchemy-create-engine-with-password-that-includes-an
- 3: https://medium.com/@anshul.suresh99/how-to-handle-and-other-special-characters-in-database-passwords-with-sqlalchemy-ed25e90bd97a
- 4: Escaping Special Characters such as @ signs in Passwords (sqlalchemy/sqlalchemy#9827)
- 5: https://pydoc.dev/sqlalchemy/latest/sqlalchemy.engine.url.URL.html
- 6: https://stackoverflow.com/questions/40462863/is-there-a-way-to-get-the-connection-string-out-of-sqlalchemy-in-log-suitable-fo

🏁 Script executed:
```shell
rg "_build_connection_url" --type py -B 2 -A 5
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 1284

🏁 Script executed:
```shell
rg "class SQLStorageBackend" --type py -A 20
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 1916

🏁 Script executed:
```shell
rg "connection_url" src/lightspeed_evaluation/core/storage/sql_storage.py -B 1 -A 1
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 871

🏁 Script executed:
```shell
cat -n src/lightspeed_evaluation/core/storage/factory.py | head -50
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 1714
Use SQLAlchemy URL objects instead of string interpolation for database credentials.
Line 42 interpolates raw credentials into the connection URL string. When credentials contain reserved characters like @, :, or /, the DSN becomes unparseable. SQLAlchemy's recommended approach is to use sqlalchemy.engine.URL.create() and pass the URL object directly to create_engine(), which automatically handles credential escaping without string parsing.
Update _build_connection_url() to return a URL object and update the type hints in both factory.py and sql_storage.py:
Suggested fix

```diff
+from sqlalchemy.engine import URL
+
 def _build_connection_url(config: DatabaseBackendConfig) -> URL:
     """Build SQLAlchemy connection URL from config.

     Args:
         config: Database backend configuration.

     Returns:
         SQLAlchemy URL object.
     """
     if config.type == "sqlite":
-        return f"sqlite:///{config.database}"
+        return URL.create("sqlite", database=config.database)

     # Remote database (postgres, mysql)
     driver = "postgresql" if config.type == "postgres" else "mysql+pymysql"
     port = config.port or DEFAULT_PORTS.get(config.type, 5432)
-    return f"{driver}://{config.user}:{config.password}@{config.host}:{port}/{config.database}"
+    return URL.create(
+        drivername=driver,
+        username=config.user,
+        password=config.password,
+        host=config.host,
+        port=port,
+        database=config.database,
+    )
```

Then update the type hints in sql_storage.py:

```diff
+from sqlalchemy.engine import URL
+
 def __init__(
     self,
-    connection_url: str,
+    connection_url: URL,
```
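To see why raw interpolation breaks, here is a stdlib-only illustration with a hypothetical credential. SQLAlchemy's `URL.create` performs equivalent percent-encoding internally, which is why passing the URL object avoids the problem:

```python
from urllib.parse import quote_plus, urlsplit

password = "kx@jj5/g"  # hypothetical credential with reserved characters

# Naive interpolation: the extra '@' and '/' make the DSN unparseable
naive = f"postgresql://dbuser:{password}@localhost:5432/appdb"
print(urlsplit(naive).hostname)  # 'jj5' -- the wrong host

# Percent-encoding the credential first keeps the URL well-formed
safe = f"postgresql://dbuser:{quote_plus(password)}@localhost:5432/appdb"
print(urlsplit(safe).hostname, urlsplit(safe).port)  # localhost 5432
```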
+ connection_url: URL,🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/lightspeed_evaluation/core/storage/factory.py` around lines 38 - 42,
_update_connection_url() currently returns a plain interpolated DSN string
containing raw credentials; change _build_connection_url (the function in
factory.py) to build and return a sqlalchemy.engine.URL object via
sqlalchemy.engine.URL.create(...) (use the same driver mapping logic:
"postgresql" vs "mysql+pymysql" and DEFAULT_PORTS fallback) so SQLAlchemy will
escape credentials correctly, and update the return type hint in factory.py to
sqlalchemy.engine.URL; also update any type hints and usages in sql_storage.py
(where create_engine is called) to accept a URL object and pass the URL directly
to create_engine() rather than a formatted string.
```python
def get_database_config(
    storage_configs: Sequence[StorageBackendConfig],
) -> Optional[DatabaseBackendConfig]:
    """Extract database configuration from storage configs list.

    Args:
        storage_configs: Sequence of storage backend configurations.

    Returns:
        First database config found, or None.
    """
    for config in storage_configs:
        if isinstance(config, DatabaseBackendConfig):
            return config
    return None
```
Don't silently ignore duplicate backend entries.
Line 82 and Line 99 return the first match and drop the rest. With a list-based storage config, a second file or database backend can be accepted but never used, which is a hard-to-debug misconfiguration.
Either validate uniqueness during config loading or change these helpers to return all matches and let callers decide what to do.
Also applies to: 86-100
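A minimal sketch of the fail-fast variant suggested above. The classes here are simplified stand-ins for the real Pydantic models:

```python
class FileBackendConfig:
    """Hypothetical stand-in for the real Pydantic model."""


class DatabaseBackendConfig:
    """Hypothetical stand-in for the real Pydantic model."""


def get_database_config(storage_configs):
    """Return the single database config, failing fast on duplicates."""
    matches = [c for c in storage_configs if isinstance(c, DatabaseBackendConfig)]
    if len(matches) > 1:
        raise ValueError(
            f"Expected at most one database backend, found {len(matches)}"
        )
    return matches[0] if matches else None


db = DatabaseBackendConfig()
print(get_database_config([FileBackendConfig(), db]) is db)  # True
```

Collecting all matches before returning is what makes the duplicate visible; the first-match loop can never observe a second entry.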
```python
def _parse_storage_config(
    self, storage_data: list[dict[str, Any]]
) -> list[StorageBackendConfig]:
    """Parse storage backend configurations from YAML data.

    Args:
        storage_data: List of storage backend config dicts from YAML.

    Returns:
        List of StorageBackendConfig objects.
    """
    if not storage_data:
        return []

    backends: list[StorageBackendConfig] = []
    for item in storage_data:
        backend_type = item.get("type")
        if backend_type == "file":
            backends.append(FileBackendConfig(**item))
        elif backend_type in ("sqlite", "postgres", "mysql"):
            backends.append(DatabaseBackendConfig(**item))
        else:
            logger.warning("Unknown storage backend type: %s", backend_type)
    return backends
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Verify that StorageBackendConfig is a discriminated union and can parse automatically
ast-grep --pattern $'StorageBackendConfig = Annotated[
  $$$
]'
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 414

🏁 Script executed:
```shell
# 1. Check if ConfigurationError exists and is exported from core.system.exceptions
find . -name "exceptions.py" -o -name "error.py" | head -20
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 195

🏁 Script executed:
```shell
# 2. Check the complete loader.py file for context around _parse_storage_config
wc -l src/lightspeed_evaluation/core/system/loader.py
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 132

🏁 Script executed:
```shell
# 3. Check imports in loader.py to see what's already imported from pydantic
head -50 src/lightspeed_evaluation/core/system/loader.py
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 1412

🏁 Script executed:
```shell
# 4. Search for existing TypeAdapter usage in codebase
rg "TypeAdapter" --type py
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 63

🏁 Script executed:
```shell
# 5. Search for ValidationError usage
rg "ValidationError" --type py
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 12959

🏁 Script executed:
```shell
cat -n src/lightspeed_evaluation/core/system/exceptions.py
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 2240

🏁 Script executed:
```shell
# Check what exceptions are specifically available
rg "class.*Error" src/lightspeed_evaluation/core/system/exceptions.py
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 385

🏁 Script executed:
```shell
# Check if ConfigurationError exists in the codebase
rg "ConfigurationError" --type py | head -20
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 2309

🏁 Script executed:
```shell
# Look at the actual _parse_storage_config implementation and surrounding context
sed -n '190,220p' src/lightspeed_evaluation/core/system/loader.py
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 1181

🏁 Script executed:
```shell
# Check if _parse_storage_config is tested
rg "_parse_storage_config" --type py -A 5
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 1165

🏁 Script executed:
```shell
# Check what exception pattern is used in loader.py for other similar validation
rg "raise.*Error" src/lightspeed_evaluation/core/system/loader.py
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 351

🏁 Script executed:
```shell
# Verify Pydantic v2 with TypeAdapter is available - check pyproject.toml
cat src/lightspeed_evaluation/../../../pyproject.toml 2>/dev/null || cat pyproject.toml 2>/dev/null || find . -name "pyproject.toml" -type f | head -1 | xargs cat | head -100
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 3718

🏁 Script executed:
```shell
# Check if there are other examples of TypeAdapter or discriminated union usage in the codebase
rg "Union\[.*\].*Field.*discriminator" --type py
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 63

🏁 Script executed:
```shell
# Check the validation error pattern used in loader.py context
sed -n '1,50p' src/lightspeed_evaluation/core/system/loader.py | grep -A 5 "import"
```
Repository: lightspeed-core/lightspeed-evaluation
Length of output: 658
Unknown storage backend types should raise ConfigurationError per guidelines rather than being silently dropped.
The code currently logs a warning for unknown storage backend types but silently drops them from the result. According to the coding guidelines, storage configuration code should use custom exceptions from core.system.exceptions for error handling. Raise ConfigurationError for invalid backend types instead of logging and continuing.
Optionally, consider leveraging Pydantic's discriminated union to eliminate the manual dispatch. Since StorageBackendConfig is defined as Annotated[Union[FileBackendConfig, DatabaseBackendConfig], Field(discriminator="type")], you can use Pydantic's TypeAdapter to automatically parse and validate all backends:
Optional refactor: Use TypeAdapter for automatic dispatch

```diff
+from pydantic import TypeAdapter, ValidationError
+
+_storage_adapter = TypeAdapter(list[StorageBackendConfig])
+
 def _parse_storage_config(
     self, storage_data: list[dict[str, Any]]
 ) -> list[StorageBackendConfig]:
     """Parse storage backend configurations from YAML data.

     Args:
         storage_data: List of storage backend config dicts from YAML.

     Returns:
         List of StorageBackendConfig objects.
+
+    Raises:
+        ConfigurationError: If any backend config is invalid or has unknown type.
     """
     if not storage_data:
         return []

-    backends: list[StorageBackendConfig] = []
-    for item in storage_data:
-        backend_type = item.get("type")
-        if backend_type == "file":
-            backends.append(FileBackendConfig(**item))
-        elif backend_type in ("sqlite", "postgres", "mysql"):
-            backends.append(DatabaseBackendConfig(**item))
-        else:
-            logger.warning("Unknown storage backend type: %s", backend_type)
-    return backends
+    try:
+        return _storage_adapter.validate_python(storage_data)
+    except ValidationError as e:
+        raise ConfigurationError(f"Invalid storage backend configuration: {e}") from e
```
+ raise ConfigurationError(f"Invalid storage backend configuration: {e}") from e🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/lightspeed_evaluation/core/system/loader.py` around lines 196 - 219, The
current _parse_storage_config method logs unknown storage backend types and
drops them; change this to raise core.system.exceptions.ConfigurationError when
backend_type is unrecognized so invalid configs fail fast; locate
_parse_storage_config and replace the logger.warning branch with raising
ConfigurationError including the invalid type and item contents, and optionally
refactor the whole parsing to use Pydantic's TypeAdapter with
StorageBackendConfig (Annotated[Union[FileBackendConfig, DatabaseBackendConfig],
Field(discriminator="type")]) to let Pydantic validate/dispatch automatically
and surface validation errors instead of manual dispatch.
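Independent of Pydantic, the fail-fast dispatch the comment asks for can be sketched with plain classes. All names here are simplified stand-ins for the real models and the real `core.system.exceptions.ConfigurationError`:

```python
class ConfigurationError(Exception):
    """Hypothetical stand-in for core.system.exceptions.ConfigurationError."""


class FileBackendConfig:
    def __init__(self, **kwargs):
        self.options = kwargs


class DatabaseBackendConfig:
    def __init__(self, **kwargs):
        self.options = kwargs


_BACKEND_TYPES = {
    "file": FileBackendConfig,
    "sqlite": DatabaseBackendConfig,
    "postgres": DatabaseBackendConfig,
    "mysql": DatabaseBackendConfig,
}


def parse_storage_config(storage_data):
    """Dispatch on the 'type' discriminator, raising on unknown backends."""
    backends = []
    for item in storage_data:
        cls = _BACKEND_TYPES.get(item.get("type"))
        if cls is None:
            # Fail fast instead of logging a warning and dropping the entry
            raise ConfigurationError(
                f"Unknown storage backend type: {item.get('type')!r} ({item!r})"
            )
        backends.append(cls(**item))
    return backends


print(type(parse_storage_config([{"type": "file"}])[0]).__name__)  # FileBackendConfig
```

A discriminated union plus `TypeAdapter`, as in the diff above, gets the same fail-fast behavior with richer validation errors for free.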
```python
# Initialize storage backend
self._initialize_storage()

# Process each conversation
logger.info("Processing conversations")
results = self._process_eval_data(evaluation_data)

# Finalize storage
self._finalize_storage()
```
Finalize storage in a finally block.
future.result() on Line 232 can raise, which currently skips Lines 165-166 entirely. That leaves interrupted runs unfinalized in the database.
💡 Suggested fix

```diff
 # Initialize storage backend
 self._initialize_storage()
-
-# Process each conversation
-logger.info("Processing conversations")
-results = self._process_eval_data(evaluation_data)
-
-# Finalize storage
-self._finalize_storage()
+try:
+    logger.info("Processing conversations")
+    results = self._process_eval_data(evaluation_data)
+finally:
+    self._finalize_storage()
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
# Initialize storage backend
self._initialize_storage()

try:
    logger.info("Processing conversations")
    results = self._process_eval_data(evaluation_data)
finally:
    self._finalize_storage()
```
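A self-contained sketch of why the try/finally matters when a worker future raises. The toy task and event list below are illustrative, not the PR's code:

```python
from concurrent.futures import ThreadPoolExecutor

events = []


def finalize_storage():
    events.append("finalized")


def failing_task():
    raise RuntimeError("worker failed")


def run_pipeline():
    try:
        with ThreadPoolExecutor(max_workers=1) as pool:
            future = pool.submit(failing_task)
            future.result()  # re-raises the worker's exception here
    finally:
        finalize_storage()  # runs even when result() raises


try:
    run_pipeline()
except RuntimeError:
    pass

print(events)  # ['finalized']
```

Without the `finally`, the re-raised `RuntimeError` would skip `finalize_storage()` entirely, leaving the run unfinalized in the database.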
```python
try:
    file_config = get_file_config(self.system_config.storage)
    run_info = RunInfo(name=file_config.base_filename)
    self.storage_backend.initialize(run_info)
    logger.info("Database storage initialized (run_id=%s)", run_info.run_id)
except StorageError as e:
    logger.warning("Failed to initialize database storage: %s", e)
    self.storage_backend = None
```
Disable persistence after the first storage error.
On Lines 190-191 the backend is discarded without cleanup, and on Lines 214-215 save failures only log and continue. If initialization partially succeeds or the DB drops mid-run, that can leak backend state and emit one warning per remaining result.
💡 Suggested fix

```diff
+    def _disable_storage_backend(self, message: str, error: StorageError) -> None:
+        """Stop using database storage after a backend failure."""
+        backend = self.storage_backend
+        self.storage_backend = None
+        logger.warning("%s: %s", message, error)
+        if backend is not None:
+            backend.close()
+
     def _initialize_storage(self) -> None:
         """Initialize storage backend for this evaluation run."""
         if self.storage_backend is None:
             return
@@
         try:
             file_config = get_file_config(self.system_config.storage)
             run_info = RunInfo(name=file_config.base_filename)
             self.storage_backend.initialize(run_info)
             logger.info("Database storage initialized (run_id=%s)", run_info.run_id)
         except StorageError as e:
-            logger.warning("Failed to initialize database storage: %s", e)
-            self.storage_backend = None
+            self._disable_storage_backend("Failed to initialize database storage", e)
@@
         try:
             self.storage_backend.save_result(result)
         except StorageError as e:
-            logger.warning("Failed to save result to database: %s", e)
+            self._disable_storage_backend("Failed to save result to database", e)
```

Also applies to: 207-215
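The disable-after-first-failure behavior can be sketched in isolation. `FlakyBackend` and this trimmed `Pipeline` are illustrative stand-ins, not the project's classes:

```python
class FlakyBackend:
    """Hypothetical backend whose saves always fail."""

    def __init__(self):
        self.closed = False

    def save_result(self, result):
        raise RuntimeError("db connection lost")

    def close(self):
        self.closed = True


class Pipeline:
    def __init__(self, backend):
        self.storage_backend = backend
        self.warnings = []

    def _disable_storage_backend(self, message, error):
        backend = self.storage_backend
        self.storage_backend = None  # stop further save attempts
        self.warnings.append(f"{message}: {error}")
        if backend is not None:
            backend.close()  # release the partially-initialized backend

    def _save_result_to_storage(self, result):
        if self.storage_backend is None:
            return  # persistence already disabled; stay quiet
        try:
            self.storage_backend.save_result(result)
        except RuntimeError as e:
            self._disable_storage_backend("Failed to save result to database", e)


backend = FlakyBackend()
pipeline = Pipeline(backend)
for result in range(5):
    pipeline._save_result_to_storage(result)

print(len(pipeline.warnings), backend.closed)  # 1 True
```

Five failing saves produce a single warning and a closed backend instead of five warnings and a leaked connection.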
Description
Type of change
Tools used to create PR
Identify any AI code assistants used in this PR (for transparency and review context)
Related Tickets & Documents
Checklist before requesting a review
Testing
Summary by CodeRabbit
New Features