Plugin Architecture
Rompy features a flexible plugin-based architecture that allows for extensible model execution and output processing. The system uses Python entry points to automatically discover and load plugins, making it easy to extend with custom implementations.
Core Plugin Categories
ROMPY implements three main plugin categories using Python entry points:
- Configuration Plugins (rompy.config): Model-specific configurations.
- Data Source Plugins (rompy.source): Custom data acquisition implementations.
- Execution Plugins, with three subcategories:
  - Run Backends (rompy.run): Model execution environments.
  - Postprocessors (rompy.postprocess): Output analysis and transformation.
  - Pipeline Backends (rompy.pipeline): Workflow orchestration.
Dual Selection Pattern
ROMPY uses two distinct selection patterns for different plugin types:
Pattern 1: Pydantic Discriminated Union (for configurations)
- Selection: At model instantiation time via a model_type discriminator field.
- State: The configuration becomes part of the persistent, serializable model state.
- Benefit: Enables reproducible science with full validation.

Pattern 2: Runtime String Selection (for backends)
- Selection: At execution time via string parameters.
- State: Allows environment-specific deployment without changing the model configuration.
- Benefit: Supports late binding and optional availability.
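The difference between the two patterns can be sketched roughly as follows (the class names and fields here are illustrative, assuming Pydantic v2, not Rompy's actual models):

```python
from typing import Literal, Union

from pydantic import BaseModel, Field


# Pattern 1: the chosen configuration is part of the validated,
# serializable model state, selected by the model_type discriminator.
class SwanLikeConfig(BaseModel):
    model_type: Literal["swan"] = "swan"


class SchismLikeConfig(BaseModel):
    model_type: Literal["schism"] = "schism"


class Run(BaseModel):
    config: Union[SwanLikeConfig, SchismLikeConfig] = Field(
        discriminator="model_type"
    )


# Validated at instantiation time; the selection survives serialization.
run = Run(config={"model_type": "swan"})

# Pattern 2: the backend is a plain string resolved at execution time,
# e.g. model.run(backend="docker"); nothing about it is stored on the model.
```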
Entry Point Registration
All plugins are registered in pyproject.toml:
[project.entry-points."rompy.config"]
swan = "rompy.swan.config:SwanConfig"
schism = "rompy.schism.config:SCHISMConfig"

[project.entry-points."rompy.source"]
file = "rompy.core.source:SourceFile"
intake = "rompy.core.source:SourceIntake"

[project.entry-points."rompy.run"]
local = "rompy.run:LocalRunBackend"
docker = "rompy.run.docker:DockerRunBackend"
For basic plugin usage, please see the Getting Started Guide and Advanced Topics.
Run Backends
Run backends are responsible for executing models in different environments. They all implement a common interface with a run() method.
Built-in Run Backends
Local Backend
The local backend executes models directly on the local system:
# Basic local execution
success = model.run(backend="local")

# With custom command
success = model.run(
    backend="local",
    command="./my_model_executable",
    env_vars={"OMP_NUM_THREADS": "4"},
    timeout=3600,
)
Docker Backend
The docker backend executes models inside Docker containers:
# Using pre-built image
success = model.run(
    backend="docker",
    image="rompy/schism:latest",
    executable="/usr/local/bin/schism",
    cpu=4,
    volumes=["./data:/data:ro"],
    env_vars={"MODEL_CONFIG": "production"},
)

# Building from Dockerfile
success = model.run(
    backend="docker",
    dockerfile="./docker/Dockerfile",
    build_args={"MODEL_VERSION": "1.0.0"},
    executable="/usr/local/bin/model",
    mpiexec="mpiexec",
    cpu=8,
)
Custom Run Backends
You can create custom run backends by implementing the run interface:
import logging

logger = logging.getLogger(__name__)


class CustomRunBackend:
    """Custom run backend example."""

    def run(self, model_run, **kwargs):
        """Execute the model run.

        Args:
            model_run: The ModelRun instance
            **kwargs: Backend-specific parameters

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Generate model inputs
            model_run.generate()
            # Custom execution logic here
            return self._execute_custom_logic(model_run, **kwargs)
        except Exception as e:
            logger.exception(f"Custom backend failed: {e}")
            return False
Register custom backends via entry points in pyproject.toml:
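For example (the package and module names below are placeholders for your own):

```toml
[project.entry-points."rompy.run"]
custom = "my_package.backends:CustomRunBackend"
```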
Postprocessors
Postprocessors handle analysis and transformation of model outputs. They implement a process() method that returns a dictionary with results.
Built-in Postprocessors
No-op Processor
The noop processor provides basic validation without processing:
# Basic validation
results = model.postprocess(processor="noop")

# With custom validation
results = model.postprocess(
    processor="noop",
    validate_outputs=True,
    output_dir="./custom_output",
)
Custom Postprocessors
Create custom postprocessors by implementing the process interface:
from pathlib import Path


class AnalysisPostprocessor:
    """Custom postprocessor for model analysis."""

    def process(self, model_run, **kwargs):
        """Process model outputs.

        Args:
            model_run: The ModelRun instance
            **kwargs: Processor-specific parameters

        Returns:
            dict: Processing results
        """
        try:
            output_dir = Path(model_run.output_dir) / model_run.run_id
            # Custom analysis logic
            metrics = self._calculate_metrics(output_dir)
            plots = self._generate_plots(output_dir)
            return {
                "success": True,
                "metrics": metrics,
                "plots": plots,
                "message": "Analysis completed successfully",
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "message": f"Analysis failed: {e}",
            }
Register via entry points:
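For example (again with placeholder package and module names):

```toml
[project.entry-points."rompy.postprocess"]
analysis = "my_package.postprocessors:AnalysisPostprocessor"
```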
Pipeline Backends
Pipeline backends orchestrate the complete model workflow from input generation through execution to output processing.
Built-in Pipeline Backends
Local Pipeline
The local pipeline executes all stages locally:
# Basic pipeline
results = model.pipeline(pipeline_backend="local")

# With custom backends
results = model.pipeline(
    pipeline_backend="local",
    run_backend="docker",
    processor="analysis",
    run_kwargs={"image": "rompy/model:latest", "cpu": 4},
    process_kwargs={"create_plots": True},
    cleanup_on_failure=True,
)
Custom Pipeline Backends
Create custom pipeline backends for distributed or cloud execution:
class CloudPipelineBackend:
    """Pipeline backend for cloud execution."""

    def execute(self, model_run, **kwargs):
        """Execute the complete pipeline.

        Args:
            model_run: The ModelRun instance
            **kwargs: Pipeline-specific parameters

        Returns:
            dict: Pipeline execution results
        """
        results = {
            "success": False,
            "run_id": model_run.run_id,
            "stages_completed": [],
        }
        try:
            # Stage 1: Generate inputs
            model_run.generate()
            results["stages_completed"].append("generate")

            # Stage 2: Submit to cloud
            job_id = self._submit_cloud_job(model_run, **kwargs)
            results["job_id"] = job_id
            results["stages_completed"].append("submit")

            # Stage 3: Wait for completion
            self._wait_for_completion(job_id)
            results["stages_completed"].append("execute")

            # Stage 4: Download and process results
            outputs = self._download_results(job_id)
            processed = self._process_outputs(outputs, **kwargs)
            results["outputs"] = processed
            results["stages_completed"].append("postprocess")

            results["success"] = True
            return results
        except Exception as e:
            results["error"] = str(e)
            return results
Best Practices
Error Handling
- Always wrap main logic in try/except blocks
- Return appropriate boolean/dict responses
- Log errors with sufficient detail for debugging
- Clean up resources on failure when possible
Parameter Validation
- Validate required parameters early
- Provide clear error messages for invalid inputs
- Use type hints for better IDE support
- Document all parameters in docstrings
Logging
- Use structured logging with appropriate levels
- Include run_id and context in log messages
- Log progress for long-running operations
- Avoid logging sensitive information
Resource Management
- Clean up temporary files and directories
- Handle timeouts gracefully
- Implement proper cancellation mechanisms
- Monitor resource usage for long-running processes
Testing
- Write unit tests for all backend methods
- Mock external dependencies (Docker, cloud APIs)
- Test error conditions and edge cases
- Include integration tests where appropriate
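As an example of mocking, a backend's error path can be unit-tested without any real model run by substituting a MagicMock for the ModelRun instance (the backend class below is a minimal illustrative one, not Rompy's):

```python
from unittest.mock import MagicMock


class CustomRunBackend:
    """Minimal backend used only for this test sketch."""

    def run(self, model_run, **kwargs):
        try:
            model_run.generate()
            return True
        except Exception:
            return False


def test_run_returns_false_when_generate_fails():
    model_run = MagicMock()
    # Simulate a failure during input generation.
    model_run.generate.side_effect = RuntimeError("disk full")
    assert CustomRunBackend().run(model_run) is False


def test_run_returns_true_on_success():
    assert CustomRunBackend().run(MagicMock()) is True
```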
API Reference
run
Local execution backend for model runs.
This module provides the local run backend implementation.
Classes
LocalRunBackend
Execute models locally using the system's Python interpreter.
This is the simplest backend that just runs the model directly on the local system.
Source code in rompy/run/__init__.py
Functions
run
run(model_run, config: LocalConfig, workspace_dir: Optional[str] = None) -> bool
Run the model locally.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| model_run | | The ModelRun instance to execute | required |
| config | LocalConfig | LocalConfig instance with execution parameters | required |
| workspace_dir | Optional[str] | Path to the generated workspace directory (if None, will generate) | None |

Returns:

| Type | Description |
|---|---|
| bool | True if execution was successful, False otherwise |

Raises:

| Type | Description |
|---|---|
| ValueError | If model_run is invalid |
| TimeoutError | If execution exceeds timeout |
Source code in rompy/run/__init__.py
postprocess
No-op postprocessor for model outputs.
This module provides a basic postprocessor that does nothing.
Classes
NoopPostprocessor
A postprocessor that does nothing.
This is a placeholder implementation that simply returns a success message. It's useful as a base class or for testing.
Source code in rompy/postprocess/__init__.py
Functions
process
process(model_run, validate_outputs: bool = True, output_dir: Optional[Union[str, Path]] = None, **kwargs) -> Dict[str, Any]
Process the output of a model run (does nothing).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| model_run | | The ModelRun instance whose outputs to process | required |
| validate_outputs | bool | Whether to validate that output directory exists | True |
| output_dir | Optional[Union[str, Path]] | Override output directory to check (defaults to model_run output) | None |
| **kwargs | | Additional parameters (unused) | {} |

Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary with processing results |

Raises:

| Type | Description |
|---|---|
| ValueError | If model_run is invalid |
Source code in rompy/postprocess/__init__.py
pipeline
Local pipeline backend for model execution.
This module provides the local pipeline backend implementation.
Classes
LocalPipelineBackend
Local pipeline backend that executes the full workflow locally.
This backend uses the existing generate(), run() and postprocess() methods to execute the complete pipeline locally.
Source code in rompy/pipeline/__init__.py
Functions
execute
execute(model_run, run_backend: str = 'local', processor: str = 'noop', run_kwargs: Optional[Dict[str, Any]] = None, process_kwargs: Optional[Dict[str, Any]] = None, cleanup_on_failure: bool = False, validate_stages: bool = True, **kwargs) -> Dict[str, Any]
Execute the model pipeline locally.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| model_run | | The ModelRun instance to execute | required |
| run_backend | str | Backend to use for the run stage ("local" or "docker") | 'local' |
| processor | str | Processor to use for the postprocess stage | 'noop' |
| run_kwargs | Optional[Dict[str, Any]] | Additional parameters for the run stage | None |
| process_kwargs | Optional[Dict[str, Any]] | Additional parameters for the postprocess stage | None |
| cleanup_on_failure | bool | Whether to cleanup outputs on pipeline failure | False |
| validate_stages | bool | Whether to validate each stage before proceeding | True |
| **kwargs | | Additional parameters (unused) | {} |

Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Combined results from the pipeline execution |

Raises:

| Type | Description |
|---|---|
| ValueError | If model_run is invalid or parameters are invalid |
Source code in rompy/pipeline/__init__.py
Next Steps
- Review the Architecture Overview for more details on the overall system design
- Check the Developer Guide for advanced development topics
- Look at the API Reference for detailed class documentation