Function bodies 138 total
chunk_by_paragraphs function · python · L28-L56 (29 LOC)backend/app/utils/chunker.py
def chunk_by_paragraphs(text: str, max_tokens: int = 1500) -> list[str]:
    """
    Pack blank-line-separated paragraphs into chunks of at most ``max_tokens`` words.

    Paragraphs are stripped and empty ones dropped. Neighbouring paragraphs are
    grouped (re-joined with a blank line) while their combined whitespace-split
    word count stays within the budget. A paragraph that exceeds the budget on
    its own flushes the pending group and is handed to ``chunk_by_tokens``.
    """
    out: list[str] = []
    pending: list[str] = []
    pending_words = 0

    def flush() -> None:
        # Emit the paragraphs collected so far as one chunk and start over.
        nonlocal pending, pending_words
        if pending:
            out.append("\n\n".join(pending))
            pending, pending_words = [], 0

    for block in (piece.strip() for piece in text.split("\n\n")):
        if not block:
            continue
        words = len(block.split())
        if words > max_tokens:
            # Too large to share a chunk with anything: split it by tokens.
            flush()
            out.extend(chunk_by_tokens(block, max_tokens=max_tokens))
            continue
        if pending_words + words > max_tokens:
            flush()
        pending.append(block)
        pending_words += words

    flush()
    return out
# _make_pipeline function · python · L25-L31 (7 LOC)backend/ingestion/cli.py
def _make_pipeline() -> IngestionPipeline:
    """Build an ``IngestionPipeline`` on a persistent ChromaDB client at ``settings.chroma_path``."""
    # Function-scope imports, in the same order as before.
    import chromadb
    from app.config import settings

    return IngestionPipeline(
        chroma_client=chromadb.PersistentClient(path=settings.chroma_path),
    )
# ingest_tickets function · python · L35-L46 (12 LOC)backend/ingestion/cli.py
def ingest_tickets(
    export_file: Path = typer.Argument(..., help="Path to WHD JSON or CSV export file"),
) -> None:
    """Import resolved WHD tickets from a JSON or CSV export."""
    # The docstring above is left untouched: typer may surface it as help text.
    if export_file.exists():
        pipeline = _make_pipeline()
        typer.echo(f"Ingesting tickets from {export_file} …")
        count = pipeline.ingest_tickets(export_file)
        typer.echo(f"[OK] Ingested {count} ticket(s).")
        return

    # Missing path: report on stderr and exit with status 1.
    typer.echo(f"[ERROR] File not found: {export_file}", err=True)
    raise typer.Exit(1)
# ingest_kb_html function · python · L50-L61 (12 LOC)backend/ingestion/cli.py
def ingest_kb_html(
    directory: Path = typer.Argument(..., help="Directory containing HTML KB articles"),
) -> None:
    """Import KB articles from a directory of HTML files."""
    # The docstring above is left untouched: typer may surface it as help text.
    if directory.is_dir():
        pipeline = _make_pipeline()
        typer.echo(f"Ingesting KB HTML from {directory} …")
        count = pipeline.ingest_kb_html(directory)
        typer.echo(f"[OK] Ingested {count} chunk(s) from HTML articles.")
        return

    # Not a directory: report on stderr and exit with status 1.
    typer.echo(f"[ERROR] Not a directory: {directory}", err=True)
    raise typer.Exit(1)
# ingest_kb_pdf function · python · L65-L76 (12 LOC)backend/ingestion/cli.py
def ingest_kb_pdf(
    directory: Path = typer.Argument(..., help="Directory containing PDF KB articles"),
) -> None:
    """Import KB articles from a directory of PDF files."""
    # The docstring above is left untouched: typer may surface it as help text.
    if directory.is_dir():
        pipeline = _make_pipeline()
        typer.echo(f"Ingesting KB PDF from {directory} …")
        count = pipeline.ingest_kb_pdf(directory)
        typer.echo(f"[OK] Ingested {count} chunk(s) from PDF articles.")
        return

    # Not a directory: report on stderr and exit with status 1.
    typer.echo(f"[ERROR] Not a directory: {directory}", err=True)
    raise typer.Exit(1)
# status function · python · L80-L94 (15 LOC)backend/ingestion/cli.py
def status() -> None:
    """Show document counts for all ChromaDB collections."""
    # The docstring above is left untouched: typer may surface it as help text.
    counts = _make_pipeline().status()
    if not counts:
        typer.echo("No collections found. Run an ingest command first.")
        return

    rule = "─" * 35  # horizontal separator printed above and below the rows
    typer.echo("\nChromaDB collection status:")
    typer.echo(rule)
    for name, count in counts.items():
        typer.echo(f" {name:<25} {count:>6} docs")
    typer.echo(rule)
    typer.echo(f" {'TOTAL':<25} {sum(counts.values()):>6} docs\n")
# clear function · python · L98-L108 (11 LOC)backend/ingestion/cli.py
def clear(
    confirm: bool = typer.Option(False, "--confirm", help="Required to confirm deletion"),
) -> None:
    """Clear ALL ChromaDB collections. Irreversible — requires --confirm."""
    # The docstring above is left untouched: typer may surface it as help text.
    if confirm:
        _make_pipeline().clear_all()
        typer.echo("[OK] All ChromaDB collections cleared.")
        return

    # Without the explicit flag nothing is deleted; exit with status 1.
    typer.echo("[ABORT] Destructive operation. Pass --confirm to proceed.", err=True)
    raise typer.Exit(1)
# Repobility — same analyzer, your code, free for public repos · /scan/
_content_id function · python · L21-L23 (3 LOC)backend/ingestion/kb_loader.py
def _content_id(content: str) -> str:
"""Stable SHA-256 document ID — re-ingesting same content is idempotent."""
return hashlib.sha256(content.encode()).hexdigest()load_kb_html function · python · L29-L75 (47 LOC)backend/ingestion/kb_loader.py
def load_kb_html(path: Path) -> Iterator[tuple[str, str, dict[str, str]]]:
"""
Yield (doc_id, text, metadata) tuples from an HTML KB article.
Splits on <h2> and <h3> headings; each section becomes one or more chunks.
"""
html = path.read_text(encoding="utf-8", errors="replace")
soup = BeautifulSoup(html, "html.parser")
# Extract article title
title_tag = soup.find("h1") or soup.find("title")
article_title = title_tag.get_text(strip=True) if title_tag else path.stem
# Split by heading tags
sections: list[tuple[str, str]] = [] # (heading, content)
current_heading = article_title
current_parts: list[str] = []
for element in soup.find_all(["h1", "h2", "h3", "p", "li", "pre"]):
if isinstance(element, Tag) and element.name in ("h1", "h2", "h3"):
if current_parts:
sections.append((current_heading, " ".join(current_parts)))
current_parts = []
current_heading = element.getload_kb_html_dir function · python · L78-L85 (8 LOC)backend/ingestion/kb_loader.py
def load_kb_html_dir(directory: Path) -> Iterator[tuple[str, str, dict[str, str]]]:
    """Stream chunks from every ``*.html`` file directly inside ``directory``.

    Files are visited in sorted path order, without recursion. If loading a
    file raises, a warning naming that file is logged and iteration continues
    with the next file.
    """
    html_files = sorted(directory.glob("*.html"))
    for source in html_files:
        try:
            yield from load_kb_html(source)
        except Exception as err:
            # Best-effort batch: one bad article must not abort the rest.
            logger.warning("Skipping %s: %s", source.name, err)
# load_kb_pdf function · python · L91-L117 (27 LOC)backend/ingestion/kb_loader.py
def load_kb_pdf(path: Path) -> Iterator[tuple[str, str, dict[str, str]]]:
"""
Yield (doc_id, text, metadata) tuples from a PDF KB article.
Uses page-by-page extraction with 500-token sliding window, 50-token overlap.
"""
reader = PdfReader(str(path))
article_title = path.stem.replace("-", " ").replace("_", " ").title()
article_id = _content_id(article_title + path.name)
full_text_parts: list[str] = []
for page in reader.pages[:500]:
page_text = page.extract_text() or ""
full_text_parts.append(page_text)
full_text = "\n\n".join(full_text_parts)
for chunk in chunk_by_tokens(full_text, max_tokens=500, overlap_tokens=50):
if not chunk.strip():
continue
doc_id = _content_id(chunk)
metadata: dict[str, str] = {
"article_id": article_id,
"title": article_title,
"source_file": path.name,
"source_type": "pdf",
}
yield doc_id, chunk, meload_kb_pdf_dir function · python · L120-L126 (7 LOC)backend/ingestion/kb_loader.py
def load_kb_pdf_dir(directory: Path) -> Iterator[tuple[str, str, dict[str, str]]]:
    """Stream chunks from every ``*.pdf`` file directly inside ``directory``.

    Files are visited in sorted path order, without recursion. If loading a
    file raises, a warning naming that file is logged and iteration continues
    with the next file.
    """
    pdf_files = sorted(directory.glob("*.pdf"))
    for source in pdf_files:
        try:
            yield from load_kb_pdf(source)
        except Exception as err:
            # Best-effort batch: one bad document must not abort the rest.
            logger.warning("Skipping %s: %s", source.name, err)
# IngestionPipeline class · python · L37-L177 (141 LOC)backend/ingestion/pipeline.py
class IngestionPipeline:
def __init__(
self,
chroma_client: ClientAPI,
embed_fn: Callable[[str], list[float]] | None = None,
) -> None:
self.client = chroma_client
self._custom_embed_fn = embed_fn
# ── Ticket ingestion ─────────────────────────────────────────────────────
def ingest_tickets(self, path: Path) -> int:
col = self.client.get_or_create_collection(
name=TICKET_COLLECTION,
metadata={"hnsw:space": "cosine"},
)
return self._upsert_stream(col, load_tickets(path))
# ── KB ingestion ─────────────────────────────────────────────────────────
def ingest_kb_html(self, directory: Path) -> int:
col = self.client.get_or_create_collection(
name=KB_COLLECTION,
metadata={"hnsw:space": "cosine"},
)
return self._upsert_stream(col, load_kb_html_dir(directory))
def ingest_kb_pdf(self, directory: Path) -> int:
col = self.__init__ method · python · L38-L44 (7 LOC)backend/ingestion/pipeline.py
def __init__(
    self,
    chroma_client: ClientAPI,
    embed_fn: Callable[[str], list[float]] | None = None,
) -> None:
    """Store the Chroma client and an optional custom embedding callable.

    Args:
        chroma_client: Client used for all collection operations.
        embed_fn: Optional replacement embedder; ``None`` (the default) means
            no custom embedder is stored.
    """
    self.client, self._custom_embed_fn = chroma_client, embed_fn
# ingest_tickets method · python · L48-L53 (6 LOC)backend/ingestion/pipeline.py
def ingest_tickets(self, path: Path) -> int:
    """Upsert the records loaded from ``path`` into the ticket collection.

    The collection is fetched or created with cosine HNSW space. Returns the
    value produced by ``_upsert_stream``.
    """
    collection = self.client.get_or_create_collection(
        name=TICKET_COLLECTION,
        metadata={"hnsw:space": "cosine"},
    )
    records = load_tickets(path)
    return self._upsert_stream(collection, records)
# Repobility · code-quality intelligence · https://repobility.com
ingest_kb_html method · python · L57-L62 (6 LOC)backend/ingestion/pipeline.py
def ingest_kb_html(self, directory: Path) -> int:
    """Upsert the chunks from the HTML files in ``directory`` into the KB collection.

    The collection is fetched or created with cosine HNSW space. Returns the
    value produced by ``_upsert_stream``.
    """
    collection = self.client.get_or_create_collection(
        name=KB_COLLECTION,
        metadata={"hnsw:space": "cosine"},
    )
    chunks = load_kb_html_dir(directory)
    return self._upsert_stream(collection, chunks)
# ingest_kb_pdf method · python · L64-L69 (6 LOC)backend/ingestion/pipeline.py
def ingest_kb_pdf(self, directory: Path) -> int:
    """Upsert the chunks from the PDF files in ``directory`` into the KB collection.

    The collection is fetched or created with cosine HNSW space. Returns the
    value produced by ``_upsert_stream``.
    """
    collection = self.client.get_or_create_collection(
        name=KB_COLLECTION,
        metadata={"hnsw:space": "cosine"},
    )
    chunks = load_kb_pdf_dir(directory)
    return self._upsert_stream(collection, chunks)
# ingest_file method · python · L73-L100 (28 LOC)backend/ingestion/pipeline.py
def ingest_file(self, path: Path) -> tuple[str, int]:
"""Auto-route a single file to the correct collection by extension.
Returns (collection_name, chunks_ingested).
Raises ValueError for unsupported extensions.
"""
suffix = path.suffix.lower()
collection_name = _EXTENSION_MAP.get(suffix)
if collection_name is None:
raise ValueError(
f"Unsupported file extension: {suffix} "
f"(supported: {', '.join(sorted(_EXTENSION_MAP))})"
)
col = self.client.get_or_create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"},
)
if suffix in (".json", ".csv"):
stream = load_tickets(path)
elif suffix in (".html", ".htm"):
stream = load_kb_html(path)
else: # .pdf
stream = load_kb_pdf(path)
chunks = self._upsert_stream(col, stream)
return collection_name, cstatus method · python · L104-L109 (6 LOC)backend/ingestion/pipeline.py
def status(self) -> dict[str, int]:
    """Return a mapping of collection name to document count.

    This is a best-effort query: if listing or counting collections raises,
    the failure is logged and an empty dict is returned, so callers can keep
    treating "no data" and "could not read" alike.

    Returns:
        ``{collection_name: count}`` for every collection the client reports,
        or ``{}`` when there are none or the lookup failed.
    """
    try:
        collections = self.client.list_collections()
        return {col.name: col.count() for col in collections}
    except Exception as exc:
        # Still swallow the error (callers rely on the {} fallback), but leave
        # a trace so a backend failure is not mistaken for an empty database.
        logger.warning("Could not read collection status: %s", exc)
        return {}
# clear_all method · python · L111-L113 (3 LOC)backend/ingestion/pipeline.py
def clear_all(self) -> None:
    """Delete every collection the client lists, addressing each one by name."""
    client = self.client
    for collection in client.list_collections():
        client.delete_collection(collection.name)
# _upsert_stream method · python · L117-L160 (44 LOC)backend/ingestion/pipeline.py
def _upsert_stream(
self,
col: Collection,
stream: Iterator[tuple[str, str, dict[str, str]]],
) -> int:
total = 0
batch_num = 0
batch_ids: list[str] = []
batch_docs: list[str] = []
batch_embeds: list[list[float]] = []
batch_metas: list[dict[str, str]] = []
for doc_id, text, metadata in stream:
embedding = self._do_embed(text)
batch_ids.append(doc_id)
batch_docs.append(text)
batch_embeds.append(embedding)
batch_metas.append(metadata)
if len(batch_ids) >= _BATCH_SIZE:
col.upsert(
ids=batch_ids,
documents=batch_docs,
embeddings=batch_embeds, # type: ignore[arg-type]
metadatas=batch_metas, # type: ignore[arg-type]
)
total += len(batch_ids)
batch_num += 1
logger.info("Batc_do_embed method · python · L162-L166 (5 LOC)backend/ingestion/pipeline.py
def _do_embed(self, text: str) -> list[float]:
"""Embed text using the injected embed_fn or the default Ollama call."""
if self._custom_embed_fn is not None:
return self._custom_embed_fn(text)
return self._embed(text)_embed method · python · L168-L177 (10 LOC)backend/ingestion/pipeline.py
def _embed(self, text: str) -> list[float]:
    """Request a nomic-embed-text embedding for ``text`` from Ollama (blocking call).

    POSTs to ``{settings.ollama_base_url}/api/embeddings`` with a 60 second
    timeout, raises on an HTTP error status, and returns the response's
    ``"embedding"`` field as a list.
    """
    with httpx.Client() as http:
        endpoint = f"{settings.ollama_base_url}/api/embeddings"
        payload = {"model": "nomic-embed-text", "prompt": text}
        response = http.post(endpoint, json=payload, timeout=60.0)
        response.raise_for_status()
        vector = response.json()["embedding"]
        return list(vector)
# Powered by Repobility — scan your code at https://repobility.com
_content_id function · python · L28-L30 (3 LOC)backend/ingestion/ticket_loader.py
def _content_id(content: str) -> str:
"""Stable SHA-256 document ID — re-ingesting same content is idempotent."""
return hashlib.sha256(content.encode()).hexdigest()load_tickets_json function · python · L33-L64 (32 LOC)backend/ingestion/ticket_loader.py
def load_tickets_json(path: Path) -> Iterator[tuple[str, str, dict[str, str]]]:
"""
Yield (doc_id, text, metadata) tuples from a WHD JSON export.
'text' combines subject + description + resolution for embedding.
"""
with path.open(encoding="utf-8") as f:
records = json.load(f)
if not isinstance(records, list):
raise ValueError(f"Expected a JSON array, got {type(records).__name__}")
for rec in records:
if not isinstance(rec, dict):
continue
subject = str(rec.get("subject", "")).strip()
description = str(rec.get("description", "")).strip()
resolution = str(rec.get("resolution", "")).strip()
if not subject and not description:
continue
text = "\n\n".join(part for part in [subject, description, resolution] if part)
doc_id = _content_id(text)
metadata: dict[str, str] = {
"ticket_id": str(rec.get("id", "")),
"subject": subject[:200],load_tickets_csv function · python · L67-L93 (27 LOC)backend/ingestion/ticket_loader.py
def load_tickets_csv(path: Path) -> Iterator[tuple[str, str, dict[str, str]]]:
"""
Yield (doc_id, text, metadata) tuples from a WHD CSV export.
Header row field names are normalized to lowercase.
"""
with path.open(encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
norm = {k.lower().strip(): v.strip() for k, v in row.items() if k}
subject = norm.get("subject", "")
description = norm.get("description", "")
resolution = norm.get("resolution", "")
if not subject and not description:
continue
text = "\n\n".join(part for part in [subject, description, resolution] if part)
doc_id = _content_id(text)
metadata: dict[str, str] = {
"ticket_id": norm.get("id", ""),
"subject": subject[:200],
"category": norm.get("category", ""),
"status": norm.get("status"load_tickets function · python · L96-L104 (9 LOC)backend/ingestion/ticket_loader.py
def load_tickets(path: Path) -> Iterator[tuple[str, str, dict[str, str]]]:
    """Yield ticket tuples from ``path``, picking the loader by file extension.

    ``.json`` goes to ``load_tickets_json`` and ``.csv`` to ``load_tickets_csv``
    (the extension is compared case-insensitively).

    Raises:
        ValueError: For any other extension. Because this is a generator, the
            error surfaces when iteration starts.
    """
    suffix = path.suffix.lower()
    if suffix not in (".json", ".csv"):
        raise ValueError(f"Unsupported ticket export format: {suffix} (use .json or .csv)")
    if suffix == ".json":
        yield from load_tickets_json(path)
        return
    yield from load_tickets_csv(path)
# log function · python · L21-L23 (3 LOC)backend/native_host.py
def log(msg: str) -> None:
    """Append ``msg`` plus a newline to ``LOG_FILE`` (UTF-8); the file is reopened on every call."""
    line = msg + "\n"
    with open(LOG_FILE, "a", encoding="utf-8") as handle:
        handle.write(line)
# read_message function · python · L26-L32 (7 LOC)backend/native_host.py
def read_message() -> dict | None:
raw_length = sys.stdin.buffer.read(4)
if len(raw_length) < 4:
return None
length = struct.unpack("<I", raw_length)[0]
data = sys.stdin.buffer.read(length)
return json.loads(data.decode("utf-8"))send_message function · python · L35-L39 (5 LOC)backend/native_host.py
def send_message(msg: dict) -> None:
    """Write ``msg`` to stdout as one frame: 4-byte little-endian length, then UTF-8 JSON.

    The stream is flushed after the payload is written.
    """
    body = json.dumps(msg).encode("utf-8")
    out = sys.stdout.buffer
    out.write(struct.pack("<I", len(body)))
    out.write(body)
    out.flush()
# start_backend function · python · L42-L65 (24 LOC)backend/native_host.py
def start_backend() -> dict:
venv_python = os.path.join(BACKEND_DIR, ".venv", "Scripts", "python.exe")
if not os.path.exists(venv_python):
log(f"venv not found at {venv_python}")
return {"ok": False, "error": "Backend venv not found. Run setup first."}
log_out = os.path.join(BACKEND_DIR, "backend_stdout.log")
log_err = os.path.join(BACKEND_DIR, "backend_stderr.log")
log(f"Starting backend with {venv_python}, cwd={BACKEND_DIR}")
try:
with open(log_out, "w") as fout, open(log_err, "w") as ferr:
proc = subprocess.Popen(
[venv_python, "-m", "uvicorn", "app.main:app", "--host", "127.0.0.1", "--port", "8765"],
cwd=BACKEND_DIR,
stdout=fout,
stderr=ferr,
creationflags=subprocess.CREATE_NO_WINDOW,
)
log(f"Started PID={proc.pid}")
return {"ok": True, "status": "starting", "pid": proc.pid}
except Exception as e:
log(fRepobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
start_ollama function · python · L68-L80 (13 LOC)backend/native_host.py
def start_ollama() -> dict:
    """Spawn ``ollama serve`` without waiting for it and report the outcome.

    The child's stdout and stderr are discarded and it is started with
    ``CREATE_NO_WINDOW``.

    Returns:
        ``{"ok": True, "status": "starting"}`` once the process was spawned, or
        ``{"ok": False, "error": <message>}`` if spawning raised.
    """
    log("Starting Ollama")
    command = ["ollama", "serve"]
    try:
        subprocess.Popen(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            creationflags=subprocess.CREATE_NO_WINDOW,
        )
    except Exception as exc:
        log(f"Error starting Ollama: {exc}")
        return {"ok": False, "error": str(exc)}
    return {"ok": True, "status": "starting"}
# main function · python · L83-L96 (14 LOC)backend/native_host.py
def main() -> None:
    """Handle one request: read a message, run the requested action, send the reply.

    Returns without replying when no message (or a falsy one) was read.
    Unknown actions get an ``{"ok": False, "error": ...}`` reply.
    """
    request = read_message()
    if not request:
        return

    action = request.get("action", "")
    log(f"Received action: {action}")

    if action == "start_backend":
        reply = start_backend()
    elif action == "start_ollama":
        reply = start_ollama()
    else:
        reply = {"ok": False, "error": f"Unknown action: {action}"}
    send_message(reply)
# DOMInserter class · typescript · L1-L46 (46 LOC)extension/src/content/dom-inserter.ts
export class DOMInserter {
private readonly techNotesSelectors = [
'textarea#techNotes',
'textarea[name="techNote"]',
'#techNotesDiv textarea',
]
/**
* Inserts text into the WHD reply textarea.
* Uses the native setter trick to bypass React/framework value caching.
* Returns true on success, false if textarea not found.
*/
insertReply(text: string): boolean {
const textarea = this.findTextarea()
if (!textarea) return false
// Native setter trick — bypasses framework value caching
const nativeSetter = Object.getOwnPropertyDescriptor(
HTMLTextAreaElement.prototype,
'value'
)?.set
if (nativeSetter) {
nativeSetter.call(textarea, text)
} else {
textarea.value = text
}
textarea.dispatchEvent(new Event('input', { bubbles: true }))
textarea.dispatchEvent(new Event('change', { bubbles: true }))
// Scroll into view and focus
textarea.scrollIntoView({ behavior: 'smooth', block: 'center' })insertReply method · typescript · L13-L37 (25 LOC)extension/src/content/dom-inserter.ts
/**
 * Writes `text` into the reply textarea located by `findTextarea()`.
 * Goes through the prototype's native `value` setter when available (to get
 * past framework value caching), then fires bubbling `input` and `change`
 * events, scrolls the textarea into view and focuses it.
 * Returns false when no textarea is found, true otherwise.
 */
insertReply(text: string): boolean {
  const target = this.findTextarea()
  if (target === null) return false

  const descriptor = Object.getOwnPropertyDescriptor(HTMLTextAreaElement.prototype, 'value')
  const setValue = descriptor?.set
  if (setValue) {
    setValue.call(target, text)
  } else {
    target.value = text
  }

  // Tell listeners about the programmatic change
  for (const eventName of ['input', 'change']) {
    target.dispatchEvent(new Event(eventName, { bubbles: true }))
  }

  target.scrollIntoView({ behavior: 'smooth', block: 'center' })
  target.focus()
  return true
}
// findTextarea method · typescript · L39-L45 (7 LOC)extension/src/content/dom-inserter.ts
/** Returns the first element matched by `techNotesSelectors` (tried in order) that is a textarea, else null. */
private findTextarea(): HTMLTextAreaElement | null {
  for (const selector of this.techNotesSelectors) {
    const candidate = document.querySelector(selector)
    if (!(candidate instanceof HTMLTextAreaElement)) continue
    return candidate
  }
  return null
}
// safeQuerySelector function · typescript · L30-L40 (11 LOC)extension/src/content/dom-reader.ts
/**
 * `document.querySelector` that tolerates malformed selectors: a SyntaxError
 * is reported through `debugError` and turned into `null`. Any other error is
 * rethrown.
 */
function safeQuerySelector(selector: string): Element | null {
  try {
    return document.querySelector(selector)
  } catch (err) {
    if (!(err instanceof SyntaxError)) throw err
    debugError('Invalid CSS selector skipped:', selector, err.message)
    return null
  }
}
// DOMReader class · typescript · L42-L205 (164 LOC)extension/src/content/dom-reader.ts
export class DOMReader {
private overrides: Partial<SelectorConfig> = {}
private readonly _ready: Promise<void>
constructor() {
this._ready = this.loadOverrides()
}
private loadOverrides(): Promise<void> {
return new Promise<void>((resolve) => {
chrome.storage.sync.get(STORAGE_KEY_SETTINGS, (result) => {
const saved = result[STORAGE_KEY_SETTINGS] as Partial<AppSettings> | undefined
this.overrides = saved?.selectorOverrides ?? {}
resolve()
})
})
}
/** Wait until selector overrides are loaded from storage. */
async ready(): Promise<void> {
await this._ready
}
/** Returns true if the current page is a WHD ticket page. */
isTicketPage(): boolean {
const url = window.location.href
if (TICKET_URL_PATTERNS.some((p) => p.test(url))) return true
if (TICKET_DOM_MARKERS.some((sel) => safeQuerySelector(sel) !== null)) return true
const bodyText = document.body?.innerText ?? ''
if (TICKET_CONTENT_SENconstructor method · typescript · L46-L48 (3 LOC)extension/src/content/dom-reader.ts
/** Starts `loadOverrides()` immediately and keeps the pending promise in `_ready`. */
constructor() {
this._ready = this.loadOverrides()
}Repobility — same analyzer, your code, free for public repos · /scan/
loadOverrides method · typescript · L50-L58 (9 LOC)extension/src/content/dom-reader.ts
/**
 * Loads saved selector overrides from `chrome.storage.sync` into
 * `this.overrides`.
 *
 * The returned promise always resolves: if the read fails or nothing is
 * stored, the overrides fall back to `{}`, so awaiting callers can never hang.
 */
private loadOverrides(): Promise<void> {
  return new Promise<void>((resolve) => {
    chrome.storage.sync.get(STORAGE_KEY_SETTINGS, (result) => {
      // On failure Chrome sets runtime.lastError and may pass no result.
      // Reading lastError also marks the error as handled.
      const failed = Boolean(chrome.runtime.lastError)
      const saved = failed
        ? undefined
        : (result?.[STORAGE_KEY_SETTINGS] as Partial<AppSettings> | undefined)
      this.overrides = saved?.selectorOverrides ?? {}
      resolve()
    })
  })
}
// resolve method · typescript · L55-L63 (9 LOC)extension/src/content/dom-reader.ts
resolve()
})
})
}
/** Resolves once the selector overrides requested in the constructor have loaded. */
async ready(): Promise<void> {
  const pending = this._ready
  await pending
}
// isTicketPage method · typescript · L66-L76 (11 LOC)extension/src/content/dom-reader.ts
/**
 * Returns true if the current page looks like a ticket page. Checks, in
 * order: the URL against `TICKET_URL_PATTERNS`, the presence of any
 * `TICKET_DOM_MARKERS` element, and whether the body text contains every
 * `TICKET_CONTENT_SENTINELS` entry. Later checks run only if earlier ones fail.
 */
isTicketPage(): boolean {
  const href = window.location.href
  const urlMatches = (): boolean => TICKET_URL_PATTERNS.some((pattern) => pattern.test(href))
  const markerPresent = (): boolean =>
    TICKET_DOM_MARKERS.some((marker) => safeQuerySelector(marker) !== null)
  const bodyHasAllSentinels = (): boolean => {
    const pageText = document.body?.innerText ?? ''
    return TICKET_CONTENT_SENTINELS.every((sentinel) => pageText.includes(sentinel))
  }
  return urlMatches() || markerPresent() || bodyHasAllSentinels()
}
// readField method · typescript · L96-L121 (26 LOC)extension/src/content/dom-reader.ts
/**
 * Reads the text for `field`.
 *
 * Tier 1 tries CSS selectors in order: the stored override (if any), then
 * `primary`, then each fallback. The first non-empty extracted text wins.
 * Tier 2 falls back to label-based lookup via `WHD_LABELS[field]`.
 * Returns '' when nothing yields text.
 */
private readField(
  field: keyof SelectorConfig,
  primary: string,
  fallbacks: readonly string[]
): string {
  const candidates = [primary, ...fallbacks]
  const custom = this.overrides[field]
  if (custom) candidates.unshift(custom)

  // Tier 1: CSS selectors
  for (const selector of candidates) {
    const el = safeQuerySelector(selector)
    if (!el) continue
    const text = this.extractText(el)
    if (text) return text
  }

  // Tier 2: WHD label-based extraction (table layout)
  const labels = WHD_LABELS[field]
  if (!labels) return ''
  const labelled = this.readByLabel(labels)
  if (labelled) return labelled
  return ''
}
// extractText method · typescript · L196-L204 (9 LOC)extension/src/content/dom-reader.ts
/**
 * Returns the trimmed text of `el`: the selected option's label for a
 * <select>, the current value for an <input>/<textarea>, otherwise
 * `textContent`. Falls back to '' when there is nothing to read.
 */
private extractText(el: Element): string {
  if (el instanceof HTMLSelectElement) {
    const selected = el.options[el.selectedIndex]
    return selected?.text?.trim() ?? ''
  }
  const isValueControl = el instanceof HTMLInputElement || el instanceof HTMLTextAreaElement
  if (isValueControl) {
    return el.value.trim()
  }
  return el.textContent?.trim() ?? ''
}
// init function · typescript · L8-L17 (10 LOC)extension/src/content/index.ts
/**
 * Content-script bootstrap: dynamically imports the inserter and sidebar-host
 * modules, waits for the DOM reader to be ready, stores a new inserter in
 * `inserter`, and starts a sidebar host bound to the reader.
 */
async function init(): Promise<void> {
  const inserterModule = await import('./dom-inserter')
  const hostModule = await import('./sidebar-host')

  const reader = new DOMReader()
  await reader.ready()

  inserter = new inserterModule.DOMInserter()
  new hostModule.SidebarHost(reader).start()
}
// SidebarHost class · typescript · L4-L52 (49 LOC)extension/src/content/sidebar-host.ts
export class SidebarHost {
private reader: DOMReader
private observer: MutationObserver | null = null
private debounceTimer: ReturnType<typeof setTimeout> | null = null
constructor(reader: DOMReader) {
this.reader = reader
}
start(): void {
this.sendTicketData()
this.startObserver()
}
stop(): void {
if (this.observer) {
this.observer.disconnect()
this.observer = null
}
if (this.debounceTimer !== null) {
clearTimeout(this.debounceTimer)
this.debounceTimer = null
}
}
private sendTicketData(): void {
const data = this.reader.extract()
if (data) {
chrome.runtime.sendMessage({ type: 'TICKET_DATA_UPDATED', payload: data }).catch(() => {})
} else {
chrome.runtime.sendMessage({ type: 'NOT_A_TICKET_PAGE' }).catch(() => {})
}
}
private startObserver(): void {
this.observer = new MutationObserver(() => {
// Debounce to avoid flooding on WHD's rapid partial DOM updates
if (tconstructor method · typescript · L9-L11 (3 LOC)extension/src/content/sidebar-host.ts
/** Keeps a reference to the DOM reader passed in by the caller. */
constructor(reader: DOMReader) {
this.reader = reader
}Repobility · code-quality intelligence · https://repobility.com
start method · typescript · L13-L16 (4 LOC)extension/src/content/sidebar-host.ts
/** Calls `sendTicketData()` once right away, then `startObserver()`. */
start(): void {
this.sendTicketData()
this.startObserver()
}stop method · typescript · L18-L27 (10 LOC)extension/src/content/sidebar-host.ts
/** Disconnects the mutation observer, cancels any pending debounce timer, and resets both fields to null. */
stop(): void {
  this.observer?.disconnect()
  this.observer = null

  const timer = this.debounceTimer
  if (timer === null) return
  clearTimeout(timer)
  this.debounceTimer = null
}
// clearTimeout method · typescript · L24-L36 (13 LOC)extension/src/content/sidebar-host.ts
clearTimeout(this.debounceTimer)
this.debounceTimer = null
}
}
/**
 * Extracts ticket data through the reader and sends it to the extension
 * runtime: `TICKET_DATA_UPDATED` with the payload when extraction returned
 * data, `NOT_A_TICKET_PAGE` otherwise. Send failures are ignored.
 */
private sendTicketData(): void {
  const data = this.reader.extract()
  const message = data
    ? { type: 'TICKET_DATA_UPDATED', payload: data }
    : { type: 'NOT_A_TICKET_PAGE' }
  chrome.runtime.sendMessage(message).catch(() => {})
}