← back to dreamer09__agentxspace

Function bodies 529 total

All specs Real LLM only Function bodies
tt_search function · python · L239-L247 (9 LOC)
src/hive_mcp/rest_api.py
async def tt_search(
    q: str = Query(...),
    count: int = Query(20, ge=1, le=50),
):
    from hive_mcp.platforms.tiktok import search_videos
    try:
        return await search_videos(q, count=count, proxy_url=_proxy_url)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
tt_hashtag function · python · L251-L259 (9 LOC)
src/hive_mcp/rest_api.py
async def tt_hashtag(
    tag: str = Query(...),
    count: int = Query(20, ge=1, le=50),
):
    from hive_mcp.platforms.tiktok import get_hashtag_videos
    try:
        return await get_hashtag_videos(tag, count=count, proxy_url=_proxy_url)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
tt_trending function · python · L263-L270 (8 LOC)
src/hive_mcp/rest_api.py
async def tt_trending(
    count: int = Query(20, ge=1, le=50),
):
    from hive_mcp.platforms.tiktok import get_trending_videos
    try:
        return await get_trending_videos(count=count, proxy_url=_proxy_url)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
weibo function · python · L276-L281 (6 LOC)
src/hive_mcp/rest_api.py
async def weibo(count: int = Query(50, ge=1, le=100)):
    from hive_mcp.platforms.trends import get_weibo_trending
    try:
        return await get_weibo_trending(count, proxy_url=_proxy_url)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
zhihu function · python · L285-L290 (6 LOC)
src/hive_mcp/rest_api.py
async def zhihu(count: int = Query(50, ge=1, le=100)):
    from hive_mcp.platforms.trends import get_zhihu_trending
    try:
        return await get_zhihu_trending(count, proxy_url=_proxy_url)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
bilibili function · python · L294-L299 (6 LOC)
src/hive_mcp/rest_api.py
async def bilibili(count: int = Query(50, ge=1, le=100)):
    from hive_mcp.platforms.trends import get_bilibili_rank
    try:
        return await get_bilibili_rank(count, proxy_url=_proxy_url)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
main function · python · L304-L310 (7 LOC)
src/hive_mcp/rest_api.py
def main():
    """CLI 入口: hive-mcp-api"""
    import uvicorn
    host = os.getenv("HOST", "0.0.0.0")
    port = int(os.getenv("PORT", "8000"))
    log_level = os.getenv("LOG_LEVEL", "info").lower()
    uvicorn.run("hive_mcp.rest_api:app", host=host, port=port, log_level=log_level)
All rows scored by the Repobility analyzer (https://repobility.com)
_get_http_client function · python · L225-L236 (12 LOC)
src/hive_mcp/server.py
async def _get_http_client() -> httpx.AsyncClient:
    """获取或创建共享 HTTP 客户端(带连接池)。"""
    global _http_client
    if _http_client is None or _http_client.is_closed:
        async with _http_client_lock:
            if _http_client is None or _http_client.is_closed:
                _http_client = httpx.AsyncClient(
                    timeout=PROXY_TIMEOUT_MS / 1000 + 10,
                    trust_env=False,
                    limits=httpx.Limits(max_connections=20, max_keepalive_connections=10),
                )
    return _http_client
_RateLimiter.__init__ method · python · L251-L272 (22 LOC)
src/hive_mcp/server.py
    def __init__(self) -> None:
        raw = os.getenv("HIVE_RATE_LIMIT", "")
        self._enabled = False
        self._max_calls = 100
        self._period = timedelta(hours=1)
        if raw:
            try:
                parts = raw.split("/")
                self._max_calls = int(parts[0])
                unit = parts[1].strip().lower() if len(parts) > 1 else "1h"
                if unit.endswith("h"):
                    self._period = timedelta(hours=int(unit[:-1] or 1))
                elif unit.endswith("m"):
                    self._period = timedelta(minutes=int(unit[:-1] or 1))
                else:
                    self._period = timedelta(seconds=int(unit))
                self._enabled = True
                logger.info("速率限制: %d 次 / %s", self._max_calls, self._period)
            except Exception as exc:
                logger.warning("HIVE_RATE_LIMIT 格式错误: %s,已禁用限制", exc)

        self._calls: dict[str, list[float]] = defaultdict(list)
_RateLimiter.check method · python · L274-L289 (16 LOC)
src/hive_mcp/server.py
    def check(self, tool: str = "default") -> tuple[bool, str]:
        """返回 (is_allowed, error_message)。使用单调时钟避免系统时间调整漏洞。"""
        if not self._enabled:
            return True, ""
        import time as _time
        now = _time.monotonic()
        period_sec = self._period.total_seconds()
        self._calls[tool] = [t for t in self._calls[tool] if now - t < period_sec]
        if len(self._calls[tool]) >= self._max_calls:
            wait_sec = max(0, int(self._calls[tool][0] + period_sec - now))
            return False, (
                f"⏳ 速率限制:{tool} 已达上限 {self._max_calls} 次/{self._period}。"
                f"请等待约 {wait_sec} 秒后重试。"
            )
        self._calls[tool].append(now)
        return True, ""
_proxy_request_inner function · python · L299-L359 (61 LOC)
src/hive_mcp/server.py
async def _proxy_request_inner(
    url: str,
    country_code: Optional[str] = None,
    timeout_ms: int = PROXY_TIMEOUT_MS,
    mobile: bool = False,
    no_cache: bool = False,
) -> dict:
    """向 Director 发起单次代理请求(内部,不含重试逻辑)。"""
    payload: dict = {
        "target_url": url,
        "method": "GET",
        "options": {"timeout_ms": timeout_ms},
    }
    if country_code:
        payload["options"]["country"] = country_code.lower()

    # UA 轮换 — 对标 Crawl4AI/Crawlee 最佳实践
    headers = {"Authorization": f"Bearer {API_KEY}"}
    headers["User-Agent"] = random_ua(mobile=mobile)
    if no_cache:
        headers["Cache-Control"] = "no-cache, no-store"

    # ── Cookie 注入 (Phase 2.2) ──────────────────────────────
    cookie_header = get_cookie_header_for_url(url)
    if cookie_header:
        headers["Cookie"] = cookie_header

    payload["headers"] = {k: v for k, v in headers.items() if k != "Authorization"}

    client = await _get_http_client()
    r = await client.post(
        f"
_proxy_request function · python · L362-L435 (74 LOC)
src/hive_mcp/server.py
async def _proxy_request(
    url: str,
    country_code: Optional[str] = None,
    timeout_ms: int = PROXY_TIMEOUT_MS,
    mobile: bool = False,
    no_cache: bool = False,
) -> dict:
    """
    向 Director 发起代理请求 — 带 HTTP 缓存 + 断路器 + 重试 + 指数退避 + jitter。
    对标 Scrapy HttpCacheMiddleware + RetryMiddleware + Resilience4j 断路器。

    Phase 1.1: 同一 URL 的重复请求(scrape → extract → highlights)
    直接从内存返回,延迟从秒级降到毫秒级。
    """
    _req_start = time.monotonic()

    # ── HTTP 缓存查询 (Phase 1.1) ──────────────────────────────
    if not no_cache:
        cached = await http_cache.get(url)
        if cached is not None:
            logger.info("HTTP 缓存命中: %s", url[:80])
            metrics.record_cache(hit=True)
            return cached
        metrics.record_cache(hit=False)

    # 断路器检查
    if not director_breaker.is_allowed():
        return {
            "error": f"断路器已断开 (连续失败过多),请等待 {director_breaker.recovery_timeout}s 后重试",
            "_http_error": 503,
        }

    # 请求间隔 jitter — 对标 Scrap
_format_body function · python · L438-L477 (40 LOC)
src/hive_mcp/server.py
def _format_body(
    raw_body: str,
    fmt: str,
    url: str = "",
) -> str:
    """根据 format 参数转换响应体。"""
    if fmt == "markdown":
        return html_to_markdown(raw_body, base_url=url)
    elif fmt == "text":
        return html_to_text(raw_body)
    elif fmt == "json":
        return format_as_json(raw_body)
    elif fmt == "links":
        # Extract links from HTML
        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(raw_body, 'html.parser')
            links = []
            for a_tag in soup.find_all('a', href=True):
                links.append({
                    "text": a_tag.get_text(strip=True),
                    "url": a_tag['href']
                })
            return json.dumps(links, ensure_ascii=False, indent=2)
        except ImportError:
            # Fallback to regex if BeautifulSoup not available
            import re
            links = []
            for match in re.finditer(r'<a[^>]+href=["\']([^"\']+)["\'][^>]*>([^<]+
_guard_api function · python · L480-L494 (15 LOC)
src/hive_mcp/server.py
def _guard_api() -> Optional[str]:
    """检查 API_KEY 与同意状态,返回错误信息或 None。"""
    if not API_KEY:
        return (
            "❌ HIVE_API_KEY 未配置。\n"
            "请到 https://proxy.clawbro.com 注册获取 API Key,"
            "然后在环境变量中设置 HIVE_API_KEY 并重启服务。"
        )
    if not is_consented():
        return (
            "❌ 尚未同意蜂巢网络协议。\n"
            "请先调用 hive_confirm_consent() 同意协议并激活节点。\n"
            "蜂巢网络要求每位用户贡献闲置带宽以换取免费代理服务。"
        )
    return None
lifespan function · python · L502-L594 (93 LOC)
src/hive_mcp/server.py
async def lifespan(server: FastMCP):
    global _proxy_url
    logger.info("═══ Hive MCP v3.0 启动 (node=%s) ═══", node_state.node_id)

    # 初始化共享代理 URL — 所有 Layer 2-3 工具统一走本地代理
    _proxy_url = f"http://127.0.0.1:{_local_proxy.port}"
    logger.info("统一代理 URL: %s", _proxy_url)
    logger.info(
        "Layer 3 状态: social=%s  TikTok token=%s  YouTube key=%s",
        _SOCIAL_AVAILABLE,
        bool(_cfg.TIKTOK_MS_TOKEN),
        bool(_cfg.YOUTUBE_API_KEY),
    )

    # 打印依赖状态
    deps = dependency_status()
    logger.info(
        "依赖状态 — html2text=%s  beautifulsoup4=%s",
        deps["html2text"], deps["beautifulsoup4"],
    )
    if not deps["html2text"]:
        logger.warning("⚠️ html2text 未安装,scrape_as_markdown 将使用降级模式")
    if not deps["beautifulsoup4"]:
        logger.warning("⚠️ beautifulsoup4 未安装,search_engine 解析功能不可用")

    if API_KEY:
        await _local_proxy.start()
        if not _local_proxy.is_running:
            _proxy_url = ""
            logger.error("⚠️ 本地代理启动失败,La
About: code-quality intelligence by Repobility · https://repobility.com
health_check function · python · L612-L663 (52 LOC)
src/hive_mcp/server.py
async def health_check() -> str:
    """\
    检查 Hive MCP 各组件运行状态。
    返回: 节点状态、代理、浏览器、断路器、缓存等健康信息。
    """
    import time as _time
    from hive_mcp.tools.search import _cache

    health = {
        "status": "healthy",
        "node_id": node_state.node_id,
        "node_status": node_state.status,
        "proxy_url": _proxy_url or "未启动",
        "api_key_configured": bool(API_KEY),
        "consented": is_consented(),
        "components": {
            "local_proxy": _local_proxy.is_running if hasattr(_local_proxy, 'is_running') else "unknown",
            "http_client": _http_client is not None and not _http_client.is_closed if _http_client else False,
            "browser_available": _RENDER_AVAILABLE,
            "pdf_available": _PDF_AVAILABLE,
            "crawl_available": _CRAWL_AVAILABLE,
            "social_available": _SOCIAL_AVAILABLE,
            "extra_platforms": _EXTRA_PLATFORMS,
        },
        "resilience": {
            "director_breaker": director_breaker.s
hive_guide function · python · L671-L713 (43 LOC)
src/hive_mcp/server.py
def hive_guide() -> str:
    """蜂巢网络使用引导 — AI 客户端启动时可读取此提示词了解服务能力"""
    status = node_state.status
    proxy_addr = f"http://127.0.0.1:{_local_proxy.port}"

    if status == "no_api_key":
        return (
            "🐝 蜂巢网络 MCP 服务已加载,但 API Key 未配置。\n"
            "请让用户到 https://proxy.clawbro.com 注册获取 API Key,\n"
            "然后重新运行安装命令。"
        )

    if status == "waiting_consent":
        return (
            "🐝 蜂巢网络 MCP 服务已加载,节点尚未激活。\n\n"
            "请告知用户调用 hive_confirm_consent() 即可激活节点并开始使用所有功能。"
        )

    deps = dependency_status()
    dep_note = ""
    if not deps["html2text"] or not deps["beautifulsoup4"]:
        missing = [k for k, v in deps.items() if not v and k in ("html2text", "beautifulsoup4")]
        dep_note = f"\n⚠️ 建议安装:pip install {' '.join(missing)}(启用完整内容解析)\n"

    return (
        "🐝 蜂巢网络 MCP v2.1 运行中。\n\n"
        f"节点状态: {status}\n"
        f"本地代理: {proxy_addr}(已启动)\n"
        f"{dep_note}\n"
        "可用工具:\n"
        "• proxy_fetch(url, format, max_
hive_usage_guide function · python · L717-L757 (41 LOC)
src/hive_mcp/server.py
def hive_usage_guide() -> str:
    """蜂巢网络完整使用指南"""
    return """\
# 🐝 蜂巢网络 (Hive Network) v2.1 使用指南

## 这是什么?
蜂巢网络是一个去中心化 IP 代理服务。安装后你的设备同时作为:
- **代理消费者**:免费使用网络中其他节点的 IP 访问互联网
- **带宽贡献者**:用闲置带宽帮助其他用户,贡献越多积分越多

## 工具列表

| 工具 | 功能 | 典型场景 |
|------|------|---------|
| `proxy_fetch` | 代理访问 URL,支持多种格式 | 访问被封锁的内容 |
| `scrape_as_markdown` | 网页 → 干净 Markdown | 提取文章/文档内容 |
| `search_engine` | Web 搜索(DDG/Bing/Google)| 实时信息检索 |
| `batch_proxy_fetch` | 并发批量抓取多 URL | 批量数据采集 |
| `get_hive_status` | 节点状态 & 流量统计 | 监控节点运行 |
| `stop_hive_node` | 停止贡献带宽 | 节省资源时使用 |

## proxy_fetch 格式参数
- `format="markdown"` (默认) — 返回干净 Markdown,AI 最易处理
- `format="text"` — 纯文本,无格式
- `format="html"` — 清洗后的 HTML
- `format="json"` — 尝试解析为 JSON
- `format="raw"` — 原始响应,不做处理

## 方式 2:本地 HTTP 代理
```bash
curl -x http://127.0.0.1:18983 https://httpbin.org/ip
export http_proxy=http://127.0.0.1:18983
export https_proxy=http://127.0.0.1:18983
```

## 日志位置
`~/.hive/hive-mcp.log`

## 控制台
https://proxy.clawbro.com
"""
hive_confirm_consent function · python · L772-L820 (49 LOC)
src/hive_mcp/server.py
async def hive_confirm_consent() -> str:
    """\
    【首次使用必调】同意蜂巢网络协议并启动节点。
    调用后:1) 节点上线,开始贡献带宽  2) 本地代理 127.0.0.1:18983 可用
    当用户首次使用蜂巢网络、或状态显示 waiting_consent 时,应主动调用此工具。

    重要提示:如果用户第一次提到蜂巢网络/Hive Network,且尚未调用过此工具,
    请先询问用户是否同意加入蜂巢网络(贡献闲置带宽以换取免费 IP 代理),
    用户同意后立即调用此工具。
    """
    if not API_KEY:
        return (
            "❌ HIVE_API_KEY 未配置。\n"
            "请到 https://proxy.clawbro.com 注册并获取 API Key,\n"
            "然后在 MCP 服务环境变量中设置 HIVE_API_KEY 并重启。"
        )

    if is_consented():
        return f"✅ 您已同意过协议。当前节点状态: {node_state.status}"

    save_consent(True)
    logger.info("用户同意协议")

    async with _tunnel_lock:
        if node_state.tunnel_task is None or node_state.tunnel_task.done():
            node_state.stop_requested = False
            node_state.tunnel_task = asyncio.create_task(run_tunnel())
            for _ in range(16):
                await asyncio.sleep(0.5)
                if node_state.status == "online":
                    break

    proxy_i
get_hive_status function · python · L835-L873 (39 LOC)
src/hive_mcp/server.py
async def get_hive_status() -> str:
    """\
    查询蜂巢节点当前运行状态、流量统计和心跳信息。
    当用户问"蜂巢网络状态"、"节点状态"、"代理状态"时调用此工具。
    如果返回 waiting_consent,请提示用户需要先调用 hive_confirm_consent()。
    """
    if node_state.status == "no_api_key":
        return "❌ HIVE_API_KEY 未配置。请在环境变量中设置后重启。"

    if node_state.status == "waiting_consent":
        return CONSENT_TEXT + "\n请先调用 hive_confirm_consent() 同意协议并启动节点。"

    s = node_state.to_dict()
    proxy_status = (
        f"http://{_local_proxy.host}:{_local_proxy.port}"
        if _local_proxy.is_running
        else "未启动"
    )
    deps = dependency_status()

    return "\n".join([
        "🐝 蜂巢节点状态 (v2.1)",
        "─" * 42,
        f"节点 ID  : {s['node_id']}",
        f"状态     : {s['status']}",
        f"Director : {DIRECTOR_URL}",
        f"Relay 地址: {s['relay_url'] or '未获取'}",
        f"本地代理 : {proxy_status}",
        f"连接时间 : {s['connected_at'] or '未连接'}",
        f"最后心跳 : {s['last_heartbeat_at'] or '无'}",
        f"重连次数 : {s['reconnect_count']}",
       
stop_hive_node function · python · L888-L922 (35 LOC)
src/hive_mcp/server.py
async def stop_hive_node() -> str:
    """停止蜂巢节点,断开 Relay 隧道连接。本地代理仍可继续使用。"""
    if node_state.status == "stopped":
        return "⚠️ 节点已经停止。如需重新激活,请调用 hive_confirm_consent()。"

    node_state.stop_requested = True

    try:
        async with httpx.AsyncClient(timeout=5, trust_env=False) as client:
            await client.delete(
                f"{DIRECTOR_URL}/api/v1/nodes/{node_state.node_id}",
                headers={"Authorization": f"Bearer {API_KEY}"},
            )
    except Exception:
        pass

    if node_state.tunnel_task and not node_state.tunnel_task.done():
        node_state.tunnel_task.cancel()
        try:
            await asyncio.wait_for(node_state.tunnel_task, timeout=5)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass

    node_state.status = "stopped"
    proxy_note = ""
    if _local_proxy.is_running:
        proxy_note = f"\n本地代理仍在运行: http://{_local_proxy.host}:{_local_proxy.port}"

    return (
        f"✅ 节点已停止。\n"
   
hive_available_countries function · python · L932-L951 (20 LOC)
src/hive_mcp/server.py
async def hive_available_countries() -> str:
    """\
    查询蜂巢网络中当前可用的代理国家列表。
    使用 country_code 参数前,先调用此工具确认该国家有在线节点。
    """
    if not API_KEY:
        return "❌ HIVE_API_KEY 未配置。"
    try:
        async with httpx.AsyncClient(timeout=10, trust_env=False) as client:
            r = await client.get(
                f"{DIRECTOR_URL}/api/v1/nodes/countries",
                headers={"Authorization": f"Bearer {API_KEY}"},
            )
            if r.is_success:
                data = r.json()
                return _json_result(data, "可用代理国家")
            else:
                return f"⚠️ 查询失败 (HTTP {r.status_code})。Director 可能尚不支持此端点。"
    except Exception as e:
        return f"❌ 查询可用国家失败: {e}"
proxy_fetch function · python · L966-L1094 (129 LOC)
src/hive_mcp/server.py
async def proxy_fetch(
    url: Annotated[
        str,
        Field(description="目标 URL,需包含 http:// 或 https://"),
    ],
    format: Annotated[  # noqa: A002
        Literal["markdown", "html", "text", "json", "raw", "links", "summary"],
        Field(
            default="markdown",
            description=(
                "响应格式:\n"
                "• markdown (默认) — 干净 Markdown,AI 最易处理\n"
                "• text   — 纯文本\n"
                "• html   — 清洗后的 HTML\n"
                "• json   — 尝试解析为 JSON\n"
                "• links  — 提取所有链接为 JSON 列表\n"
                "• summary — 前 500 字符的 Markdown 摘要\n"
                "• raw    — 原始响应,不做处理"
            ),
        ),
    ] = "markdown",
    max_chars: Annotated[
        int,
        Field(
            default=DEFAULT_MAX_CHARS,
            ge=100,
            le=100_000,
            description=f"最大返回字符数(默认 {DEFAULT_MAX_CHARS},原来硬限 2000 已移除)",
        ),
    ] = DEFAULT_MAX_CHARS,
    country_code: Annotated[
        Optional[str]
Repobility (the analyzer behind this table) · https://repobility.com
scrape_as_markdown function · python · L1109-L1212 (104 LOC)
src/hive_mcp/server.py
async def scrape_as_markdown(
    url: Annotated[
        str,
        Field(description="要抓取的目标网页 URL"),
    ],
    max_chars: Annotated[
        int,
        Field(
            default=DEFAULT_MAX_CHARS,
            ge=100,
            le=100_000,
            description="最大返回字符数,默认 8000",
        ),
    ] = DEFAULT_MAX_CHARS,
    country_code: Annotated[
        Optional[str],
        Field(
            default=None,
            min_length=2,
            max_length=2,
            description="地理定向:ISO 两字母国家代码(可选)",
        ),
    ] = None,
    mobile: Annotated[
        bool,
        Field(
            default=False,
            description="使用移动设备 User-Agent (iPhone 17),适合检测移动端渲染差异",
        ),
    ] = False,
    no_cache: Annotated[
        bool,
        Field(
            default=False,
            description="禁用缓存:添加 Cache-Control: no-cache, no-store 头,强制获取最新内容",
        ),
    ] = False,
    start_index: Annotated[
        int,
        Field(
            default=0,
           
search_engine function · python · L1227-L1323 (97 LOC)
src/hive_mcp/server.py
async def search_engine(
    query: Annotated[
        str,
        Field(description="搜索关键词或问题"),
    ],
    engine: Annotated[
        Literal["duckduckgo", "bing", "google"],
        Field(
            default="duckduckgo",
            description=(
                "搜索引擎:\n"
                "• duckduckgo (默认) — 无需 API Key,隐私友好\n"
                "• bing              — 微软 Bing\n"
                "• google            — Google 搜索"
            ),
        ),
    ] = "duckduckgo",
    num_results: Annotated[
        int,
        Field(default=10, ge=1, le=20, description="返回结果数量(1-20,默认 10)"),
    ] = 10,
    country_code: Annotated[
        Optional[str],
        Field(
            default=None,
            min_length=2,
            max_length=2,
            description="搜索地区,如 'cn'/'us'/'jp'(影响搜索结果语言和地区)",
        ),
    ] = None,
    freshness: Annotated[
        Optional[Literal["d", "w", "m", "y"]],
        Field(
            default=None,
            description="时间过滤: d=一天内, w=一周内,
clear_search_cache function · python · L1338-L1345 (8 LOC)
src/hive_mcp/server.py
def clear_search_cache() -> str:
    """\
    清除搜索缓存,强制后续搜索重新获取最新结果。
    适合:需要获取最新搜索结果、或缓存占用过多内存时。
    """
    from hive_mcp.tools.search import clear_cache
    clear_cache()
    return "✅ 搜索缓存已清除。后续搜索将获取最新结果。"
batch_proxy_fetch function · python · L1360-L1469 (110 LOC)
src/hive_mcp/server.py
async def batch_proxy_fetch(
    urls: Annotated[
        list[str],
        Field(
            description=f"要抓取的 URL 列表(最多 {DEFAULT_BATCH_LIMIT} 个)",
            min_length=1,
            max_length=DEFAULT_BATCH_LIMIT,
        ),
    ],
    format: Annotated[  # noqa: A002
        Literal["markdown", "html", "text", "json", "raw"],
        Field(
            default="markdown",
            description="响应格式(同 proxy_fetch),默认 markdown",
        ),
    ] = "markdown",
    max_chars: Annotated[
        int,
        Field(
            default=3000,
            ge=100,
            le=20_000,
            description="每个 URL 的最大字符数(批量模式默认 3000,节省 Token)",
        ),
    ] = 3000,
    country_code: Annotated[
        Optional[str],
        Field(
            default=None,
            min_length=2,
            max_length=2,
            description="地理定向(对所有 URL 生效)",
        ),
    ] = None,
) -> str:
    """\
    并发批量抓取多个 URL,一次工具调用获取多个页面内容。
    适合:批量研究、对比多个来源、批量数据采集。
    注意:每个 URL 的内容会被截断(
_fmt function · python · L1476-L1481 (6 LOC)
src/hive_mcp/server.py
def _fmt(n: int) -> str:
    for u in ("B", "KB", "MB", "GB"):
        if n < 1024:
            return f"{n:.1f} {u}"
        n /= 1024
    return f"{n:.1f} TB"
_guard_social function · python · L1484-L1498 (15 LOC)
src/hive_mcp/server.py
def _guard_social(tool_name: str) -> Optional[str]:
    """Layer 3 工具前置检查:social 依赖 + API Key + 同意 + 速率限制"""
    if not _SOCIAL_AVAILABLE:
        return (
            "❌ Layer 3 社交平台工具不可用。\n"
            "请安装依赖: pip install hive-mcp[social]\n"
            "(需要 yt-dlp, instaloader, TikTokApi 等)"
        )
    base_err = _guard_api()
    if base_err:
        return base_err
    ok, rate_err = _rate_limiter.check(tool_name)
    if not ok:
        return rate_err
    return None
youtube_video_info function · python · L1513-L1529 (17 LOC)
src/hive_mcp/server.py
async def youtube_video_info(
    url: Annotated[str, Field(description="YouTube 视频 URL 或 ID")],
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2, description="地理定向")] = None,
) -> str:
    """\
    获取 YouTube 视频的完整元数据(标题、播放量、时长、标签等)。
    使用 yt-dlp(免费,无需 API Key)通过 Hive 住宅 IP 访问。
    当用户说"查看这个 YouTube 视频信息"、"分析这个视频"时调用。
    """
    err = _guard_social("youtube_video_info")
    if err:
        return err
    try:
        result = await youtube.get_video_info(url=url, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(result, f"YouTube 视频: {result.get('title', '')[:50]}")
    except Exception as e:
        return f"❌ YouTube 视频信息获取失败: {e}"
youtube_transcript function · python · L1533-L1555 (23 LOC)
src/hive_mcp/server.py
async def youtube_transcript(
    video_id: Annotated[str, Field(description="YouTube 视频 ID 或 URL")],
    lang: Annotated[str, Field(default="en", description="字幕语言代码(如 en, zh-Hans, ja)")] = "en",
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """\
    获取 YouTube 视频的字幕/字幕文本。
    优先使用 youtube-transcript-api,fallback 到 yt-dlp 自动字幕。
    适合:视频内容总结、翻译、信息提取。
    """
    err = _guard_social("youtube_transcript")
    if err:
        return err
    try:
        segments = await youtube.get_transcript(video_id_or_url=video_id, lang=lang, proxy_url=_proxy_url, country_code=country_code)
        if not segments:
            return "⚠️ 该视频没有可用字幕。"
        # 拼成纯文本返回
        text = "\n".join(s.get("text", "") for s in segments if s.get("text"))
        text, _, _ = truncate(text, DEFAULT_MAX_CHARS)
        return f"✅ YouTube 字幕 ({len(segments)} 段)\n\n{text}"
    except Exception as e:
        return f"❌ YouTube 字幕获取失败: {e}"
All rows above produced by Repobility · https://repobility.com
youtube_channel_videos function · python · L1559-L1575 (17 LOC)
src/hive_mcp/server.py
async def youtube_channel_videos(
    channel: Annotated[str, Field(description="频道 URL、@handle 或 UC 开头的 Channel ID")],
    count: Annotated[int, Field(default=10, ge=1, le=50, description="返回视频数量")] = 10,
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """\
    获取 YouTube 频道最新视频列表。
    使用 yt-dlp extract_flat 快速提取(不逐个请求详情)。
    """
    err = _guard_social("youtube_channel_videos")
    if err:
        return err
    try:
        vids = await youtube.get_channel_videos(channel_url_or_id=channel, count=count, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(vids, f"频道视频 ({len(vids)} 个)")
    except Exception as e:
        return f"❌ YouTube 频道视频获取失败: {e}"
youtube_search function · python · L1579-L1596 (18 LOC)
src/hive_mcp/server.py
async def youtube_search(
    query: Annotated[str, Field(description="搜索关键词")],
    count: Annotated[int, Field(default=5, ge=1, le=20, description="返回结果数量")] = 5,
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """\
    在 YouTube 上搜索视频。
    使用 yt-dlp ytsearch(免费,无需 API Key,通过 Hive IP)。
    当用户说"在 YouTube 上搜索xxx"时调用。
    """
    err = _guard_social("youtube_search")
    if err:
        return err
    try:
        results = await youtube.search_videos(query=query, count=count, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(results, f'YouTube 搜索 "{query}" ({len(results)} 结果)')
    except Exception as e:
        return f"❌ YouTube 搜索失败: {e}"
instagram_profile function · python · L1604-L1619 (16 LOC)
src/hive_mcp/server.py
async def instagram_profile(
    username: Annotated[str, Field(description="Instagram 用户名(不含@)")],
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """\
    获取 Instagram 用户的公开资料(粉丝数、帖子数、简介等)。
    使用 instaloader(免费 MIT),无需登录,通过 Hive 住宅 IP 访问。
    """
    err = _guard_social("instagram_profile")
    if err:
        return err
    try:
        result = await instagram.get_profile(username=username, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(result, f"Instagram @{username}")
    except Exception as e:
        return f"❌ Instagram 资料获取失败: {e}"
instagram_user_posts function · python · L1623-L1639 (17 LOC)
src/hive_mcp/server.py
async def instagram_user_posts(
    username: Annotated[str, Field(description="Instagram 用户名")],
    count: Annotated[int, Field(default=10, ge=1, le=50, description="帖子数量")] = 10,
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """\
    获取 Instagram 用户最新的帖子列表(公开内容,不含私密帖)。
    包含图片 URL、点赞数、评论数、文案。
    """
    err = _guard_social("instagram_user_posts")
    if err:
        return err
    try:
        posts = await instagram.get_user_posts(username=username, count=count, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(posts, f"@{username} 最新帖子 ({len(posts)} 条)")
    except Exception as e:
        return f"❌ Instagram 帖子获取失败: {e}"
instagram_hashtag_posts function · python · L1643-L1659 (17 LOC)
src/hive_mcp/server.py
async def instagram_hashtag_posts(
    tag: Annotated[str, Field(description="话题标签(不含#)")],
    count: Annotated[int, Field(default=10, ge=1, le=50, description="帖子数量")] = 10,
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """\
    获取 Instagram 话题标签下的最新帖子。
    适合:趋势分析、热点追踪、竞品研究。
    """
    err = _guard_social("instagram_hashtag_posts")
    if err:
        return err
    try:
        posts = await instagram.get_hashtag_posts(hashtag=tag, count=count, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(posts, f"#{tag} 帖子 ({len(posts)} 条)")
    except Exception as e:
        return f"❌ Instagram 话题帖子获取失败: {e}"
instagram_post_detail function · python · L1663-L1678 (16 LOC)
src/hive_mcp/server.py
async def instagram_post_detail(
    shortcode: Annotated[str, Field(description="帖子 shortcode(URL 中 /p/ 后的字符串)")],
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """\
    获取 Instagram 单个帖子的详细信息。
    shortcode 示例: CxYz1234AbC(来自 URL instagram.com/p/CxYz1234AbC/)
    """
    err = _guard_social("instagram_post_detail")
    if err:
        return err
    try:
        result = await instagram.get_post_detail(shortcode=shortcode, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(result, f"帖子 {shortcode}")
    except Exception as e:
        return f"❌ Instagram 帖子详情获取失败: {e}"
instagram_reels function · python · L1682-L1698 (17 LOC)
src/hive_mcp/server.py
async def instagram_reels(
    username: Annotated[str, Field(description="Instagram 用户名(不含@)")],
    count: Annotated[int, Field(default=10, ge=1, le=50, description="Reels 数量")] = 10,
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """\
    获取 Instagram 用户的 Reels 短视频列表。
    Reels 是 Instagram 的主要短视频内容形式。
    """
    err = _guard_social("instagram_reels")
    if err:
        return err
    try:
        reels = await instagram.get_user_reels(username=username, count=count, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(reels, f"@{username} Reels ({len(reels)} 条)")
    except Exception as e:
        return f"❌ Instagram Reels 获取失败: {e}"
_tiktok_guard function · python · L1705-L1723 (19 LOC)
src/hive_mcp/server.py
def _tiktok_guard() -> Optional[str]:
    """TikTok 专属检查:ms_token + 断路器状态"""
    base = _guard_social("tiktok")
    if base:
        return base
    if not _cfg.TIKTOK_MS_TOKEN:
        return (
            "❌ TIKTOK_MS_TOKEN 未配置。\n"
            "获取方式:浏览器访问 tiktok.com → F12 → Application → Cookies → 复制 ms_token 值\n"
            "然后设置环境变量 TIKTOK_MS_TOKEN"
        )
    cb_status = tiktok.get_circuit_breaker_status()
    if cb_status["state"] == "open":
        return (
            f"⚠️ TikTok 断路器已开启({cb_status['failures']} 次连续失败),"
            f"将在 {cb_status['recovery_remaining_s']}s 后自动恢复。\n"
            "原因:TikTok 频繁更改 API,当前版本可能需要更新 TikTokApi。"
        )
    return None
All rows scored by the Repobility analyzer (https://repobility.com)
tiktok_user_info function · python · L1727-L1744 (18 LOC)
src/hive_mcp/server.py
async def tiktok_user_info(
    username: Annotated[str, Field(description="TikTok 用户名")],
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """\
    获取 TikTok 用户的公开资料(粉丝数、视频数、简介等)。
    ⚠️ TikTok API 不稳定,已内置断路器保护。
    """
    err = _tiktok_guard()
    if err:
        return err
    try:
        result = await tiktok.get_user_info(username=username, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(result, f"TikTok @{username}")
    except RuntimeError as e:
        return f"⚠️ TikTok 断路器: {e}"
    except Exception as e:
        return f"❌ TikTok 用户信息获取失败: {e}"
tiktok_user_videos function · python · L1748-L1763 (16 LOC)
src/hive_mcp/server.py
async def tiktok_user_videos(
    username: Annotated[str, Field(description="TikTok 用户名")],
    count: Annotated[int, Field(default=10, ge=1, le=30, description="视频数量")] = 10,
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """获取 TikTok 用户最新发布的视频列表。"""
    err = _tiktok_guard()
    if err:
        return err
    try:
        vids = await tiktok.get_user_videos(username=username, count=count, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(vids, f"@{username} 视频 ({len(vids)} 条)")
    except RuntimeError as e:
        return f"⚠️ TikTok 断路器: {e}"
    except Exception as e:
        return f"❌ TikTok 用户视频获取失败: {e}"
tiktok_video_info function · python · L1767-L1781 (15 LOC)
src/hive_mcp/server.py
async def tiktok_video_info(
    video_id: Annotated[str, Field(description="TikTok 视频 ID")],
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """获取 TikTok 单个视频的详细信息(播放量、点赞、评论数等)。"""
    err = _tiktok_guard()
    if err:
        return err
    try:
        result = await tiktok.get_video_detail(video_id=video_id, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(result, f"TikTok 视频 {video_id}")
    except RuntimeError as e:
        return f"⚠️ TikTok 断路器: {e}"
    except Exception as e:
        return f"❌ TikTok 视频详情获取失败: {e}"
tiktok_video_comments function · python · L1785-L1800 (16 LOC)
src/hive_mcp/server.py
async def tiktok_video_comments(
    video_id: Annotated[str, Field(description="TikTok 视频 ID")],
    count: Annotated[int, Field(default=20, ge=1, le=100, description="评论数量")] = 20,
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """获取 TikTok 视频的热门评论。"""
    err = _tiktok_guard()
    if err:
        return err
    try:
        comments = await tiktok.get_video_comments(video_id=video_id, count=count, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(comments, f"视频 {video_id} 评论 ({len(comments)} 条)")
    except RuntimeError as e:
        return f"⚠️ TikTok 断路器: {e}"
    except Exception as e:
        return f"❌ TikTok 评论获取失败: {e}"
tiktok_search function · python · L1804-L1819 (16 LOC)
src/hive_mcp/server.py
async def tiktok_search(
    query: Annotated[str, Field(description="搜索关键词")],
    count: Annotated[int, Field(default=10, ge=1, le=30, description="结果数量")] = 10,
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """在 TikTok 上搜索视频。"""
    err = _tiktok_guard()
    if err:
        return err
    try:
        results = await tiktok.search_videos(query=query, count=count, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(results, f'TikTok 搜索 "{query}" ({len(results)} 结果)')
    except RuntimeError as e:
        return f"⚠️ TikTok 断路器: {e}"
    except Exception as e:
        return f"❌ TikTok 搜索失败: {e}"
tiktok_hashtag_videos function · python · L1823-L1838 (16 LOC)
src/hive_mcp/server.py
async def tiktok_hashtag_videos(
    tag: Annotated[str, Field(description="话题标签(不含#)")],
    count: Annotated[int, Field(default=10, ge=1, le=30, description="视频数量")] = 10,
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """获取 TikTok 话题标签下的热门视频。"""
    err = _tiktok_guard()
    if err:
        return err
    try:
        vids = await tiktok.get_hashtag_videos(hashtag=tag, count=count, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(vids, f"#{tag} TikTok 视频 ({len(vids)} 条)")
    except RuntimeError as e:
        return f"⚠️ TikTok 断路器: {e}"
    except Exception as e:
        return f"❌ TikTok 话题视频获取失败: {e}"
tiktok_trending function · python · L1842-L1855 (14 LOC)
src/hive_mcp/server.py
async def tiktok_trending(
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2, description="地区")] = None,
) -> str:
    """获取 TikTok 当前热门/推荐视频列表。"""
    err = _tiktok_guard()
    if err:
        return err
    try:
        vids = await tiktok.get_trending_videos(proxy_url=_proxy_url, country_code=country_code)
        return _json_result(vids, f"TikTok 热门 ({len(vids)} 条)")
    except RuntimeError as e:
        return f"⚠️ TikTok 断路器: {e}"
    except Exception as e:
        return f"❌ TikTok 热门视频获取失败: {e}"
weibo_trending function · python · L1863-L1880 (18 LOC)
src/hive_mcp/server.py
async def weibo_trending(
    count: Annotated[int, Field(default=20, ge=1, le=50, description="热搜条数")] = 20,
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """\
    获取微博实时热搜榜。
    返回热搜话题、热度值、链接。适合中文社交媒体趋势分析。
    """
    err = _guard_social("weibo_trending")
    if err:
        return err
    try:
        items = await trends.get_weibo_trending(count=count, proxy_url=_proxy_url, country_code=country_code)
        if not items:
            return "⚠️ 微博热搜暂时无法获取(可能需要 Hive 住宅 IP)。"
        return _json_result(items, f"微博热搜 ({len(items)} 条)")
    except Exception as e:
        return f"❌ 微博热搜获取失败: {e}"
About: code-quality intelligence by Repobility · https://repobility.com
zhihu_trending function · python · L1884-L1901 (18 LOC)
src/hive_mcp/server.py
async def zhihu_trending(
    count: Annotated[int, Field(default=20, ge=1, le=50, description="热榜条数")] = 20,
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """\
    获取知乎实时热榜。
    注意:知乎 API 需要身份验证,在生产环境通过 Hive 住宅 IP + Cookies 访问。
    """
    err = _guard_social("zhihu_trending")
    if err:
        return err
    try:
        items = await trends.get_zhihu_trending(count=count, proxy_url=_proxy_url, country_code=country_code)
        if not items:
            return "⚠️ 知乎热榜需要通过 Hive 住宅 IP + 身份认证访问,当前环境暂不可用。"
        return _json_result(items, f"知乎热榜 ({len(items)} 条)")
    except Exception as e:
        return f"❌ 知乎热榜获取失败: {e}"
bilibili_ranking function · python · L1905-L1922 (18 LOC)
src/hive_mcp/server.py
async def bilibili_ranking(
    count: Annotated[int, Field(default=20, ge=1, le=100, description="排行条数")] = 20,
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """\
    获取哔哩哔哩(B站)全站热门视频排行榜。
    返回标题、播放量、点赞数、UP主、链接。
    """
    err = _guard_social("bilibili_ranking")
    if err:
        return err
    try:
        items = await trends.get_bilibili_rank(count=count, proxy_url=_proxy_url, country_code=country_code)
        if not items:
            return "⚠️ B站排行数据暂时不可用(可能需要 Hive 住宅 IP 绕过风控)。"
        return _json_result(items, f"B站热门排行 ({len(items)} 条)")
    except Exception as e:
        return f"❌ B站排行获取失败: {e}"
twitter_user_info function · python · L1930-L1945 (16 LOC)
src/hive_mcp/server.py
async def twitter_user_info(
    username: Annotated[str, Field(description="Twitter/X 用户名(不含@)")],
    country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
    """\
    获取 Twitter/X 用户公开资料。
    使用 yt-dlp 提取,免费无需 Twitter API Key。
    """
    err = _guard_social("twitter_user_info")
    if err:
        return err
    try:
        result = await twitter.get_user_info(username=username, proxy_url=_proxy_url, country_code=country_code)
        return _json_result(result, f"Twitter @{username}")
    except Exception as e:
        return f"❌ Twitter 用户信息获取失败: {e}"
‹ prevpage 3 / 11next ›