Function bodies 261 total
_compute_rhat function · python · L93-L102 (10 LOC)src/models/demo_bivariate.py
def _compute_rhat(da) -> float:
"""Compute R-hat from DataArray."""
try:
rhat = az.rhat(da)
if rhat is None:
return np.nan
val = rhat.values
return float(val.item() if val.size == 1 else val.mean())
except Exception:
return np.nanrun_cumulative_lag_analysis function · python · L832-L1022 (191 LOC)src/models/demo_bivariate.py
def run_cumulative_lag_analysis(
args,
workout_vars: List[str],
output_dir: Path,
skip_plots: bool = False,
) -> Dict[str, Any]:
"""Run cumulative lag model analysis for each variable."""
print("\n" + "=" * 70)
print("CUMULATIVE LAG MODEL ANALYSIS")
print("=" * 70)
# Generate lag days list based on window and step
lag_days_list = np.arange(0, args.lag_window + args.lag_step, args.lag_step)
lag_days_list = lag_days_list[lag_days_list <= args.lag_window] # ensure up to window
lag_days_list = lag_days_list[lag_days_list > 0] # exclude zero lag? include zero if window starts at 0
# Include zero lag if step starts at 0
if 0 in lag_days_list:
lag_days_list = lag_days_list[lag_days_list >= 0]
print(f"Lag window: {args.lag_window} days, step: {args.lag_step} days")
print(f"Lags to include: {lag_days_list.tolist()} days")
# Load weight data (common for all variables)
print("\n1. Loading weight data...")
df_wcreate_comparison_report function · python · L1025-L1050 (26 LOC)src/models/demo_bivariate.py
def create_comparison_report(
results: Dict[str, Any],
output_dir: Path,
model_type: str,
) -> None:
"""Create markdown report comparing results across variables/lags."""
report_path = output_dir / f"{model_type}_comparison_report.md"
with open(report_path, 'w') as f:
f.write(f"# Cross-Lagged Model Comparison Report\n\n")
f.write(f"**Model type**: {model_type}\n")
f.write(f"**Generated**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
if model_type == "fixed":
_write_fixed_lag_report(f, results, output_dir)
elif model_type == "estimated":
_write_estimated_lag_report(f, results, output_dir)
elif model_type == "cumulative":
_write_cumulative_lag_report(f, results, output_dir)
else:
f.write("Unknown model type\n")
f.write("\n---\n")
f.write("*Report generated by demo_bivariate.py*\n")
print(f"Report saved to {report_path}")_write_fixed_lag_report function · python · L1053-L1153 (101 LOC)src/models/demo_bivariate.py
def _write_fixed_lag_report(f, results: Dict[str, Any], output_dir: Path):
"""Write fixed lag model report section."""
if not results:
f.write("No results available.\n")
return
f.write("## Fixed Lag Model Comparison\n\n")
# Collect all valid results
all_valid_results = []
for var_name, var_results in results.items():
for r in var_results:
if 'beta_mean' in r:
all_valid_results.append(r)
if not all_valid_results:
f.write("No valid model results found.\n")
return
df_all = pd.DataFrame(all_valid_results)
# Summary table
f.write("### Summary Table\n\n")
f.write("| Variable | Lag (days) | β (std) | 95% CI | β (orig) | 95% CI | WAIC | LOO |\n")
f.write("|----------|------------|---------|--------|----------|--------|------|-----|\n")
for _, row in df_all.iterrows():
beta_std_ci = f"{row['beta_mean']:.3f} [{row['beta_ci_low']:.3f}, {row['beta_ci_high']:.3f}]"
_write_estimated_lag_report function · python · L1156-L1240 (85 LOC)src/models/demo_bivariate.py
def _write_estimated_lag_report(f, results: Dict[str, Any], output_dir: Path):
"""Write estimated lag model report section."""
f.write("## Estimated Lag Model Analysis\n\n")
if not results:
f.write("No results available.\n")
return
# Collect valid results
valid_results = []
for var_name, result in results.items():
if 'beta_mean' in result and 'lag_days_mean' in result:
valid_results.append(result)
if not valid_results:
f.write("No valid model results found.\n")
return
df = pd.DataFrame(valid_results)
# Summary table
f.write("### Summary Table\n\n")
f.write("| Variable | β (std) | 95% CI | β (orig) | 95% CI | Lag τ (days) | 95% CI | WAIC | LOO |\n")
f.write("|----------|---------|--------|----------|--------|--------------|--------|------|-----|\n")
for _, row in df.iterrows():
beta_std_ci = f"{row['beta_mean']:.3f} [{row['beta_ci_low']:.3f}, {row['beta_ci_high']:.3f}]"
_write_cumulative_lag_report function · python · L1243-L1319 (77 LOC)src/models/demo_bivariate.py
def _write_cumulative_lag_report(f, results: Dict[str, Any], output_dir: Path):
"""Write cumulative lag model report section."""
f.write("## Cumulative Lag Model Analysis\n\n")
if not results:
f.write("No results available.\n")
return
# Collect valid results
valid_results = []
for var_name, result in results.items():
if 'beta_mean' in result:
valid_results.append(result)
if not valid_results:
f.write("No valid model results found.\n")
return
df = pd.DataFrame(valid_results)
# Summary table
f.write("### Summary Table\n\n")
f.write("| Variable | Lag Window | Lag Step | β (std) | 95% CI | β (orig) | 95% CI | WAIC | LOO |\n")
f.write("|----------|------------|----------|---------|--------|----------|--------|------|-----|\n")
for _, row in df.iterrows():
lag_window = row.get('lag_window', 'N/A')
lag_step = row.get('lag_step', 'N/A')
beta_std_ci = f"{row['betshould_show_plots function · python · L31-L36 (6 LOC)src/models/demo_cyclic_single.py
def should_show_plots() -> bool:
"""Return True if plots should be displayed interactively.
Modified to always return False - plots are saved to disk only.
"""
return FalseGenerated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
write_markdown_report function · python · L46-L211 (166 LOC)src/models/demo_cyclic_single.py
def write_markdown_report(
output_dir: Path,
idata,
df,
stan_data,
sigma: float,
daily_amplitude: float,
prop_variance_daily: float,
adapt_delta: float,
max_treedepth: int,
use_sparse: bool,
n_inducing_points: int,
chains: int,
iter_warmup: int,
iter_sampling: int,
skip_plots: bool,
):
"""Write comprehensive markdown report with embedded images and model summary.
Args:
output_dir: Directory containing generated images and report
idata: ArviZ InferenceData object
df: Original DataFrame
stan_data: Stan data dictionary
sigma: Measurement error (lbs)
daily_amplitude: Daily amplitude (lbs)
prop_variance_daily: Proportion of variance from daily component
adapt_delta: Adapt delta parameter used
max_treedepth: Maximum tree depth used
use_sparse: Whether sparse GP was used
n_inducing_points: Number of inducing points (if sparse)
chashould_show_plots function · python · L67-L72 (6 LOC)src/models/demo_general.py
def should_show_plots() -> bool:
"""Return True if plots should be displayed interactively.
Modified to always return False - plots are saved to disk only.
"""
return Falseextract_all_parameters function · python · L75-L187 (113 LOC)src/models/demo_general.py
def extract_all_parameters(idata, stan_data, model_name: str) -> Dict[str, Any]:
"""Extract all model parameters with summary statistics.
Args:
idata: ArviZ InferenceData object
stan_data: Stan data dictionary
model_name: 'daily' or 'weekly'
Returns:
Dictionary with parameter summaries organized by category
"""
import numpy as np
y_sd = stan_data["_y_sd"]
stan_data["_y_mean"]
def get_param_stats(param_name, scale_factor=1.0, transform_back=True):
"""Helper to extract parameter statistics."""
if param_name not in idata.posterior:
return None
samples = idata.posterior[param_name].values
# Reshape to (samples, ...)
n_chains, n_draws = samples.shape[:2]
samples_flat = samples.reshape(n_chains * n_draws, *samples.shape[2:])
# Apply scaling if needed
if transform_back and scale_factor != 1.0:
samples_flat = samples_flat * scale_factor
plot_sigma_comparison function · python · L40-L90 (51 LOC)src/models/demo_optimized_cyclic_spline.py
def plot_sigma_comparison(idata_a, idata_b, stan_data, output_path=None,
model_a_name="Model A", model_b_name="Model B"):
"""Plot sigma comparison between two models.
Args:
idata_a: InferenceData from first model
idata_b: InferenceData from second model
stan_data: Stan data dictionary
output_path: Path to save plot (optional)
model_a_name: Name for first model (for legend)
model_b_name: Name for second model (for legend)
"""
# Extract sigma estimates
sigma_a = idata_a.posterior["sigma"].values.flatten() * stan_data["_y_sd"]
sigma_b = idata_b.posterior["sigma"].values.flatten() * stan_data["_y_sd"]
# Create figure
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
# Plot 1: Sigma comparison histogram
ax = axes[0]
ax.hist(sigma_a, bins=30, alpha=0.5, label=f"{model_a_name}: {sigma_a.mean():.2f} ± {sigma_a.std():.2f} lbs")
ax.hist(sigma_b, bins=30, alpha=0.5, label=f"{should_show_plots function · python · L43-L48 (6 LOC)src/models/demo_spline_single.py
def should_show_plots() -> bool:
"""Return True if plots should be displayed interactively.
Modified to always return False - plots are saved to disk only.
"""
return False_compute_cache_key function · python · L26-L104 (79 LOC)src/models/fit_weight.py
def _compute_cache_key(
data_dir: Path,
stan_file: Path,
chains: int,
iter_warmup: int,
iter_sampling: int,
adapt_delta: float = None,
max_treedepth: int = None,
alpha_prior_sd: float = None,
rho_prior_shape: float = None,
rho_prior_scale: float = None,
sigma_prior_sd: float = None,
fourier_harmonics: int = None,
weekly_harmonics: int = None,
use_sparse: int = None,
n_inducing_points: int = None,
inducing_point_method: str = None,
include_prediction_grid: bool = None,
prediction_hour: float = None,
prediction_hour_step: float = None,
prediction_step_days: int = None,
) -> tuple[str, dict]:
"""Compute SHA256 hash of model configuration for caching.
Returns:
Tuple of (cache_key, config_dict) where config_dict contains all
parameters used to compute the hash.
"""
# Include file modification times
data_mtime = _get_data_file_mtime(data_dir)
stan_mtime = Path(stan_file).s_save_cache function · python · L107-L160 (54 LOC)src/models/fit_weight.py
def _save_cache(
cache_dir: Path,
fit: CmdStanMCMC,
idata: az.InferenceData,
df: pd.DataFrame,
stan_data: dict,
config: dict,
):
"""Save model results to cache directory."""
cache_dir = Path(cache_dir)
cache_dir.mkdir(parents=True, exist_ok=True)
# Save CSV files
csv_dir = cache_dir / "csv"
csv_dir.mkdir(exist_ok=True)
for csv_file in fit.runset.csv_files:
shutil.copy2(csv_file, csv_dir / Path(csv_file).name)
# Save InferenceData as netcdf
idata.to_netcdf(cache_dir / "idata.nc")
# Save DataFrame as parquet
df.to_parquet(cache_dir / "df.parquet")
# Save stan_data as JSON (convert numpy arrays)
stan_data_serializable = {}
for key, value in stan_data.items():
if isinstance(value, np.ndarray):
stan_data_serializable[key] = value.tolist()
elif isinstance(value, np.generic):
# Convert numpy scalars to Python scalars (e.g., np.float64, np.int64, np.bool_)
_load_cache function · python · L163-L202 (40 LOC)src/models/fit_weight.py
def _load_cache(cache_dir: Path, stan_file: Path):
"""Load model results from cache directory."""
cache_dir = Path(cache_dir)
# Load metadata
with open(cache_dir / "metadata.json", "r") as f:
metadata = json.load(f)
# Load DataFrame
df = pd.read_parquet(cache_dir / "df.parquet")
# Load stan_data
with open(cache_dir / "stan_data.json", "r") as f:
stan_data_loaded = json.load(f)
# Convert lists back to numpy arrays for numeric fields
stan_data = {}
for key, value in stan_data_loaded.items():
if isinstance(value, list) and key not in ["t", "y"]: # t and y are lists in Stan data
# Check if it's a numeric list
if value and isinstance(value[0], (int, float)):
stan_data[key] = np.array(value)
else:
stan_data[key] = value
else:
stan_data[key] = value
# Load InferenceData
idata = az.from_netcdf(cache_dir / "idata.nc")
idataRepobility analyzer · published findings · https://repobility.com
_cache_exists function · python · L205-L214 (10 LOC)src/models/fit_weight.py
def _cache_exists(cache_dir: Path) -> bool:
"""Check if cache directory contains all required files."""
required = [
cache_dir / "metadata.json",
cache_dir / "idata.nc",
cache_dir / "df.parquet",
cache_dir / "stan_data.json",
cache_dir / "csv",
]
return all(path.exists() for path in required)extract_predictions function · python · L217-L270 (54 LOC)src/models/fit_weight.py
def extract_predictions(idata, stan_data):
"""Extract predictions from InferenceData and back-transform to original scale.
Args:
idata: ArviZ InferenceData object from fitted model
stan_data: Stan data dictionary with scaling parameters
Returns:
Dictionary with prediction results if available, else empty dict.
Keys include:
- t_pred: prediction time points (days since start)
- t_pred_scaled: scaled time points
- hour_of_day_pred: hour of day for predictions
- f_pred_mean, f_pred_lower, f_pred_upper: mean and 95% CI for f_pred
- y_pred_mean, y_pred_lower, y_pred_upper: mean and 95% CI for y_pred
All values in original scale (lbs).
"""
N_pred = stan_data.get("N_pred", 0)
if N_pred == 0 or "f_pred" not in idata.posterior_predictive:
return {}
# Ensure numeric types (defensive conversion for cached data)
y_mean = float(stan_data["_y_mean"])
y_sd = float(stan_data["fit_weight_model function · python · L273-L356 (84 LOC)src/models/fit_weight.py
def fit_weight_model(
data_dir: Path | str = "data",
stan_file: Path | str = "stan/weight_gp.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
cache: bool = True,
force_refit: bool = False,
) -> tuple:
"""Fit the GP weight model and return results.
Args:
data_dir: Path to data directory
stan_file: Path to Stan model file
output_dir: Directory for output files
chains: Number of MCMC chains
iter_warmup: Warmup iterations per chain
iter_sampling: Sampling iterations per chain
cache: Whether to cache model results for faster reloading
force_refit: Force re-fitting even if cached results exist
Returns:
Tuple of (fit, idata, df, stan_data) where:
- fit: CmdStanMCMC object
- idata: ArviZ InferenceData object
- df: Original DataFrame
- stan_data: Stan data dictionary
"""
output_diplot_weight_fit function · python · L359-L395 (37 LOC)src/models/fit_weight.py
def plot_weight_fit(idata, df, stan_data, output_path: Path | str = None):
"""Plot the fitted weight model with uncertainty."""
fig, axes = plt.subplots(2, 1, figsize=(12, 8))
# Back-transform predictions to original scale
y_mean = stan_data["_y_mean"]
y_sd = stan_data["_y_sd"]
# Extract posterior mean and credible intervals for f
f_samples = idata.posterior["f"].values
f_mean = f_samples.mean(axis=(0, 1)) * y_sd + y_mean
f_lower = np.percentile(f_samples, 2.5, axis=(0, 1)) * y_sd + y_mean
f_upper = np.percentile(f_samples, 97.5, axis=(0, 1)) * y_sd + y_mean
# Plot 1: Fit with uncertainty
ax = axes[0]
ax.scatter(df["date"], df["weight_lbs"], alpha=0.5, s=20, label="Observations")
ax.plot(df["date"], f_mean, "k-", linewidth=2, label="GP mean")
ax.fill_between(df["date"], f_lower, f_upper, alpha=0.3, color="blue", label="95% CI")
ax.set_xlabel("Date")
ax.set_ylabel("Weight (lbs)")
ax.set_title("Weight Over Time - print_summary function · python · L398-L417 (20 LOC)src/models/fit_weight.py
def print_summary(idata, stan_data):
"""Print model summary statistics."""
print("\n" + "=" * 60)
print("MODEL SUMMARY")
print("=" * 60)
# Key parameters
summary = az.summary(idata, var_names=["alpha", "rho", "sigma", "trend_change"])
print("\nKey parameters:")
print(summary)
# Diagnostics
print("\nDiagnostics:")
print(f" R-hat max: {summary['r_hat'].max():.3f}")
print(f" ESS min: {summary['ess_bulk'].min():.0f}")
# Back-transform trend change
y_sd = stan_data["_y_sd"]
trend_change = idata.posterior["trend_change"].values.mean() * y_sd
print(f"\nTrend change (original scale): {trend_change:.2f} lbs")fit_weight_model_flexible function · python · L420-L520 (101 LOC)src/models/fit_weight.py
def fit_weight_model_flexible(
data_dir: Path | str = "data",
stan_file: Path | str = "stan/weight_gp_flexible.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
alpha_prior_sd: float = 1.0,
rho_prior_shape: float = 5.0,
rho_prior_scale: float = 1.0,
sigma_prior_sd: float = 0.5,
cache: bool = True,
force_refit: bool = False,
) -> tuple:
"""Fit the GP weight model with flexible priors and return results.
Args:
data_dir: Path to data directory
stan_file: Path to Stan model file (should be the flexible version)
output_dir: Directory for output files
chains: Number of MCMC chains
iter_warmup: Warmup iterations per chain
iter_sampling: Sampling iterations per chain
alpha_prior_sd: Standard deviation for alpha ~ normal(0, sd)
rho_prior_shape: Shape parameter for rho ~ inv_gamma(shape, scale)
rho_prior_scale:generate_prior_predictive function · python · L523-L612 (90 LOC)src/models/fit_weight.py
def generate_prior_predictive(
t,
n_samples=100,
alpha_prior_sd=1.0,
rho_prior_shape=5.0,
rho_prior_scale=1.0,
sigma_prior_sd=0.5,
seed=None,
):
"""Generate prior predictive samples for the GP weight model.
Samples parameters from priors, then generates GP function values and
prior predictive observations.
Args:
t: Array of time points (scaled to [0,1])
n_samples: Number of prior samples to generate
alpha_prior_sd: Standard deviation for alpha ~ normal(0, sd)
rho_prior_shape: Shape parameter for rho ~ inv_gamma(shape, scale)
rho_prior_scale: Scale parameter for rho ~ inv_gamma(shape, scale)
sigma_prior_sd: Standard deviation for sigma ~ half-normal(0, sd)
seed: Random seed for reproducibility
Returns:
Dictionary with prior predictive samples and parameter samples:
- y_prior_rep: Prior predictive samples (n_samples, N)
- f_prior_rep: GP function samples (n_sfit_weight_model_cyclic function · python · L615-L714 (100 LOC)src/models/fit_weight.py
def fit_weight_model_cyclic(
data_dir: Path | str = "data",
stan_file: Path | str = "stan/weight_gp_cyclic.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
cache: bool = True,
force_refit: bool = False,
) -> tuple:
"""Fit the cyclic GP weight model (trend + daily) and return results.
Args:
data_dir: Path to data directory
stan_file: Path to Stan model file (cyclic version)
output_dir: Directory for output files
chains: Number of MCMC chains
iter_warmup: Warmup iterations per chain
iter_sampling: Sampling iterations per chain
cache: Whether to cache model results for faster reloading
force_refit: Force re-fitting even if cached results exist
Returns:
Tuple of (fit, idata, df, stan_data) where:
- fit: CmdStanMCMC object
- idata: ArviZ InferenceData object
- df: Original DataFrame
-If a scraper extracted this row, it came from Repobility (https://repobility.com)
fit_weight_model_spline function · python · L717-L852 (136 LOC)src/models/fit_weight.py
def fit_weight_model_spline(
data_dir: Path | str = "data",
stan_file: Path | str = "stan/weight_gp_spline.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
fourier_harmonics: int = 2,
cache: bool = True,
force_refit: bool = False,
) -> tuple:
"""Fit the spline GP weight model (trend + Fourier spline for daily cycles) and return results.
Args:
data_dir: Path to data directory
stan_file: Path to Stan model file (spline version)
output_dir: Directory for output files
chains: Number of MCMC chains
iter_warmup: Warmup iterations per chain
iter_sampling: Sampling iterations per chain
fourier_harmonics: Number of Fourier harmonics (K parameter)
cache: Whether to cache model results for faster reloading
force_refit: Force re-fitting even if cached results exist
Returns:
Tuple of (fit, idata, df, stan_data) fit_weight_model_spline_optimized function · python · L855-L1025 (171 LOC)src/models/fit_weight.py
def fit_weight_model_spline_optimized(
data_dir: Path | str = "data",
stan_file: Path | str = "stan/weight_gp_spline_optimized.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
fourier_harmonics: int = 2,
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
cache: bool = True,
force_refit: bool = False,
include_prediction_grid: bool = False,
prediction_hour: float = 8.0,
prediction_hour_step: float = None,
prediction_step_days: int = 1,
) -> tuple:
"""Fit the OPTIMIZED spline GP weight model with cov_exp_quad, optional sparse GP, and Student-t likelihood for robustness.
Args:
data_dir: Path to data directory
stan_file: Path to Stan model file (optimized spline version)
output_dir: Directory for output files
chains: Number of MCMC chains
iter_warmup: Warmup iterations per chainfit_weight_model_spline_weekly function · python · L1028-L1215 (188 LOC)src/models/fit_weight.py
def fit_weight_model_spline_weekly(
data_dir: Path | str = "data",
stan_file: Path | str = "stan/weight_gp_spline_weekly.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
fourier_harmonics: int = 2,
weekly_harmonics: int = 2,
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
cache: bool = True,
force_refit: bool = False,
include_prediction_grid: bool = False,
prediction_hour: float = 8.0,
prediction_hour_step: float = None,
prediction_step_days: int = 1,
) -> tuple:
"""Fit the WEEKLY spline GP weight model with trend + daily + weekly Fourier components and Student-t likelihood for robustness.
This model extends the optimized spline model by adding weekly cyclic component
to capture day-of-week patterns (e.g., weekend vs weekday effects).
Args:
data_dir: Path to data directory
stan_filfit_weight_model_optimized function · python · L1218-L1370 (153 LOC)src/models/fit_weight.py
def fit_weight_model_optimized(
data_dir: Path | str = "data",
stan_file: Path | str = "stan/weight_gp_optimized.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
cache: bool = True,
force_refit: bool = False,
include_prediction_grid: bool = False,
prediction_hour: float = 8.0,
prediction_hour_step: float = None,
prediction_step_days: int = 1,
) -> tuple:
"""Fit the OPTIMIZED original GP weight model with cov_exp_quad and optional sparse GP.
Args:
data_dir: Path to data directory
stan_file: Path to Stan model file (optimized original version)
output_dir: Directory for output files
chains: Number of MCMC chains
iter_warmup: Warmup iterations per chain
iter_sampling: Sampling iterations per chain
use_sparse: Whethefit_weight_model_cyclic_optimized function · python · L1373-L1541 (169 LOC)src/models/fit_weight.py
def fit_weight_model_cyclic_optimized(
data_dir: Path | str = "data",
stan_file: Path | str = "stan/weight_gp_cyclic_optimized.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
adapt_delta: float = 0.99,
max_treedepth: int = 12,
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
cache: bool = True,
force_refit: bool = False,
include_prediction_grid: bool = False,
prediction_hour: float = 8.0,
prediction_hour_step: float = None,
prediction_step_days: int = 1,
) -> tuple:
"""Fit the OPTIMIZED cyclic GP weight model with cov_exp_quad, cov_periodic, and optional sparse GP.
Args:
data_dir: Path to data directory
stan_file: Path to Stan model file (optimized cyclic version)
output_dir: Directory for output files
chains: Number of MCMC chains
iter_warmup: Warmup iterations per fit_weight_model_flexible_optimized function · python · L1544-L1704 (161 LOC)src/models/fit_weight.py
def fit_weight_model_flexible_optimized(
data_dir: Path | str = "data",
stan_file: Path | str = "stan/weight_gp_flexible_optimized.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
alpha_prior_sd: float = 1.0,
rho_prior_shape: float = 5.0,
rho_prior_scale: float = 1.0,
sigma_prior_sd: float = 0.5,
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
cache: bool = True,
force_refit: bool = False,
include_prediction_grid: bool = False,
prediction_hour: float = 8.0,
prediction_hour_step: float = None,
prediction_step_days: int = 1,
) -> tuple:
"""Fit the OPTIMIZED flexible GP weight model with cov_exp_quad, customizable priors, and optional sparse GP.
Args:
data_dir: Path to data directory
stan_file: Path to Stan model file (optimized flexible version)
output_dir: Directory for outpucompare_models_sigma function · python · L1707-L1784 (78 LOC)src/models/fit_weight.py
def compare_models_sigma(
idata_original,
idata_cyclic,
stan_data=None,
print_summary: bool = True,
) -> dict:
"""Compare sigma estimates between original and cyclic models.
Args:
idata_original: InferenceData from original model
idata_cyclic: InferenceData from cyclic model
stan_data: Stan data dictionary for back-transformation
print_summary: Whether to print comparison summary
Returns:
Dictionary with comparison metrics
"""
# Extract sigma estimates
sigma_orig = idata_original.posterior["sigma"].values.mean()
sigma_cyclic = idata_cyclic.posterior["sigma"].values.mean()
# Calculate reduction
sigma_reduction = sigma_orig - sigma_cyclic
sigma_reduction_pct = (sigma_reduction / sigma_orig) * 100 if sigma_orig > 0 else 0
# Extract daily component metrics
daily_amplitude = idata_cyclic.posterior["daily_amplitude"].values.mean()
prop_variance_daily = idata_cyclic.posterior["prop_compare_models_all function · python · L1786-L1933 (148 LOC)src/models/fit_weight.py
def compare_models_all(
idata_original,
idata_cyclic,
idata_spline=None,
stan_data=None,
print_summary: bool = True,
) -> dict:
"""Compare sigma estimates and daily components across all three models.
Args:
idata_original: InferenceData from original model
idata_cyclic: InferenceData from cyclic model
idata_spline: InferenceData from spline model (optional)
stan_data: Stan data dictionary for back-transformation
print_summary: Whether to print comparison summary
Returns:
Dictionary with comparison metrics for all available models
"""
# Extract sigma estimates
sigma_orig = idata_original.posterior["sigma"].values.mean()
sigma_cyclic = idata_cyclic.posterior["sigma"].values.mean()
# Calculate reduction original vs cyclic
sigma_reduction_oc = sigma_orig - sigma_cyclic
sigma_reduction_pct_oc = (sigma_reduction_oc / sigma_orig) * 100 if sigma_orig > 0 else 0
# Extract daily coCitation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
get_pareto_k_diagnostics function · python · L2151-L2224 (74 LOC)src/models/fit_weight.py
def get_pareto_k_diagnostics(idata, threshold_warning=0.57, threshold_high=0.7, threshold_very_high=1.0):
"""Extract Pareto k diagnostics from LOO results for a single model.
Args:
idata: ArviZ InferenceData object with LOO results
threshold_warning: Pareto k threshold for warning (default 0.57)
threshold_high: Pareto k threshold for high (default 0.7)
threshold_very_high: Pareto k threshold for very high (default 1.0)
Returns:
dict with keys:
- 'k_values': array of Pareto k estimates
- 'n_warning': number of observations with k > threshold_warning
- 'n_high': number of observations with k > threshold_high
- 'n_very_high': number of observations with k > threshold_very_high
- 'warning_indices': indices where k > threshold_warning
- 'high_indices': indices where k > threshold_high
- 'very_high_indices': indices where k > threshold_very_high
- 'max_k': maximum Pareto k vaget_high_k_observations function · python · L2227-L2267 (41 LOC)src/models/fit_weight.py
def get_high_k_observations(idata, df, threshold_high=0.7):
"""Get DataFrame rows corresponding to observations with high Pareto k values.
Args:
idata: ArviZ InferenceData object
df: Original DataFrame with observations (must match order in idata)
threshold_high: Pareto k threshold for "high" (default 0.7)
Returns:
DataFrame containing rows from df where Pareto k > threshold_high,
with additional column 'pareto_k' containing the k value.
Raises:
ValueError: If df length doesn't match number of observations in idata
"""
# Get Pareto k diagnostics
diag = get_pareto_k_diagnostics(idata, threshold_warning=threshold_high)
if len(df) != len(diag['k_values']):
raise ValueError(
f"DataFrame length ({len(df)}) doesn't match number of observations "
f"in InferenceData ({len(diag['k_values'])})"
)
# Get indices of high-k observations
high_indices = diag['high_indiccheck_loo_reliability function · python · L2270-L2347 (78 LOC)src/models/fit_weight.py
def check_loo_reliability(idata, threshold_warning=0.57, threshold_high=0.7):
"""Check if LOO-CV results are reliable based on Pareto k diagnostics.
Args:
idata: ArviZ InferenceData object
threshold_warning: Warning threshold for Pareto k (default 0.57)
threshold_high: High threshold for Pareto k (default 0.7)
Returns:
dict with keys:
- 'is_reliable': bool indicating if LOO is reliable
- 'reliability_level': str ('good', 'warning', 'high', 'very_high')
- 'n_warning', 'n_high', 'n_very_high': counts
- 'max_k', 'mean_k': statistics
- 'recommendations': list of recommendation strings
"""
diag = get_pareto_k_diagnostics(
idata,
threshold_warning=threshold_warning,
threshold_high=threshold_high,
threshold_very_high=1.0
)
n_obs = len(diag['k_values'])
n_warning = diag['n_warning']
n_high = diag['n_high']
n_very_high = diag['n_very_high']
max_kcompare_models_with_fallback function · python · L2350-L2515 (166 LOC)src/models/fit_weight.py
def compare_models_with_fallback(idata_dict, df, print_summary=True):
"""Compare models using LOO-CV with fallback to WAIC when LOO is unreliable.
This function automatically checks LOO reliability for each model and uses
WAIC for models where LOO is unreliable.
Args:
idata_dict: Dictionary of {model_name: InferenceData} objects
df: Original DataFrame (used for checking high-k observations)
print_summary: Whether to print comparison summary
Returns:
pandas DataFrame with comparison results
"""
import pandas as pd
# Check LOO reliability for each model
reliability_info = {}
for name, idata in idata_dict.items():
reliability_info[name] = check_loo_reliability(idata)
# Determine which models can use LOO
models_for_loo = [name for name, info in reliability_info.items()
if info['is_reliable']]
models_for_waic = [name for name in idata_dict.keys()
if nfit_bivariate_model function · python · L2518-L2659 (142 LOC)src/models/fit_weight.py
def fit_bivariate_model(
data_dir: Path | str = "data",
stan_file: Path | str = "stan/weight_gp_bivariate.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
weight_var: str = "weight_mean",
other_var: str = "resting_heart_rate",
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
cache: bool = True,
force_refit: bool = False,
include_prediction_grid: bool = False,
prediction_step_days: int = 1,
) -> tuple:
"""Fit bivariate GP model for weight and another variable.
Args:
data_dir: Path to data directory.
stan_file: Path to Stan model file (bivariate version).
output_dir: Directory for output files.
chains: Number of MCMC chains.
iter_warmup: Warmup iterations per chain.
iter_sampling: Sampling iterations per chain.
weight_var: Weight variable column name.
otfit_bivariate_model_mismatched function · python · L2869-L3019 (151 LOC)src/models/fit_weight.py
def fit_bivariate_model_mismatched(
df_weight: pd.DataFrame,
df_other: pd.DataFrame,
weight_time_col: str = "timestamp",
weight_value_col: str = "weight_lbs",
other_time_col: str = "timestamp",
other_value_col: str = "value",
stan_file: Path | str = "stan/weight_gp_bivariate_mismatched.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
cache: bool = True,
force_refit: bool = False,
include_prediction_grid: bool = False,
prediction_step_days: int = 1,
) -> tuple:
'''Fit bivariate GP model for weight and another variable with mismatched observation times.
Args:
df_weight: DataFrame with weight observations.
df_other: DataFrame with other variable observations.
weight_time_col: Name of timestamp column in df_weight.
weight_valuefit_crosslagged_model function · python · L3022-L3194 (173 LOC)src/models/fit_weight.py
def fit_crosslagged_model(
df_weight: pd.DataFrame,
df_workout: pd.DataFrame,
weight_time_col: str = "timestamp",
weight_value_col: str = "weight_lbs",
workout_time_col: str = "date",
workout_value_col: str = "workout_count",
lag_days: float = 2.0,
stan_file: Path | str = "stan/weight_gp_crosslagged.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
cache: bool = True,
force_refit: bool = False,
include_prediction_grid: bool = False,
prediction_step_days: int = 1,
) -> tuple:
"""Fit cross-lagged GP model for weight depending on lagged workouts.
Args:
df_weight: DataFrame with weight observations.
df_workout: DataFrame with workout metric observations (daily aggregates).
weight_time_col: Name of timestamp column in df_weight.
fit_crosslagged_model_estimated function · python · L3197-L3367 (171 LOC)src/models/fit_weight.py
def fit_crosslagged_model_estimated(
df_weight: pd.DataFrame,
df_workout: pd.DataFrame,
weight_time_col: str = "timestamp",
weight_value_col: str = "weight_lbs",
workout_time_col: str = "date",
workout_value_col: str = "workout_count",
stan_file: Path | str = "stan/weight_gp_crosslagged_estimated.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
cache: bool = True,
force_refit: bool = False,
include_prediction_grid: bool = False,
prediction_step_days: int = 1,
) -> tuple:
"""Fit cross-lagged GP model with estimated lag parameter.
Args:
df_weight: DataFrame with weight observations.
df_workout: DataFrame with workout metric observations (daily aggregates).
weight_time_col: Name of timestamp column in df_weight.
weight_value_cGenerated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
fit_crosslagged_model_cumulative function · python · L3370-L3542 (173 LOC)src/models/fit_weight.py
def fit_crosslagged_model_cumulative(
df_weight: pd.DataFrame,
df_workout: pd.DataFrame,
lag_days_list: list[float] = [1.0, 2.0, 3.0],
weight_time_col: str = "timestamp",
weight_value_col: str = "weight_lbs",
workout_time_col: str = "date",
workout_value_col: str = "workout_count",
stan_file: Path | str = "stan/weight_gp_crosslagged_cumulative.stan",
output_dir: Path | str = "output",
chains: int = 4,
iter_warmup: int = 500,
iter_sampling: int = 500,
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
cache: bool = True,
force_refit: bool = False,
include_prediction_grid: bool = False,
prediction_step_days: int = 1,
) -> tuple:
"""Fit cross-lagged GP model with cumulative lag effects.
Args:
df_weight: DataFrame with weight observations.
df_workout: DataFrame with workout metric observations (daily aggregates).
lag_days_list: List of lag _compute_stats function · python · L8-L22 (15 LOC)src/models/plot_cyclic.py
def _compute_stats(samples, y_mean, y_sd):
"""Compute mean and credible intervals for posterior samples.
Args:
samples: Array of shape (chain, draw, n_obs) or (chain, draw)
y_mean: Mean used for standardization (to add back)
y_sd: Standard deviation used for standardization (to multiply)
Returns:
Tuple of (mean, lower, upper) arrays of shape (n_obs,) or scalar
"""
mean = samples.mean(axis=(0, 1)) * y_sd + y_mean
lower = np.percentile(samples, 2.5, axis=(0, 1)) * y_sd + y_mean
upper = np.percentile(samples, 97.5, axis=(0, 1)) * y_sd + y_mean
return mean, lower, upper_select_prediction_var function · python · L25-L57 (33 LOC)src/models/plot_cyclic.py
def _select_prediction_var(idata, model_name=None):
"""Select appropriate prediction variable from InferenceData.
Checks for variables in order of preference:
1. f_total (combined trend + daily)
2. f (single GP)
3. f_trend or f_daily (individual components)
Args:
idata: ArviZ InferenceData object
model_name: Optional model name for error message
Returns:
String variable name
Raises:
ValueError if no prediction variable found
"""
if "f_total" in idata.posterior:
return "f_total"
elif "f" in idata.posterior:
return "f"
else:
# Try to find any prediction variable
possible_vars = ["f_total", "f", "f_trend", "f_daily"]
for var in possible_vars:
if var in idata.posterior:
return var
name_part = f" in model {model_name}" if model_name else ""
raise ValueError(
f"No prediction variable found{name_part}. "
f"Available var_compute_state_space_weight_samples function · python · L60-L84 (25 LOC)src/models/plot_cyclic.py
def _compute_state_space_weight_samples(idata, stan_data):
"""Compute weight samples for state-space model from components.
Args:
idata: ArviZ InferenceData with state-space model parameters
stan_data: Stan data dictionary with day_idx mapping
Returns:
Array of weight samples shape (chain, draw, N_weight)
"""
gamma_samples = idata.posterior['gamma'].values # shape (chain, draw)
fitness_samples = idata.posterior['fitness_stored'].values # (chain, draw, D)
f_gp_samples = idata.posterior['f_gp_stored'].values # (chain, draw, N_weight)
# Get day indices for each weight observation (1-indexed in Stan)
day_idx = stan_data['day_idx'] # shape (N_weight,)
# Convert to 0-indexed for Python
day_idx_zero = day_idx - 1
# Compute weight expectation for each sample
# gamma_samples[:, :, None] * fitness_samples[:, :, day_idx_zero] + f_gp_samples
gamma_expanded = gamma_samples[:, :, np.newaxis]
fitness_selected plot_cyclic_components function · python · L87-L178 (92 LOC)src/models/plot_cyclic.py
def plot_cyclic_components(
idata,
df,
stan_data,
output_path: str = None,
show_daily_component: bool = True,
show_trend_component: bool = True,
show_weekly_component: bool = False,
):
"""Plot the cyclic GP model with separate trend, daily, and weekly components.
Args:
idata: ArviZ InferenceData from cyclic model
df: Original DataFrame with timestamps
stan_data: Stan data dictionary
output_path: Path to save plot (optional)
show_daily_component: Whether to show daily component plot
show_trend_component: Whether to show trend component plot
show_weekly_component: Whether to show weekly component plot (if available)
"""
# Back-transform to original scale
y_mean = stan_data["_y_mean"]
y_sd = stan_data["_y_sd"]
# Extract posterior samples
f_trend_samples = idata.posterior["f_trend"].values
f_daily_samples = idata.posterior["f_daily"].values
f_total_samples = idata.pplot_daily_pattern function · python · L181-L268 (88 LOC)src/models/plot_cyclic.py
def plot_daily_pattern(
idata,
df,
stan_data,
output_path: str = None,
):
"""Plot the estimated daily pattern (hour-of-day effect).
Args:
idata: ArviZ InferenceData from cyclic model
df: Original DataFrame with timestamps
stan_data: Stan data dictionary
output_path: Path to save plot (optional)
"""
# Extract hour of day from data
if "hour_of_day" not in stan_data:
raise ValueError("Stan data must include hour_of_day for daily pattern plot")
hour_of_day = stan_data["hour_of_day"]
f_daily_samples = idata.posterior["f_daily"].values
# Back-transform to original scale
y_sd = stan_data["_y_sd"]
# Reshape samples for easier processing
n_chains, n_draws, n_obs = f_daily_samples.shape
f_daily_flat = f_daily_samples.reshape(-1, n_obs) * y_sd # Shape: (n_samples, n_obs)
# Create figure
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# Plot 1: Daily pattern scatter with uncerplot_model_comparison function · python · L271-L333 (63 LOC)src/models/plot_cyclic.py
def plot_model_comparison(
idata_original,
idata_cyclic,
stan_data,
output_path: str = None,
):
"""Plot comparison between original and cyclic models.
Args:
idata_original: InferenceData from original model
idata_cyclic: InferenceData from cyclic model
stan_data: Stan data dictionary
output_path: Path to save plot (optional)
"""
# Extract sigma estimates
sigma_orig = idata_original.posterior["sigma"].values.flatten() * stan_data["_y_sd"]
sigma_cyclic = idata_cyclic.posterior["sigma"].values.flatten() * stan_data["_y_sd"]
# Extract daily amplitude
daily_amplitude = idata_cyclic.posterior["daily_amplitude"].values.flatten() * stan_data["_y_sd"]
# Create figure
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
# Plot 1: Sigma comparison
ax = axes[0]
ax.hist(sigma_orig, bins=30, alpha=0.5, label=f"Original: {sigma_orig.mean():.2f} ± {sigma_orig.std():.2f} lbs")
ax.hist(sigma_cyclic, biplot_spline_daily_pattern function · python · L336-L506 (171 LOC)src/models/plot_cyclic.py
def plot_spline_daily_pattern(
idata,
stan_data,
output_path: str = None,
n_hours_grid: int = 100,
):
"""Plot the estimated daily pattern from spline model (Fourier harmonics).
Args:
idata: ArviZ InferenceData from spline model
stan_data: Stan data dictionary with K (number of harmonics) and hour_of_day
output_path: Path to save plot (optional)
n_hours_grid: Number of points to evaluate across 0-24 hours
"""
# Check required parameters
if "K" not in stan_data:
raise ValueError("Stan data must include K (number of Fourier harmonics) for spline pattern plot")
if "hour_of_day" not in stan_data:
raise ValueError("Stan data must include hour_of_day for spline pattern plot")
K = stan_data["K"]
hour_of_day_obs = np.array(stan_data["hour_of_day"])
# Extract Fourier coefficients from posterior
if "a_sin" not in idata.posterior or "a_cos" not in idata.posterior:
raise ValueError("SpRepobility analyzer · published findings · https://repobility.com
plot_spline_weekly_pattern function · python · L509-L647 (139 LOC)src/models/plot_cyclic.py
def plot_spline_weekly_pattern(
idata,
stan_data,
output_path: str = None,
n_days_grid: int = 100,
):
"""Plot the estimated weekly pattern from spline model with weekly Fourier harmonics.
Args:
idata: ArviZ InferenceData from weekly spline model
stan_data: Stan data dictionary with L (weekly harmonics) and day_of_week
output_path: Path to save plot (optional)
n_days_grid: Number of points to evaluate across 0-7 days
"""
# Check required parameters
if "L" not in stan_data:
raise ValueError("Stan data must include L (number of weekly Fourier harmonics) for weekly pattern plot")
if "day_of_week" not in stan_data:
raise ValueError("Stan data must include day_of_week for weekly pattern plot")
L = stan_data["L"]
day_of_week_obs = np.array(stan_data["day_of_week"])
# Extract weekly Fourier coefficients from posterior
if "b_sin" not in idata.posterior or "b_cos" not in idata.posterior:
plot_models_comparison_all function · python · L650-L765 (116 LOC)src/models/plot_cyclic.py
def plot_models_comparison_all(
idata_original,
idata_cyclic,
idata_spline=None,
stan_data=None,
output_path: str = None,
):
"""Plot comparison between original, cyclic, and spline models.
Args:
idata_original: InferenceData from original model
idata_cyclic: InferenceData from cyclic model
idata_spline: InferenceData from spline model (optional)
stan_data: Stan data dictionary for back-transformation
output_path: Path to save plot (optional)
"""
if stan_data is None or "_y_sd" not in stan_data:
raise ValueError("stan_data with _y_sd is required for unit conversion")
y_sd = stan_data["_y_sd"]
# Extract sigma estimates (convert to original units)
sigma_orig = idata_original.posterior["sigma"].values.flatten() * y_sd
sigma_cyclic = idata_cyclic.posterior["sigma"].values.flatten() * y_sd
# Extract daily amplitude (convert to original units)
daily_amplitude_cyclic = idata_cyclic.pplot_model_full_expectation function · python · L768-L843 (76 LOC)src/models/plot_cyclic.py
def plot_model_full_expectation(
idata,
df,
stan_data,
model_name: str,
output_path: str = None,
show_observations: bool = True,
show_ci: bool = True,
):
"""Plot the full expected prediction (combined predictors) for any model.
This function works with any model type by detecting which prediction
variables are available in the InferenceData:
- Original models: "f" (single GP)
- Cyclic/spline models: "f_total" (trend + daily)
- Also checks for "f_trend" and "f_daily" for component plots
Args:
idata: ArviZ InferenceData from fitted model
df: Original DataFrame with timestamps
stan_data: Stan data dictionary for back-transformation
model_name: Name of model for plot title
output_path: Path to save plot (optional)
show_observations: Whether to show data points
show_ci: Whether to show 95% credible interval
Returns:
matplotlib Figure object
"""
# Back-transf