Function bodies 529 total
_fmt_video function · python · L318-L340 (23 LOC)src/hive_mcp/platforms/tiktok.py
def _fmt_video(v) -> dict:
d = v.as_dict if hasattr(v, "as_dict") else {}
stats = d.get("stats", {}) or d.get("statsV2", {})
author = d.get("author", {}) or {}
music = d.get("music", {}) or {}
return {
"video_id": d.get("id"),
"desc": d.get("desc", "")[:300],
"create_time": d.get("createTime"),
"play_count": stats.get("playCount") or stats.get("play_count"),
"like_count": stats.get("diggCount") or stats.get("like_count"),
"comment_count": stats.get("commentCount") or stats.get("comment_count"),
"share_count": stats.get("shareCount"),
"collect_count": stats.get("collectCount"),
"hashtags": [h.get("hashtagName") for h in (d.get("textExtra") or []) if h.get("hashtagType") == 1],
"author_id": author.get("id"),
"author_unique_id": author.get("uniqueId"),
"author_nickname": author.get("nickname"),
"music_id": music.get("id"),
"music_title": music.get("title"),
_fmt_user function · python · L343-L359 (17 LOC)src/hive_mcp/platforms/tiktok.py
def _fmt_user(u) -> dict:
    """Flatten a TikTok user payload into a small, stable dict.

    Args:
        u: Either an object exposing the raw payload as ``as_dict`` or the raw
            payload dict itself. A payload nested under a top-level
            ``"userInfo"`` key is unwrapped first. Any other input is treated
            as an empty payload.

    Returns:
        A dict with the keys ``user_id``, ``unique_id``, ``nickname``,
        ``signature`` (at most 200 characters), ``avatar_url``,
        ``follower_count``, ``following_count``, ``video_count``,
        ``heart_count``, ``is_verified`` and ``profile_url``. Fields missing
        from the payload are ``None`` (``signature`` is ``""`` and
        ``is_verified`` is ``False``).
    """
    # A plain dict has no ``as_dict`` attribute, so it is used as the payload
    # itself instead of being silently replaced by an empty dict.
    payload = getattr(u, "as_dict", u)
    if not isinstance(payload, dict):
        payload = {}

    # NOTE(review): raw profile responses appear to nest the data under
    # "userInfo" — confirm against the TikTokApi version in use.
    wrapped = payload.get("userInfo")
    if isinstance(wrapped, dict):
        payload = wrapped

    # The profile fields live either under "user" or directly on the payload.
    user = payload.get("user")
    if not isinstance(user, dict):
        user = payload
    stats = payload.get("stats")
    if not isinstance(stats, dict):
        stats = {}

    unique_id = user.get("uniqueId")
    return {
        "user_id": user.get("id"),
        "unique_id": unique_id,
        "nickname": user.get("nickname"),
        "signature": (user.get("signature") or "")[:200],
        "avatar_url": user.get("avatarMedium"),
        "follower_count": stats.get("followerCount"),
        "following_count": stats.get("followingCount"),
        "video_count": stats.get("videoCount"),
        "heart_count": stats.get("heartCount"),
        "is_verified": user.get("verified", False),
        # ``or ''`` keeps a None uniqueId from being rendered as "@None".
        "profile_url": f"https://www.tiktok.com/@{unique_id or ''}",
    }
async def get_user_info(
    username: str,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> dict:
    """Fetch a TikTok user's profile (follower count, video count, bio, ...).

    TikTokApi is tried first when the circuit breaker ``_cb`` allows a request.
    If that path is skipped or raises, the yt-dlp fallback is used instead.

    Args:
        username: TikTok handle passed to ``api.user`` and to the fallback.
        proxy_url: Optional proxy URL forwarded to ``_get_api`` and the fallback.
        country_code: Accepted for signature parity; not used in this function.

    Returns:
        The profile dict produced by ``_fmt_user`` or by the yt-dlp fallback.

    Raises:
        RuntimeError: When the yt-dlp fallback fails as well; the fallback's
            exception is attached as ``__cause__``.
    """
    # Preferred path: TikTokApi, guarded by the circuit breaker.
    if _cb.allow_request():
        try:
            async def _call():
                api = await _get_api(proxy_url)
                user = api.user(username)
                info = await user.info()
                return _fmt_user(info)

            return await _execute(_call, f"user.info({username})")
        except Exception as e:
            # Deliberately best-effort: log and continue with the fallback.
            logger.warning("TikTokApi 失败,尝试 yt-dlp fallback: %s", e)

    # Fallback: yt-dlp
    try:
        return await _ytdlp_fallback_user_info(username, proxy_url)
    except Exception as e2:
        logger.error("yt-dlp fallback 也失败: %s", e2)
        # Chain explicitly so the traceback shows the real cause.
        raise RuntimeError(f"TikTok 数据获取失败 (TikTokApi + yt-dlp): {e2}") from e2
async def get_user_videos(
    username: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """Fetch the list of videos published by a TikTok user.

    TikTokApi is tried first when the circuit breaker ``_cb`` allows a request.
    If that path is skipped or raises, the yt-dlp fallback is used instead.

    Args:
        username: TikTok handle passed to ``api.user`` and to the fallback.
        count: Number of videos requested from either backend.
        proxy_url: Optional proxy URL forwarded to ``_get_api`` and the fallback.
        country_code: Accepted for signature parity; not used in this function.

    Returns:
        A list of video dicts (``_fmt_video`` output, or the fallback's result).

    Raises:
        RuntimeError: When the yt-dlp fallback fails as well; the fallback's
            exception is attached as ``__cause__``.
    """
    # Preferred path: TikTokApi, guarded by the circuit breaker.
    if _cb.allow_request():
        try:
            async def _call():
                api = await _get_api(proxy_url)
                return [
                    _fmt_video(video)
                    async for video in api.user(username).videos(count=count)
                ]

            return await _execute(_call, f"user.videos({username})")
        except Exception as e:
            # Deliberately best-effort: log and continue with the fallback.
            logger.warning("TikTokApi 失败,尝试 yt-dlp fallback: %s", e)

    # Fallback: yt-dlp
    try:
        return await _ytdlp_fallback_user_videos(username, count, proxy_url)
    except Exception as e2:
        logger.error("yt-dlp fallback 也失败: %s", e2)
        # Chain explicitly so the traceback shows the real cause.
        raise RuntimeError(f"TikTok 数据获取失败 (TikTokApi + yt-dlp): {e2}") from e2
async def get_video_detail(
    video_id: str,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> dict:
    """Fetch detailed information for a single TikTok video.

    TikTokApi is tried first when the circuit breaker ``_cb`` allows a request.
    If that path is skipped or raises, the yt-dlp fallback is used instead.

    Args:
        video_id: Video identifier passed to ``api.video`` and to the fallback.
        proxy_url: Optional proxy URL forwarded to ``_get_api`` and the fallback.
        country_code: Accepted for signature parity; not used in this function.

    Returns:
        The video dict produced by ``_fmt_video`` or by the yt-dlp fallback.

    Raises:
        RuntimeError: When the yt-dlp fallback fails as well; the fallback's
            exception is attached as ``__cause__``.
    """
    # Preferred path: TikTokApi, guarded by the circuit breaker.
    if _cb.allow_request():
        try:
            async def _call():
                api = await _get_api(proxy_url)
                video = api.video(id=video_id)
                info = await video.info()
                return _fmt_video(info)

            return await _execute(_call, f"video.info({video_id})")
        except Exception as e:
            # Deliberately best-effort: log and continue with the fallback.
            logger.warning("TikTokApi 失败,尝试 yt-dlp fallback: %s", e)

    # Fallback: yt-dlp
    try:
        return await _ytdlp_fallback_video_detail(video_id, proxy_url)
    except Exception as e2:
        logger.error("yt-dlp fallback 也失败: %s", e2)
        # Chain explicitly so the traceback shows the real cause.
        raise RuntimeError(f"TikTok 数据获取失败 (TikTokApi + yt-dlp): {e2}") from e2
async def get_video_comments(
video_id: str,
count: int = 50,
proxy_url: Optional[str] = None,
country_code: Optional[str] = None,
) -> list[dict]:
"""获取视频评论。"""
async def _call():
api = await _get_api(proxy_url)
comments = []
async for c in api.video(id=video_id).comments(count=count):
d = c.as_dict if hasattr(c, "as_dict") else {}
comments.append({
"comment_id": d.get("cid"),
"text": d.get("text", ""),
"likes": d.get("digg_count"),
"author": (d.get("user") or {}).get("unique_id"),
"create_time": d.get("create_time"),
"reply_count": d.get("reply_comment_total", 0),
})
return comments
try:
return await _execute(_call, f"video.comments({video_id})")
except Exception as e:
# 评论获取不支持 yt-dlp fallback
logger.error("视频评论获取失败(不支持 fallback): %s", e)
raise RuntimeErrorget_hashtag_videos function · python · L476-L497 (22 LOC)src/hive_mcp/platforms/tiktok.py
async def get_hashtag_videos(
    hashtag: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """Fetch videos posted under a TikTok hashtag.

    Args:
        hashtag: Hashtag name; any leading ``#`` characters are stripped before
            the lookup.
        count: Number of videos requested.
        proxy_url: Optional proxy URL forwarded to ``_get_api``.
        country_code: Accepted for signature parity; not used in this function.

    Returns:
        A list of video dicts produced by ``_fmt_video``.

    Raises:
        RuntimeError: On any failure; this path has no yt-dlp fallback. The
            original exception is attached as ``__cause__``.
    """
    async def _call():
        api = await _get_api(proxy_url)
        tag = api.hashtag(name=hashtag.lstrip("#"))
        return [_fmt_video(v) async for v in tag.videos(count=count)]

    try:
        return await _execute(_call, f"hashtag.videos({hashtag})")
    except Exception as e:
        # Hashtag lookup has no yt-dlp fallback.
        logger.error("话题搜索失败(不支持 fallback): %s", e)
        raise RuntimeError(f"话题搜索失败,yt-dlp fallback 不可用: {e}") from e
search_videos function · python · L500-L520 (21 LOC)src/hive_mcp/platforms/tiktok.py
async def search_videos(
    query: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """Search TikTok videos by keyword.

    Args:
        query: Search keywords passed to ``api.search.videos``.
        count: Number of videos requested.
        proxy_url: Optional proxy URL forwarded to ``_get_api``.
        country_code: Accepted for signature parity; not used in this function.

    Returns:
        A list of video dicts produced by ``_fmt_video``.

    Raises:
        RuntimeError: On any failure; this path has no yt-dlp fallback. The
            original exception is attached as ``__cause__``.
    """
    async def _call():
        api = await _get_api(proxy_url)
        return [_fmt_video(v) async for v in api.search.videos(query, count=count)]

    try:
        return await _execute(_call, f"search.videos({query})")
    except Exception as e:
        # Keyword search has no yt-dlp fallback.
        logger.error("视频搜索失败(不支持 fallback): %s", e)
        raise RuntimeError(f"视频搜索失败,yt-dlp fallback 不可用: {e}") from e
async def get_trending_videos(
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """Fetch TikTok's site-wide trending videos.

    Args:
        count: Number of videos requested.
        proxy_url: Optional proxy URL forwarded to ``_get_api``.
        country_code: Accepted for signature parity; not used in this function.

    Returns:
        A list of video dicts produced by ``_fmt_video``.

    Raises:
        RuntimeError: On any failure; this path has no yt-dlp fallback. The
            original exception is attached as ``__cause__``.
    """
    async def _call():
        api = await _get_api(proxy_url)
        return [_fmt_video(v) async for v in api.trending.videos(count=count)]

    try:
        return await _execute(_call, "trending.videos()")
    except Exception as e:
        # Trending has no yt-dlp fallback.
        logger.error("趋势视频获取失败(不支持 fallback): %s", e)
        raise RuntimeError(f"趋势视频获取失败,yt-dlp fallback 不可用: {e}") from e
def _hive_headers(country_code: Optional[str] = None, cookie: Optional[str] = None) -> dict:
    """Build request headers carrying the geo-targeting hint and the cookie.

    ``User-Agent`` is always present. ``X-Hive-Country`` (lower-cased) and
    ``Cookie`` are added only when the matching argument is non-empty.
    """
    optional_fields = (
        ("X-Hive-Country", country_code.lower() if country_code else None),
        ("Cookie", cookie),
    )
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
    headers.update({name: value for name, value in optional_fields if value})
    return headers
async def get_weibo_trending(
count: int = 50,
proxy_url: Optional[str] = None,
country_code: Optional[str] = None,
) -> list[dict]:
"""Fetch trending topics from Weibo."""
try:
async with httpx.AsyncClient(
proxy=proxy_url,
timeout=30.0,
follow_redirects=True,
) as client:
headers = {
**_hive_headers(country_code, cookie=config.WEIBO_COOKIE),
"Referer": "https://weibo.com/",
}
response = await client.get(
"https://weibo.com/ajax/statuses/hot_band",
headers=headers
)
response.raise_for_status()
data = response.json()
results = []
for idx, item in enumerate(data.get("data", {}).get("trends", [])[:count], 1):
try:
results.append({
"rank": idx,
"title": item.get("title", ""),
get_zhihu_trending function · python · L65-L105 (41 LOC)src/hive_mcp/platforms/trends.py
async def get_zhihu_trending(
count: int = 20,
proxy_url: Optional[str] = None,
country_code: Optional[str] = None,
) -> list[dict]:
"""Fetch trending topics from Zhihu."""
try:
async with httpx.AsyncClient(
proxy=proxy_url,
timeout=30.0,
follow_redirects=True,
) as client:
headers = {
**_hive_headers(country_code, cookie=config.ZHIHU_COOKIE),
"Referer": "https://www.zhihu.com/",
}
response = await client.get(
"https://www.zhihu.com/api/v3/feed/topstory/hot-lists/total",
params={"limit": count},
headers=headers
)
response.raise_for_status()
data = response.json()
results = []
for idx, item in enumerate(data.get("data", [])[:count], 1):
try:
target = item.get("target", {})
results.append(get_bilibili_rank function · python · L108-L150 (43 LOC)src/hive_mcp/platforms/trends.py
async def get_bilibili_rank(
count: int = 20,
proxy_url: Optional[str] = None,
country_code: Optional[str] = None,
) -> list[dict]:
"""Fetch ranking videos from Bilibili."""
try:
async with httpx.AsyncClient(
proxy=proxy_url,
timeout=30.0,
follow_redirects=True,
) as client:
headers = {
**_hive_headers(country_code, cookie=config.BILIBILI_COOKIE),
"Referer": "https://www.bilibili.com/",
}
response = await client.get(
"https://api.bilibili.com/x/web-interface/ranking/v2",
params={"rid": 0, "type": "all"},
headers=headers
)
response.raise_for_status()
data = response.json()
results = []
for idx, item in enumerate(data.get("data", {}).get("list", [])[:count], 1):
try:
stat = item.get("stat", {})
_run_sync function · python · L17-L23 (7 LOC)src/hive_mcp/platforms/twitter.py
async def _run_sync(func, *args, timeout_s: int = 60, **kwargs):
    """Run a blocking callable in the default executor with a timeout.

    Args:
        func: Synchronous callable to execute.
        *args: Positional arguments forwarded to ``func``.
        timeout_s: Maximum number of seconds to wait for the result.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func(*args, **kwargs)`` returns.

    Raises:
        asyncio.TimeoutError: When the result is not ready within ``timeout_s``.
            The timeout ends the wait only; a call that is already running in
            its worker thread is not interrupted.
    """
    # This coroutine always runs inside a loop, so ask for the running loop
    # instead of the deprecated get_event_loop() lookup.
    loop = asyncio.get_running_loop()
    return await asyncio.wait_for(
        loop.run_in_executor(None, partial(func, *args, **kwargs)),
        timeout=timeout_s,
    )
def _make_opts(proxy_url: Optional[str] = None, country_code: Optional[str] = None) -> dict:
    """Build the base yt-dlp options dict used by the Twitter helpers.

    The proxy and the lower-cased ``X-Hive-Country`` header are included only
    when the corresponding argument is non-empty.
    """
    base = dict(quiet=True, no_warnings=True, skip_download=True, extract_flat=False)
    extras = {}
    if proxy_url:
        extras["proxy"] = proxy_url
    if country_code:
        extras["http_headers"] = {"X-Hive-Country": country_code.lower()}
    return {**base, **extras}
get_user_info function · python · L40-L70 (31 LOC)src/hive_mcp/platforms/twitter.py
async def get_user_info(
username: str,
proxy_url: Optional[str] = None,
country_code: Optional[str] = None,
) -> dict:
"""获取 Twitter/X 用户公开资料。"""
try:
import yt_dlp
except ImportError:
raise RuntimeError("yt-dlp 未安装")
opts = {**_make_opts(proxy_url, country_code), "extract_flat": True}
def _extract():
url = f"https://twitter.com/{username}"
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(url, download=False)
return {
"username": username,
"display_name": info.get("uploader") or info.get("channel") or username,
"description": (info.get("description") or "")[:300],
"follower_count": info.get("follower_count"),
"playlist_count": info.get("playlist_count"),
"profile_url": f"https://x.com/{username}",
"thumbnail": info.get("thumbnail"),
}
try:
return awaiget_user_tweets function · python · L73-L112 (40 LOC)src/hive_mcp/platforms/twitter.py
async def get_user_tweets(
username: str,
count: int = 20,
proxy_url: Optional[str] = None,
country_code: Optional[str] = None,
) -> list[dict]:
"""获取用户最近推文。"""
try:
import yt_dlp
except ImportError:
raise RuntimeError("yt-dlp 未安装")
opts = {
**_make_opts(proxy_url, country_code),
"extract_flat": "in_playlist",
"playlistend": count,
}
def _extract():
url = f"https://twitter.com/{username}"
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(url, download=False)
entries = info.get("entries") or []
return [
{
"tweet_id": e.get("id"),
"text": (e.get("title") or e.get("description") or "")[:500],
"url": e.get("url") or e.get("webpage_url") or f"https://x.com/{username}/status/{e.get('id')}",
"timestamp": e.get("timestamp"),
"view_count":search_tweets function · python · L115-L152 (38 LOC)src/hive_mcp/platforms/twitter.py
async def search_tweets(
query: str,
count: int = 20,
proxy_url: Optional[str] = None,
country_code: Optional[str] = None,
) -> list[dict]:
"""搜索 Twitter/X 推文。"""
try:
import yt_dlp
except ImportError:
raise RuntimeError("yt-dlp 未安装")
opts = {
**_make_opts(proxy_url, country_code),
"extract_flat": True,
}
def _extract():
# yt-dlp 支持 twitter search URL
url = f"https://twitter.com/search?q={query}&f=live"
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(url, download=False)
entries = info.get("entries") or []
return [
{
"tweet_id": e.get("id"),
"text": (e.get("title") or "")[:500],
"author": e.get("uploader") or e.get("uploader_id"),
"url": e.get("url") or e.get("webpage_url"),
"timestamp": e.get("timestamp"),
}
_make_ydl_opts function · python · L25-L44 (20 LOC)src/hive_mcp/platforms/youtube.py
def _make_ydl_opts(
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
    quiet: bool = True,
) -> dict:
    """Build the yt-dlp options dict used for YouTube metadata extraction.

    Downloads are skipped and automatic subtitles are requested for
    ``zh-Hans``, ``zh-Hant`` and ``en``. The proxy and the lower-cased
    ``X-Hive-Country`` header are included only when the corresponding
    argument is non-empty.
    """
    options: dict = dict(
        quiet=quiet,
        no_warnings=True,
        skip_download=True,
        extract_flat=False,
        writesubtitles=False,
        writeautomaticsub=True,
        subtitleslangs=["zh-Hans", "zh-Hant", "en"],
    )
    if proxy_url:
        options.update(proxy=proxy_url)
    if country_code:
        # Hive custom header: LocalProxyServer reads it and passes it on to the
        # Director for geo-targeting.
        options["http_headers"] = {"X-Hive-Country": country_code.lower()}
    return options
def _extract_video_info(info: dict) -> dict:
    """Pick the key fields out of a yt-dlp info dict, dropping the bulky ones.

    The description is cut to 500 characters and tags to the first 20. Missing
    list/dict fields become empty lists.
    """
    description = info.get("description") or ""
    channel_name = info.get("uploader") or info.get("channel")
    tags = info.get("tags") or []
    manual_subs = info.get("subtitles") or {}
    auto_subs = info.get("automatic_captions") or {}

    return {
        "video_id": info.get("id"),
        "title": info.get("title"),
        "description": description[:500],
        "channel_id": info.get("channel_id"),
        "channel": channel_name,
        "upload_date": info.get("upload_date"),
        "duration_s": info.get("duration"),
        "view_count": info.get("view_count"),
        "like_count": info.get("like_count"),
        "comment_count": info.get("comment_count"),
        "tags": tags[:20],
        "categories": info.get("categories") or [],
        "thumbnail": info.get("thumbnail"),
        "webpage_url": info.get("webpage_url"),
        "subtitles_langs": list(manual_subs.keys()),
        "automatic_captions_langs": list(auto_subs.keys()),
    }
async def _run_sync(func, *args, timeout_s: int = 60, **kwargs):
    """Run a blocking callable in the default executor with a timeout.

    Args:
        func: Synchronous callable to execute.
        *args: Positional arguments forwarded to ``func``.
        timeout_s: Maximum number of seconds to wait for the result.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func(*args, **kwargs)`` returns.

    Raises:
        asyncio.TimeoutError: When the result is not ready within ``timeout_s``.
            The timeout ends the wait only; a call that is already running in
            its worker thread is not interrupted.
    """
    # This coroutine always runs inside a loop, so ask for the running loop
    # instead of the deprecated get_event_loop() lookup.
    loop = asyncio.get_running_loop()
    return await asyncio.wait_for(
        loop.run_in_executor(None, partial(func, *args, **kwargs)),
        timeout=timeout_s,
    )
async def get_video_info(
    url: str,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> dict:
    """Fetch metadata for a single YouTube video via yt-dlp.

    Args:
        url: Video URL handed to ``yt_dlp.YoutubeDL.extract_info``.
        proxy_url: Optional proxy URL forwarded to ``_make_ydl_opts``.
        country_code: Optional country code forwarded to ``_make_ydl_opts``.

    Returns:
        The trimmed metadata dict produced by ``_extract_video_info``.

    Raises:
        RuntimeError: When extraction does not finish within the timeout; the
            ``asyncio.TimeoutError`` is attached as ``__cause__``.
    """
    # Single source of truth for the limit and the number shown in the message.
    timeout_s = 60
    opts = _make_ydl_opts(proxy_url, country_code=country_code)

    def _extract():
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=False)
        return _extract_video_info(info)

    try:
        return await _run_sync(_extract, timeout_s=timeout_s)
    except asyncio.TimeoutError as exc:
        raise RuntimeError(
            f"YouTube 请求超时 ({timeout_s}s),可能是代理网络慢或视频不可用"
        ) from exc
async def get_transcript(
video_id_or_url: str,
lang: str = "en",
proxy_url: Optional[str] = None,
country_code: Optional[str] = None,
) -> list[dict]:
"""获取 YouTube 视频字幕文本。优先 youtube-transcript-api,fallback yt-dlp。"""
# 从 URL 中提取 video_id
if "youtube.com" in video_id_or_url or "youtu.be" in video_id_or_url:
import re
m = re.search(r"(?:v=|youtu\.be/)([A-Za-z0-9_-]{11})", video_id_or_url)
video_id = m.group(1) if m else video_id_or_url
else:
video_id = video_id_or_url
# ① 优先 youtube-transcript-api
try:
from youtube_transcript_api import YouTubeTranscriptApi
def _fetch_transcript():
api = YouTubeTranscriptApi()
try:
result = api.fetch(video_id, languages=[lang, "en", "zh-Hans", "zh-Hant"])
return [{"text": s.text, "start": s.start, "duration": s.duration} for s in result]
except TypeError:
result = YouTubeTranscriptRepobility · severity-and-effort ranking · https://repobility.com
get_channel_videos function · python · L167-L211 (45 LOC)src/hive_mcp/platforms/youtube.py
async def get_channel_videos(
channel_url_or_id: str,
count: int = 20,
proxy_url: Optional[str] = None,
country_code: Optional[str] = None,
) -> list[dict]:
"""获取 YouTube 频道最新视频列表。"""
opts = {
"quiet": True, "no_warnings": True, "skip_download": True,
"extract_flat": "in_playlist", "playlistend": count,
}
if proxy_url:
opts["proxy"] = proxy_url
if country_code:
opts.setdefault("http_headers", {})["X-Hive-Country"] = country_code.lower()
url = channel_url_or_id
if not url.startswith("http"):
if url.startswith("UC"):
url = f"https://www.youtube.com/channel/{url}/videos"
elif url.startswith("@"):
url = f"https://www.youtube.com/{url}/videos"
else:
url = f"https://www.youtube.com/@{url}/videos"
elif "/videos" not in url:
url = url.rstrip("/") + "/videos"
def _extract():
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.exsearch_videos function · python · L214-L245 (32 LOC)src/hive_mcp/platforms/youtube.py
async def search_videos(
query: str,
count: int = 10,
proxy_url: Optional[str] = None,
country_code: Optional[str] = None,
) -> list[dict]:
"""YouTube 关键词搜索(yt-dlp ytsearch,完全免费无需 API Key)。"""
opts = {"quiet": True, "no_warnings": True, "skip_download": True, "extract_flat": True}
if proxy_url:
opts["proxy"] = proxy_url
if country_code:
opts.setdefault("http_headers", {})["X-Hive-Country"] = country_code.lower()
def _extract():
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(f"ytsearch{count}:{query}", download=False)
entries = info.get("entries") or []
return [
{
"video_id": e.get("id"), "title": e.get("title"),
"channel": e.get("uploader") or e.get("channel"),
"view_count": e.get("view_count"), "duration_s": e.get("duration"),
"upload_date": e.get("upload_date"), "thumbnail": e.get(_is_blocked_ip function · python · L39-L44 (6 LOC)src/hive_mcp/proxy/executor.py
def _is_blocked_ip(addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool:
    """Return True when ``addr`` lies inside any network of ``_BLOCKED_NETWORKS``."""
    return any(addr in blocked_net for blocked_net in _BLOCKED_NETWORKS)
def _is_blocked_host(hostname: str) -> bool:
"""检查目标地址是否为内网 / 受限地址(含 DNS 解析后验证,防 DNS 重绑定攻击)"""
if not hostname:
return True
if hostname.lower() in ("localhost", "0.0.0.0"):
return True
# 先检查是否直接是 IP 地址
try:
addr = ipaddress.ip_address(hostname)
return _is_blocked_ip(addr)
except ValueError:
pass # 是域名,继续 DNS 解析
# [安全] 域名:解析 DNS 并检查所有 A/AAAA 记录
# 防止 DNS 重绑定攻击(域名先解析到公网 IP 通过检查,再解析到内网 IP)
try:
results = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
if not results:
return True # 无法解析,拒绝
for _family, _type, _proto, _canon, sockaddr in results:
ip_str = sockaddr[0]
try:
addr = ipaddress.ip_address(ip_str)
if _is_blocked_ip(addr):
logger.warning("[SECURITY] DNS -> blocked IP: %s -> %s", hostname, ip_str)
return True
except ValueError:
execute_request function · python · L82-L156 (75 LOC)src/hive_mcp/proxy/executor.py
async def execute_request(
url: str,
method: str = "GET",
headers: dict | None = None,
body: bytes | None = None,
timeout: int = PROXY_TIMEOUT,
) -> dict:
"""在本地网络中发起 HTTP 请求,返回响应结果。"""
method = method.upper()
if method not in _ALLOWED_METHODS:
return {
"status_code": 0, "body_text": "", "headers": {},
"bytes": 0, "error": "BLOCKED_METHOD",
}
try:
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
return {
"status_code": 0, "body_text": "", "headers": {},
"bytes": 0, "error": "BLOCKED_SCHEME",
}
if _is_blocked_host(parsed.hostname or ""):
logger.warning("[SECURITY] SSRF blocked: %s", url)
return {
"status_code": 0, "body_text": "", "headers": {},
"bytes": 0, "error": "BLOCKED_TARGET",
}
except Exception:
return {
"status_code"_is_blocked_connect_host function · python · L45-L57 (13 LOC)src/hive_mcp/proxy/local_server.py
def _is_blocked_connect_host(hostname: str) -> bool:
    """Decide whether a CONNECT target host must be refused.

    Args:
        hostname: Host part of the CONNECT target. It may be a domain name, an
            IPv4 literal, or an IPv6 literal with or without square brackets.

    Returns:
        True for an empty host, for ``localhost`` / ``0.0.0.0`` (any case, with
        or without a trailing dot), and for IP literals inside
        ``_BLOCKED_NETWORKS``. False otherwise.

    Note:
        Domain names are not resolved here, so a name that points at an
        internal address is not detected by this check.
    """
    if not hostname:
        return True

    host = hostname.strip().lower()
    # Splitting "[::1]:443" on its last ":" leaves the brackets on the host;
    # drop them so the literal can be parsed and matched against the blocklist.
    if host.startswith("[") and host.endswith("]"):
        host = host[1:-1]
    # "localhost." and "127.0.0.1." are the fully-qualified spellings of the
    # same targets and must not slip past the checks below.
    host = host.rstrip(".")

    if not host or host in ("localhost", "0.0.0.0"):
        return True

    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal: treated as a domain name and allowed.
        return False
    return any(addr in net for net in _BLOCKED_NETWORKS)
async def start(self) -> None:
    """Start the local proxy listener on ``self.host``:``self.port``.

    Does nothing except log a warning when ``API_KEY`` is not set. On success
    ``self._server`` holds the asyncio server and ``self._running`` is True.
    An ``OSError`` raised while binding is logged and not re-raised, so the
    caller can continue without the proxy.
    """
    import errno

    if not API_KEY:
        logger.warning("HIVE_API_KEY not set, local proxy disabled")
        return

    try:
        self._server = await asyncio.start_server(
            self._handle_client, self.host, self.port
        )
        self._running = True
        logger.info("✅ Local proxy listening on http://%s:%d", self.host, self.port)
    except OSError as e:
        # errno.EADDRINUSE is the current platform's code; 98 and 48 are kept
        # as explicit values and the message text as a last resort.
        port_in_use = (
            e.errno in (errno.EADDRINUSE, 98, 48)
            or "Address already in use" in str(e)
        )
        if port_in_use:
            logger.error(
                "⚠️ 端口 %d 已被占用(可能是上一个 hive-mcp 进程未退出)。"
                "本地代理未启动,但 MCP 工具仍可用。"
                "解决方法:kill 占用端口的进程后重启 AI 客户端。",
                self.port,
            )
        else:
            logger.error("Failed to start local proxy on %s:%d: %s", self.host, self.port, e)
async def stop(self) -> None:
    """Close the listening server, if one exists, and mark the proxy stopped."""
    # NOTE(review): the listing this block came from lost its indentation. The
    # nesting below (all four statements inside the `if`) is a reconstruction —
    # confirm against the original file.
    if self._server:
        self._server.close()
        # Wait for the server to finish closing before clearing the flag.
        await self._server.wait_closed()
        self._running = False
        logger.info("Local proxy stopped")
LocalProxyServer._handle_client method · python · L105-L132 (28 LOC)src/hive_mcp/proxy/local_server.py
async def _handle_client(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
try:
first_line = await asyncio.wait_for(reader.readline(), timeout=10)
if not first_line:
writer.close()
return
first_line_str = first_line.decode("utf-8", errors="replace").strip()
parts = first_line_str.split()
if len(parts) < 3:
writer.close()
return
method = parts[0].upper()
if method == "CONNECT":
await self._handle_connect(parts[1], reader, writer)
else:
await self._handle_http(method, parts[1], reader, writer, first_line)
except asyncio.TimeoutError:
logger.debug("Client timeout")
except Exception as e:
logger.debug("Client handler error: %s", e)
finally:
try:
writer.close()
LocalProxyServer._handle_http method · python · L134-L198 (65 LOC)src/hive_mcp/proxy/local_server.py
async def _handle_http(
self, method: str, url: str,
reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
first_line: bytes,
) -> None:
headers = {}
content_length = 0
while True:
line = await asyncio.wait_for(reader.readline(), timeout=10)
if line in (b"\r\n", b"\n", b""):
break
hdr = line.decode("utf-8", errors="replace").strip()
if ":" in hdr:
key, val = hdr.split(":", 1)
key = key.strip()
val = val.strip()
if key.lower() == "content-length":
content_length = int(val)
if content_length > MAX_REQUEST_BODY:
writer.write(b"HTTP/1.1 413 Payload Too Large\r\nConnection: close\r\n\r\n")
await writer.drain()
return
elif key.lower() not in ("proxy-connection", "proxy-authorizatLocalProxyServer._handle_connect method · python · L200-L275 (76 LOC)src/hive_mcp/proxy/local_server.py
async def _handle_connect(
self, target: str,
reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
) -> None:
while True:
line = await asyncio.wait_for(reader.readline(), timeout=10)
if line in (b"\r\n", b"\n", b""):
break
if ":" in target:
host, port_str = target.rsplit(":", 1)
try:
port = int(port_str)
except ValueError:
writer.write(b"HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n")
await writer.drain()
return
else:
host = target
port = 443
if _is_blocked_connect_host(host):
logger.warning("[SECURITY] CONNECT SSRF blocked: %s:%d", host, port)
writer.write(b"HTTP/1.1 403 Forbidden\r\nConnection: close\r\n\r\n")
await writer.drain()
return
if port not in _ALLOWED_CONNECT_PORTS:
logger.LocalProxyServer._proxy_via_director method · python · L277-L304 (28 LOC)src/hive_mcp/proxy/local_server.py
async def _proxy_via_director(
self, url: str, method: str = "GET", headers: dict | None = None,
*, country_code: str | None = None,
) -> dict:
try:
options: dict = {"timeout_ms": 15000}
if country_code:
options["country"] = country_code.lower()
async with httpx.AsyncClient(timeout=30, trust_env=False) as client:
r = await client.post(
f"{DIRECTOR_URL}/api/v1/proxy/request",
headers={"Authorization": f"Bearer {API_KEY}"},
json={
"target_url": url,
"method": method,
"headers": headers or {},
"options": options,
},
)
data = r.json()
if r.is_success:
return data
else:
return {"status_code": r.status_code, "body_texlifespan function · python · L31-L45 (15 LOC)src/hive_mcp/rest_api.py
async def lifespan(app: FastAPI):
    """Start the local proxy for the app's lifetime and stop it on shutdown.

    Sets the module-level ``_proxy_url`` to the proxy's loopback URL when the
    proxy is running, or to ``""`` when it failed to start.

    Args:
        app: The FastAPI application (not used in the body).

    Yields:
        Nothing; control returns to the application between startup and
        shutdown.
    """
    global _proxy_url
    from hive_mcp.proxy.local_server import LocalProxyServer

    proxy = LocalProxyServer()
    await proxy.start()
    if proxy.is_running:
        _proxy_url = f"http://127.0.0.1:{proxy.port}"
        logger.info("REST API started — proxy_url=%s", _proxy_url)
    else:
        _proxy_url = ""
        logger.error("REST API started — 本地代理启动失败,代理工具不可用")

    try:
        yield
    finally:
        # Runs even when an exception is thrown in at the yield, so the
        # listener is always closed.
        await proxy.stop()
        logger.info("REST API shutting down")
async def search(
    q: str = Query(..., description="搜索关键词"),
    count: int = Query(10, ge=1, le=50),
):
    """DuckDuckGo search."""
    from hive_mcp.tools.search import web_search

    # Only the awaited call is guarded; any failure becomes an HTTP 500 whose
    # detail is the error text.
    try:
        hits = await web_search(q, count=count, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"query": q, "results": hits}
async def scrape(
    url: str = Query(..., description="目标 URL"),
):
    """Generic web page scraping → Markdown."""
    from hive_mcp.tools.scrape import scrape_url

    # Any failure becomes an HTTP 500 whose detail is the error text.
    try:
        page = await scrape_url(url, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return page
async def yt_video(
    url: str = Query(..., description="YouTube 视频 URL"),
):
    # Returns the metadata dict from get_video_info, called with the
    # module-level _proxy_url. Any failure becomes an HTTP 500 whose detail is
    # the error text.
    from hive_mcp.platforms.youtube import get_video_info

    try:
        video = await get_video_info(url, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return video
yt_transcript function · python · L106-L115 (10 LOC)src/hive_mcp/rest_api.py
async def yt_transcript(
    video_id: str = Query(...),
    lang: str = Query("en"),
):
    # Returns the transcript segments from get_transcript together with the
    # requested video id and language. Any failure becomes an HTTP 500 whose
    # detail is the error text.
    from hive_mcp.platforms.youtube import get_transcript

    try:
        parts = await get_transcript(video_id, lang=lang, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"video_id": video_id, "lang": lang, "segments": parts}
async def yt_channel(
    channel: str = Query(..., description="频道 URL / @handle / UC...ID"),
    count: int = Query(20, ge=1, le=100),
):
    # Returns the result of get_channel_videos for the given channel. Any
    # failure becomes an HTTP 500 whose detail is the error text.
    from hive_mcp.platforms.youtube import get_channel_videos

    try:
        videos = await get_channel_videos(channel, count=count, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return videos
async def yt_search(
    q: str = Query(...),
    count: int = Query(10, ge=1, le=50),
):
    # Returns the result of the YouTube search_videos helper for the query.
    # Any failure becomes an HTTP 500 whose detail is the error text.
    from hive_mcp.platforms.youtube import search_videos

    try:
        matches = await search_videos(q, count=count, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return matches
async def ig_profile(
    username: str = Query(...),
):
    # Returns the result of the Instagram get_profile helper for the username.
    # Any failure becomes an HTTP 500 whose detail is the error text.
    from hive_mcp.platforms.instagram import get_profile

    try:
        profile = await get_profile(username, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return profile
async def ig_posts(
    username: str = Query(...),
    count: int = Query(20, ge=1, le=50),
):
    # Returns the result of the Instagram get_user_posts helper. Any failure
    # becomes an HTTP 500 whose detail is the error text.
    from hive_mcp.platforms.instagram import get_user_posts

    try:
        posts = await get_user_posts(username, count=count, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return posts
async def ig_hashtag(
    tag: str = Query(...),
    count: int = Query(20, ge=1, le=50),
):
    # Returns the result of the Instagram get_hashtag_posts helper. Any
    # failure becomes an HTTP 500 whose detail is the error text.
    from hive_mcp.platforms.instagram import get_hashtag_posts

    try:
        posts = await get_hashtag_posts(tag, count=count, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return posts
async def ig_post(
    shortcode: str = Query(..., description="帖子 shortcode 或完整 URL"),
):
    # Returns the result of the Instagram get_post_detail helper. Any failure
    # becomes an HTTP 500 whose detail is the error text.
    from hive_mcp.platforms.instagram import get_post_detail

    try:
        post = await get_post_detail(shortcode, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return post
async def tt_user(
    username: str = Query(...),
):
    # Returns the result of the TikTok get_user_info helper for the username.
    # Any failure becomes an HTTP 500 whose detail is the error text.
    from hive_mcp.platforms.tiktok import get_user_info

    try:
        profile = await get_user_info(username, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return profile
tt_user_videos function · python · L204-L212 (9 LOC)src/hive_mcp/rest_api.py
async def tt_user_videos(
    username: str = Query(...),
    count: int = Query(20, ge=1, le=50),
):
    # Returns the result of the TikTok get_user_videos helper. Any failure
    # becomes an HTTP 500 whose detail is the error text.
    from hive_mcp.platforms.tiktok import get_user_videos

    try:
        videos = await get_user_videos(username, count=count, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return videos
async def tt_video(
    video_id: str = Query(...),
):
    # Returns the result of the TikTok get_video_detail helper. Any failure
    # becomes an HTTP 500 whose detail is the error text.
    from hive_mcp.platforms.tiktok import get_video_detail

    try:
        video = await get_video_detail(video_id, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return video
async def tt_comments(
    video_id: str = Query(...),
    count: int = Query(50, ge=1, le=100),
):
    # Returns the result of the TikTok get_video_comments helper. Any failure
    # becomes an HTTP 500 whose detail is the error text.
    from hive_mcp.platforms.tiktok import get_video_comments

    try:
        comments = await get_video_comments(video_id, count=count, proxy_url=_proxy_url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return comments