Source code for skytropd.functions

# Written by Ori Adam Mar.21.2017
# Edited by Alison Ming Jul.4.2017
# update to Python3, vectorized, and patched bugs - sjs 2.3.22
from typing import Callable, List, Optional, Tuple, Union, overload
import warnings
import numpy as np
try:
    from scipy.integrate import cumulative_trapezoid as cumtrapz
except ImportError:
    from scipy.integrate import cumtrapz
from scipy.interpolate import interp1d

from ._fortran_zero_crossing import fortran_zero_crossing

try:
    trapezoid = np.trapezoid
except AttributeError:
    trapezoid = np.trapz

EARTH_RADIUS = 6371220.0
GRAV = 9.80616
GAS_CONSTANT_DRY = 287.04
SPEC_HEAT_PRES_DRY = 1005.7
KAPPA = GAS_CONSTANT_DRY / SPEC_HEAT_PRES_DRY


def _interp_last_axis_monotonic_grid(
    x: np.ndarray, xp: np.ndarray, fp: np.ndarray
) -> np.ndarray:
    """Vectorized linear interpolation on a monotonic 1-D grid."""

    x = np.asarray(x, dtype=float)
    xp = np.asarray(xp, dtype=float)
    fp = np.asarray(fp)

    if xp.ndim != 1:
        raise ValueError("xp must be one-dimensional")
    if fp.shape[-1] != xp.size:
        raise ValueError(
            f"fp last axis of size {fp.shape[-1]} is not aligned with xp size {xp.size}"
        )
    if x.shape != fp.shape[:-1]:
        raise ValueError(
            f"x shape {x.shape} must match fp leading shape {fp.shape[:-1]}"
        )

    if xp.size < 2:
        raise ValueError("xp must contain at least two points")
    if xp[0] > xp[-1]:
        xp = xp[::-1]
        fp = fp[..., ::-1]

    x_eval = np.where(np.isfinite(x), x, xp[0])
    idx = np.searchsorted(xp, x_eval, side="right") - 1
    idx = np.clip(idx, 0, xp.size - 2)

    x0 = xp[idx]
    x1 = xp[idx + 1]
    y0 = np.take_along_axis(fp, idx[..., None], axis=-1)[..., 0]
    y1 = np.take_along_axis(fp, (idx + 1)[..., None], axis=-1)[..., 0]

    frac = np.divide(
        x_eval - x0,
        x1 - x0,
        out=np.zeros_like(x0, dtype=np.result_type(fp.dtype, float)),
        where=x1 != x0,
    )
    out = y0 + frac * (y1 - y0)
    return np.where(np.isfinite(x), out, np.nan)


def _interp_common_grid_1d(x: np.ndarray, xp: np.ndarray, fp: np.ndarray) -> np.ndarray:
    """Interpolate profiles on the last axis to a shared 1-D target grid."""

    x = np.asarray(x, dtype=float)
    xp = np.asarray(xp, dtype=float)
    fp = np.asarray(fp)

    if x.ndim != 1:
        raise ValueError("x must be one-dimensional")
    if xp.ndim != 1:
        raise ValueError("xp must be one-dimensional")
    if fp.shape[-1] != xp.size:
        raise ValueError(
            f"fp last axis of size {fp.shape[-1]} is not aligned with xp size {xp.size}"
        )
    if xp.size < 2:
        raise ValueError("xp must contain at least two points")

    if xp[0] > xp[-1]:
        xp = xp[::-1]
        fp = fp[..., ::-1]

    idx = np.searchsorted(xp, x, side="right") - 1
    idx = np.clip(idx, 0, xp.size - 2)

    x0 = xp[idx]
    x1 = xp[idx + 1]
    y0 = np.take(fp, idx, axis=-1)
    y1 = np.take(fp, idx + 1, axis=-1)
    frac = (x - x0) / (x1 - x0)
    return y0 + frac * (y1 - y0)


def _nearest_indices_descending_grid(grid_desc: np.ndarray, values: np.ndarray) -> np.ndarray:
    """Return nearest indices on a descending 1-D grid for an array of values."""

    grid_desc = np.asarray(grid_desc, dtype=float)
    values = np.asarray(values, dtype=float)

    if grid_desc.ndim != 1:
        raise ValueError("grid_desc must be one-dimensional")
    if grid_desc.size == 0:
        raise ValueError("grid_desc must not be empty")

    boundary_grid_inc = ((grid_desc[:-1] + grid_desc[1:]) / 2.0)[::-1]
    right_idx_inc = np.searchsorted(boundary_grid_inc, values, side="left")
    return np.clip(grid_desc.size - 1 - right_idx_inc, 0, grid_desc.size - 1)


[docs]def find_nearest( array: np.ndarray, value: float, axis: Optional[int] = None, skipna: bool = False ) -> np.ndarray: """ Find the index of the item in the array nearest to the value Parameters ---------- array : numpy.ndarray array to search value : float value to be found axis : int, optional the axis of the array to search. if not given, the array is flattened prior to being searched. If given, output will be (n-1)-dimensional, returning the position of the value within the specified axis skipna : bool, optional whether to skip over NaN in the array, by default False Returns ------- numpy.ndarray index(es) nearest to value in array """ array = np.asarray(array) if (value < array.min()) | (value > array.max()): warnings.warn( "searching outside the bounds of an array will return " "index of the closest boundary", category=RuntimeWarning, stacklevel=2, ) if not skipna: if np.isnan(array).any(): warnings.warn( "searching an array with NaNs will return index " "of first NaN, use skipna=True to ignore NaNs", category=RuntimeWarning, stacklevel=2, ) argmin: Callable = np.argmin else: argmin = np.nanargmin nearest_inds: np.ndarray = argmin(np.abs(array - value), axis=axis) # type: ignore return nearest_inds
# Converted to python by Paul Staten Jul.29.2017
[docs]def TropD_Calculate_MaxLat( F: np.ndarray, lat: np.ndarray, n: int = 6, axis: int = -1 ) -> np.ndarray: """ Find latitude of absolute maximum value for a given interval *Note*: assumes a smoothly varying function Parameters ---------- F : numpy.ndarray (dim1, ..., dimN-1, lat) N-dimensional array w/ lat as specified axis (default last), data assumed contingous with invalid data only on ends. interior nans are untested and will prompt warning lat : numpy.ndarray latitude array n : int, optional rank of moment used to calculate the position of max value. n = 1,2,4,6,8,... , by default 6 axis : int, optional axis corresponding to latitude, by default -1 Returns ------- numpy.ndarray (dim1, ..., dimN-1) N-1 dimensional array of latitude(s) of max values of F """ # type casting F = np.asarray(F).copy() lat = np.asarray(lat) n = int(n) axis = int(axis) # input checking if n < 1: raise ValueError("The smoothing parameter n must be >= 1") if not np.isfinite(F).any(): raise ValueError( "input field F has only NaN/inf values," " max lat cannot be computed" ) if not F.shape[axis] == lat.size: raise ValueError( f"input field lat axis of size {F.shape[axis]} not " f"aligned with lat coordinates of size {lat.size}" ) # ensure lat axis is last axes_list = list(range(F.ndim)) axes_list[axis], axes_list[-1] = axes_list[-1], axes_list[axis] F = F.transpose(axes_list) # map F to [0,1] F -= np.nanmin(F, axis=-1)[..., None] F /= np.nanmax(F, axis=-1)[..., None] # in order for this function to handle "jagged" arrays (such as time # -varying domains, e.g. domains poleward/equatorward of another # circulation feature), it will receive all data on the same grid, # but with masked data outside the region of interest. # However, simply filling with zeros produces extra boundary values # in the integration, so we need to correct for this if not np.isfinite(F).all(): F_filled = np.where(np.isfinite(F), F, 0.0) # find edges of nan regions nanbounds = np.diff(np.isfinite(F), axis=-1) # ensure we only have one contiguous region extra_nans_check = (nanbounds.sum(axis=-1) > 2.0).any() interior_nans_check = (np.isfinite(F)[..., 0] & np.isfinite(F)[..., -1]).any() if extra_nans_check or interior_nans_check: warnings.warn( "detected NaN/inf data located between valid data, " "this may not be handled correctly", stacklevel=2, category=RuntimeWarning, ) # now construct arrays which are zero everywhere except on the # boundaries of nan regions. We can integrate these to correct for the # extra data added by trapz at boundaries pad = np.zeros_like(F[..., 0])[..., None] rbounds = np.where(nanbounds & np.isfinite(F[..., 1:]), F[..., 1:], 0.0) lbounds = np.where(nanbounds & np.isfinite(F[..., :-1]), F[..., :-1], 0.0) bounds = np.concatenate([pad, rbounds], axis=-1) + np.concatenate( [lbounds, pad], axis=-1 ) # now integrate and remove the extra boundary values added by trapz nom = ( trapezoid(F_filled**n * lat, lat, axis=-1) - trapezoid(bounds**n * lat, lat, axis=-1) / 2.0 ) denom = ( trapezoid(F_filled**n, lat, axis=-1) - trapezoid(bounds**n, lat, axis=-1) / 2.0 ) # weighted integral to account for discrete grid Ymax = nom / denom # if the grid is normal, just go ahead and integrate else: # weighted integral to account for discrete grid Ymax = trapezoid((F**n) * lat, lat, axis=-1) / trapezoid(F**n, lat, axis=-1) return Ymax
[docs]def TropD_Calculate_Mon2Season( Fm: np.ndarray, season: List[int], m: Optional[int] = None, first_jan_idx: int = 0, axis: int = 0, ) -> np.ndarray: """ Calculate unweighted seasonal means from monthly time series. Dec is averaged with Jan and Feb of the **same** year, not the following year Parameters ---------- Fm : numpy.ndarray (time, dim2, ..., dimN) N-dimensional array, only utilizes full years-worth of data will be utilized season : list of int 0-based list of month indices, e.g., [11,0,1] for DJF m : int, optional deprecated. use first_jan_idx instead. first_jan_idx : int, optional index of first occurence of Jan in Fm, by default 0 axis : int, optional time axis to resample along, by default 0 Returns ------- numpy.ndarray (time, dim2, ..., dimN) the annual time series of the seasonal means (not weighted for unequal month lengths) """ Fm = np.asarray(Fm) season = list(season) if not all([ssn in range(12) for ssn in season]): raise ValueError("season can only include indices from 0 to 11") if m is not None: warnings.warn( "use of parameter m is deprecated and will be removed in" " future versions, please use first_jan_idx instead", category=DeprecationWarning, stacklevel=2, ) first_jan_idx = m # ensure time first axis axes_list = list(range(Fm.ndim)) axes_list[axis], axes_list[0] = axes_list[0], axes_list[axis] Fm = Fm.transpose(axes_list) whole_years = (Fm.shape[0] - first_jan_idx) // 12 last_jan_idx = whole_years * 12 + first_jan_idx Fm = Fm[first_jan_idx:last_jan_idx, ...] F = (Fm.reshape(whole_years, 12, *Fm.shape[1:])[:, season]).mean(axis=1) return F
[docs]def TropD_Calculate_StreamFunction( V: np.ndarray, lat: np.ndarray, lev: np.ndarray ) -> np.ndarray: """ Calculates the meridional mass streamfunction by integrating meridional wind from top of the atmosphere to surface Parameters ---------- V : numpy.ndarray (dim1, ..., dimN-2, lat, lev) or (dim1, ..., dimN-2, lev, lat) N-dimensional array of zonal-mean meridional wind with final 2 axes corresponding to latitude and vertical level, respectively, or the swapped trailing order ``(..., lev, lat)`` lat : numpy.ndarray equally-spaced latitude array lev : numpy.ndarray vertical level array in hPa Returns ------- numpy.ndarray (same shape as V) the meridional mass overturning streamfunction (psi) """ V = np.asarray(V) lat = np.asarray(lat) lev = np.asarray(lev) transposed_trailing_axes = False if V.shape[-2:] == (lat.size, lev.size): pass elif V.shape[-2:] == (lev.size, lat.size): V = np.swapaxes(V, -2, -1) transposed_trailing_axes = True else: raise ValueError( f"final dimensions on V {V.shape[-2:]} and grid coordinates must match " f"either (lat, lev)=({lat.size},{lev.size}) or " f"(lev, lat)=({lev.size},{lat.size})" ) # mask any subsurface or missing data V = np.where(np.isfinite(V), V, 0) cos_lat = np.cos(lat * np.pi / 180.0)[:, None] # cumtrapz: if F(x) = int f(x) dx, return F(x) - F(x[0]) psi = ( cumtrapz(V, lev * 100.0, axis=-1, initial=0.0) * (EARTH_RADIUS / GRAV * 2.0 * np.pi) * cos_lat ) if transposed_trailing_axes: psi = np.swapaxes(psi, -2, -1) return psi
@overload def TropD_Calculate_TropopauseHeight( T: np.ndarray, P: np.ndarray, Z: None ) -> np.ndarray: ... @overload def TropD_Calculate_TropopauseHeight( T: np.ndarray, P: np.ndarray, Z: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: ...
[docs]def TropD_Calculate_TropopauseHeight( T: np.ndarray, P: np.ndarray, Z: Optional[np.ndarray] = None ) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]: """ Calculate the tropopause height in isobaric coordinates Based on the method in Birner (2010), according to the WMO definition: first level where the lapse rate <= 2K/km AND where the lapse rate <= 2K/km for at least 2km above that level Parameters ---------- T : numpy.ndarray (dim1, ..., dimN-1, lev) N-dimensional temperature array P : numpy.ndarray (lev,) pressure levels in hPa Z : numpy.ndarray (same shape as T), optional N-dimensional array to be interpolated at tropopause, such as geopotential height (m) to yield tropopause height Returns ------- numpy.ndarray (dim1, ..., dimN-1) the tropopause pressure in hPa numpy.ndarray (dim1, ..., dimN-1), optional returned if Z is provided. Corresponds to Z evaluated at the tropopause. If Z is geopotential height, it is tropopause altitude (m) """ COMPUTE_Z = Z is not None T = np.atleast_2d(T) if COMPUTE_Z: Z = np.atleast_2d(Z) # type: ignore if T.shape[-1] != P.size: raise ValueError( f"last axis of temperature data, size {T.shape[-1]}, " f"is not aligned with pressure data, size {P.size}" ) # make P monotonically decreasing if P[-1] > P[0]: P = P[::-1] T = T[..., ::-1] if COMPUTE_Z: Z = Z[..., ::-1] # type: ignore Pk = (P * 100.0) ** KAPPA Pk_mid = (Pk[:-1] + Pk[1:]) / 2.0 T_mid = (T[..., :-1] + T[..., 1:]) / 2.0 Gamma = ( np.diff(T, axis=-1) / np.diff(Pk) * Pk_mid / T_mid * GRAV / SPEC_HEAT_PRES_DRY * 1000.0 ) # K / km PI = (np.linspace(1000.0, 1.0, 1000) * 100.0) ** KAPPA candidate_mask = (PI < (550.0 * 100.0) ** KAPPA) & (PI > (75.0 * 100.0) ** KAPPA) candidate_idx = np.nonzero(candidate_mask)[0] PI_candidate = PI[candidate_mask] TI_candidate = _interp_common_grid_1d(PI_candidate, Pk_mid, T_mid) candidate_top = ( PI_candidate - 2000.0 * GRAV / SPEC_HEAT_PRES_DRY / TI_candidate * PI_candidate ) idx2km = _nearest_indices_descending_grid(PI, candidate_top) gi_start = min(candidate_idx[0], int(np.min(idx2km))) gi_stop = max(candidate_idx[-1], int(np.max(idx2km))) GI_slice = slice(gi_start, gi_stop + 1) PI_window = PI[GI_slice] GI = _interp_common_grid_1d(PI_window, Pk_mid, Gamma) candidate_idx_window = candidate_idx - gi_start idx2km_window = idx2km - gi_start # points in upper troposphere where lapse rate is less than 2K/km trop_layer = GI[..., candidate_idx_window] <= 2.0 first_bad_idx = np.where(GI > 2.0, np.arange(PI_window.size), PI_window.size) first_bad_idx = np.minimum.accumulate(first_bad_idx[..., ::-1], axis=-1)[..., ::-1] valid_tropopause = trop_layer & ( idx2km_window < first_bad_idx[..., candidate_idx_window] ) first_valid_idx = np.argmax(valid_tropopause, axis=-1) has_valid = np.any(valid_tropopause, axis=-1) Pt = np.full(T.shape[:-1], np.nan, dtype=np.result_type(T.dtype, float)) Pt[has_valid] = PI_window[candidate_idx_window[first_valid_idx[has_valid]]] Pt = Pt ** (1.0 / KAPPA) / 100.0 if COMPUTE_Z: Ht = _interp_last_axis_monotonic_grid(Pt, P, Z) # type: ignore return Pt, Ht else: return Pt
def _zero_crossing_python( F_last_axis: np.ndarray, lat: np.ndarray, lat_uncertainty: float ) -> np.ndarray: """Reference Python implementation for validating the compiled backend.""" # Find all sign changes D = np.diff(np.sign(F_last_axis), axis=-1) # initialize for looping ZC = np.full(D.shape[:-1], np.nan) # find zero crossings looping over all latitude bands for i in range(ZC.size): iband = np.unravel_index(i, ZC.shape) Di = D[iband] Fi = F_last_axis[iband] # Make sure a zero crossing exists if not (np.any(Fi > 0) and np.any(Fi < 0)): continue # no sign changes # get indices of all sign changes a = Di.nonzero()[0] a = a[np.isfinite(Di[a])] # If more than one zero crossing of same sign exists # in proximity to the first zero crossing. if a.size > 2 and np.abs(lat[a[2]] - lat[a[0]]) < lat_uncertainty: continue # first sign change if a.size != 0: a1 = a[0] # if there is an exact zero, use its latitude... if np.abs(Di[a1]) == 1: ZC[iband] = lat[a1 + 1] else: # np.abs(D[a1]) == 2 (directly from + to - or - to +) ZC[iband] = ( -Fi[a1] * (lat[a1 + 1] - lat[a1]) / (Fi[a1 + 1] - Fi[a1]) + lat[a1] ) return ZC # Converted to python by Paul Staten Jul.29.2017
[docs]def TropD_Calculate_ZeroCrossing( F: np.ndarray, lat: np.ndarray, lat_uncertainty: float = 0.0, axis: int = -1 ) -> np.ndarray: """ Find the first (with increasing index) zero crossing of the function F. This function uses the compiled backend for the per-latitude-band search. Parameters ---------- F : numpy.ndarray (dim1, ..., dimN-1, lat) N dimensional array to search lat : numpy.ndarray (lat,) latitude array lat_uncertainty : float, optional same unit as lat. The minimum distance allowed between adjacent zero crossings of identical sign change, by default 0.0. For example, for lat_uncertainty = 10, if the most equatorward zero crossing is from positive to negative, NaN is returned if an additional zero crossing from positive to negative is found within 10 degrees of the first one. axis : int, optional axis corresponding to latitude, by default -1 (last) Returns ------- np.ndarray latitude(s) of zero crossing by linear interpolation """ F = np.atleast_2d(F) lat = np.asarray(lat) lat_uncertainty = np.abs(lat_uncertainty) if F.shape[axis] != lat.size: raise ValueError( f"input array F with lat axis of size {F.shape[axis]}" f" is not aligned with lat coords of size {lat.size}" ) if lat.size < 4: raise ValueError( "requires at least 4 latitudes to find zero " f"crossing, got {lat.size}" ) # ensure lat axis is last axes_list = list(range(F.ndim)) axes_list[axis], axes_list[-1] = axes_list[-1], axes_list[axis] F = F.transpose(axes_list) F_shape = F.shape[:-1] F_flat = F.reshape(-1, lat.size) ZC_fortran = fortran_zero_crossing(F_flat, lat, lat_uncertainty) return ZC_fortran.reshape(F_shape)