Function bodies · 125 total
baseline_als function · python · L32-L96 (65 LOC) · src/spectrakit/baseline/als.py
def baseline_als(
intensities: np.ndarray,
lam: float = DEFAULT_LAMBDA,
p: float = DEFAULT_P,
max_iter: int = DEFAULT_MAX_ITER,
tol: float = DEFAULT_TOL,
return_info: bool = False,
) -> np.ndarray | ConvergenceInfo:
"""Estimate baseline using Asymmetric Least Squares smoothing.
Iteratively fits a smooth baseline by penalizing deviations
asymmetrically: points above the baseline are penalized less
(weight p) than points below (weight 1-p).
Args:
intensities: Spectral intensities, shape (W,) or (N, W).
lam: Smoothness parameter (lambda). Larger = smoother.
Typical range: 1e4 to 1e9.
p: Asymmetry parameter. Smaller values push baseline lower.
Typical range: 0.001 to 0.05.
max_iter: Maximum number of iterations.
tol: Convergence tolerance on weight change.
return_info: If ``True``, return a :class:`ConvergenceInfo`
object with iteration count, convergence status
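
A minimal usage sketch (not from the source): the import path is assumed to mirror the file layout above, and the synthetic data and parameter values are illustrative only.

import numpy as np
from spectrakit.baseline.als import baseline_als  # assumed import path

x = np.linspace(400, 1800, 1000)
peak = 50 * np.exp(-0.5 * ((x - 1000) / 15) ** 2)  # synthetic peak
drift = 1e-4 * (x - 400) ** 2  # slowly curving background
y = peak + drift
base = baseline_als(y, lam=1e6, p=0.01)  # larger lam gives a smoother baseline
corrected = y - base

_baseline_als_1d function · python · L99-L121 (23 LOC) · src/spectrakit/baseline/als.py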
def _baseline_als_1d(
intensities: np.ndarray,
penalty: sparse.spmatrix,
p: float = DEFAULT_P,
max_iter: int = DEFAULT_MAX_ITER,
tol: float = DEFAULT_TOL,
) -> np.ndarray:
"""ALS baseline for a single 1-D spectrum."""
n = len(intensities)
w = np.ones(n)
y = intensities
for _ in range(max_iter):
W = sparse.diags(w, 0)
Z = W + penalty
z = spsolve(Z, w * y)
w_new = np.where(y > z, p, 1.0 - p)
if np.sum(np.abs(w_new - w)) < tol:
break
w = w_new # type: ignore[assignment]
return z # type: ignore[no-any-return]
_baseline_als_1d_info function · python · L124-L157 (34 LOC) · src/spectrakit/baseline/als.py
def _baseline_als_1d_info(
intensities: np.ndarray,
penalty: sparse.spmatrix,
p: float = DEFAULT_P,
max_iter: int = DEFAULT_MAX_ITER,
tol: float = DEFAULT_TOL,
) -> ConvergenceInfo:
"""ALS baseline for a single 1-D spectrum, returning convergence info."""
n = len(intensities)
w = np.ones(n)
y = intensities
converged = False
residual = float("inf")
iteration = 0
for _i in range(1, max_iter + 1):
iteration = _i
W = sparse.diags(w, 0)
Z = W + penalty
z = spsolve(Z, w * y)
w_new = np.where(y > z, p, 1.0 - p)
residual = float(np.sum(np.abs(w_new - w)))
if residual < tol:
converged = True
break
w = w_new # type: ignore[assignment]
return ConvergenceInfo(
iterations=iteration,
converged=converged,
final_residual=residual,
baseline=z,
)
baseline_arpls function · python · L32-L92 (61 LOC) · src/spectrakit/baseline/arpls.py
def baseline_arpls(
intensities: np.ndarray,
lam: float = DEFAULT_LAMBDA,
max_iter: int = DEFAULT_MAX_ITER,
tol: float = DEFAULT_TOL,
return_info: bool = False,
) -> np.ndarray | ConvergenceInfo:
"""Estimate baseline using Asymmetrically Reweighted PLS (ArPLS).
Unlike standard ALS which uses a fixed asymmetry parameter *p*,
ArPLS adaptively reweights based on the residual distribution.
Weights follow a logistic function of the standardized residuals,
so points at or below the fitted baseline keep weights near one
while peak points are suppressed, producing more robust estimates
for spectra with varying peak densities.
Args:
intensities: Spectral intensities, shape ``(W,)`` or ``(N, W)``.
lam: Smoothness parameter (lambda). Larger = smoother.
Typical range: 1e4 to 1e9.
max_iter: Maximum number of iterations.
tol: Convergence tolerance on weight change.
return_info: If ``True``, return a :class:`ConvergenceInfo`
object. Only supported for 1-D input.
Return
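
A hedged usage sketch showing the ``return_info`` path; the import path is assumed from the file layout, and the input array is a placeholder.

import numpy as np
from spectrakit.baseline.arpls import baseline_arpls  # assumed import path

y = np.abs(np.random.default_rng(0).normal(size=1000)).cumsum()  # placeholder spectrum
info = baseline_arpls(y, lam=1e5, return_info=True)
print(info)  # ConvergenceInfo(... iterations, residual, baseline shape ...)
corrected = y - info.baseline

_baseline_arpls_1d function · python · L95-L130 (36 LOC) · src/spectrakit/baseline/arpls.py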
def _baseline_arpls_1d(
intensities: np.ndarray,
penalty: sparse.spmatrix,
max_iter: int = DEFAULT_MAX_ITER,
tol: float = DEFAULT_TOL,
) -> np.ndarray:
"""ArPLS baseline for a single 1-D spectrum."""
n = len(intensities)
w = np.ones(n)
y = intensities
for _ in range(max_iter):
W = sparse.diags(w, 0)
Z = W + penalty
z = spsolve(Z, w * y)
# Residuals: negative = below baseline
d = y - z
d_neg = d[d < 0]
if len(d_neg) == 0:
break
# Adaptive reweighting: sigmoid of standardized negative residuals
m = np.mean(d_neg)
s = np.std(d_neg, ddof=1) if len(d_neg) > 1 else 1.0
if s < 1e-10:
s = 1.0
exponent = np.clip(2.0 * (d - (2.0 * s - m)) / s, -500.0, 500.0)
w_new = 1.0 / (1.0 + np.exp(exponent))
if np.sum(np.abs(w_new - w)) < tol:
break
w = w_new
return z # type: ignore[no-any-return]
_baseline_arpls_1d_info function · python · L133-L179 (47 LOC) · src/spectrakit/baseline/arpls.py
def _baseline_arpls_1d_info(
intensities: np.ndarray,
penalty: sparse.spmatrix,
max_iter: int = DEFAULT_MAX_ITER,
tol: float = DEFAULT_TOL,
) -> ConvergenceInfo:
"""ArPLS baseline for a single 1-D spectrum, returning convergence info."""
n = len(intensities)
w = np.ones(n)
y = intensities
converged = False
residual = float("inf")
iteration = 0
for _i in range(1, max_iter + 1):
iteration = _i
W = sparse.diags(w, 0)
Z = W + penalty
z = spsolve(Z, w * y)
d = y - z
d_neg = d[d < 0]
if len(d_neg) == 0:
converged = True
residual = 0.0
break
m = np.mean(d_neg)
s = np.std(d_neg, ddof=1) if len(d_neg) > 1 else 1.0
if s < 1e-10:
s = 1.0
exponent = np.clip(2.0 * (d - (2.0 * s - m)) / s, -500.0, 500.0)
w_new = 1.0 / (1.0 + np.exp(exponent))
residual = float(np.sum(np.abs(w_new - w)))
if r
baseline_polynomial function · python · L24-L70 (47 LOC) · src/spectrakit/baseline/polynomial.py
def baseline_polynomial(
intensities: np.ndarray,
degree: int = DEFAULT_DEGREE,
max_iter: int = DEFAULT_MAX_ITER,
tol: float = DEFAULT_TOL,
return_info: bool = False,
) -> np.ndarray | ConvergenceInfo:
"""Estimate baseline using iterative polynomial fitting.
Fits a polynomial, then iteratively removes points above it
(peaks) and refits until convergence.
Args:
intensities: Spectral intensities, shape (W,) or (N, W).
degree: Polynomial degree. Higher = more complex baselines.
max_iter: Maximum iterations for peak-removal loop.
tol: Convergence tolerance (fraction of points changed).
return_info: If ``True``, return a :class:`ConvergenceInfo`
object. Only supported for 1-D input.
Returns:
Estimated baseline (same shape as input), or
:class:`ConvergenceInfo` if ``return_info=True``.
Raises:
SpectrumShapeError: If input is not 1-D or 2-D.
EmptySpectrumError: I
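
A minimal sketch of the iterative polynomial fit (import path assumed from the file layout; data illustrative):

import numpy as np
from spectrakit.baseline.polynomial import baseline_polynomial  # assumed import path

x = np.arange(500, dtype=float)
y = 0.002 * x + 10 * np.exp(-0.5 * ((x - 250) / 5) ** 2)  # linear background plus one peak
info = baseline_polynomial(y, degree=3, return_info=True)
print(info.iterations, info.converged)
corrected = y - info.baseline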
_baseline_polynomial_1d function · python · L73-L97 (25 LOC) · src/spectrakit/baseline/polynomial.py
def _baseline_polynomial_1d(
intensities: np.ndarray,
degree: int = DEFAULT_DEGREE,
max_iter: int = DEFAULT_MAX_ITER,
tol: float = DEFAULT_TOL,
) -> np.ndarray:
"""Iterative polynomial baseline for a single 1-D spectrum."""
n = len(intensities)
x = np.arange(n, dtype=np.float64)
y = intensities
mask = np.ones(n, dtype=bool)
for _ in range(max_iter):
coeffs = np.polyfit(x[mask], y[mask], degree)
baseline = np.polyval(coeffs, x)
new_mask = y <= baseline
if np.sum(new_mask != mask) / n < tol:
break
mask = new_mask # type: ignore[assignment]
if np.sum(mask) < degree + 1:
break
return baseline
_baseline_polynomial_1d_info function · python · L100-L135 (36 LOC) · src/spectrakit/baseline/polynomial.py
def _baseline_polynomial_1d_info(
intensities: np.ndarray,
degree: int = DEFAULT_DEGREE,
max_iter: int = DEFAULT_MAX_ITER,
tol: float = DEFAULT_TOL,
) -> ConvergenceInfo:
"""Iterative polynomial baseline, returning convergence info."""
n = len(intensities)
x = np.arange(n, dtype=np.float64)
y = intensities
mask = np.ones(n, dtype=bool)
converged = False
residual = 1.0
iteration = 0
for _i in range(1, max_iter + 1):
iteration = _i
coeffs = np.polyfit(x[mask], y[mask], degree)
baseline = np.polyval(coeffs, x)
new_mask = y <= baseline
residual = float(np.sum(new_mask != mask)) / n
if residual < tol:
converged = True
break
mask = new_mask # type: ignore[assignment]
if np.sum(mask) < degree + 1:
break
return ConvergenceInfo(
iterations=iteration,
converged=converged,
final_residual=residual,
baseline=
baseline_rubberband function · python · L21-L41 (21 LOC) · src/spectrakit/baseline/rubberband.py
def baseline_rubberband(intensities: np.ndarray) -> np.ndarray:
"""Estimate baseline using the rubberband (convex hull) method.
Computes the lower convex hull of the spectrum and interpolates
between the hull vertices to form the baseline.
Args:
intensities: Spectral intensities, shape (W,) or (N, W).
Returns:
Estimated baseline, same shape as intensities.
Raises:
SpectrumShapeError: If input is not 1-D or 2-D.
EmptySpectrumError: If input has zero elements.
"""
intensities = ensure_float64(intensities)
validate_1d_or_2d(intensities)
warn_if_not_finite(intensities)
return apply_along_spectra(_baseline_rubberband_1d, intensities)
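
Because the hull is fully data-driven there are no tuning parameters; a minimal sketch (import path assumed, data illustrative):

import numpy as np
from spectrakit.baseline.rubberband import baseline_rubberband  # assumed import path

x = np.linspace(0.0, 1.0, 800)
y = 2 * (x - 0.5) ** 2 + 5 * np.exp(-0.5 * ((x - 0.5) / 0.01) ** 2)  # convex background + sharp peak
base = baseline_rubberband(y)
corrected = y - base

_baseline_rubberband_1d function · python · L44-L87 (44 LOC) · src/spectrakit/baseline/rubberband.py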
def _baseline_rubberband_1d(intensities: np.ndarray) -> np.ndarray:
"""Rubberband baseline for a single 1-D spectrum.
Uses convex hull facet normals to identify lower hull vertices
rather than a median heuristic, which can fail for skewed spectra.
"""
n = len(intensities)
x = np.arange(n)
points = np.column_stack([x, intensities])
low_val = intensities.min() - 1.0
points_ext = np.vstack(
[
points,
[0, low_val],
[n - 1, low_val],
]
)
hull = ConvexHull(points_ext)
# Identify lower hull vertices via facet normals.
# A facet whose outward normal has a negative y-component belongs
# to the lower envelope of the convex hull.
lower_vertices: set[int] = set()
for i, eq in enumerate(hull.equations):
if eq[1] < 0: # y-component of outward normal is negative
for vi in hull.simplices[i]:
if vi < n: # belongs to the original spectrum
baseline_snip function · python · L28-L63 (36 LOC) · src/spectrakit/baseline/snip.py
def baseline_snip(
intensities: np.ndarray,
max_half_window: int = DEFAULT_MAX_HALF_WINDOW,
decreasing: bool = True,
) -> np.ndarray:
"""Estimate baseline using the SNIP algorithm.
Iteratively clips peaks by comparing each point to the average of
its neighbors over a sequence of half-window sizes (ascending or
descending, controlled by ``decreasing``).
Args:
intensities: Spectral intensities, shape (W,) or (N, W).
max_half_window: Maximum half-window size. Controls how broad
the features that get clipped can be.
decreasing: If True, iterate from max_half_window down to 1.
Returns:
Estimated baseline, same shape as intensities.
Raises:
SpectrumShapeError: If input is not 1-D or 2-D.
EmptySpectrumError: If input has zero elements.
"""
if max_half_window < 1:
raise ValueError(f"max_half_window must be >= 1, got {max_half_window}")
intensities = ensure_float64(intensities)
validate_1d_or_2d(intensities)
warn_if_not_finite(inten
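
A minimal sketch (import path assumed, data illustrative). A common rule of thumb, not stated in the source, is to set ``max_half_window`` to roughly half the width in points of the broadest peak that should survive clipping:

import numpy as np
from spectrakit.baseline.snip import baseline_snip  # assumed import path

x = np.arange(1000, dtype=float)
y = 0.05 * x + 30 * np.exp(-0.5 * ((x - 500) / 20) ** 2)  # peak roughly 80 points wide
base = baseline_snip(y, max_half_window=40)
corrected = y - base

_baseline_snip_1d function · python · L66-L86 (21 LOC) · src/spectrakit/baseline/snip.py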
def _baseline_snip_1d(
intensities: np.ndarray,
max_half_window: int = DEFAULT_MAX_HALF_WINDOW,
decreasing: bool = True,
) -> np.ndarray:
"""SNIP baseline for a single 1-D spectrum."""
y = np.log(np.log(np.sqrt(np.maximum(intensities, EPSILON) + 1) + 1) + 1)
if decreasing:
window_range = range(max_half_window, 0, -1)
else:
window_range = range(1, max_half_window + 1)
for hw in window_range:
avg = (y[: -2 * hw] + y[2 * hw :]) / 2.0
y[hw:-hw] = np.minimum(y[hw:-hw], avg)
# Inverse of y = log(log(sqrt(x+1)+1)+1):
# x = (exp(exp(y)-1)-1)^2 - 1
baseline = (np.exp(np.exp(y) - 1) - 1) ** 2 - 1.0
return np.maximum(baseline, 0.0) # type: ignore[no-any-return]
_get_app function · python · L10-L24 (15 LOC) · src/spectrakit/cli.py
def _get_app() -> Any:
"""Lazy-import typer to avoid hard dependency."""
try:
import typer
except ImportError:
print(
"CLI requires typer. Install with: pip install spectrakit[cli]",
file=sys.stderr,
)
sys.exit(1)
return typer.Typer(
name="spectrakit",
help="SpectraKit: spectral data processing toolkit.",
add_completion=False,
)
info function · python · L31-L56 (26 LOC) · src/spectrakit/cli.py
def info(path: str) -> None:
"""Print metadata and summary statistics for a spectral file."""
from spectrakit.io.auto import read_spectrum
file_path = Path(path)
try:
spec = read_spectrum(file_path)
except (FileNotFoundError, ValueError) as exc:
print(str(exc), file=sys.stderr)
sys.exit(1)
print(f"File: {file_path.name}")
print(f"Format: {spec.source_format}")
print(f"Shape: {spec.shape}")
print(f"N spectra: {spec.n_spectra}")
print(f"N points: {spec.n_points}")
if spec.wavenumbers is not None:
print(f"X range: {spec.wavenumbers.min():.2f} - {spec.wavenumbers.max():.2f}")
print(f"Y range: {spec.intensities.min():.6f} - {spec.intensities.max():.6f}")
print(f"Y mean: {spec.intensities.mean():.6f}")
print(f"Y std: {spec.intensities.std():.6f}")
if spec.metadata:
print("Metadata:")
for key, value in list(spec.metadata.items())[:10]:
pri
convert function · python · L60-L75 (16 LOC) · src/spectrakit/cli.py
def convert(input_path: str, output_path: str) -> None:
"""Convert a spectral file to HDF5 format."""
from spectrakit.io.auto import read_spectrum
file_path = Path(input_path)
try:
spec = read_spectrum(file_path)
except (FileNotFoundError, ValueError) as exc:
print(str(exc), file=sys.stderr)
sys.exit(1)
from spectrakit.io.hdf5 import write_hdf5
write_hdf5(spec, output_path)
print(f"Converted {file_path.name} -> {output_path}")_get_lmfit function · python · L56-L65 (10 LOC)src/spectrakit/contrib/_lmfit.py
def _get_lmfit() -> Any:
"""Import lmfit, raising DependencyError if missing."""
try:
import lmfit
return lmfit
except ImportError as e:
raise DependencyError(
"lmfit is required for peak fitting. Install with: pip install spectrakit[fitting]"
) from e
fit_peaks function · python · L68-L155 (88 LOC) · src/spectrakit/contrib/_lmfit.py
def fit_peaks(
intensities: np.ndarray,
wavenumbers: np.ndarray,
peak_positions: list[float],
model: str = "gaussian",
**kwargs: Any,
) -> FitResult:
"""Fit spectral peaks using lmfit models.
Creates a composite model of multiple peaks and fits them to the
data. Each peak is initialized near the specified position.
Args:
intensities: Spectral intensities, shape ``(W,)``.
wavenumbers: Wavenumber axis, shape ``(W,)``.
peak_positions: Approximate peak center positions in
wavenumber units.
model: Peak shape model. One of ``"gaussian"``,
``"lorentzian"``, ``"voigt"``, ``"pseudo_voigt"``.
**kwargs: Additional keyword arguments passed to ``lmfit.Model.fit``.
Returns:
``FitResult`` with the best fit, individual components,
parameters, and residual.
Raises:
DependencyError: If lmfit is not installed.
ValueError: If the model name is not supported.
"_get_pybaselines function · python · L77-L87 (11 LOC)src/spectrakit/contrib/_pybaselines.py
def _get_pybaselines() -> Any:
"""Import and return pybaselines.Baseline, raising DependencyError if missing."""
try:
from pybaselines import Baseline
return Baseline
except ImportError as e:
raise DependencyError(
"pybaselines is required for the contrib baseline backend. "
"Install with: pip install spectrakit[baselines]"
) from e
pybaselines_method function · python · L90-L120 (31 LOC) · src/spectrakit/contrib/_pybaselines.py
def pybaselines_method(
intensities: np.ndarray,
method: str,
**kwargs: Any,
) -> np.ndarray:
"""Apply any pybaselines method through a unified interface.
Args:
intensities: Spectral intensities, shape ``(W,)`` or ``(N, W)``.
method: Name of the pybaselines method (e.g., ``"asls"``,
``"airpls"``, ``"mor"``).
**kwargs: Keyword arguments forwarded to the pybaselines method.
Returns:
Estimated baseline, same shape as intensities.
Raises:
DependencyError: If pybaselines is not installed.
ValueError: If the method name is not recognized.
"""
baseline_cls = _get_pybaselines()
intensities = ensure_float64(intensities)
validate_1d_or_2d(intensities)
return apply_along_spectra(
_pybaselines_1d,
intensities,
method=method,
baseline_cls=baseline_cls,
**kwargs,
)
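
A hedged sketch; it requires the optional pybaselines backend, the module path follows the file layout, and keyword arguments are forwarded to pybaselines as documented above:

import numpy as np
from spectrakit.contrib._pybaselines import pybaselines_method, list_pybaselines_methods

print(list_pybaselines_methods())  # category -> method names
y = np.random.default_rng(0).random(500) + 10.0  # placeholder spectrum
base = pybaselines_method(y, "asls", lam=1e6)  # kwargs go straight to pybaselines

_pybaselines_1d function · python · L123-L142 (20 LOC) · src/spectrakit/contrib/_pybaselines.py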
def _pybaselines_1d(
intensities: np.ndarray,
method: str,
baseline_cls: Any,
**kwargs: Any,
) -> np.ndarray:
"""Apply a single pybaselines method to a 1-D spectrum."""
baseline_fitter = baseline_cls(x_data=np.arange(len(intensities)))
# Find the method on the fitter
fn = getattr(baseline_fitter, method, None)
if fn is None:
raise ValueError(
f"Unknown pybaselines method: '{method}'. "
f"Available categories: {list(_PYBASELINES_CATEGORIES.keys())}"
)
result = fn(intensities, **kwargs)
# pybaselines returns (baseline, params_dict)
return result[0] # type: ignore[no-any-return]
list_pybaselines_methods function · python · L145-L151 (7 LOC) · src/spectrakit/contrib/_pybaselines.py
def list_pybaselines_methods() -> dict[str, list[str]]:
"""Return a dictionary of available pybaselines methods by category.
Returns:
Dict mapping category names to lists of method names.
"""
return dict(_PYBASELINES_CATEGORIES)
ConvergenceInfo.__repr__ method · python · L37-L43 (7 LOC) · src/spectrakit/convergence.py
def __repr__(self) -> str:
status = "converged" if self.converged else "not converged"
return (
f"ConvergenceInfo({status} in {self.iterations} iterations, "
f"residual={self.final_residual:.2e}, "
f"baseline shape={self.baseline.shape})"
)
derivative_gap_segment function · python · L29-L72 (44 LOC) · src/spectrakit/derivative/gap_segment.py
def derivative_gap_segment(
intensities: np.ndarray,
gap: int = DEFAULT_GAP,
segment: int = DEFAULT_SEGMENT,
deriv: int = DEFAULT_DERIV,
) -> np.ndarray:
"""Compute gap-segment derivative (Norris-Williams method).
Averages over ``segment`` points then takes differences separated
by ``gap`` points. Commonly used for NIR pre-treatment.
Args:
intensities: Spectral intensities, shape ``(W,)`` or ``(N, W)``.
gap: Gap size (number of points between segments).
segment: Segment size (number of points to average).
deriv: Derivative order. 1 = first derivative, 2 = second.
Returns:
Derivative spectrum, same shape as input (padded with zeros
at edges where the computation is undefined).
Raises:
SpectrumShapeError: If input is not 1-D or 2-D.
EmptySpectrumError: If input has zero elements.
ValueError: If ``deriv`` is not 1 or 2.
"""
if deriv not in (1, 2):
raise Value
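
A minimal sketch (import path assumed, data illustrative); ``gap`` and ``segment`` are in points, so their effective width scales with the sampling interval:

import numpy as np
from spectrakit.derivative.gap_segment import derivative_gap_segment  # assumed import path

x = np.linspace(0, 10, 1200)
y = np.exp(-0.5 * ((x - 5) / 0.5) ** 2)
d1 = derivative_gap_segment(y, gap=5, segment=5, deriv=1)
d2 = derivative_gap_segment(y, gap=5, segment=5, deriv=2)

_derivative_gap_segment_1d function · python · L75-L97 (23 LOC) · src/spectrakit/derivative/gap_segment.py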
def _derivative_gap_segment_1d(
intensities: np.ndarray,
gap: int = DEFAULT_GAP,
segment: int = DEFAULT_SEGMENT,
deriv: int = DEFAULT_DERIV,
) -> np.ndarray:
"""Gap-segment derivative for a single 1-D spectrum."""
n = len(intensities)
# Compute segment averages using a moving average
kernel = np.ones(segment) / segment
averaged = np.convolve(intensities, kernel, mode="same")
# First derivative: difference of segment averages separated
# by exactly `gap` points. The result is centered in the output
# array; edges are zero-padded.
result = _gap_diff(averaged, gap, n)
if deriv == 2:
# Apply the same gap difference again for second derivative
result = _gap_diff(result, gap, n)
return result
_gap_diff function · python · L100-L107 (8 LOC) · src/spectrakit/derivative/gap_segment.py
def _gap_diff(arr: np.ndarray, gap: int, n: int) -> np.ndarray:
"""Compute centered gap difference: out[i] ~ arr[i+gap/2] - arr[i-gap/2]."""
out = np.zeros(n, dtype=np.float64)
if gap < n:
diff = arr[gap:] - arr[:-gap] # length n - gap
start = gap // 2
out[start : start + len(diff)] = diff
return out
derivative_savgol function · python · L30-L85 (56 LOC) · src/spectrakit/derivative/savgol.py
def derivative_savgol(
intensities: np.ndarray,
window_length: int = DEFAULT_WINDOW_LENGTH,
polyorder: int = DEFAULT_POLYORDER,
deriv: int = DEFAULT_DERIV,
delta: float = 1.0,
) -> np.ndarray:
"""Compute spectral derivative using Savitzky-Golay filter.
Simultaneously smooths and differentiates spectral data. First and
second derivatives are the most commonly used in chemometrics.
Args:
intensities: Spectral intensities, shape ``(W,)`` or ``(N, W)``.
window_length: Length of the filter window (must be odd and
greater than ``polyorder``).
polyorder: Order of the polynomial used to fit the samples.
deriv: Order of the derivative to compute. Common values:
1 (first derivative) or 2 (second derivative).
delta: Spacing of the samples to which the filter is applied.
Used to scale the derivative by ``1 / delta**deriv``.
Returns:
Derivative spectrum, same shape as input
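
A minimal sketch (import path assumed, data illustrative). Passing the real axis spacing as ``delta`` yields derivatives in intensity per x-unit rather than per index:

import numpy as np
from spectrakit.derivative.savgol import derivative_savgol  # assumed import path

wn = np.linspace(400, 4000, 1801)  # ~2 cm^-1 spacing
y = np.exp(-0.5 * ((wn - 1650) / 30) ** 2)
d1 = derivative_savgol(y, window_length=11, polyorder=3, deriv=1, delta=wn[1] - wn[0])

_derivative_savgol_1d function · python · L88-L102 (15 LOC) · src/spectrakit/derivative/savgol.py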
def _derivative_savgol_1d(
intensities: np.ndarray,
window_length: int = DEFAULT_WINDOW_LENGTH,
polyorder: int = DEFAULT_POLYORDER,
deriv: int = DEFAULT_DERIV,
delta: float = 1.0,
) -> np.ndarray:
"""Savitzky-Golay derivative for a single 1-D spectrum."""
return savgol_filter( # type: ignore[no-any-return]
intensities,
window_length=window_length,
polyorder=polyorder,
deriv=deriv,
delta=delta,
)
despike_whitaker_hayes function · python · L34-L96 (63 LOC) · src/spectrakit/despike/whitaker_hayes.py
def despike_whitaker_hayes(
intensities: np.ndarray,
*,
threshold: float = DEFAULT_THRESHOLD,
window_size: int | None = None,
) -> np.ndarray:
"""Remove cosmic ray spikes using the Whitaker-Hayes method.
Computes the modified Z-score of the second finite difference to
detect outlier points (cosmic rays, electronic spikes). Detected
spikes are replaced by linear interpolation from neighboring
non-spike points.
Args:
intensities: Spectral intensities, shape ``(W,)`` or ``(N, W)``.
threshold: Modified Z-score threshold for spike detection.
Values of 3.0–5.0 are typical; lower catches more spikes.
window_size: Optional rolling window for local MAD estimation.
If ``None`` (default), uses global MAD over the full spectrum.
If provided, must be odd and >= 3.
Returns:
Despiked intensities, same shape as input.
Raises:
SpectrumShapeError: If *intensities* is not 1-D o
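
A minimal sketch (import path assumed, data illustrative); note that ``threshold`` and ``window_size`` are keyword-only in the signature above:

import numpy as np
from spectrakit.despike.whitaker_hayes import despike_whitaker_hayes  # assumed import path

rng = np.random.default_rng(0)
y = np.sin(np.linspace(0, 6, 2000)) + rng.normal(0, 0.01, 2000)
y[500] += 40.0  # inject a single-point cosmic-ray spike
clean = despike_whitaker_hayes(y, threshold=5.0)

_despike_whitaker_hayes_1d function · python · L99-L148 (50 LOC) · src/spectrakit/despike/whitaker_hayes.py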
def _despike_whitaker_hayes_1d(
intensities: np.ndarray,
*,
threshold: float,
window_size: int | None,
) -> np.ndarray:
"""Whitaker-Hayes despiking for a single spectrum."""
n = len(intensities)
# Step 1: Second finite difference
d2 = np.diff(intensities, n=2)
# Step 2: Modified Z-score of d2
if window_size is not None:
# Windowed MAD via scipy.ndimage
from scipy.ndimage import median_filter
median_d2 = median_filter(d2, size=window_size, mode="nearest")
abs_dev = np.abs(d2 - median_d2)
mad_d2 = median_filter(abs_dev, size=window_size, mode="nearest")
else:
# Global MAD
median_d2 = np.median(d2)
mad_d2 = np.median(np.abs(d2 - median_d2))
modified_z = _MAD_SCALE * (d2 - median_d2) / (mad_d2 + EPSILON)
# Step 3: Identify spike indices (d2 has length n-2, maps to indices 1..n-2)
spike_mask_d2 = np.abs(modified_z) > threshold
spike_indices = np.where(spike_mask_d
despike_zscore function · python · L27-L79 (53 LOC) · src/spectrakit/despike/zscore.py
def despike_zscore(
intensities: np.ndarray,
*,
threshold: float = DEFAULT_THRESHOLD,
window_size: int = DEFAULT_WINDOW_SIZE,
) -> np.ndarray:
"""Remove spikes using rolling modified Z-score detection.
For each spectral point, a modified Z-score is computed relative to
the local median within a rolling window. Points exceeding the
threshold are replaced with the local median.
Args:
intensities: Spectral intensities, shape ``(W,)`` or ``(N, W)``.
threshold: Modified Z-score threshold for spike detection.
Values of 3.0–5.0 are typical; lower catches more spikes.
window_size: Size of the rolling window for local statistics.
Must be odd and >= 3.
Returns:
Despiked intensities, same shape as input.
Raises:
SpectrumShapeError: If *intensities* is not 1-D or 2-D.
EmptySpectrumError: If *intensities* has zero elements.
ValueError: If *threshold* <= 0, *window_size* <
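
A minimal sketch (import path assumed, data illustrative); unlike the Whitaker-Hayes variant above, detected points are replaced with the local median rather than interpolated:

import numpy as np
from spectrakit.despike.zscore import despike_zscore  # assumed import path

rng = np.random.default_rng(1)
y = rng.normal(0, 0.01, 2000)
y[[300, 301]] += 25.0  # two-point spike
clean = despike_zscore(y, threshold=4.0, window_size=7)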
_despike_zscore_1d function · python · L82-L98 (17 LOC) · src/spectrakit/despike/zscore.py
def _despike_zscore_1d(
intensities: np.ndarray,
*,
threshold: float,
window_size: int,
) -> np.ndarray:
"""Rolling Z-score despiking for a single spectrum."""
medians = median_filter(intensities, size=window_size, mode="nearest")
abs_dev = np.abs(intensities - medians)
mads = median_filter(abs_dev, size=window_size, mode="nearest")
z_scores = _MAD_SCALE * (intensities - medians) / (mads + EPSILON)
spike_mask = np.abs(z_scores) > threshold
result = intensities.copy()
result[spike_mask] = medians[spike_mask]
return result
read_spectrum function · python · L52-L95 (44 LOC) · src/spectrakit/io/auto.py
def read_spectrum(
path: str | Path,
*,
format: str | None = None,
**kwargs: Any,
) -> Spectrum:
"""Read a spectral file with automatic format detection.
Determines the file format from the extension or the *format* hint,
then delegates to the appropriate reader. This is the recommended
entry point when the exact format is not known in advance.
Args:
path: Path to the spectral file.
format: Explicit format hint. One of ``"jcamp"``, ``"spc"``,
``"csv"``, ``"opus"``, ``"hdf5"``. If ``None`` (default),
the format is inferred from the file extension.
**kwargs: Additional keyword arguments passed to the
underlying reader. For CSV files, common options include
``delimiter``, ``x_column``, ``y_column``.
Returns:
A Spectrum object loaded from the file.
Raises:
FileFormatError: If the format cannot be determined or is
not supported.
FileNotF
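
A hedged usage sketch; the file names are hypothetical and the import path follows the file layout above:

from spectrakit.io.auto import read_spectrum

spec = read_spectrum("sample.jdx")  # format inferred from the extension
odd = read_spectrum("mystery.dat", format="csv", delimiter=";")  # explicit hint
print(spec.shape, spec.n_spectra, spec.source_format)

detect_format function · python · L98-L143 (46 LOC) · src/spectrakit/io/auto.py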
def detect_format(path: str | Path) -> str:
"""Detect the spectral file format from extension and magic bytes.
Args:
path: Path to the spectral file.
Returns:
Format string: ``"jcamp"``, ``"spc"``, ``"csv"``, ``"opus"``,
or ``"hdf5"``.
Raises:
FileFormatError: If the format cannot be determined.
Examples:
>>> detect_format("sample.jdx")
'jcamp'
>>> detect_format("data.csv")
'csv'
>>> detect_format("spectrum.0")
'opus'
"""
file_path = Path(path)
suffix = file_path.suffix.lower()
# Check direct extension mapping
if suffix in _EXTENSION_MAP:
module_path, func_name = _EXTENSION_MAP[suffix]
# Derive format name from function name: read_jcamp → jcamp
return func_name.replace("read_", "")
# Check OPUS numeric extension pattern
if _OPUS_PATTERN.match(suffix):
return "opus"
# Try magic bytes if file exists
if file_path.exi
_read_with_format function · python · L146-L173 (28 LOC) · src/spectrakit/io/auto.py
def _read_with_format(
path: Path,
format: str,
**kwargs: Any,
) -> Spectrum:
"""Read a file using an explicit format hint."""
format_lower = format.lower().strip()
format_to_reader: dict[str, tuple[str, str]] = {
"jcamp": ("spectrakit.io.jcamp", "read_jcamp"),
"jdx": ("spectrakit.io.jcamp", "read_jcamp"),
"spc": ("spectrakit.io.spc", "read_spc"),
"csv": ("spectrakit.io.csv", "read_csv"),
"tsv": ("spectrakit.io.csv", "read_csv"),
"opus": ("spectrakit.io.opus", "read_opus"),
"hdf5": ("spectrakit.io.hdf5", "read_hdf5"),
"h5": ("spectrakit.io.hdf5", "read_hdf5"),
}
if format_lower not in format_to_reader:
supported = ", ".join(sorted(format_to_reader))
raise FileFormatError(f"Unknown format '{format}'. Supported: {supported}")
module_path, func_name = format_to_reader[format_lower]
reader = _import_reader(module_path, func_name)
logger.info("Reading '%s' as %s (e_read_with_detection function · python · L176-L211 (36 LOC)src/spectrakit/io/auto.py
def _read_with_detection(path: Path, **kwargs: Any) -> Spectrum:
"""Read a file by detecting its format."""
suffix = path.suffix.lower()
# Direct extension match
if suffix in _EXTENSION_MAP:
module_path, func_name = _EXTENSION_MAP[suffix]
reader = _import_reader(module_path, func_name)
# For CSV/TSV, infer delimiter from extension
if suffix in _CSV_EXTENSIONS and "delimiter" not in kwargs:
if suffix == ".tsv":
kwargs["delimiter"] = "\t"
format_name = func_name.replace("read_", "")
logger.info("Reading '%s' as %s (detected from extension)", path.name, format_name)
return reader(path, **kwargs)
# OPUS numeric extension
if _OPUS_PATTERN.match(suffix):
from spectrakit.io.opus import read_opus
logger.info("Reading '%s' as opus (numeric extension)", path.name)
return read_opus(path, **kwargs)
# Fall back to magic bytes
detected = _detect_from_magic
_detect_from_magic function · python · L214-L237 (24 LOC) · src/spectrakit/io/auto.py
def _detect_from_magic(path: Path) -> str | None:
"""Try to identify format from the first few bytes of the file."""
try:
with open(path, "rb") as f:
header = f.read(16)
except OSError:
return None
if len(header) < 2:
return None
# Check OPUS magic bytes (0x0a 0x0a at offset 0)
if header[:2] == b"\x0a\x0a":
return "opus"
# Check JCAMP-DX (starts with ##)
if header[:2] == b"##":
return "jcamp"
# Check HDF5 magic (\x89HDF\r\n\x1a\n)
if header[:8] == b"\x89HDF\r\n\x1a\n":
return "hdf5"
return None
_import_reader function · python · L240-L245 (6 LOC) · src/spectrakit/io/auto.py
def _import_reader(module_path: str, func_name: str) -> Any:
"""Lazily import a reader function from a module."""
import importlib
module = importlib.import_module(module_path)
return getattr(module, func_name)
read_csv function · python · L17-L90 (74 LOC) · src/spectrakit/io/csv.py
def read_csv(
path: str | Path,
delimiter: str = ",",
x_column: int = 0,
y_column: int = 1,
skip_header: int = 0,
orientation: Literal["columns", "rows"] = "columns",
) -> Spectrum:
"""Read spectral data from a CSV or TSV file.
Args:
path: Path to the CSV file.
delimiter: Column separator. Use "\\t" for TSV.
x_column: Index of the wavenumber/wavelength column. Set to -1
to indicate no x-axis column (y data only).
y_column: Index of the intensity column (for single-spectrum files).
Ignored when orientation="rows".
skip_header: Number of header lines to skip.
orientation: "columns" means each column is a variable (x, y1, y2...);
"rows" means each row is a full spectrum.
Returns:
Spectrum with intensities and optional wavenumbers.
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"CSV file not found: {path}")
validate
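
A hedged round-trip sketch; file names are hypothetical and the import path follows the file layout:

from spectrakit.io.csv import read_csv, write_csv

spec = read_csv("spectra.tsv", delimiter="\t", x_column=0, y_column=1)
write_csv(spec, "spectra_copy.csv", header=True)  # wavenumbers become the first column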
write_csv function · python · L93-L145 (53 LOC) · src/spectrakit/io/csv.py
def write_csv(
spectrum: Spectrum,
path: str | Path,
delimiter: str = ",",
header: bool = True,
) -> None:
"""Write a Spectrum to a CSV file.
Writes wavenumbers (if available) as the first column and
intensities as subsequent columns. For multi-spectrum data
``(N, W)``, each spectrum becomes a column.
Args:
spectrum: Spectrum to save.
path: Output file path.
delimiter: Column separator. Use ``"\\t"`` for TSV output.
header: If ``True``, write a header row with column names.
Raises:
ValueError: If *spectrum* has no data.
"""
path = Path(path)
intensities = spectrum.intensities
wavenumbers = spectrum.wavenumbers
if intensities.ndim == 1:
if wavenumbers is not None:
data = np.column_stack([wavenumbers, intensities])
col_names = ["wavenumber", "intensity"]
else:
data = intensities.reshape(-1, 1)
col_names = ["intensity"]
read_hdf5 function · python · L18-L68 (51 LOC) · src/spectrakit/io/hdf5.py
def read_hdf5(
path: str | Path,
intensities_key: str = "intensities",
wavenumbers_key: str = "wavenumbers",
) -> Spectrum:
"""Read spectral data from an HDF5 file.
Args:
path: Path to the .h5 / .hdf5 file.
intensities_key: Dataset key for intensity values.
wavenumbers_key: Dataset key for wavenumber values.
Returns:
Spectrum loaded from the HDF5 datasets.
Raises:
ImportError: If h5py is not installed.
FileNotFoundError: If path does not exist.
"""
try:
import h5py
except ImportError as e:
raise DependencyError(
"h5py is required for HDF5 files. Install with: pip install spectrakit[io]"
) from e
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"HDF5 file not found: {path}")
validate_file_size(path.stat().st_size, path_name=str(path))
with h5py.File(path, "r") as f:
intensities = np.array(f[intensities_key], dty
write_hdf5 function · python · L71-L103 (33 LOC) · src/spectrakit/io/hdf5.py
def write_hdf5(
spectrum: Spectrum,
path: str | Path,
intensities_key: str = "intensities",
wavenumbers_key: str = "wavenumbers",
) -> None:
"""Write a Spectrum to an HDF5 file.
Args:
spectrum: Spectrum to save.
path: Output file path.
intensities_key: Dataset key for intensity values.
wavenumbers_key: Dataset key for wavenumber values.
"""
try:
import h5py
except ImportError as e:
raise DependencyError(
"h5py is required for HDF5 files. Install with: pip install spectrakit[io]"
) from e
path = Path(path)
with h5py.File(path, "w") as f:
f.create_dataset(intensities_key, data=spectrum.intensities)
if spectrum.wavenumbers is not None:
f.create_dataset(wavenumbers_key, data=spectrum.wavenumbers)
for key, value in spectrum.metadata.items():
try:
f.attrs[key] = value
except TypeError:
f.a
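
A hedged round-trip sketch; it requires h5py (``spectrakit[io]``), the file names are hypothetical, and import paths follow the file layout:

from spectrakit.io.auto import read_spectrum
from spectrakit.io.hdf5 import read_hdf5, write_hdf5

spec = read_spectrum("sample.jdx")
write_hdf5(spec, "sample.h5")  # default dataset keys: "intensities", "wavenumbers"
back = read_hdf5("sample.h5")

read_jcamp function · python · L21-L95 (75 LOC) · src/spectrakit/io/jcamp.py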
def read_jcamp(path: str | Path) -> Spectrum:
"""Read a JCAMP-DX file and return a Spectrum.
Parses ##XYDATA=(X++(Y..Y)) format. Supports AFFN (ASCII free-format
numeric) encoding.
Args:
path: Path to the .dx / .jdx / .jcamp file.
Returns:
Spectrum with intensities shape (W,) and wavenumbers shape (W,).
Raises:
FileNotFoundError: If path does not exist.
ValueError: If the file cannot be parsed.
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"JCAMP file not found: {path}")
validate_file_size(path.stat().st_size, path_name=str(path))
metadata: dict[str, str] = {}
x_values: list[float] = []
y_values: list[float] = []
in_xydata = False
with open(path, encoding="utf-8", errors="replace") as f:
for line in f:
line = line.strip()
if not line:
continue
ldr_match = _LDR_PATTERN.match(line)
if ldr
write_jcamp function · python · L101-L165 (65 LOC) · src/spectrakit/io/jcamp.py
def write_jcamp(
spectrum: Spectrum,
path: str | Path,
title: str = "",
data_type: str = "INFRARED SPECTRUM",
x_units: str = "1/CM",
y_units: str = "ABSORBANCE",
) -> None:
"""Write a single-spectrum Spectrum to a JCAMP-DX file.
Produces a standard JCAMP-DX 5.0 file with ``##XYDATA=(X++(Y..Y))``
format (AFFN encoding). Only 1-D spectra are supported; for batch
data, iterate and write each spectrum individually.
Args:
spectrum: Spectrum to save (must be 1-D).
path: Output file path (.dx or .jdx).
title: Title label for ``##TITLE=``.
data_type: Value for ``##DATA TYPE=``.
x_units: Value for ``##XUNITS=``.
y_units: Value for ``##YUNITS=``.
Raises:
ValueError: If *spectrum* intensities are not 1-D or
wavenumbers are missing.
"""
if spectrum.intensities.ndim != 1:
raise ValueError(
f"write_jcamp requires 1-D intensities, got shape {spectrum.inten_parse_directory function · python · L60-L151 (92 LOC)src/spectrakit/io/opus.py
def _parse_directory(
raw: bytes,
) -> list[tuple[int, int, int]]:
"""Parse the OPUS block directory from raw file bytes.
The directory starts at a known offset in the file header. Each entry
is 12 bytes: block_type (uint32-LE), block_length (uint32-LE),
block_offset (uint32-LE). The first directory entry is typically at
byte offset 24.
Args:
raw: Complete file contents as bytes.
Returns:
List of (block_type, block_length, block_offset) tuples. Entries
with block_type == 0 (end sentinel) are excluded.
Raises:
FileFormatError: If the directory cannot be parsed.
"""
if len(raw) < _MIN_FILE_SIZE:
raise FileFormatError(
f"File too small to be OPUS format ({len(raw)} bytes, minimum {_MIN_FILE_SIZE})."
)
# The first 4 bytes are magic / version marker.
# Byte 4 (or offset ~8 in some variants) typically contains metadata.
# The directory pointer is at offset 8 (uint32-LE) in
_parse_parameter_block function · python · L154-L244 (91 LOC) · src/spectrakit/io/opus.py
def _parse_parameter_block(
raw: bytes,
offset: int,
length: int,
) -> dict[str, Any]:
"""Parse an OPUS parameter block into a tag-value dictionary.
OPUS parameter blocks store values as sequences of:
- 3-byte ASCII tag (e.g., ``NPT``, ``FXV``, ``LXV``)
- 1-byte type code (0=int, 1=double, 2=string, 3=enum)
- 2-byte size (uint16-LE, total size of value in bytes)
- N bytes value
Args:
raw: Complete file contents as bytes.
offset: Byte offset where the parameter block starts.
length: Length of the parameter block in 4-byte words (as stored
in the directory; multiply by 4 for byte length, though we
use it as a byte limit hint).
Returns:
Dictionary mapping 3-char tags to their parsed values.
"""
params: dict[str, Any] = {}
# Block length in the directory is in bytes for our purposes.
# We clamp to file size to be safe.
block_end = min(offset + length, l
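
The tag/type/size layout documented above maps onto a short struct read. A sketch of decoding one entry, based only on the docstring (the function body is truncated in this listing); the helper name is hypothetical:

import struct

def _read_one_param(raw: bytes, pos: int) -> tuple[str, int, bytes, int]:
    """Decode one tag-value entry at *pos*; returns (tag, type_code, value, next_pos)."""
    tag = raw[pos:pos + 3].decode("ascii", errors="replace")  # 3-byte ASCII tag
    type_code = raw[pos + 3]  # 0=int, 1=double, 2=string, 3=enum
    (size,) = struct.unpack_from("<H", raw, pos + 4)  # uint16-LE value size in bytes
    value = raw[pos + 6:pos + 6 + size]
    return tag, type_code, value, pos + 6 + size

_find_blocks_by_type function · python · L247-L264 (18 LOC) · src/spectrakit/io/opus.py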
def _find_blocks_by_type(
entries: list[tuple[int, int, int]],
type_mask: int,
) -> list[tuple[int, int, int]]:
"""Filter directory entries matching a given type in the lower byte(s).
Args:
entries: Parsed directory entries.
type_mask: Block type to match against the lower byte of each entry.
Returns:
Matching entries.
"""
return [
(btype, blen, boff)
for btype, blen, boff in entries
if (btype & 0xFF) == type_mask or (btype & 0xFFFF) == type_mask
]
_read_float32_block function · python · L267-L293 (27 LOC) · src/spectrakit/io/opus.py
def _read_float32_block(
raw: bytes,
offset: int,
n_points: int,
) -> np.ndarray:
"""Read *n_points* IEEE 754 float32 values from *raw* at *offset*.
Args:
raw: Complete file contents.
offset: Byte offset where the float32 array begins.
n_points: Number of float32 values to read.
Returns:
1-D numpy array of shape ``(n_points,)`` with dtype float64.
Raises:
FileFormatError: If insufficient bytes are available.
"""
needed = n_points * 4
if offset + needed > len(raw):
raise FileFormatError(
f"Data block at offset {offset} requires {needed} bytes for "
f"{n_points} float32 values, but only "
f"{len(raw) - offset} bytes remain."
)
arr = np.frombuffer(raw, dtype="<f4", count=n_points, offset=offset)
return arr.astype(np.float64)
read_opus function · python · L296-L446 (151 LOC) · src/spectrakit/io/opus.py
def read_opus(path: str | Path) -> Spectrum:
"""Read a Bruker OPUS binary file and return a Spectrum.
Parses the OPUS binary format natively without external dependencies.
Extracts the absorbance/transmittance spectrum, wavenumber axis
(computed from FXV/LXV parameters), and metadata.
Args:
path: Path to the OPUS file (.0, .1, .2, etc.).
Returns:
Spectrum with intensities shape ``(W,)`` and wavenumbers ``(W,)``.
Raises:
FileNotFoundError: If *path* does not exist.
FileFormatError: If the file cannot be parsed as valid OPUS.
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"OPUS file not found: {path}")
try:
raw = path.read_bytes()
except OSError as exc:
raise FileFormatError(f"Cannot read OPUS file: {exc}") from exc
validate_file_size(len(raw), path_name=str(path))
if len(raw) < _MIN_FILE_SIZE:
raise FileFormatError(f"File too small to be OPread_spc function · python · L17-L68 (52 LOC)src/spectrakit/io/spc.py
def read_spc(path: str | Path) -> Spectrum:
"""Read a Galactic SPC file and return a Spectrum.
Requires the ``spc-spectra`` package (install via
``pip install spectrakit[io]``).
Args:
path: Path to the .spc file.
Returns:
Spectrum with intensities shape (W,) or (N, W) for multi-trace files.
Raises:
ImportError: If spc-spectra is not installed.
FileNotFoundError: If path does not exist.
"""
try:
import spc
except ImportError as e:
raise DependencyError(
"spc-spectra is required for SPC files. Install with: pip install spectrakit[io]"
) from e
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"SPC file not found: {path}")
validate_file_size(path.stat().st_size, path_name=str(path))
f = spc.File(str(path))
if f.fnsub == 1:
wavenumbers = np.array(f.x, dtype=np.float64)
intensities = np.array(f.sub[0].y, dtype=np.float64)