← back to krivamsh007__fabric-agent

Function bodies 807 total

All specs Real LLM only Function bodies
ImpactAgent._calculate_risk_score method · python · L337-L427 (91 LOC)
fabric_agent/agents/specialized.py
    async def _calculate_risk_score(
        self,
        total_impact: int,
        has_production_reports: bool,
        has_downstream_dependencies: bool,
        proposed_change: str = "",
    ) -> Dict:
        """
        Calculate composite risk score — data-driven when history is available.

        STEP 1: Algorithmic base score (always runs):
          base = total_impact × 2
          × 1.5 if production reports are affected
          × 1.3 if downstream dependencies exist

        STEP 2: Historical calibration (when operation_memory is wired in):
          Retrieves top-K similar past operations via vector similarity.
          If historical_failure_rate > 30%: score × (1 + failure_rate)
          Also surfaces past failure reasons as recommendations.

        WHY THIS MATTERS:
          A "rename Revenue → Gross Revenue" might score medium_risk by
          algorithm alone. But if history shows that 5 of the last 7 similar
          renames rolled back due to DAX formula
ImpactAgent.analyze_change method · python · L429-L478 (50 LOC)
fabric_agent/agents/specialized.py
    async def analyze_change(
        self,
        change_type: str,
        workspace_id: str,
        **kwargs,
    ) -> AgentResult:
        """
        Analyze a proposed change.
        
        Args:
            change_type: "measure_rename", "notebook_change", etc.
            workspace_id: Target workspace
            **kwargs: Change-specific parameters
        """
        import time
        start = time.time()
        
        try:
            if change_type == "measure_rename":
                impact = await self._analyze_measure_rename(
                    workspace_id=workspace_id,
                    model_id=kwargs.get("model_id", ""),
                    measure_name=kwargs.get("measure_name", ""),
                    new_name=kwargs.get("new_name", ""),
                )
            elif change_type == "notebook_change":
                impact = await self._analyze_notebook_change(
                    workspace_id=workspace_id,
                    notebook_id=kwargs.
RefactorAgent.__init__ method · python · L496-L532 (37 LOC)
fabric_agent/agents/specialized.py
    def __init__(
        self,
        memory: SharedMemory,
        workspace_name: str,
    ):
        self.workspace_name = workspace_name
        self._executor = None
        
        tools = [
            ToolDefinition(
                name="rename_measure",
                description="Rename a measure with automatic reference updates",
                parameters={
                    "model_name": {"type": "string"},
                    "old_name": {"type": "string"},
                    "new_name": {"type": "string"},
                    "dry_run": {"type": "boolean"},
                },
                handler=self._rename_measure,
            ),
            ToolDefinition(
                name="rollback",
                description="Rollback to a checkpoint",
                parameters={
                    "checkpoint_id": {"type": "string"},
                    "dry_run": {"type": "boolean"},
                },
                handler=self._rollback,
            ),
    
RefactorAgent._get_executor method · python · L534-L539 (6 LOC)
fabric_agent/agents/specialized.py
    def _get_executor(self):
        """Lazy load executor."""
        if self._executor is None:
            from fabric_agent.refactor.executor import RefactorExecutor
            self._executor = RefactorExecutor(workspace_name=self.workspace_name)
        return self._executor
RefactorAgent._rename_measure method · python · L541-L557 (17 LOC)
fabric_agent/agents/specialized.py
    async def _rename_measure(
        self,
        model_name: str,
        old_name: str,
        new_name: str,
        dry_run: bool = True,
    ) -> Dict:
        """Execute measure rename."""
        executor = self._get_executor()
        result = await executor.rename_measure(
            model_name=model_name,
            old_name=old_name,
            new_name=new_name,
            update_references=True,
            dry_run=dry_run,
        )
        return result.to_dict()
RefactorAgent._rollback method · python · L559-L570 (12 LOC)
fabric_agent/agents/specialized.py
    async def _rollback(
        self,
        checkpoint_id: str,
        dry_run: bool = True,
    ) -> Dict:
        """Rollback to checkpoint."""
        executor = self._get_executor()
        result = await executor.rollback(
            checkpoint_id=checkpoint_id,
            dry_run=dry_run,
        )
        return result.to_dict()
RefactorAgent.execute_refactor method · python · L572-L625 (54 LOC)
fabric_agent/agents/specialized.py
    async def execute_refactor(
        self,
        refactor_type: str,
        dry_run: bool = True,
        **kwargs,
    ) -> AgentResult:
        """
        Execute a refactoring operation.
        
        Args:
            refactor_type: "rename_measure", "rollback"
            dry_run: If True, only simulate
            **kwargs: Refactor-specific parameters
        """
        import time
        start = time.time()
        
        try:
            if refactor_type == "rename_measure":
                result = await self._rename_measure(
                    model_name=kwargs.get("model_name", ""),
                    old_name=kwargs.get("old_name", ""),
                    new_name=kwargs.get("new_name", ""),
                    dry_run=dry_run,
                )
            elif refactor_type == "rollback":
                result = await self._rollback(
                    checkpoint_id=kwargs.get("checkpoint_id", ""),
                    dry_run=dry_run,
                )
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
HealerAgent.__init__ method · python · L665-L723 (59 LOC)
fabric_agent/agents/specialized.py
    def __init__(
        self,
        memory: SharedMemory,
        fabric_client: Any,               # FabricApiClient
        memory_manager: Optional[Any] = None,  # MemoryManager
        contracts_dir: Optional[str] = None,
        stale_hours: int = 24,
    ):
        self.fabric_client = fabric_client
        self.memory_manager = memory_manager

        tools = [
            ToolDefinition(
                name="scan_for_anomalies",
                description="Scan workspace(s) for infrastructure anomalies",
                parameters={
                    "workspace_ids": {"type": "array", "items": {"type": "string"}},
                },
                handler=self._scan_for_anomalies,
            ),
            ToolDefinition(
                name="build_healing_plan",
                description="Build a healing plan from detected anomalies",
                parameters={
                    "workspace_ids": {"type": "array", "items": {"type": "string"}},
                }
HealerAgent._get_monitor method · python · L725-L735 (11 LOC)
fabric_agent/agents/specialized.py
    def _get_monitor(self):
        """Lazy-construct SelfHealingMonitor on first use."""
        if self._monitor is None:
            from fabric_agent.healing.monitor import SelfHealingMonitor
            self._monitor = SelfHealingMonitor(
                client=self.fabric_client,
                memory=self.memory_manager,
                contracts_dir=self._contracts_dir,
                stale_hours=self._stale_hours,
            )
        return self._monitor
HealerAgent.run method · python · L737-L785 (49 LOC)
fabric_agent/agents/specialized.py
    async def run(self, task: str, context: Dict[str, Any]) -> AgentResult:
        """
        Execute a healing task.

        Args:
            task: One of "health_scan", "scan_anomalies", "build_plan", "execute_plan".
            context: Must include "workspace_ids". Optional "dry_run" (default True).

        Returns:
            AgentResult with result=HealthReport (or List[Anomaly] / HealingPlan).
        """
        import time
        start = time.time()
        workspace_ids = context.get("workspace_ids", [])
        dry_run = context.get("dry_run", True)

        try:
            if task in ("health_scan", "get_health_report"):
                result = await self._get_health_report(
                    workspace_ids=workspace_ids, dry_run=dry_run
                )
            elif task == "scan_anomalies":
                result = await self._scan_for_anomalies(workspace_ids=workspace_ids)
            elif task == "build_plan":
                result = await self._build_he
HealerAgent._build_healing_plan method · python · L793-L798 (6 LOC)
fabric_agent/agents/specialized.py
    async def _build_healing_plan(self, workspace_ids: List[str], **_) -> Dict:
        """Scan + build plan, return serializable plan dict."""
        monitor = self._get_monitor()
        anomalies = await monitor.detector.scan(workspace_ids)
        plan = monitor.healer.build_plan(anomalies)
        return plan.to_dict()
HealerAgent._execute_healing_plan method · python · L800-L808 (9 LOC)
fabric_agent/agents/specialized.py
    async def _execute_healing_plan(
        self, workspace_ids: List[str], dry_run: bool = True, **_
    ) -> Dict:
        """Scan + build plan + execute, return result dict."""
        monitor = self._get_monitor()
        anomalies = await monitor.detector.scan(workspace_ids)
        plan = monitor.healer.build_plan(anomalies)
        result = await monitor.healer.execute_healing_plan(plan, dry_run=dry_run)
        return result.to_dict()
HealerAgent._get_health_report method · python · L810-L816 (7 LOC)
fabric_agent/agents/specialized.py
    async def _get_health_report(
        self, workspace_ids: List[str], dry_run: bool = True, **_
    ) -> Dict:
        """Full scan cycle, return HealthReport as dict."""
        monitor = self._get_monitor()
        report = await monitor.run_once(workspace_ids, dry_run=dry_run)
        return report.to_dict()
QueryResult.compute_checksum method · python · L38-L43 (6 LOC)
fabric_agent/agents/validation.py
    def compute_checksum(self) -> str:
        """Compute a checksum of the result data."""
        if self.data is None:
            return ""
        data_str = json.dumps(self.data, sort_keys=True, default=str)
        return hashlib.sha256(data_str.encode()).hexdigest()[:16]
_try_import_sempy function · python · L84-L90 (7 LOC)
fabric_agent/agents/validation.py
def _try_import_sempy():
    """Try to import SemPy."""
    try:
        import sempy.fabric as fabric
        return fabric
    except ImportError:
        return None
Repobility analyzer · published findings · https://repobility.com
ValidationAgent._ensure_sempy method · python · L120-L126 (7 LOC)
fabric_agent/agents/validation.py
    def _ensure_sempy(self) -> Any:
        """Ensure SemPy is available."""
        if self._fabric is None:
            self._fabric = _try_import_sempy()
        if self._fabric is None:
            raise RuntimeError("SemPy required for validation")
        return self._fabric
ValidationAgent.generate_tests_for_measure method · python · L132-L189 (58 LOC)
fabric_agent/agents/validation.py
    def generate_tests_for_measure(
        self,
        measure_name: str,
        table_name: Optional[str] = None,
    ) -> List[ValidationTest]:
        """
        Generate standard test queries for a measure.
        
        Args:
            measure_name: The measure to test.
            table_name: Optional table context.
        
        Returns:
            List of validation tests.
        """
        tests = []
        
        # Test 1: Simple evaluation
        tests.append(ValidationTest(
            test_id=str(uuid4()),
            name=f"Evaluate {measure_name}",
            description=f"Simple evaluation of [{measure_name}]",
            dax_query=f'EVALUATE ROW("Value", [{measure_name}])',
            expected_behavior="results_match",
        ))
        
        # Test 2: Evaluate with table context (if provided)
        if table_name:
            tests.append(ValidationTest(
                test_id=str(uuid4()),
                name=f"{measure_name} by {table_n
ValidationAgent.generate_tests_for_refactor method · python · L191-L217 (27 LOC)
fabric_agent/agents/validation.py
    def generate_tests_for_refactor(
        self,
        old_name: str,
        new_name: str,
        affected_measures: List[str],
    ) -> List[ValidationTest]:
        """
        Generate comprehensive tests for a refactoring operation.
        
        Args:
            old_name: Original measure name.
            new_name: New measure name.
            affected_measures: List of measures that reference the target.
        
        Returns:
            List of validation tests.
        """
        tests = []
        
        # Test the renamed measure itself (using new name after refactor)
        tests.extend(self.generate_tests_for_measure(new_name))
        
        # Test each affected measure
        for measure in affected_measures[:10]:  # Limit to 10
            tests.extend(self.generate_tests_for_measure(measure))
        
        return tests
ValidationAgent.execute_query method · python · L223-L275 (53 LOC)
fabric_agent/agents/validation.py
    async def execute_query(
        self,
        model_name: str,
        query: str,
    ) -> QueryResult:
        """
        Execute a DAX query and return results.
        
        Args:
            model_name: Semantic model name.
            query: DAX query to execute.
        
        Returns:
            QueryResult with data or error.
        """
        import time
        start_time = time.time()
        
        try:
            fabric = self._ensure_sempy()
            
            # Execute using SemPy evaluate_dax
            df = fabric.evaluate_dax(
                dataset=model_name,
                dax_string=query,
                workspace=self.workspace_name,
            )
            
            execution_time = (time.time() - start_time) * 1000
            
            # Convert DataFrame to list of dicts
            data = df.to_dict(orient='records') if df is not None else []
            
            result = QueryResult(
                query=query,
     
ValidationAgent.capture_state method · python · L277-L299 (23 LOC)
fabric_agent/agents/validation.py
    async def capture_state(
        self,
        model_name: str,
        tests: List[ValidationTest],
    ) -> Dict[str, QueryResult]:
        """
        Capture the current state by running all tests.
        
        Args:
            model_name: Semantic model name.
            tests: List of tests to run.
        
        Returns:
            Dictionary mapping test_id to QueryResult.
        """
        results = {}
        
        for test in tests:
            logger.debug(f"Capturing: {test.name}")
            result = await self.execute_query(model_name, test.dax_query)
            results[test.test_id] = result
        
        return results
ValidationAgent.validate method · python · L305-L375 (71 LOC)
fabric_agent/agents/validation.py
    async def validate(
        self,
        model_name: str,
        tests: List[ValidationTest],
        before_results: Dict[str, QueryResult],
        operation_id: str = "",
    ) -> ValidationReport:
        """
        Validate by comparing before and after results.
        
        Args:
            model_name: Semantic model name.
            tests: List of tests to run.
            before_results: Results captured before refactoring.
            operation_id: ID of the refactoring operation.
        
        Returns:
            ValidationReport with comparison details.
        """
        import time
        start_time = time.time()
        
        comparisons: List[TestComparison] = []
        tests_passed = 0
        tests_failed = 0
        
        for test in tests:
            before = before_results.get(test.test_id)
            if not before:
                logger.warning(f"No before result for test: {test.name}")
                continue
            
            
ValidationAgent._compare_results method · python · L377-L421 (45 LOC)
fabric_agent/agents/validation.py
    def _compare_results(
        self,
        test: ValidationTest,
        before: QueryResult,
        after: QueryResult,
    ) -> Tuple[bool, str]:
        """
        Compare before and after results.
        
        Returns:
            Tuple of (passed, difference_summary).
        """
        # If either query failed, compare error states
        if not before.success and not after.success:
            return True, "Both queries failed (expected if measure was removed)"
        
        if before.success != after.success:
            if not after.success:
                return False, f"Query failed after refactor: {after.error_message}"
            else:
                return False, f"Query started working (was failing before)"
        
        # Both succeeded - compare based on expected behavior
        if test.expected_behavior == "no_error":
            return True, "Query executed without error"
        
        elif test.expected_behavior == "row_count_match":
      
ValidationAgent._detailed_comparison method · python · L423-L461 (39 LOC)
fabric_agent/agents/validation.py
    def _detailed_comparison(
        self,
        before_data: Optional[List[Dict]],
        after_data: Optional[List[Dict]],
        tolerance: float,
    ) -> Optional[str]:
        """
        Perform detailed comparison of result data.
        
        Returns:
            Difference description or None if equal.
        """
        if before_data is None and after_data is None:
            return None
        
        if before_data is None or after_data is None:
            return "One result is None"
        
        if len(before_data) != len(after_data):
            return f"Row count differs: {len(before_data)} vs {len(after_data)}"
        
        # Compare row by row
        for i, (before_row, after_row) in enumerate(zip(before_data, after_data)):
            if set(before_row.keys()) != set(after_row.keys()):
                return f"Row {i}: Column mismatch"
            
            for key in before_row:
                before_val = before_row[key]
                a
Repobility · code-quality intelligence · https://repobility.com
ValidationAgent._generate_recommendations method · python · L463-L489 (27 LOC)
fabric_agent/agents/validation.py
    def _generate_recommendations(
        self,
        comparisons: List[TestComparison],
    ) -> List[str]:
        """Generate recommendations based on test results."""
        recommendations = []
        
        failed = [c for c in comparisons if not c.passed]
        
        if not failed:
            recommendations.append("✅ All validation tests passed. Safe to proceed.")
        else:
            recommendations.append(f"⚠️ {len(failed)} test(s) failed. Review before proceeding.")
            
            for comp in failed[:3]:  # Show first 3
                recommendations.append(f"  - {comp.test.name}: {comp.difference_summary}")
            
            if len(failed) > 3:
                recommendations.append(f"  ... and {len(failed) - 3} more failures")
            
            recommendations.append("")
            recommendations.append("Recommended actions:")
            recommendations.append("1. Review the DAX expressions that changed")
            recommenda
validate_refactor function · python · L496-L521 (26 LOC)
fabric_agent/agents/validation.py
async def validate_refactor(
    workspace_name: str,
    model_name: str,
    measure_name: str,
    before_state: Dict[str, QueryResult],
) -> ValidationReport:
    """
    Quick validation after a refactor.
    
    Example:
        >>> # Before refactor
        >>> validator = ValidationAgent(workspace_name)
        >>> tests = validator.generate_tests_for_measure("Revenue")
        >>> before = await validator.capture_state(model_name, tests)
        >>> 
        >>> # ... do refactor ...
        >>> 
        >>> # Validate
        >>> report = await validate_refactor(
        ...     workspace_name, model_name, "Revenue", before
        ... )
        >>> print(f"Passed: {report.overall_passed}")
    """
    validator = ValidationAgent(workspace_name)
    tests = validator.generate_tests_for_measure(measure_name)
    return await validator.validate(model_name, tests, before_state)
FabricApiError.__init__ method · python · L39-L47 (9 LOC)
fabric_agent/api/fabric_client.py
    def __init__(
        self,
        message: str,
        status_code: Optional[int] = None,
        response_body: Optional[Any] = None,
    ):
        super().__init__(message)
        self.status_code = status_code
        self.response_body = response_body
FabricResponse.__init__ method · python · L62-L81 (20 LOC)
fabric_agent/api/fabric_client.py
    def __init__(self, raw: httpx.Response):
        self.raw = raw

        data: Dict[str, Any] = {}
        if raw.content:
            try:
                parsed = raw.json()
            except Exception:
                parsed = {}

            if parsed is None:
                data = {}
            elif isinstance(parsed, dict):
                data = parsed
            elif isinstance(parsed, list):
                data = {"value": parsed}
            else:
                data = {"value": parsed}

        super().__init__(data)
FabricApiClient.__init__ method · python · L115-L145 (31 LOC)
fabric_agent/api/fabric_client.py
    def __init__(
        self,
        auth_config: FabricAuthConfig,
        base_url: str = "https://api.fabric.microsoft.com",
        api_version: str = "/v1",
        timeout_seconds: int = 30,
        max_retries: int = 3,
        circuit_breaker_threshold: int = 5,
        circuit_breaker_reset_seconds: float = 60.0,
    ):
        self.auth_config = auth_config
        self.base_url = base_url.rstrip("/") + api_version
        self.timeout = httpx.Timeout(timeout_seconds)

        self._token: Optional[str] = None
        self._token_expiry: float = 0.0
        self._credential: Any = None
        self._client: Optional[httpx.AsyncClient] = None
        self._initialized = False

        # Resilience primitives — every HTTP call goes through both.
        # RetryPolicy:    backs off on 429/5xx with full jitter.
        # CircuitBreaker: stops hammering a degraded Fabric API after
        #                 `circuit_breaker_threshold` consecutive failures.
        self._retry_po
FabricApiClient.initialize method · python · L147-L160 (14 LOC)
fabric_agent/api/fabric_client.py
    async def initialize(self) -> None:
        if self._initialized:
            return

        logger.info("Initializing Fabric API client")
        self._credential = await self._get_credential()
        await self._refresh_token()

        self._client = httpx.AsyncClient(
            timeout=self.timeout,
            headers={"Content-Type": "application/json"},
        )
        self._initialized = True
        logger.info("Fabric API client initialized successfully")
FabricApiClient._get_credential method · python · L162-L178 (17 LOC)
fabric_agent/api/fabric_client.py
    async def _get_credential(self):
        from azure.identity.aio import AzureCliCredential, ClientSecretCredential, DefaultAzureCredential

        # Service Principal
        if self.auth_config.client_secret:
            return ClientSecretCredential(
                tenant_id=self.auth_config.tenant_id,
                client_id=self.auth_config.client_id,
                client_secret=self.auth_config.client_secret.get_secret_value(),
            )

        # Interactive mode -> rely on `az login`
        if self.auth_config.auth_mode == AuthMode.INTERACTIVE:
            return AzureCliCredential()

        # Default chain (exclude browser)
        return DefaultAzureCredential(exclude_interactive_browser_credential=True)
FabricApiClient._refresh_token method · python · L180-L190 (11 LOC)
fabric_agent/api/fabric_client.py
    async def _refresh_token(self) -> None:
        # refresh if expiring within 60 seconds
        if self._token and time.time() < self._token_expiry - 60:
            return

        logger.debug("Refreshing access token")
        scopes = ["https://api.fabric.microsoft.com/.default"]
        token_response = await self._credential.get_token(*scopes)
        self._token = token_response.token
        self._token_expiry = float(token_response.expires_on)
        logger.debug(f"Token refreshed, expires at {self._token_expiry}")
Repobility · MCP-ready · https://repobility.com
FabricApiClient._get_headers method · python · L196-L201 (6 LOC)
fabric_agent/api/fabric_client.py
    async def _get_headers(self) -> Dict[str, str]:
        await self._refresh_token()
        return {
            "Authorization": f"Bearer {self._token}",
            "Content-Type": "application/json",
        }
FabricApiClient.request method · python · L203-L263 (61 LOC)
fabric_agent/api/fabric_client.py
    async def request(
        self,
        method: str,
        path: str,
        json_data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
    ) -> httpx.Response:
        """
        Execute an HTTP request with automatic retry + circuit breaking.

        All GET/POST/PUT/DELETE calls route through here, so resilience is
        applied uniformly — no per-call retry logic needed at call sites.

        RetryPolicy:    Retries 429/5xx with exponential backoff + full jitter.
        CircuitBreaker: Fast-fails once `failure_threshold` consecutive failures
                        occur, giving the Fabric API time to recover.

        FAANG PARALLEL:
          AWS SDK: every API call goes through the retry handler middleware.
          Google gRPC: retry interceptor wraps all unary calls.
          Stripe API client: automatic retry with idempotency keys.
        """
        self._ensure_initialized()

        url = self.base_url + (path if path.star
FabricApiClient.post_raw method · python · L272-L281 (10 LOC)
fabric_agent/api/fabric_client.py
    async def post_raw(
        self,
        path: str,
        json_data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,  # alias
    ) -> httpx.Response:
        if json is not None and json_data is None:
            json_data = json
        return await self.request("POST", path, json_data=json_data, params=params)
FabricApiClient.put_raw method · python · L283-L291 (9 LOC)
fabric_agent/api/fabric_client.py
    async def put_raw(
        self,
        path: str,
        json_data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,  # alias
    ) -> httpx.Response:
        if json is not None and json_data is None:
            json_data = json
        return await self.request("PUT", path, json_data=json_data)
FabricApiClient.get method · python · L300-L308 (9 LOC)
fabric_agent/api/fabric_client.py
    async def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> FabricResponse:
        resp = await self.get_raw(path, params=params)
        if resp.status_code != 200:
            raise FabricApiError(
                f"GET {path} failed with status {resp.status_code}",
                status_code=resp.status_code,
                response_body=resp.json() if resp.content else None,
            )
        return FabricResponse(resp)
FabricApiClient.post method · python · L310-L324 (15 LOC)
fabric_agent/api/fabric_client.py
    async def post(
        self,
        path: str,
        json_data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,  # alias
    ) -> FabricResponse:
        resp = await self.post_raw(path, json_data=json_data, params=params, json=json)
        if resp.status_code not in (200, 201, 202):
            raise FabricApiError(
                f"POST {path} failed with status {resp.status_code}",
                status_code=resp.status_code,
                response_body=resp.json() if resp.content else None,
            )
        return FabricResponse(resp)
FabricApiClient.put method · python · L326-L339 (14 LOC)
fabric_agent/api/fabric_client.py
    async def put(
        self,
        path: str,
        json_data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,  # alias
    ) -> FabricResponse:
        resp = await self.put_raw(path, json_data=json_data, json=json)
        if resp.status_code not in (200, 201, 202):
            raise FabricApiError(
                f"PUT {path} failed with status {resp.status_code}",
                status_code=resp.status_code,
                response_body=resp.json() if resp.content else None,
            )
        return FabricResponse(resp)
FabricApiClient.put_binary method · python · L341-L390 (50 LOC)
fabric_agent/api/fabric_client.py
    async def put_binary(
        self,
        path: str,
        data: bytes,
        content_type: str = "application/octet-stream",
    ) -> httpx.Response:
        """
        Upload raw bytes to a Fabric endpoint (e.g., OneLake Files API).

        Used by build_and_upload.py and bootstrap to push .whl files into
        Lakehouse Files/wheels/ so Notebooks can %pip install them.

        Standard REST pattern: PUT with Content-Type: application/octet-stream.
        The regular put() method serialises JSON — this bypasses that and
        streams raw bytes instead.

        FAANG PATTERN:
            Same as AWS S3 PutObject — direct binary streaming upload.
            Compute reads from where data lives (OneLake), not from the
            developer's machine.

        Args:
            path: API path, e.g. /workspaces/{ws}/lakehouses/{lh}/files/...
            data: Raw bytes to upload (e.g., wheel file contents).
            content_type: MIME type (default: application/octet
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
FabricApiClient.delete method · python · L392-L400 (9 LOC)
fabric_agent/api/fabric_client.py
    async def delete(self, path: str) -> FabricResponse:
        resp = await self.delete_raw(path)
        if resp.status_code not in (200, 202, 204):
            raise FabricApiError(
                f"DELETE {path} failed with status {resp.status_code}",
                status_code=resp.status_code,
                response_body=resp.json() if resp.content else None,
            )
        return FabricResponse(resp)
FabricApiClient.wait_for_lro method · python · L413-L524 (112 LOC)
fabric_agent/api/fabric_client.py
    async def wait_for_lro(
        self,
        initial_response: Union[httpx.Response, FabricResponse],
        poll_interval: float = 1.5,
        timeout: int = 300,
    ) -> Dict[str, Any]:
        raw = initial_response.raw if isinstance(initial_response, FabricResponse) else initial_response

        # Not an LRO
        if raw.status_code in (200, 201):
            return raw.json() if raw.content else {}

        # Unexpected status
        if raw.status_code != 202:
            raw.raise_for_status()
            return raw.json() if raw.content else {}

        # LRO headers
        location = raw.headers.get("Location") or raw.headers.get("location")
        op_id = raw.headers.get("x-ms-operation-id") or raw.headers.get("x-ms-operationid")

        # Try to extract from Location if missing
        if not op_id:
            op_id = self._extract_operation_id(location)

        # Prefer documented operations endpoint on api.fabric.microsoft.com
        op_url = f"{self.base_
FabricApiClient.post_with_lro method · python · L526-L554 (29 LOC)
fabric_agent/api/fabric_client.py
    async def post_with_lro(
        self,
        path: str,
        json_data: Optional[Dict[str, Any]] = None,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,  # alias
        *,
        lro_poll_seconds: float = 1.5,
        max_polls: int = 200,
        timeout: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        POST and wait for Long Running Operation completion.

        - lro_poll_seconds: poll interval
        - max_polls: used to derive timeout if timeout is not provided
        """
        resp = await self.post_raw(path, json_data=json_data, params=params, json=json)

        poll_interval = float(lro_poll_seconds)
        if timeout is None:
            timeout = int(max(30.0, poll_interval * float(max_polls)))

        if resp.status_code == 202:
            return await self.wait_for_lro(resp, poll_interval=poll_interval, timeout=timeout)

        # Non-LRO create/update responses
        resp.raise_for_stat
FabricApiClient.close method · python · L556-L565 (10 LOC)
fabric_agent/api/fabric_client.py
    async def close(self) -> None:
        if self._client:
            await self._client.aclose()
            self._client = None

        if self._credential and hasattr(self._credential, "close"):
            await self._credential.close()

        self._initialized = False
        logger.info("Fabric API client closed")
FabricApiClient.__aenter__ method · python · L571-L584 (14 LOC)
fabric_agent/api/fabric_client.py
    async def __aenter__(self) -> "FabricApiClient":
        """
        Support ``async with FabricApiClient(...) as client:`` usage.

        Calls initialize() so callers don't need a separate await step,
        and guarantees close() is called even if the body raises.

        FAANG PATTERN:
            boto3 Session, google-cloud clients, and httpx.AsyncClient all
            expose async context managers for the same reason — guaranteed
            cleanup without try/finally boilerplate at every call site.
        """
        await self.initialize()
        return self
SyncFabricApiClient.__init__ method · python · L600-L615 (16 LOC)
fabric_agent/api/fabric_client.py
    def __init__(
        self,
        auth_config: FabricAuthConfig,
        base_url: str = "https://api.fabric.microsoft.com",
        api_version: str = "/v1",
        timeout_seconds: int = 30,
    ):
        import requests

        self.auth_config = auth_config
        self.base_url = base_url.rstrip("/") + api_version
        self.timeout = timeout_seconds

        self._token: Optional[str] = None
        self._session = requests.Session()
        self._session.headers.update({"Content-Type": "application/json"})
SyncFabricApiClient._get_token_sync method · python · L617-L633 (17 LOC)
fabric_agent/api/fabric_client.py
    def _get_token_sync(self) -> str:
        from azure.identity import AzureCliCredential, ClientSecretCredential, DefaultAzureCredential

        if self.auth_config.client_secret:
            credential = ClientSecretCredential(
                tenant_id=self.auth_config.tenant_id,
                client_id=self.auth_config.client_id,
                client_secret=self.auth_config.client_secret.get_secret_value(),
            )
        else:
            if self.auth_config.auth_mode == AuthMode.INTERACTIVE:
                credential = AzureCliCredential()
            else:
                credential = DefaultAzureCredential(exclude_interactive_browser_credential=True)

        scopes = ["https://api.fabric.microsoft.com/.default"]
        return credential.get_token(*scopes).token
_to_openai_messages function · python · L74-L152 (79 LOC)
fabric_agent/api/llm_providers.py
def _to_openai_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Convert Anthropic-format message history to OpenAI format.

    Anthropic format (what BaseAgent stores):
      assistant: {"role": "assistant", "content": [TextBlock|dict, ToolUseBlock|dict, ...]}
      tool results: {"role": "user", "content": [{"type": "tool_result", "tool_use_id": "...", "content": "..."}]}

    OpenAI format (what Ollama/Azure OpenAI expect):
      assistant: {"role": "assistant", "content": "text", "tool_calls": [...]}
      tool results: {"role": "tool", "tool_call_id": "...", "content": "..."}
    """
    result: List[Dict[str, Any]] = []

    for msg in messages:
        role = msg.get("role", "user")
        content = msg.get("content", "")

        # Simple string content — compatible as-is
        if isinstance(content, str):
            result.append({"role": role, "content": content})
            continue

        # List of content blocks
        if isinstance(con
Repobility analyzer · published findings · https://repobility.com
_to_openai_tools function · python · L155-L172 (18 LOC)
fabric_agent/api/llm_providers.py
def _to_openai_tools(tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Convert Anthropic tool schemas to OpenAI function calling format.

    Anthropic: {"name": "...", "description": "...", "input_schema": {...}}
    OpenAI:    {"type": "function", "function": {"name": "...", "description": "...", "parameters": {...}}}
    """
    result = []
    for tool in tools:
        result.append({
            "type": "function",
            "function": {
                "name": tool["name"],
                "description": tool.get("description", ""),
                "parameters": tool.get("input_schema", {"type": "object", "properties": {}}),
            },
        })
    return result
_from_openai_response function · python · L175-L233 (59 LOC)
fabric_agent/api/llm_providers.py
def _from_openai_response(response: Dict[str, Any]) -> Dict[str, Any]:
    """
    Convert an OpenAI-format chat completion response to Anthropic format.

    OpenAI response:
      choices[0].message.content: "text" | None
      choices[0].message.tool_calls: [{"id": "...", "function": {"name": "...", "arguments": "{...}"}}]
      choices[0].finish_reason: "stop" | "tool_calls" | "length"
      usage.prompt_tokens / completion_tokens

    Anthropic format (what BaseAgent expects):
      content: [{"type": "text", "text": "..."}, {"type": "tool_use", "id": "...", "name": "...", "input": {...}}]
      stop_reason: "end_turn" | "tool_use" | "max_tokens"
      usage: {"input_tokens": N, "output_tokens": N}
    """
    choices = response.get("choices", [{}])
    choice = choices[0] if choices else {}
    message = choice.get("message", {})
    finish_reason = choice.get("finish_reason", "stop")
    usage = response.get("usage", {})

    content: List[Dict[str, Any]] = []

    # Text conten
_extract_text function · python · L236-L242 (7 LOC)
fabric_agent/api/llm_providers.py
def _extract_text(block: Any) -> str:
    """Extract text string from a content block (dict or Anthropic object)."""
    if isinstance(block, dict):
        return block.get("text", "") if block.get("type") == "text" else ""
    if hasattr(block, "type") and block.type == "text":
        return getattr(block, "text", "")
    return ""
‹ prevpage 2 / 17next ›