Function bodies 90 total
LiveAvatarService class · python · L12-L267 (256 LOC)services/heygen_service.py
class LiveAvatarService:
"""Service para interactuar con LiveAvatar API"""
def __init__(self):
    """Read HeyGen credentials from the environment.

    Reads ``HEYGEN_API_KEY`` and ``HEYGEN_AVATAR_ID``. When no avatar id is
    configured (unset or empty), the public interactive avatar
    ``Wayne_20240711`` is used as a testing fallback. A missing API key is
    only logged here: ``create_session_token`` and ``start_streaming`` raise
    ``ValueError``, ``list_sessions`` returns ``[]`` and ``close_session``
    returns ``False`` when it is absent.
    """
    self.api_key = os.getenv('HEYGEN_API_KEY')
    # An unset *or empty* variable falls back to the public avatar.
    self.avatar_id = os.getenv('HEYGEN_AVATAR_ID') or "Wayne_20240711"
    self.base_url = "https://api.heygen.com/v1"
    # One check, one log line per outcome. The "HeyGen" wording matches the
    # ValueError raised by the request methods.
    if not self.api_key:
        logger.warning("⚠️ HeyGen API key not configured")
    else:
        logger.info(f"✅ HeyGen Service initialized (Avatar: {self.avatar_id})")
async def create_sessio__init__ method · python · L15-L32 (18 LOC)services/heygen_service.py
def __init__(self):
    """Read HeyGen credentials from the environment.

    Reads ``HEYGEN_API_KEY`` and ``HEYGEN_AVATAR_ID``. When no avatar id is
    configured (unset or empty), the public interactive avatar
    ``Wayne_20240711`` is used as a testing fallback. A missing API key is
    only logged here: ``create_session_token`` and ``start_streaming`` raise
    ``ValueError``, ``list_sessions`` returns ``[]`` and ``close_session``
    returns ``False`` when it is absent.
    """
    self.api_key = os.getenv('HEYGEN_API_KEY')
    # An unset *or empty* variable falls back to the public avatar.
    self.avatar_id = os.getenv('HEYGEN_AVATAR_ID') or "Wayne_20240711"
    self.base_url = "https://api.heygen.com/v1"
    # One check, one log line per outcome. The "HeyGen" wording matches the
    # ValueError raised by the request methods.
    if not self.api_key:
        logger.warning("⚠️ HeyGen API key not configured")
    else:
        logger.info(f"✅ HeyGen Service initialized (Avatar: {self.avatar_id})")
# create_session_token method · python · L34-L115 (82 LOC)services/heygen_service.py
async def create_session_token(self, context: Optional[str] = None) -> Dict:
"""
Crea una nueva sesión de streaming de HeyGen con managed LiveKit credentials
Args:
context: System prompt / personalidad del avatar (knowledge)
Returns:
Dict con url (LiveKit WebSocket URL), access_token y session_id
"""
try:
if not self.api_key:
raise ValueError("HeyGen API key not configured")
headers = {
"x-api-key": self.api_key,
"Content-Type": "application/json"
}
# Payload según documentación de HeyGen streaming.new
payload = {
"quality": "high",
"avatar_id": self.avatar_id # Use avatar_id
}
# Don't include voice settings - HeyGen will use default voice for avatar
# Si se proporstart_streaming method · python · L117-L162 (46 LOC)services/heygen_service.py
async def start_streaming(self, session_id: str, sdp_answer: Dict) -> Dict:
"""
Inicia el streaming del avatar enviando el SDP answer generado por el browser.
HeyGen necesita el SDP answer real para activar al agente y que empiece a
emitir video/audio en la sala LiveKit.
Args:
session_id: ID de la sesión creada con streaming.new
sdp_answer: Dict con {type: "answer", sdp: "..."} generado por RTCPeerConnection
Returns:
Dict con respuesta de la API
"""
try:
if not self.api_key:
raise ValueError("HeyGen API key not configured")
headers = {
"x-api-key": self.api_key,
"Content-Type": "application/json"
}
payload = {
"session_id": session_id,
"sdp": sdp_answer
}
logger.info(f"🚀 Starting streaming for session: {session_id}")
list_sessions method · python · L164-L194 (31 LOC)services/heygen_service.py
async def list_sessions(self) -> List[Dict]:
    """Return the active HeyGen streaming sessions.

    Calls ``GET {base_url}/streaming.list`` and extracts ``data.sessions``
    from the JSON body.

    Returns:
        The list of session dicts, or ``[]`` when the API key is missing,
        the request fails, the status is not 200, or the body carries no
        sessions (including a null ``data``/``sessions`` value).
    """
    if not self.api_key:
        return []
    headers = {
        "x-api-key": self.api_key
    }
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(
                f"{self.base_url}/streaming.list",
                headers=headers
            )
        if response.status_code != 200:
            # Surface auth/quota failures instead of silently returning [].
            logger.error(f"Error listing sessions: {response.status_code} - {response.text}")
            return []
        result = response.json()
        # ``data`` / ``sessions`` may be present but null; normalize to [].
        data = result.get("data") or {}
        return data.get("sessions") or []
    except Exception as e:
        logger.error(f"Error listing sessions: {str(e)}")
        return []
# close_session method · python · L196-L235 (40 LOC)services/heygen_service.py
async def close_session(self, session_id: str) -> bool:
"""
Cierra una sesión de streaming activa
Args:
session_id: ID de la sesión a cerrar
Returns:
True si se cerró exitosamente
"""
try:
if not self.api_key:
return False
headers = {
"x-api-key": self.api_key,
"Content-Type": "application/json"
}
payload = {
"session_id": session_id
}
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
f"{self.base_url}/streaming.stop",
headers=headers,
json=payload
)
if response.status_code == 200:
logger.info(f"✅ Session {session_id} closed successfully")
send_knowledge_context method · python · L237-L253 (17 LOC)services/heygen_service.py
async def send_knowledge_context(self, room_name: str, context: str) -> Dict:
    """Stub: acknowledge a request to push knowledge-base context to a session.

    HeyGen exposes no public endpoint for updating an avatar's context in
    real time. Context is supplied once, when the session is created in
    ``create_session_token`` via ``knowledge_base``. This method only logs
    the call and returns a fixed acknowledgement.

    Args:
        room_name: LiveKit room name (used only for logging).
        context: Legal knowledge-base context (only its length is logged).

    Returns:
        ``{"status": "ok", "message": "Context is set at session creation time"}``.
    """
    context_len = len(context)
    logger.info(f"send_knowledge_context called for room {room_name} (context length: {context_len})")
    acknowledgement = {"status": "ok", "message": "Context is set at session creation time"}
    return acknowledgement
# If a scraper extracted this row, it came from Repobility (https://repobility.com)
get_avatar_config method · python · L255-L267 (13 LOC)services/heygen_service.py
def get_avatar_config(self) -> Dict:
    """Build the avatar settings dict that is sent to the client.

    Returns:
        The configured ``avatar_id`` together with a fixed display name,
        language (``es-ES``) and voice settings.
    """
    voice = {"rate": 1.0, "emotion": "friendly"}
    return dict(
        avatar_id=self.avatar_id,
        avatar_name="Valeria - Asistente Legal IA",
        language="es-ES",
        voice_settings=voice,
    )
# KnowledgeBaseService class · python · L14-L237 (224 LOC)services/knowledge_base.py
class KnowledgeBaseService:
"""
Servicio para búsqueda semántica en base de conocimientos legal
"""
def __init__(self, db: AsyncIOMotorDatabase, model_name: str = 'paraphrase-multilingual-MiniLM-L12-v2'):
    """Set up semantic search over the ``knowledge`` collection of *db*.

    Args:
        db: Motor database handle; documents are read from and written to
            its ``knowledge`` collection.
        model_name: SentenceTransformer model used for embeddings. The
            default is a multilingual model so Spanish text is handled.
            Embeddings already stored in the collection must come from the
            same model for the cosine-similarity search to be meaningful.
    """
    self.db = db
    self.model = SentenceTransformer(model_name)
    logger.info("✅ KnowledgeBase Service initialized")
async def embed_query(self, query: str) -> List[float]:
    """Return the embedding of *query* as a plain list of floats.

    ``SentenceTransformer.encode`` is synchronous and CPU-bound. It runs in
    the event loop's default executor so other coroutines are not blocked
    while the model is working.

    Raises:
        Any exception raised by the model, after logging it.
    """
    import asyncio
    import functools

    loop = asyncio.get_running_loop()
    try:
        embedding = await loop.run_in_executor(
            None,
            functools.partial(self.model.encode, query, convert_to_numpy=True),
        )
        return embedding.tolist()
    except Exception as e:
        logger.error(f"Error generating embedding: {str(e)}")
        raise
async def semantic_search(
self,
query: str,
limit: int = 3,
min_score: float = 0.5
) -> List[Dict]:
"""
Realiza búsqueda semántica en la base de conocimientos
Args:
__init__ method · python · L19-L23 (5 LOC)services/knowledge_base.py
def __init__(self, db: AsyncIOMotorDatabase, model_name: str = 'paraphrase-multilingual-MiniLM-L12-v2'):
    """Set up semantic search over the ``knowledge`` collection of *db*.

    Args:
        db: Motor database handle; documents are read from and written to
            its ``knowledge`` collection.
        model_name: SentenceTransformer model used for embeddings. The
            default is a multilingual model so Spanish text is handled.
            Embeddings already stored in the collection must come from the
            same model for the cosine-similarity search to be meaningful.
    """
    self.db = db
    self.model = SentenceTransformer(model_name)
    logger.info("✅ KnowledgeBase Service initialized")
# embed_query method · python · L25-L34 (10 LOC)services/knowledge_base.py
async def embed_query(self, query: str) -> List[float]:
    """Return the embedding of *query* as a plain list of floats.

    ``SentenceTransformer.encode`` is synchronous and CPU-bound. It runs in
    the event loop's default executor so other coroutines are not blocked
    while the model is working.

    Raises:
        Any exception raised by the model, after logging it.
    """
    import asyncio
    import functools

    loop = asyncio.get_running_loop()
    try:
        embedding = await loop.run_in_executor(
            None,
            functools.partial(self.model.encode, query, convert_to_numpy=True),
        )
        return embedding.tolist()
    except Exception as e:
        logger.error(f"Error generating embedding: {str(e)}")
        raise
# semantic_search method · python · L36-L71 (36 LOC)services/knowledge_base.py
async def semantic_search(
self,
query: str,
limit: int = 3,
min_score: float = 0.5
) -> List[Dict]:
"""
Realiza búsqueda semántica en la base de conocimientos
Args:
query: Consulta del usuario
limit: Número máximo de resultados
min_score: Score mínimo de similitud (0-1)
Returns:
Lista de documentos relevantes con sus scores
"""
try:
# Generar embedding de la consulta
query_embedding = await self.embed_query(query)
# Búsqueda en MongoDB con vector search
# Si MongoDB no tiene índices vectoriales, usamos similitud de coseno manual
documents = await self._cosine_similarity_search(
query_embedding,
limit,
min_score
)
logger.info(f"Found {len(documents)} relevant documents for query:_cosine_similarity_search method · python · L73-L124 (52 LOC)services/knowledge_base.py
async def _cosine_similarity_search(
self,
query_embedding: List[float],
limit: int,
min_score: float
) -> List[Dict]:
"""
Búsqueda por similitud de coseno con embeddings pre-calculados
"""
try:
# Obtener todos los documentos con embeddings
cursor = self.db.knowledge.find(
{"embedding": {"$exists": True}},
{"_id": 0}
).limit(100) # Limitar para eficiencia
docs = await cursor.to_list(length=100)
if not docs:
logger.warning("No documents with embeddings found")
return []
# Calcular similitudes
query_vec = np.array(query_embedding)
results = []
for doc in docs:
doc_vec = np.array(doc.get("embedding", []))
if len(doc_vec) == 0:
continue
_keyword_search method · python · L126-L153 (28 LOC)services/knowledge_base.py
async def _keyword_search(
    self,
    query: str,
    limit: int
) -> List[Dict]:
    """Fallback search using MongoDB's ``$text`` operator.

    NOTE(review): ``$text`` needs a text index on ``knowledge``; it is not
    created in this block, so presumably it is created elsewhere. Verify.

    Args:
        query: Free text passed to ``$text.$search``.
        limit: Maximum number of documents to return.

    Returns:
        Up to *limit* documents ordered by ``textScore``, each reduced to
        ``content``/``title``/``category``/``score``/``metadata`` with
        defaults for missing fields. Returns ``[]`` on any error.
    """
    def _normalize(doc: Dict) -> Dict:
        # Missing fields fall back to the same defaults the UI expects.
        return {
            "content": doc.get("content", ""),
            "title": doc.get("title", "Sin título"),
            "category": doc.get("category", "General"),
            "score": doc.get("score", 0.0),
            "metadata": doc.get("metadata", {}),
        }

    try:
        text_filter = {"$text": {"$search": query}}
        projection = {"_id": 0, "score": {"$meta": "textScore"}}
        ordering = [("score", {"$meta": "textScore"})]
        cursor = self.db.knowledge.find(text_filter, projection).sort(ordering).limit(limit)
        found = await cursor.to_list(length=limit)
        return [_normalize(item) for item in found]
    except Exception as e:
        logger.error(f"Error in keyword search: {str(e)}")
        return []
# store_document method · python · L155-L186 (32 LOC)services/knowledge_base.py
async def store_document(
    self,
    content: str,
    title: str,
    category: str = "General",
    metadata: Optional[Dict] = None
) -> bool:
    """Embed *content* and insert it into the ``knowledge`` collection.

    Args:
        content: Document text; its embedding comes from ``embed_query``.
        title: Document title.
        category: Category label, ``"General"`` by default.
        metadata: Optional extra fields; stored as ``{}`` when omitted.

    Returns:
        True if the insert succeeded. False if embedding or insertion
        failed (the error is logged).
    """
    try:
        doc = {
            "content": content,
            "title": title,
            "category": category,
            "embedding": await self.embed_query(content),
            "metadata": metadata or {},
        }
        # The insert result is not needed; a failed insert raises.
        await self.db.knowledge.insert_one(doc)
        logger.info(f"Document stored: {title}")
        return True
    except Exception as e:
        logger.error(f"Error storing document: {str(e)}")
        return False
# Want this analysis on your repo? https://repobility.com/scan/
initialize_sample_data method · python · L188-L237 (50 LOC)services/knowledge_base.py
async def initialize_sample_data(self):
"""
Inicializa datos de ejemplo si la base está vacía
"""
try:
count = await self.db.knowledge.count_documents({})
if count > 0:
logger.info(f"Knowledge base already has {count} documents")
return
# Datos de ejemplo de Prados de Paraíso
sample_docs = [
{
"title": "Posesión Legítima",
"category": "Propiedad",
"content": "La posesión legítima es el derecho que adquiere una persona sobre un inmueble al poseerlo de forma continua, pacífica y pública durante un período determinado. En Prados de Paraíso, los propietarios tienen posesión legítima desde 1998, avalada por documentación notarial y registral."
},
{
"title": "Proceso de Saneamiento",
"category": "Tramitación",
LiveAvatarService class · python · L12-L86 (75 LOC)services/liveavatar.py
class LiveAvatarService:
"""
Servicio para interactuar con LiveAvatar API
"""
def __init__(self):
    """Load LiveAvatar credentials from the environment.

    Reads ``LIVEAVATAR_API_KEY`` and ``LIVEAVATAR_AVATAR_ID``. Each missing
    value is logged as a warning. The initialization message is logged in
    every case.
    """
    env = os.getenv
    self.api_key = env("LIVEAVATAR_API_KEY")
    self.avatar_id = env("LIVEAVATAR_AVATAR_ID")
    self.base_url = "https://api.heygen.com/v1"
    for value, warning in (
        (self.api_key, "⚠️ LIVEAVATAR_API_KEY not configured"),
        (self.avatar_id, "⚠️ LIVEAVATAR_AVATAR_ID not configured"),
    ):
        if not value:
            logger.warning(warning)
    logger.info(f"✅ LiveAvatar Service initialized (Avatar ID: {self.avatar_id})")
async def create_session_token(self) -> Optional[Dict]:
"""
Crea un token de sesión para LiveAvatar
Returns:
Dict con token y información de sesión o None si hay error
"""
if not self.api_key:
logger.error("LiveAvatar API key not configured")
return None
try:
async with httpx.AsyncClient() __init__ method · python · L17-L27 (11 LOC)services/liveavatar.py
def __init__(self):
    """Load LiveAvatar credentials from the environment.

    Reads ``LIVEAVATAR_API_KEY`` and ``LIVEAVATAR_AVATAR_ID``. Each missing
    value is logged as a warning. The initialization message is logged in
    every case.
    """
    env = os.getenv
    self.api_key = env("LIVEAVATAR_API_KEY")
    self.avatar_id = env("LIVEAVATAR_AVATAR_ID")
    self.base_url = "https://api.heygen.com/v1"
    for value, warning in (
        (self.api_key, "⚠️ LIVEAVATAR_API_KEY not configured"),
        (self.avatar_id, "⚠️ LIVEAVATAR_AVATAR_ID not configured"),
    ):
        if not value:
            logger.warning(warning)
    logger.info(f"✅ LiveAvatar Service initialized (Avatar ID: {self.avatar_id})")
# create_session_token method · python · L29-L72 (44 LOC)services/liveavatar.py
async def create_session_token(self) -> Optional[Dict]:
"""
Crea un token de sesión para LiveAvatar
Returns:
Dict con token y información de sesión o None si hay error
"""
if not self.api_key:
logger.error("LiveAvatar API key not configured")
return None
try:
async with httpx.AsyncClient() as client:
headers = {
"x-api-key": self.api_key,
"Content-Type": "application/json"
}
response = await client.post(
f"{self.base_url}/streaming.create_token",
headers=headers,
timeout=10.0
)
if response.status_code != 200:
logger.error(f"LiveAvatar token error: {response.status_code} - {response.text}")
return None
get_avatar_config method · python · L74-L86 (13 LOC)services/liveavatar.py
def get_avatar_config(self) -> Dict:
    """Build the avatar settings dict that is sent to the client.

    Returns:
        The configured ``avatar_id`` together with a fixed display name,
        language (``es-ES``) and voice settings.
    """
    voice = {"rate": 1.0, "emotion": "friendly"}
    return dict(
        avatar_id=self.avatar_id,
        avatar_name="Valeria - IA",
        language="es-ES",
        voice_settings=voice,
    )
# LiveAvatarService class · python · L28-L244 (217 LOC)services/liveavatar_service.py
class LiveAvatarService:
def __init__(self):
    """Load LiveAvatar LITE credentials and set up per-session state.

    Per-session bookkeeping, all keyed by session id:
      * ``_ws_connections``: open WebSocket connections;
      * ``_ws_listeners``: background listener tasks;
      * ``_event_counters``: counters used to build event ids.
    """
    self.api_key = os.getenv("LIVEAVATAR_API_KEY")
    self.avatar_id = os.getenv("LIVEAVATAR_AVATAR_ID")
    self._ws_connections: Dict[str, websockets.WebSocketClientProtocol] = {}
    self._ws_listeners: Dict[str, asyncio.Task] = {}
    self._event_counters: Dict[str, int] = {}

    if not self.api_key:
        logger.warning("⚠️ LIVEAVATAR_API_KEY not configured")
    # The "ready" line depends only on the avatar id, even if the key is missing.
    if self.avatar_id:
        logger.info(f"✅ LiveAvatar LITE Service ready (Avatar: {self.avatar_id})")
    else:
        logger.warning("⚠️ LIVEAVATAR_AVATAR_ID not configured")
def _headers(self) -> Dict:
    """Return the JSON request headers carrying the LiveAvatar API key."""
    headers = {"X-API-KEY": self.api_key}
    headers["Content-Type"] = "application/json"
    return headers
def _event_id(self, session_id: str) -> str:
n = self._event_counters.get(session_id, 0) + 1
self._event_counters[session_id] = n
return f"{session_id[:8]}___init__ method · python · L29-L43 (15 LOC)services/liveavatar_service.py
def __init__(self):
    """Load LiveAvatar LITE credentials and set up per-session state.

    Per-session bookkeeping, all keyed by session id:
      * ``_ws_connections``: open WebSocket connections;
      * ``_ws_listeners``: background listener tasks;
      * ``_event_counters``: counters used to build event ids.
    """
    self.api_key = os.getenv("LIVEAVATAR_API_KEY")
    self.avatar_id = os.getenv("LIVEAVATAR_AVATAR_ID")
    self._ws_connections: Dict[str, websockets.WebSocketClientProtocol] = {}
    self._ws_listeners: Dict[str, asyncio.Task] = {}
    self._event_counters: Dict[str, int] = {}

    if not self.api_key:
        logger.warning("⚠️ LIVEAVATAR_API_KEY not configured")
    # The "ready" line depends only on the avatar id, even if the key is missing.
    if self.avatar_id:
        logger.info(f"✅ LiveAvatar LITE Service ready (Avatar: {self.avatar_id})")
    else:
        logger.warning("⚠️ LIVEAVATAR_AVATAR_ID not configured")
# _event_id method · python · L48-L51 (4 LOC)services/liveavatar_service.py
def _event_id(self, session_id: str) -> str:
    """Return a fresh event id for *session_id* and bump its counter.

    Format: ``<first 8 chars of session_id>_<counter>_<HHMMSSffffff>``.
    The counter is 1 for a session with no recorded events, and the suffix
    is the current (naive, local) time.
    """
    counters = self._event_counters
    counters[session_id] = counters.get(session_id, 0) + 1
    stamp = datetime.now().strftime('%H%M%S%f')
    return f"{session_id[:8]}_{counters[session_id]}_{stamp}"
# Source: Repobility analyzer · https://repobility.com
create_session method · python · L56-L114 (59 LOC)services/liveavatar_service.py
async def create_session(self, system_prompt: Optional[str] = None) -> Dict:
"""
Crea sesión LITE en liveavatar.com.
Returns: session_id, session_token, livekit_url, livekit_token, ws_url
"""
if not self.api_key or not self.avatar_id:
raise ValueError("LiveAvatar API key or avatar ID not configured")
payload = {
"mode": "LITE",
"avatar_id": self.avatar_id,
}
logger.info(f"🔄 Creating LiveAvatar LITE session (avatar={self.avatar_id})")
async with httpx.AsyncClient(timeout=30.0) as client:
# Step 1: get session token
r = await client.post(
f"{BASE_URL}/sessions/token",
headers=self._headers(),
json=payload
)
result = r.json()
logger.info(f"Token response: code={result.get('code')} msg={result.get('message')}")
if result.get("code") != 1000:
connect_websocket method · python · L119-L138 (20 LOC)services/liveavatar_service.py
async def connect_websocket(self, session_id: str, ws_url: str) -> None:
    """Open the LITE session's WebSocket and start its background listener.

    If a connection is already registered for *session_id*, this only logs
    and returns. Otherwise it does the following:
      * connects with ``ping_interval=20`` / ``ping_timeout=10``;
      * registers the socket and resets the session's event counter to 0;
      * schedules ``_ws_listener`` as a background task.
    LITE mode needs no handshake event; the socket is usable right away.
    """
    short_id = session_id[:8]
    if session_id in self._ws_connections:
        logger.info(f"WS already connected for session {short_id}")
        return

    # NOTE(review): two concurrent calls for the same session can both pass
    # the check above before either registers its socket (the connect below
    # awaits). Confirm callers serialize this.
    logger.info(f"🔌 Connecting WebSocket for session {short_id}...")
    ws = await websockets.connect(ws_url, ping_interval=20, ping_timeout=10)
    self._ws_connections[session_id] = ws
    self._event_counters[session_id] = 0
    logger.info(f"✅ WebSocket connected for session {short_id}")

    listener = asyncio.create_task(self._ws_listener(session_id, ws))
    self._ws_listeners[session_id] = listener
# _ws_listener method · python · L140-L159 (20 LOC)services/liveavatar_service.py
async def _ws_listener(self, session_id: str, ws) -> None:
    """Background task that consumes and logs events from a session's WebSocket.

    Logs ``agent.speak_started``, ``agent.speak_ended`` and
    ``session.state_updated`` events; other event types are ignored.
    Frames that are not JSON objects are skipped with a debug log.

    When the socket stops (closed, error, or the task is cancelled), the
    session's connection and listener entries are removed. This happens only
    if they still refer to *this* socket / task, so a listener that finishes
    late cannot unregister a newer connection opened for the same session.
    """
    import asyncio
    import json

    short_id = session_id[:8]
    try:
        async for raw in ws:
            try:
                evt = json.loads(raw)
                etype = evt.get("type", "")
            except (ValueError, TypeError, AttributeError) as parse_err:
                logger.debug(f"Ignoring unparsable WS frame [{short_id}]: {parse_err}")
                continue
            if etype == "agent.speak_started":
                logger.info(f"🔊 Avatar speak started [{short_id}]")
            elif etype == "agent.speak_ended":
                logger.info(f"✅ Avatar speak ended [{short_id}]")
            elif etype == "session.state_updated":
                logger.info(f"Session state: {evt.get('state')} [{short_id}]")
    except Exception as e:
        logger.warning(f"WS listener ended for {short_id}: {e}")
    finally:
        # Only unregister entries that still belong to this listener.
        if self._ws_connections.get(session_id) is ws:
            self._ws_connections.pop(session_id, None)
        if self._ws_listeners.get(session_id) is asyncio.current_task():
            self._ws_listeners.pop(session_id, None)
# speak method · python · L164-L189 (26 LOC)services/liveavatar_service.py
async def speak(self, session_id: str, pcm_bytes: bytes) -> None:
"""
Envía audio PCM 16-bit 24kHz al avatar para lip-sync.
Si el audio es muy largo lo divide en chunks de ~1 seg (96KB).
"""
ws = self._ws_connections.get(session_id)
if not ws:
raise Exception(f"No WebSocket connection for session {session_id[:8]}")
CHUNK = 96_000 # 1 segundo de PCM 16-bit 24kHz = 48000 samples × 2 bytes
offset = 0
chunk_n = 0
while offset < len(pcm_bytes):
chunk = pcm_bytes[offset:offset + CHUNK]
offset += CHUNK
chunk_n += 1
audio_b64 = base64.b64encode(chunk).decode("utf-8")
event = {
"type": "agent.speak",
"event_id": self._event_id(session_id),
"audio": audio_b64,
}
await ws.send(json.dumps(event))
logger.debug(f"Sent audio chunk {chunk_n} ({len(chunk)} binterrupt method · python · L194-L202 (9 LOC)services/liveavatar_service.py
async def interrupt(self, session_id: str) -> None:
    """Stop the avatar immediately by sending an ``agent.interrupt`` event.

    If no WebSocket is registered for *session_id*, a warning is logged and
    nothing is sent.
    """
    connection = self._ws_connections.get(session_id)
    if not connection:
        logger.warning(f"No WS to interrupt for {session_id[:8]}")
        return
    message = {
        "type": "agent.interrupt",
        "event_id": self._event_id(session_id),
    }
    payload = json.dumps(message)
    await connection.send(payload)
    logger.info(f"⏹️ Interrupted avatar [{session_id[:8]}]")
# keep_alive method · python · L207-L213 (7 LOC)services/liveavatar_service.py
async def keep_alive(self, session_id: str) -> None:
    """Send a ``session.keep_alive`` event over the session's WebSocket.

    Does nothing (and logs nothing) when no connection is registered for
    *session_id*.
    """
    ws = self._ws_connections.get(session_id)
    if ws:
        message = {"type": "session.keep_alive", "event_id": self._event_id(session_id)}
        await ws.send(json.dumps(message))
        logger.debug(f"♻️ Keep-alive sent [{session_id[:8]}]")
# close_session method · python · L218-L234 (17 LOC)services/liveavatar_service.py
async def close_session(self, session_id: str) -> bool:
    """Tear down a session's listener task, WebSocket and event counter.

    Both registry entries are removed up front. The listener is then
    cancelled and awaited, so its cleanup has finished before the socket is
    closed and before this method returns. Otherwise a late-running cleanup
    could race with a new connection opened for the same session. Errors
    while closing the socket are logged at debug level and ignored.

    Returns:
        Always True.
    """
    import asyncio

    task = self._ws_listeners.pop(session_id, None)
    ws = self._ws_connections.pop(session_id, None)
    if task is not None:
        task.cancel()
        # asyncio.wait neither raises the task's CancelledError nor swallows a
        # cancellation of this coroutine; skip it if we *are* the listener.
        if task is not asyncio.current_task():
            await asyncio.wait({task})
    if ws:
        try:
            await ws.close()
        except Exception as e:
            logger.debug(f"Ignoring error while closing WS [{session_id[:8]}]: {e}")
    self._event_counters.pop(session_id, None)
    logger.info(f"✅ Session {session_id[:8]} closed")
    return True
# get_avatar_config method · python · L236-L241 (6 LOC)services/liveavatar_service.py
def get_avatar_config(self) -> Dict:
    """Describe the configured avatar for the client (LITE service variant)."""
    config = dict(avatar_id=self.avatar_id)
    config["avatar_name"] = "Valeria - Asistente Legal IA"
    config["service"] = "liveavatar-lite"
    return config
# Repobility (the analyzer behind this table) · https://repobility.com
SQLiteKnowledgeBase class · python · L14-L142 (129 LOC)services/sqlite_knowledge.py
class SQLiteKnowledgeBase:
def __init__(self, db_path: str = "/app/backend/prados.db"):
    """Bind the knowledge base to the SQLite file at *db_path*.

    ``_init_database`` runs immediately so the schema exists before the
    first query. Connections are opened per operation, not held here.
    """
    self.db_path = db_path
    self._init_database()
    logger.info(f"✅ SQLite KnowledgeBase initialized at {self.db_path}")
def _init_database(self):
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS conocimiento_legal (
id INTEGER PRIMARY KEY AUTOINCREMENT,
titulo TEXT NOT NULL,
contenido TEXT NOT NULL,
embedding TEXT,
metadata TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_titulo
ON conocimiento_legal(titulo)
''')
con__init__ method · python · L15-L18 (4 LOC)services/sqlite_knowledge.py
def __init__(self, db_path: str = "/app/backend/prados.db"):
    """Bind the knowledge base to the SQLite file at *db_path*.

    ``_init_database`` runs immediately so the schema exists before the
    first query. Connections are opened per operation, not held here.
    """
    self.db_path = db_path
    self._init_database()
    logger.info(f"✅ SQLite KnowledgeBase initialized at {self.db_path}")
# _init_database method · python · L20-L42 (23 LOC)services/sqlite_knowledge.py
def _init_database(self):
    """Create the ``conocimiento_legal`` table and its title index if missing.

    Raises:
        Any SQLite error, after logging it.
    """
    from contextlib import closing

    try:
        # ``with conn`` commits on success and rolls back on error. sqlite3's
        # context manager does NOT close the connection, so closing() is
        # needed to release the file handle.
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS conocimiento_legal (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    titulo TEXT NOT NULL,
                    contenido TEXT NOT NULL,
                    embedding TEXT,
                    metadata TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.execute('''
                CREATE INDEX IF NOT EXISTS idx_titulo
                ON conocimiento_legal(titulo)
            ''')
        logger.info("✅ Database tables created/verified")
    except Exception as e:
        logger.error(f"Error initializing database: {str(e)}")
        raise
# _keyword_score method · python · L44-L53 (10 LOC)services/sqlite_knowledge.py
def _keyword_score(self, query: str, titulo: str, contenido: str) -> float:
"""Puntaje simple por coincidencia de palabras clave."""
# Cap query length to prevent ReDoS — use str.split() instead of regex
safe_query = query[:500].lower()
words = set(w for w in safe_query.split() if len(w) >= 2)
if not words:
return 0.0
text = (titulo + " " + contenido).lower()
matches = sum(1 for w in words if w in text)
return matches / len(words)add_document method · python · L55-L70 (16 LOC)services/sqlite_knowledge.py
def add_document(self, titulo: str, contenido: str, metadata: Optional[Dict] = None):
    """Insert a document into ``conocimiento_legal`` and return its row id.

    Args:
        titulo: Document title.
        contenido: Document body.
        metadata: Optional dict stored as JSON. ``None`` or an empty dict is
            stored as SQL NULL.

    Returns:
        The ``id`` assigned to the new row.

    Raises:
        Any serialization or SQLite error, after logging it.
    """
    from contextlib import closing

    try:
        metadata_json = json.dumps(metadata) if metadata else None
        # ``with conn`` commits the INSERT; closing() then releases the
        # connection, which sqlite3's own context manager does not do.
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO conocimiento_legal (titulo, contenido, metadata)
                VALUES (?, ?, ?)
            ''', (titulo, contenido, metadata_json))
            doc_id = cursor.lastrowid
        logger.info(f"✅ Document added: {titulo} (ID: {doc_id})")
        return doc_id
    except Exception as e:
        logger.error(f"Error adding document: {str(e)}")
        raise
# search method · python · L72-L100 (29 LOC)services/sqlite_knowledge.py
def search(self, query: str, top_k: int = 3) -> List[Dict]:
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('SELECT id, titulo, contenido FROM conocimiento_legal')
rows = cursor.fetchall()
if not rows:
logger.warning("No documents in knowledge base")
return []
results = []
for doc_id, titulo, contenido in rows:
score = self._keyword_score(query, titulo, contenido)
results.append({
'id': doc_id,
'titulo': titulo,
'contenido': contenido,
'score': score
})
results.sort(key=lambda x: x['score'], reverse=True)
top_results = results[:top_k]
logger.info(f"Search '{query}' → {len(top_results)} docs")
return top_results
except Exceptiget_all_documents method · python · L102-L120 (19 LOC)services/sqlite_knowledge.py
def get_all_documents(self) -> List[Dict]:
    """List every stored document with a short preview of its content.

    Returns:
        One dict per row with these keys:
          * ``id`` and ``titulo``;
          * ``contenido``: the first 200 characters, with ``...`` appended
            only when the text was actually cut;
          * ``metadata``: the decoded JSON, or ``{}`` when NULL.
        Returns ``[]`` if the query or decoding fails (the error is logged).
    """
    from contextlib import closing

    preview_len = 200
    try:
        # sqlite3's context manager does not close connections; closing() does.
        with closing(sqlite3.connect(self.db_path)) as conn:
            rows = conn.execute(
                'SELECT id, titulo, contenido, metadata FROM conocimiento_legal'
            ).fetchall()
        documents = []
        for doc_id, titulo, contenido, metadata_json in rows:
            if len(contenido) > preview_len:
                preview = contenido[:preview_len] + '...'
            else:
                preview = contenido
            documents.append({
                'id': doc_id,
                'titulo': titulo,
                'contenido': preview,
                'metadata': json.loads(metadata_json) if metadata_json else {}
            })
        return documents
    except Exception as e:
        logger.error(f"Error getting documents: {str(e)}")
        return []
# count_documents method · python · L122-L131 (10 LOC)services/sqlite_knowledge.py
def count_documents(self) -> int:
    """Return the number of rows in ``conocimiento_legal``.

    Returns 0 if the query fails (the error is logged).
    """
    from contextlib import closing

    try:
        # sqlite3's context manager does not close connections; closing() does.
        with closing(sqlite3.connect(self.db_path)) as conn:
            (count,) = conn.execute('SELECT COUNT(*) FROM conocimiento_legal').fetchone()
        return count
    except Exception as e:
        logger.error(f"Error counting documents: {str(e)}")
        return 0
clear_database method · python · L133-L142 (10 LOC)services/sqlite_knowledge.py
def clear_database(self):
    """Delete every row from ``conocimiento_legal``; the table itself is kept.

    Raises:
        Any SQLite error, after logging it.
    """
    from contextlib import closing

    try:
        # ``with conn`` commits the DELETE (or rolls it back on error).
        # closing() then releases the connection, which sqlite3's own
        # context manager does not do.
        with closing(sqlite3.connect(self.db_path)) as conn, conn:
            conn.execute('DELETE FROM conocimiento_legal')
        logger.info("✅ Database cleared")
    except Exception as e:
        logger.error(f"Error clearing database: {str(e)}")
        raise
# ‹ prevpage 2 / 2