← back to eloiduc__redflag-engine

Function bodies 106 total

All specs Real LLM only Function bodies
_resolve function · python · L81-L84 (4 LOC)
Red Flag Engine/src/main.py
def _resolve(path_str: str) -> Path:
    """Return an absolute Path, resolving relative paths from the project root."""
    p = Path(path_str)
    return p if p.is_absolute() else (_PROJECT_ROOT / p).resolve()
_find_transcript function · python · L87-L111 (25 LOC)
Red Flag Engine/src/main.py
def _find_transcript(company: str, period: str, data_dir: Path) -> Path:
    """Locate <period>_transcript.pdf or .txt under data_dir/company/.

    Raises:
        FileNotFoundError: With an actionable message if no file is found.
    """
    company_dir = data_dir / company
    if not company_dir.exists():
        raise FileNotFoundError(
            f"Company directory not found: {company_dir}\n"
            f"  Expected location: {company_dir}\n"
            f"  Create it and add a transcript file named:\n"
            f"    {period}_transcript.pdf  or  {period}_transcript.txt"
        )
    for ext in (".pdf", ".txt"):
        candidate = company_dir / f"{period}_transcript{ext}"
        if candidate.exists():
            return candidate
    raise FileNotFoundError(
        f"No transcript found for {company} / {period}\n"
        f"  Looked in: {company_dir}\n"
        f"  Expected one of:\n"
        f"    {period}_transcript.pdf\n"
        f"    {period}_transcript.txt"
    )
_check_api_key function · python · L118-L134 (17 LOC)
Red Flag Engine/src/main.py
def _check_api_key() -> None:
    key = os.environ.get("ANTHROPIC_API_KEY", "").strip()
    if not key:
        print(
            "\nERROR: ANTHROPIC_API_KEY is not set.\n"
            "\n"
            "  Fix:\n"
            "    1. Create a file named  .env  in the project root:\n"
            f"       {_PROJECT_ROOT / '.env'}\n"
            "    2. Add this line:  ANTHROPIC_API_KEY=sk-ant-...\n"
            "    3. Get a key at:   https://console.anthropic.com/\n"
            "\n"
            "  Note: A Claude Pro (claude.ai) subscription does NOT include API\n"
            "        access.  You need a separate Anthropic API account.\n",
            file=sys.stderr,
        )
        sys.exit(1)
run_selfcheck function · python · L141-L178 (38 LOC)
Red Flag Engine/src/main.py
def run_selfcheck(
    company: str,
    prev_period: str,
    now_period: str,
    data_dir: Path,
) -> None:
    """Ingest and segment both transcripts; print diagnostic stats.

    No LLM calls are made.  Use this to verify files load correctly
    and chunking behaves as expected before spending API credits.
    """
    for period in (prev_period, now_period):
        try:
            path = _find_transcript(company, period, data_dir)
        except FileNotFoundError as exc:
            print(f"\n[SELFCHECK] ERROR: {exc}", file=sys.stderr)
            continue

        doc    = load_doc(company, period, path)
        chunks = segment_doc(doc)
        section_counts = Counter(c.section for c in chunks)

        print(f"\n{'='*60}")
        print(f"  {company}  {period}")
        print(f"{'='*60}")
        print(f"  File        : {path}")
        print(f"  Text length : {len(doc.text):,} chars")
        print(f"  Chunks      : {len(chunks)}")
        print(f"  Sections    :")
       
_parse_args function · python · L185-L243 (59 LOC)
Red Flag Engine/src/main.py
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        prog="python src/main.py",
        description="Red Flag Engine — earnings call transcript change detector",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            "  python src/main.py --company BA   --prev 2025Q3 --now 2025Q4\n"
            "  python src/main.py --company TSLA --prev 2025Q3 --now 2025Q4 --threshold 80\n"
            "  python src/main.py --company BA   --prev 2025Q3 --now 2025Q4 --selfcheck\n"
        ),
    )
    parser.add_argument(
        "--company", required=True,
        help="Company ticker, e.g. BA.",
    )
    parser.add_argument(
        "--prev", required=True, metavar="PERIOD",
        help="Prior-quarter period label, e.g. 2025Q3.",
    )
    parser.add_argument(
        "--now", required=True, metavar="PERIOD",
        help="Current-quarter period label, e.g. 2025Q4.",
    )
  
run_pipeline function · python · L250-L398 (149 LOC)
Red Flag Engine/src/main.py
def run_pipeline(
    company:     str,
    now_period:  str,
    prev_period: str,
    now_path:    str,
    prev_path:   str,
) -> str:
    """Run the full Red Flag Engine pipeline with explicit file paths.

    Unlike ``run()``, this function accepts absolute file paths directly
    (useful when the caller — e.g. a Streamlit UI — already has the file
    on disk) rather than deriving them from a company / period / data-dir
    triple.

    Args:
        company:     Company ticker (e.g. "BA").
        now_period:  Label for the current quarter (e.g. "2025Q4").
        prev_period: Label for the prior quarter (e.g. "2025Q3").
        now_path:    Absolute path to the current-quarter transcript.
        prev_path:   Absolute path to the prior-quarter transcript.

    Returns:
        Absolute path to the generated Markdown report as a string.

    Raises:
        RuntimeError: If ingestion or segmentation yields empty results.
        Any exception from the underlying pipeline steps i
run function · python · L405-L543 (139 LOC)
Red Flag Engine/src/main.py
def run(
    company: str,
    prev_period: str,
    now_period: str,
    data_dir: Path,
    output_dir: Path,
    threshold: int,
) -> Path:
    """Execute the full pipeline and return the path to the written report."""
    log = logging.getLogger(__name__)

    prev_path = _find_transcript(company, prev_period, data_dir)
    now_path  = _find_transcript(company, now_period,  data_dir)

    log.info("=== Red Flag Engine ===")
    log.info("Project root  : %s", _PROJECT_ROOT)
    log.info("Company       : %s", company)
    log.info("Prior quarter : %s  (%s)", prev_period, prev_path)
    log.info("Now quarter   : %s  (%s)", now_period, now_path)
    log.info("Threshold     : %d", threshold)
    log.info("Output dir    : %s", output_dir)

    # 1. Ingest
    log.info("[1/5] Ingesting transcripts…")
    doc_prev = load_doc(company, prev_period, prev_path)
    doc_now  = load_doc(company, now_period,  now_path)

    if not doc_prev.text.strip():
        raise RuntimeError(f"Prior transcri
If a scraper extracted this row, it came from Repobility (https://repobility.com)
main function · python · L550-L586 (37 LOC)
Red Flag Engine/src/main.py
def main(argv: list[str] | None = None) -> None:
    args = _parse_args(argv)
    _configure_logging(args.log_level)

    data_dir   = _resolve(args.data_dir)
    output_dir = _resolve(args.output_dir)

    # ── Self-check: no API needed ──────────────────────────────────────────
    if args.selfcheck:
        run_selfcheck(
            company=args.company,
            prev_period=args.prev,
            now_period=args.now,
            data_dir=data_dir,
        )
        return

    # ── Full pipeline: API required ────────────────────────────────────────
    _check_api_key()

    try:
        out_path = run(
            company=args.company,
            prev_period=args.prev,
            now_period=args.now,
            data_dir=data_dir,
            output_dir=output_dir,
            threshold=args.threshold,
        )
    except FileNotFoundError as exc:
        print(f"\nERROR (file not found):\n{exc}\n", file=sys.stderr)
        sys.exit(1)
    except (ValueError, RuntimeError) 
PeerSignal class · python · L32-L44 (13 LOC)
Red Flag Engine/src/peer_contagion.py
class PeerSignal(BaseModel):
    """A single red flag sourced from a related company's report.

    All fields are plain strings.  The model is configured with
    ``extra="forbid"``, which in pydantic rejects unknown fields at
    construction time instead of silently dropping them.
    """

    # Unknown keys are an error rather than being ignored.
    model_config = ConfigDict(extra="forbid")

    source_company:  str   # ticker of the peer / supplier
    relationship:    str   # "peer" | "supplier"
    category:        str   # presumably the category label from the source report — confirm in load_peer_signals
    claim:           str   # presumably the claim text from the source report — confirm in load_peer_signals
    evidence:        str   # evidence text; pipe-escaped when rendered into a Markdown table
    polarity:        str   # "negative" | "mixed"
    sev:             str   # severity label, e.g. "High", "Critical"
    report_filename: str   # name of the report file the signal was read from
_split_pipe_row function · python · L52-L60 (9 LOC)
Red Flag Engine/src/peer_contagion.py
def _split_pipe_row(line: str) -> list[str]:
    """Split one Markdown table row on unescaped pipes."""
    parts = re.split(r"(?<!\\)\|", line)
    # Drop leading/trailing empty cells from the outer | delimiters
    while parts and not parts[0].strip():
        parts.pop(0)
    while parts and not parts[-1].strip():
        parts.pop()
    return [p.strip().replace("\\|", "|") for p in parts]
_parse_red_flags_table function · python · L63-L87 (25 LOC)
Red Flag Engine/src/peer_contagion.py
def _parse_red_flags_table(md_text: str) -> list[dict[str, str]]:
    """Extract rows from the ## Red Flags section of a report Markdown string.

    Returns a list of dicts keyed by the table header names.
    Returns an empty list if the section or table is absent / malformed.
    """
    # Split report into sections using the --- separator
    parts = re.split(r"\n\n---\n\n", md_text)
    for part in parts:
        if re.match(r"^## Red Flags\b", part.strip()):
            # Found the Red Flags section — parse the pipe table within it
            lines = [
                ln for ln in part.splitlines()
                if re.match(r"^\s*\|", ln)
            ]
            if len(lines) < 3:
                return []
            headers = _split_pipe_row(lines[0])
            rows: list[dict[str, str]] = []
            for line in lines[2:]:  # skip header + separator row
                cells = _split_pipe_row(line)
                if len(cells) == len(headers):
                    ro
_polarity_proxy function · python · L94-L106 (13 LOC)
Red Flag Engine/src/peer_contagion.py
def _polarity_proxy(change: str, sev: str) -> str | None:
    """Map a Change cell + Sev cell to a polarity string, or None to skip.

    Rules:
      - "WORSENED" in change → "negative"
      - "NEW" in change AND sev in _HIGH_SEV_LABELS → "mixed"
      - All other rows → None (excluded)
    """
    if "WORSENED" in change:
        return "negative"
    if "NEW" in change and sev in _HIGH_SEV_LABELS:
        return "mixed"
    return None
load_peer_signals function · python · L113-L212 (100 LOC)
Red Flag Engine/src/peer_contagion.py
def load_peer_signals(
    company:       str,
    outputs_dir:   Path,
    peer_map_path: Path,
) -> list[PeerSignal]:
    """Load red flag signals from related companies' existing reports.

    Args:
        company:       Target company ticker (e.g. "BA").
        outputs_dir:   Directory containing generated .md reports.
        peer_map_path: Path to peer_map.json.

    Returns:
        List of :class:`PeerSignal` sorted by (supplier-first, sev DESC).
        Returns an empty list gracefully on any configuration or parse error.
    """
    # ── Load peer map ─────────────────────────────────────────────────────
    if not peer_map_path.exists():
        logger.warning("peer_map.json not found at %s — skipping peer signals", peer_map_path)
        return []

    try:
        with peer_map_path.open(encoding="utf-8") as fh:
            peer_map: dict[str, dict[str, list[str]]] = json.load(fh)
    except Exception as exc:
        logger.warning("Failed to load peer_map.json: %s", exc
PredictionMarket class · python · L104-L116 (13 LOC)
Red Flag Engine/src/prediction_markets.py
class PredictionMarket(BaseModel):
    """A single prediction market fetched from Polymarket or Kalshi.

    Only ``platform``, ``question``, ``yes_probability`` and ``volume_usd``
    are required; the remaining fields have defaults.  The model is
    configured with ``extra="forbid"``, which in pydantic rejects unknown
    fields at construction time.
    """

    # Unknown keys are an error rather than being ignored.
    model_config = ConfigDict(extra="forbid")

    platform:        str             # source platform of the market
    question:        str             # market question text
    yes_probability: float           # 0.0 – 1.0
    volume_usd:      float           # traded volume, in USD per the field name
    liquidity_usd:   float  = 0.0    # liquidity, in USD per the field name; defaults to 0.0
    expires:         Optional[str] = None   # ISO date string
    url:             Optional[str] = None   # link to the market, when available
    relevance_score: float  = 0.0    # computed post-retrieval
MarketClaimCrossRef class · python · L119-L134 (16 LOC)
Red Flag Engine/src/prediction_markets.py
class MarketClaimCrossRef(BaseModel):
    """A cross-reference between one prediction market and one extracted claim.

    The first group of fields describes the market, the ``claim_*`` fields
    describe the claim it was paired with, and ``alignment`` /
    ``interpretation`` describe how the two relate.  The model is configured
    with ``extra="forbid"``, which in pydantic rejects unknown fields at
    construction time.
    """

    # Unknown keys are an error rather than being ignored.
    model_config = ConfigDict(extra="forbid")

    # ── Market side ──
    market_question:  str
    platform:         str
    yes_probability:  float
    volume_usd:       float
    expires:          Optional[str] = None
    url:              Optional[str] = None
    # ── Claim side ──
    claim_text:       str
    claim_polarity:   str   # Polarity enum value
    claim_category:   str   # Category enum value, human-readable
    # ── Relationship ──
    alignment:        str   # "CONTRADICTS" | "CONFIRMS" | "NEUTRAL"
    interpretation:   str   # free-text explanation accompanying the alignment label
Repobility analyzer · published findings · https://repobility.com
_get_json function · python · L141-L148 (8 LOC)
Red Flag Engine/src/prediction_markets.py
def _get_json(url: str, headers: dict[str, str] | None = None, timeout: int = 12) -> Any:
    req = urllib.request.Request(url)
    req.add_header("User-Agent", "RedFlagEngine/1.0")
    req.add_header("Accept",     "application/json")
    for k, v in (headers or {}).items():
        req.add_header(k, v)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
_PolymarketClient class · python · L155-L203 (49 LOC)
Red Flag Engine/src/prediction_markets.py
class _PolymarketClient:
    """Polymarket Gamma public API — no authentication required."""

    def search(self, query: str, limit: int = 30) -> list[PredictionMarket]:
        params = urllib.parse.urlencode({
            "search": query,
            "active": "true",
            "closed": "false",
            "limit":  str(limit),
        })
        url = f"{_POLYMARKET_BASE}/markets?{params}"
        try:
            data = _get_json(url)
        except Exception as exc:
            logger.debug("Polymarket search '%s' failed: %s", query, exc)
            return []

        results: list[PredictionMarket] = []
        for item in (data if isinstance(data, list) else []):
            try:
                question = item.get("question", "").strip()
                if not question:
                    continue

                # outcomePrices is a JSON string e.g. '["0.65","0.35"]'
                prices_raw = item.get("outcomePrices", "[]")
                prices = json.loads(prices
search method · python · L158-L203 (46 LOC)
Red Flag Engine/src/prediction_markets.py
    def search(self, query: str, limit: int = 30) -> list[PredictionMarket]:
        params = urllib.parse.urlencode({
            "search": query,
            "active": "true",
            "closed": "false",
            "limit":  str(limit),
        })
        url = f"{_POLYMARKET_BASE}/markets?{params}"
        try:
            data = _get_json(url)
        except Exception as exc:
            logger.debug("Polymarket search '%s' failed: %s", query, exc)
            return []

        results: list[PredictionMarket] = []
        for item in (data if isinstance(data, list) else []):
            try:
                question = item.get("question", "").strip()
                if not question:
                    continue

                # outcomePrices is a JSON string e.g. '["0.65","0.35"]'
                prices_raw = item.get("outcomePrices", "[]")
                prices = json.loads(prices_raw) if isinstance(prices_raw, str) else prices_raw
                yes_prob  = float(prices[
_KalshiClient class · python · L206-L259 (54 LOC)
Red Flag Engine/src/prediction_markets.py
class _KalshiClient:
    """Kalshi REST API — requires KALSHI_API_KEY environment variable."""

    def __init__(self) -> None:
        self._api_key = os.environ.get("KALSHI_API_KEY", "").strip()

    @property
    def available(self) -> bool:
        return bool(self._api_key)

    def search(self, query: str, limit: int = 15) -> list[PredictionMarket]:
        if not self.available:
            return []
        params = urllib.parse.urlencode({
            "search": query,
            "status": "open",
            "limit":  str(limit),
        })
        url = f"{_KALSHI_BASE}/markets?{params}"
        try:
            data = _get_json(url, headers={"Authorization": f"Bearer {self._api_key}"})
        except Exception as exc:
            logger.debug("Kalshi search '%s' failed: %s", query, exc)
            return []

        results: list[PredictionMarket] = []
        for item in (data.get("markets", []) if isinstance(data, dict) else []):
            try:
                title = 
search method · python · L216-L259 (44 LOC)
Red Flag Engine/src/prediction_markets.py
    def search(self, query: str, limit: int = 15) -> list[PredictionMarket]:
        if not self.available:
            return []
        params = urllib.parse.urlencode({
            "search": query,
            "status": "open",
            "limit":  str(limit),
        })
        url = f"{_KALSHI_BASE}/markets?{params}"
        try:
            data = _get_json(url, headers={"Authorization": f"Bearer {self._api_key}"})
        except Exception as exc:
            logger.debug("Kalshi search '%s' failed: %s", query, exc)
            return []

        results: list[PredictionMarket] = []
        for item in (data.get("markets", []) if isinstance(data, dict) else []):
            try:
                title = item.get("title", "").strip()
                if not title:
                    continue
                # Kalshi v2 prices are integers 0–100 (cents)
                yes_ask  = item.get("yes_ask", 50)
                yes_bid  = item.get("yes_bid", 50)
                yes_prob = (
_tokenize function · python · L266-L268 (3 LOC)
Red Flag Engine/src/prediction_markets.py
def _tokenize(text: str) -> frozenset[str]:
    tokens = re.findall(r"[a-z]+", text.lower())
    return frozenset(t for t in tokens if t not in _STOP_WORDS and len(t) > 2)
_jaccard function · python · L271-L274 (4 LOC)
Red Flag Engine/src/prediction_markets.py
def _jaccard(a: frozenset[str], b: frozenset[str]) -> float:
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)
_infer_framing function · python · L277-L286 (10 LOC)
Red Flag Engine/src/prediction_markets.py
def _infer_framing(question: str) -> str:
    """Classify a market question as "positive", "negative" or "neutral".

    The question's tokens are compared against the ``_POS_FRAME`` and
    ``_NEG_FRAME`` word sets; whichever set shares more tokens with the
    question wins, and a tie (including no hits at all) is "neutral".
    """
    tokens = _tokenize(question)
    balance = len(tokens & _POS_FRAME) - len(tokens & _NEG_FRAME)
    if balance > 0:
        return "positive"
    if balance < 0:
        return "negative"
    return "neutral"
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
_compute_alignment function · python · L289-L351 (63 LOC)
Red Flag Engine/src/prediction_markets.py
def _compute_alignment(
    framing:         str,
    yes_probability: float,
    claim_polarity:  Polarity,
) -> tuple[str, str]:
    """Return (alignment_label, interpretation_text).

    Alignment logic:
      positive-framed market + high prob  → bullish signal
      positive-framed market + low prob   → bearish signal
      negative-framed market + high prob  → bearish signal
      negative-framed market + low prob   → bullish signal

    Cross-reference with claim polarity → CONFIRMS / CONTRADICTS / NEUTRAL.
    """
    prob_pct = f"{yes_probability:.0%}"

    # Translate market framing + probability into a directional stance
    if framing == "positive":
        if yes_probability >= _STRONG_PROB_HIGH:
            mkt_stance = "bullish"
            mkt_desc   = f"market strongly expects this positive outcome ({prob_pct})"
        elif yes_probability <= _STRONG_PROB_LOW:
            mkt_stance = "bearish"
            mkt_desc   = f"market is skeptical of this positive outcome ({
_build_queries function · python · L354-L380 (27 LOC)
Red Flag Engine/src/prediction_markets.py
def _build_queries(company: str, claims: list[Claim]) -> list[str]:
    """Build targeted search queries: company name + category enrichments."""
    name = _COMPANY_NAMES.get(company.upper(), company)
    # Start with ticker itself as first query, then full name
    queries: list[str] = [company.upper(), name]

    category_enrichments = {
        "reg_legal":           f"{name} regulatory investigation",
        "guidance":            f"{name} earnings",
        "liquidity":           f"{name} debt financing",
        "costs_restructuring": f"{name} restructuring layoffs",
        "competition":         f"{name} market share",
    }
    active_cats = {c.category.value for c in claims}
    for cat, qry in category_enrichments.items():
        if cat in active_cats:
            queries.append(qry)

    # Deduplicate, preserve order
    seen: set[str] = set()
    unique: list[str] = []
    for q in queries:
        if q not in seen:
            seen.add(q)
            unique.append(q)

_filter_and_rank function · python · L383-L427 (45 LOC)
Red Flag Engine/src/prediction_markets.py
def _filter_and_rank(
    markets:  list[PredictionMarket],
    company:  str,
    claims:   list[Claim],
) -> list[PredictionMarket]:
    """Score markets for company relevance; return top _MAX_MARKETS."""
    name           = _COMPANY_NAMES.get(company.upper(), company)
    company_tokens = _tokenize(name) | _tokenize(company)

    claim_tokens: frozenset[str] = frozenset()
    for c in claims:
        claim_tokens = claim_tokens | _tokenize(c.claim)

    scored: list[PredictionMarket] = []
    seen:   set[str]               = set()

    for m in markets:
        q_norm = m.question.strip().lower()
        if q_norm in seen:
            continue
        seen.add(q_norm)

        mq_tokens = _tokenize(m.question)

        # Hard filter: market question must mention the company ticker or primary name
        company_str  = company.lower()
        q_lower      = m.question.lower()
        name_parts   = name.lower().split()      # all words of the company name
        if (
            c
find_relevant_markets function · python · L434-L473 (40 LOC)
Red Flag Engine/src/prediction_markets.py
def find_relevant_markets(
    company:    str,
    claims:     list[Claim],
    now_period: str = "",
) -> list[PredictionMarket]:
    """Fetch and rank prediction markets relevant to this company and its claims.

    Queries Polymarket (always) and Kalshi (when KALSHI_API_KEY is set).
    Results are filtered for minimum liquidity ($5k volume) and ranked by
    relevance × log(volume).

    Args:
        company:    Ticker (e.g. "BA").
        claims:     Claims from the current-quarter transcript.
        now_period: Period label — informational only.

    Returns:
        Up to 12 :class:`PredictionMarket` objects. Empty list on total failure.
    """
    polymarket = _PolymarketClient()
    kalshi     = _KalshiClient()

    queries     = _build_queries(company, claims)
    all_markets: list[PredictionMarket] = []

    for query in queries:
        all_markets.extend(polymarket.search(query))
        if kalshi.available:
            all_markets.extend(kalshi.search(query))

    if 
cross_reference_with_claims function · python · L476-L548 (73 LOC)
Red Flag Engine/src/prediction_markets.py
def cross_reference_with_claims(
    markets: list[PredictionMarket],
    claims:  list[Claim],
) -> list[MarketClaimCrossRef]:
    """Cross-reference prediction markets against extracted management claims.

    Only processes markets with a strong directional signal (probability
    outside the 35–65% inconclusive zone).  For each such market, finds the
    most semantically overlapping claim and classifies the relationship as
    CONTRADICTS, CONFIRMS, or NEUTRAL.

    CONTRADICTS entries appear first — they are the most actionable signal.

    Args:
        markets: Output of :func:`find_relevant_markets`.
        claims:  Claims from the current-quarter transcript.

    Returns:
        List of :class:`MarketClaimCrossRef`. Empty if no meaningful pairings.
    """
    if not markets or not claims:
        return []

    refs: list[MarketClaimCrossRef] = []

    for market in markets:
        # Only strong probability signals are worth cross-referencing
        if _STRONG_PROB_LOW <
ReportStats class · python · L24-L33 (10 LOC)
Red Flag Engine/src/report.py
class ReportStats(BaseModel):
    """Integer counters describing one report run; every counter defaults to 0.

    The model is configured with ``extra="forbid"``, which in pydantic
    rejects unknown fields at construction time.
    """

    # Unknown keys are an error rather than being ignored.
    model_config = ConfigDict(extra="forbid")

    n_chunks_now:  int = 0   # presumably chunks in the current-quarter transcript — confirm at call site
    n_chunks_prev: int = 0   # presumably chunks in the prior-quarter transcript — confirm at call site
    n_claims_now:  int = 0   # presumably claims extracted for the current quarter — confirm at call site
    n_claims_prev: int = 0   # presumably claims extracted for the prior quarter — confirm at call site
    n_matched:     int = 0   # strict + soft matches (change_type != new)
    n_new:         int = 0   # presumably changes whose change_type is new — confirm at call site
    n_soft:        int = 0   # soft-matched changes
_escape_pipe function · python · L68-L70 (3 LOC)
Red Flag Engine/src/report.py
def _escape_pipe(text: str) -> str:
    """Escape pipe characters so they don't break Markdown table cells."""
    return text.replace("|", "\\|")
_render_header function · python · L73-L104 (32 LOC)
Red Flag Engine/src/report.py
def _render_header(
    company:     str,
    now_period:  str,
    prev_period: str,
    changes:     list[Change],
    stats:       ReportStats,
) -> str:
    ts            = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    high_critical = sum(1 for c in changes if c.severity >= 4)
    return "\n".join([
        f"# Red Flag Report: {company}",
        "",
        "| Field | Value |",
        "|-------|-------|",
        f"| Company | **{company}** |",
        f"| Current quarter | `{now_period}` |",
        f"| Prior quarter | `{prev_period}` |",
        f"| Generated | {ts} |",
        f"| Total changes detected | {len(changes)} |",
        f"| High / Critical | **{high_critical}** |",
        "",
        "> **Disclaimer:** This report is a triage aid only. It is NOT a trading signal, "
        "investment recommendation, or financial advice. It may contain false positives or "
        "miss subtle language. All findings must be independently verified by a qualified an
Repobility · code-quality intelligence · https://repobility.com
_render_executive_summary function · python · L107-L117 (11 LOC)
Red Flag Engine/src/report.py
def _render_executive_summary(changes: list[Change]) -> str:
    """Render the "Executive Summary" section as Markdown.

    Only the first ``_TOP_SUMMARY`` entries of *changes* are listed, one
    bullet each: change badge, bracketed severity label, then the summary.
    When there is nothing to list, a placeholder line is emitted instead.
    """
    heading = ["## Executive Summary", ""]
    highlighted = changes[:_TOP_SUMMARY]
    if not highlighted:
        return "\n".join(heading + ["_No material changes detected._"])

    bullets = []
    for change in highlighted:
        # Fall back to the upper-cased enum value when no badge is configured.
        badge = _CHANGE_BADGE.get(change.change_type, change.change_type.value.upper())
        severity = _severity_label(change.severity)
        bullets.append(f"- {badge} **[{severity}]** {change.summary}")
    return "\n".join(heading + bullets)
_render_red_flags_table function · python · L120-L175 (56 LOC)
Red Flag Engine/src/report.py
def _render_red_flags_table(changes: list[Change]) -> str:
    top = changes[:_TOP_TABLE]
    lines = [
        "## Red Flags",
        "",
        "| # | Category | Change | Sev | Evidence (Now) | Chunk (Now) | Evidence (Prev) | Chunk (Prev) |",
        "|---|----------|--------|-----|----------------|-------------|-----------------|--------------|",
    ]
    if not top:
        lines.append("| — | — | — | — | _No changes_ | — | — | — |")
        return "\n".join(lines)

    used_analyst = False
    used_soft    = False

    for i, c in enumerate(top, start=1):
        cat   = c.category.value.replace("_", " ").title()
        badge = _CHANGE_BADGE.get(c.change_type, c.change_type.value.upper())
        sev   = _severity_label(c.severity)

        # ── Analyst marker ────────────────────────────────────────────
        analyst_tag = ""
        if c.now_speaker_role == "analyst":
            analyst_tag = " ⁽ᴬ⁾"
            used_analyst = True

        # ── Soft-match marker ─────────
_render_limitations function · python · L179-L196 (18 LOC)
Red Flag Engine/src/report.py
def _render_limitations() -> str:
    return "\n".join([
        "## Limitations",
        "",
        "- **Triage tool only.** This engine surfaces potential narrative shifts for analyst "
        "review. It is not predictive and must not be used as a basis for trading decisions.",
        "- **False positives / negatives.** LLM extraction at temperature=0 is conservative "
        "but not infallible. Subtle hedging, irony, or boilerplate language may be "
        "misclassified.",
        "- **Transcript quality dependency.** Poor-quality PDFs, missing Q&A sections, or "
        "partial transcripts will reduce recall.",
        "- **Evidence is bounded by chunk context.** Cross-paragraph nuance may be missed "
        "if a claim spans a chunk boundary.",
        "- **No investment advice.** No financial advice is provided or implied. The authors "
        "accept no liability for decisions made based on this output.",
        "- **Not a substitute for primary source review.** Alw
_render_methodology function · python · L199-L215 (17 LOC)
Red Flag Engine/src/report.py
def _render_methodology() -> str:
    return "\n".join([
        "## Methodology",
        "",
        "Transcripts are split into ~3,500-character chunks at paragraph boundaries. "
        "Each chunk is labelled with an inferred section (guidance, liquidity, demand, etc.) "
        "via keyword regex and a speaker role (management / analyst / operator) via regex. "
        "Claude extracts at most 6 claims per chunk using a strict zero-temperature prompt "
        "that requires a verbatim evidence quote (≤ 25 words) for every claim; claims without "
        "valid evidence are discarded. Quarter-over-quarter change detection uses a two-pass "
        "RapidFuzz strategy: strict (`token_set_ratio` ≥ 65 on full claim text) then soft "
        "(same category, first 60 chars, ≥ 60). Severity is assigned by a deterministic "
        "heuristic based on change type, category risk, polarity, and confidence; "
        "low-confidence claims are capped at severity 3. Supplementary signals —
_render_abandoned_metrics function · python · L218-L237 (20 LOC)
Red Flag Engine/src/report.py
def _render_abandoned_metrics(abandoned: "list[AbandonedMetric]") -> str:
    """Render the Abandoned Metrics section; returns '' if list is empty."""
    if not abandoned:
        return ""
    lines = [
        "## Abandoned Metrics",
        "",
        "The following categories were discussed in the prior quarter but appear absent "
        "from the current transcript (≥ 2 prior claims, zero fuzzy matches now).",
        "",
        "| Category | Prior Quarter Statement | Evidence | Chunk | Confidence |",
        "|----------|------------------------|----------|-------|------------|",
    ]
    for m in abandoned:
        cat = m.category.value.replace("_", " ").title()
        lines.append(
            f"| {cat} | {_escape_pipe(m.representative_claim)} "
            f"| {_escape_pipe(m.evidence)} | `{m.chunk_id}` | {m.confidence.value} |"
        )
    return "\n".join(lines)
_render_hedging_intensity function · python · L240-L260 (21 LOC)
Red Flag Engine/src/report.py
def _render_hedging_intensity(deltas: "list[HedgeDelta]") -> str:
    """Render the Hedging Intensity section; returns '' if list is empty."""
    if not deltas:
        return ""
    lines = [
        "## Hedging Intensity",
        "",
        "Hedge word density (Tier 1: may/might/could/uncertain…; "
        "Tier 2: expect/anticipate/believe…) per 100 words, by section. "
        "FLAG marks sections where hedging shifted by > 3 percentage points in either direction.",
        "",
        "| Section | Now (/100w) | Prev (/100w) | Chg | Flag |",
        "|---------|------------|-------------|-----|------|",
    ]
    for d in deltas:
        delta_str = f"+{d.delta:.1f}" if d.delta >= 0 else f"{d.delta:.1f}"
        flag_str  = "FLAG" if d.flag else ""
        lines.append(
            f"| {d.section} | {d.now_score:.1f} | {d.prev_score:.1f} | {delta_str} | {flag_str} |"
        )
    return "\n".join(lines)
_render_peer_signals function · python · L263-L281 (19 LOC)
Red Flag Engine/src/report.py
def _render_peer_signals(signals: "list[PeerSignal]") -> str:
    """Build the Markdown "Peer & Supplier Signals" section.

    The section consists of a heading, an explanatory paragraph and a
    seven-column table with one row per signal.  Only the evidence cell is
    passed through ``_escape_pipe``; the remaining fields are interpolated
    as-is, and the report filename is wrapped in backticks.

    Args:
        signals: Signals to tabulate.

    Returns:
        The newline-joined Markdown, or ``""`` when *signals* is empty.
    """
    if not signals:
        return ""

    section = [
        "## Peer & Supplier Signals",
        "",
        "Red flags surfaced from related companies' most recent reports. "
        "These may indicate sector-level or supply-chain stress relevant to this company.",
        "",
        "| Source | Rel | Category | Evidence | Polarity | Sev | Report |",
        "|--------|-----|----------|----------|----------|-----|--------|",
    ]
    section.extend(
        f"| {sig.source_company} | {sig.relationship} | {sig.category} "
        f"| {_escape_pipe(sig.evidence)} | {sig.polarity} | {sig.sev} | `{sig.report_filename}` |"
        for sig in signals
    )
    return "\n".join(section)
_render_prediction_markets function · python · L284-L353 (70 LOC)
Red Flag Engine/src/report.py
def _render_prediction_markets(
    markets:   "list[PredictionMarket]",
    crossrefs: "list[MarketClaimCrossRef]",
) -> str:
    """Render the Prediction Market Context section; returns '' if both lists are empty."""
    if not markets and not crossrefs:
        return ""

    from datetime import date as _date
    today = _date.today().isoformat()

    lines = [
        "## Prediction Market Context",
        "",
        f"Active markets sourced from Polymarket / Kalshi as of {today}. "
        "Minimum volume: $5,000. Only markets with a strong probability signal "
        "(Yes < 35% or Yes > 65%) are cross-referenced with management claims.",
    ]

    # ── Active Markets table ──────────────────────────────────────────────
    if markets:
        lines += [
            "",
            "### Active Markets",
            "",
            "| Platform | Market | Yes % | Volume (USD) | Expires |",
            "|----------|--------|-------|--------------|---------|",
        ]
        
If a scraper extracted this row, it came from Repobility (https://repobility.com)
_render_backtest_context function · python · L356-L376 (21 LOC)
Red Flag Engine/src/report.py
def _render_backtest_context(bt: "PostEarningsReturns") -> str:
    """Build the Markdown "Backtest Context" section.

    Produces a two-column table with the 1-day, 5-day and 20-day
    post-earnings returns read from *bt*, followed by an italic footnote
    that quotes ``bt.call_date``.  Each return is multiplied by 100 and
    shown with one decimal place and a ``%`` suffix; strictly positive
    values get a leading ``+`` and ``None`` is shown as an em dash.

    Args:
        bt: Object exposing ``ret_1d``, ``ret_5d``, ``ret_20d`` and
            ``call_date``.

    Returns:
        The newline-joined Markdown for the section.
    """
    def _pct(value: Optional[float]) -> str:
        # A missing return is rendered as a dash rather than 0.0%.
        if value is None:
            return "—"
        prefix = "+" if value > 0 else ""
        return f"{prefix}{value * 100:.1f}%"

    windows = (
        ("1-day post-earnings", bt.ret_1d),
        ("5-day post-earnings", bt.ret_5d),
        ("20-day post-earnings", bt.ret_20d),
    )
    # Labels are left-aligned to the width of the longest one (20 chars)
    # so the raw Markdown columns line up.
    table_rows = [f"| {label:<20} | {_pct(ret)} |" for label, ret in windows]

    footnote = (
        f"*Based on earnings call date {bt.call_date}. "
        "Retrospective data only — not a trading signal.*"
    )
    return "\n".join(
        [
            "## Backtest Context",
            "",
            "| Window | Return |",
            "|--------|--------|",
            *table_rows,
            "",
            footnote,
        ]
    )
generate_report function · python · L383-L454 (72 LOC)
Red Flag Engine/src/report.py
def generate_report(
    company:            str,
    now_period:         str,
    prev_period:        str,
    changes:            list[Change],
    stats:              ReportStats | None = None,
    ai_sensitivity_md:  str = "",
    abandoned_metrics:  "list[AbandonedMetric] | None" = None,
    hedge_deltas:       "list[HedgeDelta] | None" = None,
    peer_signals:       "list[PeerSignal] | None" = None,
    backtest_returns:   "PostEarningsReturns | None" = None,
    pred_markets:       "list[PredictionMarket] | None" = None,
    pred_crossref:      "list[MarketClaimCrossRef] | None" = None,
) -> str:
    """Render a complete Markdown report string from a list of Changes.

    Args:
        company:           Company identifier (e.g. "BA").
        now_period:        Label for the current quarter (e.g. "2025Q4").
        prev_period:       Label for the prior quarter (e.g. "2025Q3").
        changes:           Output of diff.match_claims(), sorted by severity DESC.
        stats:   
save_report function · python · L457-L470 (14 LOC)
Red Flag Engine/src/report.py
def save_report(
    report_md:   str,
    company:     str,
    now_period:  str,
    prev_period: str,
    output_dir:  Path,
) -> Path:
    """Persist a rendered report as ``<company>_<now>_vs_<prev>.md``.

    *output_dir* is created (including missing parents) when it does not
    exist yet, the Markdown is written as UTF-8, and the destination is
    logged at INFO level.

    Args:
        report_md:   Markdown text to write.
        company:     Company identifier used as the filename prefix.
        now_period:  Current-period label used in the filename.
        prev_period: Prior-period label used in the filename.
        output_dir:  Directory that receives the file.

    Returns:
        Path of the file that was written.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    destination = output_dir / f"{company}_{now_period}_vs_{prev_period}.md"
    destination.write_text(report_md, encoding="utf-8")

    logger.info("Report saved → %s", destination)
    return destination
tag_speaker_role function · python · L61-L70 (10 LOC)
Red Flag Engine/src/segment.py
def tag_speaker_role(text: str) -> str:
    """Classify the dominant speaker of a chunk via regex patterns.

    The ``(role, pattern)`` pairs in ``_SPEAKER_PATTERNS`` are tried in
    order and the role of the first pattern that matches anywhere in
    *text* is returned, so earlier entries take priority.

    Returns one of: ``"operator"``, ``"analyst"``, ``"management"``,
    ``"unknown"``.  ``"unknown"`` is the fallback when nothing matches.
    """
    matching_roles = (
        role for role, pattern in _SPEAKER_PATTERNS if pattern.search(text)
    )
    # The generator is lazy, so the scan stops at the first hit.
    return next(matching_roles, "unknown")
Chunk class · python · L74-L78 (5 LOC)
Red Flag Engine/src/segment.py
class Chunk:
    """One segment of a transcript together with its inferred labels.

    ``segment_doc`` constructs instances with keyword arguments, so this
    class is presumably wrapped by a ``@dataclass`` decorator placed just
    above the definition — TODO confirm.
    """
    chunk_id:     str   # "chunk_000", "chunk_001", …
    section:      str   # inferred section label
    text:         str
    speaker_role: str = "unknown"   # operator | analyst | management | unknown
infer_section function · python · L85-L90 (6 LOC)
Red Flag Engine/src/segment.py
def infer_section(text: str) -> str:
    """Pick a section label for *text*.

    The ``(label, pattern)`` pairs in ``_COMPILED_HINTS`` are checked in
    order; the label of the first pattern found anywhere in *text* wins.
    When no pattern matches the label is ``"general"``.
    """
    section = "general"
    for label, pattern in _COMPILED_HINTS:
        if pattern.search(text):
            section = label
            break
    return section
chunk_text function · python · L93-L138 (46 LOC)
Red Flag Engine/src/segment.py
def chunk_text(text: str, max_chars: int = 3500) -> list[str]:
    """Split *text* into chunks of at most *max_chars* characters.

    Splits preferentially on blank lines (paragraph boundaries).  The running
    length correctly accounts for the ``\\n\\n`` separator that will be inserted
    between paragraphs when the chunk is joined.  A single paragraph that
    exceeds *max_chars* is hard-split at the character limit rather than
    being dropped.
    """
    paragraphs = re.split(r"\n{2,}", text)

    chunks: list[str] = []
    current_parts: list[str] = []
    current_len: int = 0   # tracks *actual* joined byte length including separators

    for para in paragraphs:
        para = para.strip()
        if not para:
            continue

        # If a single paragraph is itself too long, hard-split it first.
        if len(para) > max_chars:
            if current_parts:
                chunks.append(_SEPARATOR.join(current_parts))
                current_parts = []
            
segment_doc function · python · L141-L165 (25 LOC)
Red Flag Engine/src/segment.py
def segment_doc(doc: Doc, max_chars: int = 3500) -> list[Chunk]:
    """Split a document into labelled chunks.

    The document text is cut up by ``chunk_text``; every piece then gets a
    zero-padded sequential id (``chunk_000``, ``chunk_001``, …), a section
    label from ``infer_section`` and a speaker role from
    ``tag_speaker_role``.  Each chunk is logged at DEBUG level and the total
    count at INFO level.

    Args:
        doc:       The loaded transcript document.
        max_chars: Upper bound on characters per chunk (default 3500).

    Returns:
        Chunk objects in document order, each with a unique ``chunk_id``.
    """
    segments: list[Chunk] = []

    for position, body in enumerate(chunk_text(doc.text, max_chars=max_chars)):
        ident = f"chunk_{position:03d}"
        label = infer_section(body)
        role = tag_speaker_role(body)
        segments.append(
            Chunk(chunk_id=ident, section=label, text=body, speaker_role=role)
        )
        logger.debug("  %s  section=%-22s  len=%d", ident, label, len(body))

    logger.info(
        "Segmented '%s %s' → %d chunks", doc.company, doc.period, len(segments)
    )
    return segments
Repobility analyzer · published findings · https://repobility.com
_parse_report_meta function · python · L313-L331 (19 LOC)
Red Flag Engine/streamlit_app.py
def _parse_report_meta(md_path: Path) -> dict:
    """Collect dashboard metadata for a single report file.

    The company and the two period labels come from matching the file name
    against ``_FILENAME_RE``; when the name does not match, the file stem is
    used as the company and both periods become an em dash.  The two
    counters are scraped from the report's Markdown table rows and default
    to 0 when the corresponding row is not present.

    Args:
        md_path: Path of the Markdown report to inspect.

    Returns:
        Dict with keys ``company``, ``now``, ``prev``, ``total_changes``,
        ``high_critical`` and ``filename``.
    """
    name_match = _FILENAME_RE.match(md_path.name)
    if name_match:
        company = name_match.group(1).upper()
        now = name_match.group(2)
        prev = name_match.group(3)
    else:
        company, now, prev = md_path.stem, "—", "—"

    # Undecodable bytes are dropped rather than aborting the listing.
    body = md_path.read_text(encoding="utf-8", errors="ignore")

    def _count(pattern: str) -> int:
        hit = re.search(pattern, body)
        return int(hit.group(1)) if hit else 0

    return {
        "company":       company,
        "now":           now,
        "prev":          prev,
        "total_changes": _count(r"\| Total changes detected \| (\d+) \|"),
        "high_critical": _count(r"\| High / Critical \| \*\*(\d+)\*\* \|"),
        "filename":      md_path.name,
    }
_split_sections function · python · L334-L346 (13 LOC)
Red Flag Engine/streamlit_app.py
def _split_sections(content: str) -> dict[str, str]:
    """Break a report into named sections at its ``---`` separators.

    The text is cut wherever a ``---`` line is surrounded by blank lines.
    A part that starts with ``# Red Flag Report`` is stored under the key
    ``"header"``; a part that starts with a level-two heading (``## Title``)
    is stored under that title.  Any other part is discarded, and a later
    part with the same key replaces an earlier one.

    Args:
        content: Full Markdown text of a report.

    Returns:
        Mapping of section name to the stripped text of that section.
    """
    sections: dict[str, str] = {}

    for raw_part in re.split(r"\n\n---\n\n", content):
        body = raw_part.strip()

        if body.startswith("# Red Flag Report"):
            sections["header"] = body
            continue

        heading = re.match(r"^## (.+)", body)
        if heading is not None:
            sections[heading.group(1).strip()] = body

    return sections
_split_pipe_row function · python · L349-L356 (8 LOC)
Red Flag Engine/streamlit_app.py
def _split_pipe_row(line: str) -> list[str]:
    """Split a Markdown table row into its cell values.

    The row is cut at every pipe that is not preceded by a backslash.  The
    row's outer pipes leave one blank fragment at each end of the split;
    exactly one such fragment per side is discarded.  Cells that are
    genuinely empty, including the first and last cell of a row, are kept
    as ``""`` so every row of a table yields the same number of cells as
    its header.

    Args:
        line: One line of Markdown, normally of the form ``| a | b |``.

    Returns:
        The cell texts, stripped of surrounding whitespace and with escaped
        pipes (``\\|``) turned back into ``|``.  A blank line yields ``[]``.
    """
    parts = re.split(r"(?<!\\)\|", line)

    # Remove only the single boundary fragment on each side.  Looping until
    # a non-blank cell is found would also swallow real empty cells at the
    # row edges and shift the remaining columns.
    if parts and not parts[0].strip():
        parts.pop(0)
    if parts and not parts[-1].strip():
        parts.pop()

    return [p.strip().replace("\\|", "|") for p in parts]
‹ prevpage 2 / 3next ›