Function bodies 86 total
detect_device function · python · L56-L82 (27 LOC)main.py
def detect_device() -> str:
"""Detect the best available compute device and set the global ``device`` variable.
Checks ``FORCE_CPU`` env var first, then CUDA, then MPS, falling back to CPU.
Returns:
The device string (``"cuda"``, ``"mps"``, or ``"cpu"``).
"""
global device
force_cpu = os.environ.get("FORCE_CPU", "false").lower() in ("true", "1", "yes")
if force_cpu:
device = "cpu"
logger.info("Device: CPU (forced via FORCE_CPU)")
return device
if torch.cuda.is_available():
device = "cuda"
gpu_name = torch.cuda.get_device_name(0)
vram = torch.cuda.get_device_properties(0).total_memory / (1024**3)
logger.info("Device: CUDA — %s (%.1f GB VRAM)", gpu_name, vram)
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
device = "mps"
logger.info("Device: MPS (Apple Silicon GPU)")
else:
device = "cpu"
logger.info("Device: CPU (no GPU avalifespan function · python · L121-L149 (29 LOC)main.py
async def lifespan(app: FastAPI):
"""Manage application startup and shutdown.
On startup: detects the compute device, downloads model weights (if needed),
instantiates wrapper objects, and populates ``model_registry``.
On shutdown: clears the registry and frees CUDA memory.
"""
detect_device()
for category, cfg in MODEL_CONFIG.items():
variant = os.environ.get(cfg["env_var"], cfg["default"])
try:
if cfg.get("multi_file"):
model_path = ensure_model_files_exist(category, variant)
else:
model_path = ensure_model_exists(category, variant)
wrapper = cfg["wrapper"](model_path, device, variant=variant)
model_registry[category] = wrapper
logger.info("Loaded model: %s/%s", category, variant)
except Exception:
logger.exception("Failed to load model %s/%s", category, variant)
logger.info("Startup complete — %d/%d models loaded", len(modRequestLoggingMiddleware.dispatch method · python · L167-L185 (19 LOC)main.py
async def dispatch(self, request: Request, call_next):
    """Pass the request downstream and emit one access-log record for it.

    The time spent awaiting ``call_next`` is measured with
    ``time.perf_counter`` and reported in milliseconds, both in the formatted
    message and in the structured ``extra`` fields. Nothing is logged when
    ``call_next`` raises, because the exception propagates before the log call.

    Args:
        request: Incoming request being handled.
        call_next: Awaitable callable that produces the downstream response.

    Returns:
        The response returned by ``call_next``, unmodified.
    """
    began = time.perf_counter()
    response = await call_next(request)
    duration_ms = (time.perf_counter() - began) * 1000

    method = request.method
    path = request.url.path
    status = response.status_code
    peer = request.client
    structured = {
        "method": method,
        "path": path,
        "status_code": status,
        "duration_ms": round(duration_ms, 1),
        # ``request.client`` may be falsy; no IP is recorded in that case.
        "client_ip": peer.host if peer else None,
    }
    logger.info("%s %s %d %.1fms", method, path, status, duration_ms, extra=structured)
    return response
# get_model function · python · L207-L218 (12 LOC)main.py
def get_model(name: str):
    """Return the wrapper registered under ``name`` in ``model_registry``.

    Args:
        name: Model category key (e.g. ``"colorize"``, ``"restore"``).

    Returns:
        The registered wrapper instance, or ``None`` when the registry has no
        entry for ``name`` (i.e. the model is not loaded).
    """
    return model_registry[name] if name in model_registry else None
# check_file_size function · python · L227-L241 (15 LOC)main.py
def check_file_size(file_bytes: bytes) -> None:
    """Reject uploads that exceed the maximum allowed file size.

    Args:
        file_bytes: Raw uploaded file content.

    Raises:
        FileTooLargeError: If ``len(file_bytes)`` is greater than
            ``MAX_FILE_SIZE``. This function does not build an HTTP response
            itself; the endpoint handlers in this module catch the error and
            answer with a 413 ``JSONResponse``.
    """
    size = len(file_bytes)
    if size <= MAX_FILE_SIZE:
        return
    bytes_per_mb = 1024 * 1024
    size_mb = size / bytes_per_mb
    max_mb = MAX_FILE_SIZE / bytes_per_mb
    raise FileTooLargeError(
        f"File too large ({size_mb:.1f} MB). Maximum allowed size is {max_mb:.0f} MB."
    )
# colorize function · python · L256-L301 (46 LOC)main.py
def colorize(
file: UploadFile = File(...),
render_factor: int = Query(35, ge=1, le=100),
output_format: str = Query("png", pattern="^(png|jpg|jpeg|webp)$"),
):
"""Colorize a grayscale or black-and-white photo using the DDColor model.
Args:
file: Input image file (JPEG, PNG, WebP, etc.).
render_factor: Controls colorization intensity (1-100).
output_format: Desired output image format.
Returns:
The colorized image as binary content.
"""
try:
file_bytes = file.file.read()
check_file_size(file_bytes)
image = read_image(file_bytes)
image = validate_and_resize(image)
except FileTooLargeError as e:
return JSONResponse(status_code=413, content={"detail": str(e)})
except ValueError as e:
return JSONResponse(status_code=400, content={"detail": str(e)})
model = get_model("colorize")
if model is None:
return JSONResponse(
status_code=503,
restore function · python · L305-L350 (46 LOC)main.py
def restore(
file: UploadFile = File(...),
tile_size: int = Query(0, ge=0),
output_format: str = Query("png", pattern="^(png|jpg|jpeg|webp)$"),
):
"""Remove noise or blur from a photo using the NAFNet model.
Args:
file: Input image file.
tile_size: Tile size for processing (0 = no tiling).
output_format: Desired output image format.
Returns:
The restored image as binary content.
"""
try:
file_bytes = file.file.read()
check_file_size(file_bytes)
image = read_image(file_bytes)
image = validate_and_resize(image)
except FileTooLargeError as e:
return JSONResponse(status_code=413, content={"detail": str(e)})
except ValueError as e:
return JSONResponse(status_code=400, content={"detail": str(e)})
model = get_model("restore")
if model is None:
return JSONResponse(
status_code=503,
content={"detail": "Restoration model not loaded"},
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
face_restore function · python · L354-L404 (51 LOC)main.py
def face_restore(
file: UploadFile = File(...),
fidelity: float = Query(0.5, ge=0.0, le=1.0),
upscale: int = Query(2, ge=1, le=4),
output_format: str = Query("png", pattern="^(png|jpg|jpeg|webp)$"),
):
"""Enhance and restore faces in a photo using the CodeFormer model.
Detects all faces, restores each one, and pastes them back. Returns a
bicubic upscale if no faces are detected.
Args:
file: Input image file.
fidelity: Quality vs fidelity balance (0.0-1.0).
upscale: Output upscale factor (1-4).
output_format: Desired output image format.
Returns:
The face-restored image as binary content.
"""
try:
file_bytes = file.file.read()
check_file_size(file_bytes)
image = read_image(file_bytes)
image = validate_and_resize(image)
except FileTooLargeError as e:
return JSONResponse(status_code=413, content={"detail": str(e)})
except ValueError as e:
return JSupscale function · python · L408-L455 (48 LOC)main.py
def upscale(
file: UploadFile = File(...),
scale: int = Query(4, ge=1, le=8),
tile_size: int = Query(512, ge=0),
output_format: str = Query("png", pattern="^(png|jpg|jpeg|webp)$"),
):
"""Upscale an image using the Real-ESRGAN super-resolution model.
Args:
file: Input image file.
scale: Desired upscale factor (1-8).
tile_size: Tile size for processing (0 = no tiling).
output_format: Desired output image format.
Returns:
The upscaled image as binary content.
"""
try:
file_bytes = file.file.read()
check_file_size(file_bytes)
image = read_image(file_bytes)
image = validate_and_resize(image)
except FileTooLargeError as e:
return JSONResponse(status_code=413, content={"detail": str(e)})
except ValueError as e:
return JSONResponse(status_code=400, content={"detail": str(e)})
model = get_model("upscale")
if model is None:
return JSONResponse(
old_photo_restore function · python · L459-L516 (58 LOC)main.py
def old_photo_restore(
file: UploadFile = File(...),
with_scratch: bool = Query(True),
with_face: bool = Query(True),
scratch_threshold: float = Query(0.4, ge=0.0, le=1.0),
output_format: str = Query("png", pattern="^(png|jpg|jpeg|webp)$"),
):
"""Restore an old or damaged photo.
Detects and repairs scratches, restores global quality via VAE, and
optionally enhances detected faces using a SPADE generator.
Args:
file: Input image file.
with_scratch: Enable automatic scratch detection and repair.
with_face: Enable face enhancement.
scratch_threshold: Sensitivity for scratch detection (0.0-1.0).
output_format: Desired output image format.
Returns:
The restored image as binary content.
"""
try:
file_bytes = file.file.read()
check_file_size(file_bytes)
image = read_image(file_bytes)
image = validate_and_resize(image)
except FileTooLargeError as e:
retinpaint function · python · L520-L608 (89 LOC)main.py
def inpaint(
file: UploadFile = File(...),
points: str = Query(
...,
description=(
"JSON array of [x,y] points defining the inpaint polygon, "
"e.g. [[10,20],[30,40],[50,60]]"
),
),
output_format: str = Query("png", pattern="^(png|jpg|jpeg|webp)$"),
):
"""Inpaint a polygon-shaped region of an image using the LaMa model.
The ``points`` query parameter defines a polygon on the image. The polygon
interior is filled white to create the inpaint mask.
Args:
file: Input image file (JPEG, PNG, WebP, etc.).
points: JSON array of ``[x, y]`` coordinate pairs (at least 3).
output_format: Desired output image format.
Returns:
The inpainted image as binary content.
"""
try:
file_bytes = file.file.read()
check_file_size(file_bytes)
image = read_image(file_bytes)
image = validate_and_resize(image)
except FileTooLargeError as e:
returpipeline function · python · L612-L736 (125 LOC)main.py
def pipeline(
file: UploadFile = File(...),
old_photo_restore: bool = Query(False),
restore: bool = Query(True),
face_restore: bool = Query(True),
colorize: bool = Query(True),
upscale: bool = Query(True),
width: int = Query(2400, ge=1),
height: int = Query(2400, ge=1),
output_format: str = Query("png", pattern="^(png|jpg|jpeg|webp)$"),
# old photo restore params
with_scratch: bool = Query(True),
scratch_threshold: float = Query(0.4, ge=0.0, le=1.0),
# colorize params
render_factor: int = Query(35, ge=1, le=100),
# restore params
restore_tile_size: int = Query(0, ge=0),
# face restore params
fidelity: float = Query(0.7, ge=0.0, le=1.0),
# upscale params
upscale_tile_size: int = Query(512, ge=0),
):
"""Run multiple enhancement steps in a single request.
Pipeline order: old_photo_restore -> colorize -> restore -> upscale -> resize -> face restore.
Args:
file: Input image file.
olhealth function · python · L780-L799 (20 LOC)main.py
def health():
    """Report service status, the active device and the loaded models.

    Returns:
        A dict with ``status`` (always ``"healthy"``), ``device`` (the module
        level device string), ``loaded_models`` (keys of ``model_registry``)
        and ``cuda_info``. ``cuda_info`` is ``None`` unless
        ``torch.cuda.is_available()`` is true, in which case it holds the name,
        total memory and currently allocated memory of CUDA device 0, with
        memory values in GiB rounded to two decimals.
    """
    if torch.cuda.is_available():
        gib = 1024**3
        cuda_info = {
            "gpu_name": torch.cuda.get_device_name(0),
            "vram_total_gb": round(torch.cuda.get_device_properties(0).total_memory / gib, 2),
            "vram_used_gb": round(torch.cuda.memory_allocated(0) / gib, 2),
        }
    else:
        cuda_info = None

    report = {
        "status": "healthy",
        "device": device,
        "loaded_models": list(model_registry.keys()),
        "cuda_info": cuda_info,
    }
    return report
# TransformerSALayer.__init__ method · python · L9-L29 (21 LOC)models/archs/codeformer_arch.py
def __init__(self, embed_dim, nhead=8, dim_mlp=2048, dropout=0.0, activation="gelu"):
    """Create the attention, feed-forward, normalisation and dropout sub-modules.

    Args:
        embed_dim: Width used by the attention module, both LayerNorms and the
            input/output of the feed-forward network.
        nhead: Number of attention heads.
        dim_mlp: Hidden width of the two-layer feed-forward network.
        dropout: Dropout probability shared by the attention module and the
            three ``nn.Dropout`` modules.
        activation: ``"gelu"`` selects ``F.gelu``; any other value selects
            ``F.relu``.
    """
    super().__init__()
    self.self_attn = nn.MultiheadAttention(
        embed_dim=embed_dim,
        num_heads=nhead,
        dropout=dropout,
        batch_first=False,
    )
    # Feed-forward network: embed_dim -> dim_mlp -> embed_dim.
    self.linear1 = nn.Linear(in_features=embed_dim, out_features=dim_mlp)
    self.dropout = nn.Dropout(p=dropout)
    self.linear2 = nn.Linear(in_features=dim_mlp, out_features=embed_dim)
    self.norm1 = nn.LayerNorm(embed_dim)
    self.norm2 = nn.LayerNorm(embed_dim)
    self.dropout1 = nn.Dropout(p=dropout)
    self.dropout2 = nn.Dropout(p=dropout)
    self.activation = F.gelu if activation == "gelu" else F.relu
# TransformerSALayer.forward method · python · L35-L52 (18 LOC)models/archs/codeformer_arch.py
def forward(self, tgt, tgt_mask=None, tgt_key_padding_mask=None, query_pos=None):
    """Run a pre-norm self-attention step and a pre-norm feed-forward step.

    Both steps normalise their input first and add their (dropout-ed) result
    back onto the running tensor.

    Args:
        tgt: Input sequence tensor.
        tgt_mask: Forwarded to the attention module as ``attn_mask``.
        tgt_key_padding_mask: Forwarded as ``key_padding_mask``.
        query_pos: Passed to ``self.with_pos_embed`` together with the
            normalised input to build the queries and keys.

    Returns:
        The tensor after both residual updates.
    """
    # --- self-attention sub-layer ---
    normed = self.norm1(tgt)
    # Queries and keys share the output of with_pos_embed; values use the
    # plain normalised input.
    pos_encoded = self.with_pos_embed(normed, query_pos)
    attn_result = self.self_attn(
        pos_encoded,
        pos_encoded,
        value=normed,
        attn_mask=tgt_mask,
        key_padding_mask=tgt_key_padding_mask,
    )
    tgt = tgt + self.dropout1(attn_result[0])

    # --- feed-forward sub-layer ---
    normed = self.norm2(tgt)
    hidden = self.activation(self.linear1(normed))
    ff_out = self.linear2(self.dropout(hidden))
    return tgt + self.dropout2(ff_out)
# Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
CodeFormer.__init__ method · python · L56-L124 (69 LOC)models/archs/codeformer_arch.py
def __init__(
self,
dim_embd=512,
n_head=8,
n_layers=9,
codebook_size=1024,
latent_size=256,
connect_list=None,
fix_modules=None,
):
if connect_list is None:
connect_list = ["32", "64", "128", "256"]
if fix_modules is None:
fix_modules = ["quantize", "generator"]
super().__init__(
512,
64,
[1, 2, 2, 4, 4, 8],
quantizer="nearest",
res_blocks=2,
attn_resolutions=[16],
codebook_size=codebook_size,
emb_dim=256,
)
if fix_modules is not None:
for module in fix_modules:
for param in getattr(self, module).parameters():
param.requires_grad = False
self.connect_list = connect_list
self.n_layers = n_layers
self.dim_embd = dim_embd
self.dim_mlp = dim_embd * 2
self.position_emb = CodeFormer.forward method · python · L126-L191 (66 LOC)models/archs/codeformer_arch.py
def forward(self, x, w=0, detach_16=True, code_only=False, adain=False):
# ---------- Encoder ----------
enc_feat_dict = {}
out_list = [self.connect_list] # noqa: F841
for i, block in enumerate(self.encoder.blocks):
x = block(x)
if isinstance(block, ResBlock):
if x.shape[-1] in [int(f) for f in self.connect_list]:
enc_feat_dict[str(x.shape[-1])] = x.clone()
lq_feat = x
# ---------- Transformer ----------
pos_emb = self.position_emb.unsqueeze(1).repeat(1, x.shape[0], 1)
# BCHW -> BC(HW) -> (HW)BC
feat_emb = self.feat_emb(lq_feat.flatten(2).permute(2, 0, 1))
query_emb = feat_emb
for layer in self.ft_layers:
query_emb = layer(query_emb, query_pos=pos_emb)
# Predict codebook indices
logits = self.idx_pred_layer(query_emb) # (HW, B, codebook_size)
soft_one_hot = F.softmax(logits, dim=2)
_, toCodeFormer._adaptive_instance_normalization method · python · L194-L201 (8 LOC)models/archs/codeformer_arch.py
def _adaptive_instance_normalization(content_feat, style_feat):
    """Re-scale ``content_feat`` to the per-channel statistics of ``style_feat``.

    Mean and standard deviation are taken over dimensions 2 and 3 (kept as
    size-1 dims so they broadcast). The content features are standardised with
    their own statistics and then scaled and shifted by the style statistics.

    Args:
        content_feat: Tensor whose values are re-normalised.
        style_feat: Tensor supplying the target mean and standard deviation.

    Returns:
        A tensor shaped like ``content_feat`` carrying the style statistics.
    """
    spatial_dims = [2, 3]
    # 1e-8 keeps the division (and the style scale) away from exactly zero.
    style_mean = style_feat.mean(spatial_dims, keepdim=True)
    style_std = style_feat.std(spatial_dims, keepdim=True) + 1e-8
    content_mean = content_feat.mean(spatial_dims, keepdim=True)
    content_std = content_feat.std(spatial_dims, keepdim=True) + 1e-8
    normalized = (content_feat - content_mean) / content_std
    return normalized * style_std + style_mean
# LayerNorm2d.forward method · python · L13-L18 (6 LOC)models/archs/nafnet_arch.py
def forward(self, x):
    """Normalise ``x`` across dimension 1 and apply the learned scale and shift.

    Mean and (biased) variance are computed over dim 1 with ``keepdim=True``;
    ``self.weight`` and ``self.bias`` are indexed with ``[:, None, None]`` so
    they broadcast over the two trailing dimensions.

    Args:
        x: Input tensor; assumes a (batch, channel, height, width) layout —
            TODO confirm against callers.

    Returns:
        The normalised, scaled and shifted tensor.
    """
    channel_mean = x.mean(1, keepdim=True)
    centered = x - channel_mean
    channel_var = centered.pow(2).mean(1, keepdim=True)
    normalized = centered / torch.sqrt(channel_var + self.eps)
    scale = self.weight[:, None, None]
    shift = self.bias[:, None, None]
    return scale * normalized + shift
# NAFBlock.__init__ method · python · L28-L61 (34 LOC)models/archs/nafnet_arch.py
def __init__(self, c, dw_expand=2, ffn_expand=2, drop_out_rate=0.0):
super().__init__()
dw_channel = c * dw_expand
self.conv1 = nn.Conv2d(c, dw_channel, 1, 1, 0)
self.conv2 = nn.Conv2d(
dw_channel,
dw_channel,
3,
1,
1,
groups=dw_channel,
)
self.conv3 = nn.Conv2d(dw_channel // 2, c, 1, 1, 0)
# Simplified Channel Attention
self.sca = nn.Sequential(
nn.AdaptiveAvgPool2d(1),
nn.Conv2d(dw_channel // 2, dw_channel // 2, 1, 1, 0),
)
self.sg = SimpleGate()
ffn_channel = ffn_expand * c
self.conv4 = nn.Conv2d(c, ffn_channel, 1, 1, 0)
self.conv5 = nn.Conv2d(ffn_channel // 2, c, 1, 1, 0)
self.norm1 = LayerNorm2d(c)
self.norm2 = LayerNorm2d(c)
self.dropout1 = nn.Dropout(drop_out_rate) if drop_out_rate > 0.0 else nn.Identity()
self.dropout2 = nn.Dropout(drop_out_NAFBlock.forward method · python · L63-L78 (16 LOC)models/archs/nafnet_arch.py
def forward(self, inp):
    """Apply the two residual branches of the block in sequence.

    The first branch runs norm1 -> conv1 -> conv2 -> gate (``self.sg``),
    multiplies the result by ``self.sca`` of itself, then conv3 and dropout1;
    it is added to ``inp`` scaled by ``self.beta``. The second branch runs
    norm2 -> conv4 -> gate -> conv5 -> dropout2 on that sum and is added
    scaled by ``self.gamma``.

    Args:
        inp: Input feature tensor.

    Returns:
        The tensor after both scaled residual additions.
    """
    # Branch 1: gated convolution stack with channel re-weighting.
    attn_branch = self.sg(self.conv2(self.conv1(self.norm1(inp))))
    attn_branch = attn_branch * self.sca(attn_branch)
    attn_branch = self.dropout1(self.conv3(attn_branch))
    y = inp + attn_branch * self.beta

    # Branch 2: gated feed-forward stack on the updated features.
    ffn_branch = self.sg(self.conv4(self.norm2(y)))
    ffn_branch = self.dropout2(self.conv5(ffn_branch))
    return y + ffn_branch * self.gamma
# NAFNet.__init__ method · python · L82-L122 (41 LOC)models/archs/nafnet_arch.py
def __init__(
self,
img_channel=3,
width=64,
middle_blk_num=12,
enc_blk_nums=None,
dec_blk_nums=None,
):
super().__init__()
if enc_blk_nums is None:
enc_blk_nums = []
if dec_blk_nums is None:
dec_blk_nums = []
self.intro = nn.Conv2d(img_channel, width, 3, 1, 1)
self.ending = nn.Conv2d(width, img_channel, 3, 1, 1)
self.encoders = nn.ModuleList()
self.decoders = nn.ModuleList()
self.ups = nn.ModuleList()
self.downs = nn.ModuleList()
chan = width
for num in enc_blk_nums:
self.encoders.append(nn.Sequential(*[NAFBlock(chan) for _ in range(num)]))
self.downs.append(nn.Conv2d(chan, 2 * chan, 2, 2))
chan = chan * 2
self.middle_blks = nn.Sequential(*[NAFBlock(chan) for _ in range(middle_blk_num)])
for num in dec_blk_nums:
self.ups.append(
nn.SequNAFNet.forward method · python · L124-L146 (23 LOC)models/archs/nafnet_arch.py
def forward(self, inp):
    """Run the encoder/decoder network and crop back to the input size.

    The input is first padded by ``self._check_image_size``. Each encoder
    output is kept as a skip connection and added, in reverse order, to the
    upsampled decoder input. The padded input is added to the final output
    before cropping.

    Args:
        inp: 4-D input tensor; its last two dimensions define the crop size.

    Returns:
        The network output cropped to the original height and width.
    """
    _, _, H, W = inp.shape
    inp = self._check_image_size(inp)

    x = self.intro(inp)
    skips = []
    for encoder, down in zip(self.encoders, self.downs):
        x = encoder(x)
        skips.append(x)
        x = down(x)

    x = self.middle_blks(x)

    # Decoders consume the skips deepest-first.
    for decoder, up, skip in zip(self.decoders, self.ups, reversed(skips)):
        x = decoder(up(x) + skip)

    x = self.ending(x) + inp
    # Drop whatever padding _check_image_size added.
    return x[:, :, :H, :W]
# Repobility · severity-and-effort ranking · https://repobility.com
NAFNet._check_image_size method · python · L148-L153 (6 LOC)models/archs/nafnet_arch.py
def _check_image_size(self, x):
    """Zero-pad ``x`` on the bottom and right up to multiples of ``padder_size``.

    Args:
        x: 4-D tensor whose last two dimensions are padded.

    Returns:
        ``x`` padded so that height and width are multiples of
        ``self.padder_size``; no padding is added to a dimension that already
        is one.
    """
    step = self.padder_size
    height, width = x.size()[2:]
    # ``-n % step`` is the distance from n up to the next multiple of step.
    extra_rows = -height % step
    extra_cols = -width % step
    return F.pad(x, (0, extra_cols, 0, extra_rows))
# Downsample.__init__ method · python · L18-L53 (36 LOC)models/archs/old_photo_detect_arch.py
def __init__(self, pad_type="reflect", filt_size=3, stride=2, channels=None, pad_off=0):
super().__init__()
self.filt_size = filt_size
self.pad_off = pad_off
self.pad_sizes = [
int(1.0 * (filt_size - 1) / 2),
int(np.ceil(1.0 * (filt_size - 1) / 2)),
int(1.0 * (filt_size - 1) / 2),
int(np.ceil(1.0 * (filt_size - 1) / 2)),
]
self.pad_sizes = [pad_size + pad_off for pad_size in self.pad_sizes]
self.stride = stride
self.off = int((self.stride - 1) / 2.0)
self.channels = channels
if self.filt_size == 1:
a = np.array([1.0])
elif self.filt_size == 2:
a = np.array([1.0, 1.0])
elif self.filt_size == 3:
a = np.array([1.0, 2.0, 1.0])
elif self.filt_size == 4:
a = np.array([1.0, 3.0, 3.0, 1.0])
elif self.filt_size == 5:
a = np.array([1.0, 4.0, 6.0, 4.0, 1.0])
elif selfDownsample.forward method · python · L55-L62 (8 LOC)models/archs/old_photo_detect_arch.py
def forward(self, inp):
    """Downsample ``inp`` by ``self.stride``, filtering first when ``filt_size > 1``.

    With ``filt_size == 1`` the input is only strided (after padding when
    ``pad_off`` is non-zero). Otherwise the padded input is convolved with
    ``self.filt`` using one group per input channel.

    Args:
        inp: 4-D input tensor.

    Returns:
        The downsampled tensor.
    """
    if self.filt_size != 1:
        # groups == channel count: every channel is filtered independently.
        return F.conv2d(self.pad(inp), self.filt, stride=self.stride, groups=inp.shape[1])

    source = inp if self.pad_off == 0 else self.pad(inp)
    return source[:, :, :: self.stride, :: self.stride]
# _get_pad_layer function · python · L65-L73 (9 LOC)models/archs/old_photo_detect_arch.py
def _get_pad_layer(pad_type):
    """Map a padding-type name to the matching ``torch.nn`` padding class.

    Args:
        pad_type: ``"refl"``/``"reflect"``, ``"repl"``/``"replicate"`` or
            ``"zero"``.

    Returns:
        The padding layer class (not an instance).

    Raises:
        ValueError: If ``pad_type`` is none of the recognised names.
    """
    known_layers = (
        (("refl", "reflect"), nn.ReflectionPad2d),
        (("repl", "replicate"), nn.ReplicationPad2d),
        (("zero",), nn.ZeroPad2d),
    )
    for aliases, layer_cls in known_layers:
        if pad_type in aliases:
            return layer_cls
    raise ValueError(f"Pad type [{pad_type}] not recognized")
# UNetConvBlock.__init__ method · python · L77-L87 (11 LOC)models/archs/old_photo_detect_arch.py
def __init__(self, conv_num, in_size, out_size, padding, batch_norm):
    """Assemble ``conv_num`` repetitions of pad -> 3x3 conv -> [BatchNorm] -> LeakyReLU.

    Args:
        conv_num: Number of convolution stages.
        in_size: Input channels of the first convolution.
        out_size: Output channels of every convolution (and input channels of
            all convolutions after the first).
        padding: Converted with ``int()`` and used as the reflection padding
            width before each convolution.
        batch_norm: When truthy, a ``BatchNorm2d`` follows each convolution.
    """
    super().__init__()
    layers = []
    channels_in = in_size
    for _ in range(conv_num):
        # The convolution itself is unpadded; padding comes from the
        # reflection layer in front of it.
        layers += [
            nn.ReflectionPad2d(padding=int(padding)),
            nn.Conv2d(channels_in, out_size, kernel_size=3, padding=0),
        ]
        if batch_norm:
            layers.append(nn.BatchNorm2d(out_size))
        layers.append(nn.LeakyReLU(0.2, True))
        channels_in = out_size
    self.block = nn.Sequential(*layers)
# UNetUpBlock.__init__ method · python · L94-L104 (11 LOC)models/archs/old_photo_detect_arch.py
def __init__(self, conv_num, in_size, out_size, up_mode, padding, batch_norm):
    """Create the upsampling path and the convolution block that follows it.

    Args:
        conv_num: Number of convolution stages in the conv block.
        in_size: Input channels of the upsampler and of the conv block.
        out_size: Output channels of the upsampler and of the conv block.
        up_mode: ``"upconv"`` for a stride-2 transposed convolution, or
            ``"upsample"`` for bilinear x2 upsampling followed by a
            reflection-padded 3x3 convolution.
        padding: Forwarded to ``UNetConvBlock``.
        batch_norm: Forwarded to ``UNetConvBlock``.
    """
    super().__init__()
    if up_mode == "upsample":
        self.up = nn.Sequential(
            nn.Upsample(mode="bilinear", scale_factor=2, align_corners=False),
            nn.ReflectionPad2d(1),
            nn.Conv2d(in_size, out_size, kernel_size=3, padding=0),
        )
    elif up_mode == "upconv":
        self.up = nn.ConvTranspose2d(in_size, out_size, kernel_size=2, stride=2)
    # NOTE: any other ``up_mode`` leaves ``self.up`` unset.
    self.conv_block = UNetConvBlock(conv_num, in_size, out_size, padding, batch_norm)
# UNet.__init__ method · python · L126-L194 (69 LOC)models/archs/old_photo_detect_arch.py
def __init__(
self,
in_channels=1,
out_channels=1,
depth=4,
conv_num=2,
wf=6,
padding=True,
batch_norm=True,
up_mode="upsample",
with_tanh=False,
antialiasing=True,
):
super().__init__()
assert up_mode in ("upconv", "upsample")
prev_channels = in_channels
self.first = nn.Sequential(
nn.ReflectionPad2d(3),
nn.Conv2d(in_channels, 2**wf, kernel_size=7),
nn.LeakyReLU(0.2, True),
)
prev_channels = 2**wf
self.down_path = nn.ModuleList()
self.down_sample = nn.ModuleList()
for i in range(depth):
if antialiasing:
self.down_sample.append(
nn.Sequential(
nn.ReflectionPad2d(1),
nn.Conv2d(prev_channels, prev_channels, kernel_size=3, stride=1, padding=0),
nn.BatchNorm2d(prev_chanUNet.forward method · python · L196-L205 (10 LOC)models/archs/old_photo_detect_arch.py
def forward(self, x):
    """Run the stem, the down path, the up path and the output head.

    Before each down stage the current features are stored as a skip; the up
    stages then receive those skips most-recent-first.

    Args:
        x: Input tensor.

    Returns:
        The output of ``self.last``.
    """
    x = self.first(x)

    skips = []
    for stage, down_block in enumerate(self.down_path):
        skips.append(x)
        x = down_block(self.down_sample[stage](x))

    for up in self.up_path:
        # pop() yields the skips in reverse order of storage.
        x = up(x, skips.pop())

    return self.last(x)
# Repobility · code-quality intelligence · https://repobility.com
SPADE.__init__ method · python · L20-L30 (11 LOC)models/archs/old_photo_face_arch.py
def __init__(self, norm_nc, label_nc, nhidden=128):
    """Create the parameter-free norm and the layers that predict its modulation.

    Args:
        norm_nc: Channel count of the features being normalised; also the
            output channels of the gamma and beta convolutions.
        label_nc: Channel count of the conditioning map fed to the shared
            convolution.
        nhidden: Width of the shared hidden representation.
    """
    super().__init__()
    conv_kwargs = {"kernel_size": 3, "padding": 1}
    # affine=False: scale and shift come from mlp_gamma / mlp_beta instead.
    self.param_free_norm = nn.BatchNorm2d(norm_nc, affine=False)
    self.mlp_shared = nn.Sequential(
        nn.Conv2d(label_nc, nhidden, **conv_kwargs),
        nn.ReLU(),
    )
    self.mlp_gamma = nn.Conv2d(nhidden, norm_nc, **conv_kwargs)
    self.mlp_beta = nn.Conv2d(nhidden, norm_nc, **conv_kwargs)
# SPADE.forward method · python · L32-L38 (7 LOC)models/archs/old_photo_face_arch.py
def forward(self, x, segmap):
    """Normalise ``x`` and modulate it with maps predicted from ``segmap``.

    ``segmap`` is resized (nearest neighbour) to the spatial size of ``x``,
    passed through the shared layers, and turned into ``gamma`` and ``beta``.

    Args:
        x: Feature tensor to normalise.
        segmap: Conditioning map.

    Returns:
        ``normalized * (1 + gamma) + beta``.
    """
    resized_map = F.interpolate(segmap, size=x.shape[2:], mode="nearest")
    hidden = self.mlp_shared(resized_map)
    scale = 1 + self.mlp_gamma(hidden)
    shift = self.mlp_beta(hidden)
    return self.param_free_norm(x) * scale + shift
# SPADEResnetBlock.__init__ method · python · L44-L57 (14 LOC)models/archs/old_photo_face_arch.py
def __init__(self, fin, fout, semantic_nc):
    """Create the spectrally-normalised convolutions and their SPADE norms.

    Args:
        fin: Input channels.
        fout: Output channels. When it differs from ``fin`` a learned 1x1
            shortcut (``conv_s`` / ``norm_s``) is created as well.
        semantic_nc: Channel count of the conditioning map given to every
            ``SPADE`` layer.
    """
    super().__init__()
    needs_projection = fin != fout
    self.learned_shortcut = needs_projection
    fmiddle = min(fin, fout)

    def sn_conv(c_in, c_out, **kwargs):
        """Build a Conv2d wrapped in spectral normalisation."""
        return spectral_norm(nn.Conv2d(c_in, c_out, **kwargs))

    self.conv_0 = sn_conv(fin, fmiddle, kernel_size=3, padding=1)
    self.conv_1 = sn_conv(fmiddle, fout, kernel_size=3, padding=1)
    if needs_projection:
        self.conv_s = sn_conv(fin, fout, kernel_size=1, bias=False)

    self.norm_0 = SPADE(fin, semantic_nc)
    self.norm_1 = SPADE(fmiddle, semantic_nc)
    if needs_projection:
        self.norm_s = SPADE(fin, semantic_nc)
# SPADEResnetBlock.shortcut method · python · L59-L64 (6 LOC)models/archs/old_photo_face_arch.py
def shortcut(self, x, seg):
    """Return the skip-path tensor for the residual block.

    Args:
        x: Block input.
        seg: Conditioning map passed to ``norm_s`` on the learned path.

    Returns:
        ``x`` itself when no learned shortcut exists, otherwise
        ``conv_s(norm_s(x, seg))``.
    """
    if not self.learned_shortcut:
        return x
    return self.conv_s(self.norm_s(x, seg))
# ConvEncoder.__init__ method · python · L79-L93 (15 LOC)models/archs/old_photo_face_arch.py
def __init__(self, input_nc=3, ngf=64, norm_layer=nn.BatchNorm2d):
    """Create five stride-2 convolution stages and the two linear heads.

    Args:
        input_nc: Channels of the input image.
        ngf: Base channel width; stages widen to ``ngf * 8``.
        norm_layer: Callable applied to each ``nn.Conv2d`` module to produce
            the stage that is stored.
            NOTE(review): it is called with a Conv2d *module*, so it looks
            like it must be a module wrapper; the default ``nn.BatchNorm2d``
            does not appear compatible with that call — confirm.
    """
    super().__init__()
    kw = 3
    pw = 1
    widths = (input_nc, ngf, ngf * 2, ngf * 4, ngf * 8, ngf * 8)
    for idx, (c_in, c_out) in enumerate(zip(widths, widths[1:]), start=1):
        stage = norm_layer(nn.Conv2d(c_in, c_out, kw, stride=2, padding=pw))
        setattr(self, f"layer{idx}", stage)
    self.actvn = nn.LeakyReLU(0.2, False)
    # The heads expect the flattened features to hold ngf * 8 channels on a
    # 4 x 4 grid.
    flat_features = ngf * 8 * 4 * 4
    self.fc_mu = nn.Linear(flat_features, 256)
    self.fc_var = nn.Linear(flat_features, 256)
# ConvEncoder.forward method · python · L95-L106 (12 LOC)models/archs/old_photo_face_arch.py
def forward(self, x):
    """Encode ``x`` and return the two linear-head outputs.

    The activation is applied between consecutive stages and once more after
    the last stage; the result is flattened per sample before the heads.

    Args:
        x: Input tensor.

    Returns:
        Tuple ``(mu, logvar)`` from ``fc_mu`` and ``fc_var``.
    """
    x = self.layer1(x)
    for stage in (self.layer2, self.layer3, self.layer4, self.layer5):
        x = stage(self.actvn(x))
    flat = self.actvn(x)
    flat = flat.view(flat.size(0), -1)
    return self.fc_mu(flat), self.fc_var(flat)
# SPADEGenerator.__init__ method · python · L116-L137 (22 LOC)models/archs/old_photo_face_arch.py
def __init__(self, input_nc=3, output_nc=3, ngf=64, semantic_nc=3, use_vae=False):
super().__init__()
self.use_vae = use_vae
nf = ngf
self.sw = self.sh = 4 # Spatial size of first feature map
if use_vae:
self.fc = nn.Linear(256, 16 * nf * self.sw * self.sh)
else:
self.fc = nn.Conv2d(semantic_nc, 16 * nf, kernel_size=3, padding=1)
self.head_0 = SPADEResnetBlock(16 * nf, 16 * nf, semantic_nc)
self.G_middle_0 = SPADEResnetBlock(16 * nf, 16 * nf, semantic_nc)
self.G_middle_1 = SPADEResnetBlock(16 * nf, 16 * nf, semantic_nc)
self.up_0 = SPADEResnetBlock(16 * nf, 8 * nf, semantic_nc)
self.up_1 = SPADEResnetBlock(8 * nf, 4 * nf, semantic_nc)
self.up_2 = SPADEResnetBlock(4 * nf, 2 * nf, semantic_nc)
self.up_3 = SPADEResnetBlock(2 * nf, 1 * nf, semantic_nc)
self.conv_img = nn.Conv2d(nf, output_nc, kernel_size=3, padding=1)
self.up = nn.UpsampleSPADEGenerator.forward method · python · L139-L165 (27 LOC)models/archs/old_photo_face_arch.py
def forward(self, input, z=None):
    """Generate an image conditioned on ``input``.

    Args:
        input: Conditioning map; it is passed as ``seg`` to every SPADE
            residual block and, without VAE, also seeds the first features.
        z: Latent vector, used only when ``self.use_vae`` is true.

    Returns:
        The generated tensor after a final ``tanh``.
    """
    seg = input
    if self.use_vae:
        x = self.fc(z)
        # Recover the channel count from the linear layer instead of assuming
        # ngf == 64: ``fc`` outputs channels * sh * sw features, so this equals
        # the previous hard-coded 16 * 64 for ngf=64 and stays correct for any
        # other ngf.
        channels = self.fc.out_features // (self.sh * self.sw)
        x = x.view(-1, channels, self.sh, self.sw)
    else:
        # Without a latent, start from the conditioning map shrunk to the
        # initial spatial size.
        x = F.interpolate(seg, size=(self.sh, self.sw), mode="bilinear", align_corners=False)
        x = self.fc(x)

    # Six SPADE blocks, each followed by upsampling.
    upsampled_stages = (
        self.head_0,
        self.G_middle_0,
        self.G_middle_1,
        self.up_0,
        self.up_1,
        self.up_2,
    )
    for stage in upsampled_stages:
        x = self.up(stage(x, seg))

    # The last SPADE block is not followed by upsampling.
    x = self.up_3(x, seg)
    x = self.conv_img(F.leaky_relu(x, 2e-1))
    return torch.tanh(x)
# Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
ResnetBlock._build_conv_block method · python · L21-L46 (26 LOC)models/archs/old_photo_global_arch.py
def _build_conv_block(self, dim, padding_type, norm_layer, dilation):
    """Build the two-convolution body of the residual block.

    Layout: [pad] -> 3x3 conv (dilated) -> norm -> ReLU -> [pad] -> 3x3 conv
    -> norm. For ``"reflect"`` and ``"replicate"`` an explicit padding layer
    precedes each convolution; for ``"zero"`` the convolution pads itself.
    Any other ``padding_type`` results in no padding at all.

    Args:
        dim: Channel count kept throughout the block.
        padding_type: ``"reflect"``, ``"replicate"`` or ``"zero"``.
        norm_layer: Callable building a normalisation layer from ``dim``.
        dilation: Dilation (and padding width) of the first convolution.

    Returns:
        An ``nn.Sequential`` with the layers described above.
    """

    def padding_for(amount):
        """Return (explicit pad layers, conv padding argument) for ``amount``."""
        if padding_type == "reflect":
            return [nn.ReflectionPad2d(amount)], 0
        if padding_type == "replicate":
            return [nn.ReplicationPad2d(amount)], 0
        if padding_type == "zero":
            return [], amount
        return [], 0

    layers, conv_pad = padding_for(dilation)
    layers.extend(
        [
            nn.Conv2d(dim, dim, kernel_size=3, padding=conv_pad, dilation=dilation),
            norm_layer(dim),
            nn.ReLU(True),
        ]
    )

    # The second convolution is undilated, so it always pads by 1.
    second_pad, conv_pad = padding_for(1)
    layers.extend(second_pad)
    layers.extend([nn.Conv2d(dim, dim, kernel_size=3, padding=conv_pad), norm_layer(dim)])
    return nn.Sequential(*layers)
# GlobalGenerator_DCDCv2.__init__ method · python · L62-L193 (132 LOC)models/archs/old_photo_global_arch.py
def __init__(
self,
input_nc=3,
output_nc=3,
ngf=64,
k_size=4,
n_downsampling=3,
norm_layer=nn.InstanceNorm2d,
padding_type="reflect",
start_r=1,
mc=64,
spatio_size=64,
):
super().__init__()
activation = nn.ReLU(True)
# Encoder
encoder = [
nn.ReflectionPad2d(3),
nn.Conv2d(input_nc, min(ngf, mc), kernel_size=7, padding=0),
norm_layer(min(ngf, mc)),
activation,
]
# Plain downsampling (no ResBlocks)
for i in range(start_r):
mult = 2**i
in_ch = min(ngf * mult, mc)
out_ch = min(ngf * mult * 2, mc)
encoder += [
nn.Conv2d(in_ch, out_ch, kernel_size=k_size, stride=2, padding=1),
norm_layer(out_ch),
activation,
]
# Downsampling with ResBlocks
for i in range(start_r, n_downsGlobalGenerator_DCDCv2.forward method · python · L195-L205 (11 LOC)models/archs/old_photo_global_arch.py
def forward(self, input, flow="enc_dec"):
    """Run the encoder, the decoder, or both, as selected by ``flow``.

    Args:
        input: Tensor fed to the first stage that runs.
        flow: ``"enc"`` (encoder only), ``"dec"`` (decoder only) or
            ``"enc_dec"`` (encoder followed by decoder).

    Returns:
        The output of the last stage that ran.

    Raises:
        ValueError: If ``flow`` is not one of the three supported values.
    """
    if flow == "enc":
        return self.encoder(input)
    if flow == "dec":
        return self.decoder(input)
    if flow == "enc_dec":
        return self.decoder(self.encoder(input))
    raise ValueError(f"Unknown flow: {flow}")
# NonLocalBlock2D_with_mask_Res.__init__ method · python · L216-L242 (27 LOC)models/archs/old_photo_global_arch.py
def __init__(self, in_channels, inter_channels=None, temperature=1.0, use_self=False):
super().__init__()
self.in_channels = in_channels
self.inter_channels = inter_channels or in_channels
self.g = nn.Conv2d(in_channels, self.inter_channels, kernel_size=1)
self.theta = nn.Conv2d(in_channels, self.inter_channels, kernel_size=1)
self.phi = nn.Conv2d(in_channels, self.inter_channels, kernel_size=1)
self.W = nn.Conv2d(self.inter_channels, in_channels, kernel_size=1)
nn.init.constant_(self.W.weight, 0)
nn.init.constant_(self.W.bias, 0)
self.temperature = temperature
self.use_self = use_self
norm_layer = nn.InstanceNorm2d
res_blocks = []
for _ in range(3):
res_blocks.append(
ResnetBlock(
inter_channels,
padding_type="reflect",
norm_layer=norm_layer,
)
)
NonLocalBlock2D_with_mask_Res.forward method · python · L244-L284 (41 LOC)models/archs/old_photo_global_arch.py
def forward(self, x, mask):
batch_size = x.size(0)
g_x = self.g(x).view(batch_size, self.inter_channels, -1).permute(0, 2, 1)
theta_x = self.theta(x).view(batch_size, self.inter_channels, -1).permute(0, 2, 1)
phi_x = self.phi(x).view(batch_size, self.inter_channels, -1)
f = torch.matmul(theta_x, phi_x)
f = f / self.temperature
f_div_C = F.softmax(f, dim=2)
# Mask processing
tmp = 1 - mask
mask_interp = F.interpolate(
mask, (x.size(2), x.size(3)), mode="bilinear", align_corners=False
)
mask_interp[mask_interp > 0] = 1.0
mask_interp = 1 - mask_interp
tmp = F.interpolate(tmp, (x.size(2), x.size(3)))
mask_interp = mask_interp * tmp
mask_expand = mask_interp.view(batch_size, 1, -1)
mask_expand = mask_expand.repeat(1, x.size(2) * x.size(3), 1)
if self.use_self:
diag_idx = range(x.size(2) * x.size(3))
maskMapping_Model_with_mask_2.__init__ method · python · L298-L324 (27 LOC)models/archs/old_photo_global_arch.py
def __init__(self, nc=64, mc=512):
super().__init__()
norm_layer = nn.InstanceNorm2d
activation = nn.ReLU(True)
# before_NL: 4 conv layers progressively increasing channels
before = []
n_up = 4
for i in range(n_up):
ic = min(nc * (2**i), mc)
oc = min(nc * (2 ** (i + 1)), mc)
before += [nn.Conv2d(ic, oc, 3, 1, 1), norm_layer(oc), activation]
self.before_NL = nn.Sequential(*before)
# NL: single non-local attention block
self.NL = NonLocalBlock2D_with_mask_Res(mc, inter_channels=mc)
# after_NL: 6 ResBlocks + downscale convs back to nc
after = []
for _ in range(6):
after.append(ResnetBlock(mc, padding_type="reflect", norm_layer=norm_layer))
for i in range(n_up - 1):
ic = min(nc * (2 ** (n_up - i)), mc)
oc = min(nc * (2 ** (n_up - 1 - i)), mc)
after += [nn.Conv2d(ic, oc, 3, 1, 1), norm_layerResidualDenseBlock.__init__ method · python · L14-L21 (8 LOC)models/archs/rrdbnet_arch.py
def __init__(self, num_feat=64, num_grow_ch=32):
    """Create the five 3x3 convolutions of the dense block.

    Convolution ``k`` (1-based) takes ``num_feat + (k - 1) * num_grow_ch``
    input channels. The first four emit ``num_grow_ch`` channels; the fifth
    maps back to ``num_feat``.

    Args:
        num_feat: Channels of the block input and output.
        num_grow_ch: Channels produced by each of the first four convolutions.
    """
    super().__init__()
    for idx in range(4):
        grow_conv = nn.Conv2d(num_feat + idx * num_grow_ch, num_grow_ch, 3, 1, 1)
        setattr(self, f"conv{idx + 1}", grow_conv)
    self.conv5 = nn.Conv2d(num_feat + 4 * num_grow_ch, num_feat, 3, 1, 1)
    self.lrelu = nn.LeakyReLU(negative_slope=0.2, inplace=True)
# ResidualDenseBlock.forward method · python · L23-L29 (7 LOC)models/archs/rrdbnet_arch.py
def forward(self, x):
    """Apply the densely connected convolutions and the scaled residual add.

    Every convolution after the first receives the input concatenated (along
    dim 1) with all earlier activations. The last convolution's output is
    scaled by 0.2 before being added to ``x``.

    Args:
        x: Input feature tensor.

    Returns:
        ``conv5(...) * 0.2 + x``.
    """
    dense = [x, self.lrelu(self.conv1(x))]
    for conv in (self.conv2, self.conv3, self.conv4):
        dense.append(self.lrelu(conv(torch.cat(dense, 1))))
    residual = self.conv5(torch.cat(dense, 1))
    return residual * 0.2 + x
# Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
RRDBNet.__init__ method · python · L47-L68 (22 LOC)models/archs/rrdbnet_arch.py
def __init__(self, num_in_ch, num_out_ch, scale=4, num_feat=64, num_block=23, num_grow_ch=32):
    """Create the RRDB trunk, the upsampling convolutions and the output head.

    Args:
        num_in_ch: Channels of the input image.
        num_out_ch: Channels of the output image.
        scale: Stored as ``self.scale``. For 2 and 1 the first convolution
            expects 4x and 16x the input channels respectively.
        num_feat: Feature width used throughout the network.
        num_block: Number of ``RRDB`` blocks in the trunk.
        num_grow_ch: Growth channels passed to each ``RRDB`` block.
    """
    super().__init__()
    self.scale = scale
    if scale == 1:
        num_in_ch = num_in_ch * 16
    elif scale == 2:
        num_in_ch = num_in_ch * 4

    def conv3x3(c_in, c_out):
        """3x3 convolution with stride 1 and padding 1."""
        return nn.Conv2d(c_in, c_out, 3, 1, 1)

    self.conv_first = conv3x3(num_in_ch, num_feat)
    self.body = make_layer(
        RRDB,
        num_block,
        num_feat=num_feat,
        num_grow_ch=num_grow_ch,
    )
    self.conv_body = conv3x3(num_feat, num_feat)
    # Upsampling convolutions and output head.
    self.conv_up1 = conv3x3(num_feat, num_feat)
    self.conv_up2 = conv3x3(num_feat, num_feat)
    self.conv_hr = conv3x3(num_feat, num_feat)
    self.conv_last = conv3x3(num_feat, num_out_ch)
    self.lrelu = nn.LeakyReLU(negative_slope=0.2, inplace=True)
# RRDBNet.forward method · python · L70-L84 (15 LOC)models/archs/rrdbnet_arch.py
def forward(self, x):
    """Run the network: optional pixel-unshuffle, trunk, 4x upsampling, head.

    For ``scale == 2`` the input is pixel-unshuffled by 2 and for
    ``scale == 1`` by 4, before the fixed two-step 4x nearest-neighbour
    upsampling.

    Args:
        x: Input image tensor.

    Returns:
        The output of ``conv_last``.
    """
    if self.scale == 1:
        feat = F.pixel_unshuffle(x, 4)
    elif self.scale == 2:
        feat = F.pixel_unshuffle(x, 2)
    else:
        feat = x

    feat = self.conv_first(feat)
    # Long skip connection around the RRDB trunk.
    feat = feat + self.conv_body(self.body(feat))

    # Two nearest-neighbour 2x steps give the 4x upsampling.
    for up_conv in (self.conv_up1, self.conv_up2):
        enlarged = F.interpolate(feat, scale_factor=2, mode="nearest")
        feat = self.lrelu(up_conv(enlarged))

    return self.conv_last(self.lrelu(self.conv_hr(feat)))
# normalize function · python · L6-L12 (7 LOC)models/archs/vqgan_arch.py
def normalize(in_channels):
    """Build the ``GroupNorm`` layer used for ``in_channels`` feature channels.

    Args:
        in_channels: Number of channels to normalise.

    Returns:
        An ``nn.GroupNorm`` with 32 groups, ``eps=1e-6`` and affine parameters.
    """
    group_norm = nn.GroupNorm(32, in_channels, eps=1e-6, affine=True)
    return group_norm
# page 1 / 2next ›