Function bodies 529 total
tt_search function · python · L239-L247 (9 LOC)src/hive_mcp/rest_api.py
async def tt_search(
q: str = Query(...),
count: int = Query(20, ge=1, le=50),
):
from hive_mcp.platforms.tiktok import search_videos
try:
return await search_videos(q, count=count, proxy_url=_proxy_url)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))tt_hashtag function · python · L251-L259 (9 LOC)src/hive_mcp/rest_api.py
async def tt_hashtag(
tag: str = Query(...),
count: int = Query(20, ge=1, le=50),
):
from hive_mcp.platforms.tiktok import get_hashtag_videos
try:
return await get_hashtag_videos(tag, count=count, proxy_url=_proxy_url)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))tt_trending function · python · L263-L270 (8 LOC)src/hive_mcp/rest_api.py
async def tt_trending(
count: int = Query(20, ge=1, le=50),
):
from hive_mcp.platforms.tiktok import get_trending_videos
try:
return await get_trending_videos(count=count, proxy_url=_proxy_url)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))weibo function · python · L276-L281 (6 LOC)src/hive_mcp/rest_api.py
async def weibo(count: int = Query(50, ge=1, le=100)):
from hive_mcp.platforms.trends import get_weibo_trending
try:
return await get_weibo_trending(count, proxy_url=_proxy_url)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))zhihu function · python · L285-L290 (6 LOC)src/hive_mcp/rest_api.py
async def zhihu(count: int = Query(50, ge=1, le=100)):
from hive_mcp.platforms.trends import get_zhihu_trending
try:
return await get_zhihu_trending(count, proxy_url=_proxy_url)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))bilibili function · python · L294-L299 (6 LOC)src/hive_mcp/rest_api.py
async def bilibili(count: int = Query(50, ge=1, le=100)):
from hive_mcp.platforms.trends import get_bilibili_rank
try:
return await get_bilibili_rank(count, proxy_url=_proxy_url)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))main function · python · L304-L310 (7 LOC)src/hive_mcp/rest_api.py
def main():
"""CLI 入口: hive-mcp-api"""
import uvicorn
host = os.getenv("HOST", "0.0.0.0")
port = int(os.getenv("PORT", "8000"))
log_level = os.getenv("LOG_LEVEL", "info").lower()
uvicorn.run("hive_mcp.rest_api:app", host=host, port=port, log_level=log_level)All rows scored by the Repobility analyzer (https://repobility.com)
_get_http_client function · python · L225-L236 (12 LOC)src/hive_mcp/server.py
async def _get_http_client() -> httpx.AsyncClient:
"""获取或创建共享 HTTP 客户端(带连接池)。"""
global _http_client
if _http_client is None or _http_client.is_closed:
async with _http_client_lock:
if _http_client is None or _http_client.is_closed:
_http_client = httpx.AsyncClient(
timeout=PROXY_TIMEOUT_MS / 1000 + 10,
trust_env=False,
limits=httpx.Limits(max_connections=20, max_keepalive_connections=10),
)
return _http_client_RateLimiter.__init__ method · python · L251-L272 (22 LOC)src/hive_mcp/server.py
def __init__(self) -> None:
raw = os.getenv("HIVE_RATE_LIMIT", "")
self._enabled = False
self._max_calls = 100
self._period = timedelta(hours=1)
if raw:
try:
parts = raw.split("/")
self._max_calls = int(parts[0])
unit = parts[1].strip().lower() if len(parts) > 1 else "1h"
if unit.endswith("h"):
self._period = timedelta(hours=int(unit[:-1] or 1))
elif unit.endswith("m"):
self._period = timedelta(minutes=int(unit[:-1] or 1))
else:
self._period = timedelta(seconds=int(unit))
self._enabled = True
logger.info("速率限制: %d 次 / %s", self._max_calls, self._period)
except Exception as exc:
logger.warning("HIVE_RATE_LIMIT 格式错误: %s,已禁用限制", exc)
self._calls: dict[str, list[float]] = defaultdict(list)_RateLimiter.check method · python · L274-L289 (16 LOC)src/hive_mcp/server.py
def check(self, tool: str = "default") -> tuple[bool, str]:
"""返回 (is_allowed, error_message)。使用单调时钟避免系统时间调整漏洞。"""
if not self._enabled:
return True, ""
import time as _time
now = _time.monotonic()
period_sec = self._period.total_seconds()
self._calls[tool] = [t for t in self._calls[tool] if now - t < period_sec]
if len(self._calls[tool]) >= self._max_calls:
wait_sec = max(0, int(self._calls[tool][0] + period_sec - now))
return False, (
f"⏳ 速率限制:{tool} 已达上限 {self._max_calls} 次/{self._period}。"
f"请等待约 {wait_sec} 秒后重试。"
)
self._calls[tool].append(now)
return True, ""_proxy_request_inner function · python · L299-L359 (61 LOC)src/hive_mcp/server.py
async def _proxy_request_inner(
url: str,
country_code: Optional[str] = None,
timeout_ms: int = PROXY_TIMEOUT_MS,
mobile: bool = False,
no_cache: bool = False,
) -> dict:
"""向 Director 发起单次代理请求(内部,不含重试逻辑)。"""
payload: dict = {
"target_url": url,
"method": "GET",
"options": {"timeout_ms": timeout_ms},
}
if country_code:
payload["options"]["country"] = country_code.lower()
# UA 轮换 — 对标 Crawl4AI/Crawlee 最佳实践
headers = {"Authorization": f"Bearer {API_KEY}"}
headers["User-Agent"] = random_ua(mobile=mobile)
if no_cache:
headers["Cache-Control"] = "no-cache, no-store"
# ── Cookie 注入 (Phase 2.2) ──────────────────────────────
cookie_header = get_cookie_header_for_url(url)
if cookie_header:
headers["Cookie"] = cookie_header
payload["headers"] = {k: v for k, v in headers.items() if k != "Authorization"}
client = await _get_http_client()
r = await client.post(
f"_proxy_request function · python · L362-L435 (74 LOC)src/hive_mcp/server.py
async def _proxy_request(
url: str,
country_code: Optional[str] = None,
timeout_ms: int = PROXY_TIMEOUT_MS,
mobile: bool = False,
no_cache: bool = False,
) -> dict:
"""
向 Director 发起代理请求 — 带 HTTP 缓存 + 断路器 + 重试 + 指数退避 + jitter。
对标 Scrapy HttpCacheMiddleware + RetryMiddleware + Resilience4j 断路器。
Phase 1.1: 同一 URL 的重复请求(scrape → extract → highlights)
直接从内存返回,延迟从秒级降到毫秒级。
"""
_req_start = time.monotonic()
# ── HTTP 缓存查询 (Phase 1.1) ──────────────────────────────
if not no_cache:
cached = await http_cache.get(url)
if cached is not None:
logger.info("HTTP 缓存命中: %s", url[:80])
metrics.record_cache(hit=True)
return cached
metrics.record_cache(hit=False)
# 断路器检查
if not director_breaker.is_allowed():
return {
"error": f"断路器已断开 (连续失败过多),请等待 {director_breaker.recovery_timeout}s 后重试",
"_http_error": 503,
}
# 请求间隔 jitter — 对标 Scrap_format_body function · python · L438-L477 (40 LOC)src/hive_mcp/server.py
def _format_body(
raw_body: str,
fmt: str,
url: str = "",
) -> str:
"""根据 format 参数转换响应体。"""
if fmt == "markdown":
return html_to_markdown(raw_body, base_url=url)
elif fmt == "text":
return html_to_text(raw_body)
elif fmt == "json":
return format_as_json(raw_body)
elif fmt == "links":
# Extract links from HTML
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(raw_body, 'html.parser')
links = []
for a_tag in soup.find_all('a', href=True):
links.append({
"text": a_tag.get_text(strip=True),
"url": a_tag['href']
})
return json.dumps(links, ensure_ascii=False, indent=2)
except ImportError:
# Fallback to regex if BeautifulSoup not available
import re
links = []
for match in re.finditer(r'<a[^>]+href=["\']([^"\']+)["\'][^>]*>([^<]+_guard_api function · python · L480-L494 (15 LOC)src/hive_mcp/server.py
def _guard_api() -> Optional[str]:
"""检查 API_KEY 与同意状态,返回错误信息或 None。"""
if not API_KEY:
return (
"❌ HIVE_API_KEY 未配置。\n"
"请到 https://proxy.clawbro.com 注册获取 API Key,"
"然后在环境变量中设置 HIVE_API_KEY 并重启服务。"
)
if not is_consented():
return (
"❌ 尚未同意蜂巢网络协议。\n"
"请先调用 hive_confirm_consent() 同意协议并激活节点。\n"
"蜂巢网络要求每位用户贡献闲置带宽以换取免费代理服务。"
)
return Nonelifespan function · python · L502-L594 (93 LOC)src/hive_mcp/server.py
async def lifespan(server: FastMCP):
global _proxy_url
logger.info("═══ Hive MCP v3.0 启动 (node=%s) ═══", node_state.node_id)
# 初始化共享代理 URL — 所有 Layer 2-3 工具统一走本地代理
_proxy_url = f"http://127.0.0.1:{_local_proxy.port}"
logger.info("统一代理 URL: %s", _proxy_url)
logger.info(
"Layer 3 状态: social=%s TikTok token=%s YouTube key=%s",
_SOCIAL_AVAILABLE,
bool(_cfg.TIKTOK_MS_TOKEN),
bool(_cfg.YOUTUBE_API_KEY),
)
# 打印依赖状态
deps = dependency_status()
logger.info(
"依赖状态 — html2text=%s beautifulsoup4=%s",
deps["html2text"], deps["beautifulsoup4"],
)
if not deps["html2text"]:
logger.warning("⚠️ html2text 未安装,scrape_as_markdown 将使用降级模式")
if not deps["beautifulsoup4"]:
logger.warning("⚠️ beautifulsoup4 未安装,search_engine 解析功能不可用")
if API_KEY:
await _local_proxy.start()
if not _local_proxy.is_running:
_proxy_url = ""
logger.error("⚠️ 本地代理启动失败,LaAbout: code-quality intelligence by Repobility · https://repobility.com
health_check function · python · L612-L663 (52 LOC)src/hive_mcp/server.py
async def health_check() -> str:
"""\
检查 Hive MCP 各组件运行状态。
返回: 节点状态、代理、浏览器、断路器、缓存等健康信息。
"""
import time as _time
from hive_mcp.tools.search import _cache
health = {
"status": "healthy",
"node_id": node_state.node_id,
"node_status": node_state.status,
"proxy_url": _proxy_url or "未启动",
"api_key_configured": bool(API_KEY),
"consented": is_consented(),
"components": {
"local_proxy": _local_proxy.is_running if hasattr(_local_proxy, 'is_running') else "unknown",
"http_client": _http_client is not None and not _http_client.is_closed if _http_client else False,
"browser_available": _RENDER_AVAILABLE,
"pdf_available": _PDF_AVAILABLE,
"crawl_available": _CRAWL_AVAILABLE,
"social_available": _SOCIAL_AVAILABLE,
"extra_platforms": _EXTRA_PLATFORMS,
},
"resilience": {
"director_breaker": director_breaker.shive_guide function · python · L671-L713 (43 LOC)src/hive_mcp/server.py
def hive_guide() -> str:
"""蜂巢网络使用引导 — AI 客户端启动时可读取此提示词了解服务能力"""
status = node_state.status
proxy_addr = f"http://127.0.0.1:{_local_proxy.port}"
if status == "no_api_key":
return (
"🐝 蜂巢网络 MCP 服务已加载,但 API Key 未配置。\n"
"请让用户到 https://proxy.clawbro.com 注册获取 API Key,\n"
"然后重新运行安装命令。"
)
if status == "waiting_consent":
return (
"🐝 蜂巢网络 MCP 服务已加载,节点尚未激活。\n\n"
"请告知用户调用 hive_confirm_consent() 即可激活节点并开始使用所有功能。"
)
deps = dependency_status()
dep_note = ""
if not deps["html2text"] or not deps["beautifulsoup4"]:
missing = [k for k, v in deps.items() if not v and k in ("html2text", "beautifulsoup4")]
dep_note = f"\n⚠️ 建议安装:pip install {' '.join(missing)}(启用完整内容解析)\n"
return (
"🐝 蜂巢网络 MCP v2.1 运行中。\n\n"
f"节点状态: {status}\n"
f"本地代理: {proxy_addr}(已启动)\n"
f"{dep_note}\n"
"可用工具:\n"
"• proxy_fetch(url, format, max_hive_usage_guide function · python · L717-L757 (41 LOC)src/hive_mcp/server.py
def hive_usage_guide() -> str:
"""蜂巢网络完整使用指南"""
return """\
# 🐝 蜂巢网络 (Hive Network) v2.1 使用指南
## 这是什么?
蜂巢网络是一个去中心化 IP 代理服务。安装后你的设备同时作为:
- **代理消费者**:免费使用网络中其他节点的 IP 访问互联网
- **带宽贡献者**:用闲置带宽帮助其他用户,贡献越多积分越多
## 工具列表
| 工具 | 功能 | 典型场景 |
|------|------|---------|
| `proxy_fetch` | 代理访问 URL,支持多种格式 | 访问被封锁的内容 |
| `scrape_as_markdown` | 网页 → 干净 Markdown | 提取文章/文档内容 |
| `search_engine` | Web 搜索(DDG/Bing/Google)| 实时信息检索 |
| `batch_proxy_fetch` | 并发批量抓取多 URL | 批量数据采集 |
| `get_hive_status` | 节点状态 & 流量统计 | 监控节点运行 |
| `stop_hive_node` | 停止贡献带宽 | 节省资源时使用 |
## proxy_fetch 格式参数
- `format="markdown"` (默认) — 返回干净 Markdown,AI 最易处理
- `format="text"` — 纯文本,无格式
- `format="html"` — 清洗后的 HTML
- `format="json"` — 尝试解析为 JSON
- `format="raw"` — 原始响应,不做处理
## 方式 2:本地 HTTP 代理
```bash
curl -x http://127.0.0.1:18983 https://httpbin.org/ip
export http_proxy=http://127.0.0.1:18983
export https_proxy=http://127.0.0.1:18983
```
## 日志位置
`~/.hive/hive-mcp.log`
## 控制台
https://proxy.clawbro.com
"""hive_confirm_consent function · python · L772-L820 (49 LOC)src/hive_mcp/server.py
async def hive_confirm_consent() -> str:
"""\
【首次使用必调】同意蜂巢网络协议并启动节点。
调用后:1) 节点上线,开始贡献带宽 2) 本地代理 127.0.0.1:18983 可用
当用户首次使用蜂巢网络、或状态显示 waiting_consent 时,应主动调用此工具。
重要提示:如果用户第一次提到蜂巢网络/Hive Network,且尚未调用过此工具,
请先询问用户是否同意加入蜂巢网络(贡献闲置带宽以换取免费 IP 代理),
用户同意后立即调用此工具。
"""
if not API_KEY:
return (
"❌ HIVE_API_KEY 未配置。\n"
"请到 https://proxy.clawbro.com 注册并获取 API Key,\n"
"然后在 MCP 服务环境变量中设置 HIVE_API_KEY 并重启。"
)
if is_consented():
return f"✅ 您已同意过协议。当前节点状态: {node_state.status}"
save_consent(True)
logger.info("用户同意协议")
async with _tunnel_lock:
if node_state.tunnel_task is None or node_state.tunnel_task.done():
node_state.stop_requested = False
node_state.tunnel_task = asyncio.create_task(run_tunnel())
for _ in range(16):
await asyncio.sleep(0.5)
if node_state.status == "online":
break
proxy_iget_hive_status function · python · L835-L873 (39 LOC)src/hive_mcp/server.py
async def get_hive_status() -> str:
"""\
查询蜂巢节点当前运行状态、流量统计和心跳信息。
当用户问"蜂巢网络状态"、"节点状态"、"代理状态"时调用此工具。
如果返回 waiting_consent,请提示用户需要先调用 hive_confirm_consent()。
"""
if node_state.status == "no_api_key":
return "❌ HIVE_API_KEY 未配置。请在环境变量中设置后重启。"
if node_state.status == "waiting_consent":
return CONSENT_TEXT + "\n请先调用 hive_confirm_consent() 同意协议并启动节点。"
s = node_state.to_dict()
proxy_status = (
f"http://{_local_proxy.host}:{_local_proxy.port}"
if _local_proxy.is_running
else "未启动"
)
deps = dependency_status()
return "\n".join([
"🐝 蜂巢节点状态 (v2.1)",
"─" * 42,
f"节点 ID : {s['node_id']}",
f"状态 : {s['status']}",
f"Director : {DIRECTOR_URL}",
f"Relay 地址: {s['relay_url'] or '未获取'}",
f"本地代理 : {proxy_status}",
f"连接时间 : {s['connected_at'] or '未连接'}",
f"最后心跳 : {s['last_heartbeat_at'] or '无'}",
f"重连次数 : {s['reconnect_count']}",
stop_hive_node function · python · L888-L922 (35 LOC)src/hive_mcp/server.py
async def stop_hive_node() -> str:
"""停止蜂巢节点,断开 Relay 隧道连接。本地代理仍可继续使用。"""
if node_state.status == "stopped":
return "⚠️ 节点已经停止。如需重新激活,请调用 hive_confirm_consent()。"
node_state.stop_requested = True
try:
async with httpx.AsyncClient(timeout=5, trust_env=False) as client:
await client.delete(
f"{DIRECTOR_URL}/api/v1/nodes/{node_state.node_id}",
headers={"Authorization": f"Bearer {API_KEY}"},
)
except Exception:
pass
if node_state.tunnel_task and not node_state.tunnel_task.done():
node_state.tunnel_task.cancel()
try:
await asyncio.wait_for(node_state.tunnel_task, timeout=5)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
node_state.status = "stopped"
proxy_note = ""
if _local_proxy.is_running:
proxy_note = f"\n本地代理仍在运行: http://{_local_proxy.host}:{_local_proxy.port}"
return (
f"✅ 节点已停止。\n"
hive_available_countries function · python · L932-L951 (20 LOC)src/hive_mcp/server.py
async def hive_available_countries() -> str:
"""\
查询蜂巢网络中当前可用的代理国家列表。
使用 country_code 参数前,先调用此工具确认该国家有在线节点。
"""
if not API_KEY:
return "❌ HIVE_API_KEY 未配置。"
try:
async with httpx.AsyncClient(timeout=10, trust_env=False) as client:
r = await client.get(
f"{DIRECTOR_URL}/api/v1/nodes/countries",
headers={"Authorization": f"Bearer {API_KEY}"},
)
if r.is_success:
data = r.json()
return _json_result(data, "可用代理国家")
else:
return f"⚠️ 查询失败 (HTTP {r.status_code})。Director 可能尚不支持此端点。"
except Exception as e:
return f"❌ 查询可用国家失败: {e}"proxy_fetch function · python · L966-L1094 (129 LOC)src/hive_mcp/server.py
async def proxy_fetch(
url: Annotated[
str,
Field(description="目标 URL,需包含 http:// 或 https://"),
],
format: Annotated[ # noqa: A002
Literal["markdown", "html", "text", "json", "raw", "links", "summary"],
Field(
default="markdown",
description=(
"响应格式:\n"
"• markdown (默认) — 干净 Markdown,AI 最易处理\n"
"• text — 纯文本\n"
"• html — 清洗后的 HTML\n"
"• json — 尝试解析为 JSON\n"
"• links — 提取所有链接为 JSON 列表\n"
"• summary — 前 500 字符的 Markdown 摘要\n"
"• raw — 原始响应,不做处理"
),
),
] = "markdown",
max_chars: Annotated[
int,
Field(
default=DEFAULT_MAX_CHARS,
ge=100,
le=100_000,
description=f"最大返回字符数(默认 {DEFAULT_MAX_CHARS},原来硬限 2000 已移除)",
),
] = DEFAULT_MAX_CHARS,
country_code: Annotated[
Optional[str]Repobility (the analyzer behind this table) · https://repobility.com
scrape_as_markdown function · python · L1109-L1212 (104 LOC)src/hive_mcp/server.py
async def scrape_as_markdown(
url: Annotated[
str,
Field(description="要抓取的目标网页 URL"),
],
max_chars: Annotated[
int,
Field(
default=DEFAULT_MAX_CHARS,
ge=100,
le=100_000,
description="最大返回字符数,默认 8000",
),
] = DEFAULT_MAX_CHARS,
country_code: Annotated[
Optional[str],
Field(
default=None,
min_length=2,
max_length=2,
description="地理定向:ISO 两字母国家代码(可选)",
),
] = None,
mobile: Annotated[
bool,
Field(
default=False,
description="使用移动设备 User-Agent (iPhone 17),适合检测移动端渲染差异",
),
] = False,
no_cache: Annotated[
bool,
Field(
default=False,
description="禁用缓存:添加 Cache-Control: no-cache, no-store 头,强制获取最新内容",
),
] = False,
start_index: Annotated[
int,
Field(
default=0,
search_engine function · python · L1227-L1323 (97 LOC)src/hive_mcp/server.py
async def search_engine(
query: Annotated[
str,
Field(description="搜索关键词或问题"),
],
engine: Annotated[
Literal["duckduckgo", "bing", "google"],
Field(
default="duckduckgo",
description=(
"搜索引擎:\n"
"• duckduckgo (默认) — 无需 API Key,隐私友好\n"
"• bing — 微软 Bing\n"
"• google — Google 搜索"
),
),
] = "duckduckgo",
num_results: Annotated[
int,
Field(default=10, ge=1, le=20, description="返回结果数量(1-20,默认 10)"),
] = 10,
country_code: Annotated[
Optional[str],
Field(
default=None,
min_length=2,
max_length=2,
description="搜索地区,如 'cn'/'us'/'jp'(影响搜索结果语言和地区)",
),
] = None,
freshness: Annotated[
Optional[Literal["d", "w", "m", "y"]],
Field(
default=None,
description="时间过滤: d=一天内, w=一周内,clear_search_cache function · python · L1338-L1345 (8 LOC)src/hive_mcp/server.py
def clear_search_cache() -> str:
"""\
清除搜索缓存,强制后续搜索重新获取最新结果。
适合:需要获取最新搜索结果、或缓存占用过多内存时。
"""
from hive_mcp.tools.search import clear_cache
clear_cache()
return "✅ 搜索缓存已清除。后续搜索将获取最新结果。"batch_proxy_fetch function · python · L1360-L1469 (110 LOC)src/hive_mcp/server.py
async def batch_proxy_fetch(
urls: Annotated[
list[str],
Field(
description=f"要抓取的 URL 列表(最多 {DEFAULT_BATCH_LIMIT} 个)",
min_length=1,
max_length=DEFAULT_BATCH_LIMIT,
),
],
format: Annotated[ # noqa: A002
Literal["markdown", "html", "text", "json", "raw"],
Field(
default="markdown",
description="响应格式(同 proxy_fetch),默认 markdown",
),
] = "markdown",
max_chars: Annotated[
int,
Field(
default=3000,
ge=100,
le=20_000,
description="每个 URL 的最大字符数(批量模式默认 3000,节省 Token)",
),
] = 3000,
country_code: Annotated[
Optional[str],
Field(
default=None,
min_length=2,
max_length=2,
description="地理定向(对所有 URL 生效)",
),
] = None,
) -> str:
"""\
并发批量抓取多个 URL,一次工具调用获取多个页面内容。
适合:批量研究、对比多个来源、批量数据采集。
注意:每个 URL 的内容会被截断(_fmt function · python · L1476-L1481 (6 LOC)src/hive_mcp/server.py
def _fmt(n: int) -> str:
for u in ("B", "KB", "MB", "GB"):
if n < 1024:
return f"{n:.1f} {u}"
n /= 1024
return f"{n:.1f} TB"_guard_social function · python · L1484-L1498 (15 LOC)src/hive_mcp/server.py
def _guard_social(tool_name: str) -> Optional[str]:
"""Layer 3 工具前置检查:social 依赖 + API Key + 同意 + 速率限制"""
if not _SOCIAL_AVAILABLE:
return (
"❌ Layer 3 社交平台工具不可用。\n"
"请安装依赖: pip install hive-mcp[social]\n"
"(需要 yt-dlp, instaloader, TikTokApi 等)"
)
base_err = _guard_api()
if base_err:
return base_err
ok, rate_err = _rate_limiter.check(tool_name)
if not ok:
return rate_err
return Noneyoutube_video_info function · python · L1513-L1529 (17 LOC)src/hive_mcp/server.py
async def youtube_video_info(
url: Annotated[str, Field(description="YouTube 视频 URL 或 ID")],
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2, description="地理定向")] = None,
) -> str:
"""\
获取 YouTube 视频的完整元数据(标题、播放量、时长、标签等)。
使用 yt-dlp(免费,无需 API Key)通过 Hive 住宅 IP 访问。
当用户说"查看这个 YouTube 视频信息"、"分析这个视频"时调用。
"""
err = _guard_social("youtube_video_info")
if err:
return err
try:
result = await youtube.get_video_info(url=url, proxy_url=_proxy_url, country_code=country_code)
return _json_result(result, f"YouTube 视频: {result.get('title', '')[:50]}")
except Exception as e:
return f"❌ YouTube 视频信息获取失败: {e}"youtube_transcript function · python · L1533-L1555 (23 LOC)src/hive_mcp/server.py
async def youtube_transcript(
video_id: Annotated[str, Field(description="YouTube 视频 ID 或 URL")],
lang: Annotated[str, Field(default="en", description="字幕语言代码(如 en, zh-Hans, ja)")] = "en",
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""\
获取 YouTube 视频的字幕/字幕文本。
优先使用 youtube-transcript-api,fallback 到 yt-dlp 自动字幕。
适合:视频内容总结、翻译、信息提取。
"""
err = _guard_social("youtube_transcript")
if err:
return err
try:
segments = await youtube.get_transcript(video_id_or_url=video_id, lang=lang, proxy_url=_proxy_url, country_code=country_code)
if not segments:
return "⚠️ 该视频没有可用字幕。"
# 拼成纯文本返回
text = "\n".join(s.get("text", "") for s in segments if s.get("text"))
text, _, _ = truncate(text, DEFAULT_MAX_CHARS)
return f"✅ YouTube 字幕 ({len(segments)} 段)\n\n{text}"
except Exception as e:
return f"❌ YouTube 字幕获取失败: {e}"All rows above produced by Repobility · https://repobility.com
youtube_channel_videos function · python · L1559-L1575 (17 LOC)src/hive_mcp/server.py
async def youtube_channel_videos(
channel: Annotated[str, Field(description="频道 URL、@handle 或 UC 开头的 Channel ID")],
count: Annotated[int, Field(default=10, ge=1, le=50, description="返回视频数量")] = 10,
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""\
获取 YouTube 频道最新视频列表。
使用 yt-dlp extract_flat 快速提取(不逐个请求详情)。
"""
err = _guard_social("youtube_channel_videos")
if err:
return err
try:
vids = await youtube.get_channel_videos(channel_url_or_id=channel, count=count, proxy_url=_proxy_url, country_code=country_code)
return _json_result(vids, f"频道视频 ({len(vids)} 个)")
except Exception as e:
return f"❌ YouTube 频道视频获取失败: {e}"youtube_search function · python · L1579-L1596 (18 LOC)src/hive_mcp/server.py
async def youtube_search(
query: Annotated[str, Field(description="搜索关键词")],
count: Annotated[int, Field(default=5, ge=1, le=20, description="返回结果数量")] = 5,
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""\
在 YouTube 上搜索视频。
使用 yt-dlp ytsearch(免费,无需 API Key,通过 Hive IP)。
当用户说"在 YouTube 上搜索xxx"时调用。
"""
err = _guard_social("youtube_search")
if err:
return err
try:
results = await youtube.search_videos(query=query, count=count, proxy_url=_proxy_url, country_code=country_code)
return _json_result(results, f'YouTube 搜索 "{query}" ({len(results)} 结果)')
except Exception as e:
return f"❌ YouTube 搜索失败: {e}"instagram_profile function · python · L1604-L1619 (16 LOC)src/hive_mcp/server.py
async def instagram_profile(
username: Annotated[str, Field(description="Instagram 用户名(不含@)")],
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""\
获取 Instagram 用户的公开资料(粉丝数、帖子数、简介等)。
使用 instaloader(免费 MIT),无需登录,通过 Hive 住宅 IP 访问。
"""
err = _guard_social("instagram_profile")
if err:
return err
try:
result = await instagram.get_profile(username=username, proxy_url=_proxy_url, country_code=country_code)
return _json_result(result, f"Instagram @{username}")
except Exception as e:
return f"❌ Instagram 资料获取失败: {e}"instagram_user_posts function · python · L1623-L1639 (17 LOC)src/hive_mcp/server.py
async def instagram_user_posts(
username: Annotated[str, Field(description="Instagram 用户名")],
count: Annotated[int, Field(default=10, ge=1, le=50, description="帖子数量")] = 10,
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""\
获取 Instagram 用户最新的帖子列表(公开内容,不含私密帖)。
包含图片 URL、点赞数、评论数、文案。
"""
err = _guard_social("instagram_user_posts")
if err:
return err
try:
posts = await instagram.get_user_posts(username=username, count=count, proxy_url=_proxy_url, country_code=country_code)
return _json_result(posts, f"@{username} 最新帖子 ({len(posts)} 条)")
except Exception as e:
return f"❌ Instagram 帖子获取失败: {e}"instagram_hashtag_posts function · python · L1643-L1659 (17 LOC)src/hive_mcp/server.py
async def instagram_hashtag_posts(
tag: Annotated[str, Field(description="话题标签(不含#)")],
count: Annotated[int, Field(default=10, ge=1, le=50, description="帖子数量")] = 10,
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""\
获取 Instagram 话题标签下的最新帖子。
适合:趋势分析、热点追踪、竞品研究。
"""
err = _guard_social("instagram_hashtag_posts")
if err:
return err
try:
posts = await instagram.get_hashtag_posts(hashtag=tag, count=count, proxy_url=_proxy_url, country_code=country_code)
return _json_result(posts, f"#{tag} 帖子 ({len(posts)} 条)")
except Exception as e:
return f"❌ Instagram 话题帖子获取失败: {e}"instagram_post_detail function · python · L1663-L1678 (16 LOC)src/hive_mcp/server.py
async def instagram_post_detail(
shortcode: Annotated[str, Field(description="帖子 shortcode(URL 中 /p/ 后的字符串)")],
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""\
获取 Instagram 单个帖子的详细信息。
shortcode 示例: CxYz1234AbC(来自 URL instagram.com/p/CxYz1234AbC/)
"""
err = _guard_social("instagram_post_detail")
if err:
return err
try:
result = await instagram.get_post_detail(shortcode=shortcode, proxy_url=_proxy_url, country_code=country_code)
return _json_result(result, f"帖子 {shortcode}")
except Exception as e:
return f"❌ Instagram 帖子详情获取失败: {e}"instagram_reels function · python · L1682-L1698 (17 LOC)src/hive_mcp/server.py
async def instagram_reels(
username: Annotated[str, Field(description="Instagram 用户名(不含@)")],
count: Annotated[int, Field(default=10, ge=1, le=50, description="Reels 数量")] = 10,
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""\
获取 Instagram 用户的 Reels 短视频列表。
Reels 是 Instagram 的主要短视频内容形式。
"""
err = _guard_social("instagram_reels")
if err:
return err
try:
reels = await instagram.get_user_reels(username=username, count=count, proxy_url=_proxy_url, country_code=country_code)
return _json_result(reels, f"@{username} Reels ({len(reels)} 条)")
except Exception as e:
return f"❌ Instagram Reels 获取失败: {e}"_tiktok_guard function · python · L1705-L1723 (19 LOC)src/hive_mcp/server.py
def _tiktok_guard() -> Optional[str]:
"""TikTok 专属检查:ms_token + 断路器状态"""
base = _guard_social("tiktok")
if base:
return base
if not _cfg.TIKTOK_MS_TOKEN:
return (
"❌ TIKTOK_MS_TOKEN 未配置。\n"
"获取方式:浏览器访问 tiktok.com → F12 → Application → Cookies → 复制 ms_token 值\n"
"然后设置环境变量 TIKTOK_MS_TOKEN"
)
cb_status = tiktok.get_circuit_breaker_status()
if cb_status["state"] == "open":
return (
f"⚠️ TikTok 断路器已开启({cb_status['failures']} 次连续失败),"
f"将在 {cb_status['recovery_remaining_s']}s 后自动恢复。\n"
"原因:TikTok 频繁更改 API,当前版本可能需要更新 TikTokApi。"
)
return NoneAll rows scored by the Repobility analyzer (https://repobility.com)
tiktok_user_info function · python · L1727-L1744 (18 LOC)src/hive_mcp/server.py
async def tiktok_user_info(
username: Annotated[str, Field(description="TikTok 用户名")],
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""\
获取 TikTok 用户的公开资料(粉丝数、视频数、简介等)。
⚠️ TikTok API 不稳定,已内置断路器保护。
"""
err = _tiktok_guard()
if err:
return err
try:
result = await tiktok.get_user_info(username=username, proxy_url=_proxy_url, country_code=country_code)
return _json_result(result, f"TikTok @{username}")
except RuntimeError as e:
return f"⚠️ TikTok 断路器: {e}"
except Exception as e:
return f"❌ TikTok 用户信息获取失败: {e}"tiktok_user_videos function · python · L1748-L1763 (16 LOC)src/hive_mcp/server.py
async def tiktok_user_videos(
username: Annotated[str, Field(description="TikTok 用户名")],
count: Annotated[int, Field(default=10, ge=1, le=30, description="视频数量")] = 10,
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""获取 TikTok 用户最新发布的视频列表。"""
err = _tiktok_guard()
if err:
return err
try:
vids = await tiktok.get_user_videos(username=username, count=count, proxy_url=_proxy_url, country_code=country_code)
return _json_result(vids, f"@{username} 视频 ({len(vids)} 条)")
except RuntimeError as e:
return f"⚠️ TikTok 断路器: {e}"
except Exception as e:
return f"❌ TikTok 用户视频获取失败: {e}"tiktok_video_info function · python · L1767-L1781 (15 LOC)src/hive_mcp/server.py
async def tiktok_video_info(
video_id: Annotated[str, Field(description="TikTok 视频 ID")],
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""获取 TikTok 单个视频的详细信息(播放量、点赞、评论数等)。"""
err = _tiktok_guard()
if err:
return err
try:
result = await tiktok.get_video_detail(video_id=video_id, proxy_url=_proxy_url, country_code=country_code)
return _json_result(result, f"TikTok 视频 {video_id}")
except RuntimeError as e:
return f"⚠️ TikTok 断路器: {e}"
except Exception as e:
return f"❌ TikTok 视频详情获取失败: {e}"tiktok_video_comments function · python · L1785-L1800 (16 LOC)src/hive_mcp/server.py
async def tiktok_video_comments(
video_id: Annotated[str, Field(description="TikTok 视频 ID")],
count: Annotated[int, Field(default=20, ge=1, le=100, description="评论数量")] = 20,
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""获取 TikTok 视频的热门评论。"""
err = _tiktok_guard()
if err:
return err
try:
comments = await tiktok.get_video_comments(video_id=video_id, count=count, proxy_url=_proxy_url, country_code=country_code)
return _json_result(comments, f"视频 {video_id} 评论 ({len(comments)} 条)")
except RuntimeError as e:
return f"⚠️ TikTok 断路器: {e}"
except Exception as e:
return f"❌ TikTok 评论获取失败: {e}"tiktok_search function · python · L1804-L1819 (16 LOC)src/hive_mcp/server.py
async def tiktok_search(
query: Annotated[str, Field(description="搜索关键词")],
count: Annotated[int, Field(default=10, ge=1, le=30, description="结果数量")] = 10,
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""在 TikTok 上搜索视频。"""
err = _tiktok_guard()
if err:
return err
try:
results = await tiktok.search_videos(query=query, count=count, proxy_url=_proxy_url, country_code=country_code)
return _json_result(results, f'TikTok 搜索 "{query}" ({len(results)} 结果)')
except RuntimeError as e:
return f"⚠️ TikTok 断路器: {e}"
except Exception as e:
return f"❌ TikTok 搜索失败: {e}"tiktok_hashtag_videos function · python · L1823-L1838 (16 LOC)src/hive_mcp/server.py
async def tiktok_hashtag_videos(
tag: Annotated[str, Field(description="话题标签(不含#)")],
count: Annotated[int, Field(default=10, ge=1, le=30, description="视频数量")] = 10,
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""获取 TikTok 话题标签下的热门视频。"""
err = _tiktok_guard()
if err:
return err
try:
vids = await tiktok.get_hashtag_videos(hashtag=tag, count=count, proxy_url=_proxy_url, country_code=country_code)
return _json_result(vids, f"#{tag} TikTok 视频 ({len(vids)} 条)")
except RuntimeError as e:
return f"⚠️ TikTok 断路器: {e}"
except Exception as e:
return f"❌ TikTok 话题视频获取失败: {e}"tiktok_trending function · python · L1842-L1855 (14 LOC)src/hive_mcp/server.py
async def tiktok_trending(
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2, description="地区")] = None,
) -> str:
"""获取 TikTok 当前热门/推荐视频列表。"""
err = _tiktok_guard()
if err:
return err
try:
vids = await tiktok.get_trending_videos(proxy_url=_proxy_url, country_code=country_code)
return _json_result(vids, f"TikTok 热门 ({len(vids)} 条)")
except RuntimeError as e:
return f"⚠️ TikTok 断路器: {e}"
except Exception as e:
return f"❌ TikTok 热门视频获取失败: {e}"weibo_trending function · python · L1863-L1880 (18 LOC)src/hive_mcp/server.py
async def weibo_trending(
count: Annotated[int, Field(default=20, ge=1, le=50, description="热搜条数")] = 20,
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""\
获取微博实时热搜榜。
返回热搜话题、热度值、链接。适合中文社交媒体趋势分析。
"""
err = _guard_social("weibo_trending")
if err:
return err
try:
items = await trends.get_weibo_trending(count=count, proxy_url=_proxy_url, country_code=country_code)
if not items:
return "⚠️ 微博热搜暂时无法获取(可能需要 Hive 住宅 IP)。"
return _json_result(items, f"微博热搜 ({len(items)} 条)")
except Exception as e:
return f"❌ 微博热搜获取失败: {e}"About: code-quality intelligence by Repobility · https://repobility.com
zhihu_trending function · python · L1884-L1901 (18 LOC)src/hive_mcp/server.py
async def zhihu_trending(
count: Annotated[int, Field(default=20, ge=1, le=50, description="热榜条数")] = 20,
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""\
获取知乎实时热榜。
注意:知乎 API 需要身份验证,在生产环境通过 Hive 住宅 IP + Cookies 访问。
"""
err = _guard_social("zhihu_trending")
if err:
return err
try:
items = await trends.get_zhihu_trending(count=count, proxy_url=_proxy_url, country_code=country_code)
if not items:
return "⚠️ 知乎热榜需要通过 Hive 住宅 IP + 身份认证访问,当前环境暂不可用。"
return _json_result(items, f"知乎热榜 ({len(items)} 条)")
except Exception as e:
return f"❌ 知乎热榜获取失败: {e}"bilibili_ranking function · python · L1905-L1922 (18 LOC)src/hive_mcp/server.py
async def bilibili_ranking(
count: Annotated[int, Field(default=20, ge=1, le=100, description="排行条数")] = 20,
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""\
获取哔哩哔哩(B站)全站热门视频排行榜。
返回标题、播放量、点赞数、UP主、链接。
"""
err = _guard_social("bilibili_ranking")
if err:
return err
try:
items = await trends.get_bilibili_rank(count=count, proxy_url=_proxy_url, country_code=country_code)
if not items:
return "⚠️ B站排行数据暂时不可用(可能需要 Hive 住宅 IP 绕过风控)。"
return _json_result(items, f"B站热门排行 ({len(items)} 条)")
except Exception as e:
return f"❌ B站排行获取失败: {e}"twitter_user_info function · python · L1930-L1945 (16 LOC)src/hive_mcp/server.py
async def twitter_user_info(
username: Annotated[str, Field(description="Twitter/X 用户名(不含@)")],
country_code: Annotated[Optional[str], Field(default=None, min_length=2, max_length=2)] = None,
) -> str:
"""\
获取 Twitter/X 用户公开资料。
使用 yt-dlp 提取,免费无需 Twitter API Key。
"""
err = _guard_social("twitter_user_info")
if err:
return err
try:
result = await twitter.get_user_info(username=username, proxy_url=_proxy_url, country_code=country_code)
return _json_result(result, f"Twitter @{username}")
except Exception as e:
return f"❌ Twitter 用户信息获取失败: {e}"