Execution and Output Processing Plugin Architecture#
Rompy features a flexible plugin-based architecture that allows for extensible model execution and output processing. The system uses Python entry points to automatically discover and load backends, making it easy to extend with custom implementations.
Overview#
The plugin architecture is built around three main categories:
- Run Backends (rompy.run): Handle model execution in different environments
- Postprocessors (rompy.postprocess): Handle model output analysis and transformation
- Pipeline Backends (rompy.pipeline): Orchestrate complete model workflows
Each category uses Python entry points for automatic discovery and loading, allowing third-party packages to easily extend rompy’s capabilities.
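To illustrate the mechanism, the snippet below is a minimal sketch of entry-point discovery using the standard library; the helper name discover_backends is hypothetical, and rompy's actual loader may differ in name and behavior.

from importlib.metadata import entry_points


def discover_backends(group: str) -> dict:
    """Load every plugin registered under an entry-point group (Python 3.10+)."""
    return {ep.name: ep.load() for ep in entry_points(group=group)}


run_backends = discover_backends("rompy.run")
postprocessors = discover_backends("rompy.postprocess")
pipelines = discover_backends("rompy.pipeline")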
Run Backends#
Run backends are responsible for executing models in different environments. They all implement a common interface with a run() method.
Built-in Run Backends#
Local Backend#
The local backend executes models directly on the local system:
# Basic local execution
success = model.run(backend="local")

# With custom command
success = model.run(
    backend="local",
    command="./my_model_executable",
    env_vars={"OMP_NUM_THREADS": "4"},
    timeout=3600,
)
Docker Backend#
The docker backend executes models inside Docker containers:
# Using pre-built image
success = model.run(
    backend="docker",
    image="rompy/schism:latest",
    executable="/usr/local/bin/schism",
    cpu=4,
    volumes=["./data:/data:ro"],
    env_vars={"MODEL_CONFIG": "production"},
)

# Building from Dockerfile
success = model.run(
    backend="docker",
    dockerfile="./docker/Dockerfile",
    build_args={"MODEL_VERSION": "1.0.0"},
    executable="/usr/local/bin/model",
    mpiexec="mpiexec",
    cpu=8,
)
Custom Run Backends#
You can create custom run backends by implementing the run interface:
import logging

logger = logging.getLogger(__name__)


class CustomRunBackend:
    """Custom run backend example."""

    def run(self, model_run, **kwargs):
        """Execute the model run.

        Args:
            model_run: The ModelRun instance
            **kwargs: Backend-specific parameters

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Generate model inputs
            model_run.generate()
            # Custom execution logic here
            return self._execute_custom_logic(model_run, **kwargs)
        except Exception:
            logger.exception("Custom backend failed")
            return False
Register custom backends via entry points in pyproject.toml:
[project.entry-points."rompy.run"]
custom = "mypackage.backends:CustomRunBackend"
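The _execute_custom_logic helper referenced above is left to the implementer. Below is a minimal sketch using subprocess, assuming the staging directory follows the output_dir/run_id layout used elsewhere in this document; the default command is hypothetical.

import subprocess
from pathlib import Path


# Sketch of the helper referenced by CustomRunBackend.run() above
def _execute_custom_logic(self, model_run, command="./run_model.sh", timeout=3600, **kwargs):
    """Shell out to the model executable in the staging directory."""
    workdir = Path(model_run.output_dir) / model_run.run_id
    result = subprocess.run(
        command,
        shell=True,
        cwd=workdir,
        timeout=timeout,
        capture_output=True,
        text=True,
    )
    return result.returncode == 0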
Postprocessors#
Postprocessors handle analysis and transformation of model outputs. They implement a process() method that returns a dictionary with results.
Built-in Postprocessors#
No-op Processor#
The noop processor provides basic validation without processing:
# Basic validation
results = model.postprocess(processor="noop")

# With custom validation
results = model.postprocess(
    processor="noop",
    validate_outputs=True,
    output_dir="./custom_output",
)
Custom Postprocessors#
Create custom postprocessors by implementing the process interface:
from pathlib import Path


class AnalysisPostprocessor:
    """Custom postprocessor for model analysis."""

    def process(self, model_run, **kwargs):
        """Process model outputs.

        Args:
            model_run: The ModelRun instance
            **kwargs: Processor-specific parameters

        Returns:
            dict: Processing results
        """
        try:
            output_dir = Path(model_run.output_dir) / model_run.run_id
            # Custom analysis logic
            metrics = self._calculate_metrics(output_dir)
            plots = self._generate_plots(output_dir)
            return {
                "success": True,
                "metrics": metrics,
                "plots": plots,
                "message": "Analysis completed successfully",
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "message": f"Analysis failed: {e}",
            }
Register via entry points:
[project.entry-points."rompy.postprocess"]
analysis = "mypackage.processors:AnalysisPostprocessor"
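Once the package is installed, the processor is selectable by name, and extra keyword arguments are forwarded to process(). The create_plots flag below matches the pipeline example later in this section:

results = model.postprocess(processor="analysis", create_plots=True)
if not results["success"]:
    print(results["message"])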
Pipeline Backends#
Pipeline backends orchestrate the complete model workflow from input generation through execution to output processing.
Built-in Pipeline Backends#
Local Pipeline#
The local pipeline executes all stages locally:
# Basic pipeline
results = model.pipeline(pipeline_backend="local")

# With custom backends
results = model.pipeline(
    pipeline_backend="local",
    run_backend="docker",
    processor="analysis",
    run_kwargs={"image": "rompy/model:latest", "cpu": 4},
    process_kwargs={"create_plots": True},
    cleanup_on_failure=True,
)
Custom Pipeline Backends#
Create custom pipeline backends for distributed or cloud execution:
class CloudPipelineBackend:
    """Pipeline backend for cloud execution."""

    def execute(self, model_run, **kwargs):
        """Execute the complete pipeline.

        Args:
            model_run: The ModelRun instance
            **kwargs: Pipeline-specific parameters

        Returns:
            dict: Pipeline execution results
        """
        results = {
            "success": False,
            "run_id": model_run.run_id,
            "stages_completed": [],
        }
        try:
            # Stage 1: Generate inputs
            model_run.generate()
            results["stages_completed"].append("generate")

            # Stage 2: Submit to cloud
            job_id = self._submit_cloud_job(model_run, **kwargs)
            results["job_id"] = job_id
            results["stages_completed"].append("submit")

            # Stage 3: Wait for completion
            self._wait_for_completion(job_id)
            results["stages_completed"].append("execute")

            # Stage 4: Download and process results
            outputs = self._download_results(job_id)
            processed = self._process_outputs(outputs, **kwargs)
            results["outputs"] = processed
            results["stages_completed"].append("postprocess")

            results["success"] = True
            return results
        except Exception as e:
            results["error"] = str(e)
            return results
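As with the other plugin categories, register the pipeline backend via entry points; the module path below is illustrative:

[project.entry-points."rompy.pipeline"]
cloud = "mypackage.pipelines:CloudPipelineBackend"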
Best Practices#
Error Handling#
- Always wrap main logic in try/except blocks
- Return appropriate boolean/dict responses
- Log errors with sufficient detail for debugging
- Clean up resources on failure when possible
Parameter Validation#
- Validate required parameters early (see the sketch below)
- Provide clear error messages for invalid inputs
- Use type hints for better IDE support
- Document all parameters in docstrings
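A minimal sketch of fail-fast validation at the top of a backend's run() method; the parameter names image and cpu mirror the docker example above, but the exact required set is backend-specific:

def run(self, model_run, image=None, cpu=1, **kwargs):
    """Validate inputs before doing any expensive work."""
    if image is None:
        raise ValueError("'image' is required for this backend")
    if not isinstance(cpu, int) or cpu < 1:
        raise ValueError(f"'cpu' must be a positive integer, got {cpu!r}")
    ...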
Logging#
- Use structured logging with appropriate levels
- Include run_id and context in log messages (see the adapter sketch below)
- Log progress for long-running operations
- Avoid logging sensitive information
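One standard-library way to carry run_id context on every message is a LoggerAdapter; a minimal sketch:

import logging

logger = logging.getLogger(__name__)


class RunLoggerAdapter(logging.LoggerAdapter):
    """Prefix every log message with the run ID."""

    def process(self, msg, kwargs):
        return f"[{self.extra['run_id']}] {msg}", kwargs


# Inside a backend:
# log = RunLoggerAdapter(logger, {"run_id": model_run.run_id})
# log.info("starting execution")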
Resource Management#
- Clean up temporary files and directories
- Handle timeouts gracefully (see the sketch below)
- Implement proper cancellation mechanisms
- Monitor resource usage for long-running processes
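A minimal standard-library sketch combining scratch-directory cleanup with a graceful timeout; the command is whatever your backend needs to execute, passed as an argument list:

import subprocess
import tempfile


def run_with_cleanup(command, timeout=3600):
    """Run a command (list of args) in a scratch directory that is always removed."""
    with tempfile.TemporaryDirectory() as scratch:
        try:
            result = subprocess.run(command, cwd=scratch, timeout=timeout)
            return result.returncode == 0
        except subprocess.TimeoutExpired:
            # subprocess.run kills the child on timeout; report failure
            return False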
Testing#
- Write unit tests for all backend methods (example below)
- Mock external dependencies (Docker, cloud APIs)
- Test error conditions and edge cases
- Include integration tests where appropriate
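For instance, the CustomRunBackend defined earlier can be unit-tested with a mocked ModelRun so no model is actually executed; a pytest-style sketch:

from unittest.mock import MagicMock

from mypackage.backends import CustomRunBackend  # path from the entry-point example


def test_run_returns_false_when_generation_fails():
    backend = CustomRunBackend()
    model_run = MagicMock()
    model_run.generate.side_effect = RuntimeError("generation failed")

    assert backend.run(model_run) is False
    model_run.generate.assert_called_once()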
Examples#
Complete examples demonstrating the plugin architecture can be found in the examples/backends/ directory:
- 01_basic_local_run.py: Simple local execution
- 02_docker_run.py: Docker container execution
- 03_custom_postprocessor.py: Custom output processing
- 04_complete_workflow.py: End-to-end custom workflow
For interactive examples, see the notebooks/backend_examples.ipynb notebook.
API Reference#
Local execution backend for model runs.
This module provides the local run backend implementation.
- class rompy.run.LocalRunBackend#
Execute models locally using the system’s Python interpreter.
This is the simplest backend that just runs the model directly on the local system.
- run(model_run, config: LocalConfig, workspace_dir: str | None = None) → bool#
Run the model locally.
- Parameters:
model_run – The ModelRun instance to execute
config – LocalConfig instance with execution parameters
workspace_dir – Path to the generated workspace directory (if None, will generate)
- Returns:
True if execution was successful, False otherwise
- Raises:
ValueError – If model_run is invalid
TimeoutError – If execution exceeds timeout
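A direct usage sketch; the import path for LocalConfig is not shown in this reference, so the construction below is an assumption to check against your rompy version:

from rompy.run import LocalRunBackend

backend = LocalRunBackend()
config = LocalConfig(...)  # fill in execution parameters; import path assumed
success = backend.run(model_run, config=config)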
No-op postprocessor for model outputs.
This module provides a basic postprocessor that does nothing.
- class rompy.postprocess.NoopPostprocessor#
A postprocessor that does nothing.
This is a placeholder implementation that simply returns a success message. It’s useful as a base class or for testing.
- process(model_run, validate_outputs: bool = True, output_dir: str | Path | None = None, **kwargs) → Dict[str, Any]#
Process the output of a model run (does nothing).
- Parameters:
model_run – The ModelRun instance whose outputs to process
validate_outputs – Whether to validate that output directory exists
output_dir – Override output directory to check (defaults to model_run output)
**kwargs – Additional parameters (unused)
- Returns:
Dictionary with processing results
- Raises:
ValueError – If model_run is invalid
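Direct usage, assuming a ModelRun instance named model_run is in scope; the success key mirrors the result dictionaries shown earlier in this document:

from rompy.postprocess import NoopPostprocessor

processor = NoopPostprocessor()
results = processor.process(model_run, validate_outputs=True)
print(results.get("success"))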
Local pipeline backend for model execution.
This module provides the local pipeline backend implementation.
- class rompy.pipeline.LocalPipelineBackend#
Local pipeline backend that executes the full workflow locally.
This backend uses the existing generate(), run() and postprocess() methods to execute the complete pipeline locally.
- execute(model_run, run_backend: str = 'local', processor: str = 'noop', run_kwargs: Dict[str, Any] | None = None, process_kwargs: Dict[str, Any] | None = None, cleanup_on_failure: bool = False, validate_stages: bool = True, **kwargs) → Dict[str, Any]#
Execute the model pipeline locally.
- Parameters:
model_run – The ModelRun instance to execute
run_backend – Backend to use for the run stage (“local” or “docker”)
processor – Processor to use for the postprocess stage
run_kwargs – Additional parameters for the run stage
process_kwargs – Additional parameters for the postprocess stage
cleanup_on_failure – Whether to cleanup outputs on pipeline failure
validate_stages – Whether to validate each stage before proceeding
**kwargs – Additional parameters (unused)
- Returns:
Combined results from the pipeline execution
- Raises:
ValueError – If model_run is invalid or parameters are invalid
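Direct usage mirroring the model.pipeline() example above; the run_kwargs contents are illustrative:

from rompy.pipeline import LocalPipelineBackend

pipeline = LocalPipelineBackend()
results = pipeline.execute(
    model_run,
    run_backend="local",
    processor="noop",
    run_kwargs={"timeout": 3600},  # forwarded to the run stage
)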