Function bodies 529 total
_int_env function · python · L7-L15 (9 LOC)src/hive_mcp/config.py
def _int_env(key: str, default: int) -> int:
"""安全解析整数环境变量"""
val = os.getenv(key, "")
if not val:
return default
try:
return int(val)
except ValueError:
return default_float_env function · python · L18-L26 (9 LOC)src/hive_mcp/config.py
def _float_env(key: str, default: float) -> float:
"""安全解析浮点环境变量"""
val = os.getenv(key, "")
if not val:
return default
try:
return float(val)
except ValueError:
return defaultis_consented function · python · L30-L38 (9 LOC)src/hive_mcp/consent.py
def is_consented() -> bool:
    """Report whether the local consent cache records an accepted agreement.

    Returns:
        ``True`` only when ``CONSENT_FILE`` can be read, contains a JSON
        object, and that object's ``"consented"`` entry is truthy. Every
        other situation (missing or unreadable file, malformed JSON,
        non-object payload) yields ``False``.
    """
    try:
        payload = json.loads(Path(CONSENT_FILE).read_text())
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # malformed JSON and undecodable bytes.
        return False
    if not isinstance(payload, dict):
        return False
    # Coerce so the declared ``bool`` return type holds even when the file
    # stores something like 1, "yes" or null under the key.
    return bool(payload.get("consented", False))
# check_consent_from_server function · python · L41-L59 (19 LOC)src/hive_mcp/consent.py
async def check_consent_from_server() -> bool:
    """Ask the server whether the user already accepted the agreement.

    When the server answers successfully with a truthy ``"consented"``
    field, the decision is written to the local cache via ``save_consent``
    and ``True`` is returned. Without an ``API_KEY``, on a non-success
    response, or on any exception, ``False`` is returned (exceptions are
    logged at debug level).
    """
    if not API_KEY:
        return False
    try:
        async with httpx.AsyncClient(timeout=5, trust_env=False) as client:
            endpoint = f"{DIRECTOR_URL}/api/v1/user/consent"
            auth_header = {"Authorization": f"Bearer {API_KEY}"}
            response = await client.get(endpoint, headers=auth_header)
            if response.is_success and response.json().get("consented"):
                save_consent(True)
                logger.info("用户已在网站注册时同意协议,自动激活")
                return True
    except Exception as exc:
        # Best effort: a failed lookup is treated as "not consented".
        logger.debug("查询服务器 consent 失败: %s", exc)
    return False
# save_consent function · python · L62-L67 (6 LOC)src/hive_mcp/consent.py
def save_consent(agreed: bool) -> None:
    """Persist the consent decision to ``CONSENT_FILE`` as JSON.

    The record (decision plus a UTC ISO-8601 timestamp) is written to a
    sibling ``.tmp`` file first and then moved over the target, so the
    target is only ever replaced by a fully written file.
    """
    target = Path(CONSENT_FILE)
    target.parent.mkdir(parents=True, exist_ok=True)
    record = {
        "consented": agreed,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    staging = target.with_suffix(".tmp")
    staging.write_text(json.dumps(record, indent=2))
    staging.replace(target)
# clean_html function · python · L62-L88 (27 LOC)src/hive_mcp/content_utils.py
def clean_html(raw_html: str) -> str:
    """Strip script/style/ad/noise markup and return the cleaned HTML. (T-02)

    With BeautifulSoup available, every tag named in ``_NOISE_TAGS`` is
    removed, followed by every element whose ``class`` or ``id`` matches
    ``_NOISE_PATTERN``. Without it, only ``<script>`` and ``<style>``
    elements are removed, using regular expressions.

    Args:
        raw_html: HTML source; an empty/falsy value yields ``""``.

    Returns:
        The cleaned HTML as a string.
    """
    if not raw_html:
        return ""
    if not _HAS_BS4:
        # Fallback: regex-based cleanup.
        content = re.sub(r"<script[^>]*>.*?</script>", "", raw_html,
                         flags=re.DOTALL | re.IGNORECASE)
        content = re.sub(r"<style[^>]*>.*?</style>", "", content,
                         flags=re.DOTALL | re.IGNORECASE)
        return content

    soup = BeautifulSoup(raw_html, "html.parser")
    # Remove noise tags.
    for tag in soup.find_all(_NOISE_TAGS):
        tag.decompose()

    # Remove ad-like elements by class/id. All matches are selected *before*
    # anything is removed: decompose() also destroys the element's
    # descendants, and reading attributes of an already-destroyed descendant
    # (as a decompose-while-scanning loop does for nested matches) can raise.
    matched = [
        tag for tag in soup.find_all(True)
        if _NOISE_PATTERN.search(" ".join(tag.get("class", [])))
        or _NOISE_PATTERN.search(tag.get("id", ""))
    ]
    matched_ids = {id(tag) for tag in matched}
    # Only the outermost matches need removing; nested ones go with them.
    outermost = [
        tag for tag in matched
        if not any(id(parent) in matched_ids for parent in tag.parents)
    ]
    for tag in outermost:
        tag.decompose()
    return str(soup)
# html_to_markdown function · python · L95-L130 (36 LOC)src/hive_mcp/content_utils.py
def html_to_markdown(raw_html: str, base_url: str = "") -> str:
"""将 HTML 转换为 Markdown,优先用 trafilatura 提取正文。(T-01 + T-02)"""
if not raw_html:
return ""
# T-02: 优先使用 trafilatura 提取正文 (Readability 算法)
if _HAS_TRAFILATURA:
try:
extracted = trafilatura.extract(
raw_html, url=base_url,
include_links=True, include_formatting=True,
output_format="txt",
favor_precision=True,
)
if extracted and len(extracted) > 100:
return extracted.strip()
except Exception:
pass # fallback to html2text
# Fallback: html2text (全页转换)
cleaned = clean_html(raw_html)
if _HAS_HTML2TEXT:
h = _html2text.HTML2Text()
h.ignore_links = False
h.ignore_images = True # 减少干扰
h.ignore_emphasis = False
h.body_width = 0 # 不强制换行
h.skip_internal_links = True
h.inline_links =Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
_html_to_text_fallback function · python · L139-L145 (7 LOC)src/hive_mcp/content_utils.py
def _html_to_text_fallback(html_content: str) -> str:
"""Fallback:基础正则去标签。"""
text = re.sub(r"<[^>]+>", " ", html_content)
text = html.unescape(text)
text = re.sub(r"[ \t]+", " ", text)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()format_as_json function · python · L152-L158 (7 LOC)src/hive_mcp/content_utils.py
def format_as_json(raw_body: str) -> str:
    """Pretty-print ``raw_body`` as JSON; return it unchanged if parsing fails.

    Successful output uses a two-space indent and keeps non-ASCII characters
    unescaped.
    """
    try:
        parsed = json.loads(raw_body)
    except (json.JSONDecodeError, TypeError):
        return raw_body
    return json.dumps(parsed, ensure_ascii=False, indent=2)
# build_search_url function · python · L165-L187 (23 LOC)src/hive_mcp/content_utils.py
def build_search_url(query: str, engine: str = "duckduckgo",
                     country_code: Optional[str] = None) -> str:
    """
    DEPRECATED: Use hive_mcp.tools.search.web_search() instead.

    Build a search-results URL for ``query``.

    ``engine`` selects "bing" or "google"; any other value uses the
    DuckDuckGo HTML endpoint. When ``country_code`` is given it is appended
    as the engine's region parameter (``cc`` upper-cased for Bing, ``gl``
    lower-cased for Google, ``kl`` as "<cc>-<cc>" lower-cased for DuckDuckGo).
    """
    encoded = quote_plus(query)
    if engine == "bing":
        base = f"https://www.bing.com/search?q={encoded}&count=20&setlang=zh-CN"
        region = f"&cc={country_code.upper()}" if country_code else ""
    elif engine == "google":
        base = f"https://www.google.com/search?q={encoded}&num=20&hl=zh-CN"
        region = f"&gl={country_code.lower()}" if country_code else ""
    else:
        # DuckDuckGo is the default (the original notes it needs no API key).
        base = f"https://html.duckduckgo.com/html/?q={encoded}"
        region = (
            f"&kl={country_code.lower()}-{country_code.lower()}"
            if country_code else ""
        )
    return base + region
# parse_search_results function · python · L190-L210 (21 LOC)src/hive_mcp/content_utils.py
def parse_search_results(raw_html: str, engine: str = "duckduckgo",
                         max_results: int = 10) -> list[dict]:
    """
    DEPRECATED: Use hive_mcp.tools.search.web_search() instead.

    Parse a search-results page into a list of result dicts.

    Returns an empty list when BeautifulSoup is unavailable or ``engine`` is
    not one of "duckduckgo", "bing" or "google".
    """
    if not _HAS_BS4:
        return []
    soup = BeautifulSoup(raw_html, "html.parser")
    parsers = {
        "duckduckgo": _parse_ddg,
        "bing": _parse_bing,
        "google": _parse_google,
    }
    parser = parsers.get(engine)
    if parser is None:
        return []
    return parser(soup, max_results)
# _parse_ddg function · python · L213-L245 (33 LOC)src/hive_mcp/content_utils.py
def _parse_ddg(soup: "BeautifulSoup", max_results: int) -> list[dict]:
"""DEPRECATED: Use hive_mcp.tools.search.web_search() instead."""
results = []
# DDG HTML endpoint 的结果结构
for item in soup.select(".result, .web-result")[:max_results * 2]:
title_el = item.select_one(".result__title a, .result__a, h2 a")
snippet_el = item.select_one(".result__snippet, .result__body")
if not title_el:
continue
title = title_el.get_text(strip=True)
href = title_el.get("href", "")
# DDG 通过 uddg 参数传递真实 URL
if "//duckduckgo.com/l/" in href or href.startswith("/l/"):
from urllib.parse import urlparse, parse_qs
try:
params = parse_qs(urlparse(href).query)
href = params.get("uddg", [href])[0]
except Exception:
pass
if href.startswith("//"):
href = "https:" + href
snippet = snippet_el.get_text(strip=True) if sni_parse_bing function · python · L248-L261 (14 LOC)src/hive_mcp/content_utils.py
def _parse_bing(soup: "BeautifulSoup", max_results: int) -> list[dict]:
    """DEPRECATED: Use hive_mcp.tools.search.web_search() instead.

    Extract ``{"title", "url", "snippet"}`` dicts from the first
    ``max_results`` ``li.b_algo`` items; items lacking a title link, title
    text or href are skipped.
    """
    found = []
    for entry in soup.select("li.b_algo")[:max_results]:
        anchor = entry.select_one("h2 a")
        if not anchor:
            continue
        blurb = entry.select_one(".b_caption p, .b_snippet, p")
        heading = anchor.get_text(strip=True)
        link = anchor.get("href", "")
        if not (heading and link):
            continue
        found.append({
            "title": heading,
            "url": link,
            "snippet": blurb.get_text(strip=True) if blurb else "",
        })
    return found
# _parse_google function · python · L264-L287 (24 LOC)src/hive_mcp/content_utils.py
def _parse_google(soup: "BeautifulSoup", max_results: int) -> list[dict]:
"""DEPRECATED: Use hive_mcp.tools.search.web_search() instead."""
results = []
for item in soup.select("div.g, div[data-sokoban-container]")[:max_results * 2]:
title_el = item.select_one("h3")
link_el = item.select_one("a[href]")
snippet_el = item.select_one("div.VwiC3b, span.aCOpRe, div[data-sncf]")
if not (title_el and link_el):
continue
title = title_el.get_text(strip=True)
url = link_el.get("href", "")
if url.startswith("/url?"):
from urllib.parse import urlparse, parse_qs
try:
params = parse_qs(urlparse(url).query)
url = params.get("q", [url])[0]
except Exception:
pass
snippet = snippet_el.get_text(strip=True) if snippet_el else ""
if title and url and url.startswith("http"):
results.append({"title": title, "url": url, format_search_results_md function · python · L290-L310 (21 LOC)src/hive_mcp/content_utils.py
def format_search_results_md(results: list[dict], query: str, engine: str) -> str:
    """Render search results as a Markdown string.

    An empty ``results`` list produces a "nothing found" message. Otherwise
    the output is a heading block followed by one numbered section per
    result (title, link, and the snippet as a block quote when present).
    """
    if not results:
        return (
            f"❌ 未找到 `{query}` 的搜索结果。\n"
            f"建议:检查关键词拼写,或换用其他搜索引擎(bing/google/duckduckgo)。"
        )
    heading = [
        "## 🔍 搜索结果",
        f"**关键词:** `{query}` | **引擎:** `{engine}` | **共 {len(results)} 条**",
        "",
        "---",
    ]
    sections = []
    for position, entry in enumerate(results, 1):
        sections.append(f"\n### {position}. {entry['title']}")
        sections.append(f"🔗 <{entry['url']}>")
        snippet = entry.get("snippet")
        if snippet:
            sections.append(f"\n> {snippet}")
    return "\n".join(heading + sections)
# Repobility — same analyzer, your code, free for public repos · /scan/
truncate function · python · L317-L337 (21 LOC)src/hive_mcp/content_utils.py
def truncate(content: str, max_chars: int, start_index: int = 0) -> tuple[str, bool, int]:
    """
    Cut ``content`` down to at most ``max_chars`` characters.

    Returns ``(text, was_truncated, next_start_index)``. ``start_index``
    skips that many leading characters first (for paged reading of long
    documents). When nothing is cut off, ``next_start_index`` is 0.
    """
    if not content:
        return content, False, 0
    # Apply the start_index offset.
    window = content[start_index:] if start_index > 0 else content
    if len(window) <= max_chars:
        return window, False, 0
    # Prefer a paragraph boundary, but only if it keeps at least 80% of the
    # allowed length; otherwise cut hard at max_chars.
    boundary = window.rfind("\n\n", 0, max_chars)
    cutoff = boundary if boundary >= max_chars * 0.8 else max_chars
    return window[:cutoff], True, start_index + cutoff
# dependency_status function · python · L340-L348 (9 LOC)src/hive_mcp/content_utils.py
def dependency_status() -> dict:
    """Return the availability flags of the optional dependencies (for debugging).

    Besides one flag per package, two derived entries are included:
    ``markdown_conversion`` (html2text or trafilatura available) and
    ``search_parsing`` (BeautifulSoup available).
    """
    status = {
        "html2text": _HAS_HTML2TEXT,
        "beautifulsoup4": _HAS_BS4,
        "trafilatura": _HAS_TRAFILATURA,
    }
    status["markdown_conversion"] = _HAS_HTML2TEXT or _HAS_TRAFILATURA
    status["search_parsing"] = _HAS_BS4
    return status
# _fetch_html function · python · L24-L30 (7 LOC)src/hive_mcp/platforms/ecommerce.py
async def _fetch_html(url: str, proxy_url: Optional[str] = None, headers: Optional[dict] = None) -> str:
    """Fetch ``url`` (optionally through ``proxy_url``) and return the body text.

    ``headers`` are layered on top of the module-level ``_HEADERS``.
    Redirects are followed, the timeout is 30 seconds, and an HTTP error
    status raises via ``raise_for_status()``.
    """
    request_headers = dict(_HEADERS)
    if headers:
        request_headers.update(headers)
    client_options = {"proxy": proxy_url, "timeout": 30.0, "follow_redirects": True}
    async with httpx.AsyncClient(**client_options) as client:
        response = await client.get(url, headers=request_headers)
        response.raise_for_status()
        return response.text
# amazon_search function · python · L33-L72 (40 LOC)src/hive_mcp/platforms/ecommerce.py
async def amazon_search(
query: str,
country: str = "us", # us, uk, de, jp, etc.
count: int = 10,
proxy_url: Optional[str] = None,
) -> list[dict]:
"""搜索 Amazon 商品。"""
try:
from bs4 import BeautifulSoup
except ImportError:
return [{"error": "beautifulsoup4 未安装"}]
domains = {"us": "www.amazon.com", "uk": "www.amazon.co.uk", "de": "www.amazon.de", "jp": "www.amazon.co.jp", "cn": "www.amazon.cn"}
domain = domains.get(country, "www.amazon.com")
url = f"https://{domain}/s?k={quote_plus(query)}"
html = await _fetch_html(url, proxy_url)
soup = BeautifulSoup(html, "html.parser")
products = []
for item in soup.select('[data-component-type="s-search-result"]')[:count]:
title_el = item.select_one("h2 a span") or item.select_one("h2 span")
price_el = item.select_one(".a-price .a-offscreen")
rating_el = item.select_one(".a-icon-alt")
reviews_el = item.select_one('[aria-label*="stars"] + span'amazon_product_detail function · python · L75-L103 (29 LOC)src/hive_mcp/platforms/ecommerce.py
async def amazon_product_detail(
url: str,
proxy_url: Optional[str] = None,
) -> dict:
"""获取 Amazon 商品详情。"""
try:
from bs4 import BeautifulSoup
except ImportError:
return {"error": "beautifulsoup4 未安装"}
html = await _fetch_html(url, proxy_url)
soup = BeautifulSoup(html, "html.parser")
title = soup.select_one("#productTitle")
price = soup.select_one(".a-price .a-offscreen") or soup.select_one("#priceblock_ourprice")
rating = soup.select_one("#acrPopover .a-icon-alt")
reviews = soup.select_one("#acrCustomerReviewText")
description = soup.select_one("#productDescription")
features = soup.select("#feature-bullets li span.a-list-item")
return {
"title": title.get_text(strip=True) if title else None,
"price": price.get_text(strip=True) if price else None,
"rating": rating.get_text(strip=True) if rating else None,
"reviews_count": reviews.get_text(strip=True) if reviews else None,
amazon_reviews function · python · L106-L141 (36 LOC)src/hive_mcp/platforms/ecommerce.py
async def amazon_reviews(
asin: str,
country: str = "us",
count: int = 10,
proxy_url: Optional[str] = None,
) -> list[dict]:
"""获取 Amazon 商品评论。"""
try:
from bs4 import BeautifulSoup
except ImportError:
return [{"error": "beautifulsoup4 未安装"}]
domains = {"us": "www.amazon.com", "uk": "www.amazon.co.uk", "de": "www.amazon.de", "jp": "www.amazon.co.jp"}
domain = domains.get(country, "www.amazon.com")
url = f"https://{domain}/product-reviews/{asin}?sortBy=recent"
html = await _fetch_html(url, proxy_url)
soup = BeautifulSoup(html, "html.parser")
reviews = []
for review in soup.select('[data-hook="review"]')[:count]:
title_el = review.select_one('[data-hook="review-title"] span:last-child')
body_el = review.select_one('[data-hook="review-body"] span')
rating_el = review.select_one('[data-hook="review-star-rating"] .a-icon-alt')
author_el = review.select_one('.a-profile-name')
da_get_loader function · python · L34-L57 (24 LOC)src/hive_mcp/platforms/instagram.py
def _get_loader() -> instaloader.Instaloader:
    """Return the shared ``Instaloader`` instance, creating it on first use.

    On creation, a saved session is loaded from
    ``config.INSTAGRAM_SESSION_FILE`` when that setting is non-empty; a
    failure to load it is logged as a warning and otherwise ignored.
    """
    global _loader_instance
    if _loader_instance is not None:
        return _loader_instance

    options = dict(
        download_pictures=False,
        download_video_thumbnails=False,
        download_geotags=False,
        download_comments=False,  # comments require a login
        save_metadata=False,
        compress_json=False,
        quiet=True,
        request_timeout=30,
        max_connection_attempts=3,
        sleep=True,  # instaloader's own random delay
    )
    _loader_instance = instaloader.Instaloader(**options)

    # T-04: load a cookie session for better stability.
    # NOTE(review): the listing this came from carries no indentation; the
    # session load is assumed to run only when the loader is first built —
    # confirm against the real file.
    session_file = config.INSTAGRAM_SESSION_FILE
    if session_file:
        try:
            _loader_instance.load_session_from_file(username=None, filename=session_file)
            logger.info("Instagram session 已加载: %s", session_file)
        except Exception as exc:
            logger.warning("Instagram session 加载失败 (将使用匿名模式): %s", exc)
    return _loader_instance
# _patch_proxy function · python · L60-L71 (12 LOC)src/hive_mcp/platforms/instagram.py
def _patch_proxy(
    loader: instaloader.Instaloader,
    proxy_url: str,
    country_code: Optional[str] = None,
) -> None:
    """Inject the proxy and geo-targeting header into the loader's session.

    Sets ``proxy_url`` for both ``http`` and ``https`` on
    ``loader.context._session`` and, when ``country_code`` is given, sets the
    ``X-Hive-Country`` header to its lower-cased value.
    """
    session = loader.context._session
    session.proxies = dict.fromkeys(("http", "https"), proxy_url)
    if country_code:
        session.headers["X-Hive-Country"] = country_code.lower()
# Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
_retry_on_fail function · python · L80-L92 (13 LOC)src/hive_mcp/platforms/instagram.py
def _retry_on_fail(func, max_retries: int = 2):
"""指数退避重试包装器。"""
last_err = None
for attempt in range(max_retries + 1):
try:
return func()
except Exception as e:
last_err = e
if attempt < max_retries:
wait = (2 ** attempt) + random.uniform(0, 1)
logger.warning("Instagram 请求失败 (第%d次), %0.1fs 后重试: %s", attempt + 1, wait, e)
time.sleep(wait)
raise last_err_fmt_profile function · python · L103-L116 (14 LOC)src/hive_mcp/platforms/instagram.py
def _fmt_profile(p: instaloader.Profile) -> dict:
    """Flatten a profile object into a plain dict of its summary fields."""
    summary = {
        name: getattr(p, name)
        for name in (
            "username",
            "full_name",
            "biography",
            "external_url",
            "followers",
            "followees",
        )
    }
    # Exposed as "post_count" although the source attribute is ``mediacount``.
    summary["post_count"] = p.mediacount
    for name in ("is_verified", "is_private", "profile_pic_url"):
        summary[name] = getattr(p, name)
    summary["profile_url"] = f"https://www.instagram.com/{p.username}/"
    return summary
# _fmt_post function · python · L119-L135 (17 LOC)src/hive_mcp/platforms/instagram.py
def _fmt_post(post: instaloader.Post) -> dict:
    """Flatten a post object into a plain dict.

    The caption is cut to 300 characters; ``video_url`` and ``duration_s``
    are ``None`` for non-video posts; ``timestamp`` is the ISO-8601 form of
    ``date_utc`` (or ``None``), and ``location`` is the location's name (or
    ``None``).
    """
    info = {"shortcode": post.shortcode}
    info["url"] = f"https://www.instagram.com/p/{post.shortcode}/"
    info["type"] = "video" if post.is_video else "image"
    info["caption"] = (post.caption or "")[:300]
    info["hashtags"] = list(post.caption_hashtags)
    info["mentions"] = list(post.caption_mentions)
    info["likes"] = post.likes
    info["comments_count"] = post.comments
    info["timestamp"] = post.date_utc.isoformat() if post.date_utc else None
    info["location"] = post.location.name if post.location else None
    info["display_url"] = post.url
    # Video-only attributes are read lazily, only for video posts.
    info["video_url"] = post.video_url if post.is_video else None
    info["duration_s"] = post.video_duration if post.is_video else None
    info["owner_username"] = post.owner_username
    return info
# get_profile function · python · L141-L158 (18 LOC)src/hive_mcp/platforms/instagram.py
async def get_profile(
    username: str,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> dict:
    """
    Fetch an Instagram user's profile and return it as a dict.

    The blocking lookup runs through ``_run_sync`` wrapped in
    ``_retry_on_fail``. When ``proxy_url`` is given, the shared loader is
    patched with it (and ``country_code``) before the request.
    """
    def _load_profile() -> dict:
        loader = _get_loader()
        if proxy_url:
            _patch_proxy(loader, proxy_url, country_code=country_code)
        _random_delay()
        return _fmt_profile(
            instaloader.Profile.from_username(loader.context, username)
        )

    def _with_retries():
        return _retry_on_fail(_load_profile)

    return await _run_sync(_with_retries)
# get_user_posts function · python · L161-L190 (30 LOC)src/hive_mcp/platforms/instagram.py
async def get_user_posts(
    username: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """
    Fetch a user's most recent posts as a list of dicts.

    ``count`` is capped at ``config.INSTAGRAM_MAX_POSTS``. For a private
    account, a dict with an ``"error"`` message is returned instead of a
    list.
    """
    limit = min(count, config.INSTAGRAM_MAX_POSTS)

    def _collect():
        loader = _get_loader()
        if proxy_url:
            _patch_proxy(loader, proxy_url, country_code=country_code)
        _random_delay()
        profile = instaloader.Profile.from_username(loader.context, username)
        if profile.is_private:
            return {"error": f"@{username} 是私密账号,无法在未登录状态下获取帖子"}
        collected = []
        for index, post in enumerate(profile.get_posts()):
            if index >= limit:
                break
            collected.append(_fmt_post(post))
            # Extra pause after every 5th post to stay clear of rate limits.
            if index > 0 and index % 5 == 0:
                _random_delay(1.0)
        return collected

    def _with_retries():
        return _retry_on_fail(_collect)

    return await _run_sync(_with_retries)
# get_hashtag_posts function · python · L193-L220 (28 LOC)src/hive_mcp/platforms/instagram.py
async def get_hashtag_posts(
    hashtag: str,
    count: int = 20,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> list[dict]:
    """
    Fetch posts listed under an Instagram hashtag.

    Leading ``#`` characters in ``hashtag`` are stripped, so both "travel"
    and "#travel" work. ``count`` is capped at
    ``config.INSTAGRAM_MAX_POSTS``.
    """
    limit = min(count, config.INSTAGRAM_MAX_POSTS)

    def _collect():
        loader = _get_loader()
        if proxy_url:
            _patch_proxy(loader, proxy_url, country_code=country_code)
        _random_delay()
        tag = instaloader.Hashtag.from_name(loader.context, hashtag.lstrip("#"))
        collected = []
        for index, post in enumerate(tag.get_posts()):
            if index >= limit:
                break
            collected.append(_fmt_post(post))
            # Extra pause after every 5th post.
            if index > 0 and index % 5 == 0:
                _random_delay(1.0)
        return collected

    def _with_retries():
        return _retry_on_fail(_collect)

    return await _run_sync(_with_retries)
# get_post_detail function · python · L223-L247 (25 LOC)src/hive_mcp/platforms/instagram.py
async def get_post_detail(
    shortcode_or_url: str,
    proxy_url: Optional[str] = None,
    country_code: Optional[str] = None,
) -> dict:
    """
    Fetch the details of a single Instagram post.

    ``shortcode_or_url`` is either a bare shortcode or a URL containing
    ``instagram.com/p/<shortcode>``; any other input is used as the
    shortcode verbatim.
    """
    # Extract the shortcode.
    shortcode = shortcode_or_url
    if "instagram.com/p/" in shortcode_or_url:
        import re
        found = re.search(r"/p/([A-Za-z0-9_-]+)", shortcode_or_url)
        if found:
            shortcode = found.group(1)

    def _load_post() -> dict:
        loader = _get_loader()
        if proxy_url:
            _patch_proxy(loader, proxy_url, country_code=country_code)
        _random_delay()
        return _fmt_post(instaloader.Post.from_shortcode(loader.context, shortcode))

    def _with_retries():
        return _retry_on_fail(_load_post)

    return await _run_sync(_with_retries)
# get_user_reels function · python · L250-L284 (35 LOC)src/hive_mcp/platforms/instagram.py
async def get_user_reels(
username: str,
count: int = 20,
proxy_url: Optional[str] = None,
country_code: Optional[str] = None,
) -> list[dict]:
"""
获取用户最新的 Reels 视频列表 (is_video + product_type='clips')。
"""
count = min(count, config.INSTAGRAM_MAX_POSTS)
def _fetch():
loader = _get_loader()
if proxy_url:
_patch_proxy(loader, proxy_url, country_code=country_code)
_random_delay()
profile = instaloader.Profile.from_username(loader.context, username)
if profile.is_private:
return {"error": f"@{username} 是私密账号"}
reels = []
for i, post in enumerate(profile.get_posts()):
if i >= count * 3: # 扫描更多帖子以找到足够 Reels
break
if post.is_video and post.typename == "GraphVideo":
reels.append({
**_fmt_post(post),
"is_reel": True,
})
if len(reels) >= count:
Source: Repobility analyzer · https://repobility.com
_headers function · python · L28-L38 (11 LOC)src/hive_mcp/platforms/linkedin.py
def _headers(cookie: Optional[str] = None) -> dict:
"""构建 LinkedIn 请求头。"""
h = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
}
c = cookie or LINKEDIN_COOKIE
if c:
h["Cookie"] = c if "li_at" in c else f"li_at={c}"
return h_fetch function · python · L41-L52 (12 LOC)src/hive_mcp/platforms/linkedin.py
async def _fetch(
    url: str,
    proxy_url: Optional[str] = None,
    cookie: Optional[str] = None,
) -> str:
    """Fetch a LinkedIn page (optionally through ``proxy_url``) and return its text.

    Headers come from ``_headers(cookie)``. Redirects are followed, the
    timeout is 30 seconds, and an HTTP error status raises via
    ``raise_for_status()``.
    """
    client_options = {"proxy": proxy_url, "timeout": 30.0, "follow_redirects": True}
    async with httpx.AsyncClient(**client_options) as client:
        response = await client.get(url, headers=_headers(cookie))
        response.raise_for_status()
        return response.text
# get_profile function · python · L55-L119 (65 LOC)src/hive_mcp/platforms/linkedin.py
async def get_profile(
username: str,
proxy_url: Optional[str] = None,
cookie: Optional[str] = None,
) -> dict:
"""获取 LinkedIn 用户公开资料。"""
try:
from bs4 import BeautifulSoup
except ImportError:
return {"error": "beautifulsoup4 未安装", "status": "error"}
url = f"https://www.linkedin.com/in/{username}/"
try:
html = await _fetch(url, proxy_url, cookie)
except httpx.HTTPStatusError as e:
if e.response.status_code == 999:
return {
"error": "LinkedIn 封锁请求 (999)。需要有效的 LINKEDIN_COOKIE 和住宅 IP",
"status": "blocked",
}
raise
soup = BeautifulSoup(html, "html.parser")
# Try to extract from JSON-LD
json_ld = soup.find("script", {"type": "application/ld+json"})
if json_ld:
try:
data = json.loads(json_ld.string)
image = data.get("image")
if isinstance(image, dict):
image = image.get("contentUrl")
get_company function · python · L122-L171 (50 LOC)src/hive_mcp/platforms/linkedin.py
async def get_company(
company_slug: str,
proxy_url: Optional[str] = None,
cookie: Optional[str] = None,
) -> dict:
"""获取 LinkedIn 公司页面信息。"""
try:
from bs4 import BeautifulSoup
except ImportError:
return {"error": "beautifulsoup4 未安装", "status": "error"}
url = f"https://www.linkedin.com/company/{company_slug}/"
html = await _fetch(url, proxy_url, cookie)
soup = BeautifulSoup(html, "html.parser")
# Try JSON-LD
json_ld = soup.find("script", {"type": "application/ld+json"})
if json_ld:
try:
data = json.loads(json_ld.string)
employee_count = data.get("numberOfEmployees")
if isinstance(employee_count, dict):
employee_count = employee_count.get("value")
return {
"company": company_slug,
"name": data.get("name"),
"description": (data.get("description") or "")[:500],
"url": f"https://www.linkedisearch_jobs function · python · L174-L210 (37 LOC)src/hive_mcp/platforms/linkedin.py
async def search_jobs(
query: str,
location: Optional[str] = None,
count: int = 10,
proxy_url: Optional[str] = None,
cookie: Optional[str] = None,
) -> list[dict]:
"""搜索 LinkedIn 职位。"""
try:
from bs4 import BeautifulSoup
except ImportError:
return [{"error": "beautifulsoup4 未安装"}]
url = f"https://www.linkedin.com/jobs/search/?keywords={quote_plus(query)}"
if location:
url += f"&location={quote_plus(location)}"
html = await _fetch(url, proxy_url, cookie)
soup = BeautifulSoup(html, "html.parser")
jobs = []
for card in soup.select(".base-card, .job-search-card")[:count]:
title = card.select_one(".base-search-card__title, h3")
company = card.select_one(".base-search-card__subtitle, h4")
loc = card.select_one(".job-search-card__location")
link = card.select_one("a")
date = card.select_one("time")
jobs.append({
"title": title.get_text(strip=True) if _fetch_json function · python · L20-L36 (17 LOC)src/hive_mcp/platforms/reddit.py
async def _fetch_json(url: str, proxy_url: Optional[str] = None) -> dict | list:
    """Fetch a Reddit JSON endpoint with rate-limit awareness.

    Args:
        url: Reddit URL. If its *path* does not already end in ``.json``,
            the suffix is added to the path; any query string is preserved.
        proxy_url: Optional proxy passed to the HTTP client.

    Returns:
        The decoded JSON body.

    Raises:
        httpx.HTTPStatusError: On an HTTP error status (``raise_for_status``).
    """
    from urllib.parse import urlsplit, urlunsplit

    # The suffix belongs on the path, never after the query string: a URL
    # such as ".../comments/abc/?sort=top" must become
    # ".../comments/abc.json?sort=top", not ".../?sort=top.json".
    parts = urlsplit(url)
    if not parts.path.endswith('.json'):
        url = urlunsplit(parts._replace(path=parts.path.rstrip('/') + '.json'))

    async with httpx.AsyncClient(proxy=proxy_url, timeout=30.0, follow_redirects=True) as client:
        resp = await client.get(url, headers=_HEADERS)
        resp.raise_for_status()
        # Check rate-limit headers
        remaining = resp.headers.get('x-ratelimit-remaining')
        reset = resp.headers.get('x-ratelimit-reset')
        if remaining:
            logger.debug(f"Reddit rate-limit remaining: {remaining}")
            # The header may carry a decimal string (e.g. "98.0"), which
            # int() rejects; an unparsable value must not fail a request
            # that otherwise succeeded.
            try:
                nearly_exhausted = float(remaining) < 10
            except ValueError:
                nearly_exhausted = False
            if nearly_exhausted:
                logger.warning(f"Reddit rate-limit nearly exhausted: {remaining} requests remaining, reset at {reset}")
        return resp.json()
# get_subreddit_posts function · python · L39-L67 (29 LOC)src/hive_mcp/platforms/reddit.py
async def get_subreddit_posts(
subreddit: str,
sort: str = "hot", # hot, new, top, rising
count: int = 20,
time_filter: str = "day", # hour, day, week, month, year, all (for sort=top)
proxy_url: Optional[str] = None,
) -> list[dict]:
"""获取 subreddit 帖子列表。"""
url = f"https://www.reddit.com/r/{subreddit}/{sort}.json?limit={min(count, 100)}"
if sort == "top":
url += f"&t={time_filter}"
data = await _fetch_json(url, proxy_url)
posts = []
for child in data.get("data", {}).get("children", [])[:count]:
p = child.get("data", {})
posts.append({
"title": p.get("title"),
"author": p.get("author"),
"score": p.get("score"),
"upvote_ratio": p.get("upvote_ratio"),
"num_comments": p.get("num_comments"),
"url": p.get("url"),
"permalink": f"https://reddit.com{p.get('permalink', '')}",
"selftext": (p.get("selftext") or "")[:500],
get_post_comments function · python · L70-L110 (41 LOC)src/hive_mcp/platforms/reddit.py
async def get_post_comments(
post_url: str,
count: int = 20,
sort: str = "best", # best, top, new, controversial
proxy_url: Optional[str] = None,
) -> dict:
"""获取帖子详情和评论。"""
# Ensure proper URL format
if "reddit.com" in post_url:
url = post_url.rstrip('/') + '.json?sort=' + sort
else:
url = f"https://www.reddit.com{post_url}.json?sort={sort}"
data = await _fetch_json(url, proxy_url)
# First element is the post, second is comments
post_data = data[0]["data"]["children"][0]["data"] if len(data) > 0 else {}
comments_data = data[1]["data"]["children"] if len(data) > 1 else []
comments = []
for child in comments_data[:count]:
if child.get("kind") != "t1":
continue
c = child.get("data", {})
comments.append({
"author": c.get("author"),
"body": (c.get("body") or "")[:500],
"score": c.get("score"),
"created_utc": c.get("created_utc"),
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
search_reddit function · python · L113-L141 (29 LOC)src/hive_mcp/platforms/reddit.py
async def search_reddit(
query: str,
subreddit: Optional[str] = None,
sort: str = "relevance", # relevance, hot, top, new, comments
count: int = 20,
time_filter: str = "all",
proxy_url: Optional[str] = None,
) -> list[dict]:
"""搜索 Reddit 帖子。"""
base = f"https://www.reddit.com/r/{subreddit}" if subreddit else "https://www.reddit.com"
url = f"{base}/search.json?q={query}&sort={sort}&limit={min(count, 100)}&t={time_filter}"
if subreddit:
url += "&restrict_sr=on"
data = await _fetch_json(url, proxy_url)
results = []
for child in data.get("data", {}).get("children", [])[:count]:
p = child.get("data", {})
results.append({
"title": p.get("title"),
"author": p.get("author"),
"subreddit": p.get("subreddit"),
"score": p.get("score"),
"num_comments": p.get("num_comments"),
"permalink": f"https://reddit.com{p.get('permalink', '')}",
"selfCircuitBreaker.__init__ method · python · L46-L55 (10 LOC)src/hive_mcp/platforms/tiktok.py
def __init__(
    self,
    failure_threshold: int = config.TIKTOK_CB_FAILURE_THRESHOLD,
    recovery_s: int = config.TIKTOK_CB_RECOVERY_S,
):
    """Create a breaker in the CLOSED state with a zero failure count.

    Args:
        failure_threshold: Failure count at which ``record_failure`` opens
            the breaker.
        recovery_s: Recovery period in seconds, as reported by ``status``.
    """
    # Runtime state.
    self._state = CBState.CLOSED
    self._failures = 0
    self._opened_at: float = 0
    # Configuration.
    self._threshold = failure_threshold
    self._recovery_s = recovery_s
# CircuitBreaker.record_failure method · python · L68-L76 (9 LOC)src/hive_mcp/platforms/tiktok.py
def record_failure(self) -> None:
    """Count one failure and open the breaker once the threshold is reached.

    On opening, the state becomes OPEN, the current monotonic time is stored
    as the opening time, and a warning is logged.
    """
    self._failures += 1
    if self._failures < self._threshold:
        return
    self._state = CBState.OPEN
    self._opened_at = time.monotonic()
    logger.warning(
        "TikTok 断路器已触发(%d 次失败),暂停 %ds",
        self._failures, self._recovery_s,
    )
# CircuitBreaker.allow_request method · python · L78-L84 (7 LOC)src/hive_mcp/platforms/tiktok.py
def allow_request(self) -> bool:
    """Return whether a request may proceed in the current state.

    CLOSED and HALF_OPEN (probe traffic) allow the request; any other state,
    i.e. OPEN, rejects it.
    """
    return self.state in (CBState.CLOSED, CBState.HALF_OPEN)
# CircuitBreaker.status method · python · L86-L95 (10 LOC)src/hive_mcp/platforms/tiktok.py
def status(self) -> dict:
    """Return a snapshot of the breaker.

    Keys: ``state`` (the state's ``.value``), ``failures`` (current failure
    count) and ``recovery_remaining_s`` (whole seconds left of the recovery
    period while OPEN, never negative; 0 in any other state).
    """
    current = self.state
    if current == CBState.OPEN:
        elapsed = int(time.monotonic() - self._opened_at)
        remaining = max(0, self._recovery_s - elapsed)
    else:
        remaining = 0
    return {
        "state": current.value,
        "failures": self._failures,
        "recovery_remaining_s": remaining,
    }
# _get_api function · python · L129-L167 (39 LOC)src/hive_mcp/platforms/tiktok.py
async def _get_api(proxy_url: Optional[str] = None):
"""获取 TikTokApi 实例(懒加载,单例;proxy_url 变更时自动重建)。"""
global _api_instance, _api_proxy_url
try:
from TikTokApi import TikTokApi # pip install TikTokApi
except ImportError:
raise RuntimeError("TikTokApi 未安装,请运行: pip install TikTokApi")
async with _api_lock:
# proxy_url 变更时,关闭旧实例并重建
if _api_instance is not None and proxy_url == _api_proxy_url:
return _api_instance
if _api_instance is not None:
logger.info("proxy_url 变更 (%s → %s),重建 TikTokApi session", _api_proxy_url, proxy_url)
try:
await _api_instance.close_sessions()
except Exception:
pass
_api_instance = None
ms_token = config.TIKTOK_MS_TOKEN
if not ms_token:
raise RuntimeError("TIKTOK_MS_TOKEN 未配置,请从 tiktok.com Cookie 中提取")
proxy_provider = _SimpleProxyProvider(proxy_url) if proxy_url else None
_reset_api function · python · L170-L181 (12 LOC)src/hive_mcp/platforms/tiktok.py
async def _reset_api() -> None:
    """Close and discard the shared TikTokApi session.

    Runs under ``_api_lock``. Errors raised while closing the sessions are
    ignored so the reset itself always completes.
    """
    global _api_instance, _api_proxy_url
    async with _api_lock:
        if not _api_instance:
            return
        try:
            await _api_instance.close_sessions()
        except Exception:
            pass  # best effort: the instance is dropped regardless
        # NOTE(review): the listing this came from carries no indentation;
        # the reset and log below are assumed to sit inside the
        # "instance exists" branch — confirm against the real file.
        _api_instance = None
        _api_proxy_url = None
        logger.info("TikTokApi session 已重置")
# _execute function · python · L187-L205 (19 LOC)src/hive_mcp/platforms/tiktok.py
async def _execute(coro_factory, operation: str) -> Any:
    """Run ``coro_factory()`` under the module circuit breaker ``_cb``.

    Args:
        coro_factory: Zero-argument callable returning the awaitable to run.
        operation: Label used in the error log.

    Returns:
        The awaited result; success is recorded on the breaker.

    Raises:
        RuntimeError: When the breaker currently rejects requests.
        Exception: Any failure from the awaited call, re-raised after it is
            recorded; if the breaker is then OPEN or HALF_OPEN, the API
            session is reset first.
    """
    if not _cb.allow_request():
        snapshot = _cb.status()
        message = (
            f"TikTok 断路器已开启({snapshot['failures']} 次连续失败),"
            f"将在 {snapshot['recovery_remaining_s']}s 后自动恢复"
        )
        raise RuntimeError(message)
    try:
        outcome = await coro_factory()
        _cb.record_success()
        return outcome
    except Exception as exc:
        _cb.record_failure()
        breaker_tripped = _cb.state in (CBState.OPEN, CBState.HALF_OPEN)
        if breaker_tripped:
            await _reset_api()
        logger.error("TikTok %s 失败: %s", operation, exc)
        raise
# Repobility — same analyzer, your code, free for public repos · /scan/
_ytdlp_fallback_user_info function · python · L210-L240 (31 LOC)src/hive_mcp/platforms/tiktok.py
async def _ytdlp_fallback_user_info(username: str, proxy_url: Optional[str] = None) -> dict:
"""yt-dlp fallback: 获取 TikTok 用户信息。"""
try:
import yt_dlp
except ImportError:
raise RuntimeError("yt-dlp 未安装,无法使用 fallback")
opts = {
"quiet": True, "no_warnings": True, "skip_download": True,
"extract_flat": True,
}
if proxy_url:
opts["proxy"] = proxy_url
def _extract():
url = f"https://www.tiktok.com/@{username}"
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(url, download=False)
return {
"unique_id": username,
"nickname": info.get("uploader") or info.get("channel") or username,
"follower_count": info.get("follower_count"),
"video_count": info.get("playlist_count"),
"profile_url": f"https://www.tiktok.com/@{username}",
"_source": "yt-dlp_fallback",
}
loop = asy_ytdlp_fallback_video_detail function · python · L243-L276 (34 LOC)src/hive_mcp/platforms/tiktok.py
async def _ytdlp_fallback_video_detail(video_id: str, proxy_url: Optional[str] = None) -> dict:
"""yt-dlp fallback: 获取 TikTok 视频详情。"""
try:
import yt_dlp
except ImportError:
raise RuntimeError("yt-dlp 未安装")
opts = {
"quiet": True, "no_warnings": True, "skip_download": True,
}
if proxy_url:
opts["proxy"] = proxy_url
def _extract():
url = f"https://www.tiktok.com/video/{video_id}"
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(url, download=False)
return {
"video_id": video_id,
"desc": (info.get("description") or "")[:300],
"create_time": info.get("timestamp"),
"play_count": info.get("view_count"),
"like_count": info.get("like_count"),
"comment_count": info.get("comment_count"),
"author_unique_id": info.get("uploader_id") or info.get("uploader"),
"d_ytdlp_fallback_user_videos function · python · L279-L312 (34 LOC)src/hive_mcp/platforms/tiktok.py
async def _ytdlp_fallback_user_videos(username: str, count: int = 20, proxy_url: Optional[str] = None) -> list[dict]:
"""yt-dlp fallback: 获取用户视频列表。"""
try:
import yt_dlp
except ImportError:
raise RuntimeError("yt-dlp 未安装")
opts = {
"quiet": True, "no_warnings": True, "skip_download": True,
"extract_flat": "in_playlist", "playlistend": count,
}
if proxy_url:
opts["proxy"] = proxy_url
def _extract():
url = f"https://www.tiktok.com/@{username}"
with yt_dlp.YoutubeDL(opts) as ydl:
info = ydl.extract_info(url, download=False)
entries = info.get("entries") or []
return [
{
"video_id": e.get("id"),
"desc": (e.get("title") or "")[:300],
"play_count": e.get("view_count"),
"url": e.get("url") or e.get("webpage_url") or f"https://www.tiktok.com/video/{e.get('id')}",
page 1 / 11next ›