← back to eloiduc__redflag-engine

Function bodies 106 total

All specs Real LLM only Function bodies
build_chart function · python · L110-L293 (184 LOC)
HMM/app.py
def build_chart(
    df: pd.DataFrame,
    equity_df: pd.DataFrame,
    trades_df: pd.DataFrame,
    show_ema: bool,
    show_trades: bool,
    lookback_days: int,
) -> go.Figure:
    """
    Build a 3-panel interactive Plotly chart:
      Row 1 – Candlestick with regime background + EMAs + trade markers
      Row 2 – Equity curve
      Row 3 – Volume bars
    """
    # --- Slice to lookback window ---
    cutoff = df.index[-1] - pd.Timedelta(days=lookback_days)
    view_df = df[df.index >= cutoff]
    view_eq  = equity_df[equity_df.index >= cutoff]

    fig = make_subplots(
        rows=3, cols=1,
        shared_xaxes=True,
        vertical_spacing=0.025,
        row_heights=[0.60, 0.22, 0.18],
        subplot_titles=("BTC/USD Price & Market Regimes", "Strategy Equity ($)", "Volume"),
    )

    # ── Regime backgrounds (group consecutive identical regimes) ──────────
    regime_change_id = view_df["Regime"].ne(view_df["Regime"].shift()).cumsum()
    for _, seg in view_df.groupby(regim
build_regime_pie function · python · L300-L319 (20 LOC)
HMM/app.py
def build_regime_pie(df: pd.DataFrame) -> go.Figure:
    """Build a donut chart showing how often each regime occurs in *df*.

    Args:
        df: Frame with a ``"Regime"`` column; one slice is drawn per
            distinct value, sized by its number of occurrences.

    Returns:
        A dark-themed Plotly figure containing a single pie trace.
    """
    regime_counts = df["Regime"].value_counts()

    # Regimes missing from REGIME_LINE fall back to a neutral grey.
    slice_colors = []
    for regime in regime_counts.index:
        slice_colors.append(REGIME_LINE.get(regime, "#888888"))

    donut = go.Pie(
        labels=regime_counts.index,
        values=regime_counts.values,
        hole=0.45,
        marker={
            "colors": slice_colors,
            "line": {"color": "#0d1117", "width": 2},
        },
        textinfo="label+percent",
        textfont={"size": 12},
    )

    fig = go.Figure(donut)
    fig.update_layout(
        template="plotly_dark",
        paper_bgcolor="#0d1117",
        height=300,
        margin={"l": 0, "r": 0, "t": 10, "b": 0},
        showlegend=False,
    )
    return fig
main function · python · L326-L512 (187 LOC)
HMM/app.py
def main() -> None:

    # ── Sidebar ──────────────────────────────────────────────────────────
    with st.sidebar:
        st.markdown("## ⚙️ Controls")
        lookback_days = st.slider(
            "Chart lookback (days)", min_value=30, max_value=730, value=180, step=30
        )
        show_ema    = st.toggle("Show EMAs (50 / 200)", value=True)
        show_trades = st.toggle("Show Trade Markers",   value=True)
        st.divider()
        st.markdown("### Strategy Parameters")
        st.markdown(f"- **Leverage**: 2.5×")
        st.markdown(f"- **Cooldown**: 48 h after exit")
        st.markdown(f"- **Min confirmations**: {CONFIRM_NEEDED}/8")
        st.markdown(f"- **HMM States**: 7")
        st.divider()
        force_reload = st.button("🔄 Reload Data", use_container_width=True)

    if force_reload:
        st.cache_data.clear()

    # ── Load pipeline ─────────────────────────────────────────────────────
    with st.spinner("Loading data and running backtest … (first run ~3
HMMEngine class · python · L64-L134 (71 LOC)
HMM/backtester.py
class HMMEngine:
    n_states: int = N_STATES
    model:    Optional[hmm.GaussianHMM] = field(default=None, repr=False)
    scaler:   Optional[StandardScaler]  = field(default=None, repr=False)
    bull_state: int = -1
    bear_state: int = -1
    regime_labels: dict[int, str] = field(default_factory=dict)

    def fit(self, df: pd.DataFrame) -> "HMMEngine":
        """Train the HMM on Returns, Range, VolVol features."""
        X = df[["Returns", "Range", "VolVol"]].values

        self.scaler = StandardScaler()
        X_scaled = self.scaler.fit_transform(X)

        self.model = hmm.GaussianHMM(
            n_components=self.n_states,
            covariance_type="full",
            n_iter=500,
            tol=1e-5,
            random_state=42,
        )
        self.model.fit(X_scaled)

        self._identify_states()
        return self

    def _identify_states(self) -> None:
        """Map each HMM state to a human-readable regime label."""
        # mean Returns for each state (
fit method · python · L72-L89 (18 LOC)
HMM/backtester.py
    def fit(self, df: pd.DataFrame) -> "HMMEngine":
        """Fit the scaler and the Gaussian HMM on the three feature columns.

        The ``Returns``, ``Range`` and ``VolVol`` columns of *df* are
        standardised with a fresh ``StandardScaler`` and used to train a
        full-covariance ``GaussianHMM`` with ``self.n_states`` components
        and a fixed seed (``random_state=42``).  Afterwards the states are
        labelled via ``_identify_states``.

        Returns:
            ``self``, so the call can be chained.
        """
        feature_columns = ["Returns", "Range", "VolVol"]
        feature_matrix = df[feature_columns].values

        # The scaler is stored before fitting, as predict() reuses it later.
        scaler = StandardScaler()
        self.scaler = scaler
        standardized = scaler.fit_transform(feature_matrix)

        hmm_settings = {
            "n_components": self.n_states,
            "covariance_type": "full",
            "n_iter": 500,
            "tol": 1e-5,
            "random_state": 42,
        }
        self.model = hmm.GaussianHMM(**hmm_settings)
        self.model.fit(standardized)

        self._identify_states()
        return self
_identify_states method · python · L91-L113 (23 LOC)
HMM/backtester.py
    def _identify_states(self) -> None:
        """Assign a regime label to every HMM state from its mean return.

        The state with the highest mean return becomes the bull state and
        the one with the lowest becomes the bear state.  Every other state
        is labelled by the sign of its mean return: above 0 is the
        secondary bull label, below -1e-4 the secondary bear label, and
        anything in between is neutral.

        Sets ``self.bull_state``, ``self.bear_state`` and
        ``self.regime_labels``.
        """
        # Undo the standardisation for the first feature (Returns) only.
        offset = self.scaler.mean_[0]
        spread = self.scaler.scale_[0]
        state_returns = self.model.means_[:, 0] * spread + offset

        self.bull_state = int(np.argmax(state_returns))
        self.bear_state = int(np.argmin(state_returns))

        def _label_for(state: int) -> str:
            # Bull is checked before bear, so it wins if both indices coincide.
            if state == self.bull_state:
                return REGIME_BULL
            if state == self.bear_state:
                return REGIME_BEAR
            mean_return = state_returns[state]
            if mean_return > 0:
                return REGIME_BULL2
            if mean_return < -1e-4:
                return REGIME_BEAR2
            return REGIME_NEUT

        self.regime_labels = {
            state: _label_for(state) for state in range(self.n_states)
        }
predict method · python · L115-L119 (5 LOC)
HMM/backtester.py
    def predict(self, df: pd.DataFrame) -> np.ndarray:
        """Decode the hidden-state sequence for the rows of *df*.

        The ``Returns``, ``Range`` and ``VolVol`` columns are passed through
        the stored scaler and then through the stored model's ``predict``;
        both must already be set (see ``fit``).
        """
        feature_columns = ["Returns", "Range", "VolVol"]
        standardized = self.scaler.transform(df[feature_columns].values)
        return self.model.predict(standardized)
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
state_summary method · python · L121-L134 (14 LOC)
HMM/backtester.py
    def state_summary(self) -> pd.DataFrame:
        """Tabulate every state's label and mean feature values.

        The model's state means are mapped back through the scaler's
        ``inverse_transform`` and formatted as strings: the first two
        features are shown as percentages, the third as a plain number.

        Returns:
            DataFrame indexed by ``State`` with columns ``Label``,
            ``Mean Return``, ``Mean Range`` and ``Mean VolVol``.
        """
        scaled_centres = self.model.means_
        centres = self.scaler.inverse_transform(scaled_centres)

        records = [
            {
                "State":       state,
                "Label":       self.regime_labels[state],
                "Mean Return": f"{centres[state, 0] * 100:.4f}%",
                "Mean Range":  f"{centres[state, 1] * 100:.4f}%",
                "Mean VolVol": f"{centres[state, 2]:.4f}",
            }
            for state in range(self.n_states)
        ]
        summary = pd.DataFrame(records)
        return summary.set_index("State")
_confirmations function · python · L153-L168 (16 LOC)
HMM/backtester.py
def _confirmations(row: pd.Series) -> tuple[int, list[bool]]:
    """
    Evaluate the 8 confirmation signals for a single bar.
    Returns (count_of_True, list_of_bool).
    """
    signals = [
        row["RSI"]      < 90,
        row["Momentum"] > 1.0,
        row["Volatility"] < 6.0,
        row["Volume"]   > row["VolSMA20"],
        row["ADX"]      > 25.0,
        row["Close"]    > row["EMA50"],
        row["Close"]    > row["EMA200"],
        row["MACD"]     > row["MACD_Signal"],
    ]
    return sum(signals), signals
run_backtest function · python · L175-L287 (113 LOC)
HMM/backtester.py
def run_backtest(
    df: pd.DataFrame,
    engine: HMMEngine,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Simulate the strategy bar-by-bar.

    Parameters
    ----------
    df     : fully featured DataFrame (from compute_indicators)
    engine : fitted HMMEngine

    Returns
    -------
    equity_df : hourly equity curve with regime / position columns
    trades_df : log of every completed trade
    """
    hidden_states = engine.predict(df)
    df = df.copy()
    df["HMM_State"] = hidden_states
    df["Regime"]    = [engine.regime_labels[s] for s in hidden_states]

    capital      = INITIAL_CAPITAL
    position     = None          # None or dict with entry info
    cooldown_end = pd.Timestamp.min.tz_localize(df.index.tz)

    equity_rows: list[dict] = []
    trade_rows:  list[dict] = []

    for ts, row in df.iterrows():
        regime        = row["Regime"]
        conf_cnt, _   = _confirmations(row)
        in_cooldown   = ts < cooldown_end

        # ── EXIT ──────────
compute_metrics function · python · L294-L331 (38 LOC)
HMM/backtester.py
def compute_metrics(
    equity_df:      pd.DataFrame,
    trades_df:      pd.DataFrame,
    df:             pd.DataFrame,
    initial_capital: float = INITIAL_CAPITAL,
) -> dict:
    """Compute Total Return, Alpha, Win Rate, and Max Drawdown."""
    final_equity = equity_df["equity"].iloc[-1]
    total_return = (final_equity - initial_capital) / initial_capital * 100.0

    bh_return = (df["Close"].iloc[-1] - df["Close"].iloc[0]) / df["Close"].iloc[0] * 100.0
    alpha     = total_return - bh_return

    if len(trades_df) > 0:
        wins     = (trades_df["pnl_dollar"] > 0).sum()
        win_rate = wins / len(trades_df) * 100.0
        avg_win  = trades_df.loc[trades_df["pnl_dollar"] > 0, "pnl_dollar"].mean()
        avg_loss = trades_df.loc[trades_df["pnl_dollar"] <= 0, "pnl_dollar"].mean()
    else:
        win_rate = 0.0
        avg_win  = 0.0
        avg_loss = 0.0

    rolling_max  = equity_df["equity"].cummax()
    drawdown     = (equity_df["equity"] - rolling_max) / rolling_ma
run_full_pipeline function · python · L338-L377 (40 LOC)
HMM/backtester.py
def run_full_pipeline() -> tuple:
    """
    Orchestrates the full workflow:
      1. Fetch + compute indicators
      2. Train HMM
      3. Backtest
      4. Compute metrics

    Returns
    -------
    df, equity_df, trades_df, metrics, engine
    """
    print("  [1/4] Fetching BTC-USD hourly data …")
    df_raw = fetch_data()

    print("  [2/4] Computing technical indicators …")
    df = compute_indicators(df_raw)

    print("  [3/4] Training 7-state GaussianHMM …")
    engine = HMMEngine(n_states=N_STATES)
    engine.fit(df)
    print(f"        Bull Run state → {engine.bull_state}  |  "
          f"Bear/Crash state → {engine.bear_state}")
    print(engine.state_summary().to_string())

    # Attach regime columns to df
    states     = engine.predict(df)
    df["HMM_State"] = states
    df["Regime"]    = [engine.regime_labels[s] for s in states]

    print("  [4/4] Running backtest …")
    equity_df, trades_df = run_backtest(df, engine)

    metrics = compute_metrics(equity_df, t
fetch_data function · python · L19-L41 (23 LOC)
HMM/data_loader.py
def fetch_data(ticker: str = "BTC-USD", period_days: int = 730) -> pd.DataFrame:
    """
    Download hourly OHLCV bars for *ticker* covering the last *period_days* days.

    yfinance supports up to 730 days at the 1h interval.

    Returns a DataFrame restricted to the columns Open, High, Low, Close and
    Volume, with rows containing NaN removed and the index sorted ascending.
    """
    raw = yf.download(
        ticker,
        period=f"{period_days}d",
        interval="1h",
        progress=False,
        auto_adjust=True,
    )

    # Newer yfinance versions return two-level columns; keep only level 0.
    has_multilevel_columns = isinstance(raw.columns, pd.MultiIndex)
    if has_multilevel_columns:
        raw.columns = raw.columns.droplevel(1)

    ohlcv = raw[["Open", "High", "Low", "Close", "Volume"]]
    return ohlcv.dropna().sort_index()
_rsi function · python · L48-L55 (8 LOC)
HMM/data_loader.py
def _rsi(series: pd.Series, period: int = 14) -> pd.Series:
    delta = series.diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.ewm(com=period - 1, min_periods=period, adjust=False).mean()
    avg_loss = loss.ewm(com=period - 1, min_periods=period, adjust=False).mean()
    rs = avg_gain / (avg_loss + 1e-10)
    return 100.0 - (100.0 / (1.0 + rs))
_adx function · python · L62-L91 (30 LOC)
HMM/data_loader.py
def _adx(df: pd.DataFrame, period: int = 14) -> pd.Series:
    high  = df["High"]
    low   = df["Low"]
    close = df["Close"]

    # True Range
    tr = pd.concat(
        [high - low,
         (high - close.shift(1)).abs(),
         (low  - close.shift(1)).abs()],
        axis=1,
    ).max(axis=1)

    # Raw Directional Movement
    up_move   = high - high.shift(1)
    down_move = low.shift(1) - low

    pos_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
    neg_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0)
    pos_dm = pd.Series(pos_dm, index=df.index)
    neg_dm = pd.Series(neg_dm, index=df.index)

    alpha = 1.0 / period
    atr    = tr.ewm(alpha=alpha, min_periods=period, adjust=False).mean()
    pos_di = 100.0 * pos_dm.ewm(alpha=alpha, min_periods=period, adjust=False).mean() / (atr + 1e-10)
    neg_di = 100.0 * neg_dm.ewm(alpha=alpha, min_periods=period, adjust=False).mean() / (atr + 1e-10)

    dx  = 100.0 * (pos_di - neg_di).abs(
Repobility · code-quality intelligence · https://repobility.com
compute_indicators function · python · L98-L145 (48 LOC)
HMM/data_loader.py
def compute_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute all technical indicators and HMM feature columns.

    HMM Features
    ------------
    Returns   : 1-period log return
    Range     : (High - Low) / Close  — normalised intrabar range
    VolVol    : rolling 20-bar coefficient-of-variation of Volume

    Strategy Signals
    ----------------
    RSI           : 14-period RSI
    Momentum      : 20-bar price change as %
    Volatility    : 20-bar rolling std of Returns × √24  (≈ daily vol %)
    VolSMA20      : 20-bar simple moving average of Volume
    ADX           : 14-period Average Directional Index
    EMA50 / EMA200: Exponential Moving Averages
    MACD          : EMA12 − EMA26
    MACD_Signal   : 9-bar EMA of MACD
    """
    out = df.copy()

    # --- HMM features ---
    out["Returns"] = np.log(out["Close"] / out["Close"].shift(1))
    out["Range"]   = (out["High"] - out["Low"]) / (out["Close"] + 1e-10)
    vol_mean       = out["Volume"].rolling(2
assess_ai_sensitivity function · python · L121-L190 (70 LOC)
Red Flag Engine/src/ai_sensitivity.py
def assess_ai_sensitivity(
    company: str,
    claims:  list[Claim],
    client:  Any,
) -> str:
    """Call Claude to produce an AI-announcement sensitivity section.

    Args:
        company: Company ticker (e.g. "BA", "TSLA", "NFLX").
        claims:  Claims extracted from the current-quarter transcript.
        client:  An instantiated ``anthropic.Anthropic()`` client.

    Returns:
        Markdown string for the section (begins with ``## AI Announcement
        Sensitivity``), or a short fallback placeholder if the API call fails.
    """
    # Build a compact claim digest (category + polarity + statement).
    # Deduplicate on the first 80 chars of the claim text; cap at 50 entries
    # to stay comfortably within the model's context window.
    if claims:
        seen: set[str] = set()
        lines: list[str] = []
        for c in claims:
            key = c.claim[:80]
            if key not in seen:
                seen.add(key)
                lines.append(
              
PostEarningsReturns class · python · L37-L46 (10 LOC)
Red Flag Engine/src/backtest.py
class PostEarningsReturns(BaseModel):
    """Post-earnings price returns for a single report / earnings call."""

    # extra="forbid": constructing the model with an unknown field is an error.
    model_config = ConfigDict(extra="forbid")

    # Each return horizon defaults to None, i.e. "not available".
    # NOTE(review): returns are presumably fractions (0.05 == 5%) as produced
    # by _ret — confirm against compute_post_earnings_returns.
    ticker:    str
    call_date: str            # ISO 8601 date of the earnings call
    ret_1d:    Optional[float] = None   # 1-trading-day return
    ret_5d:    Optional[float] = None   # 5-trading-day return
    ret_20d:   Optional[float] = None   # 20-trading-day return
_load_earnings_dates function · python · L53-L65 (13 LOC)
Red Flag Engine/src/backtest.py
def _load_earnings_dates(path: Path) -> dict[str, str]:
    """Load earnings_dates.json; return empty dict on any error."""
    if not path.exists():
        logger.debug("earnings_dates.json not found at %s", path)
        return {}
    try:
        with path.open(encoding="utf-8") as fh:
            data = json.load(fh)
        # Drop the _comment key if present
        return {k: v for k, v in data.items() if not k.startswith("_")}
    except Exception as exc:
        logger.warning("Failed to load earnings_dates.json: %s", exc)
        return {}
_ret function · python · L68-L72 (5 LOC)
Red Flag Engine/src/backtest.py
def _ret(close_series, t: int, baseline: float) -> Optional[float]:
    """Compute return at index *t* relative to *baseline*.  Returns None if out of range."""
    if t < len(close_series) and baseline != 0:
        return round((close_series.iloc[t] - baseline) / baseline, 4)
    return None
compute_post_earnings_returns function · python · L79-L159 (81 LOC)
Red Flag Engine/src/backtest.py
def compute_post_earnings_returns(
    report_filename:     str,
    earnings_dates_path: Path,
) -> Optional[PostEarningsReturns]:
    """Compute post-earnings price returns for one report.

    Looks up the earnings call date in earnings_dates.json using the report
    filename stem (without ``.md``), then fetches price history via yfinance
    and computes 1d / 5d / 20d returns.

    Args:
        report_filename:     E.g. ``"BA_2025Q4_vs_2025Q3.md"``.
        earnings_dates_path: Path to earnings_dates.json.

    Returns:
        :class:`PostEarningsReturns` on success, or ``None`` on any error
        (missing date entry, network failure, import error, etc.).
    """
    # ── Lazy import of yfinance ───────────────────────────────────────────
    try:
        import yfinance as yf
    except ImportError:
        logger.warning("yfinance not installed — backtest unavailable. Run: pip install yfinance")
        return None

    # ── Look up earnings date ────────────────────────────
load_backtest_summary function · python · L162-L206 (45 LOC)
Red Flag Engine/src/backtest.py
def load_backtest_summary(
    outputs_dir:         Path,
    earnings_dates_path: Path,
) -> "pd.DataFrame":
    """Build an aggregate backtest DataFrame from all reports in outputs_dir.

    Args:
        outputs_dir:         Directory containing generated .md reports.
        earnings_dates_path: Path to earnings_dates.json.

    Returns:
        pandas DataFrame with columns: Report, Ticker, Date, 1d%, 5d%, 20d%.
        Returns an empty DataFrame when no data is available (no yfinance,
        no date entries, or network failure).
    """
    try:
        import pandas as pd
    except ImportError:
        logger.warning("pandas not available — cannot build backtest summary")
        import types
        return types.SimpleNamespace(empty=True)  # type: ignore[return-value]

    rows: list[dict] = []

    for md_path in sorted(outputs_dir.glob("*.md")):
        result = compute_post_earnings_returns(md_path.name, earnings_dates_path)
        if result is None:
            continue
ChangeType class · python · L56-L60 (5 LOC)
Red Flag Engine/src/diff.py
class ChangeType(str, Enum):
    # How a current-quarter claim relates to the prior quarter.
    # The str mixin makes every member compare equal to its string value.
    # worsened / improved / unchanged are derived from a polarity comparison
    # in _determine_change_type; new marks a claim with no prior-quarter data.
    new       = "new"
    worsened  = "worsened"
    improved  = "improved"
    unchanged = "unchanged"
Repobility · MCP-ready · https://repobility.com
Change class · python · L63-L82 (20 LOC)
Red Flag Engine/src/diff.py
class Change(BaseModel):
    # One classified difference between the current and the prior quarter:
    # the current claim, its optional prior-quarter counterpart, and the
    # scoring metadata attached to the pair.
    # extra="forbid": constructing the model with an unknown field is an error.
    model_config = ConfigDict(extra="forbid")

    category:         Category
    change_type:      ChangeType
    severity:         int               # 1–5
    confidence:       Confidence
    summary:          str
    # Current quarter
    now_claim:        str
    now_evidence:     str
    now_chunk_id:     str
    now_speaker_role: str = "unknown"   # propagated from Claim.speaker_role
    # Prior quarter (None for new claims)
    prev_claim:       Optional[str] = None
    prev_evidence:    Optional[str] = None
    prev_chunk_id:    Optional[str] = None
    # Similarity metadata
    # NOTE(review): match_score is presumably the token_set_ratio of the
    # matched pair — confirm in match_claims.
    match_score:      float = 0.0
    match_quality:    str   = "strict"  # "strict" | "soft"
_assign_severity function · python · L89-L139 (51 LOC)
Red Flag Engine/src/diff.py
def _assign_severity(
    change_type:  ChangeType,
    category:     Category,
    polarity_now: Polarity,
    confidence:   Confidence,
) -> int:
    """Return a severity score in [1, 5] applying all rules in order.

    Base rules:
      new       → 3  (+1 if high-risk category, +1 if negative polarity)
      worsened  → 4  (+1 if high-risk category)
      improved  → 1  (+1 if high confidence)
      unchanged → 2

    Post-base adjustments:
      low confidence  → cap at 3
      high confidence + worsened + bump-category → +1 (cap 5)
    """
    # ── Base ──────────────────────────────────────────────────────────────
    if change_type == ChangeType.new:
        base = 3
        if category in _HIGH_RISK_CATEGORIES:
            base += 1
        if polarity_now == Polarity.negative:
            base += 1

    elif change_type == ChangeType.worsened:
        base = 4
        if category in _HIGH_RISK_CATEGORIES:
            base += 1

    elif change_type == ChangeType.improved:
   
_determine_change_type function · python · L146-L157 (12 LOC)
Red Flag Engine/src/diff.py
def _determine_change_type(
    polarity_now:  Polarity,
    polarity_prev: Polarity,
) -> ChangeType:
    """Classify the move from *polarity_prev* to *polarity_now*.

    Both polarities are ranked through ``POLARITY_ORDER``.  A lower rank
    than before means the claim worsened, a higher rank means it improved,
    and anything else counts as unchanged.
    """
    rank_now = POLARITY_ORDER[polarity_now]
    rank_prev = POLARITY_ORDER[polarity_prev]

    if rank_now < rank_prev:
        outcome = ChangeType.worsened
    elif rank_now > rank_prev:
        outcome = ChangeType.improved
    else:
        outcome = ChangeType.unchanged
    return outcome
_build_summary function · python · L160-L182 (23 LOC)
Red Flag Engine/src/diff.py
def _build_summary(
    change_type: ChangeType,
    claim_now:   Claim,
    claim_prev:  Optional[Claim],
) -> str:
    """Render a one-line, human-readable summary of a change.

    The line starts with a bracketed tag for the change type and the
    title-cased category name.  Worsened and improved changes also spell
    out the polarity transition, so *claim_prev* must not be ``None`` for
    those two types.
    """
    category_label = claim_now.category.value.replace("_", " ").title()

    if change_type == ChangeType.new:
        return f"[NEW] {category_label}: {claim_now.claim}"

    if change_type in (ChangeType.worsened, ChangeType.improved):
        tag = "[WORSENED]" if change_type == ChangeType.worsened else "[IMPROVED]"
        transition = f"{claim_prev.polarity.value} → {claim_now.polarity.value}"
        return (
            f"{tag} {category_label}: sentiment shifted "
            f"{transition}. "
            f"{claim_now.claim}"
        )

    return f"[UNCHANGED] {category_label}: {claim_now.claim}"
match_claims function · python · L189-L305 (117 LOC)
Red Flag Engine/src/diff.py
def match_claims(
    now_claims:  list[Claim],
    prev_claims: list[Claim],
    threshold:   int = MATCH_THRESHOLD,
) -> list[Change]:
    """Compare two claim lists and return classified, ranked Changes.

    Two-pass matching strategy
    --------------------------
    Pass 1 (strict):  token_set_ratio on full claim text >= threshold.
    Pass 2 (soft):    For unmatched claims only — token_set_ratio on the
                      first SOFT_WINDOW chars, same category, >= SOFT_THRESHOLD.
                      Produces match_quality="soft".

    Args:
        now_claims:  Claims from the current (newer) quarter.
        prev_claims: Claims from the prior quarter. May be empty.
        threshold:   Strict match threshold (default 72).

    Returns:
        List of Change objects sorted by severity DESC, then confidence DESC.
    """
    changes: list[Change] = []

    for now in now_claims:
        # ── Pass 1: strict full-text match ─────────────────────────────
        best_score: fl
AbandonedMetric class · python · L312-L321 (10 LOC)
Red Flag Engine/src/diff.py
class AbandonedMetric(BaseModel):
    """A claim category that was prominent last quarter but absent this quarter."""

    # extra="forbid": constructing the model with an unknown field is an error.
    model_config = ConfigDict(extra="forbid")

    # The three text fields below all come from one prior-quarter claim that
    # stands in for the whole category.
    category:             Category
    representative_claim: str    # most-confident prev claim text for this category
    evidence:             str    # evidence quote from the representative claim
    chunk_id:             str    # chunk_id of the representative claim
    confidence:           Confidence  # high ≥3 prev claims, medium ≥2
find_abandoned_metrics function · python · L324-L411 (88 LOC)
Red Flag Engine/src/diff.py
def find_abandoned_metrics(
    claims_now:  list[Claim],
    claims_prev: list[Claim],
    threshold:   int = MATCH_THRESHOLD,
) -> list[AbandonedMetric]:
    """Identify claim categories present last quarter but absent this quarter.

    A category is "abandoned" when:
      - It has ≥ 2 prior-quarter claims, AND
      - It is not an all-positive, non-guidance category (these may drop off
        naturally without being a signal), AND
      - No current-quarter claim has token_set_ratio ≥ SOFT_THRESHOLD against
        any prior-quarter claim in that category.

    Args:
        claims_now:  Claims extracted from the current quarter.
        claims_prev: Claims extracted from the prior quarter.
        threshold:   Ignored (kept for API symmetry with match_claims).

    Returns:
        List of :class:`AbandonedMetric` sorted by confidence DESC then
        category name ASC.  Empty list if claims_prev is empty.
    """
    if not claims_prev:
        return []

    # ── Group prior 
HedgeDelta class · python · L67-L76 (10 LOC)
Red Flag Engine/src/hedge_score.py
class HedgeDelta(BaseModel):
    """Comparison of hedge scores for one section between two quarters."""

    # extra="forbid": constructing the model with an unknown field is an error.
    model_config = ConfigDict(extra="forbid")

    # diff_hedge_scores builds these with the scores and the delta rounded to
    # two decimals; a section missing from one quarter is scored 0.0 there.
    section:    str
    now_score:  float   # hedge matches per 100 words (current quarter)
    prev_score: float   # hedge matches per 100 words (prior quarter)
    delta:      float   # now_score − prev_score (positive = more hedged now)
    flag:       bool    # True when delta > _FLAG_DELTA_PP
Repobility (the analyzer behind this table) · https://repobility.com
_strip_safe_harbour function · python · L83-L92 (10 LOC)
Red Flag Engine/src/hedge_score.py
def _strip_safe_harbour(text: str) -> str:
    """Drop every paragraph that contains a Tier 3 safe-harbour phrase.

    Paragraphs are separated by two or more newlines.  The surviving
    paragraphs are re-joined with exactly one blank line, so longer runs of
    newlines between paragraphs are collapsed.  Each paragraph is lowercased
    before the phrase check.
    """
    def _is_safe_harbour(paragraph: str) -> bool:
        lowered = paragraph.lower()
        return any(phrase in lowered for phrase in _TIER3_PHRASES)

    return "\n\n".join(
        paragraph
        for paragraph in re.split(r"\n{2,}", text)
        if not _is_safe_harbour(paragraph)
    )
_count_matches function · python · L95-L109 (15 LOC)
Red Flag Engine/src/hedge_score.py
def _count_matches(text: str) -> int:
    """Return the number of Tier 1 and Tier 2 hedge-term hits in *text*.

    Single-word terms are counted with the precompiled regex patterns
    applied to the text as given.  Multi-word phrases are counted as
    non-overlapping substring occurrences in the lowercased text.
    """
    regex_hits = sum(
        len(pattern.findall(text))
        for pattern_group in (_TIER1_WORD_RES, _TIER2_WORD_RES)
        for pattern in pattern_group
    )

    lowered = text.lower()
    phrase_hits = sum(
        lowered.count(phrase)
        for phrase_group in (_TIER1_PHRASES, _TIER2_PHRASES)
        for phrase in phrase_group
    )

    return regex_hits + phrase_hits
score_hedging function · python · L116-L146 (31 LOC)
Red Flag Engine/src/hedge_score.py
def score_hedging(chunks: "list[Chunk]") -> dict[str, float]:
    """Return a mapping of section label → hedge score (matches per 100 words).

    Safe-harbour paragraphs are stripped before counting.  Scores for chunks
    with the same section label are averaged.

    Args:
        chunks: Segmented transcript chunks (current or prior quarter).

    Returns:
        Dict keyed by section label (e.g. ``"guidance"``, ``"demand"``).
        Sections with zero total words after stripping are skipped.
    """
    # Accumulate (total_matches, total_words) per section
    section_data: dict[str, list[float]] = {}

    for chunk in chunks:
        text = _strip_safe_harbour(chunk.text)
        words = text.split()
        if not words:
            continue
        count = _count_matches(text)
        score = count / len(words) * 100.0
        section_data.setdefault(chunk.section, []).append(score)

    result: dict[str, float] = {}
    for section, scores in section_data.items():
        re
diff_hedge_scores function · python · L149-L188 (40 LOC)
Red Flag Engine/src/hedge_score.py
def diff_hedge_scores(
    now_scores:  dict[str, float],
    prev_scores: dict[str, float],
) -> list[HedgeDelta]:
    """Compare section-level hedge scores between two quarters.

    For sections present only in one quarter, the missing quarter's score
    is treated as 0.0 (new section or dropped section).

    Args:
        now_scores:  Output of ``score_hedging()`` for the current quarter.
        prev_scores: Output of ``score_hedging()`` for the prior quarter.

    Returns:
        List of :class:`HedgeDelta` sorted by ``|delta|`` descending.
    """
    all_sections = set(now_scores) | set(prev_scores)
    deltas: list[HedgeDelta] = []

    for section in all_sections:
        now_val  = now_scores.get(section, 0.0)
        prev_val = prev_scores.get(section, 0.0)
        delta    = round(now_val - prev_val, 2)
        deltas.append(HedgeDelta(
            section    = section,
            now_score  = round(now_val,  2),
            prev_score = round(prev_val, 2),
           
Doc class · python · L11-L14 (4 LOC)
Red Flag Engine/src/ingest.py
class Doc:
    # A loaded transcript: who it belongs to, which period, and its full text.
    # load_doc builds instances with keyword arguments, so a decorator that
    # generates __init__ is presumably applied above this line — confirm.
    company: str  # company identifier, e.g. "AAPL"
    period: str   # period label, e.g. "Q4_2024"
    text: str     # full extracted transcript text
load_doc function · python · L17-L70 (54 LOC)
Red Flag Engine/src/ingest.py
def load_doc(company: str, period: str, filepath: str | Path) -> Doc:
    """Load a transcript from a PDF or TXT file and return a Doc.

    Args:
        company:  Company identifier (e.g. "AAPL").
        period:   Period label (e.g. "Q4_2024").
        filepath: Absolute or relative path to a .pdf or .txt file.

    Returns:
        Doc with the full extracted text.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If the file extension is not .pdf or .txt.
        RuntimeError: If PDF extraction yields no text.
    """
    path = Path(filepath)

    if not path.exists():
        raise FileNotFoundError(f"Transcript file not found: {path}")

    suffix = path.suffix.lower()

    if suffix == ".txt":
        text = path.read_text(encoding="utf-8", errors="ignore")
        logger.debug("Loaded TXT %s (%d chars)", path.name, len(text))
        return Doc(company=company, period=period, text=text)

    if suffix == ".pdf":
        try:
            i
Category class · python · L19-L29 (11 LOC)
Red Flag Engine/src/llm_extract.py
class Category(str, Enum):
    # Closed set of topic buckets a claim can be filed under.
    # The str mixin makes every member compare equal to its string value;
    # each value is identical to the member name.
    guidance              = "guidance"
    demand                = "demand"
    pricing_margin        = "pricing_margin"
    liquidity             = "liquidity"
    reg_legal             = "reg_legal"
    competition           = "competition"
    costs_restructuring   = "costs_restructuring"
    ops_execution         = "ops_execution"
    accounting            = "accounting"
    other                 = "other"
Polarity class · python · L32-L36 (5 LOC)
Red Flag Engine/src/llm_extract.py
class Polarity(str, Enum):
    # Sentiment attached to a claim.
    # The str mixin makes every member compare equal to its string value.
    # NOTE(review): declaration order is not a ranking; comparisons presumably
    # go through a separate POLARITY_ORDER mapping — confirm.
    positive = "positive"
    negative = "negative"
    neutral  = "neutral"
    mixed    = "mixed"
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
Confidence class · python · L39-L42 (4 LOC)
Red Flag Engine/src/llm_extract.py
class Confidence(str, Enum):
    # Three-level confidence rating, declared from lowest to highest.
    # The str mixin makes every member compare equal to its string value.
    low    = "low"
    medium = "medium"
    high   = "high"
Claim class · python · L49-L65 (17 LOC)
Red Flag Engine/src/llm_extract.py
class Claim(BaseModel):
    # One extracted claim together with the quote that backs it.
    # Documented with comments rather than a docstring so that nothing
    # derived from the class docstring changes.
    # extra="forbid": constructing the model with an unknown field is an error.
    model_config = ConfigDict(extra="forbid")

    category: Category
    polarity: Polarity
    claim: str
    evidence: str
    chunk_id: str
    confidence: Confidence
    # Set post-parse from Chunk; not in LLM schema.
    speaker_role: str = "unknown"

    @field_validator("evidence")
    @classmethod
    def evidence_not_empty(cls, v: str) -> str:
        # Whitespace-only evidence counts as empty.  A valid value is
        # returned exactly as received (it is not stripped).
        if v.strip():
            return v
        raise ValueError("evidence must not be empty")
evidence_not_empty method · python · L62-L65 (4 LOC)
Red Flag Engine/src/llm_extract.py
    def evidence_not_empty(cls, v: str) -> str:
        """Reject empty or whitespace-only evidence; return *v* unchanged otherwise."""
        has_content = bool(v.strip())
        if has_content:
            return v
        raise ValueError("evidence must not be empty")
ClaimList class · python · L68-L71 (4 LOC)
Red Flag Engine/src/llm_extract.py
class ClaimList(BaseModel):
    # Wrapper model holding a list of claims under a single "claims" key.
    # ``extra="forbid"`` makes validation fail on any other top-level key.
    model_config = ConfigDict(extra="forbid")

    # Each element is validated as a Claim.
    claims: list[Claim]
_build_user_prompt function · python · L137-L143 (7 LOC)
Red Flag Engine/src/llm_extract.py
def _build_user_prompt(chunk: Chunk) -> str:
    """Render the user prompt for one chunk.

    Fills the ``chunk_id``, ``section``, ``speaker_role`` and ``chunk_text``
    placeholders of ``_USER_PROMPT_TEMPLATE`` from the chunk's attributes.
    """
    placeholders = {
        "chunk_id": chunk.chunk_id,
        "section": chunk.section,
        "speaker_role": chunk.speaker_role,
        "chunk_text": chunk.text,
    }
    return _USER_PROMPT_TEMPLATE.format(**placeholders)
_filter_claims function · python · L150-L215 (66 LOC)
Red Flag Engine/src/llm_extract.py
def _filter_claims(
    claims: list[Claim],
    chunk_id: str,
    chunk_text: str,
) -> list[Claim]:
    """Apply post-LLM validation and return only fully-passing claims.

    Checks (in order, all must pass):
    1. Evidence is non-empty.
    2. Evidence word count ≤ 25.
    3. Evidence is a verbatim substring of the chunk text (exact match).

    Claims failing any check are dropped and the reason is logged.
    The returned list is capped at MAX_CLAIMS_PER_CHUNK.
    """
    valid: list[Claim] = []
    for claim in claims:
        ev = claim.evidence.strip()
        cl = claim.claim.strip()

        # Check 0a — minimum claim length (drops boilerplate like speaker intros)
        if len(cl.split()) < MIN_CLAIM_WORDS:
            logger.debug(
                "%s: DROP claim too short (%d words) | claim: %.60s",
                chunk_id, len(cl.split()), cl,
            )
            continue

        # Check 0b — corporate header pattern (title + company suffix in same claim)
   
_parse_response function · python · L218-L240 (23 LOC)
Red Flag Engine/src/llm_extract.py
def _parse_response(raw: str, chunk_id: str, chunk_text: str) -> list[Claim]:
    """Turn the raw API text into filtered Claim objects.

    Steps: strip surrounding whitespace, drop markdown fence lines if the
    text opens with one, decode the JSON, validate it as a ``ClaimList``,
    then hand the claims to ``_filter_claims``. A JSON or validation failure
    is logged at error level and yields an empty list.
    """
    payload = raw.strip()

    # The model sometimes wraps its JSON in a ``` fence; remove every fence
    # line and keep whatever sits between them.
    if payload.startswith("```"):
        kept_rows = []
        for row in payload.splitlines():
            if not row.startswith("```"):
                kept_rows.append(row)
        payload = "\n".join(kept_rows).strip()

    try:
        decoded: Any = json.loads(payload)
    except json.JSONDecodeError as exc:
        logger.error("%s: JSON parse error — %s", chunk_id, exc)
        return []

    try:
        parsed = ClaimList.model_validate(decoded)
    except Exception as exc:
        logger.error("%s: Pydantic validation error — %s", chunk_id, exc)
        return []
    else:
        return _filter_claims(parsed.claims, chunk_id, chunk_text)
_call_api function · python · L247-L270 (24 LOC)
Red Flag Engine/src/llm_extract.py
def _call_api(chunk: Chunk, client: Any) -> str | None:
    """Send one chunk to the Claude API, trying at most twice.

    Returns the text of the first content item of the response, or ``None``
    when both attempts raise. Every failure is logged as a warning; the
    second attempt follows the first immediately.
    """
    prompt = _build_user_prompt(chunk)
    request = {
        "model": "claude-opus-4-6",
        "max_tokens": 1024,
        "temperature": 0,
        "system": SYSTEM_PROMPT,
        "messages": [{"role": "user", "content": prompt}],
    }

    attempt = 1
    while attempt <= 2:
        try:
            reply = client.messages.create(**request)
            return reply.content[0].text
        except Exception as exc:
            if attempt != 1:
                logger.warning(
                    "%s: API error on attempt 2 (%s), skipping chunk", chunk.chunk_id, exc
                )
            else:
                logger.warning(
                    "%s: API error on attempt 1 (%s), retrying…", chunk.chunk_id, exc
                )
        attempt += 1
    return None
Repobility · code-quality intelligence · https://repobility.com
extract_claims_from_chunk function · python · L273-L298 (26 LOC)
Red Flag Engine/src/llm_extract.py
def extract_claims_from_chunk(chunk: Chunk, client: Any) -> list[Claim]:
    """Run extraction for one chunk and return its validated claims.

    Args:
        chunk:  Transcript chunk to send to the API.
        client: An instantiated ``anthropic.Anthropic`` client.

    Returns:
        The validated Claim objects for this chunk; empty when the API call
        produced no text or nothing survived parsing.
    """
    response_text = _call_api(chunk, client)
    if response_text is None:
        return []

    extracted = _parse_response(response_text, chunk.chunk_id, chunk.text)

    # Stamp the chunk's speaker role onto copies of the claims, unless the
    # chunk itself has no known role (claims already default to "unknown").
    role = chunk.speaker_role
    if role != "unknown":
        role_update = {"speaker_role": role}
        extracted = [item.model_copy(update=role_update) for item in extracted]

    logger.debug(
        "%s: extracted %d claim(s)  section=%s  speaker=%s",
        chunk.chunk_id, len(extracted), chunk.section, role,
    )
    return extracted
extract_claims function · python · L301-L318 (18 LOC)
Red Flag Engine/src/llm_extract.py
def extract_claims(chunks: list[Chunk], client: Any) -> list[Claim]:
    """Run per-chunk extraction over every chunk and flatten the results.

    Args:
        chunks: Ordered list of Chunk objects from segment_doc().
        client: An instantiated ``anthropic.Anthropic`` client.

    Returns:
        One flat list with the claims of all chunks, in chunk order. Chunks
        that yield nothing simply contribute no entries.
    """
    collected: list[Claim] = [
        claim
        for chunk in chunks
        for claim in extract_claims_from_chunk(chunk, client)
    ]
    logger.info(
        "Total claims extracted: %d from %d chunks", len(collected), len(chunks)
    )
    return collected
_configure_logging function · python · L68-L74 (7 LOC)
Red Flag Engine/src/main.py
def _configure_logging(level: str) -> None:
    """Set up root logging on stderr at the named level.

    ``level`` is upper-cased and looked up as an attribute of the ``logging``
    module; a name with no matching attribute falls back to ``logging.INFO``.
    """
    resolved_level = getattr(logging, level.upper(), logging.INFO)
    logging.basicConfig(
        stream=sys.stderr,
        level=resolved_level,
        datefmt="%H:%M:%S",
        format="%(asctime)s  %(levelname)-8s  %(name)s  %(message)s",
    )
page 1 / 3next ›