API Reference

This section provides comprehensive documentation for the Rompy API, organized by functional modules. Rompy is structured around a modular architecture that supports various ocean and wave models with consistent interfaces for configuration, data handling, execution, and post-processing.

Core Modules

Model Execution

The main entry point for running models in Rompy:

ModelRun

Bases: RompyBaseModel

A model run.

It is intended to be model agnostic. It deals primarily with how the model is to be run, i.e. the period of the run and where the output is going. The actual configuration of the run is provided by the config object.

Further explanation is given in the rompy.core.config.BaseConfig docstring.
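
A minimal sketch of setting up and generating a run. Import paths follow the module locations shown in this reference (rompy.model, rompy.core.config) and may differ in practice; the values are illustrative only.

from datetime import datetime

from rompy import TimeRange
from rompy.core.config import BaseConfig
from rompy.model import ModelRun

# Build a run covering one day at hourly intervals with the default base config.
model = ModelRun(
    run_id="example_run",
    period=TimeRange(start=datetime(2023, 1, 1), end=datetime(2023, 1, 2), interval="1h"),
    output_dir="./simulations",
    config=BaseConfig(),
)
staging_dir = model.generate()  # renders the template into the staging directory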

Source code in rompy/model.py
class ModelRun(RompyBaseModel):
    """A model run.

    It is intended to be model agnostic.
    It deals primarily with how the model is to be run, i.e. the period of the run
    and where the output is going. The actual configuration of the run is
    provided by the config object.

    Further explanation is given in the rompy.core.config.BaseConfig docstring.
    """

    # Initialize formatting variables in __init__

    model_type: Literal["modelrun"] = Field("modelrun", description="The model type.")
    run_id: str = Field("run_id", description="The run id")
    period: TimeRange = Field(
        TimeRange(
            start=datetime(2020, 2, 21, 4),
            end=datetime(2020, 2, 24, 4),
            interval="15M",
        ),
        description="The time period to run the model",
    )
    output_dir: Path = Field("./simulations", description="The output directory")
    config: Union[CONFIG_TYPES] = Field(
        default_factory=BaseConfig,
        description="The configuration object",
        discriminator="model_type",
    )
    delete_existing: bool = Field(False, description="Delete existing output directory")
    run_id_subdir: bool = Field(
        True, description="Use run_id subdirectory in the output directory"
    )
    _datefmt: str = "%Y%m%d.%H%M%S"
    _staging_dir: Path = None

    @property
    def staging_dir(self):
        """The directory where the model is staged for execution

        returns
        -------
        staging_dir : str
        """

        if self._staging_dir is None:
            self._staging_dir = self._create_staging_dir()
        return self._staging_dir

    def _create_staging_dir(self):
        if self.run_id_subdir:
            odir = Path(self.output_dir) / self.run_id
        else:
            odir = Path(self.output_dir)
        if self.delete_existing and odir.exists():
            shutil.rmtree(odir)
        odir.mkdir(parents=True, exist_ok=True)
        return odir

    @property
    def _generation_medatadata(self):
        return dict(
            _generated_at=str(datetime.now(timezone.utc)),
            _generated_by=os.environ.get("USER"),
            _generated_on=platform.node(),
        )

    def generate(self) -> str:
        """Generate the model input files

        returns
        -------
        staging_dir : str

        """
        # Import formatting utilities
        from rompy.formatting import format_table_row, log_box

        # Format model settings in a structured way
        config_type = type(self.config).__name__
        duration = self.period.end - self.period.start
        formatted_duration = self.period.format_duration(duration)

        # Create table rows for the model run info
        rows = [
            format_table_row("Run ID", str(self.run_id)),
            format_table_row("Model Type", config_type),
            format_table_row("Start Time", self.period.start.isoformat()),
            format_table_row("End Time", self.period.end.isoformat()),
            format_table_row("Duration", formatted_duration),
            format_table_row("Time Interval", str(self.period.interval)),
            format_table_row("Output Directory", str(self.output_dir)),
        ]

        # Add description if available
        if hasattr(self.config, "description") and self.config.description:
            rows.append(format_table_row("Description", self.config.description))

        # Create a formatted table with proper alignment
        formatted_rows = []
        key_lengths = []

        # First pass: collect all valid rows and calculate max key length
        for row in rows:
            try:
                # Split the row by the box-drawing vertical line character
                parts = [p.strip() for p in row.split("┃") if p.strip()]
                if len(parts) >= 2:  # We expect at least key and value parts
                    key = parts[0].strip()
                    value = parts[1].strip() if len(parts) > 1 else ""
                    key_lengths.append(len(key))
                    formatted_rows.append((key, value))
            except Exception as e:
                logger.warning(f"Error processing row '{row}': {e}")

        if not formatted_rows:
            logger.warning("No valid rows found for model run configuration table")
            return self._staging_dir

        max_key_len = max(key_lengths) if key_lengths else 0

        # Format the rows with proper alignment
        aligned_rows = []
        for key, value in formatted_rows:
            aligned_row = f"{key:>{max_key_len}} : {value}"
            aligned_rows.append(aligned_row)

        # Log the box with the model run info
        log_box(title="MODEL RUN CONFIGURATION", logger=logger, add_empty_line=False)

        # Log each row of the content with proper indentation
        for row in aligned_rows:
            logger.info(f"  {row}")

        # Log the bottom of the box
        log_box(
            title=None,
            logger=logger,
            add_empty_line=True,  # Just the bottom border
        )

        # Display detailed configuration info using the new formatting framework
        from rompy.formatting import log_box

        # Create a box with the configuration type as title
        log_box(f"MODEL CONFIGURATION ({config_type})")

        # Use the model's string representation which now uses the new formatting
        try:
            # The __str__ method of RompyBaseModel already handles the formatting
            config_str = str(self.config)
            for line in config_str.split("\n"):
                logger.info(line)
        except Exception as e:
            # If anything goes wrong with config formatting, log the error and minimal info
            logger.info(f"Using {type(self.config).__name__} configuration")
            logger.debug(f"Configuration string formatting error: {str(e)}")

        logger.info("")
        log_box(
            title="STARTING MODEL GENERATION",
            logger=logger,
            add_empty_line=False,
        )
        logger.info(f"Preparing input files in {self.output_dir}")

        # Collect context data
        cc_full = {}
        cc_full["runtime"] = self.model_dump()
        cc_full["runtime"]["staging_dir"] = self.staging_dir
        cc_full["runtime"].update(self._generation_medatadata)
        cc_full["runtime"].update({"_datefmt": self._datefmt})

        # Process configuration
        logger.info("Processing model configuration...")
        if callable(self.config):
            # Run the __call__() method of the config object if it is callable passing
            # the runtime instance, and fill in the context with what is returned
            logger.info("Running configuration callable...")
            cc_full["config"] = self.config(self)
        else:
            # Otherwise just fill in the context with the config instance itself
            logger.info("Using static configuration...")
            cc_full["config"] = self.config

        # Render templates
        logger.info(f"Rendering model templates to {self.output_dir}/{self.run_id}...")
        staging_dir = render(
            cc_full, self.config.template, self.output_dir, self.config.checkout
        )

        logger.info("")
        # Use the log_box utility function
        from rompy.formatting import log_box

        log_box(
            title="MODEL GENERATION COMPLETE",
            logger=logger,
            add_empty_line=False,
        )
        logger.info(f"Model files generated at: {staging_dir}")
        return staging_dir

    def zip(self) -> str:
        """Zip the input files for the model run

        This function zips the input files for the model run and returns the
        name of the zip file. It also cleans up the staging directory leaving
        only the settings.json file that can be used to reproduce the run.

        returns
        -------
        zip_fn : str
        """
        # Use the log_box utility function
        from rompy.formatting import log_box

        log_box(
            title="ARCHIVING MODEL FILES",
            logger=logger,
        )

        # Always remove previous zips
        zip_fn = Path(str(self.staging_dir) + ".zip")
        if zip_fn.exists():
            logger.info(f"Removing existing archive at {zip_fn}")
            zip_fn.unlink()

        # Count files to be archived
        file_count = sum([len(fn) for _, _, fn in os.walk(self.staging_dir)])
        logger.info(f"Archiving {file_count} files from {self.staging_dir}")

        # Create zip archive
        with zf.ZipFile(zip_fn, mode="w", compression=zf.ZIP_DEFLATED) as z:
            for dp, dn, fn in os.walk(self.staging_dir):
                for filename in fn:
                    source_path = os.path.join(dp, filename)
                    rel_path = os.path.relpath(source_path, self.staging_dir)
                    z.write(source_path, rel_path)

        # Clean up staging directory
        logger.info(f"Cleaning up staging directory {self.staging_dir}")
        shutil.rmtree(self.staging_dir)

        from rompy.formatting import log_box

        log_box(
            f"✓ Archive created successfully: {zip_fn}",
            logger=logger,
            add_empty_line=False,
        )
        return zip_fn

    def __call__(self):
        return self.generate()

    def run(self, backend: BackendConfig, workspace_dir: Optional[str] = None) -> bool:
        """
        Run the model using the specified backend configuration.

        This method uses Pydantic configuration objects that provide type safety
        and validation for all backend parameters.

        Args:
            backend: Pydantic configuration object (LocalConfig, DockerConfig, etc.)
            workspace_dir: Path to generated workspace directory (optional)

        Returns:
            True if execution was successful, False otherwise

        Raises:
            TypeError: If backend is not a BackendConfig instance

        Examples:
            from rompy.backends import LocalConfig, DockerConfig

            # Local execution
            model.run(LocalConfig(timeout=3600, command="python run.py"))

            # Docker execution
            model.run(DockerConfig(image="swan:latest", cpu=4, memory="2g"))
        """
        if not isinstance(backend, BaseBackendConfig):
            raise TypeError(
                f"Backend must be a subclass of BaseBackendConfig, "
                f"got {type(backend).__name__}"
            )

        logger.debug(f"Using backend config: {type(backend).__name__}")

        # Get the backend class directly from the configuration
        backend_class = backend.get_backend_class()
        backend_instance = backend_class()

        # Pass the config object and workspace_dir to the backend
        return backend_instance.run(self, config=backend, workspace_dir=workspace_dir)

    def postprocess(self, processor: str = "noop", **kwargs) -> Dict[str, Any]:
        """
        Postprocess the model outputs using the specified processor.

        This method uses entry points to load and execute the appropriate postprocessor.
        Available processors are automatically discovered from the rompy.postprocess entry point group.

        Built-in processors:
        - "noop": A placeholder processor that does nothing but returns success

        Args:
            processor: Name of the postprocessor to use (default: "noop")
            **kwargs: Additional processor-specific parameters

        Returns:
            Dictionary with results from the postprocessing

        Raises:
            ValueError: If the specified processor is not available
        """
        # Get the requested postprocessor class from entry points
        if processor not in POSTPROCESSORS:
            available = list(POSTPROCESSORS.keys())
            raise ValueError(
                f"Unknown postprocessor: {processor}. "
                f"Available processors: {', '.join(available)}"
            )

        # Create an instance and process the outputs
        processor_class = POSTPROCESSORS[processor]
        processor_instance = processor_class()
        return processor_instance.process(self, **kwargs)

    def pipeline(self, pipeline_backend: str = "local", **kwargs) -> Dict[str, Any]:
        """
        Run the complete model pipeline (generate, run, postprocess) using the specified pipeline backend.

        This method executes the entire model workflow from input generation through running
        the model to postprocessing outputs. It uses entry points to load and execute the
        appropriate pipeline backend from the rompy.pipeline entry point group.

        Built-in pipeline backends:
        - "local": Execute the complete pipeline locally using the existing ModelRun methods

        Args:
            pipeline_backend: Name of the pipeline backend to use (default: "local")
            **kwargs: Additional backend-specific parameters. Common parameters include:
                - run_backend: Backend to use for the run stage (for local pipeline)
                - processor: Processor to use for postprocessing (for local pipeline)
                - run_kwargs: Additional parameters for the run stage
                - process_kwargs: Additional parameters for postprocessing

        Returns:
            Dictionary with results from the pipeline execution

        Raises:
            ValueError: If the specified pipeline backend is not available
        """
        # Get the requested pipeline backend class from entry points
        if pipeline_backend not in PIPELINE_BACKENDS:
            available = list(PIPELINE_BACKENDS.keys())
            raise ValueError(
                f"Unknown pipeline backend: {pipeline_backend}. "
                f"Available backends: {', '.join(available)}"
            )

        # Create an instance and execute the pipeline
        backend_class = PIPELINE_BACKENDS[pipeline_backend]
        backend_instance = backend_class()
        return backend_instance.execute(self, **kwargs)
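
Given an existing ModelRun instance (model below), a hedged sketch of invoking the full generate/run/postprocess workflow with the built-in local pipeline backend and the noop processor:

# Hedged sketch: execute the complete pipeline locally.
results = model.pipeline(pipeline_backend="local", processor="noop")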

Functions

generate

generate() -> str

Generate the model input files

returns

staging_dir : str

Source code in rompy/model.py
def generate(self) -> str:
    """Generate the model input files

    returns
    -------
    staging_dir : str

    """
    # Import formatting utilities
    from rompy.formatting import format_table_row, log_box

    # Format model settings in a structured way
    config_type = type(self.config).__name__
    duration = self.period.end - self.period.start
    formatted_duration = self.period.format_duration(duration)

    # Create table rows for the model run info
    rows = [
        format_table_row("Run ID", str(self.run_id)),
        format_table_row("Model Type", config_type),
        format_table_row("Start Time", self.period.start.isoformat()),
        format_table_row("End Time", self.period.end.isoformat()),
        format_table_row("Duration", formatted_duration),
        format_table_row("Time Interval", str(self.period.interval)),
        format_table_row("Output Directory", str(self.output_dir)),
    ]

    # Add description if available
    if hasattr(self.config, "description") and self.config.description:
        rows.append(format_table_row("Description", self.config.description))

    # Create a formatted table with proper alignment
    formatted_rows = []
    key_lengths = []

    # First pass: collect all valid rows and calculate max key length
    for row in rows:
        try:
            # Split the row by the box-drawing vertical line character
            parts = [p.strip() for p in row.split("┃") if p.strip()]
            if len(parts) >= 2:  # We expect at least key and value parts
                key = parts[0].strip()
                value = parts[1].strip() if len(parts) > 1 else ""
                key_lengths.append(len(key))
                formatted_rows.append((key, value))
        except Exception as e:
            logger.warning(f"Error processing row '{row}': {e}")

    if not formatted_rows:
        logger.warning("No valid rows found for model run configuration table")
        return self._staging_dir

    max_key_len = max(key_lengths) if key_lengths else 0

    # Format the rows with proper alignment
    aligned_rows = []
    for key, value in formatted_rows:
        aligned_row = f"{key:>{max_key_len}} : {value}"
        aligned_rows.append(aligned_row)

    # Log the box with the model run info
    log_box(title="MODEL RUN CONFIGURATION", logger=logger, add_empty_line=False)

    # Log each row of the content with proper indentation
    for row in aligned_rows:
        logger.info(f"  {row}")

    # Log the bottom of the box
    log_box(
        title=None,
        logger=logger,
        add_empty_line=True,  # Just the bottom border
    )

    # Display detailed configuration info using the new formatting framework
    from rompy.formatting import log_box

    # Create a box with the configuration type as title
    log_box(f"MODEL CONFIGURATION ({config_type})")

    # Use the model's string representation which now uses the new formatting
    try:
        # The __str__ method of RompyBaseModel already handles the formatting
        config_str = str(self.config)
        for line in config_str.split("\n"):
            logger.info(line)
    except Exception as e:
        # If anything goes wrong with config formatting, log the error and minimal info
        logger.info(f"Using {type(self.config).__name__} configuration")
        logger.debug(f"Configuration string formatting error: {str(e)}")

    logger.info("")
    log_box(
        title="STARTING MODEL GENERATION",
        logger=logger,
        add_empty_line=False,
    )
    logger.info(f"Preparing input files in {self.output_dir}")

    # Collect context data
    cc_full = {}
    cc_full["runtime"] = self.model_dump()
    cc_full["runtime"]["staging_dir"] = self.staging_dir
    cc_full["runtime"].update(self._generation_medatadata)
    cc_full["runtime"].update({"_datefmt": self._datefmt})

    # Process configuration
    logger.info("Processing model configuration...")
    if callable(self.config):
        # Run the __call__() method of the config object if it is callable passing
        # the runtime instance, and fill in the context with what is returned
        logger.info("Running configuration callable...")
        cc_full["config"] = self.config(self)
    else:
        # Otherwise just fill in the context with the config instance itself
        logger.info("Using static configuration...")
        cc_full["config"] = self.config

    # Render templates
    logger.info(f"Rendering model templates to {self.output_dir}/{self.run_id}...")
    staging_dir = render(
        cc_full, self.config.template, self.output_dir, self.config.checkout
    )

    logger.info("")
    # Use the log_box utility function
    from rompy.formatting import log_box

    log_box(
        title="MODEL GENERATION COMPLETE",
        logger=logger,
        add_empty_line=False,
    )
    logger.info(f"Model files generated at: {staging_dir}")
    return staging_dir

run

run(backend: BackendConfig, workspace_dir: Optional[str] = None) -> bool

Run the model using the specified backend configuration.

This method uses Pydantic configuration objects that provide type safety and validation for all backend parameters.

Parameters:

backend : BackendConfig
    Pydantic configuration object (LocalConfig, DockerConfig, etc.). Required.
workspace_dir : Optional[str]
    Path to generated workspace directory (optional). Default: None.

Returns:

bool
    True if execution was successful, False otherwise.

Raises:

TypeError
    If backend is not a BackendConfig instance.

Examples:

from rompy.backends import LocalConfig, DockerConfig

# Local execution
model.run(LocalConfig(timeout=3600, command="python run.py"))

# Docker execution
model.run(DockerConfig(image="swan:latest", cpu=4, memory="2g"))

Source code in rompy/model.py
def run(self, backend: BackendConfig, workspace_dir: Optional[str] = None) -> bool:
    """
    Run the model using the specified backend configuration.

    This method uses Pydantic configuration objects that provide type safety
    and validation for all backend parameters.

    Args:
        backend: Pydantic configuration object (LocalConfig, DockerConfig, etc.)
        workspace_dir: Path to generated workspace directory (optional)

    Returns:
        True if execution was successful, False otherwise

    Raises:
        TypeError: If backend is not a BackendConfig instance

    Examples:
        from rompy.backends import LocalConfig, DockerConfig

        # Local execution
        model.run(LocalConfig(timeout=3600, command="python run.py"))

        # Docker execution
        model.run(DockerConfig(image="swan:latest", cpu=4, memory="2g"))
    """
    if not isinstance(backend, BaseBackendConfig):
        raise TypeError(
            f"Backend must be a subclass of BaseBackendConfig, "
            f"got {type(backend).__name__}"
        )

    logger.debug(f"Using backend config: {type(backend).__name__}")

    # Get the backend class directly from the configuration
    backend_class = backend.get_backend_class()
    backend_instance = backend_class()

    # Pass the config object and workspace_dir to the backend
    return backend_instance.run(self, config=backend, workspace_dir=workspace_dir)

postprocess

postprocess(processor: str = 'noop', **kwargs) -> Dict[str, Any]

Postprocess the model outputs using the specified processor.

This method uses entry points to load and execute the appropriate postprocessor. Available processors are automatically discovered from the rompy.postprocess entry point group.

Built-in processors:

- "noop": A placeholder processor that does nothing but returns success

Parameters:

processor : str
    Name of the postprocessor to use. Default: 'noop'.
**kwargs
    Additional processor-specific parameters.

Returns:

Dict[str, Any]
    Dictionary with results from the postprocessing.

Raises:

ValueError
    If the specified processor is not available.
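
A hedged usage sketch, assuming model is an existing ModelRun instance; only the built-in noop processor is guaranteed to be available:

# Run the placeholder postprocessor and inspect its result dictionary.
results = model.postprocess(processor="noop")
print(results)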

Source code in rompy/model.py
def postprocess(self, processor: str = "noop", **kwargs) -> Dict[str, Any]:
    """
    Postprocess the model outputs using the specified processor.

    This method uses entry points to load and execute the appropriate postprocessor.
    Available processors are automatically discovered from the rompy.postprocess entry point group.

    Built-in processors:
    - "noop": A placeholder processor that does nothing but returns success

    Args:
        processor: Name of the postprocessor to use (default: "noop")
        **kwargs: Additional processor-specific parameters

    Returns:
        Dictionary with results from the postprocessing

    Raises:
        ValueError: If the specified processor is not available
    """
    # Get the requested postprocessor class from entry points
    if processor not in POSTPROCESSORS:
        available = list(POSTPROCESSORS.keys())
        raise ValueError(
            f"Unknown postprocessor: {processor}. "
            f"Available processors: {', '.join(available)}"
        )

    # Create an instance and process the outputs
    processor_class = POSTPROCESSORS[processor]
    processor_instance = processor_class()
    return processor_instance.process(self, **kwargs)

Core Configuration

Base configuration classes that provide validation and type safety:

BaseConfig

Bases: RompyBaseModel

Base class for model templates.

The template class provides the object that is used to set up the model configuration. When implemented for a given model, it can move along a scale of complexity to suit the application.

In its most basic form, as implemented in this base object, it consists of a path to a cookiecutter template, with the class providing the context for the {{config}} values in that template. Note that any {{runtime}} values are filled from the ModelRun object.

If the template is a git repo, the checkout parameter can be used to specify a branch or tag, which will be cloned and used.

If the object is callable, it will be called prior to rendering the template. This mechanism can be used to perform tasks such as fetching external data, or providing additional context to the template beyond the arguments provided by the user.
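
A minimal sketch, assuming BaseConfig is importable from rompy.core.config (the module shown below); the template repository URL is hypothetical:

from rompy.core.config import BaseConfig

# Point the config at a (hypothetical) cookiecutter template repository.
config = BaseConfig(
    template="https://github.com/example-org/model-template.git",  # hypothetical
    checkout="main",
)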

Source code in rompy/core/config.py
class BaseConfig(RompyBaseModel):
    """Base class for model templates.

    The template class provides the object that is used to set up the model configuration.
    When implemented for a given model, can move along a scale of complexity
    to suit the application.

    In its most basic form, as implemented in this base object, it consists of path to a cookiecutter template
    with the class providing the context for the {{config}} values in that template. Note that any
    {{runtime}} values are filled from the ModelRun object.

    If the template is a git repo, the checkout parameter can be used to specify a branch or tag and it
    will be cloned and used.

    If the object is callable, it will be called prior to rendering the template. This mechanism can be
    used to perform tasks such as fetching external data, or providing additional context to the template
    beyond the arguments provided by the user.
    """

    model_type: Literal["base"] = "base"
    template: Optional[str] = Field(
        description="The path to the model template",
        default=DEFAULT_TEMPLATE,
    )
    checkout: Optional[str] = Field(
        description="The git branch to use if the template is a git repo",
        default="main",
    )
    model_config = ConfigDict(extra="allow")

    def __call__(self, *args, **kwargs):
        return self

Time Management

Time range and interval handling:

TimeRange

Bases: BaseModel

A time range object

Examples

>>> from rompy import TimeRange
>>> tr = TimeRange(start="2020-01-01", end="2020-01-02")
>>> tr
TimeRange(start=datetime.datetime(2020, 1, 1, 0, 0), end=datetime.datetime(2020, 1, 2, 0, 0), duration=None, interval=None, include_end=True)
>>> tr = TimeRange(start="2020-01-01", duration="1d")
>>> tr
TimeRange(start=datetime.datetime(2020, 1, 1, 0, 0), end=datetime.datetime(2020, 1, 2, 0, 0), duration=timedelta(days=1), interval=None, include_end=True)
>>> tr = TimeRange(start="2020-01-01", duration="1d", interval="1h")
>>> tr
TimeRange(start=datetime.datetime(2020, 1, 1, 0, 0), end=None, duration=timedelta(days=1), interval=timedelta(hours=1), include_end=True)

Source code in rompy/core/time.py
class TimeRange(BaseModel):
    """
    A time range object

    Examples
    --------
    >>> from rompy import TimeRange
    >>> tr = TimeRange(start="2020-01-01", end="2020-01-02")
    >>> tr
    TimeRange(start=datetime.datetime(2020, 1, 1, 0, 0), end=datetime.datetime(2020, 1, 2, 0, 0), duration=None, interval=None, include_end=True)
    >>> tr = TimeRange(start="2020-01-01", duration="1d")
    >>> tr
    TimeRange(start=datetime.datetime(2020, 1, 1, 0, 0), end=datetime.datetime(2020, 1, 2, 0, 0), duration=timedelta(days=1), interval=None, include_end=True)
    >>> tr = TimeRange(start="2020-01-01", duration="1d", interval="1h")
    >>> tr
    TimeRange(start=datetime.datetime(2020, 1, 1, 0, 0), end=None, duration=timedelta(days=1), interval=timedelta(hours=1), include_end=True)
    """

    start: Optional[datetime] = Field(
        None, description="The start date of the time range", examples=["2020-01-01"]
    )
    end: Optional[datetime] = Field(
        None, description="The end date of the time range", examples=["2020-01-02"]
    )
    duration: Optional[Union[str, timedelta]] = Field(
        None, description="The duration of the time range", examples=["1d"]
    )
    interval: Optional[Union[str, timedelta]] = Field(
        "1h", description="The frequency of the time range", examples=["1h", "'1h'"]
    )
    include_end: bool = Field(
        True, description="Determines if the end date should be included in the range"
    )
    model_config = ConfigDict(validate_default=True)

    @field_validator("interval", "duration", mode="before")
    @classmethod
    def valid_duration_interval(cls, v):
        if v is None:
            return v
        if isinstance(v, timedelta):
            return v
        elif isinstance(v, str):
            try:
                return isodate.parse_duration(v)
            except isodate.ISO8601Error:
                if v[-1] not in time_units:
                    raise ValueError(
                        "Invalid duration unit. Must be one isoformat or one of: h, m, s, d, w, y"
                    )
                time_delta_unit = v[-1]
                time_delta_value = int(v[:-1])
                return timedelta(**{time_units[time_delta_unit]: time_delta_value})

    @field_validator("start", "end", mode="before")
    @classmethod
    def validate_start_end(cls, v):
        if v is None:
            return v
        if isinstance(v, datetime):
            return v
        for fmt in [
            "%Y%m%d",
            "%Y%m%dT%H",
            "%Y%m%dT%H%M",
            "%Y%m%dT%H%M%S",
            "%Y%m%d.%H",
            "%Y%m%d.%H%M",
            "%Y%m%d.%H%M%S",
            "%Y-%m-%d",
            "%Y-%m-%dT%H",
            "%Y-%m-%dT%H%M",
        ]:
            try:
                return datetime.strptime(v, fmt)
            except ValueError:
                continue
        return v

    @model_validator(mode="before")
    @classmethod
    def validate_start_end_duration(cls, data: Any) -> Any:
        start, end, duration = data.get("start"), data.get("end"), data.get("duration")
        if start and not (end or duration):
            raise ValueError("start provided, must provide either end or duration")
        if end and not (start or duration):
            raise ValueError("end provided, must provide either start or duration")
        if duration and not (start or end):
            raise ValueError("duration provided, must provide either start or end")
        if not (start or end or duration):
            raise ValueError("Must provide two of start, end, duration")
        if start and end and duration:
            raise ValueError("Must provide only two of start, end, duration")
        return data

    @model_validator(mode="after")
    def parse_start_end_duration(self) -> "TimeRange":
        if self.start and self.end and not self.duration:
            self.duration = self.end - self.start
        elif self.start and self.duration and not self.end:
            self.end = self.start + self.duration
        elif self.end and self.duration and not self.start:
            self.start = self.end - self.duration
        return self

    model_config = ConfigDict(validate_default=True)

    @model_serializer
    def serialize_model(self) -> dict:
        # This replaces default serialization for the whole model
        # It works for both direct serialization and nested serialization
        result = {}
        for key, value in self.__dict__.items():
            # Skip 'duration' if both start and end are present
            if key == "duration" and self.start and self.end:
                continue
            # Include all other fields
            result[key] = value
        return result

    @property
    def date_range(self) -> list[datetime]:
        if not self.start or not self.end or not self.interval:
            return []
        start, end = self.start, self.end
        step_size = (
            self.interval
            if isinstance(self.interval, timedelta)
            else timedelta(**{time_units[self.interval[-1]]: int(self.interval[:-1])})
        )
        date_range = []
        while start < end:
            date_range.append(start)
            start += step_size
        if self.include_end and date_range and date_range[-1] != end:
            date_range.append(end)
        return date_range

    def contains(self, date: datetime) -> bool:
        return self.start <= date <= self.end

    def contains_range(self, date_range: "TimeRange") -> bool:
        return self.contains(date_range.start) and self.contains(date_range.end)

    def common_times(self, date_range: "TimeRange") -> list[datetime]:
        return [date for date in self.date_range if date_range.contains(date)]

    def format_duration(self, duration: timedelta) -> str:
        """Format a timedelta object as a human-readable string.

        This method formats a timedelta in a way that's suitable for display
        in logs and other output.

        Args:
            duration: The timedelta object to format

        Returns:
            A formatted string representation of the duration
        """
        if not duration:
            return "None"

        days = duration.days
        seconds = duration.seconds
        hours, remainder = divmod(seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        parts = []
        if days > 0:
            parts.append(f"{days} day{'s' if days != 1 else ''}")
        if hours > 0:
            parts.append(f"{hours} hour{'s' if hours != 1 else ''}")
        if minutes > 0:
            parts.append(f"{minutes} minute{'s' if minutes != 1 else ''}")
        if seconds > 0 or not parts:
            parts.append(f"{seconds} second{'s' if seconds != 1 else ''}")

        return ", ".join(parts)

    def __str__(self):
        return (
            f"\n\tStart: {self.start}\n"
            f"\tEnd: {self.end}\n"
            f"\tDuration: {self.format_duration(self.duration)}\n"
            f"\tInterval: {str(self.interval)}\n"
            f"\tInclude End: {self.include_end}\n"
        )

Attributes

duration class-attribute instance-attribute

duration: Optional[Union[str, timedelta]] = Field(None, description='The duration of the time range', examples=['1d'])

interval class-attribute instance-attribute

interval: Optional[Union[str, timedelta]] = Field('1h', description='The frequency of the time range', examples=['1h', "'1h'"])
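
For example, a sketch in which end is derived from start and duration by the validators above:

from rompy import TimeRange

tr = TimeRange(start="2023-01-01", duration="6h", interval="1h")
print(tr.date_range)                    # hourly datetimes from start through end
print(tr.format_duration(tr.duration))  # "6 hours"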

Grid Systems

Grid definition and management:

BaseGrid

Bases: RompyBaseModel

Representation of a grid in geographic space.

This is the base class for all Grid objects. The minimum representation of a grid is two NumPy arrays representing the vertices or nodes of some structured or unstructured grid, its bounding box, and a boundary polygon. No knowledge of the grid connectivity is expected.

Source code in rompy/core/grid.py
class BaseGrid(RompyBaseModel):
    """Representation of a grid in geographic space.

    This is the base class for all Grid objects. The minimum representation of a grid
    are two NumPy arrays representing the vertices or nodes of some structured or
    unstructured grid, its bounding box and a boundary polygon. No knowledge of the
    grid connectivity is expected.

    """

    grid_type: Literal["base"] = "base"

    @property
    def x(self) -> np.ndarray:
        raise NotImplementedError

    @property
    def y(self) -> np.ndarray:
        raise NotImplementedError

    @property
    def minx(self) -> float:
        return np.nanmin(self.x)

    @property
    def maxx(self) -> float:
        return np.nanmax(self.x)

    @property
    def miny(self) -> float:
        return np.nanmin(self.y)

    @property
    def maxy(self) -> float:
        return np.nanmax(self.y)

    def bbox(self, buffer=0.0) -> Bbox:
        """Returns a bounding box for the spatial grid

        This function returns a list [ll_x, ll_y, ur_x, ur_y]
        where ll_x, ll_y (ur_x, ur_y) are the lower left (upper right)
        x and y coordinates bounding box of the model domain

        """
        ll_x = self.minx - buffer
        ll_y = self.miny - buffer
        ur_x = self.maxx + buffer
        ur_y = self.maxy + buffer
        bbox = [ll_x, ll_y, ur_x, ur_y]
        return bbox

    def _get_convex_hull(self, tolerance=0.2) -> Polygon:
        xys = list(zip(self.x.flatten(), self.y.flatten()))
        polygon = MultiPoint(xys).convex_hull
        polygon = polygon.simplify(tolerance=tolerance)
        return polygon

    def boundary(self, tolerance=0.2) -> Polygon:
        """Returns the convex hull boundary polygon from the grid.

        Parameters
        ----------
        tolerance: float
            Simplify polygon shape based on maximum distance from original geometry,
            see https://shapely.readthedocs.io/en/stable/manual.html#object.simplify.

        Returns
        -------
        polygon: shapely.Polygon
            See https://shapely.readthedocs.io/en/stable/manual.html#Polygon

        """
        return self._get_convex_hull(tolerance=tolerance)

    def boundary_points(self, spacing=None, tolerance=0.2) -> tuple:
        """Returns array of coordinates from boundary polygon.

        Parameters
        ----------
        tolerance: float
            Simplify polygon shape based on maximum distance from original geometry,
            see https://shapely.readthedocs.io/en/stable/manual.html#object.simplify.
        spacing: float
            If specified, points are returned evenly spaced along the boundary at the
            specified spacing, otherwise all points are returned.

        Returns:
        --------
        points: tuple
            Tuple of x and y coordinates of the boundary points.

        """
        polygon = self.boundary(tolerance=tolerance)
        if spacing is None:
            xpts, ypts = polygon.exterior.coords.xy
        else:
            perimeter = polygon.length
            if perimeter < spacing:
                raise ValueError(f"Spacing = {spacing} > grid perimeter = {perimeter}")
            npts = int(np.ceil(perimeter / spacing))
            points = [polygon.boundary.interpolate(i * spacing) for i in range(npts)]
            xpts = [point.x for point in points]
            ypts = [point.y for point in points]
        return np.array(xpts), np.array(ypts)

    def _figsize(self, x0, x1, y0, y1, fscale):
        xlen = abs(x1 - x0)
        ylen = abs(y1 - y0)
        if xlen >= ylen:
            figsize = (fscale, fscale * ylen / xlen or fscale)
        else:
            figsize = (fscale * xlen / ylen or fscale, fscale)
        return figsize

    def plot(
        self,
        ax=None,
        figsize=None,
        fscale=10,
        buffer=0.1,
        borders=True,
        land=True,
        coastline=True,
    ):
        """Plot the grid"""

        projection = ccrs.PlateCarree()
        transform = ccrs.PlateCarree()

        # Set some plot parameters:
        x0, y0, x1, y1 = self.bbox(buffer=buffer)

        # create figure and plot/map
        if ax is None:
            if figsize is None:
                figsize = self._figsize(x0, x1, y0, y1, fscale)
            fig = plt.figure(figsize=figsize)
            ax = fig.add_subplot(111, projection=projection)
            ax.set_extent([x0, x1, y0, y1], crs=transform)

            if borders:
                ax.add_feature(cfeature.BORDERS)
            if land:
                ax.add_feature(cfeature.LAND)
            if coastline:
                ax.add_feature(cfeature.COASTLINE)
        else:
            fig = ax.figure

        ax.gridlines(
            crs=transform,
            draw_labels=["left", "bottom"],
            linewidth=1,
            color="gray",
            alpha=0.5,
            linestyle="--",
        )

        # Plot the model domain
        bx, by = self.boundary_points()
        poly = plt.Polygon(list(zip(bx, by)), facecolor="r", alpha=0.05)
        ax.add_patch(poly)
        ax.plot(bx, by, lw=2, color="k")
        return fig, ax

    def __repr__(self):
        return f"{self.__class__.__name__}({self.x}, {self.y})"

    def __eq__(self, other):
        return self.model_dump() == other.model_dump()

RegularGrid

Bases: BaseGrid

Regular grid in geographic space.

This object provides an abstract representation of a regular grid in some geographic space.
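
A hedged sketch of defining a small regular grid and querying its extent, assuming the import path rompy.core.grid shown below; the coordinates are arbitrary:

from rompy.core.grid import RegularGrid

grid = RegularGrid(x0=115.0, y0=-33.0, dx=0.1, dy=0.1, nx=50, ny=40, rot=0.0)
print(grid.bbox(buffer=0.5))                    # [ll_x, ll_y, ur_x, ur_y]
xbnd, ybnd = grid.boundary_points(spacing=0.2)  # evenly spaced boundary coordinates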

Source code in rompy/core/grid.py
class RegularGrid(BaseGrid):
    """Regular grid in geographic space.

    This object provides an abstract representation of a regular grid in some
    geographic space.

    """

    grid_type: Literal["regular"] = Field(
        "regular", description="Type of grid, must be 'regular'"
    )
    x0: Optional[float] = Field(
        default=None, description="X coordinate of the grid origin"
    )
    y0: Optional[float] = Field(
        default=None, description="Y coordinate of the grid origin"
    )
    rot: Optional[float] = Field(
        0.0, description="Rotation angle of the grid in degrees"
    )
    dx: Optional[float] = Field(
        default=None, description="Spacing between grid points in the x direction"
    )
    dy: Optional[float] = Field(
        default=None, description="Spacing between grid points in the y direction"
    )
    nx: Optional[int] = Field(
        default=None, description="Number of grid points in the x direction"
    )
    ny: Optional[int] = Field(
        default=None, description="Number of grid points in the y direction"
    )

    @model_validator(mode="after")
    def generate(self) -> "RegularGrid":
        """Generate the grid from the provided parameters."""
        keys = ["x0", "y0", "dx", "dy", "nx", "ny"]
        if None in [getattr(self, key) for key in keys]:
            raise ValueError(f"All of {','.join(keys)} must be provided for REG grid")
        # Ensure x, y 2D coordinates are defined
        return self

    @property
    def x(self) -> np.ndarray:
        x, y = self._gen_reg_cgrid()
        return x

    @property
    def y(self) -> np.ndarray:
        x, y = self._gen_reg_cgrid()
        return y

    def _attrs_from_xy(self):
        """Generate regular grid attributes from x, y coordinates."""
        self.ny, self.nx = self.x.shape
        self.x0 = self.x[0, 0]
        self.y0 = self.y[0, 0]
        self.rot = np.degrees(
            np.arctan2(self.y[0, 1] - self.y0, self.x[0, 1] - self.x0)
        )
        self.dx = np.sqrt((self.x[0, 1] - self.x0) ** 2 + (self.y[0, 1] - self.y0) ** 2)
        self.dy = np.sqrt((self.x[1, 0] - self.x0) ** 2 + (self.y[1, 0] - self.y0) ** 2)

    @property
    def xlen(self):
        return self.dx * (self.nx - 1)

    @property
    def ylen(self):
        return self.dy * (self.ny - 1)

    def _gen_reg_cgrid(self):
        # Grid at origin
        i = np.arange(0.0, self.dx * self.nx, self.dx)
        j = np.arange(0.0, self.dy * self.ny, self.dy)
        ii, jj = np.meshgrid(i, j)

        # Rotation
        alpha = -self.rot * np.pi / 180.0
        R = np.array([[np.cos(alpha), -np.sin(alpha)], [np.sin(alpha), np.cos(alpha)]])
        gg = np.dot(np.vstack([ii.ravel(), jj.ravel()]).T, R)

        # Translation
        x = gg[:, 0] + self.x0
        y = gg[:, 1] + self.y0

        x = np.reshape(x, ii.shape)
        y = np.reshape(y, ii.shape)
        return x, y

    def __eq__(self, other) -> bool:
        return (
            (self.nx == other.nx)
            & (self.ny == other.ny)
            & (self.rot == other.rot)
            & (self.x0 == other.x0)
            & (self.y0 == other.y0)
            & (self.dx == other.dx)
            & (self.dy == other.dy)
        )

    def __repr__(self):
        return f"{self.__class__.__name__}({self.nx}, {self.ny})"

    def __str__(self):
        return f"{self.__class__.__name__}({self.nx}, {self.ny})"

Data Handling

Data source and handling components:

DataBase

Bases: ABC, RompyBaseModel

Base class for data objects.

Source code in rompy/core/data.py
class DataBase(ABC, RompyBaseModel):
    """Base class for data objects."""

    model_type: Literal["base"] = Field(
        default="base",
        description="Model type discriminator",
    )
    id: str = Field(
        default="data", description="Unique identifier for this data source"
    )

    @abstractmethod
    def get(self, destdir: Union[str, Path], *args, **kwargs) -> Path:
        """Abstract method to get the data."""
        pass

Functions

get abstractmethod

get(destdir: Union[str, Path], *args, **kwargs) -> Path

Abstract method to get the data.

Source code in rompy/core/data.py
@abstractmethod
def get(self, destdir: Union[str, Path], *args, **kwargs) -> Path:
    """Abstract method to get the data."""
    pass
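
A minimal sketch of a concrete data object, assuming the import path rompy.core.data shown above; the class and its fields are hypothetical:

import shutil
from pathlib import Path
from typing import Literal, Union

from rompy.core.data import DataBase


class LocalFileData(DataBase):
    """Hypothetical data object that copies a local file into the staging directory."""

    model_type: Literal["local_file"] = "local_file"
    path: Path

    def get(self, destdir: Union[str, Path], *args, **kwargs) -> Path:
        outfile = Path(destdir) / self.path.name
        shutil.copy(self.path, outfile)
        return outfile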

SourceBase

Bases: RompyBaseModel, ABC

Abstract base class for a source dataset.

Source code in rompy/core/source.py
class SourceBase(RompyBaseModel, ABC):
    """Abstract base class for a source dataset."""

    model_type: Literal["base_source"] = Field(
        description="Model type discriminator, must be overriden by a subclass",
    )

    @abstractmethod
    def _open(self) -> xr.Dataset:
        """This abstract private method should return a xarray dataset object."""
        pass

    @cached_property
    def coordinates(self) -> xr.Dataset:
        """Return the coordinates of the datasource."""
        return self.open().coords

    def open(self, variables: list = [], filters: Filter = {}, **kwargs) -> xr.Dataset:
        """Return the filtered dataset object.

        Parameters
        ----------
        variables : list, optional
            List of variables to select from the dataset.
        filters : Filter, optional
            Filters to apply to the dataset.

        Notes
        -----
        The kwargs are only a placeholder in case a subclass needs to pass additional
        arguments to the open method.

        """
        ds = self._open()
        if variables:
            try:
                ds = ds[variables]
            except KeyError as e:
                dataset_variables = list(ds.data_vars.keys())
                missing_variables = list(set(variables) - set(dataset_variables))
                raise ValueError(
                    f"Cannot find requested variables in dataset.\n\n"
                    f"Requested variables in the Data object: {variables}\n"
                    f"Available variables in source dataset: {dataset_variables}\n"
                    f"Missing variables: {missing_variables}\n\n"
                    f"Please check:\n"
                    f"1. The variable names in your Data object, make sure you check for default values\n"
                    f"2. The data source contains the expected variables\n"
                    f"3. If using a custom data source, ensure it creates variables with the correct names"
                ) from e
        if filters:
            ds = filters(ds)
        return ds
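
A minimal sketch of a concrete source, assuming the import path rompy.core.source shown above; the class is hypothetical:

from typing import Literal

import xarray as xr

from rompy.core.source import SourceBase


class SourceNetCDFFile(SourceBase):
    """Hypothetical source that opens a local netCDF file with xarray."""

    model_type: Literal["netcdf_file"] = "netcdf_file"
    uri: str

    def _open(self) -> xr.Dataset:
        return xr.open_dataset(self.uri)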

Boundary Conditions

Boundary condition management:

DataBoundary

Bases: DataGrid

Source code in rompy/core/boundary.py
class DataBoundary(DataGrid):
    model_type: Literal["boundary"] = Field(
        default="data_boundary",
        description="Model type discriminator",
    )
    id: str = Field(description="Unique identifier for this data source")
    spacing: Optional[Union[float, Literal["parent"]]] = Field(
        default=None,
        description=(
            "Spacing between points along the grid boundary to retrieve data for. If "
            "None (default), points are defined from the the actual grid object "
            "passed to the `get` method. If 'parent', the resolution of the parent "
            "dataset is used to define the spacing."
        ),
    )
    sel_method: Literal["sel", "interp"] = Field(
        default="sel",
        description=(
            "Xarray method to use for selecting boundary points from the dataset"
        ),
    )
    sel_method_kwargs: dict = Field(
        default={}, description="Keyword arguments for sel_method"
    )
    crop_data: bool = Field(
        default=True,
        description="Update crop filter from Time object if passed to get method",
    )

    @field_validator("spacing")
    @classmethod
    def spacing_gt_zero(cls, v):
        if v not in (None, "parent") and v <= 0.0:
            raise ValueError("Spacing must be greater than zero")
        return v

    def _source_grid_spacing(self) -> float:
        """Return the lowest grid spacing in the source dataset.

        In a gridded dataset this is defined as the lowest spacing between adjacent
        points in the dataset. In other dataset types such as a station dataset this
        method needs to be overridden to return the lowest spacing between points.

        """
        dx = np.diff(sorted(self.ds[self.coords.x].values)).min()
        dy = np.diff(sorted(self.ds[self.coords.y].values)).min()
        return min(dx, dy)

    def _set_spacing(self) -> float:
        """Define spacing from the parent dataset if required."""
        if self.spacing == "parent":
            return self._source_grid_spacing()
        else:
            return self.spacing

    def _boundary_points(self, grid) -> tuple:
        """Returns the x and y arrays representing the boundary points to select.

        This method can be overridden to define custom boundary points.

        """
        xbnd, ybnd = grid.boundary_points(spacing=self._set_spacing())
        return xbnd, ybnd

    def _sel_boundary(self, grid) -> xr.Dataset:
        """Select the boundary points from the dataset."""
        xbnd, ybnd = self._boundary_points(grid=grid)
        coords = {
            self.coords.x: xr.DataArray(xbnd, dims=("site",)),
            self.coords.y: xr.DataArray(ybnd, dims=("site",)),
        }
        ds = getattr(self.ds, self.sel_method)(coords, **self.sel_method_kwargs)
        # rename the coordinates to x, y
        ds = ds.rename({self.coords.x: "x", self.coords.y: "y"})
        return ds

    def get(
        self, destdir: str | Path, grid: RegularGrid, time: Optional[TimeRange] = None
    ) -> str:
        """Write the selected boundary data to a netcdf file.

        Parameters
        ----------
        destdir : str | Path
            Destination directory for the netcdf file.
        grid : RegularGrid
            Grid instance to use for selecting the boundary points.
        time: TimeRange, optional
            The times to filter the data to, only used if `self.crop_data` is True.

        Returns
        -------
        outfile : Path
            Path to the netcdf file.

        """
        if self.crop_data and time is not None:
            self._filter_time(time)
        ds = self._sel_boundary(grid)
        outfile = Path(destdir) / f"{self.id}.nc"
        ds.to_netcdf(outfile)
        return outfile

    def plot(self, model_grid=None, cmap="turbo", fscale=10, ax=None, **kwargs):
        return scatter_plot(
            self, model_grid=model_grid, cmap=cmap, fscale=fscale, ax=ax, **kwargs
        )

    def plot_boundary(self, grid=None, fscale=10, ax=None, **kwargs):
        """Plot the boundary points on a map."""
        ds = self._sel_boundary(grid)
        fig, ax = grid.plot(ax=ax, fscale=fscale, **kwargs)
        return scatter_plot(
            self,
            ds=ds,
            fscale=fscale,
            ax=ax,
            **kwargs,
        )

Functions

get

get(destdir: str | Path, grid: RegularGrid, time: Optional[TimeRange] = None) -> str

Write the selected boundary data to a netcdf file.

Parameters

destdir : str | Path
    Destination directory for the netcdf file.
grid : RegularGrid
    Grid instance to use for selecting the boundary points.
time : TimeRange, optional
    The times to filter the data to, only used if self.crop_data is True.

Returns

outfile : Path
    Path to the netcdf file.

Source code in rompy/core/boundary.py
def get(
    self, destdir: str | Path, grid: RegularGrid, time: Optional[TimeRange] = None
) -> str:
    """Write the selected boundary data to a netcdf file.

    Parameters
    ----------
    destdir : str | Path
        Destination directory for the netcdf file.
    grid : RegularGrid
        Grid instance to use for selecting the boundary points.
    time: TimeRange, optional
        The times to filter the data to, only used if `self.crop_data` is True.

    Returns
    -------
    outfile : Path
        Path to the netcdf file.

    """
    if self.crop_data and time is not None:
        self._filter_time(time)
    ds = self._sel_boundary(grid)
    outfile = Path(destdir) / f"{self.id}.nc"
    ds.to_netcdf(outfile)
    return outfile

BoundaryWaveStation

Bases: DataBoundary

Wave boundary data from station datasets.

Note

The tolerance behaves differently with sel_methods idw and nearest; in idw, sites with not enough neighbours within tolerance are masked, whereas in nearest an exception is raised (see the wavespectra documentation for more details).

Note

Be aware that when using idw, missing values will be returned for sites with fewer than 2 neighbours within tolerance in the original dataset. This is okay for land mask areas but could cause boundary issues at an open boundary location. To avoid this, either use nearest or increase tolerance to include more neighbours.
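
A hedged usage sketch; spectra_source below stands in for any wavespectra-enabled source reader and is not defined here, and the grid values are arbitrary:

from rompy import TimeRange
from rompy.core.boundary import BoundaryWaveStation
from rompy.core.grid import RegularGrid

grid = RegularGrid(x0=115.0, y0=-33.0, dx=0.1, dy=0.1, nx=50, ny=40)
bnd = BoundaryWaveStation(
    id="bnd",
    source=spectra_source,  # placeholder: a wavespectra-enabled source reader
    sel_method="idw",
    spacing="parent",
)
outfile = bnd.get(
    destdir="./staging",
    grid=grid,
    time=TimeRange(start="2023-01-01", duration="1d"),
)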

Source code in rompy/core/boundary.py
class BoundaryWaveStation(DataBoundary):
    """Wave boundary data from station datasets.

    Note
    ----
    The `tolerance` behaves differently with sel_methods `idw` and `nearest`; in `idw`
    sites with not enough neighbours within `tolerance` are masked whereas in `nearest`
    an exception is raised (see wavespectra documentation for more details).

    Note
    ----
    Be aware that when using `idw` missing values will be returned for sites with less
    than 2 neighbours within `tolerance` in the original dataset. This is okay for land
    mask areas but could cause boundary issues when on an open boundary location. To
    avoid this either use `nearest` or increase `tolerance` to include more neighbours.

    """

    grid_type: Literal["boundary_wave_station"] = Field(
        default="boundary_wave_station",
        description="Model type discriminator",
    )
    source: Union[SOURCE_TYPES] = Field(
        description=(
            "Dataset source reader, must return a wavespectra-enabled "
            "xarray dataset in the open method"
        ),
        discriminator="model_type",
    )
    sel_method: Literal["idw", "nearest"] = Field(
        default="idw",
        description=(
            "Wavespectra method to use for selecting boundary points from the dataset"
        ),
    )
    buffer: float = Field(
        default=2.0,
        description="Space to buffer the grid bounding box if `filter_grid` is True",
    )

    def model_post_init(self, __context):
        self.variables = ["efth", "lon", "lat"]

    # @model_validator(mode="after")
    # def assert_has_wavespectra_accessor(self) -> "BoundaryWaveStation":
    #     dset = self.source.open()
    #     if not hasattr(dset, "spec"):
    #         raise ValueError(f"Wavespectra compatible source is required")
    #     return self

    def _source_grid_spacing(self, grid) -> float:
        """Return the lowest spacing between points in the source dataset."""
        # Select dataset points just outside the actual grid to optimise the search
        xbnd, ybnd = grid.boundary().exterior.coords.xy
        dx = np.diff(xbnd).min()
        dy = np.diff(ybnd).min()
        buffer = 2 * min(dx, dy)
        x0, y0, x1, y1 = grid.bbox(buffer=buffer)
        ds = self.ds.spec.sel([x0, x1], [y0, y1], method="bbox")
        # Return the closest distance between adjacent points in cropped dataset
        points = list(zip(ds.lon.values, ds.lat.values))
        return find_minimum_distance(points)

    def _set_spacing(self, grid) -> float:
        """Define spacing from the parent dataset if required."""
        if self.spacing == "parent":
            return self._source_grid_spacing(grid)
        else:
            return self.spacing

    def _boundary_points(self, grid) -> tuple:
        """Returns the x and y arrays representing the boundary points to select.

        Override the default method to use grid when setting the default spacing.

        """
        xbnd, ybnd = grid.boundary_points(spacing=self._set_spacing(grid))
        return xbnd, ybnd

    def _sel_boundary(self, grid) -> xr.Dataset:
        """Select the boundary points from the dataset."""
        xbnd, ybnd = self._boundary_points(grid=grid)
        ds = self.ds.spec.sel(
            lons=xbnd,
            lats=ybnd,
            method=self.sel_method,
            **self.sel_method_kwargs,
        )
        return ds

    @property
    def ds(self):
        """Return the filtered xarray dataset instance."""
        dset = super().ds
        if dset.efth.size == 0:
            raise ValueError(f"Empty dataset after applying filter {self.filter}")
        return dset

    def get(
        self, destdir: str | Path, grid: RegularGrid, time: Optional[TimeRange] = None
    ) -> str:
        """Write the selected boundary data to a netcdf file.

        Parameters
        ----------
        destdir : str | Path
            Destination directory for the netcdf file.
        grid : RegularGrid
            Grid instance to use for selecting the boundary points.
        time: TimeRange, optional
            The times to filter the data to, only used if `self.crop_data` is True.

        Returns
        -------
        outfile : Path
            Path to the netcdf file.

        """
        if self.crop_data:
            if time is not None:
                self._filter_time(time)
            if grid is not None:
                self._filter_grid(grid)
        ds = self._sel_boundary(grid)
        outfile = Path(destdir) / f"{self.id}.nc"
        ds.spec.to_netcdf(outfile)
        return outfile

Functions

get

get(destdir: str | Path, grid: RegularGrid, time: Optional[TimeRange] = None) -> str

Write the selected boundary data to a netcdf file.

Parameters

destdir : str | Path
    Destination directory for the netcdf file.
grid : RegularGrid
    Grid instance to use for selecting the boundary points.
time : TimeRange, optional
    The times to filter the data to, only used if self.crop_data is True.

Returns

outfile : Path
    Path to the netcdf file.

Source code in rompy/core/boundary.py
def get(
    self, destdir: str | Path, grid: RegularGrid, time: Optional[TimeRange] = None
) -> str:
    """Write the selected boundary data to a netcdf file.

    Parameters
    ----------
    destdir : str | Path
        Destination directory for the netcdf file.
    grid : RegularGrid
        Grid instance to use for selecting the boundary points.
    time: TimeRange, optional
        The times to filter the data to, only used if `self.crop_data` is True.

    Returns
    -------
    outfile : Path
        Path to the netcdf file.

    """
    if self.crop_data:
        if time is not None:
            self._filter_time(time)
        if grid is not None:
            self._filter_grid(grid)
    ds = self._sel_boundary(grid)
    outfile = Path(destdir) / f"{self.id}.nc"
    ds.spec.to_netcdf(outfile)
    return outfile
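
As a usage sketch (not part of the generated reference), the typical pattern is to point the boundary object at a wavespectra-compatible source and let get select and write the boundary spectra for a grid. The SourceFile reader, module paths, file path and grid parameters below are assumptions used purely for illustration:

from rompy.core.boundary import BoundaryWaveStation
from rompy.core.grid import RegularGrid      # module path assumed
from rompy.core.source import SourceFile     # assumed wavespectra-enabled reader
from rompy.core.time import TimeRange        # module path assumed

bnd = BoundaryWaveStation(
    source=SourceFile(uri="boundary_spectra.nc"),  # must expose the .spec accessor
    sel_method="idw",                               # or "nearest"
)
grid = RegularGrid(x0=115.0, y0=-33.0, dx=0.1, dy=0.1, nx=50, ny=50)  # illustrative
outfile = bnd.get(
    destdir="./boundary",
    grid=grid,
    time=TimeRange(start="2023-01-01", end="2023-01-02"),
)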

Backend Systems

Backend Configuration

Base and specific backend configurations:

BaseBackendConfig

Bases: BaseModel, ABC

Base class for all backend configurations.

This class defines common configuration parameters that apply to all backend types, such as timeouts and environment variables.

Source code in rompy/backends/config.py
class BaseBackendConfig(BaseModel, ABC):
    """Base class for all backend configurations.

    This class defines common configuration parameters that apply to all
    backend types, such as timeouts and environment variables.
    """

    timeout: int = Field(
        3600,
        ge=60,
        le=86400,
        description="Maximum execution time in seconds (1 minute to 24 hours)",
    )

    env_vars: Dict[str, str] = Field(
        default_factory=dict,
        description="Additional environment variables to set during execution",
    )

    working_dir: Optional[Path] = Field(
        None,
        description="Working directory for execution (defaults to model output directory)",
    )

    model_config = ConfigDict(
        validate_assignment=True,
        extra="forbid",  # Don't allow extra fields
        use_enum_values=True,
    )

    @field_validator("working_dir")
    @classmethod
    def validate_working_dir(cls, v):
        """Validate working directory exists if specified."""
        if v is not None:
            path = Path(v)
            if not path.exists():
                raise ValueError(f"Working directory does not exist: {path}")
            if not path.is_dir():
                raise ValueError(f"Working directory is not a directory: {path}")
        return v

    @field_validator("env_vars")
    @classmethod
    def validate_env_vars(cls, v):
        """Validate environment variables."""
        if not isinstance(v, dict):
            raise ValueError("env_vars must be a dictionary")

        for key, value in v.items():
            if not isinstance(key, str) or not isinstance(value, str):
                raise ValueError("Environment variable keys and values must be strings")
            if not key:
                raise ValueError("Environment variable keys cannot be empty")

        return v

    @abstractmethod
    def get_backend_class(self):
        """Return the backend class that should handle this configuration.

        Returns:
            The backend class to use for execution
        """
        pass

Functions

get_backend_class abstractmethod

get_backend_class()

Return the backend class that should handle this configuration.

Returns:

The backend class to use for execution

Source code in rompy/backends/config.py
@abstractmethod
def get_backend_class(self):
    """Return the backend class that should handle this configuration.

    Returns:
        The backend class to use for execution
    """
    pass
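
New backends follow the same pattern: subclass BaseBackendConfig, add the backend-specific fields, and return the executing class from get_backend_class. A minimal hypothetical sketch (SlurmConfig and SlurmRunBackend are invented names, not part of rompy):

from pydantic import Field
from rompy.backends.config import BaseBackendConfig


class SlurmConfig(BaseBackendConfig):
    """Hypothetical configuration for a SLURM execution backend."""

    queue: str = Field("normal", description="Queue/partition to submit jobs to")
    nodes: int = Field(1, ge=1, description="Number of nodes to request")

    def get_backend_class(self):
        # The returned class is expected to implement
        # run(model_run, config, workspace_dir=None) -> bool
        from my_project.backends import SlurmRunBackend  # hypothetical import
        return SlurmRunBackend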

LocalConfig

Bases: BaseBackendConfig

Configuration for local execution backend.

This configuration is used when running models directly on the local system using the system's Python interpreter or shell commands.

Source code in rompy/backends/config.py
class LocalConfig(BaseBackendConfig):
    """Configuration for local execution backend.

    This configuration is used when running models directly on the local system
    using the system's Python interpreter or shell commands.
    """

    command: Optional[str] = Field(
        None, description="Optional shell command to run instead of config.run()"
    )

    shell: bool = Field(
        True, description="Whether to execute commands through the shell"
    )

    capture_output: bool = Field(
        True, description="Whether to capture stdout and stderr"
    )

    def get_backend_class(self):
        """Return the LocalRunBackend class."""
        from rompy.run import LocalRunBackend

        return LocalRunBackend

    model_config = ConfigDict(
        json_schema_extra={
            "examples": [
                {
                    "timeout": 7200,
                    "env_vars": {"OMP_NUM_THREADS": "4"},
                    "command": "python run_model.py",
                },
                {"timeout": 3600, "working_dir": "/path/to/model/dir"},
            ]
        }
    )
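
For reference, a LocalConfig can be built directly from the fields above; the values below simply mirror the embedded schema examples:

from rompy.backends.config import LocalConfig

local = LocalConfig(
    timeout=7200,                        # seconds (60 to 86400)
    env_vars={"OMP_NUM_THREADS": "4"},   # extra environment variables for the run
    command="python run_model.py",       # optional; omit to fall back to config.run()
)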

DockerConfig

Bases: BaseBackendConfig

Configuration for Docker execution backend.

This configuration is used when running models inside Docker containers with appropriate resource limits and volume mounts.

Source code in rompy/backends/config.py
class DockerConfig(BaseBackendConfig):
    """Configuration for Docker execution backend.

    This configuration is used when running models inside Docker containers
    with appropriate resource limits and volume mounts.
    """

    image: Optional[str] = Field(
        None,
        description="Docker image to use (required if not building from dockerfile)",
        pattern=r"^[a-zA-Z0-9._:/-]+$",
    )

    dockerfile: Optional[Path] = Field(
        None, description="Path to Dockerfile to build (alternative to image)"
    )

    executable: str = Field(
        "/usr/local/bin/run.sh", description="Path to executable inside the container"
    )

    cpu: int = Field(1, ge=1, le=128, description="Number of CPU cores to allocate")

    memory: Optional[str] = Field(
        None, description="Memory limit (e.g., '2g', '512m')", pattern=r"^\d+[kmgKMG]?$"
    )

    mpiexec: str = Field("", description="MPI execution command (if using MPI)")

    build_args: Dict[str, str] = Field(
        default_factory=dict, description="Arguments to pass to docker build"
    )

    build_context: Optional[Path] = Field(
        None,
        description="Docker build context directory (defaults to dockerfile parent directory)",
    )

    volumes: List[str] = Field(
        default_factory=list,
        description="Additional volumes to mount (format: 'host_path:container_path')",
    )

    remove_container: bool = Field(
        True, description="Whether to remove container after execution"
    )

    user: str = Field("root", description="User to run as inside the container")

    @field_validator("image", "dockerfile", mode="before")
    @classmethod
    def validate_image_or_dockerfile(cls, v, info):
        """Validate that either image or dockerfile is provided, but not both."""
        # Skip validation during individual field validation
        # This will be handled by model_validator
        return v

    @field_validator("dockerfile")
    @classmethod
    def validate_dockerfile_exists(cls, v):
        """Validate dockerfile path format (should be relative)."""
        if v is not None:
            path = Path(v)
            if path.is_absolute():
                raise ValueError(
                    f"Dockerfile path must be relative to build context: {path}"
                )
        return v

    @field_validator("build_context")
    @classmethod
    def validate_build_context_exists(cls, v):
        """Validate build context directory exists if specified."""
        if v is not None:
            path = Path(v)
            if not path.exists():
                raise ValueError(f"Build context directory does not exist: {path}")
            if not path.is_dir():
                raise ValueError(f"Build context must be a directory: {path}")
        return v

    @field_validator("volumes")
    @classmethod
    def validate_volumes(cls, v):
        """Validate volume mount specifications."""
        for volume in v:
            if ":" not in volume:
                raise ValueError(f"Volume mount must contain ':' separator: {volume}")

            parts = volume.split(":")
            if len(parts) < 2:
                raise ValueError(
                    f"Volume mount must have host:container format: {volume}"
                )

            host_path = Path(parts[0])
            if not host_path.exists():
                raise ValueError(f"Host path does not exist: {host_path}")

        return v

    @field_validator("memory")
    @classmethod
    def validate_memory_format(cls, v):
        """Validate memory format."""
        if v is not None:
            import re

            if not re.match(r"^\d+[kmg]?$", v.lower()):
                raise ValueError(
                    "Memory must be in format like '2g', '512m', or '1024' (bytes)"
                )
        return v

    def get_backend_class(self):
        """Return the DockerRunBackend class."""
        from rompy.run.docker import DockerRunBackend

        return DockerRunBackend

    @classmethod
    def __pydantic_init_subclass__(cls, **kwargs):
        """Add model validator for image/dockerfile validation."""
        super().__pydantic_init_subclass__(**kwargs)

    def model_post_init(self, __context) -> None:
        """Validate that either image or dockerfile is provided, but not both."""
        if not self.image and not self.dockerfile:
            raise ValueError("Either 'image' or 'dockerfile' must be provided")

        if self.image and self.dockerfile:
            raise ValueError("Cannot specify both 'image' and 'dockerfile'")

        # Validate dockerfile exists within build context if both are specified
        if self.dockerfile and self.build_context:
            dockerfile_full_path = self.build_context / self.dockerfile
            if not dockerfile_full_path.exists():
                raise ValueError(f"Dockerfile does not exist: {dockerfile_full_path}")
            if not dockerfile_full_path.is_file():
                raise ValueError(f"Dockerfile is not a file: {dockerfile_full_path}")

    model_config = ConfigDict(
        json_schema_extra={
            "examples": [
                {
                    "image": "swan:latest",
                    "cpu": 4,
                    "memory": "2g",
                    "timeout": 7200,
                    "volumes": ["/data:/app/data"],
                },
                {
                    "dockerfile": "./docker/Dockerfile",
                    "cpu": 2,
                    "build_args": {"VERSION": "1.0"},
                    "mpiexec": "mpirun",
                },
            ]
        }
    )
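
Likewise, a DockerConfig mirrors the schema examples above. Either image or dockerfile must be provided (not both), and any host paths listed in volumes must already exist:

from rompy.backends.config import DockerConfig

docker = DockerConfig(
    image="swan:latest",          # or dockerfile=... together with build_context
    cpu=4,
    memory="2g",
    timeout=7200,
    volumes=["/data:/app/data"],  # host path must exist on this machine
)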

Backend Execution

Backend execution implementations:

DockerRunBackend

Execute models inside Docker containers.

This backend builds Docker images if needed and runs models inside containers with appropriate volume mounts.

Source code in rompy/run/docker.py
class DockerRunBackend:
    """Execute models inside Docker containers.

    This backend builds Docker images if needed and runs models
    inside containers with appropriate volume mounts.
    """

    def run(
        self, model_run, config: "DockerConfig", workspace_dir: Optional[str] = None
    ) -> bool:
        """Run the model inside a Docker container.

        Args:
            model_run: The ModelRun instance to execute
            config: DockerConfig instance with execution parameters
            workspace_dir: Path to the generated workspace directory (if None, will generate)

        Returns:
            True if execution was successful, False otherwise
        """
        # Use config parameters
        exec_image = config.image
        exec_dockerfile = str(config.dockerfile) if config.dockerfile else None
        exec_executable = config.executable
        exec_mpiexec = config.mpiexec
        exec_cpu = config.cpu
        exec_build_args = config.build_args
        exec_volumes = config.volumes
        exec_env_vars = config.env_vars

        logger.debug(f"Using DockerConfig: image={exec_image}, cpu={exec_cpu}")

        # Use provided workspace or generate if not provided (for backwards compatibility)
        if workspace_dir is None:
            logger.warning(
                "No workspace_dir provided, generating files (this may cause double generation in pipeline)"
            )
            workspace_dir = model_run.generate()
        else:
            logger.info(f"Using provided workspace directory: {workspace_dir}")

        try:
            # Build or use a Docker image
            image_name = self._prepare_image(
                exec_image,
                exec_dockerfile,
                str(config.build_context) if config.build_context else None,
                exec_build_args,
            )
            if not image_name:
                return False

            # Set up the run command
            run_command = self._get_run_command(exec_executable, exec_mpiexec, exec_cpu)

            # Set up volume mounts
            volume_mounts = self._prepare_volumes(
                model_run, exec_volumes, workspace_dir
            )

            # Run the Docker container
            success = self._run_container(
                image_name=image_name,
                run_command=run_command,
                volume_mounts=volume_mounts,
                env_vars=exec_env_vars,
            )

            return success
        except Exception as e:
            logger.exception(f"Failed to run model in Docker: {e}")
            return False

    def _prepare_image(
        self,
        image: Optional[str],
        dockerfile: Optional[str],
        build_context: Optional[str] = None,
        build_args: Optional[Dict[str, str]] = None,
    ) -> Optional[str]:
        """Prepare the Docker image to use.

        This will either use a pre-built image or build one from a Dockerfile.

        Args:
            image: Docker image to use
            dockerfile: Path to Dockerfile relative to build context
            build_context: Docker build context directory
            build_args: Arguments to pass to docker build

        Returns:
            Image name to use, or None if preparation failed
        """
        # If image is provided, use it directly
        if image:
            logger.info(f"Using provided Docker image: {image}")
            return image

        # If Dockerfile is provided, build the image
        if dockerfile:
            # Determine build context
            if build_context:
                context_path = pathlib.Path(build_context)
                # dockerfile is relative to build_context
                dockerfile_path = context_path / dockerfile
            else:
                # dockerfile is absolute path, use its parent as context
                dockerfile_path = pathlib.Path(dockerfile)
                context_path = dockerfile_path.parent

            # Generate deterministic image name based on content
            image_name = self._generate_image_name(
                dockerfile_path, context_path, build_args
            )

            # Check if image already exists
            if self._image_exists(image_name):
                logger.info(f"Using existing Docker image: {image_name}")
                return image_name

            # Build the Docker image using docker-py
            logger.info(
                f"Building Docker image {image_name} from {dockerfile} (context: {context_path})"
            )

            try:
                client = docker.from_env()
                image_obj, build_logs = client.images.build(
                    path=str(context_path),
                    dockerfile=str(dockerfile_path.relative_to(context_path)),
                    tag=image_name,
                    buildargs=build_args or {},
                    rm=True,
                )

                # Log build output
                for line in build_logs:
                    if "stream" in line:
                        logger.debug(line["stream"].strip())

                logger.info(f"Successfully built Docker image: {image_name}")
                return image_name
            except BuildError as e:
                logger.error(f"Docker build failed: {e.msg}")
                for line in e.build_log:
                    if "error" in line:
                        logger.error(f"Build error: {line['error']}")
                return None
            except APIError as e:
                logger.error(f"Docker API error during build: {e}")
                return None

        # If neither is provided, use a default image
        logger.warning("No image or Dockerfile provided, using default rompy image")
        return "rompy/rompy:latest"

    def _get_run_command(self, executable: str, mpiexec: str, cpu: int) -> str:
        """Create the run command to execute inside the container.

        Args:
            executable: Path to the executable
            mpiexec: MPI execution command
            cpu: Number of CPU cores

        Returns:
            Command string to execute
        """
        # Add diagnostic commands to list directory contents and verify input file
        diagnostics = (
            "cd /app/run_id && "
            "echo 'Directory contents:' && "
            "ls -la && "
            "echo 'Executing model...'"
        )

        if mpiexec:
            # Add --allow-run-as-root flag for MPI inside Docker containers
            return (
                f"{diagnostics} && {mpiexec} --allow-run-as-root -n {cpu} {executable}"
            )
        return f"{diagnostics} && {executable}"

    def _prepare_volumes(
        self,
        model_run,
        additional_volumes: Optional[List[str]] = None,
        workspace_dir: Optional[str] = None,
    ) -> List[str]:
        """Prepare volume mounts for the Docker container.

        Args:
            model_run: The ModelRun instance
            additional_volumes: Additional volumes to mount
            workspace_dir: Path to the workspace directory to mount

        Returns:
            List of volume mount specifications
        """
        # Mount the workspace directory (generated files) into the container
        # Add :Z for SELinux contexts and proper permissions
        if workspace_dir:
            workspace_path = pathlib.Path(workspace_dir)
            volumes = [f"{workspace_path.absolute()}:/app/run_id:Z"]
        else:
            # Fallback to run directory for backwards compatibility
            run_dir = model_run.output_dir / model_run.run_id
            volumes = [f"{run_dir.absolute()}:/app/run_id:Z"]

        # Add any additional volumes
        if additional_volumes:
            volumes.extend(additional_volumes)

        return volumes

    def _run_container(
        self,
        image_name: str,
        run_command: str,
        volume_mounts: List[str],
        env_vars: Dict[str, str],
    ) -> bool:
        """Run the Docker container with the given configuration.

        Args:
            image_name: Docker image to use
            run_command: Command to run inside the container
            volume_mounts: Volume mounts to set up
            env_vars: Environment variables to pass to the container

        Returns:
            True if execution was successful, False otherwise
        """
        try:
            client = docker.from_env()

            # Convert volume mounts to docker-py format
            volumes = {}
            for volume in volume_mounts:
                parts = volume.split(":")
                if len(parts) >= 2:
                    host_path, container_path = parts[0], parts[1]
                    mode = "rw"  # default mode
                    if len(parts) > 2:
                        mode = parts[2] if parts[2] in ["ro", "rw", "Z"] else "rw"
                    volumes[host_path] = {"bind": container_path, "mode": mode}

            # Prepare container configuration
            container_config = {
                "image": image_name,
                "command": ["bash", "-c", run_command],
                "environment": env_vars,
                "volumes": volumes,
                "user": "root",
                "remove": True,  # Remove container after run
                "stdout": True,
                "stderr": True,
            }

            logger.info(f"Running Docker container with image: {image_name}")
            logger.debug(f"Command: {run_command}")
            logger.debug(f"Volumes: {volumes}")
            logger.debug(f"Environment: {env_vars}")

            # Run the container
            container = client.containers.run(**container_config)

            # Log output
            if container:
                logger.info("Model run completed successfully")
                return True
            else:
                logger.error("Model run failed - no output from container")
                return False

        except ContainerError as e:
            logger.error(f"Container error: {e}")
            if e.stderr:
                logger.error(f"Container stderr: {e.stderr}")
            return False
        except ImageNotFound:
            logger.error(f"Docker image not found: {image_name}")
            return False
        except APIError as e:
            logger.error(f"Docker API error: {e}")
            return False
        except Exception as e:
            logger.error(f"Docker run error: {str(e)}")
            return False

    def _generate_image_name(
        self,
        dockerfile_path: pathlib.Path,
        context_path: pathlib.Path,
        build_args: Optional[Dict[str, str]] = None,
    ) -> str:
        """Generate a deterministic image name based on Dockerfile content and build args.

        Args:
            dockerfile_path: Path to the Dockerfile
            context_path: Path to the build context
            build_args: Build arguments that affect the image

        Returns:
            Deterministic image name
        """
        # Create a hash based on:
        # 1. Dockerfile content
        # 2. Build arguments
        # 3. Build context path (affects COPY/ADD operations)
        hasher = hashlib.sha256()

        # Hash Dockerfile content
        try:
            with open(dockerfile_path, "rb") as f:
                hasher.update(f.read())
        except Exception as e:
            logger.warning(f"Could not read Dockerfile for hashing: {e}")
            # Fallback to timestamp-based naming
            return f"rompy-{int(time.time())}"

        # Hash build arguments (sorted for consistency)
        if build_args:
            build_args_str = json.dumps(build_args, sort_keys=True)
            hasher.update(build_args_str.encode())

        # Hash context path (affects relative paths in Dockerfile)
        hasher.update(str(context_path.absolute()).encode())

        # Generate short hash for image name
        image_hash = hasher.hexdigest()[:12]
        return f"rompy-{image_hash}"

    def _image_exists(self, image_name: str) -> bool:
        """Check if a Docker image already exists locally.

        Args:
            image_name: Name of the image to check

        Returns:
            True if image exists, False otherwise
        """
        try:
            client = docker.from_env()
            client.images.get(image_name)
            logger.debug(f"Image {image_name} already exists")
            return True
        except ImageNotFound:
            logger.debug(f"Image {image_name} does not exist")
            return False
        except APIError as e:
            logger.error(f"Error checking for image {image_name}: {e}")
            return False

Functions

run

run(model_run, config: DockerConfig, workspace_dir: Optional[str] = None) -> bool

Run the model inside a Docker container.

Parameters:

model_run
    The ModelRun instance to execute. Required.
config : DockerConfig
    DockerConfig instance with execution parameters. Required.
workspace_dir : Optional[str]
    Path to the generated workspace directory (if None, will generate). Default: None.

Returns:

bool
    True if execution was successful, False otherwise.

Source code in rompy/run/docker.py
def run(
    self, model_run, config: "DockerConfig", workspace_dir: Optional[str] = None
) -> bool:
    """Run the model inside a Docker container.

    Args:
        model_run: The ModelRun instance to execute
        config: DockerConfig instance with execution parameters
        workspace_dir: Path to the generated workspace directory (if None, will generate)

    Returns:
        True if execution was successful, False otherwise
    """
    # Use config parameters
    exec_image = config.image
    exec_dockerfile = str(config.dockerfile) if config.dockerfile else None
    exec_executable = config.executable
    exec_mpiexec = config.mpiexec
    exec_cpu = config.cpu
    exec_build_args = config.build_args
    exec_volumes = config.volumes
    exec_env_vars = config.env_vars

    logger.debug(f"Using DockerConfig: image={exec_image}, cpu={exec_cpu}")

    # Use provided workspace or generate if not provided (for backwards compatibility)
    if workspace_dir is None:
        logger.warning(
            "No workspace_dir provided, generating files (this may cause double generation in pipeline)"
        )
        workspace_dir = model_run.generate()
    else:
        logger.info(f"Using provided workspace directory: {workspace_dir}")

    try:
        # Build or use a Docker image
        image_name = self._prepare_image(
            exec_image,
            exec_dockerfile,
            str(config.build_context) if config.build_context else None,
            exec_build_args,
        )
        if not image_name:
            return False

        # Set up the run command
        run_command = self._get_run_command(exec_executable, exec_mpiexec, exec_cpu)

        # Set up volume mounts
        volume_mounts = self._prepare_volumes(
            model_run, exec_volumes, workspace_dir
        )

        # Run the Docker container
        success = self._run_container(
            image_name=image_name,
            run_command=run_command,
            volume_mounts=volume_mounts,
            env_vars=exec_env_vars,
        )

        return success
    except Exception as e:
        logger.exception(f"Failed to run model in Docker: {e}")
        return False
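
The backend is rarely invoked directly; as in the CLI run command documented later in this reference, the usual entry point is ModelRun.run with a DockerConfig, which resolves to DockerRunBackend through get_backend_class. A sketch, assuming model_run is an existing ModelRun instance:

from rompy.backends.config import DockerConfig

# model_run: an existing ModelRun instance (see the ModelRun documentation)
workspace = model_run.generate()                  # stage the input files once
success = model_run.run(
    backend=DockerConfig(image="swan:latest"),    # selects DockerRunBackend
    workspace_dir=workspace,                      # avoids regenerating inputs
)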

LocalRunBackend

Execute models locally using the system's Python interpreter.

This is the simplest backend that just runs the model directly on the local system.

Source code in rompy/run/__init__.py
class LocalRunBackend:
    """Execute models locally using the system's Python interpreter.

    This is the simplest backend that just runs the model directly
    on the local system.
    """

    def run(
        self, model_run, config: "LocalConfig", workspace_dir: Optional[str] = None
    ) -> bool:
        """Run the model locally.

        Args:
            model_run: The ModelRun instance to execute
            config: LocalConfig instance with execution parameters
            workspace_dir: Path to the generated workspace directory (if None, will generate)

        Returns:
            True if execution was successful, False otherwise

        Raises:
            ValueError: If model_run is invalid
            TimeoutError: If execution exceeds timeout
        """
        # Validate input parameters
        if not model_run:
            raise ValueError("model_run cannot be None")

        if not hasattr(model_run, "run_id"):
            raise ValueError("model_run must have a run_id attribute")

        # Use config parameters
        exec_command = config.command
        exec_working_dir = config.working_dir
        exec_env_vars = config.env_vars
        exec_timeout = config.timeout

        logger.debug(
            f"Using LocalConfig: timeout={exec_timeout}, env_vars={list(exec_env_vars.keys())}"
        )

        logger.info(f"Starting local execution for run_id: {model_run.run_id}")

        try:
            # Use provided workspace or generate if not provided (for backwards compatibility)
            if workspace_dir is None:
                logger.warning(
                    "No workspace_dir provided, generating files (this may cause double generation in pipeline)"
                )
                staging_dir = model_run.generate()
                logger.info(f"Model inputs generated in: {staging_dir}")
            else:
                logger.info(f"Using provided workspace directory: {workspace_dir}")
                staging_dir = workspace_dir

            # Set working directory
            if exec_working_dir:
                work_dir = Path(exec_working_dir)
            else:
                work_dir = (
                    Path(staging_dir)
                    if staging_dir
                    else Path(model_run.output_dir) / model_run.run_id
                )

            if not work_dir.exists():
                logger.error(f"Working directory does not exist: {work_dir}")
                return False

            # Prepare environment
            env = os.environ.copy()
            if exec_env_vars:
                env.update(exec_env_vars)
                logger.debug(
                    f"Added environment variables: {list(exec_env_vars.keys())}"
                )

            # Execute command or config.run()
            if exec_command:
                success = self._execute_command(
                    exec_command, work_dir, env, exec_timeout
                )
            else:
                success = self._execute_config_run(model_run, work_dir, env)

            if success:
                logger.info(
                    f"Local execution completed successfully for run_id: {model_run.run_id}"
                )
            else:
                logger.error(f"Local execution failed for run_id: {model_run.run_id}")

            return success

        except TimeoutError:
            logger.error(f"Model execution timed out after {exec_timeout} seconds")
            raise
        except Exception as e:
            logger.exception(f"Failed to run model locally: {e}")
            return False

    def _execute_command(
        self, command: str, work_dir: Path, env: Dict[str, str], timeout: Optional[int]
    ) -> bool:
        """Execute a shell command.

        Args:
            command: Command to execute
            work_dir: Working directory
            env: Environment variables
            timeout: Execution timeout

        Returns:
            True if successful, False otherwise
        """
        logger.info(f"Executing command: {command}")
        logger.debug(f"Working directory: {work_dir}")

        try:
            result = subprocess.run(
                command,
                shell=True,
                cwd=work_dir,
                env=env,
                timeout=timeout,
                capture_output=True,
                text=True,
                check=False,
            )

            # Log output
            if result.stdout:
                logger.info(f"Command stdout:\n{result.stdout}")
            if result.stderr:
                if result.returncode == 0:
                    logger.warning(f"Command stderr:\n{result.stderr}")
                else:
                    logger.error(f"Command stderr:\n{result.stderr}")

            if result.returncode == 0:
                logger.debug("Command completed successfully")
                return True
            else:
                logger.error(f"Command failed with return code: {result.returncode}")
                return False

        except subprocess.TimeoutExpired:
            logger.error(f"Command timed out after {timeout} seconds")
            raise TimeoutError(f"Command execution timed out after {timeout} seconds")
        except Exception as e:
            logger.exception(f"Error executing command: {e}")
            return False

    def _execute_config_run(
        self, model_run, work_dir: Path, env: Dict[str, str]
    ) -> bool:
        """Execute the model using config.run() method.

        Args:
            model_run: The ModelRun instance
            work_dir: Working directory
            env: Environment variables

        Returns:
            True if successful, False otherwise
        """
        # Check if config has a run method
        if not hasattr(model_run.config, "run") or not callable(model_run.config.run):
            logger.warning(
                "Model config does not have a run method. Nothing to execute."
            )
            return True

        logger.info("Executing model using config.run() method")

        try:
            # Set working directory in environment for config.run()
            original_cwd = os.getcwd()
            os.chdir(work_dir)

            # Update environment
            original_env = {}
            for key, value in env.items():
                if key in os.environ:
                    original_env[key] = os.environ[key]
                os.environ[key] = value

            try:
                # Execute the config run method
                result = model_run.config.run(model_run)

                if isinstance(result, bool):
                    return result
                else:
                    logger.warning(f"config.run() returned non-boolean value: {result}")
                    return True

            finally:
                # Restore original environment and directory
                os.chdir(original_cwd)
                for key, value in env.items():
                    if key in original_env:
                        os.environ[key] = original_env[key]
                    else:
                        os.environ.pop(key, None)

        except Exception as e:
            logger.exception(f"Error in config.run(): {e}")
            return False

Functions

run

run(model_run, config: LocalConfig, workspace_dir: Optional[str] = None) -> bool

Run the model locally.

Parameters:

model_run
    The ModelRun instance to execute. Required.
config : LocalConfig
    LocalConfig instance with execution parameters. Required.
workspace_dir : Optional[str]
    Path to the generated workspace directory (if None, will generate). Default: None.

Returns:

bool
    True if execution was successful, False otherwise.

Raises:

ValueError
    If model_run is invalid.
TimeoutError
    If execution exceeds timeout.

Source code in rompy/run/__init__.py
def run(
    self, model_run, config: "LocalConfig", workspace_dir: Optional[str] = None
) -> bool:
    """Run the model locally.

    Args:
        model_run: The ModelRun instance to execute
        config: LocalConfig instance with execution parameters
        workspace_dir: Path to the generated workspace directory (if None, will generate)

    Returns:
        True if execution was successful, False otherwise

    Raises:
        ValueError: If model_run is invalid
        TimeoutError: If execution exceeds timeout
    """
    # Validate input parameters
    if not model_run:
        raise ValueError("model_run cannot be None")

    if not hasattr(model_run, "run_id"):
        raise ValueError("model_run must have a run_id attribute")

    # Use config parameters
    exec_command = config.command
    exec_working_dir = config.working_dir
    exec_env_vars = config.env_vars
    exec_timeout = config.timeout

    logger.debug(
        f"Using LocalConfig: timeout={exec_timeout}, env_vars={list(exec_env_vars.keys())}"
    )

    logger.info(f"Starting local execution for run_id: {model_run.run_id}")

    try:
        # Use provided workspace or generate if not provided (for backwards compatibility)
        if workspace_dir is None:
            logger.warning(
                "No workspace_dir provided, generating files (this may cause double generation in pipeline)"
            )
            staging_dir = model_run.generate()
            logger.info(f"Model inputs generated in: {staging_dir}")
        else:
            logger.info(f"Using provided workspace directory: {workspace_dir}")
            staging_dir = workspace_dir

        # Set working directory
        if exec_working_dir:
            work_dir = Path(exec_working_dir)
        else:
            work_dir = (
                Path(staging_dir)
                if staging_dir
                else Path(model_run.output_dir) / model_run.run_id
            )

        if not work_dir.exists():
            logger.error(f"Working directory does not exist: {work_dir}")
            return False

        # Prepare environment
        env = os.environ.copy()
        if exec_env_vars:
            env.update(exec_env_vars)
            logger.debug(
                f"Added environment variables: {list(exec_env_vars.keys())}"
            )

        # Execute command or config.run()
        if exec_command:
            success = self._execute_command(
                exec_command, work_dir, env, exec_timeout
            )
        else:
            success = self._execute_config_run(model_run, work_dir, env)

        if success:
            logger.info(
                f"Local execution completed successfully for run_id: {model_run.run_id}"
            )
        else:
            logger.error(f"Local execution failed for run_id: {model_run.run_id}")

        return success

    except TimeoutError:
        logger.error(f"Model execution timed out after {exec_timeout} seconds")
        raise
    except Exception as e:
        logger.exception(f"Failed to run model locally: {e}")
        return False
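
The backend can also be driven directly, which is essentially what ModelRun.run does once the configuration has resolved the backend class. A sketch, assuming model_run is an existing ModelRun instance and workspace is the directory returned by model_run.generate():

from rompy.run import LocalRunBackend
from rompy.backends.config import LocalConfig

backend = LocalRunBackend()
ok = backend.run(
    model_run,
    LocalConfig(command="./run_model.sh", timeout=3600),  # command is illustrative
    workspace_dir=workspace,
)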

CLI Interface

Command-line interface components:

cli

ROMPY Command Line Interface

This module provides the command-line interface for ROMPY.

Functions

main

main()

Entry point for the rompy CLI.

This function is used by the console script entry point.

Source code in rompy/cli.py
def main():
    """Entry point for the rompy CLI.

    This function is used by the console script entry point.
    """
    cli()

run

run(config, backend_config, dry_run, verbose, log_dir, show_warnings, ascii_only, simple_logs, config_from_env)

Run a model configuration using Pydantic backend configuration.

Examples:

Run with local backend configuration

rompy run config.yml --backend-config unified_local_single.yml

Run with Docker backend configuration

rompy run config.yml --backend-config unified_docker_single.yml

Run with config from environment variable

rompy run --config-from-env --backend-config unified_local_single.yml

Source code in rompy/cli.py
@cli.command()
@click.argument("config", type=click.Path(exists=True), required=False)
@click.option(
    "--backend-config",
    type=click.Path(exists=True),
    required=True,
    help="YAML/JSON file with backend configuration",
)
@click.option("--dry-run", is_flag=True, help="Generate inputs only, don't run")
@add_common_options
def run(
    config,
    backend_config,
    dry_run,
    verbose,
    log_dir,
    show_warnings,
    ascii_only,
    simple_logs,
    config_from_env,
):
    """Run a model configuration using Pydantic backend configuration.

    Examples:
        # Run with local backend configuration
        rompy run config.yml --backend-config unified_local_single.yml

        # Run with Docker backend configuration
        rompy run config.yml --backend-config unified_docker_single.yml

        # Run with config from environment variable
        rompy run --config-from-env --backend-config unified_local_single.yml
    """
    configure_logging(verbose, log_dir, simple_logs, ascii_only, show_warnings)

    # Validate config source
    if config_from_env and config:
        raise click.UsageError("Cannot specify both config file and --config-from-env")
    if not config_from_env and not config:
        raise click.UsageError("Must specify either config file or --config-from-env")

    try:
        # Load model configuration
        config_data = load_config(config, from_env=config_from_env)
        model_run = ModelRun(**config_data)

        logger.info(f"Running model: {model_run.config.model_type}")
        logger.info(f"Run ID: {model_run.run_id}")

        # Load backend configuration
        backend_cfg = _load_backend_config(backend_config)

        # Generate inputs
        start_time = datetime.now()
        staging_dir = model_run.generate()
        logger.info(f"Inputs generated in: {staging_dir}")

        if dry_run:
            logger.info("Dry run mode - skipping model execution")
            return

        # Execute model with workspace directory to avoid double generation
        success = model_run.run(backend=backend_cfg, workspace_dir=staging_dir)

        elapsed = datetime.now() - start_time
        if success:
            logger.info(
                f"✅ Model completed successfully in {elapsed.total_seconds():.2f}s"
            )
        else:
            logger.error(
                f"❌ Model execution failed after {elapsed.total_seconds():.2f}s"
            )
            sys.exit(1)

    except Exception as e:
        logger.error(f"Error running model: {e}")
        if verbose > 0:
            logger.exception("Full traceback:")
        sys.exit(1)

generate

generate(config, output_dir, verbose, log_dir, show_warnings, ascii_only, simple_logs, config_from_env)

Generate model input files only.

Source code in rompy/cli.py
@cli.command()
@click.argument("config", type=click.Path(exists=True), required=False)
@click.option("--output-dir", help="Override output directory")
@add_common_options
def generate(
    config,
    output_dir,
    verbose,
    log_dir,
    show_warnings,
    ascii_only,
    simple_logs,
    config_from_env,
):
    """Generate model input files only."""
    configure_logging(verbose, log_dir, simple_logs, ascii_only, show_warnings)

    # Validate config source
    if config_from_env and config:
        raise click.UsageError("Cannot specify both config file and --config-from-env")
    if not config_from_env and not config:
        raise click.UsageError("Must specify either config file or --config-from-env")

    try:
        # Load configuration
        config_data = load_config(config, from_env=config_from_env)
        if output_dir:
            config_data["output_dir"] = output_dir

        model_run = ModelRun(**config_data)

        logger.info(f"Generating inputs for: {model_run.config.model_type}")
        logger.info(f"Run ID: {model_run.run_id}")

        start_time = datetime.now()
        staging_dir = model_run.generate()
        elapsed = datetime.now() - start_time

        logger.info(f"✅ Inputs generated in {elapsed.total_seconds():.2f}s")
        logger.info(f"📁 Staging directory: {staging_dir}")

        # List generated files
        if Path(staging_dir).exists():
            files = list(Path(staging_dir).glob("*"))
            logger.info(f"Generated {len(files)} files")

    except Exception as e:
        logger.error(f"Error generating inputs: {e}")
        if verbose > 0:
            logger.exception("Full traceback:")
        sys.exit(1)
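
The generate command is equivalent to the following programmatic sketch (the configuration path is illustrative):

from rompy.model import ModelRun
from rompy.utils import load_config

config_data = load_config("config.yml")    # YAML/JSON model configuration
model_run = ModelRun(**config_data)
staging_dir = model_run.generate()         # write the model input files
print(f"Inputs staged in {staging_dir}")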

Formatting and Utilities

Formatting and utility functions:

formatting

Formatting utilities for ROMPY.

This module provides various formatting utilities for creating consistent and visually appealing output in the ROMPY codebase.

utils

Utility functions for ROMPY.

This module provides various utility functions used throughout the ROMPY codebase.

Functions

load_config

load_config(*args, **kwargs)

Load configuration from file, string, or environment variable.

This is a lazy import wrapper to avoid circular imports.

Source code in rompy/utils.py
def load_config(*args, **kwargs):
    """Load configuration from file, string, or environment variable.

    This is a lazy import wrapper to avoid circular imports.
    """
    from rompy.cli import load_config as _load_config

    return _load_config(*args, **kwargs)

Post-processing

Post-processing utilities:

postprocess

No-op postprocessor for model outputs.

This module provides a basic postprocessor that does nothing.

Pipeline Management

Pipeline execution components:

LocalPipelineBackend

Local pipeline backend that executes the full workflow locally.

This backend uses the existing generate(), run() and postprocess() methods to execute the complete pipeline locally.

Source code in rompy/pipeline/__init__.py
class LocalPipelineBackend:
    """Local pipeline backend that executes the full workflow locally.

    This backend uses the existing generate(), run() and postprocess() methods
    to execute the complete pipeline locally.
    """

    def execute(
        self,
        model_run,
        run_backend: str = "local",
        processor: str = "noop",
        run_kwargs: Optional[Dict[str, Any]] = None,
        process_kwargs: Optional[Dict[str, Any]] = None,
        cleanup_on_failure: bool = False,
        validate_stages: bool = True,
        **kwargs,
    ) -> Dict[str, Any]:
        """Execute the model pipeline locally.

        Args:
            model_run: The ModelRun instance to execute
            run_backend: Backend to use for the run stage ("local" or "docker")
            processor: Processor to use for the postprocess stage
            run_kwargs: Additional parameters for the run stage
            process_kwargs: Additional parameters for the postprocess stage
            cleanup_on_failure: Whether to cleanup outputs on pipeline failure
            validate_stages: Whether to validate each stage before proceeding
            **kwargs: Additional parameters (unused)

        Returns:
            Combined results from the pipeline execution

        Raises:
            ValueError: If model_run is invalid or parameters are invalid
        """
        # Validate input parameters
        if not model_run:
            raise ValueError("model_run cannot be None")

        if not hasattr(model_run, "run_id"):
            raise ValueError("model_run must have a run_id attribute")

        if not isinstance(run_backend, str) or not run_backend.strip():
            raise ValueError("run_backend must be a non-empty string")

        if not isinstance(processor, str) or not processor.strip():
            raise ValueError("processor must be a non-empty string")

        # Initialize parameters
        run_kwargs = run_kwargs or {}
        process_kwargs = process_kwargs or {}

        logger.info(f"Starting pipeline execution for run_id: {model_run.run_id}")
        logger.info(
            f"Pipeline configuration: run_backend='{run_backend}', processor='{processor}'"
        )

        pipeline_results = {
            "success": False,
            "run_id": model_run.run_id,
            "stages_completed": [],
            "run_backend": run_backend,
            "processor": processor,
        }

        try:
            # Stage 1: Generate input files
            logger.info(f"Stage 1: Generating input files for {model_run.run_id}")

            try:
                staging_dir = model_run.generate()
                pipeline_results["staging_dir"] = (
                    str(staging_dir) if staging_dir else None
                )
                pipeline_results["stages_completed"].append("generate")
                logger.info(f"Input files generated successfully in: {staging_dir}")
            except Exception as e:
                logger.exception(f"Failed to generate input files: {e}")
                return {
                    **pipeline_results,
                    "stage": "generate",
                    "message": f"Input file generation failed: {str(e)}",
                    "error": str(e),
                }

            # Validate generation stage
            if validate_stages:
                output_dir = Path(model_run.output_dir) / model_run.run_id
                if not output_dir.exists():
                    logger.error(f"Output directory was not created: {output_dir}")
                    return {
                        **pipeline_results,
                        "stage": "generate",
                        "message": f"Output directory not found after generation: {output_dir}",
                    }

            # Stage 2: Run the model
            logger.info(f"Stage 2: Running model using {run_backend} backend")

            try:
                # Create appropriate backend configuration
                backend_config = self._create_backend_config(run_backend, run_kwargs)

                # Pass the generated workspace directory to avoid duplicate generation
                run_success = model_run.run(
                    backend=backend_config, workspace_dir=staging_dir
                )
                pipeline_results["run_success"] = run_success

                if not run_success:
                    logger.error("Model run failed")
                    if cleanup_on_failure:
                        self._cleanup_outputs(model_run)
                    return {
                        **pipeline_results,
                        "stage": "run",
                        "message": "Model run failed",
                    }

                pipeline_results["stages_completed"].append("run")
                logger.info("Model run completed successfully")

            except Exception as e:
                logger.exception(f"Error during model run: {e}")
                if cleanup_on_failure:
                    self._cleanup_outputs(model_run)
                return {
                    **pipeline_results,
                    "stage": "run",
                    "message": f"Model run error: {str(e)}",
                    "error": str(e),
                }

            # Stage 3: Postprocess outputs
            logger.info(f"Stage 3: Postprocessing with {processor}")

            try:
                postprocess_results = model_run.postprocess(
                    processor=processor, **process_kwargs
                )
                pipeline_results["postprocess_results"] = postprocess_results
                pipeline_results["stages_completed"].append("postprocess")

                # Check if postprocessing was successful
                if isinstance(
                    postprocess_results, dict
                ) and not postprocess_results.get("success", True):
                    logger.warning(
                        "Postprocessing reported failure but pipeline will continue"
                    )

                logger.info("Postprocessing completed")

            except Exception as e:
                logger.exception(f"Error during postprocessing: {e}")
                return {
                    **pipeline_results,
                    "stage": "postprocess",
                    "message": f"Postprocessing error: {str(e)}",
                    "error": str(e),
                }

            # Pipeline completed successfully
            pipeline_results["success"] = True
            pipeline_results["message"] = "Pipeline completed successfully"

            logger.info(
                f"Pipeline execution completed successfully for run_id: {model_run.run_id}"
            )
            return pipeline_results

        except Exception as e:
            logger.exception(f"Unexpected error in pipeline execution: {e}")
            if cleanup_on_failure:
                self._cleanup_outputs(model_run)
            return {
                **pipeline_results,
                "stage": "pipeline",
                "message": f"Pipeline error: {str(e)}",
                "error": str(e),
            }

    def _cleanup_outputs(self, model_run) -> None:
        """Clean up output files on pipeline failure.

        Args:
            model_run: The ModelRun instance
        """
        try:
            output_dir = Path(model_run.output_dir) / model_run.run_id
            if output_dir.exists():
                logger.info(f"Cleaning up output directory: {output_dir}")
                import shutil

                shutil.rmtree(output_dir)
                logger.info("Cleanup completed")
        except Exception as e:
            logger.warning(f"Failed to cleanup output directory: {e}")

    def _create_backend_config(self, run_backend: str, run_kwargs: Dict[str, Any]):
        """Create appropriate backend configuration from string name and kwargs.

        Args:
            run_backend: Backend name ("local" or "docker")
            run_kwargs: Additional configuration parameters

        Returns:
            Backend configuration object

        Raises:
            ValueError: If backend name is not supported
        """
        if run_backend == "local":
            # Filter kwargs to only include valid LocalConfig fields
            valid_fields = set(LocalConfig.model_fields.keys())
            filtered_kwargs = {k: v for k, v in run_kwargs.items() if k in valid_fields}
            if filtered_kwargs != run_kwargs:
                invalid_fields = set(run_kwargs.keys()) - valid_fields
                logger.warning(f"Ignoring invalid LocalConfig fields: {invalid_fields}")
            return LocalConfig(**filtered_kwargs)
        elif run_backend == "docker":
            # Filter kwargs to only include valid DockerConfig fields
            valid_fields = set(DockerConfig.model_fields.keys())
            filtered_kwargs = {k: v for k, v in run_kwargs.items() if k in valid_fields}
            if filtered_kwargs != run_kwargs:
                invalid_fields = set(run_kwargs.keys()) - valid_fields
                logger.warning(
                    f"Ignoring invalid DockerConfig fields: {invalid_fields}"
                )
            return DockerConfig(**filtered_kwargs)
        else:
            raise ValueError(
                f"Unsupported backend: {run_backend}. Supported: local, docker"
            )
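
A sketch of executing the full pipeline with the arguments documented above, assuming model_run is an existing ModelRun instance:

from rompy.pipeline import LocalPipelineBackend

pipeline = LocalPipelineBackend()
results = pipeline.execute(
    model_run,
    run_backend="local",             # or "docker"
    processor="noop",
    run_kwargs={"timeout": 3600},    # forwarded to LocalConfig
)
if results["success"]:
    print("Completed stages:", results["stages_completed"])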

Spectrum Handling

Spectral data handling:

Frequency

Bases: RompyBaseModel

Wave frequency.

Source code in rompy/core/spectrum.py
class Frequency(RompyBaseModel):
    """Wave frequency."""

    model_type: Literal["frequency", "FREQUENCY"] = Field(
        default="frequency", description="Model type discriminator"
    )
    freq: Np1DArray = Field(description="Frequency array")

    @property
    def f0(self):
        return self.freq.min()

    @property
    def f1(self):
        return self.freq.max()

    @property
    def nf(self):
        return self.freq.size

    @property
    def flen(self):
        return self.f1 - self.f0
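
For example, an explicit frequency array can be wrapped as follows (the values are illustrative):

import numpy as np
from rompy.core.spectrum import Frequency

freq = Frequency(freq=np.arange(0.04, 0.4, 0.01))
print(freq.f0, freq.f1, freq.nf)   # lowest frequency, highest frequency, array size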

LogFrequency

Bases: RompyBaseModel

Logarithmic wave frequencies.

Frequencies are defined according to:

f_{i+1} = gamma * f_{i}

Note

The number of frequency bins nbin is always kept unchanged when provided, which means the other parameters may be adjusted so that exactly nbin bins can be defined. Specify f0, f1 and finc, and let nbin be calculated, if you want those values to remain unchanged.

Note

Choose finc=0.1 for a 10% increment between frequencies, which satisfies the DIA (Discrete Interaction Approximation).

Examples

from rompy.core.spectrum import LogFrequency

LogFrequency(f0=0.04, f1=1.0, nbin=34)
LogFrequency(f0=0.04, f1=1.0, finc=0.1)
LogFrequency(f0=0.04, nbin=34, finc=0.1)
LogFrequency(f1=1.0, nbin=34, finc=0.1)
Source code in rompy/core/spectrum.py
class LogFrequency(RompyBaseModel):
    """Logarithmic wave frequencies.

    Frequencies are defined according to:

    :math:`f_{i+1} = \gamma * f_{i}`

    Note
    ----
    The number of frequency bins `nbin` is always kept unchanged when provided. This
    implies other parameters may be adjusted so `nbin` bins can be defined. Specify
    `f0`, `f1` and `finc` and let `nbin` be calculated to avoid those values changing.

    Note
    ----
    Choose `finc=0.1` for a 10% increment between frequencies that satisfies the DIA.

    Examples
    --------

    .. ipython:: python
        :okwarning:

        from rompy.core.spectrum import LogFrequency

        LogFrequency(f0=0.04, f1=1.0, nbin=34)
        LogFrequency(f0=0.04, f1=1.0, finc=0.1)
        LogFrequency(f0=0.04, nbin=34, finc=0.1)
        LogFrequency(f1=1.0, nbin=34, finc=0.1)

    """

    model_type: Literal["log", "LOG"] = Field(
        default="log", description="Model type discriminator"
    )
    f0: Optional[float] = Field(
        default=None, description="Lower frequency boundary (Hz)", gt=0.0
    )
    f1: Optional[float] = Field(
        default=None, description="Upper frequency boundary (Hz)"
    )
    finc: Optional[float] = Field(
        default=None, description="Log frequency increment", gt=0.0
    )
    nbin: Optional[int] = Field(
        default=None,
        description="Number of frequency bins, one less the size of frequency array",
        gt=0,
    )

    @model_validator(mode="after")
    def init_options(self) -> "LogFrequency":
        """Set the missing frequency parameters."""
        if sum([v is not None for v in [self.f0, self.f1, self.finc, self.nbin]]) != 3:
            raise ValueError("Three (only) of (f0, f1, finc, nbin) must be provided")

        # Calculate the missing frequency parameters
        if self.finc is None:
            self.finc = self._finc()
        elif self.nbin is None:
            self.nbin = self._nbin(self.f0, self.f1, self.finc)
        elif self.f1 is None:
            self.f1 = self.f0 * self.gamma**self.nbin
        else:
            self.f0 = self._f0(self.f1, self.nbin, self.gamma)

        # Redefine parameters based on the calculated values
        self.f0 = self()[0]
        self.f1 = self()[-1]
        self.finc = self._finc()
        self.nbin = len(self()) - 1

        return self

    def __call__(self) -> Np1DArray:
        """Frequency array."""
        return np.geomspace(self.f0, self.f1, self.nf)

    def __getitem__(self, index) -> float | list[float]:
        """Slicing from the frequency array."""
        return self.__call__()[index]

    def __len__(self):
        """Returns the length of the frequency array."""
        return len(self())

    def _finc(self):
        return (self()[1] - self()[0]) / self()[0]

    def _nbin(self, f0, f1, finc):
        return np.round(np.log(f1 / f0) / np.log(1 + finc)).astype("int")

    def _f0(self, f1, nbin, gamma):
        """Returns f0 given f1, nbin and gamma."""
        freqs = [f1]
        for n in range(nbin):
            freqs.append(freqs[-1] / gamma)
        return freqs[-1]

    @property
    def nf(self):
        return self.nbin + 1

    @property
    def gamma(self):
        return self.finc + 1

    @property
    def flen(self):
        return self.f1 - self.f0

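As an illustrative check of the relationships encoded above (gamma = finc + 1, nf = nbin + 1, and a constant geometric ratio between consecutive frequencies), the sketch below reuses the same constructor call as the docstring examples; the printed quantities follow from the class definition rather than from separately documented output.

from rompy.core.spectrum import LogFrequency

lf = LogFrequency(f0=0.04, f1=1.0, nbin=34)
freqs = lf()                # geometric array from f0 to f1 with nbin + 1 values
print(lf.finc, lf.gamma)    # gamma = 1 + finc
print(freqs[1] / freqs[0])  # constant ratio gamma: f_{i+1} = gamma * f_i
print(lf.nf, len(freqs))    # both equal nbin + 1
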
Type Definitions

Common type definitions:

types

Rompy types.