"""
Improved Flux Footprint Prediction (FFP) Module
This module implements the FFP model described in:
Kljun, N., P. Calanca, M.W. Rotach, H.P. Schmid, 2015:
A simple two-dimensional parameterisation for Flux Footprint Prediction (FFP).
Geosci. Model Dev. 8, 3695-3713, doi:10.5194/gmd-8-3695-2015
This version includes improvements for better alignment with the theoretical framework.
"""
import logging
from typing import Optional, Tuple, Union
import numpy as np
import pandas as pd
import xarray as xr
from scipy import signal
from scipy.ndimage import gaussian_filter
import matplotlib.pyplot as plt
import traceback
from .base_footprint_model import BaseFootprintModel
[docs]
class FFPModel(BaseFootprintModel):
"""
Flux Footprint Prediction (FFP) model based on Kljun et al. (2015).
This class implements an improved version of the flux footprint model
for determining the source area of scalar fluxes measured by
eddy covariance towers. It uses atmospheric stability, wind parameters,
and terrain configurations to calculate spatial footprints.
Attributes
----------
REQUIRED_COLUMNS : list of str
Names of required columns in the input DataFrame.
df : pandas.DataFrame
Copy of the input DataFrame used for modeling.
domain : list of float
Spatial domain defined as [xmin, xmax, ymin, ymax].
fclim_2d : xarray.DataArray or None
Final footprint climatology after all processing.
f_2d : xarray.DataArray or None
Instantaneous 2D footprint estimate.
logger : logging.Logger
Logger instance for model-level debugging.
"""
REQUIRED_COLUMNS = [
"sigmav", # Standard deviation of lateral velocity fluctuations
"ustar", # Friction velocity
"ol", # Obukhov length
"wind_dir", # Wind direction
"umean", # Wind speed
]
def __init__(
self,
df: pd.DataFrame,
domain: list = [-1000.0, 1000.0, -1000.0, 1000.0],
dx: float = 10.0,
dy: float = 10.0,
nx: int = 1000,
ny: int = 1000,
rs: list = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
crop_height: float = 0.2,
atm_bound_height: float = 2000.0,
inst_height: float = 2.0,
rslayer: bool = False,
smooth_data: bool = True,
crop: bool = False,
verbosity: int = 2,
logger=None,
**kwargs,
):
"""
Initialize the FFP model with input data and configuration.
Parameters
----------
df : pandas.DataFrame
Input DataFrame with required meteorological columns including:
'V_SIGMA', 'USTAR', 'MO_LENGTH', 'WD', 'WS'.
domain : list of float, optional
Footprint domain in the format [xmin, xmax, ymin, ymax].
dx : float, optional
Grid spacing in the x-direction (meters). Default is 10.0.
dy : float, optional
Grid spacing in the y-direction (meters). Default is 10.0.
nx : int, optional
Number of grid points in the x-direction. Default is 1000.
ny : int, optional
Number of grid points in the y-direction. Default is 1000.
rs : list of float, optional
Relative source area contributions to evaluate. Values must be between 0 and 1.
crop_height : float, optional
Height of vegetation or surface roughness (m). Default is 0.2.
atm_bound_height : float, optional
Height of the atmospheric boundary layer (m). Must be > 10. Default is 2000.0.
inst_height : float, optional
Height of the measurement instrument (m). Must be > crop_height. Default is 2.0.
rslayer : bool, optional
If True, apply roughness sublayer corrections. Default is False.
smooth_data : bool, optional
Whether to apply smoothing to the footprint climatology. Default is True.
crop : bool, optional
If True, crop output to significant areas. Default is False.
verbosity : int, optional
Verbosity level for logging (0=none, 1=info, 2=debug). Default is 2.
logger : logging.Logger, optional
Custom logger. If None, a new logger is created.
kwargs : dict
Additional keyword arguments for future extensions.
Raises
------
ValueError
If any input parameters are invalid or inconsistent.
"""
super().__init__(
df=df,
domain=domain,
dx=dx,
dy=dy,
rs=rs,
crop_height=crop_height,
atm_bound_height=atm_bound_height,
inst_height=inst_height,
smooth_data=smooth_data,
verbosity=verbosity,
logger=logger,
)
# Set up logger
self.logger.setLevel(logging.DEBUG if self.verbosity > 1 else logging.WARNING)
# Model constants
self.k = 0.4 # von Karman constant
self.oln = 5000.0 # neutral stability limit
# Add RSL-specific parameters
self.n_rsl = 2.75 # RSL height multiplier (2 ≤ n ≤ 5 per paper)
self.rsl_params = {
"conv": {"ps1": 0.8}, # p value for convective conditions
"stab": {"ps1": 0.55}, # p value for stable conditions
}
# Validate input DataFrame
self.df = self._validate_input_df(self.df)
# Process input data
self.prep_df_fields(
crop_height=self.crop_height,
inst_height=inst_height,
atm_bound_height=self.atm_bound_height,
)
# Initialize basic attributes
self.sigma_y = None
self.sigma_y_correction = None
self.fclim_2d = None
self.f_2d = None
self.domain = self._validate_domain(domain)
self.dx = float(dx)
self.dy = float(dy)
self.nx = int(nx)
self.ny = int(ny)
self.rs = self._validate_rs(rs)
# Validate physical parameters
if crop_height < 0:
raise ValueError("crop_height must be positive")
if atm_bound_height <= 10:
raise ValueError("atm_bound_height must be > 10m")
if inst_height <= crop_height:
raise ValueError("inst_height must be greater than crop_height")
self.crop_height = crop_height
self.atm_bound_height = atm_bound_height
self.inst_height = inst_height
self.smooth_data = bool(smooth_data)
self.crop = bool(crop)
# Initialize model parameters (will be updated based on stability)
self.initialize_model_parameters()
# Set up computational domain
self.define_domain()
# Create xarray dataset
self.create_xr_dataset()
# Perform validity checks
self.check_validity_ranges()
# Handle stability regimes
self.handle_stability_regimes()
def _validate_input_df(self, df: pd.DataFrame) -> None:
"""
Validate that the input DataFrame contains all required columns.
Parameters
----------
df : pandas.DataFrame
The input DataFrame to validate.
Raises
------
ValueError
If required columns are missing from the DataFrame.
"""
df1 = df.copy() # Make a copy to avoid modifying original
# Rename fields to standard names
df1 = df1.rename(
columns={
"V_SIGMA": "sigmav",
"USTAR": "ustar",
"wd": "wind_dir",
"WD": "wind_dir",
"MO_LENGTH": "ol",
"ws": "umean",
"WS": "umean",
}
)
missing_cols = [
col
for col in self.REQUIRED_COLUMNS
if col not in map(str.lower, df1.columns)
]
if missing_cols:
raise ValueError(
f"Missing required columns in input DataFrame: {missing_cols}"
)
# Check for invalid values
for col in self.REQUIRED_COLUMNS:
if df1[col].isnull().any():
self.logger.warning(f"Found null values in column {col}")
if not np.isfinite(df1[col]).all():
self.logger.warning(f"Found non-finite values in column {col}")
return df1
def _validate_domain(self, domain: list) -> list:
"""
Validate the domain boundaries.
Parameters
----------
domain : list of float
A list with exactly four elements: [xmin, xmax, ymin, ymax].
Returns
-------
list of float
Validated domain list.
Raises
------
ValueError
If the domain is not in the correct format.
"""
if len(domain) != 4:
raise ValueError("domain must be a list of [xmin, xmax, ymin, ymax]")
domain = [float(x) for x in domain]
if domain[0] >= domain[1] or domain[2] >= domain[3]:
raise ValueError("Invalid domain bounds")
return domain
def _validate_rs(self, rs: list) -> list:
"""
Validate the list of relative source area contributions.
Parameters
----------
rs : list of float
List of values between 0 and 1 (exclusive).
Returns
-------
list of float
Sorted list of validated relative contributions.
Raises
------
ValueError
If any value in the list is not in the (0, 1) range.
"""
if not isinstance(rs, (list, np.ndarray)):
raise ValueError("rs must be a list or array")
rs = [float(r) for r in rs]
if any(r <= 0 or r >= 1 for r in rs):
raise ValueError("all rs values must be between 0 and 1")
return sorted(rs)
def _setup_logger(self) -> logging.Logger:
"""
Set up a logger for the FFPModel class.
Returns
-------
logging.Logger
A configured logger instance.
"""
logger = logging.getLogger("FFPModel")
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
[docs]
def get_scalar_value(self, arr):
"""
Convert an array-like object to a scalar float value.
Parameters
----------
arr : array-like or scalar
The value to be converted.
Returns
-------
float
A scalar representation of the input.
"""
if isinstance(arr, xr.DataArray):
return float(arr.mean())
elif isinstance(arr, np.ndarray):
return float(np.mean(arr))
else:
return float(arr)
[docs]
def get_source_area_contour(self, r, x_ru, x_rd, y_r):
"""
Generate a contour dataset for a given source area.
Parameters
----------
r : float
Relative contribution (0 < r < 1).
x_ru : float
Upwind distance.
x_rd : float
Downwind distance.
y_r : float
Crosswind extent.
Returns
-------
xarray.Dataset
Contour dataset with coordinates and footprint values.
"""
# Get scalar values
x_rd_val = self.get_scalar_value(x_rd)
x_ru_val = self.get_scalar_value(x_ru)
y_r_val = self.get_scalar_value(y_r)
# Create distance arrays with unique, evenly spaced coordinates
x = np.linspace(x_rd_val, x_ru_val, self.nx)
y = np.linspace(-y_r_val, y_r_val, self.ny)
# Ensure coordinates are unique by adding small offset where needed
eps = np.finfo(float).eps # Smallest possible float value
x_unique = x + np.arange(len(x)) * eps
y_unique = y + np.arange(len(y)) * eps
# Create grid
xx, yy = np.meshgrid(x_unique, y_unique)
# Calculate polar coordinates
rho = np.sqrt(xx**2 + yy**2)
theta = np.arctan2(yy, xx)
# Get mean wind direction
wind_dir_mean = self.get_scalar_value(self.ds["wind_dir"])
rotated_theta = theta - (wind_dir_mean * np.pi / 180.0)
# Calculate footprint components using mean values
x_star = self.calc_scaled_x(rho)
sigma_y = self.calc_crosswind_spread(rho)
f_y = self.calc_crosswind_integrated_footprint(x_star)
# Calculate 2D footprint
f_2d = (
f_y / (np.sqrt(2 * np.pi) * sigma_y) * np.exp(-(yy**2) / (2.0 * sigma_y**2))
)
# Calculate contour level
total_flux = np.sum(f_2d) * self.dx * self.dy
sorted_f = np.sort(f_2d.flatten())[::-1]
cumsum_f = np.cumsum(sorted_f) * self.dx * self.dy
idx = np.searchsorted(cumsum_f / total_flux, r)
if idx >= len(sorted_f):
idx = len(sorted_f) - 1
contour_level = sorted_f[idx]
# Create output dataset with unique coordinates
return xr.Dataset(
{
"contour_level": xr.DataArray(contour_level),
"x": xr.DataArray(x_unique, dims=["x"]),
"y": xr.DataArray(y_unique, dims=["y"]),
"f": xr.DataArray(f_2d, dims=["y", "x"]),
}
)
[docs]
def calc_scaled_x(self, x: Union[float, np.ndarray]) -> Union[float, np.ndarray]:
"""
Calculate the scaled distance X* based on wind and height parameters.
Parameters
----------
x : float or ndarray
Upwind distance from the receptor.
Returns
-------
float or ndarray
Scaled distance X*.
"""
# Get mean values
zm_mean = float(self.ds["zm"].mean())
h_mean = float(self.ds["h"].mean())
u_zm_mean = float(self.ds["umean"].mean())
ustar_mean = float(self.ds["ustar"].mean())
# Calculate scaling
result = (
x / zm_mean * (1 - zm_mean / h_mean) * (ustar_mean / (u_zm_mean * self.k))
)
return result
[docs]
def initialize_model_parameters(self):
"""
Initialize core model parameters (a, b, c, d, ac, bc, cc).
Ensures scalar values are assigned.
"""
# Start with universal parameters as scalars
self.a = 1.4524
self.b = -1.9914
self.c = 1.4622
self.d = 0.1359
# Crosswind dispersion parameters
self.ac = 2.17
self.bc = 1.66
self.cc = 20.0
self.logger.debug("Initialized with universal parameters")
[docs]
def check_rsl_validity(self):
"""
Check if the measurement height is above the roughness sublayer.
Returns
-------
pandas.Series
Boolean mask of valid rows.
"""
# Calculate RSL height
h_rs = 10 * self.df["z0"] # Roughness element height
z_star = self.n_rsl * h_rs # RSL height
# Check if measurement height is above RSL
rsl_valid = self.df["zm"] > z_star
# Log RSL check results
invalid_count = np.sum(~rsl_valid)
if invalid_count > 0:
self.logger.warning(
f"{invalid_count} measurements within roughness sublayer "
f"(zm <= {self.n_rsl:.1f} * h_rs)"
)
return rsl_valid
[docs]
def apply_rsl_corrections(self):
"""
Apply roughness sublayer (RSL) corrections to crosswind dispersion.
"""
# Calculate RSL parameters
h_rs = 10 * self.ds["z0"]
z_star = self.n_rsl * h_rs
# Identify measurements within RSL
in_rsl = self.ds["zm"] <= z_star
if in_rsl.any():
# Calculate scaling factors for RSL effects
stability_param = self.ds["zm"] / self.ds["ol"]
# Get appropriate ps1 value based on stability
ps1 = xr.where(
stability_param <= 0,
self.rsl_params["conv"]["ps1"],
self.rsl_params["stab"]["ps1"],
)
# Calculate RSL correction based on eq. C1-C3
T = ps1 * self.ds["zm"] / self.ds["ustar"]
sigma_y0 = self.ds["sigmav"] * T
# Base crosswind spread as a function of x → expand over y to (x, y)
sigma_y_1d = xr.DataArray(
self.calc_crosswind_spread(self.x),
dims=("x",),
coords={"x": self.x},
)
sigma_y_grid = sigma_y_1d.expand_dims(y=self.y) # (x, y)
# Broadcast to (time, x, y)
sigma_y_grid_3d = sigma_y_grid.expand_dims(time=self.ds.time)
sigma_y0_3d = sigma_y0.broadcast_like(sigma_y_grid_3d)
in_rsl_3d = in_rsl.broadcast_like(sigma_y_grid_3d)
# Combine near-source and far-field terms where in RSL
self.sigma_y = xr.where(
in_rsl_3d,
np.sqrt(sigma_y0_3d ** 2 + sigma_y_grid_3d ** 2),
sigma_y_grid_3d,
)
# Blind-zone correction (time-only)
self.x_min = self.ds["ustar"] * T
else:
# Outside RSL: keep base spread and shape it to (time, x, y)
sigma_y_1d = xr.DataArray(
self.calc_crosswind_spread(self.x),
dims=("x",),
coords={"x": self.x},
)
self.sigma_y = sigma_y_1d.expand_dims(y=self.y).expand_dims(time=self.ds.time)
# No blind-zone correction needed; keep as NaN series
self.x_min = xr.full_like(self.ds["ustar"], np.nan)
[docs]
def prep_df_fields(
self, crop_height: float, inst_height: float, atm_bound_height: float
):
"""
Prepare and normalize input DataFrame fields for footprint calculation.
Parameters
----------
crop_height : float
Vegetation height.
inst_height : float
Instrument height.
atm_bound_height : float
Atmospheric boundary layer height.
"""
# Calculate displacement height
d_h = 10 ** (0.979 * np.log10(crop_height) - 0.154)
# Add derived fields
self.df["zm"] = inst_height - d_h # measurement height above displacement
self.df["h_c"] = crop_height
self.df["z0"] = crop_height * 0.123 # roughness length
self.df["h"] = atm_bound_height
# Apply validity checks
self._apply_validity_masks()
# Drop invalid data
self.df = self.df.dropna(subset=["sigmav", "wind_dir", "h", "ol"])
self.ts_len = len(self.df)
self.logger.debug(f"Valid input length: {self.ts_len}")
# Add RSL validity check
rsl_valid = self.check_rsl_validity()
self.df["rsl_valid"] = rsl_valid
# Additional RSL parameters
self.df["h_rs"] = 10 * self.df["z0"]
self.df["z_star"] = self.n_rsl * self.df["h_rs"]
# Log RSL statistics
mean_z_star = self.df["z_star"].mean()
self.logger.info(
f"Mean RSL height (z*): {mean_z_star:.1f}m, "
f"Measurement height: {inst_height}m"
)
def _apply_validity_masks(self):
"""
Apply basic physical constraints to filter invalid data in the DataFrame.
"""
self.df["zm"] = np.where(self.df["zm"] <= 0.0, np.nan, self.df["zm"])
self.df["h"] = np.where(self.df["h"] <= 10.0, np.nan, self.df["h"])
self.df["zm"] = np.where(self.df["zm"] > self.df["h"], np.nan, self.df["zm"])
self.df["sigmav"] = np.where(self.df["sigmav"] < 0.0, np.nan, self.df["sigmav"])
self.df["ustar"] = np.where(self.df["ustar"] <= 0.1, np.nan, self.df["ustar"])
self.df["wind_dir"] = np.where(
(self.df["wind_dir"] > 360.0) | (self.df["wind_dir"] < 0.0),
np.nan,
self.df["wind_dir"],
)
[docs]
def define_domain(self):
"""
Define the spatial computational domain and polar coordinate grids.
"""
self.logger.info("Setting up computational domain...")
try:
# Create coordinate arrays
if self.dx is None and self.nx is not None:
self.x = np.linspace(self.domain[0], self.domain[1], self.nx + 1)
self.y = np.linspace(self.domain[2], self.domain[3], self.ny + 1)
else:
self.x = np.arange(self.domain[0], self.domain[1] + self.dx, self.dx)
self.y = np.arange(self.domain[2], self.domain[3] + self.dy, self.dy)
self.logger.debug(f"Domain dimensions - x: {len(self.x)}, y: {len(self.y)}")
# Create 2D grid with explicit dimensions
self.xv, self.yv = np.meshgrid(self.x, self.y, indexing="ij")
# Create polar coordinate grids as xarray DataArrays with explicit dimensions
self.rho = xr.DataArray(
np.sqrt(self.xv**2 + self.yv**2),
dims=("x", "y"),
coords={"x": self.x, "y": self.y},
)
self.theta = xr.DataArray(
np.arctan2(self.xv, self.yv),
dims=("x", "y"),
coords={"x": self.x, "y": self.y},
)
# Initialize footprint grid with proper dimensions
self.fclim_2d = xr.zeros_like(self.rho)
self.logger.debug(
f"Grid shapes - xv: {self.xv.shape}, rho: {self.rho.shape}"
)
self.logger.info("Domain setup completed successfully")
except Exception as e:
self.logger.error(f"Error in domain setup: {str(e)}")
raise
[docs]
def create_xr_dataset(self):
"""Create xarray Dataset from input DataFrame."""
self.df.index.name = "time"
self.ds = xr.Dataset.from_dataframe(self.df)
[docs]
def handle_stability_regimes(self):
"""
Classify stability regimes and assign model parameters based on regime.
Returns
-------
xarray.Dataset
Dataset with regime-specific masks.
"""
# Calculate stability parameter
stability_param = self.ds["zm"] / self.ds["ol"]
# Define regime boundaries
regimes = xr.Dataset()
regimes["strongly_unstable"] = stability_param <= -15.5
regimes["unstable"] = (stability_param > -15.5) & (stability_param < -0.1)
regimes["neutral"] = (stability_param >= -0.1) & (stability_param <= 0.1)
regimes["stable"] = (stability_param > 0.1) & (
stability_param < self.oln / self.ds["zm"]
)
regimes["strongly_stable"] = stability_param >= self.oln / self.ds["zm"]
# Regime-specific parameters from Appendix A
params = {
"unstable": {"a": 2.930, "b": -2.285, "c": 2.127, "d": -0.107},
"neutral": {"a": 1.472, "b": -1.996, "c": 1.480, "d": 0.169},
"stable": {"a": 1.472, "b": -1.996, "c": 1.480, "d": 0.169},
}
# Smooth transitions
transition_zone = 0.1
def transition_weight(param, center, width):
"""Calculate smooth transition weight."""
return 0.5 * (1 + np.tanh((param - center) / width))
neutral_to_unstable = transition_weight(stability_param, -0.1, transition_zone)
neutral_to_stable = transition_weight(stability_param, 0.1, transition_zone)
# Apply smooth parameter transitions - results have shape (time,)
for param in ["a", "b", "c", "d"]:
time_varying = (
neutral_to_unstable * params["unstable"][param]
+ (1 - neutral_to_unstable) * (1 - neutral_to_stable) * params["neutral"][param]
+ neutral_to_stable * params["stable"][param]
)
# Store as DataArray with time dimension
setattr(self, param, time_varying)
self.logger.debug(f"Parameter {param}: shape={time_varying.shape}, "
f"range=[{float(time_varying.min()):.3f}, {float(time_varying.max()):.3f}]")
# Log regime statistics
for regime in regimes:
count = int(regimes[regime].sum())
percentage = (count / self.ts_len) * 100
self.logger.info(f"{regime}: {count} points ({percentage:.1f}%)")
# Add stability flags to dataset
self.ds["stability_regime"] = xr.where(
regimes["strongly_unstable"], -2,
xr.where(regimes["unstable"], -1,
xr.where(regimes["neutral"], 0,
xr.where(regimes["stable"], 1, 2)))
)
return regimes
[docs]
def check_validity_ranges(self):
"""
Check validity ranges according to equation 27 and other constraints from Kljun et al. (2015).
Implements the following validity checks:
1. Height validity (Eq. 27): 20z₀ < zm < he
where he ≈ 0.8h is the entrainment height
2. Stability validity (Eq. 27): -15.5 ≤ zm/L
3. Turbulence validity: u* > 0.1 m/s, σv > 0
4. Roughness sublayer validity: zm > z* ≈ n*hrs
where hrs ≈ 10z₀ and 2 ≤ n ≤ 5 (Section 2)
Returns
-------
xr.Dataset: Dataset containing boolean masks for:
- height_valid: True where 20z₀ < zm < he
- stability_valid: True where -15.5 ≤ zm/L
- turbulence_valid: True where u* > 0.1 and σv > 0
- rsl_valid: True where measurement height above RSL
Combined validity is stored in self.valid_footprint
"""
validity_mask = xr.Dataset()
# Height validity: 20z₀ < zm < he
validity_mask["height_valid"] = xr.where(
(self.ds["zm"] > 20 * self.ds["z0"]) & (self.ds["zm"] < 0.8 * self.ds["h"]),
True,
False,
)
# Stability validity: -15.5 ≤ zm/L
validity_mask["stability_valid"] = xr.where(
self.ds["zm"] / self.ds["ol"] >= -15.5, True, False
)
# Turbulence validity
validity_mask["turbulence_valid"] = xr.where(
(self.ds["ustar"] > 0.1) & (self.ds["sigmav"] > 0), True, False
)
# Combined validity
self.valid_footprint = validity_mask.all()
# Add RSL-specific checks
rsl_valid = self.check_rsl_validity()
validity_mask["rsl_valid"] = rsl_valid
# Update combined validity to include RSL
self.valid_footprint = self.valid_footprint & rsl_valid
return validity_mask
[docs]
def calc_pi_4(self):
"""
Calculate Π4 with comprehensive stability function implementation including neutral conditions.
Returns
-------
xr.DataArray
Calculated Π4 values with proper neutral condition handling
"""
stability_param = self.ds["zm"] / self.ds["ol"]
# Initialize psi_m array
psi_m = xr.zeros_like(stability_param)
# Determine stability regimes
stable_mask = self.ds["ol"] > 0
unstable_mask = self.ds["ol"] < -self.oln
neutral_mask = (self.ds["ol"] <= 0) & (self.ds["ol"] >= -self.oln)
# Calculate psi_m for each stability regime
# Stable conditions
psi_m = xr.where(stable_mask, -5.3 * stability_param, psi_m)
# Unstable conditions
psi_m = xr.where(unstable_mask, self.calc_unstable_psi(stability_param), psi_m)
# Neutral conditions - psi_m remains zero
self.logger.debug(
f"Stability regime counts - Stable: {stable_mask.sum().values}, "
f"Unstable: {unstable_mask.sum().values}, "
f"Neutral: {neutral_mask.sum().values}"
)
return np.log(self.ds["zm"] / self.ds["z0"]) - psi_m
[docs]
def calc_unstable_psi(self, stability_param):
"""
Calculate ψM for unstable atmospheric conditions.
Parameters
----------
stability_param : xarray.DataArray
Stability parameter zm/L.
Returns
-------
xarray.DataArray
Stability correction ψM.
"""
# Limit stability parameter to prevent numerical issues
stability_param = xr.where(stability_param < -15.5, -15.5, stability_param)
chi = (1 - 19 * stability_param) ** 0.25
# Implement the full equation with better numerical handling
return (
np.log((1 + chi**2) / 2)
+ 2 * np.log((1 + chi) / 2)
- 2 * np.arctan(chi)
+ np.pi / 2
)
[docs]
def apply_paper_smoothing(self):
"""
Apply enhanced 3x3 smoothing from Kljun et al. paper with mass conservation.
"""
self.logger.debug("Starting paper smoothing...")
try:
# Define smoothing kernel from paper
skernel = np.array([
[0.05, 0.1, 0.05],
[0.1, 0.4, 0.1],
[0.05, 0.1, 0.05]
])
# Store original for validation
original = self.fclim_2d.copy()
original_sum = float(original.sum())
self.logger.debug(
f"Pre-smoothing stats - Min: {float(original.min()):.2e}, "
f"Max: {float(original.max()):.2e}, Sum: {original_sum:.2e}"
)
# Apply convolution twice as specified in paper
smoothed = original.copy()
for i in range(2):
smoothed_values = signal.convolve2d(
smoothed.values,
skernel,
mode='same',
boundary='wrap' # Changed from 'fill' to 'wrap' for better edges
)
smoothed = xr.DataArray(
smoothed_values,
dims=smoothed.dims,
coords=smoothed.coords
)
self.logger.debug(f"Smoothing iteration {i + 1} complete")
# Additional Gaussian smoothing for visual quality (optional, scales with resolution)
if self.dx <= 5.0: # Only for fine grids
sigma = 0.5 # Very light additional smoothing
smoothed_values = gaussian_filter(
smoothed.values,
sigma=sigma,
mode='constant',
cval=0
)
smoothed = xr.DataArray(
smoothed_values,
dims=smoothed.dims,
coords=smoothed.coords
)
self.logger.debug(f"Applied additional Gaussian smoothing (sigma={sigma})")
# CRITICAL: Preserve total mass
smoothed_sum = float(smoothed.sum())
if smoothed_sum > 1e-10:
smoothed = smoothed * (original_sum / smoothed_sum)
self.logger.debug(
f"Mass conservation: {original_sum:.6e} -> {float(smoothed.sum()):.6e}"
)
# Validation
if np.isnan(smoothed).any() or not np.any(smoothed):
self.logger.warning(
"Smoothing produced invalid results, reverting to original"
)
return original
self.logger.debug(
f"Post-smoothing stats - Min: {float(smoothed.min()):.2e}, "
f"Max: {float(smoothed.max()):.2e}"
)
return smoothed
except Exception as e:
self.logger.error(f"Error in smoothing: {str(e)}")
self.logger.debug("Returning original unsmoothed data")
return self.fclim_2d
[docs]
def verify_results(self, data, name="data"):
"""
Check for NaN or infinite values in the given data.
Parameters
----------
data : array-like or xarray.DataArray
The data to verify.
name : str, optional
Descriptive name for logging.
"""
try:
if isinstance(data, xr.DataArray):
if data.isnull().any():
self.logger.warning(f"NaN values detected in {name}")
if not xr.DataArray.all(data.where(data.notnull())):
self.logger.warning(f"Non-finite values detected in {name}")
elif isinstance(data, np.ndarray):
if np.any(np.isnan(data)):
self.logger.warning(f"NaN values detected in {name}")
if not np.all(np.isfinite(data)):
self.logger.warning(f"Non-finite values detected in {name}")
else:
# Try to convert to numpy array first
try:
arr = np.asarray(data)
if np.any(np.isnan(arr)):
self.logger.warning(f"NaN values detected in {name}")
if not np.all(np.isfinite(arr)):
self.logger.warning(f"Non-finite values detected in {name}")
except:
self.logger.warning(
f"Could not verify {name} for NaN/infinite values"
)
except Exception as e:
self.logger.warning(f"Error verifying {name}: {str(e)}")
[docs]
def verify_data(self, data: xr.DataArray, name: str) -> bool:
"""
Verify the content and structure of a DataArray.
Parameters
----------
data : xarray.DataArray
Data array to check.
name : str
Variable name for logging.
Returns
-------
bool
True if valid, False otherwise.
"""
if data is None:
self.logger.error(f"{name} is None")
return False
if not isinstance(data, xr.DataArray):
self.logger.error(f"{name} is not a DataArray, got {type(data)}")
return False
if data.size == 0:
self.logger.error(f"{name} is empty")
return False
if np.all(np.isnan(data)):
self.logger.error(f"All values in {name} are NaN")
return False
# Log statistics for debugging
self.logger.debug(f"{name} statistics:")
self.logger.debug(f" Shape: {data.shape}")
self.logger.debug(f" NaN count: {np.sum(np.isnan(data))}")
self.logger.debug(f" Min: {float(data.min())}")
self.logger.debug(f" Max: {float(data.max())}")
self.logger.debug(f" Mean: {float(data.mean())}")
return True
def _log_array_stats(self, name, arr):
"""
Log statistics for an array.
Parameters
----------
name : str
Descriptive name.
arr : xarray.DataArray
The array to summarize.
"""
self.logger.debug(f"{name} statistics:")
self.logger.debug(f" Shape: {arr.shape}")
self.logger.debug(f" NaN count: {xr.where(np.isnan(arr), 1, 0).sum()}")
self.logger.debug(f" Min: {float(arr.min())}")
self.logger.debug(f" Max: {float(arr.max())}")
self.logger.debug(f" Mean: {float(arr.mean())}")
[docs]
def calc_scaled_distance_rsl(self):
"""
Calculate scaled distance X* with roughness sublayer correction.
Returns
-------
xarray.DataArray
Corrected scaled distance.
"""
# Basic scaled distance calculation
xstar_base = (
self.rho
* np.cos(self.rotated_theta)
/ self.ds["zm"]
* (1.0 - self.ds["zm"] / self.ds["h"])
/ (np.log(self.ds["zm"] / self.ds["z0"]) - self.calc_pi_4())
)
# Apply RSL correction where needed
in_rsl = self.ds["zm"] <= self.ds["z_star"]
if in_rsl.any():
# Calculate blind zone adjustment
x_adj = xr.where(in_rsl, self.x_min * np.cos(self.rotated_theta), 0.0)
# Adjust scaled distance
xstar_adj = xr.where(in_rsl, xstar_base + x_adj / self.ds["zm"], xstar_base)
return xstar_adj
return xstar_base
[docs]
def calculate_pi_groups(self):
"""
Calculate dimensionless Π groups for analysis.
Returns
-------
tuple of xarray.DataArray
Π₁, Π₂, Π₃, Π₄ values.
"""
if not hasattr(self, "f_2d"):
raise AttributeError(
"f_2d must be initialized before calculating pi groups"
)
# Π1 = fy*zm
pi_1 = self.f_2d * self.ds["zm"]
# Π2 = x/zm
pi_2 = self.rho * np.cos(self.rotated_theta) / self.ds["zm"]
# Π3 = (h - zm)/h = 1 - zm/h
pi_3 = 1 - self.ds["zm"] / self.ds["h"]
# Π4 = u(zm)/(u*k) = ln(zm/z0) - ψM
pi_4 = self.calc_pi_4()
return pi_1, pi_2, pi_3, pi_4
[docs]
def calc_crosswind_dispersion(self, xstar_ci_dummy, px):
"""
Compute dimensionless crosswind dispersion with safety checks.
Parameters
----------
xstar_ci_dummy : xarray.DataArray
Scaled distance.
px : bool
Condition flag.
Returns
-------
xarray.DataArray
Dimensionless crosswind dispersion.
"""
try:
# Ensure positive input for sqrt
x_abs = np.abs(xstar_ci_dummy)
denom = np.maximum(1.0 + self.cc * x_abs, 1e-10)
sqrt_term = np.clip(
self.bc * x_abs**2 / denom,
0.0, # Ensure non-negative input to sqrt
1e6, # Upper limit to prevent overflow
)
result = xr.where(px, self.ac * np.sqrt(sqrt_term), 0.0)
# Safety check for invalid values
result = xr.where(np.isfinite(result), result, 0.0)
return result
except Exception as e:
self.logger.error(f"Error in crosswind dispersion calculation: {str(e)}")
return xr.zeros_like(xstar_ci_dummy)
[docs]
def scale_crosswind_dispersion(self, sigystar_dummy):
"""
Scale the dimensionless crosswind dispersion σy* to real-scale σy.
Implements the real-scale conversion described in Equations 12-13 and surrounding text
of Kljun et al. (2015):
σy = σy*/(scale_const) * zm * σv/u*
where:
- σy* is dimensionless crosswind dispersion
- scale_const is stability-dependent scaling factor:
For L ≤ 0 (convective): scale_const = min(1, 10⁻⁵|zm/L|⁻¹ + 0.80)
For L > 0 (stable): scale_const = min(1, 10⁻⁵|zm/L|⁻¹ + 0.55)
- zm is measurement height
- σv is standard deviation of lateral velocity fluctuations
- u* is friction velocity
Args:
sigystar_dummy (xarray.DataArray): Dimensionless crosswind dispersion σy*
Returns:
xarray.DataArray: Real-scale crosswind dispersion σy [m]
Note:
Includes numerical safety checks to prevent division by zero and handle
potential non-finite values in intermediate calculations.
"""
try:
self.logger.debug("Starting crosswind dispersion scaling...")
self.logger.debug(f"Input σy* shape: {sigystar_dummy.shape}")
self.logger.debug(
f"σy* range: [{float(sigystar_dummy.min()):.2e}, {float(sigystar_dummy.max()):.2e}]"
)
# Calculate stability parameter
stability_param = self.ds["zm"] / self.ds["ol"]
self.logger.debug(
f"Stability parameter range: [{float(stability_param.min()):.2e}, {float(stability_param.max()):.2e}]"
)
# Calculate stability-dependent scaling constant with safety
scale_const = xr.where(
self.ds["ol"] <= 0,
1e-5 * np.maximum(np.abs(stability_param), 1e-10) ** (-1)
+ 0.80, # Convective
1e-5 * np.maximum(np.abs(stability_param), 1e-10) ** (-1)
+ 0.55, # Stable
)
self.logger.debug(
f"Scale constant range: [{float(scale_const.min()):.2e}, {float(scale_const.max()):.2e}]"
)
# Limit scaling constant
scale_const = xr.where(
np.isfinite(scale_const), np.minimum(scale_const, 1.0), 1.0
)
self.logger.debug("Applied upper limit of 1.0 to scale constant")
self.logger.debug(
f"Final scale constant range: [{float(scale_const.min()):.2e}, {float(scale_const.max()):.2e}]"
)
# Calculate scaled dispersion with safety checks
result = (
sigystar_dummy
/ np.maximum(scale_const, 1e-10)
* self.ds["zm"]
* np.maximum(self.ds["sigmav"], 1e-10)
/ np.maximum(self.ds["ustar"], 1e-10)
)
# Log intermediate values
self.logger.debug(
f"zm range: [{float(self.ds['zm'].min()):.2e}, {float(self.ds['zm'].max()):.2e}]"
)
self.logger.debug(
f"sigmav range: [{float(self.ds['sigmav'].min()):.2e}, {float(self.ds['sigmav'].max()):.2e}]"
)
self.logger.debug(
f"ustar range: [{float(self.ds['ustar'].min()):.2e}, {float(self.ds['ustar'].max()):.2e}]"
)
# Final safety check
result = xr.where(np.isfinite(result), result, 0.0)
self.logger.debug(
f"Final σy range: [{float(result.min()):.2e}, {float(result.max()):.2e}]"
)
self.logger.info("Crosswind dispersion scaling completed successfully")
return result
except Exception as e:
self.logger.error(f"Error in scaling crosswind dispersion: {str(e)}")
self.logger.debug(f"Detailed error trace:", exc_info=True)
raise
[docs]
def calculate_source_areas(self):
"""
Compute source areas for all specified relative contributions.
Returns
-------
dict
Dictionary of contour datasets by contribution level.
"""
source_areas = {}
# Calculate for each requested relative contribution
for r in self.rs:
if not 0.1 <= r <= 0.9:
self.logger.warning(f"Skipping r={r}, outside valid range 0.1-0.9")
continue
# Calculate extents
x_r = self.calc_crosswind_integrated_extent(r)
x_ru, x_rd = self.calc_peak_based_limits(r)
y_r = self.calc_crosswind_extent(r, x_ru, x_rd)
# Store results as xarray objects
source_areas[f"r_{int(r * 100)}"] = {
"x_r": xr.DataArray(x_r, name="extent"),
"x_ru": xr.DataArray(x_ru, name="upwind_extent"),
"x_rd": xr.DataArray(x_rd, name="downwind_extent"),
"y_r": xr.DataArray(y_r, name="crosswind_extent"),
"contour": self.get_source_area_contour(r, x_ru, x_rd, y_r),
}
return source_areas
[docs]
def calc_crosswind_integrated_extent(self, r: float) -> xr.DataArray:
"""
Calculate crosswind-integrated footprint extent for relative contribution r.
Following Equations 23-25 of the paper.
Args:
r: Relative contribution (between 0.1 and 0.9)
Returns:
xr.DataArray: Distance from receptor containing fraction r of footprint
"""
# Parameters from Eq. 17
c = 1.462
d = 0.136
# Calculate scaled extent from Eq. 24
x_star_r = -c / np.log(r) + d
# Convert to real scale using Eq. 25
x_r = (
x_star_r
* self.ds["zm"]
* (1 - self.ds["zm"] / self.ds["h"]) ** -1
* (np.log(self.ds["zm"] / self.ds["z0"]) - self.calc_pi_4())
)
return x_r
[docs]
def calc_peak_based_limits(self, r: float) -> Tuple[xr.DataArray, xr.DataArray]:
"""
Calculate upwind and downwind distances from the peak location for fraction r.
Following Eq. 26 of the paper.
Args:
r: Relative contribution (between 0.1 and 0.9)
Returns:
Tuple containing upwind and downwind distances from peak
"""
# Calculate peak location (Eq. 20)
x_star_max = -self.c / self.b + self.d
# Get x_star_r from crosswind integration
x_star_r = -self.c / np.log(r) + self.d
# Calculate downwind limit (Eq. 26 with first set of parameters)
x_star_rd = 0.44 * x_star_r**-0.77 + 0.24
# New fixed condition:
condition = (x_star_max < x_star_r) & (x_star_r <= 1.5)
# Calculate upwind limit with split conditions (Eq. 26)
x_star_ru = xr.where(
condition,
0.60 * x_star_r**1.32 + 0.61,
0.96 * x_star_r**1.01 + 0.19,
)
# Convert to real scale
x_rd = self.scale_to_real_distance(x_star_rd)
x_ru = self.scale_to_real_distance(x_star_ru)
return x_ru, x_rd
[docs]
def scale_to_real_distance(self, x_star: xr.DataArray) -> xr.DataArray:
"""
Convert scaled distance to real distance using Eq. 6/7.
Args:
x_star: Scaled distance X*
Returns:
xr.DataArray: Real-scale distance
"""
return (
x_star
* self.ds["zm"]
* (1 - self.ds["zm"] / self.ds["h"]) ** -1
* (np.log(self.ds["zm"] / self.ds["z0"]) - self.calc_pi_4())
)
[docs]
def calc_crosswind_extent(
self, r: float, x_ru: xr.DataArray, x_rd: xr.DataArray
) -> xr.DataArray:
"""
Calculate crosswind extent of source area using parameterized relationships.
Args:
r: Relative contribution
x_ru: Upwind distance from peak
x_rd: Downwind distance from peak
Returns:
xr.DataArray: Crosswind extent at each distance
"""
# Calculate sigma_y at peak location
x_peak = self.calc_real_footprint_peak(
self.ds["zm"], self.ds["h"], self.ds["umean"], self.ds["ustar"]
)
sigma_y_peak = self.calc_crosswind_spread(x_peak)
# Scale crosswind extent based on distance from peak
def scale_sigma(x_dist):
return sigma_y_peak * np.sqrt(x_dist / x_peak)
y_r = xr.where(x_rd <= x_peak, scale_sigma(x_rd), scale_sigma(x_ru))
return y_r
[docs]
def calc_crosswind_spread_xr(self, x_star):
"""
Calculate the standard deviation of crosswind spread σy.
Implements Equations 18-19 from Kljun et al. (2015):
``σy* = ac * sqrt((bc * |X*|^2)/(1 + cc|X*|))``
Where ac=2.17, bc=1.66, cc=20.0 (Equation 19)
Real-scale σy is then obtained through:
``σy = σy*/(scale_const) * zm * σv/u*``
Where scale_const depends on stability (Eq. not numbered in paper):
- For unstable: 1e-5|zm/L|^(-1) + 0.80
- For stable: 1e-5|zm/L|^(-1) + 0.55
Args:
x_star (float or array): Distance from receptor [m]
Returns:
Standard deviation of crosswind spread [m]
"""
try:
# Calculate scaled crosswind spread σy* using Eq. 18
sigma_y_star = self.ac * np.sqrt(
(self.bc * x_star**2) / (1 + self.cc * x_star)
)
scale_const = self.scale_crosswind_dispersion(sigma_y_star)
# Convert to real scale
sigma_y = (
sigma_y_star
/ scale_const
* self.ds["zm"]
* self.ds["sigmav"]
/ self.ds["ustar"]
)
return sigma_y
except Exception as e:
self.logger.error(f"Error calculating crosswind spread: {str(e)}")
raise
[docs]
def calc_crosswind_spread(
self, x: Union[float, np.ndarray]
) -> Union[float, np.ndarray]:
"""
Calculate the standard deviation of cross-wind spread :math:`\sigma_y`.
Implements Eqs. 18-19 from *Kljun et al., 2015*:
.. math::
\sigma_y^* = a_c \sqrt{\frac{b_c \lvert X^* \rvert^2}
{1 + c_c \lvert X^* \rvert}}
\sigma_y = \frac{\sigma_y^*}{\text{scale\_const}}
\; z_m \; \sigma_v / u_*
where
* :math:`a_c = 2.17`
* :math:`b_c = 1.66`
* :math:`c_c = 20.0`
* ``scale_const`` depends on stability (see paper).
Parameters
----------
x : float or ndarray
Up-wind distance from the receptor [m].
Returns
-------
float or ndarray
Cross-wind spread :math:`\sigma_y` [m].
"""
# Get mean values of stability parameters for consistent calculation
zm_mean = float(self.ds["zm"].mean())
h_mean = float(self.ds["h"].mean())
u_zm_mean = float(self.ds["umean"].mean())
ustar_mean = float(self.ds["ustar"].mean())
sigmav_mean = float(self.ds["sigmav"].mean())
ol_mean = float(self.ds["ol"].mean())
# Calculate mean stability parameter
stability_param_mean = zm_mean / ol_mean
# Calculate scaled distance X*
x_star = self.calc_scaled_x(x)
# Calculate scaled crosswind spread σy* using Eq. 18
# Parameters from Eq. 19: ac = 2.17, bc = 1.66, cc = 20.0
sigma_y_star = self.ac * np.sqrt((self.bc * x_star**2) / (1 + self.cc * x_star))
# Calculate stability-dependent scaling constant
if stability_param_mean <= 0: # Convective
scale_const = 1e-5 * abs(stability_param_mean) ** (-1) + 0.80
else: # Stable
scale_const = 1e-5 * abs(stability_param_mean) ** (-1) + 0.55
# Limit scaling constant to maximum of 1.0
scale_const = min(scale_const, 1.0)
# Convert to real scale (Eq. 18)
sigma_y = sigma_y_star / scale_const * zm_mean * sigmav_mean / ustar_mean
return sigma_y
[docs]
def run(self, return_result: bool = True) -> Optional[xr.Dataset]:
"""
Execute the FFP model calculations.
Args:
return_result: If True, returns complete results dataset
Returns:
Optional[xr.Dataset]: Dataset containing footprint results
"""
self.logger.info("Starting FFP model calculations...")
try:
# Ensure domain is properly initialized
if not hasattr(self, "x") or not hasattr(self, "y"):
self.define_domain()
# Initialize fclim_2d with zeros before calculations
self.fclim_2d = xr.DataArray(
np.zeros((len(self.x), len(self.y))),
dims=("x", "y"),
coords={"x": self.x, "y": self.y},
)
# Perform main calculations
self.fclim_2d = self.calc_xr_footprint() # Now returns fclim_2d directly
# Add validation checks
if self.fclim_2d is None:
self.logger.error("fclim_2d is None after calculations")
raise ValueError("Footprint climatology calculation failed")
if not isinstance(self.fclim_2d, xr.DataArray):
self.logger.error(
f"fclim_2d is not a DataArray, got {type(self.fclim_2d)}"
)
raise ValueError("Invalid footprint climatology type")
if np.all(np.isnan(self.fclim_2d)):
self.logger.error("All values in fclim_2d are NaN")
raise ValueError("Invalid footprint climatology values")
if return_result:
self.logger.debug(f"Domain shapes - x: {len(self.x)}, y: {len(self.y)}")
self.logger.debug(f"Footprint climatology shape: {self.fclim_2d.shape}")
self.logger.debug(
f"Footprint climatology stats - min: {float(self.fclim_2d.min())}, max: {float(self.fclim_2d.max())}"
)
try:
# Create base results dataset with explicit dimensions
results = xr.Dataset(
{
"footprint_climatology": self.fclim_2d, # Use DataArray directly
"domain_x": ("x", self.x),
"domain_y": ("y", self.y),
}
)
if hasattr(self, "f_2d") and self.f_2d is not None:
if "time" in self.f_2d.dims:
results["footprint_2d"] = self.f_2d
else:
self.logger.warning("f_2d missing time dimension")
# Add source areas if calculated
if hasattr(self, "source_areas") and self.source_areas is not None:
for r_level, area_dict in self.source_areas.items():
for key, value in area_dict.items():
if isinstance(value, xr.DataArray):
results[f"{r_level}_{key}"] = value
self.logger.info("FFP model calculations completed successfully")
return results
except Exception as e:
self.logger.error(f"Error creating results dataset: {str(e)}")
raise
except Exception as e:
self.logger.error(f"Error in FFP calculations: {str(e)}")
self.logger.debug("Traceback:", exc_info=True) # Add full traceback
raise
self.logger.info("FFP model calculations completed")
return None
[docs]
def save_results(self, filename: str):
"""
Save model results to a netCDF file.
Args:
filename: Path where the results should be saved
"""
if getattr(self, "f_2d", None) is None or getattr(self, "fclim_2d", None) is None:
raise RuntimeError("No results to save. Did you call run() first?")
results = xr.Dataset(
{
"footprint_2d": self.f_2d, # expect dims (time, x, y) or similar
"footprint_climatology": self.fclim_2d, # dims (x, y)
"domain_x": xr.DataArray(self.x, dims=["x"]),
"domain_y": xr.DataArray(self.y, dims=["y"]),
}
)
# Store parameters as attributes (NetCDF-safe)
results.attrs.update(
{
"crop_height": float(self.df["h_c"].iloc[0]),
"inst_height": float(self.df["zm"].iloc[0]) + float(self.df["h_c"].iloc[0]),
"atm_bound_height": float(self.df["h"].iloc[0]),
}
)
results.to_netcdf(filename)
self.logger.info(f"Results saved to {filename}")