# Source code for utils.mlflow_io

"""MLflow I/O utilities for fetching runs and artifacts.

This module provides helpers for retrieving experiment data from
MLflow/Databricks and downloading artifacts to the local data directory.
"""

import os
from pathlib import Path
from typing import List, Optional
import mlflow
import pandas as pd


def setup_mlflow_auth(host: Optional[str] = None):
    """Configure MLflow authentication.

    If the ``DATABRICKS_TOKEN`` environment variable is set (e.g. in CI),
    MLflow is pointed at the Databricks tracking server without any
    interactive prompt. Otherwise ``mlflow.login()`` is called for an
    interactive login.

    Parameters
    ----------
    host : str, optional
        Databricks workspace URL to use in token mode. If omitted, an
        already-set ``DATABRICKS_HOST`` environment variable is respected.
        If that is also unset, the project's default workspace URL is used.
        Ignored in interactive mode.
    """
    token = os.environ.get("DATABRICKS_TOKEN")
    if token:
        # CI environment: the token is left in DATABRICKS_TOKEN, where
        # Databricks auth reads it from the environment. Only the host has to
        # be ensured here. Never clobber a host the caller configured.
        resolved_host = (
            host
            or os.environ.get("DATABRICKS_HOST")
            or "https://dbc-6756e917-e5fc.cloud.databricks.com"
        )
        os.environ["DATABRICKS_HOST"] = resolved_host
        mlflow.set_tracking_uri("databricks")
    else:
        # Local environment - interactive login
        mlflow.login()
def load_runs(
    experiment: str,
    converged_only: bool = True,
    exclude_parent_runs: bool = True,
    experiment_prefix: str = "/Shared/LSM-Project-2",
) -> pd.DataFrame:
    """Load runs from an MLflow experiment.

    Parameters
    ----------
    experiment : str
        Experiment name (e.g. ``"HPC-FV-Solver"``) or absolute workspace path
        (e.g. ``"/Shared/LSM-Project-2/HPC-FV-Solver"``). Names that do not
        start with ``"/"`` are resolved under ``experiment_prefix``.
    converged_only : bool, default True
        Only return runs where ``metrics.converged = 1``.
    exclude_parent_runs : bool, default True
        Drop runs whose ``tags.is_parent`` tag equals ``"true"`` (nested run
        containers).
    experiment_prefix : str, default "/Shared/LSM-Project-2"
        Workspace folder used to resolve relative experiment names.

    Returns
    -------
    pd.DataFrame
        One row per run, ordered by start time (newest first), with run info,
        parameters (``params.*``), metrics (``metrics.*``) and tags
        (``tags.*``) as returned by ``mlflow.search_runs``.

    Examples
    --------
    >>> df = load_runs("HPC-FV-Solver")  # doctest: +SKIP
    >>> df[["run_id", "params.nx", "metrics.wall_time_seconds"]]  # doctest: +SKIP
    """
    # Resolve relative names against the shared project folder.
    if not experiment.startswith("/"):
        experiment = f"{experiment_prefix.rstrip('/')}/{experiment}"

    # Build the server-side filter string.
    filters = []
    if converged_only:
        filters.append("metrics.converged = 1")
    filter_string = " and ".join(filters)

    df = mlflow.search_runs(
        experiment_names=[experiment],
        filter_string=filter_string,
        order_by=["start_time DESC"],
    )

    # Filter parent runs in pandas: the MLflow filter syntax does not handle
    # runs where the tag is absent (None), which must be kept.
    if exclude_parent_runs and "tags.is_parent" in df.columns:
        df = df[df["tags.is_parent"] != "true"]

    return df
def download_artifacts(
    experiment: str,
    output_dir: Path,
    converged_only: bool = True,
    artifact_filter: Optional[List[str]] = None,
) -> List[Path]:
    """Download artifacts from MLflow runs to a local directory.

    Parameters
    ----------
    experiment : str
        Experiment name (e.g., "HPC-FV-Solver"), resolved by ``load_runs``.
    output_dir : Path
        Directory to save artifacts into; created if missing. Artifacts keep
        their MLflow artifact paths, so same-named artifacts from different
        runs may overwrite one another.
    converged_only : bool, default True
        Only download from converged runs.
    artifact_filter : list of str, optional
        Only download top-level artifacts whose path matches at least one
        pattern. Each pattern is tried as a shell-style glob (e.g. ``"*.h5"``)
        and as a plain suffix (e.g. ``".h5"``). If None or empty, all
        top-level artifacts are downloaded.

    Returns
    -------
    list of Path
        Local paths of the downloaded artifacts.

    Examples
    --------
    >>> paths = download_artifacts("HPC-FV-Solver", Path("data/FV-Solver"), artifact_filter=["*.h5"])  # doctest: +SKIP
    """
    from fnmatch import fnmatchcase

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    df = load_runs(experiment, converged_only=converged_only)
    if df.empty:
        print(f"No runs found for {experiment}")
        return []

    def _is_wanted(artifact_path: str) -> bool:
        # Match glob patterns as documented. Also keep plain-suffix matching
        # so existing callers passing ".h5"-style filters still work.
        if not artifact_filter:
            return True
        return any(
            fnmatchcase(artifact_path, pattern) or artifact_path.endswith(pattern)
            for pattern in artifact_filter
        )

    client = mlflow.tracking.MlflowClient()
    downloaded = []
    for run_id in df["run_id"]:
        for artifact in client.list_artifacts(run_id):
            if not _is_wanted(artifact.path):
                continue
            local_path = client.download_artifacts(run_id, artifact.path, output_dir)
            downloaded.append(Path(local_path))
            print(f" Downloaded: {artifact.path}")

    return downloaded
def download_artifacts_with_naming(
    experiment: str,
    output_dir: Path,
    converged_only: bool = True,
) -> List[Path]:
    """Download HDF5 artifacts with standardized naming.

    Every top-level ``.h5`` artifact of each selected run is saved as
    ``Poisson_N{n}_{artifact_basename}``. ``n`` is the run's ``params.n``
    value, falling back to ``params.N``, and then to ``"unknown"`` if neither
    is set for that run. Runs that share ``n`` and an artifact basename will
    overwrite one another.

    Parameters
    ----------
    experiment : str
        Experiment name, resolved by ``load_runs``.
    output_dir : Path
        Directory to save artifacts into; created if missing.
    converged_only : bool, default True
        Only download from converged runs.

    Returns
    -------
    list of Path
        Paths to the renamed files in ``output_dir``.
    """
    import shutil
    import tempfile

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    df = load_runs(experiment, converged_only=converged_only)
    if df.empty:
        print(f"No runs found for {experiment}")
        return []

    client = mlflow.tracking.MlflowClient()
    downloaded = []

    for _, row in df.iterrows():
        run_id = row["run_id"]

        # A params.* column exists if *any* run logged that param, so a given
        # run's value may be None/NaN. Skip such values instead of producing
        # "Poisson_NNone_..." / "Poisson_Nnan_..." names.
        n = "unknown"
        for column in ("params.n", "params.N"):
            value = row.get(column)
            if pd.notna(value) and str(value) != "":
                n = value
                break

        for artifact in client.list_artifacts(run_id):
            if not artifact.path.endswith(".h5"):
                continue
            # Download to a scratch directory first, then copy under the
            # standardized name.
            with tempfile.TemporaryDirectory() as tmpdir:
                tmp_path = client.download_artifacts(run_id, artifact.path, tmpdir)
                new_name = f"Poisson_N{n}_{artifact.path.split('/')[-1]}"
                final_path = output_dir / new_name
                shutil.copy(tmp_path, final_path)
            downloaded.append(final_path)
            print(f" {artifact.path} -> {new_name}")

    return downloaded