Skip to content

SWAN

TODO: Ensure the model_type is shown next to each class in the autosummaries.

TODO: Fix broken links to classes and modules.

Grid

SwanGrid

Bases: RegularGrid

Regular SWAN grid in geographic space.

Source code in rompy_swan/grid.py
class SwanGrid(RegularGrid):
    """Regular SWAN grid in geographic space."""

    grid_type: Literal["REG", "CURV"] = Field(
        "REG", description="Type of grid (REG=regular, CURV=curvilinear)"
    )
    exc: Optional[float] = Field(None, description="Missing value")
    gridfile: Optional[str] = Field(
        None, description="Name of grid file to load", max_length=36
    )

    @field_validator("grid_type")
    @classmethod
    def validate_grid_type(cls, v):
        if v not in ["REG", "CURV"]:
            raise ValueError("grid_type must be one of REG or CURV")
        return v

    @model_validator(mode="after")
    def validate_curvilinear_grid(self) -> "SwanGrid":
        if self.grid_type == "CURV" and self.gridfile is None:
            raise ValueError("gridfile must be provided for CURV grid")
        return self

    def _regen_grid(self):
        if self.grid_type == "REG":
            _x, _y = self._gen_reg_cgrid()
        elif self.grid_type == "CURV":
            _x, _y = self._gen_curv_cgrid()
        self.x = _x
        self.y = _y

    def _gen_curv_cgrid(self):
        """loads a SWAN curvilinear grid and returns cgrid lat/lons and
        command to be used in SWAN contol file. The Default grid is one I made using
        Deltares' RGFGrid tool and converted to a SWAN-friendly formate using Deltares
        OpenEarth code "swan_io_grd.m"

        """
        # number of grid cells in the 'x' and 'y' directions:
        # (you can get this from d3d_qp.m or other Deltares OpenEarth code)
        nX = self.nx
        nY = self.ny

        grid_Data = open(self.gridpath).readlines()
        ix = grid_Data.index("x-coordinates\n")
        iy = grid_Data.index("y-coordinates\n")
        lons = []
        lats = []
        for idx in np.arange(ix + 1, iy):
            lons.append(re.sub("\n", "", grid_Data[idx]).split())
        for idx in np.arange(iy + 1, len(grid_Data)):
            lats.append(re.sub("\n", "", grid_Data[idx]).split())

        def flatten(l):
            return [item for sublist in l for item in sublist]

        lons = np.array(flatten(lons)).astype(np.float)
        lats = np.array(flatten(lats)).astype(np.float)

        x = np.reshape(lats, (nX, nY))
        y = np.reshape(lons, (nX, nY))

        return x, y

    @property
    def inpgrid(self):
        if self.grid_type == "REG":
            inpstr = f"REG {self.x0} {self.y0} {self.rot} {self.nx-1:0.0f} {self.ny-1:0.0f} {self.dx} {self.dy}"
            if self.exc is not None:
                inpstr += f" EXC {self.exc}"
            return inpstr
        elif self.grid_type == "CURV":
            raise NotImplementedError("Curvilinear grids not supported yet")
            # return f'CURVilinear {self.nx-1:0.0f} {self.ny-1:0.0f} \nREADGRID COOR 1 \'{os.path.basename(self.gridpath)}\' 1 0 1 FREE'

    @property
    def cgrid(self):
        if self.grid_type == "REG":
            return f"REG {self.x0} {self.y0} {self.rot} {self.xlen} {self.ylen} {self.nx-1:0.0f} {self.ny-1:0.0f}"
        elif self.grid_type == "CURV":
            raise NotImplementedError("Curvilinear grids not supported yet")
            # return (f'CURVilinear {self.nx-1:0.0f} {self.ny-1:0.0f}',f'READGRID COOR 1 \'{os.path.basename(self.gridpath)}\' 1 0 1 FREE')

    @property
    def cgrid_read(self):
        if self.grid_type == "REG":
            return ""
        elif self.grid_type == "CURV":
            raise NotImplementedError("Curvilinear grids not supported yet")
            # return f'READGRID COOR 1 \'{os.path.basename(self.gridpath)}\' 1 0 1 FREE'

    @property
    def component(self):
        """Return the respective SWAN component for this grid."""
        if self.grid_type == "REG":
            return GRIDREGULAR(
                xp=self.x0,
                yp=self.y0,
                alp=self.rot,
                xlen=self.xlen,
                ylen=self.ylen,
                mx=self.nx - 1,
                my=self.ny - 1,
            )
        else:
            raise NotImplementedError("Only regular grid is currently supported")

    def __call__(self):
        output = f"CGRID {self.cgrid} CIRCLE 36 0.0464 1. 31\n"
        output += f"{self.cgrid_read}\n"
        return output

    def boundary(self, *args, **kwargs) -> tuple:
        """Returns the grid boundary polygon.

        Override the parent method to use the actual points from the regular grid
        boundary instead of the convex hull which is not always the boundary.

        """
        x = np.concatenate(
            [self.x[0, :], self.x[1:, -1], self.x[-1, -2::-1], self.x[-2::-1, 0]]
        )
        y = np.concatenate(
            [self.y[0, :], self.y[1:, -1], self.y[-1, -2::-1], self.y[-2::-1, 0]]
        )
        return Polygon(zip(x, y))

    def nearby_spectra(self, ds_spec, dist_thres=0.05, plot=True):
        """Find points nearby and project to the boundary

        Parameters
        ----------
        ds_spec: xarray.Dataset
            an XArray dataset of wave spectra at a number of points.
            Dataset variable names standardised using wavespectra.read_*
            functions.

            See https://wavespectra.readthedocs.io/en/latest/api.html#input-functions
        dist_thres: float, optional [Default: 0.05]
            Maximum distance to translate the input spectra to the grid boundary
        plot: boolean, optional [Default: True]
            Generate a plot that shows the applied projections

        Returns
        -------
        xarray.Dataset
            A subset of ds_spec with lat and lon coordinates projected to the boundary
        """

        bbox = self.bbox(buffer=dist_thres)
        minLon, minLat, maxLon, maxLat = bbox

        inds = np.where(
            (ds_spec.lon > minLon)
            & (ds_spec.lon < maxLon)
            & (ds_spec.lat > minLat)
            & (ds_spec.lat < maxLat)
        )[0]
        ds_spec = ds_spec.isel(site=inds)

        # Work out the closest spectral points
        def _nearestPointOnLine(p1, p2, p3):
            # calculate the distance of p3 from the line between p1 and p2 and return
            # the closest point on the line

            from math import fabs, sqrt

            a = p2[1] - p1[1]
            b = -1.0 * (p2[0] - p1[0])
            c = p2[0] * p1[1] - p2[1] * p1[0]

            dist = fabs(a * p3[0] + b * p3[1] + c) / sqrt(a**2 + b**2)
            x = (b * (b * p3[0] - a * p3[1]) - a * c) / (a**2 + b**2)
            y = (a * (-b * p3[0] + a * p3[1]) - b * c) / (a**2 + b**2)

            return dist, x, y

        bx, by = self.boundary_points()
        pol = np.stack([bx, by])

        # Spectra points
        ds_spec.lon.load()
        ds_spec.lat.load()
        ds_spec["lon_original"] = ds_spec["lon"]
        ds_spec["lat_original"] = ds_spec["lat"]
        p3s = list(zip(ds_spec.lon.values, ds_spec.lat.values))

        if plot:
            fig, ax = self.plot()
            ax.scatter(ds_spec.lon, ds_spec.lat)

        specPoints = []
        for i in range(pol.shape[1] - 1):
            p1 = pol[:, i]
            p2 = pol[:, i + 1]
            np.stack((p1, p2))
            output = np.array(
                list(map(lambda xi: _nearestPointOnLine(p1, p2, xi), p3s))
            )
            dists = output[:, 0]
            segmentPoints = output[:, 1:]
            inds = np.where((dists < dist_thres))[0]

            # Loop through the points projected onto the line
            for ind in inds:
                specPoint = ds_spec.isel(site=ind)

                segLon = segmentPoints[ind, 0]
                segLat = segmentPoints[ind, 1]

                if plot:
                    ax.plot(
                        [segLon, specPoint.lon],
                        [segLat, specPoint.lat],
                        color="r",
                        lw=2,
                    )
                    ax.scatter(specPoint.lon, specPoint.lat, marker="o", color="b")
                    ax.scatter(segLon, segLat, marker="x", color="g")

                specPoint["lon"] = segLon
                specPoint["lat"] = segLat
                specPoints.append(specPoint)

            logger.debug(f"Segment {i} - Indices {inds}")

        if plot:
            fig.show()

        ds_boundary = xr.concat(specPoints, dim="site")
        return ds_boundary

    def __repr__(self):
        return f"SwanGrid: {self.grid_type}, {self.nx}x{self.ny}"

    def __str__(self):
        return f"SwanGrid: {self.grid_type}, {self.nx}x{self.ny}"

    @classmethod
    def from_component(cls, component: GRIDREGULAR) -> "SwanGrid":
        """Swan grid from an existing component.

        Parameters
        ----------
        component: GRIDREGULAR
            A GRIDREGULAR SWAN component.

        Returns
        -------
        SwanGrid
            A SwanGrid object.

        """
        return cls(
            x0=component.xp,
            y0=component.yp,
            rot=component.alp,
            dx=component.dx,
            dy=component.dy,
            nx=component.mx + 1,
            ny=component.my + 1,
        )

Attributes

grid_type class-attribute instance-attribute

grid_type: Literal['REG', 'CURV'] = Field('REG', description='Type of grid (REG=regular, CURV=curvilinear)')

exc class-attribute instance-attribute

exc: Optional[float] = Field(None, description='Missing value')

gridfile class-attribute instance-attribute

gridfile: Optional[str] = Field(None, description='Name of grid file to load', max_length=36)

inpgrid property

inpgrid

cgrid property

cgrid

cgrid_read property

cgrid_read

component property

component

Return the respective SWAN component for this grid.

Functions

validate_grid_type classmethod

validate_grid_type(v)
Source code in rompy_swan/grid.py
@field_validator("grid_type")
@classmethod
def validate_grid_type(cls, v):
    if v not in ["REG", "CURV"]:
        raise ValueError("grid_type must be one of REG or CURV")
    return v

validate_curvilinear_grid

validate_curvilinear_grid() -> SwanGrid
Source code in rompy_swan/grid.py
@model_validator(mode="after")
def validate_curvilinear_grid(self) -> "SwanGrid":
    if self.grid_type == "CURV" and self.gridfile is None:
        raise ValueError("gridfile must be provided for CURV grid")
    return self

boundary

boundary(*args, **kwargs) -> tuple

Returns the grid boundary polygon.

Override the parent method to use the actual points from the regular grid boundary instead of the convex hull which is not always the boundary.

Source code in rompy_swan/grid.py
def boundary(self, *args, **kwargs) -> tuple:
    """Returns the grid boundary polygon.

    Override the parent method to use the actual points from the regular grid
    boundary instead of the convex hull which is not always the boundary.

    """
    x = np.concatenate(
        [self.x[0, :], self.x[1:, -1], self.x[-1, -2::-1], self.x[-2::-1, 0]]
    )
    y = np.concatenate(
        [self.y[0, :], self.y[1:, -1], self.y[-1, -2::-1], self.y[-2::-1, 0]]
    )
    return Polygon(zip(x, y))

nearby_spectra

nearby_spectra(ds_spec, dist_thres=0.05, plot=True)

Find points nearby and project to the boundary

Parameters

ds_spec: xarray.Dataset an XArray dataset of wave spectra at a number of points. Dataset variable names standardised using wavespectra.read_* functions.

See https://wavespectra.readthedocs.io/en/latest/api.html#input-functions

dist_thres: float, optional [Default: 0.05] Maximum distance to translate the input spectra to the grid boundary plot: boolean, optional [Default: True] Generate a plot that shows the applied projections

Returns

xarray.Dataset A subset of ds_spec with lat and lon coordinates projected to the boundary

Source code in rompy_swan/grid.py
def nearby_spectra(self, ds_spec, dist_thres=0.05, plot=True):
    """Find points nearby and project to the boundary

    Parameters
    ----------
    ds_spec: xarray.Dataset
        an XArray dataset of wave spectra at a number of points.
        Dataset variable names standardised using wavespectra.read_*
        functions.

        See https://wavespectra.readthedocs.io/en/latest/api.html#input-functions
    dist_thres: float, optional [Default: 0.05]
        Maximum distance to translate the input spectra to the grid boundary
    plot: boolean, optional [Default: True]
        Generate a plot that shows the applied projections

    Returns
    -------
    xarray.Dataset
        A subset of ds_spec with lat and lon coordinates projected to the boundary
    """

    bbox = self.bbox(buffer=dist_thres)
    minLon, minLat, maxLon, maxLat = bbox

    inds = np.where(
        (ds_spec.lon > minLon)
        & (ds_spec.lon < maxLon)
        & (ds_spec.lat > minLat)
        & (ds_spec.lat < maxLat)
    )[0]
    ds_spec = ds_spec.isel(site=inds)

    # Work out the closest spectral points
    def _nearestPointOnLine(p1, p2, p3):
        # calculate the distance of p3 from the line between p1 and p2 and return
        # the closest point on the line

        from math import fabs, sqrt

        a = p2[1] - p1[1]
        b = -1.0 * (p2[0] - p1[0])
        c = p2[0] * p1[1] - p2[1] * p1[0]

        dist = fabs(a * p3[0] + b * p3[1] + c) / sqrt(a**2 + b**2)
        x = (b * (b * p3[0] - a * p3[1]) - a * c) / (a**2 + b**2)
        y = (a * (-b * p3[0] + a * p3[1]) - b * c) / (a**2 + b**2)

        return dist, x, y

    bx, by = self.boundary_points()
    pol = np.stack([bx, by])

    # Spectra points
    ds_spec.lon.load()
    ds_spec.lat.load()
    ds_spec["lon_original"] = ds_spec["lon"]
    ds_spec["lat_original"] = ds_spec["lat"]
    p3s = list(zip(ds_spec.lon.values, ds_spec.lat.values))

    if plot:
        fig, ax = self.plot()
        ax.scatter(ds_spec.lon, ds_spec.lat)

    specPoints = []
    for i in range(pol.shape[1] - 1):
        p1 = pol[:, i]
        p2 = pol[:, i + 1]
        np.stack((p1, p2))
        output = np.array(
            list(map(lambda xi: _nearestPointOnLine(p1, p2, xi), p3s))
        )
        dists = output[:, 0]
        segmentPoints = output[:, 1:]
        inds = np.where((dists < dist_thres))[0]

        # Loop through the points projected onto the line
        for ind in inds:
            specPoint = ds_spec.isel(site=ind)

            segLon = segmentPoints[ind, 0]
            segLat = segmentPoints[ind, 1]

            if plot:
                ax.plot(
                    [segLon, specPoint.lon],
                    [segLat, specPoint.lat],
                    color="r",
                    lw=2,
                )
                ax.scatter(specPoint.lon, specPoint.lat, marker="o", color="b")
                ax.scatter(segLon, segLat, marker="x", color="g")

            specPoint["lon"] = segLon
            specPoint["lat"] = segLat
            specPoints.append(specPoint)

        logger.debug(f"Segment {i} - Indices {inds}")

    if plot:
        fig.show()

    ds_boundary = xr.concat(specPoints, dim="site")
    return ds_boundary

from_component classmethod

from_component(component: GRIDREGULAR) -> SwanGrid

Swan grid from an existing component.

Parameters

component: GRIDREGULAR A GRIDREGULAR SWAN component.

Returns

SwanGrid A SwanGrid object.

Source code in rompy_swan/grid.py
@classmethod
def from_component(cls, component: GRIDREGULAR) -> "SwanGrid":
    """Swan grid from an existing component.

    Parameters
    ----------
    component: GRIDREGULAR
        A GRIDREGULAR SWAN component.

    Returns
    -------
    SwanGrid
        A SwanGrid object.

    """
    return cls(
        x0=component.xp,
        y0=component.yp,
        rot=component.alp,
        dx=component.dx,
        dy=component.dy,
        nx=component.mx + 1,
        ny=component.my + 1,
    )

Data

SwanDataGrid

Bases: DataGrid

This class is used to write SWAN data from a dataset.

Source code in rompy_swan/data.py
class SwanDataGrid(DataGrid):
    """This class is used to write SWAN data from a dataset."""

    z1: Optional[str] = Field(
        default=None,
        description=(
            "Name of the data variable in dataset representing either a scaler "
            "parameter or the u-componet of a vector field"
        ),
    )
    z2: Optional[str] = Field(
        default=None,
        description=(
            "Name of the data variable in dataset representing "
            "the v-componet of a vector field"
        ),
    )
    var: GridOptions = Field(description="SWAN input grid name")
    fac: float = Field(
        description=(
            "SWAN multiplies all values that are read from file by `fac`. For "
            "instance if the values are given in unit decimeter, one should make "
            "`fac=0.1` to obtain values in m. To change sign use a negative `fac`"
        ),
        default=1.0,
    )

    @model_validator(mode="after")
    def ensure_z1_in_data_vars(self) -> "SwanDataGrid":
        data_vars = self.variables
        for z in [self.z1, self.z2]:
            if z and z not in data_vars:
                logger.debug(f"Adding {z} to data_vars")
                data_vars.append(z)
        self.variables = data_vars
        return self

    def get(
        self,
        destdir: str | Path,
        grid: Optional[SwanGrid] = None,
        time: Optional[TimeRange] = None,
    ) -> Path:
        """Write the data source to a new location.

        Parameters
        ----------
        destdir : str | Path
            The destination directory to write the netcdf data to.
        grid: SwanGrid, optional
            The grid to filter the data to, only used if `self.filter_grid` is True.
        time: TimeRange, optional
            The times to filter the data to, only used if `self.filter_time` is True.

        Returns
        -------
        cmd: str
            The command line string with the INPGRID/READINP commands ready to be
            written to the SWAN input file.

        Note
        ----
        The data are assumed to not have been rotated. We cannot use the grid.rot attr
        as this is the rotation from the model grid object which is not necessarily the
        same as the rotation of the data.

        """
        if self.crop_data:
            if grid is not None:
                self._filter_grid(grid)
            if time is not None:
                self._filter_time(time)

        output_file = os.path.join(destdir, f"{self.var.value}.grd")

        # Create a formatted box for logging
        log_box(
            title=f"WRITING {self.var.value.upper()} GRID DATA",
            logger=logger,
            add_empty_line=False,
        )

        # Log output file and dataset information using bullet points
        items = [f"Output file: {output_file}"]

        # Add variable information if available
        if self.z1:
            shape_info = f"{self.ds[self.z1].shape}"
            items.append(f"Variable: {self.z1} with shape {shape_info}")
        if self.z2:
            shape_info = f"{self.ds[self.z2].shape}"
            items.append(f"Variable: {self.z2} with shape {shape_info}")

        # Add scaling factor
        items.append(f"Scaling factor: {self.fac}")

        # Log all items as a bulleted list
        logger.bullet_list(items, indent=2)

        start_time = time_module.time()
        if self.var.value == "bottom":
            inpgrid, readgrid = self.ds.swan.to_bottom_grid(
                output_file,
                fmt="%4.2f",
                x=self.coords.x,
                y=self.coords.y,
                z=self.z1,
                fac=self.fac,
                rot=0.0,
                vmin=float("-inf"),
            )
        else:
            inpgrid, readgrid = self.ds.swan.to_inpgrid(
                output_file=output_file,
                x=self.coords.x,
                y=self.coords.y,
                z1=self.z1,
                z2=self.z2,
                fac=self.fac,
                rot=0.0,
                var=self.var.name,
            )

        # Log completion and processing time
        elapsed_time = time_module.time() - start_time
        file_size = Path(output_file).stat().st_size / (1024 * 1024)  # Size in MB

        # Use the centralized functions from rompy package

        # Log completion information as a bulleted list
        logger.bullet_list(
            [
                f"Completed in {elapsed_time:.2f} seconds",
                f"File size: {file_size:.2f} MB",
            ],
            indent=2,
        )

        return f"{inpgrid}\n{readgrid}\n"

    def __str__(self):
        return f"SWANDataGrid {self.var.name}"

    def _format_value(self, obj):
        """Format SwanDataGrid values using the new formatting framework.

        This method provides special formatting for SwanDataGrid objects.

        Args:
            obj: The object to format

        Returns:
            A formatted string or None to use default formatting
        """
        # Only format SwanDataGrid objects
        if not isinstance(obj, SwanDataGrid):
            return None

        # Use the new formatting framework
        from rompy.formatting import format_value

        return format_value(obj)
        lines.append(f"  {bullet} Variable:   {obj.var.name}")

        # Add source information if available
        if hasattr(obj, "source") and obj.source:
            source_type = getattr(obj.source, "model_type", "unknown")
            lines.append(f"  {bullet} Source:     {source_type}")

            # Add dataset information if available
            if hasattr(obj.source, "dataset_id"):
                lines.append(f"  {bullet} Dataset ID: {obj.source.dataset_id}")

        # Add coordinate information if available
        if hasattr(obj, "coords") and obj.coords:
            coords = [f"{k}={v}" for k, v in obj.coords.items()]
            coords_str = ", ".join(coords)
            lines.append(f"  {bullet} Coordinates: {coords_str}")

        # Add scaling factor if available
        if hasattr(obj, "fac"):
            lines.append(f"  {bullet} Scale factor: {obj.fac}")

        # Add z variables information if available
        if hasattr(obj, "z1") and obj.z1:
            lines.append(f"  {bullet} Z1 variable: {obj.z1}")
        if hasattr(obj, "z2") and obj.z2:
            lines.append(f"  {bullet} Z2 variable: {obj.z2}")

        # Close with footer
        lines.append(footer)

        return "\n".join(lines)

Attributes

z1 class-attribute instance-attribute

z1: Optional[str] = Field(default=None, description='Name of the data variable in dataset representing either a scaler parameter or the u-componet of a vector field')

z2 class-attribute instance-attribute

z2: Optional[str] = Field(default=None, description='Name of the data variable in dataset representing the v-componet of a vector field')

var class-attribute instance-attribute

var: GridOptions = Field(description='SWAN input grid name')

fac class-attribute instance-attribute

fac: float = Field(description='SWAN multiplies all values that are read from file by `fac`. For instance if the values are given in unit decimeter, one should make `fac=0.1` to obtain values in m. To change sign use a negative `fac`', default=1.0)

Functions

ensure_z1_in_data_vars

ensure_z1_in_data_vars() -> SwanDataGrid
Source code in rompy_swan/data.py
@model_validator(mode="after")
def ensure_z1_in_data_vars(self) -> "SwanDataGrid":
    data_vars = self.variables
    for z in [self.z1, self.z2]:
        if z and z not in data_vars:
            logger.debug(f"Adding {z} to data_vars")
            data_vars.append(z)
    self.variables = data_vars
    return self

get

get(destdir: str | Path, grid: Optional[SwanGrid] = None, time: Optional[TimeRange] = None) -> Path

Write the data source to a new location.

Parameters

destdir : str | Path The destination directory to write the netcdf data to. grid: SwanGrid, optional The grid to filter the data to, only used if self.filter_grid is True. time: TimeRange, optional The times to filter the data to, only used if self.filter_time is True.

Returns

cmd: str The command line string with the INPGRID/READINP commands ready to be written to the SWAN input file.

Note

The data are assumed to not have been rotated. We cannot use the grid.rot attr as this is the rotation from the model grid object which is not necessarily the same as the rotation of the data.

Source code in rompy_swan/data.py
def get(
    self,
    destdir: str | Path,
    grid: Optional[SwanGrid] = None,
    time: Optional[TimeRange] = None,
) -> Path:
    """Write the data source to a new location.

    Parameters
    ----------
    destdir : str | Path
        The destination directory to write the netcdf data to.
    grid: SwanGrid, optional
        The grid to filter the data to, only used if `self.filter_grid` is True.
    time: TimeRange, optional
        The times to filter the data to, only used if `self.filter_time` is True.

    Returns
    -------
    cmd: str
        The command line string with the INPGRID/READINP commands ready to be
        written to the SWAN input file.

    Note
    ----
    The data are assumed to not have been rotated. We cannot use the grid.rot attr
    as this is the rotation from the model grid object which is not necessarily the
    same as the rotation of the data.

    """
    if self.crop_data:
        if grid is not None:
            self._filter_grid(grid)
        if time is not None:
            self._filter_time(time)

    output_file = os.path.join(destdir, f"{self.var.value}.grd")

    # Create a formatted box for logging
    log_box(
        title=f"WRITING {self.var.value.upper()} GRID DATA",
        logger=logger,
        add_empty_line=False,
    )

    # Log output file and dataset information using bullet points
    items = [f"Output file: {output_file}"]

    # Add variable information if available
    if self.z1:
        shape_info = f"{self.ds[self.z1].shape}"
        items.append(f"Variable: {self.z1} with shape {shape_info}")
    if self.z2:
        shape_info = f"{self.ds[self.z2].shape}"
        items.append(f"Variable: {self.z2} with shape {shape_info}")

    # Add scaling factor
    items.append(f"Scaling factor: {self.fac}")

    # Log all items as a bulleted list
    logger.bullet_list(items, indent=2)

    start_time = time_module.time()
    if self.var.value == "bottom":
        inpgrid, readgrid = self.ds.swan.to_bottom_grid(
            output_file,
            fmt="%4.2f",
            x=self.coords.x,
            y=self.coords.y,
            z=self.z1,
            fac=self.fac,
            rot=0.0,
            vmin=float("-inf"),
        )
    else:
        inpgrid, readgrid = self.ds.swan.to_inpgrid(
            output_file=output_file,
            x=self.coords.x,
            y=self.coords.y,
            z1=self.z1,
            z2=self.z2,
            fac=self.fac,
            rot=0.0,
            var=self.var.name,
        )

    # Log completion and processing time
    elapsed_time = time_module.time() - start_time
    file_size = Path(output_file).stat().st_size / (1024 * 1024)  # Size in MB

    # Use the centralized functions from rompy package

    # Log completion information as a bulleted list
    logger.bullet_list(
        [
            f"Completed in {elapsed_time:.2f} seconds",
            f"File size: {file_size:.2f} MB",
        ],
        indent=2,
    )

    return f"{inpgrid}\n{readgrid}\n"

Boundnest1

Bases: BoundaryWaveStation

SWAN BOUNDNEST1 NEST data class.

Source code in rompy_swan/boundary.py
class Boundnest1(BoundaryWaveStation):
    """SWAN BOUNDNEST1 NEST data class."""

    model_type: Literal["boundnest1", "BOUNDNEST1"] = Field(
        default="boundnest1", description="Model type discriminator"
    )
    rectangle: Literal["closed", "open"] = Field(
        default="closed",
        description=(
            "Defines whether boundary is defined over an closed or open rectangle"
        ),
    )

    def get(
        self, destdir: str, grid: SwanGrid, time: Optional[TimeRange] = None
    ) -> str:
        """Write the data source to a new location.

        Parameters
        ----------
        destdir : str | Path
            Destination directory for the SWAN ASCII file.
        grid : RegularGrid
            Grid instance to use for selecting the boundary points.
        time: TimeRange, optional
            The times to filter the data to, only used if `self.crop_data` is True.

        Returns
        -------
        filename: Path
            The filename of the written boundary file.
        cmd : str
            Boundary command string to render in the SWAN INPUT file

        """
        if self.crop_data and time is not None:
            self._filter_time(time)
        if self.crop_data and grid is not None:
            self._filter_grid(grid)

        ds = self._sel_boundary(grid).sortby("dir")

        # If nearest, ensure points are returned at the requested positions
        if self.sel_method == "nearest":
            xbnd, ybnd = self._boundary_points(grid=grid)
            ds["lon"].values = xbnd
            ds["lat"].values = ybnd

        filename = Path(destdir) / f"{self.id}.bnd"
        ds.spec.to_swan(filename)
        cmd = f"BOUNDNEST1 NEST '{filename.name}' {self.rectangle.upper()}"
        return filename, cmd

Attributes

model_type class-attribute instance-attribute

model_type: Literal['boundnest1', 'BOUNDNEST1'] = Field(default='boundnest1', description='Model type discriminator')

rectangle class-attribute instance-attribute

rectangle: Literal['closed', 'open'] = Field(default='closed', description='Defines whether boundary is defined over an closed or open rectangle')

Functions

get

get(destdir: str, grid: SwanGrid, time: Optional[TimeRange] = None) -> str

Write the data source to a new location.

Parameters

destdir : str | Path Destination directory for the SWAN ASCII file. grid : RegularGrid Grid instance to use for selecting the boundary points. time: TimeRange, optional The times to filter the data to, only used if self.crop_data is True.

Returns

filename: Path The filename of the written boundary file. cmd : str Boundary command string to render in the SWAN INPUT file

Source code in rompy_swan/boundary.py
def get(
    self, destdir: str, grid: SwanGrid, time: Optional[TimeRange] = None
) -> str:
    """Write the data source to a new location.

    Parameters
    ----------
    destdir : str | Path
        Destination directory for the SWAN ASCII file.
    grid : RegularGrid
        Grid instance to use for selecting the boundary points.
    time: TimeRange, optional
        The times to filter the data to, only used if `self.crop_data` is True.

    Returns
    -------
    filename: Path
        The filename of the written boundary file.
    cmd : str
        Boundary command string to render in the SWAN INPUT file

    """
    if self.crop_data and time is not None:
        self._filter_time(time)
    if self.crop_data and grid is not None:
        self._filter_grid(grid)

    ds = self._sel_boundary(grid).sortby("dir")

    # If nearest, ensure points are returned at the requested positions
    if self.sel_method == "nearest":
        xbnd, ybnd = self._boundary_points(grid=grid)
        ds["lon"].values = xbnd
        ds["lat"].values = ybnd

    filename = Path(destdir) / f"{self.id}.bnd"
    ds.spec.to_swan(filename)
    cmd = f"BOUNDNEST1 NEST '{filename.name}' {self.rectangle.upper()}"
    return filename, cmd

Components

SWAN command instructions are described in Rompy by a set of pydantic models defined as components. Each component defines a full command instruction such as PROJECT, CGRID, GEN3, NUMERIC, etc. Inputs to the components may include other pydantic models called subcomponents to handle more complex arguments.

Components are subclasses of rompy_swan.components.base.BaseComponent. The base component class implements the following attribues:

  • The model_type field that should be overwritten in each component subclass. The model_type field is defined as a Literal type and is used to discriminate the exact components to use in fields defined by a Union type of two or more components in a declarative framework (i.e., instantiating with a dict from yaml or json file).

  • The cmd() method that must be overwritten in each component subclass. The cmd() method should return either a string or a list of strings to fully define a SWAN command line instruction. A list of strings defines multiple command line instructions that are executed in sequence such as the INPGRID/READGRID components.

  • The render() method that constructs the command line instruction from the content returned from the cmd() method. The render() method is typically called inside the __call__ method of the config class to construct the specific command line instruction from that component, taking care of maximum line size, line break and line continuation.

Components are defined within the rompy_swan.components subpackage and render an entire SWAN command line specification. The following modules are available:

Subcomponents

Subcomponents are defined within the rompy_swan.subcomponents subpackage and render part of a SWAN command line specification. They typically define specific arguments to one or more component. The following modules are available:

Interface

Interface classes provide an interface between swan components and higher level objects such as TimeRange, Data and Grid objects. They are used inside the __call__ method of the config classes to pass instances of these objects to the appropriate components and define consistent parameters to the config after instantiating them.

DataInterface

Bases: RompyBaseModel

SWAN forcing data interface.

Examples

.. ipython:: python :okwarning:

from rompy_swan.interface import DataInterface
Source code in rompy_swan/interface.py
class DataInterface(RompyBaseModel):
    """SWAN forcing data interface.

    Examples
    --------

    .. ipython:: python
        :okwarning:

        from rompy_swan.interface import DataInterface

    """

    model_type: Literal["data_interface", "DATA_INTERFACE"] = Field(
        default="data_interface", description="Model type discriminator"
    )
    bottom: Optional[SwanDataGrid] = Field(default=None, description="Bathymetry data")
    input: list[SwanDataGrid] = Field(default=[], description="Input grid data")

    @field_validator("input")
    @classmethod
    def ensure_unique_var(
        cls, input: list[SwanDataGrid], info: ValidationInfo
    ) -> list[SwanDataGrid]:
        """Ensure that each input var is unique."""
        vars = []
        if info.data["bottom"] is not None:
            vars.append(info.data["bottom"].var)
        vars.extend([inp.var for inp in input])
        if len(vars) != len(set(vars)):
            raise ValueError("Each var must be unique in input")
        return input

    def get(self, staging_dir: Path, grid: SwanGrid, period: TimeRange):
        inputs = []
        if self.bottom is not None:
            inputs.append(self.bottom)
        inputs.extend(self.input)
        cmds = []
        for input in inputs:
            cmds.append(input.get(destdir=staging_dir, grid=grid, time=period))
        return "\n".join(cmds)

    def render(self, *args, **kwargs):
        """Make this class consistent with the components API."""
        return self.get(*args, **kwargs)

Attributes

model_type class-attribute instance-attribute

model_type: Literal['data_interface', 'DATA_INTERFACE'] = Field(default='data_interface', description='Model type discriminator')

bottom class-attribute instance-attribute

bottom: Optional[SwanDataGrid] = Field(default=None, description='Bathymetry data')

input class-attribute instance-attribute

input: list[SwanDataGrid] = Field(default=[], description='Input grid data')

Functions

ensure_unique_var classmethod

ensure_unique_var(input: list[SwanDataGrid], info: ValidationInfo) -> list[SwanDataGrid]

Ensure that each input var is unique.

Source code in rompy_swan/interface.py
@field_validator("input")
@classmethod
def ensure_unique_var(
    cls, input: list[SwanDataGrid], info: ValidationInfo
) -> list[SwanDataGrid]:
    """Ensure that each input var is unique."""
    vars = []
    if info.data["bottom"] is not None:
        vars.append(info.data["bottom"].var)
    vars.extend([inp.var for inp in input])
    if len(vars) != len(set(vars)):
        raise ValueError("Each var must be unique in input")
    return input

get

get(staging_dir: Path, grid: SwanGrid, period: TimeRange)
Source code in rompy_swan/interface.py
def get(self, staging_dir: Path, grid: SwanGrid, period: TimeRange):
    inputs = []
    if self.bottom is not None:
        inputs.append(self.bottom)
    inputs.extend(self.input)
    cmds = []
    for input in inputs:
        cmds.append(input.get(destdir=staging_dir, grid=grid, time=period))
    return "\n".join(cmds)

render

render(*args, **kwargs)

Make this class consistent with the components API.

Source code in rompy_swan/interface.py
def render(self, *args, **kwargs):
    """Make this class consistent with the components API."""
    return self.get(*args, **kwargs)

BoundaryInterface

Bases: RompyBaseModel

SWAN forcing boundary interface.

Examples

.. ipython:: python :okwarning:

from rompy_swan.interface import BoundaryInterface
Source code in rompy_swan/interface.py
class BoundaryInterface(RompyBaseModel):
    """SWAN forcing boundary interface.

    Examples
    --------

    .. ipython:: python
        :okwarning:

        from rompy_swan.interface import BoundaryInterface

    """

    model_type: Literal["boundary_interface", "BOUNDARY_INTERFACE"] = Field(
        default="boundary_interface", description="Model type discriminator"
    )
    kind: Union[Boundnest1, BoundspecSide, BoundspecSegmentXY] = Field(
        default=None, description="Boundary data object"
    )

    def get(self, staging_dir: Path, grid: SwanGrid, period: TimeRange):
        filename, cmd = self.kind.get(destdir=staging_dir, grid=grid, time=period)
        logger.info(f"Generating boundary file: {filename}")
        return cmd

    def render(self, *args, **kwargs):
        """Make this class consistent with the components API."""
        return self.get(*args, **kwargs)

Attributes

model_type class-attribute instance-attribute

model_type: Literal['boundary_interface', 'BOUNDARY_INTERFACE'] = Field(default='boundary_interface', description='Model type discriminator')

kind class-attribute instance-attribute

kind: Union[Boundnest1, BoundspecSide, BoundspecSegmentXY] = Field(default=None, description='Boundary data object')

Functions

get

get(staging_dir: Path, grid: SwanGrid, period: TimeRange)
Source code in rompy_swan/interface.py
def get(self, staging_dir: Path, grid: SwanGrid, period: TimeRange):
    filename, cmd = self.kind.get(destdir=staging_dir, grid=grid, time=period)
    logger.info(f"Generating boundary file: {filename}")
    return cmd

render

render(*args, **kwargs)

Make this class consistent with the components API.

Source code in rompy_swan/interface.py
def render(self, *args, **kwargs):
    """Make this class consistent with the components API."""
    return self.get(*args, **kwargs)

OutputInterface

Bases: TimeInterface

Output group component with consistent times.

Source code in rompy_swan/interface.py
class OutputInterface(TimeInterface):
    """Output group component with consistent times."""

    model_type: Literal["outputinterface", "OUTPUTINTERFACE"] = Field(
        default="outputinterface", description="Model type discriminator"
    )

    @model_validator(mode="after")
    def time_interface(self) -> "OutputInterface":
        """Set the time parameter for all WRITE components."""
        for component in self.group._write_fields:
            obj = getattr(self.group, component)
            if obj is not None:
                times = obj.times or TimeRangeOpen()
                obj.times = self._timerange(times.tfmt, times.dfmt, obj.suffix)

    def _timerange(self, tfmt: int, dfmt: str, suffix: str) -> TimeRangeOpen:
        """Convert generic TimeRange into the Swan TimeRangeOpen subcomponent."""
        return TimeRangeOpen(
            tbeg=self.period.start,
            delt=self.period.interval,
            tfmt=tfmt,
            dfmt=dfmt,
            suffix=suffix,
        )

Attributes

model_type class-attribute instance-attribute

model_type: Literal['outputinterface', 'OUTPUTINTERFACE'] = Field(default='outputinterface', description='Model type discriminator')

Functions

time_interface

time_interface() -> OutputInterface

Set the time parameter for all WRITE components.

Source code in rompy_swan/interface.py
@model_validator(mode="after")
def time_interface(self) -> "OutputInterface":
    """Set the time parameter for all WRITE components."""
    for component in self.group._write_fields:
        obj = getattr(self.group, component)
        if obj is not None:
            times = obj.times or TimeRangeOpen()
            obj.times = self._timerange(times.tfmt, times.dfmt, obj.suffix)

LockupInterface

Bases: TimeInterface

Lockup group component with consistent times.

Source code in rompy_swan/interface.py
class LockupInterface(TimeInterface):
    """Lockup group component with consistent times."""

    model_type: Literal["lockupinterface", "LOCKUPINTERFACE"] = Field(
        default="lockupinterface", description="Model type discriminator"
    )

    def _nonstationary(self, tfmt: str, dfmt: str) -> NONSTATIONARY:
        return NONSTATIONARY(
            tbeg=self.period.start,
            tend=self.period.end,
            delt=self.period.interval,
            tfmt=tfmt,
            dfmt=dfmt,
            suffix="c",
        )

    def _stationary(self, tfmt: str) -> STATIONARY:
        return STATIONARY(time=self.period.start, tfmt=tfmt)

    @model_validator(mode="after")
    def time_interface(self) -> "LockupInterface":
        """Set the time parameter for COMPUTE components."""
        times = self.group.compute.times or NONSTATIONARY()
        if isinstance(times, NONSTATIONARY):
            times = self._nonstationary(times.tfmt, times.dfmt)
        elif isinstance(times, STATIONARY):
            times = self._stationary(times.tfmt)
        else:
            raise ValueError(f"Unknown time type {type(times)}")
        self.group.compute.times = times

Attributes

model_type class-attribute instance-attribute

model_type: Literal['lockupinterface', 'LOCKUPINTERFACE'] = Field(default='lockupinterface', description='Model type discriminator')

Functions

time_interface

time_interface() -> LockupInterface

Set the time parameter for COMPUTE components.

Source code in rompy_swan/interface.py
@model_validator(mode="after")
def time_interface(self) -> "LockupInterface":
    """Set the time parameter for COMPUTE components."""
    times = self.group.compute.times or NONSTATIONARY()
    if isinstance(times, NONSTATIONARY):
        times = self._nonstationary(times.tfmt, times.dfmt)
    elif isinstance(times, STATIONARY):
        times = self._stationary(times.tfmt)
    else:
        raise ValueError(f"Unknown time type {type(times)}")
    self.group.compute.times = times

Types

SWAN types provide valid values for a specific SWAN command line argument.

Link: Literal