Function bodies 106 total
_resolve function · python · L81-L84 (4 LOC)Red Flag Engine/src/main.py
def _resolve(path_str: str) -> Path:
"""Return an absolute Path, resolving relative paths from the project root."""
p = Path(path_str)
return p if p.is_absolute() else (_PROJECT_ROOT / p).resolve()_find_transcript function · python · L87-L111 (25 LOC)Red Flag Engine/src/main.py
def _find_transcript(company: str, period: str, data_dir: Path) -> Path:
"""Locate <period>_transcript.pdf or .txt under data_dir/company/.
Raises:
FileNotFoundError: With an actionable message if no file is found.
"""
company_dir = data_dir / company
if not company_dir.exists():
raise FileNotFoundError(
f"Company directory not found: {company_dir}\n"
f" Expected location: {company_dir}\n"
f" Create it and add a transcript file named:\n"
f" {period}_transcript.pdf or {period}_transcript.txt"
)
for ext in (".pdf", ".txt"):
candidate = company_dir / f"{period}_transcript{ext}"
if candidate.exists():
return candidate
raise FileNotFoundError(
f"No transcript found for {company} / {period}\n"
f" Looked in: {company_dir}\n"
f" Expected one of:\n"
f" {period}_transcript.pdf\n"
f" {period}_transcript.txt"
)_check_api_key function · python · L118-L134 (17 LOC)Red Flag Engine/src/main.py
def _check_api_key() -> None:
key = os.environ.get("ANTHROPIC_API_KEY", "").strip()
if not key:
print(
"\nERROR: ANTHROPIC_API_KEY is not set.\n"
"\n"
" Fix:\n"
" 1. Create a file named .env in the project root:\n"
f" {_PROJECT_ROOT / '.env'}\n"
" 2. Add this line: ANTHROPIC_API_KEY=sk-ant-...\n"
" 3. Get a key at: https://console.anthropic.com/\n"
"\n"
" Note: A Claude Pro (claude.ai) subscription does NOT include API\n"
" access. You need a separate Anthropic API account.\n",
file=sys.stderr,
)
sys.exit(1)run_selfcheck function · python · L141-L178 (38 LOC)Red Flag Engine/src/main.py
def run_selfcheck(
company: str,
prev_period: str,
now_period: str,
data_dir: Path,
) -> None:
"""Ingest and segment both transcripts; print diagnostic stats.
No LLM calls are made. Use this to verify files load correctly
and chunking behaves as expected before spending API credits.
"""
for period in (prev_period, now_period):
try:
path = _find_transcript(company, period, data_dir)
except FileNotFoundError as exc:
print(f"\n[SELFCHECK] ERROR: {exc}", file=sys.stderr)
continue
doc = load_doc(company, period, path)
chunks = segment_doc(doc)
section_counts = Counter(c.section for c in chunks)
print(f"\n{'='*60}")
print(f" {company} {period}")
print(f"{'='*60}")
print(f" File : {path}")
print(f" Text length : {len(doc.text):,} chars")
print(f" Chunks : {len(chunks)}")
print(f" Sections :")
_parse_args function · python · L185-L243 (59 LOC)Red Flag Engine/src/main.py
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="python src/main.py",
description="Red Flag Engine — earnings call transcript change detector",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"Examples:\n"
" python src/main.py --company BA --prev 2025Q3 --now 2025Q4\n"
" python src/main.py --company TSLA --prev 2025Q3 --now 2025Q4 --threshold 80\n"
" python src/main.py --company BA --prev 2025Q3 --now 2025Q4 --selfcheck\n"
),
)
parser.add_argument(
"--company", required=True,
help="Company ticker, e.g. BA.",
)
parser.add_argument(
"--prev", required=True, metavar="PERIOD",
help="Prior-quarter period label, e.g. 2025Q3.",
)
parser.add_argument(
"--now", required=True, metavar="PERIOD",
help="Current-quarter period label, e.g. 2025Q4.",
)
run_pipeline function · python · L250-L398 (149 LOC)Red Flag Engine/src/main.py
def run_pipeline(
company: str,
now_period: str,
prev_period: str,
now_path: str,
prev_path: str,
) -> str:
"""Run the full Red Flag Engine pipeline with explicit file paths.
Unlike ``run()``, this function accepts absolute file paths directly
(useful when the caller — e.g. a Streamlit UI — already has the file
on disk) rather than deriving them from a company / period / data-dir
triple.
Args:
company: Company ticker (e.g. "BA").
now_period: Label for the current quarter (e.g. "2025Q4").
prev_period: Label for the prior quarter (e.g. "2025Q3").
now_path: Absolute path to the current-quarter transcript.
prev_path: Absolute path to the prior-quarter transcript.
Returns:
Absolute path to the generated Markdown report as a string.
Raises:
RuntimeError: If ingestion or segmentation yields empty results.
Any exception from the underlying pipeline steps irun function · python · L405-L543 (139 LOC)Red Flag Engine/src/main.py
def run(
company: str,
prev_period: str,
now_period: str,
data_dir: Path,
output_dir: Path,
threshold: int,
) -> Path:
"""Execute the full pipeline and return the path to the written report."""
log = logging.getLogger(__name__)
prev_path = _find_transcript(company, prev_period, data_dir)
now_path = _find_transcript(company, now_period, data_dir)
log.info("=== Red Flag Engine ===")
log.info("Project root : %s", _PROJECT_ROOT)
log.info("Company : %s", company)
log.info("Prior quarter : %s (%s)", prev_period, prev_path)
log.info("Now quarter : %s (%s)", now_period, now_path)
log.info("Threshold : %d", threshold)
log.info("Output dir : %s", output_dir)
# 1. Ingest
log.info("[1/5] Ingesting transcripts…")
doc_prev = load_doc(company, prev_period, prev_path)
doc_now = load_doc(company, now_period, now_path)
if not doc_prev.text.strip():
raise RuntimeError(f"Prior transcriIf a scraper extracted this row, it came from Repobility (https://repobility.com)
main function · python · L550-L586 (37 LOC)Red Flag Engine/src/main.py
def main(argv: list[str] | None = None) -> None:
args = _parse_args(argv)
_configure_logging(args.log_level)
data_dir = _resolve(args.data_dir)
output_dir = _resolve(args.output_dir)
# ── Self-check: no API needed ──────────────────────────────────────────
if args.selfcheck:
run_selfcheck(
company=args.company,
prev_period=args.prev,
now_period=args.now,
data_dir=data_dir,
)
return
# ── Full pipeline: API required ────────────────────────────────────────
_check_api_key()
try:
out_path = run(
company=args.company,
prev_period=args.prev,
now_period=args.now,
data_dir=data_dir,
output_dir=output_dir,
threshold=args.threshold,
)
except FileNotFoundError as exc:
print(f"\nERROR (file not found):\n{exc}\n", file=sys.stderr)
sys.exit(1)
except (ValueError, RuntimeError) PeerSignal class · python · L32-L44 (13 LOC)Red Flag Engine/src/peer_contagion.py
class PeerSignal(BaseModel):
    """One red-flag row taken from a related company's report.

    Fields not declared here are rejected (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    source_company: str  # ticker of the peer / supplier the row came from
    relationship: str  # "peer" | "supplier"
    category: str
    claim: str
    evidence: str
    polarity: str  # "negative" | "mixed"
    sev: str  # severity label, e.g. "High", "Critical"
    report_filename: str
# _split_pipe_row function · python · L52-L60 (9 LOC)Red Flag Engine/src/peer_contagion.py
def _split_pipe_row(line: str) -> list[str]:
"""Split one Markdown table row on unescaped pipes."""
parts = re.split(r"(?<!\\)\|", line)
# Drop leading/trailing empty cells from the outer | delimiters
while parts and not parts[0].strip():
parts.pop(0)
while parts and not parts[-1].strip():
parts.pop()
return [p.strip().replace("\\|", "|") for p in parts]_parse_red_flags_table function · python · L63-L87 (25 LOC)Red Flag Engine/src/peer_contagion.py
def _parse_red_flags_table(md_text: str) -> list[dict[str, str]]:
"""Extract rows from the ## Red Flags section of a report Markdown string.
Returns a list of dicts keyed by the table header names.
Returns an empty list if the section or table is absent / malformed.
"""
# Split report into sections using the --- separator
parts = re.split(r"\n\n---\n\n", md_text)
for part in parts:
if re.match(r"^## Red Flags\b", part.strip()):
# Found the Red Flags section — parse the pipe table within it
lines = [
ln for ln in part.splitlines()
if re.match(r"^\s*\|", ln)
]
if len(lines) < 3:
return []
headers = _split_pipe_row(lines[0])
rows: list[dict[str, str]] = []
for line in lines[2:]: # skip header + separator row
cells = _split_pipe_row(line)
if len(cells) == len(headers):
ro_polarity_proxy function · python · L94-L106 (13 LOC)Red Flag Engine/src/peer_contagion.py
def _polarity_proxy(change: str, sev: str) -> str | None:
"""Map a Change cell + Sev cell to a polarity string, or None to skip.
Rules:
- "WORSENED" in change → "negative"
- "NEW" in change AND sev in _HIGH_SEV_LABELS → "mixed"
- All other rows → None (excluded)
"""
if "WORSENED" in change:
return "negative"
if "NEW" in change and sev in _HIGH_SEV_LABELS:
return "mixed"
return Noneload_peer_signals function · python · L113-L212 (100 LOC)Red Flag Engine/src/peer_contagion.py
def load_peer_signals(
company: str,
outputs_dir: Path,
peer_map_path: Path,
) -> list[PeerSignal]:
"""Load red flag signals from related companies' existing reports.
Args:
company: Target company ticker (e.g. "BA").
outputs_dir: Directory containing generated .md reports.
peer_map_path: Path to peer_map.json.
Returns:
List of :class:`PeerSignal` sorted by (supplier-first, sev DESC).
Returns an empty list gracefully on any configuration or parse error.
"""
# ── Load peer map ─────────────────────────────────────────────────────
if not peer_map_path.exists():
logger.warning("peer_map.json not found at %s — skipping peer signals", peer_map_path)
return []
try:
with peer_map_path.open(encoding="utf-8") as fh:
peer_map: dict[str, dict[str, list[str]]] = json.load(fh)
except Exception as exc:
logger.warning("Failed to load peer_map.json: %s", excPredictionMarket class · python · L104-L116 (13 LOC)Red Flag Engine/src/prediction_markets.py
class PredictionMarket(BaseModel):
    """One prediction market retrieved from Polymarket or Kalshi.

    Fields not declared here are rejected (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    platform: str
    question: str
    yes_probability: float  # fraction in the range 0.0 – 1.0
    volume_usd: float
    liquidity_usd: float = 0.0
    expires: Optional[str] = None  # ISO date string
    url: Optional[str] = None
    relevance_score: float = 0.0  # computed post-retrieval
# MarketClaimCrossRef class · python · L119-L134 (16 LOC)Red Flag Engine/src/prediction_markets.py
class MarketClaimCrossRef(BaseModel):
    """Pairing of one prediction market with one extracted claim.

    Fields not declared here are rejected (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    # Market side of the pairing
    market_question: str
    platform: str
    yes_probability: float
    volume_usd: float
    expires: Optional[str] = None
    url: Optional[str] = None

    # Claim side of the pairing
    claim_text: str
    claim_polarity: str  # Polarity enum value
    claim_category: str  # Category enum value, human-readable

    # Outcome of the comparison
    alignment: str  # "CONTRADICTS" | "CONFIRMS" | "NEUTRAL"
    interpretation: str
# Repobility analyzer · published findings · https://repobility.com
_get_json function · python · L141-L148 (8 LOC)Red Flag Engine/src/prediction_markets.py
def _get_json(url: str, headers: dict[str, str] | None = None, timeout: int = 12) -> Any:
req = urllib.request.Request(url)
req.add_header("User-Agent", "RedFlagEngine/1.0")
req.add_header("Accept", "application/json")
for k, v in (headers or {}).items():
req.add_header(k, v)
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))_PolymarketClient class · python · L155-L203 (49 LOC)Red Flag Engine/src/prediction_markets.py
class _PolymarketClient:
"""Polymarket Gamma public API — no authentication required."""
def search(self, query: str, limit: int = 30) -> list[PredictionMarket]:
params = urllib.parse.urlencode({
"search": query,
"active": "true",
"closed": "false",
"limit": str(limit),
})
url = f"{_POLYMARKET_BASE}/markets?{params}"
try:
data = _get_json(url)
except Exception as exc:
logger.debug("Polymarket search '%s' failed: %s", query, exc)
return []
results: list[PredictionMarket] = []
for item in (data if isinstance(data, list) else []):
try:
question = item.get("question", "").strip()
if not question:
continue
# outcomePrices is a JSON string e.g. '["0.65","0.35"]'
prices_raw = item.get("outcomePrices", "[]")
prices = json.loads(pricessearch method · python · L158-L203 (46 LOC)Red Flag Engine/src/prediction_markets.py
def search(self, query: str, limit: int = 30) -> list[PredictionMarket]:
params = urllib.parse.urlencode({
"search": query,
"active": "true",
"closed": "false",
"limit": str(limit),
})
url = f"{_POLYMARKET_BASE}/markets?{params}"
try:
data = _get_json(url)
except Exception as exc:
logger.debug("Polymarket search '%s' failed: %s", query, exc)
return []
results: list[PredictionMarket] = []
for item in (data if isinstance(data, list) else []):
try:
question = item.get("question", "").strip()
if not question:
continue
# outcomePrices is a JSON string e.g. '["0.65","0.35"]'
prices_raw = item.get("outcomePrices", "[]")
prices = json.loads(prices_raw) if isinstance(prices_raw, str) else prices_raw
yes_prob = float(prices[_KalshiClient class · python · L206-L259 (54 LOC)Red Flag Engine/src/prediction_markets.py
class _KalshiClient:
"""Kalshi REST API — requires KALSHI_API_KEY environment variable."""
def __init__(self) -> None:
self._api_key = os.environ.get("KALSHI_API_KEY", "").strip()
@property
def available(self) -> bool:
return bool(self._api_key)
def search(self, query: str, limit: int = 15) -> list[PredictionMarket]:
if not self.available:
return []
params = urllib.parse.urlencode({
"search": query,
"status": "open",
"limit": str(limit),
})
url = f"{_KALSHI_BASE}/markets?{params}"
try:
data = _get_json(url, headers={"Authorization": f"Bearer {self._api_key}"})
except Exception as exc:
logger.debug("Kalshi search '%s' failed: %s", query, exc)
return []
results: list[PredictionMarket] = []
for item in (data.get("markets", []) if isinstance(data, dict) else []):
try:
title = search method · python · L216-L259 (44 LOC)Red Flag Engine/src/prediction_markets.py
def search(self, query: str, limit: int = 15) -> list[PredictionMarket]:
if not self.available:
return []
params = urllib.parse.urlencode({
"search": query,
"status": "open",
"limit": str(limit),
})
url = f"{_KALSHI_BASE}/markets?{params}"
try:
data = _get_json(url, headers={"Authorization": f"Bearer {self._api_key}"})
except Exception as exc:
logger.debug("Kalshi search '%s' failed: %s", query, exc)
return []
results: list[PredictionMarket] = []
for item in (data.get("markets", []) if isinstance(data, dict) else []):
try:
title = item.get("title", "").strip()
if not title:
continue
# Kalshi v2 prices are integers 0–100 (cents)
yes_ask = item.get("yes_ask", 50)
yes_bid = item.get("yes_bid", 50)
yes_prob = (_tokenize function · python · L266-L268 (3 LOC)Red Flag Engine/src/prediction_markets.py
def _tokenize(text: str) -> frozenset[str]:
tokens = re.findall(r"[a-z]+", text.lower())
return frozenset(t for t in tokens if t not in _STOP_WORDS and len(t) > 2)_jaccard function · python · L271-L274 (4 LOC)Red Flag Engine/src/prediction_markets.py
def _jaccard(a: frozenset[str], b: frozenset[str]) -> float:
if not a or not b:
return 0.0
return len(a & b) / len(a | b)_infer_framing function · python · L277-L286 (10 LOC)Red Flag Engine/src/prediction_markets.py
def _infer_framing(question: str) -> str:
    """Classify whether a Yes outcome of *question* reads as good or bad news.

    The question's tokens are counted against ``_POS_FRAME`` and
    ``_NEG_FRAME``; the side with more hits wins, and a tie gives
    ``"neutral"``.
    """
    tokens = _tokenize(question)
    balance = len(tokens & _POS_FRAME) - len(tokens & _NEG_FRAME)
    if balance > 0:
        return "positive"
    if balance < 0:
        return "negative"
    return "neutral"
# Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
_compute_alignment function · python · L289-L351 (63 LOC)Red Flag Engine/src/prediction_markets.py
def _compute_alignment(
framing: str,
yes_probability: float,
claim_polarity: Polarity,
) -> tuple[str, str]:
"""Return (alignment_label, interpretation_text).
Alignment logic:
positive-framed market + high prob → bullish signal
positive-framed market + low prob → bearish signal
negative-framed market + high prob → bearish signal
negative-framed market + low prob → bullish signal
Cross-reference with claim polarity → CONFIRMS / CONTRADICTS / NEUTRAL.
"""
prob_pct = f"{yes_probability:.0%}"
# Translate market framing + probability into a directional stance
if framing == "positive":
if yes_probability >= _STRONG_PROB_HIGH:
mkt_stance = "bullish"
mkt_desc = f"market strongly expects this positive outcome ({prob_pct})"
elif yes_probability <= _STRONG_PROB_LOW:
mkt_stance = "bearish"
mkt_desc = f"market is skeptical of this positive outcome ({_build_queries function · python · L354-L380 (27 LOC)Red Flag Engine/src/prediction_markets.py
def _build_queries(company: str, claims: list[Claim]) -> list[str]:
"""Build targeted search queries: company name + category enrichments."""
name = _COMPANY_NAMES.get(company.upper(), company)
# Start with ticker itself as first query, then full name
queries: list[str] = [company.upper(), name]
category_enrichments = {
"reg_legal": f"{name} regulatory investigation",
"guidance": f"{name} earnings",
"liquidity": f"{name} debt financing",
"costs_restructuring": f"{name} restructuring layoffs",
"competition": f"{name} market share",
}
active_cats = {c.category.value for c in claims}
for cat, qry in category_enrichments.items():
if cat in active_cats:
queries.append(qry)
# Deduplicate, preserve order
seen: set[str] = set()
unique: list[str] = []
for q in queries:
if q not in seen:
seen.add(q)
unique.append(q)
_filter_and_rank function · python · L383-L427 (45 LOC)Red Flag Engine/src/prediction_markets.py
def _filter_and_rank(
markets: list[PredictionMarket],
company: str,
claims: list[Claim],
) -> list[PredictionMarket]:
"""Score markets for company relevance; return top _MAX_MARKETS."""
name = _COMPANY_NAMES.get(company.upper(), company)
company_tokens = _tokenize(name) | _tokenize(company)
claim_tokens: frozenset[str] = frozenset()
for c in claims:
claim_tokens = claim_tokens | _tokenize(c.claim)
scored: list[PredictionMarket] = []
seen: set[str] = set()
for m in markets:
q_norm = m.question.strip().lower()
if q_norm in seen:
continue
seen.add(q_norm)
mq_tokens = _tokenize(m.question)
# Hard filter: market question must mention the company ticker or primary name
company_str = company.lower()
q_lower = m.question.lower()
name_parts = name.lower().split() # all words of the company name
if (
cfind_relevant_markets function · python · L434-L473 (40 LOC)Red Flag Engine/src/prediction_markets.py
def find_relevant_markets(
company: str,
claims: list[Claim],
now_period: str = "",
) -> list[PredictionMarket]:
"""Fetch and rank prediction markets relevant to this company and its claims.
Queries Polymarket (always) and Kalshi (when KALSHI_API_KEY is set).
Results are filtered for minimum liquidity ($5k volume) and ranked by
relevance × log(volume).
Args:
company: Ticker (e.g. "BA").
claims: Claims from the current-quarter transcript.
now_period: Period label — informational only.
Returns:
Up to 12 :class:`PredictionMarket` objects. Empty list on total failure.
"""
polymarket = _PolymarketClient()
kalshi = _KalshiClient()
queries = _build_queries(company, claims)
all_markets: list[PredictionMarket] = []
for query in queries:
all_markets.extend(polymarket.search(query))
if kalshi.available:
all_markets.extend(kalshi.search(query))
if cross_reference_with_claims function · python · L476-L548 (73 LOC)Red Flag Engine/src/prediction_markets.py
def cross_reference_with_claims(
markets: list[PredictionMarket],
claims: list[Claim],
) -> list[MarketClaimCrossRef]:
"""Cross-reference prediction markets against extracted management claims.
Only processes markets with a strong directional signal (probability
outside the 35–65% inconclusive zone). For each such market, finds the
most semantically overlapping claim and classifies the relationship as
CONTRADICTS, CONFIRMS, or NEUTRAL.
CONTRADICTS entries appear first — they are the most actionable signal.
Args:
markets: Output of :func:`find_relevant_markets`.
claims: Claims from the current-quarter transcript.
Returns:
List of :class:`MarketClaimCrossRef`. Empty if no meaningful pairings.
"""
if not markets or not claims:
return []
refs: list[MarketClaimCrossRef] = []
for market in markets:
# Only strong probability signals are worth cross-referencing
if _STRONG_PROB_LOW <ReportStats class · python · L24-L33 (10 LOC)Red Flag Engine/src/report.py
class ReportStats(BaseModel):
    """Counters describing one report run; every counter defaults to 0.

    Fields not declared here are rejected (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    # Chunk counts per quarter
    n_chunks_now: int = 0
    n_chunks_prev: int = 0

    # Claim counts per quarter
    n_claims_now: int = 0
    n_claims_prev: int = 0

    # Change-detection outcome
    n_matched: int = 0  # strict + soft matches (change_type != new)
    n_new: int = 0
    n_soft: int = 0  # soft-matched changes
# _escape_pipe function · python · L68-L70 (3 LOC)Red Flag Engine/src/report.py
def _escape_pipe(text: str) -> str:
"""Escape pipe characters so they don't break Markdown table cells."""
return text.replace("|", "\\|")_render_header function · python · L73-L104 (32 LOC)Red Flag Engine/src/report.py
def _render_header(
company: str,
now_period: str,
prev_period: str,
changes: list[Change],
stats: ReportStats,
) -> str:
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
high_critical = sum(1 for c in changes if c.severity >= 4)
return "\n".join([
f"# Red Flag Report: {company}",
"",
"| Field | Value |",
"|-------|-------|",
f"| Company | **{company}** |",
f"| Current quarter | `{now_period}` |",
f"| Prior quarter | `{prev_period}` |",
f"| Generated | {ts} |",
f"| Total changes detected | {len(changes)} |",
f"| High / Critical | **{high_critical}** |",
"",
"> **Disclaimer:** This report is a triage aid only. It is NOT a trading signal, "
"investment recommendation, or financial advice. It may contain false positives or "
"miss subtle language. All findings must be independently verified by a qualified anRepobility · code-quality intelligence · https://repobility.com
_render_executive_summary function · python · L107-L117 (11 LOC)Red Flag Engine/src/report.py
def _render_executive_summary(changes: list[Change]) -> str:
    """Render the "Executive Summary" section as Markdown.

    Only the first ``_TOP_SUMMARY`` entries of *changes* are listed, one
    bullet each with change badge, severity label and summary. An empty
    input yields a placeholder sentence instead of bullets.
    """
    heading = ["## Executive Summary", ""]
    highlights = changes[:_TOP_SUMMARY]
    if not highlights:
        return "\n".join(heading + ["_No material changes detected._"])

    bullets = []
    for change in highlights:
        # Fall back to the upper-cased enum value when no badge is registered.
        badge = _CHANGE_BADGE.get(change.change_type, change.change_type.value.upper())
        severity = _severity_label(change.severity)
        bullets.append(f"- {badge} **[{severity}]** {change.summary}")
    return "\n".join(heading + bullets)
# _render_red_flags_table function · python · L120-L175 (56 LOC)Red Flag Engine/src/report.py
def _render_red_flags_table(changes: list[Change]) -> str:
top = changes[:_TOP_TABLE]
lines = [
"## Red Flags",
"",
"| # | Category | Change | Sev | Evidence (Now) | Chunk (Now) | Evidence (Prev) | Chunk (Prev) |",
"|---|----------|--------|-----|----------------|-------------|-----------------|--------------|",
]
if not top:
lines.append("| — | — | — | — | _No changes_ | — | — | — |")
return "\n".join(lines)
used_analyst = False
used_soft = False
for i, c in enumerate(top, start=1):
cat = c.category.value.replace("_", " ").title()
badge = _CHANGE_BADGE.get(c.change_type, c.change_type.value.upper())
sev = _severity_label(c.severity)
# ── Analyst marker ────────────────────────────────────────────
analyst_tag = ""
if c.now_speaker_role == "analyst":
analyst_tag = " ⁽ᴬ⁾"
used_analyst = True
# ── Soft-match marker ─────────_render_limitations function · python · L179-L196 (18 LOC)Red Flag Engine/src/report.py
def _render_limitations() -> str:
return "\n".join([
"## Limitations",
"",
"- **Triage tool only.** This engine surfaces potential narrative shifts for analyst "
"review. It is not predictive and must not be used as a basis for trading decisions.",
"- **False positives / negatives.** LLM extraction at temperature=0 is conservative "
"but not infallible. Subtle hedging, irony, or boilerplate language may be "
"misclassified.",
"- **Transcript quality dependency.** Poor-quality PDFs, missing Q&A sections, or "
"partial transcripts will reduce recall.",
"- **Evidence is bounded by chunk context.** Cross-paragraph nuance may be missed "
"if a claim spans a chunk boundary.",
"- **No investment advice.** No financial advice is provided or implied. The authors "
"accept no liability for decisions made based on this output.",
"- **Not a substitute for primary source review.** Alw_render_methodology function · python · L199-L215 (17 LOC)Red Flag Engine/src/report.py
def _render_methodology() -> str:
return "\n".join([
"## Methodology",
"",
"Transcripts are split into ~3,500-character chunks at paragraph boundaries. "
"Each chunk is labelled with an inferred section (guidance, liquidity, demand, etc.) "
"via keyword regex and a speaker role (management / analyst / operator) via regex. "
"Claude extracts at most 6 claims per chunk using a strict zero-temperature prompt "
"that requires a verbatim evidence quote (≤ 25 words) for every claim; claims without "
"valid evidence are discarded. Quarter-over-quarter change detection uses a two-pass "
"RapidFuzz strategy: strict (`token_set_ratio` ≥ 65 on full claim text) then soft "
"(same category, first 60 chars, ≥ 60). Severity is assigned by a deterministic "
"heuristic based on change type, category risk, polarity, and confidence; "
"low-confidence claims are capped at severity 3. Supplementary signals —_render_abandoned_metrics function · python · L218-L237 (20 LOC)Red Flag Engine/src/report.py
def _render_abandoned_metrics(abandoned: "list[AbandonedMetric]") -> str:
"""Render the Abandoned Metrics section; returns '' if list is empty."""
if not abandoned:
return ""
lines = [
"## Abandoned Metrics",
"",
"The following categories were discussed in the prior quarter but appear absent "
"from the current transcript (≥ 2 prior claims, zero fuzzy matches now).",
"",
"| Category | Prior Quarter Statement | Evidence | Chunk | Confidence |",
"|----------|------------------------|----------|-------|------------|",
]
for m in abandoned:
cat = m.category.value.replace("_", " ").title()
lines.append(
f"| {cat} | {_escape_pipe(m.representative_claim)} "
f"| {_escape_pipe(m.evidence)} | `{m.chunk_id}` | {m.confidence.value} |"
)
return "\n".join(lines)_render_hedging_intensity function · python · L240-L260 (21 LOC)Red Flag Engine/src/report.py
def _render_hedging_intensity(deltas: "list[HedgeDelta]") -> str:
"""Render the Hedging Intensity section; returns '' if list is empty."""
if not deltas:
return ""
lines = [
"## Hedging Intensity",
"",
"Hedge word density (Tier 1: may/might/could/uncertain…; "
"Tier 2: expect/anticipate/believe…) per 100 words, by section. "
"FLAG marks sections where hedging shifted by > 3 percentage points in either direction.",
"",
"| Section | Now (/100w) | Prev (/100w) | Chg | Flag |",
"|---------|------------|-------------|-----|------|",
]
for d in deltas:
delta_str = f"+{d.delta:.1f}" if d.delta >= 0 else f"{d.delta:.1f}"
flag_str = "FLAG" if d.flag else ""
lines.append(
f"| {d.section} | {d.now_score:.1f} | {d.prev_score:.1f} | {delta_str} | {flag_str} |"
)
return "\n".join(lines)_render_peer_signals function · python · L263-L281 (19 LOC)Red Flag Engine/src/report.py
def _render_peer_signals(signals: "list[PeerSignal]") -> str:
"""Render the Peer & Supplier Signals section; returns '' if list is empty."""
if not signals:
return ""
lines = [
"## Peer & Supplier Signals",
"",
"Red flags surfaced from related companies' most recent reports. "
"These may indicate sector-level or supply-chain stress relevant to this company.",
"",
"| Source | Rel | Category | Evidence | Polarity | Sev | Report |",
"|--------|-----|----------|----------|----------|-----|--------|",
]
for s in signals:
lines.append(
f"| {s.source_company} | {s.relationship} | {s.category} "
f"| {_escape_pipe(s.evidence)} | {s.polarity} | {s.sev} | `{s.report_filename}` |"
)
return "\n".join(lines)_render_prediction_markets function · python · L284-L353 (70 LOC)Red Flag Engine/src/report.py
def _render_prediction_markets(
markets: "list[PredictionMarket]",
crossrefs: "list[MarketClaimCrossRef]",
) -> str:
"""Render the Prediction Market Context section; returns '' if both lists are empty."""
if not markets and not crossrefs:
return ""
from datetime import date as _date
today = _date.today().isoformat()
lines = [
"## Prediction Market Context",
"",
f"Active markets sourced from Polymarket / Kalshi as of {today}. "
"Minimum volume: $5,000. Only markets with a strong probability signal "
"(Yes < 35% or Yes > 65%) are cross-referenced with management claims.",
]
# ── Active Markets table ──────────────────────────────────────────────
if markets:
lines += [
"",
"### Active Markets",
"",
"| Platform | Market | Yes % | Volume (USD) | Expires |",
"|----------|--------|-------|--------------|---------|",
]
If a scraper extracted this row, it came from Repobility (https://repobility.com)
_render_backtest_context function · python · L356-L376 (21 LOC)Red Flag Engine/src/report.py
def _render_backtest_context(bt: "PostEarningsReturns") -> str:
"""Render the Backtest Context section."""
def _fmt(r: Optional[float]) -> str:
if r is None:
return "—"
sign = "+" if r > 0 else ""
return f"{sign}{r * 100:.1f}%"
lines = [
"## Backtest Context",
"",
"| Window | Return |",
"|--------|--------|",
f"| 1-day post-earnings | {_fmt(bt.ret_1d)} |",
f"| 5-day post-earnings | {_fmt(bt.ret_5d)} |",
f"| 20-day post-earnings | {_fmt(bt.ret_20d)} |",
"",
f"*Based on earnings call date {bt.call_date}. "
"Retrospective data only — not a trading signal.*",
]
return "\n".join(lines)generate_report function · python · L383-L454 (72 LOC)Red Flag Engine/src/report.py
def generate_report(
company: str,
now_period: str,
prev_period: str,
changes: list[Change],
stats: ReportStats | None = None,
ai_sensitivity_md: str = "",
abandoned_metrics: "list[AbandonedMetric] | None" = None,
hedge_deltas: "list[HedgeDelta] | None" = None,
peer_signals: "list[PeerSignal] | None" = None,
backtest_returns: "PostEarningsReturns | None" = None,
pred_markets: "list[PredictionMarket] | None" = None,
pred_crossref: "list[MarketClaimCrossRef] | None" = None,
) -> str:
"""Render a complete Markdown report string from a list of Changes.
Args:
company: Company identifier (e.g. "BA").
now_period: Label for the current quarter (e.g. "2025Q4").
prev_period: Label for the prior quarter (e.g. "2025Q3").
changes: Output of diff.match_claims(), sorted by severity DESC.
stats: save_report function · python · L457-L470 (14 LOC)Red Flag Engine/src/report.py
def save_report(
    report_md: str,
    company: str,
    now_period: str,
    prev_period: str,
    output_dir: Path,
) -> Path:
    """Write *report_md* into *output_dir* and return the path written.

    The directory is created if needed (including parents). The file is
    named ``<company>_<now_period>_vs_<prev_period>.md`` and written as
    UTF-8.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    destination = output_dir / f"{company}_{now_period}_vs_{prev_period}.md"
    destination.write_text(report_md, encoding="utf-8")
    logger.info("Report saved → %s", destination)
    return destination
# tag_speaker_role function · python · L61-L70 (10 LOC)Red Flag Engine/src/segment.py
def tag_speaker_role(text: str) -> str:
    """Return the speaker role whose pattern first matches *text*.

    The ``(role, pattern)`` pairs in ``_SPEAKER_PATTERNS`` are tried in
    order and the first pattern found anywhere in *text* decides the role.
    ``"unknown"`` is returned when none of them match.
    """
    matches = (
        role for role, pattern in _SPEAKER_PATTERNS if pattern.search(text)
    )
    return next(matches, "unknown")
# Chunk class · python · L74-L78 (5 LOC)Red Flag Engine/src/segment.py
class Chunk:
    # One labelled slice of a transcript.
    # NOTE(review): presumably decorated (e.g. as a dataclass) just above
    # this span, since callers construct it with keyword arguments — confirm.
    chunk_id: str  # "chunk_000", "chunk_001", …
    section: str  # inferred section label
    text: str
    speaker_role: str = "unknown"  # operator | analyst | management | unknown
# infer_section function · python · L85-L90 (6 LOC)Red Flag Engine/src/segment.py
def infer_section(text: str) -> str:
    """Return the label of the first section hint found in *text*.

    The ``(label, pattern)`` pairs in ``_COMPILED_HINTS`` are tried in
    order; ``'general'`` is returned when no pattern matches.
    """
    hits = (
        label for label, pattern in _COMPILED_HINTS if pattern.search(text)
    )
    return next(hits, "general")
# chunk_text function · python · L93-L138 (46 LOC)Red Flag Engine/src/segment.py
def chunk_text(text: str, max_chars: int = 3500) -> list[str]:
"""Split *text* into chunks of at most *max_chars* characters.
Splits preferentially on blank lines (paragraph boundaries). The running
length correctly accounts for the ``\\n\\n`` separator that will be inserted
between paragraphs when the chunk is joined. A single paragraph that
exceeds *max_chars* is hard-split at the character limit rather than
being dropped.
"""
paragraphs = re.split(r"\n{2,}", text)
chunks: list[str] = []
current_parts: list[str] = []
current_len: int = 0 # tracks *actual* joined byte length including separators
for para in paragraphs:
para = para.strip()
if not para:
continue
# If a single paragraph is itself too long, hard-split it first.
if len(para) > max_chars:
if current_parts:
chunks.append(_SEPARATOR.join(current_parts))
current_parts = []
segment_doc function · python · L141-L165 (25 LOC)Red Flag Engine/src/segment.py
def segment_doc(doc: Doc, max_chars: int = 3500) -> list[Chunk]:
    """Split *doc* into Chunk objects labelled with section and speaker role.

    Args:
        doc: The loaded transcript document; its ``text`` is chunked.
        max_chars: Character limit passed to ``chunk_text`` (default 3500).

    Returns:
        Chunks in document order, with ids ``chunk_000``, ``chunk_001``, …
    """
    labelled: list[Chunk] = []
    for position, piece in enumerate(chunk_text(doc.text, max_chars=max_chars)):
        ident = f"chunk_{position:03d}"
        section_label = infer_section(piece)
        role = tag_speaker_role(piece)
        labelled.append(
            Chunk(
                chunk_id=ident,
                section=section_label,
                text=piece,
                speaker_role=role,
            )
        )
        logger.debug(" %s section=%-22s len=%d", ident, section_label, len(piece))

    logger.info(
        "Segmented '%s %s' → %d chunks", doc.company, doc.period, len(labelled)
    )
    return labelled
# Repobility analyzer · published findings · https://repobility.com
_parse_report_meta function · python · L313-L331 (19 LOC)Red Flag Engine/streamlit_app.py
def _parse_report_meta(md_path: Path) -> dict:
    """Collect dashboard metadata for the report file at *md_path*.

    Company and the two period labels come from the file name via
    ``_FILENAME_RE``. If the name does not match, the file stem is used as
    the company and both periods become "—". The two counters are read from
    the report's header table and default to 0 when the row is absent.
    """
    name_match = _FILENAME_RE.match(md_path.name)
    if name_match:
        company = name_match.group(1).upper()
        now = name_match.group(2)
        prev = name_match.group(3)
    else:
        company, now, prev = md_path.stem, "—", "—"

    content = md_path.read_text(encoding="utf-8", errors="ignore")

    def _count(pattern: str) -> int:
        hit = re.search(pattern, content)
        return int(hit.group(1)) if hit else 0

    total_changes = _count(r"\| Total changes detected \| (\d+) \|")
    high_critical = _count(r"\| High / Critical \| \*\*(\d+)\*\* \|")
    return {
        "company": company,
        "now": now,
        "prev": prev,
        "total_changes": total_changes,
        "high_critical": high_critical,
        "filename": md_path.name,
    }
# _split_sections function · python · L334-L346 (13 LOC)Red Flag Engine/streamlit_app.py
def _split_sections(content: str) -> dict[str, str]:
"""Split a report into named sections using the --- separators."""
parts = re.split(r"\n\n---\n\n", content)
sections: dict[str, str] = {}
for part in parts:
stripped = part.strip()
if stripped.startswith("# Red Flag Report"):
sections["header"] = stripped
else:
hm = re.match(r"^## (.+)", stripped)
if hm:
sections[hm.group(1).strip()] = stripped
return sections_split_pipe_row function · python · L349-L356 (8 LOC)Red Flag Engine/streamlit_app.py
def _split_pipe_row(line: str) -> list[str]:
"""Split a Markdown table row on unescaped pipes; strip leading/trailing empties."""
parts = re.split(r"(?<!\\)\|", line)
while parts and not parts[0].strip():
parts.pop(0)
while parts and not parts[-1].strip():
parts.pop()
return [p.strip().replace("\\|", "|") for p in parts]