← back to dreamer09__agentxspace

Function bodies 529 total

All specs Real LLM only Function bodies
_fmt_video function · python · L318-L340 (23 LOC)
src/hive_mcp/platforms/tiktok.py
def _fmt_video(v) -> dict:
    d = v.as_dict if hasattr(v, "as_dict") else {}
    stats = d.get("stats", {}) or d.get("statsV2", {})
    author = d.get("author", {}) or {}
    music = d.get("music", {}) or {}
    return {
        "video_id": d.get("id"),
        "desc": d.get("desc", "")[:300],
        "create_time": d.get("createTime"),
        "play_count": stats.get("playCount") or stats.get("play_count"),
        "like_count": stats.get("diggCount") or stats.get("like_count"),
        "comment_count": stats.get("commentCount") or stats.get("comment_count"),
        "share_count": stats.get("shareCount"),
        "collect_count": stats.get("collectCount"),
        "hashtags": [h.get("hashtagName") for h in (d.get("textExtra") or []) if h.get("hashtagType") == 1],
        "author_id": author.get("id"),
        "author_unique_id": author.get("uniqueId"),
        "author_nickname": author.get("nickname"),
        "music_id": music.get("id"),
        "music_title": music.get("title"),
 
_fmt_user function · python · L343-L359 (17 LOC)
src/hive_mcp/platforms/tiktok.py
def _fmt_user(u) -> dict:
    d = u.as_dict if hasattr(u, "as_dict") else {}
    user = d.get("user", d)
    stats = d.get("stats", {}) or {}
    return {
        "user_id": user.get("id"),
        "unique_id": user.get("uniqueId"),
        "nickname": user.get("nickname"),
        "signature": (user.get("signature") or "")[:200],
        "avatar_url": user.get("avatarMedium"),
        "follower_count": stats.get("followerCount"),
        "following_count": stats.get("followingCount"),
        "video_count": stats.get("videoCount"),
        "heart_count": stats.get("heartCount"),
        "is_verified": user.get("verified", False),
        "profile_url": f"https://www.tiktok.com/@{user.get('uniqueId', '')}",
    }
get_user_info function · python · L365-L388 (24 LOC)
src/hive_mcp/platforms/tiktok.py
async def get_user_info(
    username: str,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> dict:
    """Fetch a TikTok user's profile (followers, video count, bio, ...).

    TikTokApi is tried first when ``_cb.allow_request()`` permits it. If that
    attempt raises, or is not permitted, the yt-dlp fallback is used instead.

    Args:
        username: TikTok handle to look up.
        proxy_url: Optional proxy URL handed to both backends.
        country_code: Not used by this function.

    Returns:
        The formatted profile dict.

    Raises:
        RuntimeError: If the yt-dlp fallback fails as well. The fallback error
            is attached as ``__cause__``.
    """
    # Prefer TikTokApi.
    if _cb.allow_request():
        try:
            async def _call():
                api = await _get_api(proxy_url)
                user = api.user(username)
                info = await user.info()
                return _fmt_user(info)
            return await _execute(_call, f"user.info({username})")
        except Exception as e:
            logger.warning("TikTokApi 失败,尝试 yt-dlp fallback: %s", e)

    # Fallback: yt-dlp
    try:
        return await _ytdlp_fallback_user_info(username, proxy_url)
    except Exception as e2:
        logger.error("yt-dlp fallback 也失败: %s", e2)
        # Chain explicitly so the traceback names the fallback error as cause.
        raise RuntimeError(f"TikTok 数据获取失败 (TikTokApi + yt-dlp): {e2}") from e2
get_user_videos function · python · L391-L416 (26 LOC)
src/hive_mcp/platforms/tiktok.py
async def get_user_videos(
    username: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """Fetch the videos published by a TikTok user.

    TikTokApi is tried first when ``_cb.allow_request()`` permits it. If that
    attempt raises, or is not permitted, the yt-dlp fallback is used instead.

    Args:
        username: TikTok handle whose videos are listed.
        count: Number of videos requested from the backend.
        proxy_url: Optional proxy URL handed to both backends.
        country_code: Not used by this function.

    Returns:
        A list of formatted video dicts.

    Raises:
        RuntimeError: If the yt-dlp fallback fails as well. The fallback error
            is attached as ``__cause__``.
    """
    # Prefer TikTokApi.
    if _cb.allow_request():
        try:
            async def _call():
                api = await _get_api(proxy_url)
                return [
                    _fmt_video(video)
                    async for video in api.user(username).videos(count=count)
                ]
            return await _execute(_call, f"user.videos({username})")
        except Exception as e:
            logger.warning("TikTokApi 失败,尝试 yt-dlp fallback: %s", e)

    # Fallback: yt-dlp
    try:
        return await _ytdlp_fallback_user_videos(username, count, proxy_url)
    except Exception as e2:
        logger.error("yt-dlp fallback 也失败: %s", e2)
        # Chain explicitly so the traceback names the fallback error as cause.
        raise RuntimeError(f"TikTok 数据获取失败 (TikTokApi + yt-dlp): {e2}") from e2
get_video_detail function · python · L419-L442 (24 LOC)
src/hive_mcp/platforms/tiktok.py
async def get_video_detail(
    video_id: str,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> dict:
    """Fetch detailed information about a single TikTok video.

    TikTokApi is tried first when ``_cb.allow_request()`` permits it. If that
    attempt raises, or is not permitted, the yt-dlp fallback is used instead.

    Args:
        video_id: Identifier of the video to look up.
        proxy_url: Optional proxy URL handed to both backends.
        country_code: Not used by this function.

    Returns:
        The formatted video dict.

    Raises:
        RuntimeError: If the yt-dlp fallback fails as well. The fallback error
            is attached as ``__cause__``.
    """
    # Prefer TikTokApi.
    if _cb.allow_request():
        try:
            async def _call():
                api = await _get_api(proxy_url)
                video = api.video(id=video_id)
                info = await video.info()
                return _fmt_video(info)
            return await _execute(_call, f"video.info({video_id})")
        except Exception as e:
            logger.warning("TikTokApi 失败,尝试 yt-dlp fallback: %s", e)

    # Fallback: yt-dlp
    try:
        return await _ytdlp_fallback_video_detail(video_id, proxy_url)
    except Exception as e2:
        logger.error("yt-dlp fallback 也失败: %s", e2)
        # Chain explicitly so the traceback names the fallback error as cause.
        raise RuntimeError(f"TikTok 数据获取失败 (TikTokApi + yt-dlp): {e2}") from e2
get_video_comments function · python · L445-L473 (29 LOC)
src/hive_mcp/platforms/tiktok.py
async def get_video_comments(
    video_id: str,
    count: int = 50,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """Fetch comments on a TikTok video.

    Args:
        video_id: Identifier of the video whose comments are fetched.
        count: Number of comments requested from the backend.
        proxy_url: Optional proxy URL handed to ``_get_api``.
        country_code: Not used by this function.

    Returns:
        One dict per comment with id, text, like count, author handle,
        creation time and reply count.

    Raises:
        RuntimeError: On any failure. There is no yt-dlp fallback for
            comments; the original error is attached as ``__cause__``.
    """

    async def _call():
        api = await _get_api(proxy_url)
        comments = []
        async for c in api.video(id=video_id).comments(count=count):
            d = c.as_dict if hasattr(c, "as_dict") else {}
            comments.append({
                "comment_id": d.get("cid"),
                "text": d.get("text", ""),
                "likes": d.get("digg_count"),
                "author": (d.get("user") or {}).get("unique_id"),
                "create_time": d.get("create_time"),
                "reply_count": d.get("reply_comment_total", 0),
            })
        return comments

    try:
        return await _execute(_call, f"video.comments({video_id})")
    except Exception as e:
        # Comments have no yt-dlp fallback.
        logger.error("视频评论获取失败(不支持 fallback): %s", e)
        # Carry a message and the cause, like the sibling fetchers do, so that
        # callers reporting ``str(error)`` do not get an empty string.
        raise RuntimeError(f"视频评论获取失败,yt-dlp fallback 不可用: {e}") from e
get_hashtag_videos function · python · L476-L497 (22 LOC)
src/hive_mcp/platforms/tiktok.py
async def get_hashtag_videos(
    hashtag: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """Fetch videos listed under a TikTok hashtag.

    Args:
        hashtag: Hashtag name; any leading ``#`` characters are stripped.
        count: Number of videos requested from the backend.
        proxy_url: Optional proxy URL handed to ``_get_api``.
        country_code: Not used by this function.

    Returns:
        A list of formatted video dicts.

    Raises:
        RuntimeError: On any failure. There is no yt-dlp fallback for hashtag
            lookups; the original error is attached as ``__cause__``.
    """

    async def _call():
        api = await _get_api(proxy_url)
        tag = api.hashtag(name=hashtag.lstrip("#"))
        return [_fmt_video(v) async for v in tag.videos(count=count)]

    try:
        return await _execute(_call, f"hashtag.videos({hashtag})")
    except Exception as e:
        # Hashtag lookups have no yt-dlp fallback.
        logger.error("话题搜索失败(不支持 fallback): %s", e)
        raise RuntimeError(f"话题搜索失败,yt-dlp fallback 不可用: {e}") from e
If a scraper extracted this row, it came from Repobility (https://repobility.com)
search_videos function · python · L500-L520 (21 LOC)
src/hive_mcp/platforms/tiktok.py
async def search_videos(
    query: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """Search TikTok videos by keyword.

    Args:
        query: Search keywords.
        count: Number of videos requested from the backend.
        proxy_url: Optional proxy URL handed to ``_get_api``.
        country_code: Not used by this function.

    Returns:
        A list of formatted video dicts.

    Raises:
        RuntimeError: On any failure. There is no yt-dlp fallback for search;
            the original error is attached as ``__cause__``.
    """

    async def _call():
        api = await _get_api(proxy_url)
        return [_fmt_video(v) async for v in api.search.videos(query, count=count)]

    try:
        return await _execute(_call, f"search.videos({query})")
    except Exception as e:
        # Search has no yt-dlp fallback.
        logger.error("视频搜索失败(不支持 fallback): %s", e)
        raise RuntimeError(f"视频搜索失败,yt-dlp fallback 不可用: {e}") from e
get_trending_videos function · python · L523-L542 (20 LOC)
src/hive_mcp/platforms/tiktok.py
async def get_trending_videos(
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """Fetch TikTok's site-wide trending videos.

    Args:
        count: Number of videos requested from the backend.
        proxy_url: Optional proxy URL handed to ``_get_api``.
        country_code: Not used by this function.

    Returns:
        A list of formatted video dicts.

    Raises:
        RuntimeError: On any failure. There is no yt-dlp fallback for trending
            videos; the original error is attached as ``__cause__``.
    """

    async def _call():
        api = await _get_api(proxy_url)
        return [_fmt_video(v) async for v in api.trending.videos(count=count)]

    try:
        return await _execute(_call, "trending.videos()")
    except Exception as e:
        # Trending videos have no yt-dlp fallback.
        logger.error("趋势视频获取失败(不支持 fallback): %s", e)
        raise RuntimeError(f"趋势视频获取失败,yt-dlp fallback 不可用: {e}") from e
_hive_headers function · python · L15-L22 (8 LOC)
src/hive_mcp/platforms/trends.py
def _hive_headers(country_code: Optional[str] = None, cookie: Optional[str] = None) -> dict:
    """构建包含地理定向和 Cookie 的请求 headers。"""
    h = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
    if country_code:
        h["X-Hive-Country"] = country_code.lower()
    if cookie:
        h["Cookie"] = cookie
    return h
get_weibo_trending function · python · L25-L62 (38 LOC)
src/hive_mcp/platforms/trends.py
async def get_weibo_trending(
    count: int = 50,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """Fetch trending topics from Weibo."""
    try:
        async with httpx.AsyncClient(
            proxy=proxy_url,
            timeout=30.0,
            follow_redirects=True,
        ) as client:
            headers = {
                **_hive_headers(country_code, cookie=config.WEIBO_COOKIE),
                "Referer": "https://weibo.com/",
            }
            response = await client.get(
                "https://weibo.com/ajax/statuses/hot_band",
                headers=headers
            )
            response.raise_for_status()
            data = response.json()
            results = []
            for idx, item in enumerate(data.get("data", {}).get("trends", [])[:count], 1):
                try:
                    results.append({
                        "rank": idx,
                        "title": item.get("title", ""),
  
get_zhihu_trending function · python · L65-L105 (41 LOC)
src/hive_mcp/platforms/trends.py
async def get_zhihu_trending(
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """Fetch trending topics from Zhihu."""
    try:
        async with httpx.AsyncClient(
            proxy=proxy_url,
            timeout=30.0,
            follow_redirects=True,
        ) as client:
            headers = {
                **_hive_headers(country_code, cookie=config.ZHIHU_COOKIE),
                "Referer": "https://www.zhihu.com/",
            }
            response = await client.get(
                "https://www.zhihu.com/api/v3/feed/topstory/hot-lists/total",
                params={"limit": count},
                headers=headers
            )
            response.raise_for_status()
            data = response.json()
            results = []
            for idx, item in enumerate(data.get("data", [])[:count], 1):
                try:
                    target = item.get("target", {})
                    results.append(
get_bilibili_rank function · python · L108-L150 (43 LOC)
src/hive_mcp/platforms/trends.py
async def get_bilibili_rank(
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """Fetch ranking videos from Bilibili."""
    try:
        async with httpx.AsyncClient(
            proxy=proxy_url,
            timeout=30.0,
            follow_redirects=True,
        ) as client:
            headers = {
                **_hive_headers(country_code, cookie=config.BILIBILI_COOKIE),
                "Referer": "https://www.bilibili.com/",
            }
            response = await client.get(
                "https://api.bilibili.com/x/web-interface/ranking/v2",
                params={"rid": 0, "type": "all"},
                headers=headers
            )
            response.raise_for_status()
            data = response.json()
            results = []
            for idx, item in enumerate(data.get("data", {}).get("list", [])[:count], 1):
                try:
                    stat = item.get("stat", {})
              
_run_sync function · python · L17-L23 (7 LOC)
src/hive_mcp/platforms/twitter.py
async def _run_sync(func, *args, timeout_s: int = 60, **kwargs):
    """在线程池中运行同步函数,带超时保护。"""
    loop = asyncio.get_event_loop()
    return await asyncio.wait_for(
        loop.run_in_executor(None, partial(func, *args, **kwargs)),
        timeout=timeout_s,
    )
_make_opts function · python · L26-L37 (12 LOC)
src/hive_mcp/platforms/twitter.py
def _make_opts(proxy_url: Optional[str] = None, country_code: Optional[str] = None) -> dict:
    opts = {
        "quiet": True,
        "no_warnings": True,
        "skip_download": True,
        "extract_flat": False,
    }
    if proxy_url:
        opts["proxy"] = proxy_url
    if country_code:
        opts.setdefault("http_headers", {})["X-Hive-Country"] = country_code.lower()
    return opts
Repobility analyzer · published findings · https://repobility.com
get_user_info function · python · L40-L70 (31 LOC)
src/hive_mcp/platforms/twitter.py
async def get_user_info(
    username: str,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> dict:
    """获取 Twitter/X 用户公开资料。"""
    try:
        import yt_dlp
    except ImportError:
        raise RuntimeError("yt-dlp 未安装")

    opts = {**_make_opts(proxy_url, country_code), "extract_flat": True}

    def _extract():
        url = f"https://twitter.com/{username}"
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=False)
            return {
                "username": username,
                "display_name": info.get("uploader") or info.get("channel") or username,
                "description": (info.get("description") or "")[:300],
                "follower_count": info.get("follower_count"),
                "playlist_count": info.get("playlist_count"),
                "profile_url": f"https://x.com/{username}",
                "thumbnail": info.get("thumbnail"),
            }

    try:
        return awai
get_user_tweets function · python · L73-L112 (40 LOC)
src/hive_mcp/platforms/twitter.py
async def get_user_tweets(
    username: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """获取用户最近推文。"""
    try:
        import yt_dlp
    except ImportError:
        raise RuntimeError("yt-dlp 未安装")

    opts = {
        **_make_opts(proxy_url, country_code),
        "extract_flat": "in_playlist",
        "playlistend": count,
    }

    def _extract():
        url = f"https://twitter.com/{username}"
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=False)
            entries = info.get("entries") or []
            return [
                {
                    "tweet_id": e.get("id"),
                    "text": (e.get("title") or e.get("description") or "")[:500],
                    "url": e.get("url") or e.get("webpage_url") or f"https://x.com/{username}/status/{e.get('id')}",
                    "timestamp": e.get("timestamp"),
                    "view_count":
search_tweets function · python · L115-L152 (38 LOC)
src/hive_mcp/platforms/twitter.py
async def search_tweets(
    query: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """搜索 Twitter/X 推文。"""
    try:
        import yt_dlp
    except ImportError:
        raise RuntimeError("yt-dlp 未安装")

    opts = {
        **_make_opts(proxy_url, country_code),
        "extract_flat": True,
    }

    def _extract():
        # yt-dlp 支持 twitter search URL
        url = f"https://twitter.com/search?q={query}&f=live"
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=False)
            entries = info.get("entries") or []
            return [
                {
                    "tweet_id": e.get("id"),
                    "text": (e.get("title") or "")[:500],
                    "author": e.get("uploader") or e.get("uploader_id"),
                    "url": e.get("url") or e.get("webpage_url"),
                    "timestamp": e.get("timestamp"),
                }
   
_make_ydl_opts function · python · L25-L44 (20 LOC)
src/hive_mcp/platforms/youtube.py
def _make_ydl_opts(
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
    quiet: bool = True,
) -> dict:
    opts: dict = {
        "quiet": quiet,
        "no_warnings": True,
        "skip_download": True,
        "extract_flat": False,
        "writesubtitles": False,
        "writeautomaticsub": True,
        "subtitleslangs": ["zh-Hans", "zh-Hant", "en"],
    }
    if proxy_url:
        opts["proxy"] = proxy_url
    if country_code:
        # 蜂巢自定义 header,LocalProxyServer 读取后传给 Director 做地理定向
        opts.setdefault("http_headers", {})["X-Hive-Country"] = country_code.lower()
    return opts
_extract_video_info function · python · L47-L66 (20 LOC)
src/hive_mcp/platforms/youtube.py
def _extract_video_info(info: dict) -> dict:
    """从 yt-dlp info_dict 中提取关键字段,过滤大字段。"""
    return {
        "video_id": info.get("id"),
        "title": info.get("title"),
        "description": (info.get("description") or "")[:500],
        "channel_id": info.get("channel_id"),
        "channel": info.get("uploader") or info.get("channel"),
        "upload_date": info.get("upload_date"),
        "duration_s": info.get("duration"),
        "view_count": info.get("view_count"),
        "like_count": info.get("like_count"),
        "comment_count": info.get("comment_count"),
        "tags": (info.get("tags") or [])[:20],
        "categories": info.get("categories") or [],
        "thumbnail": info.get("thumbnail"),
        "webpage_url": info.get("webpage_url"),
        "subtitles_langs": list((info.get("subtitles") or {}).keys()),
        "automatic_captions_langs": list((info.get("automatic_captions") or {}).keys()),
    }
_run_sync function · python · L69-L75 (7 LOC)
src/hive_mcp/platforms/youtube.py
async def _run_sync(func, *args, timeout_s: int = 60, **kwargs):
    """在线程池中运行同步函数,带超时保护。"""
    loop = asyncio.get_event_loop()
    return await asyncio.wait_for(
        loop.run_in_executor(None, partial(func, *args, **kwargs)),
        timeout=timeout_s,
    )
get_video_info function · python · L81-L97 (17 LOC)
src/hive_mcp/platforms/youtube.py
async def get_video_info(
    url: str,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> dict:
    """Fetch metadata for a single YouTube video.

    Args:
        url: Video URL handed to yt-dlp.
        proxy_url: Optional proxy URL placed in the yt-dlp options.
        country_code: Optional country code placed in the yt-dlp options.

    Returns:
        The reduced metadata dict produced by ``_extract_video_info``.

    Raises:
        RuntimeError: If extraction does not finish within 60 seconds. The
            timeout is attached as ``__cause__``. Other extraction errors
            propagate unchanged.
    """
    opts = _make_ydl_opts(proxy_url, country_code=country_code)

    def _extract():
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=False)
            return _extract_video_info(info)

    try:
        return await _run_sync(_extract, timeout_s=60)
    except asyncio.TimeoutError as exc:
        # Plain string: the message has no placeholders, so no f-prefix.
        raise RuntimeError("YouTube 请求超时 (60s),可能是代理网络慢或视频不可用") from exc
get_transcript function · python · L100-L164 (65 LOC)
src/hive_mcp/platforms/youtube.py
async def get_transcript(
    video_id_or_url: str,
    lang: str = "en",
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """获取 YouTube 视频字幕文本。优先 youtube-transcript-api,fallback yt-dlp。"""
    # 从 URL 中提取 video_id
    if "youtube.com" in video_id_or_url or "youtu.be" in video_id_or_url:
        import re
        m = re.search(r"(?:v=|youtu\.be/)([A-Za-z0-9_-]{11})", video_id_or_url)
        video_id = m.group(1) if m else video_id_or_url
    else:
        video_id = video_id_or_url

    # ① 优先 youtube-transcript-api
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        def _fetch_transcript():
            api = YouTubeTranscriptApi()
            try:
                result = api.fetch(video_id, languages=[lang, "en", "zh-Hans", "zh-Hant"])
                return [{"text": s.text, "start": s.start, "duration": s.duration} for s in result]
            except TypeError:
                result = YouTubeTranscript
Repobility · severity-and-effort ranking · https://repobility.com
get_channel_videos function · python · L167-L211 (45 LOC)
src/hive_mcp/platforms/youtube.py
async def get_channel_videos(
    channel_url_or_id: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """获取 YouTube 频道最新视频列表。"""
    opts = {
        "quiet": True, "no_warnings": True, "skip_download": True,
        "extract_flat": "in_playlist", "playlistend": count,
    }
    if proxy_url:
        opts["proxy"] = proxy_url
    if country_code:
        opts.setdefault("http_headers", {})["X-Hive-Country"] = country_code.lower()

    url = channel_url_or_id
    if not url.startswith("http"):
        if url.startswith("UC"):
            url = f"https://www.youtube.com/channel/{url}/videos"
        elif url.startswith("@"):
            url = f"https://www.youtube.com/{url}/videos"
        else:
            url = f"https://www.youtube.com/@{url}/videos"
    elif "/videos" not in url:
        url = url.rstrip("/") + "/videos"

    def _extract():
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.ex
search_videos function · python · L214-L245 (32 LOC)
src/hive_mcp/platforms/youtube.py
async def search_videos(
    query: str,
    count: int = 10,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """YouTube 关键词搜索(yt-dlp ytsearch,完全免费无需 API Key)。"""
    opts = {"quiet": True, "no_warnings": True, "skip_download": True, "extract_flat": True}
    if proxy_url:
        opts["proxy"] = proxy_url
    if country_code:
        opts.setdefault("http_headers", {})["X-Hive-Country"] = country_code.lower()

    def _extract():
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(f"ytsearch{count}:{query}", download=False)
            entries = info.get("entries") or []
            return [
                {
                    "video_id": e.get("id"), "title": e.get("title"),
                    "channel": e.get("uploader") or e.get("channel"),
                    "view_count": e.get("view_count"), "duration_s": e.get("duration"),
                    "upload_date": e.get("upload_date"), "thumbnail": e.get(
_is_blocked_ip function · python · L39-L44 (6 LOC)
src/hive_mcp/proxy/executor.py
def _is_blocked_ip(addr: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool:
    """Return True when *addr* lies inside any network of ``_BLOCKED_NETWORKS``."""
    return any(addr in network for network in _BLOCKED_NETWORKS)
_is_blocked_host function · python · L47-L79 (33 LOC)
src/hive_mcp/proxy/executor.py
def _is_blocked_host(hostname: str) -> bool:
    """检查目标地址是否为内网 / 受限地址(含 DNS 解析后验证,防 DNS 重绑定攻击)"""
    if not hostname:
        return True
    if hostname.lower() in ("localhost", "0.0.0.0"):
        return True

    # 先检查是否直接是 IP 地址
    try:
        addr = ipaddress.ip_address(hostname)
        return _is_blocked_ip(addr)
    except ValueError:
        pass  # 是域名,继续 DNS 解析

    # [安全] 域名:解析 DNS 并检查所有 A/AAAA 记录
    # 防止 DNS 重绑定攻击(域名先解析到公网 IP 通过检查,再解析到内网 IP)
    try:
        results = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
        if not results:
            return True  # 无法解析,拒绝
        for _family, _type, _proto, _canon, sockaddr in results:
            ip_str = sockaddr[0]
            try:
                addr = ipaddress.ip_address(ip_str)
                if _is_blocked_ip(addr):
                    logger.warning("[SECURITY] DNS -> blocked IP: %s -> %s", hostname, ip_str)
                    return True
            except ValueError:
           
execute_request function · python · L82-L156 (75 LOC)
src/hive_mcp/proxy/executor.py
async def execute_request(
    url: str,
    method: str = "GET",
    headers: dict | None = None,
    body: bytes | None = None,
    timeout: int = PROXY_TIMEOUT,
) -> dict:
    """在本地网络中发起 HTTP 请求,返回响应结果。"""
    method = method.upper()
    if method not in _ALLOWED_METHODS:
        return {
            "status_code": 0, "body_text": "", "headers": {},
            "bytes": 0, "error": "BLOCKED_METHOD",
        }

    try:
        parsed = urlparse(url)
        if parsed.scheme not in ("http", "https"):
            return {
                "status_code": 0, "body_text": "", "headers": {},
                "bytes": 0, "error": "BLOCKED_SCHEME",
            }
        if _is_blocked_host(parsed.hostname or ""):
            logger.warning("[SECURITY] SSRF blocked: %s", url)
            return {
                "status_code": 0, "body_text": "", "headers": {},
                "bytes": 0, "error": "BLOCKED_TARGET",
            }
    except Exception:
        return {
            "status_code"
_is_blocked_connect_host function · python · L45-L57 (13 LOC)
src/hive_mcp/proxy/local_server.py
def _is_blocked_connect_host(hostname: str) -> bool:
    if not hostname:
        return True
    if hostname.lower() in ("localhost", "0.0.0.0"):
        return True
    try:
        addr = ipaddress.ip_address(hostname)
        for net in _BLOCKED_NETWORKS:
            if addr in net:
                return True
    except ValueError:
        pass
    return False
LocalProxyServer.start method · python · L73-L92 (20 LOC)
src/hive_mcp/proxy/local_server.py
    async def start(self) -> None:
        """Start the local proxy listener on ``self.host``:``self.port``.

        Returns without starting anything when ``API_KEY`` is not set. An
        ``OSError`` from ``asyncio.start_server`` is logged instead of raised,
        and in that case ``self._running`` is left untouched.
        """
        import errno  # local import: only this method needs it

        if not API_KEY:
            logger.warning("HIVE_API_KEY not set, local proxy disabled")
            return
        try:
            self._server = await asyncio.start_server(
                self._handle_client, self.host, self.port
            )
            self._running = True
            logger.info("✅ Local proxy listening on http://%s:%d", self.host, self.port)
        except OSError as e:
            # errno.EADDRINUSE is the platform's own code for "port taken".
            # The literal 98/48 values and the message probe are the original
            # checks, kept so existing detection cannot regress.
            port_in_use = (
                e.errno in (errno.EADDRINUSE, 98, 48)
                or "Address already in use" in str(e)
            )
            if port_in_use:
                logger.error(
                    "⚠️ 端口 %d 已被占用(可能是上一个 hive-mcp 进程未退出)。"
                    "本地代理未启动,但 MCP 工具仍可用。"
                    "解决方法:kill 占用端口的进程后重启 AI 客户端。",
                    self.port,
                )
            else:
                logger.error("Failed to start local proxy on %s:%d: %s", self.host, self.port, e)
LocalProxyServer.stop method · python · L94-L99 (6 LOC)
src/hive_mcp/proxy/local_server.py
    async def stop(self) -> None:
        """Close the listening server, if there is one, and mark the proxy stopped."""
        server = self._server
        if not server:
            # Nothing was started, so there is nothing to close.
            return
        server.close()
        await server.wait_closed()
        self._running = False
        logger.info("Local proxy stopped")
Repobility (the analyzer behind this table) · https://repobility.com
LocalProxyServer._handle_client method · python · L105-L132 (28 LOC)
src/hive_mcp/proxy/local_server.py
    async def _handle_client(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        try:
            first_line = await asyncio.wait_for(reader.readline(), timeout=10)
            if not first_line:
                writer.close()
                return
            first_line_str = first_line.decode("utf-8", errors="replace").strip()
            parts = first_line_str.split()
            if len(parts) < 3:
                writer.close()
                return
            method = parts[0].upper()
            if method == "CONNECT":
                await self._handle_connect(parts[1], reader, writer)
            else:
                await self._handle_http(method, parts[1], reader, writer, first_line)
        except asyncio.TimeoutError:
            logger.debug("Client timeout")
        except Exception as e:
            logger.debug("Client handler error: %s", e)
        finally:
            try:
                writer.close()
                
LocalProxyServer._handle_http method · python · L134-L198 (65 LOC)
src/hive_mcp/proxy/local_server.py
    async def _handle_http(
        self, method: str, url: str,
        reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
        first_line: bytes,
    ) -> None:
        headers = {}
        content_length = 0
        while True:
            line = await asyncio.wait_for(reader.readline(), timeout=10)
            if line in (b"\r\n", b"\n", b""):
                break
            hdr = line.decode("utf-8", errors="replace").strip()
            if ":" in hdr:
                key, val = hdr.split(":", 1)
                key = key.strip()
                val = val.strip()
                if key.lower() == "content-length":
                    content_length = int(val)
                    if content_length > MAX_REQUEST_BODY:
                        writer.write(b"HTTP/1.1 413 Payload Too Large\r\nConnection: close\r\n\r\n")
                        await writer.drain()
                        return
                elif key.lower() not in ("proxy-connection", "proxy-authorizat
LocalProxyServer._handle_connect method · python · L200-L275 (76 LOC)
src/hive_mcp/proxy/local_server.py
    async def _handle_connect(
        self, target: str,
        reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
    ) -> None:
        while True:
            line = await asyncio.wait_for(reader.readline(), timeout=10)
            if line in (b"\r\n", b"\n", b""):
                break

        if ":" in target:
            host, port_str = target.rsplit(":", 1)
            try:
                port = int(port_str)
            except ValueError:
                writer.write(b"HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n")
                await writer.drain()
                return
        else:
            host = target
            port = 443

        if _is_blocked_connect_host(host):
            logger.warning("[SECURITY] CONNECT SSRF blocked: %s:%d", host, port)
            writer.write(b"HTTP/1.1 403 Forbidden\r\nConnection: close\r\n\r\n")
            await writer.drain()
            return

        if port not in _ALLOWED_CONNECT_PORTS:
            logger.
LocalProxyServer._proxy_via_director method · python · L277-L304 (28 LOC)
src/hive_mcp/proxy/local_server.py
    async def _proxy_via_director(
        self, url: str, method: str = "GET", headers: dict | None = None,
        *, country_code: str | None = None,
    ) -> dict:
        try:
            options: dict = {"timeout_ms": 15000}
            if country_code:
                options["country"] = country_code.lower()

            async with httpx.AsyncClient(timeout=30, trust_env=False) as client:
                r = await client.post(
                    f"{DIRECTOR_URL}/api/v1/proxy/request",
                    headers={"Authorization": f"Bearer {API_KEY}"},
                    json={
                        "target_url": url,
                        "method": method,
                        "headers": headers or {},
                        "options": options,
                    },
                )
                data = r.json()
                if r.is_success:
                    return data
                else:
                    return {"status_code": r.status_code, "body_tex
lifespan function · python · L31-L45 (15 LOC)
src/hive_mcp/rest_api.py
async def lifespan(app: FastAPI):
    """Start the local proxy for the application's lifetime and stop it on exit.

    Sets the module-level ``_proxy_url`` to the proxy's loopback URL when the
    proxy is running, or to an empty string when it is not.

    Args:
        app: The application instance; not used in the body.

    Yields:
        Nothing. Control returns to the caller while the application runs.
    """
    global _proxy_url
    from hive_mcp.proxy.local_server import LocalProxyServer

    proxy = LocalProxyServer()
    await proxy.start()
    if proxy.is_running:
        _proxy_url = f"http://127.0.0.1:{proxy.port}"
        logger.info("REST API started — proxy_url=%s", _proxy_url)
    else:
        _proxy_url = ""
        logger.error("REST API started — 本地代理启动失败,代理工具不可用")
    try:
        yield
    finally:
        # Runs even when an exception or cancellation is thrown in at the
        # yield, so the proxy is always stopped.
        await proxy.stop()
        logger.info("REST API shutting down")
search function · python · L66-L76 (11 LOC)
src/hive_mcp/rest_api.py
async def search(
    q: str = Query(..., description="搜索关键词"),
    count: int = Query(10, ge=1, le=50),
):
    """DuckDuckGo search."""
    from hive_mcp.tools.search import web_search
    try:
        results = await web_search(q, count=count, proxy_url=_proxy_url)
        return {"query": q, "results": results}
    except Exception as e:
        # Any failure becomes a 500 whose detail is the error text; chain it
        # so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=str(e)) from e
scrape function · python · L80-L89 (10 LOC)
src/hive_mcp/rest_api.py
async def scrape(
    url: str = Query(..., description="目标 URL"),
):
    """Scrape a generic web page and return it as Markdown.

    Any exception raised by the scraper is reported as HTTP 500 with the
    exception text as the detail.
    """
    from hive_mcp.tools.scrape import scrape_url
    try:
        result = await scrape_url(url, proxy_url=_proxy_url)
    except Exception as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return result
yt_video function · python · L95-L102 (8 LOC)
src/hive_mcp/rest_api.py
async def yt_video(
    url: str = Query(..., description="YouTube 视频 URL"),
):
    """Return the result of ``get_video_info`` for a YouTube video URL.

    Any exception raised by the lookup is reported as HTTP 500 with the
    exception text as the detail.
    """
    from hive_mcp.platforms.youtube import get_video_info
    try:
        info = await get_video_info(url, proxy_url=_proxy_url)
    except Exception as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return info
If a scraper extracted this row, it came from Repobility (https://repobility.com)
yt_transcript function · python · L106-L115 (10 LOC)
src/hive_mcp/rest_api.py
async def yt_transcript(
    video_id: str = Query(...),
    lang: str = Query("en"),
):
    """Return transcript segments for a YouTube video in the given language.

    The response echoes ``video_id`` and ``lang`` alongside the segments.
    Any exception raised by the lookup is reported as HTTP 500 with the
    exception text as the detail.
    """
    from hive_mcp.platforms.youtube import get_transcript
    try:
        segments = await get_transcript(video_id, lang=lang, proxy_url=_proxy_url)
    except Exception as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return {"video_id": video_id, "lang": lang, "segments": segments}
yt_channel function · python · L119-L127 (9 LOC)
src/hive_mcp/rest_api.py
async def yt_channel(
    channel: str = Query(..., description="频道 URL / @handle / UC...ID"),
    count: int = Query(20, ge=1, le=100),
):
    """Return the result of ``get_channel_videos`` for a YouTube channel.

    ``channel`` may be a channel URL, an @handle, or a UC... ID; ``count``
    is forwarded to the lookup. Any exception raised by the lookup is
    reported as HTTP 500 with the exception text as the detail.
    """
    from hive_mcp.platforms.youtube import get_channel_videos
    try:
        videos = await get_channel_videos(channel, count=count, proxy_url=_proxy_url)
    except Exception as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return videos
yt_search function · python · L131-L139 (9 LOC)
src/hive_mcp/rest_api.py
async def yt_search(
    q: str = Query(...),
    count: int = Query(10, ge=1, le=50),
):
    """Return the result of the YouTube ``search_videos`` call for ``q``.

    ``count`` is forwarded to the search. Any exception raised by the
    search is reported as HTTP 500 with the exception text as the detail.
    """
    from hive_mcp.platforms.youtube import search_videos
    try:
        found = await search_videos(q, count=count, proxy_url=_proxy_url)
    except Exception as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return found
ig_profile function · python · L145-L152 (8 LOC)
src/hive_mcp/rest_api.py
async def ig_profile(
    username: str = Query(...),
):
    """Return the result of the Instagram ``get_profile`` call for a user.

    Any exception raised by the lookup is reported as HTTP 500 with the
    exception text as the detail.
    """
    from hive_mcp.platforms.instagram import get_profile
    try:
        profile = await get_profile(username, proxy_url=_proxy_url)
    except Exception as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return profile
ig_posts function · python · L156-L164 (9 LOC)
src/hive_mcp/rest_api.py
async def ig_posts(
    username: str = Query(...),
    count: int = Query(20, ge=1, le=50),
):
    """Return the result of the Instagram ``get_user_posts`` call for a user.

    ``count`` is forwarded to the lookup. Any exception raised by the
    lookup is reported as HTTP 500 with the exception text as the detail.
    """
    from hive_mcp.platforms.instagram import get_user_posts
    try:
        posts = await get_user_posts(username, count=count, proxy_url=_proxy_url)
    except Exception as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return posts
ig_hashtag function · python · L168-L176 (9 LOC)
src/hive_mcp/rest_api.py
async def ig_hashtag(
    tag: str = Query(...),
    count: int = Query(20, ge=1, le=50),
):
    """Return the result of the Instagram ``get_hashtag_posts`` call for a tag.

    ``count`` is forwarded to the lookup. Any exception raised by the
    lookup is reported as HTTP 500 with the exception text as the detail.
    """
    from hive_mcp.platforms.instagram import get_hashtag_posts
    try:
        posts = await get_hashtag_posts(tag, count=count, proxy_url=_proxy_url)
    except Exception as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return posts
ig_post function · python · L180-L187 (8 LOC)
src/hive_mcp/rest_api.py
async def ig_post(
    shortcode: str = Query(..., description="帖子 shortcode 或完整 URL"),
):
    """Return the result of the Instagram ``get_post_detail`` call for a post.

    ``shortcode`` may be a post shortcode or a full URL. Any exception
    raised by the lookup is reported as HTTP 500 with the exception text
    as the detail.
    """
    from hive_mcp.platforms.instagram import get_post_detail
    try:
        detail = await get_post_detail(shortcode, proxy_url=_proxy_url)
    except Exception as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return detail
tt_user function · python · L193-L200 (8 LOC)
src/hive_mcp/rest_api.py
async def tt_user(
    username: str = Query(...),
):
    """Return the result of the TikTok ``get_user_info`` call for a user.

    Any exception raised by the lookup is reported as HTTP 500 with the
    exception text as the detail.
    """
    from hive_mcp.platforms.tiktok import get_user_info
    try:
        info = await get_user_info(username, proxy_url=_proxy_url)
    except Exception as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return info
Repobility analyzer · published findings · https://repobility.com
tt_user_videos function · python · L204-L212 (9 LOC)
src/hive_mcp/rest_api.py
async def tt_user_videos(
    username: str = Query(...),
    count: int = Query(20, ge=1, le=50),
):
    """Return the result of the TikTok ``get_user_videos`` call for a user.

    ``count`` is forwarded to the lookup. Any exception raised by the
    lookup is reported as HTTP 500 with the exception text as the detail.
    """
    from hive_mcp.platforms.tiktok import get_user_videos
    try:
        videos = await get_user_videos(username, count=count, proxy_url=_proxy_url)
    except Exception as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return videos
tt_video function · python · L216-L223 (8 LOC)
src/hive_mcp/rest_api.py
async def tt_video(
    video_id: str = Query(...),
):
    """Return the result of the TikTok ``get_video_detail`` call for a video.

    Any exception raised by the lookup is reported as HTTP 500 with the
    exception text as the detail.
    """
    from hive_mcp.platforms.tiktok import get_video_detail
    try:
        detail = await get_video_detail(video_id, proxy_url=_proxy_url)
    except Exception as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return detail
tt_comments function · python · L227-L235 (9 LOC)
src/hive_mcp/rest_api.py
async def tt_comments(
    video_id: str = Query(...),
    count: int = Query(50, ge=1, le=100),
):
    """Return the result of the TikTok ``get_video_comments`` call for a video.

    ``count`` is forwarded to the lookup. Any exception raised by the
    lookup is reported as HTTP 500 with the exception text as the detail.
    """
    from hive_mcp.platforms.tiktok import get_video_comments
    try:
        comments = await get_video_comments(video_id, count=count, proxy_url=_proxy_url)
    except Exception as exc:
        # Chain explicitly so the original error stays attached as the cause.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return comments
‹ prevpage 2 / 11next ›