"""Spectral solver for lid-driven cavity using pseudospectral method.
This solver implements the PN-PN-2 method with:
- Velocities on full (Nx+1)×(Ny+1) Legendre-Gauss-Lobatto grid
- Pressure on reduced (Nx-1)×(Ny-1) inner grid
- Artificial compressibility for pressure-velocity coupling
- 4-stage RK4 explicit time stepping with adaptive CFL
"""
import numpy as np
from .base_solver import LidDrivenCavitySolver
from .datastructures import SpectralParameters, SpectralSolverFields
from spectral.spectral import LegendreLobattoBasis, ChebyshevLobattoBasis
[docs]
class SpectralSolver(LidDrivenCavitySolver):
"""Pseudospectral solver for lid-driven cavity problem.
Uses explicit time-stepping with artificial compressibility to solve
the incompressible Navier-Stokes equations on a Legendre-Gauss-Lobatto grid.
Parameters
----------
params : SpectralParameters
Parameters with physics (Re, lid velocity, domain size) and
spectral-specific settings (Nx, Ny, CFL, beta_squared, etc.).
"""
Parameters = SpectralParameters
def __init__(self, **kwargs):
"""Initialize spectral solver."""
super().__init__(**kwargs)
# Create spectral basis based on params
if self.params.basis_type.lower() == "chebyshev":
self.basis_x = ChebyshevLobattoBasis(domain=(0.0, self.params.Lx))
self.basis_y = ChebyshevLobattoBasis(domain=(0.0, self.params.Ly))
print("Using Chebyshev-Gauss-Lobatto basis (Zhang et al. 2010)")
elif self.params.basis_type.lower() == "legendre":
self.basis_x = LegendreLobattoBasis(domain=(0.0, self.params.Lx))
self.basis_y = LegendreLobattoBasis(domain=(0.0, self.params.Ly))
print("Using Legendre-Gauss-Lobatto basis")
else:
raise ValueError(
f"Unknown basis_type: {self.params.basis_type}. Use 'legendre' or 'chebyshev'"
)
# Setup grids and differentiation matrices
self._setup_grids()
self._build_diff_matrices()
# Cache grid shapes
self.shape_full = (self.params.nx + 1, self.params.ny + 1)
self.shape_inner = (self.params.nx - 1, self.params.ny - 1)
# Allocate internal solver arrays
n_nodes_full = self.shape_full[0] * self.shape_full[1]
n_nodes_inner = self.shape_inner[0] * self.shape_inner[1]
self.arrays = SpectralSolverFields.allocate(n_nodes_full, n_nodes_inner)
# Initialize output fields (base class handles this)
self._init_fields(x=self.x_full.ravel(), y=self.y_full.ravel())
# Create persistent 2D views (modifications affect underlying 1D arrays)
self.u_2d = self.arrays.u.reshape(self.shape_full)
self.v_2d = self.arrays.v.reshape(self.shape_full)
self.u_stage_2d = self.arrays.u_stage.reshape(self.shape_full)
self.v_stage_2d = self.arrays.v_stage.reshape(self.shape_full)
self.p_2d = self.arrays.p.reshape(self.shape_inner)
self.dp_dx_inner_2d = self.arrays.dp_dx_inner.reshape(self.shape_inner)
self.dp_dy_inner_2d = self.arrays.dp_dy_inner.reshape(self.shape_inner)
# Initialize lid velocity with corner smoothing
self._initialize_lid_velocity()
def _setup_grids(self):
"""Setup full and reduced grids using Legendre-Gauss-Lobatto nodes."""
# Full grid: (Nx+1) × (Ny+1)
x_nodes = self.basis_x.nodes(self.params.nx + 1)
y_nodes = self.basis_y.nodes(self.params.ny + 1)
self.x_full, self.y_full = np.meshgrid(x_nodes, y_nodes, indexing="ij")
# Reduced grid for pressure: (Nx-1) × (Ny-1) - interior points only
self.x_inner = x_nodes[1:-1]
self.y_inner = y_nodes[1:-1]
self.x_reduced, self.y_reduced = np.meshgrid(
self.x_inner, self.y_inner, indexing="ij"
)
# Grid spacing (minimum) for CFL calculation
self.dx_min = np.min(np.diff(x_nodes))
self.dy_min = np.min(np.diff(y_nodes))
def _apply_corner_smoothing(self, u_2d):
"""Apply corner smoothing to lid velocity on top boundary.
Parameters
----------
u_2d : np.ndarray
2D velocity array on full grid (Nx+1, Ny+1), modified in place
"""
if self.params.corner_smoothing > 0:
Nx = self.params.nx
smooth_width = int(self.params.corner_smoothing * Nx)
if smooth_width > 0:
# Vectorized corner smoothing
i_values = np.arange(smooth_width)
factors = 0.5 * (1 - np.cos(np.pi * i_values / smooth_width))
# Left and right corners
u_2d[i_values, -1] = factors * self.params.lid_velocity
u_2d[Nx - i_values, -1] = factors * self.params.lid_velocity
def _extrapolate_to_full_grid(self, inner_2d):
"""Extrapolate field from inner grid (Nx-1, Ny-1) to full grid (Nx+1, Ny+1).
Uses linear extrapolation to boundaries and averaging for corners.
Parameters
----------
inner_2d : np.ndarray
Field on inner grid, shape (Nx-1, Ny-1)
Returns
-------
full_2d : np.ndarray
Field on full grid, shape (Nx+1, Ny+1)
"""
full_2d = np.zeros(self.shape_full)
# Copy interior values
full_2d[1:-1, 1:-1] = inner_2d
# Extrapolate to boundaries (linear extrapolation)
# West/East boundaries
full_2d[0, 1:-1] = 2 * full_2d[1, 1:-1] - full_2d[2, 1:-1]
full_2d[-1, 1:-1] = 2 * full_2d[-2, 1:-1] - full_2d[-3, 1:-1]
# South/North boundaries
full_2d[1:-1, 0] = 2 * full_2d[1:-1, 1] - full_2d[1:-1, 2]
full_2d[1:-1, -1] = 2 * full_2d[1:-1, -2] - full_2d[1:-1, -3]
# Corners (average of neighbors)
full_2d[0, 0] = 0.5 * (full_2d[0, 1] + full_2d[1, 0])
full_2d[0, -1] = 0.5 * (full_2d[0, -2] + full_2d[1, -1])
full_2d[-1, 0] = 0.5 * (full_2d[-1, 1] + full_2d[-2, 0])
full_2d[-1, -1] = 0.5 * (full_2d[-1, -2] + full_2d[-2, -1])
return full_2d
def _build_diff_matrices(self):
"""Build spectral differentiation matrices using tensor products."""
Nx, Ny = self.params.nx, self.params.ny
# 1D differentiation matrices on full grid
x_nodes_full = self.basis_x.nodes(Nx + 1)
y_nodes_full = self.basis_y.nodes(Ny + 1)
Dx_1d = self.basis_x.diff_matrix(x_nodes_full) # (Nx+1) × (Nx+1)
Dy_1d = self.basis_y.diff_matrix(y_nodes_full) # (Ny+1) × (Ny+1)
# 2D differentiation via Kronecker products
# For meshgrid with indexing='ij': first index is x, second is y
Ix = np.eye(Nx + 1)
Iy = np.eye(Ny + 1)
self.Dx = np.kron(Dx_1d, Iy) # d/dx on full grid
self.Dy = np.kron(Ix, Dy_1d) # d/dy on full grid
# Laplacian: ∇² = ∂²/∂x² + ∂²/∂y²
Dxx_1d = Dx_1d @ Dx_1d
Dyy_1d = Dy_1d @ Dy_1d
self.Dxx = np.kron(Dxx_1d, Iy)
self.Dyy = np.kron(Ix, Dyy_1d)
self.Laplacian = self.Dxx + self.Dyy
# 1D differentiation matrices on reduced grid (for pressure)
Dx_inner_1d = self.basis_x.diff_matrix(self.x_inner) # (Nx-1) × (Nx-1)
Dy_inner_1d = self.basis_y.diff_matrix(self.y_inner) # (Ny-1) × (Ny-1)
# 2D differentiation on reduced grid
Ix_inner = np.eye(Nx - 1)
Iy_inner = np.eye(Ny - 1)
self.Dx_inner = np.kron(Dx_inner_1d, Iy_inner)
self.Dy_inner = np.kron(Ix_inner, Dy_inner_1d)
def _initialize_lid_velocity(self):
"""Initialize lid velocity with corner smoothing to avoid spurious modes."""
# Set top boundary (y = Ly) to lid velocity
self.u_2d[:, -1] = self.params.lid_velocity
# Apply corner smoothing using smooth transition
self._apply_corner_smoothing(self.u_2d)
def _interpolate_pressure_gradient(self):
"""Compute pressure gradient on inner grid and interpolate to full grid.
PN-PN-2 method:
1. Pressure p exists on (Nx-1) × (Ny-1) inner grid
2. Compute ∂p/∂x and ∂p/∂y on inner grid using inner diff matrices
3. Extrapolate gradients to boundaries on full grid
"""
# Compute pressure gradient on inner grid (this is where pressure actually lives!)
self.arrays.dp_dx_inner[:] = self.Dx_inner @ self.arrays.p
self.arrays.dp_dy_inner[:] = self.Dy_inner @ self.arrays.p
# Extrapolate to full grid (using 2D views)
dp_dx_2d = self._extrapolate_to_full_grid(self.dp_dx_inner_2d)
dp_dy_2d = self._extrapolate_to_full_grid(self.dp_dy_inner_2d)
# Store flattened on full grid
self.arrays.dp_dx[:] = dp_dx_2d.ravel()
self.arrays.dp_dy[:] = dp_dy_2d.ravel()
def _compute_residuals(self, u, v, p):
"""Compute RHS residuals for pseudo time-stepping.
PN-PN-2 method:
- u, v on full (Nx+1) × (Ny+1) grid
- p on inner (Nx-1) × (Ny-1) grid
- R_u, R_v on full grid
- R_p on inner grid
Parameters
----------
u, v : np.ndarray
Current velocity fields on full grid
p : np.ndarray
Current pressure field on INNER grid
Updates
-------
self.arrays.R_u, self.arrays.R_v (full grid), self.arrays.R_p (inner grid)
"""
# Compute velocity derivatives on full grid
self.arrays.du_dx[:] = self.Dx @ u
self.arrays.du_dy[:] = self.Dy @ u
self.arrays.dv_dx[:] = self.Dx @ v
self.arrays.dv_dy[:] = self.Dy @ v
# Compute Laplacians on full grid
self.arrays.lap_u[:] = self.Laplacian @ u
self.arrays.lap_v[:] = self.Laplacian @ v
# Compute pressure gradient from inner grid p and interpolate to full grid
self._interpolate_pressure_gradient()
# Momentum residuals on FULL grid: R = -(u·∇)u - ∇p + (1/Re)∇²u
conv_u = u * self.arrays.du_dx + v * self.arrays.du_dy
conv_v = u * self.arrays.dv_dx + v * self.arrays.dv_dy
nu = 1.0 / self.params.Re
self.arrays.R_u[:] = -conv_u - self.arrays.dp_dx + nu * self.arrays.lap_u
self.arrays.R_v[:] = -conv_v - self.arrays.dp_dy + nu * self.arrays.lap_v
# Continuity residual on INNER grid: R_p = -β²(∂u/∂x + ∂v/∂y)
# Compute divergence on full grid, then restrict to inner grid
divergence_full = self.arrays.du_dx + self.arrays.dv_dy
divergence_2d = divergence_full.reshape(self.shape_full)
divergence_inner = divergence_2d[1:-1, 1:-1].ravel()
# Pressure residual on inner grid
self.arrays.R_p[:] = -self.params.beta_squared * divergence_inner
def _enforce_boundary_conditions(self, u, v):
"""Enforce no-slip boundary conditions on all walls.
Parameters
----------
u, v : np.ndarray
Velocity fields (1D flat arrays) to modify in place
"""
# Create 2D views (cheap - just metadata)
u_2d = u.reshape(self.shape_full)
v_2d = v.reshape(self.shape_full)
# No-slip on all boundaries
u_2d[0, :] = 0.0 # West
u_2d[-1, :] = 0.0 # East
u_2d[:, 0] = 0.0 # South
v_2d[0, :] = 0.0 # West
v_2d[-1, :] = 0.0 # East
v_2d[:, 0] = 0.0 # South
v_2d[:, -1] = 0.0 # North
# Moving lid on north boundary
u_2d[:, -1] = self.params.lid_velocity
self._apply_corner_smoothing(u_2d)
def _compute_adaptive_timestep(self):
"""Compute adaptive pseudo-timestep based on CFL condition.
Returns
-------
float
Adaptive timestep ∆τ
"""
# Maximum velocities (avoid division by zero)
u_max = max(np.max(np.abs(self.arrays.u)), self.params.lid_velocity)
v_max = max(np.max(np.abs(self.arrays.v)), 1e-10)
# Wave speeds: λ_x and λ_y from equation (9)
nu = 1.0 / self.params.Re
lambda_x = (
u_max + np.sqrt(u_max**2 + self.params.beta_squared)
) / self.dx_min + nu / self.dx_min**2
lambda_y = (
v_max + np.sqrt(v_max**2 + self.params.beta_squared)
) / self.dy_min + nu / self.dy_min**2
return self.params.CFL / (lambda_x + lambda_y)
[docs]
def step(self):
"""Perform one RK4 pseudo time-step.
PN-PN-2: Updates u, v on full grid and p on inner grid.
Returns
-------
u, v, p : np.ndarray
Updated velocities (full grid) and pressure (inner grid)
"""
a = self.arrays # Shorthand
# Swap buffers at start (for residual calculation in solve())
a.u, a.u_prev = a.u_prev, a.u
a.v, a.v_prev = a.v_prev, a.v
# Compute adaptive timestep
dt = self._compute_adaptive_timestep()
# 4-stage RK4: φ^(i) = φ^n + α_i·∆τ·R(φ^(i-1))
rk4_coeffs = [0.25, 1.0 / 3.0, 0.5, 1.0]
u_in, v_in, p_in = a.u, a.v, a.p
for i, alpha in enumerate(rk4_coeffs):
self._compute_residuals(u_in, v_in, p_in)
# Last stage: write to final arrays; otherwise use staging arrays
if i < 3:
a.u_stage[:] = a.u + alpha * dt * a.R_u
a.v_stage[:] = a.v + alpha * dt * a.R_v
a.p_stage[:] = a.p + alpha * dt * a.R_p
self._enforce_boundary_conditions(a.u_stage, a.v_stage)
u_in, v_in, p_in = a.u_stage, a.v_stage, a.p_stage
else:
a.u[:] = a.u + alpha * dt * a.R_u
a.v[:] = a.v + alpha * dt * a.R_v
a.p[:] = a.p + alpha * dt * a.R_p
self._enforce_boundary_conditions(a.u, a.v)
return a.u, a.v, a.p
def _finalize_fields(self):
"""Copy final solution to output fields.
Override base class because PN-PN-2 pressure lives on inner grid
and needs interpolation to full grid for output.
"""
self.fields.u[:] = self.arrays.u
self.fields.v[:] = self.arrays.v
# Interpolate pressure from inner to full grid
p_full_2d = self._extrapolate_to_full_grid(self.p_2d)
self.fields.p[:] = p_full_2d.ravel()
def _compute_algebraic_residuals(self):
"""Return algebraic residuals from pseudo-time stepping.
For spectral solver, the algebraic residuals are the RHS of the
time-stepping equations (R_u, R_v, R_p) computed during step().
"""
return {
"u_residual": np.linalg.norm(self.arrays.R_u),
"v_residual": np.linalg.norm(self.arrays.R_v),
"continuity_residual": np.linalg.norm(self.arrays.R_p),
}