← back to fabioB33__BackendAsisLegal

Function bodies 90 total

All specs Real LLM only Function bodies
LiveAvatarService class · python · L12-L267 (256 LOC)
services/heygen_service.py
class LiveAvatarService:
    """Service para interactuar con LiveAvatar API"""
    
    def __init__(self):
        # Use HEYGEN_API_KEY and HEYGEN_AVATAR_ID from .env
        self.api_key = os.getenv('HEYGEN_API_KEY')
        # Use configured avatar_id or fallback to a public avatar for testing
        configured_avatar = os.getenv('HEYGEN_AVATAR_ID')
        # Use Wayne_20240711 as fallback (public interactive avatar)
        self.avatar_id = configured_avatar if configured_avatar else "Wayne_20240711"
        self.base_url = "https://api.heygen.com/v1"
        
        if not self.api_key:
            logger.warning("⚠️ HeyGen API key not configured")
        else:
            logger.info(f"✅ HeyGen Service initialized (Avatar: {self.avatar_id})")
        
        if not self.api_key:
            logger.warning("⚠️ LiveAvatar API key not configured")
        else:
            logger.info(f"✅ LiveAvatar Service initialized (Avatar: {self.avatar_id})")
    
    async def create_sessio
__init__ method · python · L15-L32 (18 LOC)
services/heygen_service.py
    def __init__(self):
        # Use HEYGEN_API_KEY and HEYGEN_AVATAR_ID from .env
        self.api_key = os.getenv('HEYGEN_API_KEY')
        # Use configured avatar_id or fallback to a public avatar for testing
        configured_avatar = os.getenv('HEYGEN_AVATAR_ID')
        # Use Wayne_20240711 as fallback (public interactive avatar)
        self.avatar_id = configured_avatar if configured_avatar else "Wayne_20240711"
        self.base_url = "https://api.heygen.com/v1"
        
        if not self.api_key:
            logger.warning("⚠️ HeyGen API key not configured")
        else:
            logger.info(f"✅ HeyGen Service initialized (Avatar: {self.avatar_id})")
        
        if not self.api_key:
            logger.warning("⚠️ LiveAvatar API key not configured")
        else:
            logger.info(f"✅ LiveAvatar Service initialized (Avatar: {self.avatar_id})")
create_session_token method · python · L34-L115 (82 LOC)
services/heygen_service.py
    async def create_session_token(self, context: Optional[str] = None) -> Dict:
        """
        Crea una nueva sesión de streaming de HeyGen con managed LiveKit credentials
        
        Args:
            context: System prompt / personalidad del avatar (knowledge)
            
        Returns:
            Dict con url (LiveKit WebSocket URL), access_token y session_id
        """
        try:
            if not self.api_key:
                raise ValueError("HeyGen API key not configured")
            
            headers = {
                "x-api-key": self.api_key,
                "Content-Type": "application/json"
            }
            
            # Payload según documentación de HeyGen streaming.new
            payload = {
                "quality": "high",
                "avatar_id": self.avatar_id  # Use avatar_id
            }
            
            # Don't include voice settings - HeyGen will use default voice for avatar
            
            # Si se propor
start_streaming method · python · L117-L162 (46 LOC)
services/heygen_service.py
    async def start_streaming(self, session_id: str, sdp_answer: Dict) -> Dict:
        """
        Inicia el streaming del avatar enviando el SDP answer generado por el browser.
        HeyGen necesita el SDP answer real para activar al agente y que empiece a
        emitir video/audio en la sala LiveKit.

        Args:
            session_id: ID de la sesión creada con streaming.new
            sdp_answer: Dict con {type: "answer", sdp: "..."} generado por RTCPeerConnection

        Returns:
            Dict con respuesta de la API
        """
        try:
            if not self.api_key:
                raise ValueError("HeyGen API key not configured")

            headers = {
                "x-api-key": self.api_key,
                "Content-Type": "application/json"
            }

            payload = {
                "session_id": session_id,
                "sdp": sdp_answer
            }

            logger.info(f"🚀 Starting streaming for session: {session_id}")

           
list_sessions method · python · L164-L194 (31 LOC)
services/heygen_service.py
    async def list_sessions(self) -> List[Dict]:
        """
        Lista todas las sesiones activas de streaming
        
        Returns:
            List de sesiones activas
        """
        try:
            if not self.api_key:
                return []
            
            headers = {
                "x-api-key": self.api_key
            }
            
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(
                    f"{self.base_url}/streaming.list",
                    headers=headers
                )
                
                if response.status_code == 200:
                    result = response.json()
                    sessions = result.get("data", {}).get("sessions", [])
                    return sessions
                
                return []
                
        except Exception as e:
            logger.error(f"Error listing sessions: {str(e)}")
            return []
close_session method · python · L196-L235 (40 LOC)
services/heygen_service.py
    async def close_session(self, session_id: str) -> bool:
        """
        Cierra una sesión de streaming activa
        
        Args:
            session_id: ID de la sesión a cerrar
            
        Returns:
            True si se cerró exitosamente
        """
        try:
            if not self.api_key:
                return False
            
            headers = {
                "x-api-key": self.api_key,
                "Content-Type": "application/json"
            }
            
            payload = {
                "session_id": session_id
            }
            
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.post(
                    f"{self.base_url}/streaming.stop",
                    headers=headers,
                    json=payload
                )
                
                if response.status_code == 200:
                    logger.info(f"✅ Session {session_id} closed successfully")
  
send_knowledge_context method · python · L237-L253 (17 LOC)
services/heygen_service.py
    async def send_knowledge_context(self, room_name: str, context: str) -> Dict:
        """
        Envía contexto de la base de conocimientos a la sesión activa.
        No soportado directamente por la API de HeyGen; el contexto se inyecta
        al crear la sesión vía knowledge_base. Este método es un stub.

        Args:
            room_name: Nombre de la sala de LiveKit
            context: Contexto legal de la base de conocimientos

        Returns:
            Dict con resultado
        """
        logger.info(f"send_knowledge_context called for room {room_name} (context length: {len(context)})")
        # HeyGen no expone un endpoint público para actualizar el contexto en tiempo real.
        # El contexto se establece en create_session_token via knowledge_base.
        return {"status": "ok", "message": "Context is set at session creation time"}
If a scraper extracted this row, it came from Repobility (https://repobility.com)
get_avatar_config method · python · L255-L267 (13 LOC)
services/heygen_service.py
    def get_avatar_config(self) -> Dict:
        """
        Retorna la configuración del avatar para el cliente
        """
        return {
            "avatar_id": self.avatar_id,
            "avatar_name": "Valeria - Asistente Legal IA",
            "language": "es-ES",
            "voice_settings": {
                "rate": 1.0,
                "emotion": "friendly"
            }
        }
KnowledgeBaseService class · python · L14-L237 (224 LOC)
services/knowledge_base.py
class KnowledgeBaseService:
    """
    Servicio para búsqueda semántica en base de conocimientos legal
    """
    
    def __init__(self, db: AsyncIOMotorDatabase):
        self.db = db
        # Usar modelo multilingüe para español
        self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
        logger.info("✅ KnowledgeBase Service initialized")
    
    async def embed_query(self, query: str) -> List[float]:
        """
        Genera embedding para una consulta
        """
        try:
            embedding = self.model.encode(query, convert_to_numpy=True)
            return embedding.tolist()
        except Exception as e:
            logger.error(f"Error generating embedding: {str(e)}")
            raise
    
    async def semantic_search(
        self,
        query: str,
        limit: int = 3,
        min_score: float = 0.5
    ) -> List[Dict]:
        """
        Realiza búsqueda semántica en la base de conocimientos
        
        Args:
          
__init__ method · python · L19-L23 (5 LOC)
services/knowledge_base.py
    def __init__(self, db: AsyncIOMotorDatabase):
        self.db = db
        # Usar modelo multilingüe para español
        self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
        logger.info("✅ KnowledgeBase Service initialized")
embed_query method · python · L25-L34 (10 LOC)
services/knowledge_base.py
    async def embed_query(self, query: str) -> List[float]:
        """
        Genera embedding para una consulta
        """
        try:
            embedding = self.model.encode(query, convert_to_numpy=True)
            return embedding.tolist()
        except Exception as e:
            logger.error(f"Error generating embedding: {str(e)}")
            raise
semantic_search method · python · L36-L71 (36 LOC)
services/knowledge_base.py
    async def semantic_search(
        self,
        query: str,
        limit: int = 3,
        min_score: float = 0.5
    ) -> List[Dict]:
        """
        Realiza búsqueda semántica en la base de conocimientos
        
        Args:
            query: Consulta del usuario
            limit: Número máximo de resultados
            min_score: Score mínimo de similitud (0-1)
        
        Returns:
            Lista de documentos relevantes con sus scores
        """
        try:
            # Generar embedding de la consulta
            query_embedding = await self.embed_query(query)
            
            # Búsqueda en MongoDB con vector search
            # Si MongoDB no tiene índices vectoriales, usamos similitud de coseno manual
            documents = await self._cosine_similarity_search(
                query_embedding,
                limit,
                min_score
            )
            
            logger.info(f"Found {len(documents)} relevant documents for query:
_cosine_similarity_search method · python · L73-L124 (52 LOC)
services/knowledge_base.py
    async def _cosine_similarity_search(
        self,
        query_embedding: List[float],
        limit: int,
        min_score: float
    ) -> List[Dict]:
        """
        Búsqueda por similitud de coseno con embeddings pre-calculados
        """
        try:
            # Obtener todos los documentos con embeddings
            cursor = self.db.knowledge.find(
                {"embedding": {"$exists": True}},
                {"_id": 0}
            ).limit(100)  # Limitar para eficiencia
            
            docs = await cursor.to_list(length=100)
            
            if not docs:
                logger.warning("No documents with embeddings found")
                return []
            
            # Calcular similitudes
            query_vec = np.array(query_embedding)
            results = []
            
            for doc in docs:
                doc_vec = np.array(doc.get("embedding", []))
                if len(doc_vec) == 0:
                    continue
          
_keyword_search method · python · L126-L153 (28 LOC)
services/knowledge_base.py
    async def _keyword_search(
        self,
        query: str,
        limit: int
    ) -> List[Dict]:
        """
        Búsqueda por palabras clave (fallback)
        """
        try:
            # Búsqueda de texto simple en MongoDB
            cursor = self.db.knowledge.find(
                {"$text": {"$search": query}},
                {"_id": 0, "score": {"$meta": "textScore"}}
            ).sort([("score", {"$meta": "textScore"})]).limit(limit)
            
            docs = await cursor.to_list(length=limit)
            
            return [{
                "content": doc.get("content", ""),
                "title": doc.get("title", "Sin título"),
                "category": doc.get("category", "General"),
                "score": doc.get("score", 0.0),
                "metadata": doc.get("metadata", {})
            } for doc in docs]
            
        except Exception as e:
            logger.error(f"Error in keyword search: {str(e)}")
            return []
store_document method · python · L155-L186 (32 LOC)
services/knowledge_base.py
    async def store_document(
        self,
        content: str,
        title: str,
        category: str = "General",
        metadata: Optional[Dict] = None
    ) -> bool:
        """
        Almacena un documento con su embedding en la base de conocimientos
        """
        try:
            # Generar embedding
            embedding = await self.embed_query(content)
            
            # Preparar documento
            doc = {
                "content": content,
                "title": title,
                "category": category,
                "embedding": embedding,
                "metadata": metadata or {}
            }
            
            # Insertar en MongoDB
            result = await self.db.knowledge.insert_one(doc)
            
            logger.info(f"Document stored: {title}")
            return True
            
        except Exception as e:
            logger.error(f"Error storing document: {str(e)}")
            return False
Want this analysis on your repo? https://repobility.com/scan/
initialize_sample_data method · python · L188-L237 (50 LOC)
services/knowledge_base.py
    async def initialize_sample_data(self):
        """
        Inicializa datos de ejemplo si la base está vacía
        """
        try:
            count = await self.db.knowledge.count_documents({})
            if count > 0:
                logger.info(f"Knowledge base already has {count} documents")
                return
            
            # Datos de ejemplo de Prados de Paraíso
            sample_docs = [
                {
                    "title": "Posesión Legítima",
                    "category": "Propiedad",
                    "content": "La posesión legítima es el derecho que adquiere una persona sobre un inmueble al poseerlo de forma continua, pacífica y pública durante un período determinado. En Prados de Paraíso, los propietarios tienen posesión legítima desde 1998, avalada por documentación notarial y registral."
                },
                {
                    "title": "Proceso de Saneamiento",
                    "category": "Tramitación",
         
LiveAvatarService class · python · L12-L86 (75 LOC)
services/liveavatar.py
class LiveAvatarService:
    """
    Servicio para interactuar con LiveAvatar API
    """
    
    def __init__(self):
        self.api_key = os.getenv("LIVEAVATAR_API_KEY")
        self.avatar_id = os.getenv("LIVEAVATAR_AVATAR_ID")
        self.base_url = "https://api.heygen.com/v1"
        
        if not self.api_key:
            logger.warning("⚠️ LIVEAVATAR_API_KEY not configured")
        if not self.avatar_id:
            logger.warning("⚠️ LIVEAVATAR_AVATAR_ID not configured")
        
        logger.info(f"✅ LiveAvatar Service initialized (Avatar ID: {self.avatar_id})")
    
    async def create_session_token(self) -> Optional[Dict]:
        """
        Crea un token de sesión para LiveAvatar
        
        Returns:
            Dict con token y información de sesión o None si hay error
        """
        if not self.api_key:
            logger.error("LiveAvatar API key not configured")
            return None
        
        try:
            async with httpx.AsyncClient() 
__init__ method · python · L17-L27 (11 LOC)
services/liveavatar.py
    def __init__(self):
        self.api_key = os.getenv("LIVEAVATAR_API_KEY")
        self.avatar_id = os.getenv("LIVEAVATAR_AVATAR_ID")
        self.base_url = "https://api.heygen.com/v1"
        
        if not self.api_key:
            logger.warning("⚠️ LIVEAVATAR_API_KEY not configured")
        if not self.avatar_id:
            logger.warning("⚠️ LIVEAVATAR_AVATAR_ID not configured")
        
        logger.info(f"✅ LiveAvatar Service initialized (Avatar ID: {self.avatar_id})")
create_session_token method · python · L29-L72 (44 LOC)
services/liveavatar.py
    async def create_session_token(self) -> Optional[Dict]:
        """
        Crea un token de sesión para LiveAvatar
        
        Returns:
            Dict con token y información de sesión o None si hay error
        """
        if not self.api_key:
            logger.error("LiveAvatar API key not configured")
            return None
        
        try:
            async with httpx.AsyncClient() as client:
                headers = {
                    "x-api-key": self.api_key,
                    "Content-Type": "application/json"
                }
                
                response = await client.post(
                    f"{self.base_url}/streaming.create_token",
                    headers=headers,
                    timeout=10.0
                )
                
                if response.status_code != 200:
                    logger.error(f"LiveAvatar token error: {response.status_code} - {response.text}")
                    return None
                
  
get_avatar_config method · python · L74-L86 (13 LOC)
services/liveavatar.py
    def get_avatar_config(self) -> Dict:
        """
        Retorna la configuración del avatar para el cliente
        """
        return {
            "avatar_id": self.avatar_id,
            "avatar_name": "Valeria - IA",
            "language": "es-ES",
            "voice_settings": {
                "rate": 1.0,
                "emotion": "friendly"
            }
        }
LiveAvatarService class · python · L28-L244 (217 LOC)
services/liveavatar_service.py
class LiveAvatarService:
    def __init__(self):
        self.api_key  = os.getenv("LIVEAVATAR_API_KEY")
        self.avatar_id = os.getenv("LIVEAVATAR_AVATAR_ID")

        # Dict: session_id → websocket connection
        self._ws_connections: Dict[str, websockets.WebSocketClientProtocol] = {}
        self._ws_listeners:   Dict[str, asyncio.Task] = {}
        self._event_counters: Dict[str, int] = {}

        if not self.api_key:
            logger.warning("⚠️ LIVEAVATAR_API_KEY not configured")
        if not self.avatar_id:
            logger.warning("⚠️ LIVEAVATAR_AVATAR_ID not configured")
        else:
            logger.info(f"✅ LiveAvatar LITE Service ready (Avatar: {self.avatar_id})")

    def _headers(self) -> Dict:
        return {"X-API-KEY": self.api_key, "Content-Type": "application/json"}

    def _event_id(self, session_id: str) -> str:
        n = self._event_counters.get(session_id, 0) + 1
        self._event_counters[session_id] = n
        return f"{session_id[:8]}_
__init__ method · python · L29-L43 (15 LOC)
services/liveavatar_service.py
    def __init__(self):
        self.api_key  = os.getenv("LIVEAVATAR_API_KEY")
        self.avatar_id = os.getenv("LIVEAVATAR_AVATAR_ID")

        # Dict: session_id → websocket connection
        self._ws_connections: Dict[str, websockets.WebSocketClientProtocol] = {}
        self._ws_listeners:   Dict[str, asyncio.Task] = {}
        self._event_counters: Dict[str, int] = {}

        if not self.api_key:
            logger.warning("⚠️ LIVEAVATAR_API_KEY not configured")
        if not self.avatar_id:
            logger.warning("⚠️ LIVEAVATAR_AVATAR_ID not configured")
        else:
            logger.info(f"✅ LiveAvatar LITE Service ready (Avatar: {self.avatar_id})")
_event_id method · python · L48-L51 (4 LOC)
services/liveavatar_service.py
    def _event_id(self, session_id: str) -> str:
        n = self._event_counters.get(session_id, 0) + 1
        self._event_counters[session_id] = n
        return f"{session_id[:8]}_{n}_{datetime.now().strftime('%H%M%S%f')}"
Source: Repobility analyzer · https://repobility.com
create_session method · python · L56-L114 (59 LOC)
services/liveavatar_service.py
    async def create_session(self, system_prompt: Optional[str] = None) -> Dict:
        """
        Crea sesión LITE en liveavatar.com.
        Returns: session_id, session_token, livekit_url, livekit_token, ws_url
        """
        if not self.api_key or not self.avatar_id:
            raise ValueError("LiveAvatar API key or avatar ID not configured")

        payload = {
            "mode": "LITE",
            "avatar_id": self.avatar_id,
        }

        logger.info(f"🔄 Creating LiveAvatar LITE session (avatar={self.avatar_id})")

        async with httpx.AsyncClient(timeout=30.0) as client:
            # Step 1: get session token
            r = await client.post(
                f"{BASE_URL}/sessions/token",
                headers=self._headers(),
                json=payload
            )
            result = r.json()
            logger.info(f"Token response: code={result.get('code')} msg={result.get('message')}")

            if result.get("code") != 1000:
                
connect_websocket method · python · L119-L138 (20 LOC)
services/liveavatar_service.py
    async def connect_websocket(self, session_id: str, ws_url: str) -> None:
        """
        Conecta el WebSocket de la sesión LITE y espera estado 'connected'.
        Lanza una tarea en background para mantener el WS vivo.
        """
        if session_id in self._ws_connections:
            logger.info(f"WS already connected for session {session_id[:8]}")
            return

        logger.info(f"🔌 Connecting WebSocket for session {session_id[:8]}...")
        ws = await websockets.connect(ws_url, ping_interval=20, ping_timeout=10)
        self._ws_connections[session_id] = ws
        self._event_counters[session_id] = 0

        # LITE mode WS is ready immediately after connect — no handshake event needed
        logger.info(f"✅ WebSocket connected for session {session_id[:8]}")

        # Background listener (logs events, handles reconnect if needed)
        task = asyncio.create_task(self._ws_listener(session_id, ws))
        self._ws_listeners[session_id] = task
_ws_listener method · python · L140-L159 (20 LOC)
services/liveavatar_service.py
    async def _ws_listener(self, session_id: str, ws) -> None:
        """Background task: escucha eventos del WebSocket."""
        try:
            async for raw in ws:
                try:
                    evt = json.loads(raw)
                    etype = evt.get("type", "")
                    if etype == "agent.speak_started":
                        logger.info(f"🔊 Avatar speak started [{session_id[:8]}]")
                    elif etype == "agent.speak_ended":
                        logger.info(f"✅ Avatar speak ended [{session_id[:8]}]")
                    elif etype == "session.state_updated":
                        logger.info(f"Session state: {evt.get('state')} [{session_id[:8]}]")
                except Exception:
                    pass
        except Exception as e:
            logger.warning(f"WS listener ended for {session_id[:8]}: {e}")
        finally:
            self._ws_connections.pop(session_id, None)
            self._ws_listeners.pop(session_id, None)
speak method · python · L164-L189 (26 LOC)
services/liveavatar_service.py
    async def speak(self, session_id: str, pcm_bytes: bytes) -> None:
        """
        Envía audio PCM 16-bit 24kHz al avatar para lip-sync.
        Si el audio es muy largo lo divide en chunks de ~1 seg (96KB).
        """
        ws = self._ws_connections.get(session_id)
        if not ws:
            raise Exception(f"No WebSocket connection for session {session_id[:8]}")

        CHUNK = 96_000  # 1 segundo de PCM 16-bit 24kHz = 48000 samples × 2 bytes
        offset = 0
        chunk_n = 0
        while offset < len(pcm_bytes):
            chunk = pcm_bytes[offset:offset + CHUNK]
            offset += CHUNK
            chunk_n += 1
            audio_b64 = base64.b64encode(chunk).decode("utf-8")
            event = {
                "type":     "agent.speak",
                "event_id": self._event_id(session_id),
                "audio":    audio_b64,
            }
            await ws.send(json.dumps(event))
            logger.debug(f"Sent audio chunk {chunk_n} ({len(chunk)} b
interrupt method · python · L194-L202 (9 LOC)
services/liveavatar_service.py
    async def interrupt(self, session_id: str) -> None:
        """Detiene al avatar inmediatamente."""
        ws = self._ws_connections.get(session_id)
        if not ws:
            logger.warning(f"No WS to interrupt for {session_id[:8]}")
            return
        event = {"type": "agent.interrupt", "event_id": self._event_id(session_id)}
        await ws.send(json.dumps(event))
        logger.info(f"⏹️ Interrupted avatar [{session_id[:8]}]")
keep_alive method · python · L207-L213 (7 LOC)
services/liveavatar_service.py
    async def keep_alive(self, session_id: str) -> None:
        ws = self._ws_connections.get(session_id)
        if not ws:
            return
        event = {"type": "session.keep_alive", "event_id": self._event_id(session_id)}
        await ws.send(json.dumps(event))
        logger.debug(f"♻️ Keep-alive sent [{session_id[:8]}]")
close_session method · python · L218-L234 (17 LOC)
services/liveavatar_service.py
    async def close_session(self, session_id: str) -> bool:
        # Cancel listener task
        task = self._ws_listeners.pop(session_id, None)
        if task:
            task.cancel()

        # Close WS
        ws = self._ws_connections.pop(session_id, None)
        if ws:
            try:
                await ws.close()
            except Exception:
                pass

        self._event_counters.pop(session_id, None)
        logger.info(f"✅ Session {session_id[:8]} closed")
        return True
get_avatar_config method · python · L236-L241 (6 LOC)
services/liveavatar_service.py
    def get_avatar_config(self) -> Dict:
        return {
            "avatar_id":   self.avatar_id,
            "avatar_name": "Valeria - Asistente Legal IA",
            "service":     "liveavatar-lite",
        }
Repobility (the analyzer behind this table) · https://repobility.com
SQLiteKnowledgeBase class · python · L14-L142 (129 LOC)
services/sqlite_knowledge.py
class SQLiteKnowledgeBase:
    def __init__(self, db_path: str = "/app/backend/prados.db"):
        self.db_path = db_path
        self._init_database()
        logger.info(f"✅ SQLite KnowledgeBase initialized at {db_path}")

    def _init_database(self):
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS conocimiento_legal (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        titulo TEXT NOT NULL,
                        contenido TEXT NOT NULL,
                        embedding TEXT,
                        metadata TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                cursor.execute('''
                    CREATE INDEX IF NOT EXISTS idx_titulo
                    ON conocimiento_legal(titulo)
                ''')
                con
__init__ method · python · L15-L18 (4 LOC)
services/sqlite_knowledge.py
    def __init__(self, db_path: str = "/app/backend/prados.db"):
        self.db_path = db_path
        self._init_database()
        logger.info(f"✅ SQLite KnowledgeBase initialized at {db_path}")
_init_database method · python · L20-L42 (23 LOC)
services/sqlite_knowledge.py
    def _init_database(self):
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS conocimiento_legal (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        titulo TEXT NOT NULL,
                        contenido TEXT NOT NULL,
                        embedding TEXT,
                        metadata TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                cursor.execute('''
                    CREATE INDEX IF NOT EXISTS idx_titulo
                    ON conocimiento_legal(titulo)
                ''')
                conn.commit()
            logger.info("✅ Database tables created/verified")
        except Exception as e:
            logger.error(f"Error initializing database: {str(e)}")
            raise
_keyword_score method · python · L44-L53 (10 LOC)
services/sqlite_knowledge.py
    def _keyword_score(self, query: str, titulo: str, contenido: str) -> float:
        """Puntaje simple por coincidencia de palabras clave."""
        # Cap query length to prevent ReDoS — use str.split() instead of regex
        safe_query = query[:500].lower()
        words = set(w for w in safe_query.split() if len(w) >= 2)
        if not words:
            return 0.0
        text = (titulo + " " + contenido).lower()
        matches = sum(1 for w in words if w in text)
        return matches / len(words)
add_document method · python · L55-L70 (16 LOC)
services/sqlite_knowledge.py
    def add_document(self, titulo: str, contenido: str, metadata: Optional[Dict] = None):
        try:
            metadata_json = json.dumps(metadata) if metadata else None
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    INSERT INTO conocimiento_legal (titulo, contenido, metadata)
                    VALUES (?, ?, ?)
                ''', (titulo, contenido, metadata_json))
                conn.commit()
                doc_id = cursor.lastrowid
            logger.info(f"✅ Document added: {titulo} (ID: {doc_id})")
            return doc_id
        except Exception as e:
            logger.error(f"Error adding document: {str(e)}")
            raise
search method · python · L72-L100 (29 LOC)
services/sqlite_knowledge.py
    def search(self, query: str, top_k: int = 3) -> List[Dict]:
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('SELECT id, titulo, contenido FROM conocimiento_legal')
                rows = cursor.fetchall()

            if not rows:
                logger.warning("No documents in knowledge base")
                return []

            results = []
            for doc_id, titulo, contenido in rows:
                score = self._keyword_score(query, titulo, contenido)
                results.append({
                    'id': doc_id,
                    'titulo': titulo,
                    'contenido': contenido,
                    'score': score
                })

            results.sort(key=lambda x: x['score'], reverse=True)
            top_results = results[:top_k]
            logger.info(f"Search '{query}' → {len(top_results)} docs")
            return top_results

        except Excepti
get_all_documents method · python · L102-L120 (19 LOC)
services/sqlite_knowledge.py
    def get_all_documents(self) -> List[Dict]:
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('SELECT id, titulo, contenido, metadata FROM conocimiento_legal')
                rows = cursor.fetchall()
            documents = []
            for doc_id, titulo, contenido, metadata_json in rows:
                metadata = json.loads(metadata_json) if metadata_json else {}
                documents.append({
                    'id': doc_id,
                    'titulo': titulo,
                    'contenido': contenido[:200] + '...',
                    'metadata': metadata
                })
            return documents
        except Exception as e:
            logger.error(f"Error getting documents: {str(e)}")
            return []
count_documents method · python · L122-L131 (10 LOC)
services/sqlite_knowledge.py
    def count_documents(self) -> int:
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('SELECT COUNT(*) FROM conocimiento_legal')
                count = cursor.fetchone()[0]
            return count
        except Exception as e:
            logger.error(f"Error counting documents: {str(e)}")
            return 0
If a scraper extracted this row, it came from Repobility (https://repobility.com)
clear_database method · python · L133-L142 (10 LOC)
services/sqlite_knowledge.py
    def clear_database(self):
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('DELETE FROM conocimiento_legal')
                conn.commit()
            logger.info("✅ Database cleared")
        except Exception as e:
            logger.error(f"Error clearing database: {str(e)}")
            raise
‹ prevpage 2 / 2