Plugin Architecture
Rompy features a flexible, plugin-based architecture for model execution and output processing. Backends are discovered and loaded automatically through Python entry points, so rompy can be extended with custom implementations without modifying the core package.
For basic plugin usage, please see the User Guide and Advanced Topics.
Architecture Overview
The plugin architecture is built around three main categories:
- Run Backends (rompy.run): Handle model execution in different environments
- Postprocessors (rompy.postprocess): Handle model output analysis and transformation
- Pipeline Backends (rompy.pipeline): Orchestrate complete model workflows
Each category uses Python entry points for automatic discovery and loading, allowing third-party packages to easily extend rompy's capabilities.
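As a minimal sketch of how this discovery works (assuming Python 3.10+ for the group keyword of importlib.metadata.entry_points), the backends registered in each group can be listed and loaded like this:

from importlib.metadata import entry_points

# Enumerate everything registered under the "rompy.run" group and load each class
for ep in entry_points(group="rompy.run"):
    backend_cls = ep.load()
    print(f"{ep.name} -> {backend_cls.__module__}.{backend_cls.__qualname__}")

The same pattern applies to the rompy.postprocess and rompy.pipeline groups.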
Run Backends
Run backends are responsible for executing models in different environments. They all implement a common interface: a run() method that executes the model and returns True on success and False on failure.
Built-in Run Backends
Local Backend
The local backend executes models directly on the local system:
# Basic local execution
success = model.run(backend="local")

# With custom command
success = model.run(
    backend="local",
    command="./my_model_executable",
    env_vars={"OMP_NUM_THREADS": "4"},
    timeout=3600,
)
Docker Backend
The docker backend executes models inside Docker containers:
# Using pre-built image
success = model.run(
    backend="docker",
    image="rompy/schism:latest",
    executable="/usr/local/bin/schism",
    cpu=4,
    volumes=["./data:/data:ro"],
    env_vars={"MODEL_CONFIG": "production"},
)

# Building from Dockerfile
success = model.run(
    backend="docker",
    dockerfile="./docker/Dockerfile",
    build_args={"MODEL_VERSION": "1.0.0"},
    executable="/usr/local/bin/model",
    mpiexec="mpiexec",
    cpu=8,
)
Custom Run Backends
You can create custom run backends by implementing the run interface:
import logging

logger = logging.getLogger(__name__)


class CustomRunBackend:
    """Custom run backend example."""

    def run(self, model_run, **kwargs):
        """Execute the model run.

        Args:
            model_run: The ModelRun instance
            **kwargs: Backend-specific parameters

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Generate model inputs
            model_run.generate()
            # Custom execution logic here
            return self._execute_custom_logic(model_run, **kwargs)
        except Exception as e:
            logger.exception(f"Custom backend failed: {e}")
            return False
Register custom backends via entry points in pyproject.toml:
[project.entry-points."rompy.run"]
custom = "mypackage.backends:CustomRunBackend"
Postprocessors
Postprocessors handle analysis and transformation of model outputs. They implement a process() method that returns a dictionary with results.
Built-in Postprocessors
No-op Processor
The noop processor provides basic validation without processing:
# Basic validation
results = model.postprocess(processor="noop")
# With custom validation
results = model.postprocess(
processor="noop",
validate_outputs=True,
output_dir="./custom_output"
)
Custom Postprocessors
Create custom postprocessors by implementing the process interface:
from pathlib import Path


class AnalysisPostprocessor:
    """Custom postprocessor for model analysis."""

    def process(self, model_run, **kwargs):
        """Process model outputs.

        Args:
            model_run: The ModelRun instance
            **kwargs: Processor-specific parameters

        Returns:
            dict: Processing results
        """
        try:
            output_dir = Path(model_run.output_dir) / model_run.run_id

            # Custom analysis logic
            metrics = self._calculate_metrics(output_dir)
            plots = self._generate_plots(output_dir)

            return {
                "success": True,
                "metrics": metrics,
                "plots": plots,
                "message": "Analysis completed successfully",
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "message": f"Analysis failed: {e}",
            }
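The _calculate_metrics and _generate_plots calls above are placeholders for your own analysis code. As a hedged sketch, a metrics helper for NetCDF outputs might look like this (the file pattern and the use of xarray are assumptions about the model outputs, not part of rompy):

from pathlib import Path

import xarray as xr  # assumed to be available for NetCDF outputs


def calculate_metrics(output_dir: Path) -> dict:
    """Compute simple summary statistics from NetCDF outputs (illustrative only)."""
    metrics = {}
    for path in sorted(output_dir.glob("*.nc")):  # hypothetical output file pattern
        with xr.open_dataset(path) as ds:
            for name, var in ds.data_vars.items():
                metrics[f"{path.stem}.{name}_mean"] = float(var.mean())
    return metrics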
Register via entry points:
[project.entry-points."rompy.postprocess"]
analysis = "mypackage.processors:AnalysisPostprocessor"
Pipeline Backends
Pipeline backends orchestrate the complete model workflow from input generation through execution to output processing.
Built-in Pipeline Backends
Local Pipeline
The local pipeline executes all stages locally:
# Basic pipeline
results = model.pipeline(pipeline_backend="local")

# With custom backends
results = model.pipeline(
    pipeline_backend="local",
    run_backend="docker",
    processor="analysis",
    run_kwargs={"image": "rompy/model:latest", "cpu": 4},
    process_kwargs={"create_plots": True},
    cleanup_on_failure=True,
)
Custom Pipeline Backends
Create custom pipeline backends for distributed or cloud execution:
class CloudPipelineBackend:
    """Pipeline backend for cloud execution."""

    def execute(self, model_run, **kwargs):
        """Execute the complete pipeline.

        Args:
            model_run: The ModelRun instance
            **kwargs: Pipeline-specific parameters

        Returns:
            dict: Pipeline execution results
        """
        results = {
            "success": False,
            "run_id": model_run.run_id,
            "stages_completed": [],
        }
        try:
            # Stage 1: Generate inputs
            model_run.generate()
            results["stages_completed"].append("generate")

            # Stage 2: Submit to cloud
            job_id = self._submit_cloud_job(model_run, **kwargs)
            results["job_id"] = job_id
            results["stages_completed"].append("submit")

            # Stage 3: Wait for completion
            self._wait_for_completion(job_id)
            results["stages_completed"].append("execute")

            # Stage 4: Download and process results
            outputs = self._download_results(job_id)
            processed = self._process_outputs(outputs, **kwargs)
            results["outputs"] = processed
            results["stages_completed"].append("postprocess")

            results["success"] = True
            return results
        except Exception as e:
            results["error"] = str(e)
            return results
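Custom pipeline backends are registered through the rompy.pipeline entry point group, mirroring the run and postprocess groups (the package path below is illustrative):

[project.entry-points."rompy.pipeline"]
cloud = "mypackage.pipelines:CloudPipelineBackend"

The pipeline should then be selectable by name, in the same way as the built-in local pipeline:

results = model.pipeline(pipeline_backend="cloud")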
Best Practices
Error Handling
- Always wrap main logic in try/except blocks
- Return appropriate boolean/dict responses
- Log errors with sufficient detail for debugging
- Clean up resources on failure when possible
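A minimal sketch of this pattern in a run backend; the SafeRunBackend class, its _execute helper, and the cleanup_on_failure keyword are illustrative assumptions, not part of the documented interface:

import logging
import shutil

logger = logging.getLogger(__name__)


class SafeRunBackend:
    """Illustrative backend combining try/except, boolean returns and cleanup."""

    def run(self, model_run, cleanup_on_failure=False, **kwargs):
        workdir = None
        try:
            workdir = model_run.generate()
            return self._execute(workdir, **kwargs)  # hypothetical execution helper
        except Exception:
            # Log with enough context to debug, then report failure as a boolean
            logger.exception("Run failed for run_id: %s", getattr(model_run, "run_id", "unknown"))
            if cleanup_on_failure and workdir:
                shutil.rmtree(workdir, ignore_errors=True)  # best-effort cleanup
            return False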
Parameter Validation
- Validate required parameters early
- Provide clear error messages for invalid inputs
- Use type hints for better IDE support
- Document all parameters in docstrings
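A short sketch of early validation with type hints; the image and cpu parameters are illustrative, not part of a documented interface:

from typing import Optional


class ValidatedRunBackend:
    """Illustrative backend showing early parameter validation."""

    def run(self, model_run, image: Optional[str] = None, cpu: int = 1, **kwargs) -> bool:
        # Fail fast with clear messages before doing any work
        if model_run is None or not hasattr(model_run, "run_id"):
            raise ValueError("model_run must be a ModelRun instance with a run_id")
        if image is not None and not image.strip():
            raise ValueError("image must be a non-empty string when provided")
        if cpu < 1:
            raise ValueError(f"cpu must be >= 1, got {cpu}")
        ...  # execution logic would follow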
Logging
- Use structured logging with appropriate levels
- Include run_id and context in log messages
- Log progress for long-running operations
- Avoid logging sensitive information
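For instance, carrying the run_id in every message keeps logs from concurrent runs separable, and logging parameter names rather than values avoids leaking secrets (a small, illustrative sketch):

import logging

logger = logging.getLogger(__name__)


class LoggingRunBackend:
    """Illustrative backend showing contextual, level-appropriate logging."""

    def run(self, model_run, **kwargs):
        logger.info("Starting execution for run_id: %s", model_run.run_id)
        logger.debug("Backend parameters: %s", sorted(kwargs))  # names only, no values
        # ... execution logic would go here ...
        logger.info("Finished execution for run_id: %s", model_run.run_id)
        return True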
Resource Management
- Clean up temporary files and directories
- Handle timeouts gracefully
- Implement proper cancellation mechanisms
- Monitor resource usage for long-running processes
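A sketch of the temporary-file and timeout points, assuming a subprocess-based backend (the helper name and command handling are illustrative):

import subprocess
import tempfile


def execute_in_scratch_dir(command: str, timeout: int = 3600) -> bool:
    """Run a command in a throwaway directory while enforcing a timeout (illustrative)."""
    with tempfile.TemporaryDirectory() as tmpdir:  # removed automatically on exit
        try:
            result = subprocess.run(command, shell=True, cwd=tmpdir, timeout=timeout)
            return result.returncode == 0
        except subprocess.TimeoutExpired:
            # Handle the timeout gracefully instead of letting it crash the pipeline
            return False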
Testing
- Write unit tests for all backend methods
- Mock external dependencies (Docker, cloud APIs)
- Test error conditions and edge cases
- Include integration tests where appropriate
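A minimal pytest-style sketch, using unittest.mock to stand in for the ModelRun instance; CustomRunBackend refers to the example class defined earlier on this page:

from unittest.mock import MagicMock


def test_custom_backend_returns_false_on_generate_failure():
    # Mock the ModelRun so no real model or external service is needed
    model_run = MagicMock()
    model_run.run_id = "test_run"
    model_run.generate.side_effect = RuntimeError("generation failed")

    backend = CustomRunBackend()
    assert backend.run(model_run) is False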
API Reference
rompy.run
Local execution backend for model runs.
This module provides the local run backend implementation.
Attributes

- logger (module attribute): logger = getLogger(__name__)
Classes
LocalRunBackend
Execute models locally using the system's Python interpreter.
This is the simplest backend that just runs the model directly
on the local system.
Source code in rompy/run/__init__.py
class LocalRunBackend:
    """Execute models locally using the system's Python interpreter.

    This is the simplest backend that just runs the model directly
    on the local system.
    """

    def run(
        self, model_run, config: "LocalConfig", workspace_dir: Optional[str] = None
    ) -> bool:
        """Run the model locally.

        Args:
            model_run: The ModelRun instance to execute
            config: LocalConfig instance with execution parameters
            workspace_dir: Path to the generated workspace directory (if None, will generate)

        Returns:
            True if execution was successful, False otherwise

        Raises:
            ValueError: If model_run is invalid
            TimeoutError: If execution exceeds timeout
        """
        # Validate input parameters
        if not model_run:
            raise ValueError("model_run cannot be None")

        if not hasattr(model_run, "run_id"):
            raise ValueError("model_run must have a run_id attribute")

        # Use config parameters
        exec_command = config.command
        exec_working_dir = config.working_dir
        exec_env_vars = config.env_vars
        exec_timeout = config.timeout

        logger.debug(
            f"Using LocalConfig: timeout={exec_timeout}, env_vars={list(exec_env_vars.keys())}"
        )

        logger.info(f"Starting local execution for run_id: {model_run.run_id}")

        try:
            # Use provided workspace or generate if not provided (for backwards compatibility)
            if workspace_dir is None:
                logger.warning(
                    "No workspace_dir provided, generating files (this may cause double generation in pipeline)"
                )
                staging_dir = model_run.generate()
                logger.info(f"Model inputs generated in: {staging_dir}")
            else:
                logger.info(f"Using provided workspace directory: {workspace_dir}")
                staging_dir = workspace_dir

            # Set working directory
            if exec_working_dir:
                work_dir = Path(exec_working_dir)
            else:
                work_dir = (
                    Path(staging_dir)
                    if staging_dir
                    else Path(model_run.output_dir) / model_run.run_id
                )

            if not work_dir.exists():
                logger.error(f"Working directory does not exist: {work_dir}")
                return False

            # Prepare environment
            env = os.environ.copy()
            if exec_env_vars:
                env.update(exec_env_vars)
                logger.debug(
                    f"Added environment variables: {list(exec_env_vars.keys())}"
                )

            # Execute command or config.run()
            if exec_command:
                success = self._execute_command(
                    exec_command, work_dir, env, exec_timeout
                )
            else:
                success = self._execute_config_run(model_run, work_dir, env)

            if success:
                logger.info(
                    f"Local execution completed successfully for run_id: {model_run.run_id}"
                )
            else:
                logger.error(f"Local execution failed for run_id: {model_run.run_id}")

            return success

        except TimeoutError:
            logger.error(f"Model execution timed out after {exec_timeout} seconds")
            raise
        except Exception as e:
            logger.exception(f"Failed to run model locally: {e}")
            return False

    def _execute_command(
        self, command: str, work_dir: Path, env: Dict[str, str], timeout: Optional[int]
    ) -> bool:
        """Execute a shell command.

        Args:
            command: Command to execute
            work_dir: Working directory
            env: Environment variables
            timeout: Execution timeout

        Returns:
            True if successful, False otherwise
        """
        logger.info(f"Executing command: {command}")
        logger.debug(f"Working directory: {work_dir}")

        try:
            result = subprocess.run(
                command,
                shell=True,
                cwd=work_dir,
                env=env,
                timeout=timeout,
                capture_output=True,
                text=True,
                check=False,
            )

            # Log output
            if result.stdout:
                logger.info(f"Command stdout:\n{result.stdout}")
            if result.stderr:
                if result.returncode == 0:
                    logger.warning(f"Command stderr:\n{result.stderr}")
                else:
                    logger.error(f"Command stderr:\n{result.stderr}")

            if result.returncode == 0:
                logger.debug("Command completed successfully")
                return True
            else:
                logger.error(f"Command failed with return code: {result.returncode}")
                return False

        except subprocess.TimeoutExpired:
            logger.error(f"Command timed out after {timeout} seconds")
            raise TimeoutError(f"Command execution timed out after {timeout} seconds")
        except Exception as e:
            logger.exception(f"Error executing command: {e}")
            return False

    def _execute_config_run(
        self, model_run, work_dir: Path, env: Dict[str, str]
    ) -> bool:
        """Execute the model using config.run() method.

        Args:
            model_run: The ModelRun instance
            work_dir: Working directory
            env: Environment variables

        Returns:
            True if successful, False otherwise
        """
        # Check if config has a run method
        if not hasattr(model_run.config, "run") or not callable(model_run.config.run):
            logger.warning(
                "Model config does not have a run method. Nothing to execute."
            )
            return True

        logger.info("Executing model using config.run() method")

        try:
            # Set working directory in environment for config.run()
            original_cwd = os.getcwd()
            os.chdir(work_dir)

            # Update environment
            original_env = {}
            for key, value in env.items():
                if key in os.environ:
                    original_env[key] = os.environ[key]
                os.environ[key] = value

            try:
                # Execute the config run method
                result = model_run.config.run(model_run)

                if isinstance(result, bool):
                    return result
                else:
                    logger.warning(f"config.run() returned non-boolean value: {result}")
                    return True

            finally:
                # Restore original environment and directory
                os.chdir(original_cwd)
                for key, value in env.items():
                    if key in original_env:
                        os.environ[key] = original_env[key]
                    else:
                        os.environ.pop(key, None)

        except Exception as e:
            logger.exception(f"Error in config.run(): {e}")
            return False
Functions
run
run(model_run, config: LocalConfig, workspace_dir: Optional[str] = None) -> bool
Run the model locally.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| model_run | | The ModelRun instance to execute | required |
| config | LocalConfig | LocalConfig instance with execution parameters | required |
| workspace_dir | Optional[str] | Path to the generated workspace directory (if None, will generate) | None |

Returns:

| Type | Description |
|------|-------------|
| bool | True if execution was successful, False otherwise |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If model_run is invalid |
| TimeoutError | If execution exceeds timeout |
rompy.postprocess
No-op postprocessor for model outputs.
This module provides a basic postprocessor that does nothing.
Attributes

- logger (module attribute): logger = getLogger(__name__)
Classes
NoopPostprocessor
A postprocessor that does nothing.
This is a placeholder implementation that simply returns a success message.
It's useful as a base class or for testing.
Source code in rompy/postprocess/__init__.py
class NoopPostprocessor:
    """A postprocessor that does nothing.

    This is a placeholder implementation that simply returns a success message.
    It's useful as a base class or for testing.
    """

    def process(
        self,
        model_run,
        validate_outputs: bool = True,
        output_dir: Optional[Union[str, Path]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        """Process the output of a model run (does nothing).

        Args:
            model_run: The ModelRun instance whose outputs to process
            validate_outputs: Whether to validate that output directory exists
            output_dir: Override output directory to check (defaults to model_run output)
            **kwargs: Additional parameters (unused)

        Returns:
            Dictionary with processing results

        Raises:
            ValueError: If model_run is invalid
        """
        # Validate input parameters
        if not model_run:
            raise ValueError("model_run cannot be None")

        if not hasattr(model_run, "run_id"):
            raise ValueError("model_run must have a run_id attribute")

        logger.info(f"Starting no-op postprocessing for run_id: {model_run.run_id}")

        try:
            # Determine output directory
            if output_dir:
                check_dir = Path(output_dir)
            else:
                check_dir = Path(model_run.output_dir) / model_run.run_id

            # Validate outputs if requested
            if validate_outputs:
                if not check_dir.exists():
                    logger.warning(f"Output directory does not exist: {check_dir}")
                    return {
                        "success": False,
                        "message": f"Output directory not found: {check_dir}",
                        "run_id": model_run.run_id,
                        "output_dir": str(check_dir),
                    }
                else:
                    # Count files in output directory
                    file_count = sum(1 for f in check_dir.rglob("*") if f.is_file())
                    logger.info(f"Found {file_count} output files in {check_dir}")

            logger.info(
                f"No-op postprocessing completed for run_id: {model_run.run_id}"
            )

            return {
                "success": True,
                "message": "No postprocessing requested - validation only",
                "run_id": model_run.run_id,
                "output_dir": str(check_dir),
                "validated": validate_outputs,
            }

        except Exception as e:
            logger.exception(f"Error in no-op postprocessor: {e}")
            return {
                "success": False,
                "message": f"Error in postprocessor: {str(e)}",
                "run_id": getattr(model_run, "run_id", "unknown"),
                "error": str(e),
            }
Functions
process
process(model_run, validate_outputs: bool = True, output_dir: Optional[Union[str, Path]] = None, **kwargs) -> Dict[str, Any]
Process the output of a model run (does nothing).
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| model_run | | The ModelRun instance whose outputs to process | required |
| validate_outputs | bool | Whether to validate that output directory exists | True |
| output_dir | Optional[Union[str, Path]] | Override output directory to check (defaults to model_run output) | None |
| **kwargs | | Additional parameters (unused) | {} |

Returns:

| Type | Description |
|------|-------------|
| Dict[str, Any] | Dictionary with processing results |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If model_run is invalid |
rompy.pipeline
Local pipeline backend for model execution.
This module provides the local pipeline backend implementation.
Attributes

- logger (module attribute): logger = getLogger(__name__)
Classes
LocalPipelineBackend
Local pipeline backend that executes the full workflow locally.
This backend uses the existing generate(), run() and postprocess() methods
to execute the complete pipeline locally.
Source code in rompy/pipeline/__init__.py
class LocalPipelineBackend:
    """Local pipeline backend that executes the full workflow locally.

    This backend uses the existing generate(), run() and postprocess() methods
    to execute the complete pipeline locally.
    """

    def execute(
        self,
        model_run,
        run_backend: str = "local",
        processor: str = "noop",
        run_kwargs: Optional[Dict[str, Any]] = None,
        process_kwargs: Optional[Dict[str, Any]] = None,
        cleanup_on_failure: bool = False,
        validate_stages: bool = True,
        **kwargs,
    ) -> Dict[str, Any]:
        """Execute the model pipeline locally.

        Args:
            model_run: The ModelRun instance to execute
            run_backend: Backend to use for the run stage ("local" or "docker")
            processor: Processor to use for the postprocess stage
            run_kwargs: Additional parameters for the run stage
            process_kwargs: Additional parameters for the postprocess stage
            cleanup_on_failure: Whether to cleanup outputs on pipeline failure
            validate_stages: Whether to validate each stage before proceeding
            **kwargs: Additional parameters (unused)

        Returns:
            Combined results from the pipeline execution

        Raises:
            ValueError: If model_run is invalid or parameters are invalid
        """
        # Validate input parameters
        if not model_run:
            raise ValueError("model_run cannot be None")

        if not hasattr(model_run, "run_id"):
            raise ValueError("model_run must have a run_id attribute")

        if not isinstance(run_backend, str) or not run_backend.strip():
            raise ValueError("run_backend must be a non-empty string")

        if not isinstance(processor, str) or not processor.strip():
            raise ValueError("processor must be a non-empty string")

        # Initialize parameters
        run_kwargs = run_kwargs or {}
        process_kwargs = process_kwargs or {}

        logger.info(f"Starting pipeline execution for run_id: {model_run.run_id}")
        logger.info(
            f"Pipeline configuration: run_backend='{run_backend}', processor='{processor}'"
        )

        pipeline_results = {
            "success": False,
            "run_id": model_run.run_id,
            "stages_completed": [],
            "run_backend": run_backend,
            "processor": processor,
        }

        try:
            # Stage 1: Generate input files
            logger.info(f"Stage 1: Generating input files for {model_run.run_id}")
            try:
                staging_dir = model_run.generate()
                pipeline_results["staging_dir"] = (
                    str(staging_dir) if staging_dir else None
                )
                pipeline_results["stages_completed"].append("generate")
                logger.info(f"Input files generated successfully in: {staging_dir}")
            except Exception as e:
                logger.exception(f"Failed to generate input files: {e}")
                return {
                    **pipeline_results,
                    "stage": "generate",
                    "message": f"Input file generation failed: {str(e)}",
                    "error": str(e),
                }

            # Validate generation stage
            if validate_stages:
                output_dir = Path(model_run.output_dir) / model_run.run_id
                if not output_dir.exists():
                    logger.error(f"Output directory was not created: {output_dir}")
                    return {
                        **pipeline_results,
                        "stage": "generate",
                        "message": f"Output directory not found after generation: {output_dir}",
                    }

            # Stage 2: Run the model
            logger.info(f"Stage 2: Running model using {run_backend} backend")
            try:
                # Create appropriate backend configuration
                backend_config = self._create_backend_config(run_backend, run_kwargs)

                # Pass the generated workspace directory to avoid duplicate generation
                run_success = model_run.run(
                    backend=backend_config, workspace_dir=staging_dir
                )
                pipeline_results["run_success"] = run_success

                if not run_success:
                    logger.error("Model run failed")
                    if cleanup_on_failure:
                        self._cleanup_outputs(model_run)
                    return {
                        **pipeline_results,
                        "stage": "run",
                        "message": "Model run failed",
                    }

                pipeline_results["stages_completed"].append("run")
                logger.info("Model run completed successfully")

            except Exception as e:
                logger.exception(f"Error during model run: {e}")
                if cleanup_on_failure:
                    self._cleanup_outputs(model_run)
                return {
                    **pipeline_results,
                    "stage": "run",
                    "message": f"Model run error: {str(e)}",
                    "error": str(e),
                }

            # Stage 3: Postprocess outputs
            logger.info(f"Stage 3: Postprocessing with {processor}")
            try:
                postprocess_results = model_run.postprocess(
                    processor=processor, **process_kwargs
                )
                pipeline_results["postprocess_results"] = postprocess_results
                pipeline_results["stages_completed"].append("postprocess")

                # Check if postprocessing was successful
                if isinstance(
                    postprocess_results, dict
                ) and not postprocess_results.get("success", True):
                    logger.warning(
                        "Postprocessing reported failure but pipeline will continue"
                    )

                logger.info("Postprocessing completed")

            except Exception as e:
                logger.exception(f"Error during postprocessing: {e}")
                return {
                    **pipeline_results,
                    "stage": "postprocess",
                    "message": f"Postprocessing error: {str(e)}",
                    "error": str(e),
                }

            # Pipeline completed successfully
            pipeline_results["success"] = True
            pipeline_results["message"] = "Pipeline completed successfully"

            logger.info(
                f"Pipeline execution completed successfully for run_id: {model_run.run_id}"
            )

            return pipeline_results

        except Exception as e:
            logger.exception(f"Unexpected error in pipeline execution: {e}")
            if cleanup_on_failure:
                self._cleanup_outputs(model_run)
            return {
                **pipeline_results,
                "stage": "pipeline",
                "message": f"Pipeline error: {str(e)}",
                "error": str(e),
            }

    def _cleanup_outputs(self, model_run) -> None:
        """Clean up output files on pipeline failure.

        Args:
            model_run: The ModelRun instance
        """
        try:
            output_dir = Path(model_run.output_dir) / model_run.run_id
            if output_dir.exists():
                logger.info(f"Cleaning up output directory: {output_dir}")
                import shutil

                shutil.rmtree(output_dir)
                logger.info("Cleanup completed")
        except Exception as e:
            logger.warning(f"Failed to cleanup output directory: {e}")

    def _create_backend_config(self, run_backend: str, run_kwargs: Dict[str, Any]):
        """Create appropriate backend configuration from string name and kwargs.

        Args:
            run_backend: Backend name ("local" or "docker")
            run_kwargs: Additional configuration parameters

        Returns:
            Backend configuration object

        Raises:
            ValueError: If backend name is not supported
        """
        if run_backend == "local":
            # Filter kwargs to only include valid LocalConfig fields
            valid_fields = set(LocalConfig.model_fields.keys())
            filtered_kwargs = {k: v for k, v in run_kwargs.items() if k in valid_fields}
            if filtered_kwargs != run_kwargs:
                invalid_fields = set(run_kwargs.keys()) - valid_fields
                logger.warning(f"Ignoring invalid LocalConfig fields: {invalid_fields}")
            return LocalConfig(**filtered_kwargs)
        elif run_backend == "docker":
            # Filter kwargs to only include valid DockerConfig fields
            valid_fields = set(DockerConfig.model_fields.keys())
            filtered_kwargs = {k: v for k, v in run_kwargs.items() if k in valid_fields}
            if filtered_kwargs != run_kwargs:
                invalid_fields = set(run_kwargs.keys()) - valid_fields
                logger.warning(
                    f"Ignoring invalid DockerConfig fields: {invalid_fields}"
                )
            return DockerConfig(**filtered_kwargs)
        else:
            raise ValueError(
                f"Unsupported backend: {run_backend}. Supported: local, docker"
            )
Functions
execute
execute(model_run, run_backend: str = 'local', processor: str = 'noop', run_kwargs: Optional[Dict[str, Any]] = None, process_kwargs: Optional[Dict[str, Any]] = None, cleanup_on_failure: bool = False, validate_stages: bool = True, **kwargs) -> Dict[str, Any]
Execute the model pipeline locally.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| model_run | | The ModelRun instance to execute | required |
| run_backend | str | Backend to use for the run stage ("local" or "docker") | 'local' |
| processor | str | Processor to use for the postprocess stage | 'noop' |
| run_kwargs | Optional[Dict[str, Any]] | Additional parameters for the run stage | None |
| process_kwargs | Optional[Dict[str, Any]] | Additional parameters for the postprocess stage | None |
| cleanup_on_failure | bool | Whether to cleanup outputs on pipeline failure | False |
| validate_stages | bool | Whether to validate each stage before proceeding | True |
| **kwargs | | Additional parameters (unused) | {} |

Returns:

| Type | Description |
|------|-------------|
| Dict[str, Any] | Combined results from the pipeline execution |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If model_run is invalid or parameters are invalid |
Next Steps