# Written by Ori Adam Mar.21.2017
# Edited by Alison Ming Jul.4.2017
# rewrite for readability/vectorization - sjs 1.27.22
from typing import Optional, Tuple, Callable, Union
import warnings
from functools import wraps
from inspect import signature
import re
import numpy as np
from numpy.polynomial import polynomial
try:
from scipy.integrate import cumulative_trapezoid as cumtrapz
except ImportError:
from scipy.integrate import cumtrapz
from scipy.interpolate import interp1d
from scipy.signal import fftconvolve
from .functions import (
find_nearest,
TropD_Calculate_MaxLat,
TropD_Calculate_StreamFunction,
TropD_Calculate_TropopauseHeight,
TropD_Calculate_ZeroCrossing,
)
from ._fortran_zero_crossing import fortran_descending_threshold_crossing
# kappa = R_dry / Cp_dry
KAPPA = 287.04 / 1005.7
try:
trapezoid = np.trapezoid
except AttributeError:
trapezoid = np.trapz
def _metric_code_from_name(func_name: str) -> str:
"""Infer the base metric code from a metric wrapper function name."""
known_codes = ("EDJ", "OLR", "PE", "PSI", "PSL", "STJ", "TPB", "UAS", "GWL", "1sigma")
prefix = "TropD_Metric_"
metric_name = func_name[len(prefix) :] if func_name.startswith(prefix) else func_name
for code in known_codes:
if (
metric_name == code
or metric_name.startswith(code + "_")
or metric_name.endswith("_" + code)
or ("_" + code + "_") in metric_name
):
return code
return metric_name.split("_")[-1]
[docs]def hemisphere_handler(
metric_func: Callable[..., Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]]
) -> Callable:
"""
Wrapper for metrics to allow one or two hemispheres of data to be passed
Parameters
----------
metric_func : Callable
the skytropd metric function to wrap
Returns
-------
Callable
wrapped function with same name, docstring, and call signature as the original
function, but which operates over each hemisphere independently
"""
@wraps(metric_func)
def wrapped_metric_func(
arr: np.ndarray, lat: np.ndarray, *args, **kwargs
) -> Tuple[np.ndarray, np.ndarray]:
"""
wrapper for skytropd metric functions which allows one or two hemispheres of data
to be passed
Parameters
----------
arr : np.ndarray
first argument to a skytropd metric function is always the field used for the
metric (zonal wind, OLR, meridional wind, etc.)
lat : np.ndarray
second argument to a metric function is always the latitude array
*args
Other arguments passed to the metric function, typically will only be the
pressure level array if any
**kwargs
Keyword arguments passed to the metric function, all have a `method` keyword,
some have others
Returns
-------
PhiSH : np.ndarray, optional
metric latitude in SH (if SH latitudes are provided)
PhiNH : np.ndarray, optional
metric latitude in NH (if NH latitudes are provided)
UmaxSH : np.ndarray, optional
maximum of SH zonal wind (if using `TropD_Metric_EDJ` or `TropD_Metric_STJ`
and `method="fit"`)
UmaxNH : np.ndarray, optional
maximum of NH zonal wind (if using `TropD_Metric_EDJ` or `TropD_Metric_STJ`
and `method="fit"`)
"""
kwargs.pop("hem", None)
# get the metric identifier from the function name
metric_code = _metric_code_from_name(metric_func.__name__)
# we need to know which dimension is the latitude dimension to split it
# appropriately. In functions which accept vertically-resolved input data,
# the vertical level is usually last and latitude is second to last.
# TropD_Metric_PSI also accepts the trailing order (..., lev, lat).
has_extra_dim = False
# these all require vertically-resolved data
if metric_code in ["STJ", "TPB", "PSI"]:
has_extra_dim = True
# these take it optionally, so we need to figure out if it was provided, either as
# an arg (it will be third positional and probably the only arg in args) or kwarg
elif metric_code in ["UAS", "EDJ"]:
has_extra_dim = (len(args) > 0) or (kwargs.get("lev", None) is not None)
# Shah metrics may also have an extra last dimension if zonal_mean_tracer=False
# (but it is lon, not lev)
elif metric_code in ["GWL", "1sigma"]:
has_extra_dim = not kwargs.get(
"zonal_mean_tracer",
signature(metric_func).parameters["zonal_mean_tracer"].default,
)
lat_axis = -2 if has_extra_dim else -1
if metric_code == "PSI":
lev = args[0] if len(args) > 0 else kwargs.get("lev", None)
if lev is not None:
lev = np.asarray(lev)
if arr.shape[-2:] == (lev.size, lat.size):
lat_axis = -1
# now the TropD_Metric_TPB accepts a Z array kwarg that also needs to be split
# based on latitude, so we need to get that if it is there
Z = None
if metric_code == "TPB":
Z = kwargs.pop("Z", None)
# now we just split by hemisphere and apply as if the data was for the NH
Phi_list = []
if (lat < -20.0).any():
# let's make sure we include the equator point just in case
SHmask = lat < 0.5
SHarr_mask = [slice(None)] * arr.ndim
SHarr_mask[lat_axis] = SHmask
if Z is not None:
kwargs["Z"] = Z[tuple(SHarr_mask)]
Phi_list.append(
metric_func(
# for the TropD_Metric_PSI, it takes meridional wind. In order to
# make meridional wind in the SH like the NH, we have to flip the sign
arr[tuple(SHarr_mask)] * (-1.0 if metric_code == "PSI" else 1.0),
-lat[SHmask],
*args,
**kwargs,
)
)
if isinstance(Phi_list[0], tuple):
Phi_list_temp = list(Phi_list[0])
#Phi_list[0][0] *= -1.0
Phi_list_temp[0] *= -1.0
Phi_list[0] = tuple(Phi_list_temp)
else:
Phi_list[0] *= -1.0
if (lat > 20.0).any():
NHmask = lat > -0.5
NHarr_mask = [slice(None)] * arr.ndim
NHarr_mask[lat_axis] = NHmask
if Z is not None:
kwargs["Z"] = Z[tuple(NHarr_mask)]
Phi_list.append(
metric_func(arr[tuple(NHarr_mask)], lat[NHmask], *args, **kwargs)
)
# one final snag before we return. TropD_Metric_EDJ and TropD_Metric_STJ
# potentially return extra arrays if using `method="fit"`, so we need to
# flatten our return list and potentially swap some variables if global data was
# provided so that metric latitudes are always returned first
if metric_code in ["EDJ", "STJ"]:
method_used = kwargs.get(
"method", signature(metric_func).parameters["method"].default
)
if method_used == "fit":
Phi_list = [item for pair in Phi_list for item in pair]
if len(Phi_list) == 4:
Phi_list[1], Phi_list[2] = Phi_list[2], Phi_list[1]
return tuple(Phi_list) # type: ignore
return wrapped_metric_func
def _coerce_trailing_lat_lev_axes(
field: np.ndarray, lat: np.ndarray, lev: np.ndarray, field_name: str
) -> np.ndarray:
"""Return ``field`` with trailing axes ordered as ``(..., lat, lev)``."""
field = np.asarray(field)
lat = np.asarray(lat)
lev = np.asarray(lev)
if field.shape[-2:] == (lat.size, lev.size):
return field
if field.shape[-2:] == (lev.size, lat.size):
return np.swapaxes(field, -2, -1)
raise ValueError(
f"final dimensions on {field_name} {field.shape[-2:]} and grid coordinates "
f"must match either (lat, lev)=({lat.size},{lev.size}) or "
f"(lev, lat)=({lev.size},{lat.size})"
)
def _interp_last_axis_common_grid(
x: np.ndarray, xp: np.ndarray, fp: np.ndarray
) -> np.ndarray:
"""Vectorized linear interpolation for profiles that share a common x-grid."""
x = np.asarray(x, dtype=float)
xp = np.asarray(xp, dtype=float)
fp = np.asarray(fp)
if xp.ndim != 1:
raise ValueError("xp must be one-dimensional")
if fp.shape[-1] != xp.size:
raise ValueError(
f"fp last axis of size {fp.shape[-1]} is not aligned with xp size {xp.size}"
)
if x.shape != fp.shape[:-1]:
raise ValueError(
f"x shape {x.shape} must match fp leading shape {fp.shape[:-1]}"
)
x_eval = np.where(np.isfinite(x), x, xp[0])
idx = np.searchsorted(xp, x_eval, side="right") - 1
idx = np.clip(idx, 0, xp.size - 2)
x0 = xp[idx]
x1 = xp[idx + 1]
y0 = np.take_along_axis(fp, idx[..., None], axis=-1)[..., 0]
y1 = np.take_along_axis(fp, (idx + 1)[..., None], axis=-1)[..., 0]
frac = np.divide(
x_eval - x0,
x1 - x0,
out=np.zeros_like(x0, dtype=np.result_type(fp.dtype, float)),
where=x1 != x0,
)
out = y0 + frac * (y1 - y0)
return np.where(np.isfinite(x), out, np.nan)
def _interp_midpoint_field_to_grid(midpoint_field: np.ndarray, x: np.ndarray) -> np.ndarray:
"""Interpolate midpoint values back to the original 1-D grid along the last axis."""
midpoint_field = np.asarray(midpoint_field)
x = np.asarray(x, dtype=float)
if x.ndim != 1:
raise ValueError("x must be one-dimensional")
if midpoint_field.shape[-1] != x.size - 1:
raise ValueError(
"midpoint_field last axis must be one smaller than the target x-grid"
)
out = np.empty(midpoint_field.shape[:-1] + (x.size,), dtype=midpoint_field.dtype)
out[..., 0] = midpoint_field[..., 0]
out[..., -1] = midpoint_field[..., -1]
if x.size > 2:
dx_prev = x[1:-1] - x[:-2]
dx_next = x[2:] - x[1:-1]
out[..., 1:-1] = (
midpoint_field[..., :-1] * dx_next + midpoint_field[..., 1:] * dx_prev
) / (dx_prev + dx_next)
return out
def _maxlat_last_axis_no_nan(F: np.ndarray, lat: np.ndarray, n: int) -> np.ndarray:
"""Fast path for MaxLat when latitude is the last axis and there are no NaNs."""
F = np.asarray(F)
lat = np.asarray(lat, dtype=float)
if F.shape[-1] != lat.size:
raise ValueError(
f"last axis of F with size {F.shape[-1]} is not aligned with lat size {lat.size}"
)
Fmin = F.min(axis=-1, keepdims=True)
Frange = F.max(axis=-1, keepdims=True) - Fmin
scaled = np.divide(
F - Fmin,
Frange,
out=np.full_like(F, np.nan, dtype=np.result_type(F.dtype, float)),
where=Frange != 0,
)
weights = scaled**n
return trapezoid(weights * lat, lat, axis=-1) / trapezoid(weights, lat, axis=-1)
def _quadratic_peak_fit_last_axis(
field: np.ndarray, lat: np.ndarray, n_fit: int = 1
) -> Tuple[np.ndarray, np.ndarray]:
"""Return quadratic-fit peak latitude and strength for data on the last axis."""
field = np.asarray(field)
lat = np.asarray(lat, dtype=float)
if field.shape[-1] != lat.size:
raise ValueError(
f"last axis of field with size {field.shape[-1]} is not aligned with lat size {lat.size}"
)
field_flat = field.reshape(-1, lat.size)
out_shape = field.shape[:-1]
phi = np.zeros(field_flat.shape[0], dtype=np.result_type(field.dtype, float))
umax = np.zeros(field_flat.shape[0], dtype=np.result_type(field.dtype, float))
imax = np.nanargmax(field_flat, axis=-1)
boundary = (imax == 0) | (imax == lat.size - 1)
phi[boundary] = lat[imax[boundary]]
interior_rows = np.where(~boundary)[0]
if interior_rows.size == 0:
return phi.reshape(out_shape), umax.reshape(out_shape)
if int(n_fit) == 1:
interior_imax = imax[interior_rows]
for peak_idx in np.unique(interior_imax):
rows = interior_rows[interior_imax == peak_idx]
coeffs = polynomial.polyfit(
lat[peak_idx - 1 : peak_idx + 2],
field_flat[rows, peak_idx - 1 : peak_idx + 2].T,
deg=2,
)
phi[rows] = -coeffs[1] / (2.0 * coeffs[2])
umax[rows] = (
4.0 * coeffs[2] * coeffs[0] - coeffs[1] * coeffs[1]
) / 4.0 / coeffs[2]
return phi.reshape(out_shape), umax.reshape(out_shape)
for row in interior_rows:
peak_idx = imax[row]
profile = field_flat[row]
if n_fit > peak_idx or n_fit > profile.size - peak_idx + 1:
fit_half_width = min(peak_idx, profile.size - peak_idx + 1)
else:
fit_half_width = n_fit
coeffs = polynomial.polyfit(
lat[peak_idx - fit_half_width : peak_idx + fit_half_width + 1],
profile[peak_idx - fit_half_width : peak_idx + fit_half_width + 1],
deg=2,
)
phi[row] = -coeffs[1] / (2.0 * coeffs[2])
umax[row] = (
4.0 * coeffs[2] * coeffs[0] - coeffs[1] * coeffs[1]
) / 4.0 / coeffs[2]
return phi.reshape(out_shape), umax.reshape(out_shape)
def _psi_metric_latitude(
Psi: np.ndarray,
lat: np.ndarray,
lev: np.ndarray,
method: str = "Psi_500",
lat_uncertainty: float = 0.0,
threshold: Optional[float] = 0.1,
) -> np.ndarray:
"""Return HC edge latitude from a precomputed mass streamfunction."""
Psi = _coerce_trailing_lat_lev_axes(Psi, lat, lev, "Psi")
lat = np.asarray(lat)
lev = np.asarray(lev)
if threshold is not None:
threshold = float(threshold)
if threshold < 0.0:
raise ValueError("threshold must be non-negative or None")
# make latitude vector monotonically increasing
if lat[-1] < lat[0]:
Psi = Psi[..., ::-1, :]
lat = lat[::-1]
cos_lat = np.cos(lat * np.pi / 180.0)[:, None]
psi_method = method.strip()
layer_match = re.fullmatch(
r"Psi_(\d+(?:\.\d+)?)_(\d+(?:\.\d+)?)",
psi_method,
flags=re.IGNORECASE,
)
layer_percent_match = re.fullmatch(
r"Psi_(\d+(?:\.\d+)?)_(\d+(?:\.\d+)?)_(\d+(?:\.\d+)?)Perc(?:_(center2d|profile))?",
psi_method,
flags=re.IGNORECASE,
)
if (psi_method == "Psi_500") or (psi_method == "Psi_500_10Perc"):
# Use Psi at the level nearest to 500 hPa
P = Psi[..., find_nearest(lev, 500.0)]
elif psi_method == "Psi_500_Int":
# Use integrated Psi from p=0 to level nearest to 500 hPa
PPsi = cumtrapz(Psi * cos_lat, 100.0 * lev, axis=-1, initial=0.0)
P = PPsi[..., find_nearest(lev, 500.0)]
elif psi_method == "Psi_Int":
# Use vertical mean of Psi
P = trapezoid(Psi * cos_lat, 100.0 * lev, axis=-1)
elif layer_match is not None:
lower = float(layer_match.group(1))
upper = float(layer_match.group(2))
lower_bound = min(lower, upper)
upper_bound = max(lower, upper)
# Use Psi averaged between the requested pressure levels
layer_mask = (lev >= lower_bound) & (lev <= upper_bound)
if not np.any(layer_mask):
raise ValueError(
"no pressure levels found within the requested PSI layer "
f"{lower:g}-{upper:g} hPa"
)
P = trapezoid(
Psi[..., layer_mask] * cos_lat, lev[layer_mask] * 100.0, axis=-1
)
elif layer_percent_match is not None:
lower = float(layer_percent_match.group(1))
upper = float(layer_percent_match.group(2))
threshold_fraction = float(layer_percent_match.group(3)) / 100.0
threshold_mode = (layer_percent_match.group(4) or "center2d").lower()
lower_bound = min(lower, upper)
upper_bound = max(lower, upper)
layer_mask = (lev >= lower_bound) & (lev <= upper_bound)
if not np.any(layer_mask):
raise ValueError(
"no pressure levels found within the requested PSI layer "
f"{lower:g}-{upper:g} hPa"
)
profile = Psi[..., layer_mask].mean(axis=-1)
nh_mask = lat > 0.0
if not np.any(nh_mask):
raise ValueError(
"requires at least one positive latitude for PSI layer Perc methods"
)
out_shape = Psi.shape[:-2]
phi = np.full(out_shape, np.nan, dtype=np.result_type(Psi.dtype, float))
psi_flat = Psi.reshape(-1, lat.size, lev.size)
profile_flat = profile.reshape(-1, lat.size)
phi_flat = phi.reshape(-1)
nh_lat_indices = np.where(nh_mask)[0]
start_idx = np.full(psi_flat.shape[0], -1, dtype=np.int32)
threshold_values = np.full(psi_flat.shape[0], np.nan, dtype=float)
for i in range(psi_flat.shape[0]):
nh_field = psi_flat[i, nh_mask, :]
if not np.isfinite(nh_field).any():
continue
center_flat_index = int(np.nanargmax(nh_field))
center_lat_subindex, center_level_index = np.unravel_index(
center_flat_index, nh_field.shape
)
center_lat_index = nh_lat_indices[center_lat_subindex]
center_value_2d = float(nh_field[center_lat_subindex, center_level_index])
center_value_profile = float(profile_flat[i, center_lat_index])
if threshold_mode == "center2d":
threshold_values[i] = threshold_fraction * center_value_2d
elif threshold_mode == "profile":
threshold_values[i] = threshold_fraction * center_value_profile
else:
raise ValueError("unrecognized method " + method)
start_idx[i] = center_lat_index
phi_flat[:] = fortran_descending_threshold_crossing(
profile_flat, lat, start_idx, threshold_values
)
return phi
else:
raise ValueError("unrecognized method " + method)
# define regions of interest
subpolar_boundary = 30.0
polar_boundary = 60.0
mask = (lat > 0) & (lat < polar_boundary)
subpolar_mask = (lat > 0) & (lat < subpolar_boundary)
lat_masked = lat[mask]
# 1. Find latitude of maximal tropical Psi
Pmax_lat_masked = TropD_Calculate_MaxLat(P[..., subpolar_mask], lat[subpolar_mask])
# 2. Find latitude of minimal subtropical Psi poleward of tropical maximum
lat_after_Pmax = lat_masked >= Pmax_lat_masked[..., None]
Plat_after_Pmax = np.where(lat_after_Pmax, P[..., mask], np.nan)
Pmin_lat_masked = TropD_Calculate_MaxLat(-Plat_after_Pmax, lat_masked)
# 3. Find the zero crossing between the above latitudes
lat_in_between = (lat_masked <= Pmin_lat_masked[..., None]) & lat_after_Pmax
Plat_in_between = np.where(lat_in_between, P[..., mask], np.nan)
def _psi_threshold_crossing(threshold_value: float) -> np.ndarray:
Pmax = P[..., subpolar_mask].max(axis=-1)[..., None]
return TropD_Calculate_ZeroCrossing(
Plat_in_between - threshold_value * Pmax,
lat_masked,
lat_uncertainty=lat_uncertainty,
)
if psi_method == "Psi_500_10Perc":
threshold_value = 0.1 if threshold is None else threshold
Phi = _psi_threshold_crossing(threshold_value)
else:
Phi = TropD_Calculate_ZeroCrossing(
Plat_in_between, lat_masked, lat_uncertainty=lat_uncertainty
)
if threshold is not None:
Phi = np.where(np.isnan(Phi), _psi_threshold_crossing(threshold), Phi)
return Phi
[docs]@hemisphere_handler
def TropD_Metric_EDJ(
U: np.ndarray,
lat: np.ndarray,
lev: Optional[np.ndarray] = None,
method: str = "peak",
n_fit: int = 1,
**maxlat_kwargs,
) -> np.ndarray:
"""
TropD Eddy Driven Jet (EDJ) Metric
Latitude of maximum of the zonal wind at the level closest to 850 hPa
Parameters
----------
U : numpy.ndarray (dim1, ..., lat[, lev])
N-dimensional array of zonal-mean zonal wind data. Also accepts surface/
850hPa wind
lat : numpy.ndarray (lat,)
latitude array
lev : numpy.ndarray, optional (lev,)
vertical level array in hPa, used to find wind closest to 850hPa. if not
provided, last axis is assumed to be lat
method : {"peak", "max", "fit"}, optional
Method for determining latitude of maximum zonal wind, by default "peak":
* "peak": Latitude of the maximum of the zonal wind at the level closest
to 850hPa (smoothing parameter ``n=30``)
* "max": Latitude of the maximum of the zonal wind at the level closest to
850hPa (smoothing parameter ``n=6``)
* "fit": Latitude of the maximum of the zonal wind at the level closest to
850hPa using a quadratic polynomial fit of data from grid points
surrounding the grid point of the maximum
n_fit : int, optional
used when ``method="fit"``, determines the number of points around the max to use
while fitting, by default 1
**kwargs : optional
additional keyword arguments for :py:func:`TropD_Calculate_MaxLat` (not used
for ``method="fit"``)
n : int, optional
Rank of moment used to calculate the location of max, e.g.,
``n=1,2,4,6,8,...``, by default 6 if ``method="max"``, 30 if ``method="peak"``
Returns
-------
Phi : numpy.ndarray (dim1, ..., dimN-2[, dimN-1])
N-2(N-1) dimensional latitudes of the EDJ
Umax : numpy.ndarray (dim1, ..., dimN-2), conditional
N-2 dimensional EDJ strength, returned if ``method="fit"``
"""
U = np.asarray(U)
lat = np.asarray(lat)
get_lev = lev is not None
U_grid_shape = U.shape[-2:] if get_lev else U.shape[-1]
input_grid_shape = (lat.size, lev.size) if get_lev else lat.size # type: ignore
if U_grid_shape != input_grid_shape:
raise ValueError(
f"last axes of U w/ shape {U_grid_shape},"
" not aligned with input grid of shape " + str(input_grid_shape)
)
if method not in ["max", "peak", "fit"]:
raise ValueError("unrecognized method " + method)
n_fit = int(n_fit)
eq_boundary = 15.0
polar_boundary = 70.0
mask = (lat > eq_boundary) & (lat < polar_boundary)
if get_lev:
u850 = U[..., find_nearest(lev, 850.0)] # type: ignore
else:
u850 = U
if method != "fit": # max or peak
# update default n to 30 for peak
if method == "peak":
maxlat_kwargs["n"] = maxlat_kwargs.get("n", 30)
else: # max
maxlat_kwargs["n"] = maxlat_kwargs.get("n", 6)
# lat should already be last axis
maxlat_kwargs.pop("axis", None)
Phi = TropD_Calculate_MaxLat(u850[..., mask], lat[mask], **maxlat_kwargs)
return Phi
else: # method == 'fit':
lat_mask = lat[mask]
Phi, Umax = _quadratic_peak_fit_last_axis(u850[..., mask], lat_mask, n_fit=n_fit)
return Phi, Umax # type: ignore
[docs]@hemisphere_handler
def TropD_Metric_OLR(
olr: np.ndarray,
lat: np.ndarray,
method: str = "250W",
Cutoff: Optional[float] = None,
**maxlat_kwargs,
) -> np.ndarray:
"""
TropD Outgoing Longwave Radiation (OLR) Metric
Latitude of maximum OLR or first latitude poleward of maximum where OLR reaches
crosses a predefined cutoff
Parameters
----------
olr : numpy.ndarray (dim1, ..., lat)
zonal mean TOA olr (positive)
lat : numpy.ndarray (lat,)
latitude array
method : {"250W", "20W", "cutoff", "10Perc", "max", "peak"}, optional
Method for determining the OLR maximum/threshold, by default "250W":
* "250W": the 1st latitude poleward of the tropical OLR max in each hemisphere
where OLR crosses :math:`250W/m^2`
* "20W": the 1st latitude poleward of the tropical OLR max in each hemisphere
where OLR crosses [the tropical OLR max minus :math:`20W/m^2`]
* "cutoff": the 1st latitude poleward of the tropical OLR max in each
hemisphere where OLR crosses a specified cutoff value
* "10Perc": the 1st latitude poleward of the tropical OLR max in each
hemisphere where OLR is 10% smaller than the tropical OLR max
* "max": the latitude of the tropical olr max in each hemisphere with
smoothing parameter ``n=6``
* "peak": the latitude of maximum of tropical olr in each hemisphere with
smoothing parameter ``n=30``
Cutoff : float, optional
if ``method="cutoff"``, specifies the OLR cutoff value in :math:`W/m^2`
**kwargs : optional
additional keyword arguments for :py:func:`TropD_Calculate_MaxLat`
n : int, optional
Rank of moment used to calculate the location of max, e.g.,
``n=1,2,4,6,8,...``, by default 6 if ``method="max"``, 30 if ``method="peak"``
Returns
-------
Phi : numpy.ndarray (dim1, ..., dimN-1)
N-1 dimensional latitudes of the near-equator OLR max/threshold crossing
"""
olr = np.asarray(olr)
lat = np.asarray(lat)
if olr.shape[-1] != lat.size:
raise ValueError(
f"last axis of olr had shape {olr.shape[-1]},"
f" not aligned with lat size {lat.size}"
)
if "n" in maxlat_kwargs and method != "max":
warnings.warn("smoothing parameter n only utilized for method == max")
maxlat_kwargs.pop("n")
if Cutoff is not None and method != "cutoff":
warnings.warn("cutoff parameter only utilized for method == cutoff")
# make latitude vector monotonically increasing
if lat[-1] < lat[0]:
olr = olr[..., ::-1]
lat = lat[::-1]
eq_boundary = 5.0
subpolar_boundary = 40.0
polar_boundary = 60.0
subpolar_mask = (lat > eq_boundary) & (lat < subpolar_boundary)
if method == "peak":
maxlat_kwargs["n"] = 30
elif method == "max":
maxlat_kwargs["n"] = 6
# lat should already be last axis
maxlat_kwargs.pop("axis", None)
olr_max_lat = TropD_Calculate_MaxLat(
olr[..., subpolar_mask], lat[subpolar_mask], **maxlat_kwargs
)
if method in ["cutoff", "250W", "20W", "10Perc"]:
# get tropical OLR max for methods 20W and 10Perc
olr_max = olr[..., subpolar_mask].max(axis=-1)[..., None]
# set cutoff dependent on method
if method == "250W":
Cutoff = 250.0
elif method == "20W":
Cutoff = olr_max - 20.0
elif method == "10Perc":
Cutoff = 0.9 * olr_max
# identify regions poleward of the OLR max in both hemispheres
mask = (lat > eq_boundary) & (lat < polar_boundary)
max_mask = lat[mask] > olr_max_lat[..., None]
# OLR in each hemisphere, only valid poleward of max
olr = np.where(max_mask, olr[..., mask], np.nan)
# get latitude where OLR falls below cutoff poleward of tropical max
Phi = TropD_Calculate_ZeroCrossing(olr - Cutoff, lat[mask])
# these methods don't need to find a threshold after
# the OLR max, just need the max
elif method in ["max", "peak"]:
Phi = olr_max_lat
else:
raise ValueError("unrecognized method " + method)
return Phi
[docs]@hemisphere_handler
def TropD_Metric_PE(
pe: np.ndarray,
lat: np.ndarray,
method: str = "zero_crossing",
lat_uncertainty: float = 0.0,
) -> np.ndarray:
"""
TropD Precipitation Minus Evaporation (PE) Metric
Find the first zero crossing of zonal-mean precipitation minus evaporation poleward
of the subtropical minimum
Parameters
----------
pe : numpy.ndarray (dim1, ..., lat,)
zonal-mean precipitation minus evaporation
lat : numpy.ndarray (lat,)
latitude array
method : {"zero_crossing"}, optional
Method to compute the zero crossing for precipitation minus evaporation, by
default "zero_crossing".
* "zero_crossing": the first latitude poleward of the subtropical P-E min
where P-E changes from negative to positive.
lat_uncertainty : float, optional
The minimal distance allowed between adjacent zero crossings in degrees,
by default 0.0
Returns
-------
Phi : numpy.ndarray (dim1, ..., dimN-1)
N-1 dimensional latitudes of the 1st subtropical P-E zero crossing
"""
pe = np.atleast_2d(pe)
lat = np.asarray(lat)
if pe.shape[-1] != lat.size:
raise ValueError(
f"last axis of P-E had shape {pe.shape[-1]},"
f" not aligned with lat size {lat.size}"
)
if method != "zero_crossing":
raise ValueError("unrecognized method " + method)
# make latitude vector monotonically increasing
if lat[-1] < lat[0]:
pe = pe[..., ::-1]
lat = lat[::-1]
# The gradient of PE is used to determine whether PE
# becomes positive at the zero crossing
dpedy = np.diff(pe, axis=-1)
pe_grad = _interp_midpoint_field_to_grid(dpedy, lat)
# define latitudes of boundaries certain regions
eq_boundary = 5.0
subpolar_boundary = 50.0
polar_boundary = 60.0
# split into hemispheres
mask = (lat > eq_boundary) & (lat < polar_boundary)
lat_masked = lat[mask]
# find E-P maximum (P-E min) latitude in subtropics
# first define the subpolar region to search, excluding poles due to low P-E
subpolar_mask = (lat > eq_boundary) & (lat < subpolar_boundary)
Emax_lat_masked = _maxlat_last_axis_no_nan(-pe[..., subpolar_mask], lat[subpolar_mask], n=30)
# find zero crossings poleward of E-P max
# first define regions poleward of E-P max in each hemisphere
lat_after_Emax = lat_masked > Emax_lat_masked[..., None]
pelat_after_Emax = np.where(lat_after_Emax, pe[..., mask], np.nan)
ZC1_lat_masked = TropD_Calculate_ZeroCrossing(
pelat_after_Emax, lat_masked, lat_uncertainty=lat_uncertainty
)
# we've got the zero crossing poleward of E-P max, but it might not be the
# right one. Now we need to go through and check the P-E gradient
# to make sure P-E is increasing poleward.
# if it is, use that latitude, else, use the next zero crossing
# first check if the (northward) gradient value at the
# zero crossing is increasing poleward
grad_at_ZC1 = _interp_last_axis_common_grid(ZC1_lat_masked, lat, pe_grad)
pelat_increases_at_ZC = np.isfinite(ZC1_lat_masked) & (grad_at_ZC1 > 0.0)
# then get the next zero crossing for when we need it
# first define regions poleward of zero crossing
ZC2_lat_masked = np.full_like(ZC1_lat_masked, np.nan)
needs_ZC2 = ~pelat_increases_at_ZC
if np.any(needs_ZC2):
lat_after_ZC = lat_masked > ZC1_lat_masked[needs_ZC2][..., None]
pelat_after_ZC = np.where(lat_after_ZC, pe[..., mask][needs_ZC2], np.nan)
ZC2_lat_masked[needs_ZC2] = TropD_Calculate_ZeroCrossing(
pelat_after_ZC, lat_masked, lat_uncertainty=lat_uncertainty
)
# if the gradient is increasing poleward, use it, otherwise, use the next
Phi = np.where(pelat_increases_at_ZC, ZC1_lat_masked, ZC2_lat_masked)
return Phi
[docs]@hemisphere_handler
def TropD_Metric_PSI(
V: np.ndarray,
lat: np.ndarray,
lev: np.ndarray,
method: str = "Psi_500",
lat_uncertainty: float = 0.0,
field_type: str = "V",
threshold: Optional[float] = 0.1,
) -> np.ndarray:
"""
TropD Mass Streamfunction (PSI) Metric
Latitude of the subtropical zero crossing of the meridional mass streamfunction.
The input field can be either zonal-mean meridional wind or a precomputed mass
streamfunction.
Parameters
----------
V : numpy.ndarray (dim1, ..., lat, lev) or (dim1, ..., lev, lat)
N-dimensional zonal-mean meridional wind when ``field_type="V"`` or a
precomputed mass streamfunction when ``field_type="PSI"``. The final two
axes may be ordered either as ``(..., lat, lev)`` or ``(..., lev, lat)``.
lat : numpy.ndarray (lat,)
latitude array
lev : numpy.ndarray (lev,)
vertical level array in hPa
method : {"Psi_500", "Psi_500_10Perc", "Psi_<p1>_<p2>", "Psi_<p1>_<p2>_<x>Perc", "Psi_<p1>_<p2>_<x>Perc_center2d", "Psi_<p1>_<p2>_<x>Perc_profile", "Psi_500_Int", "Psi_Int"},
optional
Method of determining which Psi zero crossing to return, by default "Psi_500":
* "Psi_500": Zero crossing of the streamfunction (Psi) at 500hPa
* "Psi_500_10Perc": Crossing of ``threshold`` times the extremum value of Psi
in each hemisphere at the 500hPa level. The historical
method name is retained and defaults to ``threshold=0.1``.
* "Psi_<p1>_<p2>": Zero crossing of Psi vertically averaged between the
``p1`` and ``p2`` hPa levels, e.g. ``"Psi_300_700"``
* "Psi_<p1>_<p2>_<x>Perc": Latitude where the layer-mean streamfunction
between ``p1`` and ``p2`` hPa decreases to
``x`` % of the Hadley-cell-center value from
the full ``(lat, lev)`` field. Omitting the
suffix defaults to the ``center2d`` variant
below, so ``"Psi_500_800_5Perc"`` remains the
Hill-style default.
* "Psi_<p1>_<p2>_<x>Perc_center2d": As above, with the threshold taken
from the 2-D cell-center maximum.
* "Psi_<p1>_<p2>_<x>Perc_profile": As above, but with the threshold
taken from the layer-mean profile
value at the cell-center latitude.
* "Psi_500_Int": Zero crossing of the vertically-integrated Psi at 500 hPa
* "Psi_Int" : Zero crossing of the column-averaged Psi
lat_uncertainty : float, optional
The minimal distance allowed between the adjacent zero crossings, same units as
lat, by default 0.0. e.g., for ``lat_uncertainty=10``, this function will return
NaN if another zero crossing is within 10 degrees of the most equatorward
zero crossing.
field_type : {"V", "PSI"}, optional
Specifies whether the first argument is zonal-mean meridional wind or a
precomputed mass streamfunction, by default ``"V"``
threshold : float or None, optional
Threshold fraction used for the threshold-crossing HC edge. For
``method="Psi_500_10Perc"``, this is the threshold applied directly.
For the other PSI methods, if the zero crossing cannot be found and the
result would otherwise be NaN, the metric falls back to the latitude
where Psi first crosses ``threshold`` times ``Pmax`` between the subtropical
maximum and minimum. Set to ``None`` to disable this fallback. By
default 0.1.
Returns
-------
Phi : numpy.ndarray (dim1, ..., dimN-2)
N-2 dimensional latitudes of the Psi zero crossing
"""
# type casting/input checking
V = _coerce_trailing_lat_lev_axes(V, lat, lev, "V")
lat = np.asarray(lat)
lev = np.asarray(lev)
normalized_input = field_type.strip().lower()
if normalized_input in {"v", "wind", "meridional_wind"}:
Psi = TropD_Calculate_StreamFunction(V, lat, lev)
elif normalized_input in {"psi", "streamfunction", "mass_streamfunction"}:
Psi = V
else:
raise ValueError("field_type must be one of 'V' or 'PSI'")
return _psi_metric_latitude(
Psi,
lat,
lev,
method=method,
lat_uncertainty=lat_uncertainty,
threshold=threshold,
)
[docs]@hemisphere_handler
def TropD_Metric_PSL(
ps: np.ndarray, lat: np.ndarray, method: str = "peak", **maxlat_kwargs
) -> np.ndarray:
"""
TropD Sea-level Pressure (PSL) Metric
Latitude of maximum of the subtropical sea-level pressure
Parameters
----------
ps : np.ndarray (dim1, ..., lat)
N-dimensional sea-level pressure
lat : np.ndarray (lat,)
latitude array
method : {"peak", "max"}, optional
Method for determining latitude of max PSL, by default "peak":
* "peak": latitude of the maximum of subtropical sea-level pressure (smoothing
parameter ``n=30``)
* "max": latitude of the maximum of subtropical sea-level pressure (smoothing
parameter ``n=6``)
**kwargs : optional
additional keyword arguments for :py:func:`TropD_Calculate_MaxLat` (not used
for ``method="fit"``)
n : int, optional
Rank of moment used to calculate the location of max, e.g.,
``n=1,2,4,6,8,...``, by default 6 if ``method="max"``, 30 if ``method="peak"``
Returns
-------
Phi : numpy.ndarray (dim1, ..., dimN-1)
N-1 dimensional latitudes of the subtropical PSL maximum
"""
ps = np.asarray(ps)
lat = np.asarray(lat)
if ps.shape[-1] != lat.size:
raise ValueError(
f"last axis of ps had shape {ps.shape[-1]},"
f" not aligned with lat size {lat.size}"
)
if method not in ["max", "peak"]:
raise ValueError("unrecognized method " + method)
if method == "peak":
maxlat_kwargs["n"] = maxlat_kwargs.get("n", 30)
else: # max
maxlat_kwargs["n"] = maxlat_kwargs.get("n", 6)
# lat should already be last axis
maxlat_kwargs.pop("axis", None)
eq_boundary = 15
polar_boundary = 60
mask = (lat > eq_boundary) & (lat < polar_boundary)
Phi = TropD_Calculate_MaxLat(ps[..., mask], lat[mask], **maxlat_kwargs)
return Phi
[docs]@hemisphere_handler
def TropD_Metric_STJ(
U: np.ndarray,
lat: np.ndarray,
lev: np.ndarray,
method: str = "adjusted_peak",
n_fit: int = 1,
**maxlat_kwargs,
) -> np.ndarray:
#TODO: Amend fit to adjusted_fit for consistency. Deprecate and add warning
"""
TropD Subtropical Jet (STJ) Metric
Latitude of the (adjusted) maximum upper-level zonal-mean zonal wind
Parameters
----------
U : numpy.ndarray (dim1,...,lat,lev,)
N-dimensional zonal-mean zonal wind
lat : numpy.ndarray (lat,)
latitude array
lev : numpy.ndarray (lev,)
vertical level array in hPa
method : {"adjusted_peak", "adjusted_max", "core_peak", "core_max", "fit"}, optional
Method for determing the latitude of the STJ maximum, by default "adjusted_peak":
* "adjusted_peak": Latitude of maximum (smoothing parameter``n=30``) of [the
zonal wind averaged between 100 and 400 hPa] minus [the zonal
mean zonal wind at the level closest to 850hPa], poleward of
10 degrees and equatorward of the Eddy Driven Jet latitude
* "adjusted_max": Latitude of maximum (smoothing parameter ``n=6``) of [the
zonal wind averaged between 100 and 400 hPa] minus [the zonal
mean zonal wind at the level closest to 850hPa], poleward of 10
degrees and equatorward of the Eddy Driven Jet latitude
* "core_peak": Latitude of maximum (smoothing parameter ``n=30``) of the zonal
wind averaged between 100 and 400 hPa, poleward of 10 degrees and
equatorward of 60 degrees
* "core_max": Latitude of maximum (smoothing parameter ``n=6``) of the zonal wind
averaged between 100 and 400 hPa, poleward of 10 degrees and
equatorward of 60 degrees`r`n * "fit": Latitude of the maximum of [the zonal wind averaged between 100 and 400`r`n hPa] minus [the zonal mean zonal wind at the level closest to 850 hPa]`r`n using a quadratic polynomial fit of data from grid points surrounding`r`n the grid point of the maximum
n_fit : int, optional
used when ``method="fit"``, determines the number of points around the max to use
while fitting, by default 1
**kwargs : optional
additional keyword arguments for :py:func:`TropD_Calculate_MaxLat` (not used
for ``method="fit"``)
n : int, optional
Rank of moment used to calculate the location of max, e.g.,
``n=1,2,4,6,8,...``, by default 6 if ``method="core_max"`` or
``method="adjusted_max"``, 30 if ``method="core_peak"`` or
``method="adjusted_peak"``
Returns
-------
Phi : numpy.ndarray (dim1, ..., dimN-2)
N-2 dimensional latitudes of the STJ
Umax : numpy.ndarray (dim1, ..., dimN-2), conditional
N-2 dimensional STJ strength, returned if ``method="fit"``
"""
U = np.asarray(U)
if U.ndim < 3:
U = U[None, ...]
lat = np.asarray(lat)
lev = np.asarray(lev)
if U.shape[-2:] != (lat.size, lev.size):
raise ValueError(
f"last axes of U had shape {U.shape[-2:]},"
f" not aligned with grid shape {lat.size, lev.size}"
)
if method not in ["adjusted_peak", "core_peak", "adjusted_max", "core_max", "fit"]:
raise ValueError("unrecognized method " + method)
layer_400_to_100 = (lev >= 100) & (lev <= 400)
lev_int = lev[layer_400_to_100]
# Pressure weighted vertical mean of U
if lev_int.size > 1:
u_int = trapezoid(U[..., layer_400_to_100], lev_int, axis=-1) / (
lev_int[-1] - lev_int[0]
)
else:
u_int = U[..., layer_400_to_100]
if ("adjusted" in method) or method == "fit": # adjusted_peak, adjusted_max methods
idx_850 = find_nearest(lev, 850)
u = u_int - U[..., idx_850]
else: # core_peak, core_max methods
u = u_int
eq_boundary = 10
polar_boundary = 60
mask = (lat > eq_boundary) & (lat < polar_boundary)
if method != "fit":
if "peak" in method: # adjusted_peak or core_peak have different default
maxlat_kwargs["n"] = maxlat_kwargs.get("n", 30)
else: # adjusted_max or core_max methods
maxlat_kwargs["n"] = maxlat_kwargs.get("n", 6)
# lat should already be last axis
maxlat_kwargs.pop("axis", None)
u_masked = u[..., mask]
if "adjusted" in method: # adjusted_max or adjusted_peak
(Phi_EDJ,) = TropD_Metric_EDJ(U, lat, lev)
lat_before_EDJ = lat[mask] < Phi_EDJ[..., None]
u_masked = np.where(lat_before_EDJ, u_masked, np.nan)
Phi = TropD_Calculate_MaxLat(u_masked, lat[mask], **maxlat_kwargs)
return Phi
else: # method == 'fit':
lat_mask = lat[mask]
Phi, Umax = _quadratic_peak_fit_last_axis(u[..., mask], lat_mask, n_fit=n_fit)
return Phi, Umax # type: ignore
[docs]@hemisphere_handler
def TropD_Metric_TPB(
T: np.ndarray,
lat: np.ndarray,
lev: np.ndarray,
method: str = "max_gradient",
Z: Optional[np.ndarray] = None,
Cutoff: float = 1.5e4,
**maxlat_kwargs,
) -> np.ndarray:
"""
TropD Tropopause Break (TPB) Metric
Finds the latitude of the tropopause break
Parameters
----------
T : numpy.ndarray (dim1, ..., lat, lev)
N-dimensional temperature array (K)
lat : numpy.ndarray (lat,)
latitude array
lev : numpy.ndarray (lev,)
vertical levels array in hPa
method : {"max_gradient", "cutoff", "max_potemp"}, optional
Method to identify tropopause break, by default "max_gradient":
* "max_gradient": The latitude of maximal poleward gradient of the tropopause
height
* "cutoff": The most equatorward latitude where the tropopause crosses
a prescribed cutoff value
* "max_potemp": The latitude of maximal difference between the potential
temperature at the tropopause and at the surface
Z : Optional[numpy.ndarray], optional
N-dimensional geopotential height array (m), required by ``method="cutoff"``
Cutoff : float, optional
Geopotential height cutoff (m) that marks the location of the tropopause break,
by default 1.5e4, used when ``method="cutoff"``
**kwargs : optional
additional keyword arguments for :py:func:`TropD_Calculate_MaxLat` (not used
for ``method="cutoff"``)
n : int, optional
Rank of moment used to calculate the location of max, e.g.,
``n=1,2,4,6,8,...``, by default 6 if ``method="max_gradient"``, 30 if ``method="max_pottemp"``
Returns
-------
Phi : numpy.ndarray (dim1, ..., dimN-2)
N-2 dimensional latitudes of the tropopause break
"""
T = np.asarray(T)
lat = np.asarray(lat)
lev = np.asarray(lev)
if T.shape[-2:] != (lat.size, lev.size):
raise ValueError(
f"final dimensions on T {T.shape[-2:]} and grid "
f"coordinates don't match ({lat.size},{lev.size})"
)
if method not in ["max_gradient", "max_potemp", "cutoff"]:
raise ValueError("unrecognized method " + method)
# make latitude vector monotonically increasing
if lat[-1] < lat[0]:
lat = lat[::-1]
T = T[..., ::-1, :]
if Z is not None:
Z = Z[..., ::-1, :]
polar_boundary = 60.0
eq_boundary = 0.0
mask = (lat > eq_boundary) & (lat < polar_boundary)
if "max_" in method: # 'max_gradient' or 'max_potemp'
if method == "max_potemp":
maxlat_kwargs["n"] = maxlat_kwargs.get("n", 30)
PT = T / (lev / 1000.0) ** KAPPA
Pt, PTt = TropD_Calculate_TropopauseHeight(T, lev, Z=PT)
F = PTt - np.nanmin(PT, axis=-1)
else:
Pt = TropD_Calculate_TropopauseHeight(T, lev, Z=None)
F = np.diff(Pt, axis=-1) / (lat[1] - lat[0])
lat = (lat[1:] + lat[:-1]) / 2.0
F *= np.sign(lat)
# redefine mask b/c we have new grid points
mask = (lat > eq_boundary) & (lat < polar_boundary)
F = np.where(np.isfinite(F), F, 0.0)
Phi = TropD_Calculate_MaxLat(F[..., mask], lat[mask], **maxlat_kwargs)
else: # method == 'cutoff'
if Z is None:
raise ValueError('Z must be provided when method = "cutoff"')
Pt, Ht = TropD_Calculate_TropopauseHeight(T, lev, Z)
Phi = TropD_Calculate_ZeroCrossing(Ht[..., mask] - Cutoff, lat[mask])
return Phi
[docs]@hemisphere_handler
def TropD_Metric_UAS(
U: np.ndarray,
lat: np.ndarray,
lev: Optional[np.ndarray] = None,
method: str = "zero_crossing",
lat_uncertainty: float = 0.0,
) -> np.ndarray:
"""
TropD Near-Surface Zonal Wind (UAS) Metric
Parameters
----------
U : numpy.ndarray (dim1, ..., lat[, lev])
N-dimensional zonal mean zonal wind array. Also accepts surface wind
lat : numpy.ndarray (lat,)
latitude array
lev : numpy.ndarray, optional (lev,)
vertical level array in hPa, required if U has final dimension lev
method : {"zero_crossing"}, optional
Method for identifying the surface wind zero crossing, by default "zero_crossing".
* "zero_crossing": the first subtropical latitude where near-surface zonal wind
changes from negative to positive
lat_uncertainty : float, optional
the minimal distance allowed between adjacent zero crossings in degrees, by
default 0.0
Returns
-------
Phi : numpy.ndarray (dim1, ..., dimN-2[, dimN-1])
N-2(N-1) dimensional latitudes of the first subtropical zero crossing of
near-surface zonal wind
"""
U = np.asarray(U)
lat = np.asarray(lat)
get_lev = lev is not None
U_grid_shape = U.shape[-2:] if get_lev else U.shape[-1]
input_grid_shape = (lat.size, lev.size) if get_lev else lat.size # type: ignore
if U_grid_shape != input_grid_shape:
raise ValueError(
f"last axes of U w/ shape {U_grid_shape},"
" not aligned with input grid of shape " + str(input_grid_shape)
)
if method != "zero_crossing":
raise ValueError("unrecognized method " + method)
if get_lev:
uas = U[..., find_nearest(lev, 850)] # type: ignore
else:
uas = U
# make latitude vector monotonically increasing
if lat[-1] < lat[0]:
uas = uas[..., ::-1]
lat = lat[::-1]
# define latitudes of boundaries certain regions
eq_boundary = 5
subpolar_boundary = 30
polar_boundary = 60
subpolar_mask = (lat > eq_boundary) & (lat < subpolar_boundary)
mask = (lat > eq_boundary) & (lat < polar_boundary)
# get subtropical surface wind minimum
uas_min_lat = TropD_Calculate_MaxLat(-uas[..., subpolar_mask], lat[subpolar_mask])
# need to look for zero crossing after subtropical minimum,
lat_after_Umin = lat[mask] > uas_min_lat[..., None]
ulat_after_Umin = np.where(lat_after_Umin, uas[..., mask], np.nan)
Phi = TropD_Calculate_ZeroCrossing(
ulat_after_Umin, lat[mask], lat_uncertainty=lat_uncertainty
)
return Phi
# ========================================
# Stratospheric metrics
# Written by Kasturi Shah - August 3 2020
# converted to python by Alison Ming 8 April 2021
# vectorized and refactored by Sam Smith 19 May 22
[docs]@hemisphere_handler
def Shah_2020_GWL(
tracer: np.ndarray, lat: np.ndarray, zonal_mean_tracer=False
) -> np.ndarray:
"""
Computes the gradient-weighted latitude (GWL) from tracer data
Reference: Shah et al., JGR-A, 2020
https://doi.org/10.1029/2020JD033081
Parameters
==========
tracer : numpy.ndarray (..., lat [, lon])
N-dimensional array of tracer data for computing gradient. If
``zonal_mean_tracer=False``, the last dimension should correspond to the
longitude axis, otherwise it should be the latitude axis
latitude : numpy.ndarray (lat,)
latitude array in degrees
zonal_mean_tracer : bool, optional
whether the input tracer data is zonally symmetric (True) or zonally-varying
(False) (i.e., has a longitude dimension), by default False
Returns
=======
tracer_steep_lat: numpy.ndarray
N-2(N-1 for ``zonal_mean_tracer=True``) dimenional array of GWL latitudes in the
SH
"""
tracer = np.asarray(tracer)
lat = np.asarray(lat).flatten()
if not zonal_mean_tracer:
tracer = tracer.swapaxes(-2, -1)
if lat.size != tracer.shape[-1]:
raise ValueError(
"input array 'lat' should be aligned with "
f"{'' if zonal_mean_tracer else 'second to '}last axis of tracer"
)
if lat[0] > lat[1]:
lat = lat[::-1]
tracer = tracer[..., ::-1]
phi_lat = np.radians(lat)
phi_mid_lat = 0.5 * (phi_lat[1:] + phi_lat[:-1])
# calculating gradients
grad_weight_lat = np.diff(tracer, axis=-1) / np.diff(phi_lat) * np.cos(phi_mid_lat)
# array of gradient weighted latitudes (...[,nlon])
GWL_lat = (phi_mid_lat * grad_weight_lat).sum(axis=-1) / grad_weight_lat.sum(
axis=-1
)
# area equivalent GWL width (in degrees)
if zonal_mean_tracer:
tracer_steep_lat = np.degrees(GWL_lat)
else:
tracer_steep_lat = np.degrees(np.arcsin(np.nanmean(np.sin(GWL_lat), axis=-1)))
return tracer_steep_lat
[docs]@hemisphere_handler
def Shah_2020_1sigma(
tracer: np.ndarray,
lat: np.ndarray,
zonal_mean_tracer=False,
) -> np.ndarray:
r"""
Computes the one-sigma width from 3-D tracer data
Reference: Shah et al., JGR-A, 2020
https://doi.org/10.1029/2020JD033081
Parameters
==========
tracer: numpy.ndarray (...,lat[, lon])
N-dimensional array of tracer data for computing 1:math:`\sigma`-width. If
``zonal_mean_tracer=False``, the last dimension should correspond to the
longitude axis, otherwise it should be the latitude axis
latitude: numpy.ndarray (lat,)
array of latitudes in degrees (1-D). If not increasing, it will be sorted
zonal_mean_tracer : bool, optional
whether the input tracer data is zonally symmetric (True) or zonally-varying
(False) (i.e., has a longitude dimension), by default False
Returns
=======
tracer_sigma_lat: numpy.ndarray
N-2(N-1 for ``zonal_mean_tracer=True``) dimenional array of
1:math:`\sigma`-widths in the NH
"""
tracer = np.atleast_2d(tracer)
lat = np.asarray(lat).flatten()
if not zonal_mean_tracer:
tracer = tracer.swapaxes(-2, -1)
if lat.size != tracer.shape[-1]:
raise ValueError(
"input array 'lat' should be aligned with second to last axis of tracer"
)
if lat[0] > lat[1]:
lat = lat[::-1]
tracer = tracer[..., ::-1]
phi_lat = np.broadcast_to(np.radians(lat), tracer.shape)
# finding ranges of 70 degs latitude with largest tracer values
nlats_70deg = round(70.0 / (lat[1] - lat[0])) + 1
tracer_70deg_totals = fftconvolve(
np.where(np.isfinite(tracer), tracer, 0.0),
np.ones((tracer.ndim - 1) * (1,) + (nlats_70deg,)),
"valid",
axes=-1,
)
max_70deg_starts = np.argmax(tracer_70deg_totals, axis=-1)[..., None]
bands_70deg = (np.arange(lat.size) >= max_70deg_starts) & (
np.arange(lat.size) < max_70deg_starts + nlats_70deg
)
tracer_banded = np.ma.masked_where(~bands_70deg, tracer)
# mean and std of 70deg bands
mean_70deg = tracer_banded.mean(axis=-1).filled(0.0)
std_70deg = tracer_banded.std(axis=-1, ddof=1).filled(0.0)
threshold = (mean_70deg - std_70deg)[..., None]
sigma_width_lat = (
np.ma.masked_where(
~((tracer < threshold) & (phi_lat > phi_lat[..., :1])),
phi_lat,
copy=False,
)
.min(axis=-1)
.filled(np.nan)
)
# area equivalent latitude (in degrees)
if zonal_mean_tracer:
tracer_sigma_lat = np.degrees(sigma_width_lat)
else:
tracer_sigma_lat = np.degrees(
np.arcsin(np.nanmean(np.sin(sigma_width_lat), axis=-1))
)
return tracer_sigma_lat