Function bodies 261 total
HealthWorkoutCorrelationAnalyzer._generate_visualizations method · python · L516-L533 (18 LOC)src/analysis/explore_health_workout_correlations.py
def _generate_visualizations(self, summary_df: pd.DataFrame) -> None:
"""Generate visualization plots."""
print("\nGenerating visualizations...")
# Create visualizations directory
viz_dir = self.output_dir / "visualizations"
viz_dir.mkdir(exist_ok=True)
# 1. Correlation strength by health metric category
self._plot_correlation_strength(summary_df, viz_dir)
# 2. Lag distribution
self._plot_lag_distribution(summary_df, viz_dir)
# 3. Top correlations heatmap
self._plot_top_correlations_heatmap(summary_df, viz_dir)
print(f"Visualizations saved to: {viz_dir}")HealthWorkoutCorrelationAnalyzer._plot_correlation_strength method · python · L535-L573 (39 LOC)src/analysis/explore_health_workout_correlations.py
def _plot_correlation_strength(self, summary_df: pd.DataFrame, viz_dir: Path) -> None:
"""Plot correlation strength by health metric category."""
# Categorize health metrics
def categorize_metric(metric: str) -> str:
metric_lower = metric.lower()
if any(keyword in metric_lower for keyword in ['sleep', 'rem']):
return 'Sleep'
elif 'stress' in metric_lower:
return 'Stress'
elif any(keyword in metric_lower for keyword in ['heart', 'hr']):
return 'Heart'
elif any(keyword in metric_lower for keyword in ['step', 'calori', 'distance']):
return 'Activity'
elif 'respir' in metric_lower:
return 'Respiration'
elif 'battery' in metric_lower:
return 'Body Battery'
else:
return 'Other'
summary_df['category'] = summary_df['health_metric'].apply(categorize_metrHealthWorkoutCorrelationAnalyzer._plot_lag_distribution method · python · L575-L589 (15 LOC)src/analysis/explore_health_workout_correlations.py
def _plot_lag_distribution(self, summary_df: pd.DataFrame, viz_dir: Path) -> None:
    """Save a histogram of the ``strongest_lag`` column.

    Bin edges are the integers ``0 .. self.max_lag + 1`` and x ticks are
    placed at ``0 .. self.max_lag``. The figure is written to
    ``lag_distribution.png`` inside ``viz_dir`` and then closed.

    Args:
        summary_df: Frame providing the ``strongest_lag`` column.
        viz_dir: Directory the PNG is written into.
    """
    plt.figure(figsize=(10, 6))

    # One bin per integer lag value.
    bin_edges = range(self.max_lag + 2)
    plt.hist(
        summary_df['strongest_lag'],
        bins=bin_edges,
        edgecolor='black',
        alpha=0.7,
    )

    plt.title('Distribution of Optimal Lag Days')
    plt.xlabel('Lag (days)')
    plt.ylabel('Count')

    tick_positions = range(self.max_lag + 1)
    plt.xticks(tick_positions)

    destination = viz_dir / "lag_distribution.png"
    plt.savefig(destination, dpi=150)
    plt.close()
def _plot_top_correlations_heatmap(self, summary_df: pd.DataFrame, viz_dir: Path) -> None:
    """Save a heatmap of the strongest correlations.

    Takes up to 20 rows with the largest ``abs_correlation``, pivots them
    into a ``health_metric`` x ``workout_var`` grid of ``correlation``
    values, and writes the annotated heatmap to
    ``top_correlations_heatmap.png`` in ``viz_dir``. Nothing is drawn or
    written when the pivot is empty.

    Args:
        summary_df: Frame with ``abs_correlation``, ``correlation``,
            ``health_metric`` and ``workout_var`` columns.
        viz_dir: Directory the PNG is written into.
    """
    # Cap the selection at 20 rows, or fewer when the frame is smaller.
    row_count = len(summary_df)
    top_n = row_count if row_count < 20 else 20
    strongest = summary_df.nlargest(top_n, 'abs_correlation').copy()

    # Reshape into the metric-by-workout grid the heatmap expects.
    heat_grid = strongest.pivot_table(
        values='correlation',
        index='health_metric',
        columns='workout_var',
        aggfunc='first',
    )
    if heat_grid.empty:
        return

    plt.figure(figsize=(12, 10))
    colorbar_options = {'label': 'Correlation (r)'}
    sns.heatmap(
        heat_grid,
        annot=True,
        fmt='.2f',
        cmap='RdBu_r',
        center=0,
        square=True,
        cbar_kws=colorbar_options,
    )
    plt.title(f'Top {top_n} Health-Workout Correlations')
    plt.tight_layout()

    destination = viz_dir / "top_correlations_heatmap.png"
    plt.savefig(destination, dpi=150)
    plt.close()
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Explore correlations between workouts and health metrics"
)
# Required arguments
parser.add_argument(
"--workout-vars",
type=str,
required=True,
help="Comma-separated list of workout variables to analyze"
)
# Analysis parameters
parser.add_argument(
"--max-lag",
type=int,
default=7,
help="Maximum lag to compute correlations for (default: 7)"
)
parser.add_argument(
"--max-metrics",
type=int,
default=20,
help="Maximum number of health metrics to analyze (default: 20)"
)
parser.add_argument(
"--min-observations",
type=int,
default=50,
help="Minimum number of observations required (default: 50)"
)
# Output
parser.add_argument(
"--output-dir",
type=str,
default="output/health_correlations",load_correlation_data function · python · L29-L36 (8 LOC)src/analysis/summarize_health_workout_findings.py
def load_correlation_data(file_path: Path) -> pd.DataFrame:
    """Read correlation results from a CSV file, strongest first.

    Args:
        file_path: Location of the CSV; it must contain an
            ``abs_correlation`` column.

    Returns:
        The loaded frame ordered by descending ``abs_correlation``.
    """
    raw_results = pd.read_csv(file_path)
    # Strongest absolute correlations come first.
    return raw_results.sort_values(by='abs_correlation', ascending=False)
def categorize_health_metrics(metric_name: str) -> str:
    """Categorize a health metric name into a coarse group.

    Matching is by case-insensitive substring. Rules are tried in order
    and the first rule with a matching keyword wins, so specific groups
    must stay ahead of the generic duration fallback.

    Args:
        metric_name: Column or metric name to classify.

    Returns:
        One of ``'sleep'``, ``'heart'``, ``'stress'``, ``'respiration'``,
        ``'energy'``, ``'activity'`` or ``'other'``.
    """
    rules = (
        ('sleep', ('sleep', 'rem', 'deep', 'light', 'awake')),
        ('heart', ('heart', 'hr', 'bpm')),
        ('stress', ('stress',)),
        ('respiration', ('respiration', 'breath')),
        ('energy', ('battery', 'charged', 'drained')),
        # 'activity', 'active' and 'intensity' are checked here so names
        # such as activity_duration_minutes or moderate_intensity_minutes
        # are not swallowed by the generic fallback below and mislabelled
        # as sleep.
        ('activity', ('calories', 'steps', 'distance', 'bmr',
                      'activity', 'active', 'intensity')),
        # Generic fallback: remaining efficiency/duration/minutes metrics
        # are treated as sleep efficiency/duration.
        ('sleep', ('efficiency', 'duration', 'minutes')),
    )

    metric_lower = metric_name.lower()
    for category, keywords in rules:
        if any(keyword in metric_lower for keyword in keywords):
            return category
    return 'other'
analyze_top_findings function · python · L61-L106 (46 LOC)src/analysis/summarize_health_workout_findings.py
def analyze_top_findings(df: pd.DataFrame, n_top: int = 20) -> Dict[str, Any]:
"""Analyze top findings from correlation results."""
# Filter for significant correlations only
sig_df = df[df['significant'] == True].copy()
# Group by workout type
workout_groups = {}
for workout_var in sig_df['workout_var'].unique():
workout_data = sig_df[sig_df['workout_var'] == workout_var].copy()
# Add category
workout_data['category'] = workout_data['health_metric'].apply(categorize_health_metrics)
workout_groups[workout_var] = {
'total_significant': len(workout_data),
'positive_effects': len(workout_data[workout_data['direction'] == 'positive']),
'negative_effects': len(workout_data[workout_data['direction'] == 'negative']),
'by_category': workout_data.groupby('category').size().to_dict(),
'top_effects': workout_data.head(n_top).to_dict('records')
}
# Overall statisticsgenerate_visualizations function · python · L109-L170 (62 LOC)src/analysis/summarize_health_workout_findings.py
def generate_visualizations(df: pd.DataFrame, output_dir: Path) -> None:
"""Generate visualization plots."""
# Filter for significant correlations only
sig_df = df[df['significant'] == True].copy()
# Add category
sig_df['category'] = sig_df['health_metric'].apply(categorize_health_metrics)
# 1. Correlation strength by workout type
plt.figure(figsize=(12, 6))
sns.boxplot(data=sig_df, x='workout_var', y='abs_correlation')
plt.title('Correlation Strength by Workout Type (Significant Only)')
plt.xlabel('Workout Type')
plt.ylabel('Absolute Correlation |r|')
plt.tight_layout()
plt.savefig(output_dir / 'correlation_strength_by_workout.png', dpi=150)
plt.close()
# 2. Optimal lag distribution
plt.figure(figsize=(10, 6))
lag_counts = sig_df['strongest_lag'].value_counts().sort_index()
lag_counts.plot(kind='bar')
plt.title('Optimal Lag Distribution for Significant Correlations')
plt.xlabel('Lag (days)')
plt.ylgenerate_html_report function · python · L173-L366 (194 LOC)src/analysis/summarize_health_workout_findings.py
def generate_html_report(analysis_results: Dict[str, Any], output_dir: Path) -> None:
"""Generate HTML report of findings."""
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>Health-Workout Correlation Analysis Summary</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 40px; line-height: 1.6; }}
h1 {{ color: #333; border-bottom: 2px solid #333; padding-bottom: 10px; }}
h2 {{ color: #555; margin-top: 30px; border-bottom: 1px solid #ddd; padding-bottom: 5px; }}
h3 {{ color: #777; margin-top: 20px; }}
.summary-box {{ background-color: #f8f9fa; padding: 20px; border-radius: 5px; margin: 20px 0; border-left: 4px solid #007bff; }}
.finding-box {{ background-color: #e8f4f8; padding: 15px; border-radius: 5px; margin: 15px 0; border-left: 4px solid #17a2b8; }}
.workout-summary {{ background-color: #f0f7ff; padding: 15px; border-radius: 5px; margin: 15px 0main function · python · L369-L466 (98 LOC)src/analysis/summarize_health_workout_findings.py
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Summarize key findings from health-workout correlation analysis"
)
parser.add_argument(
"--correlation-file",
type=str,
default="output/full_health_correlations/health_workout_correlations.csv",
help="Path to correlation results CSV file"
)
parser.add_argument(
"--output-dir",
type=str,
default="output/health_findings_summary",
help="Output directory for results"
)
parser.add_argument(
"--top-n",
type=int,
default=20,
help="Number of top findings to analyze per category"
)
args = parser.parse_args()
# Create output directory
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Create visualizations subdirectory
viz_dir = output_dir / "visualizations"
viz_dir.mkdir(parents=True, exist_ok=True)
try:
load_bivariate_results function · python · L15-L31 (17 LOC)src/analysis/visualize_correlation_comparison.py
def load_bivariate_results():
    """Return the bivariate GP summary used by the comparison plot.

    If ``output/bivariate`` does not exist, the bivariate analysis is run
    first. The returned values are fixed placeholders rather than values
    read from saved posterior samples.

    Returns:
        Dict with ``method``, ``variables``, ``correlation_mean``,
        ``correlation_ci``, ``empirical_corr`` and ``n_obs``.
    """
    results_dir = Path("output/bivariate")
    results_missing = not results_dir.exists()
    if results_missing:
        print("Bivariate results not found. Running bivariate analysis...")
        run_bivariate()

    # Placeholder summary: in practice these would be loaded from the
    # saved correlation posterior samples.
    summary = {}
    summary['method'] = 'Bivariate GP'
    summary['variables'] = 'Weight vs Resting HR'
    summary['correlation_mean'] = -0.224  # From earlier run
    summary['correlation_ci'] = (-0.674, 0.377)
    summary['empirical_corr'] = -0.205
    summary['n_obs'] = 139
    return summary
def load_independent_gp_results():
"""Load independent GP correlation results."""
output_dir = Path("output/independent_gp_correlation")
summary_file = output_dir / "summary.txt"
if not summary_file.exists():
print("Independent GP results not found. Running analysis...")
analyze_weight_vo2max()
# Parse summary.txt
results = {
'method': 'Independent GPs',
'variables': 'Weight vs VO2 Max',
'n_obs_weight': 147,
'n_obs_vo2': 133
}
try:
with open(summary_file, 'r') as f:
lines = f.readlines()
for line in lines:
if 'Empirical correlation' in line:
parts = line.split(':')
if len(parts) > 1:
results['empirical_corr'] = float(parts[1].strip())
elif 'Latent correlation (posterior):' in line:
# Next lines contain mean, std, etc.
pass
create_correlation_comparison_plot function · python · L88-L211 (124 LOC)src/analysis/visualize_correlation_comparison.py
def create_correlation_comparison_plot():
"""Create visualization comparing different correlation analysis methods."""
output_dir = Path("output/visualizations")
output_dir.mkdir(parents=True, exist_ok=True)
print("Loading correlation analysis results...")
# Load results from different methods
bivariate_results = load_bivariate_results()
independent_results = load_independent_gp_results()
# Create comparison data
methods = ['Bivariate GP', 'Independent GPs']
variables = ['Weight vs Resting HR', 'Weight vs VO2 Max']
latent_means = [bivariate_results['correlation_mean'], independent_results['correlation_mean']]
latent_cis = [bivariate_results['correlation_ci'], independent_results['correlation_ci']]
empirical_corrs = [bivariate_results['empirical_corr'], independent_results.get('empirical_corr', np.nan)]
# Create figure
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 10))
# Plot 1: Correlation estimates with confidencanalyze_exercise_categories function · python · L15-L56 (42 LOC)src/analysis/workout_report.py
def analyze_exercise_categories(df_workouts):
"""Analyze exercise categories from detailed exercise data."""
if df_workouts.empty or 'exercise_details' not in df_workouts.columns:
return None
# Flatten exercise details
all_exercises = []
for idx, row in df_workouts.iterrows():
if not row['exercise_details']:
continue
for ex in row['exercise_details']:
ex_record = ex.copy()
ex_record['workout_date'] = row['date']
ex_record['workout_id'] = row['activity_id']
all_exercises.append(ex_record)
if not all_exercises:
return None
df_exercises = pd.DataFrame(all_exercises)
# Analyze by category
category_stats = df_exercises.groupby('category').agg({
'reps': ['count', 'sum', 'mean', 'std'],
'volume': ['sum', 'mean', 'std'],
'duration': ['sum', 'mean', 'std'],
'sets': ['sum', 'mean', 'std'],
}).round(2)
# Get top categories by Repobility analyzer · published findings · https://repobility.com
load_daily_metrics function · python · L9-L97 (89 LOC)src/data/activity.py
def load_daily_metrics(data_dir: Path | str = "data") -> pd.DataFrame:
"""Load daily aggregated metrics from Garmin UDS files.
Args:
data_dir: Path to the data directory containing DI_CONNECT folder.
Returns:
DataFrame with columns: date, resting_heart_rate, total_steps,
active_kilocalories, moderate_intensity_minutes, vigorous_intensity_minutes,
highly_active_seconds, active_seconds, min_heart_rate, max_heart_rate,
avg_stress_level, stress_duration, rest_duration, activity_duration,
floors_ascended_meters, floors_descended_meters, etc.
"""
data_dir = Path(data_dir)
uds_dir = data_dir / "DI_CONNECT/DI-Connect-Aggregator"
# Find all UDS files
uds_files = list(uds_dir.glob("UDSFile_*.json"))
if not uds_files:
raise FileNotFoundError(f"No UDS files found in {uds_dir}")
records = []
for filepath in uds_files:
with open(filepath) as f:
data = json.load(f)
for prepare_daily_metrics_for_stan function · python · L100-L152 (53 LOC)src/data/activity.py
def prepare_daily_metrics_for_stan(
df: pd.DataFrame,
target_variable: str = "resting_heart_rate",
include_date_index: bool = True,
) -> dict:
"""Prepare daily metrics for Stan modeling.
Args:
df: DataFrame from load_daily_metrics()
target_variable: Which variable to extract as primary outcome.
include_date_index: Whether to include date index for alignment.
Returns:
Dictionary with Stan data fields.
"""
# Ensure sorted by date
df = df.sort_values("date").reset_index(drop=True)
# Create days since start
df["days_since_start"] = (df["date"] - df["date"].min()).dt.days
# Extract target variable
if target_variable not in df.columns:
raise ValueError(f"Target variable '{target_variable}' not found in DataFrame")
y = df[target_variable].values
# Remove missing values? For now, require complete data
if np.any(pd.isna(y)):
raise ValueError(f"Target variable '{target_variable}' aggregate_weight_to_daily function · python · L11-L41 (31 LOC)src/data/align.py
def aggregate_weight_to_daily(df_weight: pd.DataFrame) -> pd.DataFrame:
"""Aggregate weight measurements to daily statistics.
Args:
df_weight: DataFrame from load_weight_data() with columns timestamp, weight_lbs.
Returns:
DataFrame with columns date, weight_mean, weight_std, weight_count,
weight_min, weight_max, weight_median, weight_first, weight_last.
"""
# Extract date (without time)
df = df_weight.copy()
df["date"] = df["timestamp"].dt.date
df["date"] = pd.to_datetime(df["date"])
# Group by date
grouped = df.groupby("date")["weight_lbs"].agg([
("weight_mean", "mean"),
("weight_std", "std"),
("weight_count", "count"),
("weight_min", "min"),
("weight_max", "max"),
("weight_median", "median"),
("weight_first", lambda x: x.iloc[0]),
("weight_last", lambda x: x.iloc[-1]),
]).reset_index()
# Fill NaN std with 0 for single measurements
grouped["wemerge_weight_with_daily_metrics function · python · L44-L77 (34 LOC)src/data/align.py
def merge_weight_with_daily_metrics(
data_dir: Path | str = "data",
weight_aggregation: str = "mean",
) -> pd.DataFrame:
"""Merge weight data (aggregated daily) with daily metrics.
Args:
data_dir: Path to data directory.
weight_aggregation: Which weight statistic to use as primary weight variable.
Options: 'mean', 'median', 'first', 'last', 'min', 'max'.
Returns:
DataFrame with columns date, weight_* (all aggregations), plus all daily metrics.
Rows are aligned by date; missing days in either dataset are dropped (inner join).
"""
# Load data
df_weight = load_weight_data(data_dir)
df_daily = load_daily_metrics(data_dir)
# Aggregate weight to daily
df_weight_daily = aggregate_weight_to_daily(df_weight)
# Merge on date (inner join to keep only dates with both weight and daily metrics)
merged = pd.merge(df_weight_daily, df_daily, on="date", how="inner")
# Add derived columns
merged["prepare_bivariate_stan_data function · python · L80-L214 (135 LOC)src/data/align.py
def prepare_bivariate_stan_data(
df: pd.DataFrame,
weight_var: str = "weight_mean",
other_var: str = "resting_heart_rate",
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
include_prediction_grid: bool = False,
prediction_step_days: int = 1,
) -> dict:
"""Prepare data for bivariate GP model (weight + another variable).
Args:
df: Merged DataFrame from merge_weight_with_daily_metrics().
weight_var: Weight variable column name.
other_var: Other variable column name (must be numeric).
use_sparse: Whether to include sparse GP parameters.
n_inducing_points: Number of inducing points for sparse GP (if use_sparse=True).
inducing_point_method: Method for selecting inducing points ("uniform", "kmeans", "random").
include_prediction_grid: Whether to include prediction grid for unobserved days.
prediction_step_days: Step size in days for prediction gprepare_bivariate_stan_data_mismatched function · python · L217-L375 (159 LOC)src/data/align.py
def prepare_bivariate_stan_data_mismatched(
df_weight: pd.DataFrame,
df_other: pd.DataFrame,
weight_time_col: str = "timestamp",
weight_value_col: str = "weight_lbs",
other_time_col: str = "timestamp",
other_value_col: str = "value",
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
include_prediction_grid: bool = False,
prediction_step_days: int = 1,
) -> dict:
"""Prepare data for bivariate GP model with mismatched observation times.
Args:
df_weight: DataFrame with weight observations (must have timestamp and value columns).
df_other: DataFrame with other variable observations (must have timestamp and value columns).
weight_time_col: Name of timestamp column in df_weight.
weight_value_col: Name of value column in df_weight.
other_time_col: Name of timestamp column in df_other.
other_value_col: Name of value column in df_other.
use_spaprepare_crosslagged_stan_data function · python · L378-L461 (84 LOC)src/data/align.py
def prepare_crosslagged_stan_data(
df_weight: pd.DataFrame,
df_workout: pd.DataFrame,
weight_time_col: str = "timestamp",
weight_value_col: str = "weight_lbs",
workout_time_col: str = "date",
workout_value_col: str = "workout_count",
lag_days: float = 2.0,
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
include_prediction_grid: bool = False,
prediction_step_days: int = 1,
) -> dict:
"""Prepare data for cross-lagged GP model (weight depends on lagged workouts).
Args:
df_weight: DataFrame with weight observations.
df_workout: DataFrame with workout metric observations (daily aggregates).
weight_time_col: Name of timestamp column in df_weight.
weight_value_col: Name of value column in df_weight.
workout_time_col: Name of timestamp column in df_workout.
workout_value_col: Name of value column in df_workout.
lag_days: Lag in days (worprepare_crosslagged_stan_data_estimated function · python · L464-L532 (69 LOC)src/data/align.py
def prepare_crosslagged_stan_data_estimated(
df_weight: pd.DataFrame,
df_workout: pd.DataFrame,
weight_time_col: str = "timestamp",
weight_value_col: str = "weight_lbs",
workout_time_col: str = "date",
workout_value_col: str = "workout_count",
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
include_prediction_grid: bool = False,
prediction_step_days: int = 1,
) -> dict:
"""Prepare data for cross-lagged GP model with estimated lag parameter.
Args:
df_weight: DataFrame with weight observations.
df_workout: DataFrame with workout metric observations (daily aggregates).
weight_time_col: Name of timestamp column in df_weight.
weight_value_col: Name of value column in df_weight.
workout_time_col: Name of timestamp column in df_workout.
workout_value_col: Name of value column in df_workout.
use_sparse: Whether to use sparse GP approximationRepobility — same analyzer, your code, free for public repos · /scan/
prepare_crosslagged_stan_data_cumulative function · python · L535-L614 (80 LOC)src/data/align.py
def prepare_crosslagged_stan_data_cumulative(
df_weight: pd.DataFrame,
df_workout: pd.DataFrame,
lag_days_list: list[float] = [1.0, 2.0, 3.0],
weight_time_col: str = "timestamp",
weight_value_col: str = "weight_lbs",
workout_time_col: str = "date",
workout_value_col: str = "workout_count",
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
include_prediction_grid: bool = False,
prediction_step_days: int = 1,
) -> dict:
"""Prepare data for cross-lagged GP model with cumulative lag effects.
Args:
df_weight: DataFrame with weight observations.
df_workout: DataFrame with workout metric observations (daily aggregates).
lag_days_list: List of lag values in days (e.g., [1, 2, 3] for 1,2,3 day lags).
weight_time_col: Name of timestamp column in df_weight.
weight_value_col: Name of value column in df_weight.
workout_time_col: Name of timestamp columload_sleep_data function · python · L21-L119 (99 LOC)src/data/health_metrics.py
def load_sleep_data(data_dir: Path | str = "data") -> pd.DataFrame:
"""Load sleep data from Garmin wellness export.
Args:
data_dir: Path to data directory containing DI_CONNECT folder.
Returns:
DataFrame with sleep metrics:
- date: Calendar date
- sleep_start: Sleep start timestamp
- sleep_end: Sleep end timestamp
- total_sleep_minutes: Total sleep duration in minutes
- deep_sleep_minutes: Deep sleep duration in minutes
- light_sleep_minutes: Light sleep duration in minutes
- rem_sleep_minutes: REM sleep duration in minutes
- awake_minutes: Awake duration during sleep window
- unmeasurable_minutes: Unmeasurable sleep duration
- avg_respiration: Average respiration rate
- lowest_respiration: Lowest respiration rate
- highest_respiration: Highest respiration rate
"""
data_dir = Path(data_dir)
wellness_dir = data_dir / "DI_CONNECT/DI-Connect-Wellness"
load_daily_health_metrics function · python · L122-L237 (116 LOC)src/data/health_metrics.py
def load_daily_health_metrics(data_dir: Path | str = "data") -> pd.DataFrame:
"""Load daily health metrics from Garmin UDS (User Daily Summary) files.
Args:
data_dir: Path to data directory containing DI_CONNECT folder.
Returns:
DataFrame with daily health metrics:
- date: Calendar date
- resting_heart_rate: Resting heart rate (bpm)
- avg_stress: Average stress level (0-100)
- max_stress: Maximum stress level (0-100)
- stress_duration_minutes: Total stress duration in minutes
- rest_duration_minutes: Rest duration in minutes
- activity_duration_minutes: Activity duration in minutes
- body_battery_charged: Body Battery charged value
- body_battery_drained: Body Battery drained value
- body_battery_highest: Highest Body Battery value
- body_battery_lowest: Lowest Body Battery value
- avg_respiration: Average waking respiration rate
- highest_respiration: Hload_combined_health_data function · python · L240-L312 (73 LOC)src/data/health_metrics.py
def load_combined_health_data(data_dir: Path | str = "data") -> pd.DataFrame:
"""Load and combine all available health metrics.
This function combines sleep data and daily health metrics into a single
DataFrame with one row per day.
Args:
data_dir: Path to data directory containing DI_CONNECT folder.
Returns:
Combined DataFrame with all health metrics.
"""
# Load individual datasets
sleep_df = load_sleep_data(data_dir)
daily_df = load_daily_health_metrics(data_dir)
if sleep_df.empty and daily_df.empty:
return pd.DataFrame()
# Merge datasets on date
if not sleep_df.empty and not daily_df.empty:
# Ensure both have date columns
if 'date' in sleep_df.columns and 'date' in daily_df.columns:
# Convert to date-only for merging (remove time component)
sleep_df['date_only'] = sleep_df['date'].dt.date
daily_df['date_only'] = daily_df['date'].dt.date
# Renaprepare_health_metrics_for_analysis function · python · L315-L384 (70 LOC)src/data/health_metrics.py
def prepare_health_metrics_for_analysis(
health_df: pd.DataFrame,
target_date: Optional[str] = None
) -> pd.DataFrame:
"""Prepare health metrics for cross-lagged analysis.
This function:
1. Filters to relevant date range if specified
2. Handles missing values
3. Creates derived metrics
4. Ensures consistent date indexing
Args:
health_df: DataFrame with health metrics (from load_combined_health_data)
target_date: Optional target date for filtering (format: 'YYYY-MM-DD')
Returns:
Prepared DataFrame ready for analysis.
"""
if health_df.empty:
return pd.DataFrame()
df = health_df.copy()
# Filter by date if specified
if target_date and 'date' in df.columns:
target_dt = pd.to_datetime(target_date)
# Keep data up to target date
df = df[df['date'] <= target_dt]
# Create derived metrics
# 1. Sleep quality score (composite metric)
if all(col in df.columns for col get_available_health_metrics function · python · L387-L460 (74 LOC)src/data/health_metrics.py
def get_available_health_metrics(data_dir: Path | str = "data") -> Dict[str, List[str]]:
"""Get list of available health metrics in the data.
Args:
data_dir: Path to data directory.
Returns:
Dictionary with metric categories and available metrics.
"""
try:
health_df = load_combined_health_data(data_dir)
except Exception as e:
return {"error": str(e)}
if health_df.empty:
return {"available_metrics": []}
# Categorize metrics
categories = {
"sleep": [
col for col in health_df.columns
if 'sleep' in col.lower() or 'rem' in col.lower()
],
"stress": [
col for col in health_df.columns
if 'stress' in col.lower()
],
"heart": [
col for col in health_df.columns
if 'heart' in col.lower() or 'hr' in col.lower()
],
"activity": [
col for col in health_df.columns
if 'step' icompute_workout_intensity function · python · L24-L115 (92 LOC)src/data/intensity.py
def compute_workout_intensity(
df_workouts: pd.DataFrame,
df_health: pd.DataFrame,
max_hr: float = 185.0,
intensity_col: str = "intensity",
) -> pd.DataFrame:
"""Compute workout intensity for each workout and aggregate by day.
Args:
df_workouts: DataFrame with workout data from load_workout_data.
Must contain columns: 'date', 'duration', 'avg_hr'.
df_health: DataFrame with daily health metrics from load_combined_health_data.
Must contain column: 'resting_heart_rate'.
max_hr: Estimated maximum heart rate (default: 185, typical for age 35).
intensity_col: Name for the intensity column in output.
Returns:
DataFrame with columns: 'date', intensity_col (daily summed intensity).
"""
# Ensure date columns are datetime
df_workouts = df_workouts.copy()
df_health = df_health.copy()
if 'date' in df_workouts.columns:
df_workouts['date'] = pd.to_datetime(df_workload_intensity_data function · python · L118-L180 (63 LOC)src/data/intensity.py
def load_intensity_data(
data_dir: Union[str, Path] = "data",
activity_types: Optional[List[str]] = None,
max_hr: float = 185.0,
intensity_col: str = "intensity",
) -> pd.DataFrame:
"""Load workout data and compute daily intensity.
Args:
data_dir: Path to data directory containing DI_CONNECT folder.
activity_types: List of activity types to include.
If None, includes all types.
max_hr: Estimated maximum heart rate.
intensity_col: Name for intensity column.
Returns:
DataFrame with columns: 'date', intensity_col.
"""
data_dir = Path(data_dir)
# Load health data for resting HR
print("Loading health data for resting heart rate...")
df_health = load_combined_health_data(data_dir)
# Load workout data
print("Loading workout data...")
if activity_types is None:
# Load all activity types by passing empty list (workout module defaults to strength_training)
If a scraper extracted this row, it came from Repobility (https://repobility.com)
prepare_state_space_data function · python · L183-L315 (133 LOC)src/data/intensity.py
def prepare_state_space_data(
df_weight: pd.DataFrame,
df_intensity: pd.DataFrame,
weight_time_col: str = "timestamp",
weight_value_col: str = "weight_lbs",
intensity_time_col: str = "date",
intensity_value_col: str = "intensity",
use_sparse: bool = True,
n_inducing_points: int = 50,
) -> Dict[str, Any]:
"""Prepare data for state-space model.
Args:
df_weight: DataFrame with weight observations.
df_intensity: DataFrame with daily intensity values.
weight_time_col: Name of timestamp column in df_weight.
weight_value_col: Name of value column in df_weight.
intensity_time_col: Name of date column in df_intensity.
intensity_value_col: Name of intensity column in df_intensity.
use_sparse: Whether to use sparse GP approximation.
n_inducing_points: Number of inducing points for sparse GP.
Returns:
Dictionary with Stan data format for weight_state_space.stan.
"""
# Ensurcompute_cumulative_intensity function · python · L318-L363 (46 LOC)src/data/intensity.py
def compute_cumulative_intensity(
df_intensity: pd.DataFrame,
window_days: int = 7,
intensity_col: str = "intensity",
time_col: str = "date",
) -> pd.DataFrame:
"""Compute cumulative intensity over a rolling window.
Args:
df_intensity: DataFrame with daily intensity values.
window_days: Number of days to include in cumulative window.
intensity_col: Name of intensity column.
time_col: Name of time column.
Returns:
DataFrame with original columns plus 'cumulative_intensity_{window_days}d'.
"""
df = df_intensity.copy()
df = df.sort_values(time_col)
# Ensure continuous daily series
date_range = pd.date_range(
start=df[time_col].min(),
end=df[time_col].max(),
freq='D'
)
df_full = pd.DataFrame({time_col: date_range})
df_full = pd.merge(df_full, df[[time_col, intensity_col]], on=time_col, how='left')
df_full[intensity_col] = df_full[intensity_col].fillna(0.0)
compute_log_intensity function · python · L366-L383 (18 LOC)src/data/intensity.py
def compute_log_intensity(
    df_intensity: pd.DataFrame,
    intensity_col: str = "intensity",
    offset: float = 1.0,
) -> pd.DataFrame:
    """Add a log-transformed intensity column to a copy of the input.

    Args:
        df_intensity: DataFrame holding the intensity values.
        intensity_col: Name of the column to transform.
        offset: Constant added before taking the natural log, i.e. the
            new column is ``log(intensity + offset)``.

    Returns:
        A copy of ``df_intensity`` with an added ``log_intensity`` column;
        the input frame is left untouched.
    """
    result = df_intensity.copy()
    shifted_values = result[intensity_col] + offset
    result['log_intensity'] = np.log(shifted_values)
    return result
def prepare_state_space_data_with_spline(
df_weight: pd.DataFrame,
df_intensity: pd.DataFrame,
weight_time_col: str = "timestamp",
weight_value_col: str = "weight_lbs",
intensity_time_col: str = "date",
intensity_value_col: str = "intensity",
use_sparse: bool = True,
n_inducing_points: int = 50,
fourier_harmonics: int = 2,
include_prediction_grid: bool = False,
prediction_hour: float = 8.0,
prediction_step_days: int = 1,
) -> Dict[str, Any]:
"""Prepare data for state-space model with daily spline component.
Args:
df_weight: DataFrame with weight observations.
df_intensity: DataFrame with daily intensity values.
weight_time_col: Name of timestamp column in df_weight.
weight_value_col: Name of value column in df_weight.
intensity_time_col: Name of date column in df_intensity.
intensity_value_col: Name of intensity column in df_intensity.
use_sparse: Whether to use sparse GP apprepare_state_space_data_cumulative function · python · L548-L609 (62 LOC)src/data/intensity.py
def prepare_state_space_data_cumulative(
df_weight: pd.DataFrame,
df_intensity: pd.DataFrame,
cumulative_window: int = 7,
weight_time_col: str = "timestamp",
weight_value_col: str = "weight_lbs",
intensity_time_col: str = "date",
intensity_value_col: str = "intensity",
use_sparse: bool = True,
n_inducing_points: int = 50,
) -> Dict[str, Any]:
"""Prepare data for state-space model with cumulative intensity.
Similar to prepare_state_space_data but uses cumulative intensity
over a window of days instead of single-day intensity.
Args:
df_weight: DataFrame with weight observations.
df_intensity: DataFrame with daily intensity values.
cumulative_window: Number of days to include in cumulative intensity.
weight_time_col: Name of timestamp column in df_weight.
weight_value_col: Name of value column in df_weight.
intensity_time_col: Name of date column in df_intensity.
intensity_value_c
load_intensity_by_activity function · python · L612-L697 (86 LOC) · src/data/intensity.py
def load_intensity_by_activity(
data_dir: Union[str, Path] = "data",
activity_types: Optional[List[str]] = None,
max_hr: float = 185.0,
) -> pd.DataFrame:
"""Load workout data and compute daily intensity separated by activity type.
Args:
data_dir: Path to data directory containing DI_CONNECT folder.
activity_types: List of activity types to include.
If None, includes ['strength_training', 'walking', 'cycling'].
max_hr: Estimated maximum heart rate.
Returns:
DataFrame with columns: 'date', plus columns for each activity type
with intensity values for that activity (0 on days without that activity).
"""
data_dir = Path(data_dir)
if activity_types is None:
activity_types = ['strength_training', 'walking', 'cycling']
# Load health data for resting HR
print("Loading health data for resting heart rate...")
df_health = load_combined_health_data(data_dir)
# Initialize
load_sleep_data function · python · L8-L91 (84 LOC) · src/data/sleep.py
def load_sleep_data(data_dir: Path | str = "data") -> pd.DataFrame:
"""Load sleep metrics from Garmin sleepData files.
Args:
data_dir: Path to the data directory containing DI_CONNECT folder.
Returns:
DataFrame with columns: date, sleep_start_gmt, sleep_end_gmt,
total_sleep_seconds, deep_sleep_seconds, light_sleep_seconds,
rem_sleep_seconds, awake_seconds, unmeasurable_seconds,
sleep_efficiency, deep_sleep_percent, light_sleep_percent,
rem_sleep_percent, awake_percent, average_respiration,
lowest_respiration, highest_respiration, etc.
"""
data_dir = Path(data_dir)
sleep_dir = data_dir / "DI_CONNECT/DI-Connect-Wellness"
# Find all sleepData files
sleep_files = list(sleep_dir.glob("*_sleepData.json"))
if not sleep_files:
raise FileNotFoundError(f"No sleepData files found in {sleep_dir}")
records = []
for filepath in sleep_files:
with open(filepath) as f:
dat
merge_sleep_with_daily function · python · L94-L108 (15 LOC) · src/data/sleep.py
def merge_sleep_with_daily(
    df_daily: pd.DataFrame,
    df_sleep: pd.DataFrame,
) -> pd.DataFrame:
    """Merge sleep data with daily metrics DataFrame.

    Args:
        df_daily: Daily metrics DataFrame (from load_daily_metrics).
        df_sleep: Sleep DataFrame (from load_sleep_data).

    Returns:
        Merged DataFrame with sleep columns added. The join is a left join
        on 'date', so every row of ``df_daily`` is kept. Non-key columns
        present in both frames keep their name for the daily value and get
        a '_sleep' suffix for the sleep value.
    """
    return pd.merge(
        df_daily,
        df_sleep,
        on="date",
        how="left",
        suffixes=("", "_sleep"),
    )
load_vo2max_data function · python · L8-L76 (69 LOC) · src/data/vo2max.py
def load_vo2max_data(data_dir: Path | str = "data") -> pd.DataFrame:
"""Load VO2 max and fitness metrics from Garmin MetricsMaxMetData files.
Args:
data_dir: Path to the data directory containing DI_CONNECT folder.
Returns:
DataFrame with columns: date, vo2_max, fitness_age, fitness_age_description,
max_met, max_met_category, analyzer_method, calibrated_data, device_id, etc.
"""
data_dir = Path(data_dir)
metrics_dir = data_dir / "DI_CONNECT/DI-Connect-Metrics"
# Find all MetricsMaxMetData files
metric_files = list(metrics_dir.glob("MetricsMaxMetData_*.json"))
if not metric_files:
raise FileNotFoundError(f"No MetricsMaxMetData files found in {metrics_dir}")
records = []
for filepath in metric_files:
with open(filepath) as f:
data = json.load(f)
for entry in data:
# Extract date
date_str = entry.get("calendarDate")
if not date_str:
merge_vo2max_with_weight function · python · L79-L113 (35 LOC) · src/data/vo2max.py
def merge_vo2max_with_weight(
data_dir: Path | str = "data",
weight_aggregation: str = "mean",
) -> pd.DataFrame:
"""Merge VO2 max data with weight data.
Args:
data_dir: Path to data directory.
weight_aggregation: Which weight statistic to use as primary weight variable.
Returns:
DataFrame with columns date, weight_* (aggregations), vo2_max, etc.
Rows are aligned by date; missing days in either dataset are dropped (inner join).
"""
from .weight import load_weight_data
from .align import aggregate_weight_to_daily
# Load data
df_weight = load_weight_data(data_dir)
df_vo2max = load_vo2max_data(data_dir)
# Aggregate weight to daily
df_weight_daily = aggregate_weight_to_daily(df_weight)
# Merge on date (inner join to keep only dates with both weight and VO2 max)
merged = pd.merge(df_weight_daily, df_vo2max, on="date", how="inner")
# Add derived columns
merged["weight_variable"] = merged
load_weight_data function · python · L12-L51 (40 LOC) · src/data/weight.py
def load_weight_data(data_dir: Path | str = "data") -> pd.DataFrame:
"""Load weight measurements from Garmin biometrics export.
Args:
data_dir: Path to the data directory containing DI_CONNECT folder.
Returns:
DataFrame with columns: date, timestamp, weight_lbs, days_since_start
"""
data_dir = Path(data_dir)
biometrics_path = data_dir / "DI_CONNECT/DI-Connect-Wellness/114762117_userBioMetrics.json"
with open(biometrics_path) as f:
data = json.load(f)
# Extract entries with weight data
records = []
for entry in data:
if "weight" not in entry or not entry["weight"]:
continue
weight_info = entry["weight"]
# Use timestampGMT if available, otherwise calendarDate
timestamp_str = weight_info.get("timestampGMT") or entry["metaData"]["calendarDate"]
date_str = entry["metaData"]["calendarDate"][:10]
weight_lbs = weight_info["weight"] * GRAMS_TO_LBS # Convert from gram
prepare_stan_data function · python · L54-L252 (199 LOC) · src/data/weight.py
def prepare_stan_data(
df: pd.DataFrame,
include_hour_info: bool = True,
include_weekly_info: bool = False,
fourier_harmonics: int = 2,
weekly_harmonics: int = 2,
use_sparse: bool = True,
n_inducing_points: int = 50,
inducing_point_method: str = "uniform",
include_prediction_grid: bool = False,
prediction_hour: float = 8.0,
prediction_hour_step: float = None,
prediction_step_days: int = 1,
) -> dict:
"""Prepare data dictionary for Stan model.
Args:
df: DataFrame from load_weight_data()
include_hour_info: Whether to include hour-of-day information for cyclic models
include_weekly_info: Whether to include day-of-week information for weekly cyclic models
fourier_harmonics: Number of Fourier harmonics for spline model (K parameter)
weekly_harmonics: Number of Fourier harmonics for weekly spline model (L parameter)
use_sparse: Whether to include sparse GP parameters (for optimized model)
load_workout_data function · python · L9-L133 (125 LOC) · src/data/workout.py
def load_workout_data(
data_dir: Path | str = "data",
activity_type: str | list[str] = "strength_training",
include_exercise_details: bool = True,
) -> pd.DataFrame:
"""Load workout data from Garmin summarized activities export.
Args:
data_dir: Path to the data directory containing DI_CONNECT folder.
activity_type: Which activity types to include. Can be a string or list.
Common types: 'strength_training', 'walking', 'running', etc.
include_exercise_details: Whether to include detailed exercise set information.
Returns:
DataFrame with columns:
- activity_id, activity_type, name, start_time_gmt, start_time_local,
- duration, calories, avg_hr, max_hr, min_hr, steps, total_reps, total_sets,
- total_volume (optional), active_sets, exercise_details (if include_exercise_details)
- date (derived from start_time_local)
"""
data_dir = Path(data_dir)
activities_path = data_
prepare_workout_aggregates function · python · L136-L202 (67 LOC) · src/data/workout.py
def prepare_workout_aggregates(
df_workouts: pd.DataFrame,
aggregation: Literal["daily", "weekly"] = "daily",
metric: Literal["count", "volume", "reps", "sets", "calories", "duration"] = "count",
) -> pd.DataFrame:
"""Aggregate workout data to regular time intervals.
Args:
df_workouts: DataFrame from load_workout_data()
aggregation: Time interval for aggregation ('daily' or 'weekly')
metric: Which metric to aggregate ('count', 'volume', 'reps', 'sets', 'calories', 'duration')
Returns:
DataFrame with columns: date, workout_metric (aggregated)
"""
if df_workouts.empty:
return pd.DataFrame(columns=["date", "workout_metric"])
# Ensure date column exists
if "date" not in df_workouts.columns:
raise ValueError("DataFrame must have 'date' column")
# Remove rows without date
df = df_workouts.dropna(subset=["date"]).copy()
# Determine aggregation column based on metric
metric_columns = {
prepare_workout_for_stan function · python · L205-L266 (62 LOC) · src/data/workout.py
def prepare_workout_for_stan(
df_workouts: pd.DataFrame,
metric: str = "count",
aggregation: str = "daily",
fill_missing: bool = True,
) -> dict:
"""Prepare workout data for Stan modeling.
Args:
df_workouts: DataFrame from load_workout_data()
metric: Which metric to use ('count', 'volume', 'reps', 'sets', 'calories', 'duration')
aggregation: Time interval ('daily' or 'weekly')
fill_missing: Whether to fill missing days with zeros
Returns:
Dictionary with Stan data fields.
"""
# Aggregate workouts
df_agg = prepare_workout_aggregates(df_workouts, aggregation=aggregation, metric=metric)
if df_agg.empty:
raise ValueError("No workout data after aggregation")
# If fill_missing, create complete date range
if fill_missing and aggregation == "daily":
date_range = pd.date_range(df_agg["date"].min(), df_agg["date"].max(), freq="D")
df_complete = pd.DataFrame({"date": date_range}
load_strength_training_data function · python · L269-L302 (34 LOC) · src/data/workout.py
def load_strength_training_data(
    data_dir: Path | str = "data",
    aggregation: str = "daily",
    metric: str = "count",
) -> tuple[pd.DataFrame, dict]:
    """Convenience function to load strength training data for modeling.

    Args:
        data_dir: Path to data directory.
        aggregation: Time interval for aggregation.
        metric: Which metric to use.

    Returns:
        Tuple of (DataFrame with workout data, Stan data dictionary)
    """
    # Load raw workout data, restricted to strength training sessions.
    df_workouts = load_workout_data(
        data_dir=data_dir,
        activity_type="strength_training",
        include_exercise_details=True,
    )

    # Prepare for Stan (runs first so its errors surface before any other
    # work is done on the frame).
    stan_data = prepare_workout_for_stan(
        df_workouts,
        metric=metric,
        aggregation=aggregation,
        fill_missing=True,
    )

    # Also return the aggregated DataFrame for inspection. It is computed
    # again here because prepare_workout_for_stan only returns the dict.
    df_agg = prepare_workout_aggregates(
        df_workouts,
        aggregation=aggregation,
        metric=metric,
    )
    return df_agg, stan_data
benchmark_model function · python · L23-L72 (50 LOC) · src/models/benchmark_optimized.py
def benchmark_model(model_name, fit_function, **kwargs):
"""Benchmark a single model fitting."""
print(f"\n{'='*60}")
print(f"Benchmarking: {model_name}")
print(f"{'='*60}")
start_time = time.time()
try:
fit, idata, df, stan_data = fit_function(**kwargs)
elapsed = time.time() - start_time
# Extract key metrics
sigma_mean = idata.posterior["sigma"].mean().item()
sigma_sd = idata.posterior["sigma"].std().item()
# Compute WAIC if log_likelihood group exists
waic = None
if "log_likelihood" in idata:
import arviz as az
waic_result = az.waic(idata)
# Convert to -2*elpd scale (lower is better)
waic = -2 * waic_result.elpd_waic
print(" Status: ✓ SUCCESS")
print(f" Time: {elapsed:.1f} seconds")
print(f" σ: {sigma_mean:.4f} ± {sigma_sd:.4f}")
if waic is not None:
print(f" WAIC: {waic:.1f}")
return {
should_show_plots function · python · L58-L63 (6 LOC) · src/models/demo_bivariate.py
def should_show_plots() -> bool:
    """Return True if plots should be displayed interactively.

    Modified to always return False - plots are saved to disk only.
    """
    return False


# Index entry for the next snippet:
# _compute_ess function · python · L80-L90 (11 LOC) · src/models/demo_bivariate.py
def _compute_ess(da) -> float:
    """Compute effective sample size from DataArray.

    The result of ``az.ess`` is reduced to one float: the single value when
    there is exactly one, otherwise the mean over all values.

    NOTE(review): ArviZ documents ``az.ess`` as returning an
    ``xarray.Dataset``. A Dataset's ``.values`` is the mapping method, not
    an array, so reading ``.values.size`` on it fails and the broad
    ``except`` turned that into NaN. Dataset-like results are therefore
    unpacked per variable here; confirm against the installed ArviZ version.

    Args:
        da: Samples to pass to ``az.ess``.

    Returns:
        The effective sample size as a float, or NaN when it cannot be
        computed (no result, empty result, or any error).
    """
    try:
        ess = az.ess(da)
        if ess is None:
            return np.nan
        if hasattr(ess, "data_vars"):
            # Dataset-like result: gather the values of every variable.
            chunks = [
                np.asarray(var.values, dtype=float).ravel()
                for var in ess.data_vars.values()
            ]
            values = np.concatenate(chunks) if chunks else np.empty(0)
        else:
            # DataArray-like (array under ``.values``) or a plain number/array.
            values = np.asarray(getattr(ess, "values", ess), dtype=float)
        if values.size == 0:
            return np.nan
        # Extract scalar if possible, otherwise average.
        return float(values.item() if values.size == 1 else values.mean())
    except Exception:
        # Deliberate best-effort: a failed diagnostic must not abort the caller.
        return np.nan