From fe92984f460d563b95278ec6895021322c7f2fae Mon Sep 17 00:00:00 2001 From: Tom Durrant Date: Wed, 18 Feb 2026 17:04:16 +1100 Subject: [PATCH 1/2] Add Pydantic-based postprocessor configuration framework Implement configuration-driven postprocessing to bring parity with run backend framework. BREAKING CHANGES: - Replace string-based processor names with config objects - ModelRun.postprocess() now requires BasePostprocessorConfig instances - CLI --processor option replaced with --processor-config (required) - LocalPipelineBackend.execute() requires processor_config parameter New Features: - BasePostprocessorConfig abstract class with common fields - NoopPostprocessorConfig concrete implementation - Entry point-based dynamic config loading (rompy.postprocess.config) - ProcessorConfig type alias for Union of all processor configs - CLI --processor-config option for postprocess/pipeline commands - CLI --processor-type option for backends validate command - Example configs in examples/backends/postprocessor_configs/ Files Added: - src/rompy/postprocess/config.py - Config classes and loading - tests/test_postprocess_config.py - Comprehensive test suite (18 tests) - examples/backends/postprocessor_configs/ - Example YAML configs Files Modified: - src/rompy/cli.py - Updated CLI commands - src/rompy/model.py - Updated postprocess() method - src/rompy/pipeline/__init__.py - Updated LocalPipelineBackend - pyproject.toml - Added entry point registration - tests/backends/test_enhanced_backends.py - Updated pipeline tests - README.md - Added postprocessor configuration section - HISTORY.rst - Added v0.6.0 changelog entry All tests passing: 254 passed, 15 skipped --- HISTORY.rst | 18 ++ README.md | 37 +++ .../backends/postprocessor_configs/README.md | 45 ++++ .../postprocessor_configs/noop_advanced.yml | 23 ++ .../postprocessor_configs/noop_basic.yml | 5 + pyproject.toml | 3 + src/rompy/cli.py | 173 +++++++++---- src/rompy/model.py | 48 ++-- src/rompy/pipeline/__init__.py | 23 
+- src/rompy/postprocess/__init__.py | 18 +- src/rompy/postprocess/config.py | 226 +++++++++++++++++ tests/backends/test_enhanced_backends.py | 57 +++-- tests/test_postprocess_config.py | 234 ++++++++++++++++++ 13 files changed, 808 insertions(+), 102 deletions(-) create mode 100644 examples/backends/postprocessor_configs/README.md create mode 100644 examples/backends/postprocessor_configs/noop_advanced.yml create mode 100644 examples/backends/postprocessor_configs/noop_basic.yml create mode 100644 src/rompy/postprocess/config.py create mode 100644 tests/test_postprocess_config.py diff --git a/HISTORY.rst b/HISTORY.rst index 541e092b..f3111f43 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -33,6 +33,24 @@ are continually evolving. Contributions and feedback are welcome! Releases ******** +0.6.0 (2026-02-18) +___________________ + +Breaking Changes +---------------- +* CLI: Replaced `--processor` option with `--processor-config` (required) in `rompy postprocess` and `rompy pipeline` commands. +* API: `ModelRun.postprocess()` now requires a `BasePostprocessorConfig` instance instead of a string processor name. +* Pipeline: `LocalPipelineBackend.execute()` now accepts a `BasePostprocessorConfig` instance for the `processor` parameter. + +New Features +------------ +* Added Pydantic-based postprocessor configuration framework (`BasePostprocessorConfig`, `NoopPostprocessorConfig`). +* Added `--processor-config` CLI option for specifying postprocessor configuration files (YAML/JSON). +* Added `rompy.postprocess.config` entry point group for registering postprocessor configurations. +* Added `rompy backends validate --processor-type` for validating postprocessor configurations. +* Added comprehensive test suite for postprocessor configuration framework. +* Added example postprocessor configuration files. 
+ 0.5.0 (2025-07-13) ___________________ diff --git a/README.md b/README.md index 08b17e4e..a909c066 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ Key Features: - Templated, reproducible model configuration using pydantic and xarray - Unified interfaces for grids, data, boundaries, and spectra - Extensible plugin system for models, data sources, backends, and postprocessors +- Pydantic-based postprocessor configuration with CLI support - Robust logging and formatting for consistent output and diagnostics - Example notebooks and comprehensive documentation for rapid onboarding - Support for local, Docker, and HPC execution backends @@ -28,6 +29,42 @@ rompy is under active development—features, model support, and documentation a See +# Postprocessor Configuration + +ROMPY now supports Pydantic-based postprocessor configuration via YAML/JSON files. + +## Usage + +### Postprocess with a config file + +```bash +rompy postprocess model_config.yml --processor-config processor.yml +``` + +### Pipeline with postprocessor config + +```bash +rompy pipeline model_config.yml --processor-config processor.yml +``` + +### Validate a postprocessor config + +```bash +rompy backends validate --processor-type noop processor.yml +``` + +## Example Configuration + +```yaml +type: noop +validate_outputs: true +timeout: 3600 +env_vars: + DEBUG: "1" +``` + +See `examples/backends/postprocessor_configs/` for more examples. + # Code Formatting and Pre-commit Hooks This repository enforces Python code formatting using [black](https://github.com/psf/black) via the pre-commit framework. diff --git a/examples/backends/postprocessor_configs/README.md b/examples/backends/postprocessor_configs/README.md new file mode 100644 index 00000000..fc2b7496 --- /dev/null +++ b/examples/backends/postprocessor_configs/README.md @@ -0,0 +1,45 @@ +# Postprocessor Configuration Examples + +This directory contains example postprocessor configuration files for ROMPY. 
+ +## Available Postprocessors + +### Noop Postprocessor + +The `noop` postprocessor is a placeholder that performs no actual processing but can optionally validate that model outputs exist. + +## Files + +- **noop_basic.yml** - Minimal configuration with just the required type field +- **noop_advanced.yml** - Shows all available configuration options + +## Usage + +### Validate a configuration file + +```bash +rompy backends validate --processor-type noop noop_basic.yml +``` + +### Use in postprocessing + +```bash +rompy postprocess model_config.yml --processor-config noop_basic.yml +``` + +### Use in pipeline + +```bash +rompy pipeline model_config.yml --processor-config noop_basic.yml +``` + +## Configuration Schema + +All postprocessor configurations share these common fields: + +- **type** (required): The postprocessor type (e.g., "noop") +- **timeout** (optional): Maximum execution time in seconds (60-86400, default: 3600) +- **env_vars** (optional): Dictionary of environment variables +- **working_dir** (optional): Working directory for execution + +Processor-specific fields vary by type. See individual example files for details. 
diff --git a/examples/backends/postprocessor_configs/noop_advanced.yml b/examples/backends/postprocessor_configs/noop_advanced.yml new file mode 100644 index 00000000..e9b4bae5 --- /dev/null +++ b/examples/backends/postprocessor_configs/noop_advanced.yml @@ -0,0 +1,23 @@ +# Advanced Noop Postprocessor Configuration +# Shows all available options for the noop postprocessor + +# Required: specifies this is a noop postprocessor +type: noop + +# Whether to validate that expected output files exist +# Default: true +validate_outputs: true + +# Maximum execution time in seconds (60-86400) +# Default: 3600 (1 hour) +timeout: 3600 + +# Additional environment variables to set during execution +# Default: {} +env_vars: + DEBUG: "1" + LOG_LEVEL: "INFO" + +# Working directory for execution +# If not specified, uses model output directory +# working_dir: /path/to/output diff --git a/examples/backends/postprocessor_configs/noop_basic.yml b/examples/backends/postprocessor_configs/noop_basic.yml new file mode 100644 index 00000000..267da340 --- /dev/null +++ b/examples/backends/postprocessor_configs/noop_basic.yml @@ -0,0 +1,5 @@ +# Basic Noop Postprocessor Configuration +# This is the simplest possible postprocessor config - just validate outputs exist + +type: noop +validate_outputs: true diff --git a/pyproject.toml b/pyproject.toml index 5c76d8d8..e7ab46ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -86,6 +86,9 @@ slurm = "rompy.run.slurm:SlurmRunBackend" [project.entry-points."rompy.postprocess"] noop = "rompy.postprocess:NoopPostprocessor" +[project.entry-points."rompy.postprocess.config"] +noop = "rompy.postprocess.config:NoopPostprocessorConfig" + [project.entry-points."rompy.pipeline"] local = "rompy.pipeline:LocalPipelineBackend" diff --git a/src/rompy/cli.py b/src/rompy/cli.py index 76bf8961..a129754e 100644 --- a/src/rompy/cli.py +++ b/src/rompy/cli.py @@ -364,7 +364,12 @@ def _load_backend_config(backend_config_file): @cli.command() @click.argument("config", 
type=click.Path(exists=True), required=False) @click.option("--run-backend", default="local", help="Execution backend for run stage") -@click.option("--processor", default="noop", help="Postprocessor to use") +@click.option( + "--processor-config", + type=click.Path(exists=True), + required=True, + help="YAML/JSON file with postprocessor configuration (required)", +) @click.option( "--cleanup-on-failure/--no-cleanup", default=False, help="Clean up on failure" ) @@ -375,7 +380,7 @@ def _load_backend_config(backend_config_file): def pipeline( config, run_backend, - processor, + processor_config, cleanup_on_failure, validate_stages, verbose, @@ -399,10 +404,15 @@ def pipeline( config_data = load_config(config, from_env=config_from_env) model_run = ModelRun(**config_data) + # Load processor configuration + from rompy.postprocess.config import _load_processor_config + + processor_cfg = _load_processor_config(processor_config) + logger.info(f"Running pipeline for: {model_run.config.model_type}") logger.info(f"Run ID: {model_run.run_id}") logger.info( - f"Pipeline: generate → run({run_backend}) → postprocess({processor})" + f"Pipeline: generate → run({run_backend}) → postprocess({processor_cfg.type})" ) start_time = datetime.now() @@ -411,7 +421,7 @@ def pipeline( results = model_run.pipeline( pipeline_backend="local", run_backend=run_backend, - processor=processor, + processor=processor_cfg, cleanup_on_failure=cleanup_on_failure, validate_stages=validate_stages, ) @@ -496,7 +506,10 @@ def generate( @cli.command() @click.argument("config", type=click.Path(exists=True), required=False) @click.option( - "--processor", default="noop", help="Postprocessor to use (default: noop)" + "--processor-config", + type=click.Path(exists=True), + required=True, + help="YAML/JSON file with postprocessor configuration (required)", ) @click.option("--output-dir", help="Override output directory for postprocessing") @click.option( @@ -507,7 +520,7 @@ def generate( @add_common_options def 
postprocess( config, - processor, + processor_config, output_dir, validate_outputs, verbose, @@ -517,7 +530,15 @@ def postprocess( simple_logs, config_from_env, ): - """Run postprocessing on model outputs using the specified postprocessor.""" + """Run postprocessing on model outputs using the specified postprocessor. + + Examples: + # Run with processor configuration file + rompy postprocess config.yml --processor-config processor.yml + + # Run with config from environment variable + rompy postprocess --config-from-env --processor-config processor.yml + """ configure_logging(verbose, log_dir, simple_logs, ascii_only, show_warnings) # Validate config source @@ -531,14 +552,19 @@ def postprocess( config_data = load_config(config, from_env=config_from_env) model_run = ModelRun(**config_data) + # Load processor configuration + from rompy.postprocess.config import _load_processor_config + + processor_cfg = _load_processor_config(processor_config) + logger.info(f"Running postprocessing for: {model_run.config.model_type}") logger.info(f"Run ID: {model_run.run_id}") - logger.info(f"Postprocessor: {processor}") + logger.info(f"Postprocessor: {processor_cfg.type}") # Run postprocessing start_time = datetime.now() results = model_run.postprocess( - processor=processor, + processor=processor_cfg, output_dir=output_dir, validate_outputs=validate_outputs, ) @@ -637,10 +663,16 @@ def list_backends( type=click.Choice(["local", "docker"]), help="Backend type to validate as", ) +@click.option( + "--processor-type", + type=click.Choice(["noop"]), + help="Postprocessor type to validate as (alternative to --backend-type)", +) @add_common_options def validate_backend_config( config_file, backend_type, + processor_type, verbose, log_dir, show_warnings, @@ -648,60 +680,91 @@ def validate_backend_config( simple_logs, config_from_env, ): - """Validate a backend configuration file.""" + """Validate a backend or postprocessor configuration file.""" configure_logging(verbose, log_dir, 
simple_logs, ascii_only, show_warnings) + # Check that only one type is specified + if backend_type and processor_type: + raise click.UsageError( + "Cannot specify both --backend-type and --processor-type. Choose one." + ) + try: - # Load configuration - config_data = load_config(config_file) - - # Determine backend type and extract config parameters - if backend_type: - config_type = backend_type - elif "backend_type" in config_data: - config_type = config_data.pop("backend_type") - elif "type" in config_data: - config_type = config_data.pop("type") - else: - raise click.UsageError( - "Backend type must be specified via --backend-type or 'type' field in config" + if processor_type: + # Validate postprocessor configuration + from rompy.postprocess.config import ( + _load_processor_config, + validate_postprocessor_config, + ) + + is_valid, message, config = validate_postprocessor_config( + config_file, processor_type ) - # Validate configuration - if config_type == "local": - config = LocalConfig(**config_data) - logger.info("✅ Local backend configuration is valid") - elif config_type == "docker": - config = DockerConfig(**config_data) - logger.info("✅ Docker backend configuration is valid") + if not is_valid: + raise click.UsageError(message) + + logger.info(f"✅ Postprocessor configuration is valid") + logger.info(f"Processor type: {config.type}") + logger.info(f"Timeout: {config.timeout}s") + if hasattr(config, "validate_outputs"): + logger.info(f"Validate outputs: {config.validate_outputs}") + if config.env_vars: + logger.info(f"Environment variables: {list(config.env_vars.keys())}") + if config.working_dir: + logger.info(f"Working directory: {config.working_dir}") + else: - raise click.UsageError(f"Unknown backend type: {config_type}") - - # Show configuration details - logger.info(f"Backend type: {config_type}") - logger.info(f"Timeout: {config.timeout}s") - if config.env_vars: - logger.info(f"Environment variables: {list(config.env_vars.keys())}") - if 
config.working_dir: - logger.info(f"Working directory: {config.working_dir}") - - # Type-specific details - if isinstance(config, LocalConfig): - if config.command: - logger.info(f"Command: {config.command}") - elif isinstance(config, DockerConfig): - if config.image: - logger.info(f"Image: {config.image}") - if config.dockerfile: - logger.info(f"Dockerfile: {config.dockerfile}") - logger.info(f"CPU: {config.cpu}") - if config.memory: - logger.info(f"Memory: {config.memory}") - if config.volumes: - logger.info(f"Volumes: {len(config.volumes)} mounts") + # Load configuration + config_data = load_config(config_file) + + # Determine backend type and extract config parameters + if backend_type: + config_type = backend_type + elif "backend_type" in config_data: + config_type = config_data.pop("backend_type") + elif "type" in config_data: + config_type = config_data.pop("type") + else: + raise click.UsageError( + "Backend type must be specified via --backend-type or 'type' field in config" + ) + + # Validate configuration + if config_type == "local": + config = LocalConfig(**config_data) + logger.info("✅ Local backend configuration is valid") + elif config_type == "docker": + config = DockerConfig(**config_data) + logger.info("✅ Docker backend configuration is valid") + else: + raise click.UsageError(f"Unknown backend type: {config_type}") + + # Show configuration details + logger.info(f"Backend type: {config_type}") + logger.info(f"Timeout: {config.timeout}s") + if config.env_vars: + logger.info(f"Environment variables: {list(config.env_vars.keys())}") + if config.working_dir: + logger.info(f"Working directory: {config.working_dir}") + + # Type-specific details + if isinstance(config, LocalConfig): + if config.command: + logger.info(f"Command: {config.command}") + elif isinstance(config, DockerConfig): + if config.image: + logger.info(f"Image: {config.image}") + if config.dockerfile: + logger.info(f"Dockerfile: {config.dockerfile}") + logger.info(f"CPU: {config.cpu}") + 
if config.memory: + logger.info(f"Memory: {config.memory}") + if config.volumes: + logger.info(f"Volumes: {len(config.volumes)} mounts") except Exception as e: - logger.error(f"❌ Backend configuration validation failed: {e}") + logger.error(f"❌ Configuration validation failed: {e}") if verbose > 0: logger.exception("Full traceback:") sys.exit(1) diff --git a/src/rompy/model.py b/src/rompy/model.py index 61dab175..406b3a5e 100644 --- a/src/rompy/model.py +++ b/src/rompy/model.py @@ -366,38 +366,48 @@ def run(self, backend: BackendConfig, workspace_dir: Optional[str] = None) -> bo # Pass the config object and workspace_dir to the backend return backend_instance.run(self, config=backend, workspace_dir=workspace_dir) - def postprocess(self, processor: str = "noop", **kwargs) -> Dict[str, Any]: + def postprocess( + self, processor: "BasePostprocessorConfig", **kwargs + ) -> Dict[str, Any]: """ - Postprocess the model outputs using the specified processor. + Postprocess the model outputs using the specified processor configuration. - This method uses entry points to load and execute the appropriate postprocessor. - Available processors are automatically discovered from the rompy.postprocess entry point group. - - Built-in processors: - - "noop": A placeholder processor that does nothing but returns success + This method uses the provided configuration to instantiate and execute + the appropriate postprocessor. The processor type is determined by the + configuration object. 
Args: - processor: Name of the postprocessor to use (default: "noop") - **kwargs: Additional processor-specific parameters + processor: Configuration object for the postprocessor to use + **kwargs: Additional processor-specific parameters (override config values) Returns: Dictionary with results from the postprocessing Raises: - ValueError: If the specified processor is not available + TypeError: If processor is not a BasePostprocessorConfig instance """ - # Get the requested postprocessor class from entry points - if processor not in POSTPROCESSORS: - available = list(POSTPROCESSORS.keys()) - raise ValueError( - f"Unknown postprocessor: {processor}. " - f"Available processors: {', '.join(available)}" + from rompy.postprocess.config import BasePostprocessorConfig + + if not isinstance(processor, BasePostprocessorConfig): + raise TypeError( + f"processor must be a BasePostprocessorConfig instance, " + f"got {type(processor).__name__}" ) - # Create an instance and process the outputs - processor_class = POSTPROCESSORS[processor] + # Get processor class from config + processor_class = processor.get_postprocessor_class() processor_instance = processor_class() - return processor_instance.process(self, **kwargs) + + # Extract processor-specific fields (exclude common base fields) + base_fields = {"timeout", "env_vars", "working_dir", "type"} + processor_fields = { + k: v for k, v in processor.model_dump().items() if k not in base_fields + } + + # Merge with any user-provided kwargs (kwargs take precedence) + processor_fields.update(kwargs) + + return processor_instance.process(self, **processor_fields) def pipeline(self, pipeline_backend: str = "local", **kwargs) -> Dict[str, Any]: """ diff --git a/src/rompy/pipeline/__init__.py b/src/rompy/pipeline/__init__.py index d0c051be..5aab946e 100644 --- a/src/rompy/pipeline/__init__.py +++ b/src/rompy/pipeline/__init__.py @@ -24,7 +24,7 @@ def execute( self, model_run, run_backend: str = "local", - processor: str = "noop", + 
processor: "BasePostprocessorConfig" = None, run_kwargs: Optional[Dict[str, Any]] = None, process_kwargs: Optional[Dict[str, Any]] = None, cleanup_on_failure: bool = False, @@ -36,7 +36,7 @@ def execute( Args: model_run: The ModelRun instance to execute run_backend: Backend to use for the run stage ("local" or "docker") - processor: Processor to use for the postprocess stage + processor: Processor configuration for the postprocess stage run_kwargs: Additional parameters for the run stage process_kwargs: Additional parameters for the postprocess stage cleanup_on_failure: Whether to cleanup outputs on pipeline failure @@ -48,7 +48,10 @@ def execute( Raises: ValueError: If model_run is invalid or parameters are invalid + TypeError: If processor is not a BasePostprocessorConfig instance """ + from rompy.postprocess.config import BasePostprocessorConfig + # Validate input parameters if not model_run: raise ValueError("model_run cannot be None") @@ -59,8 +62,14 @@ def execute( if not isinstance(run_backend, str) or not run_backend.strip(): raise ValueError("run_backend must be a non-empty string") - if not isinstance(processor, str) or not processor.strip(): - raise ValueError("processor must be a non-empty string") + if processor is None: + raise ValueError("processor configuration is required") + + if not isinstance(processor, BasePostprocessorConfig): + raise TypeError( + f"processor must be a BasePostprocessorConfig instance, " + f"got {type(processor).__name__}" + ) # Initialize parameters run_kwargs = run_kwargs or {} @@ -68,7 +77,7 @@ def execute( logger.info(f"Starting pipeline execution for run_id: {model_run.run_id}") logger.info( - f"Pipeline configuration: run_backend='{run_backend}', processor='{processor}'" + f"Pipeline configuration: run_backend='{run_backend}', processor='{processor.type}'" ) pipeline_results = { @@ -76,7 +85,7 @@ def execute( "run_id": model_run.run_id, "stages_completed": [], "run_backend": run_backend, - "processor": processor, + 
"processor": processor.type, } try: @@ -148,7 +157,7 @@ def execute( } # Stage 3: Postprocess outputs - logger.info(f"Stage 3: Postprocessing with {processor}") + logger.info(f"Stage 3: Postprocessing with {processor.type}") try: postprocess_results = model_run.postprocess( diff --git a/src/rompy/postprocess/__init__.py b/src/rompy/postprocess/__init__.py index e8113a53..3d4c0ced 100644 --- a/src/rompy/postprocess/__init__.py +++ b/src/rompy/postprocess/__init__.py @@ -1,15 +1,29 @@ """ -No-op postprocessor for model outputs. +Postprocessor module for ROMPY. -This module provides a basic postprocessor that does nothing. +This module provides postprocessor classes and their configurations for +processing model outputs after execution. """ import logging from pathlib import Path from typing import Any, Dict, Optional, Union +from .config import ( + BasePostprocessorConfig, + NoopPostprocessorConfig, + ProcessorConfig, +) + logger = logging.getLogger(__name__) +__all__ = [ + "NoopPostprocessor", + "NoopPostprocessorConfig", + "BasePostprocessorConfig", + "ProcessorConfig", +] + class NoopPostprocessor: """A postprocessor that does nothing. diff --git a/src/rompy/postprocess/config.py b/src/rompy/postprocess/config.py new file mode 100644 index 00000000..aa081740 --- /dev/null +++ b/src/rompy/postprocess/config.py @@ -0,0 +1,226 @@ +""" +Postprocessor configuration classes for ROMPY. + +This module provides Pydantic-based configuration classes for different +postprocessor types. These configurations handle transient execution parameters +while maintaining type safety and validation. +""" + +from abc import ABC, abstractmethod +from pathlib import Path +from typing import TYPE_CHECKING, Dict, Literal, Optional, Union + +from pydantic import BaseModel, ConfigDict, Field, field_validator + +if TYPE_CHECKING: + from . import NoopPostprocessor + + +class BasePostprocessorConfig(BaseModel, ABC): + """Base class for all postprocessor configurations. 
+ + This class defines common configuration parameters that apply to all + postprocessor types, such as timeouts and environment variables. + """ + + timeout: int = Field( + 3600, + ge=60, + le=86400, + description="Maximum execution time in seconds (1 minute to 24 hours)", + ) + + env_vars: Dict[str, str] = Field( + default_factory=dict, + description="Additional environment variables to set during execution", + ) + + working_dir: Optional[Path] = Field( + None, + description="Working directory for execution (defaults to model output directory)", + ) + + model_config = ConfigDict( + validate_assignment=True, + extra="forbid", # Don't allow extra fields + use_enum_values=True, + ) + + @field_validator("working_dir") + @classmethod + def validate_working_dir(cls, v): + """Validate working directory exists if specified.""" + if v is not None: + path = Path(v) + if not path.exists(): + raise ValueError(f"Working directory does not exist: {path}") + if not path.is_dir(): + raise ValueError(f"Working directory is not a directory: {path}") + return v + + @field_validator("env_vars") + @classmethod + def validate_env_vars(cls, v): + """Validate environment variables.""" + if not isinstance(v, dict): + raise ValueError("env_vars must be a dictionary") + + for key, value in v.items(): + if not isinstance(key, str) or not isinstance(value, str): + raise ValueError("Environment variable keys and values must be strings") + if not key: + raise ValueError("Environment variable keys cannot be empty") + + return v + + @abstractmethod + def get_postprocessor_class(self): + """Return the postprocessor class that should handle this configuration. + + Returns: + The postprocessor class to use for execution + """ + pass + + +class NoopPostprocessorConfig(BasePostprocessorConfig): + """Configuration for no-operation postprocessor. + + This configuration is used when no postprocessing is required but output + validation may still be needed. 
It provides the simplest postprocessor + that can optionally validate that model outputs exist. + """ + + type: Literal["noop"] = "noop" + + validate_outputs: bool = Field( + True, description="Whether to validate that expected outputs exist" + ) + + def get_postprocessor_class(self): + """Return the NoopPostprocessor class.""" + from . import NoopPostprocessor + + return NoopPostprocessor + + model_config = ConfigDict( + json_schema_extra={ + "examples": [ + { + "type": "noop", + "timeout": 3600, + "validate_outputs": True, + }, + { + "type": "noop", + "timeout": 1800, + "validate_outputs": False, + "env_vars": {"DEBUG": "1"}, + }, + {"type": "noop", "working_dir": "/path/to/output/dir"}, + ] + } + ) + + +# Type alias for all postprocessor configurations +ProcessorConfig = Union[NoopPostprocessorConfig] + + +def _load_processor_config(config_file): + """Load postprocessor configuration from a YAML or JSON file. + + This function reads a configuration file, extracts the processor type, + and instantiates the appropriate configuration class using entry points. 
+ + Args: + config_file: Path to the configuration file (YAML or JSON) + + Returns: + An instance of the appropriate postprocessor config class + + Raises: + ValueError: If the processor type is not found in registered entry points + FileNotFoundError: If the config file doesn't exist + yaml.YAMLError: If the file is neither valid JSON nor valid YAML + """ + import json + from importlib.metadata import entry_points + + path = Path(config_file) + + if not path.exists(): + raise FileNotFoundError(f"Config file not found: {config_file}") + + content = path.read_text() + + # Try JSON first, then YAML + try: + config_data = json.loads(content) + except json.JSONDecodeError: + import yaml + + config_data = yaml.safe_load(content) + + if not isinstance(config_data, dict): + raise ValueError( + f"Config file must contain a dictionary, got {type(config_data)}" + ) + + processor_type = config_data.pop("type", None) + + if processor_type is None: + raise ValueError("Config file must contain a 'type' field") + + # Load from entry point + eps = entry_points(group="rompy.postprocess.config") + for ep in eps: + if ep.name == processor_type: + config_class = ep.load() + return config_class(**config_data) + + # If we get here, type wasn't found + available = [ep.name for ep in eps] + if available: + available_str = ", ".join(available) + raise ValueError( + f"Unknown processor type: '{processor_type}'. Available types: {available_str}" + ) + else: + raise ValueError( + f"Unknown processor type: '{processor_type}'. No postprocessor types are registered." + ) + + +def validate_postprocessor_config(config_file, processor_type=None): + """Validate a postprocessor configuration file. + + This function validates that a configuration file is valid YAML/JSON, + contains a valid processor type, and can be instantiated. + + Args: + config_file: Path to the configuration file to validate + processor_type: Optional specific processor type to validate against. 
+ If None, validates against the type in the config. + + Returns: + Tuple of (is_valid: bool, message: str, config: Optional[BasePostprocessorConfig]) + """ + try: + config = _load_processor_config(config_file) + + if processor_type is not None and config.type != processor_type: + return ( + False, + f"Config type '{config.type}' does not match expected type '{processor_type}'", + None, + ) + + return (True, f"Valid {config.type} configuration", config) + + except FileNotFoundError as e: + return (False, f"Config file not found: {e}", None) + except ValueError as e: + return (False, f"Validation error: {e}", None) + except Exception as e: + return (False, f"Unexpected error: {e}", None) diff --git a/tests/backends/test_enhanced_backends.py b/tests/backends/test_enhanced_backends.py index 9511f25c..a1197f97 100644 --- a/tests/backends/test_enhanced_backends.py +++ b/tests/backends/test_enhanced_backends.py @@ -17,6 +17,7 @@ from rompy.model import ModelRun from rompy.pipeline import LocalPipelineBackend from rompy.postprocess import NoopPostprocessor +from rompy.postprocess.config import NoopPostprocessorConfig from rompy.run import LocalRunBackend @@ -53,6 +54,12 @@ def model_run_with_run_method(tmp_path): ) +@pytest.fixture +def processor_config(): + """Create a NoopPostprocessorConfig for testing.""" + return NoopPostprocessorConfig(validate_outputs=False) + + class TestEnhancedLocalRunBackend: """Test the enhanced LocalRunBackend with validation and error handling.""" @@ -341,27 +348,29 @@ def test_execute_validation_invalid_run_backend(self, model_run): backend.execute(model_run, run_backend="") def test_execute_validation_invalid_processor(self, model_run): - """Test that execute raises ValueError for invalid processor.""" + """Test that execute raises TypeError for invalid processor type.""" backend = LocalPipelineBackend() - with pytest.raises(ValueError, match="processor must be a non-empty string"): - backend.execute(model_run, processor="") + with 
pytest.raises( + TypeError, match="must be a BasePostprocessorConfig instance" + ): + backend.execute(model_run, processor="noop") - def test_execute_generate_failure(self, model_run): + def test_execute_generate_failure(self, model_run, processor_config): """Test pipeline failure during generate stage.""" backend = LocalPipelineBackend() with patch( "rompy.model.ModelRun.generate", side_effect=Exception("Generate failed") ): - result = backend.execute(model_run) + result = backend.execute(model_run, processor=processor_config) assert result["success"] is False assert result["stage"] == "generate" assert "Generate failed" in result["message"] assert "generate" not in result["stages_completed"] - def test_execute_run_failure(self, model_run, tmp_path): + def test_execute_run_failure(self, model_run, tmp_path, processor_config): """Test pipeline failure during run stage.""" backend = LocalPipelineBackend() @@ -370,14 +379,14 @@ def test_execute_run_failure(self, model_run, tmp_path): with patch("rompy.model.ModelRun.generate", return_value=str(output_dir)): with patch("rompy.model.ModelRun.run", return_value=False): - result = backend.execute(model_run) + result = backend.execute(model_run, processor=processor_config) assert result["success"] is False assert result["stage"] == "run" assert "generate" in result["stages_completed"] assert "run" not in result["stages_completed"] - def test_execute_run_exception(self, model_run, tmp_path): + def test_execute_run_exception(self, model_run, tmp_path, processor_config): """Test pipeline failure during run stage with exception.""" backend = LocalPipelineBackend() @@ -386,13 +395,13 @@ def test_execute_run_exception(self, model_run, tmp_path): with patch("rompy.model.ModelRun.generate", return_value=str(output_dir)): with patch("rompy.model.ModelRun.run", side_effect=Exception("Run failed")): - result = backend.execute(model_run) + result = backend.execute(model_run, processor=processor_config) assert result["success"] is 
False assert result["stage"] == "run" assert "Run failed" in result["message"] - def test_execute_postprocess_failure(self, model_run, tmp_path): + def test_execute_postprocess_failure(self, model_run, tmp_path, processor_config): """Test pipeline failure during postprocess stage.""" backend = LocalPipelineBackend() @@ -405,7 +414,7 @@ def test_execute_postprocess_failure(self, model_run, tmp_path): "rompy.model.ModelRun.postprocess", side_effect=Exception("Postprocess failed"), ): - result = backend.execute(model_run) + result = backend.execute(model_run, processor=processor_config) assert result["success"] is False assert result["stage"] == "postprocess" @@ -413,7 +422,7 @@ def test_execute_postprocess_failure(self, model_run, tmp_path): assert "run" in result["stages_completed"] assert "postprocess" not in result["stages_completed"] - def test_execute_success_complete(self, model_run, tmp_path): + def test_execute_success_complete(self, model_run, tmp_path, processor_config): """Test successful complete pipeline execution.""" backend = LocalPipelineBackend() @@ -431,7 +440,7 @@ def test_execute_success_complete(self, model_run, tmp_path): result = backend.execute( model_run, run_backend="local", - processor="noop", + processor=processor_config, run_kwargs={"param1": "value1"}, process_kwargs={"param2": "value2"}, ) @@ -444,7 +453,9 @@ def test_execute_success_complete(self, model_run, tmp_path): assert "postprocess" in result["stages_completed"] assert result["message"] == "Pipeline completed successfully" - def test_execute_with_validation_failure(self, model_run, tmp_path): + def test_execute_with_validation_failure( + self, model_run, tmp_path, processor_config + ): """Test pipeline with stage validation failure.""" backend = LocalPipelineBackend() @@ -452,13 +463,17 @@ def test_execute_with_validation_failure(self, model_run, tmp_path): with patch( "rompy.model.ModelRun.generate", return_value=str(tmp_path / "nonexistent") ): - result = 
backend.execute(model_run, validate_stages=True) + result = backend.execute( + model_run, processor=processor_config, validate_stages=True + ) assert result["success"] is False assert result["stage"] == "generate" assert "not found after generation" in result["message"] - def test_execute_with_cleanup_on_failure(self, model_run, tmp_path): + def test_execute_with_cleanup_on_failure( + self, model_run, tmp_path, processor_config + ): """Test pipeline with cleanup on failure.""" backend = LocalPipelineBackend() @@ -469,7 +484,9 @@ def test_execute_with_cleanup_on_failure(self, model_run, tmp_path): with patch("rompy.model.ModelRun.generate", return_value=str(output_dir)): with patch("rompy.model.ModelRun.run", return_value=False): - result = backend.execute(model_run, cleanup_on_failure=True) + result = backend.execute( + model_run, processor=processor_config, cleanup_on_failure=True + ) assert result["success"] is False # Directory should be cleaned up @@ -502,7 +519,9 @@ def test_cleanup_outputs_failure(self, model_run, tmp_path): # Should not raise exception, just log warning backend._cleanup_outputs(model_run) - def test_execute_postprocess_warning_on_failure(self, model_run, tmp_path): + def test_execute_postprocess_warning_on_failure( + self, model_run, tmp_path, processor_config + ): """Test pipeline continues when postprocessing reports failure but doesn't raise.""" backend = LocalPipelineBackend() @@ -521,7 +540,7 @@ def test_execute_postprocess_warning_on_failure(self, model_run, tmp_path): "rompy.model.ModelRun.postprocess", return_value=mock_postprocess_result, ): - result = backend.execute(model_run) + result = backend.execute(model_run, processor=processor_config) # Pipeline should still succeed assert result["success"] is True diff --git a/tests/test_postprocess_config.py b/tests/test_postprocess_config.py new file mode 100644 index 00000000..4bdbffed --- /dev/null +++ b/tests/test_postprocess_config.py @@ -0,0 +1,234 @@ +""" +Tests for postprocessor 
configuration framework. + +This module tests the postprocessor config classes, loading, and validation. +""" + +import json +import tempfile +from pathlib import Path + +import pytest +import yaml + +from rompy.postprocess import ( + BasePostprocessorConfig, + NoopPostprocessorConfig, + ProcessorConfig, +) +from rompy.postprocess.config import ( + _load_processor_config, + validate_postprocessor_config, +) + + +class TestBasePostprocessorConfig: + """Tests for BasePostprocessorConfig abstract base class.""" + + def test_is_abstract(self): + """Test that BasePostprocessorConfig cannot be instantiated.""" + with pytest.raises(TypeError): + BasePostprocessorConfig() + + def test_can_subclass(self): + """Test that BasePostprocessorConfig can be subclassed.""" + + class TestConfig(BasePostprocessorConfig): + type: str = "test" + + def get_postprocessor_class(self): + return object + + # Should not raise + config = TestConfig() + assert config.timeout == 3600 + assert config.env_vars == {} + assert config.working_dir is None + + +class TestNoopPostprocessorConfig: + """Tests for NoopPostprocessorConfig concrete class.""" + + def test_default_values(self): + """Test default values are set correctly.""" + config = NoopPostprocessorConfig() + assert config.type == "noop" + assert config.validate_outputs is True + assert config.timeout == 3600 + assert config.env_vars == {} + assert config.working_dir is None + + def test_custom_values(self): + """Test custom values can be set.""" + config = NoopPostprocessorConfig( + validate_outputs=False, + timeout=7200, + env_vars={"DEBUG": "1"}, + ) + assert config.validate_outputs is False + assert config.timeout == 7200 + assert config.env_vars == {"DEBUG": "1"} + + def test_validation_rejects_invalid_timeout(self): + """Test that invalid timeout values are rejected.""" + with pytest.raises(ValueError): + NoopPostprocessorConfig(timeout=30) # Below minimum + + with pytest.raises(ValueError): + NoopPostprocessorConfig(timeout=90000) # 
Above maximum + + def test_get_postprocessor_class(self): + """Test that get_postprocessor_class returns NoopPostprocessor.""" + from rompy.postprocess import NoopPostprocessor + + config = NoopPostprocessorConfig() + processor_class = config.get_postprocessor_class() + assert processor_class is NoopPostprocessor + + +class TestLoadProcessorConfig: + """Tests for _load_processor_config function.""" + + def test_load_from_yaml(self, tmp_path): + """Test loading config from YAML file.""" + config_file = tmp_path / "test_config.yml" + config_data = {"type": "noop", "validate_outputs": False} + config_file.write_text(yaml.dump(config_data)) + + config = _load_processor_config(config_file) + assert isinstance(config, NoopPostprocessorConfig) + assert config.type == "noop" + assert config.validate_outputs is False + + def test_load_from_json(self, tmp_path): + """Test loading config from JSON file.""" + config_file = tmp_path / "test_config.json" + config_data = {"type": "noop", "timeout": 7200} + config_file.write_text(json.dumps(config_data)) + + config = _load_processor_config(config_file) + assert isinstance(config, NoopPostprocessorConfig) + assert config.timeout == 7200 + + def test_file_not_found(self, tmp_path): + """Test error when file doesn't exist.""" + config_file = tmp_path / "nonexistent.yml" + + with pytest.raises(FileNotFoundError): + _load_processor_config(config_file) + + def test_missing_type_field(self, tmp_path): + """Test error when type field is missing.""" + config_file = tmp_path / "bad_config.yml" + config_file.write_text(yaml.dump({"validate_outputs": True})) + + with pytest.raises(ValueError, match="type"): + _load_processor_config(config_file) + + def test_unknown_processor_type(self, tmp_path): + """Test error for unknown processor type.""" + config_file = tmp_path / "unknown_config.yml" + config_file.write_text(yaml.dump({"type": "unknown"})) + + with pytest.raises(ValueError, match="Unknown processor type"): + 
_load_processor_config(config_file) + + +class TestValidatePostprocessorConfig: + """Tests for validate_postprocessor_config function.""" + + def test_valid_config(self, tmp_path): + """Test validation passes for valid config.""" + config_file = tmp_path / "valid_config.yml" + config_file.write_text(yaml.dump({"type": "noop"})) + + is_valid, message, config = validate_postprocessor_config(config_file) + assert is_valid is True + assert "Valid" in message + assert isinstance(config, NoopPostprocessorConfig) + + def test_type_mismatch(self, tmp_path): + """Test validation fails when type doesn't match expected.""" + config_file = tmp_path / "config.yml" + config_file.write_text(yaml.dump({"type": "noop"})) + + is_valid, message, config = validate_postprocessor_config( + config_file, processor_type="other" + ) + assert is_valid is False + assert "does not match" in message + assert config is None + + def test_file_not_found(self, tmp_path): + """Test validation handles missing file.""" + config_file = tmp_path / "nonexistent.yml" + + is_valid, message, config = validate_postprocessor_config(config_file) + assert is_valid is False + assert "not found" in message + assert config is None + + +class TestProcessorConfigTypeAlias: + """Tests for ProcessorConfig type alias.""" + + def test_is_union(self): + """Test that ProcessorConfig is a Union type or single type.""" + import typing + + origin = typing.get_origin(ProcessorConfig) + # Union[SingleType] is optimized to just SingleType + assert origin is typing.Union or ProcessorConfig is NoopPostprocessorConfig + + def test_includes_noop(self): + """Test that ProcessorConfig includes NoopPostprocessorConfig.""" + import typing + + args = typing.get_args(ProcessorConfig) + # Union[SingleType] is optimized to just SingleType + assert ( + NoopPostprocessorConfig in args + or ProcessorConfig is NoopPostprocessorConfig + ) + + +class TestModelRunIntegration: + """Integration tests for ModelRun.postprocess with config objects.""" 
+ + def test_postprocess_accepts_config(self): + """Test that ModelRun.postprocess accepts config objects.""" + from rompy.core.time import TimeRange + from rompy.model import ModelRun + from datetime import datetime + + model = ModelRun( + run_id="test", + period=TimeRange( + start=datetime(2020, 1, 1), + end=datetime(2020, 1, 2), + interval="1H", + ), + ) + + config = NoopPostprocessorConfig(validate_outputs=False) + # Should not raise TypeError + result = model.postprocess(config) + assert isinstance(result, dict) + + def test_postprocess_rejects_string(self): + """Test that ModelRun.postprocess rejects string processor names.""" + from rompy.core.time import TimeRange + from rompy.model import ModelRun + from datetime import datetime + + model = ModelRun( + run_id="test", + period=TimeRange( + start=datetime(2020, 1, 1), + end=datetime(2020, 1, 2), + interval="1H", + ), + ) + + with pytest.raises(TypeError, match="BasePostprocessorConfig"): + model.postprocess("noop") From 713eb3ddd39190c7e71762bec6084733fee10f2d Mon Sep 17 00:00:00 2001 From: Tom Durrant Date: Wed, 18 Feb 2026 17:09:53 +1100 Subject: [PATCH 2/2] Update documentation for postprocessor configuration framework Add comprehensive documentation for Pydantic-based postprocessor configuration system across all relevant docs: - CLI documentation: Added --processor-config and --processor-type options for postprocess, pipeline, and backends validate commands - Backends documentation: Added complete Postprocessor Configuration section with usage examples, validation, and CLI integration - Plugin architecture: Updated postprocessor section to reflect configuration-based approach with detailed implementation examples - Configuration deep dive: Added postprocessor configuration section with validation, serialization, and entry point loading - Backend reference: Comprehensive developer documentation for custom postprocessor configurations and implementations Key documentation additions: - Configuration file 
formats (YAML/JSON) - Entry point registration and discovery - Type-safe validation with Pydantic - Custom postprocessor configuration development - CLI integration and usage examples - Configuration serialization and schema generation - Integration with pipeline backends All documentation now reflects the new configuration-driven postprocessing API that replaces the old string-based processor selection. --- docs/backends.md | 332 +++++++++++++++++++--- docs/cli.md | 110 +++++++- docs/configuration_deep_dive.md | 85 ++++++ docs/developer/backend_reference.md | 423 +++++++++++++++++++++++++--- docs/plugin_architecture.md | 219 ++++++++++++-- 5 files changed, 1064 insertions(+), 105 deletions(-) diff --git a/docs/backends.md b/docs/backends.md index f44af306..8a7f60ee 100644 --- a/docs/backends.md +++ b/docs/backends.md @@ -508,21 +508,25 @@ For detailed implementation guidance, see [backend_reference](developer/backend_ ### Postprocessors -Handle results after model execution using postprocessor classes: +Handle results after model execution using postprocessor configuration classes: ```python -# Basic post-processing -results = model_run.postprocess(processor="archive") - -# Custom post-processing -results = model_run.postprocess( - processor="custom_analyzer", - output_format="netcdf", - compress=True +from rompy.postprocess.config import NoopPostprocessorConfig + +# Basic post-processing with configuration +processor_config = NoopPostprocessorConfig(validate_outputs=True) +results = model_run.postprocess(processor=processor_config) + +# Custom post-processing with advanced options +processor_config = NoopPostprocessorConfig( + validate_outputs=True, + timeout=7200, + env_vars={"DEBUG": "1"} ) +results = model_run.postprocess(processor=processor_config) ``` -For available postprocessors, see `rompy.backends.postprocessors`. +For postprocessor configuration details, see [Postprocessor Configuration](#postprocessor-configuration). 
### Schema Generation @@ -541,6 +545,259 @@ with open("local_schema.json", "w") as f: json.dump(local_schema, f, indent=2) ``` +## Postprocessor Configuration + +Postprocessors handle model output analysis and transformation using Pydantic-based configuration classes. + +### Configuration Types + +All postprocessor configurations inherit from `rompy.postprocess.config.BasePostprocessorConfig`. + +#### NoopPostprocessorConfig - Validation Only + +Validate model outputs without additional processing using `rompy.postprocess.config.NoopPostprocessorConfig`: + +**Basic Usage:** + +```python +from rompy.postprocess.config import NoopPostprocessorConfig + +config = NoopPostprocessorConfig( + validate_outputs=True, + timeout=3600 +) +``` + +**Advanced Configuration:** + +```yaml +# noop_advanced.yml +type: noop +validate_outputs: true +timeout: 7200 +env_vars: + DEBUG: "1" + LOG_LEVEL: "INFO" +working_dir: "./processing" +``` + +**Key Parameters:** + +* `validate_outputs`: Validate model outputs before processing (default: True) +* `timeout`: Maximum processing time in seconds (60-86400) +* `env_vars`: Environment variables for processing context +* `working_dir`: Working directory for processing operations + +For complete parameter documentation, see `rompy.postprocess.config.NoopPostprocessorConfig`. 
+ +### Using Postprocessor Configurations + +#### With ModelRun + +Postprocessor configurations integrate directly with Rompy's model execution: + +```python +from rompy.model import ModelRun +from rompy.postprocess.config import NoopPostprocessorConfig + +# Load your model +model_run = ModelRun.from_file("model_config.yml") + +# Execute model +model_run.run(backend=backend_config) + +# Post-process with configuration +processor_config = NoopPostprocessorConfig( + validate_outputs=True, + timeout=3600 +) +results = model_run.postprocess(processor=processor_config) + +if results["success"]: + print("Post-processing completed successfully") +else: + print(f"Post-processing failed: {results.get('error')}") +``` + +#### From Configuration Files + +Load postprocessor configurations from YAML or JSON files: + +```python +from rompy.postprocess.config import _load_processor_config + +# Load configuration from file +processor_config = _load_processor_config("processor.yml") + +# Use configuration +results = model_run.postprocess(processor=processor_config) +``` + +### Configuration File Format + +Postprocessor configurations use YAML or JSON with a `type` field: + +**YAML Format:** + +```yaml +# Basic configuration +type: noop +validate_outputs: true +timeout: 3600 + +--- +# Advanced configuration +type: noop +validate_outputs: true +timeout: 7200 +env_vars: + DEBUG: "1" + LOG_LEVEL: "INFO" + PROCESSING_MODE: "detailed" +working_dir: "./processing" +``` + +**JSON Format:** + +```json +{ + "type": "noop", + "validate_outputs": true, + "timeout": 3600, + "env_vars": { + "DEBUG": "1" + } +} +``` + +### CLI Integration + +Use postprocessor configurations with CLI commands: + +```bash +# Post-process existing outputs +rompy postprocess model_config.yml --processor-config processor.yml + +# Run complete pipeline with postprocessor +rompy pipeline model_config.yml \ + --run-backend local \ + --processor-config processor.yml + +# Validate postprocessor configuration +rompy 
backends validate processor.yml --processor-type noop +``` + +### Validation and Error Handling + +#### Type Safety + +Pydantic provides comprehensive validation: + +```python +from rompy.postprocess.config import NoopPostprocessorConfig +from pydantic import ValidationError + +try: + # Invalid timeout (too short) + config = NoopPostprocessorConfig(timeout=30) +except ValidationError as e: + print(f"Validation error: {e}") + +try: + # Invalid env_vars type + config = NoopPostprocessorConfig(env_vars=["invalid"]) +except ValidationError as e: + print(f"Configuration error: {e}") +``` + +#### Configuration Validation + +Each configuration class validates fields according to processing requirements: + +**BasePostprocessorConfig Validation:** + +* `timeout` must be between 60 and 86400 seconds +* `env_vars` must be string key-value pairs +* `working_dir` must exist if specified +* `validate_outputs` must be boolean + +### Custom Postprocessor Configurations + +Create custom postprocessor configurations by inheriting from `BasePostprocessorConfig`: + +```python +from rompy.postprocess.config import BasePostprocessorConfig +from pydantic import Field +from typing import Literal, Optional + +class AnalysisPostprocessorConfig(BasePostprocessorConfig): + """Configuration for analysis postprocessor.""" + + type: Literal["analysis"] = "analysis" + metrics: list[str] = Field(default_factory=list, description="Metrics to calculate") + output_format: str = Field("netcdf", description="Output format") + compress: bool = Field(True, description="Compress output files") + plot_config: Optional[dict] = Field(None, description="Plotting configuration") + + def get_postprocessor_class(self): + """Return the postprocessor class for this configuration.""" + from mypackage.postprocess import AnalysisPostprocessor + return AnalysisPostprocessor +``` + +Register custom configurations in `pyproject.toml`: + +```toml +[project.entry-points."rompy.postprocess.config"] +analysis = 
"mypackage.postprocess.config:AnalysisPostprocessorConfig" +``` + +### Best Practices + +1. **Version Control**: Keep postprocessor configurations in version control alongside model configurations + +2. **Environment Variables**: Use environment variables for environment-specific settings: + +```python +import os +config = NoopPostprocessorConfig( + env_vars={"DATA_DIR": os.environ["DATA_DIR"]} +) +``` + +3. **Validation**: Always validate configurations before production use: + +```bash +rompy backends validate processor.yml --processor-type noop +``` + +4. **Documentation**: Document postprocessor configurations with comments: + +```yaml +# Production post-processing configuration +type: noop +validate_outputs: true # Ensure all expected outputs exist +timeout: 7200 # 2 hours for large model outputs +env_vars: + DEBUG: "0" # Disable debug mode in production +``` + +### Schema Generation + +Generate configuration schemas for validation and documentation: + +```python +from rompy.postprocess.config import NoopPostprocessorConfig +import json + +# Generate JSON schema +schema = NoopPostprocessorConfig.model_json_schema() + +# Save for external validation +with open("noop_schema.json", "w") as f: + json.dump(schema, f, indent=2) +``` + ## Integration Examples ### Complete Workflow Example @@ -577,34 +834,39 @@ else: ### Pipeline Integration ```python -from rompy.pipeline import Pipeline +from rompy.pipeline import LocalPipelineBackend from rompy.backends import LocalConfig, DockerConfig +from rompy.postprocess.config import NoopPostprocessorConfig # Create pipeline with different backends for different stages -pipeline = Pipeline([ - { - "name": "preprocessing", - "backend": LocalConfig(timeout=1800), - "command": "python preprocess.py" - }, - { - "name": "simulation", - "backend": DockerConfig( - image="swan:latest", - cpu=16, - memory="32g", - timeout=14400 - ) - }, - { - "name": "postprocessing", - "backend": LocalConfig(timeout=3600), - "command": "python 
postprocess.py" + } +]) +backend = LocalPipelineBackend() + +# Configure run backend +run_config = DockerConfig( + image="swan:latest", + cpu=16, + memory="32g", + timeout=14400 +) + +# Configure postprocessor +processor_config = NoopPostprocessorConfig( + validate_outputs=True, + timeout=3600 +) # Execute pipeline -results = pipeline.run() +results = backend.execute( + model_run=model_run, + run_backend=run_config, + processor=processor_config, + cleanup_on_failure=False +) + +if results["success"]: + print(f"Pipeline completed. Stages: {results['stages_completed']}") +else: + print(f"Pipeline failed at stage: {results['stage']}") ``` ## Notebook Examples @@ -621,8 +883,10 @@ For complete API documentation, see: * `rompy.backends.config.BaseBackendConfig` - Base configuration class * `rompy.backends.config.LocalConfig` - Local execution configuration * `rompy.backends.config.DockerConfig` - Docker execution configuration +* `rompy.postprocess.config.BasePostprocessorConfig` - Base postprocessor configuration +* `rompy.postprocess.config.NoopPostprocessorConfig` - No-op postprocessor configuration * `rompy.run` - Run backend implementations -* `rompy.backends.postprocessors` - Postprocessor implementations +* `rompy.postprocess` - Postprocessor implementations * [backend_reference](developer/backend_reference.md) - Comprehensive technical reference The backend system provides a robust, type-safe foundation for model execution while maintaining flexibility for different deployment scenarios. From simple local development to complex containerized production environments, the backend system adapts to your needs while ensuring consistent, reproducible results. 
\ No newline at end of file diff --git a/docs/cli.md b/docs/cli.md index bc507f8b..d9b00826 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -64,7 +64,7 @@ list : List available backends and configuration types validate -: Validate a backend configuration file +: Validate a backend or postprocessor configuration file schema : Generate JSON schema for backend configurations @@ -78,9 +78,12 @@ create # List available backends rompy backends list -# Validate configuration +# Validate backend configuration rompy backends validate my_config.yml --backend-type local +# Validate postprocessor configuration +rompy backends validate processor.yml --processor-type noop + # Generate schema rompy backends schema --backend-type docker --format json @@ -101,8 +104,8 @@ rompy pipeline [] [OPTIONS] `--run-backend TEXT` : Execution backend for run stage (default: local) -`--processor TEXT` -: Postprocessor to use (default: noop) +`--processor-config PATH` +: **Required.** YAML/JSON file with postprocessor configuration `--cleanup-on-failure, --no-cleanup` : Clean up outputs on pipeline failure (default: False) @@ -113,10 +116,41 @@ rompy pipeline [] [OPTIONS] `--config-from-env` : Load configuration from ROMPY_CONFIG environment variable instead of file -**Example:** +**Examples:** ```bash -rompy pipeline config.yaml --run-backend docker --processor analysis +# Run pipeline with postprocessor configuration +rompy pipeline config.yaml --processor-config processor.yml + +# Run pipeline with Docker backend and custom postprocessor config +rompy pipeline config.yaml --run-backend docker --processor-config noop.yml +``` + +### postprocess + +Run postprocessing on existing model outputs. 
+ +```bash +rompy postprocess [] --processor-config [OPTIONS] +``` + +**Options:** + +`--processor-config PATH` +: **Required.** YAML/JSON file with postprocessor configuration + +`--config-from-env` +: Load configuration from ROMPY_CONFIG environment variable instead of file + +**Examples:** + +```bash +# Run postprocessing with config file +rompy postprocess model_config.yml --processor-config processor.yml + +# Use environment variable for model config +export ROMPY_CONFIG="$(cat model_config.yml)" +rompy postprocess --config-from-env --processor-config processor.yml ``` ### generate @@ -244,6 +278,48 @@ executable: "/usr/local/bin/swan" For complete configuration options, see [backend_reference](developer/backend_reference.md). +## Postprocessor Configuration Files + +Postprocessor configurations are defined in YAML or JSON files with a `type` field indicating the processor type: + +**No-op Postprocessor Configuration:** + +```yaml +type: noop +validate_outputs: true +timeout: 3600 +env_vars: + DEBUG: "1" + LOG_LEVEL: "INFO" +``` + +**Common Configuration Fields:** + +All postprocessor configurations inherit from `BasePostprocessorConfig` and support: + +* `validate_outputs` - Validate model outputs before processing (default: True) +* `timeout` - Maximum processing time in seconds (60-86400) +* `env_vars` - Environment variables as string key-value pairs +* `working_dir` - Working directory for processing operations + +**Loading Postprocessor Configurations:** + +Postprocessors are dynamically loaded via entry points defined in `pyproject.toml`: + +```toml +[project.entry-points."rompy.postprocess.config"] +noop = "rompy.postprocess.config:NoopPostprocessorConfig" +``` + +**Validation:** + +```bash +# Validate postprocessor configuration +rompy backends validate processor.yml --processor-type noop +``` + +For programmatic usage and custom postprocessor development, see [developer/backend_reference](developer/backend_reference.md). 
+ ## Global Options All commands support these common options: @@ -291,6 +367,21 @@ Handle model output analysis and transformation: - **visualization**: Generate plots and animations - **netcdf**: NetCDF output processing and compression +**Configuration:** + +Postprocessors are configured using YAML/JSON files with a `type` field: + +```yaml +# noop_processor.yml +type: noop +validate_outputs: true +timeout: 3600 +env_vars: + DEBUG: "1" +``` + +For complete postprocessor configuration options, see [Postprocessor Configuration](#postprocessor-configuration-files). + ### Pipeline Backends Orchestrate complete workflows: @@ -314,7 +405,7 @@ Complete pipeline with analysis: ```bash rompy pipeline ocean_model.yaml \ --run-backend local \ - --processor analysis \ + --processor-config analysis.yml \ --validate-stages ``` @@ -363,7 +454,10 @@ pipeline: backend: local local: run_backend: docker - processor: analysis + processor_config: + type: analysis + validate_outputs: true + timeout: 3600 cleanup_on_failure: false ``` diff --git a/docs/configuration_deep_dive.md b/docs/configuration_deep_dive.md index f9b119f3..971ff15e 100644 --- a/docs/configuration_deep_dive.md +++ b/docs/configuration_deep_dive.md @@ -168,6 +168,91 @@ class MultiSourceConfig(BaseModel): Different ocean models have specific configuration requirements. See the [Model-Specific Guides](models.md) for detailed information about configuring SWAN, SCHISM, and other supported models. +## Postprocessor Configuration + +Postprocessors use Pydantic-based configuration classes for type-safe, validated parameter handling. All postprocessor configurations inherit from `BasePostprocessorConfig`. 
+ +### Basic Postprocessor Configuration + +```python +from rompy.postprocess.config import NoopPostprocessorConfig + +# Create configuration programmatically +config = NoopPostprocessorConfig( + validate_outputs=True, + timeout=3600, + env_vars={"DEBUG": "1"} +) + +# Use with model +results = model_run.postprocess(processor=config) +``` + +### Loading from Files + +```python +from rompy.postprocess.config import _load_processor_config + +# Load from YAML file +config = _load_processor_config("processor.yml") + +# Use loaded configuration +results = model_run.postprocess(processor=config) +``` + +### Configuration Validation + +Pydantic provides comprehensive validation: + +```python +from rompy.postprocess.config import NoopPostprocessorConfig +from pydantic import ValidationError + +try: + # This will raise ValidationError if validation fails + config = NoopPostprocessorConfig( + timeout=30 # Invalid: below minimum of 60 + ) +except ValidationError as e: + print(f"Configuration validation error: {e}") +``` + +### Serialization and Reproducibility + +Save and load configurations for reproducibility: + +```python +import json +from pathlib import Path + +# Save configuration +config = NoopPostprocessorConfig(validate_outputs=True, timeout=7200) +config_dict = config.model_dump() + +with open("processor_config.json", "w") as f: + json.dump(config_dict, f, indent=2) + +# Load configuration +with open("processor_config.json", "r") as f: + config_data = json.load(f) + +# Reconstruct configuration +reconstructed = NoopPostprocessorConfig(**config_data) +``` + +### Entry Point-Based Loading + +Postprocessor configurations are dynamically loaded via entry points: + +```toml +# In pyproject.toml +[project.entry-points."rompy.postprocess.config"] +noop = "rompy.postprocess.config:NoopPostprocessorConfig" +analysis = "mypackage.config:AnalysisPostprocessorConfig" +``` + +The `_load_processor_config` function automatically discovers and loads the appropriate configuration 
class based on the `type` field. + ## Next Steps - Review the [Model-Specific Guides](models.md) for configuration details for specific models diff --git a/docs/developer/backend_reference.md b/docs/developer/backend_reference.md index a5f2b64d..105dfe87 100644 --- a/docs/developer/backend_reference.md +++ b/docs/developer/backend_reference.md @@ -389,69 +389,420 @@ For complete backend discovery implementation, see `rompy.backends`. ## Postprocessor System -Postprocessors handle model outputs after execution. The system supports built-in and custom postprocessors. +Postprocessors handle model outputs after execution using Pydantic-based configuration classes for type-safe parameter handling. -### Built-in Postprocessors +### Postprocessor Configuration Architecture -Available postprocessors include: +``` +BasePostprocessorConfig +├── NoopPostprocessorConfig # Validation-only processor +└── CustomPostprocessorConfig # User-defined configurations +``` + +### Configuration Loading + +Configurations are loaded from files using entry point discovery: -* **noop**: No-operation processor (default) -* **archive**: Archive outputs to compressed files -* **analyze**: Analyze model results -* **visualize**: Generate visualization outputs +```python +from rompy.postprocess.config import _load_processor_config + +# Load from YAML/JSON file +config = _load_processor_config("processor.yml") -For complete postprocessor documentation, see `rompy.backends.postprocessors`. +# The loader discovers the config class via entry points +# based on the 'type' field in the file +``` + +**Configuration File Format:** + +```yaml +# processor.yml +type: noop +validate_outputs: true +timeout: 3600 +env_vars: + DEBUG: "1" + LOG_LEVEL: "INFO" +``` + +For complete postprocessor configuration documentation, see `rompy.postprocess.config`. 
+ +### Built-in Postprocessor Configurations + +**NoopPostprocessorConfig:** + +The no-operation processor validates outputs without additional processing: + +```python +from rompy.postprocess.config import NoopPostprocessorConfig + +config = NoopPostprocessorConfig( + validate_outputs=True, + timeout=3600, + env_vars={"DEBUG": "1"} +) + +results = model_run.postprocess(processor=config) +``` + +**Key Parameters:** + +* `validate_outputs`: Validate model outputs (default: False) +* `timeout`: Maximum processing time in seconds (60-86400) +* `env_vars`: Environment variables as string key-value pairs +* `working_dir`: Working directory for processing operations ### Usage Patterns +**Programmatic Usage:** + ```python -# Basic postprocessing -results = model_run.postprocess(processor="archive") +from rompy.postprocess.config import NoopPostprocessorConfig -# Custom postprocessing with options -results = model_run.postprocess( - processor="analyze", - output_format="netcdf", - compress=True, - analysis_type="spectral" +# Create configuration +config = NoopPostprocessorConfig( + validate_outputs=True, + timeout=7200 ) + +# Use in postprocessing +results = model_run.postprocess(processor=config) + +if results["success"]: + print("Post-processing completed") +else: + print(f"Post-processing failed: {results.get('error')}") +``` + +**From Configuration Files:** + +```python +from rompy.postprocess.config import _load_processor_config + +# Load configuration +config = _load_processor_config("processor.yml") + +# Use configuration +results = model_run.postprocess(processor=config) +``` + +**CLI Usage:** + +```bash +# Post-process with configuration file +rompy postprocess model.yml --processor-config processor.yml + +# Run complete pipeline with postprocessor +rompy pipeline model.yml \ + --run-backend local \ + --processor-config processor.yml + +# Validate postprocessor configuration +rompy backends validate processor.yml --processor-type noop ``` -### Custom 
Postprocessors +### Custom Postprocessor Configurations -Create custom postprocessors by implementing the processor interface: +Create custom postprocessor configurations by inheriting from `BasePostprocessorConfig`: ```python -from typing import Dict, Any +from rompy.postprocess.config import BasePostprocessorConfig +from pydantic import Field +from typing import Optional, List + +class AnalysisPostprocessorConfig(BasePostprocessorConfig): + """Configuration for analysis postprocessor.""" + + type: str = Field("analysis", const=True) + + # Analysis-specific fields + metrics: List[str] = Field( + default_factory=list, + description="Metrics to calculate" + ) + output_format: str = Field( + "netcdf", + description="Output format for results" + ) + compress: bool = Field( + True, + description="Compress output files" + ) + plot_config: Optional[dict] = Field( + None, + description="Configuration for plot generation" + ) + + def get_postprocessor_class(self): + """Return the postprocessor implementation class. + + This method is called to instantiate the actual postprocessor + that will handle the processing logic. 
+ """ + from mypackage.postprocess import AnalysisPostprocessor + return AnalysisPostprocessor +``` -class CustomPostprocessor: - """Custom postprocessor example.""" +### Custom Postprocessor Implementation - def process(self, model_run, **kwargs) -> Dict[str, Any]: - """Process model outputs.""" - try: - # Custom processing logic here - output_dir = Path(model_run.output_dir) / model_run.run_id +Implement the postprocessor class that uses your configuration: - # Process files in output_dir - processed_files = self._process_outputs(output_dir, **kwargs) +```python +from pathlib import Path +from typing import Dict, Any +import logging +class AnalysisPostprocessor: + """Analysis postprocessor implementation.""" + + def __init__(self): + self.logger = logging.getLogger(__name__) + + def process( + self, + model_run, + config: AnalysisPostprocessorConfig, + **kwargs + ) -> Dict[str, Any]: + """Process model outputs using configuration. + + Args: + model_run: The ModelRun instance + config: The AnalysisPostprocessorConfig instance + **kwargs: Additional processor-specific parameters + + Returns: + dict: Processing results with success status + """ + try: + output_dir = Path(model_run.output_dir) / model_run.run_id + + # Validate outputs if requested + if config.validate_outputs: + self._validate_outputs(output_dir) + + # Calculate metrics from configuration + metrics = self._calculate_metrics( + output_dir, + metrics=config.metrics, + output_format=config.output_format + ) + + # Generate plots if configured + plots = [] + if config.plot_config: + plots = self._generate_plots(output_dir, config.plot_config) + + # Compress outputs if requested + if config.compress: + compressed_files = self._compress_outputs(output_dir) + else: + compressed_files = [] + return { "success": True, - "processed_files": processed_files, - "message": "Custom processing completed" + "metrics": metrics, + "plots": plots, + "compressed_files": compressed_files, + "message": "Analysis completed 
successfully" } - + except Exception as e: + self.logger.exception(f"Analysis failed: {e}") return { "success": False, - "error": str(e) + "error": str(e), + "message": f"Analysis failed: {e}" } - - def _process_outputs(self, output_dir, **kwargs): - """Implementation-specific processing.""" - # Custom processing logic + + def _validate_outputs(self, output_dir): + """Validate that expected outputs exist.""" + # Implementation details + pass + + def _calculate_metrics(self, output_dir, metrics, output_format): + """Calculate requested metrics.""" + # Implementation details + pass + + def _generate_plots(self, output_dir, plot_config): + """Generate plots based on configuration.""" + # Implementation details pass + + def _compress_outputs(self, output_dir): + """Compress output files.""" + # Implementation details + pass +``` + +### Entry Points Registration + +Register custom postprocessor configurations and implementations in `pyproject.toml`: + +```toml +[project.entry-points."rompy.postprocess.config"] +analysis = "mypackage.postprocess.config:AnalysisPostprocessorConfig" + +[project.entry-points."rompy.postprocess"] +analysis = "mypackage.postprocess:AnalysisPostprocessor" +``` + +### Configuration Discovery + +The system automatically discovers registered postprocessor configurations: + +```python +from importlib.metadata import entry_points + +# Get all available postprocessor configurations +eps = entry_points(group="rompy.postprocess.config") +available_configs = {ep.name: ep.load() for ep in eps} + +print("Available postprocessor configurations:", list(available_configs.keys())) + +# Use custom configuration +from mypackage.postprocess.config import AnalysisPostprocessorConfig + +config = AnalysisPostprocessorConfig( + validate_outputs=True, + metrics=["mean", "variance", "peak"], + output_format="netcdf", + compress=True, + plot_config={"figsize": (10, 8), "dpi": 300} +) + +success = model_run.postprocess(processor=config) +``` + +### Configuration 
Validation + +Pydantic provides comprehensive validation for postprocessor configurations: + +```python +from rompy.postprocess.config import NoopPostprocessorConfig +from pydantic import ValidationError + +try: + # Invalid timeout (too short) + config = NoopPostprocessorConfig(timeout=30) +except ValidationError as e: + for error in e.errors(): + print(f"Field {error['loc']}: {error['msg']}") + +try: + # Invalid env_vars type + config = NoopPostprocessorConfig(env_vars=["invalid"]) +except ValidationError as e: + print(f"Validation error: {e}") +``` + +**Common Validation Rules:** + +* `timeout`: Must be between 60 and 86400 seconds +* `env_vars`: Must be string key-value pairs +* `working_dir`: Must exist if specified +* `validate_outputs`: Must be boolean +* Custom fields: Validated according to field definitions + +### Configuration Serialization + +Save and load configurations for reproducibility: + +```python +import json +from rompy.postprocess.config import NoopPostprocessorConfig + +# Create configuration +config = NoopPostprocessorConfig( + validate_outputs=True, + timeout=7200, + env_vars={"DEBUG": "1"} +) + +# Serialize to dict +config_dict = config.model_dump() + +# Save to file +with open("processor_config.json", "w") as f: + json.dump(config_dict, f, indent=2) + +# Load from file +with open("processor_config.json") as f: + loaded_data = json.load(f) + +# Reconstruct configuration +reconstructed = NoopPostprocessorConfig(**loaded_data) +``` + +### Schema Generation + +Generate JSON schemas for validation and documentation: + +```python +from rompy.postprocess.config import NoopPostprocessorConfig +import json + +# Generate JSON schema +schema = NoopPostprocessorConfig.model_json_schema() + +# Save for external validation +with open("noop_schema.json", "w") as f: + json.dump(schema, f, indent=2) + +# Use schema for validation +import jsonschema + +config_data = { + "validate_outputs": True, + "timeout": 3600 +} + +try: + 
jsonschema.validate(config_data, schema) + print("Configuration is valid") +except jsonschema.ValidationError as e: + print(f"Validation error: {e.message}") +``` + +### Integration with Pipeline + +Postprocessor configurations integrate seamlessly with pipeline backends: + +```python +from rompy.pipeline import LocalPipelineBackend +from rompy.backends import DockerConfig +from rompy.postprocess.config import NoopPostprocessorConfig + +# Create pipeline backend +pipeline = LocalPipelineBackend() + +# Configure run backend +run_config = DockerConfig( + image="swan:latest", + cpu=8, + memory="16g", + timeout=7200 +) + +# Configure postprocessor +processor_config = NoopPostprocessorConfig( + validate_outputs=True, + timeout=3600 +) + +# Execute pipeline +results = pipeline.execute( + model_run=model_run, + run_backend=run_config, + processor_config=processor_config, + cleanup_on_failure=False +) + +if results["success"]: + print(f"Pipeline completed: {results['stages_completed']}") +else: + print(f"Pipeline failed: {results.get('error')}") ``` ## Best Practices diff --git a/docs/plugin_architecture.md b/docs/plugin_architecture.md index c1c2648a..5e7e7958 100644 --- a/docs/plugin_architecture.md +++ b/docs/plugin_architecture.md @@ -10,7 +10,8 @@ ROMPY implements three main plugin categories using Python entry points: 2. **Data Source Plugins (`rompy.source`)**: Custom data acquisition implementations. 3. **Execution Plugins**: Three subcategories: - **Run Backends (`rompy.run`)**: Model execution environments. - - **Postprocessors (`rompy.postprocess`)**: Output analysis and transformation. + - **Postprocessors (`rompy.postprocess`)**: Output analysis and transformation implementations. + - **Postprocessor Configurations (`rompy.postprocess.config`)**: Pydantic-based postprocessor configurations. - **Pipeline Backends (`rompy.pipeline`)**: Workflow orchestration. 
## Dual Selection Pattern @@ -43,6 +44,9 @@ intake = "rompy.core.source:SourceIntake" [project.entry-points."rompy.run"] local = "rompy.run:LocalRunBackend" docker = "rompy.run.docker:DockerRunBackend" + +[project.entry-points."rompy.postprocess.config"] +noop = "rompy.postprocess.config:NoopPostprocessorConfig" ``` For basic plugin usage, please see the [Getting Started Guide](getting_started.md) and [Advanced Topics](backends.md). @@ -136,55 +140,142 @@ custom = "mypackage.backends:CustomRunBackend" ## Postprocessors -Postprocessors handle analysis and transformation of model outputs. They implement a `process()` method that returns a dictionary with results. +Postprocessors handle analysis and transformation of model outputs. They use Pydantic configuration classes for type-safe parameter handling. + +### Postprocessor Configuration + +All postprocessor configurations inherit from `BasePostprocessorConfig` and are registered via entry points: + +```toml +[project.entry-points."rompy.postprocess.config"] +noop = "rompy.postprocess.config:NoopPostprocessorConfig" +analysis = "mypackage.config:AnalysisPostprocessorConfig" +``` -### Built-in Postprocessors +### Built-in Postprocessor Configurations -#### No-op Processor +#### No-op Processor Configuration The `noop` processor provides basic validation without processing: ```python +from rompy.postprocess.config import NoopPostprocessorConfig + # Basic validation -results = model.postprocess(processor="noop") +config = NoopPostprocessorConfig(validate_outputs=True) +results = model.postprocess(processor=config) -# With custom validation -results = model.postprocess( - processor="noop", +# With custom configuration +config = NoopPostprocessorConfig( validate_outputs=True, - output_dir="./custom_output" + timeout=3600, + env_vars={"DEBUG": "1"} ) +results = model.postprocess(processor=config) ``` -### Custom Postprocessors +**From Configuration File:** -Create custom postprocessors by implementing the process 
interface: +```yaml +# processor.yml +type: noop +validate_outputs: true +timeout: 3600 +env_vars: + DEBUG: "1" + LOG_LEVEL: "INFO" +``` ```python +from rompy.postprocess.config import _load_processor_config + +# Load from file +config = _load_processor_config("processor.yml") +results = model.postprocess(processor=config) +``` + +### Custom Postprocessor Configurations + +Create custom postprocessor configurations by inheriting from `BasePostprocessorConfig`: + +```python +from rompy.postprocess.config import BasePostprocessorConfig +from pydantic import Field +from typing import Optional + +class AnalysisPostprocessorConfig(BasePostprocessorConfig): + """Configuration for analysis postprocessor.""" + + type: str = Field("analysis", const=True) + metrics: list[str] = Field( + default_factory=list, + description="Metrics to calculate" + ) + output_format: str = Field( + "netcdf", + description="Output format for results" + ) + compress: bool = Field( + True, + description="Compress output files" + ) + plot_config: Optional[dict] = Field( + None, + description="Configuration for plotting" + ) + + def get_postprocessor_class(self): + """Return the postprocessor implementation class.""" + from mypackage.postprocess import AnalysisPostprocessor + return AnalysisPostprocessor +``` + +### Custom Postprocessor Implementation + +Create the postprocessor implementation class: + +```python +from pathlib import Path +from typing import Dict, Any + class AnalysisPostprocessor: """Custom postprocessor for model analysis.""" - def process(self, model_run, **kwargs): - """Process model outputs. + def process(self, model_run, config: AnalysisPostprocessorConfig, **kwargs) -> Dict[str, Any]: + """Process model outputs with configuration. 
Args: model_run: The ModelRun instance - **kwargs: Processor-specific parameters + config: The AnalysisPostprocessorConfig instance + **kwargs: Additional processor-specific parameters Returns: - dict: Processing results + dict: Processing results with success status """ try: output_dir = Path(model_run.output_dir) / model_run.run_id - # Custom analysis logic - metrics = self._calculate_metrics(output_dir) - plots = self._generate_plots(output_dir) + # Use configuration parameters + metrics = self._calculate_metrics( + output_dir, + metrics=config.metrics, + output_format=config.output_format + ) + + if config.plot_config: + plots = self._generate_plots(output_dir, config.plot_config) + else: + plots = [] + + # Optionally compress outputs + if config.compress: + self._compress_outputs(output_dir) return { "success": True, "metrics": metrics, "plots": plots, + "compressed": config.compress, "message": "Analysis completed successfully" } @@ -194,13 +285,79 @@ class AnalysisPostprocessor: "error": str(e), "message": f"Analysis failed: {e}" } + + def _calculate_metrics(self, output_dir, metrics, output_format): + """Calculate requested metrics.""" + # Implementation details + pass + + def _generate_plots(self, output_dir, plot_config): + """Generate plots based on configuration.""" + # Implementation details + pass + + def _compress_outputs(self, output_dir): + """Compress output files.""" + # Implementation details + pass ``` -Register via entry points: +Register via entry points in `pyproject.toml`: ```toml +[project.entry-points."rompy.postprocess.config"] +analysis = "mypackage.postprocess.config:AnalysisPostprocessorConfig" + [project.entry-points."rompy.postprocess"] -analysis = "mypackage.processors:AnalysisPostprocessor" +analysis = "mypackage.postprocess:AnalysisPostprocessor" +``` + +### Usage Example + +```python +from mypackage.postprocess.config import AnalysisPostprocessorConfig + +# Create configuration +config = AnalysisPostprocessorConfig( + 
validate_outputs=True, + metrics=["mean", "variance", "peak"], + output_format="netcdf", + compress=True, + plot_config={ + "figsize": (10, 8), + "dpi": 300 + } +) + +# Use in model workflow +model = ModelRun.from_file("model.yml") +model.run(backend=backend_config) +results = model.postprocess(processor=config) + +if results["success"]: + print(f"Calculated metrics: {results['metrics']}") + print(f"Generated plots: {results['plots']}") +``` + +**CLI Usage:** + +```yaml +# analysis_processor.yml +type: analysis +validate_outputs: true +metrics: + - mean + - variance + - peak +output_format: netcdf +compress: true +plot_config: + figsize: [10, 8] + dpi: 300 +``` + +```bash +rompy postprocess model.yml --processor-config analysis_processor.yml ``` ## Pipeline Backends @@ -214,16 +371,22 @@ Pipeline backends orchestrate the complete model workflow from input generation The `local` pipeline executes all stages locally: ```python +from rompy.postprocess.config import NoopPostprocessorConfig + # Basic pipeline results = model.pipeline(pipeline_backend="local") -# With custom backends +# With custom configurations +processor_config = NoopPostprocessorConfig( + validate_outputs=True, + timeout=3600 +) + results = model.pipeline( pipeline_backend="local", run_backend="docker", - processor="analysis", + processor_config=processor_config, run_kwargs={"image": "rompy/model:latest", "cpu": 4}, - process_kwargs={"create_plots": True}, cleanup_on_failure=True ) ``` @@ -236,11 +399,13 @@ Create custom pipeline backends for distributed or cloud execution: class CloudPipelineBackend: """Pipeline backend for cloud execution.""" - def execute(self, model_run, **kwargs): + def execute(self, model_run, run_backend, processor_config, **kwargs): """Execute the complete pipeline. 
Args: model_run: The ModelRun instance + run_backend: Backend configuration for model execution + processor_config: BasePostprocessorConfig instance for postprocessing **kwargs: Pipeline-specific parameters Returns: @@ -258,7 +423,7 @@ class CloudPipelineBackend: results["stages_completed"].append("generate") # Stage 2: Submit to cloud - job_id = self._submit_cloud_job(model_run, **kwargs) + job_id = self._submit_cloud_job(model_run, run_backend, **kwargs) results["job_id"] = job_id results["stages_completed"].append("submit") @@ -266,9 +431,9 @@ class CloudPipelineBackend: self._wait_for_completion(job_id) results["stages_completed"].append("execute") - # Stage 4: Download and process results + # Stage 4: Download and process results with configuration outputs = self._download_results(job_id) - processed = self._process_outputs(outputs, **kwargs) + processed = self._process_outputs(outputs, processor_config) results["outputs"] = processed results["stages_completed"].append("postprocess")