Function bodies 106 total
build_chart function · python · L110-L293 (184 LOC)HMM/app.py
def build_chart(
df: pd.DataFrame,
equity_df: pd.DataFrame,
trades_df: pd.DataFrame,
show_ema: bool,
show_trades: bool,
lookback_days: int,
) -> go.Figure:
"""
Build a 3-panel interactive Plotly chart:
Row 1 – Candlestick with regime background + EMAs + trade markers
Row 2 – Equity curve
Row 3 – Volume bars
"""
# --- Slice to lookback window ---
cutoff = df.index[-1] - pd.Timedelta(days=lookback_days)
view_df = df[df.index >= cutoff]
view_eq = equity_df[equity_df.index >= cutoff]
fig = make_subplots(
rows=3, cols=1,
shared_xaxes=True,
vertical_spacing=0.025,
row_heights=[0.60, 0.22, 0.18],
subplot_titles=("BTC/USD Price & Market Regimes", "Strategy Equity ($)", "Volume"),
)
# ── Regime backgrounds (group consecutive identical regimes) ──────────
regime_change_id = view_df["Regime"].ne(view_df["Regime"].shift()).cumsum()
for _, seg in view_df.groupby(regimbuild_regime_pie function · python · L300-L319 (20 LOC)HMM/app.py
def build_regime_pie(df: pd.DataFrame) -> go.Figure:
    """Build a donut chart showing how often each regime occurs in *df*.

    Slice sizes come from ``df["Regime"].value_counts()``. Slice colours are
    looked up in ``REGIME_LINE``; a regime without an entry is drawn grey.
    """
    regime_counts = df["Regime"].value_counts()
    slice_colors = [REGIME_LINE.get(name, "#888888") for name in regime_counts.index]

    donut = go.Pie(
        labels=regime_counts.index,
        values=regime_counts.values,
        hole=0.45,
        marker={"colors": slice_colors, "line": {"color": "#0d1117", "width": 2}},
        textinfo="label+percent",
        textfont={"size": 12},
    )
    fig = go.Figure(donut)

    layout_options = {
        "template": "plotly_dark",
        "paper_bgcolor": "#0d1117",
        "height": 300,
        "margin": {"l": 0, "r": 0, "t": 10, "b": 0},
        "showlegend": False,
    }
    fig.update_layout(**layout_options)
    return fig
# main function · python · L326-L512 (187 LOC)HMM/app.py
def main() -> None:
# ── Sidebar ──────────────────────────────────────────────────────────
with st.sidebar:
st.markdown("## ⚙️ Controls")
lookback_days = st.slider(
"Chart lookback (days)", min_value=30, max_value=730, value=180, step=30
)
show_ema = st.toggle("Show EMAs (50 / 200)", value=True)
show_trades = st.toggle("Show Trade Markers", value=True)
st.divider()
st.markdown("### Strategy Parameters")
st.markdown(f"- **Leverage**: 2.5×")
st.markdown(f"- **Cooldown**: 48 h after exit")
st.markdown(f"- **Min confirmations**: {CONFIRM_NEEDED}/8")
st.markdown(f"- **HMM States**: 7")
st.divider()
force_reload = st.button("🔄 Reload Data", use_container_width=True)
if force_reload:
st.cache_data.clear()
# ── Load pipeline ─────────────────────────────────────────────────────
with st.spinner("Loading data and running backtest … (first run ~3HMMEngine class · python · L64-L134 (71 LOC)HMM/backtester.py
class HMMEngine:
n_states: int = N_STATES
model: Optional[hmm.GaussianHMM] = field(default=None, repr=False)
scaler: Optional[StandardScaler] = field(default=None, repr=False)
bull_state: int = -1
bear_state: int = -1
regime_labels: dict[int, str] = field(default_factory=dict)
def fit(self, df: pd.DataFrame) -> "HMMEngine":
"""Train the HMM on Returns, Range, VolVol features."""
X = df[["Returns", "Range", "VolVol"]].values
self.scaler = StandardScaler()
X_scaled = self.scaler.fit_transform(X)
self.model = hmm.GaussianHMM(
n_components=self.n_states,
covariance_type="full",
n_iter=500,
tol=1e-5,
random_state=42,
)
self.model.fit(X_scaled)
self._identify_states()
return self
def _identify_states(self) -> None:
"""Map each HMM state to a human-readable regime label."""
# mean Returns for each state (fit method · python · L72-L89 (18 LOC)HMM/backtester.py
def fit(self, df: pd.DataFrame) -> "HMMEngine":
    """Fit the scaler and the Gaussian HMM on Returns, Range and VolVol.

    The three feature columns are read from *df*, standardised with a fresh
    ``StandardScaler``, and used to train a ``GaussianHMM`` with
    ``self.n_states`` components. The learned states are then labelled via
    ``_identify_states``. Returns ``self`` so calls can be chained.
    """
    features = df[["Returns", "Range", "VolVol"]].values

    self.scaler = StandardScaler()
    standardized = self.scaler.fit_transform(features)

    hmm_options = {
        "n_components": self.n_states,
        "covariance_type": "full",
        "n_iter": 500,
        "tol": 1e-5,
        "random_state": 42,  # fixed seed passed to the model
    }
    self.model = hmm.GaussianHMM(**hmm_options)
    self.model.fit(standardized)

    self._identify_states()
    return self
# _identify_states method · python · L91-L113 (23 LOC)HMM/backtester.py
def _identify_states(self) -> None:
    """Assign a regime label to every HMM state based on its mean return.

    The state with the highest mean return becomes the bull state and the
    one with the lowest becomes the bear state. Every other state is
    labelled secondary-bull (mean > 0), secondary-bear (mean < -1e-4) or
    neutral (anything in between).
    """
    # Undo the standardisation for the first feature (Returns) only.
    raw_means = self.model.means_[:, 0] * self.scaler.scale_[0] + self.scaler.mean_[0]

    self.bull_state = int(np.argmax(raw_means))
    self.bear_state = int(np.argmin(raw_means))

    def label_for(state: int) -> str:
        # Bull is checked before bear, so bull wins if both indices coincide.
        if state == self.bull_state:
            return REGIME_BULL
        if state == self.bear_state:
            return REGIME_BEAR
        if raw_means[state] > 0:
            return REGIME_BULL2
        if raw_means[state] < -1e-4:
            return REGIME_BEAR2
        return REGIME_NEUT

    self.regime_labels = {state: label_for(state) for state in range(self.n_states)}
# predict method · python · L115-L119 (5 LOC)HMM/backtester.py
def predict(self, df: pd.DataFrame) -> np.ndarray:
    """Return the model's predicted state for every row of *df*.

    The Returns / Range / VolVol columns are passed through the fitted
    scaler's ``transform`` before being handed to the model's ``predict``.
    """
    feature_columns = ["Returns", "Range", "VolVol"]
    standardized = self.scaler.transform(df[feature_columns].values)
    return self.model.predict(standardized)
state_summary method · python · L121-L134 (14 LOC)HMM/backtester.py
def state_summary(self) -> pd.DataFrame:
    """Tabulate each state's label and mean features in unscaled units.

    The model's state means are passed through the scaler's
    ``inverse_transform``; Return and Range are rendered as percentages and
    VolVol as a plain number, all with four decimals. The result is indexed
    by ``State``.
    """
    unscaled = self.scaler.inverse_transform(self.model.means_)
    records = [
        {
            "State": state,
            "Label": self.regime_labels[state],
            "Mean Return": f"{unscaled[state, 0] * 100:.4f}%",
            "Mean Range": f"{unscaled[state, 1] * 100:.4f}%",
            "Mean VolVol": f"{unscaled[state, 2]:.4f}",
        }
        for state in range(self.n_states)
    ]
    return pd.DataFrame(records).set_index("State")
# _confirmations function · python · L153-L168 (16 LOC)HMM/backtester.py
def _confirmations(row: pd.Series) -> tuple[int, list[bool]]:
    """Evaluate the 8 confirmation signals for a single bar.

    Signals, in order: RSI < 90, Momentum > 1.0, Volatility < 6.0,
    Volume > VolSMA20, ADX > 25.0, Close > EMA50, Close > EMA200,
    MACD > MACD_Signal.

    Args:
        row: One bar carrying the indicator columns named above.

    Returns:
        ``(count_of_true, signals)`` where ``signals`` holds the eight
        outcomes in the order listed. Both are plain Python ``int`` /
        ``bool`` objects, matching the annotation. Comparisons on values
        read from a Series otherwise yield numpy scalar types. A comparison
        against NaN evaluates to ``False``.
    """
    checks = (
        row["RSI"] < 90,
        row["Momentum"] > 1.0,
        row["Volatility"] < 6.0,
        row["Volume"] > row["VolSMA20"],
        row["ADX"] > 25.0,
        row["Close"] > row["EMA50"],
        row["Close"] > row["EMA200"],
        row["MACD"] > row["MACD_Signal"],
    )
    # Coerce numpy booleans so the declared return types actually hold.
    signals = [bool(check) for check in checks]
    return sum(signals), signals
# run_backtest function · python · L175-L287 (113 LOC)HMM/backtester.py
def run_backtest(
df: pd.DataFrame,
engine: HMMEngine,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Simulate the strategy bar-by-bar.
Parameters
----------
df : fully featured DataFrame (from compute_indicators)
engine : fitted HMMEngine
Returns
-------
equity_df : hourly equity curve with regime / position columns
trades_df : log of every completed trade
"""
hidden_states = engine.predict(df)
df = df.copy()
df["HMM_State"] = hidden_states
df["Regime"] = [engine.regime_labels[s] for s in hidden_states]
capital = INITIAL_CAPITAL
position = None # None or dict with entry info
cooldown_end = pd.Timestamp.min.tz_localize(df.index.tz)
equity_rows: list[dict] = []
trade_rows: list[dict] = []
for ts, row in df.iterrows():
regime = row["Regime"]
conf_cnt, _ = _confirmations(row)
in_cooldown = ts < cooldown_end
# ── EXIT ──────────compute_metrics function · python · L294-L331 (38 LOC)HMM/backtester.py
def compute_metrics(
equity_df: pd.DataFrame,
trades_df: pd.DataFrame,
df: pd.DataFrame,
initial_capital: float = INITIAL_CAPITAL,
) -> dict:
"""Compute Total Return, Alpha, Win Rate, and Max Drawdown."""
final_equity = equity_df["equity"].iloc[-1]
total_return = (final_equity - initial_capital) / initial_capital * 100.0
bh_return = (df["Close"].iloc[-1] - df["Close"].iloc[0]) / df["Close"].iloc[0] * 100.0
alpha = total_return - bh_return
if len(trades_df) > 0:
wins = (trades_df["pnl_dollar"] > 0).sum()
win_rate = wins / len(trades_df) * 100.0
avg_win = trades_df.loc[trades_df["pnl_dollar"] > 0, "pnl_dollar"].mean()
avg_loss = trades_df.loc[trades_df["pnl_dollar"] <= 0, "pnl_dollar"].mean()
else:
win_rate = 0.0
avg_win = 0.0
avg_loss = 0.0
rolling_max = equity_df["equity"].cummax()
drawdown = (equity_df["equity"] - rolling_max) / rolling_marun_full_pipeline function · python · L338-L377 (40 LOC)HMM/backtester.py
def run_full_pipeline() -> tuple:
"""
Orchestrates the full workflow:
1. Fetch + compute indicators
2. Train HMM
3. Backtest
4. Compute metrics
Returns
-------
df, equity_df, trades_df, metrics, engine
"""
print(" [1/4] Fetching BTC-USD hourly data …")
df_raw = fetch_data()
print(" [2/4] Computing technical indicators …")
df = compute_indicators(df_raw)
print(" [3/4] Training 7-state GaussianHMM …")
engine = HMMEngine(n_states=N_STATES)
engine.fit(df)
print(f" Bull Run state → {engine.bull_state} | "
f"Bear/Crash state → {engine.bear_state}")
print(engine.state_summary().to_string())
# Attach regime columns to df
states = engine.predict(df)
df["HMM_State"] = states
df["Regime"] = [engine.regime_labels[s] for s in states]
print(" [4/4] Running backtest …")
equity_df, trades_df = run_backtest(df, engine)
metrics = compute_metrics(equity_df, tfetch_data function · python · L19-L41 (23 LOC)HMM/data_loader.py
def fetch_data(ticker: str = "BTC-USD", period_days: int = 730) -> pd.DataFrame:
    """Download hourly OHLCV bars for *ticker* covering *period_days* days.

    yfinance supports up to 730 days at the 1h interval.

    Returns a frame restricted to Open, High, Low, Close and Volume, with
    rows containing missing values removed and the index sorted ascending.
    """
    raw = yf.download(
        ticker,
        period=f"{period_days}d",
        interval="1h",
        progress=False,
        auto_adjust=True,
    )

    # Newer yfinance versions return two-level columns; keep the first level.
    if isinstance(raw.columns, pd.MultiIndex):
        raw.columns = raw.columns.droplevel(1)

    ohlcv = raw[["Open", "High", "Low", "Close", "Volume"]]
    return ohlcv.dropna().sort_index()
# _rsi function · python · L48-L55 (8 LOC)HMM/data_loader.py
def _rsi(series: pd.Series, period: int = 14) -> pd.Series:
    """Return the RSI of *series* over *period* bars.

    Gains and losses are smoothed with an exponentially weighted mean
    (``com=period - 1``, ``adjust=False``); values are NaN until *period*
    price changes are available. A tiny constant in the denominator avoids
    division by zero when there are no losses.
    """
    change = series.diff()

    def smooth(values: pd.Series) -> pd.Series:
        return values.ewm(com=period - 1, min_periods=period, adjust=False).mean()

    mean_gain = smooth(change.clip(lower=0))
    mean_loss = smooth(-change.clip(upper=0))

    relative_strength = mean_gain / (mean_loss + 1e-10)
    return 100.0 - (100.0 / (1.0 + relative_strength))
# _adx function · python · L62-L91 (30 LOC)HMM/data_loader.py
def _adx(df: pd.DataFrame, period: int = 14) -> pd.Series:
high = df["High"]
low = df["Low"]
close = df["Close"]
# True Range
tr = pd.concat(
[high - low,
(high - close.shift(1)).abs(),
(low - close.shift(1)).abs()],
axis=1,
).max(axis=1)
# Raw Directional Movement
up_move = high - high.shift(1)
down_move = low.shift(1) - low
pos_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
neg_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0)
pos_dm = pd.Series(pos_dm, index=df.index)
neg_dm = pd.Series(neg_dm, index=df.index)
alpha = 1.0 / period
atr = tr.ewm(alpha=alpha, min_periods=period, adjust=False).mean()
pos_di = 100.0 * pos_dm.ewm(alpha=alpha, min_periods=period, adjust=False).mean() / (atr + 1e-10)
neg_di = 100.0 * neg_dm.ewm(alpha=alpha, min_periods=period, adjust=False).mean() / (atr + 1e-10)
dx = 100.0 * (pos_di - neg_di).abs(Repobility · code-quality intelligence · https://repobility.com
compute_indicators function · python · L98-L145 (48 LOC)HMM/data_loader.py
def compute_indicators(df: pd.DataFrame) -> pd.DataFrame:
"""
Compute all technical indicators and HMM feature columns.
HMM Features
------------
Returns : 1-period log return
Range : (High - Low) / Close — normalised intrabar range
VolVol : rolling 20-bar coefficient-of-variation of Volume
Strategy Signals
----------------
RSI : 14-period RSI
Momentum : 20-bar price change as %
Volatility : 20-bar rolling std of Returns × √24 (≈ daily vol %)
VolSMA20 : 20-bar simple moving average of Volume
ADX : 14-period Average Directional Index
EMA50 / EMA200: Exponential Moving Averages
MACD : EMA12 − EMA26
MACD_Signal : 9-bar EMA of MACD
"""
out = df.copy()
# --- HMM features ---
out["Returns"] = np.log(out["Close"] / out["Close"].shift(1))
out["Range"] = (out["High"] - out["Low"]) / (out["Close"] + 1e-10)
vol_mean = out["Volume"].rolling(2assess_ai_sensitivity function · python · L121-L190 (70 LOC)Red Flag Engine/src/ai_sensitivity.py
def assess_ai_sensitivity(
company: str,
claims: list[Claim],
client: Any,
) -> str:
"""Call Claude to produce an AI-announcement sensitivity section.
Args:
company: Company ticker (e.g. "BA", "TSLA", "NFLX").
claims: Claims extracted from the current-quarter transcript.
client: An instantiated ``anthropic.Anthropic()`` client.
Returns:
Markdown string for the section (begins with ``## AI Announcement
Sensitivity``), or a short fallback placeholder if the API call fails.
"""
# Build a compact claim digest (category + polarity + statement).
# Deduplicate on the first 80 chars of the claim text; cap at 50 entries
# to stay comfortably within the model's context window.
if claims:
seen: set[str] = set()
lines: list[str] = []
for c in claims:
key = c.claim[:80]
if key not in seen:
seen.add(key)
lines.append(
PostEarningsReturns class · python · L37-L46 (10 LOC)Red Flag Engine/src/backtest.py
class PostEarningsReturns(BaseModel):
    """Post-earnings price returns for a single report / earnings call."""
    # Model is configured with extra="forbid" (undeclared fields are not accepted).
    model_config = ConfigDict(extra="forbid")
    ticker: str
    call_date: str  # ISO 8601 date of the earnings call
    # Each horizon defaults to None.
    # NOTE(review): presumably None means the horizon could not be computed — confirm against the producer.
    ret_1d: Optional[float] = None  # 1-trading-day return
    ret_5d: Optional[float] = None  # 5-trading-day return
    ret_20d: Optional[float] = None  # 20-trading-day return
# _load_earnings_dates function · python · L53-L65 (13 LOC)Red Flag Engine/src/backtest.py
def _load_earnings_dates(path: Path) -> dict[str, str]:
    """Read earnings_dates.json and return its entries, or ``{}`` on any error.

    Keys beginning with an underscore (such as ``_comment``) are left out.
    A missing file is logged at debug level; any other failure is logged as
    a warning.
    """
    if not path.exists():
        logger.debug("earnings_dates.json not found at %s", path)
        return {}

    try:
        with path.open(encoding="utf-8") as handle:
            payload = json.load(handle)
        entries = {}
        for key, value in payload.items():
            if key.startswith("_"):
                continue  # skip the _comment key and similar
            entries[key] = value
        return entries
    except Exception as exc:
        logger.warning("Failed to load earnings_dates.json: %s", exc)
        return {}
# _ret function · python · L68-L72 (5 LOC)Red Flag Engine/src/backtest.py
def _ret(close_series, t: int, baseline: float) -> Optional[float]:
    """Compute the return at position *t* relative to *baseline*.

    Args:
        close_series: Positionally indexable price series (accessed via ``.iloc``).
        t: Zero-based position of the bar to compare against *baseline*.
        baseline: Reference price; must be non-zero.

    Returns:
        ``(close_series.iloc[t] - baseline) / baseline`` rounded to four
        decimals as a plain ``float``, or ``None`` when *t* is outside
        ``[0, len(close_series))`` or *baseline* is zero.
    """
    # A negative t would otherwise index from the end of the series and
    # silently return the wrong bar, so treat it as out of range.
    if baseline == 0 or not 0 <= t < len(close_series):
        return None
    change = (close_series.iloc[t] - baseline) / baseline
    return float(round(change, 4))
# compute_post_earnings_returns function · python · L79-L159 (81 LOC)Red Flag Engine/src/backtest.py
def compute_post_earnings_returns(
report_filename: str,
earnings_dates_path: Path,
) -> Optional[PostEarningsReturns]:
"""Compute post-earnings price returns for one report.
Looks up the earnings call date in earnings_dates.json using the report
filename stem (without ``.md``), then fetches price history via yfinance
and computes 1d / 5d / 20d returns.
Args:
report_filename: E.g. ``"BA_2025Q4_vs_2025Q3.md"``.
earnings_dates_path: Path to earnings_dates.json.
Returns:
:class:`PostEarningsReturns` on success, or ``None`` on any error
(missing date entry, network failure, import error, etc.).
"""
# ── Lazy import of yfinance ───────────────────────────────────────────
try:
import yfinance as yf
except ImportError:
logger.warning("yfinance not installed — backtest unavailable. Run: pip install yfinance")
return None
# ── Look up earnings date ────────────────────────────load_backtest_summary function · python · L162-L206 (45 LOC)Red Flag Engine/src/backtest.py
def load_backtest_summary(
outputs_dir: Path,
earnings_dates_path: Path,
) -> "pd.DataFrame":
"""Build an aggregate backtest DataFrame from all reports in outputs_dir.
Args:
outputs_dir: Directory containing generated .md reports.
earnings_dates_path: Path to earnings_dates.json.
Returns:
pandas DataFrame with columns: Report, Ticker, Date, 1d%, 5d%, 20d%.
Returns an empty DataFrame when no data is available (no yfinance,
no date entries, or network failure).
"""
try:
import pandas as pd
except ImportError:
logger.warning("pandas not available — cannot build backtest summary")
import types
return types.SimpleNamespace(empty=True) # type: ignore[return-value]
rows: list[dict] = []
for md_path in sorted(outputs_dir.glob("*.md")):
result = compute_post_earnings_returns(md_path.name, earnings_dates_path)
if result is None:
continueChangeType class · python · L56-L60 (5 LOC)Red Flag Engine/src/diff.py
class ChangeType(str, Enum):
    """Classification of a current-quarter claim relative to the prior quarter."""
    # Each member's value is its own lowercase name.
    new = "new"
    worsened = "worsened"
    improved = "improved"
    unchanged = "unchanged"
Change class · python · L63-L82 (20 LOC)Red Flag Engine/src/diff.py
class Change(BaseModel):
    """One classified change: a current-quarter claim plus its optional prior-quarter counterpart."""
    # Model is configured with extra="forbid" (undeclared fields are not accepted).
    model_config = ConfigDict(extra="forbid")
    category: Category
    change_type: ChangeType
    severity: int  # 1–5
    confidence: Confidence
    summary: str
    # Current quarter
    now_claim: str
    now_evidence: str
    now_chunk_id: str
    now_speaker_role: str = "unknown"  # propagated from Claim.speaker_role
    # Prior quarter (None for new claims)
    prev_claim: Optional[str] = None
    prev_evidence: Optional[str] = None
    prev_chunk_id: Optional[str] = None
    # Similarity metadata
    match_score: float = 0.0
    match_quality: str = "strict"  # "strict" | "soft"
# _assign_severity function · python · L89-L139 (51 LOC)Red Flag Engine/src/diff.py
def _assign_severity(
change_type: ChangeType,
category: Category,
polarity_now: Polarity,
confidence: Confidence,
) -> int:
"""Return a severity score in [1, 5] applying all rules in order.
Base rules:
new → 3 (+1 if high-risk category, +1 if negative polarity)
worsened → 4 (+1 if high-risk category)
improved → 1 (+1 if high confidence)
unchanged → 2
Post-base adjustments:
low confidence → cap at 3
high confidence + worsened + bump-category → +1 (cap 5)
"""
# ── Base ──────────────────────────────────────────────────────────────
if change_type == ChangeType.new:
base = 3
if category in _HIGH_RISK_CATEGORIES:
base += 1
if polarity_now == Polarity.negative:
base += 1
elif change_type == ChangeType.worsened:
base = 4
if category in _HIGH_RISK_CATEGORIES:
base += 1
elif change_type == ChangeType.improved:
_determine_change_type function · python · L146-L157 (12 LOC)Red Flag Engine/src/diff.py
def _determine_change_type(
    polarity_now: Polarity,
    polarity_prev: Polarity,
) -> ChangeType:
    """Classify a polarity transition as worsened, improved or unchanged.

    Both polarities are ranked through ``POLARITY_ORDER``; a lower rank now
    than before means worsened, a higher rank means improved.
    """
    current_rank = POLARITY_ORDER[polarity_now]
    previous_rank = POLARITY_ORDER[polarity_prev]

    if current_rank < previous_rank:
        outcome = ChangeType.worsened
    elif current_rank > previous_rank:
        outcome = ChangeType.improved
    else:
        outcome = ChangeType.unchanged
    return outcome
# _build_summary function · python · L160-L182 (23 LOC)Red Flag Engine/src/diff.py
def _build_summary(
    change_type: ChangeType,
    claim_now: Claim,
    claim_prev: Optional[Claim],
) -> str:
    """Produce a one-line human-readable summary for a Change.

    Args:
        change_type: Classification of the change.
        claim_now: Current-quarter claim; supplies the category and text.
        claim_prev: Matched prior-quarter claim, or ``None`` if there is none.

    Returns:
        ``"[TAG] Category: ..."``. For worsened / improved changes with a
        prior claim, the text also states the polarity shift
        (``prev → now``). If ``claim_prev`` is ``None`` the shift is left
        out instead of raising ``AttributeError``.
    """
    cat = claim_now.category.value.replace("_", " ").title()

    if change_type in (ChangeType.worsened, ChangeType.improved):
        tag = "WORSENED" if change_type == ChangeType.worsened else "IMPROVED"
        if claim_prev is None:
            # The signature allows a missing prior claim; without one there
            # is no shift to describe, so fall back to the plain form.
            return f"[{tag}] {cat}: {claim_now.claim}"
        return (
            f"[{tag}] {cat}: sentiment shifted "
            f"{claim_prev.polarity.value} → {claim_now.polarity.value}. "
            f"{claim_now.claim}"
        )

    tag = "NEW" if change_type == ChangeType.new else "UNCHANGED"
    return f"[{tag}] {cat}: {claim_now.claim}"
# match_claims function · python · L189-L305 (117 LOC)Red Flag Engine/src/diff.py
def match_claims(
now_claims: list[Claim],
prev_claims: list[Claim],
threshold: int = MATCH_THRESHOLD,
) -> list[Change]:
"""Compare two claim lists and return classified, ranked Changes.
Two-pass matching strategy
--------------------------
Pass 1 (strict): token_set_ratio on full claim text >= threshold.
Pass 2 (soft): For unmatched claims only — token_set_ratio on the
first SOFT_WINDOW chars, same category, >= SOFT_THRESHOLD.
Produces match_quality="soft".
Args:
now_claims: Claims from the current (newer) quarter.
prev_claims: Claims from the prior quarter. May be empty.
threshold: Strict match threshold (default 72).
Returns:
List of Change objects sorted by severity DESC, then confidence DESC.
"""
changes: list[Change] = []
for now in now_claims:
# ── Pass 1: strict full-text match ─────────────────────────────
best_score: flAbandonedMetric class · python · L312-L321 (10 LOC)Red Flag Engine/src/diff.py
class AbandonedMetric(BaseModel):
    """A claim category that was prominent last quarter but absent this quarter."""
    # Model is configured with extra="forbid" (undeclared fields are not accepted).
    model_config = ConfigDict(extra="forbid")
    category: Category
    representative_claim: str  # most-confident prev claim text for this category
    evidence: str  # evidence quote from the representative claim
    chunk_id: str  # chunk_id of the representative claim
    confidence: Confidence  # high ≥3 prev claims, medium ≥2
# find_abandoned_metrics function · python · L324-L411 (88 LOC)Red Flag Engine/src/diff.py
def find_abandoned_metrics(
claims_now: list[Claim],
claims_prev: list[Claim],
threshold: int = MATCH_THRESHOLD,
) -> list[AbandonedMetric]:
"""Identify claim categories present last quarter but absent this quarter.
A category is "abandoned" when:
- It has ≥ 2 prior-quarter claims, AND
- It is not an all-positive, non-guidance category (these may drop off
naturally without being a signal), AND
- No current-quarter claim has token_set_ratio ≥ SOFT_THRESHOLD against
any prior-quarter claim in that category.
Args:
claims_now: Claims extracted from the current quarter.
claims_prev: Claims extracted from the prior quarter.
threshold: Ignored (kept for API symmetry with match_claims).
Returns:
List of :class:`AbandonedMetric` sorted by confidence DESC then
category name ASC. Empty list if claims_prev is empty.
"""
if not claims_prev:
return []
# ── Group prior HedgeDelta class · python · L67-L76 (10 LOC)Red Flag Engine/src/hedge_score.py
class HedgeDelta(BaseModel):
    """Comparison of hedge scores for one section between two quarters."""
    # Model is configured with extra="forbid" (undeclared fields are not accepted).
    model_config = ConfigDict(extra="forbid")
    section: str
    now_score: float  # hedge matches per 100 words (current quarter)
    prev_score: float  # hedge matches per 100 words (prior quarter)
    delta: float  # now_score − prev_score (positive = more hedged now)
    flag: bool  # True when delta > _FLAG_DELTA_PP
_strip_safe_harbour function · python · L83-L92 (10 LOC)Red Flag Engine/src/hedge_score.py
def _strip_safe_harbour(text: str) -> str:
    """Drop every paragraph of *text* that contains a Tier 3 safe-harbour phrase.

    Paragraphs are split on runs of two or more newlines and matched
    case-insensitively; the survivors are re-joined with one blank line.
    """

    def is_safe_harbour(paragraph: str) -> bool:
        lowered = paragraph.lower()
        return any(phrase in lowered for phrase in _TIER3_PHRASES)

    return "\n\n".join(
        paragraph
        for paragraph in re.split(r"\n{2,}", text)
        if not is_safe_harbour(paragraph)
    )
# _count_matches function · python · L95-L109 (15 LOC)Red Flag Engine/src/hedge_score.py
def _count_matches(text: str) -> int:
    """Return the number of Tier 1 and Tier 2 hedge-term hits in *text*.

    Single-word terms are counted with their compiled patterns against the
    text as given; multi-word phrases are counted as substrings of the
    lower-cased text.
    """
    word_hits = sum(
        len(pattern.findall(text))
        for pattern in (*_TIER1_WORD_RES, *_TIER2_WORD_RES)
    )
    lowered = text.lower()
    phrase_hits = sum(
        lowered.count(phrase)
        for phrase in (*_TIER1_PHRASES, *_TIER2_PHRASES)
    )
    return word_hits + phrase_hits
# score_hedging function · python · L116-L146 (31 LOC)Red Flag Engine/src/hedge_score.py
def score_hedging(chunks: "list[Chunk]") -> dict[str, float]:
"""Return a mapping of section label → hedge score (matches per 100 words).
Safe-harbour paragraphs are stripped before counting. Scores for chunks
with the same section label are averaged.
Args:
chunks: Segmented transcript chunks (current or prior quarter).
Returns:
Dict keyed by section label (e.g. ``"guidance"``, ``"demand"``).
Sections with zero total words after stripping are skipped.
"""
# Accumulate (total_matches, total_words) per section
section_data: dict[str, list[float]] = {}
for chunk in chunks:
text = _strip_safe_harbour(chunk.text)
words = text.split()
if not words:
continue
count = _count_matches(text)
score = count / len(words) * 100.0
section_data.setdefault(chunk.section, []).append(score)
result: dict[str, float] = {}
for section, scores in section_data.items():
rediff_hedge_scores function · python · L149-L188 (40 LOC)Red Flag Engine/src/hedge_score.py
def diff_hedge_scores(
now_scores: dict[str, float],
prev_scores: dict[str, float],
) -> list[HedgeDelta]:
"""Compare section-level hedge scores between two quarters.
For sections present only in one quarter, the missing quarter's score
is treated as 0.0 (new section or dropped section).
Args:
now_scores: Output of ``score_hedging()`` for the current quarter.
prev_scores: Output of ``score_hedging()`` for the prior quarter.
Returns:
List of :class:`HedgeDelta` sorted by ``|delta|`` descending.
"""
all_sections = set(now_scores) | set(prev_scores)
deltas: list[HedgeDelta] = []
for section in all_sections:
now_val = now_scores.get(section, 0.0)
prev_val = prev_scores.get(section, 0.0)
delta = round(now_val - prev_val, 2)
deltas.append(HedgeDelta(
section = section,
now_score = round(now_val, 2),
prev_score = round(prev_val, 2),
Doc class · python · L11-L14 (4 LOC)Red Flag Engine/src/ingest.py
class Doc:
    """A loaded transcript: its text plus the company and period it belongs to."""
    # NOTE(review): no decorator is visible in this listing, yet load_doc builds
    # Doc(company=..., period=..., text=...) — presumably @dataclass precedes
    # the class in the real file; confirm.
    company: str  # company identifier, e.g. "AAPL"
    period: str  # period label, e.g. "Q4_2024"
    text: str  # extracted transcript text
# load_doc function · python · L17-L70 (54 LOC)Red Flag Engine/src/ingest.py
def load_doc(company: str, period: str, filepath: str | Path) -> Doc:
"""Load a transcript from a PDF or TXT file and return a Doc.
Args:
company: Company identifier (e.g. "AAPL").
period: Period label (e.g. "Q4_2024").
filepath: Absolute or relative path to a .pdf or .txt file.
Returns:
Doc with the full extracted text.
Raises:
FileNotFoundError: If the file does not exist.
ValueError: If the file extension is not .pdf or .txt.
RuntimeError: If PDF extraction yields no text.
"""
path = Path(filepath)
if not path.exists():
raise FileNotFoundError(f"Transcript file not found: {path}")
suffix = path.suffix.lower()
if suffix == ".txt":
text = path.read_text(encoding="utf-8", errors="ignore")
logger.debug("Loaded TXT %s (%d chars)", path.name, len(text))
return Doc(company=company, period=period, text=text)
if suffix == ".pdf":
try:
iCategory class · python · L19-L29 (11 LOC)Red Flag Engine/src/llm_extract.py
class Category(str, Enum):
    """Categories a claim can be assigned to."""
    # Each member's value is its own name.
    guidance = "guidance"
    demand = "demand"
    pricing_margin = "pricing_margin"
    liquidity = "liquidity"
    reg_legal = "reg_legal"
    competition = "competition"
    costs_restructuring = "costs_restructuring"
    ops_execution = "ops_execution"
    accounting = "accounting"
    other = "other"
# Polarity class · python · L32-L36 (5 LOC)Red Flag Engine/src/llm_extract.py
class Polarity(str, Enum):
    """Polarity values a claim can carry."""
    positive = "positive"
    negative = "negative"
    neutral = "neutral"
    mixed = "mixed"
Confidence class · python · L39-L42 (4 LOC)Red Flag Engine/src/llm_extract.py
class Confidence(str, Enum):
    """Confidence levels attached to a claim."""
    low = "low"
    medium = "medium"
    high = "high"
# Claim class · python · L49-L65 (17 LOC)Red Flag Engine/src/llm_extract.py
class Claim(BaseModel):
    """A single claim with its category, polarity, evidence quote and source chunk."""
    # Model is configured with extra="forbid" (undeclared fields are not accepted).
    model_config = ConfigDict(extra="forbid")
    category: Category
    polarity: Polarity
    claim: str
    evidence: str
    chunk_id: str
    confidence: Confidence
    speaker_role: str = "unknown"  # set post-parse from Chunk; not in LLM schema

    @field_validator("evidence")
    @classmethod
    def evidence_not_empty(cls, v: str) -> str:
        """Raise ValueError if the evidence is empty or whitespace-only; otherwise return it unchanged."""
        if not v.strip():
            raise ValueError("evidence must not be empty")
        return v
# evidence_not_empty method · python · L62-L65 (4 LOC)Red Flag Engine/src/llm_extract.py
def evidence_not_empty(cls, v: str) -> str:
    """Return *v* unchanged; raise ValueError if it is empty or whitespace-only."""
    if v.strip():
        return v
    raise ValueError("evidence must not be empty")
# ClaimList class · python · L68-L71 (4 LOC)Red Flag Engine/src/llm_extract.py
class ClaimList(BaseModel):
    """Wrapper model holding a list of claims; _parse_response validates decoded JSON against it."""
    # Model is configured with extra="forbid" (undeclared fields are not accepted).
    model_config = ConfigDict(extra="forbid")
    claims: list[Claim]
# _build_user_prompt function · python · L137-L143 (7 LOC)Red Flag Engine/src/llm_extract.py
def _build_user_prompt(chunk: Chunk) -> str:
    """Fill the user-prompt template with the chunk's id, section, speaker role and text."""
    placeholders = {
        "chunk_id": chunk.chunk_id,
        "section": chunk.section,
        "speaker_role": chunk.speaker_role,
        "chunk_text": chunk.text,
    }
    return _USER_PROMPT_TEMPLATE.format(**placeholders)
# _filter_claims function · python · L150-L215 (66 LOC)Red Flag Engine/src/llm_extract.py
def _filter_claims(
claims: list[Claim],
chunk_id: str,
chunk_text: str,
) -> list[Claim]:
"""Apply post-LLM validation and return only fully-passing claims.
Checks (in order, all must pass):
1. Evidence is non-empty.
2. Evidence word count ≤ 25.
3. Evidence is a verbatim substring of the chunk text (exact match).
Claims failing any check are dropped and the reason is logged.
The returned list is capped at MAX_CLAIMS_PER_CHUNK.
"""
valid: list[Claim] = []
for claim in claims:
ev = claim.evidence.strip()
cl = claim.claim.strip()
# Check 0a — minimum claim length (drops boilerplate like speaker intros)
if len(cl.split()) < MIN_CLAIM_WORDS:
logger.debug(
"%s: DROP claim too short (%d words) | claim: %.60s",
chunk_id, len(cl.split()), cl,
)
continue
# Check 0b — corporate header pattern (title + company suffix in same claim)
_parse_response function · python · L218-L240 (23 LOC)Red Flag Engine/src/llm_extract.py
def _parse_response(raw: str, chunk_id: str, chunk_text: str) -> list[Claim]:
    """Turn the raw API response text into validated, filtered claims.

    Markdown code-fence lines are removed first. A JSON decoding failure or
    a model-validation failure is logged as an error and yields ``[]``;
    otherwise the parsed claims are passed through ``_filter_claims``.
    """
    payload = raw.strip()

    # The model sometimes wraps its output in ``` fences; drop those lines.
    if payload.startswith("```"):
        unfenced = [ln for ln in payload.splitlines() if not ln.startswith("```")]
        payload = "\n".join(unfenced).strip()

    try:
        decoded = json.loads(payload)
    except json.JSONDecodeError as exc:
        logger.error("%s: JSON parse error — %s", chunk_id, exc)
        return []

    try:
        parsed = ClaimList.model_validate(decoded)
    except Exception as exc:
        logger.error("%s: Pydantic validation error — %s", chunk_id, exc)
        return []

    return _filter_claims(parsed.claims, chunk_id, chunk_text)
# _call_api function · python · L247-L270 (24 LOC)Red Flag Engine/src/llm_extract.py
def _call_api(chunk: Chunk, client: Any) -> str | None:
    """Send one chunk to the Claude API, retrying once on failure.

    Returns the text of the first content item of the response, or ``None``
    when both attempts raise. Each failure is logged as a warning.
    """
    request = {
        "model": "claude-opus-4-6",
        "max_tokens": 1024,
        "temperature": 0,
        "system": SYSTEM_PROMPT,
        "messages": [{"role": "user", "content": _build_user_prompt(chunk)}],
    }
    # One warning template per attempt; iterating them yields exactly two tries.
    failure_templates = (
        "%s: API error on attempt 1 (%s), retrying…",
        "%s: API error on attempt 2 (%s), skipping chunk",
    )
    for template in failure_templates:
        try:
            reply = client.messages.create(**request)
            return reply.content[0].text
        except Exception as exc:
            logger.warning(template, chunk.chunk_id, exc)
    return None
extract_claims_from_chunk function · python · L273-L298 (26 LOC)Red Flag Engine/src/llm_extract.py
def extract_claims_from_chunk(chunk: Chunk, client: Any) -> list[Claim]:
    """Extract validated claims from a single transcript chunk.

    Args:
        chunk: The transcript chunk to analyse.
        client: An instantiated ``anthropic.Anthropic`` client.

    Returns:
        The validated Claim objects for this chunk; empty when the API call
        produced no response or nothing passed validation.
    """
    response_text = _call_api(chunk, client)
    if response_text is None:
        return []

    extracted = _parse_response(response_text, chunk.chunk_id, chunk.text)

    # Stamp the chunk's speaker role onto every claim when it is known.
    role = chunk.speaker_role
    if role != "unknown":
        extracted = [
            item.model_copy(update={"speaker_role": role}) for item in extracted
        ]

    logger.debug(
        "%s: extracted %d claim(s) section=%s speaker=%s",
        chunk.chunk_id, len(extracted), chunk.section, chunk.speaker_role,
    )
    return extracted
# extract_claims function · python · L301-L318 (18 LOC)Red Flag Engine/src/llm_extract.py
def extract_claims(chunks: list[Chunk], client: Any) -> list[Claim]:
    """Run claim extraction over every chunk and flatten the results.

    Args:
        chunks: Ordered list of Chunk objects from segment_doc().
        client: An instantiated ``anthropic.Anthropic`` client.

    Returns:
        All validated claims across the document, in chunk order. A chunk
        that yields nothing simply contributes no claims.
    """
    collected: list[Claim] = [
        claim
        for chunk in chunks
        for claim in extract_claims_from_chunk(chunk, client)
    ]
    logger.info(
        "Total claims extracted: %d from %d chunks", len(collected), len(chunks)
    )
    return collected
# _configure_logging function · python · L68-L74 (7 LOC)Red Flag Engine/src/main.py
def _configure_logging(level: str) -> None:
    """Configure root logging to stderr at the named *level*.

    The name is upper-cased and looked up on the ``logging`` module; a name
    with no matching attribute falls back to ``logging.INFO``.
    """
    resolved_level = getattr(logging, level.upper(), logging.INFO)
    settings = {
        "level": resolved_level,
        "format": "%(asctime)s  %(levelname)-8s  %(name)s  %(message)s",
        "datefmt": "%H:%M:%S",
        "stream": sys.stderr,
    }
    logging.basicConfig(**settings)