"""
ffp_daily_monthly_helper.py (Refactored for FFPModel)
Summarize flux footprints from `improved_ffp.FFPModel` to daily/monthly
periods and (optionally) export 80% source-area contours to a GeoPackage.
Updated to use FFPModel as the canonical implementation.
"""
from __future__ import annotations
import math
import logging
from dataclasses import dataclass
from typing import Dict, Optional, Tuple, List, Union, Any
from pathlib import Path
import numpy as np
import pandas as pd
import xarray as xr
from rasterio import features
from affine import Affine
import rasterio
from rasterio.transform import from_origin
import csv
import geopandas as gpd
from shapely.geometry import Polygon
from pyproj import CRS, Transformer
# Use FFPModel as canonical implementation
from .improved_ffp import FFPModel
# Import models
from .kormannmeixner_adapter import KormannMeixnerModel
from .ls_footprint_adapter import LSFootprintModelAdapter
from .wang_footprint_adapter import WangFootprintModel
from .base_footprint_model import BaseFootprintModel
from .ffp_xr import ffp_climatology_new
# Keep backward compatibility option
_LEGACY_MODE = False
try:
from .ffp_xr import ffp_climatology_new as LegacyFFP
except ImportError:
LegacyFFP = None
_LEGACY_MODE = False
# ------------------------------
# Config & Data Loading
# ------------------------------
def load_config(ini_path: str | Path) -> Dict[str, Any]:
"""Read a minimal INI for site metadata and column names."""
import configparser
cp = configparser.ConfigParser(interpolation=None)
cp.read(ini_path)
def _getfloat(section, key, fallback=None):
try:
return cp.getfloat(section, key, fallback=fallback)
except Exception:
return fallback
cfg = dict(
station_latitude=_getfloat("METADATA", "station_latitude"),
station_longitude=_getfloat("METADATA", "station_longitude"),
missing_data_value=_getfloat("METADATA", "missing_data_value", -9999.0),
skiprows=int(cp.get("METADATA", "skiprows", fallback="0")),
date_parser=cp.get("METADATA", "date_parser", fallback="%Y%m%d%H%M"),
ts_col=cp.get("DATA", "datestring_col", fallback="TIMESTAMP_START"),
# Column names used by the footprint model
wind_dir_col=cp.get("DATA", "wind_dir_col", fallback="WD"),
wind_spd_col=cp.get("DATA", "wind_spd_col", fallback="WS"),
ustar_col=cp.get("DATA", "USTAR", fallback="USTAR"),
mo_length_col=cp.get("DATA", "MO_LENGTH", fallback="MO_LENGTH"),
v_sigma_col=cp.get("DATA", "V_SIGMA", fallback="V_SIGMA"),
)
return cfg
def load_amf_df(csv_path: str | Path, cfg: Dict[str, Any]) -> pd.DataFrame:
"""Load AMF half-hourly CSV and return a tidy DataFrame indexed by time."""
csv_path = Path(csv_path)
if not csv_path.exists():
raise FileNotFoundError(f"CSV not found: {csv_path}")
df = pd.read_csv(csv_path, skiprows=cfg.get("skiprows", 0))
# Parse timestamps
ts_col = cfg.get("ts_col", "TIMESTAMP_START")
date_fmt = cfg.get("date_parser", "%Y%m%d%H%M")
df[ts_col] = pd.to_datetime(
df[ts_col].astype(str), format=date_fmt, errors="coerce"
)
df = df.set_index(ts_col).sort_index()
# Replace missing sentinel with NaN
mv = cfg.get("missing_data_value", -9999.0)
df = df.replace(mv, np.nan)
return df
# ------------------------------
# Build & Run Climatology
# ------------------------------
[docs]
def build_climatology(
df: pd.DataFrame,
model_type: str = "ffp",
crop_height: float = 0.2,
atm_bound_height: float = 2000.0,
inst_height: float = 2.5,
dx: float = 10.0,
dy: float = 10.0,
domain: Tuple[float, float, float, float] = (-1000.0, 1000.0, -1000.0, 1000.0),
rs: list = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8],
smooth_data: bool = True,
verbosity: int = 2,
use_legacy: bool = False,
logger: Optional[logging.Logger] = None,
**model_kwargs,
) -> BaseFootprintModel:
"""
Prepare required columns and run the footprint model.
Parameters
----------
df : pd.DataFrame
Input DataFrame with required columns (WD, WS, USTAR, MO_LENGTH, V_SIGMA)
crop_height : float
Vegetation/canopy height (m)
atm_bound_height : float
Atmospheric boundary layer height (m)
inst_height : float
Instrument measurement height (m)
dx, dy : float
Grid resolution (m)
domain : tuple
(xmin, xmax, ymin, ymax) in meters
rs : list
Source area fractions to compute (0-1)
smooth_data : bool
Apply Gaussian smoothing
verbosity : int
Logging verbosity (0=silent, 2=debug)
use_legacy : bool
Use legacy ffp_xr implementation (for comparison)
logger : logging.Logger
Optional logger instance
Returns
-------
FFPModel or LegacyFFP
Footprint model instance with computed results
"""
if logger is None:
logger = logging.getLogger(__name__)
# Model registry
models = {
"ffp": FFPModel,
"ffp_xr": ffp_climatology_new,
"kormann-meixner": KormannMeixnerModel,
"km": KormannMeixnerModel,
"lagrangian": LSFootprintModelAdapter,
"ls": LSFootprintModelAdapter,
"wang": WangFootprintModel,
"wang2006": WangFootprintModel,
}
model_type = model_type.lower()
if model_type not in models:
raise ValueError(
f"Unknown model type '{model_type}'. "
f"Choose from: {list(models.keys())}"
)
ModelClass = models[model_type]
# Common parameters
common_params = dict(
df=df,
domain=list(domain),
dx=float(dx),
dy=float(dy),
rs=rs,
crop_height=float(crop_height),
atm_bound_height=float(atm_bound_height),
inst_height=float(inst_height),
smooth_data=smooth_data,
verbosity=verbosity,
logger=logger,
)
# Add model-specific parameters
common_params.update(model_kwargs)
# Map column names to FFPModel expectations
df_mapped = df.rename(
columns={
"WD": "wind_dir",
"WS": "umean",
"USTAR": "ustar",
"MO_LENGTH": "ol",
"V_SIGMA": "sigmav",
}
).copy()
# Check for required columns
required = ["wind_dir", "umean", "ustar", "ol", "sigmav"]
missing_cols = [c for c in required if c not in df_mapped.columns]
if missing_cols:
raise ValueError(f"Missing required columns: {missing_cols}")
# Create and run model
try:
logger.info(f"Initializing {model_type} model...")
model = ModelClass(**common_params)
logger.info("Running footprint calculation...")
model.run(return_result=True)
#model.normalize()
logger.info("Footprint calculation completed successfully")
return model
except Exception as e:
logger.error(f"Model {model_type} failed: {e}")
raise
# ------------------------------
# Daily/Monthly Summaries
# ------------------------------
def _ensure_time_normalized(f_2d: xr.DataArray) -> xr.DataArray:
"""Normalize each time-slice so sum over x,y = 1."""
return f_2d / f_2d.sum(dim=("x", "y"))
def _et_from_le(df: pd.DataFrame, le_col: str = "LE") -> pd.Series:
"""
Convert latent energy (LE, W/m^2) to ET in mm/hr.
1 mm water over 1 m^2 in 1 hour corresponds to ~ 680.6 W/m^2.
"""
if le_col not in df.columns:
raise ValueError(f"LE column '{le_col}' not found in DataFrame")
return df[le_col] / 680.6
@dataclass
class SummaryResult:
f_daily_mean: Optional[xr.DataArray] = None
f_monthly_mean: Optional[xr.DataArray] = None
f_daily_et_weighted: Optional[xr.DataArray] = None
f_monthly_et_weighted: Optional[xr.DataArray] = None
def summarize_periods(
model: BaseFootprintModel,
df: pd.DataFrame,
et_source: str = "LE",
daily: bool = True,
monthly: bool = True,
normalize_each_time: bool = True,
) -> SummaryResult:
"""
Build daily/monthly summaries from model.f_2d (time,x,y).
Parameters
----------
model : FFPModel
Fitted footprint model with f_2d attribute
df : pd.DataFrame
Original input DataFrame (for ET data)
et_source : str
Column name for latent energy flux (W/m²)
daily : bool
Compute daily aggregations
monthly : bool
Compute monthly aggregations
normalize_each_time : bool
Normalize each time slice before aggregation
Returns
-------
SummaryResult
Object containing requested daily/monthly footprints
"""
# Get f_2d from model
if not hasattr(model, "f_2d") or model.f_2d is None:
raise ValueError(
"Model must have f_2d attribute computed (call model.run() first)"
)
# Get f_2d from model using standard interface
f_2d = model.get_footprint_timeseries()
if f_2d is None:
raise ValueError(
"Model does not support time-resolved footprints. "
"Only climatology available."
)
f = f_2d
#f = model.f_2d
if normalize_each_time:
f = _ensure_time_normalized(f)
res = SummaryResult()
# Mean footprints
if daily:
res.f_daily_mean = f.resample(time="1D").mean()
if monthly:
res.f_monthly_mean = f.resample(time="MS").mean()
# ET-weighted aggregation
et_mmhr = _et_from_le(df, le_col=et_source).reindex(
f["time"].to_index(), method=None
)
et_da = xr.DataArray(et_mmhr.values, coords={"time": f["time"]}, dims=("time",))
weighted = f * et_da
if daily:
num = weighted.resample(time="1D").sum()
den = et_da.resample(time="1D").sum()
res.f_daily_et_weighted = num / den
if monthly:
num = weighted.resample(time="MS").sum()
den = et_da.resample(time="MS").sum()
res.f_monthly_et_weighted = num / den
return res
# ------------------------------
# Export utilities
# ------------------------------
def _choose_utm_epsg(lon: float, lat: float) -> int:
"""Pick a UTM EPSG (NAD83 / WGS84 UTM) from lon/lat."""
zone = int(math.floor((lon + 180.0) / 6.0) + 1)
return int(f"326{zone:02d}")
def _make_contour_polygon_from_field_alt(
z2d: np.ndarray,
x: np.ndarray,
y: np.ndarray,
level: float = 0.8,
method: str = "auto",
):
"""
Extract polygon boundary for cumulative-source threshold using
scikit-image or rasterio vectorization.
"""
z = np.array(z2d, dtype=float, copy=True)
z[z < 0] = 0.0
s = z.sum()
if not np.isfinite(s) or s <= 0:
return None
z /= s
# Determine threshold by sorting descending
flat = z.ravel()
idx = np.argsort(flat)[::-1]
cum = np.cumsum(flat[idx])
thr_idx = np.searchsorted(cum, float(level), side="left")
thr_val = flat[idx[thr_idx]] if thr_idx < flat.size else flat[idx[-1]]
mask = (z >= thr_val).astype(np.uint8)
x = np.asarray(x)
y = np.asarray(y)
nx, ny = x.size, y.size
col_ix = np.arange(nx, dtype=float)
row_ix = np.arange(ny, dtype=float)
def map_cols_to_x(cols):
return np.interp(cols, col_ix, x)
def map_rows_to_y(rows):
return np.interp(rows, row_ix, y)
# Try scikit-image first
if method in ("auto", "skimage"):
try:
from skimage import measure
conts = measure.find_contours(mask, level=0.5)
if conts:
arr = max(conts, key=lambda a: a.shape[0])
rows, cols = arr[:, 0], arr[:, 1]
xv = map_cols_to_x(cols)
yv = map_rows_to_y(rows)
return np.column_stack([xv, yv])
except Exception:
if method == "skimage":
return None
# Fallback to rasterio
if method in ("auto", "rasterio"):
try:
if nx < 2 or ny < 2:
return None
xres = float(x[1] - x[0])
yres = float(y[1] - y[0])
left = x.min() - xres / 2.0
top = y.max() + yres / 2.0
transform = Affine.translation(left, top) * Affine.scale(xres, -yres)
geoms = list(features.shapes(mask, mask=None, transform=transform))
polys = [g for g, v in geoms if int(v) == 1]
if not polys:
return None
def _poly_area(coords):
try:
exterior = coords[0]
A = 0.0
for i in range(len(exterior) - 1):
x0, y0 = exterior[i]
x1, y1 = exterior[i + 1]
A += x0 * y1 - x1 * y0
return abs(A) * 0.5
except Exception:
return 0.0
polys.sort(key=lambda g: _poly_area(g["coordinates"]), reverse=True)
ext = polys[0]["coordinates"][0]
return np.asarray(ext, dtype=float)
except Exception:
if method == "rasterio":
return None
return None
def export_contours_gpkg(
model: FFPModel,
summaries: SummaryResult,
df: pd.DataFrame,
station_lat: float,
station_lon: float,
gpkg_path: str | Path,
crs_out: str | int = "auto",
levels: tuple = (0.8,),
stats_csv: str | Path | None = None,
centroid_out: str | int = 4326,
rn_col: str | None = None,
h_col: str | None = None,
le_col: str | None = None,
g_col: str | None = None,
contour_method: str = "auto",
) -> Path:
"""
Export source-area contours to GeoPackage with energy balance stats.
Adapted to work with FFPModel output structure.
"""
# Get grid coordinates from model
x = model.x
y = model.y
# CRS setup
if crs_out == "auto":
epsg = _choose_utm_epsg(station_lon, station_lat)
dst_crs = CRS.from_epsg(epsg)
else:
dst_crs = CRS.from_user_input(crs_out)
wgs84 = CRS.from_epsg(4326)
to_proj = Transformer.from_crs(wgs84, dst_crs, always_xy=True)
to_wgs = Transformer.from_crs(dst_crs, wgs84, always_xy=True)
try:
centroid_crs = CRS.from_user_input(centroid_out)
except Exception:
centroid_crs = CRS.from_epsg(4326)
to_centroid = Transformer.from_crs(dst_crs, centroid_crs, always_xy=True)
to_wgs_cent = Transformer.from_crs(dst_crs, wgs84, always_xy=True)
# Energy balance helpers
def _pick(dfcols, preferred, fallbacks):
if preferred and preferred in dfcols:
return preferred
for f in fallbacks:
for c in dfcols:
if f.lower() in c.lower():
return c
return None
cols = list(df.columns)
rn_c = _pick(cols, rn_col, ["NETRAD", "RN", "RNET"])
h_c = _pick(cols, h_col, ["H"])
le_c = _pick(cols, le_col, ["LE", "LE_F_MDS", "LE_QC"])
g_c = _pick(cols, g_col, ["G", "G_1_1_1", "SHF", "SOIL_HEAT"])
def _eb_stats(slice_df: pd.DataFrame) -> dict:
out = {}
try:
rn = pd.to_numeric(slice_df[rn_c], errors="coerce") if rn_c else None
h = pd.to_numeric(slice_df[h_c], errors="coerce") if h_c else None
le = pd.to_numeric(slice_df[le_c], errors="coerce") if le_c else None
g = pd.to_numeric(slice_df[g_c], errors="coerce") if g_c else None
rn_m = float(np.nanmean(rn)) if rn is not None else np.nan
h_m = float(np.nanmean(h)) if h is not None else np.nan
le_m = float(np.nanmean(le)) if le is not None else np.nan
g_m = float(np.nanmean(g)) if g is not None else np.nan
resid_m = rn_m - sum(v for v in (h_m, le_m, g_m) if np.isfinite(v))
out.update(
dict(
Rn_mean_Wm2=rn_m,
H_mean_Wm2=h_m,
LE_mean_Wm2=le_m,
G_mean_Wm2=g_m,
Residual_mean_Wm2=resid_m,
)
)
if np.isfinite(rn_m) and rn_m != 0.0:
total = sum(v for v in (h_m, le_m, g_m) if np.isfinite(v))
out["Closure_frac"] = total / rn_m
out["Residual_frac_of_Rn"] = resid_m / rn_m
else:
out["Closure_frac"] = np.nan
out["Residual_frac_of_Rn"] = np.nan
except Exception:
out = dict(
Rn_mean_Wm2=np.nan,
H_mean_Wm2=np.nan,
LE_mean_Wm2=np.nan,
G_mean_Wm2=np.nan,
Residual_mean_Wm2=np.nan,
Closure_frac=np.nan,
Residual_frac_of_Rn=np.nan,
)
return out
# Prepare output
gpkg_path = Path(gpkg_path)
if gpkg_path.exists():
gpkg_path.unlink()
stats_records = []
x0, y0 = to_proj.transform(station_lon, station_lat)
def _to_world_coords(vertices_xy: np.ndarray | None):
if vertices_xy is None:
return None
vx = vertices_xy[:, 0] + x0
vy = vertices_xy[:, 1] + y0
return Polygon(np.column_stack([vx, vy]))
# Layer collection helper
def _collect_layer(da: Optional[xr.DataArray], base_layer: str):
if da is None:
return
for r in levels:
records = []
for i in range(da.sizes["time"]):
z = da.isel(time=i).values
verts = _make_contour_polygon_from_field_alt(
z, x, y, level=float(r), method=contour_method
)
poly = _to_world_coords(verts)
if poly is None or poly.is_empty:
continue
ts = pd.Timestamp(da["time"].values[i]).isoformat()
# EB stats
if base_layer.startswith("daily"):
sdf = df.loc[ts[:10]] if ts else df
else:
sdf = df.loc[ts[:7]] if ts else df
eb = _eb_stats(sdf)
rec = {"time": ts, "r": float(r), **eb, "geometry": poly}
records.append(rec)
if not records:
continue
lname = f"{base_layer}_r{int(round(float(r) * 100))}"
gdf = gpd.GeoDataFrame(records, geometry="geometry", crs=dst_crs)
# Metrics
gdf["area_m2"] = gdf.geometry.area
gdf["perimeter_m"] = gdf.geometry.length
gdf["area_ha"] = gdf["area_m2"] / 10000.0
gdf["area_acres"] = gdf["area_m2"] / 4046.85642
# Centroids
cx = gdf.geometry.centroid.x.values
cy = gdf.geometry.centroid.y.values
cx_out, cy_out = to_centroid.transform(cx, cy)
lon, lat = to_wgs_cent.transform(cx, cy)
gdf["centroid_x"] = cx
gdf["centroid_y"] = cy
gdf["centroid_out_x"] = cx_out
gdf["centroid_out_y"] = cy_out
gdf["centroid_lon"] = lon
gdf["centroid_lat"] = lat
gdf["layer"] = lname
# ET stats
try:
if le_c and le_c in df.columns:
if base_layer.startswith("daily"):
tstamp = pd.to_datetime(gdf["time"].iloc[0])
et_slice = df.loc[str(tstamp.date()), le_c]
else:
tstamp = pd.to_datetime(gdf["time"].iloc[0])
et_slice = df.loc[tstamp.strftime("%Y-%m"), le_c]
et_vals = (
pd.to_numeric(et_slice, errors="coerce").dropna().values / 680.6
)
if et_vals.size > 0:
gdf["ET_mean_mmhr"] = float(np.nanmean(et_vals))
gdf["ET_sum_mm"] = float(np.nansum(et_vals * 0.5))
except Exception:
pass
gdf.to_file(gpkg_path, layer=lname, driver="GPKG")
stats_records.extend(gdf.drop(columns="geometry").to_dict(orient="records"))
# Write layers
_collect_layer(summaries.f_daily_mean, "daily_mean")
_collect_layer(summaries.f_monthly_mean, "monthly_mean")
_collect_layer(summaries.f_daily_et_weighted, "daily_etw")
_collect_layer(summaries.f_monthly_et_weighted, "monthly_etw")
# Optional stats CSV
if stats_csv and stats_records:
stats_df = pd.DataFrame(stats_records)
stats_df.to_csv(stats_csv, index=False)
return gpkg_path
# Remaining functions (export_rasters_geotiff, export_contour_stats_csv) remain the same
# but should use model.x, model.y instead of clim.x, clim.y
def export_rasters_geotiff(
model: FFPModel,
summaries: SummaryResult,
station_lat: float,
station_lon: float,
out_dir: str | Path,
crs_out: str | int = "auto",
which=("daily_mean", "monthly_mean", "daily_etw", "monthly_etw"),
prefix="ffp",
dtype="float32",
nodata=0.0,
) -> Path:
"""Export each time slice as GeoTIFF (adapted for FFPModel)."""
out_dir = Path(out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
x = np.asarray(model.x)
y = np.asarray(model.y)
if x.size < 2 or y.size < 2:
raise ValueError("x/y grid must have at least 2 points each")
xres = float(x[1] - x[0])
yres = float(y[1] - y[0])
if crs_out == "auto":
epsg = _choose_utm_epsg(station_lon, station_lat)
dst_crs = CRS.from_epsg(epsg)
else:
dst_crs = CRS.from_user_input(crs_out)
wgs84 = CRS.from_epsg(4326)
to_proj = Transformer.from_crs(wgs84, dst_crs, always_xy=True)
x0, y0 = to_proj.transform(station_lon, station_lat)
left = (x.min() + x0) - xres / 2.0
top = (y.max() + y0) + yres / 2.0
transform = from_origin(left, top, xres, yres)
def _write_series(da, layername):
if da is None:
return
times = pd.to_datetime(da["time"].values)
for i, ts in enumerate(times):
arr = da.isel(time=i).values.astype(dtype)
if y[1] > y[0]:
arr = np.flipud(arr)
if layername.startswith("daily"):
suf = ts.strftime("%Y%m%d")
else:
suf = ts.strftime("%Y%m")
fp = out_dir / f"{prefix}_{layername}_{suf}.tif"
with rasterio.open(
fp,
"w",
driver="GTiff",
height=arr.shape[0],
width=arr.shape[1],
count=1,
dtype=dtype,
crs=dst_crs,
transform=transform,
nodata=nodata,
compress="deflate",
predictor=3,
tiled=True,
blockxsize=256,
blockysize=256,
) as dst:
dst.write(arr, 1)
layer_map = {
"daily_mean": summaries.f_daily_mean,
"monthly_mean": summaries.f_monthly_mean,
"daily_etw": summaries.f_daily_et_weighted,
"monthly_etw": summaries.f_monthly_et_weighted,
}
for name in which:
if name not in layer_map:
raise ValueError(f"Unknown layer '{name}'")
_write_series(layer_map[name], name)
return Path(out_dir)
def export_contour_stats_csv(
df: pd.DataFrame,
model: FFPModel,
summaries: SummaryResult,
station_lat: float,
station_lon: float,
csv_path: str | Path,
rn_col: str | None = None,
h_col: str | None = None,
le_col: str | None = None,
g_col: str | None = None,
method: str = "auto",
crs_out: str | int = "auto",
levels=(0.8,),
) -> Path:
"""Export contour stats to CSV (adapted for FFPModel)."""
x = model.x
y = model.y
if crs_out == "auto":
epsg = _choose_utm_epsg(station_lon, station_lat)
dst_crs = CRS.from_epsg(epsg)
else:
dst_crs = CRS.from_user_input(crs_out)
wgs84 = CRS.from_epsg(4326)
to_proj = Transformer.from_crs(wgs84, dst_crs, always_xy=True)
to_wgs = Transformer.from_crs(dst_crs, wgs84, always_xy=True)
x0, y0 = to_proj.transform(station_lon, station_lat)
def _to_world_coords(vertices_xy):
if vertices_xy is None:
return None
vx = vertices_xy[:, 0] + x0
vy = vertices_xy[:, 1] + y0
return Polygon(np.column_stack([vx, vy]))
csv_path = Path(csv_path)
with open(csv_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(
["layer", "time", "r", "area_ha", "centroid_lon", "centroid_lat"]
)
def _process_layer(da: Optional[xr.DataArray], base_layer: str):
if da is None:
return
for r in levels:
for i in range(da.sizes["time"]):
z = da.isel(time=i).values
verts = _make_contour_polygon_from_field_alt(
z, x, y, level=float(r), method=method
)
poly = _to_world_coords(verts)
if poly is None or poly.is_empty:
continue
ts = pd.Timestamp(da["time"].values[i]).isoformat()
area_ha = poly.area / 10000.0
cx, cy = poly.centroid.x, poly.centroid.y
clon, clat = to_wgs.transform(cx, cy)
writer.writerow([base_layer, ts, float(r), area_ha, clon, clat])
_process_layer(summaries.f_daily_mean, "daily_mean")
_process_layer(summaries.f_monthly_mean, "monthly_mean")
_process_layer(summaries.f_daily_et_weighted, "daily_etw")
_process_layer(summaries.f_monthly_et_weighted, "monthly_etw")
return csv_path