← back to mjamiv__natural-language-builder

Function bodies 335 total

All specs Real LLM only Function bodies
_process_sensitivity function · python · L670-L736 (67 LOC)
src/nlb/tools/red_team.py
def _process_sensitivity(
    name: str,
    base_dcr: float,
    low_dcr: float,
    high_dcr: float,
    findings: list[Finding],
    results: list[SensitivityResult],
) -> None:
    """Process one sensitivity parameter and generate findings."""
    if base_dcr <= 0:
        delta_dcr = max(abs(high_dcr - base_dcr), abs(low_dcr - base_dcr))
        pct_change = 1.0  # flag as dominant if base is zero
    else:
        delta_dcr = max(abs(high_dcr - base_dcr), abs(low_dcr - base_dcr))
        pct_change = delta_dcr / base_dcr

    if pct_change > SENSITIVITY_DOMINANT_THRESHOLD:
        classification = "DOMINANT"
    elif pct_change > SENSITIVITY_MODERATE_THRESHOLD:
        classification = "MODERATE"
    else:
        classification = "INSENSITIVE"

    results.append(SensitivityResult(
        parameter=name,
        base_dcr=round(base_dcr, 4),
        low_dcr=round(low_dcr, 4),
        high_dcr=round(high_dcr, 4),
        delta_dcr=round(delta_dcr, 4),
        classification=class
extreme_event_combiner function · python · L743-L828 (86 LOC)
src/nlb/tools/red_team.py
def extreme_event_combiner(
    adversarial_results: list[dict],
    standard_results: list[dict],
) -> list[Finding]:
    """Compare adversarial load combinations against standard AASHTO results.

    Args:
        adversarial_results: List of dicts from adversarial combos:
            - ``combo_name``: str
            - ``max_demand``: float — peak demand from this combo
            - ``demand_type``: str — "moment", "shear", etc.
            - ``element``: int
            - ``location``: str
        standard_results: List of dicts from standard AASHTO combos:
            - ``combo_name``: str
            - ``max_demand``: float — peak demand (envelope)
            - ``demand_type``: str

    Returns:
        List of Finding objects for adversarial exceedances.
    """
    findings: list[Finding] = []

    # Build standard envelope by demand type
    standard_envelope: dict[str, float] = {}
    standard_combo_names: dict[str, str] = {}
    for sr in standard_results:
        dtype = 
robustness_check function · python · L835-L944 (110 LOC)
src/nlb/tools/red_team.py
def robustness_check(
    components: list[dict],
    analyze_fn=None,
    robustness_results: list[dict] | None = None,
) -> list[Finding]:
    """Systematically remove one major component and check survival.

    Components to remove: girder lines, bearings, columns, foundations.
    Load level: DC + 0.5×LL per GSA/AASHTO progressive collapse criteria.

    Args:
        components: List of dicts describing removable components:
            - ``name``: str — e.g. "Girder G3", "Column C2"
            - ``type``: str — "girder", "bearing", "column", "foundation"
            - ``elements``: list[int] — element tags comprising this component
        analyze_fn: Callable(removed_elements: list[int]) -> dict.
                    Returns {"max_dcr": float, "stable": bool}.
        robustness_results: Pre-computed results list of dicts:
            - ``component``: str — component name
            - ``max_dcr``: float — max DCR after removal
            - ``stable``: bool — whether structure
history_matcher function · python · L951-L1066 (116 LOC)
src/nlb/tools/red_team.py
def history_matcher(
    bridge_info: dict,
    failure_db: list[dict] | None = None,
    score_threshold: int = 5,
) -> tuple[list[Finding], list[HistoryMatch]]:
    """Match bridge characteristics against historical failure database.

    Scoring algorithm:
    - Bridge type match: +3
    - Material match: +2
    - Span range within ±20%: +2
    - Similar structural detail: +3 per matching detail

    Args:
        bridge_info: Dict describing the bridge:
            - ``type``: str — "steel_girder", "steel_truss", "concrete", etc.
            - ``material``: str — "steel", "concrete"
            - ``max_span_ft``: float — longest span
            - ``details``: list[str] — structural details like
              "fracture_critical", "pin_hanger", etc.
        failure_db: Failure database records. If None, loads from file.
        score_threshold: Minimum score to report a match. Default 5.

    Returns:
        Tuple of (findings, history_matches).
    """
    findings: list[Finding] 
compute_risk_rating function · python · L1073-L1114 (42 LOC)
src/nlb/tools/red_team.py
def compute_risk_rating(
    findings: list[Finding],
    cascade_chains: list[CascadeChain] | None = None,
    sensitivity_results: list[SensitivityResult] | None = None,
    adversarial_exceedance_max: float = 0.0,
) -> str:
    """Compute overall risk rating from findings and analysis results.

    Rating logic:
    - **RED:** Any CRITICAL finding OR any cascade causing collapse
    - **YELLOW:** Any WARNING finding OR dominant sensitivity OR
      adversarial exceedance > 15%
    - **GREEN:** All findings are NOTE or less

    Args:
        findings:                  All findings from all vectors.
        cascade_chains:            Cascade chains from vector 2.
        sensitivity_results:       Sensitivity results from vector 4.
        adversarial_exceedance_max: Max exceedance ratio from vector 5.

    Returns:
        ``'RED'``, ``'YELLOW'``, or ``'GREEN'``.
    """
    # Check for RED conditions
    has_critical = any(f.severity == "CRITICAL" for f in findings)
    has_collaps
generate_summary function · python · L1121-L1172 (52 LOC)
src/nlb/tools/red_team.py
def generate_summary(
    findings: list[Finding],
    risk_rating: str,
    attack_vectors_run: list[str],
    total_combinations: int,
) -> str:
    """Generate a 1-paragraph executive summary of red-team results.

    Args:
        findings:           All findings.
        risk_rating:        Overall risk rating.
        attack_vectors_run: Which vectors were executed.
        total_combinations: Total load combinations checked.

    Returns:
        Executive summary paragraph.
    """
    n_critical = sum(1 for f in findings if f.severity == "CRITICAL")
    n_warning = sum(1 for f in findings if f.severity == "WARNING")
    n_note = sum(1 for f in findings if f.severity == "NOTE")

    if risk_rating == "RED":
        tone = (
            f"The red-team analysis identified {n_critical} CRITICAL "
            f"finding(s) that require immediate attention. "
        )
    elif risk_rating == "YELLOW":
        tone = (
            f"The red-team analysis identified {n_warning} WARNIN
_safe_get function · python · L1179-L1187 (9 LOC)
src/nlb/tools/red_team.py
def _safe_get(d: dict, *keys, default=0.0):
    """Safely traverse nested dicts."""
    val = d
    for k in keys:
        if isinstance(val, dict):
            val = val.get(k, default)
        else:
            return default
    return val if val is not None else default
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
_get_section_capacity function · python · L1190-L1295 (106 LOC)
src/nlb/tools/red_team.py
def _get_section_capacity(section: dict, fy_ksi: float = 50.0) -> dict:
    """Extract or estimate capacity values from a section dict.

    Returns dict with phi_Mn, phi_Vn, phi_Pn (all in kip-inch units).
    Handles both flat sections and nested I-beam geometry.
    """
    cap: dict = {}

    # Extract dimensions — check multiple possible key names
    depth = (section.get("d", 0.0) or section.get("depth", 0.0)
             or section.get("depth_in", 0.0) or 0.0)
    tw = section.get("tw", 0.0) or 0.0
    bf_top = section.get("bf_top", 0.0) or section.get("bf", 0.0) or 0.0
    tf_top = section.get("tf_top", 0.0) or section.get("tf", 0.0) or 0.0
    bf_bot = section.get("bf_bot", 0.0) or bf_top
    tf_bot = section.get("tf_bot", 0.0) or tf_top
    area = section.get("A", 0.0) or section.get("area", 0.0) or 0.0
    Zx = section.get("Zx", 0.0) or section.get("zx", 0.0) or 0.0
    Sx = section.get("Sx", 0.0) or section.get("sx", 0.0) or 0.0
    Ix = section.get("Ix", 0.0) or section.ge
_get_rc_column_capacity function · python · L1298-L1330 (33 LOC)
src/nlb/tools/red_team.py
def _get_rc_column_capacity(section: dict, fc_ksi: float = 4.0, fy_ksi: float = 60.0) -> dict:
    """Compute RC column capacity from section properties.

    Returns dict with phi_Pn, phi_Mn (kip, kip-in).
    """
    cap: dict = {}
    diameter = section.get("diameter", 0.0) or section.get("D", 0.0)
    Ag = section.get("Ag", 0.0) or section.get("area", 0.0)
    As = section.get("As", 0.0) or section.get("rebar_area", 0.0)

    if diameter > 0 and Ag <= 0:
        Ag = math.pi * (diameter / 2.0) ** 2

    if As <= 0 and Ag > 0:
        # Assume 1% reinforcement ratio
        As = 0.01 * Ag

    # φPn = 0.75 × (0.85×f'c×(Ag−As) + fy×As)  — AASHTO tied column
    if Ag > 0:
        cap["phi_Pn"] = 0.75 * (0.85 * fc_ksi * (Ag - As) + fy_ksi * As)
    else:
        cap["phi_Pn"] = 0.0

    # Approximate φMn for circular column: ~0.12 × Ag × D × fy (rule of thumb)
    if Ag > 0 and diameter > 0:
        cap["phi_Mn"] = 0.9 * 0.12 * As * fy_ksi * diameter
    else:
        cap["phi_Mn"] = 
build_element_results_from_analysis function · python · L1333-L1663 (331 LOC)
src/nlb/tools/red_team.py
def build_element_results_from_analysis(
    analysis_results: dict,
    model: dict | None = None,
) -> list[dict]:
    """Build element_results list from raw AnalysisResults data and model sections.

    This is the key function that bridges raw OpenSees output to the red team
    DCR scanner. It:
    1. Extracts element forces (axial, shear, moment) from analysis envelopes
    2. Looks up section capacities from the model
    3. Computes DCRs for flexure, shear, axial, P-M interaction, deflection

    Args:
        analysis_results: Dict form of AnalysisResults with keys:
            envelopes, dcr, reactions, displacements, modal, controlling_cases
        model: Dict form of AssembledModel with keys:
            elements, sections, nodes, etc.

    Returns:
        List of element result dicts suitable for dcr_scanner().
    """
    element_results: list[dict] = []

    if not analysis_results:
        return element_results

    envelopes = analysis_results.get("envelopes", {})
 
_check_analysis_convergence function · python · L1666-L1740 (75 LOC)
src/nlb/tools/red_team.py
def _check_analysis_convergence(analysis_results: dict) -> list[Finding]:
    """Check for analysis convergence issues and return findings.

    If analysis produced no force results, flags it as a critical finding
    rather than silently returning GREEN.
    """
    findings: list[Finding] = []

    if not analysis_results:
        findings.append(Finding(
            severity="CRITICAL",
            vector="DCR Scanner",
            element=None,
            location="Global",
            description=(
                "Analysis did not converge — no results available. "
                "Cannot verify structural adequacy."
            ),
            dcr=None,
            controlling_combo="N/A",
            recommendation=(
                "Check model connectivity, boundary conditions, and loading. "
                "Verify OpenSees model builds and runs without errors."
            ),
        ))
        return findings

    envelopes = analysis_results.get("envelopes", {})
    disp
run_red_team function · python · L1747-L1948 (202 LOC)
src/nlb/tools/red_team.py
def run_red_team(
    element_results: list[dict] | None = None,
    bridge_info: dict | None = None,
    stage_results: list[dict] | None = None,
    site_constraints: dict | None = None,
    sensitivity_base_dcr: float | None = None,
    sensitivity_parameter_results: list[dict] | None = None,
    adversarial_results: list[dict] | None = None,
    standard_results: list[dict] | None = None,
    components: list[dict] | None = None,
    robustness_results: list[dict] | None = None,
    analyze_fn=None,
    cascade_analyze_fn=None,
    robustness_analyze_fn=None,
    sensitivity_analyze_fn=None,
    load_model: dict | None = None,
    vectors: list[str] | None = None,
    analysis_results: dict | None = None,
    model: dict | None = None,
) -> RedTeamReport:
    """Run the complete red-team analysis.

    This is the canonical entry point. It runs all 7 attack vectors
    (or a subset if ``vectors`` is specified) and assembles the
    RedTeamReport.

    Args:
        element_results:
_executive_report function · python · L67-L120 (54 LOC)
src/nlb/tools/report.py
def _executive_report(
    red_team: RedTeamReport,
    model_info: dict,
    site: dict,
) -> str:
    """Generate 1-page executive summary report (markdown).

    Targets < 500 words. Go/No-Go recommendation with top 3 findings.
    """
    bridge_name = model_info.get("name", "Unnamed Bridge")
    rating_display = _RATING_DISPLAY.get(red_team.risk_rating, red_team.risk_rating)
    recommendation = _RECOMMENDATION_MAP.get(red_team.risk_rating, "Review required.")

    lines = []
    lines.append(f"# BRIDGE RED TEAM REPORT — {bridge_name}")
    lines.append("")
    lines.append(f"**Risk Rating: {rating_display}**")
    lines.append("")
    lines.append(red_team.summary)
    lines.append("")

    # Top 3 findings
    top_findings = red_team.findings[:3]
    if top_findings:
        lines.append("## Top Findings")
        lines.append("")
        for i, f in enumerate(top_findings, 1):
            emoji = _SEVERITY_EMOJI.get(f.severity, "")
            lines.append(
                f"{i
_technical_report function · python · L127-L186 (60 LOC)
src/nlb/tools/report.py
def _technical_report(
    red_team: RedTeamReport,
    model_info: dict,
    site: dict,
) -> str:
    """Generate full 11-section technical report (markdown)."""
    bridge_name = model_info.get("name", "Unnamed Bridge")
    rating_display = _RATING_DISPLAY.get(red_team.risk_rating, red_team.risk_rating)

    sections = []

    # Title
    sections.append(f"# Bridge Red Team Report — {bridge_name}")
    sections.append(f"\n**Risk Rating: {rating_display}**\n")

    # Section 1: Bridge Description
    sections.append("## 1. Bridge Description\n")
    sections.append(_section_bridge_description(model_info))

    # Section 2: Site Conditions
    sections.append("## 2. Site Conditions\n")
    sections.append(_section_site_conditions(site))

    # Section 3: Model Description
    sections.append("## 3. Model Description\n")
    sections.append(_section_model_description(model_info))

    # Section 4: Loading
    sections.append("## 4. Loading\n")
    sections.append(_section_loading(model
_section_bridge_description function · python · L189-L200 (12 LOC)
src/nlb/tools/report.py
def _section_bridge_description(model_info: dict) -> str:
    lines = []
    for key in ["type", "material", "spans", "girder_spacing", "deck_width",
                "num_girders", "num_spans", "max_span_ft"]:
        val = model_info.get(key)
        if val is not None:
            label = key.replace("_", " ").title()
            lines.append(f"- **{label}:** {val}")

    if not lines:
        lines.append("*Bridge description data not provided.*")
    return "\n".join(lines) + "\n"
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
_section_site_conditions function · python · L203-L241 (39 LOC)
src/nlb/tools/report.py
def _section_site_conditions(site: dict) -> str:
    lines = []
    location = site.get("location", {})
    if location:
        parts = [location.get("city", ""), location.get("county", ""),
                 location.get("state", "")]
        loc_str = ", ".join(p for p in parts if p)
        if loc_str:
            lines.append(f"- **Location:** {loc_str}")

    coords = site.get("coordinates", {})
    if coords:
        lines.append(f"- **Coordinates:** {coords.get('lat', '')}, {coords.get('lon', '')}")

    seismic = site.get("seismic", {})
    if seismic:
        sdc = seismic.get("sdc", "N/A")
        sds = seismic.get("sds", "N/A")
        sd1 = seismic.get("sd1", "N/A")
        lines.append(f"- **Seismic Design Category:** {sdc} (SDS={sds}, SD1={sd1})")

    wind = site.get("wind", {})
    if wind:
        lines.append(f"- **Wind Speed:** {wind.get('v_ult', 'N/A')} mph, Exposure {wind.get('exposure', 'C')}")

    thermal = site.get("thermal", {})
    if thermal:
        lines.a
_section_model_description function · python · L244-L254 (11 LOC)
src/nlb/tools/report.py
def _section_model_description(model_info: dict) -> str:
    lines = []
    for key in ["num_nodes", "num_elements", "num_dofs", "element_types"]:
        val = model_info.get(key)
        if val is not None:
            label = key.replace("_", " ").title()
            lines.append(f"- **{label}:** {val}")

    if not lines:
        lines.append("*Model description data not provided.*")
    return "\n".join(lines) + "\n"
_section_loading function · python · L257-L271 (15 LOC)
src/nlb/tools/report.py
def _section_loading(model_info: dict, red_team: RedTeamReport) -> str:
    lines = []
    lines.append(
        f"- **Total Load Combinations:** {red_team.total_combinations} "
        f"(standard + adversarial)"
    )
    lines.append(f"- **Total Load Cases:** {red_team.total_load_cases}")

    load_info = model_info.get("loading", {})
    if load_info:
        for key, val in load_info.items():
            label = key.replace("_", " ").title()
            lines.append(f"- **{label}:** {val}")

    return "\n".join(lines) + "\n"
_section_analysis_results function · python · L274-L293 (20 LOC)
src/nlb/tools/report.py
def _section_analysis_results(red_team: RedTeamReport) -> str:
    lines = []
    if red_team.findings:
        # Find max DCR across all findings
        dcr_findings = [f for f in red_team.findings if f.dcr is not None]
        if dcr_findings:
            max_dcr_finding = max(dcr_findings, key=lambda f: f.dcr)
            lines.append(
                f"- **Maximum DCR:** {max_dcr_finding.dcr:.3f} at "
                f"{max_dcr_finding.location} ({max_dcr_finding.controlling_combo})"
            )

        n_over_1 = sum(1 for f in dcr_findings if (f.dcr or 0) > 1.0)
        n_over_085 = sum(1 for f in dcr_findings if 0.85 < (f.dcr or 0) <= 1.0)
        lines.append(f"- **Elements exceeding capacity (DCR > 1.0):** {n_over_1}")
        lines.append(f"- **Elements near capacity (DCR > 0.85):** {n_over_085}")
    else:
        lines.append("*No analysis results available.*")

    return "\n".join(lines) + "\n"
_section_findings function · python · L296-L325 (30 LOC)
src/nlb/tools/report.py
def _section_findings(red_team: RedTeamReport) -> str:
    if not red_team.findings:
        return "*No findings generated.*\n"

    # Group by vector
    by_vector: dict[str, list[Finding]] = {}
    for f in red_team.findings:
        by_vector.setdefault(f.vector, []).append(f)

    lines = []
    for vector, findings in by_vector.items():
        lines.append(f"### {vector}")
        lines.append("")
        for f in findings:
            emoji = _SEVERITY_EMOJI.get(f.severity, "")
            lines.append(f"- {emoji} **{f.severity}**")
            if f.element is not None:
                lines.append(f"  - Element: {f.element} ({f.location})")
            elif f.location:
                lines.append(f"  - Location: {f.location}")
            lines.append(f"  - {f.description}")
            if f.dcr is not None:
                lines.append(f"  - DCR: {f.dcr:.3f}")
            lines.append(f"  - Controlling: {f.controlling_combo}")
            lines.append(f"  - Recommendation: {
_section_sensitivity function · python · L328-L353 (26 LOC)
src/nlb/tools/report.py
def _section_sensitivity(red_team: RedTeamReport) -> str:
    if not red_team.sensitivity_results:
        return "*Sensitivity analysis not performed or no results available.*\n"

    lines = []

    # Summary table
    lines.append("| Parameter | Base DCR | Low (-20%) | High (+20%) | ΔDCR | Class |")
    lines.append("|-----------|----------|------------|-------------|------|-------|")
    for sr in red_team.sensitivity_results:
        lines.append(
            f"| {sr.parameter} | {sr.base_dcr:.3f} | {sr.low_dcr:.3f} | "
            f"{sr.high_dcr:.3f} | {sr.delta_dcr:.3f} | {sr.classification} |"
        )

    lines.append("")

    # Tornado diagram SVG
    if red_team.sensitivity_results:
        lines.append("### Tornado Diagram")
        lines.append("")
        svg = generate_tornado_svg(red_team.sensitivity_results)
        lines.append(svg)
        lines.append("")

    return "\n".join(lines)
_section_cascade function · python · L356-L380 (25 LOC)
src/nlb/tools/report.py
def _section_cascade(red_team: RedTeamReport) -> str:
    if not red_team.cascade_chains:
        return "*No failure cascade chains detected.*\n"

    lines = []
    for i, chain in enumerate(red_team.cascade_chains, 1):
        collapse_flag = " ⚠️ **COLLAPSE**" if chain.causes_collapse else ""
        lines.append(f"### Chain {i}: Element {chain.trigger_element}{collapse_flag}")
        lines.append("")
        if chain.description:
            lines.append(f"{chain.description}")
        if chain.chain:
            lines.append("")
            lines.append("Cascade sequence:")
            for elem, dcr in chain.chain:
                lines.append(f"  → Element {elem} (DCR = {dcr:.2f})")
        lines.append("")

    # Cascade diagram SVG
    lines.append("### Cascade Diagram")
    lines.append("")
    svg = generate_cascade_svg(red_team.cascade_chains)
    lines.append(svg)

    return "\n".join(lines)
_section_history function · python · L383-L395 (13 LOC)
src/nlb/tools/report.py
def _section_history(red_team: RedTeamReport) -> str:
    if not red_team.history_matches:
        return "*No historical precedent matches found.*\n"

    lines = []
    for match in red_team.history_matches:
        lines.append(f"### {match.failure_name} ({match.year})")
        lines.append(f"- **Similarity Score:** {match.score}")
        lines.append(f"- **Matching Factors:** {', '.join(match.matching_factors)}")
        lines.append(f"- **Key Lesson:** {match.lesson}")
        lines.append("")

    return "\n".join(lines)
All rows above produced by Repobility · https://repobility.com
_section_robustness function · python · L398-L411 (14 LOC)
src/nlb/tools/report.py
def _section_robustness(red_team: RedTeamReport) -> str:
    robustness_findings = [
        f for f in red_team.findings if f.vector == "Robustness Check"
    ]
    if not robustness_findings:
        return "*Robustness assessment not performed.*\n"

    lines = []
    for f in robustness_findings:
        emoji = _SEVERITY_EMOJI.get(f.severity, "")
        lines.append(f"- {emoji} **{f.location}:** {f.description}")
    lines.append("")

    return "\n".join(lines)
_section_recommendations function · python · L414-L457 (44 LOC)
src/nlb/tools/report.py
def _section_recommendations(red_team: RedTeamReport) -> str:
    lines = []

    # Collect unique recommendations
    seen = set()
    critical_recs = []
    warning_recs = []
    note_recs = []

    for f in red_team.findings:
        if f.recommendation not in seen:
            seen.add(f.recommendation)
            if f.severity == "CRITICAL":
                critical_recs.append(f.recommendation)
            elif f.severity == "WARNING":
                warning_recs.append(f.recommendation)
            else:
                note_recs.append(f.recommendation)

    if critical_recs:
        lines.append("### Critical (Immediate Action Required)")
        for i, rec in enumerate(critical_recs, 1):
            lines.append(f"{i}. {rec}")
        lines.append("")

    if warning_recs:
        lines.append("### Warnings (Address Before Final Design)")
        for i, rec in enumerate(warning_recs, 1):
            lines.append(f"{i}. {rec}")
        lines.append("")

    if note_recs:
   
_raw_export function · python · L464-L486 (23 LOC)
src/nlb/tools/report.py
def _raw_export(
    red_team: RedTeamReport,
    model_info: dict,
    site: dict,
) -> str:
    """Export all data as JSON."""
    data = {
        "report_type": "raw",
        "risk_rating": red_team.risk_rating,
        "summary": red_team.summary,
        "model_info": model_info,
        "site": site,
        "findings": [asdict(f) for f in red_team.findings],
        "cascade_chains": [asdict(c) for c in red_team.cascade_chains],
        "sensitivity_results": [asdict(s) for s in red_team.sensitivity_results],
        "history_matches": [asdict(m) for m in red_team.history_matches],
        "robustness_results": red_team.robustness_results,
        "attack_vectors_run": red_team.attack_vectors_run,
        "total_load_cases": red_team.total_load_cases,
        "total_combinations": red_team.total_combinations,
        "analysis_time_sec": red_team.analysis_time_sec,
    }
    return json.dumps(data, indent=2)
generate_tornado_svg function · python · L493-L608 (116 LOC)
src/nlb/tools/report.py
def generate_tornado_svg(
    sensitivity_results: list[SensitivityResult],
    width: int = 600,
    bar_height: int = 28,
    margin: int = 40,
) -> str:
    """Generate SVG tornado diagram from sensitivity results.

    Bars extend left (low) and right (high) from the base DCR centerline.

    Args:
        sensitivity_results: List of SensitivityResult objects.
        width:               SVG width in pixels.
        bar_height:          Height of each bar.
        margin:              Margin around the diagram.

    Returns:
        SVG string.
    """
    if not sensitivity_results:
        return '<svg xmlns="http://www.w3.org/2000/svg" width="100" height="30"><text x="10" y="20">No data</text></svg>'

    n = len(sensitivity_results)
    # Sort by delta_dcr descending for tornado effect
    sorted_results = sorted(sensitivity_results, key=lambda s: s.delta_dcr, reverse=True)

    label_width = 180
    chart_width = width - label_width - margin * 2
    height = margin * 2 + n *
generate_dcr_heatmap_svg function · python · L611-L717 (107 LOC)
src/nlb/tools/report.py
def generate_dcr_heatmap_svg(
    elements: list[dict],
    width: int = 700,
    height: int = 200,
) -> str:
    """Generate SVG DCR heat map — bridge elevation colored by DCR.

    Args:
        elements: List of dicts with:
            - ``element``: int
            - ``x_start``: float — start position along bridge
            - ``x_end``: float — end position
            - ``dcr``: float
            - ``location``: str
        width:  SVG width.
        height: SVG height.

    Returns:
        SVG string.
    """
    if not elements:
        return '<svg xmlns="http://www.w3.org/2000/svg" width="100" height="30"><text x="10" y="20">No data</text></svg>'

    margin = 40
    bridge_y = height / 2
    elem_height = 30

    # Find span range
    all_x = []
    for e in elements:
        all_x.extend([e.get("x_start", 0), e.get("x_end", 0)])
    x_min = min(all_x) if all_x else 0
    x_max = max(all_x) if all_x else 100
    x_range = x_max - x_min
    if x_range < 0.001:
        x_r
generate_cascade_svg function · python · L720-L809 (90 LOC)
src/nlb/tools/report.py
def generate_cascade_svg(
    chains: list[CascadeChain],
    width: int = 600,
) -> str:
    """Generate SVG cascade/flowchart diagram.

    Shows trigger elements and their cascade chains as connected boxes.

    Args:
        chains: List of CascadeChain objects.
        width:  SVG width.

    Returns:
        SVG string.
    """
    if not chains:
        return '<svg xmlns="http://www.w3.org/2000/svg" width="200" height="30"><text x="10" y="20">No cascades</text></svg>'

    box_w = 120
    box_h = 40
    gap_x = 40
    gap_y = 60
    margin = 20

    # Calculate height
    max_chain_len = max(len(c.chain) for c in chains) if chains else 0
    n_rows = len(chains)
    height = margin * 2 + n_rows * (box_h + gap_y)

    lines = []
    lines.append(
        f'<svg xmlns="http://www.w3.org/2000/svg" '
        f'width="{width}" height="{height}" class="cascade-diagram">'
    )
    lines.append(
        '<style>'
        '.trigger { fill: #e74c3c; stroke: #c0392b; stroke-width: 2; rx:
generate_risk_dashboard_svg function · python · L812-L899 (88 LOC)
src/nlb/tools/report.py
def generate_risk_dashboard_svg(
    red_team: RedTeamReport,
    width: int = 500,
    height: int = 250,
) -> str:
    """Generate SVG risk dashboard summary card.

    Shows rating, finding counts, and key metrics in a compact card.

    Args:
        red_team: RedTeamReport object.
        width:    SVG width.
        height:   SVG height.

    Returns:
        SVG string.
    """
    rating_colors = {
        "GREEN": "#27ae60",
        "YELLOW": "#f1c40f",
        "RED": "#e74c3c",
    }
    bg_color = rating_colors.get(red_team.risk_rating, "#95a5a6")

    n_critical = sum(1 for f in red_team.findings if f.severity == "CRITICAL")
    n_warning = sum(1 for f in red_team.findings if f.severity == "WARNING")
    n_note = sum(1 for f in red_team.findings if f.severity == "NOTE")

    lines = []
    lines.append(
        f'<svg xmlns="http://www.w3.org/2000/svg" '
        f'width="{width}" height="{height}" class="risk-dashboard">'
    )
    lines.append(
        '<style>'
        '.
generate_report function · python · L906-L944 (39 LOC)
src/nlb/tools/report.py
def generate_report(
    red_team: RedTeamReport,
    model_info: dict,
    site: dict,
    tier: str = "technical",
) -> str:
    """Generate a bridge red team report at the specified tier.

    Args:
        red_team:   :class:`RedTeamReport` from :func:`~nlb.tools.red_team.run_red_team`.
        model_info: Dict describing the bridge model:
            - ``name``: str — bridge name
            - ``type``: str — bridge type
            - ``material``: str
            - ``spans``: list
            - ``num_nodes``, ``num_elements``, ``num_dofs``: int
            - ``loading``: dict (optional)
        site:       Site profile dict from :meth:`SiteProfile.to_dict`.
        tier:       ``'executive'``, ``'technical'``, or ``'raw'``.

    Returns:
        Markdown string (executive/technical) or JSON string (raw).

    Raises:
        ValueError: If tier is not recognized.
    """
    tier = tier.lower().strip()

    if tier == "executive":
        return _executive_report(red_team, model_
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
generate_wind_load_code function · python · L17-L81 (65 LOC)
src/nlb/tools/secondary_loads.py
def generate_wind_load_code(
    node_tags: Iterable[int],
    girder_depth_in: float,
    barrier_height_in: float = 42,
    deck_elevation_ft: float = 30,
    wind_speed_mph: float = 115,
    span_length_ft: float = 100,
    num_nodes: int = 100,
    drag_coefficient: float = 1.3,
    exposure_alpha: float = 9.5,
    zg_ft: float = 900.0,
    gust_factor: float = 1.0,
) -> str:
    """Return Python code string that applies wind forces to the OpenSees model.

    The generated code computes the velocity pressure PZ per AASHTO:
        PZ = 0.00256 * KZ * G * V^2 * CD    (ksf)
    where KZ is computed for Exposure C via the provided power-law approximation:
        KZ = 2.01 * (z/zg)^(2/alpha)
    The lateral force per node (kips) is taken as:
        F_node = PZ (ksf) * exposed_height (ft) * tributary_length (ft)

    The code produced creates a Plain pattern (tag 1001) and issues ops.load
    calls for each nodeTag (X-direction lateral loads). It also populates a
    results['wind_on
generate_wind_live_load_code function · python · L84-L114 (31 LOC)
src/nlb/tools/secondary_loads.py
def generate_wind_live_load_code(
    node_tags: Iterable[int],
    deck_above_ground_ft: float = 6.0,
    wl_klf: float = 0.10,
    span_length_ft: float = 100.0,
    num_nodes: int = 100,
) -> str:
    """Generate code applying wind on live load (AASHTO 3.8.1.3).

    WL is given in klf (kips per linear foot) at 6 ft above deck. We convert
    to kips per node by multiplying by the tributary length.
    """
    node_tags_list = list(node_tags)
    if num_nodes <= 0:
        num_nodes = max(1, len(node_tags_list))
    trib_len_ft = float(span_length_ft) / float(num_nodes)
    per_node_kip = wl_klf * trib_len_ft

    code_lines: List[str] = []
    code_lines.append("# Wind on live load (AASHTO 3.8.1.3) - generated by secondary_loads")
    code_lines.append("pat_tag = 1002")
    code_lines.append("ops.pattern('Plain', pat_tag, 1)")
    code_lines.append(f"# inputs: wl_klf={wl_klf}, deck_above_ground_ft={deck_above_ground_ft}")
    code_lines.append("results.setdefault('wind_on_live', {}
generate_thermal_load_code function · python · L117-L176 (60 LOC)
src/nlb/tools/secondary_loads.py
def generate_thermal_load_code(
    node_tags: Iterable[int],
    climate: str = "moderate",
    bridge_length_ft: float = 0.0,
    alpha: float = 6.5e-6,
    fixed_node: int | None = None,
) -> str:
    """Return Python code string for a uniform temperature load case.

    The code computes the thermal strains for rise and fall from a setting
    temperature of 60 F and stores them in results['thermal']. If a fixed_node
    is provided the code will create a placeholder concentrated axial force at
    that node proportional to the strain and a nominal sectional area/E.

    NOTE: This generator does not attempt to know section area or stiffness in
    the model. It computes strains (dimensionless) which can be used by the
    caller to create proper constrained-expansion forces when more model
    information is available.
    """
    climate_lower = climate.lower() if climate else "moderate"
    if climate_lower == "moderate":
        Tmax = 80.0
        Tmin = 0.0
    elif climate_l
generate_braking_force_code function · python · L179-L229 (51 LOC)
src/nlb/tools/secondary_loads.py
def generate_braking_force_code(
    node_tags: Iterable[int],
    num_lanes: int = 2,
    bridge_length_ft: float = 100.0,
    num_nodes: int | None = None,
) -> str:
    """Return Python code string applying braking forces per AASHTO 3.6.4.

    The implementation uses the common AASHTO design axle loads:
      - design truck total axle = 72 kip
      - design tandem total axle = 50 kip

    The braking force BR is the maximum of:
      - 25% of truck axles = 0.25 * 72
      - 25% of tandem = 0.25 * 50
      - 5% of (truck + lane load)
      - 5% of (tandem + lane load)

    The generator distributes the resulting force among the provided nodes.
    """
    node_tags_list = list(node_tags)
    if num_nodes is None or num_nodes <= 0:
        num_nodes = max(1, len(node_tags_list))

    truck = 72.0
    tandem = 50.0
    truck_brake = 0.25 * truck
    tandem_brake = 0.25 * tandem

    # lane load assumed to be zero (unknown) so the 5% terms reduce to 5%*truck/tandem
    five_pct_truck_
_seismic_design_category function · python · L73-L89 (17 LOC)
src/nlb/tools/site_recon.py
def _seismic_design_category(sd1: float) -> str:
    """Map the 1.0-s design spectral acceleration SD1 to an AASHTO SDC letter.

    The category sets the level of seismic detailing and analysis required:
      A  — SD1 < 0.15: minimal seismic design
      B  — 0.15 ≤ SD1 < 0.30: moderate design, no ductile detailing
      C  — 0.30 ≤ SD1 < 0.50: ductile substructure required
      D  — SD1 ≥ 0.50: full displacement-based seismic design

    Args:
        sd1: Design spectral acceleration at 1.0 s.

    Returns:
        One of "A", "B", "C" or "D".
    """
    # Exclusive upper bound of each category, in ascending order.
    upper_bounds = ((0.15, "A"), (0.30, "B"), (0.50, "C"))
    for limit, category in upper_bounds:
        if sd1 < limit:
            return category
    # At or above the last bound (or no comparison succeeded).
    return "D"
SeismicProfile class · python · L255-L271 (17 LOC)
src/nlb/tools/site_recon.py
class SeismicProfile:
    """AASHTO seismic hazard parameters at a site.

    All spectral accelerations are in units of g (gravitational acceleration).
    Per AASHTO §3.10: SMS = Fa*Ss, SM1 = Fv*S1, SDS = (2/3)*SMS, SD1 = (2/3)*SM1.

    The class itself performs no validation or derivation; the relationships
    above describe how the stored values are expected to relate to each other.
    ``_build_seismic_profile`` fills ``sdc`` from ``sd1`` via
    ``_seismic_design_category``.
    """
    pga: float          # Peak ground acceleration (g)
    ss: float           # 0.2-s spectral acceleration, Site Class B rock (g)
    s1: float           # 1.0-s spectral acceleration, Site Class B rock (g)
    fa: float           # Short-period site coefficient (AASHTO Table 3.10.3.2-1)
    fv: float           # Long-period site coefficient (AASHTO Table 3.10.3.2-2)
    sms: float          # Fa * Ss — MCER spectral accel at short period (g)
    sm1: float          # Fv * S1 — MCER spectral accel at 1.0 s (g)
    sds: float          # (2/3) * SMS — design spectral accel short period (g)
    sd1: float          # (2/3) * SM1 — design spectral accel 1.0 s (g)
    site_class: str     # AASHTO site class A–F (default D if unknown)
    sdc: str            # Seismic Design Category letter ("A"–"D"), derived from SD1
WindProfile class · python · L275-L282 (8 LOC)
src/nlb/tools/site_recon.py
class WindProfile:
    """ASCE 7-22 wind design parameters.

    V_ult is the 3-second gust wind speed (mph) at 33 ft above grade for
    Risk Category II structures (bridges per AASHTO Table 3.8.1.1.2-1).

    Instances are produced by ``get_wind_speed``, which pairs a state-level
    wind speed with the caller's exposure category.
    """
    v_ult: int          # mph — basic wind speed (3-s gust, RC-II)
    exposure: str       # ASCE 7 Exposure Category: B, C, or D
ThermalProfile class · python · L286-L294 (9 LOC)
src/nlb/tools/site_recon.py
class ThermalProfile:
    """AASHTO temperature design range for bridge expansion/contraction.

    Values per AASHTO §3.12.2 (steel bridges).  Used to size expansion joints,
    select bearing displacement capacity, and check abutment seat widths.

    ``_thermal_from_state`` builds instances by keyword-expanding a lookup
    table entry, so those entries are expected to use these field names.
    """
    t_min: float        # °F — minimum design temperature
    t_max: float        # °F — maximum design temperature
    # Stored as supplied; this class does not recompute or check it.
    delta_t: float      # °F — total thermal range (t_max − t_min)
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
ScourProfile class · python · L298-L310 (13 LOC)
src/nlb/tools/site_recon.py
class ScourProfile:
    """Hydraulic / scour design flags per AASHTO §2.6.4 and HEC-18.

    When ``water_crossing`` is True:
      design_flood = Q100 (100-year return period) — primary design flood
      check_flood  = Q500 (500-year return period) — extreme-event check
    Scour depths must be computed by the foundation tool using HEC-18
    methods with site-specific hydraulic data.

    This class only carries the flags; it holds no scour depths itself.
    """
    water_crossing: bool           # True when the site is treated as a water crossing
    design_flood: Optional[str]    # "Q100" or None
    check_flood: Optional[str]     # "Q500" or None
    keywords_matched: list[str]    # for transparency in reports
SoilProfile class · python · L314-L323 (10 LOC)
src/nlb/tools/site_recon.py
class SoilProfile:
    """Soil characterization at the site.

    Site class is determined from AASHTO Table 3.10.3.1-1.
    When no geotechnical data is available, use Site Class D per
    AASHTO Commentary C3.10.3.1 — a conservative assumption for most
    US sites that avoids underestimating amplification.

    The class stores the site class and its description as given; it does
    not apply the Site Class D default itself.
    """
    site_class: str     # A–F per AASHTO Table 3.10.3.1-1
    description: str    # Human-readable description of site class
SiteProfile class · python · L327-L352 (26 LOC)
src/nlb/tools/site_recon.py
class SiteProfile:
    """Complete site environmental profile consumed by all other NLB tools.

    This is the canonical output of :func:`run_site_recon`.  All fields are
    populated — never None — so downstream tools can use them without guards.
    Missing data is filled with conservative defaults (documented inline).

    Serialize with :meth:`to_dict` for JSON transport over MCP.
    """
    coordinates: dict           # {"lat": float, "lon": float}
    location: dict              # {"state": str, "county": str, "city": str}
    seismic: SeismicProfile
    wind: WindProfile
    thermal: ThermalProfile
    scour: ScourProfile
    frost_depth_ft: float       # ft — minimum foundation depth (below grade)
    soil: SoilProfile
    climate_zone: str           # "hot" | "mixed" | "cold" | "very_cold"
    data_sources: dict = field(default_factory=dict)  # which APIs responded
    warnings: list[str] = field(default_factory=list) # fallback notices

    def to_dict(self) -> dict:
     
to_dict method · python · L348-L352 (5 LOC)
src/nlb/tools/site_recon.py
    def to_dict(self) -> dict:
        """Return this profile as a plain dict for JSON / MCP transport.

        ``asdict`` already recurses into nested dataclass fields, so no
        additional flattening or clean-up is performed here.
        """
        return asdict(self)
fetch_seismic_hazard function · python · L359-L444 (86 LOC)
src/nlb/tools/site_recon.py
def fetch_seismic_hazard(
    lat: float,
    lon: float,
    site_class: str = "D",
) -> tuple[SeismicProfile, bool]:
    """Fetch AASHTO seismic hazard parameters from the USGS Design Maps API.

    Uses the AASHTO 2009 reference document endpoint.  The API returns:
    PGA, Ss, S1, Fa, Fv, SMS, SM1, SDS, SD1 for the given coordinates and
    site class.

    Engineering decision: We always query for the user-specified site_class
    (default D).  If the API returns site coefficients that differ from our
    input (which can happen for extreme site classes), we trust the API.

    Args:
        lat:        Latitude (WGS-84, decimal degrees).
        lon:        Longitude (WGS-84, decimal degrees).
        site_class: AASHTO site class A–F (default "D").

    Returns:
        Tuple of (SeismicProfile, success_flag).
        On failure, returns (_SEISMIC_FALLBACK as SeismicProfile, False).

    API reference:
        https://earthquake.usgs.gov/ws/designmaps/aashto-2009.json
    """
  
_build_seismic_profile function · python · L447-L462 (16 LOC)
src/nlb/tools/site_recon.py
def _build_seismic_profile(d: dict, site_class: str) -> SeismicProfile:
    """Assemble a SeismicProfile from a flat mapping of hazard values.

    Any spectral key absent from *d* is taken from ``_SEISMIC_FALLBACK``, so
    passing the fallback dict itself yields the fallback profile.  The
    seismic design category is derived from the resolved ``sd1``.

    Args:
        d:          Flat dict that may hold ``pga``, ``ss``, ``s1``, ``fa``,
                    ``fv``, ``sms``, ``sm1``, ``sds`` and ``sd1``.
        site_class: Site class recorded on the profile exactly as given.

    Returns:
        SeismicProfile populated from *d*, with fallback values filled in.
    """
    spectral_keys = ("pga", "ss", "s1", "fa", "fv", "sms", "sm1", "sds", "sd1")
    resolved = {key: d.get(key, _SEISMIC_FALLBACK[key]) for key in spectral_keys}
    return SeismicProfile(
        site_class=site_class,
        sdc=_seismic_design_category(resolved["sd1"]),
        **resolved,
    )
fetch_thermal_range function · python · L469-L510 (42 LOC)
src/nlb/tools/site_recon.py
def fetch_thermal_range(
    lat: float,
    lon: float,
    state: str = "",
    noaa_token: Optional[str] = None,
) -> tuple[ThermalProfile, bool]:
    """Fetch temperature design range from NOAA NCEI Climate Data Online API.

    Finds the nearest weather station with Annual Climate Normal data (1991–2020
    normals, dataset NORMAL_ANN) and extracts:
      - MLY-TMIN-NORMAL → approximate design minimum temperature (°F)
      - MLY-TMAX-NORMAL → approximate design maximum temperature (°F)

    Engineering note: NOAA climate normals are 30-year averages, NOT design
    extremes.  AASHTO §3.12.2 requires using the 50-year extreme temperature,
    which is roughly normal ± 20–25°F.  We apply a ±22°F adjustment to convert
    normals to approximate design extremes.

    Fallback hierarchy:
      1. NOAA NCEI API (requires token, may be slow)
      2. State-level climate zone lookup table

    Args:
        lat:         Latitude.
        lon:         Longitude.
        state:       2-let
_fetch_noaa_normals function · python · L513-L578 (66 LOC)
src/nlb/tools/site_recon.py
def _fetch_noaa_normals(lat: float, lon: float, token: str) -> Optional[ThermalProfile]:
    """Internal: fetch climate normals from NOAA NCEI CDO API."""
    # Step 1: find nearest station with NORMAL_ANN data
    stations_url = "https://www.ncei.noaa.gov/cdo-web/api/v2/stations"
    headers = {"token": token}
    station_params = {
        "datasetid": "NORMAL_ANN",
        "datatypeid": "MLY-TMIN-NORMAL,MLY-TMAX-NORMAL",
        "units": "standard",       # Fahrenheit
        "extent": f"{lat-1.0},{lon-1.0},{lat+1.0},{lon+1.0}",
        "limit": 5,
        "sortfield": "name",
    }

    resp = requests.get(
        stations_url, params=station_params, headers=headers, timeout=_API_TIMEOUT
    )
    resp.raise_for_status()
    station_data = resp.json()

    results = station_data.get("results", [])
    if not results:
        return None

    station_id = results[0]["id"]

    # Step 2: fetch annual normal data for that station
    data_url = "https://www.ncei.noaa.gov/cdo-web/api/
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
_thermal_from_state function · python · L581-L585 (5 LOC)
src/nlb/tools/site_recon.py
def _thermal_from_state(state: str) -> ThermalProfile:
    """Build the design thermal profile for *state* via its climate zone.

    A state missing from ``CLIMATE_ZONE_BY_STATE`` is treated as "cold"; a
    zone missing from ``_THERMAL_BY_CLIMATE_ZONE`` uses ``_THERMAL_DEFAULT``.
    """
    climate_zone = CLIMATE_ZONE_BY_STATE.get(state, "cold")
    return ThermalProfile(
        **_THERMAL_BY_CLIMATE_ZONE.get(climate_zone, _THERMAL_DEFAULT)
    )
get_wind_speed function · python · L592-L616 (25 LOC)
src/nlb/tools/site_recon.py
def get_wind_speed(
    state: str,
    exposure: str = _DEFAULT_EXPOSURE,
) -> WindProfile:
    """Look up the ASCE 7-22 basic wind speed for a state.

    The value comes from a conservative state-wide table.  Coastal projects
    should take county-level values from the ASCE 7 Hazard Tool instead.

    Exposure categories (ASCE 7 §26.7):
      B — Urban/suburban, forest (≥ 1500 ft fetch)
      C — Open terrain, scattered obstructions (default for most bridges)
      D — Flat, open water or mud flats, shoreline exposed to hurricanes

    The exposure is passed through unchanged; callers may override the
    default.

    Args:
        state:    2-letter state abbreviation.
        exposure: ASCE 7 Exposure Category (B, C, or D).

    Returns:
        WindProfile carrying V_ult and the exposure category.  States not in
        the table receive ``_WIND_SPEED_DEFAULT``.
    """
    return WindProfile(
        exposure=exposure,
        v_ult=_WIND_SPEED_BY_STATE.get(state, _WIND_SPEED_DEFAULT),
    )
get_frost_depth function · python · L623-L636 (14 LOC)
src/nlb/tools/site_recon.py
def get_frost_depth(state: str) -> float:
    """Look up the frost-governed minimum foundation depth for a state.

    Values are based on AASHTO §10.6.1.2 and regional DOT bridge standards.
    The result is only a *minimum* below grade: actual embedment must still
    satisfy bearing capacity, scour, and development length requirements.

    Args:
        state: 2-letter state abbreviation.

    Returns:
        Frost depth in feet (US customary); ``_FROST_DEPTH_DEFAULT`` when the
        state is not in the table.
    """
    depth_ft = _FROST_DEPTH_BY_STATE.get(state, _FROST_DEPTH_DEFAULT)
    return depth_ft
‹ prev · page 5 / 7 · next ›