Source code for ldc.fv_solver

"""Finite volume solver for lid-driven cavity.

This module implements a collocated finite volume solver using
SIMPLE algorithm for pressure-velocity coupling.
"""

import numpy as np
from scipy.sparse import csr_matrix

from .base_solver import LidDrivenCavitySolver
from .datastructures import FVParameters, FVSolverFields

from fv.assembly.convection_diffusion_matrix import assemble_diffusion_convection_matrix
from fv.discretization.gradient.structured_gradient import (
    compute_cell_gradients_structured,
)
from fv.linear_solvers.petsc_solver import petsc_solver
from fv.assembly.rhie_chow import mdot_calculation, rhie_chow_velocity
from fv.assembly.pressure_correction_eq_assembly import (
    assemble_pressure_correction_matrix,
)
from fv.assembly.divergence import compute_divergence_from_face_fluxes
from fv.core.corrections import velocity_correction
from fv.core.helpers import (
    bold_Dv_calculation,
    interpolate_to_face,
    interpolate_velocity_to_face,
    relax_momentum_equation,
)


[docs] class FVSolver(LidDrivenCavitySolver): """Finite volume solver for lid-driven cavity problem. This solver uses a collocated grid arrangement with Rhie-Chow interpolation for pressure-velocity coupling using the SIMPLE algorithm. Parameters ---------- params : FVParameters Parameters with physics (Re, lid velocity, domain size) and FV-specific settings (nx, ny, convection scheme, etc.). """ Parameters = FVParameters rho = 1.0 # Constant fluid density def __init__(self, **kwargs): """Initialize FV solver.""" super().__init__(**kwargs) # Create mesh from meshing.simple_structured import create_structured_mesh_2d self.mesh = create_structured_mesh_2d( nx=self.params.nx, ny=self.params.ny, Lx=self.params.Lx, Ly=self.params.Ly, lid_velocity=self.params.lid_velocity, ) # Get dimensions from mesh n_cells = self.mesh.cell_volumes.shape[0] n_faces = self.mesh.internal_faces.shape[0] + self.mesh.boundary_faces.shape[0] # Compute fluid properties self.mu = self.rho * self.params.lid_velocity * self.params.Lx / self.params.Re # Allocate internal solver arrays self.arrays = FVSolverFields.allocate(n_cells, n_faces) # Initialize output fields (base class handles this) self._init_fields( x=self.mesh.cell_centers[:, 0], y=self.mesh.cell_centers[:, 1] ) # Cache commonly used values self.n_cells = n_cells self.shape_full = (self.params.ny, self.params.nx) self.dx_min = self.params.Lx / self.params.nx self.dy_min = self.params.Ly / self.params.ny def _solve_momentum_equation( self, component_idx, phi, grad_phi, phi_prev_iter, grad_p_component, ksp ): """Solve a single momentum equation (u or v). Parameters ---------- component_idx : int Component index (0 for u, 1 for v) phi : ndarray Current velocity component (u or v) grad_phi : ndarray Gradient of velocity component phi_prev_iter : ndarray Previous iteration velocity component grad_p_component : ndarray Pressure gradient component (x or y) ksp : PETSc.KSP or None Reusable KSP solver object (updated in-place) Returns ------- phi_star : ndarray Predicted velocity component A_diag : ndarray Diagonal of momentum matrix (needed for pressure correction) ksp : PETSc.KSP Updated KSP solver object for reuse """ # Assemble momentum equation row, col, data, b = assemble_diffusion_convection_matrix( self.mesh, self.arrays.mdot, grad_phi, self.rho, self.mu, component_idx, phi=phi, scheme=self.params.convection_scheme, limiter=self.params.limiter, ) A = csr_matrix((data, (row, col)), shape=(self.n_cells, self.n_cells)) A_diag = A.diagonal() rhs = b - grad_p_component * self.mesh.cell_volumes # Apply under-relaxation relaxed_A_diag, rhs = relax_momentum_equation( rhs, A_diag, phi_prev_iter, self.params.alpha_uv ) A.setdiag(relaxed_A_diag) # Solve with PETSc phi_star, ksp = petsc_solver( A, rhs, ksp=ksp, tolerance=self.params.linear_solver_tol, solver_type="bcgs", preconditioner="gamg", ) return phi_star, A_diag, ksp
[docs] def step(self): """Perform one SIMPLE iteration. Returns ------- u, v, p : np.ndarray Updated velocity and pressure fields """ a = self.arrays # Shorthand for readability # Swap buffers at start (zero-copy) a.u, a.u_prev = a.u_prev, a.u a.v, a.v_prev = a.v_prev, a.v # Compute pressure gradient (no limiter for pressure) - reuse buffers compute_cell_gradients_structured( self.mesh, a.p, use_limiter=False, out=a.grad_p ) interpolate_to_face(self.mesh, a.grad_p, out=a.grad_p_bar) # Compute velocity gradients (with limiter) - reuse buffers compute_cell_gradients_structured( self.mesh, a.u_prev, use_limiter=True, out=a.grad_u ) compute_cell_gradients_structured( self.mesh, a.v_prev, use_limiter=True, out=a.grad_v ) # Solve momentum equations (reuse KSP objects) u_star, A_u_diag, a.ksp_u = self._solve_momentum_equation( 0, a.u_prev, a.grad_u, a.u_prev, a.grad_p[:, 0], a.ksp_u ) v_star, A_v_diag, a.ksp_v = self._solve_momentum_equation( 1, a.v_prev, a.grad_v, a.v_prev, a.grad_p[:, 1], a.ksp_v ) # Pressure correction - reuse buffers bold_Dv_calculation(self.mesh, A_u_diag, A_v_diag, out=a.bold_D) interpolate_to_face(self.mesh, a.bold_D, out=a.bold_D_bar) rhie_chow_velocity( self.mesh, u_star, v_star, a.grad_p_bar, a.grad_p, a.bold_D_bar, out=a.U_star_rc, ) mdot_calculation(self.mesh, self.rho, a.U_star_rc, out=a.mdot_star) # Assemble pressure correction matrix row, col, data = assemble_pressure_correction_matrix(self.mesh, self.rho) rhs_p = -compute_divergence_from_face_fluxes(self.mesh, a.mdot_star) # Solve pressure correction with PETSc (handles nullspace internally) A_p = csr_matrix((data, (row, col)), shape=(self.n_cells, self.n_cells)) p_prime, a.ksp_p = petsc_solver( A_p, rhs_p, ksp=a.ksp_p, tolerance=self.params.linear_solver_tol, solver_type="bcgs", preconditioner="gamg", remove_nullspace=True, ) # Velocity and pressure corrections - reuse buffers compute_cell_gradients_structured( self.mesh, p_prime, use_limiter=False, out=a.grad_p_prime ) velocity_correction( self.mesh, a.grad_p_prime, a.bold_D, u_prime=a.u_prime, v_prime=a.v_prime ) # Update velocity and pressure (in-place operations into fresh buffers) np.add(u_star, a.u_prime, out=a.u) np.add(v_star, a.v_prime, out=a.v) a.p += self.params.alpha_p * p_prime # Update mass flux - reuse buffers interpolate_velocity_to_face( self.mesh, a.u_prime, a.v_prime, out=a.U_prime_face ) mdot_calculation(self.mesh, self.rho, a.U_prime_face, out=a.mdot_prime) np.add(a.mdot_star, a.mdot_prime, out=a.mdot) # No copy needed! u and v now have new values, u_prev and v_prev have old values # Next iteration they will swap again return a.u, a.v, a.p
def _compute_algebraic_residuals(self): """Return algebraic residuals for SIMPLE algorithm. For FV-SIMPLE: - Momentum residuals: velocity correction magnitudes ||u'||, ||v'|| - Continuity residual: mass imbalance ||div(mdot)|| """ # Mass imbalance (continuity): divergence of corrected mass flux mass_imbalance = compute_divergence_from_face_fluxes( self.mesh, self.arrays.mdot ) return { "u_residual": np.linalg.norm(self.arrays.u_prime), "v_residual": np.linalg.norm(self.arrays.v_prime), "continuity_residual": np.linalg.norm(mass_imbalance), }