Function bodies 40 total
generate_product_js function · python · L21-L49 (29 LOC)scraper/generator.py
def generate_product_js(products: list[MasterProduct]) -> str:
    """Generate the JavaScript ``const P=[...]`` array from product data.

    Products are emitted in the order given. A ``// ===== CATEGORY =====``
    comment line is inserted whenever a product's category differs from the
    previous product's, so the input should already be grouped by category.

    Args:
        products: Master products; ``prices`` maps shop id -> price.

    Returns:
        JavaScript source text with one object literal per product. A shop
        listed in ``SHOP_IDS`` that has no price for a product is emitted
        with price 0.
    """
    # Local import: only needed here, for JS-safe string escaping.
    import json

    def js_string(value: str) -> str:
        # json.dumps yields a valid double-quoted JS string literal and, unlike
        # a manual replace chain, also escapes newlines and control characters.
        # ensure_ascii=False keeps non-ASCII (e.g. Japanese) text readable.
        return json.dumps(value, ensure_ascii=False)

    lines = ["const P=["]
    current_cat = None
    for p in products:
        # Emit a section comment each time the category changes
        if p.category != current_cat:
            current_cat = p.category
            lines.append(f"// ===== {current_cat.upper()} =====")
        # One "shop:price" entry per known shop, 0 when the shop has no price
        price_parts = ",".join(
            f"{sid}:{p.prices.get(sid, 0)}" for sid in SHOP_IDS
        )
        lines.append(
            f"{{c:{js_string(p.category)},n:{js_string(p.name)},"
            f"r:{p.retail_price},p:{{{price_parts}}}}},"
        )
    lines.append("];")
    return "\n".join(lines)
# generate_html function · python · L52-L90 (39 LOC)scraper/generator.py
def generate_html(
products: list[MasterProduct],
template_path: Path | None = None,
output_path: Path | None = None,
) -> str:
"""Generate index.html from template and product data.
Args:
products: List of master products with prices filled in.
template_path: Path to template.html (default: project root/template.html)
output_path: Path to write index.html (default: project root/index.html)
Returns:
The generated HTML content.
"""
project_root = Path(__file__).resolve().parent.parent
if template_path is None:
template_path = project_root / "template.html"
if output_path is None:
output_path = project_root / "index.html"
template = template_path.read_text(encoding="utf-8")
# Generate product data JS
product_js = generate_product_js(products)
# Generate update date in JST
now = datetime.now(JST)
update_date = now.strftime("%Y/%m/%d %H:%M")
# Replace placeholders
hmain function · python · L25-L76 (52 LOC)scraper/main.py
def main() -> None:
logger.info("Starting price scraper for %d shops", len(ALL_SCRAPERS))
# Reset all prices before scraping
for product in MASTER_PRODUCTS:
product.prices.clear()
# Scrape each shop
success_count = 0
for scraper_cls in ALL_SCRAPERS:
scraper = scraper_cls()
shop_id = scraper.shop_id
shop_name = scraper.shop_name
logger.info("--- Scraping %s (%s) ---", shop_name, shop_id)
try:
items = scraper.scrape()
if items:
# Convert to (name, price) tuples for matcher
scraped = [(item.name, item.price) for item in items]
match_products(scraped, shop_id)
success_count += 1
else:
logger.warning("%s: no items scraped", shop_name)
except Exception:
logger.error(
"%s: scraping failed:\n%s", shop_name, traceback.format_exc()
)
logger.info(
"MasterProduct class · python · L44-L50 (7 LOC)scraper/matcher.py
class MasterProduct:
    """A master product entry to match scraped items against.

    NOTE(review): the ``field(default_factory=...)`` defaults suggest a
    ``@dataclass`` decorator on the line above this class; it is outside the
    visible span -- confirm.
    """
    category: str  # "mega" or "sv"
    name: str  # canonical display name
    retail_price: int  # retail price (0 = unknown)
    # Keywords tried against scraped item names when matching
    keywords: list[str] = field(default_factory=list)  # matching keywords
    # Per-shop prices filled in by matching; absent key = no price for that shop
    prices: dict[str, int] = field(default_factory=dict)  # shop_id -> price
# normalize function · python · L125-L136 (12 LOC)scraper/matcher.py
def normalize(text: str) -> str:
    """Normalize text for matching: NFKC, separator folding, noise removal.

    Steps, in order:
      1. Unicode NFKC normalization (folds full-width/half-width variants).
      2. Each run of brackets, hyphens and whitespace becomes one space.
      3. Every noise word is deleted, one after another, in list order.
      4. Leading/trailing whitespace is stripped.

    Case is preserved -- no lowercasing happens here, so callers needing a
    case-insensitive comparison must lowercase the result themselves.
    Interior whitespace left behind by deleted noise words is not collapsed.
    """
    folded = unicodedata.normalize("NFKC", text)
    # Brackets / hyphens / whitespace act as separators
    cleaned = re.sub(r"[【】\[\]()()「」『』\-\s]+", " ", folded)
    # Packaging and condition words that carry no product identity.
    # Order matters: words are removed sequentially, so the plain "BOX"
    # variants are gone before the "1BOX" variants are tried.
    for token in (
        "BOX", "box", "Box", "シュリンク付", "シュリンク", "未開封",
        "新品", "日本語版", "ポケモンカードゲーム", "ポケカ",
        "1BOX", "1box", "1Box",
    ):
        cleaned = cleaned.replace(token, "")
    return cleaned.strip()
# _keyword_match function · python · L139-L146 (8 LOC)scraper/matcher.py
def _keyword_match(scraped_name: str, product: MasterProduct) -> bool:
    """Return True if any product keyword occurs in the scraped name.

    Both sides go through ``normalize`` first; a keyword that normalizes to
    an empty string is ignored so it cannot match everything.
    """
    haystack = normalize(scraped_name)
    # Lazy generator: keywords are normalized only until the first hit
    needles = (normalize(keyword) for keyword in product.keywords)
    return any(needle and needle in haystack for needle in needles)
# _is_single_card function · python · L149-L164 (16 LOC)scraper/matcher.py
def _is_single_card(name: str) -> bool:
    """Tell whether a product name looks like a single card rather than a BOX.

    A BOX indicator anywhere in the name wins: the result is then False.
    Otherwise the result is True only when a single-card indicator is found.
    """
    # BOX markers take precedence over any single-card marker
    if any(marker in name for marker in BOX_INDICATORS):
        return False
    return any(marker in name for marker in SINGLE_CARD_INDICATORS)
# About: code-quality intelligence by Repobility · https://repobility.com
_disambiguate_dx function · python · L167-L180 (14 LOC)scraper/matcher.py
def _disambiguate_dx(scraped_name: str) -> str | None:
    """Distinguish between DX and non-DX versions of same-name packs.

    Returns:
        ``'dx'`` if the name mentions DX (any case, after normalization) or
        the katakana "デラックス"; ``'normal'`` otherwise. The current rules
        never produce ``None``; the annotation is kept for callers that
        already handle it.
    """
    norm = normalize(scraped_name).lower()
    # This also covers the "拡張パックDX" spelling: any name containing it
    # necessarily contains "dx"/"DX", so no separate check is needed.
    if "dx" in norm or "DX" in scraped_name:
        return "dx"
    # Runto uses "デラックス" instead of "DX"
    if "デラックス" in scraped_name:
        return "dx"
    return "normal"
# match_products function · python · L183-L282 (100 LOC)scraper/matcher.py
def match_products(
scraped_items: list[tuple[str, int]],
shop_id: str,
products: list[MasterProduct] | None = None,
) -> None:
"""Match scraped items to master product list and set prices.
Args:
scraped_items: list of (product_name, price) tuples
shop_id: the shop identifier (e.g., "morimori")
products: master product list (uses MASTER_PRODUCTS if None)
"""
if products is None:
products = MASTER_PRODUCTS
matched = set()
for name, price in scraped_items:
if price <= 0:
continue
# Skip items that are clearly single cards (not BOX)
if _is_single_card(name):
continue
# Skip unreasonably low prices (likely accessories/sleeves)
if price < MIN_BOX_PRICE:
logger.debug(" SKIP (price too low): %s = %d", name, price)
continue
# Skip unreasonably high prices (likely single rare cards or errors)
if price > MAX_BOX_PRICE:
ScrapedItem class · python · L18-L21 (4 LOC)scraper/shops/base.py
class ScrapedItem:
    """A single scraped product with name and buyback price.

    NOTE(review): keyword construction (``ScrapedItem(name=..., price=...)``)
    elsewhere in the project suggests a ``@dataclass`` decorator above this
    class; it is outside the visible span -- confirm.
    """
    name: str  # product name as scraped from the shop page
    price: int  # buyback price in yen (0 = not available)
# BaseScraper class · python · L24-L74 (51 LOC)scraper/shops/base.py
class BaseScraper(ABC):
"""Base class for shop scrapers."""
shop_id: str = ""
shop_name: str = ""
use_playwright: bool = False
# Common HTTP headers
HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "ja,en-US;q=0.7,en;q=0.3",
}
def __init__(self):
self.session = requests.Session()
self.session.headers.update(self.HEADERS)
@abstractmethod
def scrape(self) -> list[ScrapedItem]:
"""Scrape the shop and return a list of items with prices."""
def _get_soup(self, url: str, **kwargs) -> BeautifulSoup:
"""Fetch a URL and return a BeautifulSoup object."""
for attempt in range(3):
try:
resp = self.session.get(url, timeout__init__ method · python · L42-L44 (3 LOC)scraper/shops/base.py
def __init__(self):
    """Create the scraper's HTTP session, pre-loaded with the common headers."""
    session = requests.Session()
    session.headers.update(self.HEADERS)
    self.session = session
# _get_soup method · python · L50-L65 (16 LOC)scraper/shops/base.py
def _get_soup(self, url: str, **kwargs) -> BeautifulSoup:
    """Fetch a URL (with retries) and return a BeautifulSoup object.

    Args:
        url: Page to fetch.
        **kwargs: Passed through to ``session.get``. ``timeout`` defaults to
            30 seconds but may be overridden by the caller.

    Returns:
        The response body parsed with the ``html.parser`` backend.

    Raises:
        requests.RequestException: After the third failed attempt, or
            immediately on HTTP 404 (a missing page will not appear on
            retry, and paginating scrapers use it as "no more pages").
    """
    # setdefault instead of a hard-coded keyword: passing timeout= in kwargs
    # would otherwise raise "got multiple values for keyword argument".
    kwargs.setdefault("timeout", 30)
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            resp = self.session.get(url, **kwargs)
            resp.raise_for_status()
            return BeautifulSoup(resp.text, "html.parser")
        except requests.RequestException as e:
            logger.warning(
                "%s: attempt %d failed for %s: %s",
                self.shop_name, attempt + 1, url, e,
            )
            # Compare against None explicitly: a Response for an error
            # status is falsy, so a plain truth test would miss it.
            response = getattr(e, "response", None)
            not_found = response is not None and response.status_code == 404
            if not_found or attempt == max_attempts - 1:
                raise
            # Linear back-off: 3s after the first failure, 6s after the second
            time.sleep(3 * (attempt + 1))
# parse_price method · python · L68-L74 (7 LOC)scraper/shops/base.py
def parse_price(text: str) -> int:
    """Extract an integer price from text like '¥14,300' or '14300円'.

    Every non-digit character is discarded and the remaining digits are read
    as one number. Empty/None input, or text without digits, yields 0.
    """
    # Keep digits only; falsy input is treated as "no digits"
    digits_only = re.sub(r"[^\d]", "", text) if text else ""
    if not digits_only:
        return 0
    return int(digits_only)
# HomuraScraper class · python · L19-L83 (65 LOC)scraper/shops/homura.py
class HomuraScraper(BaseScraper):
shop_id = "homura"
shop_name = "ホムラ"
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
page = 1
while True:
url = f"{CATEGORY_URL}&page={page}" if page > 1 else CATEGORY_URL
try:
soup = self._get_soup(url)
except Exception:
break
# Each product is in a div[data-controller="dialog"]
dialogs = soup.select('div[data-controller="dialog"]')
if not dialogs:
break
found = 0
for dialog in dialogs:
# Product name in h5 inside a link
name_el = dialog.select_one('a[href^="/products/"] h5')
if not name_el:
name_el = dialog.select_one("h5")
if not name_el:
continue
# Price in span.font-semibold inside items-end container
price_el =Source: Repobility analyzer · https://repobility.com
scrape method · python · L23-L83 (61 LOC)scraper/shops/homura.py
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
page = 1
while True:
url = f"{CATEGORY_URL}&page={page}" if page > 1 else CATEGORY_URL
try:
soup = self._get_soup(url)
except Exception:
break
# Each product is in a div[data-controller="dialog"]
dialogs = soup.select('div[data-controller="dialog"]')
if not dialogs:
break
found = 0
for dialog in dialogs:
# Product name in h5 inside a link
name_el = dialog.select_one('a[href^="/products/"] h5')
if not name_el:
name_el = dialog.select_one("h5")
if not name_el:
continue
# Price in span.font-semibold inside items-end container
price_el = dialog.select_one(
"div.items-end span.font-semibold"
IcchomeScraper class · python · L20-L76 (57 LOC)scraper/shops/icchome.py
class IcchomeScraper(BaseScraper):
shop_id = "icchome"
shop_name = "一丁目"
use_playwright = False
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
try:
resp = self.session.get(
API_URL,
params={
"page": 1,
"size": 100,
"keyword": "",
"isImpo": "false",
"isCampaign": "false",
"cateCode": POKEMON_CATE_CODE,
"kbNames": "",
"cateName": "",
},
timeout=30,
headers=self.HEADERS,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.error("%s: API request failed: %s", self.shop_name, e)
return items
if data.get("code") != 200:
logger.error(
"%s: API error: %s", self.shop_scrape method · python · L25-L76 (52 LOC)scraper/shops/icchome.py
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
try:
resp = self.session.get(
API_URL,
params={
"page": 1,
"size": 100,
"keyword": "",
"isImpo": "false",
"isCampaign": "false",
"cateCode": POKEMON_CATE_CODE,
"kbNames": "",
"cateName": "",
},
timeout=30,
headers=self.HEADERS,
)
resp.raise_for_status()
data = resp.json()
except Exception as e:
logger.error("%s: API request failed: %s", self.shop_name, e)
return items
if data.get("code") != 200:
logger.error(
"%s: API error: %s", self.shop_name, data.get("msg", "unknown")
)
return items
content = data.get("data", {KaikyoScraper class · python · L35-L148 (114 LOC)scraper/shops/kaikyo.py
class KaikyoScraper(BaseScraper):
shop_id = "kaikyo"
shop_name = "海峡"
use_playwright = True
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
# JavaScript MUST be disabled. The site's nb.js wraps its
# initialisation in a try/catch and redirects to "/" on any
# error, which always fires in a headless context. With JS
# off the server-rendered HTML (including all product cards)
# is preserved as-is.
ctx = browser.new_context(
java_script_enabled=False,
user_agent=self.HEADERS["User-Agent"],
)
page = ctx.new_page()
try:
# --- Page 1 (GET) ---
page.goto(URL, wait_until="commit", timeout=60000)
page.wait_for_timscrape method · python · L40-L114 (75 LOC)scraper/shops/kaikyo.py
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
# JavaScript MUST be disabled. The site's nb.js wraps its
# initialisation in a try/catch and redirects to "/" on any
# error, which always fires in a headless context. With JS
# off the server-rendered HTML (including all product cards)
# is preserved as-is.
ctx = browser.new_context(
java_script_enabled=False,
user_agent=self.HEADERS["User-Agent"],
)
page = ctx.new_page()
try:
# --- Page 1 (GET) ---
page.goto(URL, wait_until="commit", timeout=60000)
page.wait_for_timeout(2000)
html = page.content()
self._extract_from_html(html, items)
_extract_from_html method · python · L117-L148 (32 LOC)scraper/shops/kaikyo.py
def _extract_from_html(self, html: str, items: list[ScrapedItem]) -> None:
"""Parse product cards from an HTML string (full page or AJAX
fragment) and append results to *items*."""
soup = BeautifulSoup(html, "html.parser")
# Each product lives in a ``div.card`` that contains a product
# image (``img.card-img-top``). Store-info cards on the
# homepage also use ``.card`` but never have an image, so the
# ``:has(.card-img-top)`` filter keeps only real products.
cards = soup.select("div.card:has(.card-img-top)")
for card in cards:
# Product name --------------------------------------------------
# Stored in the ``title`` attribute of the first
# ``label.hideText`` inside the card-body.
name_label = card.select_one("label.hideText")
name = (name_label.get("title") or "").strip() if name_label else ""
if not name:
continue
MorimoriScraper class · python · L24-L177 (154 LOC)scraper/shops/morimori.py
class MorimoriScraper(BaseScraper):
shop_id = "morimori"
shop_name = "森森"
use_playwright = True
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
seen_names: set[str] = set()
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context(
user_agent=self.HEADERS["User-Agent"],
viewport={"width": 1920, "height": 1080},
locale="ja-JP",
)
page = context.new_page()
try:
# Load search page and wait for products to render
page.goto(SEARCH_URL, wait_until="networkidle", timeout=60000)
page.wait_for_timeout(5000)
# Extract products from initial load
self._extract_from_page(page, items, seen_names)
logger.info(
"%s: inscrape method · python · L29-L82 (54 LOC)scraper/shops/morimori.py
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
seen_names: set[str] = set()
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context(
user_agent=self.HEADERS["User-Agent"],
viewport={"width": 1920, "height": 1080},
locale="ja-JP",
)
page = context.new_page()
try:
# Load search page and wait for products to render
page.goto(SEARCH_URL, wait_until="networkidle", timeout=60000)
page.wait_for_timeout(5000)
# Extract products from initial load
self._extract_from_page(page, items, seen_names)
logger.info(
"%s: initial search: %d items",
self.shop_name, len(items),
)
#Open data scored by Repobility · https://repobility.com
_extract_from_page method · python · L84-L134 (51 LOC)scraper/shops/morimori.py
def _extract_from_page(
self, page, items: list[ScrapedItem], seen: set[str],
) -> None:
"""Extract products from the current Playwright page DOM."""
products = page.evaluate(
r"""() => {
const results = [];
const items = document.querySelectorAll('div.product-item');
for (const item of items) {
// Search page uses search-product-details-name
const nameEl = item.querySelector(
'h4[class*="product-details-name"]'
);
// Price: try multiple selectors for search vs category page
const priceEl = item.querySelector(
'div[class*="price-normal-number"]'
) || item.querySelector(
'span[class*="price-normal-number"]'
) || item.querySelector(
'[class*="price"] [class*="number_click_next_page method · python · L137-L177 (41 LOC)scraper/shops/morimori.py
def _click_next_page(page, page_num: int) -> bool:
"""Click the next page button. Returns False if no more pages."""
# Try clicking numbered pagination link
result = page.evaluate(
"""(pageNum) => {
// Look for pagination links
const links = document.querySelectorAll(
'.pagination a, .page-link, a[href*="page="]'
);
for (const link of links) {
const text = link.textContent.trim();
if (text === String(pageNum)) {
link.click();
return true;
}
}
// Look for "next" arrow/button
const nextBtns = document.querySelectorAll(
'a.next, a[rel="next"], .pagination .next a, button.next'
);
for (const btn of nextBtns) {
btn.click();
rRudeyaScraper class · python · L19-L50 (32 LOC)scraper/shops/rudeya.py
class RudeyaScraper(BaseScraper):
    """Scraper for the Rudeya shop, whose listing is a CSS-table of rows."""

    shop_id = "rudeya"
    shop_name = "ルデヤ"

    def scrape(self) -> list[ScrapedItem]:
        """Fetch the listing at ``URL`` and return every row with a name and a positive price."""
        collected: list[ScrapedItem] = []
        document = self._get_soup(URL)
        # Each product is one div.tr directly under div.tbody
        for row in document.select("div.tbody > div.tr"):
            # Prefer the linked title; fall back to any h2 in the title cell
            title_el = row.select_one(".ttl a h2") or row.select_one(".ttl h2")
            if not title_el:
                continue
            # Price cell
            amount_el = row.select_one("div.td2wrap")
            if not amount_el:
                continue
            title = title_el.get_text(strip=True)
            amount = self.parse_price(amount_el.get_text(strip=True))
            if not title or amount <= 0:
                continue
            collected.append(ScrapedItem(name=title, price=amount))
        logger.info("%s: scraped %d items", self.shop_name, len(collected))
        return collected
# scrape method · python · L23-L50 (28 LOC)scraper/shops/rudeya.py
def scrape(self) -> list[ScrapedItem]:
    """Fetch the listing at ``URL`` and return every row with a name and a positive price."""
    collected: list[ScrapedItem] = []
    document = self._get_soup(URL)
    # Each product is one div.tr directly under div.tbody
    for row in document.select("div.tbody > div.tr"):
        # Prefer the linked title; fall back to any h2 in the title cell
        title_el = row.select_one(".ttl a h2") or row.select_one(".ttl h2")
        if not title_el:
            continue
        # Price cell
        amount_el = row.select_one("div.td2wrap")
        if not amount_el:
            continue
        title = title_el.get_text(strip=True)
        amount = self.parse_price(amount_el.get_text(strip=True))
        if not title or amount <= 0:
            continue
        collected.append(ScrapedItem(name=title, price=amount))
    logger.info("%s: scraped %d items", self.shop_name, len(collected))
    return collected
# RuntoScraper class · python · L20-L75 (56 LOC)scraper/shops/runto.py
class RuntoScraper(BaseScraper):
shop_id = "runto"
shop_name = "ラントゥ"
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
for page in range(1, 15): # up to 14 pages safety limit
url = f"{BASE_URL}page/{page}/" if page > 1 else BASE_URL
try:
soup = self._get_soup(url)
except Exception:
break # no more pages (404)
# WooCommerce product cards
products = soup.select("[data-products] .product, li.product, div.product.type-product")
if not products:
break
for product in products:
# Product title
name_el = product.select_one("h2.woocommerce-loop-product__title")
if not name_el:
name_el = product.select_one("h2")
if not name_el:
continue
# Price - for WooCommerce products
price_escrape method · python · L24-L75 (52 LOC)scraper/shops/runto.py
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
for page in range(1, 15): # up to 14 pages safety limit
url = f"{BASE_URL}page/{page}/" if page > 1 else BASE_URL
try:
soup = self._get_soup(url)
except Exception:
break # no more pages (404)
# WooCommerce product cards
products = soup.select("[data-products] .product, li.product, div.product.type-product")
if not products:
break
for product in products:
# Product title
name_el = product.select_one("h2.woocommerce-loop-product__title")
if not name_el:
name_el = product.select_one("h2")
if not name_el:
continue
# Price - for WooCommerce products
price_elements = product.select(
"span.woocommerce-Price-amount.amShoutenScraper class · python · L57-L300 (244 LOC)scraper/shops/shouten.py
class ShoutenScraper(BaseScraper):
shop_id = "shouten"
shop_name = "商店"
use_playwright = True
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context(
user_agent=self.HEADERS["User-Agent"],
viewport={"width": 1920, "height": 1080},
locale="ja-JP",
)
context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
""")
page = context.new_page()
# Intercept sprite images -- capture the latest one
sprite_holder: dict[str, bytes | None] = {"latest": None}
def handle_route(route):
resp = route.fetch()
body = resp.boscrape method · python · L62-L127 (66 LOC)scraper/shops/shouten.py
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context(
user_agent=self.HEADERS["User-Agent"],
viewport={"width": 1920, "height": 1080},
locale="ja-JP",
)
context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
""")
page = context.new_page()
# Intercept sprite images -- capture the latest one
sprite_holder: dict[str, bytes | None] = {"latest": None}
def handle_route(route):
resp = route.fetch()
body = resp.body()
sprite_holder["latest"] = body
route.fulfill(response=resp, body=body)Want this analysis on your repo? https://repobility.com/scan/
_click_category method · python · L134-L147 (14 LOC)scraper/shops/shouten.py
def _click_category(page, category_id: str) -> None:
    """Click the ``.do-product-list`` link whose ``data-category`` equals *category_id*.

    The click is performed inside the page via ``page.evaluate``; nothing
    happens if no link carries that category.
    """
    click_matching_link = """(catId) => {
const links = document.querySelectorAll('.do-product-list');
for (const link of links) {
if (link.dataset.category === catId) {
link.click();
return;
}
}
}"""
    page.evaluate(click_matching_link, category_id)
# _goto_page method · python · L150-L177 (28 LOC)scraper/shops/shouten.py
def _goto_page(page, page_num: int) -> bool:
"""Invoke the site's goto_page(N) for pagination.
Returns False when there is no next page.
"""
return bool(
page.evaluate(
"""(pageNum) => {
if (typeof goto_page !== 'function') return false;
const links = document.querySelectorAll(
'.ec-pager__item a'
);
let found = false;
for (const a of links) {
const href = a.getAttribute('href') || '';
if (href.includes("goto_page('" + pageNum + "')") ||
href.includes('goto_page("' + pageNum + '")')) {
found = true;
break;
}
}
if (!found) return false;
goto_page(String(pageNum));
retur_scrape_current_page method · python · L183-L271 (89 LOC)scraper/shops/shouten.py
def _scrape_current_page(
self,
page,
sprite_holder: dict[str, bytes | None],
page_num: int,
) -> list[ScrapedItem]:
"""Extract items from the currently rendered #search-content."""
items: list[ScrapedItem] = []
# Prefer route-intercepted sprite; fall back to direct download
sprite_bytes = sprite_holder.get("latest")
if not sprite_bytes:
sprite_bytes = self._download_sprite_from_dom(page)
if not sprite_bytes:
logger.warning(
"%s: no sprite captured for page %d",
self.shop_name, page_num,
)
return items
# Build position -> character map from the sprite
digit_map = _decode_sprite(sprite_bytes)
if not digit_map:
logger.warning(
"%s: sprite decode failed for page %d",
self.shop_name, page_num,
)
return items
# Extract produ_download_sprite_from_dom method · python · L277-L300 (24 LOC)scraper/shops/shouten.py
def _download_sprite_from_dom(self, page) -> bytes | None:
    """Read the sprite URL from the page's computed style and download it.

    The URL is taken from the ``background-image`` of the first
    ``#search-content .encrypt-num`` element. Returns the image bytes, or
    None when the element/URL is missing, the response is not OK, or the
    request raises (logged at debug level).
    """
    find_sprite_url = r"""() => {
const el = document.querySelector(
'#search-content .encrypt-num'
);
if (!el) return null;
const bg = window.getComputedStyle(el).backgroundImage;
const m = bg.match(/url\("?([^"]+)"?\)/);
return m ? m[1] : null;
}"""
    target = page.evaluate(find_sprite_url)
    if not target:
        return None
    # Root-relative path: prefix the site base URL
    if target.startswith("/"):
        target = f"{BASE_URL}{target}"
    try:
        # Reuse the browser context's request client for the download
        response = page.context.request.get(target)
        return response.body() if response.ok else None
    except Exception as e:
        logger.debug("%s: sprite download failed: %s", self.shop_name, e)
    return None
# _slot_fingerprint function · python · L307-L313 (7 LOC)scraper/shops/shouten.py
def _slot_fingerprint(img, x0: int, slot_width: int = 10) -> str:
    """Compute an MD5 fingerprint of a single sprite slot's binarised pixels.

    The slot spans columns ``x0 .. x0 + slot_width - 1`` over the full image
    height. Pixels are visited row by row; a value below 128 counts as 1,
    anything else as 0. The first 12 hex digits of the MD5 are returned.
    """
    height = img.size[1]
    columns = range(x0, x0 + slot_width)
    mask = bytes(
        int(img.getpixel((col, row)) < 128)
        for row in range(height)
        for col in columns
    )
    return hashlib.md5(mask).hexdigest()[:12]
# _decode_sprite function · python · L316-L346 (31 LOC)scraper/shops/shouten.py
def _decode_sprite(sprite_data: bytes) -> dict[int, str] | None:
    """Decode a sprite image to a {position_px: character} mapping.

    Each of the 11 ten-pixel-wide slots is fingerprinted and looked up
    in the pre-computed reference table.

    Args:
        sprite_data: Raw bytes of the sprite image.

    Returns:
        Mapping from a slot's left edge in pixels (0, 10, ..., 100) to its
        character, or None on any failure: Pillow missing, bytes that are
        not a readable image, image smaller than 110x10, or a slot whose
        fingerprint is unknown.
    """
    try:
        from PIL import Image
    except ImportError:
        logger.warning("Pillow not installed -- cannot decode sprite")
        return None
    # Honour the "None on failure" contract for corrupt / non-image payloads
    # too, so one bad sprite does not abort the whole scrape with an exception.
    try:
        img = Image.open(io.BytesIO(sprite_data)).convert("L")
    except (OSError, ValueError) as exc:
        logger.warning("Sprite image could not be decoded: %s", exc)
        return None
    slot_count = 11
    slot_width = 10
    w, h = img.size
    if w < slot_count * slot_width or h < 10:
        return None
    digit_map: dict[int, str] = {}
    for slot_idx in range(slot_count):
        x0 = slot_idx * slot_width
        fp = _slot_fingerprint(img, x0)
        char = _GLYPH_FINGERPRINTS.get(fp)
        if char is None:
            # A single unknown glyph makes every price unreliable: give up
            logger.warning(
                "Unknown sprite glyph fingerprint %s at slot %d",
                fp, slot_idx,
            )
            return None
        digit_map[x0] = char
    return digit_map
# _positions_to_price function · python · L349-L363 (15 LOC)scraper/shops/shouten.py
def _positions_to_price(
    positions: list[int], digit_map: dict[int, str],
) -> int:
    """Convert background-position offsets to an integer price.

    Each offset is looked up in *digit_map*; the characters are joined,
    commas are dropped, and the result is parsed as an int. Returns 0 when
    any offset is unknown or the joined text is not a valid integer
    (including when *positions* is empty).
    """
    glyphs = [digit_map.get(offset) for offset in positions]
    # Any unmapped offset invalidates the whole price
    if any(glyph is None for glyph in glyphs):
        return 0
    try:
        return int("".join(glyphs).replace(",", ""))
    except ValueError:
        return 0
# SommelierScraper class · python · L18-L79 (62 LOC)scraper/shops/sommelier.py
class SommelierScraper(BaseScraper):
shop_id = "sommelier"
shop_name = "ソムリエ"
use_playwright = True
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.set_extra_http_headers({
"User-Agent": self.HEADERS["User-Agent"],
})
try:
page.goto(URL, wait_until="networkidle", timeout=60000)
page.wait_for_timeout(5000) # Wait for React hydration
# Scroll to load all lazy content
for _ in range(3):
page.evaluate(
"window.scrollTo(0, document.body.scrollHeight)"
)
page.wait_for_timeout(1500)
# Ant Design product cards
cards = page.querAbout: code-quality intelligence by Repobility · https://repobility.com
scrape method · python · L23-L79 (57 LOC)scraper/shops/sommelier.py
def scrape(self) -> list[ScrapedItem]:
items: list[ScrapedItem] = []
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.set_extra_http_headers({
"User-Agent": self.HEADERS["User-Agent"],
})
try:
page.goto(URL, wait_until="networkidle", timeout=60000)
page.wait_for_timeout(5000) # Wait for React hydration
# Scroll to load all lazy content
for _ in range(3):
page.evaluate(
"window.scrollTo(0, document.body.scrollHeight)"
)
page.wait_for_timeout(1500)
# Ant Design product cards
cards = page.query_selector_all(".ant-card")
for card in cards:
# Product name