Skip to content

Data API

Grid

SwanGrid

Bases: RegularGrid

Regular SWAN grid in geographic space.

Source code in src/rompy_swan/grid.py
class SwanGrid(RegularGrid):
    """Regular SWAN grid in geographic space."""

    grid_type: Literal["REG", "CURV"] = Field(
        "REG", description="Type of grid (REG=regular, CURV=curvilinear)"
    )
    exc: Optional[float] = Field(None, description="Missing value")
    gridfile: Optional[str] = Field(
        None, description="Name of grid file to load", max_length=36
    )

    @field_validator("grid_type")
    @classmethod
    def validate_grid_type(cls, v):
        if v not in ["REG", "CURV"]:
            raise ValueError("grid_type must be one of REG or CURV")
        return v

    @model_validator(mode="after")
    def validate_curvilinear_grid(self) -> "SwanGrid":
        if self.grid_type == "CURV" and self.gridfile is None:
            raise ValueError("gridfile must be provided for CURV grid")
        return self

    def _regen_grid(self):
        if self.grid_type == "REG":
            _x, _y = self._gen_reg_cgrid()
        elif self.grid_type == "CURV":
            _x, _y = self._gen_curv_cgrid()
        self.x = _x
        self.y = _y

    def _gen_curv_cgrid(self):
        """loads a SWAN curvilinear grid and returns cgrid lat/lons and
        command to be used in SWAN contol file. The Default grid is one I made using
        Deltares' RGFGrid tool and converted to a SWAN-friendly formate using Deltares
        OpenEarth code "swan_io_grd.m"

        """
        # number of grid cells in the 'x' and 'y' directions:
        # (you can get this from d3d_qp.m or other Deltares OpenEarth code)
        nX = self.nx
        nY = self.ny

        grid_Data = open(self.gridpath).readlines()
        ix = grid_Data.index("x-coordinates\n")
        iy = grid_Data.index("y-coordinates\n")
        lons = []
        lats = []
        for idx in np.arange(ix + 1, iy):
            lons.append(re.sub("\n", "", grid_Data[idx]).split())
        for idx in np.arange(iy + 1, len(grid_Data)):
            lats.append(re.sub("\n", "", grid_Data[idx]).split())

        def flatten(l):
            return [item for sublist in l for item in sublist]

        lons = np.array(flatten(lons)).astype(np.float)
        lats = np.array(flatten(lats)).astype(np.float)

        x = np.reshape(lats, (nX, nY))
        y = np.reshape(lons, (nX, nY))

        return x, y

    @property
    def inpgrid(self):
        if self.grid_type == "REG":
            inpstr = f"REG {self.x0} {self.y0} {self.rot} {self.nx-1:0.0f} {self.ny-1:0.0f} {self.dx} {self.dy}"
            if self.exc is not None:
                inpstr += f" EXC {self.exc}"
            return inpstr
        elif self.grid_type == "CURV":
            raise NotImplementedError("Curvilinear grids not supported yet")
            # return f'CURVilinear {self.nx-1:0.0f} {self.ny-1:0.0f} \nREADGRID COOR 1 \'{os.path.basename(self.gridpath)}\' 1 0 1 FREE'

    @property
    def cgrid(self):
        if self.grid_type == "REG":
            return f"REG {self.x0} {self.y0} {self.rot} {self.xlen} {self.ylen} {self.nx-1:0.0f} {self.ny-1:0.0f}"
        elif self.grid_type == "CURV":
            raise NotImplementedError("Curvilinear grids not supported yet")
            # return (f'CURVilinear {self.nx-1:0.0f} {self.ny-1:0.0f}',f'READGRID COOR 1 \'{os.path.basename(self.gridpath)}\' 1 0 1 FREE')

    @property
    def cgrid_read(self):
        if self.grid_type == "REG":
            return ""
        elif self.grid_type == "CURV":
            raise NotImplementedError("Curvilinear grids not supported yet")
            # return f'READGRID COOR 1 \'{os.path.basename(self.gridpath)}\' 1 0 1 FREE'

    @property
    def component(self):
        """Return the respective SWAN component for this grid."""
        if self.grid_type == "REG":
            return GRIDREGULAR(
                xp=self.x0,
                yp=self.y0,
                alp=self.rot,
                xlen=self.xlen,
                ylen=self.ylen,
                mx=self.nx - 1,
                my=self.ny - 1,
            )
        else:
            raise NotImplementedError("Only regular grid is currently supported")

    def __call__(self):
        output = f"CGRID {self.cgrid} CIRCLE 36 0.0464 1. 31\n"
        output += f"{self.cgrid_read}\n"
        return output

    def boundary(self, *args, **kwargs) -> tuple:
        """Returns the grid boundary polygon.

        Override the parent method to use the actual points from the regular grid
        boundary instead of the convex hull which is not always the boundary.

        """
        x = np.concatenate(
            [self.x[0, :], self.x[1:, -1], self.x[-1, -2::-1], self.x[-2::-1, 0]]
        )
        y = np.concatenate(
            [self.y[0, :], self.y[1:, -1], self.y[-1, -2::-1], self.y[-2::-1, 0]]
        )
        return Polygon(zip(x, y))

    def nearby_spectra(self, ds_spec, dist_thres=0.05, plot=True):
        """Find points nearby and project to the boundary

        Parameters
        ----------
        ds_spec: xarray.Dataset
            an XArray dataset of wave spectra at a number of points.
            Dataset variable names standardised using wavespectra.read_*
            functions.

            See https://wavespectra.readthedocs.io/en/latest/api.html#input-functions
        dist_thres: float, optional [Default: 0.05]
            Maximum distance to translate the input spectra to the grid boundary
        plot: boolean, optional [Default: True]
            Generate a plot that shows the applied projections

        Returns
        -------
        xarray.Dataset
            A subset of ds_spec with lat and lon coordinates projected to the boundary
        """

        bbox = self.bbox(buffer=dist_thres)
        minLon, minLat, maxLon, maxLat = bbox

        inds = np.where(
            (ds_spec.lon > minLon)
            & (ds_spec.lon < maxLon)
            & (ds_spec.lat > minLat)
            & (ds_spec.lat < maxLat)
        )[0]
        ds_spec = ds_spec.isel(site=inds)

        # Work out the closest spectral points
        def _nearestPointOnLine(p1, p2, p3):
            # calculate the distance of p3 from the line between p1 and p2 and return
            # the closest point on the line

            from math import fabs, sqrt

            a = p2[1] - p1[1]
            b = -1.0 * (p2[0] - p1[0])
            c = p2[0] * p1[1] - p2[1] * p1[0]

            dist = fabs(a * p3[0] + b * p3[1] + c) / sqrt(a**2 + b**2)
            x = (b * (b * p3[0] - a * p3[1]) - a * c) / (a**2 + b**2)
            y = (a * (-b * p3[0] + a * p3[1]) - b * c) / (a**2 + b**2)

            return dist, x, y

        bx, by = self.boundary_points()
        pol = np.stack([bx, by])

        # Spectra points
        ds_spec.lon.load()
        ds_spec.lat.load()
        ds_spec["lon_original"] = ds_spec["lon"]
        ds_spec["lat_original"] = ds_spec["lat"]
        p3s = list(zip(ds_spec.lon.values, ds_spec.lat.values))

        if plot:
            fig, ax = self.plot()
            ax.scatter(ds_spec.lon, ds_spec.lat)

        specPoints = []
        for i in range(pol.shape[1] - 1):
            p1 = pol[:, i]
            p2 = pol[:, i + 1]
            np.stack((p1, p2))
            output = np.array(
                list(map(lambda xi: _nearestPointOnLine(p1, p2, xi), p3s))
            )
            dists = output[:, 0]
            segmentPoints = output[:, 1:]
            inds = np.where((dists < dist_thres))[0]

            # Loop through the points projected onto the line
            for ind in inds:
                specPoint = ds_spec.isel(site=ind)

                segLon = segmentPoints[ind, 0]
                segLat = segmentPoints[ind, 1]

                if plot:
                    ax.plot(
                        [segLon, specPoint.lon],
                        [segLat, specPoint.lat],
                        color="r",
                        lw=2,
                    )
                    ax.scatter(specPoint.lon, specPoint.lat, marker="o", color="b")
                    ax.scatter(segLon, segLat, marker="x", color="g")

                specPoint["lon"] = segLon
                specPoint["lat"] = segLat
                specPoints.append(specPoint)

            logger.debug(f"Segment {i} - Indices {inds}")

        if plot:
            fig.show()

        ds_boundary = xr.concat(specPoints, dim="site")
        return ds_boundary

    def __repr__(self):
        return f"SwanGrid: {self.grid_type}, {self.nx}x{self.ny}"

    def __str__(self):
        return f"SwanGrid: {self.grid_type}, {self.nx}x{self.ny}"

    @classmethod
    def from_component(cls, component: GRIDREGULAR) -> "SwanGrid":
        """Swan grid from an existing component.

        Parameters
        ----------
        component: GRIDREGULAR
            A GRIDREGULAR SWAN component.

        Returns
        -------
        SwanGrid
            A SwanGrid object.

        """
        return cls(
            x0=component.xp,
            y0=component.yp,
            rot=component.alp,
            dx=component.dx,
            dy=component.dy,
            nx=component.mx + 1,
            ny=component.my + 1,
        )

Attributes

grid_type class-attribute instance-attribute

grid_type: Literal['REG', 'CURV'] = Field('REG', description='Type of grid (REG=regular, CURV=curvilinear)')

exc class-attribute instance-attribute

exc: Optional[float] = Field(None, description='Missing value')

gridfile class-attribute instance-attribute

gridfile: Optional[str] = Field(None, description='Name of grid file to load', max_length=36)

inpgrid property

inpgrid

cgrid property

cgrid

cgrid_read property

cgrid_read

component property

component

Return the respective SWAN component for this grid.

Functions

validate_grid_type classmethod

validate_grid_type(v)
Source code in src/rompy_swan/grid.py
@field_validator("grid_type")
@classmethod
def validate_grid_type(cls, v):
    if v not in ["REG", "CURV"]:
        raise ValueError("grid_type must be one of REG or CURV")
    return v

validate_curvilinear_grid

validate_curvilinear_grid() -> SwanGrid
Source code in src/rompy_swan/grid.py
@model_validator(mode="after")
def validate_curvilinear_grid(self) -> "SwanGrid":
    if self.grid_type == "CURV" and self.gridfile is None:
        raise ValueError("gridfile must be provided for CURV grid")
    return self

boundary

boundary(*args, **kwargs) -> tuple

Returns the grid boundary polygon.

Override the parent method to use the actual points from the regular grid boundary instead of the convex hull which is not always the boundary.

Source code in src/rompy_swan/grid.py
def boundary(self, *args, **kwargs) -> tuple:
    """Returns the grid boundary polygon.

    Override the parent method to use the actual points from the regular grid
    boundary instead of the convex hull which is not always the boundary.

    """
    x = np.concatenate(
        [self.x[0, :], self.x[1:, -1], self.x[-1, -2::-1], self.x[-2::-1, 0]]
    )
    y = np.concatenate(
        [self.y[0, :], self.y[1:, -1], self.y[-1, -2::-1], self.y[-2::-1, 0]]
    )
    return Polygon(zip(x, y))

nearby_spectra

nearby_spectra(ds_spec, dist_thres=0.05, plot=True)

Find points nearby and project to the boundary

Parameters

ds_spec: xarray.Dataset an XArray dataset of wave spectra at a number of points. Dataset variable names standardised using wavespectra.read_* functions.

See https://wavespectra.readthedocs.io/en/latest/api.html#input-functions

dist_thres: float, optional [Default: 0.05] Maximum distance to translate the input spectra to the grid boundary plot: boolean, optional [Default: True] Generate a plot that shows the applied projections

Returns

xarray.Dataset A subset of ds_spec with lat and lon coordinates projected to the boundary

Source code in src/rompy_swan/grid.py
def nearby_spectra(self, ds_spec, dist_thres=0.05, plot=True):
    """Find points nearby and project to the boundary

    Parameters
    ----------
    ds_spec: xarray.Dataset
        an XArray dataset of wave spectra at a number of points.
        Dataset variable names standardised using wavespectra.read_*
        functions.

        See https://wavespectra.readthedocs.io/en/latest/api.html#input-functions
    dist_thres: float, optional [Default: 0.05]
        Maximum distance to translate the input spectra to the grid boundary
    plot: boolean, optional [Default: True]
        Generate a plot that shows the applied projections

    Returns
    -------
    xarray.Dataset
        A subset of ds_spec with lat and lon coordinates projected to the boundary
    """

    bbox = self.bbox(buffer=dist_thres)
    minLon, minLat, maxLon, maxLat = bbox

    inds = np.where(
        (ds_spec.lon > minLon)
        & (ds_spec.lon < maxLon)
        & (ds_spec.lat > minLat)
        & (ds_spec.lat < maxLat)
    )[0]
    ds_spec = ds_spec.isel(site=inds)

    # Work out the closest spectral points
    def _nearestPointOnLine(p1, p2, p3):
        # calculate the distance of p3 from the line between p1 and p2 and return
        # the closest point on the line

        from math import fabs, sqrt

        a = p2[1] - p1[1]
        b = -1.0 * (p2[0] - p1[0])
        c = p2[0] * p1[1] - p2[1] * p1[0]

        dist = fabs(a * p3[0] + b * p3[1] + c) / sqrt(a**2 + b**2)
        x = (b * (b * p3[0] - a * p3[1]) - a * c) / (a**2 + b**2)
        y = (a * (-b * p3[0] + a * p3[1]) - b * c) / (a**2 + b**2)

        return dist, x, y

    bx, by = self.boundary_points()
    pol = np.stack([bx, by])

    # Spectra points
    ds_spec.lon.load()
    ds_spec.lat.load()
    ds_spec["lon_original"] = ds_spec["lon"]
    ds_spec["lat_original"] = ds_spec["lat"]
    p3s = list(zip(ds_spec.lon.values, ds_spec.lat.values))

    if plot:
        fig, ax = self.plot()
        ax.scatter(ds_spec.lon, ds_spec.lat)

    specPoints = []
    for i in range(pol.shape[1] - 1):
        p1 = pol[:, i]
        p2 = pol[:, i + 1]
        np.stack((p1, p2))
        output = np.array(
            list(map(lambda xi: _nearestPointOnLine(p1, p2, xi), p3s))
        )
        dists = output[:, 0]
        segmentPoints = output[:, 1:]
        inds = np.where((dists < dist_thres))[0]

        # Loop through the points projected onto the line
        for ind in inds:
            specPoint = ds_spec.isel(site=ind)

            segLon = segmentPoints[ind, 0]
            segLat = segmentPoints[ind, 1]

            if plot:
                ax.plot(
                    [segLon, specPoint.lon],
                    [segLat, specPoint.lat],
                    color="r",
                    lw=2,
                )
                ax.scatter(specPoint.lon, specPoint.lat, marker="o", color="b")
                ax.scatter(segLon, segLat, marker="x", color="g")

            specPoint["lon"] = segLon
            specPoint["lat"] = segLat
            specPoints.append(specPoint)

        logger.debug(f"Segment {i} - Indices {inds}")

    if plot:
        fig.show()

    ds_boundary = xr.concat(specPoints, dim="site")
    return ds_boundary

from_component classmethod

from_component(component: GRIDREGULAR) -> SwanGrid

Swan grid from an existing component.

Parameters

component: GRIDREGULAR A GRIDREGULAR SWAN component.

Returns

SwanGrid A SwanGrid object.

Source code in src/rompy_swan/grid.py
@classmethod
def from_component(cls, component: GRIDREGULAR) -> "SwanGrid":
    """Swan grid from an existing component.

    Parameters
    ----------
    component: GRIDREGULAR
        A GRIDREGULAR SWAN component.

    Returns
    -------
    SwanGrid
        A SwanGrid object.

    """
    return cls(
        x0=component.xp,
        y0=component.yp,
        rot=component.alp,
        dx=component.dx,
        dy=component.dy,
        nx=component.mx + 1,
        ny=component.my + 1,
    )

Data Classes

SwanDataGrid

Bases: DataGrid

This class is used to write SWAN data from a dataset.

Source code in src/rompy_swan/data.py
class SwanDataGrid(DataGrid):
    """This class is used to write SWAN data from a dataset."""

    z1: Optional[str] = Field(
        default=None,
        description=(
            "Name of the data variable in dataset representing either a scaler "
            "parameter or the u-componet of a vector field"
        ),
    )
    z2: Optional[str] = Field(
        default=None,
        description=(
            "Name of the data variable in dataset representing "
            "the v-componet of a vector field"
        ),
    )
    var: GridOptions = Field(description="SWAN input grid name")
    fac: float = Field(
        description=(
            "SWAN multiplies all values that are read from file by `fac`. For "
            "instance if the values are given in unit decimeter, one should make "
            "`fac=0.1` to obtain values in m. To change sign use a negative `fac`"
        ),
        default=1.0,
    )

    @model_validator(mode="after")
    def ensure_z1_in_data_vars(self) -> "SwanDataGrid":
        data_vars = self.variables
        for z in [self.z1, self.z2]:
            if z and z not in data_vars:
                logger.debug(f"Adding {z} to data_vars")
                data_vars.append(z)
        self.variables = data_vars
        return self

    def get(
        self,
        destdir: str | Path,
        grid: Optional[SwanGrid] = None,
        time: Optional[TimeRange] = None,
    ) -> Path:
        """Write the data source to a new location.

        Parameters
        ----------
        destdir : str | Path
            The destination directory to write the netcdf data to.
        grid: SwanGrid, optional
            The grid to filter the data to, only used if `self.filter_grid` is True.
        time: TimeRange, optional
            The times to filter the data to, only used if `self.filter_time` is True.

        Returns
        -------
        cmd: str
            The command line string with the INPGRID/READINP commands ready to be
            written to the SWAN input file.

        Note
        ----
        The data are assumed to not have been rotated. We cannot use the grid.rot attr
        as this is the rotation from the model grid object which is not necessarily the
        same as the rotation of the data.

        """
        if self.crop_data:
            if grid is not None:
                self._filter_grid(grid)
            if time is not None:
                self._filter_time(time)

        output_file = os.path.join(destdir, f"{self.var.value}.grd")

        # Create a formatted box for logging
        log_box(
            title=f"WRITING {self.var.value.upper()} GRID DATA",
            logger=logger,
            add_empty_line=False,
        )

        # Log output file and dataset information using bullet points
        items = [f"Output file: {output_file}"]

        # Add variable information if available
        if self.z1:
            shape_info = f"{self.ds[self.z1].shape}"
            items.append(f"Variable: {self.z1} with shape {shape_info}")
        if self.z2:
            shape_info = f"{self.ds[self.z2].shape}"
            items.append(f"Variable: {self.z2} with shape {shape_info}")

        # Add scaling factor
        items.append(f"Scaling factor: {self.fac}")

        # Log all items as a bulleted list
        logger.bullet_list(items, indent=2)

        start_time = time_module.time()
        if self.var.value == "bottom":
            inpgrid, readgrid = self.ds.swan.to_bottom_grid(
                output_file,
                fmt="%4.2f",
                x=self.coords.x,
                y=self.coords.y,
                z=self.z1,
                fac=self.fac,
                rot=0.0,
                vmin=float("-inf"),
            )
        else:
            inpgrid, readgrid = self.ds.swan.to_inpgrid(
                output_file=output_file,
                x=self.coords.x,
                y=self.coords.y,
                z1=self.z1,
                z2=self.z2,
                fac=self.fac,
                rot=0.0,
                var=self.var.name,
            )

        # Log completion and processing time
        elapsed_time = time_module.time() - start_time
        file_size = Path(output_file).stat().st_size / (1024 * 1024)  # Size in MB

        # Use the centralized functions from rompy package

        # Log completion information as a bulleted list
        logger.bullet_list(
            [
                f"Completed in {elapsed_time:.2f} seconds",
                f"File size: {file_size:.2f} MB",
            ],
            indent=2,
        )

        return f"{inpgrid}\n{readgrid}\n"

    def __str__(self):
        return f"SWANDataGrid {self.var.name}"

    def _format_value(self, obj):
        """Format SwanDataGrid values using the new formatting framework.

        This method provides special formatting for SwanDataGrid objects.

        Args:
            obj: The object to format

        Returns:
            A formatted string or None to use default formatting
        """
        # Only format SwanDataGrid objects
        if not isinstance(obj, SwanDataGrid):
            return None

        # Use the new formatting framework
        from rompy.formatting import format_value

        return format_value(obj)
        lines.append(f"  {bullet} Variable:   {obj.var.name}")

        # Add source information if available
        if hasattr(obj, "source") and obj.source:
            source_type = getattr(obj.source, "model_type", "unknown")
            lines.append(f"  {bullet} Source:     {source_type}")

            # Add dataset information if available
            if hasattr(obj.source, "dataset_id"):
                lines.append(f"  {bullet} Dataset ID: {obj.source.dataset_id}")

        # Add coordinate information if available
        if hasattr(obj, "coords") and obj.coords:
            coords = [f"{k}={v}" for k, v in obj.coords.items()]
            coords_str = ", ".join(coords)
            lines.append(f"  {bullet} Coordinates: {coords_str}")

        # Add scaling factor if available
        if hasattr(obj, "fac"):
            lines.append(f"  {bullet} Scale factor: {obj.fac}")

        # Add z variables information if available
        if hasattr(obj, "z1") and obj.z1:
            lines.append(f"  {bullet} Z1 variable: {obj.z1}")
        if hasattr(obj, "z2") and obj.z2:
            lines.append(f"  {bullet} Z2 variable: {obj.z2}")

        # Close with footer
        lines.append(footer)

        return "\n".join(lines)

Attributes

z1 class-attribute instance-attribute

z1: Optional[str] = Field(default=None, description='Name of the data variable in dataset representing either a scaler parameter or the u-componet of a vector field')

z2 class-attribute instance-attribute

z2: Optional[str] = Field(default=None, description='Name of the data variable in dataset representing the v-componet of a vector field')

var class-attribute instance-attribute

var: GridOptions = Field(description='SWAN input grid name')

fac class-attribute instance-attribute

fac: float = Field(description='SWAN multiplies all values that are read from file by `fac`. For instance if the values are given in unit decimeter, one should make `fac=0.1` to obtain values in m. To change sign use a negative `fac`', default=1.0)

Functions

ensure_z1_in_data_vars

ensure_z1_in_data_vars() -> SwanDataGrid
Source code in src/rompy_swan/data.py
@model_validator(mode="after")
def ensure_z1_in_data_vars(self) -> "SwanDataGrid":
    data_vars = self.variables
    for z in [self.z1, self.z2]:
        if z and z not in data_vars:
            logger.debug(f"Adding {z} to data_vars")
            data_vars.append(z)
    self.variables = data_vars
    return self

get

get(destdir: str | Path, grid: Optional[SwanGrid] = None, time: Optional[TimeRange] = None) -> Path

Write the data source to a new location.

Parameters

destdir : str | Path The destination directory to write the netcdf data to. grid: SwanGrid, optional The grid to filter the data to, only used if self.filter_grid is True. time: TimeRange, optional The times to filter the data to, only used if self.filter_time is True.

Returns

cmd: str The command line string with the INPGRID/READINP commands ready to be written to the SWAN input file.

Note

The data are assumed to not have been rotated. We cannot use the grid.rot attr as this is the rotation from the model grid object which is not necessarily the same as the rotation of the data.

Source code in src/rompy_swan/data.py
def get(
    self,
    destdir: str | Path,
    grid: Optional[SwanGrid] = None,
    time: Optional[TimeRange] = None,
) -> Path:
    """Write the data source to a new location.

    Parameters
    ----------
    destdir : str | Path
        The destination directory to write the netcdf data to.
    grid: SwanGrid, optional
        The grid to filter the data to, only used if `self.filter_grid` is True.
    time: TimeRange, optional
        The times to filter the data to, only used if `self.filter_time` is True.

    Returns
    -------
    cmd: str
        The command line string with the INPGRID/READINP commands ready to be
        written to the SWAN input file.

    Note
    ----
    The data are assumed to not have been rotated. We cannot use the grid.rot attr
    as this is the rotation from the model grid object which is not necessarily the
    same as the rotation of the data.

    """
    if self.crop_data:
        if grid is not None:
            self._filter_grid(grid)
        if time is not None:
            self._filter_time(time)

    output_file = os.path.join(destdir, f"{self.var.value}.grd")

    # Create a formatted box for logging
    log_box(
        title=f"WRITING {self.var.value.upper()} GRID DATA",
        logger=logger,
        add_empty_line=False,
    )

    # Log output file and dataset information using bullet points
    items = [f"Output file: {output_file}"]

    # Add variable information if available
    if self.z1:
        shape_info = f"{self.ds[self.z1].shape}"
        items.append(f"Variable: {self.z1} with shape {shape_info}")
    if self.z2:
        shape_info = f"{self.ds[self.z2].shape}"
        items.append(f"Variable: {self.z2} with shape {shape_info}")

    # Add scaling factor
    items.append(f"Scaling factor: {self.fac}")

    # Log all items as a bulleted list
    logger.bullet_list(items, indent=2)

    start_time = time_module.time()
    if self.var.value == "bottom":
        inpgrid, readgrid = self.ds.swan.to_bottom_grid(
            output_file,
            fmt="%4.2f",
            x=self.coords.x,
            y=self.coords.y,
            z=self.z1,
            fac=self.fac,
            rot=0.0,
            vmin=float("-inf"),
        )
    else:
        inpgrid, readgrid = self.ds.swan.to_inpgrid(
            output_file=output_file,
            x=self.coords.x,
            y=self.coords.y,
            z1=self.z1,
            z2=self.z2,
            fac=self.fac,
            rot=0.0,
            var=self.var.name,
        )

    # Log completion and processing time
    elapsed_time = time_module.time() - start_time
    file_size = Path(output_file).stat().st_size / (1024 * 1024)  # Size in MB

    # Use the centralized functions from rompy package

    # Log completion information as a bulleted list
    logger.bullet_list(
        [
            f"Completed in {elapsed_time:.2f} seconds",
            f"File size: {file_size:.2f} MB",
        ],
        indent=2,
    )

    return f"{inpgrid}\n{readgrid}\n"

Boundary Data

Boundnest1

Bases: BoundaryWaveStation

SWAN BOUNDNEST1 NEST data class.

Source code in src/rompy_swan/boundary.py
class Boundnest1(BoundaryWaveStation):
    """SWAN BOUNDNEST1 NEST data class."""

    model_type: Literal["boundnest1", "BOUNDNEST1"] = Field(
        default="boundnest1", description="Model type discriminator"
    )
    rectangle: Literal["closed", "open"] = Field(
        default="closed",
        description=(
            "Defines whether boundary is defined over an closed or open rectangle"
        ),
    )

    def get(
        self, destdir: str, grid: SwanGrid, time: Optional[TimeRange] = None
    ) -> str:
        """Write the data source to a new location.

        Parameters
        ----------
        destdir : str | Path
            Destination directory for the SWAN ASCII file.
        grid : RegularGrid
            Grid instance to use for selecting the boundary points.
        time: TimeRange, optional
            The times to filter the data to, only used if `self.crop_data` is True.

        Returns
        -------
        filename: Path
            The filename of the written boundary file.
        cmd : str
            Boundary command string to render in the SWAN INPUT file

        """
        if self.crop_data and time is not None:
            self._filter_time(time)
        if self.crop_data and grid is not None:
            self._filter_grid(grid)

        ds = self._sel_boundary(grid).sortby("dir")

        # If nearest, ensure points are returned at the requested positions
        if self.sel_method == "nearest":
            xbnd, ybnd = self._boundary_points(grid=grid)
            ds["lon"].values = xbnd
            ds["lat"].values = ybnd

        filename = Path(destdir) / f"{self.id}.bnd"
        ds.spec.to_swan(filename)
        cmd = f"BOUNDNEST1 NEST '{filename.name}' {self.rectangle.upper()}"
        return filename, cmd

Attributes

model_type class-attribute instance-attribute

model_type: Literal['boundnest1', 'BOUNDNEST1'] = Field(default='boundnest1', description='Model type discriminator')

rectangle class-attribute instance-attribute

rectangle: Literal['closed', 'open'] = Field(default='closed', description='Defines whether boundary is defined over an closed or open rectangle')

Functions

get

get(destdir: str, grid: SwanGrid, time: Optional[TimeRange] = None) -> str

Write the data source to a new location.

Parameters

destdir : str | Path Destination directory for the SWAN ASCII file. grid : RegularGrid Grid instance to use for selecting the boundary points. time: TimeRange, optional The times to filter the data to, only used if self.crop_data is True.

Returns

filename: Path The filename of the written boundary file. cmd : str Boundary command string to render in the SWAN INPUT file

Source code in src/rompy_swan/boundary.py
def get(
    self, destdir: str, grid: SwanGrid, time: Optional[TimeRange] = None
) -> str:
    """Write the data source to a new location.

    Parameters
    ----------
    destdir : str | Path
        Destination directory for the SWAN ASCII file.
    grid : RegularGrid
        Grid instance to use for selecting the boundary points.
    time: TimeRange, optional
        The times to filter the data to, only used if `self.crop_data` is True.

    Returns
    -------
    filename: Path
        The filename of the written boundary file.
    cmd : str
        Boundary command string to render in the SWAN INPUT file

    """
    if self.crop_data and time is not None:
        self._filter_time(time)
    if self.crop_data and grid is not None:
        self._filter_grid(grid)

    ds = self._sel_boundary(grid).sortby("dir")

    # If nearest, ensure points are returned at the requested positions
    if self.sel_method == "nearest":
        xbnd, ybnd = self._boundary_points(grid=grid)
        ds["lon"].values = xbnd
        ds["lat"].values = ybnd

    filename = Path(destdir) / f"{self.id}.bnd"
    ds.spec.to_swan(filename)
    cmd = f"BOUNDNEST1 NEST '{filename.name}' {self.rectangle.upper()}"
    return filename, cmd

Interfaces

DataInterface

Bases: RompyBaseModel

SWAN forcing data interface.

Examples

.. ipython:: python :okwarning:

from rompy_swan.interface import DataInterface
Source code in src/rompy_swan/interface.py
class DataInterface(RompyBaseModel):
    """SWAN forcing data interface.

    Examples
    --------

    .. ipython:: python
        :okwarning:

        from rompy_swan.interface import DataInterface

    """

    model_type: Literal["data_interface", "DATA_INTERFACE"] = Field(
        default="data_interface", description="Model type discriminator"
    )
    bottom: Optional[SwanDataGrid] = Field(default=None, description="Bathymetry data")
    input: list[SwanDataGrid] = Field(default=[], description="Input grid data")

    @field_validator("input")
    @classmethod
    def ensure_unique_var(
        cls, input: list[SwanDataGrid], info: ValidationInfo
    ) -> list[SwanDataGrid]:
        """Ensure that each input var is unique."""
        vars = []
        if info.data["bottom"] is not None:
            vars.append(info.data["bottom"].var)
        vars.extend([inp.var for inp in input])
        if len(vars) != len(set(vars)):
            raise ValueError("Each var must be unique in input")
        return input

    def get(self, staging_dir: Path, grid: SwanGrid, period: TimeRange):
        inputs = []
        if self.bottom is not None:
            inputs.append(self.bottom)
        inputs.extend(self.input)
        cmds = []
        for input in inputs:
            cmds.append(input.get(destdir=staging_dir, grid=grid, time=period))
        return "\n".join(cmds)

    def render(self, *args, **kwargs):
        """Make this class consistent with the components API."""
        return self.get(*args, **kwargs)

Attributes

model_type class-attribute instance-attribute

model_type: Literal['data_interface', 'DATA_INTERFACE'] = Field(default='data_interface', description='Model type discriminator')

bottom class-attribute instance-attribute

bottom: Optional[SwanDataGrid] = Field(default=None, description='Bathymetry data')

input class-attribute instance-attribute

input: list[SwanDataGrid] = Field(default=[], description='Input grid data')

Functions

ensure_unique_var classmethod

ensure_unique_var(input: list[SwanDataGrid], info: ValidationInfo) -> list[SwanDataGrid]

Ensure that each input var is unique.

Source code in src/rompy_swan/interface.py
@field_validator("input")
@classmethod
def ensure_unique_var(
    cls, input: list[SwanDataGrid], info: ValidationInfo
) -> list[SwanDataGrid]:
    """Ensure that each input var is unique."""
    vars = []
    if info.data["bottom"] is not None:
        vars.append(info.data["bottom"].var)
    vars.extend([inp.var for inp in input])
    if len(vars) != len(set(vars)):
        raise ValueError("Each var must be unique in input")
    return input

get

get(staging_dir: Path, grid: SwanGrid, period: TimeRange)
Source code in src/rompy_swan/interface.py
def get(self, staging_dir: Path, grid: SwanGrid, period: TimeRange):
    inputs = []
    if self.bottom is not None:
        inputs.append(self.bottom)
    inputs.extend(self.input)
    cmds = []
    for input in inputs:
        cmds.append(input.get(destdir=staging_dir, grid=grid, time=period))
    return "\n".join(cmds)

render

render(*args, **kwargs)

Make this class consistent with the components API.

Source code in src/rompy_swan/interface.py
def render(self, *args, **kwargs):
    """Make this class consistent with the components API."""
    return self.get(*args, **kwargs)

BoundaryInterface

Bases: RompyBaseModel

SWAN forcing boundary interface.

Examples

.. ipython:: python :okwarning:

from rompy_swan.interface import BoundaryInterface
Source code in src/rompy_swan/interface.py
class BoundaryInterface(RompyBaseModel):
    """SWAN forcing boundary interface.

    Examples
    --------

    .. ipython:: python
        :okwarning:

        from rompy_swan.interface import BoundaryInterface

    """

    model_type: Literal["boundary_interface", "BOUNDARY_INTERFACE"] = Field(
        default="boundary_interface", description="Model type discriminator"
    )
    kind: Union[Boundnest1, BoundspecSide, BoundspecSegmentXY] = Field(
        default=None, description="Boundary data object"
    )

    def get(self, staging_dir: Path, grid: SwanGrid, period: TimeRange):
        filename, cmd = self.kind.get(destdir=staging_dir, grid=grid, time=period)
        logger.info(f"Generating boundary file: {filename}")
        return cmd

    def render(self, *args, **kwargs):
        """Make this class consistent with the components API."""
        return self.get(*args, **kwargs)

Attributes

model_type class-attribute instance-attribute

model_type: Literal['boundary_interface', 'BOUNDARY_INTERFACE'] = Field(default='boundary_interface', description='Model type discriminator')

kind class-attribute instance-attribute

kind: Union[Boundnest1, BoundspecSide, BoundspecSegmentXY] = Field(default=None, description='Boundary data object')

Functions

get

get(staging_dir: Path, grid: SwanGrid, period: TimeRange)
Source code in src/rompy_swan/interface.py
def get(self, staging_dir: Path, grid: SwanGrid, period: TimeRange):
    filename, cmd = self.kind.get(destdir=staging_dir, grid=grid, time=period)
    logger.info(f"Generating boundary file: {filename}")
    return cmd

render

render(*args, **kwargs)

Make this class consistent with the components API.

Source code in src/rompy_swan/interface.py
def render(self, *args, **kwargs):
    """Make this class consistent with the components API."""
    return self.get(*args, **kwargs)

OutputInterface

Bases: TimeInterface

Output group component with consistent times.

Source code in src/rompy_swan/interface.py
class OutputInterface(TimeInterface):
    """Output group component with consistent times."""

    model_type: Literal["outputinterface", "OUTPUTINTERFACE"] = Field(
        default="outputinterface", description="Model type discriminator"
    )

    @model_validator(mode="after")
    def time_interface(self) -> "OutputInterface":
        """Set the time parameter for all WRITE components.

        Note
        ----
        * tbeg is set to the Config runtime start
        * delt is set from the component's times if defined, otherwise from the runtime interval
        * tfmt is set from the component's times if defined, otherwise from TimeRange default
        * dfmt is set from the component's times if defined, otherwise from TimeRange default
        * suffix is set to the suffix of the component

        """
        for component in self.group._write_fields:
            obj = getattr(self.group, component)
            if obj is not None:
                times = obj.times or TimeRangeOpen()
                obj.times = TimeRangeOpen(
                    tbeg=self.period.start,
                    delt=times.delt if obj.times else self.period.interval,
                    tfmt=times.tfmt,
                    dfmt=times.dfmt,
                    suffix=obj.suffix,
                )

Attributes

model_type class-attribute instance-attribute

model_type: Literal['outputinterface', 'OUTPUTINTERFACE'] = Field(default='outputinterface', description='Model type discriminator')

Functions

time_interface

time_interface() -> OutputInterface

Set the time parameter for all WRITE components.

Note
  • tbeg is set to the Config runtime start
  • delt is set from the component's times if defined, otherwise from the runtime interval
  • tfmt is set from the component's times if defined, otherwise from TimeRange default
  • dfmt is set from the component's times if defined, otherwise from TimeRange default
  • suffix is set to the suffix of the component
Source code in src/rompy_swan/interface.py
@model_validator(mode="after")
def time_interface(self) -> "OutputInterface":
    """Set the time parameter for all WRITE components.

    Note
    ----
    * tbeg is set to the Config runtime start
    * delt is set from the component's times if defined, otherwise from the runtime interval
    * tfmt is set from the component's times if defined, otherwise from TimeRange default
    * dfmt is set from the component's times if defined, otherwise from TimeRange default
    * suffix is set to the suffix of the component

    """
    for component in self.group._write_fields:
        obj = getattr(self.group, component)
        if obj is not None:
            times = obj.times or TimeRangeOpen()
            obj.times = TimeRangeOpen(
                tbeg=self.period.start,
                delt=times.delt if obj.times else self.period.interval,
                tfmt=times.tfmt,
                dfmt=times.dfmt,
                suffix=obj.suffix,
            )

LockupInterface

Bases: TimeInterface

Lockup group component with consistent times.

Source code in src/rompy_swan/interface.py
class LockupInterface(TimeInterface):
    """Lockup group component with consistent times."""

    model_type: Literal["lockupinterface", "LOCKUPINTERFACE"] = Field(
        default="lockupinterface", description="Model type discriminator"
    )

    def _nonstationary(self, tfmt: str, dfmt: str) -> NONSTATIONARY:
        return NONSTATIONARY(
            tbeg=self.period.start,
            tend=self.period.end,
            delt=self.period.interval,
            tfmt=tfmt,
            dfmt=dfmt,
            suffix="c",
        )

    def _stationary(self, tfmt: str) -> STATIONARY:
        return STATIONARY(time=self.period.start, tfmt=tfmt)

    @model_validator(mode="after")
    def time_interface(self) -> "LockupInterface":
        """Set the time parameter for COMPUTE components."""
        times = self.group.compute.times or NONSTATIONARY()
        if isinstance(times, NONSTATIONARY):
            times = self._nonstationary(times.tfmt, times.dfmt)
        elif isinstance(times, STATIONARY):
            times = self._stationary(times.tfmt)
        else:
            raise ValueError(f"Unknown time type {type(times)}")
        self.group.compute.times = times

Attributes

model_type class-attribute instance-attribute

model_type: Literal['lockupinterface', 'LOCKUPINTERFACE'] = Field(default='lockupinterface', description='Model type discriminator')

Functions

time_interface

time_interface() -> LockupInterface

Set the time parameter for COMPUTE components.

Source code in src/rompy_swan/interface.py
@model_validator(mode="after")
def time_interface(self) -> "LockupInterface":
    """Set the time parameter for COMPUTE components."""
    times = self.group.compute.times or NONSTATIONARY()
    if isinstance(times, NONSTATIONARY):
        times = self._nonstationary(times.tfmt, times.dfmt)
    elif isinstance(times, STATIONARY):
        times = self._stationary(times.tfmt)
    else:
        raise ValueError(f"Unknown time type {type(times)}")
    self.group.compute.times = times