← back to dreamer09__agentxspace

Function bodies 529 total

All specs Real LLM only Function bodies
_int_env function · python · L7-L15 (9 LOC)
src/hive_mcp/config.py
def _int_env(key: str, default: int) -> int:
    """安全解析整数环境变量"""
    val = os.getenv(key, "")
    if not val:
        return default
    try:
        return int(val)
    except ValueError:
        return default
_float_env function · python · L18-L26 (9 LOC)
src/hive_mcp/config.py
def _float_env(key: str, default: float) -> float:
    """安全解析浮点环境变量"""
    val = os.getenv(key, "")
    if not val:
        return default
    try:
        return float(val)
    except ValueError:
        return default
is_consented function · python · L30-L38 (9 LOC)
src/hive_mcp/consent.py
def is_consented() -> bool:
    """Check the local consent cache for an accepted agreement.

    Returns ``False`` when ``CONSENT_FILE`` does not exist or cannot be read
    and parsed; otherwise returns the stored ``"consented"`` value
    (``False`` when the key is absent).
    """
    consent_path = Path(CONSENT_FILE)
    if consent_path.exists():
        try:
            record = json.loads(consent_path.read_text())
            return record.get("consented", False)
        except Exception:
            # Any read/parse problem is treated as "not consented".
            pass
    return False
check_consent_from_server function · python · L41-L59 (19 LOC)
src/hive_mcp/consent.py
async def check_consent_from_server() -> bool:
    """Ask the server whether the user already accepted the agreement.

    Sends an authenticated GET to ``{DIRECTOR_URL}/api/v1/user/consent``.
    When the response is successful and reports ``consented``, the decision
    is written to the local cache via ``save_consent(True)`` and ``True`` is
    returned. Returns ``False`` when no ``API_KEY`` is configured, when the
    server does not report consent, or when any exception occurs (the
    exception is logged at debug level).
    """
    if not API_KEY:
        return False
    try:
        async with httpx.AsyncClient(timeout=5, trust_env=False) as client:
            response = await client.get(
                f"{DIRECTOR_URL}/api/v1/user/consent",
                headers={"Authorization": f"Bearer {API_KEY}"},
            )
            if not response.is_success:
                return False
            if not response.json().get("consented"):
                return False
            save_consent(True)
            logger.info("用户已在网站注册时同意协议,自动激活")
            return True
    except Exception as exc:
        logger.debug("查询服务器 consent 失败: %s", exc)
        return False
save_consent function · python · L62-L67 (6 LOC)
src/hive_mcp/consent.py
def save_consent(agreed: bool) -> None:
    """Persist the consent decision to ``CONSENT_FILE``.

    The JSON payload (the ``consented`` flag plus a UTC ISO-8601
    ``timestamp``) is written to a sibling ``.tmp`` file first, which then
    replaces the target file. Parent directories are created as needed.
    """
    target = Path(CONSENT_FILE)
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "consented": agreed,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    scratch = target.with_suffix(".tmp")
    scratch.write_text(json.dumps(payload, indent=2))
    scratch.replace(target)
clean_html function · python · L62-L88 (27 LOC)
src/hive_mcp/content_utils.py
def _decompose_outermost(candidates) -> None:
    """Decompose each candidate tag that is not nested inside an earlier candidate.

    ``candidates`` must be in document order (as returned by ``find_all``).
    Decomposing a tag also destroys its descendants, so nested candidates are
    skipped instead of being touched again after destruction.
    """
    outermost = []
    seen_ids = set()
    for node in candidates:
        if any(id(ancestor) in seen_ids for ancestor in node.parents):
            continue
        seen_ids.add(id(node))
        outermost.append(node)
    # Only destroy after the whole selection is made, while every tag is
    # still intact.
    for node in outermost:
        node.decompose()


def clean_html(raw_html: str) -> str:
    """Remove script/style/ad/noise markup and return the cleaned HTML. (T-02)

    With BeautifulSoup available, every tag named in ``_NOISE_TAGS`` is
    removed, followed by every element whose ``class`` or ``id`` matches
    ``_NOISE_PATTERN``. Without it, only ``<script>`` and ``<style>``
    elements are stripped with regular expressions.

    Args:
        raw_html: HTML source text; empty or ``None`` yields ``""``.

    Returns:
        The cleaned HTML as a string.
    """
    if not raw_html:
        return ""

    if not _HAS_BS4:
        # Fallback: regex-based cleanup.
        content = re.sub(r"<script[^>]*>.*?</script>", "", raw_html,
                         flags=re.DOTALL | re.IGNORECASE)
        content = re.sub(r"<style[^>]*>.*?</style>", "", content,
                         flags=re.DOTALL | re.IGNORECASE)
        return content

    soup = BeautifulSoup(raw_html, "html.parser")

    # Remove noise tags.
    _decompose_outermost(soup.find_all(_NOISE_TAGS))

    # Remove ad-like elements by class/id. All attribute reads happen before
    # anything is decomposed: a decomposed tag has its attributes wiped, so
    # reading ``class``/``id`` from a child of an already-removed element
    # would fail.
    flagged = []
    for tag in soup.find_all(True):
        cls = " ".join(tag.get("class", []))
        tid = tag.get("id", "")
        if _NOISE_PATTERN.search(cls) or _NOISE_PATTERN.search(tid):
            flagged.append(tag)
    _decompose_outermost(flagged)

    return str(soup)
html_to_markdown function · python · L95-L130 (36 LOC)
src/hive_mcp/content_utils.py
def html_to_markdown(raw_html: str, base_url: str = "") -> str:
    """将 HTML 转换为 Markdown,优先用 trafilatura 提取正文。(T-01 + T-02)"""
    if not raw_html:
        return ""

    # T-02: 优先使用 trafilatura 提取正文 (Readability 算法)
    if _HAS_TRAFILATURA:
        try:
            extracted = trafilatura.extract(
                raw_html, url=base_url,
                include_links=True, include_formatting=True,
                output_format="txt",
                favor_precision=True,
            )
            if extracted and len(extracted) > 100:
                return extracted.strip()
        except Exception:
            pass  # fallback to html2text

    # Fallback: html2text (全页转换)
    cleaned = clean_html(raw_html)

    if _HAS_HTML2TEXT:
        h = _html2text.HTML2Text()
        h.ignore_links = False
        h.ignore_images = True         # 减少干扰
        h.ignore_emphasis = False
        h.body_width = 0               # 不强制换行
        h.skip_internal_links = True
        h.inline_links =
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
_html_to_text_fallback function · python · L139-L145 (7 LOC)
src/hive_mcp/content_utils.py
def _html_to_text_fallback(html_content: str) -> str:
    """Fallback:基础正则去标签。"""
    text = re.sub(r"<[^>]+>", " ", html_content)
    text = html.unescape(text)
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
format_as_json function · python · L152-L158 (7 LOC)
src/hive_mcp/content_utils.py
def format_as_json(raw_body: str) -> str:
    """Pretty-print ``raw_body`` as JSON, or return it unchanged if parsing fails.

    Successful output uses a 2-space indent and keeps non-ASCII characters
    unescaped.
    """
    try:
        parsed = json.loads(raw_body)
    except (json.JSONDecodeError, TypeError):
        return raw_body
    return json.dumps(parsed, ensure_ascii=False, indent=2)
build_search_url function · python · L165-L187 (23 LOC)
src/hive_mcp/content_utils.py
def build_search_url(query: str, engine: str = "duckduckgo",
                     country_code: Optional[str] = None) -> str:
    """
    DEPRECATED: Use hive_mcp.tools.search.web_search() instead.

    Build a search URL for ``query``.

    ``engine`` may be ``"bing"`` or ``"google"``; any other value selects the
    DuckDuckGo HTML endpoint. When ``country_code`` is given it is appended
    as the engine's region parameter (``cc`` upper-cased for Bing, ``gl``
    lower-cased for Google, ``kl`` as ``xx-xx`` lower-cased for DuckDuckGo).
    """
    encoded = quote_plus(query)
    region = ""

    if engine == "bing":
        base = f"https://www.bing.com/search?q={encoded}&count=20&setlang=zh-CN"
        if country_code:
            region = f"&cc={country_code.upper()}"
    elif engine == "google":
        base = f"https://www.google.com/search?q={encoded}&num=20&hl=zh-CN"
        if country_code:
            region = f"&gl={country_code.lower()}"
    else:
        # duckduckgo (default, needs no API key)
        base = f"https://html.duckduckgo.com/html/?q={encoded}"
        if country_code:
            region = f"&kl={country_code.lower()}-{country_code.lower()}"

    return base + region
parse_search_results function · python · L190-L210 (21 LOC)
src/hive_mcp/content_utils.py
def parse_search_results(raw_html: str, engine: str = "duckduckgo",
                         max_results: int = 10) -> list[dict]:
    """
    DEPRECATED: Use hive_mcp.tools.search.web_search() instead.

    Parse a search-result page into a structured list.

    Returns an empty list when BeautifulSoup is unavailable or ``engine`` is
    not one of ``"duckduckgo"``, ``"bing"`` or ``"google"``.
    """
    if not _HAS_BS4:
        return []

    soup = BeautifulSoup(raw_html, "html.parser")
    parsers = {
        "duckduckgo": _parse_ddg,
        "bing": _parse_bing,
        "google": _parse_google,
    }
    parser = parsers.get(engine)
    if parser is None:
        return []
    return parser(soup, max_results)
_parse_ddg function · python · L213-L245 (33 LOC)
src/hive_mcp/content_utils.py
def _parse_ddg(soup: "BeautifulSoup", max_results: int) -> list[dict]:
    """DEPRECATED: Use hive_mcp.tools.search.web_search() instead."""
    results = []
    # DDG HTML endpoint 的结果结构
    for item in soup.select(".result, .web-result")[:max_results * 2]:
        title_el = item.select_one(".result__title a, .result__a, h2 a")
        snippet_el = item.select_one(".result__snippet, .result__body")

        if not title_el:
            continue

        title = title_el.get_text(strip=True)
        href = title_el.get("href", "")

        # DDG 通过 uddg 参数传递真实 URL
        if "//duckduckgo.com/l/" in href or href.startswith("/l/"):
            from urllib.parse import urlparse, parse_qs
            try:
                params = parse_qs(urlparse(href).query)
                href = params.get("uddg", [href])[0]
            except Exception:
                pass

        if href.startswith("//"):
            href = "https:" + href

        snippet = snippet_el.get_text(strip=True) if sni
_parse_bing function · python · L248-L261 (14 LOC)
src/hive_mcp/content_utils.py
def _parse_bing(soup: "BeautifulSoup", max_results: int) -> list[dict]:
    """DEPRECATED: Use hive_mcp.tools.search.web_search() instead.

    Extract ``title``/``url``/``snippet`` dicts from the first
    ``max_results`` ``li.b_algo`` items, skipping entries that lack a title
    link, title text, or href.
    """
    parsed = []
    for entry in soup.select("li.b_algo")[:max_results]:
        anchor = entry.select_one("h2 a")
        if not anchor:
            continue
        summary = entry.select_one(".b_caption p, .b_snippet, p")
        heading = anchor.get_text(strip=True)
        link = anchor.get("href", "")
        if not (heading and link):
            continue
        parsed.append({
            "title": heading,
            "url": link,
            "snippet": summary.get_text(strip=True) if summary else "",
        })
    return parsed
_parse_google function · python · L264-L287 (24 LOC)
src/hive_mcp/content_utils.py
def _parse_google(soup: "BeautifulSoup", max_results: int) -> list[dict]:
    """DEPRECATED: Use hive_mcp.tools.search.web_search() instead."""
    results = []
    for item in soup.select("div.g, div[data-sokoban-container]")[:max_results * 2]:
        title_el = item.select_one("h3")
        link_el = item.select_one("a[href]")
        snippet_el = item.select_one("div.VwiC3b, span.aCOpRe, div[data-sncf]")
        if not (title_el and link_el):
            continue
        title = title_el.get_text(strip=True)
        url = link_el.get("href", "")
        if url.startswith("/url?"):
            from urllib.parse import urlparse, parse_qs
            try:
                params = parse_qs(urlparse(url).query)
                url = params.get("q", [url])[0]
            except Exception:
                pass
        snippet = snippet_el.get_text(strip=True) if snippet_el else ""
        if title and url and url.startswith("http"):
            results.append({"title": title, "url": url, 
format_search_results_md function · python · L290-L310 (21 LOC)
src/hive_mcp/content_utils.py
def format_search_results_md(results: list[dict], query: str, engine: str) -> str:
    """Render search results as a Markdown string.

    With no results, a two-line "not found" hint is returned. Otherwise the
    output is a header, a summary line, a rule, and one numbered section per
    result (title, link, and the snippet as a block quote when present).
    """
    if not results:
        return (
            f"❌ 未找到 `{query}` 的搜索结果。\n"
            f"建议:检查关键词拼写,或换用其他搜索引擎(bing/google/duckduckgo)。"
        )

    header = [
        "## 🔍 搜索结果",
        f"**关键词:** `{query}`  |  **引擎:** `{engine}`  |  **共 {len(results)} 条**",
        "",
        "---",
    ]
    body = []
    for index, item in enumerate(results, 1):
        body.append(f"\n### {index}. {item['title']}")
        body.append(f"🔗 <{item['url']}>")
        if item.get("snippet"):
            body.append(f"\n> {item['snippet']}")

    return "\n".join(header + body)
Repobility — same analyzer, your code, free for public repos · /scan/
truncate function · python · L317-L337 (21 LOC)
src/hive_mcp/content_utils.py
def truncate(content: str, max_chars: int, start_index: int = 0) -> tuple[str, bool, int]:
    """
    Truncate ``content`` and return ``(text, was_truncated, next_start_index)``.

    ``start_index`` skips that many leading characters first (used to page
    through long documents). When the remaining text fits in ``max_chars``
    it is returned whole with ``next_start_index`` 0. Otherwise the cut is
    made at the last blank line before ``max_chars`` if that falls within the
    final 20% of the window, else exactly at ``max_chars``.
    """
    if not content:
        return content, False, 0

    # Apply the start_index offset.
    window = content[start_index:] if start_index > 0 else content

    if len(window) <= max_chars:
        return window, False, 0

    # Prefer cutting on a paragraph boundary.
    boundary = window.rfind("\n\n", 0, max_chars)
    cut = boundary if boundary >= max_chars * 0.8 else max_chars
    return window[:cut], True, start_index + cut
dependency_status function · python · L340-L348 (9 LOC)
src/hive_mcp/content_utils.py
def dependency_status() -> dict:
    """Report which optional dependencies are installed (for debugging).

    Besides one flag per package, ``markdown_conversion`` is true when either
    html2text or trafilatura is available, and ``search_parsing`` mirrors the
    BeautifulSoup flag.
    """
    status = {
        "html2text": _HAS_HTML2TEXT,
        "beautifulsoup4": _HAS_BS4,
        "trafilatura": _HAS_TRAFILATURA,
    }
    status["markdown_conversion"] = status["html2text"] or status["trafilatura"]
    status["search_parsing"] = status["beautifulsoup4"]
    return status
_fetch_html function · python · L24-L30 (7 LOC)
src/hive_mcp/platforms/ecommerce.py
async def _fetch_html(url: str, proxy_url: Optional[str] = None, headers: Optional[dict] = None) -> str:
    """Fetch ``url`` (optionally through ``proxy_url``) and return the body text.

    ``headers`` are layered on top of the module's ``_HEADERS`` defaults.
    Redirects are followed, the timeout is 30 seconds, and a non-success
    status raises via ``raise_for_status()``.
    """
    merged = dict(_HEADERS)
    if headers:
        merged.update(headers)
    async with httpx.AsyncClient(proxy=proxy_url, timeout=30.0, follow_redirects=True) as client:
        response = await client.get(url, headers=merged)
        response.raise_for_status()
        return response.text
amazon_search function · python · L33-L72 (40 LOC)
src/hive_mcp/platforms/ecommerce.py
async def amazon_search(
    query: str,
    country: str = "us",  # us, uk, de, jp, etc.
    count: int = 10,
    proxy_url: Optional[str] = None,
) -> list[dict]:
    """搜索 Amazon 商品。"""
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        return [{"error": "beautifulsoup4 未安装"}]

    domains = {"us": "www.amazon.com", "uk": "www.amazon.co.uk", "de": "www.amazon.de", "jp": "www.amazon.co.jp", "cn": "www.amazon.cn"}
    domain = domains.get(country, "www.amazon.com")
    url = f"https://{domain}/s?k={quote_plus(query)}"

    html = await _fetch_html(url, proxy_url)
    soup = BeautifulSoup(html, "html.parser")

    products = []
    for item in soup.select('[data-component-type="s-search-result"]')[:count]:
        title_el = item.select_one("h2 a span") or item.select_one("h2 span")
        price_el = item.select_one(".a-price .a-offscreen")
        rating_el = item.select_one(".a-icon-alt")
        reviews_el = item.select_one('[aria-label*="stars"] + span'
amazon_product_detail function · python · L75-L103 (29 LOC)
src/hive_mcp/platforms/ecommerce.py
async def amazon_product_detail(
    url: str,
    proxy_url: Optional[str] = None,
) -> dict:
    """获取 Amazon 商品详情。"""
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        return {"error": "beautifulsoup4 未安装"}

    html = await _fetch_html(url, proxy_url)
    soup = BeautifulSoup(html, "html.parser")

    title = soup.select_one("#productTitle")
    price = soup.select_one(".a-price .a-offscreen") or soup.select_one("#priceblock_ourprice")
    rating = soup.select_one("#acrPopover .a-icon-alt")
    reviews = soup.select_one("#acrCustomerReviewText")
    description = soup.select_one("#productDescription")
    features = soup.select("#feature-bullets li span.a-list-item")

    return {
        "title": title.get_text(strip=True) if title else None,
        "price": price.get_text(strip=True) if price else None,
        "rating": rating.get_text(strip=True) if rating else None,
        "reviews_count": reviews.get_text(strip=True) if reviews else None,
      
amazon_reviews function · python · L106-L141 (36 LOC)
src/hive_mcp/platforms/ecommerce.py
async def amazon_reviews(
    asin: str,
    country: str = "us",
    count: int = 10,
    proxy_url: Optional[str] = None,
) -> list[dict]:
    """获取 Amazon 商品评论。"""
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        return [{"error": "beautifulsoup4 未安装"}]

    domains = {"us": "www.amazon.com", "uk": "www.amazon.co.uk", "de": "www.amazon.de", "jp": "www.amazon.co.jp"}
    domain = domains.get(country, "www.amazon.com")
    url = f"https://{domain}/product-reviews/{asin}?sortBy=recent"

    html = await _fetch_html(url, proxy_url)
    soup = BeautifulSoup(html, "html.parser")

    reviews = []
    for review in soup.select('[data-hook="review"]')[:count]:
        title_el = review.select_one('[data-hook="review-title"] span:last-child')
        body_el = review.select_one('[data-hook="review-body"] span')
        rating_el = review.select_one('[data-hook="review-star-rating"] .a-icon-alt')
        author_el = review.select_one('.a-profile-name')
        da
_get_loader function · python · L34-L57 (24 LOC)
src/hive_mcp/platforms/instagram.py
def _get_loader() -> instaloader.Instaloader:
    """Return the module-wide Instaloader instance, creating it on first use.

    On creation, all downloads and metadata saving are disabled. If
    ``config.INSTAGRAM_SESSION_FILE`` is set, the saved session is loaded
    from it; a failure to load is logged as a warning and ignored.
    """
    global _loader_instance
    if _loader_instance is not None:
        return _loader_instance

    options = dict(
        download_pictures=False,
        download_video_thumbnails=False,
        download_geotags=False,
        download_comments=False,       # comments require login
        save_metadata=False,
        compress_json=False,
        quiet=True,
        request_timeout=30,
        max_connection_attempts=3,
        sleep=True,                    # instaloader's built-in random delay
    )
    _loader_instance = instaloader.Instaloader(**options)

    # T-04: load a cookie session for better stability.
    session_path = config.INSTAGRAM_SESSION_FILE
    if session_path:
        try:
            _loader_instance.load_session_from_file(username=None, filename=session_path)
        except Exception as exc:
            logger.warning("Instagram session 加载失败 (将使用匿名模式): %s", exc)
        else:
            logger.info("Instagram session 已加载: %s", session_path)
    return _loader_instance
_patch_proxy function · python · L60-L71 (12 LOC)
src/hive_mcp/platforms/instagram.py
def _patch_proxy(
    loader: instaloader.Instaloader,
    proxy_url: str,
    country_code: Optional[str] = None,
) -> None:
    """Point the loader's underlying session at ``proxy_url``.

    Both the ``http`` and ``https`` proxy entries are set to ``proxy_url``.
    When ``country_code`` is given, its lower-cased form is sent in the
    ``X-Hive-Country`` header.
    """
    session = loader.context._session
    session.proxies = dict.fromkeys(("http", "https"), proxy_url)
    if country_code:
        session.headers["X-Hive-Country"] = country_code.lower()
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
_retry_on_fail function · python · L80-L92 (13 LOC)
src/hive_mcp/platforms/instagram.py
def _retry_on_fail(func, max_retries: int = 2):
    """指数退避重试包装器。"""
    last_err = None
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception as e:
            last_err = e
            if attempt < max_retries:
                wait = (2 ** attempt) + random.uniform(0, 1)
                logger.warning("Instagram 请求失败 (第%d次), %0.1fs 后重试: %s", attempt + 1, wait, e)
                time.sleep(wait)
    raise last_err
_fmt_profile function · python · L103-L116 (14 LOC)
src/hive_mcp/platforms/instagram.py
def _fmt_profile(p: instaloader.Profile) -> dict:
    """Convert a profile object into a plain dict of its public fields.

    Adds a ``profile_url`` built from the username.
    """
    # (output key, profile attribute) pairs, in output order.
    field_map = (
        ("username", "username"),
        ("full_name", "full_name"),
        ("biography", "biography"),
        ("external_url", "external_url"),
        ("followers", "followers"),
        ("followees", "followees"),
        ("post_count", "mediacount"),
        ("is_verified", "is_verified"),
        ("is_private", "is_private"),
        ("profile_pic_url", "profile_pic_url"),
    )
    info = {key: getattr(p, attr) for key, attr in field_map}
    info["profile_url"] = f"https://www.instagram.com/{p.username}/"
    return info
_fmt_post function · python · L119-L135 (17 LOC)
src/hive_mcp/platforms/instagram.py
def _fmt_post(post: instaloader.Post) -> dict:
    """Convert a post object into a plain dict.

    The caption is cut to 300 characters. ``timestamp`` and ``location`` are
    ``None`` when the post has no date or location, and ``video_url`` /
    ``duration_s`` are ``None`` for non-video posts.
    """
    code = post.shortcode
    is_video = post.is_video
    info = {
        "shortcode": code,
        "url": f"https://www.instagram.com/p/{code}/",
        "type": "video" if is_video else "image",
        "caption": (post.caption or "")[:300],
        "hashtags": list(post.caption_hashtags),
        "mentions": list(post.caption_mentions),
        "likes": post.likes,
        "comments_count": post.comments,
    }

    taken_at = post.date_utc
    info["timestamp"] = taken_at.isoformat() if taken_at else None

    place = post.location
    info["location"] = place.name if place else None

    info["display_url"] = post.url
    if is_video:
        info["video_url"] = post.video_url
        info["duration_s"] = post.video_duration
    else:
        info["video_url"] = None
        info["duration_s"] = None
    info["owner_username"] = post.owner_username
    return info
get_profile function · python · L141-L158 (18 LOC)
src/hive_mcp/platforms/instagram.py
async def get_profile(
    username: str,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> dict:
    """
    Fetch an Instagram user's profile as a dict.

    The lookup runs through ``_run_sync`` with ``_retry_on_fail`` retries.
    When ``proxy_url`` is given, the shared loader is pointed at it (with
    the optional ``country_code``) before the request.
    """
    def _load_profile() -> dict:
        shared_loader = _get_loader()
        if proxy_url:
            _patch_proxy(shared_loader, proxy_url, country_code=country_code)
        _random_delay()
        return _fmt_profile(
            instaloader.Profile.from_username(shared_loader.context, username)
        )

    def _with_retries():
        return _retry_on_fail(_load_profile)

    return await _run_sync(_with_retries)
get_user_posts function · python · L161-L190 (30 LOC)
src/hive_mcp/platforms/instagram.py
async def get_user_posts(
    username: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """
    Fetch a user's most recent posts.

    ``count`` is capped at ``config.INSTAGRAM_MAX_POSTS``. For a private
    profile a dict with an ``"error"`` key is returned instead of a list.
    """
    limit = min(count, config.INSTAGRAM_MAX_POSTS)

    def _collect():
        shared_loader = _get_loader()
        if proxy_url:
            _patch_proxy(shared_loader, proxy_url, country_code=country_code)
        _random_delay()
        profile = instaloader.Profile.from_username(shared_loader.context, username)
        if profile.is_private:
            return {"error": f"@{username} 是私密账号,无法在未登录状态下获取帖子"}
        collected = []
        for index, post in enumerate(profile.get_posts()):
            if index >= limit:
                break
            collected.append(_fmt_post(post))
            # Extra pause after every 5th post to stay under rate limits.
            if index % 5 == 0 and index > 0:
                _random_delay(1.0)
        return collected

    def _with_retries():
        return _retry_on_fail(_collect)

    return await _run_sync(_with_retries)
get_hashtag_posts function · python · L193-L220 (28 LOC)
src/hive_mcp/platforms/instagram.py
async def get_hashtag_posts(
    hashtag: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """
    Fetch posts under an Instagram hashtag.

    Any leading ``#`` characters are stripped from ``hashtag``. ``count`` is
    capped at ``config.INSTAGRAM_MAX_POSTS``.
    """
    limit = min(count, config.INSTAGRAM_MAX_POSTS)

    def _collect():
        shared_loader = _get_loader()
        if proxy_url:
            _patch_proxy(shared_loader, proxy_url, country_code=country_code)
        _random_delay()
        tag = instaloader.Hashtag.from_name(shared_loader.context, hashtag.lstrip("#"))
        collected = []
        for index, post in enumerate(tag.get_posts()):
            if index >= limit:
                break
            collected.append(_fmt_post(post))
            # Extra pause after every 5th post.
            if index % 5 == 0 and index > 0:
                _random_delay(1.0)
        return collected

    def _with_retries():
        return _retry_on_fail(_collect)

    return await _run_sync(_with_retries)
get_post_detail function · python · L223-L247 (25 LOC)
src/hive_mcp/platforms/instagram.py
async def get_post_detail(
    shortcode_or_url: str,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> dict:
    """
    Fetch one Instagram post by shortcode or by full ``instagram.com/p/`` URL.

    For a URL, the shortcode is taken from the ``/p/<shortcode>`` segment; if
    none can be extracted, the input is used as the shortcode unchanged.
    """
    # Extract the shortcode.
    shortcode = shortcode_or_url
    if "instagram.com/p/" in shortcode_or_url:
        import re
        found = re.search(r"/p/([A-Za-z0-9_-]+)", shortcode_or_url)
        if found:
            shortcode = found.group(1)

    def _load_post():
        shared_loader = _get_loader()
        if proxy_url:
            _patch_proxy(shared_loader, proxy_url, country_code=country_code)
        _random_delay()
        return _fmt_post(
            instaloader.Post.from_shortcode(shared_loader.context, shortcode)
        )

    def _with_retries():
        return _retry_on_fail(_load_post)

    return await _run_sync(_with_retries)
get_user_reels function · python · L250-L284 (35 LOC)
src/hive_mcp/platforms/instagram.py
async def get_user_reels(
    username: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """
    获取用户最新的 Reels 视频列表 (is_video + product_type='clips')。
    """
    count = min(count, config.INSTAGRAM_MAX_POSTS)

    def _fetch():
        loader = _get_loader()
        if proxy_url:
            _patch_proxy(loader, proxy_url, country_code=country_code)
        _random_delay()
        profile = instaloader.Profile.from_username(loader.context, username)
        if profile.is_private:
            return {"error": f"@{username} 是私密账号"}
        reels = []
        for i, post in enumerate(profile.get_posts()):
            if i >= count * 3:  # 扫描更多帖子以找到足够 Reels
                break
            if post.is_video and post.typename == "GraphVideo":
                reels.append({
                    **_fmt_post(post),
                    "is_reel": True,
                })
                if len(reels) >= count:
          
Source: Repobility analyzer · https://repobility.com
_headers function · python · L28-L38 (11 LOC)
src/hive_mcp/platforms/linkedin.py
def _headers(cookie: Optional[str] = None) -> dict:
    """构建 LinkedIn 请求头。"""
    h = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
    }
    c = cookie or LINKEDIN_COOKIE
    if c:
        h["Cookie"] = c if "li_at" in c else f"li_at={c}"
    return h
_fetch function · python · L41-L52 (12 LOC)
src/hive_mcp/platforms/linkedin.py
async def _fetch(
    url: str,
    proxy_url: Optional[str] = None,
    cookie: Optional[str] = None,
) -> str:
    """Fetch a LinkedIn page (optionally through ``proxy_url``) and return its text.

    Headers come from ``_headers(cookie)``. Redirects are followed, the
    timeout is 30 seconds, and a non-success status raises via
    ``raise_for_status()``.
    """
    client_options = dict(proxy=proxy_url, timeout=30.0, follow_redirects=True)
    async with httpx.AsyncClient(**client_options) as client:
        response = await client.get(url, headers=_headers(cookie))
        response.raise_for_status()
        return response.text
get_profile function · python · L55-L119 (65 LOC)
src/hive_mcp/platforms/linkedin.py
async def get_profile(
    username: str,
    proxy_url: Optional[str] = None,
    cookie: Optional[str] = None,
) -> dict:
    """获取 LinkedIn 用户公开资料。"""
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        return {"error": "beautifulsoup4 未安装", "status": "error"}

    url = f"https://www.linkedin.com/in/{username}/"
    try:
        html = await _fetch(url, proxy_url, cookie)
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 999:
            return {
                "error": "LinkedIn 封锁请求 (999)。需要有效的 LINKEDIN_COOKIE 和住宅 IP",
                "status": "blocked",
            }
        raise

    soup = BeautifulSoup(html, "html.parser")

    # Try to extract from JSON-LD
    json_ld = soup.find("script", {"type": "application/ld+json"})
    if json_ld:
        try:
            data = json.loads(json_ld.string)
            image = data.get("image")
            if isinstance(image, dict):
                image = image.get("contentUrl")
get_company function · python · L122-L171 (50 LOC)
src/hive_mcp/platforms/linkedin.py
async def get_company(
    company_slug: str,
    proxy_url: Optional[str] = None,
    cookie: Optional[str] = None,
) -> dict:
    """获取 LinkedIn 公司页面信息。"""
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        return {"error": "beautifulsoup4 未安装", "status": "error"}

    url = f"https://www.linkedin.com/company/{company_slug}/"
    html = await _fetch(url, proxy_url, cookie)
    soup = BeautifulSoup(html, "html.parser")

    # Try JSON-LD
    json_ld = soup.find("script", {"type": "application/ld+json"})
    if json_ld:
        try:
            data = json.loads(json_ld.string)
            employee_count = data.get("numberOfEmployees")
            if isinstance(employee_count, dict):
                employee_count = employee_count.get("value")
            return {
                "company": company_slug,
                "name": data.get("name"),
                "description": (data.get("description") or "")[:500],
                "url": f"https://www.linkedi
search_jobs function · python · L174-L210 (37 LOC)
src/hive_mcp/platforms/linkedin.py
async def search_jobs(
    query: str,
    location: Optional[str] = None,
    count: int = 10,
    proxy_url: Optional[str] = None,
    cookie: Optional[str] = None,
) -> list[dict]:
    """搜索 LinkedIn 职位。"""
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        return [{"error": "beautifulsoup4 未安装"}]

    url = f"https://www.linkedin.com/jobs/search/?keywords={quote_plus(query)}"
    if location:
        url += f"&location={quote_plus(location)}"

    html = await _fetch(url, proxy_url, cookie)
    soup = BeautifulSoup(html, "html.parser")

    jobs = []
    for card in soup.select(".base-card, .job-search-card")[:count]:
        title = card.select_one(".base-search-card__title, h3")
        company = card.select_one(".base-search-card__subtitle, h4")
        loc = card.select_one(".job-search-card__location")
        link = card.select_one("a")
        date = card.select_one("time")

        jobs.append({
            "title": title.get_text(strip=True) if 
_fetch_json function · python · L20-L36 (17 LOC)
src/hive_mcp/platforms/reddit.py
async def _fetch_json(url: str, proxy_url: Optional[str] = None) -> dict | list:
    """Fetch a Reddit JSON endpoint and log its rate-limit headers.

    Args:
        url: Reddit URL. If its *path* does not end in ``.json`` the suffix
            is added to the path; any query string is left untouched.
        proxy_url: Optional proxy passed to the HTTP client.

    Returns:
        The decoded JSON body.

    Raises:
        httpx.HTTPStatusError: When the response status is not successful.
    """
    # Local import so this block does not depend on the file's import list.
    from urllib.parse import urlsplit, urlunsplit

    # Check the path, not the whole URL: a URL such as ".../hot.json?limit=20"
    # ends with its query string, and appending ".json" there would turn the
    # last parameter into "20.json".
    parts = urlsplit(url)
    if not parts.path.endswith(".json"):
        url = urlunsplit(parts._replace(path=parts.path.rstrip("/") + ".json"))

    async with httpx.AsyncClient(proxy=proxy_url, timeout=30.0, follow_redirects=True) as client:
        resp = await client.get(url, headers=_HEADERS)
        resp.raise_for_status()

        # Check rate-limit headers.
        remaining = resp.headers.get("x-ratelimit-remaining")
        reset = resp.headers.get("x-ratelimit-reset")
        if remaining:
            logger.debug("Reddit rate-limit remaining: %s", remaining)
            # float() rather than int(): the header is not guaranteed to be an
            # integer literal (e.g. "98.0"), and a malformed value must not
            # fail an otherwise successful request.
            try:
                nearly_exhausted = float(remaining) < 10
            except ValueError:
                nearly_exhausted = False
            if nearly_exhausted:
                logger.warning(
                    "Reddit rate-limit nearly exhausted: %s requests remaining, reset at %s",
                    remaining, reset,
                )

        return resp.json()
get_subreddit_posts function · python · L39-L67 (29 LOC)
src/hive_mcp/platforms/reddit.py
async def get_subreddit_posts(
    subreddit: str,
    sort: str = "hot",  # hot, new, top, rising
    count: int = 20,
    time_filter: str = "day",  # hour, day, week, month, year, all (for sort=top)
    proxy_url: Optional[str] = None,
) -> list[dict]:
    """获取 subreddit 帖子列表。"""
    url = f"https://www.reddit.com/r/{subreddit}/{sort}.json?limit={min(count, 100)}"
    if sort == "top":
        url += f"&t={time_filter}"

    data = await _fetch_json(url, proxy_url)
    posts = []
    for child in data.get("data", {}).get("children", [])[:count]:
        p = child.get("data", {})
        posts.append({
            "title": p.get("title"),
            "author": p.get("author"),
            "score": p.get("score"),
            "upvote_ratio": p.get("upvote_ratio"),
            "num_comments": p.get("num_comments"),
            "url": p.get("url"),
            "permalink": f"https://reddit.com{p.get('permalink', '')}",
            "selftext": (p.get("selftext") or "")[:500],
          
get_post_comments function · python · L70-L110 (41 LOC)
src/hive_mcp/platforms/reddit.py
async def get_post_comments(
    post_url: str,
    count: int = 20,
    sort: str = "best",  # best, top, new, controversial
    proxy_url: Optional[str] = None,
) -> dict:
    """获取帖子详情和评论。"""
    # Ensure proper URL format
    if "reddit.com" in post_url:
        url = post_url.rstrip('/') + '.json?sort=' + sort
    else:
        url = f"https://www.reddit.com{post_url}.json?sort={sort}"

    data = await _fetch_json(url, proxy_url)

    # First element is the post, second is comments
    post_data = data[0]["data"]["children"][0]["data"] if len(data) > 0 else {}
    comments_data = data[1]["data"]["children"] if len(data) > 1 else []

    comments = []
    for child in comments_data[:count]:
        if child.get("kind") != "t1":
            continue
        c = child.get("data", {})
        comments.append({
            "author": c.get("author"),
            "body": (c.get("body") or "")[:500],
            "score": c.get("score"),
            "created_utc": c.get("created_utc"),
  
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
search_reddit function · python · L113-L141 (29 LOC)
src/hive_mcp/platforms/reddit.py
async def search_reddit(
    query: str,
    subreddit: Optional[str] = None,
    sort: str = "relevance",  # relevance, hot, top, new, comments
    count: int = 20,
    time_filter: str = "all",
    proxy_url: Optional[str] = None,
) -> list[dict]:
    """搜索 Reddit 帖子。"""
    base = f"https://www.reddit.com/r/{subreddit}" if subreddit else "https://www.reddit.com"
    url = f"{base}/search.json?q={query}&sort={sort}&limit={min(count, 100)}&t={time_filter}"
    if subreddit:
        url += "&restrict_sr=on"

    data = await _fetch_json(url, proxy_url)
    results = []
    for child in data.get("data", {}).get("children", [])[:count]:
        p = child.get("data", {})
        results.append({
            "title": p.get("title"),
            "author": p.get("author"),
            "subreddit": p.get("subreddit"),
            "score": p.get("score"),
            "num_comments": p.get("num_comments"),
            "permalink": f"https://reddit.com{p.get('permalink', '')}",
            "self
CircuitBreaker.__init__ method · python · L46-L55 (10 LOC)
src/hive_mcp/platforms/tiktok.py
    def __init__(
        self,
        failure_threshold: int = config.TIKTOK_CB_FAILURE_THRESHOLD,
        recovery_s: int = config.TIKTOK_CB_RECOVERY_S,
    ):
        """Create a breaker that starts CLOSED with a zeroed failure counter.

        Args:
            failure_threshold: Failure count at which ``record_failure``
                switches the breaker to OPEN.
            recovery_s: Recovery window in seconds; ``status`` uses it to
                report how long the breaker will remain open.
        """
        # Tunables supplied by the caller (defaults come from config).
        self._recovery_s = recovery_s
        self._threshold = failure_threshold

        # Mutable runtime state.
        self._state = CBState.CLOSED
        self._failures = 0
        # time.monotonic() stamp written by record_failure when the breaker trips.
        self._opened_at: float = 0
CircuitBreaker.record_failure method · python · L68-L76 (9 LOC)
src/hive_mcp/platforms/tiktok.py
    def record_failure(self) -> None:
        """Count one failure and trip the breaker once the threshold is met.

        When the running failure count reaches ``self._threshold`` the state
        becomes OPEN, the monotonic open time is recorded, and a warning is
        logged. Below the threshold only the counter changes.
        """
        self._failures += 1
        if self._failures < self._threshold:
            return

        # Threshold reached: open the breaker and remember when it happened.
        self._state = CBState.OPEN
        self._opened_at = time.monotonic()
        logger.warning(
            "TikTok 断路器已触发(%d 次失败),暂停 %ds",
            self._failures,
            self._recovery_s,
        )
CircuitBreaker.allow_request method · python · L78-L84 (7 LOC)
src/hive_mcp/platforms/tiktok.py
    def allow_request(self) -> bool:
        """Return True when the breaker currently lets a request through.

        Requests pass in the CLOSED and HALF_OPEN states and are rejected
        otherwise (i.e. while OPEN). Note that this method does not count
        probes itself: every call made while HALF_OPEN is allowed.
        """
        # self.state is read exactly once so any logic behind it runs once.
        return self.state in (CBState.CLOSED, CBState.HALF_OPEN)
CircuitBreaker.status method · python · L86-L95 (10 LOC)
src/hive_mcp/platforms/tiktok.py
    def status(self) -> dict:
        """Return a snapshot of the breaker for diagnostics.

        Returns:
            A dict with ``state`` (the state enum's value), ``failures``
            (current failure count) and ``recovery_remaining_s`` (whole
            seconds left in the recovery window; 0 unless the state is OPEN).
        """
        current = self.state
        if current == CBState.OPEN:
            # Elapsed time is truncated to whole seconds before subtracting;
            # the result is clamped so it never goes negative.
            elapsed_s = int(time.monotonic() - self._opened_at)
            remaining_s = max(0, self._recovery_s - elapsed_s)
        else:
            remaining_s = 0

        return {
            "state": current.value,
            "failures": self._failures,
            "recovery_remaining_s": remaining_s,
        }
_get_api function · python · L129-L167 (39 LOC)
src/hive_mcp/platforms/tiktok.py
async def _get_api(proxy_url: Optional[str] = None):
    """获取 TikTokApi 实例(懒加载,单例;proxy_url 变更时自动重建)。"""
    global _api_instance, _api_proxy_url

    try:
        from TikTokApi import TikTokApi  # pip install TikTokApi
    except ImportError:
        raise RuntimeError("TikTokApi 未安装,请运行: pip install TikTokApi")

    async with _api_lock:
        # proxy_url 变更时,关闭旧实例并重建
        if _api_instance is not None and proxy_url == _api_proxy_url:
            return _api_instance
        if _api_instance is not None:
            logger.info("proxy_url 变更 (%s → %s),重建 TikTokApi session", _api_proxy_url, proxy_url)
            try:
                await _api_instance.close_sessions()
            except Exception:
                pass
            _api_instance = None

        ms_token = config.TIKTOK_MS_TOKEN
        if not ms_token:
            raise RuntimeError("TIKTOK_MS_TOKEN 未配置,请从 tiktok.com Cookie 中提取")

        proxy_provider = _SimpleProxyProvider(proxy_url) if proxy_url else None

 
_reset_api function · python · L170-L181 (12 LOC)
src/hive_mcp/platforms/tiktok.py
async def _reset_api() -> None:
    """Close and discard the cached TikTokApi instance.

    Under ``_api_lock``, closes the sessions of the current instance (if one
    exists), then clears both ``_api_instance`` and ``_api_proxy_url`` so the
    next caller has to build a fresh instance. Closing is best-effort: a
    failure there is logged at debug level and never propagated, and the
    globals are cleared regardless.
    """
    global _api_instance, _api_proxy_url
    async with _api_lock:
        if _api_instance:
            try:
                await _api_instance.close_sessions()
            except Exception as exc:
                # Best-effort cleanup: the instance is dropped either way, but
                # record why closing failed instead of discarding it silently.
                logger.debug("关闭 TikTokApi session 失败(已忽略): %s", exc)
        _api_instance = None
        _api_proxy_url = None
        logger.info("TikTokApi session 已重置")
_execute function · python · L187-L205 (19 LOC)
src/hive_mcp/platforms/tiktok.py
async def _execute(coro_factory, operation: str) -> Any:
    """Run a TikTok operation under the module-level circuit breaker.

    Args:
        coro_factory: Zero-argument callable whose result is awaited to
            perform the operation.
        operation: Label used in the error log when the operation fails.

    Returns:
        Whatever the awaited ``coro_factory()`` produces.

    Raises:
        RuntimeError: If the breaker currently rejects requests.
        Exception: Any exception from the operation is re-raised after the
            failure has been recorded.
    """
    if not _cb.allow_request():
        snapshot = _cb.status()
        failure_count = snapshot['failures']
        wait_s = snapshot['recovery_remaining_s']
        raise RuntimeError(
            f"TikTok 断路器已开启({failure_count} 次连续失败),"
            f"将在 {wait_s}s 后自动恢复"
        )

    try:
        outcome = await coro_factory()
        # Kept inside the try so an error here is accounted for as a failure too.
        _cb.record_success()
        return outcome
    except Exception as exc:
        _cb.record_failure()
        # Once the breaker is no longer CLOSED, drop the cached API session.
        breaker_tripped = _cb.state in (CBState.OPEN, CBState.HALF_OPEN)
        if breaker_tripped:
            await _reset_api()
        logger.error("TikTok %s 失败: %s", operation, exc)
        raise
Repobility — same analyzer, your code, free for public repos · /scan/
_ytdlp_fallback_user_info function · python · L210-L240 (31 LOC)
src/hive_mcp/platforms/tiktok.py
async def _ytdlp_fallback_user_info(username: str, proxy_url: Optional[str] = None) -> dict:
    """yt-dlp fallback: 获取 TikTok 用户信息。"""
    try:
        import yt_dlp
    except ImportError:
        raise RuntimeError("yt-dlp 未安装,无法使用 fallback")

    opts = {
        "quiet": True, "no_warnings": True, "skip_download": True,
        "extract_flat": True,
    }
    if proxy_url:
        opts["proxy"] = proxy_url

    def _extract():
        url = f"https://www.tiktok.com/@{username}"
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=False)
            return {
                "unique_id": username,
                "nickname": info.get("uploader") or info.get("channel") or username,
                "follower_count": info.get("follower_count"),
                "video_count": info.get("playlist_count"),
                "profile_url": f"https://www.tiktok.com/@{username}",
                "_source": "yt-dlp_fallback",
            }

    loop = asy
_ytdlp_fallback_video_detail function · python · L243-L276 (34 LOC)
src/hive_mcp/platforms/tiktok.py
async def _ytdlp_fallback_video_detail(video_id: str, proxy_url: Optional[str] = None) -> dict:
    """yt-dlp fallback: 获取 TikTok 视频详情。"""
    try:
        import yt_dlp
    except ImportError:
        raise RuntimeError("yt-dlp 未安装")

    opts = {
        "quiet": True, "no_warnings": True, "skip_download": True,
    }
    if proxy_url:
        opts["proxy"] = proxy_url

    def _extract():
        url = f"https://www.tiktok.com/video/{video_id}"
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=False)
            return {
                "video_id": video_id,
                "desc": (info.get("description") or "")[:300],
                "create_time": info.get("timestamp"),
                "play_count": info.get("view_count"),
                "like_count": info.get("like_count"),
                "comment_count": info.get("comment_count"),
                "author_unique_id": info.get("uploader_id") or info.get("uploader"),
                "d
_ytdlp_fallback_user_videos function · python · L279-L312 (34 LOC)
src/hive_mcp/platforms/tiktok.py
async def _ytdlp_fallback_user_videos(username: str, count: int = 20, proxy_url: Optional[str] = None) -> list[dict]:
    """yt-dlp fallback: 获取用户视频列表。"""
    try:
        import yt_dlp
    except ImportError:
        raise RuntimeError("yt-dlp 未安装")

    opts = {
        "quiet": True, "no_warnings": True, "skip_download": True,
        "extract_flat": "in_playlist", "playlistend": count,
    }
    if proxy_url:
        opts["proxy"] = proxy_url

    def _extract():
        url = f"https://www.tiktok.com/@{username}"
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=False)
            entries = info.get("entries") or []
            return [
                {
                    "video_id": e.get("id"),
                    "desc": (e.get("title") or "")[:300],
                    "play_count": e.get("view_count"),
                    "url": e.get("url") or e.get("webpage_url") or f"https://www.tiktok.com/video/{e.get('id')}",
                
page 1 / 11next ›