Skip to content

Source

source

Rompy source objects.

Attributes

logger module-attribute

logger = get_logger(__name__)

Classes

SourceBase

Bases: RompyBaseModel, ABC

Abstract base class for a source dataset.

Source code in rompy/core/source.py
class SourceBase(RompyBaseModel, ABC):
    """Abstract base class for a source dataset.

    Subclasses must implement `_open` to return the underlying xarray dataset;
    `open` then applies optional variable selection and filters on top of it.
    """

    model_type: Literal["base_source"] = Field(
        description="Model type discriminator, must be overridden by a subclass",
    )

    @abstractmethod
    def _open(self) -> xr.Dataset:
        """Return the xarray dataset object for this source."""

    @cached_property
    def coordinates(self) -> "xr.Coordinates":
        """Return the coordinates of the datasource."""
        # Note: returns the dataset's `.coords` mapping, not a full Dataset.
        return self.open().coords

    def open(
        self,
        variables: Optional[list] = None,
        filters: Optional[Filter] = None,
        **kwargs,
    ) -> xr.Dataset:
        """Return the filtered dataset object.

        Parameters
        ----------
        variables : list, optional
            List of variables to select from the dataset.
        filters : Filter, optional
            Filters to apply to the dataset.

        Notes
        -----
        The kwargs are only a placeholder in case a subclass needs to pass additional
        arguments to the open method.

        """
        # None defaults avoid the shared-mutable-default pitfall; falsy values
        # (None, [], {}) behave identically to the previous defaults.
        ds = self._open()
        if variables:
            try:
                ds = ds[variables]
            except KeyError as e:
                # Build a detailed error showing requested vs available variables
                dataset_variables = list(ds.data_vars.keys())
                missing_variables = list(set(variables) - set(dataset_variables))
                raise ValueError(
                    f"Cannot find requested variables in dataset.\n\n"
                    f"Requested variables in the Data object: {variables}\n"
                    f"Available variables in source dataset: {dataset_variables}\n"
                    f"Missing variables: {missing_variables}\n\n"
                    f"Please check:\n"
                    f"1. The variable names in your Data object, make sure you check for default values\n"
                    f"2. The data source contains the expected variables\n"
                    f"3. If using a custom data source, ensure it creates variables with the correct names"
                ) from e
        if filters:
            ds = filters(ds)
        return ds
Attributes
model_type class-attribute instance-attribute
model_type: Literal['base_source'] = Field(description='Model type discriminator, must be overridden by a subclass')
coordinates cached property
coordinates: Dataset

Return the coordinates of the datasource.

Functions
open
open(variables: list = [], filters: Filter = {}, **kwargs) -> Dataset

Return the filtered dataset object.

Parameters

variables : list, optional
    List of variables to select from the dataset.
filters : Filter, optional
    Filters to apply to the dataset.

Notes

The kwargs are only a placeholder in case a subclass needs to pass additional arguments to the open method.

Source code in rompy/core/source.py
def open(
    self, variables: Optional[list] = None, filters: Optional[Filter] = None, **kwargs
) -> xr.Dataset:
    """Return the filtered dataset object.

    Parameters
    ----------
    variables : list, optional
        List of variables to select from the dataset.
    filters : Filter, optional
        Filters to apply to the dataset.

    Notes
    -----
    The kwargs are only a placeholder in case a subclass needs to pass additional
    arguments to the open method.

    """
    # None defaults avoid mutable default arguments; falsy values (None, [],
    # {}) behave identically to the previous [] / {} defaults.
    ds = self._open()
    if variables:
        try:
            ds = ds[variables]
        except KeyError as e:
            # Build a detailed error showing requested vs available variables
            dataset_variables = list(ds.data_vars.keys())
            missing_variables = list(set(variables) - set(dataset_variables))
            raise ValueError(
                f"Cannot find requested variables in dataset.\n\n"
                f"Requested variables in the Data object: {variables}\n"
                f"Available variables in source dataset: {dataset_variables}\n"
                f"Missing variables: {missing_variables}\n\n"
                f"Please check:\n"
                f"1. The variable names in your Data object, make sure you check for default values\n"
                f"2. The data source contains the expected variables\n"
                f"3. If using a custom data source, ensure it creates variables with the correct names"
            ) from e
    if filters:
        ds = filters(ds)
    return ds

SourceFile

Bases: SourceBase

Source dataset from file to open with xarray.open_dataset.

Source code in rompy/core/source.py
class SourceFile(SourceBase):
    """Source dataset from file to open with xarray.open_dataset."""

    model_type: Literal["file"] = Field(
        default="file",
        description="Model type discriminator",
    )
    # Path to the dataset; Path objects are converted to str before opening
    uri: Union[str, Path] = Field(description="Path to the dataset")
    # default_factory ensures each instance gets its own dict rather than
    # sharing one mutable default object
    kwargs: dict = Field(
        default_factory=dict,
        description="Keyword arguments to pass to xarray.open_dataset",
    )

    variable: Optional[str] = Field(
        default=None,
        description="Variable to select from the dataset",
    )

    # Enable arbitrary types for Path objects
    model_config = ConfigDict(arbitrary_types_allowed=True)

    def __str__(self) -> str:
        return f"SourceFile(uri={self.uri})"

    def _open(self) -> Union[xr.Dataset, xr.DataArray]:
        """Open the dataset, optionally selecting a single variable."""
        # str() is a no-op for strings and makes Path objects compatible
        uri_str = str(self.uri)
        ds = xr.open_dataset(uri_str, **self.kwargs)
        # If a variable is specified, return only that variable (a DataArray)
        return ds[self.variable] if self.variable else ds
Attributes
model_type class-attribute instance-attribute
model_type: Literal['file'] = Field(default='file', description='Model type discriminator')
uri class-attribute instance-attribute
uri: Union[str, Path] = Field(description='Path to the dataset')
kwargs class-attribute instance-attribute
kwargs: dict = Field(default={}, description='Keyword arguments to pass to xarray.open_dataset')
variable class-attribute instance-attribute
variable: Optional[str] = Field(default=None, description='Variable to select from the dataset')
model_config class-attribute instance-attribute
model_config = ConfigDict(arbitrary_types_allowed=True)

SourceIntake

Bases: SourceBase

Source dataset from intake catalog.

note

The intake catalog can be prescribed either by the URI of an existing catalog file or by a YAML string defining the catalog. The YAML string can be obtained from calling the yaml() method on an intake dataset instance.

Source code in rompy/core/source.py
class SourceIntake(SourceBase):
    """Source dataset from intake catalog.

    note
    ----
    The intake catalog can be prescribed either by the URI of an existing catalog file
    or by a YAML string defining the catalog. The YAML string can be obtained from
    calling the `yaml()` method on an intake dataset instance.

    """

    model_type: Literal["intake"] = Field(
        default="intake",
        description="Model type discriminator",
    )
    dataset_id: str = Field(description="The id of the dataset to read in the catalog")
    catalog_uri: Optional[str | Path] = Field(
        default=None,
        description="The URI of the catalog to read from",
    )
    catalog_yaml: Optional[str] = Field(
        default=None,
        description="The YAML string of the catalog to read from",
    )
    # default_factory ensures each instance gets its own dict rather than
    # sharing one mutable default object
    kwargs: dict = Field(
        default_factory=dict,
        description="Keyword arguments to define intake dataset parameters",
    )

    @model_validator(mode="after")
    def check_catalog(self) -> "SourceIntake":
        """Ensure exactly one of catalog_uri / catalog_yaml is provided."""
        if self.catalog_uri is None and self.catalog_yaml is None:
            raise ValueError("Either catalog_uri or catalog_yaml must be provided")
        elif self.catalog_uri is not None and self.catalog_yaml is not None:
            raise ValueError("Only one of catalog_uri or catalog_yaml can be provided")
        return self

    def __str__(self) -> str:
        return f"SourceIntake(catalog_uri={self.catalog_uri}, dataset_id={self.dataset_id})"

    @property
    def catalog(self) -> Catalog:
        """The intake catalog instance."""
        if self.catalog_uri:
            return intake.open_catalog(self.catalog_uri)
        else:
            # Build an in-memory YAML catalog from the provided string.
            # NOTE(review): fsspec's memory filesystem is process-global, so
            # concurrent instances share "/temp.yaml" — consider a unique path.
            fs = fsspec.filesystem("memory")
            fs_map = fs.get_mapper()
            fs_map["/temp.yaml"] = self.catalog_yaml.encode("utf-8")
            return YAMLFileCatalog("temp.yaml", fs=fs)

    def _open(self) -> xr.Dataset:
        """Open the catalog entry as a dask-backed xarray dataset."""
        return self.catalog[self.dataset_id](**self.kwargs).to_dask()
Attributes
model_type class-attribute instance-attribute
model_type: Literal['intake'] = Field(default='intake', description='Model type discriminator')
dataset_id class-attribute instance-attribute
dataset_id: str = Field(description='The id of the dataset to read in the catalog')
catalog_uri class-attribute instance-attribute
catalog_uri: Optional[str | Path] = Field(default=None, description='The URI of the catalog to read from')
catalog_yaml class-attribute instance-attribute
catalog_yaml: Optional[str] = Field(default=None, description='The YAML string of the catalog to read from')
kwargs class-attribute instance-attribute
kwargs: dict = Field(default={}, description='Keyword arguments to define intake dataset parameters')
catalog property
catalog: Catalog

The intake catalog instance.

Functions
check_catalog
check_catalog() -> SourceIntake
Source code in rompy/core/source.py
@model_validator(mode="after")
def check_catalog(self) -> "SourceIntake":
    """Ensure exactly one of catalog_uri / catalog_yaml is provided."""
    has_uri = self.catalog_uri is not None
    has_yaml = self.catalog_yaml is not None
    if not has_uri and not has_yaml:
        raise ValueError("Either catalog_uri or catalog_yaml must be provided")
    if has_uri and has_yaml:
        raise ValueError("Only one of catalog_uri or catalog_yaml can be provided")
    return self

SourceDatamesh

Bases: SourceBase

Source dataset from Datamesh.

Datamesh documentation: https://docs.oceanum.io/datamesh/index.html

Source code in rompy/core/source.py
class SourceDatamesh(SourceBase):
    """Source dataset from Datamesh.

    Datamesh documentation: https://docs.oceanum.io/datamesh/index.html

    """

    model_type: Literal["datamesh"] = Field(
        default="datamesh",
        description="Model type discriminator",
    )
    datasource: str = Field(
        description="The id of the datasource on Datamesh",
    )
    token: Optional[str] = Field(
        default=None,
        description="Datamesh API token, taken from the environment if not provided",
    )
    # default_factory ensures each instance gets its own dict rather than
    # sharing one mutable default object
    kwargs: dict = Field(
        default_factory=dict,
        description="Keyword arguments to pass to `oceanum.datamesh.Connector`",
    )

    # Per-instance cache of query results, keyed by a hash of the query dict
    _query_cache: dict = PrivateAttr(default_factory=dict)

    def __str__(self) -> str:
        return f"SourceDatamesh(datasource={self.datasource})"

    @cached_property
    def connector(self) -> Connector:
        """The Datamesh connector instance."""
        return Connector(token=self.token, **self.kwargs)

    @cached_property
    def coordinates(self) -> "xr.Coordinates":
        """Return the coordinates of the datasource."""
        # Query with no filters so the full coordinate space is returned
        return self._open(variables=[], geofilter=None, timefilter=None).coords

    def _geofilter(self, filters: Filter, coords: DatasetCoords) -> Optional[dict]:
        """The Datamesh geofilter built from the crop filter, or None if undefined."""
        xslice = filters.crop.get(coords.x)
        yslice = filters.crop.get(coords.y)
        if xslice is None or yslice is None:
            logger.warning(
                f"No slices found for x={coords.x} and/or y={coords.y} in the crop "
                f"filter {filters.crop}, cannot define a geofilter for querying"
            )
            return None

        # Normalise slice bounds so the bbox is always [minx, miny, maxx, maxy]
        x0 = min(xslice.start, xslice.stop)
        x1 = max(xslice.start, xslice.stop)
        y0 = min(yslice.start, yslice.stop)
        y1 = max(yslice.start, yslice.stop)
        return dict(type="bbox", geom=[x0, y0, x1, y1])

    def _timefilter(self, filters: Filter, coords: DatasetCoords) -> Optional[dict]:
        """The Datamesh timefilter built from the crop filter, or None if undefined."""
        tslice = filters.crop.get(coords.t)
        if tslice is None:
            logger.info(
                f"No time slice found in the crop filter {filters.crop}, "
                "cannot define a timefilter for querying datamesh"
            )
            return None
        return dict(type="range", times=[tslice.start, tslice.stop])

    def _open(self, variables: list, geofilter: dict, timefilter: dict) -> xr.Dataset:
        """Query Datamesh, reusing a cached result for an identical query."""
        query = dict(
            datasource=self.datasource,
            variables=variables,
            geofilter=geofilter,
            timefilter=timefilter,
        )
        query_key = dict_to_key(query)
        if query_key in self._query_cache:
            logger.info("Using cached Datamesh query result")
        else:
            logger.info(f"Querying Datamesh datasource '{self.datasource}'")
            logger.debug(f"Datamesh query: {query}")
            self._query_cache[query_key] = self.connector.query(query)
        return self._query_cache[query_key]

    def open(
        self, filters: Filter, coords: DatasetCoords, variables: Optional[list] = None
    ) -> xr.Dataset:
        """Returns the filtered dataset object.

        This method is overridden from the base class because the crop filters need
        to be converted to a geofilter and timefilter for querying Datamesh.

        """
        # None default avoids a mutable default argument; normalising to []
        # keeps the query (and its cache key) identical to an explicit [].
        ds = self._open(
            variables=variables or [],
            geofilter=self._geofilter(filters, coords),
            timefilter=self._timefilter(filters, coords),
        )
        if filters:
            ds = filters(ds)
        return ds
Attributes
model_type class-attribute instance-attribute
model_type: Literal['datamesh'] = Field(default='datamesh', description='Model type discriminator')
datasource class-attribute instance-attribute
datasource: str = Field(description='The id of the datasource on Datamesh')
token class-attribute instance-attribute
token: Optional[str] = Field(default=None, description='Datamesh API token, taken from the environment if not provided')
kwargs class-attribute instance-attribute
kwargs: dict = Field(default={}, description='Keyword arguments to pass to `oceanum.datamesh.Connector`')
connector cached property
connector: Connector

The Datamesh connector instance.

coordinates cached property
coordinates: Dataset

Return the coordinates of the datasource.

Functions
open
open(filters: Filter, coords: DatasetCoords, variables: list = []) -> Dataset

Returns the filtered dataset object.

This method is overridden from the base class because the crop filters need to be converted to a geofilter and timefilter for querying Datamesh.

Source code in rompy/core/source.py
def open(
    self, filters: Filter, coords: DatasetCoords, variables: Optional[list] = None
) -> xr.Dataset:
    """Returns the filtered dataset object.

    This method is overridden from the base class because the crop filters need to
    be converted to a geofilter and timefilter for querying Datamesh.

    """
    # None default avoids a mutable default argument; normalising to [] keeps
    # the query (and its cache key) identical to an explicit empty list.
    ds = self._open(
        variables=variables or [],
        geofilter=self._geofilter(filters, coords),
        timefilter=self._timefilter(filters, coords),
    )
    if filters:
        ds = filters(ds)
    return ds

SourceWavespectra

Bases: SourceBase

Wavespectra dataset from wavespectra reader.

Source code in rompy/core/source.py
class SourceWavespectra(SourceBase):
    """Wavespectra dataset from wavespectra reader."""

    model_type: Literal["wavespectra"] = Field(
        default="wavespectra",
        description="Model type discriminator",
    )
    uri: str | Path = Field(description="Path to the dataset")
    reader: str = Field(
        description="Name of the wavespectra reader to use, e.g., read_swan",
    )
    # default_factory ensures each instance gets its own dict rather than
    # sharing one mutable default object
    kwargs: dict = Field(
        default_factory=dict,
        description="Keyword arguments to pass to the wavespectra reader",
    )

    def __str__(self) -> str:
        return f"SourceWavespectra(uri={self.uri}, reader={self.reader})"

    def _open(self):
        """Open the dataset using the configured wavespectra reader function."""
        # Look up the reader by name on the wavespectra package and call it
        return getattr(wavespectra, self.reader)(self.uri, **self.kwargs)
Attributes
model_type class-attribute instance-attribute
model_type: Literal['wavespectra'] = Field(default='wavespectra', description='Model type discriminator')
uri class-attribute instance-attribute
uri: str | Path = Field(description='Path to the dataset')
reader class-attribute instance-attribute
reader: str = Field(description='Name of the wavespectra reader to use, e.g., read_swan')
kwargs class-attribute instance-attribute
kwargs: dict = Field(default={}, description='Keyword arguments to pass to the wavespectra reader')

SourceTimeseriesCSV

Bases: SourceBase

Timeseries source class from CSV file.

This class should return a timeseries from a CSV file. The dataset variables are defined from the column headers, therefore the appropriate read_csv kwargs must be passed to allow defining the columns. The time index is defined from column name identified by the tcol field.

Source code in rompy/core/source.py
class SourceTimeseriesCSV(SourceBase):
    """Timeseries source class from CSV file.

    This class should return a timeseries from a CSV file. The dataset variables are
    defined from the column headers, therefore the appropriate read_csv kwargs must be
    passed to allow defining the columns. The time index is defined from column name
    identified by the tcol field.

    """

    model_type: Literal["csv"] = Field(
        default="csv",
        description="Model type discriminator",
    )
    filename: str | Path = Field(description="Path to the csv file")
    tcol: str = Field(
        default="time",
        description="Name of the column containing the time data",
    )
    # default_factory is important here: validate_kwargs mutates this dict, so
    # a shared default object must be avoided
    read_csv_kwargs: dict = Field(
        default_factory=dict,
        description="Keyword arguments to pass to pandas.read_csv",
    )

    @model_validator(mode="after")
    def validate_kwargs(self) -> "SourceTimeseriesCSV":
        """Fill in time-parsing defaults without overriding user-provided kwargs."""
        self.read_csv_kwargs.setdefault("parse_dates", [self.tcol])
        self.read_csv_kwargs.setdefault("index_col", self.tcol)
        return self

    def _open_dataframe(self) -> pd.DataFrame:
        """Read the data from the csv file."""
        return pd.read_csv(self.filename, **self.read_csv_kwargs)

    def _open(self) -> xr.Dataset:
        """Convert the csv timeseries into a dataset indexed by 'time'."""
        df = self._open_dataframe()
        # The index column (tcol) becomes a coordinate, renamed to "time"
        return xr.Dataset.from_dataframe(df).rename({self.tcol: "time"})
Attributes
model_type class-attribute instance-attribute
model_type: Literal['csv'] = Field(default='csv', description='Model type discriminator')
filename class-attribute instance-attribute
filename: str | Path = Field(description='Path to the csv file')
tcol class-attribute instance-attribute
tcol: str = Field(default='time', description='Name of the column containing the time data')
read_csv_kwargs class-attribute instance-attribute
read_csv_kwargs: dict = Field(default={}, description='Keyword arguments to pass to pandas.read_csv')
Functions
validate_kwargs
validate_kwargs() -> SourceTimeseriesCSV

Validate the keyword arguments.

Source code in rompy/core/source.py
@model_validator(mode="after")
def validate_kwargs(self) -> "SourceTimeseriesCSV":
    """Validate the keyword arguments."""
    # Only fill defaults the user has not supplied explicitly.
    self.read_csv_kwargs.setdefault("parse_dates", [self.tcol])
    self.read_csv_kwargs.setdefault("index_col", self.tcol)
    return self

Functions

render_datetimes

render_datetimes(dict_obj: dict) -> dict

Convert datetime objects in a dictionary to ISO format strings.

Parameters

dict_obj : dict The dictionary to process.

Returns

dict The processed dictionary with datetime objects converted to strings.

Source code in rompy/core/source.py
def render_datetimes(dict_obj: dict) -> dict:
    """Convert datetime objects in a dictionary to ISO format strings.

    The dictionary (including nested dicts and lists) is modified in place and
    also returned for convenience. Any object exposing an ``isoformat`` method
    (datetime, date, time, pandas timestamps, ...) is converted.

    Parameters
    ----------
    dict_obj : dict
        The dictionary to process.

    Returns
    -------
    dict
        The processed dictionary with datetime objects converted to strings.
    """
    for key, value in dict_obj.items():
        # Use a single elif chain: each value is exactly one of these cases
        if isinstance(value, dict):
            dict_obj[key] = render_datetimes(value)
        elif isinstance(value, list):
            _render_datetimes_in_list(value)
        elif hasattr(value, "isoformat"):
            dict_obj[key] = value.isoformat()
    return dict_obj


def _render_datetimes_in_list(items: list) -> None:
    """Convert datetime-like items in a list to ISO strings, in place."""
    for i, item in enumerate(items):
        if isinstance(item, dict):
            items[i] = render_datetimes(item)
        elif isinstance(item, list):
            # Recurse so datetimes nested in lists-of-lists are handled too
            _render_datetimes_in_list(item)
        elif hasattr(item, "isoformat"):
            items[i] = item.isoformat()

dict_to_key

dict_to_key(data: dict) -> str

Convert a dictionary to a hashable key string.

This function converts a dictionary uniquely to a string which can be used as a hashable key.

Parameters

data : dict The dictionary to convert.

Returns

str A string representation of the sorted dictionary items.

Source code in rompy/core/source.py
def dict_to_key(data: dict) -> str:
    """Convert a dictionary to a hashable key string.

    This function converts a dictionary uniquely to a string
    which can be used as a hashable key.

    Parameters
    ----------
    data : dict
        The dictionary to convert.

    Returns
    -------
    str
        A string representation of the sorted dictionary items.
    """
    import copy

    # Deep-copy first: render_datetimes converts values in place, and the
    # caller's dict should not be mutated as a side effect of hashing it.
    data = render_datetimes(copy.deepcopy(data))
    unique_str = json.dumps(data, sort_keys=True)
    return hashlib.sha256(unique_str.encode()).hexdigest()