
Core Concepts

[!NOTE] For information about Rompy's formatting and logging system, see formatting_and_logging. For details on using the command line interface, see cli.

This section delves into the fundamental components that make up Rompy's architecture. If you're new to Rompy, start with the User Guide before diving into these concepts.

Rompy is a modular library with configuration and execution separated by design. The core framework consists of two primary concepts:

ModelRun

Bases: RompyBaseModel

A model run.

It is intended to be model agnostic. It deals primarily with how the model is to be run, i.e. the period of the run and where the output is going. The actual configuration of the run is provided by the config object.

Further explanation is given in the rompy.core.BaseConfig docstring.
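A minimal usage sketch is shown below. This is hedged: the TimeRange import path follows the rompy package layout, and the default BaseConfig template is assumed.

from datetime import datetime

from rompy.core.time import TimeRange
from rompy.model import ModelRun

# Stage a short run into ./simulations/test_run using the default BaseConfig
run = ModelRun(
    run_id="test_run",
    period=TimeRange(
        start=datetime(2023, 1, 1),
        end=datetime(2023, 1, 4),
        interval="1H",
    ),
    output_dir="./simulations",
)
staging_dir = run.generate()  # renders the template and returns the staging path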

Source code in rompy/model.py
class ModelRun(RompyBaseModel):
    """A model run.

    It is intended to be model agnostic.
    It deals primarily with how the model is to be run, i.e. the period of the run
    and where the output is going. The actual configuration of the run is
    provided by the config object.

    Further explanation is given in the rompy.core.BaseConfig docstring.
    """

    # Initialize formatting variables in __init__

    model_type: Literal["modelrun"] = Field("modelrun", description="The model type.")
    run_id: str = Field("run_id", description="The run id")
    period: TimeRange = Field(
        TimeRange(
            start=datetime(2020, 2, 21, 4),
            end=datetime(2020, 2, 24, 4),
            interval="15M",
        ),
        description="The time period to run the model",
    )
    output_dir: Path = Field("./simulations", description="The output directory")
    config: Union[CONFIG_TYPES] = Field(
        default_factory=BaseConfig,
        description="The configuration object",
        discriminator="model_type",
    )
    delete_existing: bool = Field(False, description="Delete existing output directory")
    run_id_subdir: bool = Field(
        True, description="Use run_id subdirectory in the output directory"
    )
    _datefmt: str = "%Y%m%d.%H%M%S"
    _staging_dir: Path = None

    @property
    def staging_dir(self):
        """The directory where the model is staged for execution

        returns
        -------
        staging_dir : str
        """

        if self._staging_dir is None:
            self._staging_dir = self._create_staging_dir()
        return self._staging_dir

    def _create_staging_dir(self):
        if self.run_id_subdir:
            odir = Path(self.output_dir) / self.run_id
        else:
            odir = Path(self.output_dir)
        if self.delete_existing and odir.exists():
            shutil.rmtree(odir)
        odir.mkdir(parents=True, exist_ok=True)
        return odir

    @property
    def _generation_medatadata(self):
        return dict(
            _generated_at=str(datetime.now(timezone.utc)),
            _generated_by=os.environ.get("USER"),
            _generated_on=platform.node(),
        )

    def generate(self) -> str:
        """Generate the model input files

        returns
        -------
        staging_dir : str

        """
        # Import formatting utilities
        from rompy.formatting import format_table_row, log_box

        # Format model settings in a structured way
        config_type = type(self.config).__name__
        duration = self.period.end - self.period.start
        formatted_duration = self.period.format_duration(duration)

        # Create table rows for the model run info
        rows = [
            format_table_row("Run ID", str(self.run_id)),
            format_table_row("Model Type", config_type),
            format_table_row("Start Time", self.period.start.isoformat()),
            format_table_row("End Time", self.period.end.isoformat()),
            format_table_row("Duration", formatted_duration),
            format_table_row("Time Interval", str(self.period.interval)),
            format_table_row("Output Directory", str(self.output_dir)),
        ]

        # Add description if available
        if hasattr(self.config, "description") and self.config.description:
            rows.append(format_table_row("Description", self.config.description))

        # Create a formatted table with proper alignment
        formatted_rows = []
        key_lengths = []

        # First pass: collect all valid rows and calculate max key length
        for row in rows:
            try:
                # Split the row by the box-drawing vertical line character
                parts = [p.strip() for p in row.split("┃") if p.strip()]
                if len(parts) >= 2:  # We expect at least key and value parts
                    key = parts[0].strip()
                    value = parts[1].strip() if len(parts) > 1 else ""
                    key_lengths.append(len(key))
                    formatted_rows.append((key, value))
            except Exception as e:
                logger.warning(f"Error processing row '{row}': {e}")

        if not formatted_rows:
            logger.warning("No valid rows found for model run configuration table")
            return self._staging_dir

        max_key_len = max(key_lengths) if key_lengths else 0

        # Format the rows with proper alignment
        aligned_rows = []
        for key, value in formatted_rows:
            aligned_row = f"{key:>{max_key_len}} : {value}"
            aligned_rows.append(aligned_row)

        # Log the box with the model run info
        log_box(title="MODEL RUN CONFIGURATION", logger=logger, add_empty_line=False)

        # Log each row of the content with proper indentation
        for row in aligned_rows:
            logger.info(f"  {row}")

        # Log the bottom of the box
        log_box(
            title=None,
            logger=logger,
            add_empty_line=True,  # Just the bottom border
        )

        # Display detailed configuration info using the new formatting framework
        from rompy.formatting import log_box

        # Create a box with the configuration type as title
        log_box(f"MODEL CONFIGURATION ({config_type})")

        # Use the model's string representation which now uses the new formatting
        try:
            # The __str__ method of RompyBaseModel already handles the formatting
            config_str = str(self.config)
            for line in config_str.split("\n"):
                logger.info(line)
        except Exception as e:
            # If anything goes wrong with config formatting, log the error and minimal info
            logger.info(f"Using {type(self.config).__name__} configuration")
            logger.debug(f"Configuration string formatting error: {str(e)}")

        logger.info("")
        log_box(
            title="STARTING MODEL GENERATION",
            logger=logger,
            add_empty_line=False,
        )
        logger.info(f"Preparing input files in {self.output_dir}")

        # Collect context data
        cc_full = {}
        cc_full["runtime"] = self.model_dump()
        cc_full["runtime"]["staging_dir"] = self.staging_dir
        cc_full["runtime"].update(self._generation_medatadata)
        cc_full["runtime"].update({"_datefmt": self._datefmt})

        # Process configuration
        logger.info("Processing model configuration...")
        if callable(self.config):
            # Run the __call__() method of the config object if it is callable passing
            # the runtime instance, and fill in the context with what is returned
            logger.info("Running configuration callable...")
            cc_full["config"] = self.config(self)
        else:
            # Otherwise just fill in the context with the config instance itself
            logger.info("Using static configuration...")
            cc_full["config"] = self.config

        # Render templates
        logger.info(f"Rendering model templates to {self.output_dir}/{self.run_id}...")
        staging_dir = render(
            cc_full, self.config.template, self.output_dir, self.config.checkout
        )

        logger.info("")
        # Use the log_box utility function
        from rompy.formatting import log_box

        log_box(
            title="MODEL GENERATION COMPLETE",
            logger=logger,
            add_empty_line=False,
        )
        logger.info(f"Model files generated at: {staging_dir}")
        return staging_dir

    def zip(self) -> str:
        """Zip the input files for the model run

        This function zips the input files for the model run and returns the
        name of the zip file. It also cleans up the staging directory leaving
        only the settings.json file that can be used to reproduce the run.

        returns
        -------
        zip_fn : str
        """
        # Use the log_box utility function
        from rompy.formatting import log_box

        log_box(
            title="ARCHIVING MODEL FILES",
            logger=logger,
        )

        # Always remove previous zips
        zip_fn = Path(str(self.staging_dir) + ".zip")
        if zip_fn.exists():
            logger.info(f"Removing existing archive at {zip_fn}")
            zip_fn.unlink()

        # Count files to be archived
        file_count = sum([len(fn) for _, _, fn in os.walk(self.staging_dir)])
        logger.info(f"Archiving {file_count} files from {self.staging_dir}")

        # Create zip archive
        with zf.ZipFile(zip_fn, mode="w", compression=zf.ZIP_DEFLATED) as z:
            for dp, dn, fn in os.walk(self.staging_dir):
                for filename in fn:
                    source_path = os.path.join(dp, filename)
                    rel_path = os.path.relpath(source_path, self.staging_dir)
                    z.write(source_path, rel_path)

        # Clean up staging directory
        logger.info(f"Cleaning up staging directory {self.staging_dir}")
        shutil.rmtree(self.staging_dir)

        from rompy.formatting import log_box

        log_box(
            f"✓ Archive created successfully: {zip_fn}",
            logger=logger,
            add_empty_line=False,
        )
        return zip_fn

    def __call__(self):
        return self.generate()

    def run(self, backend: BackendConfig, workspace_dir: Optional[str] = None) -> bool:
        """
        Run the model using the specified backend configuration.

        This method uses Pydantic configuration objects that provide type safety
        and validation for all backend parameters.

        Args:
            backend: Pydantic configuration object (LocalConfig, DockerConfig, etc.)
            workspace_dir: Path to generated workspace directory (optional)

        Returns:
            True if execution was successful, False otherwise

        Raises:
            TypeError: If backend is not a BackendConfig instance

        Examples:
            from rompy.backends import LocalConfig, DockerConfig

            # Local execution
            model.run(LocalConfig(timeout=3600, command="python run.py"))

            # Docker execution
            model.run(DockerConfig(image="swan:latest", cpu=4, memory="2g"))
        """
        if not isinstance(backend, BaseBackendConfig):
            raise TypeError(
                f"Backend must be a subclass of BaseBackendConfig, "
                f"got {type(backend).__name__}"
            )

        logger.debug(f"Using backend config: {type(backend).__name__}")

        # Get the backend class directly from the configuration
        backend_class = backend.get_backend_class()
        backend_instance = backend_class()

        # Pass the config object and workspace_dir to the backend
        return backend_instance.run(self, config=backend, workspace_dir=workspace_dir)

    def postprocess(self, processor: str = "noop", **kwargs) -> Dict[str, Any]:
        """
        Postprocess the model outputs using the specified processor.

        This method uses entry points to load and execute the appropriate postprocessor.
        Available processors are automatically discovered from the rompy.postprocess entry point group.

        Built-in processors:
        - "noop": A placeholder processor that does nothing but returns success

        Args:
            processor: Name of the postprocessor to use (default: "noop")
            **kwargs: Additional processor-specific parameters

        Returns:
            Dictionary with results from the postprocessing

        Raises:
            ValueError: If the specified processor is not available
        """
        # Get the requested postprocessor class from entry points
        if processor not in POSTPROCESSORS:
            available = list(POSTPROCESSORS.keys())
            raise ValueError(
                f"Unknown postprocessor: {processor}. "
                f"Available processors: {', '.join(available)}"
            )

        # Create an instance and process the outputs
        processor_class = POSTPROCESSORS[processor]
        processor_instance = processor_class()
        return processor_instance.process(self, **kwargs)

    def pipeline(self, pipeline_backend: str = "local", **kwargs) -> Dict[str, Any]:
        """
        Run the complete model pipeline (generate, run, postprocess) using the specified pipeline backend.

        This method executes the entire model workflow from input generation through running
        the model to postprocessing outputs. It uses entry points to load and execute the
        appropriate pipeline backend from the rompy.pipeline entry point group.

        Built-in pipeline backends:
        - "local": Execute the complete pipeline locally using the existing ModelRun methods

        Args:
            pipeline_backend: Name of the pipeline backend to use (default: "local")
            **kwargs: Additional backend-specific parameters. Common parameters include:
                - run_backend: Backend to use for the run stage (for local pipeline)
                - processor: Processor to use for postprocessing (for local pipeline)
                - run_kwargs: Additional parameters for the run stage
                - process_kwargs: Additional parameters for postprocessing

        Returns:
            Dictionary with results from the pipeline execution

        Raises:
            ValueError: If the specified pipeline backend is not available
        """
        # Get the requested pipeline backend class from entry points
        if pipeline_backend not in PIPELINE_BACKENDS:
            available = list(PIPELINE_BACKENDS.keys())
            raise ValueError(
                f"Unknown pipeline backend: {pipeline_backend}. "
                f"Available backends: {', '.join(available)}"
            )

        # Create an instance and execute the pipeline
        backend_class = PIPELINE_BACKENDS[pipeline_backend]
        backend_instance = backend_class()
        return backend_instance.execute(self, **kwargs)

Attributes

model_type class-attribute instance-attribute

model_type: Literal['modelrun'] = Field('modelrun', description='The model type.')

run_id class-attribute instance-attribute

run_id: str = Field('run_id', description='The run id')

period class-attribute instance-attribute

period: TimeRange = Field(TimeRange(start=datetime(2020, 2, 21, 4), end=datetime(2020, 2, 24, 4), interval='15M'), description='The time period to run the model')

output_dir class-attribute instance-attribute

output_dir: Path = Field('./simulations', description='The output directory')

config class-attribute instance-attribute

config: Union[CONFIG_TYPES] = Field(default_factory=BaseConfig, description='The configuration object', discriminator='model_type')

delete_existing class-attribute instance-attribute

delete_existing: bool = Field(False, description='Delete existing output directory')

run_id_subdir class-attribute instance-attribute

run_id_subdir: bool = Field(True, description='Use run_id subdirectory in the output directory')

staging_dir property

staging_dir

The directory where the model is staged for execution

returns

staging_dir : str

Functions

generate

generate() -> str

Generate the model input files

returns

staging_dir : str

Source code in rompy/model.py
def generate(self) -> str:
    """Generate the model input files

    returns
    -------
    staging_dir : str

    """
    # Import formatting utilities
    from rompy.formatting import format_table_row, log_box

    # Format model settings in a structured way
    config_type = type(self.config).__name__
    duration = self.period.end - self.period.start
    formatted_duration = self.period.format_duration(duration)

    # Create table rows for the model run info
    rows = [
        format_table_row("Run ID", str(self.run_id)),
        format_table_row("Model Type", config_type),
        format_table_row("Start Time", self.period.start.isoformat()),
        format_table_row("End Time", self.period.end.isoformat()),
        format_table_row("Duration", formatted_duration),
        format_table_row("Time Interval", str(self.period.interval)),
        format_table_row("Output Directory", str(self.output_dir)),
    ]

    # Add description if available
    if hasattr(self.config, "description") and self.config.description:
        rows.append(format_table_row("Description", self.config.description))

    # Create a formatted table with proper alignment
    formatted_rows = []
    key_lengths = []

    # First pass: collect all valid rows and calculate max key length
    for row in rows:
        try:
            # Split the row by the box-drawing vertical line character
            parts = [p.strip() for p in row.split("┃") if p.strip()]
            if len(parts) >= 2:  # We expect at least key and value parts
                key = parts[0].strip()
                value = parts[1].strip() if len(parts) > 1 else ""
                key_lengths.append(len(key))
                formatted_rows.append((key, value))
        except Exception as e:
            logger.warning(f"Error processing row '{row}': {e}")

    if not formatted_rows:
        logger.warning("No valid rows found for model run configuration table")
        return self._staging_dir

    max_key_len = max(key_lengths) if key_lengths else 0

    # Format the rows with proper alignment
    aligned_rows = []
    for key, value in formatted_rows:
        aligned_row = f"{key:>{max_key_len}} : {value}"
        aligned_rows.append(aligned_row)

    # Log the box with the model run info
    log_box(title="MODEL RUN CONFIGURATION", logger=logger, add_empty_line=False)

    # Log each row of the content with proper indentation
    for row in aligned_rows:
        logger.info(f"  {row}")

    # Log the bottom of the box
    log_box(
        title=None,
        logger=logger,
        add_empty_line=True,  # Just the bottom border
    )

    # Display detailed configuration info using the new formatting framework
    from rompy.formatting import log_box

    # Create a box with the configuration type as title
    log_box(f"MODEL CONFIGURATION ({config_type})")

    # Use the model's string representation which now uses the new formatting
    try:
        # The __str__ method of RompyBaseModel already handles the formatting
        config_str = str(self.config)
        for line in config_str.split("\n"):
            logger.info(line)
    except Exception as e:
        # If anything goes wrong with config formatting, log the error and minimal info
        logger.info(f"Using {type(self.config).__name__} configuration")
        logger.debug(f"Configuration string formatting error: {str(e)}")

    logger.info("")
    log_box(
        title="STARTING MODEL GENERATION",
        logger=logger,
        add_empty_line=False,
    )
    logger.info(f"Preparing input files in {self.output_dir}")

    # Collect context data
    cc_full = {}
    cc_full["runtime"] = self.model_dump()
    cc_full["runtime"]["staging_dir"] = self.staging_dir
    cc_full["runtime"].update(self._generation_medatadata)
    cc_full["runtime"].update({"_datefmt": self._datefmt})

    # Process configuration
    logger.info("Processing model configuration...")
    if callable(self.config):
        # Run the __call__() method of the config object if it is callable passing
        # the runtime instance, and fill in the context with what is returned
        logger.info("Running configuration callable...")
        cc_full["config"] = self.config(self)
    else:
        # Otherwise just fill in the context with the config instance itself
        logger.info("Using static configuration...")
        cc_full["config"] = self.config

    # Render templates
    logger.info(f"Rendering model templates to {self.output_dir}/{self.run_id}...")
    staging_dir = render(
        cc_full, self.config.template, self.output_dir, self.config.checkout
    )

    logger.info("")
    # Use the log_box utility function
    from rompy.formatting import log_box

    log_box(
        title="MODEL GENERATION COMPLETE",
        logger=logger,
        add_empty_line=False,
    )
    logger.info(f"Model files generated at: {staging_dir}")
    return staging_dir

zip

zip() -> str

Zip the input files for the model run

This function zips the input files for the model run and returns the name of the zip file. It also cleans up the staging directory leaving only the settings.json file that can be used to reproduce the run.

returns

zip_fn : str
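A brief sketch of the typical generate-then-archive flow (run is a ModelRun instance as in the example above):

run.generate()          # stage the input files
archive = run.zip()     # e.g. ./simulations/test_run.zip
print(f"Inputs archived to {archive}")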

Source code in rompy/model.py
def zip(self) -> str:
    """Zip the input files for the model run

    This function zips the input files for the model run and returns the
    name of the zip file. It also cleans up the staging directory leaving
    only the settings.json file that can be used to reproduce the run.

    returns
    -------
    zip_fn : str
    """
    # Use the log_box utility function
    from rompy.formatting import log_box

    log_box(
        title="ARCHIVING MODEL FILES",
        logger=logger,
    )

    # Always remove previous zips
    zip_fn = Path(str(self.staging_dir) + ".zip")
    if zip_fn.exists():
        logger.info(f"Removing existing archive at {zip_fn}")
        zip_fn.unlink()

    # Count files to be archived
    file_count = sum([len(fn) for _, _, fn in os.walk(self.staging_dir)])
    logger.info(f"Archiving {file_count} files from {self.staging_dir}")

    # Create zip archive
    with zf.ZipFile(zip_fn, mode="w", compression=zf.ZIP_DEFLATED) as z:
        for dp, dn, fn in os.walk(self.staging_dir):
            for filename in fn:
                source_path = os.path.join(dp, filename)
                rel_path = os.path.relpath(source_path, self.staging_dir)
                z.write(source_path, rel_path)

    # Clean up staging directory
    logger.info(f"Cleaning up staging directory {self.staging_dir}")
    shutil.rmtree(self.staging_dir)

    from rompy.formatting import log_box

    log_box(
        f"✓ Archive created successfully: {zip_fn}",
        logger=logger,
        add_empty_line=False,
    )
    return zip_fn

run

run(backend: BackendConfig, workspace_dir: Optional[str] = None) -> bool

Run the model using the specified backend configuration.

This method uses Pydantic configuration objects that provide type safety and validation for all backend parameters.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| backend | BackendConfig | Pydantic configuration object (LocalConfig, DockerConfig, etc.) | required |
| workspace_dir | Optional[str] | Path to generated workspace directory | None |

Returns:

| Type | Description |
| --- | --- |
| bool | True if execution was successful, False otherwise |

Raises:

| Type | Description |
| --- | --- |
| TypeError | If backend is not a BackendConfig instance |

Examples:

from rompy.backends import LocalConfig, DockerConfig

# Local execution
model.run(LocalConfig(timeout=3600, command="python run.py"))

# Docker execution
model.run(DockerConfig(image="swan:latest", cpu=4, memory="2g"))

Source code in rompy/model.py
def run(self, backend: BackendConfig, workspace_dir: Optional[str] = None) -> bool:
    """
    Run the model using the specified backend configuration.

    This method uses Pydantic configuration objects that provide type safety
    and validation for all backend parameters.

    Args:
        backend: Pydantic configuration object (LocalConfig, DockerConfig, etc.)
        workspace_dir: Path to generated workspace directory (optional)

    Returns:
        True if execution was successful, False otherwise

    Raises:
        TypeError: If backend is not a BackendConfig instance

    Examples:
        from rompy.backends import LocalConfig, DockerConfig

        # Local execution
        model.run(LocalConfig(timeout=3600, command="python run.py"))

        # Docker execution
        model.run(DockerConfig(image="swan:latest", cpu=4, memory="2g"))
    """
    if not isinstance(backend, BaseBackendConfig):
        raise TypeError(
            f"Backend must be a subclass of BaseBackendConfig, "
            f"got {type(backend).__name__}"
        )

    logger.debug(f"Using backend config: {type(backend).__name__}")

    # Get the backend class directly from the configuration
    backend_class = backend.get_backend_class()
    backend_instance = backend_class()

    # Pass the config object and workspace_dir to the backend
    return backend_instance.run(self, config=backend, workspace_dir=workspace_dir)

postprocess

postprocess(processor: str = 'noop', **kwargs) -> Dict[str, Any]

Postprocess the model outputs using the specified processor.

This method uses entry points to load and execute the appropriate postprocessor. Available processors are automatically discovered from the rompy.postprocess entry point group.

Built-in processors:

- "noop": A placeholder processor that does nothing but returns success

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| processor | str | Name of the postprocessor to use | 'noop' |
| **kwargs | | Additional processor-specific parameters | {} |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Dictionary with results from the postprocessing |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If the specified processor is not available |
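For example (the extra keyword argument below is purely illustrative; real options depend on the processor entry point):

# The built-in no-op processor simply returns a success result
results = run.postprocess(processor="noop")

# Processor-specific options are forwarded as keyword arguments
results = run.postprocess(processor="noop", some_option=True)  # hypothetical option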

Source code in rompy/model.py
def postprocess(self, processor: str = "noop", **kwargs) -> Dict[str, Any]:
    """
    Postprocess the model outputs using the specified processor.

    This method uses entry points to load and execute the appropriate postprocessor.
    Available processors are automatically discovered from the rompy.postprocess entry point group.

    Built-in processors:
    - "noop": A placeholder processor that does nothing but returns success

    Args:
        processor: Name of the postprocessor to use (default: "noop")
        **kwargs: Additional processor-specific parameters

    Returns:
        Dictionary with results from the postprocessing

    Raises:
        ValueError: If the specified processor is not available
    """
    # Get the requested postprocessor class from entry points
    if processor not in POSTPROCESSORS:
        available = list(POSTPROCESSORS.keys())
        raise ValueError(
            f"Unknown postprocessor: {processor}. "
            f"Available processors: {', '.join(available)}"
        )

    # Create an instance and process the outputs
    processor_class = POSTPROCESSORS[processor]
    processor_instance = processor_class()
    return processor_instance.process(self, **kwargs)

pipeline

pipeline(pipeline_backend: str = 'local', **kwargs) -> Dict[str, Any]

Run the complete model pipeline (generate, run, postprocess) using the specified pipeline backend.

This method executes the entire model workflow from input generation through running the model to postprocessing outputs. It uses entry points to load and execute the appropriate pipeline backend from the rompy.pipeline entry point group.

Built-in pipeline backends:

- "local": Execute the complete pipeline locally using the existing ModelRun methods

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pipeline_backend | str | Name of the pipeline backend to use | 'local' |
| **kwargs | | Additional backend-specific parameters. Common parameters include run_backend (backend for the run stage), processor (processor for postprocessing), run_kwargs (additional parameters for the run stage) and process_kwargs (additional parameters for postprocessing). | {} |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Dictionary with results from the pipeline execution |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If the specified pipeline backend is not available |
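For example, running the built-in local pipeline (the processor value follows the postprocess documentation above; other kwargs depend on the installed backend entry points):

# Generate inputs, run the model and postprocess in a single call
results = run.pipeline(
    pipeline_backend="local",
    processor="noop",
)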

Source code in rompy/model.py
def pipeline(self, pipeline_backend: str = "local", **kwargs) -> Dict[str, Any]:
    """
    Run the complete model pipeline (generate, run, postprocess) using the specified pipeline backend.

    This method executes the entire model workflow from input generation through running
    the model to postprocessing outputs. It uses entry points to load and execute the
    appropriate pipeline backend from the rompy.pipeline entry point group.

    Built-in pipeline backends:
    - "local": Execute the complete pipeline locally using the existing ModelRun methods

    Args:
        pipeline_backend: Name of the pipeline backend to use (default: "local")
        **kwargs: Additional backend-specific parameters. Common parameters include:
            - run_backend: Backend to use for the run stage (for local pipeline)
            - processor: Processor to use for postprocessing (for local pipeline)
            - run_kwargs: Additional parameters for the run stage
            - process_kwargs: Additional parameters for postprocessing

    Returns:
        Dictionary with results from the pipeline execution

    Raises:
        ValueError: If the specified pipeline backend is not available
    """
    # Get the requested pipeline backend class from entry points
    if pipeline_backend not in PIPELINE_BACKENDS:
        available = list(PIPELINE_BACKENDS.keys())
        raise ValueError(
            f"Unknown pipeline backend: {pipeline_backend}. "
            f"Available backends: {', '.join(available)}"
        )

    # Create an instance and execute the pipeline
    backend_class = PIPELINE_BACKENDS[pipeline_backend]
    backend_instance = backend_class()
    return backend_instance.execute(self, **kwargs)

BaseConfig

Bases: RompyBaseModel

Base class for model templates.

The template class provides the object that is used to set up the model configuration. When implemented for a given model, it can move along a scale of complexity to suit the application.

In its most basic form, as implemented in this base object, it consists of a path to a cookiecutter template, with the class providing the context for the {{config}} values in that template. Note that any {{runtime}} values are filled from the ModelRun object.

If the template is a git repo, the checkout parameter can be used to specify a branch or tag, and it will be cloned and used.

If the object is callable, it will be called prior to rendering the template. This mechanism can be used to perform tasks such as fetching external data, or providing additional context to the template beyond the arguments provided by the user.
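A sketch of pointing the base config at a git-hosted template (the repository URL and the extra field are placeholders):

from rompy.core.config import BaseConfig

config = BaseConfig(
    template="https://github.com/example/my-model-template",  # placeholder URL
    checkout="v1.0",
    # model_config allows extra fields, so additional template context
    # can be passed straight through as keyword arguments
    physics="default",  # hypothetical extra value exposed to {{config}}
)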

Source code in rompy/core/config.py
class BaseConfig(RompyBaseModel):
    """Base class for model templates.

    The template class provides the object that is used to set up the model configuration.
    When implemented for a given model, can move along a scale of complexity
    to suit the application.

    In its most basic form, as implemented in this base object, it consists of path to a cookiecutter template
    with the class providing the context for the {{config}} values in that template. Note that any
    {{runtime}} values are filled from the ModelRun object.

    If the template is a git repo, the checkout parameter can be used to specify a branch or tag and it
    will be cloned and used.

    If the object is callable, it will be colled prior to rendering the template. This mechanism can be
    used to perform tasks such as fetching exteral data, or providing additional context to the template
    beyond the arguments provided by the user..
    """

    model_type: Literal["base"] = "base"
    template: Optional[str] = Field(
        description="The path to the model template",
        default=DEFAULT_TEMPLATE,
    )
    checkout: Optional[str] = Field(
        description="The git branch to use if the template is a git repo",
        default="main",
    )
    model_config = ConfigDict(extra="allow")

    def __call__(self, *args, **kwargs):
        return self

Attributes

model_type class-attribute instance-attribute

model_type: Literal['base'] = 'base'

template class-attribute instance-attribute

template: Optional[str] = Field(description='The path to the model template', default=DEFAULT_TEMPLATE)

checkout class-attribute instance-attribute

checkout: Optional[str] = Field(description='The git branch to use if the template is a git repo', default='main')

model_config class-attribute instance-attribute

model_config = ConfigDict(extra='allow')

At a high level, ModelRun orchestrates the entire model execution process, including generation, execution, and post-processing, while configuration objects are responsible for defining the model setup.
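Putting the two together, an end-to-end sketch using the run() and postprocess() APIs documented above:

from rompy.backends import LocalConfig
from rompy.core.config import BaseConfig
from rompy.model import ModelRun

run = ModelRun(run_id="demo", config=BaseConfig(), output_dir="./simulations")
run.generate()                                              # stage input files
success = run.run(LocalConfig(timeout=3600, command="python run.py"))
if success:
    results = run.postprocess(processor="noop")             # postprocess outputs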

Core Component Categories

Grid Components

Grids define the spatial domain of models. Rompy provides several grid types:

BaseGrid

Bases: RompyBaseModel

Representation of a grid in geographic space.

This is the base class for all Grid objects. The minimum representation of a grid is two NumPy arrays representing the vertices or nodes of some structured or unstructured grid, its bounding box, and a boundary polygon. No knowledge of the grid connectivity is expected.
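Because x and y are not implemented on the base class, concrete grids subclass BaseGrid and supply the coordinate arrays. A minimal sketch (the PointCloudGrid class is hypothetical):

from typing import Literal

import numpy as np

from rompy.core.grid import BaseGrid


class PointCloudGrid(BaseGrid):
    """Hypothetical unstructured grid backed by explicit node coordinates."""

    grid_type: Literal["pointcloud"] = "pointcloud"
    xs: list[float]
    ys: list[float]

    @property
    def x(self) -> np.ndarray:
        return np.array(self.xs)

    @property
    def y(self) -> np.ndarray:
        return np.array(self.ys)


grid = PointCloudGrid(xs=[0.0, 1.0, 1.0, 0.0], ys=[0.0, 0.0, 1.0, 1.0])
print(grid.bbox(buffer=0.1))  # [-0.1, -0.1, 1.1, 1.1]
print(grid.boundary().wkt)    # convex hull polygon of the four nodes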

Source code in rompy/core/grid.py
class BaseGrid(RompyBaseModel):
    """Representation of a grid in geographic space.

    This is the base class for all Grid objects. The minimum representation of a grid
    is two NumPy arrays representing the vertices or nodes of some structured or
    unstructured grid, its bounding box and a boundary polygon. No knowledge of the
    grid connectivity is expected.

    """

    grid_type: Literal["base"] = "base"

    @property
    def x(self) -> np.ndarray:
        raise NotImplementedError

    @property
    def y(self) -> np.ndarray:
        raise NotImplementedError

    @property
    def minx(self) -> float:
        return np.nanmin(self.x)

    @property
    def maxx(self) -> float:
        return np.nanmax(self.x)

    @property
    def miny(self) -> float:
        return np.nanmin(self.y)

    @property
    def maxy(self) -> float:
        return np.nanmax(self.y)

    def bbox(self, buffer=0.0) -> Bbox:
        """Returns a bounding box for the spatial grid

        This function returns a list [ll_x, ll_y, ur_x, ur_y]
        where ll_x, ll_y (ur_x, ur_y) are the lower left (upper right)
        x and y coordinates bounding box of the model domain

        """
        ll_x = self.minx - buffer
        ll_y = self.miny - buffer
        ur_x = self.maxx + buffer
        ur_y = self.maxy + buffer
        bbox = [ll_x, ll_y, ur_x, ur_y]
        return bbox

    def _get_convex_hull(self, tolerance=0.2) -> Polygon:
        xys = list(zip(self.x.flatten(), self.y.flatten()))
        polygon = MultiPoint(xys).convex_hull
        polygon = polygon.simplify(tolerance=tolerance)
        return polygon

    def boundary(self, tolerance=0.2) -> Polygon:
        """Returns the convex hull boundary polygon from the grid.

        Parameters
        ----------
        tolerance: float
            Simplify polygon shape based on maximum distance from original geometry,
            see https://shapely.readthedocs.io/en/stable/manual.html#object.simplify.

        Returns
        -------
        polygon: shapely.Polygon
            See https://shapely.readthedocs.io/en/stable/manual.html#Polygon

        """
        return self._get_convex_hull(tolerance=tolerance)

    def boundary_points(self, spacing=None, tolerance=0.2) -> tuple:
        """Returns array of coordinates from boundary polygon.

        Parameters
        ----------
        tolerance: float
            Simplify polygon shape based on maximum distance from original geometry,
            see https://shapely.readthedocs.io/en/stable/manual.html#object.simplify.
        spacing: float
            If specified, points are returned evenly spaced along the boundary at the
            specified spacing, otherwise all points are returned.

        Returns:
        --------
        points: tuple
            Tuple of x and y coordinates of the boundary points.

        """
        polygon = self.boundary(tolerance=tolerance)
        if spacing is None:
            xpts, ypts = polygon.exterior.coords.xy
        else:
            perimeter = polygon.length
            if perimeter < spacing:
                raise ValueError(f"Spacing = {spacing} > grid perimeter = {perimeter}")
            npts = int(np.ceil(perimeter / spacing))
            points = [polygon.boundary.interpolate(i * spacing) for i in range(npts)]
            xpts = [point.x for point in points]
            ypts = [point.y for point in points]
        return np.array(xpts), np.array(ypts)

    def _figsize(self, x0, x1, y0, y1, fscale):
        xlen = abs(x1 - x0)
        ylen = abs(y1 - y0)
        if xlen >= ylen:
            figsize = (fscale, fscale * ylen / xlen or fscale)
        else:
            figsize = (fscale * xlen / ylen or fscale, fscale)
        return figsize

    def plot(
        self,
        ax=None,
        figsize=None,
        fscale=10,
        buffer=0.1,
        borders=True,
        land=True,
        coastline=True,
    ):
        """Plot the grid"""

        projection = ccrs.PlateCarree()
        transform = ccrs.PlateCarree()

        # Set some plot parameters:
        x0, y0, x1, y1 = self.bbox(buffer=buffer)

        # create figure and plot/map
        if ax is None:
            if figsize is None:
                figsize = self._figsize(x0, x1, y0, y1, fscale)
            fig = plt.figure(figsize=figsize)
            ax = fig.add_subplot(111, projection=projection)
            ax.set_extent([x0, x1, y0, y1], crs=transform)

            if borders:
                ax.add_feature(cfeature.BORDERS)
            if land:
                ax.add_feature(cfeature.LAND)
            if coastline:
                ax.add_feature(cfeature.COASTLINE)
        else:
            fig = ax.figure

        ax.gridlines(
            crs=transform,
            draw_labels=["left", "bottom"],
            linewidth=1,
            color="gray",
            alpha=0.5,
            linestyle="--",
        )

        # Plot the model domain
        bx, by = self.boundary_points()
        poly = plt.Polygon(list(zip(bx, by)), facecolor="r", alpha=0.05)
        ax.add_patch(poly)
        ax.plot(bx, by, lw=2, color="k")
        return fig, ax

    def __repr__(self):
        return f"{self.__class__.__name__}({self.x}, {self.y})"

    def __eq__(self, other):
        return self.model_dump() == other.model_dump()

Attributes

grid_type class-attribute instance-attribute

grid_type: Literal['base'] = 'base'

x property

x: ndarray

y property

y: ndarray

minx property

minx: float

maxx property

maxx: float

miny property

miny: float

maxy property

maxy: float

Functions

bbox

bbox(buffer=0.0) -> Bbox

Returns a bounding box for the spatial grid

This function returns a list [ll_x, ll_y, ur_x, ur_y] where ll_x, ll_y (ur_x, ur_y) are the lower left (upper right) x and y coordinates bounding box of the model domain

Source code in rompy/core/grid.py
def bbox(self, buffer=0.0) -> Bbox:
    """Returns a bounding box for the spatial grid

    This function returns a list [ll_x, ll_y, ur_x, ur_y]
    where ll_x, ll_y (ur_x, ur_y) are the lower left (upper right)
    x and y coordinates bounding box of the model domain

    """
    ll_x = self.minx - buffer
    ll_y = self.miny - buffer
    ur_x = self.maxx + buffer
    ur_y = self.maxy + buffer
    bbox = [ll_x, ll_y, ur_x, ur_y]
    return bbox

boundary

boundary(tolerance=0.2) -> Polygon

Returns the convex hull boundary polygon from the grid.

Parameters

tolerance: float Simplify polygon shape based on maximum distance from original geometry, see https://shapely.readthedocs.io/en/stable/manual.html#object.simplify.

Returns

polygon: shapely.Polygon See https://shapely.readthedocs.io/en/stable/manual.html#Polygon

Source code in rompy/core/grid.py
def boundary(self, tolerance=0.2) -> Polygon:
    """Returns the convex hull boundary polygon from the grid.

    Parameters
    ----------
    tolerance: float
        Simplify polygon shape based on maximum distance from original geometry,
        see https://shapely.readthedocs.io/en/stable/manual.html#object.simplify.

    Returns
    -------
    polygon: shapely.Polygon
        See https://shapely.readthedocs.io/en/stable/manual.html#Polygon

    """
    return self._get_convex_hull(tolerance=tolerance)

boundary_points

boundary_points(spacing=None, tolerance=0.2) -> tuple

Returns array of coordinates from boundary polygon.

Parameters

tolerance: float Simplify polygon shape based on maximum distance from original geometry, see https://shapely.readthedocs.io/en/stable/manual.html#object.simplify. spacing: float If specified, points are returned evenly spaced along the boundary at the specified spacing, otherwise all points are returned.

Returns:

points: tuple Tuple of x and y coordinates of the boundary points.

Source code in rompy/core/grid.py
def boundary_points(self, spacing=None, tolerance=0.2) -> tuple:
    """Returns array of coordinates from boundary polygon.

    Parameters
    ----------
    tolerance: float
        Simplify polygon shape based on maximum distance from original geometry,
        see https://shapely.readthedocs.io/en/stable/manual.html#object.simplify.
    spacing: float
        If specified, points are returned evenly spaced along the boundary at the
        specified spacing, otherwise all points are returned.

    Returns:
    --------
    points: tuple
        Tuple of x and y coordinates of the boundary points.

    """
    polygon = self.boundary(tolerance=tolerance)
    if spacing is None:
        xpts, ypts = polygon.exterior.coords.xy
    else:
        perimeter = polygon.length
        if perimeter < spacing:
            raise ValueError(f"Spacing = {spacing} > grid perimeter = {perimeter}")
        npts = int(np.ceil(perimeter / spacing))
        points = [polygon.boundary.interpolate(i * spacing) for i in range(npts)]
        xpts = [point.x for point in points]
        ypts = [point.y for point in points]
    return np.array(xpts), np.array(ypts)

plot

plot(ax=None, figsize=None, fscale=10, buffer=0.1, borders=True, land=True, coastline=True)

Plot the grid

Source code in rompy/core/grid.py
def plot(
    self,
    ax=None,
    figsize=None,
    fscale=10,
    buffer=0.1,
    borders=True,
    land=True,
    coastline=True,
):
    """Plot the grid"""

    projection = ccrs.PlateCarree()
    transform = ccrs.PlateCarree()

    # Set some plot parameters:
    x0, y0, x1, y1 = self.bbox(buffer=buffer)

    # create figure and plot/map
    if ax is None:
        if figsize is None:
            figsize = self._figsize(x0, x1, y0, y1, fscale)
        fig = plt.figure(figsize=figsize)
        ax = fig.add_subplot(111, projection=projection)
        ax.set_extent([x0, x1, y0, y1], crs=transform)

        if borders:
            ax.add_feature(cfeature.BORDERS)
        if land:
            ax.add_feature(cfeature.LAND)
        if coastline:
            ax.add_feature(cfeature.COASTLINE)
    else:
        fig = ax.figure

    ax.gridlines(
        crs=transform,
        draw_labels=["left", "bottom"],
        linewidth=1,
        color="gray",
        alpha=0.5,
        linestyle="--",
    )

    # Plot the model domain
    bx, by = self.boundary_points()
    poly = plt.Polygon(list(zip(bx, by)), facecolor="r", alpha=0.05)
    ax.add_patch(poly)
    ax.plot(bx, by, lw=2, color="k")
    return fig, ax

RegularGrid

Bases: BaseGrid

Regular grid in geographic space.

This object provides an abstract representation of a regular grid in some geographic space.
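For example, a 10 x 10 grid with 0.1 degree spacing (the validator below requires all of x0, y0, dx, dy, nx and ny):

from rompy.core.grid import RegularGrid

grid = RegularGrid(x0=115.0, y0=-32.0, dx=0.1, dy=0.1, nx=10, ny=10)
print(grid.x.shape)  # (10, 10) -- 2D coordinate arrays generated on demand
print(grid.xlen)     # ~0.9 -- dx * (nx - 1)
print(grid.bbox())   # approximately [115.0, -32.0, 115.9, -31.1]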

Source code in rompy/core/grid.py
class RegularGrid(BaseGrid):
    """Regular grid in geographic space.

    This object provides an abstract representation of a regular grid in some
    geographic space.

    """

    grid_type: Literal["regular"] = Field(
        "regular", description="Type of grid, must be 'regular'"
    )
    x0: Optional[float] = Field(
        default=None, description="X coordinate of the grid origin"
    )
    y0: Optional[float] = Field(
        default=None, description="Y coordinate of the grid origin"
    )
    rot: Optional[float] = Field(
        0.0, description="Rotation angle of the grid in degrees"
    )
    dx: Optional[float] = Field(
        default=None, description="Spacing between grid points in the x direction"
    )
    dy: Optional[float] = Field(
        default=None, description="Spacing between grid points in the y direction"
    )
    nx: Optional[int] = Field(
        default=None, description="Number of grid points in the x direction"
    )
    ny: Optional[int] = Field(
        default=None, description="Number of grid points in the y direction"
    )

    @model_validator(mode="after")
    def generate(self) -> "RegularGrid":
        """Generate the grid from the provided parameters."""
        keys = ["x0", "y0", "dx", "dy", "nx", "ny"]
        if None in [getattr(self, key) for key in keys]:
            raise ValueError(f"All of {','.join(keys)} must be provided for REG grid")
        # Ensure x, y 2D coordinates are defined
        return self

    @property
    def x(self) -> np.ndarray:
        x, y = self._gen_reg_cgrid()
        return x

    @property
    def y(self) -> np.ndarray:
        x, y = self._gen_reg_cgrid()
        return y

    def _attrs_from_xy(self):
        """Generate regular grid attributes from x, y coordinates."""
        self.ny, self.nx = self.x.shape
        self.x0 = self.x[0, 0]
        self.y0 = self.y[0, 0]
        self.rot = np.degrees(
            np.arctan2(self.y[0, 1] - self.y0, self.x[0, 1] - self.x0)
        )
        self.dx = np.sqrt((self.x[0, 1] - self.x0) ** 2 + (self.y[0, 1] - self.y0) ** 2)
        self.dy = np.sqrt((self.x[1, 0] - self.x0) ** 2 + (self.y[1, 0] - self.y0) ** 2)

    @property
    def xlen(self):
        return self.dx * (self.nx - 1)

    @property
    def ylen(self):
        return self.dy * (self.ny - 1)

    def _gen_reg_cgrid(self):
        # Grid at origin
        i = np.arange(0.0, self.dx * self.nx, self.dx)
        j = np.arange(0.0, self.dy * self.ny, self.dy)
        ii, jj = np.meshgrid(i, j)

        # Rotation
        alpha = -self.rot * np.pi / 180.0
        R = np.array([[np.cos(alpha), -np.sin(alpha)], [np.sin(alpha), np.cos(alpha)]])
        gg = np.dot(np.vstack([ii.ravel(), jj.ravel()]).T, R)

        # Translation
        x = gg[:, 0] + self.x0
        y = gg[:, 1] + self.y0

        x = np.reshape(x, ii.shape)
        y = np.reshape(y, ii.shape)
        return x, y

    def __eq__(self, other) -> bool:
        return (
            (self.nx == other.nx)
            & (self.ny == other.ny)
            & (self.rot == other.rot)
            & (self.x0 == other.x0)
            & (self.y0 == other.y0)
            & (self.dx == other.dx)
            & (self.dy == other.dy)
        )

    def __repr__(self):
        return f"{self.__class__.__name__}({self.nx}, {self.ny})"

    def __str__(self):
        return f"{self.__class__.__name__}({self.nx}, {self.ny})"

Attributes

grid_type class-attribute instance-attribute

grid_type: Literal['regular'] = Field('regular', description="Type of grid, must be 'regular'")

x0 class-attribute instance-attribute

x0: Optional[float] = Field(default=None, description='X coordinate of the grid origin')

y0 class-attribute instance-attribute

y0: Optional[float] = Field(default=None, description='Y coordinate of the grid origin')

rot class-attribute instance-attribute

rot: Optional[float] = Field(0.0, description='Rotation angle of the grid in degrees')

dx class-attribute instance-attribute

dx: Optional[float] = Field(default=None, description='Spacing between grid points in the x direction')

dy class-attribute instance-attribute

dy: Optional[float] = Field(default=None, description='Spacing between grid points in the y direction')

nx class-attribute instance-attribute

nx: Optional[int] = Field(default=None, description='Number of grid points in the x direction')

ny class-attribute instance-attribute

ny: Optional[int] = Field(default=None, description='Number of grid points in the y direction')

x property

x: ndarray

y property

y: ndarray

xlen property

xlen

ylen property

ylen

Functions

generate

generate() -> RegularGrid

Generate the grid from the provided parameters.

Source code in rompy/core/grid.py
@model_validator(mode="after")
def generate(self) -> "RegularGrid":
    """Generate the grid from the provided parameters."""
    keys = ["x0", "y0", "dx", "dy", "nx", "ny"]
    if None in [getattr(self, key) for key in keys]:
        raise ValueError(f"All of {','.join(keys)} must be provided for REG grid")
    # Ensure x, y 2D coordinates are defined
    return self

Data Components

Data objects represent and handle input data for models:

DataBlob

Bases: DataBase

Data source for model ingestion.

Generic data source for files that either need to be copied to the model directory or linked if link is set to True.
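A usage sketch (file paths are illustrative; the destination directory is assumed to exist in copy mode, while link mode creates it):

from rompy.core.data import DataBlob

# Copy a local file into a staging directory under a new name
bathy = DataBlob(source="./data/bathy.nc")
staged = bathy.get("./simulations/test_run", name="bottom.nc")

# Or symlink a large file instead of copying it
winds = DataBlob(source="./data/winds.nc", link=True)
staged_link = winds.get("./simulations/test_run")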

Source code in rompy/core/data.py
class DataBlob(DataBase):
    """Data source for model ingestion.

    Generic data source for files that either need to be copied to the model directory
    or linked if `link` is set to True.

    """

    model_type: Literal["data_blob", "data_link"] = Field(
        default="data_blob",
        description="Model type discriminator",
    )
    source: AnyPath = Field(
        description="URI of the data source, either a local file path or a remote uri",
    )
    link: bool = Field(
        default=False,
        description="Whether to create a symbolic link instead of copying the file",
    )
    _copied: str = PrivateAttr(default=None)

    def get(self, destdir: Union[str, Path], name: str = None, *args, **kwargs) -> Path:
        """Copy or link the data source to a new directory.

        Parameters
        ----------
        destdir : str | Path
            The destination directory to copy or link the data source to.

        Returns
        -------
        Path
            The path to the copied file or created symlink.
        """
        destdir = Path(destdir).resolve()

        if self.link:
            # Create a symbolic link
            if name:
                symlink_path = destdir / name
            else:
                symlink_path = destdir / self.source.name

            # Ensure the destination directory exists
            destdir.mkdir(parents=True, exist_ok=True)

            # Remove existing symlink/file if it exists
            if symlink_path.exists():
                symlink_path.unlink()

            # Compute the relative path from destdir to self.source
            relative_source_path = os.path.relpath(self.source.resolve(), destdir)

            # Create symlink
            os.symlink(relative_source_path, symlink_path)
            self._copied = symlink_path

            return symlink_path
        else:
            # Copy the data source
            if self.source.is_dir():
                # Copy directory
                outfile = copytree(self.source, destdir)
            else:
                if name:
                    outfile = destdir / name
                else:
                    outfile = destdir / self.source.name
                if outfile.resolve() != self.source.resolve():
                    outfile.write_bytes(self.source.read_bytes())
            self._copied = outfile
            return outfile

Attributes

model_type class-attribute instance-attribute

model_type: Literal['data_blob', 'data_link'] = Field(default='data_blob', description='Model type discriminator')

source class-attribute instance-attribute

source: AnyPath = Field(description='URI of the data source, either a local file path or a remote uri')
link: bool = Field(default=False, description='Whether to create a symbolic link instead of copying the file')

Functions

get

get(destdir: Union[str, Path], name: str = None, *args, **kwargs) -> Path

Copy or link the data source to a new directory.

Parameters

destdir : str | Path The destination directory to copy or link the data source to.

Returns

Path The path to the copied file or created symlink.

Source code in rompy/core/data.py
def get(self, destdir: Union[str, Path], name: str = None, *args, **kwargs) -> Path:
    """Copy or link the data source to a new directory.

    Parameters
    ----------
    destdir : str | Path
        The destination directory to copy or link the data source to.

    Returns
    -------
    Path
        The path to the copied file or created symlink.
    """
    destdir = Path(destdir).resolve()

    if self.link:
        # Create a symbolic link
        if name:
            symlink_path = destdir / name
        else:
            symlink_path = destdir / self.source.name

        # Ensure the destination directory exists
        destdir.mkdir(parents=True, exist_ok=True)

        # Remove existing symlink/file if it exists
        if symlink_path.exists():
            symlink_path.unlink()

        # Compute the relative path from destdir to self.source
        relative_source_path = os.path.relpath(self.source.resolve(), destdir)

        # Create symlink
        os.symlink(relative_source_path, symlink_path)
        self._copied = symlink_path

        return symlink_path
    else:
        # Copy the data source
        if self.source.is_dir():
            # Copy directory
            outfile = copytree(self.source, destdir)
        else:
            if name:
                outfile = destdir / name
            else:
                outfile = destdir / self.source.name
            if outfile.resolve() != self.source.resolve():
                outfile.write_bytes(self.source.read_bytes())
        self._copied = outfile
        return outfile

DataGrid

Bases: DataPoint

Data object for gridded source data.

Generic data object for xarray datasets with gridded spatial dimensions.

Note

The fields filter_grid and filter_time trigger updates to the crop filter from the grid and time range objects passed to the get method. This is useful for data sources that are not defined on the same grid as the model grid or the same time range as the model run.
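A hedged sketch, assuming a file-backed source reader is available (the SourceFile class and its uri field are assumptions; the accepted readers are whatever SOURCE_TYPES resolves to in your install):

from rompy.core.data import DataGrid
from rompy.core.source import SourceFile  # assumed reader class

wind = DataGrid(
    source=SourceFile(uri="./data/era5_winds.nc"),
    filter_grid=True,  # crop to the model grid bbox when get() is called
    filter_time=True,  # crop to the model run period
)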

Source code in rompy/core/data.py
class DataGrid(DataPoint):
    """Data object for gridded source data.

    Generic data object for xarray datasets with gridded spatial dimensions

    Note
    ----
    The fields `filter_grid` and `filter_time` trigger updates to the crop filter from
    the grid and time range objects passed to the get method. This is useful for data
    sources that are not defined on the same grid as the model grid or the same time
    range as the model run.

    """

    model_type: Literal["grid"] = Field(
        default="grid",
        description="Model type discriminator",
    )
    source: Union[SOURCE_TYPES] = Field(
        description="Source reader, must return an xarray gridded dataset in the open method",
        discriminator="model_type",
    )

    def _filter_grid(self, grid: GRID_TYPES):
        """Define the filters to use to extract data to this grid"""
        x0, y0, x1, y1 = grid.bbox(buffer=self.buffer)
        self.filter.crop.update(
            {
                self.coords.x: Slice(start=x0, stop=x1),
                self.coords.y: Slice(start=y0, stop=y1),
            }
        )

    def _figsize(self, x0, x1, y0, y1, fscale):
        xlen = abs(x1 - x0)
        ylen = abs(y1 - y0)
        if xlen >= ylen:
            figsize = (fscale, (fscale * ylen / xlen or fscale) * 0.8)
        else:
            figsize = ((fscale * xlen / ylen) * 1.2 or fscale, fscale)
        return figsize

    def plot(
        self,
        param,
        isel={},
        model_grid=None,
        cmap="turbo",
        figsize=None,
        fscale=10,
        borders=True,
        land=True,
        coastline=True,
        **kwargs,
    ):
        """Plot the grid."""

        projection = ccrs.PlateCarree()
        transform = ccrs.PlateCarree()

        # Sanity checks
        try:
            ds = self.ds[param].isel(isel)
        except KeyError as err:
            raise ValueError(f"Parameter {param} not in dataset") from err

        if ds[self.coords.x].size <= 1:
            raise ValueError(f"Cannot plot {param} with only one x coordinate\n\n{ds}")
        if ds[self.coords.y].size <= 1:
            raise ValueError(f"Cannot plot {param} with only one y coordinate\n\n{ds}")

        # Set some plot parameters:
        x0 = ds[self.coords.x].values[0]
        y0 = ds[self.coords.y].values[0]
        x1 = ds[self.coords.x].values[-1]
        y1 = ds[self.coords.y].values[-1]

        # create figure and plot/map
        if figsize is None:
            figsize = self._figsize(x0, x1, y0, y1, fscale)
        fig = plt.figure(figsize=figsize)
        ax = fig.add_subplot(111, projection=projection)

        ds.plot.pcolormesh(ax=ax, cmap=cmap, **kwargs)

        if borders:
            ax.add_feature(cfeature.BORDERS)
        if land:
            ax.add_feature(cfeature.LAND, zorder=1)
        if coastline:
            ax.add_feature(cfeature.COASTLINE)

        ax.gridlines(
            crs=transform,
            draw_labels=["left", "bottom"],
            linewidth=1,
            color="gray",
            alpha=0.5,
            linestyle="--",
        )

        # Plot the model domain
        if model_grid:
            bx, by = model_grid.boundary_points()
            poly = plt.Polygon(list(zip(bx, by)), facecolor="r", alpha=0.05)
            ax.add_patch(poly)
            ax.plot(bx, by, lw=2, color="k")
        return fig, ax

Attributes

model_type class-attribute instance-attribute

model_type: Literal['grid'] = Field(default='grid', description='Model type discriminator')

source class-attribute instance-attribute

source: Union[SOURCE_TYPES] = Field(description='Source reader, must return an xarray gridded dataset in the open method', discriminator='model_type')

Functions

plot

plot(param, isel={}, model_grid=None, cmap='turbo', figsize=None, fscale=10, borders=True, land=True, coastline=True, **kwargs)

Plot the grid.

Source code in rompy/core/data.py
def plot(
    self,
    param,
    isel={},
    model_grid=None,
    cmap="turbo",
    figsize=None,
    fscale=10,
    borders=True,
    land=True,
    coastline=True,
    **kwargs,
):
    """Plot the grid."""

    projection = ccrs.PlateCarree()
    transform = ccrs.PlateCarree()

    # Sanity checks
    try:
        ds = self.ds[param].isel(isel)
    except KeyError as err:
        raise ValueError(f"Parameter {param} not in dataset") from err

    if ds[self.coords.x].size <= 1:
        raise ValueError(f"Cannot plot {param} with only one x coordinate\n\n{ds}")
    if ds[self.coords.y].size <= 1:
        raise ValueError(f"Cannot plot {param} with only one y coordinate\n\n{ds}")

    # Set some plot parameters:
    x0 = ds[self.coords.x].values[0]
    y0 = ds[self.coords.y].values[0]
    x1 = ds[self.coords.x].values[-1]
    y1 = ds[self.coords.y].values[-1]

    # create figure and plot/map
    if figsize is None:
        figsize = self._figsize(x0, x1, y0, y1, fscale)
    fig = plt.figure(figsize=figsize)
    ax = fig.add_subplot(111, projection=projection)

    ds.plot.pcolormesh(ax=ax, cmap=cmap, **kwargs)

    if borders:
        ax.add_feature(cfeature.BORDERS)
    if land:
        ax.add_feature(cfeature.LAND, zorder=1)
    if coastline:
        ax.add_feature(cfeature.COASTLINE)

    ax.gridlines(
        crs=transform,
        draw_labels=["left", "bottom"],
        linewidth=1,
        color="gray",
        alpha=0.5,
        linestyle="--",
    )

    # Plot the model domain
    if model_grid:
        bx, by = model_grid.boundary_points()
        poly = plt.Polygon(list(zip(bx, by)), facecolor="r", alpha=0.05)
        ax.add_patch(poly)
        ax.plot(bx, by, lw=2, color="k")
    return fig, ax
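
As a hedged usage sketch (the file path and variable name below are assumptions, not taken from the rompy docs), a DataGrid can wrap a file-backed source and be plotted directly:

from rompy.core.data import DataGrid
from rompy.core.source import SourceFile

# Hypothetical NetCDF file containing a gridded "hs" variable
data = DataGrid(source=SourceFile(uri="./era5_subset.nc"))

# Plot the first time step; extra kwargs are passed through to pcolormesh
fig, ax = data.plot("hs", isel={"time": 0}, fscale=8)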

SourceBase

Bases: RompyBaseModel, ABC

Abstract base class for a source dataset.

Source code in rompy/core/source.py
class SourceBase(RompyBaseModel, ABC):
    """Abstract base class for a source dataset."""

    model_type: Literal["base_source"] = Field(
        description="Model type discriminator, must be overriden by a subclass",
    )

    @abstractmethod
    def _open(self) -> xr.Dataset:
        """This abstract private method should return a xarray dataset object."""
        pass

    @cached_property
    def coordinates(self) -> xr.Dataset:
        """Return the coordinates of the datasource."""
        return self.open().coords

    def open(self, variables: list = [], filters: Filter = {}, **kwargs) -> xr.Dataset:
        """Return the filtered dataset object.

        Parameters
        ----------
        variables : list, optional
            List of variables to select from the dataset.
        filters : Filter, optional
            Filters to apply to the dataset.

        Notes
        -----
        The kwargs are only a placeholder in case a subclass needs to pass additional
        arguments to the open method.

        """
        ds = self._open()
        if variables:
            try:
                ds = ds[variables]
            except KeyError as e:
                dataset_variables = list(ds.data_vars.keys())
                missing_variables = list(set(variables) - set(dataset_variables))
                raise ValueError(
                    f"Cannot find requested variables in dataset.\n\n"
                    f"Requested variables in the Data object: {variables}\n"
                    f"Available variables in source dataset: {dataset_variables}\n"
                    f"Missing variables: {missing_variables}\n\n"
                    f"Please check:\n"
                    f"1. The variable names in your Data object, make sure you check for default values\n"
                    f"2. The data source contains the expected variables\n"
                    f"3. If using a custom data source, ensure it creates variables with the correct names"
                ) from e
        if filters:
            ds = filters(ds)
        return ds

Attributes

model_type class-attribute instance-attribute

model_type: Literal['base_source'] = Field(description='Model type discriminator, must be overridden by a subclass')

coordinates cached property

coordinates: Dataset

Return the coordinates of the datasource.

Functions

open

open(variables: list = [], filters: Filter = {}, **kwargs) -> Dataset

Return the filtered dataset object.

Parameters

variables : list, optional
    List of variables to select from the dataset.
filters : Filter, optional
    Filters to apply to the dataset.

Notes

The kwargs are only a placeholder in case a subclass needs to pass additional arguments to the open method.

Source code in rompy/core/source.py
def open(self, variables: list = [], filters: Filter = {}, **kwargs) -> xr.Dataset:
    """Return the filtered dataset object.

    Parameters
    ----------
    variables : list, optional
        List of variables to select from the dataset.
    filters : Filter, optional
        Filters to apply to the dataset.

    Notes
    -----
    The kwargs are only a placeholder in case a subclass needs to pass additional
    arguments to the open method.

    """
    ds = self._open()
    if variables:
        try:
            ds = ds[variables]
        except KeyError as e:
            dataset_variables = list(ds.data_vars.keys())
            missing_variables = list(set(variables) - set(dataset_variables))
            raise ValueError(
                f"Cannot find requested variables in dataset.\n\n"
                f"Requested variables in the Data object: {variables}\n"
                f"Available variables in source dataset: {dataset_variables}\n"
                f"Missing variables: {missing_variables}\n\n"
                f"Please check:\n"
                f"1. The variable names in your Data object, make sure you check for default values\n"
                f"2. The data source contains the expected variables\n"
                f"3. If using a custom data source, ensure it creates variables with the correct names"
            ) from e
    if filters:
        ds = filters(ds)
    return ds
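
Since _open is the only abstract member, implementing a new source is lightweight. A minimal sketch, assuming Pydantic-style field overrides as used throughout rompy (the class and variable names are hypothetical):

import numpy as np
import xarray as xr
from typing import Literal

from rompy.core.source import SourceBase


class SourceConstant(SourceBase):
    """Hypothetical source that builds a small gridded dataset in memory."""

    model_type: Literal["constant"] = "constant"
    value: float = 1.0

    def _open(self) -> xr.Dataset:
        # Return a tiny 3x3 grid filled with a constant value
        return xr.Dataset(
            {"hs": (("lat", "lon"), np.full((3, 3), self.value))},
            coords={"lat": [0.0, 1.0, 2.0], "lon": [10.0, 11.0, 12.0]},
        )


ds = SourceConstant(value=2.5).open(variables=["hs"])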

SourceFile

Bases: SourceBase

Source dataset from file to open with xarray.open_dataset.

Source code in rompy/core/source.py
class SourceFile(SourceBase):
    """Source dataset from file to open with xarray.open_dataset."""

    model_type: Literal["file"] = Field(
        default="file",
        description="Model type discriminator",
    )
    uri: Union[str, Path] = Field(description="Path to the dataset")
    kwargs: dict = Field(
        default={},
        description="Keyword arguments to pass to xarray.open_dataset",
    )

    variable: Optional[str] = Field(
        default=None,
        description="Variable to select from the dataset",
    )

    # Enable arbitrary types for Path objects
    model_config = ConfigDict(arbitrary_types_allowed=True)

    def __str__(self) -> str:
        return f"SourceFile(uri={self.uri})"

    def _open(self) -> Union[xr.Dataset, xr.DataArray]:
        # Handle Path objects by using str() to ensure compatibility
        uri_str = str(self.uri) if isinstance(self.uri, Path) else self.uri
        if self.variable:
            # If a variable is specified, open the dataset and select the variable
            return xr.open_dataset(uri_str, **self.kwargs)[self.variable]
        else:
            return xr.open_dataset(uri_str, **self.kwargs)

Attributes

model_type class-attribute instance-attribute

model_type: Literal['file'] = Field(default='file', description='Model type discriminator')

uri class-attribute instance-attribute

uri: Union[str, Path] = Field(description='Path to the dataset')

kwargs class-attribute instance-attribute

kwargs: dict = Field(default={}, description='Keyword arguments to pass to xarray.open_dataset')

variable class-attribute instance-attribute

variable: Optional[str] = Field(default=None, description='Variable to select from the dataset')

model_config class-attribute instance-attribute

model_config = ConfigDict(arbitrary_types_allowed=True)
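
A short usage sketch (the file and variable names are illustrative assumptions):

from rompy.core.source import SourceFile

source = SourceFile(uri="./gebco_bathy.nc", kwargs={"engine": "netcdf4"})
ds = source.open(variables=["elevation"])  # assumed variable name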

SourceIntake

Bases: SourceBase

Source dataset from intake catalog.

Note

The intake catalog can be prescribed either by the URI of an existing catalog file or by a YAML string defining the catalog. The YAML string can be obtained from calling the yaml() method on an intake dataset instance.

Source code in rompy/core/source.py
class SourceIntake(SourceBase):
    """Source dataset from intake catalog.

    note
    ----
    The intake catalog can be prescribed either by the URI of an existing catalog file
    or by a YAML string defining the catalog. The YAML string can be obtained from
    calling the `yaml()` method on an intake dataset instance.

    """

    model_type: Literal["intake"] = Field(
        default="intake",
        description="Model type discriminator",
    )
    dataset_id: str = Field(description="The id of the dataset to read in the catalog")
    catalog_uri: Optional[str | Path] = Field(
        default=None,
        description="The URI of the catalog to read from",
    )
    catalog_yaml: Optional[str] = Field(
        default=None,
        description="The YAML string of the catalog to read from",
    )
    kwargs: dict = Field(
        default={},
        description="Keyword arguments to define intake dataset parameters",
    )

    @model_validator(mode="after")
    def check_catalog(self) -> "SourceIntake":
        if self.catalog_uri is None and self.catalog_yaml is None:
            raise ValueError("Either catalog_uri or catalog_yaml must be provided")
        elif self.catalog_uri is not None and self.catalog_yaml is not None:
            raise ValueError("Only one of catalog_uri or catalog_yaml can be provided")
        return self

    def __str__(self) -> str:
        return f"SourceIntake(catalog_uri={self.catalog_uri}, dataset_id={self.dataset_id})"

    @property
    def catalog(self) -> Catalog:
        """The intake catalog instance."""
        if self.catalog_uri:
            return intake.open_catalog(self.catalog_uri)
        else:
            fs = fsspec.filesystem("memory")
            fs_map = fs.get_mapper()
            fs_map["/temp.yaml"] = self.catalog_yaml.encode("utf-8")
            return YAMLFileCatalog("temp.yaml", fs=fs)

    def _open(self) -> xr.Dataset:
        return self.catalog[self.dataset_id](**self.kwargs).to_dask()

Attributes

model_type class-attribute instance-attribute

model_type: Literal['intake'] = Field(default='intake', description='Model type discriminator')

dataset_id class-attribute instance-attribute

dataset_id: str = Field(description='The id of the dataset to read in the catalog')

catalog_uri class-attribute instance-attribute

catalog_uri: Optional[str | Path] = Field(default=None, description='The URI of the catalog to read from')

catalog_yaml class-attribute instance-attribute

catalog_yaml: Optional[str] = Field(default=None, description='The YAML string of the catalog to read from')

kwargs class-attribute instance-attribute

kwargs: dict = Field(default={}, description='Keyword arguments to define intake dataset parameters')

catalog property

catalog: Catalog

The intake catalog instance.

Functions

check_catalog

check_catalog() -> SourceIntake
Source code in rompy/core/source.py
@model_validator(mode="after")
def check_catalog(self) -> "SourceIntake":
    if self.catalog_uri is None and self.catalog_yaml is None:
        raise ValueError("Either catalog_uri or catalog_yaml must be provided")
    elif self.catalog_uri is not None and self.catalog_yaml is not None:
        raise ValueError("Only one of catalog_uri or catalog_yaml can be provided")
    return self
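
The validator above enforces that exactly one of the two catalog inputs is given. A construction sketch (the catalog path and dataset id are hypothetical):

from rompy.core.source import SourceIntake

# From an existing catalog file
source = SourceIntake(dataset_id="era5_wind", catalog_uri="./catalog.yaml")

# Supplying neither (or both) of catalog_uri / catalog_yaml raises a
# validation error at construction time
ds = source.open()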

Boundary Components

Boundary conditions specify model forcing at domain edges:

BoundaryWaveStation

Bases: DataBoundary

Wave boundary data from station datasets.

Note

The tolerance behaves differently with sel_methods idw and nearest; in idw, sites without enough neighbours within tolerance are masked, whereas in nearest an exception is raised (see the wavespectra documentation for more details).

Note

Be aware that when using idw, missing values will be returned for sites with fewer than 2 neighbours within tolerance in the original dataset. This is fine for land-masked areas but could cause issues for sites on an open boundary. To avoid this, either use nearest or increase tolerance to include more neighbours.

Source code in rompy/core/boundary.py
class BoundaryWaveStation(DataBoundary):
    """Wave boundary data from station datasets.

    Note
    ----
    The `tolerance` behaves differently with sel_methods `idw` and `nearest`; in
    `idw`, sites without enough neighbours within `tolerance` are masked, whereas
    in `nearest` an exception is raised (see the wavespectra documentation for
    more details).

    Note
    ----
    Be aware that when using `idw`, missing values will be returned for sites with
    fewer than 2 neighbours within `tolerance` in the original dataset. This is fine
    for land-masked areas but could cause issues for sites on an open boundary. To
    avoid this, either use `nearest` or increase `tolerance` to include more
    neighbours.

    """

    grid_type: Literal["boundary_wave_station"] = Field(
        default="boundary_wave_station",
        description="Model type discriminator",
    )
    source: Union[SOURCE_TYPES] = Field(
        description=(
            "Dataset source reader, must return a wavespectra-enabled "
            "xarray dataset in the open method"
        ),
        discriminator="model_type",
    )
    sel_method: Literal["idw", "nearest"] = Field(
        default="idw",
        description=(
            "Wavespectra method to use for selecting boundary points from the dataset"
        ),
    )
    buffer: float = Field(
        default=2.0,
        description="Space to buffer the grid bounding box if `filter_grid` is True",
    )

    def model_post_init(self, __context):
        self.variables = ["efth", "lon", "lat"]

    # @model_validator(mode="after")
    # def assert_has_wavespectra_accessor(self) -> "BoundaryWaveStation":
    #     dset = self.source.open()
    #     if not hasattr(dset, "spec"):
    #         raise ValueError(f"Wavespectra compatible source is required")
    #     return self

    def _source_grid_spacing(self, grid) -> float:
        """Return the lowest spacing between points in the source dataset."""
        # Select dataset points just outside the actual grid to optimise the search
        xbnd, ybnd = grid.boundary().exterior.coords.xy
        dx = np.diff(xbnd).min()
        dy = np.diff(ybnd).min()
        buffer = 2 * min(dx, dy)
        x0, y0, x1, y1 = grid.bbox(buffer=buffer)
        ds = self.ds.spec.sel([x0, x1], [y0, y1], method="bbox")
        # Return the closest distance between adjacent points in cropped dataset
        points = list(zip(ds.lon.values, ds.lat.values))
        return find_minimum_distance(points)

    def _set_spacing(self, grid) -> float:
        """Define spacing from the parent dataset if required."""
        if self.spacing == "parent":
            return self._source_grid_spacing(grid)
        else:
            return self.spacing

    def _boundary_points(self, grid) -> tuple:
        """Returns the x and y arrays representing the boundary points to select.

        Override the default method to use grid when setting the default spacing.

        """
        xbnd, ybnd = grid.boundary_points(spacing=self._set_spacing(grid))
        return xbnd, ybnd

    def _sel_boundary(self, grid) -> xr.Dataset:
        """Select the boundary points from the dataset."""
        xbnd, ybnd = self._boundary_points(grid=grid)
        ds = self.ds.spec.sel(
            lons=xbnd,
            lats=ybnd,
            method=self.sel_method,
            **self.sel_method_kwargs,
        )
        return ds

    @property
    def ds(self):
        """Return the filtered xarray dataset instance."""
        dset = super().ds
        if dset.efth.size == 0:
            raise ValueError(f"Empty dataset after applying filter {self.filter}")
        return dset

    def get(
        self, destdir: str | Path, grid: RegularGrid, time: Optional[TimeRange] = None
    ) -> str:
        """Write the selected boundary data to a netcdf file.

        Parameters
        ----------
        destdir : str | Path
            Destination directory for the netcdf file.
        grid : RegularGrid
            Grid instance to use for selecting the boundary points.
        time: TimeRange, optional
            The times to filter the data to, only used if `self.crop_data` is True.

        Returns
        -------
        outfile : Path
            Path to the netcdf file.

        """
        if self.crop_data:
            if time is not None:
                self._filter_time(time)
            if grid is not None:
                self._filter_grid(grid)
        ds = self._sel_boundary(grid)
        outfile = Path(destdir) / f"{self.id}.nc"
        ds.spec.to_netcdf(outfile)
        return outfile

Attributes

grid_type class-attribute instance-attribute

grid_type: Literal['boundary_wave_station'] = Field(default='boundary_wave_station', description='Model type discriminator')

source class-attribute instance-attribute

source: Union[SOURCE_TYPES] = Field(description='Dataset source reader, must return a wavespectra-enabled xarray dataset in the open method', discriminator='model_type')

sel_method class-attribute instance-attribute

sel_method: Literal['idw', 'nearest'] = Field(default='idw', description='Wavespectra method to use for selecting boundary points from the dataset')

buffer class-attribute instance-attribute

buffer: float = Field(default=2.0, description='Space to buffer the grid bounding box if `filter_grid` is True')

ds property

ds

Return the filtered xarray dataset instance.

Functions

model_post_init

model_post_init(__context)
Source code in rompy/core/boundary.py
def model_post_init(self, __context):
    self.variables = ["efth", "lon", "lat"]

get

get(destdir: str | Path, grid: RegularGrid, time: Optional[TimeRange] = None) -> str

Write the selected boundary data to a netcdf file.

Parameters

destdir : str | Path
    Destination directory for the netcdf file.
grid : RegularGrid
    Grid instance to use for selecting the boundary points.
time : TimeRange, optional
    The times to filter the data to, only used if self.crop_data is True.

Returns

outfile : Path
    Path to the netcdf file.

Source code in rompy/core/boundary.py
def get(
    self, destdir: str | Path, grid: RegularGrid, time: Optional[TimeRange] = None
) -> str:
    """Write the selected boundary data to a netcdf file.

    Parameters
    ----------
    destdir : str | Path
        Destination directory for the netcdf file.
    grid : RegularGrid
        Grid instance to use for selecting the boundary points.
    time: TimeRange, optional
        The times to filter the data to, only used if `self.crop_data` is True.

    Returns
    -------
    outfile : Path
        Path to the netcdf file.

    """
    if self.crop_data:
        if time is not None:
            self._filter_time(time)
        if grid is not None:
            self._filter_grid(grid)
    ds = self._sel_boundary(grid)
    outfile = Path(destdir) / f"{self.id}.nc"
    ds.spec.to_netcdf(outfile)
    return outfile
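
Putting the pieces together, a hedged sketch of writing boundary spectra (the file path and reader are illustrative, and grid and period are assumed to be a RegularGrid and TimeRange constructed elsewhere):

from rompy.core.boundary import BoundaryWaveStation
from rompy.core.source import SourceWavespectra

bnd = BoundaryWaveStation(
    source=SourceWavespectra(uri="./boundary_spectra.nc", reader="read_ncswan"),
    sel_method="nearest",  # raises instead of masking if no neighbours in range
)
# Writes <id>.nc into the staging directory and returns its path
outfile = bnd.get(destdir="./staging", grid=grid, time=period)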

SourceWavespectra

Bases: SourceBase

Wavespectra dataset from wavespectra reader.

Source code in rompy/core/source.py
class SourceWavespectra(SourceBase):
    """Wavespectra dataset from wavespectra reader."""

    model_type: Literal["wavespectra"] = Field(
        default="wavespectra",
        description="Model type discriminator",
    )
    uri: str | Path = Field(description="Path to the dataset")
    reader: str = Field(
        description="Name of the wavespectra reader to use, e.g., read_swan",
    )
    kwargs: dict = Field(
        default={},
        description="Keyword arguments to pass to the wavespectra reader",
    )

    def __str__(self) -> str:
        return f"SourceWavespectra(uri={self.uri}, reader={self.reader})"

    def _open(self):
        return getattr(wavespectra, self.reader)(self.uri, **self.kwargs)

Attributes

model_type class-attribute instance-attribute

model_type: Literal['wavespectra'] = Field(default='wavespectra', description='Model type discriminator')

uri class-attribute instance-attribute

uri: str | Path = Field(description='Path to the dataset')

reader class-attribute instance-attribute

reader: str = Field(description='Name of the wavespectra reader to use, e.g., read_swan')

kwargs class-attribute instance-attribute

kwargs: dict = Field(default={}, description='Keyword arguments to pass to the wavespectra reader')
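
For example (the reader name comes from the field description above; the file path is hypothetical):

from rompy.core.source import SourceWavespectra

source = SourceWavespectra(uri="./boundary.spec", reader="read_swan")
dset = source.open()  # wavespectra-enabled xarray dataset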

Spectrum Components

Spectral representations for wave models:

LogFrequency

Bases: RompyBaseModel

Logarithmic wave frequencies.

Frequencies are defined according to:

$f_{i+1} = \gamma \, f_{i}$

Note

The number of frequency bins nbin is always kept unchanged when provided. This implies other parameters may be adjusted so nbin bins can be defined. Specify f0, f1 and finc and let nbin be calculated to avoid those values changing.

Note

Choose finc=0.1 for a 10% increment between frequencies that satisfies the DIA.

Examples

from rompy.core.spectrum import LogFrequency

LogFrequency(f0=0.04, f1=1.0, nbin=34)
LogFrequency(f0=0.04, f1=1.0, finc=0.1)
LogFrequency(f0=0.04, nbin=34, finc=0.1)
LogFrequency(f1=1.0, nbin=34, finc=0.1)
Source code in rompy/core/spectrum.py
class LogFrequency(RompyBaseModel):
    """Logarithmic wave frequencies.

    Frequencies are defined according to:

    :math:`f_{i+1} = \gamma * f_{i}`

    Note
    ----
    The number of frequency bins `nbin` is always kept unchanged when provided. This
    implies other parameters may be adjusted so `nbin` bins can be defined. Specify
    `f0`, `f1` and `finc` and let `nbin` be calculated to avoid those values changing.

    Note
    ----
    Choose `finc=0.1` for a 10% increment between frequencies that satisfies the DIA.

    Examples
    --------

    .. ipython:: python
        :okwarning:

        from rompy.core.spectrum import LogFrequency

        LogFrequency(f0=0.04, f1=1.0, nbin=34)
        LogFrequency(f0=0.04, f1=1.0, finc=0.1)
        LogFrequency(f0=0.04, nbin=34, finc=0.1)
        LogFrequency(f1=1.0, nbin=34, finc=0.1)

    """

    model_type: Literal["log", "LOG"] = Field(
        default="log", description="Model type discriminator"
    )
    f0: Optional[float] = Field(
        default=None, description="Lower frequency boundary (Hz)", gt=0.0
    )
    f1: Optional[float] = Field(
        default=None, description="Upper frequency boundary (Hz)"
    )
    finc: Optional[float] = Field(
        default=None, description="Log frequency increment", gt=0.0
    )
    nbin: Optional[int] = Field(
        default=None,
        description="Number of frequency bins, one less the size of frequency array",
        gt=0,
    )

    @model_validator(mode="after")
    def init_options(self) -> "LogFrequency":
        """Set the missing frequency parameters."""
        if sum([v is not None for v in [self.f0, self.f1, self.finc, self.nbin]]) != 3:
            raise ValueError("Three (only) of (f0, f1, finc, nbin) must be provided")

        # Calculate the missing frequency parameters
        if self.finc is None:
            self.finc = self._finc()
        elif self.nbin is None:
            self.nbin = self._nbin(self.f0, self.f1, self.finc)
        elif self.f1 is None:
            self.f1 = self.f0 * self.gamma**self.nbin
        else:
            self.f0 = self._f0(self.f1, self.nbin, self.gamma)

        # Redefine parameters based on the calculated values
        self.f0 = self()[0]
        self.f1 = self()[-1]
        self.finc = self._finc()
        self.nbin = len(self()) - 1

        return self

    def __call__(self) -> Np1DArray:
        """Frequency array."""
        return np.geomspace(self.f0, self.f1, self.nf)

    def __getitem__(self, index) -> float | list[float]:
        """Slicing from the frequency array."""
        return self.__call__()[index]

    def __len__(self):
        """Returns the length of the frequency array."""
        return len(self())

    def _finc(self):
        return (self()[1] - self()[0]) / self()[0]

    def _nbin(self, f0, f1, finc):
        return np.round(np.log(f1 / f0) / np.log(1 + finc)).astype("int")

    def _f0(self, f1, nbin, gamma):
        """Returns f0 given f1, nbin and gamma."""
        freqs = [f1]
        for n in range(nbin):
            freqs.append(freqs[-1] / gamma)
        return freqs[-1]

    @property
    def nf(self):
        return self.nbin + 1

    @property
    def gamma(self):
        return self.finc + 1

    @property
    def flen(self):
        return self.f1 - self.f0

Attributes

model_type class-attribute instance-attribute

model_type: Literal['log', 'LOG'] = Field(default='log', description='Model type discriminator')

f0 class-attribute instance-attribute

f0: Optional[float] = Field(default=None, description='Lower frequency boundary (Hz)', gt=0.0)

f1 class-attribute instance-attribute

f1: Optional[float] = Field(default=None, description='Upper frequency boundary (Hz)')

finc class-attribute instance-attribute

finc: Optional[float] = Field(default=None, description='Log frequency increment', gt=0.0)

nbin class-attribute instance-attribute

nbin: Optional[int] = Field(default=None, description='Number of frequency bins, one less than the size of the frequency array', gt=0)

nf property

nf

gamma property

gamma

flen property

flen

Functions

init_options

init_options() -> LogFrequency

Set the missing frequency parameters.

Source code in rompy/core/spectrum.py
@model_validator(mode="after")
def init_options(self) -> "LogFrequency":
    """Set the missing frequency parameters."""
    if sum([v is not None for v in [self.f0, self.f1, self.finc, self.nbin]]) != 3:
        raise ValueError("Three (only) of (f0, f1, finc, nbin) must be provided")

    # Calculate the missing frequency parameters
    if self.finc is None:
        self.finc = self._finc()
    elif self.nbin is None:
        self.nbin = self._nbin(self.f0, self.f1, self.finc)
    elif self.f1 is None:
        self.f1 = self.f0 * self.gamma**self.nbin
    else:
        self.f0 = self._f0(self.f1, self.nbin, self.gamma)

    # Redefine parameters based on the calculated values
    self.f0 = self()[0]
    self.f1 = self()[-1]
    self.finc = self._finc()
    self.nbin = len(self()) - 1

    return self
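
The bookkeeping performed by this validator can be checked numerically: with gamma = 1 + finc, consecutive frequencies keep a constant ratio. A small sketch:

import numpy as np

from rompy.core.spectrum import LogFrequency

freqs = LogFrequency(f0=0.04, f1=1.0, finc=0.1)
print(freqs.nbin)  # nbin is derived so that f1 ≈ f0 * gamma**nbin (34 here)
ratios = np.diff(freqs()) / freqs()[:-1]
assert np.allclose(ratios, freqs.finc)  # constant log increment across bins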

Architecture Patterns

Configuration Validation

All Rompy configurations use Pydantic models, providing type safety and validation (see the sketch after this list):

  • Automatic validation of configuration parameters
  • Clear error messages for invalid configurations
  • Serialization/deserialization capabilities for reproducibility
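
A generic sketch of what this buys you, using plain Pydantic rather than any specific rompy class (the config fields are hypothetical):

from pydantic import BaseModel, Field, ValidationError


class GridConfig(BaseModel):
    """Hypothetical config fragment for illustration."""

    nx: int = Field(gt=0, description="Number of grid points in x")
    dx: float = Field(gt=0.0, description="Grid spacing in x")


try:
    GridConfig(nx=-5, dx=0.25)
except ValidationError as err:
    print(err)  # field-level message pointing at nx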

Plugin Architecture

Rompy's plugin system allows for extensibility (a discovery sketch follows this list):

  • Model configurations via rompy.config entry points
  • Execution backends via rompy.run entry points
  • Post-processors via rompy.postprocess entry points
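
Entry points in these groups can be inspected with the standard library; a sketch using the group names listed above (requires Python 3.10+ for the group keyword):

from importlib.metadata import entry_points

# Discover registered execution backends
for ep in entry_points(group="rompy.run"):
    print(ep.name, "->", ep.value)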

Backend Abstraction

Execution backends abstract the computational environment (a hypothetical interface sketch follows this list):

  • Local execution for development
  • Docker execution for containerized workflows
  • HPC execution for high-performance computing
  • Cloud execution for scalable computing
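
One common shape for such an abstraction is a small interface that each backend implements. The names below are hypothetical, for illustration only, and not rompy's actual API:

from abc import ABC, abstractmethod


class RunBackend(ABC):
    """Hypothetical execution backend interface."""

    @abstractmethod
    def run(self, model_run) -> bool:
        """Execute the staged model and report success."""


class LocalBackend(RunBackend):
    def run(self, model_run) -> bool:
        # e.g. invoke the model executable inside model_run.staging_dir
        return True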

Best Practices

Configuration Design

  1. Use Type Safety: Leverage Pydantic models for configuration validation
  2. Modular Configuration: Keep components modular and reusable
  3. Serialization: Ensure configurations are fully serializable for reproducibility (see the round-trip sketch after this list)
  4. Documentation: Document configuration options and default values
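
A round-trip sketch using Pydantic v2's JSON helpers (the config class is hypothetical; any Pydantic-based rompy model supports the same calls):

from pydantic import BaseModel


class RunFragment(BaseModel):
    """Hypothetical fragment used to demonstrate round-tripping."""

    run_id: str = "test"
    timestep: float = 60.0


cfg = RunFragment(timestep=30.0)
payload = cfg.model_dump_json()               # serialize for storage/sharing
restored = RunFragment.model_validate_json(payload)
assert restored == cfg                        # identical config, reproducible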

Model Integration

  1. Template-based Generation: Use cookiecutter templates for model input generation
  2. Environment Agnostic: Design models to run in different computational environments
  3. Data Abstraction: Abstract data sources to support multiple input formats

Next Steps