Plugin Architecture

Rompy features a flexible plugin-based architecture for model execution and output processing. The system uses Python entry points to discover and load backends automatically, making it easy to extend with custom implementations.

For basic plugin usage, please see the User Guide and Advanced Topics.

Architecture Overview

The plugin architecture is built around three main categories:

  1. Run Backends (rompy.run): Handle model execution in different environments
  2. Postprocessors (rompy.postprocess): Handle model output analysis and transformation
  3. Pipeline Backends (rompy.pipeline): Orchestrate complete model workflows

Each category uses Python entry points for automatic discovery and loading, allowing third-party packages to easily extend rompy's capabilities.
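
As an illustration, this discovery step can be reproduced with the standard library. The sketch below uses the group names listed above; rompy's internal loading logic may differ in detail.

from importlib.metadata import entry_points

# Discover every registered run backend (Python 3.10+ entry_points API)
run_backends = {ep.name: ep for ep in entry_points(group="rompy.run")}

# Load a backend lazily by its entry-point name and instantiate it
backend_cls = run_backends["local"].load()
backend = backend_cls()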

Run Backends

Run backends are responsible for executing models in different environments. They all implement a common interface with a run() method.

Built-in Run Backends

Local Backend

The local backend executes models directly on the local system:

# Basic local execution
success = model.run(backend="local")

# With custom command
success = model.run(
    backend="local",
    command="./my_model_executable",
    env_vars={"OMP_NUM_THREADS": "4"},
    timeout=3600
)

Docker Backend

The docker backend executes models inside Docker containers:

# Using pre-built image
success = model.run(
    backend="docker",
    image="rompy/schism:latest",
    executable="/usr/local/bin/schism",
    cpu=4,
    volumes=["./data:/data:ro"],
    env_vars={"MODEL_CONFIG": "production"}
)

# Building from Dockerfile
success = model.run(
    backend="docker",
    dockerfile="./docker/Dockerfile",
    build_args={"MODEL_VERSION": "1.0.0"},
    executable="/usr/local/bin/model",
    mpiexec="mpiexec",
    cpu=8
)

Custom Run Backends

You can create custom run backends by implementing the run interface:

import logging

logger = logging.getLogger(__name__)


class CustomRunBackend:
    """Custom run backend example."""

    def run(self, model_run, **kwargs):
        """Execute the model run.

        Args:
            model_run: The ModelRun instance
            **kwargs: Backend-specific parameters

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Generate model inputs
            model_run.generate()

            # Custom execution logic goes here; _execute_custom_logic is
            # a placeholder for your own implementation
            return self._execute_custom_logic(model_run, **kwargs)

        except Exception as e:
            logger.exception(f"Custom backend failed: {e}")
            return False

Register custom backends via entry points in pyproject.toml:

[project.entry-points."rompy.run"]
custom = "mypackage.backends:CustomRunBackend"
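
Once the package is installed, the backend is selectable by its entry-point name, following the same pattern as the built-in backends:

# Use the custom backend registered above
success = model.run(backend="custom")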

Postprocessors

Postprocessors handle analysis and transformation of model outputs. They implement a process() method that returns a dictionary with results.

Built-in Postprocessors

No-op Processor

The noop processor provides basic validation without processing:

# Basic validation
results = model.postprocess(processor="noop")

# With custom validation
results = model.postprocess(
    processor="noop",
    validate_outputs=True,
    output_dir="./custom_output"
)

Custom Postprocessors

Create custom postprocessors by implementing the process interface:

from pathlib import Path


class AnalysisPostprocessor:
    """Custom postprocessor for model analysis."""

    def process(self, model_run, **kwargs):
        """Process model outputs.

        Args:
            model_run: The ModelRun instance
            **kwargs: Processor-specific parameters

        Returns:
            dict: Processing results
        """
        try:
            output_dir = Path(model_run.output_dir) / model_run.run_id

            # Custom analysis logic; these helpers are placeholders
            # for your own implementation
            metrics = self._calculate_metrics(output_dir)
            plots = self._generate_plots(output_dir)

            return {
                "success": True,
                "metrics": metrics,
                "plots": plots,
                "message": "Analysis completed successfully"
            }

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "message": f"Analysis failed: {e}"
            }

Register via entry points:

[project.entry-points."rompy.postprocess"]
analysis = "mypackage.processors:AnalysisPostprocessor"
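
After installation, the processor is available by name:

# Use the custom postprocessor registered above
results = model.postprocess(processor="analysis")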

Pipeline Backends

Pipeline backends orchestrate the complete model workflow from input generation through execution to output processing.

Built-in Pipeline Backends

Local Pipeline

The local pipeline executes all stages locally:

# Basic pipeline
results = model.pipeline(pipeline_backend="local")

# With custom backends
results = model.pipeline(
    pipeline_backend="local",
    run_backend="docker",
    processor="analysis",
    run_kwargs={"image": "rompy/model:latest", "cpu": 4},
    process_kwargs={"create_plots": True},
    cleanup_on_failure=True
)

Custom Pipeline Backends

Create custom pipeline backends for distributed or cloud execution:

class CloudPipelineBackend:
    """Pipeline backend for cloud execution."""

    def execute(self, model_run, **kwargs):
        """Execute the complete pipeline.

        Args:
            model_run: The ModelRun instance
            **kwargs: Pipeline-specific parameters

        Returns:
            dict: Pipeline execution results
        """
        results = {
            "success": False,
            "run_id": model_run.run_id,
            "stages_completed": []
        }

        try:
            # Stage 1: Generate inputs
            model_run.generate()
            results["stages_completed"].append("generate")

            # Stage 2: Submit to cloud
            job_id = self._submit_cloud_job(model_run, **kwargs)
            results["job_id"] = job_id
            results["stages_completed"].append("submit")

            # Stage 3: Wait for completion
            self._wait_for_completion(job_id)
            results["stages_completed"].append("execute")

            # Stage 4: Download and process results
            outputs = self._download_results(job_id)
            processed = self._process_outputs(outputs, **kwargs)
            results["outputs"] = processed
            results["stages_completed"].append("postprocess")

            results["success"] = True
            return results

        except Exception as e:
            results["error"] = str(e)
            return results
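
Register pipeline backends under the rompy.pipeline entry-point group, following the same pattern as the other categories:

[project.entry-points."rompy.pipeline"]
cloud = "mypackage.pipelines:CloudPipelineBackend"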

Best Practices

Error Handling

  • Always wrap main logic in try/except blocks (see the sketch below)
  • Return the type the interface expects (bool for run backends, dict for postprocessors and pipeline backends)
  • Log errors with sufficient detail for debugging
  • Clean up resources on failure when possible
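
A minimal sketch of these conventions in a run backend (the logger setup and helper names are illustrative; re-raising timeouts mirrors the built-in local backend):

import logging

logger = logging.getLogger(__name__)


class RobustRunBackend:
    """Illustrates the error-handling conventions above."""

    def run(self, model_run, **kwargs) -> bool:
        try:
            # Wrap the main logic so any failure is caught and logged
            return self._do_run(model_run, **kwargs)
        except TimeoutError:
            # Re-raise timeouts so callers can distinguish them
            logger.error(f"Run {model_run.run_id} timed out")
            raise
        except Exception as exc:
            # Log with traceback, release resources, return the expected bool
            logger.exception(f"Run {model_run.run_id} failed: {exc}")
            self._cleanup(model_run)
            return False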

Parameter Validation

  • Validate required parameters early
  • Provide clear error messages for invalid inputs
  • Use type hints for better IDE support
  • Document all parameters in docstrings
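
For example, validating early with clear messages, type hints, and documented parameters (a sketch mirroring the checks used by the built-in backends):

from typing import Optional


class ValidatingBackend:
    def run(self, model_run, timeout: Optional[int] = None, **kwargs) -> bool:
        """Execute the model.

        Args:
            model_run: The ModelRun instance to execute.
            timeout: Execution timeout in seconds, or None for no limit.
        """
        if not model_run:
            raise ValueError("model_run cannot be None")
        if not hasattr(model_run, "run_id"):
            raise ValueError("model_run must have a run_id attribute")
        if timeout is not None and timeout <= 0:
            raise ValueError(f"timeout must be positive, got {timeout}")
        ...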

Logging

  • Use structured logging with appropriate levels
  • Include run_id and context in log messages
  • Log progress for long-running operations
  • Avoid logging sensitive information
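
For example (mirroring the conventions used in the built-in backends):

import logging

logger = logging.getLogger(__name__)


def log_run_start(model_run, env_vars: dict) -> None:
    # Include run_id so log lines can be correlated with a specific run
    logger.info(f"Starting execution for run_id: {model_run.run_id}")
    # Log variable names rather than values to avoid leaking secrets
    logger.debug(f"Environment overrides: {list(env_vars.keys())}")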

Resource Management

  • Clean up temporary files and directories
  • Handle timeouts gracefully
  • Implement proper cancellation mechanisms
  • Monitor resource usage for long-running processes
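
For example, tying a scratch workspace to a context manager removes it even when execution fails, and passing a timeout to subprocess keeps hung runs from blocking forever (a sketch; the command is illustrative):

import subprocess
import tempfile
from pathlib import Path


def run_in_scratch_dir(command: str, timeout: int = 3600) -> bool:
    # TemporaryDirectory is deleted even if the command raises
    with tempfile.TemporaryDirectory(prefix="rompy-") as tmp:
        result = subprocess.run(
            command, shell=True, cwd=Path(tmp), timeout=timeout
        )
        return result.returncode == 0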

Testing

  • Write unit tests for all backend methods
  • Mock external dependencies (Docker, cloud APIs)
  • Test error conditions and edge cases
  • Include integration tests where appropriate
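
For example, a pytest unit test can mock subprocess so the local backend is exercised without a real model binary (a sketch based on the LocalRunBackend source shown below; the patch target assumes the rompy.run module path):

from unittest.mock import MagicMock, patch

from rompy.run import LocalRunBackend


def test_failing_command_returns_false(tmp_path):
    backend = LocalRunBackend()
    model_run = MagicMock(run_id="test", output_dir=str(tmp_path))
    config = MagicMock(
        command="./model", working_dir=str(tmp_path), env_vars={}, timeout=60
    )
    with patch("rompy.run.subprocess.run") as mock_proc:
        # Simulate a failing external command instead of running one
        mock_proc.return_value.returncode = 1
        mock_proc.return_value.stdout = ""
        mock_proc.return_value.stderr = "simulated failure"
        assert backend.run(model_run, config, workspace_dir=str(tmp_path)) is False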

API Reference

run

Local execution backend for model runs.

This module provides the local run backend implementation.

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

LocalRunBackend

Execute models locally using the system's Python interpreter.

This is the simplest backend that just runs the model directly on the local system.

Source code in rompy/run/__init__.py
class LocalRunBackend:
    """Execute models locally using the system's Python interpreter.

    This is the simplest backend that just runs the model directly
    on the local system.
    """

    def run(
        self, model_run, config: "LocalConfig", workspace_dir: Optional[str] = None
    ) -> bool:
        """Run the model locally.

        Args:
            model_run: The ModelRun instance to execute
            config: LocalConfig instance with execution parameters
            workspace_dir: Path to the generated workspace directory (if None, will generate)

        Returns:
            True if execution was successful, False otherwise

        Raises:
            ValueError: If model_run is invalid
            TimeoutError: If execution exceeds timeout
        """
        # Validate input parameters
        if not model_run:
            raise ValueError("model_run cannot be None")

        if not hasattr(model_run, "run_id"):
            raise ValueError("model_run must have a run_id attribute")

        # Use config parameters
        exec_command = config.command
        exec_working_dir = config.working_dir
        exec_env_vars = config.env_vars
        exec_timeout = config.timeout

        logger.debug(
            f"Using LocalConfig: timeout={exec_timeout}, env_vars={list(exec_env_vars.keys())}"
        )

        logger.info(f"Starting local execution for run_id: {model_run.run_id}")

        try:
            # Use provided workspace or generate if not provided (for backwards compatibility)
            if workspace_dir is None:
                logger.warning(
                    "No workspace_dir provided, generating files (this may cause double generation in pipeline)"
                )
                staging_dir = model_run.generate()
                logger.info(f"Model inputs generated in: {staging_dir}")
            else:
                logger.info(f"Using provided workspace directory: {workspace_dir}")
                staging_dir = workspace_dir

            # Set working directory
            if exec_working_dir:
                work_dir = Path(exec_working_dir)
            else:
                work_dir = (
                    Path(staging_dir)
                    if staging_dir
                    else Path(model_run.output_dir) / model_run.run_id
                )

            if not work_dir.exists():
                logger.error(f"Working directory does not exist: {work_dir}")
                return False

            # Prepare environment
            env = os.environ.copy()
            if exec_env_vars:
                env.update(exec_env_vars)
                logger.debug(
                    f"Added environment variables: {list(exec_env_vars.keys())}"
                )

            # Execute command or config.run()
            if exec_command:
                success = self._execute_command(
                    exec_command, work_dir, env, exec_timeout
                )
            else:
                success = self._execute_config_run(model_run, work_dir, env)

            if success:
                logger.info(
                    f"Local execution completed successfully for run_id: {model_run.run_id}"
                )
            else:
                logger.error(f"Local execution failed for run_id: {model_run.run_id}")

            return success

        except TimeoutError:
            logger.error(f"Model execution timed out after {exec_timeout} seconds")
            raise
        except Exception as e:
            logger.exception(f"Failed to run model locally: {e}")
            return False

    def _execute_command(
        self, command: str, work_dir: Path, env: Dict[str, str], timeout: Optional[int]
    ) -> bool:
        """Execute a shell command.

        Args:
            command: Command to execute
            work_dir: Working directory
            env: Environment variables
            timeout: Execution timeout

        Returns:
            True if successful, False otherwise
        """
        logger.info(f"Executing command: {command}")
        logger.debug(f"Working directory: {work_dir}")

        try:
            result = subprocess.run(
                command,
                shell=True,
                cwd=work_dir,
                env=env,
                timeout=timeout,
                capture_output=True,
                text=True,
                check=False,
            )

            # Log output
            if result.stdout:
                logger.info(f"Command stdout:\n{result.stdout}")
            if result.stderr:
                if result.returncode == 0:
                    logger.warning(f"Command stderr:\n{result.stderr}")
                else:
                    logger.error(f"Command stderr:\n{result.stderr}")

            if result.returncode == 0:
                logger.debug("Command completed successfully")
                return True
            else:
                logger.error(f"Command failed with return code: {result.returncode}")
                return False

        except subprocess.TimeoutExpired:
            logger.error(f"Command timed out after {timeout} seconds")
            raise TimeoutError(f"Command execution timed out after {timeout} seconds")
        except Exception as e:
            logger.exception(f"Error executing command: {e}")
            return False

    def _execute_config_run(
        self, model_run, work_dir: Path, env: Dict[str, str]
    ) -> bool:
        """Execute the model using config.run() method.

        Args:
            model_run: The ModelRun instance
            work_dir: Working directory
            env: Environment variables

        Returns:
            True if successful, False otherwise
        """
        # Check if config has a run method
        if not hasattr(model_run.config, "run") or not callable(model_run.config.run):
            logger.warning(
                "Model config does not have a run method. Nothing to execute."
            )
            return True

        logger.info("Executing model using config.run() method")

        try:
            # Set working directory in environment for config.run()
            original_cwd = os.getcwd()
            os.chdir(work_dir)

            # Update environment
            original_env = {}
            for key, value in env.items():
                if key in os.environ:
                    original_env[key] = os.environ[key]
                os.environ[key] = value

            try:
                # Execute the config run method
                result = model_run.config.run(model_run)

                if isinstance(result, bool):
                    return result
                else:
                    logger.warning(f"config.run() returned non-boolean value: {result}")
                    return True

            finally:
                # Restore original environment and directory
                os.chdir(original_cwd)
                for key, value in env.items():
                    if key in original_env:
                        os.environ[key] = original_env[key]
                    else:
                        os.environ.pop(key, None)

        except Exception as e:
            logger.exception(f"Error in config.run(): {e}")
            return False
Functions
run
run(model_run, config: LocalConfig, workspace_dir: Optional[str] = None) -> bool

Run the model locally.

Parameters:

  Name           Type           Description                                                          Default
  model_run      —              The ModelRun instance to execute                                     required
  config         LocalConfig    LocalConfig instance with execution parameters                       required
  workspace_dir  Optional[str]  Path to the generated workspace directory (if None, will generate)  None

Returns:

  Type  Description
  bool  True if execution was successful, False otherwise

Raises:

  Type          Description
  ValueError    If model_run is invalid
  TimeoutError  If execution exceeds timeout


postprocess

No-op postprocessor for model outputs.

This module provides a basic postprocessor that does nothing.

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

NoopPostprocessor

A postprocessor that does nothing.

This is a placeholder implementation that simply returns a success message. It's useful as a base class or for testing.

Source code in rompy/postprocess/__init__.py
class NoopPostprocessor:
    """A postprocessor that does nothing.

    This is a placeholder implementation that simply returns a success message.
    It's useful as a base class or for testing.
    """

    def process(
        self,
        model_run,
        validate_outputs: bool = True,
        output_dir: Optional[Union[str, Path]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Process the output of a model run (does nothing).

        Args:
            model_run: The ModelRun instance whose outputs to process
            validate_outputs: Whether to validate that output directory exists
            output_dir: Override output directory to check (defaults to model_run output)
            **kwargs: Additional parameters (unused)

        Returns:
            Dictionary with processing results

        Raises:
            ValueError: If model_run is invalid
        """
        # Validate input parameters
        if not model_run:
            raise ValueError("model_run cannot be None")

        if not hasattr(model_run, "run_id"):
            raise ValueError("model_run must have a run_id attribute")

        logger.info(f"Starting no-op postprocessing for run_id: {model_run.run_id}")

        try:
            # Determine output directory
            if output_dir:
                check_dir = Path(output_dir)
            else:
                check_dir = Path(model_run.output_dir) / model_run.run_id

            # Validate outputs if requested
            if validate_outputs:
                if not check_dir.exists():
                    logger.warning(f"Output directory does not exist: {check_dir}")
                    return {
                        "success": False,
                        "message": f"Output directory not found: {check_dir}",
                        "run_id": model_run.run_id,
                        "output_dir": str(check_dir),
                    }
                else:
                    # Count files in output directory
                    file_count = sum(1 for f in check_dir.rglob("*") if f.is_file())
                    logger.info(f"Found {file_count} output files in {check_dir}")

            logger.info(
                f"No-op postprocessing completed for run_id: {model_run.run_id}"
            )

            return {
                "success": True,
                "message": "No postprocessing requested - validation only",
                "run_id": model_run.run_id,
                "output_dir": str(check_dir),
                "validated": validate_outputs,
            }

        except Exception as e:
            logger.exception(f"Error in no-op postprocessor: {e}")
            return {
                "success": False,
                "message": f"Error in postprocessor: {str(e)}",
                "run_id": getattr(model_run, "run_id", "unknown"),
                "error": str(e),
            }
Functions
process
process(model_run, validate_outputs: bool = True, output_dir: Optional[Union[str, Path]] = None, **kwargs) -> Dict[str, Any]

Process the output of a model run (does nothing).

Parameters:

  Name              Type                        Description                                                         Default
  model_run         —                           The ModelRun instance whose outputs to process                      required
  validate_outputs  bool                        Whether to validate that output directory exists                    True
  output_dir        Optional[Union[str, Path]]  Override output directory to check (defaults to model_run output)  None
  **kwargs          —                           Additional parameters (unused)                                      {}

Returns:

  Type            Description
  Dict[str, Any]  Dictionary with processing results

Raises:

  Type        Description
  ValueError  If model_run is invalid


pipeline

Local pipeline backend for model execution.

This module provides the local pipeline backend implementation.

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

LocalPipelineBackend

Local pipeline backend that executes the full workflow locally.

This backend uses the existing generate(), run() and postprocess() methods to execute the complete pipeline locally.

Source code in rompy/pipeline/__init__.py
class LocalPipelineBackend:
    """Local pipeline backend that executes the full workflow locally.

    This backend uses the existing generate(), run() and postprocess() methods
    to execute the complete pipeline locally.
    """

    def execute(
        self,
        model_run,
        run_backend: str = "local",
        processor: str = "noop",
        run_kwargs: Optional[Dict[str, Any]] = None,
        process_kwargs: Optional[Dict[str, Any]] = None,
        cleanup_on_failure: bool = False,
        validate_stages: bool = True,
        **kwargs,
    ) -> Dict[str, Any]:
        """Execute the model pipeline locally.

        Args:
            model_run: The ModelRun instance to execute
            run_backend: Backend to use for the run stage ("local" or "docker")
            processor: Processor to use for the postprocess stage
            run_kwargs: Additional parameters for the run stage
            process_kwargs: Additional parameters for the postprocess stage
            cleanup_on_failure: Whether to cleanup outputs on pipeline failure
            validate_stages: Whether to validate each stage before proceeding
            **kwargs: Additional parameters (unused)

        Returns:
            Combined results from the pipeline execution

        Raises:
            ValueError: If model_run is invalid or parameters are invalid
        """
        # Validate input parameters
        if not model_run:
            raise ValueError("model_run cannot be None")

        if not hasattr(model_run, "run_id"):
            raise ValueError("model_run must have a run_id attribute")

        if not isinstance(run_backend, str) or not run_backend.strip():
            raise ValueError("run_backend must be a non-empty string")

        if not isinstance(processor, str) or not processor.strip():
            raise ValueError("processor must be a non-empty string")

        # Initialize parameters
        run_kwargs = run_kwargs or {}
        process_kwargs = process_kwargs or {}

        logger.info(f"Starting pipeline execution for run_id: {model_run.run_id}")
        logger.info(
            f"Pipeline configuration: run_backend='{run_backend}', processor='{processor}'"
        )

        pipeline_results = {
            "success": False,
            "run_id": model_run.run_id,
            "stages_completed": [],
            "run_backend": run_backend,
            "processor": processor,
        }

        try:
            # Stage 1: Generate input files
            logger.info(f"Stage 1: Generating input files for {model_run.run_id}")

            try:
                staging_dir = model_run.generate()
                pipeline_results["staging_dir"] = (
                    str(staging_dir) if staging_dir else None
                )
                pipeline_results["stages_completed"].append("generate")
                logger.info(f"Input files generated successfully in: {staging_dir}")
            except Exception as e:
                logger.exception(f"Failed to generate input files: {e}")
                return {
                    **pipeline_results,
                    "stage": "generate",
                    "message": f"Input file generation failed: {str(e)}",
                    "error": str(e),
                }

            # Validate generation stage
            if validate_stages:
                output_dir = Path(model_run.output_dir) / model_run.run_id
                if not output_dir.exists():
                    logger.error(f"Output directory was not created: {output_dir}")
                    return {
                        **pipeline_results,
                        "stage": "generate",
                        "message": f"Output directory not found after generation: {output_dir}",
                    }

            # Stage 2: Run the model
            logger.info(f"Stage 2: Running model using {run_backend} backend")

            try:
                # Create appropriate backend configuration
                backend_config = self._create_backend_config(run_backend, run_kwargs)

                # Pass the generated workspace directory to avoid duplicate generation
                run_success = model_run.run(
                    backend=backend_config, workspace_dir=staging_dir
                )
                pipeline_results["run_success"] = run_success

                if not run_success:
                    logger.error("Model run failed")
                    if cleanup_on_failure:
                        self._cleanup_outputs(model_run)
                    return {
                        **pipeline_results,
                        "stage": "run",
                        "message": "Model run failed",
                    }

                pipeline_results["stages_completed"].append("run")
                logger.info("Model run completed successfully")

            except Exception as e:
                logger.exception(f"Error during model run: {e}")
                if cleanup_on_failure:
                    self._cleanup_outputs(model_run)
                return {
                    **pipeline_results,
                    "stage": "run",
                    "message": f"Model run error: {str(e)}",
                    "error": str(e),
                }

            # Stage 3: Postprocess outputs
            logger.info(f"Stage 3: Postprocessing with {processor}")

            try:
                postprocess_results = model_run.postprocess(
                    processor=processor, **process_kwargs
                )
                pipeline_results["postprocess_results"] = postprocess_results
                pipeline_results["stages_completed"].append("postprocess")

                # Check if postprocessing was successful
                if isinstance(
                    postprocess_results, dict
                ) and not postprocess_results.get("success", True):
                    logger.warning(
                        "Postprocessing reported failure but pipeline will continue"
                    )

                logger.info("Postprocessing completed")

            except Exception as e:
                logger.exception(f"Error during postprocessing: {e}")
                return {
                    **pipeline_results,
                    "stage": "postprocess",
                    "message": f"Postprocessing error: {str(e)}",
                    "error": str(e),
                }

            # Pipeline completed successfully
            pipeline_results["success"] = True
            pipeline_results["message"] = "Pipeline completed successfully"

            logger.info(
                f"Pipeline execution completed successfully for run_id: {model_run.run_id}"
            )
            return pipeline_results

        except Exception as e:
            logger.exception(f"Unexpected error in pipeline execution: {e}")
            if cleanup_on_failure:
                self._cleanup_outputs(model_run)
            return {
                **pipeline_results,
                "stage": "pipeline",
                "message": f"Pipeline error: {str(e)}",
                "error": str(e),
            }

    def _cleanup_outputs(self, model_run) -> None:
        """Clean up output files on pipeline failure.

        Args:
            model_run: The ModelRun instance
        """
        try:
            output_dir = Path(model_run.output_dir) / model_run.run_id
            if output_dir.exists():
                logger.info(f"Cleaning up output directory: {output_dir}")
                import shutil

                shutil.rmtree(output_dir)
                logger.info("Cleanup completed")
        except Exception as e:
            logger.warning(f"Failed to cleanup output directory: {e}")

    def _create_backend_config(self, run_backend: str, run_kwargs: Dict[str, Any]):
        """Create appropriate backend configuration from string name and kwargs.

        Args:
            run_backend: Backend name ("local" or "docker")
            run_kwargs: Additional configuration parameters

        Returns:
            Backend configuration object

        Raises:
            ValueError: If backend name is not supported
        """
        if run_backend == "local":
            # Filter kwargs to only include valid LocalConfig fields
            valid_fields = set(LocalConfig.model_fields.keys())
            filtered_kwargs = {k: v for k, v in run_kwargs.items() if k in valid_fields}
            if filtered_kwargs != run_kwargs:
                invalid_fields = set(run_kwargs.keys()) - valid_fields
                logger.warning(f"Ignoring invalid LocalConfig fields: {invalid_fields}")
            return LocalConfig(**filtered_kwargs)
        elif run_backend == "docker":
            # Filter kwargs to only include valid DockerConfig fields
            valid_fields = set(DockerConfig.model_fields.keys())
            filtered_kwargs = {k: v for k, v in run_kwargs.items() if k in valid_fields}
            if filtered_kwargs != run_kwargs:
                invalid_fields = set(run_kwargs.keys()) - valid_fields
                logger.warning(
                    f"Ignoring invalid DockerConfig fields: {invalid_fields}"
                )
            return DockerConfig(**filtered_kwargs)
        else:
            raise ValueError(
                f"Unsupported backend: {run_backend}. Supported: local, docker"
            )
Functions
execute
execute(model_run, run_backend: str = 'local', processor: str = 'noop', run_kwargs: Optional[Dict[str, Any]] = None, process_kwargs: Optional[Dict[str, Any]] = None, cleanup_on_failure: bool = False, validate_stages: bool = True, **kwargs) -> Dict[str, Any]

Execute the model pipeline locally.

Parameters:

  Name                Type                      Description                                              Default
  model_run           —                         The ModelRun instance to execute                         required
  run_backend         str                       Backend to use for the run stage ("local" or "docker")   'local'
  processor           str                       Processor to use for the postprocess stage               'noop'
  run_kwargs          Optional[Dict[str, Any]]  Additional parameters for the run stage                  None
  process_kwargs      Optional[Dict[str, Any]]  Additional parameters for the postprocess stage          None
  cleanup_on_failure  bool                      Whether to cleanup outputs on pipeline failure           False
  validate_stages     bool                      Whether to validate each stage before proceeding         True
  **kwargs            —                         Additional parameters (unused)                           {}

Returns:

  Type            Description
  Dict[str, Any]  Combined results from the pipeline execution

Raises:

  Type        Description
  ValueError  If model_run is invalid or parameters are invalid


Next Steps