← back to ionworks__ionworksdata

Function bodies 175 total

All specs Real LLM only Function bodies
DataLoader.copy method · python · L1378-L1386 (9 LOC)
ionworksdata/load.py
    def copy(self) -> DataLoader:
        """
        Create a copy of the DataLoader instance.

        The data frame (and the steps frame, when one is present) is cloned and
        passed to ``DataLoader.from_processed_data`` together with this
        instance's ``initial_voltage``, ``_start_idx`` and ``_end_idx``.

        Returns
        -------
        DataLoader
            The loader returned by ``DataLoader.from_processed_data``.
        """
        cloned_data = self._data_pl.clone()
        steps = self._steps_pl
        # A loader may carry no steps frame; forward None unchanged then.
        cloned_steps = None if steps is None else steps.clone()
        return DataLoader.from_processed_data(
            data=cloned_data,
            steps=cloned_steps,
            initial_voltage=self.initial_voltage,
            start_idx=self._start_idx,
            end_idx=self._end_idx,
        )
OCPDataLoader.__init__ method · python · L1392-L1413 (22 LOC)
ionworksdata/load.py
    def __init__(self, data, **kwargs):
        """
        Deprecated constructor that forwards to the parent initializer.

        Emits a ``DeprecationWarning``, merges the legacy ``options`` dict with
        the remaining keyword arguments (keyword arguments win on conflict) and
        moves the old flat transform options into a ``transforms`` dict before
        calling the parent ``__init__`` with ``steps=None``.

        Parameters
        ----------
        data
            Forwarded unchanged as the first argument of the parent
            initializer.
        **kwargs
            May contain ``options`` (a dict of settings), ``transforms`` and
            any of the legacy flat keys ``sort``, ``remove_duplicates``,
            ``remove_extremes``, ``filters`` and ``interpolate``.
        """
        warnings.warn(
            "OCPDataLoader is deprecated. Use DataLoader(data) instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        legacy_transform_keys = (
            "sort",
            "remove_duplicates",
            "remove_extremes",
            "filters",
            "interpolate",
        )
        # Map old flat options into transforms
        legacy_options = kwargs.pop("options", None) or {}
        merged = {**legacy_options, **kwargs}
        transforms = dict(merged.pop("transforms", None) or {})
        for key in legacy_transform_keys:
            # An explicit entry in ``transforms`` takes precedence; in that
            # case the flat key is left in ``merged`` untouched.
            if key not in merged or key in transforms:
                continue
            transforms[key] = merged.pop(key)
        if transforms:
            merged["transforms"] = transforms
        super().__init__(data, steps=None, **merged)
OCPDataLoader.from_db method · python · L1416-L1424 (9 LOC)
ionworksdata/load.py
    def from_db(cls, measurement_id, options=None, use_cache=True, timeout=None):
        """
        Deprecated alias for ``DataLoader.from_db``.

        Emits a ``DeprecationWarning`` and forwards every argument to
        ``DataLoader.from_db``; the result of that call is returned as is
        (``cls`` is not used to build the result).
        """
        deprecation_message = (
            "OCPDataLoader.from_db is deprecated. Use DataLoader.from_db instead."
        )
        warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
        forwarded = {"options": options, "use_cache": use_cache, "timeout": timeout}
        return DataLoader.from_db(measurement_id, **forwarded)
get_log_level_func function · python · L19-L24 (6 LOC)
ionworksdata/logger.py
def get_log_level_func(value_to_log: int):
    """
    Build a logging method bound to a fixed numeric level.

    The returned function takes ``self`` as its first argument and relies on
    ``self.isEnabledFor`` and ``self._log``, so it is meant to be attached to
    an object providing both (presumably a ``logging.Logger`` class).

    Parameters
    ----------
    value_to_log : int
        Numeric level passed to ``isEnabledFor`` and ``_log``.

    Returns
    -------
    callable
        ``func(self, message, *args, **kws)`` which logs ``message`` at
        ``value_to_log`` when that level is enabled and returns ``None``.
    """

    def func(self, message: str, *args, **kws) -> None:
        # Skip all work when the level is disabled for this logger.
        if not self.isEnabledFor(value_to_log):
            return
        self._log(value_to_log, message, args, **kws)

    return func
_get_new_logger function · python · L46-L54 (9 LOC)
ionworksdata/logger.py
def _get_new_logger(name: str, filename: str | None = None) -> logging.Logger:
    """
    Fetch the logger called ``name`` and attach a freshly created handler.

    Parameters
    ----------
    name : str
        Name passed to ``logging.getLogger``.
    filename : str | None, optional
        When given, a ``logging.FileHandler`` writing to this file is used;
        otherwise a ``logging.StreamHandler`` is used.

    Returns
    -------
    logging.Logger
        The logger, with the new handler (formatted by ``LOG_FORMATTER``)
        added. Each call adds one more handler to the same named logger.
    """
    handler = (
        logging.StreamHandler() if filename is None else logging.FileHandler(filename)
    )
    handler.setFormatter(LOG_FORMATTER)
    named_logger = logging.getLogger(name)
    named_logger.addHandler(handler)
    return named_logger
_trapezoid function · python · L24-L50 (27 LOC)
ionworksdata/piecewise_linear_timeseries.py
def _trapezoid(y, x=None, dx=1.0, axis=-1):
    """
    Compatibility wrapper for numpy trapezoid integration.

    Uses np.trapezoid when numpy provides it (NumPy >= 2.0), otherwise falls
    back to np.trapz.

    Parameters
    ----------
    y : array_like
        Input array to integrate.
    x : array_like, optional
        The sample points corresponding to the y values.
    dx : scalar, optional
        The spacing between sample points when x is None.
    axis : int, optional
        The axis along which to integrate.

    Returns
    -------
    float or ndarray
        Definite integral as approximated by trapezoidal rule.
    """
    # Pick the implementation by attribute lookup only. Wrapping the whole
    # call in ``try/except AttributeError`` would also swallow an
    # AttributeError raised while integrating bad input and silently retry
    # with np.trapz, hiding the real error.
    integrate = getattr(np, "trapezoid", None)
    if integrate is None:
        integrate = np.trapz
    return integrate(y, x=x, dx=dx, axis=axis)
PiecewiseLinearTimeseries.__init__ method · python · L84-L118 (35 LOC)
ionworksdata/piecewise_linear_timeseries.py
    def __init__(
        self,
        t_data: np.ndarray,
        y_data: np.ndarray,
        atol: float | None = None,
        rtol: float | None = None,
        name: str | None = None,
        options: dict[str, Any] | None = None,
    ):
        """
        Store the raw series and settings, then linearize the data.

        Parameters
        ----------
        t_data : np.ndarray
            Time data points, stored as ``self.t_data``.
        y_data : np.ndarray
            Data values, stored as ``self.y_data``.
        atol : float, optional
            Absolute tolerance. Defaults to ``_default_atol()``.
        rtol : float, optional
            Relative tolerance. Defaults to ``_default_rtol()``.
        name : str, optional
            Name of the timeseries. Defaults to "Piecewise linear timeseries".
        options : dict, optional
            Overrides for the default options ``solver_max_save_points``,
            ``interactive_preprocessing`` and ``window_max``. They are combined
            with the defaults by ``iwutil.check_and_combine_options``.
        """
        self.atol = _default_atol() if atol is None else atol
        self.rtol = _default_rtol() if rtol is None else rtol
        self.name = name or "Piecewise linear timeseries"

        # Set options
        user_options = options or {}
        defaults = {
            "solver_max_save_points": None,
            "interactive_preprocessing": False,
            "window_max": _default_window_max(),
        }
        self.options = iwutil.check_and_combine_options(
            defaults, user_options, filter_unknown=True
        )

        self.t_data = t_data
        self.y_data = y_data

        # Linearization reads the attributes set above, so it must run last.
        self._linearize()
All rows scored by the Repobility analyzer (https://repobility.com)
PiecewiseLinearTimeseries._linearize method · python · L120-L194 (75 LOC)
ionworksdata/piecewise_linear_timeseries.py
    def _linearize(self) -> None:
        """
        Linearizes the time series data based on the provided solver and options.
        This method processes the time series data (`t_data` and `y_data`) and
        linearizes it according to the specified solver and options. The method
        also identifies discontinuities in the data and reduces the number of
        save points if necessary.
        Attributes:
        -----------
        t_data : array-like
            The time data points.
        y_data : array-like
            The corresponding data values.
        atol : float
            Absolute tolerance for the solver.
        rtol : float
            Relative tolerance for the solver.
        options : dict
            Dictionary containing various options for preprocessing and solver
            settings.
        """

        t = self.t_data
        y = self.y_data
        atol = self.atol
        rtol = self.rtol
        name = self.name

        solver_max_save_points 
PiecewiseLinearTimeseries.interpolant method · python · L196-L226 (31 LOC)
ionworksdata/piecewise_linear_timeseries.py
    def interpolant(
        self, interpolator: str = "linear", name: str | None = None, **kwargs
    ) -> pybamm.Interpolant:
        r"""
        Build a ``pybamm.Interpolant`` over the sparse time series.

        The interpolant is constructed from ``self.t_sparse`` and
        ``self.y_sparse`` and is evaluated at ``pybamm.t``.

        Parameters
        ----------
        interpolator : str, optional
            The type of interpolation to use. Default is "linear".
        name : str, optional
            The name of the interpolant. Falls back to ``self.name`` when not
            given (or empty).
        \*\*kwargs
            Additional keyword arguments to pass to pybamm.Interpolant.

        Returns
        -------
        pybamm.Interpolant
            An interpolant object for the sparse time series data.
        """
        interpolant_name = name or self.name
        return pybamm.Interpolant(
            self.t_sparse,
            self.y_sparse,
            pybamm.t,
            interpolator=interpolator,
            name=interpolant_name,
            **kwargs,
        )
process_input_data function · python · L229-L315 (87 LOC)
ionworksdata/piecewise_linear_timeseries.py
def process_input_data(
    t: np.ndarray,
    y: np.ndarray,
    rtol: float,
    atol: float,
    window_max: int,
) -> np.ndarray:
    """
    Process data to find significant segments and points for efficient representation.

    This function identifies key points in the data that represent significant changes
    or important features, allowing for a more compact representation of the data
    while preserving its essential characteristics.

    Parameters
    ----------
    t : array-like
        Full time array of the data.
    y : array-like
        Full value array of the data.
    rtol : float
        Relative tolerance for identifying significant changes.
    atol : float
        Absolute tolerance for identifying significant changes.
    window_max : int
        Maximum window size for removing neighboring points.

    Returns
    -------
    array
        An array of indices representing key points in the data.
    """

    def atol_check(y: np.ndarray, atol: float):
    
find_input_discontinuities function · python · L318-L357 (40 LOC)
ionworksdata/piecewise_linear_timeseries.py
def find_input_discontinuities(
    t: np.ndarray,
    y: np.ndarray,
    atol: float,
    rtol: float,
    scale_factor: float | None = None,
) -> np.ndarray:
    """
    Find discontinuities in the data based on changes in slope.

    This function identifies points where the change in slope exceeds a threshold,
    which is determined by both absolute and relative tolerances.

    Parameters
    ----------
    t : array-like
        Time array of the data.
    y : array-like
        Value array of the data.
    atol : float
        Absolute tolerance for identifying discontinuities.
    rtol : float
        Relative tolerance for identifying discontinuities.
    scale_factor : float, optional (default=1)
        Factor to scale the threshold for identifying discontinuities.

    Returns
    -------
    array
        An array of indices where discontinuities are detected.
    """
    if scale_factor is None:
        scale_factor = 1
    t_diff = np.diff(t)
    t_diff[t_diff == 0] = a
find_contiguous_segments function · python · L360-L389 (30 LOC)
ionworksdata/piecewise_linear_timeseries.py
def find_contiguous_segments(mask: np.ndarray) -> list[list[int]]:
    """
    Find runs of at least two consecutive truthy entries in a boolean mask.

    Parameters
    ----------
    mask : array-like
        Boolean mask to find segments in.

    Returns
    -------
    list
        List of [start, end] pairs, one per run. ``start`` is the index of the
        first truthy entry. For a run terminated by a falsy entry, ``end`` is
        the index of that falsy entry; for a run that reaches the end of the
        mask, ``end`` is ``len(mask) - 1``.
    """
    runs = []
    run_start = None

    for idx, flag in enumerate(mask):
        if flag:
            if run_start is None:
                run_start = idx
            continue
        # Falsy entry: close the open run, keeping it only if it spans at
        # least two entries.
        if run_start is not None and idx >= run_start + 2:
            runs.append([run_start, idx])
        run_start = None

    # A run still open here extends to the last entry of the mask.
    if run_start is not None:
        size = len(mask)
        if size >= run_start + 2:
            runs.append([run_start, size - 1])

    return runs
calculate_linear_fit_error function · python · L392-L413 (22 LOC)
ionworksdata/piecewise_linear_timeseries.py
def calculate_linear_fit_error(t: np.ndarray, y: np.ndarray, atol: float) -> float:
    """
    Integrate the absolute deviation of the data from its end-to-end chord.

    The chord is the straight line through the first and last points of the
    segment; the error is the trapezoidal integral over ``t`` of
    ``|y - chord|``.

    Parameters
    ----------
    t : array-like
        Time array for the segment.
    y : array-like
        Value array for the segment.
    atol : float
        Absolute tolerance. Not used by this function.

    Returns
    -------
    float
        Error of the linear fit. Zero when the segment has at most two points
        or its first and last times coincide.
    """
    t_first = t[0] if len(t) > 2 else None
    if t_first is None or t_first == t[-1]:
        # Two points always lie on their chord; a zero time span has no slope.
        return 0
    y_first = y[0]
    slope = (y[-1] - y_first) / (t[-1] - t_first)
    chord = slope * (t - t_first) + y_first
    return _trapezoid(np.abs(y - chord), t)
calculate_normalized_linear_fit_error function · python · L416-L439 (24 LOC)
ionworksdata/piecewise_linear_timeseries.py
def calculate_normalized_linear_fit_error(
    t: np.ndarray, y: np.ndarray, atol: float
) -> float:
    """
    Calculate the linear fit error relative to the integral of the data.

    The result is ``calculate_linear_fit_error(t, y, atol)`` divided by
    ``|trapezoidal integral of y over t| + atol``.

    Parameters
    ----------
    t : array-like
        Time array for the segment.
    y : array-like
        Value array for the segment.
    atol : float
        Absolute tolerance, added to the denominator.

    Returns
    -------
    float
        Normalized error of the linear fit. Zero when the unnormalized error
        is zero.
    """
    fit_error = calculate_linear_fit_error(t, y, atol)
    if fit_error == 0:
        # Nothing to normalize; also skips the integral of y.
        return 0
    scale = np.abs(_trapezoid(y, t)) + atol
    return fit_error / scale
find_linear_segment_end function · python · L442-L467 (26 LOC)
ionworksdata/piecewise_linear_timeseries.py
def find_linear_segment_end(
    t: np.ndarray, y: np.ndarray, rtol: float, atol: float
) -> int:
    """
    Find the end index of the linear segment starting at index 0.

    Prefixes of growing length (three points and up) are tested with
    ``calculate_normalized_linear_fit_error``; the scan stops at the first
    prefix whose error exceeds ``rtol``.

    Parameters
    ----------
    t : array-like
        Time array for the segment.
    y : array-like
        Value array for the segment.
    rtol : float
        Relative tolerance for identifying significant changes.
    atol : float
        Absolute tolerance for calculations.

    Returns
    -------
    int
        Index of the last point of the longest prefix still within tolerance,
        or ``len(t) - 1`` when no prefix exceeds it.
    """
    n_points = len(t)
    for stop in range(3, n_points + 1):
        prefix_error = calculate_normalized_linear_fit_error(t[:stop], y[:stop], atol)
        if prefix_error > rtol:
            # The prefix ending one point earlier was the last acceptable one.
            return stop - 2
    return n_points - 1
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
find_linear_segments function · python · L470-L503 (34 LOC)
ionworksdata/piecewise_linear_timeseries.py
def find_linear_segments(
    t: np.ndarray, y: np.ndarray, rtol: float, atol: float
) -> np.ndarray:
    """
    Find all linear segments in the data.

    Starting from index 0, ``find_linear_segment_end`` is applied repeatedly
    to the remaining tail of the data; each segment end becomes the start of
    the next segment.

    Parameters
    ----------
    t : array-like
        Time array for the data.
    y : array-like
        Value array for the data.
    rtol : float
        Relative tolerance for identifying significant changes.
    atol : float
        Absolute tolerance for calculations.

    Returns
    -------
    array
        Sorted array of indices where linear segments end. Always contains the
        first and last index of the data.
    """
    breakpoints = [0, len(t) - 1]

    offset = 0
    t_tail, y_tail = t, y
    while True:
        step = find_linear_segment_end(t_tail, y_tail, rtol=rtol, atol=atol)
        # ``step`` is relative to the current tail; convert to a global index.
        offset += step
        if offset in breakpoints:
            # Reached an index already recorded: the scan is complete.
            return np.sort(breakpoints)
        breakpoints.append(offset)
        t_tail = t_tail[step:]
        y_tail = y_tail[step:]
find_optimal_midpoint function · python · L506-L556 (51 LOC)
ionworksdata/piecewise_linear_timeseries.py
def find_optimal_midpoint(t: np.ndarray, y: np.ndarray, N_max: int, atol: float) -> int:
    """
    Find the optimal midpoint in a segment that minimizes linear fit error.

    Parameters
    ----------
    t : array-like
        Time array for the segment.
    y : array-like
        Value array for the segment.
    N_max : int
        Maximum number of segments to consider in optimization steps.
    atol : float
        Absolute tolerance for calculations.

    Returns
    -------
    int
        Index of the optimal midpoint.
    """
    err_best = np.inf
    idx_best = 0
    idx_left = 0
    idx_right = 0

    reduced_segments = len(t) > N_max
    if reduced_segments:
        segments = np.linspace(0, len(t) - 1, N_max, dtype=int)
    else:
        segments = np.arange(len(t))

    for idx, i in enumerate(segments):
        t_vec1 = t[: i + 1]
        y_vec1 = y[: i + 1]
        t_vec2 = t[i:]
        y_vec2 = y[i:]
        err = calculate_linear_fit_error(
            t_vec1, y_ve
calc_r2 function · python · L559-L584 (26 LOC)
ionworksdata/piecewise_linear_timeseries.py
def calc_r2(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """
    Compute the R^2 (coefficient of determination).

    Parameters
    ----------
    y_true : array-like, shape (n,)
        Observed (true) values.
    y_pred : array-like, shape (n,)
        Predicted values.

    Returns
    -------
    r2 : float
        The (unweighted) R^2 value, ``1 - ss_res / ss_tot``.
    """
    residuals = y_true - y_pred
    deviations = y_true - np.average(y_true)

    # Sum of squared residuals and total sum of squares about the mean.
    ss_res = np.sum(residuals**2)
    ss_tot = np.sum(deviations**2)

    # The tiny offset keeps the division defined when y_true is constant.
    return 1 - ss_res / (ss_tot + 1e-100)
interactive_time_stepping_fit function · python · L587-L657 (71 LOC)
ionworksdata/piecewise_linear_timeseries.py
def interactive_time_stepping_fit(
    t: np.ndarray, y: np.ndarray, atol: float, rtol: float, window_max: int, name: str
) -> tuple[float, float]:
    # Create the plot
    fig, ax = plt.subplots()
    plt.subplots_adjust(bottom=0.35)

    # Plot the raw data (which doesn't change)
    ax.plot(t, y, "o", label="Raw data", color="gray", alpha=0.5, ms=4)

    # Initial plot for the piecewise linear fit
    segments = process_input_data(t, y, atol=atol, rtol=rtol, window_max=window_max)
    t_sparse = t[segments]
    y_sparse = y[segments]

    [line_fit] = ax.plot(
        t_sparse, y_sparse, "x-", label="Piecewise linear fit", color="red", lw=2
    )
    ax.set_xlabel("Time")
    ax.set_ylabel("Value")

    def make_title(t_sparse: np.ndarray, y_sparse: np.ndarray) -> str:
        y_linear = np.interp(t, t_sparse, y_sparse)
        R2 = calc_r2(y, y_linear)
        title = f"Piecewise linear fit - {100 * (1 - len(t_sparse) / len(t)):.2f}% reduction, {R2:.5f} R²"
        return title

 
_default_atol function · python · L660-L665 (6 LOC)
ionworksdata/piecewise_linear_timeseries.py
def _default_atol() -> float:
    """
    Return the default absolute tolerance.

    Stated by the original authors to match the `IDAKLUSolver` default.
    """
    default_atol = 1e-6
    return default_atol
_default_rtol function · python · L668-L673 (6 LOC)
ionworksdata/piecewise_linear_timeseries.py
def _default_rtol() -> float:
    """
    Return the default relative tolerance.

    Stated by the original authors to match the `IDAKLUSolver` default.
    """
    default_rtol = 1e-4
    return default_rtol
Biologic._get_file_args method · python · L34-L78 (45 LOC)
ionworksdata/read/biologic.py
    def _get_file_args(
        filename: str | Path, options: dict[str, str] | None = None
    ) -> tuple[int, str]:
        """
        Get file arguments for reading a Biologic file.

        Parameters
        ----------
        filename : str | Path
            Path to the Biologic file.
        options : dict[str, str] | None
            Options dict with file_encoding key.

        Returns
        -------
        tuple[int, str]
            Tuple of (skiprows, sep).
        """
        encoding = options["file_encoding"]
        ext = Path(filename).suffix.lower()

        with open(filename, encoding=encoding) as f:
            lines = f.readlines()

        # Determine separator based on file type
        if ext == ".mpt":
            sep = "\t"
        else:
            # Auto-detect for other file types
            sep = "\t" if any("\t" in line for line in lines[:20]) else ","

        # Determine skiprows
        # Try to find line with "Nb header lines : int" and extract 
Biologic._get_column_renamings method · python · L81-L100 (20 LOC)
ionworksdata/read/biologic.py
    def _get_column_renamings() -> dict[str, str]:
        """
        Get standard column renaming mappings for Biologic files.

        Returns
        -------
        dict[str, str]
            Dictionary mapping original column names to standardized names.
            Several original names may map to the same standardized name.
        """
        # Grouped by standardized name; the order of groups and of the
        # aliases inside each group fixes the order of the resulting dict.
        aliases_by_target = (
            ("Voltage [V]", ("Ecell/V", "Ewe/V", "<Ewe>/V")),
            ("Current [mA]", ("I/mA", "<I>/mA")),
            ("Time [s]", ("time/s",)),
            ("Step from cycler", ("Ns",)),
            ("Cycle from cycler", ("Cycle number", "cycle number")),
        )
        return {
            alias: target
            for target, aliases in aliases_by_target
            for alias in aliases
        }
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
Biologic.run method · python · L102-L207 (106 LOC)
ionworksdata/read/biologic.py
    def run(
        self,
        filename: str | Path,
        extra_column_mappings: dict[str, str] | None = None,
        options: dict[str, str] | None = None,
    ) -> pl.DataFrame:
        """
        Read and process data from a BioLogic file.

        The following column mappings are applied by default:

            - "Ecell/V" -> "Voltage [V]"
            - "Ewe/V" -> "Voltage [V]"
            - "I/mA" -> "Current [mA]"
            - "<I>/mA" -> "Current [mA]"
            - "time/s" -> "Time [s]"
            - "Ns" -> "Step from cycler"
            - "Cycle number" -> "Cycle from cycler"

        Additional column mappings can be provided via extra_column_mappings.

        Parameters
        ----------
        filename : str
            Path to the BioLogic file to be read (.mpt or .txt).
        extra_column_mappings : dict of str to str, optional
            Dictionary of additional column mappings. Keys are original column
            names, values are the new column nam
Biologic.read_start_time method · python · L209-L251 (43 LOC)
ionworksdata/read/biologic.py
    def read_start_time(
        self,
        filename: str | Path,
        extra_column_mappings: dict[str, str] | None = None,
        options: dict[str, str] | None = None,
    ) -> datetime | None:
        """
        Read the start time from a BioLogic file.

        Parameters
        ----------
        filename : str
            Path to the BioLogic file to be read (.mpt or .txt).
        options : dict of str to str, optional
            Dictionary of options for reading the BioLogic file.

        Returns
        -------
        datetime | None
            The start time of the BioLogic file, or None if not found.
        """
        options = iwutil.check_and_combine_options(self.default_options, options)
        skiprows, sep = self._get_file_args(filename, options)

        # Try to load the date column
        data = pl.read_csv(
            filename,
            encoding=options["file_encoding"],
            separator=sep,
            skip_rows=skiprows,
            trun
biologic function · python · L265-L272 (8 LOC)
ionworksdata/read/biologic.py
def biologic(
    filename: str | Path,
    extra_column_mappings: dict[str, str] | None = None,
    options: dict[str, str] | None = None,
) -> pl.DataFrame:
    """
    Read a file with a new ``Biologic`` reader.

    Convenience wrapper: creates a ``Biologic`` instance and returns the
    result of its ``run`` method, forwarding all arguments unchanged.
    """
    reader = Biologic()
    return reader.run(
        filename, extra_column_mappings=extra_column_mappings, options=options
    )
biologic_mpt function · python · L275-L282 (8 LOC)
ionworksdata/read/biologic.py
def biologic_mpt(
    filename: str | Path,
    extra_column_mappings: dict[str, str] | None = None,
    options: dict[str, str] | None = None,
) -> pl.DataFrame:
    """
    Read a file with a new ``BiologicMPT`` reader.

    Convenience wrapper: creates a ``BiologicMPT`` instance and returns the
    result of its ``run`` method, forwarding all arguments unchanged.
    """
    reader = BiologicMPT()
    return reader.run(
        filename, extra_column_mappings=extra_column_mappings, options=options
    )
_find_column function · python · L113-L124 (12 LOC)
ionworksdata/read/csv.py
def _find_column(
    data_columns: list[str], options: list[dict]
) -> tuple[str, float, float]:
    """
    Find the first candidate column that is present in ``data_columns``.

    Parameters
    ----------
    data_columns : list[str]
        Column names available in the data.
    options : list[dict]
        Candidate groups in priority order. Each dict has a ``"values"`` entry
        (candidate column names, also in priority order) plus the ``"scale"``
        and ``"shift"`` entries that apply to any column of that group.

    Returns
    -------
    tuple[str, float, float]
        The matching column name with its group's scale and shift.

    Raises
    ------
    ValueError
        If none of the candidate columns is present in ``data_columns``.
    """
    for values_scale_shift in options:
        for column in values_scale_shift["values"]:
            if column in data_columns:
                return column, values_scale_shift["scale"], values_scale_shift["shift"]
    # Reached only when no candidate matched. A plain statement after the loop
    # says this directly; a ``for ... else`` without ``break`` always runs its
    # ``else`` and only obscures that.
    raise ValueError(f"Could not find appropriate column out of {options}")
CSV.run method · python · L133-L245 (113 LOC)
ionworksdata/read/csv.py
    def run(
        self,
        filename: str | Path,
        extra_column_mappings: dict[str, str] | None = None,
        options: dict[str, str] | None = None,
    ) -> pl.DataFrame:
        """
        Read a CSV file and return a Polars DataFrame with appropriate column names.

        Parameters
        ----------
        filename : str | Path
            Path to the CSV file to be read.
        extra_column_mappings : dict[str, str] | None, optional
            Dictionary of additional column mappings to use when reading the CSV file.
            The keys are the original column names and the values are the new column
            names. Default is None.
        options : dict[str, str] | None, optional
            Dictionary of options to use when reading the CSV file.

            Options are:

                - cell_metadata: dict, optional
                    Additional metadata about the cell. Default is empty dict.

        Returns
        -------
        pl.DataFrame
   
CSV.read_start_time method · python · L247-L257 (11 LOC)
ionworksdata/read/csv.py
    def read_start_time(
        self,
        filename: str | Path,
        extra_column_mappings: dict[str, str] | None = None,
        options: dict[str, str] | None = None,
    ) -> None:
        """
        Warn that start times cannot be read by this reader.

        None of the arguments is used; a warning is issued and ``None`` is
        always returned.
        """
        unsupported_message = "CSV reader does not support reading start time from file"
        warnings.warn(unsupported_message, stacklevel=2)
        return None
csv function · python · L260-L267 (8 LOC)
ionworksdata/read/csv.py
def csv(
    filename: str | Path,
    extra_column_mappings: dict[str, str] | None = None,
    options: dict[str, str] | None = None,
) -> pl.DataFrame:
    """
    Read a file with a new ``CSV`` reader.

    Convenience wrapper: creates a ``CSV`` instance and returns the result of
    its ``run`` method, forwarding all arguments unchanged.
    """
    reader = CSV()
    return reader.run(
        filename, extra_column_mappings=extra_column_mappings, options=options
    )
Repobility (the analyzer behind this table) · https://repobility.com
_is_maccor_excel function · python · L65-L98 (34 LOC)
ionworksdata/read/detect.py
def _is_maccor_excel(filename: Path) -> bool:
    """
    Check if an Excel file is a Maccor file by examining column headers.

    Assumes the first row is always the header row.

    The file is treated as Maccor when either

    - some column name contains "step" and some column name satisfies
      ``_has_maccor_time_col``, or
    - at least three column names contain an entry of ``MACCOR_COLUMNS``.

    Parameters
    ----------
    filename : Path
        Path to the Excel file to check.

    Returns
    -------
    bool
        True if the file appears to be a Maccor file, False otherwise. Any
        error while reading or inspecting the file also yields False.
    """
    try:
        # Only the column names are needed; the data frame is discarded.
        # NOTE(review): the substring checks below presumably rely on the
        # helper returning normalized (e.g. lower-cased) names - confirm.
        _, column_names = read_excel_and_get_column_names(filename)

        # Check column headers for Maccor signature
        has_step = any("step" in col for col in column_names)
        has_time = any(_has_maccor_time_col(col) for col in column_names)
        if has_step and has_time:
            return True

        maccor_col_count = sum(
            1 for col in column_names if any(mc in col for mc in MACCOR_COLUMNS)
        )
        return maccor_col_count >= 3
    except Exception:
        # Detection is deliberately best-effort: an unreadable or unexpected
        # file is simply "not Maccor". Return an explicit bool so the result
        # honours the ``-> bool`` contract instead of being None.
        return False
_read_first_lines function · python · L101-L109 (9 LOC)
ionworksdata/read/detect.py
def _read_first_lines(filename: Path, num_lines: int = 10) -> list[str]:
    """
    Read first lines from file, trying multiple encodings.

    Encodings are tried in the order utf-8, latin1, ISO-8859-1; the first one
    that decodes the requested lines without a ``UnicodeDecodeError`` wins.

    Parameters
    ----------
    filename : Path
        File to read.
    num_lines : int, optional
        Number of ``readline`` calls to make. Default is 10.

    Returns
    -------
    list[str]
        Exactly ``num_lines`` entries; entries past the end of the file are
        empty strings. An empty list if every encoding fails to decode.
    """
    candidate_encodings = ("utf-8", "latin1", "ISO-8859-1")
    for codec in candidate_encodings:
        try:
            with open(filename, encoding=codec) as handle:
                head = [handle.readline() for _ in range(num_lines)]
        except UnicodeDecodeError:
            # Wrong guess for this file; try the next encoding.
            continue
        return head
    return []
detect_reader function · python · L112-L184 (73 LOC)
ionworksdata/read/detect.py
def detect_reader(filename: str | Path) -> str:
    """
    Automatically detect the reader type based on file content.

    Parameters
    ----------
    filename : str | Path
        Path to the file to detect the reader for.

    Returns
    -------
    str
        The detected reader name (e.g., "novonix", "maccor", "neware",
        "repower").

    Raises
    ------
    ValueError
        If the reader type cannot be determined from the file.
    """
    filename = Path(filename)
    ext = filename.suffix.lower()

    # Check for Excel files (Neware or Maccor)
    if ext in [".xls", ".xlsx"]:
        if _is_neware_excel(filename):
            return "neware"
        if _is_maccor_excel(filename):
            return "maccor"

    # Read first few lines to check file signatures
    first_lines = _read_first_lines(filename)
    first_line = first_lines[0] if first_lines else ""
    first_10_lines = "".join(first_lines)

    # Check for Novonix: starts with [Summary] and contains "No
_is_neware_excel function · python · L187-L224 (38 LOC)
ionworksdata/read/detect.py
def _is_neware_excel(filename: Path) -> bool:
    """
    Check if an Excel file is a Neware file by examining column headers.

    Parameters
    ----------
    filename : Path
        Path to the Excel file to check.

    Returns
    -------
    bool
        True if the file appears to be a Neware file, False otherwise.
    """
    try:
        xl_reader = fastexcel.read_excel(filename)
        # Check each sheet for Neware column signatures
        for sheet_name in xl_reader.sheet_names:
            try:
                with suppress_excel_dtype_warnings():
                    df = pl.read_excel(filename, sheet_name=sheet_name)
                if df is None or df.height == 0:
                    continue

                columns = df.columns
                has_timestamp = any(col in columns for col in NEWARE_TIMESTAMP_COLS)
                has_current = any(col in columns for col in NEWARE_CURRENT_COLS)
                has_voltage = any(col in columns for col in NEWARE_VOLTAGE_COL
Maccor._get_file_args method · python · L36-L92 (57 LOC)
ionworksdata/read/maccor.py
    def _get_file_args(
        filename: str | Path, options: dict[str, str] | None = None
    ) -> tuple[str, list[int], str, str | None, str | None, bool]:
        # Find how many header rows to skip and set the read kwargs based on the file extension
        encoding = options["file_encoding"]
        thousands = None
        is_excel = False
        ext = Path(filename).suffix.lower()

        if ext in [".xls", ".xlsx"]:
            # Excel files - return special flag
            is_excel = True
            # For Excel, we'll handle header detection separately
            return encoding, [], ",", None, None, is_excel

        with open(filename, encoding=encoding) as f:
            if ext == ".csv":
                lines = f.readlines()
                # Detect delimiter: some Maccor .csv files are tab-separated (e.g. export)
                skiprows = None
                sep = ","
                units_row = True
                comment = "#"
                for i, line in enu
Maccor.run method · python · L94-L286 (193 LOC)
ionworksdata/read/maccor.py
    def run(
        self,
        filename: str | Path,
        extra_column_mappings: dict[str, str] | None = None,
        options: dict[str, str] | None = None,
    ) -> pl.DataFrame:
        """
        Read and process data from a Maccor file. The following column mappings are applied by default:

            - "Voltage", "Volts", "Voltage (V)" -> "Voltage [V]"
            - "Current", "Amps", "Current (A)" -> "Current [A]"
            - "Prog Time", "Test (Sec)", "Test Time (sec)" -> "Time [s]"
            - "Test Time (Hr)" -> "Time [h]"
            - "Cycle", "Cyc#", "Cycle ID", "Cycle P" -> "Cycle from cycler"
            - "Step", "Step ID" -> "Step from cycler"
            - "LogTemp001", "Temperature (°C)", "EVTemp (C)" -> "Temperature [degC]"
            - "Status", "State", "MD" -> "Status"
            - "Capacity (Ah)", "Capacity (AHr)", "Cap. (Ah)" -> "Capacity [A.h]"
            - "Energy (Wh)", "Energy (WHr)" -> "Energy [W.h]"
            - "Chg Capacity (Ah)", "Chg 
Maccor._parse_timestamp_column method · python · L289-L334 (46 LOC)
ionworksdata/read/maccor.py
    def _parse_timestamp_column(data: pl.DataFrame) -> pl.DataFrame:
        """
        Parse Timestamp column and compute Time [s] if needed.

        Parameters
        ----------
        data : pl.DataFrame
            Input dataframe with potential "Timestamp" column.

        Returns
        -------
        pl.DataFrame
            Dataframe with parsed timestamps and computed Time [s] if applicable.
        """
        if "Timestamp" not in data.columns:
            return data

        # Parse datetime with multiple format attempts
        data = data.with_columns(
            pl.coalesce(
                # Try MM/DD/YYYY HH:MM:SS format (common for Maccor DPT)
                pl.col("Timestamp").str.strptime(
                    pl.Datetime, format="%m/%d/%Y %H:%M:%S", strict=False
                ),
                # Try YYYY-MM-DD HH:MM:SS format
                pl.col("Timestamp").str.strptime(
                    pl.Datetime, format="%Y-%m-%d %H:%M:%S", strict=False
      
Maccor._validate_and_fix_time method · python · L336-L404 (69 LOC)
ionworksdata/read/maccor.py
    def _validate_and_fix_time(
        self, data: pl.DataFrame, time_offset_fix: float
    ) -> pl.DataFrame:
        """
        Validate that time is strictly increasing and optionally fix it.

        Parameters
        ----------
        data : pl.DataFrame
            Input dataframe with "Time [s]" column.
        time_offset_fix : float
            Minimum time difference to enforce when fixing.
            If -1, raises ValueError. If >= 0, ensures all time differences are at least this value.

        Returns
        -------
        pl.DataFrame
            Dataframe with validated or fixed time.

        Raises
        ------
        ValueError
            If time is not strictly increasing and time_offset_fix is -1.
        """
        if "Time [s]" not in data.columns:
            return data

        # Vectorized check: compute differences between consecutive times
        time_col = data["Time [s]"]
        time_diff = time_col.diff()  # time[i] - time[i-1]

        # C
All rows scored by the Repobility analyzer (https://repobility.com)
Maccor._fix_unsigned_current method · python · L406-L454 (49 LOC)
ionworksdata/read/maccor.py
    def _fix_unsigned_current(self, data: pl.DataFrame) -> pl.DataFrame:
        """
        Fix unsigned current by flipping sign during charge if needed.

        If both "D" (discharge) and "C" (charge) are in the "Status" column
        and the current is always positive, then the current isn't signed,
        so we need to flip it during charge.

        Parameters
        ----------
        data : pl.DataFrame
            Input dataframe with potential "Status" and "Current [A]" columns.

        Returns
        -------
        pl.DataFrame
            Dataframe with current sign corrected if needed.
        """
        if "Status" not in data.columns or "Current [A]" not in data.columns:
            return data

        statuses = set(data.select(pl.col("Status").unique()).to_series().to_list())
        if "D" not in statuses or "C" not in statuses:
            return data

        # Ensure numeric current
        data = self._coerce_numeric(data, "Current [A]")

        c_min =
Maccor._get_column_renamings method · python · L457-L519 (63 LOC)
ionworksdata/read/maccor.py
    def _get_column_renamings(options: dict[str, Any] | None = None) -> dict[str, str]:
        """
        Get standard column renaming mappings for Maccor files.

        Parameters
        ----------
        options : dict, optional
            Options dict. If options["skip_capacity_columns"] is True,
            capacity and energy column mappings are excluded, forcing
            ionworksdata to compute them from current/power integration.

        Returns
        -------
        dict[str, str]
            Dictionary mapping original column names to standardized names.
        """
        renamings = {
            "Voltage": "Voltage [V]",
            "Volts": "Voltage [V]",
            "Voltage (V)": "Voltage [V]",
            "Current": "Current [A]",
            "Amps": "Current [A]",
            "Current (A)": "Current [A]",
            "Prog Time": "Time [s]",
            "Test (Sec)": "Time [s]",
            "Test Time (sec)": "Time [s]",
            "Test Time (Hr)": "Time
Maccor._parse_excel_duration method · python · L522-L545 (24 LOC)
ionworksdata/read/maccor.py
    def _parse_excel_duration(duration_str: str) -> float | None:
        """
        Parse Excel duration format :D:HH:MM:SS to total seconds.

        Parameters
        ----------
        duration_str : str
            Duration string in format ":D:HH:MM:SS"

        Returns
        -------
        float | None
            Total seconds, or None if parsing fails.
        """
        if not duration_str.startswith(":"):
            return None
        parts = duration_str[1:].split(":")
        if len(parts) != 4:
            return None
        try:
            days, hours, minutes, seconds = map(int, parts)
            return float(days * 86400 + hours * 3600 + minutes * 60 + seconds)
        except (ValueError, TypeError):
            return None
Maccor._process_test_time_column method · python · L547-L598 (52 LOC)
ionworksdata/read/maccor.py
    def _process_test_time_column(
        self, data: pl.DataFrame, column_renamings: dict[str, str]
    ) -> tuple[pl.DataFrame, dict[str, str]]:
        """
        Process "Test Time" column and determine its format.

        Handles three formats:
        1. Excel duration (":D:HH:MM:SS") -> converts to seconds
        2. Datetime strings (contains "/" or "-") -> maps to Timestamp
        3. Numeric values -> leaves as-is

        Parameters
        ----------
        data : pl.DataFrame
            Input dataframe with potential "Test Time" column.
        column_renamings : dict[str, str]
            Column renaming dictionary to update.

        Returns
        -------
        tuple[pl.DataFrame, dict[str, str]]
            Updated dataframe and column_renamings dict.
        """
        if "Test Time" not in data.columns:
            return data, column_renamings

        # Sample first non-null value to determine type
        sample = (
            data.select(pl.col("Test Ti
Maccor._read_excel_file method · python · L600-L627 (28 LOC)
ionworksdata/read/maccor.py
    def _read_excel_file(self, filename: str | Path, encoding: str) -> pl.DataFrame:
        """
        Load Maccor data from the first sheet of an Excel workbook.

        Parameters
        ----------
        filename : str | Path
            Path to the Excel file (.xls or .xlsx).
        encoding : str
            File encoding. Not used when reading Excel; accepted so the
            signature stays consistent with the other readers.

        Returns
        -------
        pl.DataFrame
            The dataframe returned by ``read_excel_and_get_column_names`` for
            the first sheet of the workbook.
        """
        # fastexcel is used here only to discover the sheet names.
        workbook = fastexcel.read_excel(filename)
        first_sheet = workbook.sheet_names[0]

        # The helper is assumed to take the first row as the header.
        # Dtype warnings raised while reading are suppressed.
        with suppress_excel_dtype_warnings():
            frame, _column_names = read_excel_and_get_column_names(
                filename, sheet_name=first_sheet
            )

        return frame
Maccor.read_header method · python · L629-L656 (28 LOC)
ionworksdata/read/maccor.py
    def read_header(
        self, filename: str | Path, options: dict[str, str] | None = None
    ) -> str:
        """
        Read the header from a Maccor file.
        """
        options = iwutil.check_and_combine_options(self.default_options, options)
        encoding, skiprows, _, _, _, is_excel = self._get_file_args(filename, options)

        if is_excel:
            # For Excel files, first row is always the header
            xl_reader = fastexcel.read_excel(filename)
            sheet_names = xl_reader.sheet_names
            # Suppress pandas dtype warning when reading Excel (printed to stderr)
            with suppress_excel_dtype_warnings():
                df_raw = pl.read_excel(filename, sheet_name=sheet_names[0])

            # Return header row as string (column names are the header)
            return "\t".join(str(col) for col in df_raw.columns)
        else:
            with open(filename, encoding=encoding) as f:
                if len(skiprows) == 1:
          
Maccor.read_start_time method · python · L658-L721 (64 LOC)
ionworksdata/read/maccor.py
    def read_start_time(
        self,
        filename: str | Path,
        extra_column_mappings: dict[str, str] | None = None,
        options: dict[str, str] | None = None,
    ) -> datetime | None:
        """
        Read the start time from a Maccor file.

        Parameters
        ----------
        filename : str | Path
            Path to the Maccor file to be read. Supports:
            - .txt files (tab-separated)
            - .csv files (comma-separated with units row)
            - .xls/.xlsx files (Excel format)
            - Files with .+3digits extension (e.g., .123, .456)
        options : dict of str to str, optional
            See :func:`ionworksdata.read.Maccor.run`.

        Returns
        -------
        datetime | None
            The start time of the Maccor file, or None if not found.
        """
        options = iwutil.check_and_combine_options(self.default_options, options)

        # Load the header row
        start_datetime = None
        header_text
maccor function · python · L724-L731 (8 LOC)
ionworksdata/read/maccor.py
def maccor(
    filename: str | Path,
    extra_column_mappings: dict[str, str] | None = None,
    options: dict[str, str] | None = None,
) -> pl.DataFrame:
    """Read a Maccor file using a freshly constructed ``Maccor`` reader.

    Convenience wrapper: all arguments are forwarded unchanged to
    ``Maccor.run`` and its result is returned.
    """
    reader = Maccor()
    return reader.run(
        filename,
        extra_column_mappings=extra_column_mappings,
        options=options,
    )
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
Neware._read_file_data method · python · L33-L95 (63 LOC)
ionworksdata/read/neware.py
    def _read_file_data(
        self, filename: str | Path, sheets: dict | None = None
    ) -> pl.DataFrame:
        """Read data from CSV or Excel with Polars, optional sheet filtering."""
        filename = Path(filename)

        if filename.suffix.lower() in [".xls", ".xlsx"]:
            # Read Excel file with Polars
            if sheets is None:
                # No sheet specification - read the first sheet
                df_pl = pl.read_excel(filename)
                # Cast raw numeric columns to Float64 to handle type inference issues
                df_pl = self._coerce_numeric_columns(
                    df_pl, columns=self._raw_numeric_columns
                )
                return df_pl

            # Get all sheet names in the Excel file (using fastexcel for sheet discovery)
            xl_reader = fastexcel.read_excel(filename)
            available_sheets = xl_reader.sheet_names

            # Determine which sheets to read based on specification
            she
Neware._get_sheets_to_read method · python · L97-L164 (68 LOC)
ionworksdata/read/neware.py
    def _get_sheets_to_read(
        self, sheets: dict, available_sheets: list[str]
    ) -> list[str]:
        """Parse sheet specification and return list of sheet names to read."""
        if not isinstance(sheets, dict):
            raise ValueError(
                "'sheets' must be a dictionary with 'type' and 'value' keys"
            )

        if "type" not in sheets:
            raise ValueError("'sheets' dict must contain 'type' key")

        sheet_type = sheets["type"]
        sheet_value = sheets.get("value")

        if sheet_type == "name":
            if sheet_value is None:
                raise ValueError(
                    "For 'name' type, 'value' must be a sheet name or list of sheet names"
                )

            # Convert single string to list for uniform processing
            if isinstance(sheet_value, str):
                sheet_names = [sheet_value]
            elif isinstance(sheet_value, list):
                sheet_names = sheet_value
          
Neware._apply_column_renamings method · python · L166-L194 (29 LOC)
ionworksdata/read/neware.py
    def _apply_column_renamings(
        self, data: pl.DataFrame, extra_column_mappings: dict[str, str] | None = None
    ) -> tuple[pl.DataFrame, dict[str, str]]:
        """Apply column renamings to Neware files data."""
        column_renamings = {
            "Current (mA)": "Current [mA]",
            "Cur(mA)": "Current [mA]",
            "Current (A)": "Current [A]",
            "Current(A)": "Current [A]",
            "Voltage (V)": "Voltage [V]",
            "Voltage(V)": "Voltage [V]",
            "Temperature 1 (degC)": "Temperature [degC]",
            "Step ID": "Step from cycler",
            "Step": "Step from cycler",
            "Cycle ID": "Cycle from cycler",
            "Cycle": "Cycle from cycler",
            "Status": "Status",
            "DateTime": "Timestamp",
            "Absolute Time": "Timestamp",
            "Date(h:min:s.ms)": "Timestamp",
        }
        column_renamings.update(extra_column_mappings or {})
        # Validate duplicates (check_for_du
‹ prev · page 2 / 4 · next ›