Plugin Architecture

ROMPY features a flexible plugin-based architecture that allows for extensible model execution and output processing. The system uses Python entry points to automatically discover and load plugins, making it easy to extend with custom implementations.

Core Plugin Categories

ROMPY implements three main plugin categories using Python entry points:

  1. Configuration Plugins (rompy.config): Model-specific configurations.
  2. Data Source Plugins (rompy.source): Custom data acquisition implementations.
  3. Execution Plugins: Four subcategories:
    • Run Backends (rompy.run): Model execution environments.
    • Postprocessors (rompy.postprocess): Output analysis and transformation implementations.
    • Postprocessor Configurations (rompy.postprocess.config): Pydantic-based postprocessor configurations.
    • Pipeline Backends (rompy.pipeline): Workflow orchestration.

Dual Selection Pattern

ROMPY uses two distinct selection patterns for different plugin types:

  • Pattern 1: Pydantic Discriminated Union (for configurations)

    • Selection: At model instantiation time via a model_type discriminator field.
    • State: The configuration becomes part of the persistent, serializable model state.
    • Benefit: Enables reproducible science with full validation.
  • Pattern 2: Runtime String Selection (for backends)

    • Selection: At execution time via string parameters.
    • State: Allows environment-specific deployment without changing the model configuration.
    • Benefit: Supports late binding and optional availability.
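The two patterns can be contrasted with a deliberately simplified, hypothetical sketch (plain dataclasses and dicts standing in for ROMPY's Pydantic models and plugin registry):

```python
from dataclasses import dataclass

# Pattern 1: the configuration type is chosen at instantiation time via a
# discriminator field and is persisted with the serialized model state.
CONFIG_TYPES: dict[str, type] = {}


def register_config(cls):
    CONFIG_TYPES[cls.model_type] = cls
    return cls


@register_config
@dataclass
class SwanLikeConfig:
    model_type: str = "swan"
    grid: str = "default"


def load_config(state: dict):
    """Reconstruct the right config class from serialized state."""
    cls = CONFIG_TYPES[state["model_type"]]
    return cls(**state)


# Pattern 2: the backend is picked at execution time by a plain string;
# nothing about the choice is stored in the model state.
BACKENDS = {"local": lambda config: True}


def run(config, backend: str = "local") -> bool:
    return BACKENDS[backend](config)
```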

Entry Point Registration

All plugins are registered in pyproject.toml:

[project.entry-points."rompy.config"]
swan = "rompy.swan.config:SwanConfig"
schism = "rompy.schism.config:SCHISMConfig"

[project.entry-points."rompy.source"]
file = "rompy.core.source:SourceFile"
intake = "rompy.core.source:SourceIntake"

[project.entry-points."rompy.run"]
local = "rompy.run:LocalRunBackend"
docker = "rompy.run.docker:DockerRunBackend"

[project.entry-points."rompy.postprocess.config"]
noop = "rompy.postprocess.config:NoopPostprocessorConfig"

For basic plugin usage, please see the Getting Started Guide and Advanced Topics.

Run Backends

Run backends are responsible for executing models in different environments. They all implement a common interface with a run() method.
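That common interface can be described as a structural type. The Protocol below is an illustrative sketch of the contract, not a base class ROMPY requires backends to inherit from:

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class RunBackend(Protocol):
    """Anything exposing a matching run() method satisfies the interface."""

    def run(self, model_run: Any, **kwargs: Any) -> bool: ...


class EchoBackend:
    """Trivial conforming backend, used only to show the duck typing."""

    def run(self, model_run: Any, **kwargs: Any) -> bool:
        return True
```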

Built-in Run Backends

Local Backend

The local backend executes models directly on the local system:

# Basic local execution
success = model.run(backend="local")

# With custom command
success = model.run(
    backend="local",
    command="./my_model_executable",
    env_vars={"OMP_NUM_THREADS": "4"},
    timeout=3600
)

Docker Backend

The docker backend executes models inside Docker containers:

# Using pre-built image
success = model.run(
    backend="docker",
    image="rompy/schism:latest",
    executable="/usr/local/bin/schism",
    cpu=4,
    volumes=["./data:/data:ro"],
    env_vars={"MODEL_CONFIG": "production"}
)

# Building from Dockerfile
success = model.run(
    backend="docker",
    dockerfile="./docker/Dockerfile",
    build_args={"MODEL_VERSION": "1.0.0"},
    executable="/usr/local/bin/model",
    mpiexec="mpiexec",
    cpu=8
)

Custom Run Backends

You can create custom run backends by implementing the run interface:

import logging

logger = logging.getLogger(__name__)


class CustomRunBackend:
    """Custom run backend example."""

    def run(self, model_run, **kwargs):
        """Execute the model run.

        Args:
            model_run: The ModelRun instance
            **kwargs: Backend-specific parameters

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Generate model inputs
            model_run.generate()

            # Custom execution logic here
            return self._execute_custom_logic(model_run, **kwargs)

        except Exception as e:
            logger.exception(f"Custom backend failed: {e}")
            return False

Register custom backends via entry points in pyproject.toml:

[project.entry-points."rompy.run"]
custom = "mypackage.backends:CustomRunBackend"

Postprocessors

Postprocessors handle analysis and transformation of model outputs. They use Pydantic configuration classes for type-safe parameter handling.

Postprocessor Configuration

All postprocessor configurations inherit from BasePostprocessorConfig and are registered via entry points:

[project.entry-points."rompy.postprocess.config"]
noop = "rompy.postprocess.config:NoopPostprocessorConfig"
analysis = "mypackage.config:AnalysisPostprocessorConfig"

Built-in Postprocessor Configurations

No-op Processor Configuration

The noop processor provides basic validation without processing:

from rompy.postprocess.config import NoopPostprocessorConfig

# Basic validation
config = NoopPostprocessorConfig(validate_outputs=True)
results = model.postprocess(processor=config)

# With custom configuration
config = NoopPostprocessorConfig(
    validate_outputs=True,
    timeout=3600,
    env_vars={"DEBUG": "1"}
)
results = model.postprocess(processor=config)

From Configuration File:

# processor.yml
type: noop
validate_outputs: true
timeout: 3600
env_vars:
  DEBUG: "1"
  LOG_LEVEL: "INFO"

Then load it in Python:

from rompy.postprocess.config import _load_processor_config

# Load from file
config = _load_processor_config("processor.yml")
results = model.postprocess(processor=config)

Custom Postprocessor Configurations

Create custom postprocessor configurations by inheriting from BasePostprocessorConfig:

from rompy.postprocess.config import BasePostprocessorConfig
from pydantic import Field
from typing import Literal, Optional

class AnalysisPostprocessorConfig(BasePostprocessorConfig):
    """Configuration for analysis postprocessor."""

    type: Literal["analysis"] = "analysis"
    metrics: list[str] = Field(
        default_factory=list,
        description="Metrics to calculate"
    )
    output_format: str = Field(
        "netcdf",
        description="Output format for results"
    )
    compress: bool = Field(
        True,
        description="Compress output files"
    )
    plot_config: Optional[dict] = Field(
        None,
        description="Configuration for plotting"
    )

    def get_postprocessor_class(self):
        """Return the postprocessor implementation class."""
        from mypackage.postprocess import AnalysisPostprocessor
        return AnalysisPostprocessor

Custom Postprocessor Implementation

Create the postprocessor implementation class:

from pathlib import Path
from typing import Dict, Any

from mypackage.postprocess.config import AnalysisPostprocessorConfig

class AnalysisPostprocessor:
    """Custom postprocessor for model analysis."""

    def process(self, model_run, config: AnalysisPostprocessorConfig, **kwargs) -> Dict[str, Any]:
        """Process model outputs with configuration.

        Args:
            model_run: The ModelRun instance
            config: The AnalysisPostprocessorConfig instance
            **kwargs: Additional processor-specific parameters

        Returns:
            dict: Processing results with success status
        """
        try:
            output_dir = Path(model_run.output_dir) / model_run.run_id

            # Use configuration parameters
            metrics = self._calculate_metrics(
                output_dir,
                metrics=config.metrics,
                output_format=config.output_format
            )

            if config.plot_config:
                plots = self._generate_plots(output_dir, config.plot_config)
            else:
                plots = []

            # Optionally compress outputs
            if config.compress:
                self._compress_outputs(output_dir)

            return {
                "success": True,
                "metrics": metrics,
                "plots": plots,
                "compressed": config.compress,
                "message": "Analysis completed successfully"
            }

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "message": f"Analysis failed: {e}"
            }

    def _calculate_metrics(self, output_dir, metrics, output_format):
        """Calculate requested metrics."""
        # Implementation details
        pass

    def _generate_plots(self, output_dir, plot_config):
        """Generate plots based on configuration."""
        # Implementation details
        pass

    def _compress_outputs(self, output_dir):
        """Compress output files."""
        # Implementation details
        pass

Register via entry points in pyproject.toml:

[project.entry-points."rompy.postprocess.config"]
analysis = "mypackage.postprocess.config:AnalysisPostprocessorConfig"

[project.entry-points."rompy.postprocess"]
analysis = "mypackage.postprocess:AnalysisPostprocessor"

Usage Example

from mypackage.postprocess.config import AnalysisPostprocessorConfig

# Create configuration
config = AnalysisPostprocessorConfig(
    validate_outputs=True,
    metrics=["mean", "variance", "peak"],
    output_format="netcdf",
    compress=True,
    plot_config={
        "figsize": (10, 8),
        "dpi": 300
    }
)

# Use in model workflow
model = ModelRun.from_file("model.yml")
model.run(backend="local")
results = model.postprocess(processor=config)

if results["success"]:
    print(f"Calculated metrics: {results['metrics']}")
    print(f"Generated plots: {results['plots']}")

CLI Usage:

# analysis_processor.yml
type: analysis
validate_outputs: true
metrics:
  - mean
  - variance
  - peak
output_format: netcdf
compress: true
plot_config:
  figsize: [10, 8]
  dpi: 300

Then invoke the CLI with the configuration file:

rompy postprocess model.yml --processor-config analysis_processor.yml

Pipeline Backends

Pipeline backends orchestrate the complete model workflow from input generation through execution to output processing.

Built-in Pipeline Backends

Local Pipeline

The local pipeline executes all stages locally:

from rompy.postprocess.config import NoopPostprocessorConfig

# Basic pipeline
results = model.pipeline(pipeline_backend="local")

# With custom configurations
processor_config = NoopPostprocessorConfig(
    validate_outputs=True,
    timeout=3600
)

results = model.pipeline(
    pipeline_backend="local",
    run_backend="docker",
    processor_config=processor_config,
    run_kwargs={"image": "rompy/model:latest", "cpu": 4},
    cleanup_on_failure=True
)

Custom Pipeline Backends

Create custom pipeline backends for distributed or cloud execution:

class CloudPipelineBackend:
    """Pipeline backend for cloud execution."""

    def execute(self, model_run, run_backend, processor_config, **kwargs):
        """Execute the complete pipeline.

        Args:
            model_run: The ModelRun instance
            run_backend: Backend configuration for model execution
            processor_config: BasePostprocessorConfig instance for postprocessing
            **kwargs: Pipeline-specific parameters

        Returns:
            dict: Pipeline execution results
        """
        results = {
            "success": False,
            "run_id": model_run.run_id,
            "stages_completed": []
        }

        try:
            # Stage 1: Generate inputs
            model_run.generate()
            results["stages_completed"].append("generate")

            # Stage 2: Submit to cloud
            job_id = self._submit_cloud_job(model_run, run_backend, **kwargs)
            results["job_id"] = job_id
            results["stages_completed"].append("submit")

            # Stage 3: Wait for completion
            self._wait_for_completion(job_id)
            results["stages_completed"].append("execute")

            # Stage 4: Download and process results with configuration
            outputs = self._download_results(job_id)
            processed = self._process_outputs(outputs, processor_config)
            results["outputs"] = processed
            results["stages_completed"].append("postprocess")

            results["success"] = True
            return results

        except Exception as e:
            results["error"] = str(e)
            return results
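As with the other plugin types, a custom pipeline backend is registered via entry points in pyproject.toml (hypothetical package path):

```toml
[project.entry-points."rompy.pipeline"]
cloud = "mypackage.pipelines:CloudPipelineBackend"
```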

Best Practices

Error Handling

  • Always wrap main logic in try/except blocks
  • Return appropriate boolean/dict responses
  • Log errors with sufficient detail for debugging
  • Clean up resources on failure when possible
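A minimal sketch of that shape (a hypothetical helper; all names here are illustrative):

```python
import logging
import shutil
import tempfile

logger = logging.getLogger(__name__)


def run_with_cleanup(execute) -> bool:
    """Wrap the main logic, log failures with context, clean up on error."""
    workspace = tempfile.mkdtemp(prefix="rompy-")
    try:
        return bool(execute(workspace))
    except Exception:
        # Log with enough detail for debugging, then release resources.
        logger.exception("Backend failed in workspace %s", workspace)
        shutil.rmtree(workspace, ignore_errors=True)
        return False
```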

Parameter Validation

  • Validate required parameters early
  • Provide clear error messages for invalid inputs
  • Use type hints for better IDE support
  • Document all parameters in docstrings
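For example, a backend might fail fast on its own kwargs before doing any work (illustrative sketch, not an actual ROMPY helper):

```python
def validate_run_kwargs(timeout=None, env_vars=None) -> None:
    """Raise ValueError with a clear message for invalid inputs."""
    if timeout is not None and (not isinstance(timeout, int) or timeout <= 0):
        raise ValueError(f"timeout must be a positive integer, got {timeout!r}")
    if env_vars is not None and not all(
        isinstance(k, str) and isinstance(v, str) for k, v in env_vars.items()
    ):
        raise ValueError("env_vars keys and values must all be strings")
```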

Logging

  • Use structured logging with appropriate levels
  • Include run_id and context in log messages
  • Log progress for long-running operations
  • Avoid logging sensitive information

Resource Management

  • Clean up temporary files and directories
  • Handle timeouts gracefully
  • Implement proper cancellation mechanisms
  • Monitor resource usage for long-running processes

Testing

  • Write unit tests for all backend methods
  • Mock external dependencies (Docker, cloud APIs)
  • Test error conditions and edge cases
  • Include integration tests where appropriate
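For instance, external calls can be mocked so a backend's error path is exercised without Docker or a real model run (self-contained sketch using a toy backend):

```python
from unittest import mock


class ToyBackend:
    """Minimal backend following the run() contract described above."""

    def run(self, model_run, **kwargs) -> bool:
        try:
            model_run.generate()
            return True
        except Exception:
            return False


def test_run_returns_false_when_generation_fails():
    model_run = mock.Mock()
    model_run.generate.side_effect = RuntimeError("boom")
    assert ToyBackend().run(model_run) is False


def test_run_returns_true_on_success():
    assert ToyBackend().run(mock.Mock()) is True
```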

API Reference

run

Local execution backend for model runs.

This module provides the local run backend implementation.

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

LocalRunBackend

Execute models locally using the system's Python interpreter.

This is the simplest backend that just runs the model directly on the local system.

Source code in rompy/run/__init__.py
class LocalRunBackend:
    """Execute models locally using the system's Python interpreter.

    This is the simplest backend that just runs the model directly
    on the local system.
    """

    def run(
        self, model_run, config: "LocalConfig", workspace_dir: Optional[str] = None
    ) -> bool:
        """Run the model locally.

        Args:
            model_run: The ModelRun instance to execute
            config: LocalConfig instance with execution parameters
            workspace_dir: Path to the generated workspace directory (if None, will generate)

        Returns:
            True if execution was successful, False otherwise

        Raises:
            ValueError: If model_run is invalid
            TimeoutError: If execution exceeds timeout
        """
        # Validate input parameters
        if not model_run:
            raise ValueError("model_run cannot be None")

        if not hasattr(model_run, "run_id"):
            raise ValueError("model_run must have a run_id attribute")

        # Use config parameters
        exec_command = config.command
        exec_working_dir = config.working_dir
        exec_env_vars = config.env_vars
        exec_timeout = config.timeout
        exec_stream_output = getattr(config, "stream_output", False)

        logger.debug(
            f"Using LocalConfig: timeout={exec_timeout}, env_vars={list(exec_env_vars.keys())}"
        )

        logger.info(f"Starting local execution for run_id: {model_run.run_id}")

        try:
            # Use provided workspace or generate if not provided (for backwards compatibility)
            if workspace_dir is None:
                logger.warning(
                    "No workspace_dir provided, generating files (this may cause double generation in pipeline)"
                )
                staging_dir = model_run.generate()
                logger.info(f"Model inputs generated in: {staging_dir}")
            else:
                logger.info(f"Using provided workspace directory: {workspace_dir}")
                staging_dir = workspace_dir

            # Set working directory
            if exec_working_dir:
                work_dir = Path(exec_working_dir)
            else:
                work_dir = (
                    Path(staging_dir)
                    if staging_dir
                    else Path(model_run.output_dir) / model_run.run_id
                )

            if not work_dir.exists():
                logger.error(f"Working directory does not exist: {work_dir}")
                return False

            # Prepare environment
            env = os.environ.copy()
            if exec_env_vars:
                env.update(exec_env_vars)
                logger.debug(
                    f"Added environment variables: {list(exec_env_vars.keys())}"
                )

            # Execute command or config.run()
            if exec_command:
                success = self._execute_command(
                    exec_command, work_dir, env, exec_timeout, exec_stream_output
                )
            else:
                success = self._execute_config_run(model_run, work_dir, env)

            if success:
                logger.info(
                    f"Local execution completed successfully for run_id: {model_run.run_id}"
                )
            else:
                logger.error(f"Local execution failed for run_id: {model_run.run_id}")

            return success

        except TimeoutError:
            logger.error(f"Model execution timed out after {exec_timeout} seconds")
            raise
        except Exception as e:
            logger.exception(f"Failed to run model locally: {e}")
            return False

    def _execute_command(
        self,
        command: str,
        work_dir: Path,
        env: Dict[str, str],
        timeout: Optional[int],
        stream_output: bool = False,
    ) -> bool:
        """Execute a shell command.

        Args:
            command: Command to execute
            work_dir: Working directory
            env: Environment variables
            timeout: Execution timeout
            stream_output: Whether to stream output in real-time

        Returns:
            True if successful, False otherwise
        """
        if stream_output:
            return self._execute_command_streaming(command, work_dir, env, timeout)
        else:
            return self._execute_command_buffered(command, work_dir, env, timeout)

    def _execute_command_buffered(
        self, command: str, work_dir: Path, env: Dict[str, str], timeout: Optional[int]
    ) -> bool:
        """Execute a shell command with buffered output."""
        logger.info(f"Executing command: {command}")
        logger.debug(f"Working directory: {work_dir}")

        try:
            result = subprocess.run(
                command,
                shell=True,
                cwd=work_dir,
                env=env,
                timeout=timeout,
                capture_output=True,
                text=True,
                check=False,
            )

            if result.stdout:
                logger.info(f"Command stdout:\n{result.stdout}")
            if result.stderr:
                if result.returncode == 0:
                    logger.warning(f"Command stderr:\n{result.stderr}")
                else:
                    logger.error(f"Command stderr:\n{result.stderr}")

            if result.returncode == 0:
                logger.debug("Command completed successfully")
                return True
            else:
                logger.error(f"Command failed with return code: {result.returncode}")
                return False

        except subprocess.TimeoutExpired:
            logger.error(f"Command timed out after {timeout} seconds")
            raise TimeoutError(f"Command execution timed out after {timeout} seconds")
        except Exception as e:
            logger.exception(f"Error executing command: {e}")
            return False

    def _execute_command_streaming(
        self, command: str, work_dir: Path, env: Dict[str, str], timeout: Optional[int]
    ) -> bool:
        """Execute a shell command with streaming output."""
        logger.info(f"Executing command: {command}")
        logger.debug(f"Working directory: {work_dir}")

        try:
            process = subprocess.Popen(
                command,
                shell=True,
                cwd=work_dir,
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,
            )

            # Capture output while streaming
            stdout_lines = []
            stderr_lines = []

            def read_stream(stream, lines, log_func):
                """Read from a stream and log each line."""
                for line in stream:
                    line = line.rstrip()
                    lines.append(line)
                    log_func(line)

            # Start threads to read stdout and stderr concurrently
            stdout_thread = threading.Thread(
                target=read_stream,
                args=(process.stdout, stdout_lines, logger.info),
            )
            stderr_thread = threading.Thread(
                target=read_stream,
                args=(process.stderr, stderr_lines, lambda msg: logger.warning(msg)),
            )

            stdout_thread.start()
            stderr_thread.start()

            # Wait for process to complete with timeout
            try:
                returncode = process.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
                logger.error(f"Command timed out after {timeout} seconds")
                raise TimeoutError(
                    f"Command execution timed out after {timeout} seconds"
                )

            # Wait for reader threads to finish
            stdout_thread.join()
            stderr_thread.join()

            # Log remaining stderr if any (after process completed)
            # (Thread already handled most output, but capture any final lines)

            if returncode == 0:
                logger.debug("Command completed successfully")
                return True
            else:
                logger.error(f"Command failed with return code: {returncode}")
                if stderr_lines:
                    logger.error("Command stderr:\n" + "\n".join(stderr_lines))
                return False

        except Exception as e:
            logger.exception(f"Error executing command: {e}")
            return False

    def _execute_config_run(
        self, model_run, work_dir: Path, env: Dict[str, str]
    ) -> bool:
        """Execute the model using config.run() method.

        Args:
            model_run: The ModelRun instance
            work_dir: Working directory
            env: Environment variables

        Returns:
            True if successful, False otherwise
        """
        # Check if config has a run method
        if not hasattr(model_run.config, "run") or not callable(model_run.config.run):
            logger.warning(
                "Model config does not have a run method. Nothing to execute."
            )
            return True

        logger.info("Executing model using config.run() method")

        try:
            # Set working directory in environment for config.run()
            original_cwd = os.getcwd()
            os.chdir(work_dir)

            # Update environment
            original_env = {}
            for key, value in env.items():
                if key in os.environ:
                    original_env[key] = os.environ[key]
                os.environ[key] = value

            try:
                # Execute the config run method
                result = model_run.config.run(model_run)

                if isinstance(result, bool):
                    return result
                else:
                    logger.warning(f"config.run() returned non-boolean value: {result}")
                    return True

            finally:
                # Restore original environment and directory
                os.chdir(original_cwd)
                for key, value in env.items():
                    if key in original_env:
                        os.environ[key] = original_env[key]
                    else:
                        os.environ.pop(key, None)

        except Exception as e:
            logger.exception(f"Error in config.run(): {e}")
            return False
Functions
run
run(model_run, config: LocalConfig, workspace_dir: Optional[str] = None) -> bool

Run the model locally.

Parameters:

  • model_run (required): The ModelRun instance to execute.
  • config (LocalConfig, required): LocalConfig instance with execution parameters.
  • workspace_dir (Optional[str], default None): Path to the generated workspace directory (if None, will generate).

Returns:

  • bool: True if execution was successful, False otherwise.

Raises:

  • ValueError: If model_run is invalid.
  • TimeoutError: If execution exceeds timeout.


postprocess

Postprocessor module for ROMPY.

This module provides postprocessor classes and their configurations for processing model outputs after execution.

Attributes

ProcessorConfig module-attribute

ProcessorConfig = Union[NoopPostprocessorConfig]

logger module-attribute

logger = getLogger(__name__)

Classes

BasePostprocessorConfig

Bases: BaseModel, ABC

Base class for all postprocessor configurations.

This class defines common configuration parameters that apply to all postprocessor types, such as timeouts and environment variables.

Source code in rompy/postprocess/config.py
class BasePostprocessorConfig(BaseModel, ABC):
    """Base class for all postprocessor configurations.

    This class defines common configuration parameters that apply to all
    postprocessor types, such as timeouts and environment variables.
    """

    timeout: int = Field(
        3600,
        ge=60,
        le=86400,
        description="Maximum execution time in seconds (1 minute to 24 hours)",
    )

    env_vars: Dict[str, str] = Field(
        default_factory=dict,
        description="Additional environment variables to set during execution",
    )

    working_dir: Optional[Path] = Field(
        None,
        description="Working directory for execution (defaults to model output directory)",
    )

    model_config = ConfigDict(
        validate_assignment=True,
        extra="forbid",  # Don't allow extra fields
        use_enum_values=True,
    )

    @field_validator("working_dir")
    @classmethod
    def validate_working_dir(cls, v):
        """Validate working directory exists if specified."""
        if v is not None:
            path = Path(v)
            if not path.exists():
                raise ValueError(f"Working directory does not exist: {path}")
            if not path.is_dir():
                raise ValueError(f"Working directory is not a directory: {path}")
        return v

    @field_validator("env_vars")
    @classmethod
    def validate_env_vars(cls, v):
        """Validate environment variables."""
        if not isinstance(v, dict):
            raise ValueError("env_vars must be a dictionary")

        for key, value in v.items():
            if not isinstance(key, str) or not isinstance(value, str):
                raise ValueError("Environment variable keys and values must be strings")
            if not key:
                raise ValueError("Environment variable keys cannot be empty")

        return v

    @abstractmethod
    def get_postprocessor_class(self):
        """Return the postprocessor class that should handle this configuration.

        Returns:
            The postprocessor class to use for execution
        """
        pass
Attributes
timeout class-attribute instance-attribute
timeout: int = Field(3600, ge=60, le=86400, description='Maximum execution time in seconds (1 minute to 24 hours)')
env_vars class-attribute instance-attribute
env_vars: Dict[str, str] = Field(default_factory=dict, description='Additional environment variables to set during execution')
working_dir class-attribute instance-attribute
working_dir: Optional[Path] = Field(None, description='Working directory for execution (defaults to model output directory)')
model_config class-attribute instance-attribute
model_config = ConfigDict(validate_assignment=True, extra='forbid', use_enum_values=True)
Functions
validate_working_dir classmethod
validate_working_dir(v)

Validate working directory exists if specified.

Source code in rompy/postprocess/config.py
@field_validator("working_dir")
@classmethod
def validate_working_dir(cls, v):
    """Validate working directory exists if specified."""
    if v is not None:
        path = Path(v)
        if not path.exists():
            raise ValueError(f"Working directory does not exist: {path}")
        if not path.is_dir():
            raise ValueError(f"Working directory is not a directory: {path}")
    return v
validate_env_vars classmethod
validate_env_vars(v)

Validate environment variables.

`get_postprocessor_class()` (abstractmethod)

Return the postprocessor class that should handle this configuration.

Returns: the postprocessor class to use for execution.


NoopPostprocessorConfig

Bases: BasePostprocessorConfig

Configuration for no-operation postprocessor.

This configuration is used when no postprocessing is required but output validation may still be needed. It provides the simplest postprocessor that can optionally validate that model outputs exist.

Source code in rompy/postprocess/config.py
class NoopPostprocessorConfig(BasePostprocessorConfig):
    """Configuration for no-operation postprocessor.

    This configuration is used when no postprocessing is required but output
    validation may still be needed. It provides the simplest postprocessor
    that can optionally validate that model outputs exist.
    """

    type: Literal["noop"] = "noop"

    validate_outputs: bool = Field(
        True, description="Whether to validate that expected outputs exist"
    )

    def get_postprocessor_class(self):
        """Return the NoopPostprocessor class."""
        from . import NoopPostprocessor

        return NoopPostprocessor

    model_config = ConfigDict(
        json_schema_extra={
            "examples": [
                {
                    "type": "noop",
                    "timeout": 3600,
                    "validate_outputs": True,
                },
                {
                    "type": "noop",
                    "timeout": 1800,
                    "validate_outputs": False,
                    "env_vars": {"DEBUG": "1"},
                },
                {"type": "noop", "working_dir": "/path/to/output/dir"},
            ]
        }
    )
Attributes

- `type: Literal['noop'] = 'noop'`
- `validate_outputs: bool = Field(True, description='Whether to validate that expected outputs exist')`
- `model_config = ConfigDict(json_schema_extra={'examples': [{'type': 'noop', 'timeout': 3600, 'validate_outputs': True}, {'type': 'noop', 'timeout': 1800, 'validate_outputs': False, 'env_vars': {'DEBUG': '1'}}, {'type': 'noop', 'working_dir': '/path/to/output/dir'}]})`
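The `type` discriminator plus `get_postprocessor_class` give a simple config-to-implementation dispatch. A stdlib-only sketch of that pattern (the classes below are local stand-ins that mimic rompy's names, not the real objects):

```python
# Stand-in sketch of the config -> postprocessor dispatch pattern;
# class names mimic rompy's but are local illustrations only.
class NoopPostprocessor:
    def process(self, model_run, **kwargs):
        return {"success": True, "message": "No postprocessing requested - validation only"}

class NoopPostprocessorConfig:
    type = "noop"
    validate_outputs = True

    def get_postprocessor_class(self):
        return NoopPostprocessor

config = NoopPostprocessorConfig()
processor = config.get_postprocessor_class()()  # instantiate the selected class
result = processor.process(model_run=None)
print(result["success"])  # True
```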
Functions

`get_postprocessor_class()`

Return the NoopPostprocessor class.


NoopPostprocessor

A postprocessor that does nothing.

This is a placeholder implementation that simply returns a success message. It's useful as a base class or for testing.

Source code in rompy/postprocess/__init__.py
class NoopPostprocessor:
    """A postprocessor that does nothing.

    This is a placeholder implementation that simply returns a success message.
    It's useful as a base class or for testing.
    """

    def process(
        self,
        model_run,
        validate_outputs: bool = True,
        output_dir: Optional[Union[str, Path]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Process the output of a model run (does nothing).

        Args:
            model_run: The ModelRun instance whose outputs to process
            validate_outputs: Whether to validate that output directory exists
            output_dir: Override output directory to check (defaults to model_run output)
            **kwargs: Additional parameters (unused)

        Returns:
            Dictionary with processing results

        Raises:
            ValueError: If model_run is invalid
        """
        # Validate input parameters
        if not model_run:
            raise ValueError("model_run cannot be None")

        if not hasattr(model_run, "run_id"):
            raise ValueError("model_run must have a run_id attribute")

        logger.info(f"Starting no-op postprocessing for run_id: {model_run.run_id}")

        try:
            # Determine output directory
            if output_dir:
                check_dir = Path(output_dir)
            else:
                check_dir = Path(model_run.output_dir) / model_run.run_id

            # Validate outputs if requested
            if validate_outputs:
                if not check_dir.exists():
                    logger.warning(f"Output directory does not exist: {check_dir}")
                    return {
                        "success": False,
                        "message": f"Output directory not found: {check_dir}",
                        "run_id": model_run.run_id,
                        "output_dir": str(check_dir),
                    }
                else:
                    # Count files in output directory
                    file_count = sum(1 for f in check_dir.rglob("*") if f.is_file())
                    logger.info(f"Found {file_count} output files in {check_dir}")

            logger.info(
                f"No-op postprocessing completed for run_id: {model_run.run_id}"
            )

            return {
                "success": True,
                "message": "No postprocessing requested - validation only",
                "run_id": model_run.run_id,
                "output_dir": str(check_dir),
                "validated": validate_outputs,
            }

        except Exception as e:
            logger.exception(f"Error in no-op postprocessor: {e}")
            return {
                "success": False,
                "message": f"Error in postprocessor: {str(e)}",
                "run_id": getattr(model_run, "run_id", "unknown"),
                "error": str(e),
            }
Functions

`process(model_run, validate_outputs: bool = True, output_dir: Optional[Union[str, Path]] = None, **kwargs) -> Dict[str, Any]`

Process the output of a model run (does nothing).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_run` | | The ModelRun instance whose outputs to process | *required* |
| `validate_outputs` | `bool` | Whether to validate that the output directory exists | `True` |
| `output_dir` | `Optional[Union[str, Path]]` | Override output directory to check (defaults to model_run output) | `None` |
| `**kwargs` | | Additional parameters (unused) | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, Any]` | Dictionary with processing results |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If model_run is invalid |

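To make the control flow concrete, here is a trimmed stdlib-only sketch of the validation-only path (`noop_process` is an illustrative helper, not the rompy API; the real method takes a ModelRun and returns a richer dictionary):

```python
import tempfile
from pathlib import Path

def noop_process(output_dir: str, run_id: str, validate_outputs: bool = True) -> dict:
    # Mirrors the directory check in NoopPostprocessor.process (illustration only).
    check_dir = Path(output_dir) / run_id
    if validate_outputs and not check_dir.exists():
        return {"success": False, "message": f"Output directory not found: {check_dir}", "run_id": run_id}
    return {"success": True, "message": "No postprocessing requested - validation only", "run_id": run_id, "validated": validate_outputs}

with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "run1").mkdir()
    ok = noop_process(tmp, "run1")       # directory exists -> success
    missing = noop_process(tmp, "run2")  # directory absent  -> failure

print(ok["success"], missing["success"])  # True False
```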

pipeline

Local pipeline backend for model execution.

This module provides the local pipeline backend implementation.

Attributes

- `logger = getLogger(__name__)` (module attribute)

Classes

LocalPipelineBackend

Local pipeline backend that executes the full workflow locally.

This backend uses the existing generate(), run() and postprocess() methods to execute the complete pipeline locally.

Source code in rompy/pipeline/__init__.py
class LocalPipelineBackend:
    """Local pipeline backend that executes the full workflow locally.

    This backend uses the existing generate(), run() and postprocess() methods
    to execute the complete pipeline locally.
    """

    def execute(
        self,
        model_run,
        backend_config: Union[LocalConfig, DockerConfig, "SlurmConfig"] = None,
        processor: "BasePostprocessorConfig" = None,
        run_kwargs: Optional[Dict[str, Any]] = None,
        process_kwargs: Optional[Dict[str, Any]] = None,
        cleanup_on_failure: bool = False,
        validate_stages: bool = True,
        **kwargs,
    ) -> Dict[str, Any]:
        """Execute the model pipeline locally.

        Args:
            model_run: The ModelRun instance to execute
            backend_config: Backend configuration object (LocalConfig, DockerConfig, etc.)
            processor: Processor configuration for the postprocess stage
            run_kwargs: Additional parameters for the run stage (deprecated, use backend_config)
            process_kwargs: Additional parameters for the postprocess stage
            cleanup_on_failure: Whether to cleanup outputs on pipeline failure
            validate_stages: Whether to validate each stage before proceeding
            **kwargs: Additional parameters (for backward compatibility)

        Returns:
            Combined results from the pipeline execution

        Raises:
            ValueError: If model_run is invalid or parameters are invalid
            TypeError: If processor is not a BasePostprocessorConfig instance
        """
        from rompy.postprocess.config import BasePostprocessorConfig
        from rompy.backends.config import BaseBackendConfig

        # Validate input parameters
        if not model_run:
            raise ValueError("model_run cannot be None")

        if not hasattr(model_run, "run_id"):
            raise ValueError("model_run must have a run_id attribute")

        # Handle backward compatibility: accept run_backend string from kwargs
        if backend_config is None:
            run_backend = kwargs.get("run_backend")
            if run_backend:
                logger.warning(
                    "Passing run_backend as string is deprecated. "
                    "Use backend_config parameter instead."
                )
                run_kwargs = run_kwargs or {}
                backend_config = self._create_backend_config(run_backend, run_kwargs)
            else:
                raise ValueError(
                    "backend_config is required. Provide a BackendConfig instance."
                )

        if not isinstance(backend_config, BaseBackendConfig):
            raise TypeError(
                f"backend_config must be a BaseBackendConfig instance, "
                f"got {type(backend_config).__name__}"
            )

        if processor is None:
            raise ValueError("processor configuration is required")

        if not isinstance(processor, BasePostprocessorConfig):
            raise TypeError(
                f"processor must be a BasePostprocessorConfig instance, "
                f"got {type(processor).__name__}"
            )

        # Initialize parameters
        process_kwargs = process_kwargs or {}

        backend_type = backend_config.__class__.__name__.replace("Config", "").lower()
        logger.info(f"Starting pipeline execution for run_id: {model_run.run_id}")
        logger.info(
            f"Pipeline configuration: backend='{backend_type}', processor='{processor.type}'"
        )

        pipeline_results = {
            "success": False,
            "run_id": model_run.run_id,
            "stages_completed": [],
            "backend": backend_type,
            "processor": processor.type,
        }

        try:
            # Stage 1: Generate input files
            logger.info(f"Stage 1: Generating input files for {model_run.run_id}")

            try:
                staging_dir = model_run.generate()
                pipeline_results["staging_dir"] = (
                    str(staging_dir) if staging_dir else None
                )
                pipeline_results["stages_completed"].append("generate")
                logger.info(f"Input files generated successfully in: {staging_dir}")
            except Exception as e:
                logger.exception(f"Failed to generate input files: {e}")
                return {
                    **pipeline_results,
                    "stage": "generate",
                    "message": f"Input file generation failed: {str(e)}",
                    "error": str(e),
                }

            # Validate generation stage
            if validate_stages:
                output_dir = Path(model_run.output_dir) / model_run.run_id
                if not output_dir.exists():
                    logger.error(f"Output directory was not created: {output_dir}")
                    return {
                        **pipeline_results,
                        "stage": "generate",
                        "message": f"Output directory not found after generation: {output_dir}",
                    }

            # Stage 2: Run the model
            logger.info(f"Stage 2: Running model using {backend_type} backend")

            try:
                # Pass the generated workspace directory to avoid duplicate generation
                run_success = model_run.run(
                    backend=backend_config, workspace_dir=staging_dir
                )
                pipeline_results["run_success"] = run_success

                if not run_success:
                    logger.error("Model run failed")
                    if cleanup_on_failure:
                        self._cleanup_outputs(model_run)
                    return {
                        **pipeline_results,
                        "stage": "run",
                        "message": "Model run failed",
                    }

                pipeline_results["stages_completed"].append("run")
                logger.info("Model run completed successfully")

            except Exception as e:
                logger.exception(f"Error during model run: {e}")
                if cleanup_on_failure:
                    self._cleanup_outputs(model_run)
                return {
                    **pipeline_results,
                    "stage": "run",
                    "message": f"Model run error: {str(e)}",
                    "error": str(e),
                }

            # Stage 3: Postprocess outputs
            logger.info(f"Stage 3: Postprocessing with {processor.type}")

            try:
                postprocess_results = model_run.postprocess(
                    processor=processor, **process_kwargs
                )
                pipeline_results["postprocess_results"] = postprocess_results
                pipeline_results["stages_completed"].append("postprocess")

                # Check if postprocessing was successful
                if isinstance(
                    postprocess_results, dict
                ) and not postprocess_results.get("success", True):
                    logger.warning(
                        "Postprocessing reported failure but pipeline will continue"
                    )

                logger.info("Postprocessing completed")

            except Exception as e:
                logger.exception(f"Error during postprocessing: {e}")
                return {
                    **pipeline_results,
                    "stage": "postprocess",
                    "message": f"Postprocessing error: {str(e)}",
                    "error": str(e),
                }

            # Pipeline completed successfully
            pipeline_results["success"] = True
            pipeline_results["message"] = "Pipeline completed successfully"

            logger.info(
                f"Pipeline execution completed successfully for run_id: {model_run.run_id}"
            )
            return pipeline_results

        except Exception as e:
            logger.exception(f"Unexpected error in pipeline execution: {e}")
            if cleanup_on_failure:
                self._cleanup_outputs(model_run)
            return {
                **pipeline_results,
                "stage": "pipeline",
                "message": f"Pipeline error: {str(e)}",
                "error": str(e),
            }

    def _cleanup_outputs(self, model_run) -> None:
        """Clean up output files on pipeline failure.

        Args:
            model_run: The ModelRun instance
        """
        try:
            output_dir = Path(model_run.output_dir) / model_run.run_id
            if output_dir.exists():
                logger.info(f"Cleaning up output directory: {output_dir}")
                import shutil

                shutil.rmtree(output_dir)
                logger.info("Cleanup completed")
        except Exception as e:
            logger.warning(f"Failed to cleanup output directory: {e}")

    def _create_backend_config(self, run_backend: str, run_kwargs: Dict[str, Any]):
        """Create appropriate backend configuration from string name and kwargs.

        Args:
            run_backend: Backend name ("local" or "docker")
            run_kwargs: Additional configuration parameters

        Returns:
            Backend configuration object

        Raises:
            ValueError: If backend name is not supported
        """
        if run_backend == "local":
            # Filter kwargs to only include valid LocalConfig fields
            valid_fields = set(LocalConfig.model_fields.keys())
            filtered_kwargs = {k: v for k, v in run_kwargs.items() if k in valid_fields}
            if filtered_kwargs != run_kwargs:
                invalid_fields = set(run_kwargs.keys()) - valid_fields
                logger.warning(f"Ignoring invalid LocalConfig fields: {invalid_fields}")
            return LocalConfig(**filtered_kwargs)
        elif run_backend == "docker":
            # Filter kwargs to only include valid DockerConfig fields
            valid_fields = set(DockerConfig.model_fields.keys())
            filtered_kwargs = {k: v for k, v in run_kwargs.items() if k in valid_fields}
            if filtered_kwargs != run_kwargs:
                invalid_fields = set(run_kwargs.keys()) - valid_fields
                logger.warning(
                    f"Ignoring invalid DockerConfig fields: {invalid_fields}"
                )
            return DockerConfig(**filtered_kwargs)
        else:
            raise ValueError(
                f"Unsupported backend: {run_backend}. Supported: local, docker"
            )
Functions

`execute(model_run, backend_config: Union[LocalConfig, DockerConfig, SlurmConfig] = None, processor: BasePostprocessorConfig = None, run_kwargs: Optional[Dict[str, Any]] = None, process_kwargs: Optional[Dict[str, Any]] = None, cleanup_on_failure: bool = False, validate_stages: bool = True, **kwargs) -> Dict[str, Any]`

Execute the model pipeline locally.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_run` | | The ModelRun instance to execute | *required* |
| `backend_config` | `Union[LocalConfig, DockerConfig, SlurmConfig]` | Backend configuration object (LocalConfig, DockerConfig, etc.) | `None` |
| `processor` | `BasePostprocessorConfig` | Processor configuration for the postprocess stage | `None` |
| `run_kwargs` | `Optional[Dict[str, Any]]` | Additional parameters for the run stage (deprecated, use backend_config) | `None` |
| `process_kwargs` | `Optional[Dict[str, Any]]` | Additional parameters for the postprocess stage | `None` |
| `cleanup_on_failure` | `bool` | Whether to clean up outputs on pipeline failure | `False` |
| `validate_stages` | `bool` | Whether to validate each stage before proceeding | `True` |
| `**kwargs` | | Additional parameters (for backward compatibility) | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, Any]` | Combined results from the pipeline execution |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If model_run is invalid or parameters are invalid |
| `TypeError` | If processor is not a BasePostprocessorConfig instance |
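The three-stage flow that `execute` implements (generate, then run, then postprocess, short-circuiting on failure) can be sketched with plain callables (a simplified illustration, not the rompy API):

```python
def run_pipeline(generate, run, postprocess, run_id: str) -> dict:
    # Simplified stand-in for LocalPipelineBackend.execute's staging logic.
    results = {"success": False, "run_id": run_id, "stages_completed": []}
    try:
        staging_dir = generate()                        # Stage 1: input files
        results["staging_dir"] = staging_dir
        results["stages_completed"].append("generate")
        if not run(staging_dir):                        # Stage 2: model run
            results["message"] = "Model run failed"
            return results
        results["stages_completed"].append("run")
        results["postprocess_results"] = postprocess()  # Stage 3: outputs
        results["stages_completed"].append("postprocess")
        results["success"] = True
        results["message"] = "Pipeline completed successfully"
    except Exception as e:
        results["error"] = str(e)
    return results

out = run_pipeline(lambda: "/tmp/stage", lambda d: True, lambda: {"success": True}, "demo")
print(out["stages_completed"])  # ['generate', 'run', 'postprocess']
```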

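`_create_backend_config` filters `run_kwargs` down to the target config's declared fields before constructing it, warning about anything it drops. The same pattern with a stdlib dataclass stand-in (not the real pydantic `LocalConfig`):

```python
from dataclasses import dataclass, fields

@dataclass
class LocalConfigStandIn:
    # Hypothetical stand-in for rompy's LocalConfig; field names are illustrative.
    timeout: int = 3600
    command: str = ""

def filter_to_valid_fields(cfg_cls, run_kwargs: dict) -> dict:
    # Keep only keys that are declared fields of the target config class.
    valid = {f.name for f in fields(cfg_cls)}
    ignored = set(run_kwargs) - valid
    if ignored:
        print(f"Ignoring invalid fields: {sorted(ignored)}")
    return {k: v for k, v in run_kwargs.items() if k in valid}

cfg = LocalConfigStandIn(**filter_to_valid_fields(LocalConfigStandIn, {"timeout": 60, "bogus": 1}))
print(cfg.timeout)  # 60
```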

Next Steps