"""Abstract base solver for lid-driven cavity problem."""
from abc import ABC, abstractmethod
import os
import time
import numpy as np
import mlflow
from dataclasses import asdict
from .datastructures import TimeSeries, Metrics, Fields
[docs]
class LidDrivenCavitySolver(ABC):
"""Abstract base solver for lid-driven cavity problem.
Handles:
- Parameter management (input configuration)
- Metrics tracking (output results)
- Iteration loop with residual computation
- MLflow logging
Subclasses must:
- Set Parameters class attribute (e.g., FVParameters)
- Implement step() - perform one iteration
- Call _init_fields(x, y) after setting up grid
- Implement _compute_algebraic_residuals() for Ax-b residuals
"""
Parameters = None # Subclasses set this to FVParameters or SpectralParameters
def __init__(self, params=None, **kwargs):
"""Initialize solver with parameters.
Parameters
----------
params : Parameters, optional
Parameters object. If not provided, kwargs are used to create params.
**kwargs
Configuration parameters passed to Parameters class if params is None.
"""
if params is None:
if self.Parameters is None:
raise ValueError("Subclass must define Parameters class attribute")
params = self.Parameters(**kwargs)
self.params = params
self.metrics = Metrics()
self.fields = None # Initialized by subclass via _init_fields()
self.time_series = None # Populated after solve()
def _init_fields(self, x: np.ndarray, y: np.ndarray):
"""Initialize output fields with grid coordinates.
Called by subclass after setting up grid. Pre-allocates the Fields
dataclass that will hold the final solution.
Parameters
----------
x : np.ndarray
X coordinates of all grid points (1D array)
y : np.ndarray
Y coordinates of all grid points (1D array)
"""
n_points = len(x)
self.fields = Fields(
u=np.zeros(n_points),
v=np.zeros(n_points),
p=np.zeros(n_points),
x=x.copy(),
y=y.copy(),
)
[docs]
@abstractmethod
def step(self):
"""Perform one iteration/time step of the solver.
Returns
-------
u, v, p : np.ndarray
Updated velocity and pressure fields
"""
pass
def _finalize_fields(self):
"""Copy final solution from internal arrays to output fields.
Default implementation copies directly. Override if transformation
is needed (e.g., spectral pressure interpolation from inner grid).
"""
self.fields.u[:] = self.arrays.u
self.fields.v[:] = self.arrays.v
self.fields.p[:] = self.arrays.p
@abstractmethod
def _compute_algebraic_residuals(self):
"""Compute algebraic residuals (Ax - b) for the discretized equations.
Returns
-------
dict
Dictionary with keys 'u_residual', 'v_residual', 'continuity_residual'
containing L2 norms of the algebraic residuals.
"""
pass
def _store_results(
self,
residual_history,
final_iter_count,
is_converged,
wall_time,
energy_history=None,
enstrophy_history=None,
palinstrophy_history=None,
max_timeseries_points: int = 1000,
):
"""Store solve results in self.fields, self.time_series, and self.metrics."""
# Extract residuals
rel_iter_residuals = [r["rel_iter"] for r in residual_history]
u_residuals = [r["u_eq"] for r in residual_history]
v_residuals = [r["v_eq"] for r in residual_history]
continuity_residuals = [r.get("continuity", None) for r in residual_history]
# Check if all continuity residuals are None
if all(c is None for c in continuity_residuals):
continuity_residuals = None
# Copy final solution to output fields
self._finalize_fields()
# Downsample time series to max_timeseries_points
def downsample(data):
if data is None or len(data) <= max_timeseries_points:
return data
indices = np.linspace(0, len(data) - 1, max_timeseries_points, dtype=int)
return [data[i] for i in indices]
# Create time series (downsampled)
self.time_series = TimeSeries(
rel_iter_residual=downsample(rel_iter_residuals),
u_residual=downsample(u_residuals),
v_residual=downsample(v_residuals),
continuity_residual=downsample(continuity_residuals),
energy=downsample(energy_history),
enstrophy=downsample(enstrophy_history),
palinstrophy=downsample(palinstrophy_history),
)
# Update metrics with convergence info (use FINAL values, not downsampled)
self.metrics = Metrics(
iterations=final_iter_count,
converged=is_converged,
final_residual=rel_iter_residuals[-1]
if rel_iter_residuals
else float("inf"),
wall_time_seconds=wall_time,
u_momentum_residual=u_residuals[-1] if u_residuals else 0.0,
v_momentum_residual=v_residuals[-1] if v_residuals else 0.0,
continuity_residual=continuity_residuals[-1]
if continuity_residuals
else 0.0,
final_energy=energy_history[-1] if energy_history else 0.0,
final_enstrophy=enstrophy_history[-1] if enstrophy_history else 0.0,
final_palinstrophy=palinstrophy_history[-1]
if palinstrophy_history
else 0.0,
)
[docs]
def solve(self, tolerance: float = None, max_iter: int = None):
"""Solve the lid-driven cavity problem using iterative stepping.
This method implements the common iteration loop with residual calculation.
Subclasses implement step() to define one iteration.
Stores results in solver attributes:
- self.fields : Fields dataclass with solution fields
- self.time_series : TimeSeries dataclass with time series data
- self.metrics : Metrics dataclass with solver metrics
Parameters
----------
tolerance : float, optional
Convergence tolerance. If None, uses params.tolerance.
max_iter : int, optional
Maximum iterations. If None, uses params.max_iterations.
"""
# Use params values if not explicitly provided
if tolerance is None:
tolerance = self.params.tolerance
if max_iter is None:
max_iter = self.params.max_iterations
# Store previous iteration for residual calculation
u_prev = self.arrays.u.copy()
v_prev = self.arrays.v.copy()
# Residual history
residual_history = []
# Quantity tracking (energy, enstrophy, palinstrophy)
energy_history = []
enstrophy_history = []
palinstrophy_history = []
time_start = time.time()
mlflow_time = 0.0 # Track time spent on MLflow logging
final_iter_count = 0
is_converged = False
for i in range(max_iter):
final_iter_count = i + 1
# Perform one iteration
self.arrays.u, self.arrays.v, self.arrays.p = self.step()
# Calculate normalized solution change: ||u^{n+1} - u^n||_2 / ||u^n||_2
u_change_norm = np.linalg.norm(self.arrays.u - u_prev)
v_change_norm = np.linalg.norm(self.arrays.v - v_prev)
u_prev_norm = np.linalg.norm(u_prev) + 1e-12
v_prev_norm = np.linalg.norm(v_prev) + 1e-12
u_solution_change = u_change_norm / u_prev_norm
v_solution_change = v_change_norm / v_prev_norm
rel_iter_residual = max(u_solution_change, v_solution_change)
# Compute algebraic residuals (Ax - b)
eq_residuals = self._compute_algebraic_residuals()
# Only store residual history after first 10 iterations
if i >= 10:
residual_history.append(
{
"rel_iter": rel_iter_residual,
"u_eq": eq_residuals["u_residual"],
"v_eq": eq_residuals["v_residual"],
"continuity": eq_residuals.get("continuity_residual", None),
}
)
# Calculate and store conserved quantities
energy_history.append(self._compute_energy())
enstrophy_history.append(self._compute_enstrophy())
palinstrophy_history.append(self._compute_palinstrophy())
# Update previous iteration
u_prev = self.arrays.u.copy()
v_prev = self.arrays.v.copy()
# Check convergence (only after warmup period)
if i >= 10:
is_converged = rel_iter_residual < tolerance
else:
is_converged = False
if i % 50 == 0 or is_converged:
print(
f"Iteration {i}: u_res={u_solution_change:.6e}, v_res={v_solution_change:.6e}"
)
# Live MLflow logging every 50 iterations (timed separately)
if mlflow.active_run():
t_log_start = time.time()
live_metrics = {
"rel_iter_residual": rel_iter_residual,
"u_residual": eq_residuals["u_residual"],
"v_residual": eq_residuals["v_residual"],
}
if "continuity_residual" in eq_residuals:
live_metrics["continuity_residual"] = eq_residuals[
"continuity_residual"
]
if i >= 10: # After warmup, also log conserved quantities
live_metrics["energy"] = energy_history[-1]
live_metrics["enstrophy"] = enstrophy_history[-1]
mlflow.log_metrics(live_metrics, step=i)
mlflow_time += time.time() - t_log_start
if is_converged:
print(f"Converged at iteration {i}")
break
time_end = time.time()
wall_time = time_end - time_start - mlflow_time # Exclude MLflow logging time
print(
f"Solver finished in {wall_time:.2f} seconds (excl. {mlflow_time:.2f}s logging)."
)
# Store results
self._store_results(
residual_history,
final_iter_count,
is_converged,
wall_time,
energy_history,
enstrophy_history,
palinstrophy_history,
)
[docs]
def save(self, filepath):
"""Save complete solver state to HDF5 file.
Saves params, metrics, time_series, and fields for later analysis.
Parameters
----------
filepath : str or Path
Output file path (use .h5 extension).
"""
from pathlib import Path
filepath = Path(filepath)
filepath.parent.mkdir(parents=True, exist_ok=True)
import pandas as pd
with pd.HDFStore(filepath, mode="w", complevel=5) as store:
store["params"] = self.params.to_dataframe()
store["metrics"] = self.metrics.to_dataframe()
store["time_series"] = self.time_series.to_dataframe()
store["fields"] = self.fields.to_dataframe()
# =========================================================================
# Conserved Quantity Calculations (for comparison with Saad reference data)
# =========================================================================
def _compute_energy(self) -> float:
"""Compute kinetic energy: E = 0.5 * ∫ (u² + v²) dA."""
u = self.arrays.u
v = self.arrays.v
dA = self._get_cell_area()
return 0.5 * float(np.sum(u * u + v * v) * dA)
def _compute_enstrophy(self) -> float:
"""Compute enstrophy: Z = 0.5 * ∫ ω² dA, where ω = ∂v/∂x - ∂u/∂y."""
omega = self._compute_vorticity()
dA = self._get_cell_area()
return 0.5 * float(np.sum(omega * omega) * dA)
def _compute_palinstrophy(self) -> float:
"""Compute palinstrophy: P = ∫ (∂ω/∂x)² + (∂ω/∂y)² dA."""
omega = self._compute_vorticity()
domega_dx, domega_dy = self._compute_gradient(omega)
dA = self._get_cell_area()
return float(np.sum(domega_dx**2 + domega_dy**2) * dA)
def _compute_gradient(self, field: np.ndarray) -> tuple:
"""Compute gradient of scalar field using finite differences."""
dx, dy = self.dx_min, self.dy_min
shape = getattr(self, "shape_full", (self.params.nx, self.params.ny))
field_2d = np.pad(field.reshape(shape), 1, mode="edge")
df_dx = (field_2d[1:-1, 2:] - field_2d[1:-1, :-2]) / (2 * dx)
df_dy = (field_2d[2:, 1:-1] - field_2d[:-2, 1:-1]) / (2 * dy)
return df_dx.ravel(), df_dy.ravel()
def _compute_vorticity(self) -> np.ndarray:
"""Compute vorticity ω = ∂v/∂x - ∂u/∂y using finite differences."""
dv_dx, _ = self._compute_gradient(self.arrays.v)
_, du_dy = self._compute_gradient(self.arrays.u)
return dv_dx - du_dy
def _get_cell_area(self) -> float:
"""Get cell area for integration. Subclasses should override."""
dx = getattr(self, "dx_min", None)
dy = getattr(self, "dy_min", None)
if dx is not None and dy is not None:
return dx * dy
# Fallback: assume unit domain
n = len(self.arrays.u)
return 1.0 / n
# ========================================================================
# MLflow Integration
# ========================================================================
[docs]
def mlflow_start(
self, experiment_name: str, run_name: str, parent_run_name: str = None
):
"""Start MLflow run and log parameters.
Parameters
----------
experiment_name : str
Name of the MLflow experiment.
run_name : str
Name of the run within the experiment.
parent_run_name : str, optional
If specified, creates a nested run under a parent with this name.
Parent is created if it doesn't exist, or resumed if it does.
"""
mlflow.login()
# Databricks requires absolute paths
experiment_name = f"/Shared/ANA-P3/{experiment_name}"
if mlflow.get_experiment_by_name(experiment_name) is None:
mlflow.create_experiment(name=experiment_name)
mlflow.set_experiment(experiment_name)
# Handle parent run if specified
if parent_run_name:
experiment = mlflow.get_experiment_by_name(experiment_name)
client = mlflow.tracking.MlflowClient()
# Search for existing parent run
runs = client.search_runs(
experiment_ids=[experiment.experiment_id],
filter_string=f"tags.mlflow.runName = '{parent_run_name}' AND tags.is_parent = 'true'",
max_results=1,
)
if runs:
# Resume existing parent
parent_run_id = runs[0].info.run_id
else:
# Create new parent run
parent_run = client.create_run(
experiment_id=experiment.experiment_id,
run_name=parent_run_name,
tags={"is_parent": "true"},
)
parent_run_id = parent_run.info.run_id
# Start nested child run
mlflow.start_run(run_id=parent_run_id, log_system_metrics=False)
mlflow.start_run(run_name=run_name, nested=True, log_system_metrics=True)
self._mlflow_nested = True
else:
mlflow.start_run(log_system_metrics=True, run_name=run_name)
self._mlflow_nested = False
# Log all parameters from the params dataclass
mlflow.log_params(asdict(self.params))
# Log HPC job info if running on LSF cluster
job_id = os.environ.get("LSB_JOBID")
if job_id:
mlflow.set_tag("lsf.job_id", job_id)
job_index = os.environ.get("LSB_JOBINDEX", "")
job_name = os.environ.get("LSB_JOBNAME", "")
description = f"HPC Job: {job_name} (ID: {job_id}"
if job_index:
description += f", Index: {job_index}"
description += ")"
mlflow.set_tag("mlflow.note.content", description)
[docs]
def mlflow_end(self):
"""End MLflow run and log final metrics."""
# Log final metrics from the metrics dataclass
mlflow.log_metrics(asdict(self.metrics))
# End child run
mlflow.end_run()
# End parent run if nested
if getattr(self, "_mlflow_nested", False):
mlflow.end_run()
[docs]
def mlflow_log_artifact(self, filepath: str):
"""Log an artifact (e.g., saved HDF5 file) to MLflow.
Parameters
----------
filepath : str
Path to the file to log as artifact.
"""
mlflow.log_artifact(filepath)