Function bodies 335 total
_process_sensitivity function · python · L670-L736 (67 LOC)src/nlb/tools/red_team.py
def _process_sensitivity(
name: str,
base_dcr: float,
low_dcr: float,
high_dcr: float,
findings: list[Finding],
results: list[SensitivityResult],
) -> None:
"""Process one sensitivity parameter and generate findings."""
if base_dcr <= 0:
delta_dcr = max(abs(high_dcr - base_dcr), abs(low_dcr - base_dcr))
pct_change = 1.0 # flag as dominant if base is zero
else:
delta_dcr = max(abs(high_dcr - base_dcr), abs(low_dcr - base_dcr))
pct_change = delta_dcr / base_dcr
if pct_change > SENSITIVITY_DOMINANT_THRESHOLD:
classification = "DOMINANT"
elif pct_change > SENSITIVITY_MODERATE_THRESHOLD:
classification = "MODERATE"
else:
classification = "INSENSITIVE"
results.append(SensitivityResult(
parameter=name,
base_dcr=round(base_dcr, 4),
low_dcr=round(low_dcr, 4),
high_dcr=round(high_dcr, 4),
delta_dcr=round(delta_dcr, 4),
classification=classextreme_event_combiner function · python · L743-L828 (86 LOC)src/nlb/tools/red_team.py
def extreme_event_combiner(
adversarial_results: list[dict],
standard_results: list[dict],
) -> list[Finding]:
"""Compare adversarial load combinations against standard AASHTO results.
Args:
adversarial_results: List of dicts from adversarial combos:
- ``combo_name``: str
- ``max_demand``: float — peak demand from this combo
- ``demand_type``: str — "moment", "shear", etc.
- ``element``: int
- ``location``: str
standard_results: List of dicts from standard AASHTO combos:
- ``combo_name``: str
- ``max_demand``: float — peak demand (envelope)
- ``demand_type``: str
Returns:
List of Finding objects for adversarial exceedances.
"""
findings: list[Finding] = []
# Build standard envelope by demand type
standard_envelope: dict[str, float] = {}
standard_combo_names: dict[str, str] = {}
for sr in standard_results:
dtype = robustness_check function · python · L835-L944 (110 LOC)src/nlb/tools/red_team.py
def robustness_check(
components: list[dict],
analyze_fn=None,
robustness_results: list[dict] | None = None,
) -> list[Finding]:
"""Systematically remove one major component and check survival.
Components to remove: girder lines, bearings, columns, foundations.
Load level: DC + 0.5×LL per GSA/AASHTO progressive collapse criteria.
Args:
components: List of dicts describing removable components:
- ``name``: str — e.g. "Girder G3", "Column C2"
- ``type``: str — "girder", "bearing", "column", "foundation"
- ``elements``: list[int] — element tags comprising this component
analyze_fn: Callable(removed_elements: list[int]) -> dict.
Returns {"max_dcr": float, "stable": bool}.
robustness_results: Pre-computed results list of dicts:
- ``component``: str — component name
- ``max_dcr``: float — max DCR after removal
- ``stable``: bool — whether structurehistory_matcher function · python · L951-L1066 (116 LOC)src/nlb/tools/red_team.py
def history_matcher(
bridge_info: dict,
failure_db: list[dict] | None = None,
score_threshold: int = 5,
) -> tuple[list[Finding], list[HistoryMatch]]:
"""Match bridge characteristics against historical failure database.
Scoring algorithm:
- Bridge type match: +3
- Material match: +2
- Span range within ±20%: +2
- Similar structural detail: +3 per matching detail
Args:
bridge_info: Dict describing the bridge:
- ``type``: str — "steel_girder", "steel_truss", "concrete", etc.
- ``material``: str — "steel", "concrete"
- ``max_span_ft``: float — longest span
- ``details``: list[str] — structural details like
"fracture_critical", "pin_hanger", etc.
failure_db: Failure database records. If None, loads from file.
score_threshold: Minimum score to report a match. Default 5.
Returns:
Tuple of (findings, history_matches).
"""
findings: list[Finding] compute_risk_rating function · python · L1073-L1114 (42 LOC)src/nlb/tools/red_team.py
def compute_risk_rating(
findings: list[Finding],
cascade_chains: list[CascadeChain] | None = None,
sensitivity_results: list[SensitivityResult] | None = None,
adversarial_exceedance_max: float = 0.0,
) -> str:
"""Compute overall risk rating from findings and analysis results.
Rating logic:
- **RED:** Any CRITICAL finding OR any cascade causing collapse
- **YELLOW:** Any WARNING finding OR dominant sensitivity OR
adversarial exceedance > 15%
- **GREEN:** All findings are NOTE or less
Args:
findings: All findings from all vectors.
cascade_chains: Cascade chains from vector 2.
sensitivity_results: Sensitivity results from vector 4.
adversarial_exceedance_max: Max exceedance ratio from vector 5.
Returns:
``'RED'``, ``'YELLOW'``, or ``'GREEN'``.
"""
# Check for RED conditions
has_critical = any(f.severity == "CRITICAL" for f in findings)
has_collapsgenerate_summary function · python · L1121-L1172 (52 LOC)src/nlb/tools/red_team.py
def generate_summary(
findings: list[Finding],
risk_rating: str,
attack_vectors_run: list[str],
total_combinations: int,
) -> str:
"""Generate a 1-paragraph executive summary of red-team results.
Args:
findings: All findings.
risk_rating: Overall risk rating.
attack_vectors_run: Which vectors were executed.
total_combinations: Total load combinations checked.
Returns:
Executive summary paragraph.
"""
n_critical = sum(1 for f in findings if f.severity == "CRITICAL")
n_warning = sum(1 for f in findings if f.severity == "WARNING")
n_note = sum(1 for f in findings if f.severity == "NOTE")
if risk_rating == "RED":
tone = (
f"The red-team analysis identified {n_critical} CRITICAL "
f"finding(s) that require immediate attention. "
)
elif risk_rating == "YELLOW":
tone = (
f"The red-team analysis identified {n_warning} WARNIN_safe_get function · python · L1179-L1187 (9 LOC)src/nlb/tools/red_team.py
def _safe_get(d: dict, *keys, default=0.0):
"""Safely traverse nested dicts."""
val = d
for k in keys:
if isinstance(val, dict):
val = val.get(k, default)
else:
return default
return val if val is not None else defaultWant fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
_get_section_capacity function · python · L1190-L1295 (106 LOC)src/nlb/tools/red_team.py
def _get_section_capacity(section: dict, fy_ksi: float = 50.0) -> dict:
"""Extract or estimate capacity values from a section dict.
Returns dict with phi_Mn, phi_Vn, phi_Pn (all in kip-inch units).
Handles both flat sections and nested I-beam geometry.
"""
cap: dict = {}
# Extract dimensions — check multiple possible key names
depth = (section.get("d", 0.0) or section.get("depth", 0.0)
or section.get("depth_in", 0.0) or 0.0)
tw = section.get("tw", 0.0) or 0.0
bf_top = section.get("bf_top", 0.0) or section.get("bf", 0.0) or 0.0
tf_top = section.get("tf_top", 0.0) or section.get("tf", 0.0) or 0.0
bf_bot = section.get("bf_bot", 0.0) or bf_top
tf_bot = section.get("tf_bot", 0.0) or tf_top
area = section.get("A", 0.0) or section.get("area", 0.0) or 0.0
Zx = section.get("Zx", 0.0) or section.get("zx", 0.0) or 0.0
Sx = section.get("Sx", 0.0) or section.get("sx", 0.0) or 0.0
Ix = section.get("Ix", 0.0) or section.ge_get_rc_column_capacity function · python · L1298-L1330 (33 LOC)src/nlb/tools/red_team.py
def _get_rc_column_capacity(section: dict, fc_ksi: float = 4.0, fy_ksi: float = 60.0) -> dict:
"""Compute RC column capacity from section properties.
Returns dict with phi_Pn, phi_Mn (kip, kip-in).
"""
cap: dict = {}
diameter = section.get("diameter", 0.0) or section.get("D", 0.0)
Ag = section.get("Ag", 0.0) or section.get("area", 0.0)
As = section.get("As", 0.0) or section.get("rebar_area", 0.0)
if diameter > 0 and Ag <= 0:
Ag = math.pi * (diameter / 2.0) ** 2
if As <= 0 and Ag > 0:
# Assume 1% reinforcement ratio
As = 0.01 * Ag
# φPn = 0.75 × (0.85×f'c×(Ag−As) + fy×As) — AASHTO tied column
if Ag > 0:
cap["phi_Pn"] = 0.75 * (0.85 * fc_ksi * (Ag - As) + fy_ksi * As)
else:
cap["phi_Pn"] = 0.0
# Approximate φMn for circular column: ~0.12 × Ag × D × fy (rule of thumb)
if Ag > 0 and diameter > 0:
cap["phi_Mn"] = 0.9 * 0.12 * As * fy_ksi * diameter
else:
cap["phi_Mn"] = build_element_results_from_analysis function · python · L1333-L1663 (331 LOC)src/nlb/tools/red_team.py
def build_element_results_from_analysis(
analysis_results: dict,
model: dict | None = None,
) -> list[dict]:
"""Build element_results list from raw AnalysisResults data and model sections.
This is the key function that bridges raw OpenSees output to the red team
DCR scanner. It:
1. Extracts element forces (axial, shear, moment) from analysis envelopes
2. Looks up section capacities from the model
3. Computes DCRs for flexure, shear, axial, P-M interaction, deflection
Args:
analysis_results: Dict form of AnalysisResults with keys:
envelopes, dcr, reactions, displacements, modal, controlling_cases
model: Dict form of AssembledModel with keys:
elements, sections, nodes, etc.
Returns:
List of element result dicts suitable for dcr_scanner().
"""
element_results: list[dict] = []
if not analysis_results:
return element_results
envelopes = analysis_results.get("envelopes", {})
_check_analysis_convergence function · python · L1666-L1740 (75 LOC)src/nlb/tools/red_team.py
def _check_analysis_convergence(analysis_results: dict) -> list[Finding]:
"""Check for analysis convergence issues and return findings.
If analysis produced no force results, flags it as a critical finding
rather than silently returning GREEN.
"""
findings: list[Finding] = []
if not analysis_results:
findings.append(Finding(
severity="CRITICAL",
vector="DCR Scanner",
element=None,
location="Global",
description=(
"Analysis did not converge — no results available. "
"Cannot verify structural adequacy."
),
dcr=None,
controlling_combo="N/A",
recommendation=(
"Check model connectivity, boundary conditions, and loading. "
"Verify OpenSees model builds and runs without errors."
),
))
return findings
envelopes = analysis_results.get("envelopes", {})
disprun_red_team function · python · L1747-L1948 (202 LOC)src/nlb/tools/red_team.py
def run_red_team(
element_results: list[dict] | None = None,
bridge_info: dict | None = None,
stage_results: list[dict] | None = None,
site_constraints: dict | None = None,
sensitivity_base_dcr: float | None = None,
sensitivity_parameter_results: list[dict] | None = None,
adversarial_results: list[dict] | None = None,
standard_results: list[dict] | None = None,
components: list[dict] | None = None,
robustness_results: list[dict] | None = None,
analyze_fn=None,
cascade_analyze_fn=None,
robustness_analyze_fn=None,
sensitivity_analyze_fn=None,
load_model: dict | None = None,
vectors: list[str] | None = None,
analysis_results: dict | None = None,
model: dict | None = None,
) -> RedTeamReport:
"""Run the complete red-team analysis.
This is the canonical entry point. It runs all 7 attack vectors
(or a subset if ``vectors`` is specified) and assembles the
RedTeamReport.
Args:
element_results:_executive_report function · python · L67-L120 (54 LOC)src/nlb/tools/report.py
def _executive_report(
red_team: RedTeamReport,
model_info: dict,
site: dict,
) -> str:
"""Generate 1-page executive summary report (markdown).
Targets < 500 words. Go/No-Go recommendation with top 3 findings.
"""
bridge_name = model_info.get("name", "Unnamed Bridge")
rating_display = _RATING_DISPLAY.get(red_team.risk_rating, red_team.risk_rating)
recommendation = _RECOMMENDATION_MAP.get(red_team.risk_rating, "Review required.")
lines = []
lines.append(f"# BRIDGE RED TEAM REPORT — {bridge_name}")
lines.append("")
lines.append(f"**Risk Rating: {rating_display}**")
lines.append("")
lines.append(red_team.summary)
lines.append("")
# Top 3 findings
top_findings = red_team.findings[:3]
if top_findings:
lines.append("## Top Findings")
lines.append("")
for i, f in enumerate(top_findings, 1):
emoji = _SEVERITY_EMOJI.get(f.severity, "")
lines.append(
f"{i_technical_report function · python · L127-L186 (60 LOC)src/nlb/tools/report.py
def _technical_report(
red_team: RedTeamReport,
model_info: dict,
site: dict,
) -> str:
"""Generate full 11-section technical report (markdown)."""
bridge_name = model_info.get("name", "Unnamed Bridge")
rating_display = _RATING_DISPLAY.get(red_team.risk_rating, red_team.risk_rating)
sections = []
# Title
sections.append(f"# Bridge Red Team Report — {bridge_name}")
sections.append(f"\n**Risk Rating: {rating_display}**\n")
# Section 1: Bridge Description
sections.append("## 1. Bridge Description\n")
sections.append(_section_bridge_description(model_info))
# Section 2: Site Conditions
sections.append("## 2. Site Conditions\n")
sections.append(_section_site_conditions(site))
# Section 3: Model Description
sections.append("## 3. Model Description\n")
sections.append(_section_model_description(model_info))
# Section 4: Loading
sections.append("## 4. Loading\n")
sections.append(_section_loading(model_section_bridge_description function · python · L189-L200 (12 LOC)src/nlb/tools/report.py
def _section_bridge_description(model_info: dict) -> str:
lines = []
for key in ["type", "material", "spans", "girder_spacing", "deck_width",
"num_girders", "num_spans", "max_span_ft"]:
val = model_info.get(key)
if val is not None:
label = key.replace("_", " ").title()
lines.append(f"- **{label}:** {val}")
if not lines:
lines.append("*Bridge description data not provided.*")
return "\n".join(lines) + "\n"Repobility — the code-quality scanner for AI-generated software · https://repobility.com
_section_site_conditions function · python · L203-L241 (39 LOC)src/nlb/tools/report.py
def _section_site_conditions(site: dict) -> str:
lines = []
location = site.get("location", {})
if location:
parts = [location.get("city", ""), location.get("county", ""),
location.get("state", "")]
loc_str = ", ".join(p for p in parts if p)
if loc_str:
lines.append(f"- **Location:** {loc_str}")
coords = site.get("coordinates", {})
if coords:
lines.append(f"- **Coordinates:** {coords.get('lat', '')}, {coords.get('lon', '')}")
seismic = site.get("seismic", {})
if seismic:
sdc = seismic.get("sdc", "N/A")
sds = seismic.get("sds", "N/A")
sd1 = seismic.get("sd1", "N/A")
lines.append(f"- **Seismic Design Category:** {sdc} (SDS={sds}, SD1={sd1})")
wind = site.get("wind", {})
if wind:
lines.append(f"- **Wind Speed:** {wind.get('v_ult', 'N/A')} mph, Exposure {wind.get('exposure', 'C')}")
thermal = site.get("thermal", {})
if thermal:
lines.a_section_model_description function · python · L244-L254 (11 LOC)src/nlb/tools/report.py
def _section_model_description(model_info: dict) -> str:
lines = []
for key in ["num_nodes", "num_elements", "num_dofs", "element_types"]:
val = model_info.get(key)
if val is not None:
label = key.replace("_", " ").title()
lines.append(f"- **{label}:** {val}")
if not lines:
lines.append("*Model description data not provided.*")
return "\n".join(lines) + "\n"_section_loading function · python · L257-L271 (15 LOC)src/nlb/tools/report.py
def _section_loading(model_info: dict, red_team: RedTeamReport) -> str:
lines = []
lines.append(
f"- **Total Load Combinations:** {red_team.total_combinations} "
f"(standard + adversarial)"
)
lines.append(f"- **Total Load Cases:** {red_team.total_load_cases}")
load_info = model_info.get("loading", {})
if load_info:
for key, val in load_info.items():
label = key.replace("_", " ").title()
lines.append(f"- **{label}:** {val}")
return "\n".join(lines) + "\n"_section_analysis_results function · python · L274-L293 (20 LOC)src/nlb/tools/report.py
def _section_analysis_results(red_team: RedTeamReport) -> str:
lines = []
if red_team.findings:
# Find max DCR across all findings
dcr_findings = [f for f in red_team.findings if f.dcr is not None]
if dcr_findings:
max_dcr_finding = max(dcr_findings, key=lambda f: f.dcr)
lines.append(
f"- **Maximum DCR:** {max_dcr_finding.dcr:.3f} at "
f"{max_dcr_finding.location} ({max_dcr_finding.controlling_combo})"
)
n_over_1 = sum(1 for f in dcr_findings if (f.dcr or 0) > 1.0)
n_over_085 = sum(1 for f in dcr_findings if 0.85 < (f.dcr or 0) <= 1.0)
lines.append(f"- **Elements exceeding capacity (DCR > 1.0):** {n_over_1}")
lines.append(f"- **Elements near capacity (DCR > 0.85):** {n_over_085}")
else:
lines.append("*No analysis results available.*")
return "\n".join(lines) + "\n"_section_findings function · python · L296-L325 (30 LOC)src/nlb/tools/report.py
def _section_findings(red_team: RedTeamReport) -> str:
if not red_team.findings:
return "*No findings generated.*\n"
# Group by vector
by_vector: dict[str, list[Finding]] = {}
for f in red_team.findings:
by_vector.setdefault(f.vector, []).append(f)
lines = []
for vector, findings in by_vector.items():
lines.append(f"### {vector}")
lines.append("")
for f in findings:
emoji = _SEVERITY_EMOJI.get(f.severity, "")
lines.append(f"- {emoji} **{f.severity}**")
if f.element is not None:
lines.append(f" - Element: {f.element} ({f.location})")
elif f.location:
lines.append(f" - Location: {f.location}")
lines.append(f" - {f.description}")
if f.dcr is not None:
lines.append(f" - DCR: {f.dcr:.3f}")
lines.append(f" - Controlling: {f.controlling_combo}")
lines.append(f" - Recommendation: {_section_sensitivity function · python · L328-L353 (26 LOC)src/nlb/tools/report.py
def _section_sensitivity(red_team: RedTeamReport) -> str:
if not red_team.sensitivity_results:
return "*Sensitivity analysis not performed or no results available.*\n"
lines = []
# Summary table
lines.append("| Parameter | Base DCR | Low (-20%) | High (+20%) | ΔDCR | Class |")
lines.append("|-----------|----------|------------|-------------|------|-------|")
for sr in red_team.sensitivity_results:
lines.append(
f"| {sr.parameter} | {sr.base_dcr:.3f} | {sr.low_dcr:.3f} | "
f"{sr.high_dcr:.3f} | {sr.delta_dcr:.3f} | {sr.classification} |"
)
lines.append("")
# Tornado diagram SVG
if red_team.sensitivity_results:
lines.append("### Tornado Diagram")
lines.append("")
svg = generate_tornado_svg(red_team.sensitivity_results)
lines.append(svg)
lines.append("")
return "\n".join(lines)_section_cascade function · python · L356-L380 (25 LOC)src/nlb/tools/report.py
def _section_cascade(red_team: RedTeamReport) -> str:
if not red_team.cascade_chains:
return "*No failure cascade chains detected.*\n"
lines = []
for i, chain in enumerate(red_team.cascade_chains, 1):
collapse_flag = " ⚠️ **COLLAPSE**" if chain.causes_collapse else ""
lines.append(f"### Chain {i}: Element {chain.trigger_element}{collapse_flag}")
lines.append("")
if chain.description:
lines.append(f"{chain.description}")
if chain.chain:
lines.append("")
lines.append("Cascade sequence:")
for elem, dcr in chain.chain:
lines.append(f" → Element {elem} (DCR = {dcr:.2f})")
lines.append("")
# Cascade diagram SVG
lines.append("### Cascade Diagram")
lines.append("")
svg = generate_cascade_svg(red_team.cascade_chains)
lines.append(svg)
return "\n".join(lines)_section_history function · python · L383-L395 (13 LOC)src/nlb/tools/report.py
def _section_history(red_team: RedTeamReport) -> str:
if not red_team.history_matches:
return "*No historical precedent matches found.*\n"
lines = []
for match in red_team.history_matches:
lines.append(f"### {match.failure_name} ({match.year})")
lines.append(f"- **Similarity Score:** {match.score}")
lines.append(f"- **Matching Factors:** {', '.join(match.matching_factors)}")
lines.append(f"- **Key Lesson:** {match.lesson}")
lines.append("")
return "\n".join(lines)All rows above produced by Repobility · https://repobility.com
_section_robustness function · python · L398-L411 (14 LOC)src/nlb/tools/report.py
def _section_robustness(red_team: RedTeamReport) -> str:
robustness_findings = [
f for f in red_team.findings if f.vector == "Robustness Check"
]
if not robustness_findings:
return "*Robustness assessment not performed.*\n"
lines = []
for f in robustness_findings:
emoji = _SEVERITY_EMOJI.get(f.severity, "")
lines.append(f"- {emoji} **{f.location}:** {f.description}")
lines.append("")
return "\n".join(lines)_section_recommendations function · python · L414-L457 (44 LOC)src/nlb/tools/report.py
def _section_recommendations(red_team: RedTeamReport) -> str:
lines = []
# Collect unique recommendations
seen = set()
critical_recs = []
warning_recs = []
note_recs = []
for f in red_team.findings:
if f.recommendation not in seen:
seen.add(f.recommendation)
if f.severity == "CRITICAL":
critical_recs.append(f.recommendation)
elif f.severity == "WARNING":
warning_recs.append(f.recommendation)
else:
note_recs.append(f.recommendation)
if critical_recs:
lines.append("### Critical (Immediate Action Required)")
for i, rec in enumerate(critical_recs, 1):
lines.append(f"{i}. {rec}")
lines.append("")
if warning_recs:
lines.append("### Warnings (Address Before Final Design)")
for i, rec in enumerate(warning_recs, 1):
lines.append(f"{i}. {rec}")
lines.append("")
if note_recs:
_raw_export function · python · L464-L486 (23 LOC)src/nlb/tools/report.py
def _raw_export(
red_team: RedTeamReport,
model_info: dict,
site: dict,
) -> str:
"""Export all data as JSON."""
data = {
"report_type": "raw",
"risk_rating": red_team.risk_rating,
"summary": red_team.summary,
"model_info": model_info,
"site": site,
"findings": [asdict(f) for f in red_team.findings],
"cascade_chains": [asdict(c) for c in red_team.cascade_chains],
"sensitivity_results": [asdict(s) for s in red_team.sensitivity_results],
"history_matches": [asdict(m) for m in red_team.history_matches],
"robustness_results": red_team.robustness_results,
"attack_vectors_run": red_team.attack_vectors_run,
"total_load_cases": red_team.total_load_cases,
"total_combinations": red_team.total_combinations,
"analysis_time_sec": red_team.analysis_time_sec,
}
return json.dumps(data, indent=2)generate_tornado_svg function · python · L493-L608 (116 LOC)src/nlb/tools/report.py
def generate_tornado_svg(
sensitivity_results: list[SensitivityResult],
width: int = 600,
bar_height: int = 28,
margin: int = 40,
) -> str:
"""Generate SVG tornado diagram from sensitivity results.
Bars extend left (low) and right (high) from the base DCR centerline.
Args:
sensitivity_results: List of SensitivityResult objects.
width: SVG width in pixels.
bar_height: Height of each bar.
margin: Margin around the diagram.
Returns:
SVG string.
"""
if not sensitivity_results:
return '<svg xmlns="http://www.w3.org/2000/svg" width="100" height="30"><text x="10" y="20">No data</text></svg>'
n = len(sensitivity_results)
# Sort by delta_dcr descending for tornado effect
sorted_results = sorted(sensitivity_results, key=lambda s: s.delta_dcr, reverse=True)
label_width = 180
chart_width = width - label_width - margin * 2
height = margin * 2 + n *generate_dcr_heatmap_svg function · python · L611-L717 (107 LOC)src/nlb/tools/report.py
def generate_dcr_heatmap_svg(
elements: list[dict],
width: int = 700,
height: int = 200,
) -> str:
"""Generate SVG DCR heat map — bridge elevation colored by DCR.
Args:
elements: List of dicts with:
- ``element``: int
- ``x_start``: float — start position along bridge
- ``x_end``: float — end position
- ``dcr``: float
- ``location``: str
width: SVG width.
height: SVG height.
Returns:
SVG string.
"""
if not elements:
return '<svg xmlns="http://www.w3.org/2000/svg" width="100" height="30"><text x="10" y="20">No data</text></svg>'
margin = 40
bridge_y = height / 2
elem_height = 30
# Find span range
all_x = []
for e in elements:
all_x.extend([e.get("x_start", 0), e.get("x_end", 0)])
x_min = min(all_x) if all_x else 0
x_max = max(all_x) if all_x else 100
x_range = x_max - x_min
if x_range < 0.001:
x_rgenerate_cascade_svg function · python · L720-L809 (90 LOC)src/nlb/tools/report.py
def generate_cascade_svg(
chains: list[CascadeChain],
width: int = 600,
) -> str:
"""Generate SVG cascade/flowchart diagram.
Shows trigger elements and their cascade chains as connected boxes.
Args:
chains: List of CascadeChain objects.
width: SVG width.
Returns:
SVG string.
"""
if not chains:
return '<svg xmlns="http://www.w3.org/2000/svg" width="200" height="30"><text x="10" y="20">No cascades</text></svg>'
box_w = 120
box_h = 40
gap_x = 40
gap_y = 60
margin = 20
# Calculate height
max_chain_len = max(len(c.chain) for c in chains) if chains else 0
n_rows = len(chains)
height = margin * 2 + n_rows * (box_h + gap_y)
lines = []
lines.append(
f'<svg xmlns="http://www.w3.org/2000/svg" '
f'width="{width}" height="{height}" class="cascade-diagram">'
)
lines.append(
'<style>'
'.trigger { fill: #e74c3c; stroke: #c0392b; stroke-width: 2; rx:generate_risk_dashboard_svg function · python · L812-L899 (88 LOC)src/nlb/tools/report.py
def generate_risk_dashboard_svg(
red_team: RedTeamReport,
width: int = 500,
height: int = 250,
) -> str:
"""Generate SVG risk dashboard summary card.
Shows rating, finding counts, and key metrics in a compact card.
Args:
red_team: RedTeamReport object.
width: SVG width.
height: SVG height.
Returns:
SVG string.
"""
rating_colors = {
"GREEN": "#27ae60",
"YELLOW": "#f1c40f",
"RED": "#e74c3c",
}
bg_color = rating_colors.get(red_team.risk_rating, "#95a5a6")
n_critical = sum(1 for f in red_team.findings if f.severity == "CRITICAL")
n_warning = sum(1 for f in red_team.findings if f.severity == "WARNING")
n_note = sum(1 for f in red_team.findings if f.severity == "NOTE")
lines = []
lines.append(
f'<svg xmlns="http://www.w3.org/2000/svg" '
f'width="{width}" height="{height}" class="risk-dashboard">'
)
lines.append(
'<style>'
'.generate_report function · python · L906-L944 (39 LOC)src/nlb/tools/report.py
def generate_report(
red_team: RedTeamReport,
model_info: dict,
site: dict,
tier: str = "technical",
) -> str:
"""Generate a bridge red team report at the specified tier.
Args:
red_team: :class:`RedTeamReport` from :func:`~nlb.tools.red_team.run_red_team`.
model_info: Dict describing the bridge model:
- ``name``: str — bridge name
- ``type``: str — bridge type
- ``material``: str
- ``spans``: list
- ``num_nodes``, ``num_elements``, ``num_dofs``: int
- ``loading``: dict (optional)
site: Site profile dict from :meth:`SiteProfile.to_dict`.
tier: ``'executive'``, ``'technical'``, or ``'raw'``.
Returns:
Markdown string (executive/technical) or JSON string (raw).
Raises:
ValueError: If tier is not recognized.
"""
tier = tier.lower().strip()
if tier == "executive":
return _executive_report(red_team, model_Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
generate_wind_load_code function · python · L17-L81 (65 LOC)src/nlb/tools/secondary_loads.py
def generate_wind_load_code(
node_tags: Iterable[int],
girder_depth_in: float,
barrier_height_in: float = 42,
deck_elevation_ft: float = 30,
wind_speed_mph: float = 115,
span_length_ft: float = 100,
num_nodes: int = 100,
drag_coefficient: float = 1.3,
exposure_alpha: float = 9.5,
zg_ft: float = 900.0,
gust_factor: float = 1.0,
) -> str:
"""Return Python code string that applies wind forces to the OpenSees model.
The generated code computes the velocity pressure PZ per AASHTO:
PZ = 0.00256 * KZ * G * V^2 * CD (ksf)
where KZ is computed for Exposure C via the provided power-law approximation:
KZ = 2.01 * (z/zg)^(2/alpha)
The lateral force per node (kips) is taken as:
F_node = PZ (ksf) * exposed_height (ft) * tributary_length (ft)
The code produced creates a Plain pattern (tag 1001) and issues ops.load
calls for each nodeTag (X-direction lateral loads). It also populates a
results['wind_ongenerate_wind_live_load_code function · python · L84-L114 (31 LOC)src/nlb/tools/secondary_loads.py
def generate_wind_live_load_code(
node_tags: Iterable[int],
deck_above_ground_ft: float = 6.0,
wl_klf: float = 0.10,
span_length_ft: float = 100.0,
num_nodes: int = 100,
) -> str:
"""Generate code applying wind on live load (AASHTO 3.8.1.3).
WL is given in klf (kips per linear foot) at 6 ft above deck. We convert
to kips per node by multiplying by the tributary length.
"""
node_tags_list = list(node_tags)
if num_nodes <= 0:
num_nodes = max(1, len(node_tags_list))
trib_len_ft = float(span_length_ft) / float(num_nodes)
per_node_kip = wl_klf * trib_len_ft
code_lines: List[str] = []
code_lines.append("# Wind on live load (AASHTO 3.8.1.3) - generated by secondary_loads")
code_lines.append("pat_tag = 1002")
code_lines.append("ops.pattern('Plain', pat_tag, 1)")
code_lines.append(f"# inputs: wl_klf={wl_klf}, deck_above_ground_ft={deck_above_ground_ft}")
code_lines.append("results.setdefault('wind_on_live', {}generate_thermal_load_code function · python · L117-L176 (60 LOC)src/nlb/tools/secondary_loads.py
def generate_thermal_load_code(
node_tags: Iterable[int],
climate: str = "moderate",
bridge_length_ft: float = 0.0,
alpha: float = 6.5e-6,
fixed_node: int | None = None,
) -> str:
"""Return Python code string for a uniform temperature load case.
The code computes the thermal strains for rise and fall from a setting
temperature of 60 F and stores them in results['thermal']. If a fixed_node
is provided the code will create a placeholder concentrated axial force at
that node proportional to the strain and a nominal sectional area/E.
NOTE: This generator does not attempt to know section area or stiffness in
the model. It computes strains (dimensionless) which can be used by the
caller to create proper constrained-expansion forces when more model
information is available.
"""
climate_lower = climate.lower() if climate else "moderate"
if climate_lower == "moderate":
Tmax = 80.0
Tmin = 0.0
elif climate_lgenerate_braking_force_code function · python · L179-L229 (51 LOC)src/nlb/tools/secondary_loads.py
def generate_braking_force_code(
node_tags: Iterable[int],
num_lanes: int = 2,
bridge_length_ft: float = 100.0,
num_nodes: int | None = None,
) -> str:
"""Return Python code string applying braking forces per AASHTO 3.6.4.
The implementation uses the common AASHTO design axle loads:
- design truck total axle = 72 kip
- design tandem total axle = 50 kip
The braking force BR is the maximum of:
- 25% of truck axles = 0.25 * 72
- 25% of tandem = 0.25 * 50
- 5% of (truck + lane load)
- 5% of (tandem + lane load)
The generator distributes the resulting force among the provided nodes.
"""
node_tags_list = list(node_tags)
if num_nodes is None or num_nodes <= 0:
num_nodes = max(1, len(node_tags_list))
truck = 72.0
tandem = 50.0
truck_brake = 0.25 * truck
tandem_brake = 0.25 * tandem
# lane load assumed to be zero (unknown) so the 5% terms reduce to 5%*truck/tandem
five_pct_truck__seismic_design_category function · python · L73-L89 (17 LOC)src/nlb/tools/site_recon.py
def _seismic_design_category(sd1: float) -> str:
"""Determine AASHTO Seismic Design Category from SD1.
SDC drives the level of seismic detailing and analysis required:
A — minimal seismic design (SD1 < 0.15)
B — moderate design, no ductile detailing (0.15 ≤ SD1 < 0.30)
C — ductile substructure required (0.30 ≤ SD1 < 0.50)
D — full displacement-based seismic design (SD1 ≥ 0.50)
"""
if sd1 < 0.15:
return "A"
elif sd1 < 0.30:
return "B"
elif sd1 < 0.50:
return "C"
else:
return "D"SeismicProfile class · python · L255-L271 (17 LOC)src/nlb/tools/site_recon.py
class SeismicProfile:
"""AASHTO seismic hazard parameters at a site.
All spectral accelerations in units of g (gravitational acceleration).
Per AASHTO §3.10: SMS = Fa*Ss, SM1 = Fv*S1, SDS = (2/3)*SMS, SD1 = (2/3)*SM1.
"""
pga: float # Peak ground acceleration (g)
ss: float # 0.2-s spectral acceleration, Site Class B rock (g)
s1: float # 1.0-s spectral acceleration, Site Class B rock (g)
fa: float # Short-period site coefficient (AASHTO Table 3.10.3.2-1)
fv: float # Long-period site coefficient (AASHTO Table 3.10.3.2-2)
sms: float # Fa * Ss — MCER spectral accel at short period (g)
sm1: float # Fv * S1 — MCER spectral accel at 1.0 s (g)
sds: float # (2/3) * SMS — design spectral accel short period (g)
sd1: float # (2/3) * SM1 — design spectral accel 1.0 s (g)
site_class: str # AASHTO site class A–F (default D if unknown)
sdc: str WindProfile class · python · L275-L282 (8 LOC)src/nlb/tools/site_recon.py
class WindProfile:
"""ASCE 7-22 wind design parameters.
V_ult is the 3-second gust wind speed (mph) at 33 ft above grade for
Risk Category II structures (bridges per AASHTO Table 3.8.1.1.2-1).
"""
v_ult: int # mph — basic wind speed (3-s gust, RC-II)
exposure: str # ASCE 7 Exposure Category: B, C, or DThermalProfile class · python · L286-L294 (9 LOC)src/nlb/tools/site_recon.py
class ThermalProfile:
"""AASHTO temperature design range for bridge expansion/contraction.
Values per AASHTO §3.12.2 (steel bridges). Used to size expansion joints,
select bearing displacement capacity, and check abutment seat widths.
"""
t_min: float # °F — minimum design temperature
t_max: float # °F — maximum design temperature
delta_t: float # °F — total thermal range (t_max − t_min)Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
ScourProfile class · python · L298-L310 (13 LOC)src/nlb/tools/site_recon.py
class ScourProfile:
"""Hydraulic / scour design flags per AASHTO §2.6.4 and HEC-18.
If water_crossing = True:
design_flood = Q100 (100-year return period) — primary design flood
check_flood = Q500 (500-year return period) — extreme-event check
Scour depths must be computed by the foundation tool using HEC-18
methods with site-specific hydraulic data.
"""
water_crossing: bool
design_flood: Optional[str] # "Q100" or None
check_flood: Optional[str] # "Q500" or None
keywords_matched: list[str] # for transparency in reportsSoilProfile class · python · L314-L323 (10 LOC)src/nlb/tools/site_recon.py
class SoilProfile:
"""Soil characterization at the site.
Site class is determined from AASHTO Table 3.10.3.1-1.
When no geotechnical data is available, use Site Class D per
AASHTO Commentary C3.10.3.1 — a conservative assumption for most
US sites that avoids underestimating amplification.
"""
site_class: str # A–F per AASHTO Table 3.10.3.1-1
description: str # Human-readable description of site classSiteProfile class · python · L327-L352 (26 LOC)src/nlb/tools/site_recon.py
class SiteProfile:
"""Complete site environmental profile consumed by all other NLB tools.
This is the canonical output of :func:`run_site_recon`. All fields are
populated — never None — so downstream tools can use them without guards.
Missing data is filled with conservative defaults (documented inline).
Serialize with :meth:`to_dict` for JSON transport over MCP.
"""
coordinates: dict # {"lat": float, "lon": float}
location: dict # {"state": str, "county": str, "city": str}
seismic: SeismicProfile
wind: WindProfile
thermal: ThermalProfile
scour: ScourProfile
frost_depth_ft: float # ft — minimum foundation depth (below grade)
soil: SoilProfile
climate_zone: str # "hot" | "mixed" | "cold" | "very_cold"
data_sources: dict = field(default_factory=dict) # which APIs responded
warnings: list[str] = field(default_factory=list) # fallback notices
def to_dict(self) -> dict:
to_dict method · python · L348-L352 (5 LOC)src/nlb/tools/site_recon.py
def to_dict(self) -> dict:
"""Serialize to a plain dict suitable for JSON / MCP transport."""
d = asdict(self)
# Flatten nested dataclasses that asdict already handles; just clean up.
return dfetch_seismic_hazard function · python · L359-L444 (86 LOC)src/nlb/tools/site_recon.py
def fetch_seismic_hazard(
lat: float,
lon: float,
site_class: str = "D",
) -> tuple[SeismicProfile, bool]:
"""Fetch AASHTO seismic hazard parameters from the USGS Design Maps API.
Uses the AASHTO 2009 reference document endpoint. The API returns:
PGA, Ss, S1, Fa, Fv, SMS, SM1, SDS, SD1 for the given coordinates and
site class.
Engineering decision: We always query for the user-specified site_class
(default D). If the API returns site coefficients that differ from our
input (which can happen for extreme site classes), we trust the API.
Args:
lat: Latitude (WGS-84, decimal degrees).
lon: Longitude (WGS-84, decimal degrees).
site_class: AASHTO site class A–F (default "D").
Returns:
Tuple of (SeismicProfile, success_flag).
On failure, returns (_SEISMIC_FALLBACK as SeismicProfile, False).
API reference:
https://earthquake.usgs.gov/ws/designmaps/aashto-2009.json
"""
_build_seismic_profile function · python · L447-L462 (16 LOC)src/nlb/tools/site_recon.py
def _build_seismic_profile(d: dict, site_class: str) -> SeismicProfile:
"""Construct a SeismicProfile from a flat dict (e.g. the fallback dict)."""
sd1 = d.get("sd1", _SEISMIC_FALLBACK["sd1"])
return SeismicProfile(
pga=d.get("pga", _SEISMIC_FALLBACK["pga"]),
ss=d.get("ss", _SEISMIC_FALLBACK["ss"]),
s1=d.get("s1", _SEISMIC_FALLBACK["s1"]),
fa=d.get("fa", _SEISMIC_FALLBACK["fa"]),
fv=d.get("fv", _SEISMIC_FALLBACK["fv"]),
sms=d.get("sms", _SEISMIC_FALLBACK["sms"]),
sm1=d.get("sm1", _SEISMIC_FALLBACK["sm1"]),
sds=d.get("sds", _SEISMIC_FALLBACK["sds"]),
sd1=sd1,
site_class=site_class,
sdc=_seismic_design_category(sd1),
)fetch_thermal_range function · python · L469-L510 (42 LOC)src/nlb/tools/site_recon.py
def fetch_thermal_range(
lat: float,
lon: float,
state: str = "",
noaa_token: Optional[str] = None,
) -> tuple[ThermalProfile, bool]:
"""Fetch temperature design range from NOAA NCEI Climate Data Online API.
Finds the nearest weather station with Annual Climate Normal data (1991–2020
normals, dataset NORMAL_ANN) and extracts:
- MLY-TMIN-NORMAL → approximate design minimum temperature (°F)
- MLY-TMAX-NORMAL → approximate design maximum temperature (°F)
Engineering note: NOAA climate normals are 30-year averages, NOT design
extremes. AASHTO §3.12.2 requires using the 50-year extreme temperature,
which is roughly normal ± 20–25°F. We apply a ±22°F adjustment to convert
normals to approximate design extremes.
Fallback hierarchy:
1. NOAA NCEI API (requires token, may be slow)
2. State-level climate zone lookup table
Args:
lat: Latitude.
lon: Longitude.
state: 2-let_fetch_noaa_normals function · python · L513-L578 (66 LOC)src/nlb/tools/site_recon.py
def _fetch_noaa_normals(lat: float, lon: float, token: str) -> Optional[ThermalProfile]:
"""Internal: fetch climate normals from NOAA NCEI CDO API."""
# Step 1: find nearest station with NORMAL_ANN data
stations_url = "https://www.ncei.noaa.gov/cdo-web/api/v2/stations"
headers = {"token": token}
station_params = {
"datasetid": "NORMAL_ANN",
"datatypeid": "MLY-TMIN-NORMAL,MLY-TMAX-NORMAL",
"units": "standard", # Fahrenheit
"extent": f"{lat-1.0},{lon-1.0},{lat+1.0},{lon+1.0}",
"limit": 5,
"sortfield": "name",
}
resp = requests.get(
stations_url, params=station_params, headers=headers, timeout=_API_TIMEOUT
)
resp.raise_for_status()
station_data = resp.json()
results = station_data.get("results", [])
if not results:
return None
station_id = results[0]["id"]
# Step 2: fetch annual normal data for that station
data_url = "https://www.ncei.noaa.gov/cdo-web/api/Repobility — the code-quality scanner for AI-generated software · https://repobility.com
_thermal_from_state function · python · L581-L585 (5 LOC)src/nlb/tools/site_recon.py
def _thermal_from_state(state: str) -> ThermalProfile:
"""Return design thermal profile from state→climate zone lookup."""
zone = CLIMATE_ZONE_BY_STATE.get(state, "cold")
values = _THERMAL_BY_CLIMATE_ZONE.get(zone, _THERMAL_DEFAULT)
return ThermalProfile(**values)get_wind_speed function · python · L592-L616 (25 LOC)src/nlb/tools/site_recon.py
def get_wind_speed(
state: str,
exposure: str = _DEFAULT_EXPOSURE,
) -> WindProfile:
"""Return ASCE 7-22 basic wind speed for the given state.
Uses a conservative state-wide lookup table. For coastal projects,
county-level values should be obtained from the ASCE 7 Hazard Tool.
Exposure category logic (ASCE 7 §26.7):
B — Urban/suburban, forest (≥ 1500 ft fetch)
C — Open terrain, scattered obstructions (default for most bridges)
D — Flat, open water or mud flats, shoreline exposed to hurricanes
The caller may override the default Exposure C.
Args:
state: 2-letter state abbreviation.
exposure: ASCE 7 Exposure Category (B, C, or D).
Returns:
WindProfile with V_ult and exposure category.
"""
v_ult = _WIND_SPEED_BY_STATE.get(state, _WIND_SPEED_DEFAULT)
return WindProfile(v_ult=v_ult, exposure=exposure)get_frost_depth function · python · L623-L636 (14 LOC)src/nlb/tools/site_recon.py
def get_frost_depth(state: str) -> float:
"""Return minimum foundation depth below grade due to frost (feet).
Values are based on AASHTO §10.6.1.2 and regional DOT bridge standards.
The returned value is the *minimum* — actual embedment must also satisfy
bearing capacity, scour, and development length requirements.
Args:
state: 2-letter state abbreviation.
Returns:
Frost depth in feet (US customary).
"""
return _FROST_DEPTH_BY_STATE.get(state, _FROST_DEPTH_DEFAULT)