Execution and Output Processing Plugin Architecture#

Rompy features a flexible plugin-based architecture that allows for extensible model execution and output processing. The system uses Python entry points to automatically discover and load backends, making it easy to extend with custom implementations.

Overview#

The plugin architecture is built around three main categories:

  1. Run Backends (rompy.run): Handle model execution in different environments

  2. Postprocessors (rompy.postprocess): Handle model output analysis and transformation

  3. Pipeline Backends (rompy.pipeline): Orchestrate complete model workflows

Each category uses Python entry points for automatic discovery and loading, allowing third-party packages to easily extend rompy’s capabilities.

Run Backends#

Run backends are responsible for executing models in different environments. They all implement a common interface with a run() method.

Built-in Run Backends#

Local Backend#

The local backend executes models directly on the local system:

# Basic local execution
success = model.run(backend="local")

# With custom command
success = model.run(
    backend="local",
    command="./my_model_executable",
    env_vars={"OMP_NUM_THREADS": "4"},
    timeout=3600
)

Docker Backend#

The docker backend executes models inside Docker containers:

# Using pre-built image
success = model.run(
    backend="docker",
    image="rompy/schism:latest",
    executable="/usr/local/bin/schism",
    cpu=4,
    volumes=["./data:/data:ro"],
    env_vars={"MODEL_CONFIG": "production"}
)

# Building from Dockerfile
success = model.run(
    backend="docker",
    dockerfile="./docker/Dockerfile",
    build_args={"MODEL_VERSION": "1.0.0"},
    executable="/usr/local/bin/model",
    mpiexec="mpiexec",
    cpu=8
)

Custom Run Backends#

You can create custom run backends by implementing the run interface:

import logging

logger = logging.getLogger(__name__)


class CustomRunBackend:
    """Custom run backend example."""

    def run(self, model_run, **kwargs):
        """Execute the model run.

        Args:
            model_run: The ModelRun instance
            **kwargs: Backend-specific parameters

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Generate model inputs
            model_run.generate()

            # Custom execution logic here
            return self._execute_custom_logic(model_run, **kwargs)

        except Exception as e:
            logger.exception(f"Custom backend failed: {e}")
            return False

Register custom backends via entry points in pyproject.toml:

[project.entry-points."rompy.run"]
custom = "mypackage.backends:CustomRunBackend"

Postprocessors#

Postprocessors handle analysis and transformation of model outputs. They implement a process() method that returns a dictionary with results.

Built-in Postprocessors#

No-op Processor#

The noop processor provides basic validation without processing:

# Basic validation
results = model.postprocess(processor="noop")

# With custom validation
results = model.postprocess(
    processor="noop",
    validate_outputs=True,
    output_dir="./custom_output"
)

Custom Postprocessors#

Create custom postprocessors by implementing the process interface:

from pathlib import Path


class AnalysisPostprocessor:
    """Custom postprocessor for model analysis."""

    def process(self, model_run, **kwargs):
        """Process model outputs.

        Args:
            model_run: The ModelRun instance
            **kwargs: Processor-specific parameters

        Returns:
            dict: Processing results
        """
        try:
            output_dir = Path(model_run.output_dir) / model_run.run_id

            # Custom analysis logic
            metrics = self._calculate_metrics(output_dir)
            plots = self._generate_plots(output_dir)

            return {
                "success": True,
                "metrics": metrics,
                "plots": plots,
                "message": "Analysis completed successfully"
            }

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "message": f"Analysis failed: {e}"
            }

Register via entry points:

[project.entry-points."rompy.postprocess"]
analysis = "mypackage.processors:AnalysisPostprocessor"

Pipeline Backends#

Pipeline backends orchestrate the complete model workflow from input generation through execution to output processing.

Built-in Pipeline Backends#

Local Pipeline#

The local pipeline executes all stages locally:

# Basic pipeline
results = model.pipeline(pipeline_backend="local")

# With custom backends
results = model.pipeline(
    pipeline_backend="local",
    run_backend="docker",
    processor="analysis",
    run_kwargs={"image": "rompy/model:latest", "cpu": 4},
    process_kwargs={"create_plots": True},
    cleanup_on_failure=True
)

Custom Pipeline Backends#

Create custom pipeline backends for distributed or cloud execution:

class CloudPipelineBackend:
    """Pipeline backend for cloud execution."""

    def execute(self, model_run, **kwargs):
        """Execute the complete pipeline.

        Args:
            model_run: The ModelRun instance
            **kwargs: Pipeline-specific parameters

        Returns:
            dict: Pipeline execution results
        """
        results = {
            "success": False,
            "run_id": model_run.run_id,
            "stages_completed": []
        }

        try:
            # Stage 1: Generate inputs
            model_run.generate()
            results["stages_completed"].append("generate")

            # Stage 2: Submit to cloud
            job_id = self._submit_cloud_job(model_run, **kwargs)
            results["job_id"] = job_id
            results["stages_completed"].append("submit")

            # Stage 3: Wait for completion
            self._wait_for_completion(job_id)
            results["stages_completed"].append("execute")

            # Stage 4: Download and process results
            outputs = self._download_results(job_id)
            processed = self._process_outputs(outputs, **kwargs)
            results["outputs"] = processed
            results["stages_completed"].append("postprocess")

            results["success"] = True
            return results

        except Exception as e:
            results["error"] = str(e)
            return results

Best Practices#

Error Handling#

  • Always wrap main logic in try/except blocks

  • Return appropriate boolean/dict responses

  • Log errors with sufficient detail for debugging

  • Clean up resources on failure when possible
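The bullets above can be combined into a single pattern: execute, log on failure with full context, clean up, and report a boolean result. The helper below is a minimal sketch (not part of rompy's API); `run_with_cleanup` and its `execute` callable are illustrative names:

```python
import logging
import shutil
from pathlib import Path

logger = logging.getLogger(__name__)


def run_with_cleanup(workdir: Path, execute) -> bool:
    """Run `execute`, logging failures and removing the workspace on error."""
    try:
        execute()
        return True
    except Exception:
        # logger.exception records the full traceback for debugging
        logger.exception("Run failed in %s", workdir)
        # Best-effort cleanup of partial outputs on failure
        shutil.rmtree(workdir, ignore_errors=True)
        return False
```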

Parameter Validation#

  • Validate required parameters early

  • Provide clear error messages for invalid inputs

  • Use type hints for better IDE support

  • Document all parameters in docstrings

Logging#

  • Use structured logging with appropriate levels

  • Include run_id and context in log messages

  • Log progress for long-running operations

  • Avoid logging sensitive information
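One way to carry run context in every message is a `logging.LoggerAdapter`. This is a generic standard-library pattern, not rompy's own logging setup; `get_run_logger` is an illustrative helper:

```python
import logging


def get_run_logger(run_id: str) -> logging.LoggerAdapter:
    """Return a logger whose records always carry the run context."""
    base = logging.getLogger("mypackage.backend")
    # The adapter injects `run_id` into every record's `extra` dict
    return logging.LoggerAdapter(base, {"run_id": run_id})


# Configure once at application startup, e.g.:
# logging.basicConfig(format="%(levelname)s %(run_id)s %(message)s")
```

With a format string that references `%(run_id)s`, every line from a backend is then attributable to a specific run without repeating the id by hand.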

Resource Management#

  • Clean up temporary files and directories

  • Handle timeouts gracefully

  • Implement proper cancellation mechanisms

  • Monitor resource usage for long-running processes
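Temporary-file cleanup is easiest to guarantee with a context manager, so the scratch space is removed whether the run succeeds, fails, or times out. A minimal sketch (the `rompy_` prefix is just a convention for identifying stray directories):

```python
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def temporary_workspace():
    """Yield a scratch directory that is always removed, even on failure."""
    workdir = Path(tempfile.mkdtemp(prefix="rompy_"))
    try:
        yield workdir
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
```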

Testing#

  • Write unit tests for all backend methods

  • Mock external dependencies (Docker, cloud APIs)

  • Test error conditions and edge cases

  • Include integration tests where appropriate
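Mocking the `ModelRun` instance lets backend tests exercise both the success and failure paths without touching Docker or a cloud API. The sketch below uses a minimal stand-in class that mirrors the `run()` interface shown earlier in this page:

```python
from unittest.mock import MagicMock


class _ExampleBackend:
    """Minimal stand-in mirroring the run() interface shown earlier."""

    def run(self, model_run, **kwargs):
        try:
            model_run.generate()
            return True
        except Exception:
            return False


def test_run_returns_false_when_generate_fails():
    model_run = MagicMock()
    model_run.generate.side_effect = RuntimeError("disk full")
    assert _ExampleBackend().run(model_run) is False


def test_run_returns_true_on_success():
    model_run = MagicMock()
    assert _ExampleBackend().run(model_run) is True
    model_run.generate.assert_called_once()
```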

Examples#

Complete examples demonstrating the plugin architecture can be found in the examples/backends/ directory:

  • 01_basic_local_run.py: Simple local execution

  • 02_docker_run.py: Docker container execution

  • 03_custom_postprocessor.py: Custom output processing

  • 04_complete_workflow.py: End-to-end custom workflow

For interactive examples, see the notebooks/backend_examples.ipynb notebook.

API Reference#

Local execution backend for model runs.

This module provides the local run backend implementation.

class rompy.run.LocalRunBackend[source]#

Execute models locally using the system’s Python interpreter.

This is the simplest backend that just runs the model directly on the local system.

run(model_run, config: LocalConfig, workspace_dir: str | None = None) → bool[source]#

Run the model locally.

Parameters:
  • model_run – The ModelRun instance to execute

  • config – LocalConfig instance with execution parameters

  • workspace_dir – Path to the generated workspace directory (if None, will generate)

Returns:

True if execution was successful, False otherwise

Raises:
  • ValueError – If model_run is invalid

  • TimeoutError – If execution exceeds timeout

No-op postprocessor for model outputs.

This module provides a basic postprocessor that does nothing.

class rompy.postprocess.NoopPostprocessor[source]#

A postprocessor that does nothing.

This is a placeholder implementation that simply returns a success message. It’s useful as a base class or for testing.

process(model_run, validate_outputs: bool = True, output_dir: str | Path | None = None, **kwargs) → Dict[str, Any][source]#

Process the output of a model run (does nothing).

Parameters:
  • model_run – The ModelRun instance whose outputs to process

  • validate_outputs – Whether to validate that output directory exists

  • output_dir – Override output directory to check (defaults to model_run output)

  • **kwargs – Additional parameters (unused)

Returns:

Dictionary with processing results

Raises:

ValueError – If model_run is invalid

Local pipeline backend for model execution.

This module provides the local pipeline backend implementation.

class rompy.pipeline.LocalPipelineBackend[source]#

Local pipeline backend that executes the full workflow locally.

This backend uses the existing generate(), run() and postprocess() methods to execute the complete pipeline locally.

execute(model_run, run_backend: str = 'local', processor: str = 'noop', run_kwargs: Dict[str, Any] | None = None, process_kwargs: Dict[str, Any] | None = None, cleanup_on_failure: bool = False, validate_stages: bool = True, **kwargs) → Dict[str, Any][source]#

Execute the model pipeline locally.

Parameters:
  • model_run – The ModelRun instance to execute

  • run_backend – Backend to use for the run stage (“local” or “docker”)

  • processor – Processor to use for the postprocess stage

  • run_kwargs – Additional parameters for the run stage

  • process_kwargs – Additional parameters for the postprocess stage

  • cleanup_on_failure – Whether to cleanup outputs on pipeline failure

  • validate_stages – Whether to validate each stage before proceeding

  • **kwargs – Additional parameters (unused)

Returns:

Combined results from the pipeline execution

Raises:

ValueError – If model_run is invalid or parameters are invalid